GitHub user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2108#discussion_r69136271
--- Diff:
flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
---
@@ -35,69 +35,77 @@
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.Assert;
import org.junit.Test;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
/**
* Test ensuring that the producer is not dropping buffered records
*/
@SuppressWarnings("unchecked")
public class AtLeastOnceProducerTest {
- @Test
+ // we set a timeout because the test will not finish if the logic is broken
+ @Test(timeout=5000)
public void testAtLeastOnceProducer() throws Exception {
runTest(true);
}
// This test ensures that the actual test fails if the flushing is disabled
- @Test(expected = AssertionError.class)
+ @Test(expected = AssertionError.class, timeout=5000)
public void ensureTestFails() throws Exception {
runTest(false);
}
private void runTest(boolean flushOnCheckpoint) throws Exception {
Properties props = new Properties();
- final TestingKafkaProducer<String> producer = new TestingKafkaProducer<>("someTopic", new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props);
+ final OneShotLatch snapshottingFinished = new OneShotLatch();
+ final TestingKafkaProducer<String> producer = new TestingKafkaProducer<>("someTopic", new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props,
+ snapshottingFinished);
producer.setFlushOnCheckpoint(flushOnCheckpoint);
producer.setRuntimeContext(new MockRuntimeContext(0, 1));
producer.open(new Configuration());
- for(int i = 0; i < 100; i++) {
+ for (int i = 0; i < 100; i++) {
producer.invoke("msg-" + i);
}
// start a thread confirming all pending records
final Tuple1<Throwable> runnableError = new Tuple1<>(null);
- final AtomicBoolean markOne = new AtomicBoolean(false);
+ final Thread threadA = Thread.currentThread();
+
Runnable confirmer = new Runnable() {
@Override
public void run() {
try {
MockProducer mp = producer.getProducerInstance();
List<Callback> pending = mp.getPending();
- // we ensure thread A is locked and didn't reach markOne
- // give thread A some time to really reach the snapshot state
- Thread.sleep(500);
- if(markOne.get()) {
- Assert.fail("Snapshot was confirmed even though messages " +
- "were still in the buffer");
+ // we need to find out if the snapshot() method blocks forever
+ // this is not possible. If snapshot() is running, it will
+ // start removing elements from the pending list.
+ synchronized (threadA) {
+ threadA.wait(500L);
}
+ // we now check that no records have been confirmed yet
Assert.assertEquals(100, pending.size());
+ Assert.assertFalse("Snapshot method returned before all records were confirmed",
+ snapshottingFinished.hasTriggered());
// now confirm all checkpoints
- for(Callback c: pending) {
+ for (Callback c: pending) {
c.onCompletion(null, null);
}
pending.clear();
- // wait for the snapshotState() method to return
- Thread.sleep(100);
- Assert.assertTrue("Snapshot state didn't return", markOne.get());
+ // wait for the snapshotState() method to return. The will
+ // fail if snapshotState never returns.
+ snapshottingFinished.await();
--- End diff --
I think you don't need this `await()` call here. There are two possibilities:
1. `threadA` leaves `snapshotState` successfully and waits for `threadB`.
Since it completed `snapshotState`, `snapshottingFinished` will already be
triggered, so `await()` returns without waiting.
2. `threadA` blocks in `snapshotState`. Then `threadB` does not have to block
to trigger the timeout, because `threadA` is already blocked.
Consequently, I think you could replace the `OneShotLatch` with a volatile
boolean.
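
To illustrate the suggestion, here is a minimal, self-contained sketch of the
volatile-boolean variant. It is not the Flink test: `VolatileFlagSketch`,
`FakeProducer`, `confirmOne` and `runScenario` are hypothetical names, a
`CountDownLatch` stands in for the list of pending Kafka callbacks, and a
bounded `join` stands in for the JUnit `timeout`. It assumes at least one
pending record.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical, simplified stand-in for the test; none of these names come from Flink.
class VolatileFlagSketch {

    /** Fake producer: snapshotState() blocks until every pending record is confirmed. */
    static class FakeProducer {
        private final CountDownLatch pending;
        // Written by thread A, read by other threads; volatile guarantees visibility.
        volatile boolean snapshottingFinished = false;

        FakeProducer(int records) {
            this.pending = new CountDownLatch(records);
        }

        void confirmOne() {
            pending.countDown();
        }

        void snapshotState() throws InterruptedException {
            pending.await();              // blocks while records are unconfirmed
            snapshottingFinished = true;  // only reached once everything is confirmed
        }
    }

    /** Returns {flag before confirming, flag after thread A finished}. */
    static boolean[] runScenario(int records) {
        final FakeProducer producer = new FakeProducer(records);
        Thread threadA = new Thread(() -> {
            try {
                producer.snapshotState();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        threadA.start();

        // Nothing is confirmed yet, so snapshotState() cannot have returned.
        boolean before = producer.snapshottingFinished;

        for (int i = 0; i < records; i++) {
            producer.confirmOne();
        }

        try {
            // The bounded join plays the role of @Test(timeout=5000):
            // if thread A hangs, the wait ends here, not on a latch.
            threadA.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining thread A", e);
        }
        boolean after = producer.snapshottingFinished;
        return new boolean[] {before, after};
    }

    public static void main(String[] args) {
        boolean[] flags = runScenario(100);
        System.out.println("before=" + flags[0] + ", after=" + flags[1]);
    }
}
```

The confirming side only ever reads the flag and never waits on it. Whether
`snapshotState` returns is decided by joining thread A under a timeout, which
covers both cases listed above.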