This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 3909c9f  [FLINK-22889] Add debug statements to 
JdbcExactlyOnceSinkE2eTest
3909c9f is described below

commit 3909c9f0a11e8b38b264db9e7716fb41e75cc524
Author: Roman Khachatryan <[email protected]>
AuthorDate: Tue Jul 6 22:34:11 2021 +0200

    [FLINK-22889] Add debug statements to JdbcExactlyOnceSinkE2eTest
---
 .../jdbc/xa/JdbcExactlyOnceSinkE2eTest.java        | 49 ++++++++++++++++++++--
 .../src/test/resources/log4j2-test.properties      |  4 ++
 2 files changed, 49 insertions(+), 4 deletions(-)

diff --git 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
index cf7a0e5..a49ab59 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
@@ -50,6 +50,8 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.postgresql.xa.PGXADataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.JdbcDatabaseContainer;
 import org.testcontainers.containers.MySQLContainer;
 import org.testcontainers.containers.PostgreSQLContainer;
@@ -88,6 +90,8 @@ import static org.junit.Assert.assertTrue;
 public class JdbcExactlyOnceSinkE2eTest extends JdbcTestBase {
     private static final Random RANDOM = new 
Random(System.currentTimeMillis());
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcExactlyOnceSinkE2eTest.class);
+
     private interface JdbcExactlyOnceSinkTestEnv {
         void start();
 
@@ -167,6 +171,8 @@ public class JdbcExactlyOnceSinkE2eTest extends 
JdbcTestBase {
 
     @Test
     public void testInsert() throws Exception {
+        long started = System.currentTimeMillis();
+        LOG.info("Test insert for {}", dbEnv);
         int elementsPerSource = 50;
         int numElementsPerCheckpoint = 7;
         int minElementsPerFailure = numElementsPerCheckpoint / 3;
@@ -207,6 +213,8 @@ public class JdbcExactlyOnceSinkE2eTest extends 
JdbcTestBase {
         assertTrue(
                 insertedIds.toString(),
                 insertedIds.size() == expectedIds.size() && 
expectedIds.containsAll(insertedIds));
+        LOG.info(
+                "Test insert for {} finished in {}ms", dbEnv, 
System.currentTimeMillis() - started);
     }
 
     @Override
@@ -295,9 +303,20 @@ public class JdbcExactlyOnceSinkE2eTest extends 
JdbcTestBase {
                 lastCheckpointId = -1L;
                 lastSnapshotConfirmed = false;
                 for (int j = start; j < start + count && running; j++) {
-                    ctx.collect(
-                            new TestEntry(
-                                    j, Integer.toString(j), 
Integer.toString(j), (double) (j), j));
+                    try {
+                        ctx.collect(
+                                new TestEntry(
+                                        j,
+                                        Integer.toString(j),
+                                        Integer.toString(j),
+                                        (double) (j),
+                                        j));
+                    } catch (Exception e) {
+                        if (!ExceptionUtils.findThrowable(e, 
TestException.class).isPresent()) {
+                            LOG.warn("Exception during record emission", e);
+                        }
+                        throw e;
+                    }
                     toAdvance.advance();
                 }
             }
@@ -335,6 +354,7 @@ public class JdbcExactlyOnceSinkE2eTest extends 
JdbcTestBase {
                                 SourceRange.forSubtask(
                                         
getRuntimeContext().getIndexOfThisSubtask(), numElements)));
             }
+            LOG.debug("Source initialized with ranges: {}", ranges.get());
         }
 
         @Override
@@ -343,10 +363,16 @@ public class JdbcExactlyOnceSinkE2eTest extends 
JdbcTestBase {
         }
 
         private void sleep(Supplier<Boolean> condition) {
+            long start = System.currentTimeMillis();
             while (condition.get()
                     && running
                     && !Thread.currentThread().isInterrupted()
                     && haveActiveSources()) {
+                if (System.currentTimeMillis() - start > 10_000) {
+                    // debugging FLINK-22889 (TODO: remove after resolved)
+                    LOG.debug("Slept more than 10s", new Exception());
+                    start = Long.MAX_VALUE;
+                }
                 try {
                     Thread.sleep(10);
                 } catch (InterruptedException e) {
@@ -357,7 +383,13 @@ public class JdbcExactlyOnceSinkE2eTest extends 
JdbcTestBase {
         }
 
         private void waitOtherSources() throws InterruptedException {
+            long start = System.currentTimeMillis();
             while (running && haveActiveSources()) {
+                if (System.currentTimeMillis() - start > 10_000) {
+                    // debugging FLINK-22889 (TODO: remove after resolved)
+                    LOG.debug("Slept more than 10s", new Exception());
+                    start = Long.MAX_VALUE;
+                }
                 activeSources
                         .get(getRuntimeContext().getAttemptNumber())
                         .await(100, TimeUnit.MILLISECONDS);
@@ -386,6 +418,11 @@ public class JdbcExactlyOnceSinkE2eTest extends 
JdbcTestBase {
                 checkState(from < to);
                 from++;
             }
+
+            @Override
+            public String toString() {
+                return String.format("%d..%d", from, to);
+            }
         }
     }
 
@@ -409,11 +446,13 @@ public class JdbcExactlyOnceSinkE2eTest extends 
JdbcTestBase {
                                     new CountDownLatch(
                                             
getRuntimeContext().getNumberOfParallelSubtasks()))
                     .countDown();
+            LOG.debug("Mapper will fail after {} records", remaining);
         }
 
         @Override
         public TestEntry map(TestEntry value) throws Exception {
             if (--remaining <= 0) {
+                LOG.debug("Mapper failing intentionally");
                 throw new TestException();
             }
             return value;
@@ -422,7 +461,9 @@ public class JdbcExactlyOnceSinkE2eTest extends 
JdbcTestBase {
 
     private static final class TestException extends Exception {
         public TestException() {
-            super("expected", null, true, false);
+            // use this string to prevent error parsing scripts from failing 
the build
+            // and still have exception type
+            super("java.lang.Exception: Artificial failure", null, true, 
false);
         }
     }
 
diff --git 
a/flink-connectors/flink-connector-jdbc/src/test/resources/log4j2-test.properties
 
b/flink-connectors/flink-connector-jdbc/src/test/resources/log4j2-test.properties
index 835c2ec..79c987a 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/test/resources/log4j2-test.properties
+++ 
b/flink-connectors/flink-connector-jdbc/src/test/resources/log4j2-test.properties
@@ -21,6 +21,10 @@
 rootLogger.level = OFF
 rootLogger.appenderRef.test.ref = TestLogger
 
+# debugging FLINK-22889 (TODO: remove after resolved)
+logger.jdbc.name = 
org.apache.flink.connector.jdbc.xa.JdbcExactlyOnceSinkE2eTest
+logger.jdbc.level = DEBUG
+
 appender.testlogger.name = TestLogger
 appender.testlogger.type = CONSOLE
 appender.testlogger.target = SYSTEM_ERR

Reply via email to