This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a125e2a11444847920911927955be2d119716838 Author: ozan <ozancancice...@gmail.com> AuthorDate: Wed Jun 12 21:35:54 2019 +0900 [FLINK-12820] [Connectors / Cassandra] Support ignoring writing nulls for tuple types This closes #8714 --- .../cassandra/AbstractCassandraTupleSink.java | 17 +++++++++++- .../connectors/cassandra/CassandraSink.java | 13 ++++++++++ .../cassandra/CassandraSinkBaseConfig.java | 27 +++++++++++++++++-- .../cassandra/CassandraConnectorITCase.java | 30 ++++++++++++++++++++++ 4 files changed, 84 insertions(+), 3 deletions(-) diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java index 5e1fcca..3cf8aa4 100644 --- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.cassandra; import org.apache.flink.configuration.Configuration; +import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ResultSet; import com.google.common.util.concurrent.ListenableFuture; @@ -31,6 +32,7 @@ import com.google.common.util.concurrent.ListenableFuture; public abstract class AbstractCassandraTupleSink<IN> extends CassandraSinkBase<IN, ResultSet> { private final String insertQuery; private transient PreparedStatement ps; + private final boolean ignoreNullFields; public AbstractCassandraTupleSink( String insertQuery, @@ -39,6 +41,7 @@ public abstract class AbstractCassandraTupleSink<IN> extends CassandraSinkBase<I CassandraFailureHandler failureHandler) { super(builder, config, 
failureHandler); this.insertQuery = insertQuery; + this.ignoreNullFields = config.getIgnoreNullFields(); } @Override @@ -50,7 +53,19 @@ public abstract class AbstractCassandraTupleSink<IN> extends CassandraSinkBase<I @Override public ListenableFuture<ResultSet> send(IN value) { Object[] fields = extract(value); - return session.executeAsync(ps.bind(fields)); + return session.executeAsync(bind(fields)); + } + + private BoundStatement bind(Object[] fields) { + BoundStatement bs = ps.bind(fields); + if (ignoreNullFields) { + for (int i = 0; i < fields.length; i++) { + if (fields[i] == null) { + bs.unset(i); + } + } + } + return bs; } protected abstract Object[] extract(IN record); diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java index 128e5e0..2a8ebff 100644 --- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java @@ -399,6 +399,19 @@ public class CassandraSink<IN> { } /** + * Enables ignoring null values, treating null values as unset, which avoids writing null fields + * and creating tombstones. + * + * <p>This call has no effect if {@link CassandraSinkBuilder#enableWriteAheadLog()} is called. + * + * @return this builder + */ + public CassandraSinkBuilder<IN> enableIgnoreNullFields() { + this.configBuilder.setIgnoreNullFields(true); + return this; + } + + /** * Finalizes the configuration of this sink.
* * @return finalized sink diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java index cb8d904..d48f973 100644 --- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java +++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java @@ -38,6 +38,12 @@ public final class CassandraSinkBaseConfig implements Serializable { */ public static final Duration DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = Duration.ofMillis(Long.MAX_VALUE); + /** + * The default option to ignore null fields on insertion. By default, {@code false}. + */ + public static final boolean DEFAULT_IGNORE_NULL_FIELDS = false; + + // ------------------------- Configuration Fields ------------------------- /** Maximum number of concurrent requests allowed. */ @@ -46,9 +52,13 @@ public final class CassandraSinkBaseConfig implements Serializable { /** Timeout duration when acquiring a permit to execute. */ private final Duration maxConcurrentRequestsTimeout; + /** Whether to ignore null fields on insert. 
*/ + private final boolean ignoreNullFields; + private CassandraSinkBaseConfig( int maxConcurrentRequests, - Duration maxConcurrentRequestsTimeout) { + Duration maxConcurrentRequestsTimeout, + boolean ignoreNullFields) { Preconditions.checkArgument(maxConcurrentRequests > 0, "Max concurrent requests is expected to be positive"); Preconditions.checkNotNull(maxConcurrentRequestsTimeout, @@ -57,6 +67,7 @@ public final class CassandraSinkBaseConfig implements Serializable { "Max concurrent requests timeout is expected to be positive"); this.maxConcurrentRequests = maxConcurrentRequests; this.maxConcurrentRequestsTimeout = maxConcurrentRequestsTimeout; + this.ignoreNullFields = ignoreNullFields; } public int getMaxConcurrentRequests() { @@ -67,11 +78,16 @@ public final class CassandraSinkBaseConfig implements Serializable { return maxConcurrentRequestsTimeout; } + public boolean getIgnoreNullFields() { + return ignoreNullFields; + } + @Override public String toString() { return "CassandraSinkBaseConfig{" + "maxConcurrentRequests=" + maxConcurrentRequests + ", maxConcurrentRequestsTimeout=" + maxConcurrentRequestsTimeout + + ", ignoreNullFields=" + ignoreNullFields + '}'; } @@ -85,6 +101,7 @@ public final class CassandraSinkBaseConfig implements Serializable { public static class Builder { private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS; private Duration maxConcurrentRequestsTimeout = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT; + private boolean ignoreNullFields = DEFAULT_IGNORE_NULL_FIELDS; Builder() { } @@ -98,10 +115,16 @@ public final class CassandraSinkBaseConfig implements Serializable { return this; } + public Builder setIgnoreNullFields(boolean ignoreNullFields) { + this.ignoreNullFields = ignoreNullFields; + return this; + } + public CassandraSinkBaseConfig build() { return new CassandraSinkBaseConfig( maxConcurrentRequests, - maxConcurrentRequestsTimeout); + maxConcurrentRequestsTimeout, + ignoreNullFields); } } } diff --git 
a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java index b3d8f65..0119c16 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java @@ -637,4 +637,34 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri } Assert.assertEquals(0, scalaTupleCollection.size()); } + + @Test + public void testCassandraScalaTuplePartialColumnUpdate() throws Exception { + CassandraSinkBaseConfig config = CassandraSinkBaseConfig.newBuilder().setIgnoreNullFields(true).build(); + CassandraScalaProductSink<scala.Tuple3<String, Integer, Integer>> sink = new CassandraScalaProductSink<>(injectTableName(INSERT_DATA_QUERY), builder, config); + + String id = UUID.randomUUID().toString(); + Integer counter = 1; + Integer batchId = 0; + + // Send partial records across multiple requests + scala.Tuple3<String, Integer, Integer> scalaTupleRecordFirst = new scala.Tuple3<>(id, counter, null); + scala.Tuple3<String, Integer, Integer> scalaTupleRecordSecond = new scala.Tuple3<>(id, null, batchId); + + try { + sink.open(new Configuration()); + sink.invoke(scalaTupleRecordFirst, SinkContextUtil.forTimestamp(0)); + sink.invoke(scalaTupleRecordSecond, SinkContextUtil.forTimestamp(0)); + } finally { + sink.close(); + } + + ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY)); + List<com.datastax.driver.core.Row> rows = rs.all(); + Assert.assertEquals(1, rows.size()); + // Since nulls are ignored, we should be reading one complete record + for (com.datastax.driver.core.Row row : rows) { + Assert.assertEquals(new
scala.Tuple3<>(id, counter, batchId), new scala.Tuple3<>(row.getString("id"), row.getInt("counter"), row.getInt("batch_id"))); + } + } }