[ 
https://issues.apache.org/jira/browse/FLINK-29738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill G updated FLINK-29738:
---------------------------
    Description: 
When streaming POJO types, the codec is registered automatically. However, when 
streaming a tuples containing a UDT, the cassandra driver can't serialize the 
type.

Motivating Example: If we have a table containing a collection of UDTs, then 
the only way to append is through a tuple stream.
{code:java}
create type link (
  title text,
  url text
);

create table users (
  id int primary key,
  links set<frozen<link>>
);
{code}
If we were to use a POJO stream, the field containing the collection would be 
overwritten with a new collection on each upsert. If we set the query in a 
tuple stream:
{code:java}
DataStream<Tuple2<Link, Integer>> linkStream = ...
CassandraSink.addSink(linkStream)
  .setQuery("update users set link = link + ? where id = ?")
  ...
  .build();
{code}
We will get a {{{}CodecNotFoundException{}}}. Using the datastax java driver 
outside of the Flink framework, it is easy to register a codec:
{code:java}
Session session = cluster.connect();
new MappingManager(session).udtCodec(Link.class);
{code}
However, this requires access to a session, which {{ClusterBuilder}} does not 
expose in any way.

Potential solutions: expose {{Session}} or {{MapperManager}} in some way to the 
{{ClusterBuilder}} class or create some method such as {{registerUDT}} on 
{{{}CassandraSinkBuilder{}}}.

  was:
When streaming POJO types, the codec is registered automatically. However, when 
streaming a tuples containing a UDT, the cassandra driver can't serialize the 
type.

Motivating Example: If we have a table containing a collection of UDTs, then 
the only way to append is through a tuple stream.
{code:java}
create type link (
  title text,
  url text
);

create table users (
  id int primary key,
  links set<frozen<link>>
);
{code}
If we were to use a POJO stream, the field containing the collection would be 
overwritten with a new collection on each upsert. If we set the query in a 
tuple stream:
{code:java}
DataStream<Tuple2<Link, Integer>> linkStream = ...
CassandraSink.addSink(linkStream)
  .setQuery("update users set link = link + ? where id = ?")
  ...
  .build();
{code}
We will get a {{{}CodecNotFoundException{}}}. Using the datastax java driver 
outside of the Flink framework, it is easy to register a codec:
{code:java}
Session session = cluster.connect();
new MappingManager(session).udtCodec(Link.class);
{code}
However, this requires access to session, which {{ClusterBuilder}} does not 
expose in any way.

Potential solutions: expose {{Session}} or {{MapperManager}} in some way to the 
{{ClusterBuilder}} class or create some method such as {{registerUDT}} on 
{{{}CassandraSinkBuilder{}}}.


> Allow UDT codec registration for CassandraSinkBuilder
> -----------------------------------------------------
>
>                 Key: FLINK-29738
>                 URL: https://issues.apache.org/jira/browse/FLINK-29738
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / Cassandra
>    Affects Versions: 1.16.0
>            Reporter: Bill G
>            Priority: Major
>
> When streaming POJO types, the codec is registered automatically. However, 
> when streaming a tuples containing a UDT, the cassandra driver can't 
> serialize the type.
> Motivating Example: If we have a table containing a collection of UDTs, then 
> the only way to append is through a tuple stream.
> {code:java}
> create type link (
>   title text,
>   url text
> );
> create table users (
>   id int primary key,
>   links set<frozen<link>>
> );
> {code}
> If we were to use a POJO stream, the field containing the collection would be 
> overwritten with a new collection on each upsert. If we set the query in a 
> tuple stream:
> {code:java}
> DataStream<Tuple2<Link, Integer>> linkStream = ...
> CassandraSink.addSink(linkStream)
>   .setQuery("update users set link = link + ? where id = ?")
>   ...
>   .build();
> {code}
> We will get a {{{}CodecNotFoundException{}}}. Using the datastax java driver 
> outside of the Flink framework, it is easy to register a codec:
> {code:java}
> Session session = cluster.connect();
> new MappingManager(session).udtCodec(Link.class);
> {code}
> However, this requires access to a session, which {{ClusterBuilder}} does not 
> expose in any way.
> Potential solutions: expose {{Session}} or {{MapperManager}} in some way to 
> the {{ClusterBuilder}} class or create some method such as {{registerUDT}} on 
> {{{}CassandraSinkBuilder{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to