morazow commented on code in PR #24776: URL: https://github.com/apache/flink/pull/24776#discussion_r1611417574
########## flink-end-to-end-tests/flink-stream-sql-test/src/test/java/org/apache/flink/sql/tests/StreamSQLTestProgramScalaPlannerITCase.java: ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.tests; + +import org.apache.flink.connector.testframe.container.FlinkContainers; +import org.apache.flink.connector.testframe.container.FlinkContainersSettings; +import org.apache.flink.connector.testframe.container.TestcontainersSettings; + +import org.junit.jupiter.api.AfterAll; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +class StreamSQLTestProgramScalaPlannerITCase extends AbstractStreamSQLTestProgramITCase { + private static final Logger LOGGER = + LoggerFactory.getLogger(StreamSQLTestProgramScalaPlannerITCase.class); + private static final String DIST_DIR = System.getProperty("distDir"); + + @Override + protected FlinkContainers createFlinkContainers() { + // Swap planner jar files in `lib/` and 
`opt/` folders + swapPlannerLoaderWithPlannerScala(); + + return FlinkContainers.builder() + .withFlinkContainersSettings( + FlinkContainersSettings.builder().numTaskManagers(4).build()) + .withTestcontainersSettings( + TestcontainersSettings.builder().network(NETWORK).logger(LOGGER).build()) + .build(); + } + + @AfterAll + static void afterAll() { + swapPlannerScalaWithPlannerLoader(); + } + + private void swapPlannerLoaderWithPlannerScala() { Review Comment: Also added a test that checks that the correct planner jar is used ########## flink-test-utils-parent/flink-connector-test-utils/src/test/java/org/apache/flink/connector/testframe/utils/FlinkContainersOperationsITCase.java: ########## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.testframe.utils; + +import org.apache.flink.connector.testframe.container.FlinkContainers; +import org.apache.flink.connector.testframe.container.FlinkContainersSettings; +import org.apache.flink.connector.testframe.container.TestcontainersSettings; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.junit.jupiter.Testcontainers; + +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; + +@Testcontainers +class FlinkContainersOperationsITCase { + private static final Logger LOGGER = + LoggerFactory.getLogger(FlinkContainersOperationsITCase.class); + public static final Network NETWORK = Network.newNetwork(); + private static final String OUTPUT_PATH = "/tmp/output"; + + @RegisterExtension + static FlinkContainers flink = + FlinkContainers.builder() + .withFlinkContainersSettings( + FlinkContainersSettings.builder() + .baseImage("flink:1.19.0-scala_2.12-java11") + .numTaskManagers(2) + .build()) + .withTestcontainersSettings( + TestcontainersSettings.builder() + .network(NETWORK) + .logger(LOGGER) + .environmentVariable("JOB_MANAGER_RPC_ADDRESS", "jobmanager") + .build()) + .build(); + + @BeforeAll Review Comment: I think not, since it is tagged with `@RegisterExtension`, the FlinkContainers is stopped in the afterAll callback, so the setup files inside the taskmanagers will be purged on container shutdown ########## flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/FlinkContainersOperations.java: ########## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.testframe.utils; + +import org.apache.flink.connector.testframe.container.FlinkContainers; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.StringReader; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** A {@link FlinkContainers} docker containers related helper functions. */ +public class FlinkContainersOperations { + private static final Logger LOGGER = LoggerFactory.getLogger(FlinkContainersOperations.class); + private static final String NEWLINE = System.lineSeparator(); + + private final FlinkContainers flink; + + /** + * Creates an instance of {@link FlinkContainersOperations}. + * + * @param flink {@link FlinkContainers} docker based cluster + */ + public FlinkContainersOperations(FlinkContainers flink) { + this.flink = flink; + } + + /** + * Returns the content of files in the provided output path. + * + * <p>It expects that each line is terminated by line separator (e.g, '\n', '\r') or end-of-file + * in the output file. 
+ * + * @param outputPath path of output files + * @param pattern file pattern to retrieve content + * @param isSorted should the output be sorted + * @return files content that match file pattern in the output path + */ + public String getOutputFileContent( + final String outputPath, final String pattern, final boolean isSorted) + throws IOException { + LOGGER.info( + "Fetching content of files in '{}' path with pattern '{}'.", outputPath, pattern); + final FileContentFetcher fileContentFetcher = new FileContentFetcher(outputPath, pattern); + final StringBuilder sb = new StringBuilder(); + for (final GenericContainer<?> taskManager : flink.getTaskManagers()) { + sb.append(fileContentFetcher.getFileContent(taskManager)); + } + return isSorted ? sortLines(sb.toString()) : sb.toString(); + } + + private String sortLines(final String input) throws IOException { + final BufferedReader bufferedReader = new BufferedReader(new StringReader(input)); + final List<String> lines = new ArrayList<>(); + String inputLine; + while ((inputLine = bufferedReader.readLine()) != null) { Review Comment: Yes good catch, addressed ########## flink-end-to-end-tests/flink-stream-sql-test/src/test/java/org/apache/flink/sql/tests/StreamSQLTestProgramScalaPlannerITCase.java: ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.tests; + +import org.apache.flink.connector.testframe.container.FlinkContainers; +import org.apache.flink.connector.testframe.container.FlinkContainersSettings; +import org.apache.flink.connector.testframe.container.TestcontainersSettings; + +import org.junit.jupiter.api.AfterAll; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +class StreamSQLTestProgramScalaPlannerITCase extends AbstractStreamSQLTestProgramITCase { + private static final Logger LOGGER = + LoggerFactory.getLogger(StreamSQLTestProgramScalaPlannerITCase.class); + private static final String DIST_DIR = System.getProperty("distDir"); + + @Override + protected FlinkContainers createFlinkContainers() { + // Swap planner jar files in `lib/` and `opt/` folders + swapPlannerLoaderWithPlannerScala(); + + return FlinkContainers.builder() + .withFlinkContainersSettings( + FlinkContainersSettings.builder().numTaskManagers(4).build()) + .withTestcontainersSettings( + TestcontainersSettings.builder().network(NETWORK).logger(LOGGER).build()) + .build(); + } + + @AfterAll + static void afterAll() { + swapPlannerScalaWithPlannerLoader(); + } + + private void swapPlannerLoaderWithPlannerScala() { Review Comment: I think this should be fine. Here I wanted to ensure that it happens before creating flink-containers since it uses the flink folder to create job and task managers. So swap should happen before that. 
But I think constructor is after the BeforeAll method, so should be fine, I will check ########## flink-test-utils-parent/flink-connector-test-utils/src/test/java/org/apache/flink/connector/testframe/container/FlinkImageBuilderTest.java: ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.testframe.container; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.startupcheck.OneShotStartupCheckStrategy; +import org.testcontainers.images.builder.ImageFromDockerfile; + +import java.nio.file.Path; + +import static org.assertj.core.api.Assertions.assertThat; + +class FlinkImageBuilderTest { + FlinkContainersSettings settings; + Configuration conf; + + @TempDir Path tempDir; + + @BeforeEach + void beforeEach() { + settings = FlinkContainersSettings.builder().numTaskManagers(2).build(); + conf = settings.getFlinkConfig(); + } + + @Test + void testFromDistJobManager() throws ImageBuildException { + ImageFromDockerfile image = + new FlinkImageBuilder() + .setFlinkDistPath(tempDir) + .setTempDirectory(tempDir) + .setConfiguration(conf) + .setLogProperties(settings.getLogProperties()) + .setBaseImage(settings.getBaseImage()) + .asJobManager() + .build(); + assertThat(image.getDockerImageName()).isEqualTo("flink-configured-jobmanager"); + try (final GenericContainer<?> container = new GenericContainer<>(image)) { + container + .withStartupCheckStrategy(new OneShotStartupCheckStrategy()) + .withCommand("cat", "/opt/flink/conf/config.yaml") + .start(); + assertThat(container.getLogs()).contains("rpc:\n address: jobmanager"); + } + } + + @Test + void testFromDistTaskManager() throws ImageBuildException { + ImageFromDockerfile image = + new FlinkImageBuilder() + .setFlinkDistPath(tempDir) + .setTempDirectory(tempDir) + .setConfiguration(conf) + .setLogProperties(settings.getLogProperties()) + .setBaseImage(settings.getBaseImage()) + .asTaskManager() + .build(); + assertThat(image.getDockerImageName()).isEqualTo("flink-configured-taskmanager"); + try 
(final GenericContainer<?> container = new GenericContainer<>(image)) { + container + .withStartupCheckStrategy(new OneShotStartupCheckStrategy()) + .withCommand("cat", "/opt/flink/conf/config.yaml") + .start(); + assertThat(container.getLogs()).contains("rpc:\n address: jobmanager"); + } + } + + @Test + void testFromDistTaskManagerWithHostParameter() throws ImageBuildException { + final Configuration taskManagerConf = new Configuration(); + taskManagerConf.addAll(conf); + taskManagerConf.set(TaskManagerOptions.HOST, "taskmanager1-host"); + ImageFromDockerfile image = + new FlinkImageBuilder() + .setFlinkDistPath(tempDir) + .setTempDirectory(tempDir) + .setConfiguration(taskManagerConf) + .setLogProperties(settings.getLogProperties()) + .setBaseImage(settings.getBaseImage()) + .asTaskManager() + .build(); + assertThat(image.getDockerImageName()).isEqualTo("flink-configured-taskmanager"); + try (final GenericContainer<?> container = new GenericContainer<>(image)) { + container + .withStartupCheckStrategy(new OneShotStartupCheckStrategy()) + .withCommand("cat", "/opt/flink/conf/config.yaml") + .start(); + assertThat(container.getLogs()).contains("host: taskmanager1-host"); + } + } + + @Test + void testFromDistWithLog4jFile() throws ImageBuildException { + ImageFromDockerfile image = + new FlinkImageBuilder() + .setFlinkDistPath(tempDir) + .setTempDirectory(tempDir) + .setConfiguration(conf) + .setLogProperties(settings.getLogProperties()) + .setBaseImage(settings.getBaseImage()) + .asJobManager() + .build(); + try (final GenericContainer<?> container = new GenericContainer<>(image)) { + container + .withStartupCheckStrategy(new OneShotStartupCheckStrategy()) + .withCommand("cat", "/opt/flink/conf/log4j-console.properties") + .start(); + System.out.println(container.getLogs()); + assertThat(container.getLogs()).contains("appender.console.name=ConsoleAppender"); + } + } + + @Test + void testJobManagerWithBaseImage() throws ImageBuildException { + final 
FlinkContainersSettings settings = Review Comment: Ahh yes correct, I'd rename the local ones since they are different just for this test case -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
