[ https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15857769#comment-15857769 ]
ASF GitHub Bot commented on FLINK-5701:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/3278#discussion_r100030377
--- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java ---
@@ -293,6 +293,61 @@ public void run() {
testHarness.close();
}
+ /**
+ * This test is meant to ensure that testAtLeastOnceProducer is valid by testing that, if flushing is disabled,
+ * the snapshot method does indeed finish without waiting for pending records;
+ * we set a timeout because the test will not finish if the logic is broken.
+ */
+ @Test(timeout=5000)
+ public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable {
+ final OneShotLatch inputLatch = new OneShotLatch();
+
+ final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
+ FakeStandardProducerConfig.get(), null, inputLatch, 100, new AtomicBoolean(false));
+ producer.setFlushOnCheckpoint(false);
+
+ final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
+
+ testHarness.open();
+
+ List<Callback> pending = producer.getProducerInstance().getPending();
+
+ for (int i = 0; i < 100; i++) {
+ testHarness.processElement(new StreamRecord<>("msg-" + i));
+ }
+
+ inputLatch.await();
+
+ // make sure that none of the callbacks have been completed yet
+ Assert.assertEquals(100, pending.size());
+
+ // use a separate thread to continuously monitor whether snapshotting has returned
+ final Tuple1<Throwable> runnableError = new Tuple1<>(null);
+ Thread snapshotThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ testHarness.snapshot(123L, 123L);
+ } catch (Exception e) {
+ runnableError.f0 = e;
+ }
+ }
+ });
--- End diff --
Why this additional `Thread`? I think calling `testHarness.snapshot` directly is
perfectly fine. Whether you block on this call or on the `Thread.join` makes no
difference.
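To illustrate the reviewer's point outside of Flink, here is a minimal, self-contained sketch. `BlockingCallDemo.snapshot` is a hypothetical stand-in for `testHarness.snapshot(123L, 123L)`, not the Flink test harness; `viaThread` mirrors the helper-thread-plus-join pattern from the diff, and `direct` mirrors a plain call. Both block the caller for the same duration and report the same failure, but the direct call needs no holder object such as the `Tuple1`, and the existing `@Test(timeout=5000)` still guards against a snapshot that never returns.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a blocking call such as testHarness.snapshot(...);
// this is NOT the Flink test harness, only a method that blocks and may throw.
class BlockingCallDemo {

    static void snapshot(long checkpointId, boolean fail) throws Exception {
        Thread.sleep(20); // simulate work that blocks the caller
        if (fail) {
            throw new Exception("snapshot " + checkpointId + " failed");
        }
    }

    // Pattern from the diff: run the call in a helper thread, join immediately,
    // and smuggle any failure out through a holder object.
    static Throwable viaThread(long checkpointId, boolean fail) {
        AtomicReference<Throwable> error = new AtomicReference<>();
        Thread t = new Thread(() -> {
            try {
                snapshot(checkpointId, fail);
            } catch (Exception e) {
                error.set(e);
            }
        });
        t.start();
        try {
            t.join(); // the caller is blocked here for as long as snapshot() runs
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return error.get();
    }

    // Reviewer's suggestion: call directly. The caller blocks for the same time and the
    // exception arrives on the calling thread. It is returned here only so the two variants
    // can be compared; in a JUnit test you would simply let it propagate.
    static Throwable direct(long checkpointId, boolean fail) {
        try {
            snapshot(checkpointId, fail);
            return null;
        } catch (Exception e) {
            return e;
        }
    }
}
```

In the actual test, this amounts to replacing the thread with a plain `testHarness.snapshot(123L, 123L);` call.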
> FlinkKafkaProducer should check asyncException on checkpoints
> -------------------------------------------------------------
>
> Key: FLINK-5701
> URL: https://issues.apache.org/jira/browse/FLINK-5701
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector, Streaming Connectors
> Reporter: Tzu-Li (Gordon) Tai
> Priority: Critical
>
> Reported in ML:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
> The problem:
> The producer holds a {{pendingRecords}} counter that is incremented on each
> invoke() and decremented on each callback; on checkpoints, it is used to check
> whether the producer needs to wait for pending callbacks.
> On each checkpoint, we should consider the checkpoint successful if and only if,
> after flushing, {{pendingRecords == 0}} and {{asyncException == null}}
> (currently, we’re only checking {{pendingRecords}}).
> A quick fix for this is to check and rethrow async exceptions in the
> {{snapshotState}} method both before flushing and after flushing, once
> {{pendingRecords}} has become 0.
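The bookkeeping described in the issue can be sketched as follows. This is a simplified, hypothetical class (`SketchProducerSink`), not Flink's `FlinkKafkaProducerBase`: sending to Kafka is only simulated, acknowledgements arrive through an explicit `onCompletion(Exception)` call, and the flush is modelled as waiting until `pendingRecords` drops to 0. It applies the proposed fix by calling a `checkErroneous()` helper in `snapshotState` both before and after the flush and, like the test in the diff above, skips the wait entirely when flushing on checkpoints is disabled.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified sink mirroring the FLINK-5701 bookkeeping.
// NOT Flink's FlinkKafkaProducerBase: sending to Kafka is only simulated.
class SketchProducerSink {

    private final Object pendingLock = new Object();
    private long pendingRecords; // ++ on invoke(), -- on each completed callback; guarded by pendingLock
    private final AtomicReference<Exception> asyncException = new AtomicReference<>();
    private final boolean flushOnCheckpoint;

    SketchProducerSink(boolean flushOnCheckpoint) {
        this.flushOnCheckpoint = flushOnCheckpoint;
    }

    void invoke(String record) throws Exception {
        checkErroneous(); // surface earlier send failures as soon as possible
        synchronized (pendingLock) {
            pendingRecords++;
        }
        // Assumption: a real producer would hand `record` to the Kafka client here,
        // registering a callback that eventually calls onCompletion(...).
    }

    // Simulated send callback: error == null means the record was acknowledged.
    void onCompletion(Exception error) {
        if (error != null) {
            asyncException.compareAndSet(null, error); // keep only the first failure
        }
        synchronized (pendingLock) {
            pendingRecords--;
            if (pendingRecords == 0) {
                pendingLock.notifyAll(); // wake a snapshot waiting for the flush
            }
        }
    }

    void snapshotState(long checkpointId) throws Exception {
        checkErroneous(); // check 1: failures reported before this checkpoint
        if (flushOnCheckpoint) {
            // "Flush": block until every in-flight record has been acknowledged.
            synchronized (pendingLock) {
                while (pendingRecords != 0) {
                    pendingLock.wait();
                }
            }
            checkErroneous(); // check 2: failures reported while flushing
        }
    }

    long pending() {
        synchronized (pendingLock) {
            return pendingRecords;
        }
    }

    private void checkErroneous() throws Exception {
        Exception e = asyncException.get();
        if (e != null) {
            throw new Exception("Failed to send data: " + e.getMessage(), e);
        }
    }
}
```

The second check matters because a callback can fail while the snapshot is waiting; without it, the checkpoint would succeed even though that record was never written.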
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)