Repository: flink
Updated Branches:
  refs/heads/master 6744b852d -> 9487fcbfb


[FLINK-4119] Refactor null checks in Cassandra IOF

This closes #2163


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9487fcbf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9487fcbf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9487fcbf

Branch: refs/heads/master
Commit: 9487fcbfbae85caf82729e0dda5403fab2fb7d84
Parents: 6744b85
Author: Andrea Sella <[email protected]>
Authored: Sat Jun 25 18:03:20 2016 +0200
Committer: zentol <[email protected]>
Committed: Mon Jun 27 12:19:02 2016 +0200

----------------------------------------------------------------------
 .../cassandra/CassandraInputFormat.java         | 22 +++++++++++---------
 .../cassandra/CassandraOutputFormat.java        | 22 +++++++++++---------
 2 files changed, 24 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9487fcbf/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
index 6818288..849e023 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java
@@ -31,6 +31,7 @@ import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,12 +53,9 @@ public class CassandraInputFormat<OUT extends Tuple> extends RichInputFormat<OUT
        private transient ResultSet resultSet;
 
        public CassandraInputFormat(String query, ClusterBuilder builder) {
-               if (Strings.isNullOrEmpty(query)) {
-                       throw new IllegalArgumentException("Query cannot be null or empty");
-               }
-               if (builder == null) {
-                       throw new IllegalArgumentException("Builder cannot be null.");
-               }
+               Preconditions.checkArgument(!Strings.isNullOrEmpty(query), "Query cannot be null or empty");
+               Preconditions.checkArgument(builder != null, "Builder cannot be null");
+
                this.query = query;
                this.builder = builder;
        }
@@ -115,15 +113,19 @@ public class CassandraInputFormat<OUT extends Tuple> extends RichInputFormat<OUT
        @Override
        public void close() throws IOException {
                try {
-                       session.close();
+                       if (session != null) {
+                               session.close();
+                       }
                } catch (Exception e) {
-                       LOG.info("Inputformat couldn't be closed." + e.getMessage(), e);
+                       LOG.error("Error while closing session.", e);
                }
 
                try {
-                       cluster.close();
+                       if (cluster != null ) {
+                               cluster.close();
+                       }
                } catch (Exception e) {
-                       LOG.info("Inputformat couldn't be closed." + e.getMessage(), e);
+                       LOG.error("Error while closing cluster.", e);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9487fcbf/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
index 116db89..15d8fb3 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,12 +52,9 @@ public class CassandraOutputFormat<OUT extends Tuple> extends RichOutputFormat<O
        private transient Throwable exception = null;
 
        public CassandraOutputFormat(String insertQuery, ClusterBuilder builder) {
-               if (Strings.isNullOrEmpty(insertQuery)) {
-                       throw new IllegalArgumentException("insertQuery cannot be null or empty");
-               }
-               if (builder == null) {
-                       throw new IllegalArgumentException("Builder cannot be null.");
-               }
+               Preconditions.checkArgument(!Strings.isNullOrEmpty(insertQuery), "Query cannot be null or empty");
+               Preconditions.checkArgument(builder != null, "Builder cannot be null");
+
                this.insertQuery = insertQuery;
                this.builder = builder;
        }
@@ -109,15 +107,19 @@ public class CassandraOutputFormat<OUT extends Tuple> extends RichOutputFormat<O
        @Override
        public void close() throws IOException {
                try {
-                       session.close();
+                       if (session != null) {
+                               session.close();
+                       }
                } catch (Exception e) {
-                       LOG.warn("Inputformat couldn't be closed.", e);
+                       LOG.error("Error while closing session.", e);
                }
 
                try {
-                       cluster.close();
+                       if (cluster != null ) {
+                               cluster.close();
+                       }
                } catch (Exception e) {
-                       LOG.warn("Inputformat couldn't be closed." , e);
+                       LOG.error("Error while closing cluster.", e);
                }
        }
 }

Reply via email to