[ 
https://issues.apache.org/jira/browse/FLINK-26539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17508372#comment-17508372
 ] 

Jean Cai commented on FLINK-26539:
----------------------------------

Hi Yuan, Jingsong, I hope the following input helps:
 # *Use Case*
My team has ~100 Flink jobs running in production, and most of them are quite 
flexible about the output of each operator. When one Kafka event arrives, an 
operator might have 3 fields to write to the Cassandra sink; when the next 
event arrives, the same operator might produce 4 fields to write. It depends 
entirely on the input and on which cases the event hits. Overall, however, 
each operator has a finite set of possible output fields, which we know 
statically before the job starts.
 # *Design*
Currently, in our use cases, we assume jobs only do upsert updates or counter 
increments. Let's take the upsert update as an example. 
Our custom Cassandra sink is built on top of AbstractCassandraTupleSink. Each 
operator that is connected to a Cassandra sink must declare a static list of 
its possible fields in the Flink job config:
{code:yaml}
- <cassandra-dynamic-simple-connector>
  jobName: "mock-faas-job-name"
  operationId: "loadToSimpleTable"
  type: "cassandra-dynamic-simple-connector"
  connectorType: "cassandra-dynamic-simple-connector"
  name: "mock-operator-name"
  props:
    arity: 2
    partitionKey: "key_field"
    columnNames:
    - "feature_one"
    - "key_field"
  hosts:
  - "xxxx"
  keySpace: "mock_keyspace"
  tableName: "mock_table_name" {code}
At runtime:

 ## A special operator, placed before the Cassandra sink, detects which fields 
are active at runtime, based on the previous operator's output. We mark the 
value of each missing field as _NON_PRESENT_ and arrange the fields in the 
order of the static list of _columnNames_. 
 ## The Cassandra sink filters out fields with the NON_PRESENT value and 
generates the upsert query at runtime based on the active {_}columnNames{_}, 
_keySpace_ and {_}tableName{_}. 
 ## The Cassandra sink keeps an in-memory cache of 
[PreparedStatement|https://docs.datastax.com/en/developer/java-driver/4.2/manual/core/statements/prepared/]
 objects, so repeated field combinations hit the cache, which helps performance. 
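
The three runtime steps above can be sketched roughly as follows. This is only an illustrative sketch under stated assumptions, not the actual sink: the class DynamicUpsertSketch, its method names, and the plain String that stands in for a driver PreparedStatement are all hypothetical, and no Cassandra driver is used so that the example stays self-contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

// Hypothetical sketch; names are illustrative and not taken from the real sink.
class DynamicUpsertSketch {
    // Sentinel marking a field that the upstream operator did not produce.
    static final Object NON_PRESENT = new Object();

    // Stand-in for the PreparedStatement cache: query text -> prepared handle.
    // A real sink would cache the statement object returned by the driver.
    static final Map<String, String> STATEMENT_CACHE = new ConcurrentHashMap<>();

    // Step 1: order the operator output by the static columnNames list,
    // filling every missing field with NON_PRESENT.
    static List<Object> alignToColumns(List<String> columnNames, Map<String, Object> output) {
        List<Object> row = new ArrayList<>(columnNames.size());
        for (String column : columnNames) {
            row.add(output.containsKey(column) ? output.get(column) : NON_PRESENT);
        }
        return row;
    }

    // Step 2: keep only the active columns and build the query text.
    // In CQL an INSERT behaves as an upsert.
    static String buildUpsert(String keySpace, String tableName,
                              List<String> columnNames, List<Object> row) {
        List<String> active = new ArrayList<>();
        for (int i = 0; i < columnNames.size(); i++) {
            if (row.get(i) != NON_PRESENT) {
                active.add(columnNames.get(i));
            }
        }
        String placeholders = active.stream().map(c -> "?").collect(Collectors.joining(", "));
        return "INSERT INTO " + keySpace + "." + tableName
                + " (" + String.join(", ", active) + ") VALUES (" + placeholders + ")";
    }

    // Step 3: reuse the prepared handle when the same query text was seen before.
    static String prepareCached(String query) {
        return STATEMENT_CACHE.computeIfAbsent(query, q -> "prepared:" + q);
    }

    public static void main(String[] args) {
        List<String> columnNames = Arrays.asList("feature_one", "key_field");
        Map<String, Object> output = new HashMap<>();
        output.put("key_field", "k1"); // feature_one is absent for this event

        List<Object> row = alignToColumns(columnNames, output);
        String query = buildUpsert("mock_keyspace", "mock_table_name", columnNames, row);
        System.out.println(query);
        System.out.println(prepareCached(query));
    }
}
```

Because the cache key is the generated query text, the number of cached entries is bounded by the number of field combinations an operator can emit. Since every primary key column must be supplied in a Cassandra INSERT, a real sink would presumably also need to reject rows whose partition key is NON_PRESENT; that check is left out of the sketch.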
 # *The Best Way of Doing This*
I have described my team's current solution above as a reference. I am more 
than happy to discuss other potential alternatives with you if needed.

I will try to draw a diagram of the workflow soon to make the design 
clearer. Thanks!

> Support dynamic partial features update in Cassandra connector
> --------------------------------------------------------------
>
>                 Key: FLINK-26539
>                 URL: https://issues.apache.org/jira/browse/FLINK-26539
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / Cassandra
>    Affects Versions: 1.16.0
>            Reporter: Zhenqiu Huang
>            Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)
