Repository: flink Updated Branches: refs/heads/master 6744b852d -> 9487fcbfb
[FLINK-4119] Refactor null checks in Cassandra IOF This closes #2163 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9487fcbf Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9487fcbf Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9487fcbf Branch: refs/heads/master Commit: 9487fcbfbae85caf82729e0dda5403fab2fb7d84 Parents: 6744b85 Author: Andrea Sella <[email protected]> Authored: Sat Jun 25 18:03:20 2016 +0200 Committer: zentol <[email protected]> Committed: Mon Jun 27 12:19:02 2016 +0200 ---------------------------------------------------------------------- .../cassandra/CassandraInputFormat.java | 22 +++++++++++--------- .../cassandra/CassandraOutputFormat.java | 22 +++++++++++--------- 2 files changed, 24 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9487fcbf/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java index 6818288..849e023 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java +++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java @@ -31,6 +31,7 @@ import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.core.io.InputSplit; import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; +import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,12 +53,9 @@ public class CassandraInputFormat<OUT extends Tuple> extends RichInputFormat<OUT private transient ResultSet resultSet; public CassandraInputFormat(String query, ClusterBuilder builder) { - if (Strings.isNullOrEmpty(query)) { - throw new IllegalArgumentException("Query cannot be null or empty"); - } - if (builder == null) { - throw new IllegalArgumentException("Builder cannot be null."); - } + Preconditions.checkArgument(!Strings.isNullOrEmpty(query), "Query cannot be null or empty"); + Preconditions.checkArgument(builder != null, "Builder cannot be null"); + this.query = query; this.builder = builder; } @@ -115,15 +113,19 @@ public class CassandraInputFormat<OUT extends Tuple> extends RichInputFormat<OUT @Override public void close() throws IOException { try { - session.close(); + if (session != null) { + session.close(); + } } catch (Exception e) { - LOG.info("Inputformat couldn't be closed." + e.getMessage(), e); + LOG.error("Error while closing session.", e); } try { - cluster.close(); + if (cluster != null ) { + cluster.close(); + } } catch (Exception e) { - LOG.info("Inputformat couldn't be closed." + e.getMessage(), e); + LOG.error("Error while closing cluster.", e); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/9487fcbf/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java index 116db89..15d8fb3 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java +++ b/flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java @@ -28,6 +28,7 @@ import org.apache.flink.api.common.io.RichOutputFormat; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; +import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,12 +52,9 @@ public class CassandraOutputFormat<OUT extends Tuple> extends RichOutputFormat<O private transient Throwable exception = null; public CassandraOutputFormat(String insertQuery, ClusterBuilder builder) { - if (Strings.isNullOrEmpty(insertQuery)) { - throw new IllegalArgumentException("insertQuery cannot be null or empty"); - } - if (builder == null) { - throw new IllegalArgumentException("Builder cannot be null."); - } + Preconditions.checkArgument(!Strings.isNullOrEmpty(insertQuery), "Query cannot be null or empty"); + Preconditions.checkArgument(builder != null, "Builder cannot be null"); + this.insertQuery = insertQuery; this.builder = builder; } @@ -109,15 +107,19 @@ public class CassandraOutputFormat<OUT extends Tuple> extends RichOutputFormat<O @Override public void close() throws IOException { try { - session.close(); + if (session != null) { + session.close(); + } } catch (Exception e) { - LOG.warn("Inputformat couldn't be closed.", e); + LOG.error("Error while closing session.", e); } try { - cluster.close(); + if (cluster != null ) { + cluster.close(); + } } catch (Exception e) { - LOG.warn("Inputformat couldn't be closed." , e); + LOG.error("Error while closing cluster.", e); } } }
