Repository: flink
Updated Branches:
  refs/heads/master 62523acbe -> 5fa389014
[hotfix] [cassandra] Fix CassandraSinkBase serialization issue

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5fa38901
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5fa38901
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5fa38901

Branch: refs/heads/master
Commit: 5fa389014a3ce40534703c8a5731c8a9a955058a
Parents: 62523ac
Author: zentol <[email protected]>
Authored: Mon Nov 14 14:45:16 2016 +0100
Committer: zentol <[email protected]>
Committed: Mon Nov 14 14:45:16 2016 +0100

----------------------------------------------------------------------
 .../flink/streaming/connectors/cassandra/CassandraSinkBase.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/5fa38901/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index 9c4c430..713a286 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -42,7 +42,7 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
     protected transient Cluster cluster;
     protected transient Session session;
-    protected transient final AtomicReference<Throwable> exception = new AtomicReference<>();
+    protected transient AtomicReference<Throwable> exception;
     protected transient FutureCallback<V> callback;
     private final ClusterBuilder builder;
@@ -56,6 +56,7 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
     @Override
     public void open(Configuration configuration) {
+        this.exception = new AtomicReference<>();
         this.callback = new FutureCallback<V>() {
             @Override
             public void onSuccess(V ignored) {
