This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a125e2a11444847920911927955be2d119716838
Author: ozan <ozancancice...@gmail.com>
AuthorDate: Wed Jun 12 21:35:54 2019 +0900

    [FLINK-12820] [Connectors / Cassandra] Support ignoring writing nulls for tuple types
    
    This closes #8714
---
 .../cassandra/AbstractCassandraTupleSink.java      | 17 +++++++++++-
 .../connectors/cassandra/CassandraSink.java        | 13 ++++++++++
 .../cassandra/CassandraSinkBaseConfig.java         | 27 +++++++++++++++++--
 .../cassandra/CassandraConnectorITCase.java        | 30 ++++++++++++++++++++++
 4 files changed, 84 insertions(+), 3 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
index 5e1fcca..3cf8aa4 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.cassandra;
 
 import org.apache.flink.configuration.Configuration;
 
+import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.ResultSet;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -31,6 +32,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 public abstract class AbstractCassandraTupleSink<IN> extends CassandraSinkBase<IN, ResultSet> {
        private final String insertQuery;
        private transient PreparedStatement ps;
+       private final boolean ignoreNullFields;
 
        public AbstractCassandraTupleSink(
                        String insertQuery,
@@ -39,6 +41,7 @@ public abstract class AbstractCassandraTupleSink<IN> extends CassandraSinkBase<I
                        CassandraFailureHandler failureHandler) {
                super(builder, config, failureHandler);
                this.insertQuery = insertQuery;
+               this.ignoreNullFields = config.getIgnoreNullFields();
        }
 
        @Override
@@ -50,7 +53,19 @@ public abstract class AbstractCassandraTupleSink<IN> extends CassandraSinkBase<I
        @Override
        public ListenableFuture<ResultSet> send(IN value) {
                Object[] fields = extract(value);
-               return session.executeAsync(ps.bind(fields));
+               return session.executeAsync(bind(fields));
+       }
+
+       private BoundStatement bind(Object[] fields) {
+               BoundStatement bs = ps.bind(fields);
+               if (ignoreNullFields) {
+                       for (int i = 0; i < fields.length; i++) {
+                               if (fields[i] == null) {
+                                       bs.unset(i);
+                               }
+                       }
+               }
+               return bs;
        }
 
        protected abstract Object[] extract(IN record);
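
For readers less familiar with the driver semantics that the new bind() helper
relies on: binding null writes a tombstone for that column, while unsetting the
bound variable leaves the column untouched on the server. Below is a minimal
standalone sketch of that distinction against the DataStax 3.x driver; the
keyspace/table (demo.counts) and contact point are hypothetical, not part of
this commit.

    import com.datastax.driver.core.BoundStatement;
    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.PreparedStatement;
    import com.datastax.driver.core.Session;

    public class UnsetVsNullSketch {
        public static void main(String[] args) {
            // Hypothetical cluster and table, for illustration only.
            try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
                 Session session = cluster.connect()) {

                PreparedStatement ps = session.prepare(
                        "INSERT INTO demo.counts (id, counter, batch_id) VALUES (?, ?, ?)");

                BoundStatement bs = ps.bind("k1", 1, null);
                // Executing bs as-is would write null into batch_id, i.e. a
                // tombstone shadowing any existing value. Unsetting the bound
                // variable instead (protocol v4+) leaves batch_id untouched,
                // which is what ignoreNullFields does for each null field.
                bs.unset(2);

                session.execute(bs);
            }
        }
    }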
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
index 128e5e0..2a8ebff 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
@@ -399,6 +399,19 @@ public class CassandraSink<IN> {
                }
 
                /**
+                * Enables ignoring null values: null values are treated as unset, which
+                * avoids writing null fields and creating tombstones.
+                *
+                * <p>This call has no effect if {@link CassandraSinkBuilder#enableWriteAheadLog()} is called.
+                *
+                * @return this builder
+                */
+               public CassandraSinkBuilder<IN> enableIgnoreNullFields() {
+                       this.configBuilder.setIgnoreNullFields(true);
+                       return this;
+               }
+
+               /**
                 * Finalizes the configuration of this sink.
                 *
                 * @return finalized sink
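
A sketch of how a pipeline author might use the new builder option. This is not
part of the commit; the table (demo.counts), host, and PartialUpdateSource are
hypothetical stand-ins, and a null field here means "leave that column as is".

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.connectors.cassandra.CassandraSink;

    public class IgnoreNullFieldsExample {

        /** Hypothetical source emitting partial updates; null marks "do not touch". */
        private static class PartialUpdateSource
                implements SourceFunction<Tuple3<String, Integer, Integer>> {
            @Override
            public void run(SourceContext<Tuple3<String, Integer, Integer>> ctx) {
                ctx.collect(Tuple3.of("k1", 1, null));  // update counter only
                ctx.collect(Tuple3.of("k1", null, 42)); // update batch_id only
            }

            @Override
            public void cancel() { }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<Tuple3<String, Integer, Integer>> updates = env.addSource(
                    new PartialUpdateSource(),
                    Types.TUPLE(Types.STRING, Types.INT, Types.INT));

            CassandraSink.addSink(updates)
                    .setQuery("INSERT INTO demo.counts (id, counter, batch_id) VALUES (?, ?, ?);")
                    .setHost("127.0.0.1")
                    .enableIgnoreNullFields() // null fields become unset variables
                    .build();

            env.execute("ignore-null-fields-demo");
        }
    }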
diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java
index cb8d904..d48f973 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java
@@ -38,6 +38,12 @@ public final class CassandraSinkBaseConfig implements Serializable  {
         */
        public static final Duration DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = Duration.ofMillis(Long.MAX_VALUE);
 
+       /**
+        * The default option to ignore null fields on insertion. By default, {@code false}.
+        */
+       public static final boolean DEFAULT_IGNORE_NULL_FIELDS = false;
+
+
        // ------------------------- Configuration Fields -------------------------
 
        /** Maximum number of concurrent requests allowed. */
@@ -46,9 +52,13 @@ public final class CassandraSinkBaseConfig implements Serializable  {
        /** Timeout duration when acquiring a permit to execute. */
        private final Duration maxConcurrentRequestsTimeout;
 
+       /** Whether to ignore null fields on insert. */
+       private final boolean ignoreNullFields;
+
        private CassandraSinkBaseConfig(
                        int maxConcurrentRequests,
-                       Duration maxConcurrentRequestsTimeout) {
+                       Duration maxConcurrentRequestsTimeout,
+                       boolean ignoreNullFields) {
                Preconditions.checkArgument(maxConcurrentRequests > 0,
                        "Max concurrent requests is expected to be positive");
                Preconditions.checkNotNull(maxConcurrentRequestsTimeout,
@@ -57,6 +67,7 @@ public final class CassandraSinkBaseConfig implements Serializable  {
                        "Max concurrent requests timeout is expected to be positive");
                this.maxConcurrentRequests = maxConcurrentRequests;
                this.maxConcurrentRequestsTimeout = maxConcurrentRequestsTimeout;
+               this.ignoreNullFields = ignoreNullFields;
        }
 
        public int getMaxConcurrentRequests() {
@@ -67,11 +78,16 @@ public final class CassandraSinkBaseConfig implements Serializable  {
                return maxConcurrentRequestsTimeout;
        }
 
+       public boolean getIgnoreNullFields() {
+               return ignoreNullFields;
+       }
+
        @Override
        public String toString() {
                return "CassandraSinkBaseConfig{" +
                        "maxConcurrentRequests=" + maxConcurrentRequests +
                        ", maxConcurrentRequestsTimeout=" + maxConcurrentRequestsTimeout +
+                       ", ignoreNullFields=" + ignoreNullFields +
                        '}';
        }
 
@@ -85,6 +101,7 @@ public final class CassandraSinkBaseConfig implements Serializable  {
        public static class Builder {
                private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS;
                private Duration maxConcurrentRequestsTimeout = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT;
+               private boolean ignoreNullFields = DEFAULT_IGNORE_NULL_FIELDS;
 
                Builder() { }
 
@@ -98,10 +115,16 @@ public final class CassandraSinkBaseConfig implements Serializable  {
                        return this;
                }
 
+               public Builder setIgnoreNullFields(boolean ignoreNullFields) {
+                       this.ignoreNullFields = ignoreNullFields;
+                       return this;
+               }
+
                public CassandraSinkBaseConfig build() {
                        return new CassandraSinkBaseConfig(
                                maxConcurrentRequests,
-                               maxConcurrentRequestsTimeout);
+                               maxConcurrentRequestsTimeout,
+                               ignoreNullFields);
                }
        }
 }
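
For completeness, the flag can also be set on the config object directly, which
is what the sink builder does internally and what the new ITCase below does. A
minimal sketch; the printed output comments are illustrative only:

    import org.apache.flink.streaming.connectors.cassandra.CassandraSinkBaseConfig;

    public class ConfigSketch {
        public static void main(String[] args) {
            // Equivalent of CassandraSinkBuilder#enableIgnoreNullFields();
            // the default is DEFAULT_IGNORE_NULL_FIELDS (false).
            CassandraSinkBaseConfig config = CassandraSinkBaseConfig.newBuilder()
                    .setIgnoreNullFields(true)
                    .build();

            System.out.println(config.getIgnoreNullFields()); // true
            System.out.println(config); // "...ignoreNullFields=true}"
        }
    }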
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index b3d8f65..0119c16 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -637,4 +637,34 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
                }
                Assert.assertEquals(0, scalaTupleCollection.size());
        }
+
+       @Test
+       public void testCassandraScalaTuplePartialColumnUpdate() throws Exception {
+               CassandraSinkBaseConfig config = CassandraSinkBaseConfig.newBuilder().setIgnoreNullFields(true).build();
+               CassandraScalaProductSink<scala.Tuple3<String, Integer, Integer>> sink = new CassandraScalaProductSink<>(injectTableName(INSERT_DATA_QUERY), builder, config);
+
+               String id = UUID.randomUUID().toString();
+               Integer counter = 1;
+               Integer batchId = 0;
+
+               // Send partial records across multiple requests
+               scala.Tuple3<String, Integer, Integer> scalaTupleRecordFirst = new scala.Tuple3<>(id, counter, null);
+               scala.Tuple3<String, Integer, Integer> scalaTupleRecordSecond = new scala.Tuple3<>(id, null, batchId);
+
+               try {
+                       sink.open(new Configuration());
+                       sink.invoke(scalaTupleRecordFirst, SinkContextUtil.forTimestamp(0));
+                       sink.invoke(scalaTupleRecordSecond, SinkContextUtil.forTimestamp(0));
+               } finally {
+                       sink.close();
+               }
+
+               ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));
+               List<com.datastax.driver.core.Row> rows = rs.all();
+               Assert.assertEquals(1, rows.size());
+               // Since nulls are ignored, we should be reading one complete record
+               for (com.datastax.driver.core.Row row : rows) {
+                       Assert.assertEquals(new scala.Tuple3<>(id, counter, batchId), new scala.Tuple3<>(row.getString("id"), row.getInt("counter"), row.getInt("batch_id")));
+               }
+       }
 }
