Repository: flink
Updated Branches:
  refs/heads/master 508965e69 -> 74b09ce0d


[FLINK-4123] [cassandra] Fix concurrency issue in CassandraTupleWriteAheadSink

The updatesCount variable in CassandraTupleWriteAheadSink.sendValues did not
have guaranteed visibility. Thus, it was possible that the callback thread
would read an outdated value for updatesCount, resulting in a deadlock.
Replacing the IntValue updatesCount with an AtomicInteger updatesCount fixes
this issue.

Furthermore, the PR hardens the CassandraTupleWriteAheadSinkTest, which could
have failed with an NPE if the callback runnable was not set in time.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/74b09ce0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/74b09ce0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/74b09ce0

Branch: refs/heads/master
Commit: 74b09ce0db4d24a0ac25de2ecac391fdf8bd5a90
Parents: 5c2da21
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Tue Jul 12 14:44:29 2016 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Tue Jul 12 18:07:13 2016 +0200

----------------------------------------------------------------------
 .../cassandra/CassandraTupleWriteAheadSink.java |  24 +--
 .../cassandra/CassandraConnectorUnitTest.java   | 158 -------------------
 .../CassandraTupleWriteAheadSinkTest.java       | 127 +++++++++++++++
 .../operators/GenericWriteAheadSink.java        |   3 +
 4 files changed, 142 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/74b09ce0/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
 
b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
index 80dbcfe..1928431 100644
--- 
a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
+++ 
b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
@@ -31,7 +31,6 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
 import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
-import org.apache.flink.types.IntValue;
 
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -97,7 +96,7 @@ public class CassandraTupleWriteAheadSink<IN extends Tuple> 
extends GenericWrite
 
        @Override
        protected boolean sendValues(Iterable<IN> values, long timestamp) 
throws Exception {
-               final IntValue updatesCount = new IntValue(0);
+               final AtomicInteger updatesCount = new AtomicInteger(0);
                final AtomicInteger updatesConfirmed = new AtomicInteger(0);
 
                final AtomicReference<Throwable> exception = new 
AtomicReference<>();
@@ -106,8 +105,8 @@ public class CassandraTupleWriteAheadSink<IN extends Tuple> 
extends GenericWrite
                        @Override
                        public void onSuccess(ResultSet resultSet) {
                                updatesConfirmed.incrementAndGet();
-                               if (updatesCount.getValue() > 0) { // only set 
if all updates have been sent
-                                       if (updatesCount.getValue() == 
updatesConfirmed.get()) {
+                               if (updatesCount.get() > 0) { // only set if 
all updates have been sent
+                                       if (updatesCount.get() == 
updatesConfirmed.get()) {
                                                synchronized (updatesConfirmed) 
{
                                                        
updatesConfirmed.notifyAll();
                                                }
@@ -142,18 +141,19 @@ public class CassandraTupleWriteAheadSink<IN extends 
Tuple> extends GenericWrite
                                Futures.addCallback(result, callback);
                        }
                }
-               updatesCount.setValue(updatesSent);
+               updatesCount.set(updatesSent);
 
                synchronized (updatesConfirmed) {
-                       while (updatesSent != updatesConfirmed.get()) {
-                               if (exception.get() != null) { // verify that 
no query failed until now
-                                       LOG.warn("Sending a value failed.", 
exception.get());
-                                       break;
-                               }
+                       while (exception.get() == null && updatesSent != 
updatesConfirmed.get()) {
                                updatesConfirmed.wait();
                        }
                }
-               boolean success = updatesSent == updatesConfirmed.get();
-               return success;
+
+               if (exception.get() != null) {
+                       LOG.warn("Sending a value failed.", exception.get());
+                       return false;
+               } else {
+                       return true;
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/74b09ce0/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorUnitTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorUnitTest.java
 
b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorUnitTest.java
deleted file mode 100644
index e7d9df9..0000000
--- 
a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorUnitTest.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.cassandra;
-
-import com.datastax.driver.core.BoundStatement;
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSetFuture;
-import com.datastax.driver.core.Session;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.util.IterableIterator;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Matchers;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.Iterator;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.powermock.api.mockito.PowerMockito.doAnswer;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.when;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ResultPartitionWriter.class, 
CassandraTupleWriteAheadSink.class})
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
-public class CassandraConnectorUnitTest {
-       @Test
-       public void testAckLoopExitOnException() throws Exception {
-               final AtomicReference<Runnable> callback = new 
AtomicReference<>();
-
-               final ClusterBuilder clusterBuilder = new ClusterBuilder() {
-                       @Override
-                       protected Cluster buildCluster(Cluster.Builder builder) 
{
-                               try {
-                                       BoundStatement boundStatement = 
mock(BoundStatement.class);
-                                       
when(boundStatement.setDefaultTimestamp(any(long.class))).thenReturn(boundStatement);
-
-                                       PreparedStatement preparedStatement = 
mock(PreparedStatement.class);
-                                       
when(preparedStatement.bind(Matchers.anyVararg())).thenReturn(boundStatement);
-
-                                       ResultSetFuture future = 
mock(ResultSetFuture.class);
-                                       when(future.get()).thenThrow(new 
RuntimeException("Expected exception."));
-
-                                       doAnswer(new Answer<Void>() {
-                                               @Override
-                                               public Void 
answer(InvocationOnMock invocationOnMock) throws Throwable {
-                                                       
callback.set((((Runnable) invocationOnMock.getArguments()[0])));
-                                                       return null;
-                                               }
-                                       
}).when(future).addListener(any(Runnable.class), any(Executor.class));
-
-                                       Session session = mock(Session.class);
-                                       
when(session.prepare(anyString())).thenReturn(preparedStatement);
-                                       
when(session.executeAsync(any(BoundStatement.class))).thenReturn(future);
-
-                                       Cluster cluster = mock(Cluster.class);
-                                       
when(cluster.connect()).thenReturn(session);
-                                       return cluster;
-                               } catch (Exception e) {
-                                       throw new RuntimeException(e);
-                               }
-                       }
-               };
-
-               final IterableIterator<Tuple0> iter = new 
IterableIterator<Tuple0>() {
-                       private boolean exhausted = false;
-
-                       @Override
-                       public boolean hasNext() {
-                               return !exhausted;
-                       }
-
-                       @Override
-                       public Tuple0 next() {
-                               exhausted = true;
-                               return new Tuple0();
-                       }
-
-                       @Override
-                       public void remove() {
-                       }
-
-                       @Override
-                       public Iterator<Tuple0> iterator() {
-                               return this;
-                       }
-               };
-
-               final AtomicReference<Boolean> exceptionCaught = new 
AtomicReference<>();
-
-               Thread t = new Thread() {
-                       public void run() {
-                               try {
-                                       CheckpointCommitter cc = 
mock(CheckpointCommitter.class);
-                                       final 
CassandraTupleWriteAheadSink<Tuple0> sink = new CassandraTupleWriteAheadSink<>(
-                                               "abc",
-                                               
TupleTypeInfo.of(Tuple0.class).createSerializer(new ExecutionConfig()),
-                                               clusterBuilder,
-                                               cc
-                                       );
-
-                                       
OneInputStreamOperatorTestHarness<Tuple0, Tuple0> harness = new 
OneInputStreamOperatorTestHarness(sink);
-                                       
harness.getEnvironment().getTaskConfiguration().setBoolean("checkpointing", 
true);
-
-                                       harness.setup();
-                                       sink.open();
-                                       boolean result = sink.sendValues(iter, 
0L);
-                                       sink.close();
-                                       exceptionCaught.set(result == false);
-                               } catch (Exception e) {
-                                       throw new RuntimeException(e);
-                               }
-                       }
-               };
-               t.start();
-
-               int count = 0;
-               while (t.getState() != Thread.State.WAITING && count < 100) { 
// 10 second timeout 10 * 10 * 100ms
-                       Thread.sleep(100);
-                       count++;
-               }
-
-               callback.get().run();
-
-               t.join();
-
-               Assert.assertTrue(exceptionCaught.get());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/74b09ce0/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java
 
b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java
new file mode 100644
index 0000000..847d1a0
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.tuple.Tuple0;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Collections;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+public class CassandraTupleWriteAheadSinkTest {
+
+       @Test(timeout=20000)
+       public void testAckLoopExitOnException() throws Exception {
+               final AtomicReference<Runnable> runnableFuture = new 
AtomicReference<>();
+
+               final ClusterBuilder clusterBuilder = new ClusterBuilder() {
+                       private static final long serialVersionUID = 
4624400760492936756L;
+
+                       @Override
+                       protected Cluster buildCluster(Cluster.Builder builder) 
{
+                               try {
+                                       BoundStatement boundStatement = 
mock(BoundStatement.class);
+                                       
when(boundStatement.setDefaultTimestamp(any(long.class))).thenReturn(boundStatement);
+
+                                       PreparedStatement preparedStatement = 
mock(PreparedStatement.class);
+                                       
when(preparedStatement.bind(Matchers.anyVararg())).thenReturn(boundStatement);
+
+                                       ResultSetFuture future = 
mock(ResultSetFuture.class);
+                                       when(future.get()).thenThrow(new 
RuntimeException("Expected exception."));
+
+                                       doAnswer(new Answer<Void>() {
+                                               @Override
+                                               public Void 
answer(InvocationOnMock invocationOnMock) throws Throwable {
+                                                       synchronized 
(runnableFuture) {
+                                                               
runnableFuture.set((((Runnable) invocationOnMock.getArguments()[0])));
+                                                               
runnableFuture.notifyAll();
+                                                       }
+                                                       return null;
+                                               }
+                                       
}).when(future).addListener(any(Runnable.class), any(Executor.class));
+
+                                       Session session = mock(Session.class);
+                                       
when(session.prepare(anyString())).thenReturn(preparedStatement);
+                                       
when(session.executeAsync(any(BoundStatement.class))).thenReturn(future);
+
+                                       Cluster cluster = mock(Cluster.class);
+                                       
when(cluster.connect()).thenReturn(session);
+                                       return cluster;
+                               } catch (Exception e) {
+                                       throw new RuntimeException(e);
+                               }
+                       }
+               };
+
+               // Our asynchronous executor thread
+               new Thread(new Runnable() {
+                       @Override
+                       public void run() {
+                               synchronized (runnableFuture) {
+                                       while (runnableFuture.get() == null) {
+                                               try {
+                                                       runnableFuture.wait();
+                                               } catch (InterruptedException 
e) {
+                                                       // ignore interrupts
+                                               }
+                                       }
+                               }
+                               runnableFuture.get().run();
+                       }
+               }).start();
+
+               CheckpointCommitter cc = mock(CheckpointCommitter.class);
+               final CassandraTupleWriteAheadSink<Tuple0> sink = new 
CassandraTupleWriteAheadSink<>(
+                       "abc",
+                       TupleTypeInfo.of(Tuple0.class).createSerializer(new 
ExecutionConfig()),
+                       clusterBuilder,
+                       cc
+               );
+
+               OneInputStreamOperatorTestHarness<Tuple0, Tuple0> harness = new 
OneInputStreamOperatorTestHarness(sink);
+               
harness.getEnvironment().getTaskConfiguration().setBoolean("checkpointing", 
true);
+
+               harness.setup();
+               sink.open();
+
+               // we should leave the loop and return false since we've seen 
an exception
+               assertFalse(sink.sendValues(Collections.singleton(new 
Tuple0()), 0L));
+
+               sink.close();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/74b09ce0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
index b6cc399..5545717 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
@@ -190,6 +190,9 @@ public abstract class GenericWriteAheadSink<IN> extends 
AbstractStreamOperator<I
         * used since the last completed checkpoint.
         **/
        public static class ExactlyOnceState implements 
StateHandle<Serializable> {
+
+               private static final long serialVersionUID = 
-3571063495273460743L;
+
                protected TreeMap<Long, Tuple2<Long, 
StateHandle<DataInputView>>> pendingHandles;
 
                public ExactlyOnceState() {

Reply via email to