jeyhunkarimov commented on code in PR #24776: URL: https://github.com/apache/flink/pull/24776#discussion_r1610462318
########## flink-end-to-end-tests/flink-stream-sql-test/src/test/java/org/apache/flink/sql/tests/StreamSQLTestProgramScalaPlannerITCase.java: ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.tests; + +import org.apache.flink.connector.testframe.container.FlinkContainers; +import org.apache.flink.connector.testframe.container.FlinkContainersSettings; +import org.apache.flink.connector.testframe.container.TestcontainersSettings; + +import org.junit.jupiter.api.AfterAll; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +class StreamSQLTestProgramScalaPlannerITCase extends AbstractStreamSQLTestProgramITCase { + private static final Logger LOGGER = + LoggerFactory.getLogger(StreamSQLTestProgramScalaPlannerITCase.class); + private static final String DIST_DIR = System.getProperty("distDir"); + + @Override + protected FlinkContainers createFlinkContainers() {
Review Comment: We need to ensure that `DIST_DIR` is not null and not empty, right?
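For example, something along these lines could fail fast before any containers are built (just a sketch; the method name and message are illustrative, and it assumes a JUnit 5 `@BeforeAll` import is added to the class):

```java
// Illustrative guard only: fail fast if the 'distDir' system property is missing or blank.
@BeforeAll
static void checkDistDir() {
    if (DIST_DIR == null || DIST_DIR.trim().isEmpty()) {
        throw new IllegalStateException(
                "System property 'distDir' must point to the Flink distribution directory.");
    }
}
```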
########## flink-end-to-end-tests/flink-stream-sql-test/src/test/java/org/apache/flink/sql/tests/StreamSQLTestProgramScalaPlannerITCase.java: ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.tests; + +import org.apache.flink.connector.testframe.container.FlinkContainers; +import org.apache.flink.connector.testframe.container.FlinkContainersSettings; +import org.apache.flink.connector.testframe.container.TestcontainersSettings; + +import org.junit.jupiter.api.AfterAll; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +class StreamSQLTestProgramScalaPlannerITCase extends AbstractStreamSQLTestProgramITCase { + private static final Logger LOGGER = + LoggerFactory.getLogger(StreamSQLTestProgramScalaPlannerITCase.class); + private static final String DIST_DIR = System.getProperty("distDir"); + + @Override + protected FlinkContainers createFlinkContainers() { + // Swap planner jar files in `lib/` and `opt/` folders + swapPlannerLoaderWithPlannerScala(); + + return FlinkContainers.builder() + .withFlinkContainersSettings( + FlinkContainersSettings.builder().numTaskManagers(4).build()) + .withTestcontainersSettings( + TestcontainersSettings.builder().network(NETWORK).logger(LOGGER).build()) + .build(); + } + + @AfterAll + static void afterAll() { + swapPlannerScalaWithPlannerLoader(); + } + + private void swapPlannerLoaderWithPlannerScala() {
Review Comment: Maybe we can invoke this from a `@BeforeAll` method instead of doing it in the constructor?
########## flink-test-utils-parent/flink-connector-test-utils/src/test/java/org/apache/flink/connector/testframe/container/FlinkImageBuilderTest.java: ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.connector.testframe.container; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.startupcheck.OneShotStartupCheckStrategy; +import org.testcontainers.images.builder.ImageFromDockerfile; + +import java.nio.file.Path; + +import static org.assertj.core.api.Assertions.assertThat; + +class FlinkImageBuilderTest { + FlinkContainersSettings settings; + Configuration conf; + + @TempDir Path tempDir; + + @BeforeEach + void beforeEach() { + settings = FlinkContainersSettings.builder().numTaskManagers(2).build(); + conf = settings.getFlinkConfig(); + } + + @Test + void testFromDistJobManager() throws ImageBuildException { + ImageFromDockerfile image = + new FlinkImageBuilder() + .setFlinkDistPath(tempDir) + .setTempDirectory(tempDir) + .setConfiguration(conf) + .setLogProperties(settings.getLogProperties()) + .setBaseImage(settings.getBaseImage()) + .asJobManager() + .build(); + assertThat(image.getDockerImageName()).isEqualTo("flink-configured-jobmanager"); + try (final GenericContainer<?> container = new GenericContainer<>(image)) { + container + .withStartupCheckStrategy(new OneShotStartupCheckStrategy()) + .withCommand("cat", "/opt/flink/conf/config.yaml") + .start(); + assertThat(container.getLogs()).contains("rpc:\n address: jobmanager"); + } + } + + @Test + void testFromDistTaskManager() throws ImageBuildException { + ImageFromDockerfile image = + new FlinkImageBuilder() + .setFlinkDistPath(tempDir) + .setTempDirectory(tempDir) + .setConfiguration(conf) + .setLogProperties(settings.getLogProperties()) + .setBaseImage(settings.getBaseImage()) + .asTaskManager() + .build(); + assertThat(image.getDockerImageName()).isEqualTo("flink-configured-taskmanager"); + try (final GenericContainer<?> container = new GenericContainer<>(image)) { + container + .withStartupCheckStrategy(new OneShotStartupCheckStrategy()) + .withCommand("cat", "/opt/flink/conf/config.yaml") + .start(); + assertThat(container.getLogs()).contains("rpc:\n address: jobmanager"); + } + } + + @Test + void testFromDistTaskManagerWithHostParameter() throws ImageBuildException { + final Configuration taskManagerConf = new Configuration(); + taskManagerConf.addAll(conf); + taskManagerConf.set(TaskManagerOptions.HOST, "taskmanager1-host"); + ImageFromDockerfile image = + new FlinkImageBuilder() + .setFlinkDistPath(tempDir) + .setTempDirectory(tempDir) + .setConfiguration(taskManagerConf) + .setLogProperties(settings.getLogProperties()) + .setBaseImage(settings.getBaseImage()) + .asTaskManager() + .build(); + assertThat(image.getDockerImageName()).isEqualTo("flink-configured-taskmanager"); + try (final GenericContainer<?> container = new GenericContainer<>(image)) { + container + .withStartupCheckStrategy(new OneShotStartupCheckStrategy()) + .withCommand("cat", "/opt/flink/conf/config.yaml") + .start(); + assertThat(container.getLogs()).contains("host: taskmanager1-host"); + } + } + + @Test + void testFromDistWithLog4jFile() throws ImageBuildException { + ImageFromDockerfile image = + new FlinkImageBuilder() + .setFlinkDistPath(tempDir) + .setTempDirectory(tempDir) + .setConfiguration(conf) + .setLogProperties(settings.getLogProperties()) + .setBaseImage(settings.getBaseImage()) + .asJobManager() + .build(); + try (final 
GenericContainer<?> container = new GenericContainer<>(image)) { + container + .withStartupCheckStrategy(new OneShotStartupCheckStrategy()) + .withCommand("cat", "/opt/flink/conf/log4j-console.properties") + .start(); + System.out.println(container.getLogs()); + assertThat(container.getLogs()).contains("appender.console.name=ConsoleAppender"); + } + } + + @Test + void testJobManagerWithBaseImage() throws ImageBuildException { + final FlinkContainersSettings settings =
Review Comment: You already have `settings` and `conf` as instance variables; maybe you can make these two instance variables as well, with more explicit names?
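For instance (the field names are only a suggestion, and the base image below is a placeholder since the value used in `testJobManagerWithBaseImage` is not visible in this hunk; the builder calls mirror the ones this test already uses):

```java
// Sketch: give each configuration an explicit name instead of re-building it inside tests.
private FlinkContainersSettings defaultSettings;
private Configuration defaultConf;
private FlinkContainersSettings customImageSettings;
private Configuration customImageConf;

@BeforeEach
void beforeEach() {
    defaultSettings = FlinkContainersSettings.builder().numTaskManagers(2).build();
    defaultConf = defaultSettings.getFlinkConfig();
    customImageSettings =
            FlinkContainersSettings.builder().baseImage("flink:1.19.0-scala_2.12-java11").build();
    customImageConf = customImageSettings.getFlinkConfig();
}
```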
########## flink-end-to-end-tests/flink-stream-sql-test/src/test/java/org/apache/flink/sql/tests/StreamSQLTestProgramScalaPlannerITCase.java: ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.tests; + +import org.apache.flink.connector.testframe.container.FlinkContainers; +import org.apache.flink.connector.testframe.container.FlinkContainersSettings; +import org.apache.flink.connector.testframe.container.TestcontainersSettings; + +import org.junit.jupiter.api.AfterAll; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +class StreamSQLTestProgramScalaPlannerITCase extends AbstractStreamSQLTestProgramITCase { + private static final Logger LOGGER = + LoggerFactory.getLogger(StreamSQLTestProgramScalaPlannerITCase.class); + private static final String DIST_DIR = System.getProperty("distDir"); + + @Override + protected FlinkContainers createFlinkContainers() { + // Swap planner jar files in `lib/` and `opt/` folders + swapPlannerLoaderWithPlannerScala(); + + return FlinkContainers.builder() + .withFlinkContainersSettings( + FlinkContainersSettings.builder().numTaskManagers(4).build()) + .withTestcontainersSettings( + TestcontainersSettings.builder().network(NETWORK).logger(LOGGER).build()) + .build(); + } + + @AfterAll + static void afterAll() { + swapPlannerScalaWithPlannerLoader(); + } + + private void swapPlannerLoaderWithPlannerScala() { + move("flink-table-planner-loader", "lib", "opt"); + move("flink-table-planner_", "opt", "lib"); + } + + private static void swapPlannerScalaWithPlannerLoader() { + move("flink-table-planner-loader", "opt", "lib"); + move("flink-table-planner_", "lib", "opt"); + } + + private static void move(final String filePattern, final String from, final String to) { + final Path fromDirectory = Paths.get(DIST_DIR, from); + try (final Stream<Path> filesStream = Files.list(fromDirectory)) { + final List<Path> files = + filesStream + .filter(entry -> !Files.isDirectory(entry)) + .filter(file -> file.toString().endsWith(".jar")) + .filter(file -> file.getFileName().toString().startsWith(filePattern)) + .collect(Collectors.toList()); + if (files.size() != 1) { + throw new IllegalStateException( + "Found multiple file pattern '" + filePattern + "', expected only one.");
Review Comment: It can also contain zero matches, no? Maybe we can improve the error message, something like `"Found " + files.size() + " files for pattern '" + filePattern + "', expected only one."`?
########## flink-test-utils-parent/flink-connector-test-utils/src/test/java/org/apache/flink/connector/testframe/utils/FlinkContainersOperationsITCase.java: ########## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.testframe.utils; + +import org.apache.flink.connector.testframe.container.FlinkContainers; +import org.apache.flink.connector.testframe.container.FlinkContainersSettings; +import org.apache.flink.connector.testframe.container.TestcontainersSettings; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +class FlinkContainersOperationsITCase { + private static final Logger LOGGER = + LoggerFactory.getLogger(FlinkContainersOperationsITCase.class); + public static final Network NETWORK = Network.newNetwork(); + private static final String OUTPUT_PATH = "/tmp/output"; + + @RegisterExtension + static FlinkContainers flink = + FlinkContainers.builder() + .withFlinkContainersSettings( + FlinkContainersSettings.builder() + .baseImage("flink:1.19.0-scala_2.12-java11") + .numTaskManagers(2) + .build()) + .withTestcontainersSettings( + TestcontainersSettings.builder() + .network(NETWORK) + .logger(LOGGER) + .environmentVariable("JOB_MANAGER_RPC_ADDRESS", "jobmanager") + .build()) + .build(); + + @BeforeAll
Review Comment: Do we need a cleanup with `@AfterAll`?
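If `@BeforeAll` creates files under `OUTPUT_PATH` inside the containers, the cleanup could look roughly like this (only a sketch, since the body of the `@BeforeAll` method is not visible in this hunk; `execInContainer` is the standard Testcontainers API):

```java
// Sketch: remove the test output from every TaskManager container after the test class.
@AfterAll
static void afterAll() throws IOException, InterruptedException {
    for (GenericContainer<?> taskManager : flink.getTaskManagers()) {
        taskManager.execInContainer("rm", "-rf", OUTPUT_PATH);
    }
}
```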
########## flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/FlinkContainersOperations.java: ########## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.testframe.utils; + +import org.apache.flink.connector.testframe.container.FlinkContainers; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.StringReader; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** A {@link FlinkContainers} docker containers related helper functions. */ +public class FlinkContainersOperations { + private static final Logger LOGGER = LoggerFactory.getLogger(FlinkContainersOperations.class); + private static final String NEWLINE = System.lineSeparator(); + + private final FlinkContainers flink; + + /** + * Creates an instance of {@link FlinkContainersOperations}. + * + * @param flink {@link FlinkContainers} docker based cluster + */ + public FlinkContainersOperations(FlinkContainers flink) { + this.flink = flink; + } + + /** + * Returns the content of files in the provided output path. + * + * <p>It expects that each line is terminated by line separator (e.g, '\n', '\r') or end-of-file + * in the output file. + * + * @param outputPath path of output files + * @param pattern file pattern to retrieve content + * @param isSorted should the output be sorted + * @return files content that match file pattern in the output path + */ + public String getOutputFileContent( + final String outputPath, final String pattern, final boolean isSorted) + throws IOException { + LOGGER.info( + "Fetching content of files in '{}' path with pattern '{}'.", outputPath, pattern); + final FileContentFetcher fileContentFetcher = new FileContentFetcher(outputPath, pattern); + final StringBuilder sb = new StringBuilder(); + for (final GenericContainer<?> taskManager : flink.getTaskManagers()) { + sb.append(fileContentFetcher.getFileContent(taskManager)); + } + return isSorted ? sortLines(sb.toString()) : sb.toString(); + } + + private String sortLines(final String input) throws IOException { + final BufferedReader bufferedReader = new BufferedReader(new StringReader(input)); + final List<String> lines = new ArrayList<>(); + String inputLine; + while ((inputLine = bufferedReader.readLine()) != null) {
Review Comment: Maybe use something like this to avoid possible resource leaks? `try (BufferedReader bufferedReader = new BufferedReader(new StringReader(input))) {...`
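Spelled out, `sortLines` could become something like the following (a sketch; the code after the loop is not shown in the hunk, so the sort/join part below is just a guess at what the method already does):

```java
private String sortLines(final String input) throws IOException {
    final List<String> lines = new ArrayList<>();
    // try-with-resources closes the reader even if readLine() throws.
    try (BufferedReader bufferedReader = new BufferedReader(new StringReader(input))) {
        String inputLine;
        while ((inputLine = bufferedReader.readLine()) != null) {
            lines.add(inputLine);
        }
    }
    Collections.sort(lines);
    return String.join(NEWLINE, lines);
}
```

On Java 11+ the reader could be avoided entirely with `input.lines().sorted().collect(Collectors.joining(NEWLINE))`, if that fits the module's language level.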
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]