Repository: flink Updated Branches: refs/heads/master 508965e69 -> 74b09ce0d
[FLINK-4123] [cassandra] Fix concurrency issue in CassandraTupleWriteAheadSink The updatesCount variable in CassandraTupleWriteAheadSink.sendValues did not have guaranteed visibility. Thus, it was possible that the callback thread would read an outdated value for updatesCount, resulting in a deadlock. Replacing IntValue updatesCount with AtomicInteger updatesCount fixes this issue. Furthermore, the PR hardens the CassandraTupleWriteAheadSinkTest, which could have failed with an NPE if the callback runnable was not set in time. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/74b09ce0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/74b09ce0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/74b09ce0 Branch: refs/heads/master Commit: 74b09ce0db4d24a0ac25de2ecac391fdf8bd5a90 Parents: 5c2da21 Author: Till Rohrmann <trohrm...@apache.org> Authored: Tue Jul 12 14:44:29 2016 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Tue Jul 12 18:07:13 2016 +0200 ---------------------------------------------------------------------- .../cassandra/CassandraTupleWriteAheadSink.java | 24 +-- .../cassandra/CassandraConnectorUnitTest.java | 158 ------------------- .../CassandraTupleWriteAheadSinkTest.java | 127 +++++++++++++++ .../operators/GenericWriteAheadSink.java | 3 + 4 files changed, 142 insertions(+), 170 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/74b09ce0/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java 
b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java index 80dbcfe..1928431 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java +++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java @@ -31,7 +31,6 @@ import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink; -import org.apache.flink.types.IntValue; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; @@ -97,7 +96,7 @@ public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWrite @Override protected boolean sendValues(Iterable<IN> values, long timestamp) throws Exception { - final IntValue updatesCount = new IntValue(0); + final AtomicInteger updatesCount = new AtomicInteger(0); final AtomicInteger updatesConfirmed = new AtomicInteger(0); final AtomicReference<Throwable> exception = new AtomicReference<>(); @@ -106,8 +105,8 @@ public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWrite @Override public void onSuccess(ResultSet resultSet) { updatesConfirmed.incrementAndGet(); - if (updatesCount.getValue() > 0) { // only set if all updates have been sent - if (updatesCount.getValue() == updatesConfirmed.get()) { + if (updatesCount.get() > 0) { // only set if all updates have been sent + if (updatesCount.get() == updatesConfirmed.get()) { synchronized (updatesConfirmed) { updatesConfirmed.notifyAll(); } @@ -142,18 +141,19 @@ public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWrite Futures.addCallback(result, callback); } } - 
updatesCount.setValue(updatesSent); + updatesCount.set(updatesSent); synchronized (updatesConfirmed) { - while (updatesSent != updatesConfirmed.get()) { - if (exception.get() != null) { // verify that no query failed until now - LOG.warn("Sending a value failed.", exception.get()); - break; - } + while (exception.get() == null && updatesSent != updatesConfirmed.get()) { updatesConfirmed.wait(); } } - boolean success = updatesSent == updatesConfirmed.get(); - return success; + + if (exception.get() != null) { + LOG.warn("Sending a value failed.", exception.get()); + return false; + } else { + return true; + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/74b09ce0/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorUnitTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorUnitTest.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorUnitTest.java deleted file mode 100644 index e7d9df9..0000000 --- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorUnitTest.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.cassandra; - -import com.datastax.driver.core.BoundStatement; -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.ResultSetFuture; -import com.datastax.driver.core.Session; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.java.tuple.Tuple0; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; -import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; -import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.apache.flink.util.IterableIterator; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Matchers; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import java.util.Iterator; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicReference; - -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.powermock.api.mockito.PowerMockito.doAnswer; -import static org.powermock.api.mockito.PowerMockito.mock; -import static org.powermock.api.mockito.PowerMockito.when; - -@RunWith(PowerMockRunner.class) 
-@PrepareForTest({ResultPartitionWriter.class, CassandraTupleWriteAheadSink.class}) -@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"}) -public class CassandraConnectorUnitTest { - @Test - public void testAckLoopExitOnException() throws Exception { - final AtomicReference<Runnable> callback = new AtomicReference<>(); - - final ClusterBuilder clusterBuilder = new ClusterBuilder() { - @Override - protected Cluster buildCluster(Cluster.Builder builder) { - try { - BoundStatement boundStatement = mock(BoundStatement.class); - when(boundStatement.setDefaultTimestamp(any(long.class))).thenReturn(boundStatement); - - PreparedStatement preparedStatement = mock(PreparedStatement.class); - when(preparedStatement.bind(Matchers.anyVararg())).thenReturn(boundStatement); - - ResultSetFuture future = mock(ResultSetFuture.class); - when(future.get()).thenThrow(new RuntimeException("Expected exception.")); - - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocationOnMock) throws Throwable { - callback.set((((Runnable) invocationOnMock.getArguments()[0]))); - return null; - } - }).when(future).addListener(any(Runnable.class), any(Executor.class)); - - Session session = mock(Session.class); - when(session.prepare(anyString())).thenReturn(preparedStatement); - when(session.executeAsync(any(BoundStatement.class))).thenReturn(future); - - Cluster cluster = mock(Cluster.class); - when(cluster.connect()).thenReturn(session); - return cluster; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - - final IterableIterator<Tuple0> iter = new IterableIterator<Tuple0>() { - private boolean exhausted = false; - - @Override - public boolean hasNext() { - return !exhausted; - } - - @Override - public Tuple0 next() { - exhausted = true; - return new Tuple0(); - } - - @Override - public void remove() { - } - - @Override - public Iterator<Tuple0> iterator() { - return this; - } - }; - - final AtomicReference<Boolean> exceptionCaught = 
new AtomicReference<>(); - - Thread t = new Thread() { - public void run() { - try { - CheckpointCommitter cc = mock(CheckpointCommitter.class); - final CassandraTupleWriteAheadSink<Tuple0> sink = new CassandraTupleWriteAheadSink<>( - "abc", - TupleTypeInfo.of(Tuple0.class).createSerializer(new ExecutionConfig()), - clusterBuilder, - cc - ); - - OneInputStreamOperatorTestHarness<Tuple0, Tuple0> harness = new OneInputStreamOperatorTestHarness(sink); - harness.getEnvironment().getTaskConfiguration().setBoolean("checkpointing", true); - - harness.setup(); - sink.open(); - boolean result = sink.sendValues(iter, 0L); - sink.close(); - exceptionCaught.set(result == false); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }; - t.start(); - - int count = 0; - while (t.getState() != Thread.State.WAITING && count < 100) { // 10 second timeout 10 * 10 * 100ms - Thread.sleep(100); - count++; - } - - callback.get().run(); - - t.join(); - - Assert.assertTrue(exceptionCaught.get()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/74b09ce0/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java new file mode 100644 index 0000000..847d1a0 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSinkTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.java.tuple.Tuple0; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.Collections; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertFalse; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.powermock.api.mockito.PowerMockito.doAnswer; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +public class CassandraTupleWriteAheadSinkTest { + + @Test(timeout=20000) + public void 
testAckLoopExitOnException() throws Exception { + final AtomicReference<Runnable> runnableFuture = new AtomicReference<>(); + + final ClusterBuilder clusterBuilder = new ClusterBuilder() { + private static final long serialVersionUID = 4624400760492936756L; + + @Override + protected Cluster buildCluster(Cluster.Builder builder) { + try { + BoundStatement boundStatement = mock(BoundStatement.class); + when(boundStatement.setDefaultTimestamp(any(long.class))).thenReturn(boundStatement); + + PreparedStatement preparedStatement = mock(PreparedStatement.class); + when(preparedStatement.bind(Matchers.anyVararg())).thenReturn(boundStatement); + + ResultSetFuture future = mock(ResultSetFuture.class); + when(future.get()).thenThrow(new RuntimeException("Expected exception.")); + + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocationOnMock) throws Throwable { + synchronized (runnableFuture) { + runnableFuture.set((((Runnable) invocationOnMock.getArguments()[0]))); + runnableFuture.notifyAll(); + } + return null; + } + }).when(future).addListener(any(Runnable.class), any(Executor.class)); + + Session session = mock(Session.class); + when(session.prepare(anyString())).thenReturn(preparedStatement); + when(session.executeAsync(any(BoundStatement.class))).thenReturn(future); + + Cluster cluster = mock(Cluster.class); + when(cluster.connect()).thenReturn(session); + return cluster; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + + // Our asynchronous executor thread + new Thread(new Runnable() { + @Override + public void run() { + synchronized (runnableFuture) { + while (runnableFuture.get() == null) { + try { + runnableFuture.wait(); + } catch (InterruptedException e) { + // ignore interrupts + } + } + } + runnableFuture.get().run(); + } + }).start(); + + CheckpointCommitter cc = mock(CheckpointCommitter.class); + final CassandraTupleWriteAheadSink<Tuple0> sink = new CassandraTupleWriteAheadSink<>( + "abc", + 
TupleTypeInfo.of(Tuple0.class).createSerializer(new ExecutionConfig()), + clusterBuilder, + cc + ); + + OneInputStreamOperatorTestHarness<Tuple0, Tuple0> harness = new OneInputStreamOperatorTestHarness(sink); + harness.getEnvironment().getTaskConfiguration().setBoolean("checkpointing", true); + + harness.setup(); + sink.open(); + + // we should leave the loop and return false since we've seen an exception + assertFalse(sink.sendValues(Collections.singleton(new Tuple0()), 0L)); + + sink.close(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/74b09ce0/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java index b6cc399..5545717 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java @@ -190,6 +190,9 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I * used since the last completed checkpoint. **/ public static class ExactlyOnceState implements StateHandle<Serializable> { + + private static final long serialVersionUID = -3571063495273460743L; + protected TreeMap<Long, Tuple2<Long, StateHandle<DataInputView>>> pendingHandles; public ExactlyOnceState() {