Repository: flink
Updated Branches:
  refs/heads/master 62523acbe -> 5fa389014


[hotfix] [cassandra] Fix CassandraSinkBase serialization issue


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5fa38901
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5fa38901
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5fa38901

Branch: refs/heads/master
Commit: 5fa389014a3ce40534703c8a5731c8a9a955058a
Parents: 62523ac
Author: zentol <[email protected]>
Authored: Mon Nov 14 14:45:16 2016 +0100
Committer: zentol <[email protected]>
Committed: Mon Nov 14 14:45:16 2016 +0100

----------------------------------------------------------------------
 .../flink/streaming/connectors/cassandra/CassandraSinkBase.java   | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5fa38901/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index 9c4c430..713a286 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -42,7 +42,7 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
        protected transient Cluster cluster;
        protected transient Session session;
 
-       protected transient final AtomicReference<Throwable> exception = new AtomicReference<>();
+       protected transient AtomicReference<Throwable> exception;
        protected transient FutureCallback<V> callback;
 
        private final ClusterBuilder builder;
@@ -56,6 +56,7 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> {
 
        @Override
        public void open(Configuration configuration) {
+               this.exception = new AtomicReference<>();
                this.callback = new FutureCallback<V>() {
                        @Override
                        public void onSuccess(V ignored) {

Reply via email to