leonardBang commented on a change in pull request #18496:
URL: https://github.com/apache/flink/pull/18496#discussion_r806729035
##########
File path: flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java
##########
@@ -0,0 +1,629 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.testsuites;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import org.apache.flink.connector.testframe.environment.TestEnvironmentSettings;
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV1ExternalContext;
+import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
+import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
+import org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension;
+import org.apache.flink.connector.testframe.junit.extensions.TestCaseInvocationContextProvider;
+import org.apache.flink.connector.testframe.source.FromElementsSource;
+import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions;
+import org.apache.flink.connector.testframe.utils.MetricQuerier;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.apache.commons.math3.util.Precision;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.opentest4j.TestAbortedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT;
+import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_JOB_STATUS_CHANGE_TIMEOUT;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.terminateJob;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForJobStatus;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition;
+import static org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE;
+import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/**
+ * Base class for sink test suites.
+ *
+ * <p>All cases should have a descriptive JavaDoc, including:
+ *
+ * <ul>
+ *   <li>The purpose of this case
+ *   <li>A simple description of how this case works
+ *   <li>The condition to fulfill in order to pass this case
+ *   <li>The requirements for running this case
+ * </ul>
+ */
+@ExtendWith({
+    ConnectorTestingExtension.class,
+    TestLoggerExtension.class,
+    TestCaseInvocationContextProvider.class
+})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@Experimental
+public abstract class SinkTestSuiteBase<T extends Comparable<T>> {
+    private static final Logger LOG = LoggerFactory.getLogger(SinkTestSuiteBase.class);
+
+    // ----------------------------- Basic test cases ---------------------------------
+
+    /**
+     * Test connector data stream sink.
+     *
+     * <p>This test creates a sink in the external system, generates a collection of test data,
+     * and writes it to the sink with a Flink job.
+     *
+     * <p>In order to pass this test, the number of records produced by Flink needs to equal the
+     * number of generated test records, and the records in the sink are compared to the test
+     * data according to the configured delivery semantic. There is no requirement on record
+     * order.
+     */
+    @TestTemplate
+    @DisplayName("Test data stream sink")
+    public void testBasicSink(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        TestingSinkSettings sinkSettings = getTestingSinkSettings(semantic);
+        final List<T> testRecords = generateTestData(sinkSettings, externalContext);
+
+        // Build and execute Flink job
+        StreamExecutionEnvironment execEnv =
+                testEnv.createExecutionEnvironment(
+                        TestEnvironmentSettings.builder()
+                                .setConnectorJarPaths(externalContext.getConnectorJarPaths())
+                                .build());
+        execEnv.enableCheckpointing(50);
+        DataStream<T> dataStream =
+                execEnv.fromCollection(testRecords)
+                        .name("sourceInSinkTest")
+                        .setParallelism(1)
+                        .returns(externalContext.getProducedType());
+        tryCreateSink(dataStream, externalContext, sinkSettings)
+                .setParallelism(1)
+                .name("sinkInSinkTest");
+        final JobClient jobClient = execEnv.executeAsync("DataStream Sink Test");
+
+        waitForJobStatus(
+                jobClient,
+                Collections.singletonList(JobStatus.FINISHED),
+                Deadline.fromNow(DEFAULT_JOB_STATUS_CHANGE_TIMEOUT));
+
+        // Check test result
+        checkResultWithSemantic(
+                externalContext.createSinkDataReader(sinkSettings), testRecords, semantic);
+    }
+
+    /**
+     * Test connector sink restart from a completed savepoint with the same parallelism.
+     *
+     * <p>This test creates a sink in the external system, generates a collection of test data,
+     * and first writes half of it to the sink with a Flink job running at parallelism 2. It then
+     * stops the job with a savepoint and restarts the same job from that savepoint. Once the job
+     * is running again, it writes the other half to the sink and compares the result.
+     *
+     * <p>In order to pass this test, the number of records produced by Flink needs to equal the
+     * number of generated test records, and the records in the sink are compared to the test
+     * data according to the configured delivery semantic. There is no requirement on record
+     * order.
+     */
+    @TestTemplate
+    @DisplayName("Test sink restarting from a savepoint")
+    public void testStartFromSavepoint(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        restartFromSavepoint(testEnv, externalContext, semantic, 2, 2);
+    }
+
+    /**
+     * Test connector sink restart from a completed savepoint with a higher parallelism.
+     *
+     * <p>This test creates a sink in the external system, generates a collection of test data,
+     * and first writes half of it to the sink with a Flink job running at parallelism 2. It then
+     * stops the job with a savepoint and restarts the same job from that savepoint with a higher
+     * parallelism of 4. Once the job is running again, it writes the other half to the sink and
+     * compares the result.
+     *
+     * <p>In order to pass this test, the number of records produced by Flink needs to equal the
+     * number of generated test records, and the records in the sink are compared to the test
+     * data according to the configured delivery semantic. There is no requirement on record
+     * order.
+     */
+    @TestTemplate
+    @DisplayName("Test sink restarting with a higher parallelism")
+    public void testScaleUp(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        restartFromSavepoint(testEnv, externalContext, semantic, 2, 4);
+    }
+
+    /**
+     * Test connector sink restart from a completed savepoint with a lower parallelism.
+     *
+     * <p>This test creates a sink in the external system, generates a collection of test data,
+     * and first writes half of it to the sink with a Flink job running at parallelism 4. It then
+     * stops the job with a savepoint and restarts the same job from that savepoint with a lower
+     * parallelism of 2. Once the job is running again, it writes the other half to the sink and
+     * compares the result.
+     *
+     * <p>In order to pass this test, the number of records produced by Flink needs to equal the
+     * number of generated test records, and the records in the sink are compared to the test
+     * data according to the configured delivery semantic. There is no requirement on record
+     * order.
+     */
+    @TestTemplate
+    @DisplayName("Test sink restarting with a lower parallelism")
+    public void testScaleDown(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic)
+            throws Exception {
+        restartFromSavepoint(testEnv, externalContext, semantic, 4, 2);
+    }
+
+    private void restartFromSavepoint(
+            TestEnvironment testEnv,
+            DataStreamSinkExternalContext<T> externalContext,
+            CheckpointingMode semantic,
+            final int beforeParallelism,
+            final int afterParallelism)
+            throws Exception {
+        // Step 1: Preparation
+        TestingSinkSettings sinkSettings = getTestingSinkSettings(semantic);
+        final StreamExecutionEnvironment execEnv =
+                testEnv.createExecutionEnvironment(
+                        TestEnvironmentSettings.builder()
+                                .setConnectorJarPaths(externalContext.getConnectorJarPaths())
+                                .build());
+        execEnv.setRestartStrategy(RestartStrategies.noRestart());
+
+        // Step 2: Generate test data
+        final List<T> testRecords = generateTestData(sinkSettings, externalContext);
+
+        // Step 3: Build and execute Flink job
+        int numBeforeSuccess = testRecords.size() / 2;
+        DataStreamSource<T> source =
+                execEnv.fromSource(
+                                new FromElementsSource<>(
+                                        Boundedness.CONTINUOUS_UNBOUNDED,
+                                        testRecords,
+                                        numBeforeSuccess),
+                                WatermarkStrategy.noWatermarks(),
+                                "beforeRestartSource")
+                        .setParallelism(1);
+
+        DataStream<T> dataStream = source.returns(externalContext.getProducedType());
+        tryCreateSink(dataStream, externalContext, sinkSettings)
+                .name("Sink restart test")
+                .setParallelism(beforeParallelism);
+
+        /*
+         * The job should stop after consuming a specified number of records. To know when that
+         * number of records has been consumed, a collect sink needs to be watched.
+         */
+        CollectResultIterator<T> iterator = addCollectSink(source);
+        final JobClient jobClient = execEnv.executeAsync("Restart Test");
+        iterator.setJobClient(jobClient);
+
+        // Step 4: Wait for the expected result and stop Flink job with a savepoint
+        String savepointDir;
+        try {
+            final MetricQuerier queryRestClient = new MetricQuerier(new Configuration());

Review comment:
   make sense to me @ruanhang1993
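For connector authors following along: a concrete end-to-end test picks up every case in this suite just by extending it, since `ConnectorTestingExtension` and `TestCaseInvocationContextProvider` inject the test environment, external context, and semantic into each `@TestTemplate`. Below is a minimal sketch of such a subclass; it assumes the testframe annotation names (`@TestEnv`, `@TestExternalSystem`, `@TestContext`, `@TestSemantics`), and every `My*` type is a hypothetical placeholder, not code from this PR.

```java
import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
import org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase;
import org.apache.flink.streaming.api.CheckpointingMode;

/** Hypothetical connector test that inherits all cases of SinkTestSuiteBase. */
public class MySinkE2ECase extends SinkTestSuiteBase<String> {

    // Delivery guarantees to verify; each @TestTemplate in the suite runs
    // once per declared mode.
    @TestSemantics
    CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};

    // Flink cluster that executes the test jobs.
    @TestEnv
    MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();

    // External system the sink writes to (hypothetical helper).
    @TestExternalSystem
    MyExternalSystem system = new MyExternalSystem();

    // Factory producing DataStreamSinkExternalContext instances, i.e. the
    // sink under test plus a reader for the written records (hypothetical helper).
    @TestContext
    MySinkContextFactory contexts = new MySinkContextFactory(system);
}
```

The external context is the piece that ties the suite to a real system: its `createSinkDataReader` must be able to read back what the job wrote, because `checkResultWithSemantic` compares that reader's output against the generated records.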
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
