[FLINK-4123] Cassandra sink checks for exceptions in ack phase

add serialVersionUID

switch to AtomicReference

replace sleep-based polling with wait-notify

disable logging in Cassandra connector tests

add test case for leaving the ack phase loop on exception

prevent infinite loop in test

This closes #2183.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c2da21f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c2da21f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c2da21f

Branch: refs/heads/master
Commit: 5c2da21f25741502dd8ca64ce9d314a1ebea1441
Parents: 508965e
Author: zentol <ches...@apache.org>
Authored: Wed Jun 29 16:21:21 2016 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Tue Jul 12 18:07:13 2016 +0200

----------------------------------------------------------------------
 .../cassandra/CassandraTupleWriteAheadSink.java |  73 +++++----
 .../cassandra/CassandraConnectorUnitTest.java   | 158 +++++++++++++++++++
 .../src/test/resources/log4j-test.properties    |   2 +-
 .../operators/GenericWriteAheadSink.java        |  16 +-
 .../operators/GenericWriteAheadSinkTest.java    |   3 +-
 .../util/OneInputStreamOperatorTestHarness.java |   4 +
 6 files changed, 221 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5c2da21f/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
 
b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
index 8bce9d6..80dbcfe 100644
--- 
a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
+++ 
b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTupleWriteAheadSink.java
@@ -31,9 +31,11 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
 import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
+import org.apache.flink.types.IntValue;
 
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Sink that emits its input elements into a Cassandra database. This sink 
stores incoming records within a
@@ -43,20 +45,16 @@ import java.util.concurrent.atomic.AtomicInteger;
  * @param <IN> Type of the elements emitted by this sink
  */
 public class CassandraTupleWriteAheadSink<IN extends Tuple> extends 
GenericWriteAheadSink<IN> {
+       private static final long serialVersionUID = 1L;
+
        protected transient Cluster cluster;
        protected transient Session session;
 
        private final String insertQuery;
        private transient PreparedStatement preparedStatement;
 
-       private transient Throwable exception = null;
-       private transient FutureCallback<ResultSet> callback;
-
        private ClusterBuilder builder;
 
-       private int updatesSent = 0;
-       private AtomicInteger updatesConfirmed = new AtomicInteger(0);
-
        private transient Object[] fields;
 
        protected CassandraTupleWriteAheadSink(String insertQuery, 
TypeSerializer<IN> serializer, ClusterBuilder builder, CheckpointCommitter 
committer) throws Exception {
@@ -71,18 +69,6 @@ public class CassandraTupleWriteAheadSink<IN extends Tuple> 
extends GenericWrite
                if (!getRuntimeContext().isCheckpointingEnabled()) {
                        throw new IllegalStateException("The write-ahead log 
requires checkpointing to be enabled.");
                }
-               this.callback = new FutureCallback<ResultSet>() {
-                       @Override
-                       public void onSuccess(ResultSet resultSet) {
-                               updatesConfirmed.incrementAndGet();
-                       }
-
-                       @Override
-                       public void onFailure(Throwable throwable) {
-                               exception = throwable;
-                               LOG.error("Error while sending value.", 
throwable);
-                       }
-               };
                cluster = builder.getCluster();
                session = cluster.connect();
                preparedStatement = session.prepare(insertQuery);
@@ -110,12 +96,38 @@ public class CassandraTupleWriteAheadSink<IN extends 
Tuple> extends GenericWrite
        }
 
        @Override
-       protected void sendValues(Iterable<IN> values, long timestamp) throws 
Exception {
-               //verify that no query failed until now
-               if (exception != null) {
-                       throw new Exception(exception);
-               }
+       protected boolean sendValues(Iterable<IN> values, long timestamp) 
throws Exception {
+               final IntValue updatesCount = new IntValue(0);
+               final AtomicInteger updatesConfirmed = new AtomicInteger(0);
+
+               final AtomicReference<Throwable> exception = new 
AtomicReference<>();
+
+               FutureCallback<ResultSet> callback = new 
FutureCallback<ResultSet>() {
+                       @Override
+                       public void onSuccess(ResultSet resultSet) {
+                               updatesConfirmed.incrementAndGet();
+                               if (updatesCount.getValue() > 0) { // only set 
if all updates have been sent
+                                       if (updatesCount.getValue() == 
updatesConfirmed.get()) {
+                                               synchronized (updatesConfirmed) 
{
+                                                       
updatesConfirmed.notifyAll();
+                                               }
+                                       }
+                               }
+                       }
+
+                       @Override
+                       public void onFailure(Throwable throwable) {
+                               if (exception.compareAndSet(null, throwable)) {
+                                       LOG.error("Error while sending value.", 
throwable);
+                                       synchronized (updatesConfirmed) {
+                                               updatesConfirmed.notifyAll();
+                                       }
+                               }
+                       }
+               };
+
                //set values for prepared statement
+               int updatesSent = 0;
                for (IN value : values) {
                        for (int x = 0; x < value.getArity(); x++) {
                                fields[x] = value.getField(x);
@@ -130,13 +142,18 @@ public class CassandraTupleWriteAheadSink<IN extends 
Tuple> extends GenericWrite
                                Futures.addCallback(result, callback);
                        }
                }
-               try {
+               updatesCount.setValue(updatesSent);
+
+               synchronized (updatesConfirmed) {
                        while (updatesSent != updatesConfirmed.get()) {
-                               Thread.sleep(100);
+                               if (exception.get() != null) { // verify that 
no query failed until now
+                                       LOG.warn("Sending a value failed.", 
exception.get());
+                                       break;
+                               }
+                               updatesConfirmed.wait();
                        }
-               } catch (InterruptedException e) {
                }
-               updatesSent = 0;
-               updatesConfirmed.set(0);
+               boolean success = updatesSent == updatesConfirmed.get();
+               return success;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5c2da21f/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorUnitTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorUnitTest.java
 
b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorUnitTest.java
new file mode 100644
index 0000000..e7d9df9
--- /dev/null
+++ 
b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorUnitTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.tuple.Tuple0;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.IterableIterator;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Iterator;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.powermock.api.mockito.PowerMockito.doAnswer;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ResultPartitionWriter.class, 
CassandraTupleWriteAheadSink.class})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
+public class CassandraConnectorUnitTest {
+       @Test
+       public void testAckLoopExitOnException() throws Exception {
+               final AtomicReference<Runnable> callback = new 
AtomicReference<>();
+
+               final ClusterBuilder clusterBuilder = new ClusterBuilder() {
+                       @Override
+                       protected Cluster buildCluster(Cluster.Builder builder) 
{
+                               try {
+                                       BoundStatement boundStatement = 
mock(BoundStatement.class);
+                                       
when(boundStatement.setDefaultTimestamp(any(long.class))).thenReturn(boundStatement);
+
+                                       PreparedStatement preparedStatement = 
mock(PreparedStatement.class);
+                                       
when(preparedStatement.bind(Matchers.anyVararg())).thenReturn(boundStatement);
+
+                                       ResultSetFuture future = 
mock(ResultSetFuture.class);
+                                       when(future.get()).thenThrow(new 
RuntimeException("Expected exception."));
+
+                                       doAnswer(new Answer<Void>() {
+                                               @Override
+                                               public Void 
answer(InvocationOnMock invocationOnMock) throws Throwable {
+                                                       
callback.set((((Runnable) invocationOnMock.getArguments()[0])));
+                                                       return null;
+                                               }
+                                       
}).when(future).addListener(any(Runnable.class), any(Executor.class));
+
+                                       Session session = mock(Session.class);
+                                       
when(session.prepare(anyString())).thenReturn(preparedStatement);
+                                       
when(session.executeAsync(any(BoundStatement.class))).thenReturn(future);
+
+                                       Cluster cluster = mock(Cluster.class);
+                                       
when(cluster.connect()).thenReturn(session);
+                                       return cluster;
+                               } catch (Exception e) {
+                                       throw new RuntimeException(e);
+                               }
+                       }
+               };
+
+               final IterableIterator<Tuple0> iter = new 
IterableIterator<Tuple0>() {
+                       private boolean exhausted = false;
+
+                       @Override
+                       public boolean hasNext() {
+                               return !exhausted;
+                       }
+
+                       @Override
+                       public Tuple0 next() {
+                               exhausted = true;
+                               return new Tuple0();
+                       }
+
+                       @Override
+                       public void remove() {
+                       }
+
+                       @Override
+                       public Iterator<Tuple0> iterator() {
+                               return this;
+                       }
+               };
+
+               final AtomicReference<Boolean> exceptionCaught = new 
AtomicReference<>();
+
+               Thread t = new Thread() {
+                       public void run() {
+                               try {
+                                       CheckpointCommitter cc = 
mock(CheckpointCommitter.class);
+                                       final 
CassandraTupleWriteAheadSink<Tuple0> sink = new CassandraTupleWriteAheadSink<>(
+                                               "abc",
+                                               
TupleTypeInfo.of(Tuple0.class).createSerializer(new ExecutionConfig()),
+                                               clusterBuilder,
+                                               cc
+                                       );
+
+                                       
OneInputStreamOperatorTestHarness<Tuple0, Tuple0> harness = new 
OneInputStreamOperatorTestHarness(sink);
+                                       
harness.getEnvironment().getTaskConfiguration().setBoolean("checkpointing", 
true);
+
+                                       harness.setup();
+                                       sink.open();
+                                       boolean result = sink.sendValues(iter, 
0L);
+                                       sink.close();
+                                       exceptionCaught.set(result == false);
+                               } catch (Exception e) {
+                                       throw new RuntimeException(e);
+                               }
+                       }
+               };
+               t.start();
+
+               int count = 0;
+               while (t.getState() != Thread.State.WAITING && count < 100) { 
// 10 second timeout: 100 iterations * 100ms
+                       Thread.sleep(100);
+                       count++;
+               }
+
+               callback.get().run();
+
+               t.join();
+
+               Assert.assertTrue(exceptionCaught.get());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5c2da21f/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties
 
b/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties
index 27914ce..a43d556 100644
--- 
a/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties
+++ 
b/flink-streaming-connectors/flink-connector-cassandra/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 
################################################################################
 
-log4j.rootLogger=INFO, testlogger
+log4j.rootLogger=OFF, testlogger
 
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
 log4j.appender.testlogger.target= System.err

http://git-wip-us.apache.org/repos/asf/flink/blob/5c2da21f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
index 4a27acb..b6cc399 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
@@ -49,6 +49,8 @@ import java.util.UUID;
  * @param <IN> Type of the elements emitted by this sink
  */
 public abstract class GenericWriteAheadSink<IN> extends 
AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
+       private static final long serialVersionUID = 1L;
+
        protected static final Logger LOG = 
LoggerFactory.getLogger(GenericWriteAheadSink.class);
        private final CheckpointCommitter committer;
        private transient AbstractStateBackend.CheckpointStateOutputView out;
@@ -140,10 +142,14 @@ public abstract class GenericWriteAheadSink<IN> extends 
AbstractStreamOperator<I
                                        if 
(!committer.isCheckpointCommitted(pastCheckpointId)) {
                                                Tuple2<Long, 
StateHandle<DataInputView>> handle = state.pendingHandles.get(pastCheckpointId);
                                                DataInputView in = 
handle.f1.getState(getUserCodeClassloader());
-                                               sendValues(new 
ReusingMutableToRegularIteratorWrapper<>(new InputViewIterator<>(in, 
serializer), serializer), handle.f0);
-                                               
committer.commitCheckpoint(pastCheckpointId);
+                                               boolean success = 
sendValues(new ReusingMutableToRegularIteratorWrapper<>(new 
InputViewIterator<>(in, serializer), serializer), handle.f0);
+                                               if (success) { //if the sending 
has failed we will retry on the next notify
+                                                       
committer.commitCheckpoint(pastCheckpointId);
+                                                       
checkpointsToRemove.add(pastCheckpointId);
+                                               }
+                                       } else {
+                                               
checkpointsToRemove.add(pastCheckpointId);
                                        }
-                                       
checkpointsToRemove.add(pastCheckpointId);
                                }
                        }
                        for (Long toRemove : checkpointsToRemove) {
@@ -159,10 +165,10 @@ public abstract class GenericWriteAheadSink<IN> extends 
AbstractStreamOperator<I
         * Write the given element into the backend.
         *
         * @param value value to be written
+        * @return true, if the sending was successful, false otherwise
         * @throws Exception
         */
-
-       protected abstract void sendValues(Iterable<IN> value, long timestamp) 
throws Exception;
+       protected abstract boolean sendValues(Iterable<IN> value, long 
timestamp) throws Exception;
 
        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/5c2da21f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
index 8282672..33896e8 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSinkTest.java
@@ -117,10 +117,11 @@ public class GenericWriteAheadSinkTest extends 
WriteAheadSinkTestBase<Tuple1<Int
                }
 
                @Override
-               protected void sendValues(Iterable<Tuple1<Integer>> values, 
long timestamp) throws Exception {
+               protected boolean sendValues(Iterable<Tuple1<Integer>> values, 
long timestamp) throws Exception {
                        for (Tuple1<Integer> value : values) {
                                this.values.add(value.f0);
                        }
+                       return true;
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5c2da21f/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 63d22a5..66bdb57 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -147,6 +147,10 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
                return mockTask.getCheckpointLock();
        }
 
+       public Environment getEnvironment() {
+               return this.mockTask.getEnvironment();
+       }
+
        public <K> void configureForKeyedStream(KeySelector<IN, K> keySelector, 
TypeInformation<K> keyType) {
                ClosureCleaner.clean(keySelector, false);
                config.setStatePartitioner(0, keySelector);

Reply via email to