This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fcabf563c44058ec11cf0748a417623b6ed47ff1
Author: Qingsheng Ren <[email protected]>
AuthorDate: Tue Sep 7 12:02:59 2021 +0800

    [FLINK-23914][connector/testing-framework] Add INFO log to track running stage of connector testing framework
    
    (cherry picked from commit db1df307f8240ff7cd495978556c0efdf4247e94)
---
 .../test/common/testsuites/SourceTestSuiteBase.java | 21 ++++++++++++++++++---
 1 file changed, 18 insertions(+), 3 deletions(-)

diff --git a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java
index 4639a3f..3b6590f 100644
--- a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java
+++ b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java
@@ -101,11 +101,13 @@ public abstract class SourceTestSuiteBase<T> {
             throws Exception {
 
         // Write test data to external system
+        LOG.info("Writing test data to split 0");
         final List<T> testRecords = generateAndWriteTestData(0, 
externalContext);
 
         // Build and execute Flink job
         StreamExecutionEnvironment execEnv = 
testEnv.createExecutionEnvironment();
 
+        LOG.info("Submitting Flink job to test environment");
         try (CloseableIterator<T> resultIterator =
                 execEnv.fromSource(
                                 
externalContext.createSource(Boundedness.BOUNDED),
@@ -114,6 +116,7 @@ public abstract class SourceTestSuiteBase<T> {
                         .setParallelism(1)
                         .executeAndCollect("Source Single Split Test")) {
             // Check test result
+            LOG.info("Checking test results");
             assertThat(resultIterator, matchesSplitTestData(testRecords));
         }
     }
@@ -137,11 +140,12 @@ public abstract class SourceTestSuiteBase<T> {
 
         final int splitNumber = 4;
         final List<List<T>> testRecordsLists = new ArrayList<>();
+        LOG.info("Writing test data to split 0 to 3...");
         for (int i = 0; i < splitNumber; i++) {
             testRecordsLists.add(generateAndWriteTestData(i, externalContext));
         }
 
-        LOG.debug("Build and execute Flink job");
+        LOG.info("Submitting Flink job to test environment");
         StreamExecutionEnvironment execEnv = 
testEnv.createExecutionEnvironment();
 
         try (final CloseableIterator<T> resultIterator =
@@ -152,6 +156,7 @@ public abstract class SourceTestSuiteBase<T> {
                         .setParallelism(splitNumber)
                         .executeAndCollect("Source Multiple Split Test")) {
             // Check test result
+            LOG.info("Checking test results");
             assertThat(resultIterator, 
matchesMultipleSplitTestData(testRecordsLists));
         }
     }
@@ -177,10 +182,12 @@ public abstract class SourceTestSuiteBase<T> {
 
         final int splitNumber = 4;
         final List<List<T>> testRecordsLists = new ArrayList<>();
+        LOG.info("Writing test data to split 0 to 3");
         for (int i = 0; i < splitNumber; i++) {
             testRecordsLists.add(generateAndWriteTestData(i, externalContext));
         }
 
+        LOG.info("Submitting Flink job to test environment");
         try (CloseableIterator<T> resultIterator =
                 testEnv.createExecutionEnvironment()
                         .fromSource(
@@ -189,6 +196,7 @@ public abstract class SourceTestSuiteBase<T> {
                                 "Tested Source")
                         .setParallelism(splitNumber + 1)
                         .executeAndCollect("Idle Reader Test")) {
+            LOG.info("Checking test results");
             assertThat(resultIterator, 
matchesMultipleSplitTestData(testRecordsLists));
         }
     }
@@ -215,6 +223,7 @@ public abstract class SourceTestSuiteBase<T> {
             throws Exception {
         int splitIndex = 0;
 
+        LOG.info("Writing test data to split {}", splitIndex);
         final List<T> testRecordsBeforeFailure =
                 externalContext.generateTestData(
                         splitIndex, ThreadLocalRandom.current().nextLong());
@@ -249,27 +258,34 @@ public abstract class SourceTestSuiteBase<T> {
         CollectStreamSink<T> sink = new CollectStreamSink<>(dataStreamSource, 
factory);
         sink.name("Data stream collect sink");
         env.addOperator(sink.getTransformation());
+
+        LOG.info("Submitting Flink job to test environment");
         final JobClient jobClient = env.executeAsync("TaskManager Failover 
Test");
         iterator.setJobClient(jobClient);
         // -------------------------------------- END 
---------------------------------------------
 
+        LOG.info("Checking records before killing TaskManagers");
         assertThat(
                 iterator,
                 matchesSplitTestData(testRecordsBeforeFailure, 
testRecordsBeforeFailure.size()));
 
         // -------------------------------- Trigger failover 
---------------------------------------
+        LOG.info("Trigger TaskManager failover");
         controller.triggerTaskManagerFailover(jobClient, () -> {});
 
+        LOG.info("Waiting for job recovering from failure");
         CommonTestUtils.waitForJobStatus(
                 jobClient,
                 Collections.singletonList(JobStatus.RUNNING),
                 Deadline.fromNow(Duration.ofSeconds(30)));
 
+        LOG.info("Writing test data to split {}", splitIndex);
         final List<T> testRecordsAfterFailure =
                 externalContext.generateTestData(
                         splitIndex, ThreadLocalRandom.current().nextLong());
         sourceSplitDataWriter.writeRecords(testRecordsAfterFailure);
 
+        LOG.info("Checking records after job failover");
         assertThat(
                 iterator,
                 matchesSplitTestData(testRecordsAfterFailure, 
testRecordsAfterFailure.size()));
@@ -291,8 +307,7 @@ public abstract class SourceTestSuiteBase<T> {
      * @param externalContext External context
      * @return List of generated test records
      */
-    protected List<T> generateAndWriteTestData(
-            int splitIndex, ExternalContext<T> externalContext) {
+    protected List<T> generateAndWriteTestData(int splitIndex, 
ExternalContext<T> externalContext) {
         final List<T> testRecords =
                 externalContext.generateTestData(
                         splitIndex, ThreadLocalRandom.current().nextLong());

Reply via email to