This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit febe8e30bcf933647221248278068a4a6ecc3c5e Author: Chesnay Schepler <[email protected]> AuthorDate: Wed Jun 17 13:08:36 2020 +0200 [FLINK-18343][e2e] Refactor file-line replacement into separate method --- .../util/kafka/LocalStandaloneKafkaResource.java | 28 ++++---- .../apache/flink/tests/util/util/FileUtils.java | 58 +++++++++++++++++ .../flink/tests/util/util/FileUtilsTest.java | 76 ++++++++++++++++++++++ 3 files changed, 145 insertions(+), 17 deletions(-) diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java index e1a3ff4..06af5be 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java @@ -24,6 +24,7 @@ import org.apache.flink.tests.util.CommandLineWrapper; import org.apache.flink.tests.util.TestUtils; import org.apache.flink.tests.util.activation.OperatingSystemRestriction; import org.apache.flink.tests.util.cache.DownloadCache; +import org.apache.flink.tests.util.util.FileUtils; import org.apache.flink.util.OperatingSystem; import org.junit.rules.TemporaryFolder; @@ -33,15 +34,12 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.io.IOException; -import java.io.OutputStreamWriter; import java.io.PrintStream; -import java.io.PrintWriter; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.nio.file.StandardOpenOption; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -115,22 +113,18 @@ public class LocalStandaloneKafkaResource implements KafkaResource { .build()); LOG.info("Updating ZooKeeper properties"); - final Path zookeeperPropertiesFile = kafkaDir.resolve(Paths.get("config", "zookeeper.properties")); - final List<String> zookeeperPropertiesFileLines = Files.readAllLines(zookeeperPropertiesFile); - try (PrintWriter pw = new PrintWriter(new OutputStreamWriter(Files.newOutputStream(zookeeperPropertiesFile, StandardOpenOption.TRUNCATE_EXISTING), StandardCharsets.UTF_8.name()))) { - zookeeperPropertiesFileLines.stream() - .map(line -> ZK_DATA_DIR_PATTERN.matcher(line).replaceAll("$1" + kafkaDir.resolve("zookeeper").toAbsolutePath())) - .forEachOrdered(pw::println); - } + FileUtils.replace( + kafkaDir.resolve(Paths.get("config", "zookeeper.properties")), + ZK_DATA_DIR_PATTERN, + matcher -> matcher.replaceAll("$1" + kafkaDir.resolve("zookeeper").toAbsolutePath()) + ); LOG.info("Updating Kafka properties"); - final Path kafkaPropertiesFile = kafkaDir.resolve(Paths.get("config", "server.properties")); - final List<String> kafkaPropertiesFileLines = Files.readAllLines(kafkaPropertiesFile); - try (PrintWriter pw = new PrintWriter(new OutputStreamWriter(Files.newOutputStream(kafkaPropertiesFile, StandardOpenOption.TRUNCATE_EXISTING), StandardCharsets.UTF_8.name()))) { - kafkaPropertiesFileLines.stream() - .map(line -> KAFKA_LOG_DIR_PATTERN.matcher(line).replaceAll("$1" + kafkaDir.resolve("kafka").toAbsolutePath())) - .forEachOrdered(pw::println); - } + FileUtils.replace( + kafkaDir.resolve(Paths.get("config", "server.properties")), + KAFKA_LOG_DIR_PATTERN, + matcher -> matcher.replaceAll("$1" + kafkaDir.resolve("kafka").toAbsolutePath()) + ); } private void setupKafkaCluster() throws IOException { diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/util/FileUtils.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/util/FileUtils.java new file mode 100644 index 0000000..acfa882 --- /dev/null +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/util/FileUtils.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util.util; + +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.List; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Collection of file-related utilities. + */ +public class FileUtils { + + /** + * Matches the given {@link Pattern} against all lines in the given file, and replaces all matches with the replacement + * generated by the given {@link Function}. + * All unmatched lines and provided replacements are written into the file, with the order corresponding to the + * original content. Newlines are automatically added to each line; this implies that an empty replacement string + * will result in an empty line to be written. + */ + public static void replace(Path file, Pattern pattern, Function<Matcher, String> replacer) throws IOException { + final List<String> fileLines = Files.readAllLines(file); + try (PrintWriter pw = new PrintWriter(new OutputStreamWriter(Files.newOutputStream(file, StandardOpenOption.TRUNCATE_EXISTING), StandardCharsets.UTF_8.name()))) { + for (String fileLine : fileLines) { + Matcher matcher = pattern.matcher(fileLine); + if (matcher.matches()) { + String replacement = replacer.apply(matcher); + pw.println(replacement); + } else { + pw.println(fileLine); + } + } + } + } +} diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/util/FileUtilsTest.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/util/FileUtilsTest.java new file mode 100644 index 0000000..43eadab --- /dev/null +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/util/FileUtilsTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util.util; + +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.regex.Pattern; + +/** + * Tests for {@link FileUtils}. + */ +public class FileUtilsTest extends TestLogger { + + @ClassRule + public static final TemporaryFolder TMP = new TemporaryFolder(); + + private static final List<String> ORIGINAL_LINES = Collections.unmodifiableList(Arrays.asList("line1", "line2", "line3")); + private Path testFile; + + @Before + public void setupFile() throws IOException { + Path path = TMP.newFile().toPath(); + + Files.write(path, ORIGINAL_LINES); + + testFile = path; + } + + @Test + public void replaceSingleMatch() throws IOException { + FileUtils.replace(testFile, Pattern.compile("line1"), matcher -> "removed"); + + Assert.assertEquals(Arrays.asList("removed", ORIGINAL_LINES.get(1), ORIGINAL_LINES.get(2)), Files.readAllLines(testFile)); + } + + @Test + public void replaceMultipleMatch() throws IOException { + FileUtils.replace(testFile, Pattern.compile("line(.*)"), matcher -> matcher.group(1)); + + Assert.assertEquals(Arrays.asList("1", "2", "3"), Files.readAllLines(testFile)); + } + + @Test + public void replaceWithEmptyLine() throws IOException { + FileUtils.replace(testFile, Pattern.compile("line2"), matcher -> ""); + + Assert.assertEquals(Arrays.asList(ORIGINAL_LINES.get(0), "", ORIGINAL_LINES.get(2)), Files.readAllLines(testFile)); + } +}
