This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit fcabf563c44058ec11cf0748a417623b6ed47ff1 Author: Qingsheng Ren <[email protected]> AuthorDate: Tue Sep 7 12:02:59 2021 +0800 [FLINK-23914][connector/testing-framework] Add INFO log to track running stage of connector testing framework (cherry picked from commit db1df307f8240ff7cd495978556c0efdf4247e94) --- .../test/common/testsuites/SourceTestSuiteBase.java | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java index 4639a3f..3b6590f 100644 --- a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java +++ b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java @@ -101,11 +101,13 @@ public abstract class SourceTestSuiteBase<T> { throws Exception { // Write test data to external system + LOG.info("Writing test data to split 0"); final List<T> testRecords = generateAndWriteTestData(0, externalContext); // Build and execute Flink job StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment(); + LOG.info("Submitting Flink job to test environment"); try (CloseableIterator<T> resultIterator = execEnv.fromSource( externalContext.createSource(Boundedness.BOUNDED), @@ -114,6 +116,7 @@ public abstract class SourceTestSuiteBase<T> { .setParallelism(1) .executeAndCollect("Source Single Split Test")) { // Check test result + LOG.info("Checking test results"); assertThat(resultIterator, matchesSplitTestData(testRecords)); } } @@ -137,11 +140,12 @@ public abstract class SourceTestSuiteBase<T> { final int splitNumber = 4; final List<List<T>> testRecordsLists = new ArrayList<>(); + LOG.info("Writing test data to split 0 to 3..."); for (int i = 0; i < splitNumber; i++) { testRecordsLists.add(generateAndWriteTestData(i, externalContext)); } - LOG.debug("Build and execute Flink job"); + LOG.info("Submitting Flink job to test environment"); StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment(); try (final CloseableIterator<T> resultIterator = @@ -152,6 +156,7 @@ public abstract class SourceTestSuiteBase<T> { .setParallelism(splitNumber) .executeAndCollect("Source Multiple Split Test")) { // Check test result + LOG.info("Checking test results"); assertThat(resultIterator, matchesMultipleSplitTestData(testRecordsLists)); } } @@ -177,10 +182,12 @@ public abstract class SourceTestSuiteBase<T> { final int splitNumber = 4; final List<List<T>> testRecordsLists = new ArrayList<>(); + LOG.info("Writing test data to split 0 to 3"); for (int i = 0; i < splitNumber; i++) { testRecordsLists.add(generateAndWriteTestData(i, externalContext)); } + LOG.info("Submitting Flink job to test environment"); try (CloseableIterator<T> resultIterator = testEnv.createExecutionEnvironment() .fromSource( @@ -189,6 +196,7 @@ public abstract class SourceTestSuiteBase<T> { "Tested Source") .setParallelism(splitNumber + 1) .executeAndCollect("Idle Reader Test")) { + LOG.info("Checking test results"); assertThat(resultIterator, matchesMultipleSplitTestData(testRecordsLists)); } } @@ -215,6 +223,7 @@ public abstract class SourceTestSuiteBase<T> { throws Exception { int splitIndex = 0; + LOG.info("Writing test data to split {}", splitIndex); final List<T> testRecordsBeforeFailure = externalContext.generateTestData( splitIndex, ThreadLocalRandom.current().nextLong()); @@ -249,27 +258,34 @@ public abstract class SourceTestSuiteBase<T> { CollectStreamSink<T> sink = new CollectStreamSink<>(dataStreamSource, factory); sink.name("Data stream collect sink"); env.addOperator(sink.getTransformation()); + + LOG.info("Submitting Flink job to test environment"); final JobClient jobClient = env.executeAsync("TaskManager Failover Test"); iterator.setJobClient(jobClient); // -------------------------------------- END --------------------------------------------- + LOG.info("Checking records before killing TaskManagers"); assertThat( iterator, matchesSplitTestData(testRecordsBeforeFailure, testRecordsBeforeFailure.size())); // -------------------------------- Trigger failover --------------------------------------- + LOG.info("Trigger TaskManager failover"); controller.triggerTaskManagerFailover(jobClient, () -> {}); + LOG.info("Waiting for job recovering from failure"); CommonTestUtils.waitForJobStatus( jobClient, Collections.singletonList(JobStatus.RUNNING), Deadline.fromNow(Duration.ofSeconds(30))); + LOG.info("Writing test data to split {}", splitIndex); final List<T> testRecordsAfterFailure = externalContext.generateTestData( splitIndex, ThreadLocalRandom.current().nextLong()); sourceSplitDataWriter.writeRecords(testRecordsAfterFailure); + LOG.info("Checking records after job failover"); assertThat( iterator, matchesSplitTestData(testRecordsAfterFailure, testRecordsAfterFailure.size())); @@ -291,8 +307,7 @@ public abstract class SourceTestSuiteBase<T> { * @param externalContext External context * @return List of generated test records */ - protected List<T> generateAndWriteTestData( - int splitIndex, ExternalContext<T> externalContext) { + protected List<T> generateAndWriteTestData(int splitIndex, ExternalContext<T> externalContext) { final List<T> testRecords = externalContext.generateTestData( splitIndex, ThreadLocalRandom.current().nextLong());
