This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 3909c9f [FLINK-22889] Add debug statements to JdbcExactlyOnceSinkE2eTest
3909c9f is described below
commit 3909c9f0a11e8b38b264db9e7716fb41e75cc524
Author: Roman Khachatryan <[email protected]>
AuthorDate: Tue Jul 6 22:34:11 2021 +0200
[FLINK-22889] Add debug statements to JdbcExactlyOnceSinkE2eTest
---
.../jdbc/xa/JdbcExactlyOnceSinkE2eTest.java | 49 ++++++++++++++++++++--
.../src/test/resources/log4j2-test.properties | 4 ++
2 files changed, 49 insertions(+), 4 deletions(-)
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
index cf7a0e5..a49ab59 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcExactlyOnceSinkE2eTest.java
@@ -50,6 +50,8 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.postgresql.xa.PGXADataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testcontainers.containers.JdbcDatabaseContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.PostgreSQLContainer;
@@ -88,6 +90,8 @@ import static org.junit.Assert.assertTrue;
public class JdbcExactlyOnceSinkE2eTest extends JdbcTestBase {
private static final Random RANDOM = new
Random(System.currentTimeMillis());
+ private static final Logger LOG =
LoggerFactory.getLogger(JdbcExactlyOnceSinkE2eTest.class);
+
private interface JdbcExactlyOnceSinkTestEnv {
void start();
@@ -167,6 +171,8 @@ public class JdbcExactlyOnceSinkE2eTest extends
JdbcTestBase {
@Test
public void testInsert() throws Exception {
+ long started = System.currentTimeMillis();
+ LOG.info("Test insert for {}", dbEnv);
int elementsPerSource = 50;
int numElementsPerCheckpoint = 7;
int minElementsPerFailure = numElementsPerCheckpoint / 3;
@@ -207,6 +213,8 @@ public class JdbcExactlyOnceSinkE2eTest extends
JdbcTestBase {
assertTrue(
insertedIds.toString(),
insertedIds.size() == expectedIds.size() &&
expectedIds.containsAll(insertedIds));
+ LOG.info(
+ "Test insert for {} finished in {}ms", dbEnv,
System.currentTimeMillis() - started);
}
@Override
@@ -295,9 +303,20 @@ public class JdbcExactlyOnceSinkE2eTest extends
JdbcTestBase {
lastCheckpointId = -1L;
lastSnapshotConfirmed = false;
for (int j = start; j < start + count && running; j++) {
- ctx.collect(
- new TestEntry(
- j, Integer.toString(j),
Integer.toString(j), (double) (j), j));
+ try {
+ ctx.collect(
+ new TestEntry(
+ j,
+ Integer.toString(j),
+ Integer.toString(j),
+ (double) (j),
+ j));
+ } catch (Exception e) {
+ if (!ExceptionUtils.findThrowable(e,
TestException.class).isPresent()) {
+ LOG.warn("Exception during record emission", e);
+ }
+ throw e;
+ }
toAdvance.advance();
}
}
@@ -335,6 +354,7 @@ public class JdbcExactlyOnceSinkE2eTest extends
JdbcTestBase {
SourceRange.forSubtask(
getRuntimeContext().getIndexOfThisSubtask(), numElements)));
}
+ LOG.debug("Source initialized with ranges: {}", ranges.get());
}
@Override
@@ -343,10 +363,16 @@ public class JdbcExactlyOnceSinkE2eTest extends
JdbcTestBase {
}
private void sleep(Supplier<Boolean> condition) {
+ long start = System.currentTimeMillis();
while (condition.get()
&& running
&& !Thread.currentThread().isInterrupted()
&& haveActiveSources()) {
+ if (System.currentTimeMillis() - start > 10_000) {
+ // debugging FLINK-22889 (TODO: remove after resolved)
+ LOG.debug("Slept more than 10s", new Exception());
+ start = Long.MAX_VALUE;
+ }
try {
Thread.sleep(10);
} catch (InterruptedException e) {
@@ -357,7 +383,13 @@ public class JdbcExactlyOnceSinkE2eTest extends
JdbcTestBase {
}
private void waitOtherSources() throws InterruptedException {
+ long start = System.currentTimeMillis();
while (running && haveActiveSources()) {
+ if (System.currentTimeMillis() - start > 10_000) {
+ // debugging FLINK-22889 (TODO: remove after resolved)
+ LOG.debug("Slept more than 10s", new Exception());
+ start = Long.MAX_VALUE;
+ }
activeSources
.get(getRuntimeContext().getAttemptNumber())
.await(100, TimeUnit.MILLISECONDS);
@@ -386,6 +418,11 @@ public class JdbcExactlyOnceSinkE2eTest extends
JdbcTestBase {
checkState(from < to);
from++;
}
+
+ @Override
+ public String toString() {
+ return String.format("%d..%d", from, to);
+ }
}
}
@@ -409,11 +446,13 @@ public class JdbcExactlyOnceSinkE2eTest extends
JdbcTestBase {
new CountDownLatch(
getRuntimeContext().getNumberOfParallelSubtasks()))
.countDown();
+ LOG.debug("Mapper will fail after {} records", remaining);
}
@Override
public TestEntry map(TestEntry value) throws Exception {
if (--remaining <= 0) {
+ LOG.debug("Mapper failing intentionally");
throw new TestException();
}
return value;
@@ -422,7 +461,9 @@ public class JdbcExactlyOnceSinkE2eTest extends
JdbcTestBase {
private static final class TestException extends Exception {
public TestException() {
- super("expected", null, true, false);
+ // use this string to prevent error parsing scripts from failing the build
+ // and still have exception type
+ super("java.lang.Exception: Artificial failure", null, true,
false);
}
}
diff --git a/flink-connectors/flink-connector-jdbc/src/test/resources/log4j2-test.properties b/flink-connectors/flink-connector-jdbc/src/test/resources/log4j2-test.properties
index 835c2ec..79c987a 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/resources/log4j2-test.properties
+++ b/flink-connectors/flink-connector-jdbc/src/test/resources/log4j2-test.properties
@@ -21,6 +21,10 @@
rootLogger.level = OFF
rootLogger.appenderRef.test.ref = TestLogger
+# debugging FLINK-22889 (TODO: remove after resolved)
+logger.jdbc.name = org.apache.flink.connector.jdbc.xa.JdbcExactlyOnceSinkE2eTest
+logger.jdbc.level = DEBUG
+
appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR