[FLINK-3265] [rabbitmq] Fix concurrency bug in RabbitMQ source (RMQSource)

This closes #1534


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b01a890
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b01a890
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b01a890

Branch: refs/heads/master
Commit: 6b01a89020f2de3f7710cf72336291b1e8ca8562
Parents: d97fcda
Author: Robert Metzger <rmetz...@apache.org>
Authored: Thu Jan 21 12:22:21 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jan 28 14:43:03 2016 +0100

----------------------------------------------------------------------
 .../connectors/rabbitmq/RMQSource.java          |  4 +-
 .../connectors/rabbitmq/RMQSourceTest.java      | 79 ++++++++++++++++++++
 .../source/MessageAcknowledgingSourceBase.java  | 51 +++++++------
 ...ltipleIdsMessageAcknowledgingSourceBase.java | 24 +++---
 4 files changed, 124 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6b01a890/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 09bb07c..59bc057 100644
--- 
a/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -196,7 +196,9 @@ public class RMQSource<OUT> extends 
MultipleIdsMessageAcknowledgingSourceBase<OU
                                                        continue;
                                                }
                                        }
-                                       sessionIds.add(deliveryTag);
+                                       synchronized (sessionIdsPerSnapshot) {
+                                               sessionIds.add(deliveryTag);
+                                       }
                                }
 
                                ctx.collect(result);

http://git-wip-us.apache.org/repos/asf/flink/blob/6b01a890/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
index aa19e5d..0a3de84 100644
--- 
a/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
+++ 
b/flink-streaming-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
@@ -23,6 +23,7 @@ import com.rabbitmq.client.Envelope;
 import com.rabbitmq.client.QueueingConsumer;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
@@ -31,6 +32,7 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -103,6 +105,83 @@ public class RMQSourceTest {
                sourceThread.join();
        }
 
+       /**
+        * Makes sure that concurrent calls to snapshotState() and notifyCheckpointComplete() do not
+        * interfere with each other.
+        *
+        * Without proper synchronization, this test fails with a ConcurrentModificationException.
+        *
+        */
+       @Test
+       public void testConcurrentAccess() throws Exception {
+               source.autoAck = false;
+               sourceThread.start();
+
+               final Tuple1<Throwable> error = new Tuple1<>(null);
+
+               Thread.sleep(5);
+
+               Thread snapshotThread = new Thread(new Runnable() {
+                       public long id = 0;
+
+                       @Override
+                       public void run() {
+                               while (!Thread.interrupted()) {
+                                       try {
+                                               source.snapshotState(id++, 0);
+                                       } catch (Exception e) {
+                                               error.f0 = e;
+                                               break; // stop thread
+                                       }
+                               }
+                       }
+               });
+
+               Thread notifyThread = new Thread(new Runnable() {
+                       @Override
+                       public void run() {
+                               while (!Thread.interrupted()) {
+                                       try {
+                                               // always remove all checkpoints
+                                               
source.notifyCheckpointComplete(Long.MAX_VALUE);
+                                       } catch (Exception e) {
+                                               error.f0 = e;
+                                               break; // stop thread
+                                       }
+                               }
+                       }
+               });
+
+               snapshotThread.start();
+               notifyThread.start();
+
+               long deadline = System.currentTimeMillis() + 1000L;
+               while(System.currentTimeMillis() < deadline) {
+                       if(!snapshotThread.isAlive()) {
+                               notifyThread.interrupt();
+                               break;
+                       }
+                       if(!notifyThread.isAlive()) {
+                               snapshotThread.interrupt();
+                               break;
+                       }
+                       Thread.sleep(10);
+               }
+               if(snapshotThread.isAlive()) {
+                       snapshotThread.interrupt();
+                       snapshotThread.join();
+               }
+               if(notifyThread.isAlive()) {
+                       notifyThread.interrupt();
+                       notifyThread.join();
+               }
+               if(error.f0 != null) {
+                       error.f0.printStackTrace();
+                       Assert.fail("Test failed with " + 
error.f0.getClass().getCanonicalName());
+               }
+
+       }
+
        @Test
        public void testCheckpointing() throws Exception {
                source.autoAck = false;

http://git-wip-us.apache.org/repos/asf/flink/blob/6b01a890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
index 4385884..2f865d1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledgingSourceBase.java
@@ -76,6 +76,7 @@ import org.slf4j.LoggerFactory;
  * @param <Type> The type of the messages created by the source.
  * @param <UId> The type of unique IDs which may be used to acknowledge 
elements.
  */
+@SuppressWarnings("SynchronizeOnNonFinalField")
 public abstract class MessageAcknowledgingSourceBase<Type, UId>
        extends RichSourceFunction<Type>
        implements Checkpointed<SerializedCheckpointData[]>, CheckpointNotifier 
{
@@ -166,41 +167,45 @@ public abstract class 
MessageAcknowledgingSourceBase<Type, UId>
                LOG.debug("Snapshotting state. Messages: {}, checkpoint id: {}, 
timestamp: {}",
                                        idsForCurrentCheckpoint, checkpointId, 
checkpointTimestamp);
 
-               pendingCheckpoints.addLast(new Tuple2<>(checkpointId, 
idsForCurrentCheckpoint));
+               synchronized (pendingCheckpoints) {
+                       pendingCheckpoints.addLast(new Tuple2<>(checkpointId, 
idsForCurrentCheckpoint));
 
-               idsForCurrentCheckpoint = new ArrayList<>(64);
+                       idsForCurrentCheckpoint = new ArrayList<>(64);
 
-               return SerializedCheckpointData.fromDeque(pendingCheckpoints, 
idSerializer);
+                       return 
SerializedCheckpointData.fromDeque(pendingCheckpoints, idSerializer);
+               }
        }
 
        @Override
        public void restoreState(SerializedCheckpointData[] state) throws 
Exception {
-               pendingCheckpoints = SerializedCheckpointData.toDeque(state, 
idSerializer);
-               // build a set which contains all processed ids. It may be used 
to check if we have
-               // already processed an incoming message.
-               for (Tuple2<Long, List<UId>> checkpoint : pendingCheckpoints) {
-                       idsProcessedButNotAcknowledged.addAll(checkpoint.f1);
+               synchronized (pendingCheckpoints) {
+                       pendingCheckpoints = 
SerializedCheckpointData.toDeque(state, idSerializer);
+                       // build a set which contains all processed ids. It may 
be used to check if we have
+                       // already processed an incoming message.
+                       for (Tuple2<Long, List<UId>> checkpoint : 
pendingCheckpoints) {
+                               
idsProcessedButNotAcknowledged.addAll(checkpoint.f1);
+                       }
                }
        }
 
        @Override
        public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
                LOG.debug("Committing Messages externally for checkpoint {}", 
checkpointId);
-
-               for (Iterator<Tuple2<Long, List<UId>>> iter = 
pendingCheckpoints.iterator(); iter.hasNext();) {
-                       Tuple2<Long, List<UId>> checkpoint = iter.next();
-                       long id = checkpoint.f0;
-
-                       if (id <= checkpointId) {
-                               LOG.trace("Committing Messages with following 
IDs {}", checkpoint.f1);
-                               acknowledgeIDs(checkpointId, checkpoint.f1);
-                               // remove deduplication data
-                               
idsProcessedButNotAcknowledged.removeAll(checkpoint.f1);
-                               // remove checkpoint data
-                               iter.remove();
-                       }
-                       else {
-                               break;
+               synchronized (pendingCheckpoints) {
+                       for (Iterator<Tuple2<Long, List<UId>>> iter = 
pendingCheckpoints.iterator(); iter.hasNext(); ) {
+                               Tuple2<Long, List<UId>> checkpoint = 
iter.next();
+                               long id = checkpoint.f0;
+
+                               if (id <= checkpointId) {
+                                       LOG.trace("Committing Messages with 
following IDs {}", checkpoint.f1);
+                                       acknowledgeIDs(checkpointId, 
checkpoint.f1);
+                                       // remove deduplication data
+                                       
idsProcessedButNotAcknowledged.removeAll(checkpoint.f1);
+                                       // remove checkpoint data
+                                       iter.remove();
+                               } else {
+                                       break;
+                               }
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/6b01a890/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java
index c097066..4709759 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/MultipleIdsMessageAcknowledgingSourceBase.java
@@ -107,14 +107,16 @@ public abstract class 
MultipleIdsMessageAcknowledgingSourceBase<Type, UId, Sessi
         */
        protected final void acknowledgeIDs(long checkpointId, List<UId> 
uniqueIds) {
                LOG.debug("Acknowledging ids for checkpoint {}", checkpointId);
-               Iterator<Tuple2<Long, List<SessionId>>> iterator = 
sessionIdsPerSnapshot.iterator();
-               while (iterator.hasNext()) {
-                       final Tuple2<Long, List<SessionId>> next = 
iterator.next();
-                       long id = next.f0;
-                       if (id <= checkpointId) {
-                               acknowledgeSessionIDs(next.f1);
-                               // remove ids for this session
-                               iterator.remove();
+               synchronized (sessionIdsPerSnapshot) {
+                       Iterator<Tuple2<Long, List<SessionId>>> iterator = 
sessionIdsPerSnapshot.iterator();
+                       while (iterator.hasNext()) {
+                               final Tuple2<Long, List<SessionId>> next = 
iterator.next();
+                               long id = next.f0;
+                               if (id <= checkpointId) {
+                                       acknowledgeSessionIDs(next.f1);
+                                       // remove ids for this session
+                                       iterator.remove();
+                               }
                        }
                }
        }
@@ -132,8 +134,10 @@ public abstract class 
MultipleIdsMessageAcknowledgingSourceBase<Type, UId, Sessi
 
        @Override
        public SerializedCheckpointData[] snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
-               sessionIdsPerSnapshot.add(new Tuple2<>(checkpointId, 
sessionIds));
-               sessionIds = new ArrayList<>(64);
+               synchronized (sessionIdsPerSnapshot) {
+                       sessionIdsPerSnapshot.add(new Tuple2<>(checkpointId, 
sessionIds));
+                       sessionIds = new ArrayList<>(64);
+               }
                return super.snapshotState(checkpointId, checkpointTimestamp);
        }
 }

Reply via email to