Repository: flink
Updated Branches:
  refs/heads/release-1.3 bb03c07d9 -> 5d6b5a677
[FLINK-5101] Track pending records in CassandraSinkBase Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a85b1881 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a85b1881 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a85b1881 Branch: refs/heads/release-1.3 Commit: a85b1881d184c02075441f57f3364ec80b2d4f4d Parents: bb03c07 Author: zentol <[email protected]> Authored: Wed Nov 23 16:59:22 2016 +0100 Committer: zentol <[email protected]> Committed: Wed May 10 12:07:33 2017 +0200 ---------------------------------------------------------------------- .../connectors/cassandra/CassandraPojoSink.java | 7 +-- .../connectors/cassandra/CassandraSinkBase.java | 56 +++++++++++++++----- 2 files changed, 47 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a85b1881/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java index 650c481..9cfb2f8 100644 --- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraPojoSink.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.connectors.cassandra; +import com.datastax.driver.core.ResultSet; import com.datastax.driver.mapping.Mapper; import com.datastax.driver.mapping.MappingManager; import com.google.common.util.concurrent.ListenableFuture; @@ -31,7 
+32,7 @@ import org.apache.flink.configuration.Configuration; * * @param <IN> Type of the elements emitted by this sink */ -public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, Void> { +public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, ResultSet> { private static final long serialVersionUID = 1L; @@ -61,7 +62,7 @@ public class CassandraPojoSink<IN> extends CassandraSinkBase<IN, Void> { } @Override - public ListenableFuture<Void> send(IN value) { - return mapper.saveAsync(value); + public ListenableFuture<ResultSet> send(IN value) { + return session.executeAsync(mapper.saveQuery(value)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/a85b1881/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java index 49b1efa..b281525 100644 --- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java @@ -29,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; /** * CassandraSinkBase is the common abstract class of {@link CassandraPojoSink} and {@link CassandraTupleSink}. 
@@ -40,11 +41,13 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> { protected transient Cluster cluster; protected transient Session session; - protected transient Throwable exception = null; + protected transient volatile Throwable exception; protected transient FutureCallback<V> callback; private final ClusterBuilder builder; + private final AtomicInteger updatesPending = new AtomicInteger(); + protected CassandraSinkBase(ClusterBuilder builder) { this.builder = builder; ClosureCleaner.clean(builder, true); @@ -55,11 +58,24 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> { this.callback = new FutureCallback<V>() { @Override public void onSuccess(V ignored) { + int pending = updatesPending.decrementAndGet(); + if (pending == 0) { + synchronized (updatesPending) { + updatesPending.notifyAll(); + } + } } @Override public void onFailure(Throwable t) { + int pending = updatesPending.decrementAndGet(); + if (pending == 0) { + synchronized (updatesPending) { + updatesPending.notifyAll(); + } + } exception = t; + LOG.error("Error while sending value.", t); } }; @@ -70,29 +86,43 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> { @Override public void invoke(IN value) throws Exception { if (exception != null) { - throw new IOException("invoke() failed", exception); + throw new IOException("Error while sending value.", exception); } ListenableFuture<V> result = send(value); + updatesPending.incrementAndGet(); Futures.addCallback(result, callback); } public abstract ListenableFuture<V> send(IN value); @Override - public void close() { + public void close() throws Exception { try { - if (session != null) { - session.close(); + if (exception != null) { + throw new IOException("Error while sending value.", exception); } - } catch (Exception e) { - LOG.error("Error while closing session.", e); - } - try { - if (cluster != null) { - cluster.close(); + + while (updatesPending.get() > 0) { + 
synchronized (updatesPending) { + updatesPending.wait(); + } + } + + } finally { + try { + if (session != null) { + session.close(); + } + } catch (Exception e) { + LOG.error("Error while closing session.", e); + } + try { + if (cluster != null) { + cluster.close(); + } + } catch (Exception e) { + LOG.error("Error while closing cluster.", e); } - } catch (Exception e) { - LOG.error("Error while closing cluster.", e); } } }
