Repository: flink
Updated Branches:
  refs/heads/release-1.3 bb03c07d9 -> 5d6b5a677


[FLINK-5101] Track pending records in CassandraSinkBase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a85b1881
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a85b1881
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a85b1881

Branch: refs/heads/release-1.3
Commit: a85b1881d184c02075441f57f3364ec80b2d4f4d
Parents: bb03c07
Author: zentol <[email protected]>
Authored: Wed Nov 23 16:59:22 2016 +0100
Committer: zentol <[email protected]>
Committed: Wed May 10 12:07:33 2017 +0200

----------------------------------------------------------------------
 .../connectors/cassandra/CassandraPojoSink.java |  7 +--
 .../connectors/cassandra/CassandraSinkBase.java | 56 +++++++++++++++-----
 2 files changed, 47 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a85b1881/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
index 650c481..9cfb2f8 100644
--- 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.cassandra;
 
+import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.mapping.Mapper;
 import com.datastax.driver.mapping.MappingManager;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -31,7 +32,7 @@ import org.apache.flink.configuration.Configuration;
  *
  * @param <IN> Type of the elements emitted by this sink
  */
-public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, Void> {
+public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, ResultSet> {
 
        private static final long serialVersionUID = 1L;
 
@@ -61,7 +62,7 @@ public class CassandraPojoSink<IN> extends 
CassandraSinkBase<IN, Void> {
        }
 
        @Override
-       public ListenableFuture<Void> send(IN value) {
-               return mapper.saveAsync(value);
+       public ListenableFuture<ResultSet> send(IN value) {
+               return session.executeAsync(mapper.saveQuery(value));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a85b1881/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index 49b1efa..b281525 100644
--- 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * CassandraSinkBase is the common abstract class of {@link CassandraPojoSink} 
and {@link CassandraTupleSink}.
@@ -40,11 +41,13 @@ public abstract class CassandraSinkBase<IN, V> extends 
RichSinkFunction<IN> {
        protected transient Cluster cluster;
        protected transient Session session;
 
-       protected transient Throwable exception = null;
+       protected transient volatile Throwable exception;
        protected transient FutureCallback<V> callback;
 
        private final ClusterBuilder builder;
 
+       private final AtomicInteger updatesPending = new AtomicInteger();
+
        protected CassandraSinkBase(ClusterBuilder builder) {
                this.builder = builder;
                ClosureCleaner.clean(builder, true);
@@ -55,11 +58,24 @@ public abstract class CassandraSinkBase<IN, V> extends 
RichSinkFunction<IN> {
                this.callback = new FutureCallback<V>() {
                        @Override
                        public void onSuccess(V ignored) {
+                               int pending = updatesPending.decrementAndGet();
+                               if (pending == 0) {
+                                       synchronized (updatesPending) {
+                                               updatesPending.notifyAll();
+                                       }
+                               }
                        }
 
                        @Override
                        public void onFailure(Throwable t) {
+                               int pending = updatesPending.decrementAndGet();
+                               if (pending == 0) {
+                                       synchronized (updatesPending) {
+                                               updatesPending.notifyAll();
+                                       }
+                               }
                                exception = t;
+                               
                                LOG.error("Error while sending value.", t);
                        }
                };
@@ -70,29 +86,43 @@ public abstract class CassandraSinkBase<IN, V> extends 
RichSinkFunction<IN> {
        @Override
        public void invoke(IN value) throws Exception {
                if (exception != null) {
-                       throw new IOException("invoke() failed", exception);
+                       throw new IOException("Error while sending value.", 
exception);
                }
                ListenableFuture<V> result = send(value);
+               updatesPending.incrementAndGet();
                Futures.addCallback(result, callback);
        }
 
        public abstract ListenableFuture<V> send(IN value);
 
        @Override
-       public void close() {
+       public void close() throws Exception {
                try {
-                       if (session != null) {
-                               session.close();
+                       if (exception != null) {
+                               throw new IOException("Error while sending 
value.", exception);
                        }
-               } catch (Exception e) {
-                       LOG.error("Error while closing session.", e);
-               }
-               try {
-                       if (cluster != null) {
-                               cluster.close();
+
+                       while (updatesPending.get() > 0) {
+                               synchronized (updatesPending) {
+                                       updatesPending.wait();
+                               }
+                       }
+                       
+               } finally {
+                       try {
+                               if (session != null) {
+                                       session.close();
+                               }
+                       } catch (Exception e) {
+                               LOG.error("Error while closing session.", e);
+                       }
+                       try {
+                               if (cluster != null) {
+                                       cluster.close();
+                               }
+                       } catch (Exception e) {
+                               LOG.error("Error while closing cluster.", e);
                        }
-               } catch (Exception e) {
-                       LOG.error("Error while closing cluster.", e);
                }
        }
 }

Reply via email to