[
https://issues.apache.org/jira/browse/FLINK-29738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Bill G updated FLINK-29738:
---------------------------
Description:
When streaming POJO types, the codec is registered automatically. However, when
streaming tuples that contain a UDT, the Cassandra driver cannot serialize the
type.
Motivating Example: If we have a table containing a collection of UDTs, then
the only way to append to that collection is through a tuple stream.
{code:sql}
create type link (
    title text,
    url text
);

create table users (
    id int primary key,
    links set<frozen<link>>
);
{code}
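The {{Link}} class used in the snippets below is assumed to be mapped to this UDT with the DataStax object-mapper annotations, roughly as follows (the keyspace name is illustrative):
{code:java}
import com.datastax.driver.mapping.annotations.Field;
import com.datastax.driver.mapping.annotations.UDT;

// Maps the "link" UDT; the mapper requires a no-arg constructor and accessors.
@UDT(keyspace = "example", name = "link")
public class Link {

    @Field(name = "title")
    private String title;

    @Field(name = "url")
    private String url;

    public Link() {}

    public String getTitle() { return title; }
    public void setTitle(String title) { this.title = title; }

    public String getUrl() { return url; }
    public void setUrl(String url) { this.url = url; }
}
{code}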
If we were to use a POJO stream, the field containing the collection would be
overwritten with a new collection on each upsert. If we instead set the query
on a tuple stream:
{code:java}
DataStream<Tuple2<Link, Integer>> linkStream = ...
CassandraSink.addSink(linkStream)
    .setQuery("update users set links = links + ? where id = ?")
    ...
    .build();
{code}
We will get a {{CodecNotFoundException}}. Using the DataStax Java driver
outside of the Flink framework, it is easy to register a codec:
{code:java}
Session session = cluster.connect();
new MappingManager(session).udtCodec(Link.class);
{code}
However, this requires access to a session, which {{ClusterBuilder}} does not
expose in any way.
Potential solutions: expose the {{Session}} or a {{MappingManager}} through the
{{ClusterBuilder}} class, or add a method such as {{registerUDT}} to
{{CassandraSinkBuilder}}.
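For illustration, the second option could look roughly like the sketch below. {{registerUDT}} is a hypothetical method name and the contact point is a placeholder; internally, the builder could perform the equivalent of the {{MappingManager}} call above on the session it opens.
{code:java}
DataStream<Tuple2<Link, Integer>> linkStream = ...
CassandraSink.addSink(linkStream)
    .setQuery("update users set links = links + ? where id = ?")
    .setClusterBuilder(new ClusterBuilder() {
        @Override
        protected Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoint("127.0.0.1").build();
        }
    })
    // Hypothetical: register a codec for the Link UDT before any bound statement is executed.
    .registerUDT(Link.class)
    .build();
{code}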
was:
When streaming POJO types, the codec is registered automatically. However, when
streaming tuples that contain a UDT, the Cassandra driver cannot serialize the
type.
Motivating Example: If we have a table containing a collection of UDTs, then
the only way to append to that collection is through a tuple stream.
{code:sql}
create type link (
    title text,
    url text
);

create table users (
    id int primary key,
    links set<frozen<link>>
);
{code}
If we were to use a POJO stream, the field containing the collection would be
overwritten with a new collection on each upsert. If we instead set the query
on a tuple stream:
{code:java}
DataStream<Tuple2<Link, Integer>> linkStream = ...
CassandraSink.addSink(linkStream)
    .setQuery("update users set link = link + ? where id = ?")
    ...
    .build();
{code}
We will get a {{CodecNotFoundException}}. Using the DataStax Java driver
outside of the Flink framework, it is easy to register a codec:
{code:java}
Session session = cluster.connect();
new MappingManager(session).udtCodec(Link.class);
{code}
However, this requires access to a session, which {{ClusterBuilder}} does not
expose in any way.
Potential solutions: expose the {{Session}} or a {{MappingManager}} through the
{{ClusterBuilder}} class, or add a method such as {{registerUDT}} to
{{CassandraSinkBuilder}}.
> Allow UDT codec registration for CassandraSinkBuilder
> -----------------------------------------------------
>
> Key: FLINK-29738
> URL: https://issues.apache.org/jira/browse/FLINK-29738
> Project: Flink
> Issue Type: New Feature
> Components: Connectors / Cassandra
> Affects Versions: 1.16.0
> Reporter: Bill G
> Priority: Major
>