This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit febe8e30bcf933647221248278068a4a6ecc3c5e
Author: Chesnay Schepler <[email protected]>
AuthorDate: Wed Jun 17 13:08:36 2020 +0200

    [FLINK-18343][e2e] Refactor file-line replacement into separate method
---
 .../util/kafka/LocalStandaloneKafkaResource.java   | 28 ++++----
 .../apache/flink/tests/util/util/FileUtils.java    | 58 +++++++++++++++++
 .../flink/tests/util/util/FileUtilsTest.java       | 76 ++++++++++++++++++++++
 3 files changed, 145 insertions(+), 17 deletions(-)

diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
index e1a3ff4..06af5be 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
@@ -24,6 +24,7 @@ import org.apache.flink.tests.util.CommandLineWrapper;
 import org.apache.flink.tests.util.TestUtils;
 import org.apache.flink.tests.util.activation.OperatingSystemRestriction;
 import org.apache.flink.tests.util.cache.DownloadCache;
+import org.apache.flink.tests.util.util.FileUtils;
 import org.apache.flink.util.OperatingSystem;
 
 import org.junit.rules.TemporaryFolder;
@@ -33,15 +34,12 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.io.OutputStreamWriter;
 import java.io.PrintStream;
-import java.io.PrintWriter;
 import java.net.InetSocketAddress;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -115,22 +113,18 @@ public class LocalStandaloneKafkaResource implements 
KafkaResource {
                        .build());
 
                LOG.info("Updating ZooKeeper properties");
-               final Path zookeeperPropertiesFile = 
kafkaDir.resolve(Paths.get("config", "zookeeper.properties"));
-               final List<String> zookeeperPropertiesFileLines = 
Files.readAllLines(zookeeperPropertiesFile);
-               try (PrintWriter pw = new PrintWriter(new 
OutputStreamWriter(Files.newOutputStream(zookeeperPropertiesFile, 
StandardOpenOption.TRUNCATE_EXISTING), StandardCharsets.UTF_8.name()))) {
-                       zookeeperPropertiesFileLines.stream()
-                               .map(line -> 
ZK_DATA_DIR_PATTERN.matcher(line).replaceAll("$1" + 
kafkaDir.resolve("zookeeper").toAbsolutePath()))
-                               .forEachOrdered(pw::println);
-               }
+               FileUtils.replace(
+                       kafkaDir.resolve(Paths.get("config", 
"zookeeper.properties")),
+                       ZK_DATA_DIR_PATTERN,
+                       matcher -> matcher.replaceAll("$1" + 
kafkaDir.resolve("zookeeper").toAbsolutePath())
+               );
 
                LOG.info("Updating Kafka properties");
-               final Path kafkaPropertiesFile = 
kafkaDir.resolve(Paths.get("config", "server.properties"));
-               final List<String> kafkaPropertiesFileLines = 
Files.readAllLines(kafkaPropertiesFile);
-               try (PrintWriter pw = new PrintWriter(new 
OutputStreamWriter(Files.newOutputStream(kafkaPropertiesFile, 
StandardOpenOption.TRUNCATE_EXISTING), StandardCharsets.UTF_8.name()))) {
-                       kafkaPropertiesFileLines.stream()
-                               .map(line -> 
KAFKA_LOG_DIR_PATTERN.matcher(line).replaceAll("$1" + 
kafkaDir.resolve("kafka").toAbsolutePath()))
-                               .forEachOrdered(pw::println);
-               }
+               FileUtils.replace(
+                       kafkaDir.resolve(Paths.get("config", 
"server.properties")),
+                       KAFKA_LOG_DIR_PATTERN,
+                       matcher -> matcher.replaceAll("$1" + 
kafkaDir.resolve("kafka").toAbsolutePath())
+               );
        }
 
        private void setupKafkaCluster() throws IOException {
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/util/FileUtils.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/util/FileUtils.java
new file mode 100644
index 0000000..acfa882
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/util/FileUtils.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.util;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.List;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Collection of file-related utilities.
+ */
+public class FileUtils {
+
+       /**
+        * Matches the given {@link Pattern} against all lines in the given 
file, and replaces all matches with the replacement
+        * generated by the given {@link Function}.
+        * All unmatched lines and provided replacements are written into the 
file, with the order corresponding to the
+        * original content. Newlines are automatically added to each line; 
this implies that an empty replacement string
+        * will result in an empty line to be written.
+        */
+       public static void replace(Path file, Pattern pattern, 
Function<Matcher, String> replacer) throws IOException {
+               final List<String> fileLines = Files.readAllLines(file);
+               try (PrintWriter pw = new PrintWriter(new 
OutputStreamWriter(Files.newOutputStream(file, 
StandardOpenOption.TRUNCATE_EXISTING), StandardCharsets.UTF_8.name()))) {
+                       for (String fileLine : fileLines) {
+                               Matcher matcher = pattern.matcher(fileLine);
+                               if (matcher.matches()) {
+                                       String replacement = 
replacer.apply(matcher);
+                                       pw.println(replacement);
+                               } else {
+                                       pw.println(fileLine);
+                               }
+                       }
+               }
+       }
+}
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/util/FileUtilsTest.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/util/FileUtilsTest.java
new file mode 100644
index 0000000..43eadab
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/util/FileUtilsTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.util;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Tests for {@link FileUtils}.
+ */
+public class FileUtilsTest extends TestLogger {
+
+       @ClassRule
+       public static final TemporaryFolder TMP = new TemporaryFolder();
+
+       private static final List<String> ORIGINAL_LINES = 
Collections.unmodifiableList(Arrays.asList("line1", "line2", "line3"));
+       private Path testFile;
+
+       @Before
+       public void setupFile() throws IOException {
+               Path path = TMP.newFile().toPath();
+
+               Files.write(path, ORIGINAL_LINES);
+
+               testFile = path;
+       }
+
+       @Test
+       public void replaceSingleMatch() throws IOException {
+               FileUtils.replace(testFile, Pattern.compile("line1"), matcher 
-> "removed");
+
+               Assert.assertEquals(Arrays.asList("removed", 
ORIGINAL_LINES.get(1), ORIGINAL_LINES.get(2)), Files.readAllLines(testFile));
+       }
+
+       @Test
+       public void replaceMultipleMatch() throws IOException {
+               FileUtils.replace(testFile, Pattern.compile("line(.*)"), 
matcher -> matcher.group(1));
+
+               Assert.assertEquals(Arrays.asList("1", "2", "3"), 
Files.readAllLines(testFile));
+       }
+
+       @Test
+       public void replaceWithEmptyLine() throws IOException {
+               FileUtils.replace(testFile, Pattern.compile("line2"), matcher 
-> "");
+
+               Assert.assertEquals(Arrays.asList(ORIGINAL_LINES.get(0), "", 
ORIGINAL_LINES.get(2)), Files.readAllLines(testFile));
+       }
+}

Reply via email to