[
https://issues.apache.org/jira/browse/FLINK-12820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16863009#comment-16863009
]
Ozan Cicekci commented on FLINK-12820:
--------------------------------------
Sure!
Just to clarify what I'd like to achieve, let's say we have a table in C* like
this:
{code:java}
CREATE TABLE example.test_table (id text, col1 text, col2 text, PRIMARY KEY
(id));
{code}
And we have a stream of tuples and a basic C* sink like this in Scala:
{code:java}
import com.datastax.driver.core.Cluster
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.cassandra.{CassandraSink, ClusterBuilder}

val senv = StreamExecutionEnvironment.getExecutionEnvironment
val stream: DataStream[(String, String, String)] = senv
  .fromElements(("1", "2", null), ("1", null, "3"))

CassandraSink.addSink(stream)
  .setQuery("INSERT into example.test_table(id, col1, col2) values(?, ?, ?)")
  // ClusterBuilder is abstract; the contact point below is only a placeholder.
  .setClusterBuilder(new ClusterBuilder {
    override def buildCluster(builder: Cluster.Builder): Cluster =
      builder.addContactPoint("127.0.0.1").build()
  })
  .build()

senv.execute("test")
{code}
Executing the query below returns only the values of the latest record in the stream:
{code:java}
select * from test_table ;
id | col1 | col2
---+------+------
1 | null | 3
{code}
What I'd like to achieve is this;
{code:java}
select * from test_table ;
id | col1 | col2
---+------+------
1 | 2 | 3
{code}
As for why this is not possible with Scala tuples or case classes:
currently all inserts of these types are done by creating a prepared statement
and executing it with [bound values|https://github.com/apache/flink/blob/d894d3517258191703a0a6dd6b42947c8a0d8d95/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java#L53].
In order not to write a specific column to Cassandra, its value should
simply be left unbound. If you bind nulls to columns, then nulls will be written.
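To make the difference concrete, here is a toy in-memory model of the single row from the example above. It is plain Java with no driver involved, and the names `UpsertModel`, `UNSET` and `replay` are made up for illustration. It only assumes the semantics described here: a bound null removes the stored column value, while an unbound column leaves it untouched. The `id` column is left out because both records share the same key.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy in-memory model of one Cassandra row; illustration only, not driver code. */
final class UpsertModel {
    /** Hypothetical marker meaning "this column was left unbound". */
    static final String UNSET = new String("<unset>");

    /**
     * Applies one INSERT to the row. A null value removes the column
     * (Cassandra would write a tombstone); UNSET leaves the stored value alone.
     */
    static void insert(Map<String, String> row, String[] columns, String[] values) {
        for (int i = 0; i < columns.length; i++) {
            if (values[i] == UNSET) { // identity comparison on purpose
                continue;
            }
            if (values[i] == null) {
                row.remove(columns[i]);
            } else {
                row.put(columns[i], values[i]);
            }
        }
    }

    /** Replays the two stream elements from the example, optionally mapping null to UNSET. */
    static Map<String, String> replay(boolean nullsAsUnset) {
        String[] columns = {"col1", "col2"};
        String[][] records = {{"2", null}, {null, "3"}};
        Map<String, String> row = new LinkedHashMap<>();
        for (String[] record : records) {
            String[] values = record.clone();
            if (nullsAsUnset) {
                for (int i = 0; i < values.length; i++) {
                    if (values[i] == null) {
                        values[i] = UNSET;
                    }
                }
            }
            insert(row, columns, values);
        }
        return row;
    }

    public static void main(String[] args) {
        System.out.println(replay(false)); // {col2=3}
        System.out.println(replay(true));  // {col1=2, col2=3}
    }
}
```

The first call corresponds to the current behaviour (col1 is lost), the second to the result I'd like to get.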
In earlier versions of Cassandra, there wasn't actually a way to have unbound
values in bound statements. Newer versions (native protocol v4, i.e. Cassandra
2.2 and later) allow unsetting values on bound statements, as described here:
[https://docs.datastax.com/en/developer/java-driver/3.0/manual/statements/prepared/]
{code:java}
BoundStatement bound = ps1.bind()
.setString("sku", "324378")
.setString("description", "LCD screen");
// Use the unset method to unset a previously set value.
// By name:
bound.unset("description");
// By index:
bound.unset(1);
{code}
Since all values in the object array are bound here:
[https://github.com/apache/flink/blob/d894d3517258191703a0a6dd6b42947c8a0d8d95/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/AbstractCassandraTupleSink.java#L53]
all elements of Scala tuples or case classes end up being written to C*,
regardless of whether they're null or not.
Also, it doesn't necessarily have to be null values that we're unbinding, but
nulls have a specific meaning in C*: they indicate a deletion and create
tombstones, which can negatively impact performance. So null values are good
candidates for unsetting from bound statements.
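A rough sketch of what the sink could do after binding, assuming the null check is simply run over the extracted field array. `NullUnsetter` and `unsetNulls` are hypothetical names, not existing Flink code, and the `IntConsumer` only stands in for `BoundStatement.unset(int)` so that the snippet compiles without the driver on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

/** Hypothetical helper; not part of the Flink Cassandra connector. */
final class NullUnsetter {
    /**
     * Calls unset.accept(i) for every null field and returns those indices.
     * The callback is a stand-in for BoundStatement.unset(int).
     */
    static List<Integer> unsetNulls(Object[] fields, IntConsumer unset) {
        List<Integer> unsetIndices = new ArrayList<>();
        for (int i = 0; i < fields.length; i++) {
            if (fields[i] == null) {
                unset.accept(i);
                unsetIndices.add(i);
            }
        }
        return unsetIndices;
    }

    public static void main(String[] args) {
        Object[] fields = {"1", null, "3"};
        // With the real driver this would look roughly like:
        //   BoundStatement bound = ps.bind(fields);
        //   unsetNulls(fields, bound::unset);
        List<Integer> result =
            unsetNulls(fields, i -> System.out.println("unset column " + i));
        System.out.println(result);
    }
}
```

Whether this runs always or only behind a sink configuration flag is a separate question; the sketch only covers the unsetting step.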
> Support ignoring null fields when writing to Cassandra
> ------------------------------------------------------
>
> Key: FLINK-12820
> URL: https://issues.apache.org/jira/browse/FLINK-12820
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Cassandra
> Affects Versions: 1.8.0
> Reporter: Ozan Cicekci
> Priority: Minor
> Labels: pull-request-available
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Currently, records which have null fields are written to their corresponding
> columns in Cassandra as null. Writing null is basically a 'delete' for
> Cassandra. That is useful if nulls should correspond to deletes in the data
> model, but nulls can also indicate missing data or a partial column update.
> In that case, we end up overwriting columns of the existing record in Cassandra
> with nulls.
>
> I believe it's already possible to ignore null values for POJO's with mapper
> options, as documented here:
> [https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/cassandra.html#cassandra-sink-example-for-streaming-pojo-data-type]
>
> But this is not possible when using Scala tuples or case classes. Perhaps
> with a Cassandra sink configuration flag, null values could be unset using
> the option below for tuples and case classes.
> [https://docs.datastax.com/en/drivers/java/3.0/com/datastax/driver/core/BoundStatement.html#unset-int-]
>
> Here is the equivalent configuration in spark-cassandra-connector:
> [https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md#globally-treating-all-nulls-as-unset]
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)