[
https://issues.apache.org/jira/browse/FLINK-26539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508372#comment-17508372
]
Jean Cai commented on FLINK-26539:
----------------------------------
Hi Yuan, Jingsong, I hope the following input helps:
# *Use Case*
My team has ~100 Flink jobs running in production, and most of them are quite
flexible about the output of each operator. When one Kafka event arrives, an
operator might have 3 fields to write to the Cassandra sink; when the next
event arrives, the same operator might produce 4 fields to write. It depends
entirely on the input and on which case the event hits. Overall, however,
each operator has a finite set of possible output fields, which we know
statically before the job starts.
# *Design*
Currently, in our use cases, we assume jobs only do upsert updates or counter
increment updates. Let's take the upsert update as an example.
Our custom Cassandra sink is built on top of AbstractCassandraTupleSink. Each
operator that is connected to a Cassandra sink requires a static list of
possible fields in the Flink job config:
{code:yaml}
- <cassandra-dynamic-simple-connector>
  jobName: "mock-faas-job-name"
  operationId: "loadToSimpleTable"
  type: "cassandra-dynamic-simple-connector"
  connectorType: "cassandra-dynamic-simple-connector"
  name: "mock-operator-name"
  props:
    arity: 2
    partitionKey: "key_field"
    columnNames:
      - "feature_one"
      - "key_field"
    hosts:
      - "xxxx"
    keySpace: "mock_keyspace"
    tableName: "mock_table_name"
{code}
At runtime:
## A special operator, placed before the event reaches the Cassandra sink, detects
which fields are active at runtime, based on the previous operator's output. We mark
the value of each missing field as _NON_PRESENT_ and order the values to match the
static list of _columnNames_.
## The Cassandra sink filters out fields with the NON_PRESENT value and generates the
upsert query at runtime based on the active {_}columnNames{_}, _keySpace_ and
{_}tableName{_}.
## The Cassandra sink keeps an in-memory cache of
[PreparedStatement|https://docs.datastax.com/en/developer/java-driver/4.2/manual/core/statements/prepared/]
objects, which is hit on repeated queries to help with performance.
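To make the three runtime steps more concrete, here is a minimal sketch in plain Java. It is not our actual sink code: the class name {{DynamicUpsertSketch}}, the helper methods, the {{StatementCache}} wrapper and the way NON_PRESENT is represented are all hypothetical names chosen for illustration. The sketch assumes the upsert is expressed as a CQL INSERT (which overwrites existing rows in Cassandra), and it keeps the "prepare" call abstract as a function so the example runs without a Cassandra driver; in a real sink that function would presumably be the driver session's prepare call.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class DynamicUpsertSketch {
    // Hypothetical sentinel marking a column the upstream operator did not produce.
    static final Object NON_PRESENT = new Object();

    // Step 1: order the operator output by the static columnNames list,
    // filling every column that is absent from the output with NON_PRESENT.
    static List<Object> alignToColumns(List<String> columnNames, Map<String, Object> output) {
        List<Object> row = new ArrayList<>(columnNames.size());
        for (String column : columnNames) {
            row.add(output.containsKey(column) ? output.get(column) : NON_PRESENT);
        }
        return row;
    }

    // Step 2a: keep only the columns whose value is not NON_PRESENT.
    static List<String> activeColumns(List<String> columnNames, List<Object> row) {
        List<String> active = new ArrayList<>();
        for (int i = 0; i < columnNames.size(); i++) {
            if (row.get(i) != NON_PRESENT) {
                active.add(columnNames.get(i));
            }
        }
        return active;
    }

    // Step 2b: build the upsert for the active columns, one bind marker per column.
    static String buildUpsert(String keySpace, String tableName, List<String> active) {
        String columns = String.join(", ", active);
        String markers = String.join(", ", Collections.nCopies(active.size(), "?"));
        return "INSERT INTO " + keySpace + "." + tableName
                + " (" + columns + ") VALUES (" + markers + ")";
    }

    // Step 3: cache prepared statements by query text, so each distinct
    // combination of active columns is prepared only once.
    static final class StatementCache<S> {
        private final Map<String, S> cache = new ConcurrentHashMap<>();
        private final Function<String, S> preparer;

        StatementCache(Function<String, S> preparer) {
            this.preparer = preparer;
        }

        S get(String cql) {
            return cache.computeIfAbsent(cql, preparer);
        }
    }

    public static void main(String[] args) {
        List<String> columnNames = List.of("feature_one", "key_field");
        // This event only produced the key; feature_one is missing.
        List<Object> row = alignToColumns(columnNames, Map.of("key_field", "k1"));
        List<String> active = activeColumns(columnNames, row);
        String cql = buildUpsert("mock_keyspace", "mock_table_name", active);
        // Stand-in for a real prepare call: just tags the query text.
        StatementCache<String> cache = new StatementCache<>(q -> "prepared:" + q);
        System.out.println(cache.get(cql));
    }
}
```

Because the number of possible field combinations per operator is finite, a cache keyed by the generated query text stays bounded. A real implementation would also need to make sure the partition key is always among the active columns before issuing the query.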
# *The Best Way of Doing This*
I provided my team's current solution above as a reference. I am more than
happy to discuss other potential alternatives with you if needed.
I will try to draw a diagram of the workflow soon to make the design
clearer. Thanks!
> Support dynamic partial features update in Cassandra connector
> --------------------------------------------------------------
>
> Key: FLINK-26539
> URL: https://issues.apache.org/jira/browse/FLINK-26539
> Project: Flink
> Issue Type: New Feature
> Components: Connectors / Cassandra
> Affects Versions: 1.16.0
> Reporter: Zhenqiu Huang
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)