[ https://issues.apache.org/jira/browse/FLINK-5701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15874242#comment-15874242 ]
ASF GitHub Bot commented on FLINK-5701:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/3278#discussion_r101973453
--- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java ---
@@ -88,201 +85,321 @@ public void testKeyValueDeserializersSetIfMissing() throws Exception {
 	@Test
 	public void testPartitionerOpenedWithDeterminatePartitionList() throws Exception {
 		KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class);
+
 		RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
 		when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
 		when(mockRuntimeContext.getNumberOfParallelSubtasks()).thenReturn(1);
-
-		DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
+
+		// out-of-order list of 4 partitions
+		List<PartitionInfo> mockPartitionsList = new ArrayList<>(4);
+		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 3, null, null, null));
+		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 1, null, null, null));
+		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 0, null, null, null));
+		mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 2, null, null, null));
+
+		final DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer(
 			FakeStandardProducerConfig.get(), mockPartitioner);
 		producer.setRuntimeContext(mockRuntimeContext);
+		final KafkaProducer mockProducer = producer.getMockKafkaProducer();
+		when(mockProducer.partitionsFor(anyString())).thenReturn(mockPartitionsList);
+		when(mockProducer.metrics()).thenReturn(null);
+
 		producer.open(new Configuration());
-		// the internal mock KafkaProducer will return an out-of-order list of 4 partitions,
-		// which should be sorted before provided to the custom partitioner's open() method
+		// the out-of-order partitions list should be sorted before being provided to the custom partitioner's open() method
 		int[] correctPartitionList = {0, 1, 2, 3};
 		verify(mockPartitioner).open(0, 1, correctPartitionList);
 	}
 	/**
-	 * Test ensuring that the producer is not dropping buffered records.;
-	 * we set a timeout because the test will not finish if the logic is broken
+	 * Test ensuring that if an invoke call happens right after an async exception is caught, it should be rethrown
 	 */
-	@Test(timeout=5000)
-	public void testAtLeastOnceProducer() throws Throwable {
-		runAtLeastOnceTest(true);
+	@Test
+	public void testAsyncErrorRethrownOnInvoke() throws Throwable {
+		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
+			FakeStandardProducerConfig.get(), null);
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness =
+			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>("msg-1"));
+
+		// let the message request return an async exception
+		producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
+
+		try {
+			testHarness.processElement(new StreamRecord<>("msg-2"));
+		} catch (Exception e) {
+			// the next invoke should rethrow the async exception
+			Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
+
+			// test succeeded
+			return;
+		}
+
+		Assert.fail();
 	}
 	/**
-	 * Ensures that the at least once producing test fails if the flushing is disabled
+	 * Test ensuring that if a snapshot call happens right after an async exception is caught, it should be rethrown
 	 */
-	@Test(expected = AssertionError.class, timeout=5000)
-	public void testAtLeastOnceProducerFailsIfFlushingDisabled() throws Throwable {
-		runAtLeastOnceTest(false);
-	}
-
-	private void runAtLeastOnceTest(boolean flushOnCheckpoint) throws Throwable {
-		final AtomicBoolean snapshottingFinished = new AtomicBoolean(false);
+	@Test
+	public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
 		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
-			FakeStandardProducerConfig.get(), null, snapshottingFinished);
-		producer.setFlushOnCheckpoint(flushOnCheckpoint);
+			FakeStandardProducerConfig.get(), null);
 		OneInputStreamOperatorTestHarness<String, Object> testHarness =
-			new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
+			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
 		testHarness.open();
-		for (int i = 0; i < 100; i++) {
-			testHarness.processElement(new StreamRecord<>("msg-" + i));
+		testHarness.processElement(new StreamRecord<>("msg-1"));
+
+		// let the message request return an async exception
+		producer.getPendingCallbacks().get(0).onCompletion(null, new Exception("artificial async exception"));
+
+		try {
+			testHarness.snapshot(123L, 123L);
+		} catch (Exception e) {
+			// the snapshot call should rethrow the async exception
+			Assert.assertTrue(e.getCause().getMessage().contains("artificial async exception"));
+
+			// test succeeded
+			return;
 		}
-		// start a thread confirming all pending records
-		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
-		final Thread threadA = Thread.currentThread();
+		Assert.fail();
+	}
+
+	/**
+	 * Test ensuring that if an async exception is caught for one of the flushed requests on checkpoint,
+	 * it should be rethrown; we set a timeout because the test will not finish if the logic is broken.
+	 *
+	 * Note that this test does not verify that the snapshot method blocks correctly when there are pending records.
+	 * The test for that is covered in testAtLeastOnceProducer.
+	 */
+	@SuppressWarnings("unchecked")
+	@Test(timeout=5000)
+	public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws Throwable {
+		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
+			FakeStandardProducerConfig.get(), null);
+		producer.setFlushOnCheckpoint(true);
+
+		final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
+
+		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>("msg-1"));
+		testHarness.processElement(new StreamRecord<>("msg-2"));
+		testHarness.processElement(new StreamRecord<>("msg-3"));
+
+		verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
+
+		// only let the first callback succeed for now
+		producer.getPendingCallbacks().get(0).onCompletion(null, null);
-		Runnable confirmer = new Runnable() {
+		CheckedThread snapshotThread = new CheckedThread() {
 			@Override
-			public void run() {
-				try {
-					MockProducer mp = producer.getProducerInstance();
-					List<Callback> pending = mp.getPending();
-
-					// we need to find out if the snapshot() method blocks forever
-					// this is not possible. If snapshot() is running, it will
-					// start removing elements from the pending list.
-					synchronized (threadA) {
-						threadA.wait(500L);
-					}
-					// we now check that no records have been confirmed yet
-					Assert.assertEquals(100, pending.size());
-					Assert.assertFalse("Snapshot method returned before all records were confirmed",
-						snapshottingFinished.get());
-
-					// now confirm all checkpoints
-					for (Callback c: pending) {
-						c.onCompletion(null, null);
-					}
-					pending.clear();
-				} catch(Throwable t) {
-					runnableError.f0 = t;
-				}
+			public void go() throws Exception {
+				// this should block at first, since there are still two pending records that need to be flushed
+				testHarness.snapshot(123L, 123L);
 			}
 		};
-		Thread threadB = new Thread(confirmer);
-		threadB.start();
+		snapshotThread.start();
-		// this should block:
-		testHarness.snapshot(0, 0);
+		// let the 2nd message fail with an async exception
+		producer.getPendingCallbacks().get(1).onCompletion(null, new Exception("artificial async failure for 2nd message"));
+		producer.getPendingCallbacks().get(2).onCompletion(null, null);
-		synchronized (threadA) {
-			threadA.notifyAll(); // just in case, to let the test fail faster
-		}
-		Assert.assertEquals(0, producer.getProducerInstance().getPending().size());
-		Deadline deadline = FiniteDuration.apply(5, "s").fromNow();
-		while (deadline.hasTimeLeft() && threadB.isAlive()) {
-			threadB.join(500);
-		}
-		Assert.assertFalse("Thread A is expected to be finished at this point. If not, the test is prone to fail", threadB.isAlive());
-		if (runnableError.f0 != null) {
-			throw runnableError.f0;
+		try {
+			snapshotThread.sync();
+		} catch (Exception e) {
+			// the snapshot should have failed with the async exception
+			Assert.assertTrue(e.getCause().getMessage().contains("artificial async failure for 2nd message"));
+
+			// test succeeded
+			return;
 		}
+		Assert.fail();
+	}
+
+	/**
+	 * Test ensuring that the producer is not dropping buffered records;
+	 * we set a timeout because the test will not finish if the logic is broken
+	 */
+	@SuppressWarnings("unchecked")
+	@Test(timeout=10000)
+	public void testAtLeastOnceProducer() throws Throwable {
+		final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>(
+			FakeStandardProducerConfig.get(), null);
+		producer.setFlushOnCheckpoint(true);
+
+		final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer();
+
+		final OneInputStreamOperatorTestHarness<String, Object> testHarness =
+			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>("msg-1"));
+		testHarness.processElement(new StreamRecord<>("msg-2"));
+		testHarness.processElement(new StreamRecord<>("msg-3"));
+
+		verify(mockProducer, times(3)).send(any(ProducerRecord.class), any(Callback.class));
+		Assert.assertEquals(3, producer.getPendingSize());
+
+		// start a thread to perform checkpointing
+		CheckedThread snapshotThread = new CheckedThread() {
+			@Override
+			public void go() throws Exception {
+				// this should block until all records are flushed;
+				// if the snapshot implementation returns before pending records are flushed, the test will fail
+				testHarness.snapshot(123L, 123L);
+			}
+		};
+		snapshotThread.start();
+
+		// being extra safe that the snapshot is correctly blocked;
+		// after some arbitrary time, the snapshot should still be blocked
+		snapshotThread.join(3000);
--- End diff --
Now I see what you mean, and the purpose of the latch you were previously
proposing. Thanks for the comment. I will address this and proceed to merge the
PR later today.
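
For reference, a rough sketch of how the latch could make this check deterministic (the flushLatch hook into DummyFlinkKafkaProducer is hypothetical here; the exact form will be whatever lands in the merged commit). Instead of joining the snapshot thread for an arbitrary 3 seconds and assuming it is blocked, the test-only producer trips a latch once snapshotState() actually starts waiting on pending records:

    // hypothetical hook: DummyFlinkKafkaProducer counts this latch down
    // once snapshotState() begins waiting on pending records
    final CountDownLatch flushLatch = new CountDownLatch(1); // java.util.concurrent.CountDownLatch

    CheckedThread snapshotThread = new CheckedThread() {
        @Override
        public void go() throws Exception {
            // blocks until all pending records are acknowledged
            testHarness.snapshot(123L, 123L);
        }
    };
    snapshotThread.start();

    // deterministic wait instead of an arbitrary join(3000)
    flushLatch.await();
    Assert.assertTrue("snapshot should still be blocked on pending records", snapshotThread.isAlive());

    // completing all outstanding callbacks must unblock the snapshot
    for (Callback callback : producer.getPendingCallbacks()) {
        callback.onCompletion(null, null);
    }
    snapshotThread.sync();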
> FlinkKafkaProducer should check asyncException on checkpoints
> -------------------------------------------------------------
>
> Key: FLINK-5701
> URL: https://issues.apache.org/jira/browse/FLINK-5701
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector, Streaming Connectors
> Reporter: Tzu-Li (Gordon) Tai
> Priority: Critical
>
> Reported in ML:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fink-KafkaProducer-Data-Loss-td11413.html
> The problem:
> The producer holds a {{pendingRecords}} value that is incremented on each
> invoke() and decremented on each callback, used to check if the producer
> needs to sync on pending callbacks on checkpoints.
> On each checkpoint, we should only consider the checkpoint succeeded iff
> after flushing the {{pendingRecords == 0}} and {{asyncException == null}}
> (currently, we’re only checking {{pendingRecords}}).
> A quick fix for this is to check and rethrow async exceptions in the
> {{snapshotState}} method both before flushing and after the flush
> completes (i.e., once {{pendingRecords}} reaches 0).
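
A minimal sketch of that quick fix (simplified; {{checkErroneous()}} is the helper the producer base already calls in {{invoke()}} to rethrow {{asyncException}}, and the exact signature and diff are version-dependent; see PR #3278 for the actual change):

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // rethrow any exception already reported by an async send() callback
        checkErroneous();

        if (flushOnCheckpoint) {
            // blocks until pendingRecords == 0
            flush();

            // a callback may have reported a failure while we were flushing
            checkErroneous();
        }
    }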
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)