[FLINK-4123] Cassandra sink checks for exceptions in ack phase; add serialVersionUID;
switch to AtomicReference wait-notify disable logging add test case for leaving ackPhaseLoopOnException prevent infinite loop in test This closes #2183. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c2da21f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c2da21f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c2da21f Branch: refs/heads/master Commit: 5c2da21f25741502dd8ca64ce9d314a1ebea1441 Parents: 508965e Author: zentol <ches...@apache.org> Authored: Wed Jun 29 16:21:21 2016 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Tue Jul 12 18:07:13 2016 +0200 ---------------------------------------------------------------------- .../cassandra/CassandraTupleWriteAheadSink.java | 73 +++++---- .../cassandra/CassandraConnectorUnitTest.java | 158 +++++++++++++++++++ .../src/test/resources/log4j-test.properties | 2 +- .../operators/GenericWriteAheadSink.java | 16 +- .../operators/GenericWriteAheadSinkTest.java | 3 +- .../util/OneInputStreamOperatorTestHarness.java | 4 + 6 files changed, 221 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5c2da21f/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java index 8bce9d6..80dbcfe 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java +++ 
b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java @@ -31,9 +31,11 @@ import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink; +import org.apache.flink.types.IntValue; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; /** * Sink that emits its input elements into a Cassandra database. This sink stores incoming records within a @@ -43,20 +45,16 @@ import java.util.concurrent.atomic.AtomicInteger; * @param <IN> Type of the elements emitted by this sink */ public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWriteAheadSink<IN> { + private static final long serialVersionUID = 1L; + protected transient Cluster cluster; protected transient Session session; private final String insertQuery; private transient PreparedStatement preparedStatement; - private transient Throwable exception = null; - private transient FutureCallback<ResultSet> callback; - private ClusterBuilder builder; - private int updatesSent = 0; - private AtomicInteger updatesConfirmed = new AtomicInteger(0); - private transient Object[] fields; protected CassandraTupleWriteAheadSink(String insertQuery, TypeSerializer<IN> serializer, ClusterBuilder builder, CheckpointCommitter committer) throws Exception { @@ -71,18 +69,6 @@ public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWrite if (!getRuntimeContext().isCheckpointingEnabled()) { throw new IllegalStateException("The write-ahead log requires checkpointing to be enabled."); } - this.callback = new FutureCallback<ResultSet>() { - @Override - public void onSuccess(ResultSet resultSet) { - updatesConfirmed.incrementAndGet(); - } - - 
@Override - public void onFailure(Throwable throwable) { - exception = throwable; - LOG.error("Error while sending value.", throwable); - } - }; cluster = builder.getCluster(); session = cluster.connect(); preparedStatement = session.prepare(insertQuery); @@ -110,12 +96,38 @@ public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWrite } @Override - protected void sendValues(Iterable<IN> values, long timestamp) throws Exception { - //verify that no query failed until now - if (exception != null) { - throw new Exception(exception); - } + protected boolean sendValues(Iterable<IN> values, long timestamp) throws Exception { + final IntValue updatesCount = new IntValue(0); + final AtomicInteger updatesConfirmed = new AtomicInteger(0); + + final AtomicReference<Throwable> exception = new AtomicReference<>(); + + FutureCallback<ResultSet> callback = new FutureCallback<ResultSet>() { + @Override + public void onSuccess(ResultSet resultSet) { + updatesConfirmed.incrementAndGet(); + if (updatesCount.getValue() > 0) { // only set if all updates have been sent + if (updatesCount.getValue() == updatesConfirmed.get()) { + synchronized (updatesConfirmed) { + updatesConfirmed.notifyAll(); + } + } + } + } + + @Override + public void onFailure(Throwable throwable) { + if (exception.compareAndSet(null, throwable)) { + LOG.error("Error while sending value.", throwable); + synchronized (updatesConfirmed) { + updatesConfirmed.notifyAll(); + } + } + } + }; + //set values for prepared statement + int updatesSent = 0; for (IN value : values) { for (int x = 0; x < value.getArity(); x++) { fields[x] = value.getField(x); @@ -130,13 +142,18 @@ public class CassandraTupleWriteAheadSink<IN extends Tuple> extends GenericWrite Futures.addCallback(result, callback); } } - try { + updatesCount.setValue(updatesSent); + + synchronized (updatesConfirmed) { while (updatesSent != updatesConfirmed.get()) { - Thread.sleep(100); + if (exception.get() != null) { // verify that no query 
failed until now + LOG.warn("Sending a value failed.", exception.get()); + break; + } + updatesConfirmed.wait(); } - } catch (InterruptedException e) { } - updatesSent = 0; - updatesConfirmed.set(0); + boolean success = updatesSent == updatesConfirmed.get(); + return success; } } http://git-wip-us.apache.org/repos/asf/flink/blob/5c2da21f/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorUnitTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorUnitTest.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorUnitTest.java new file mode 100644 index 0000000..e7d9df9 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorUnitTest.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.java.tuple.Tuple0; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.util.IterableIterator; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Matchers; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.Iterator; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.powermock.api.mockito.PowerMockito.doAnswer; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({ResultPartitionWriter.class, CassandraTupleWriteAheadSink.class}) +@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"}) +public class CassandraConnectorUnitTest { + @Test + public void testAckLoopExitOnException() throws Exception { + final AtomicReference<Runnable> callback = new AtomicReference<>(); + + final ClusterBuilder clusterBuilder = new ClusterBuilder() { + @Override + protected Cluster 
buildCluster(Cluster.Builder builder) { + try { + BoundStatement boundStatement = mock(BoundStatement.class); + when(boundStatement.setDefaultTimestamp(any(long.class))).thenReturn(boundStatement); + + PreparedStatement preparedStatement = mock(PreparedStatement.class); + when(preparedStatement.bind(Matchers.anyVararg())).thenReturn(boundStatement); + + ResultSetFuture future = mock(ResultSetFuture.class); + when(future.get()).thenThrow(new RuntimeException("Expected exception.")); + + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocationOnMock) throws Throwable { + callback.set((((Runnable) invocationOnMock.getArguments()[0]))); + return null; + } + }).when(future).addListener(any(Runnable.class), any(Executor.class)); + + Session session = mock(Session.class); + when(session.prepare(anyString())).thenReturn(preparedStatement); + when(session.executeAsync(any(BoundStatement.class))).thenReturn(future); + + Cluster cluster = mock(Cluster.class); + when(cluster.connect()).thenReturn(session); + return cluster; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + + final IterableIterator<Tuple0> iter = new IterableIterator<Tuple0>() { + private boolean exhausted = false; + + @Override + public boolean hasNext() { + return !exhausted; + } + + @Override + public Tuple0 next() { + exhausted = true; + return new Tuple0(); + } + + @Override + public void remove() { + } + + @Override + public Iterator<Tuple0> iterator() { + return this; + } + }; + + final AtomicReference<Boolean> exceptionCaught = new AtomicReference<>(); + + Thread t = new Thread() { + public void run() { + try { + CheckpointCommitter cc = mock(CheckpointCommitter.class); + final CassandraTupleWriteAheadSink<Tuple0> sink = new CassandraTupleWriteAheadSink<>( + "abc", + TupleTypeInfo.of(Tuple0.class).createSerializer(new ExecutionConfig()), + clusterBuilder, + cc + ); + + OneInputStreamOperatorTestHarness<Tuple0, Tuple0> harness = new 
OneInputStreamOperatorTestHarness(sink); + harness.getEnvironment().getTaskConfiguration().setBoolean("checkpointing", true); + + harness.setup(); + sink.open(); + boolean result = sink.sendValues(iter, 0L); + sink.close(); + exceptionCaught.set(result == false); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + t.start(); + + int count = 0; + while (t.getState() != Thread.State.WAITING && count < 100) { // 10 second timeout 10 * 10 * 100ms + Thread.sleep(100); + count++; + } + + callback.get().run(); + + t.join(); + + Assert.assertTrue(exceptionCaught.get()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/5c2da21f/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties index 27914ce..a43d556 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties +++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties @@ -16,7 +16,7 @@ # limitations under the License. 
################################################################################ -log4j.rootLogger=INFO, testlogger +log4j.rootLogger=OFF, testlogger log4j.appender.testlogger=org.apache.log4j.ConsoleAppender log4j.appender.testlogger.target= System.err http://git-wip-us.apache.org/repos/asf/flink/blob/5c2da21f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java index 4a27acb..b6cc399 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java @@ -49,6 +49,8 @@ import java.util.UUID; * @param <IN> Type of the elements emitted by this sink */ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> { + private static final long serialVersionUID = 1L; + protected static final Logger LOG = LoggerFactory.getLogger(GenericWriteAheadSink.class); private final CheckpointCommitter committer; private transient AbstractStateBackend.CheckpointStateOutputView out; @@ -140,10 +142,14 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I if (!committer.isCheckpointCommitted(pastCheckpointId)) { Tuple2<Long, StateHandle<DataInputView>> handle = state.pendingHandles.get(pastCheckpointId); DataInputView in = handle.f1.getState(getUserCodeClassloader()); - sendValues(new ReusingMutableToRegularIteratorWrapper<>(new InputViewIterator<>(in, serializer), serializer), handle.f0); - committer.commitCheckpoint(pastCheckpointId); + boolean success = sendValues(new 
ReusingMutableToRegularIteratorWrapper<>(new InputViewIterator<>(in, serializer), serializer), handle.f0); + if (success) { //if the sending has failed we will retry on the next notify + committer.commitCheckpoint(pastCheckpointId); + checkpointsToRemove.add(pastCheckpointId); + } + } else { + checkpointsToRemove.add(pastCheckpointId); } - checkpointsToRemove.add(pastCheckpointId); } } for (Long toRemove : checkpointsToRemove) { @@ -159,10 +165,10 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I * Write the given element into the backend. * * @param value value to be written + * @return true, if the sending was successful, false otherwise * @throws Exception */ - - protected abstract void sendValues(Iterable<IN> value, long timestamp) throws Exception; + protected abstract boolean sendValues(Iterable<IN> value, long timestamp) throws Exception; @Override public void processElement(StreamRecord<IN> element) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/5c2da21f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java index 8282672..33896e8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java @@ -117,10 +117,11 @@ public class GenericWriteAheadSinkTest extends WriteAheadSinkTestBase<Tuple1<Int } @Override - protected void sendValues(Iterable<Tuple1<Integer>> values, long timestamp) throws Exception { + protected boolean sendValues(Iterable<Tuple1<Integer>> values, long 
timestamp) throws Exception { for (Tuple1<Integer> value : values) { this.values.add(value.f0); } + return true; } } http://git-wip-us.apache.org/repos/asf/flink/blob/5c2da21f/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index 63d22a5..66bdb57 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -147,6 +147,10 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { return mockTask.getCheckpointLock(); } + public Environment getEnvironment() { + return this.mockTask.getEnvironment(); + } + public <K> void configureForKeyedStream(KeySelector<IN, K> keySelector, TypeInformation<K> keyType) { ClosureCleaner.clean(keySelector, false); config.setStatePartitioner(0, keySelector);