[
https://issues.apache.org/jira/browse/FLINK-35726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17867168#comment-17867168
]
Santwana Verma commented on FLINK-35726:
----------------------------------------
Hey [~david.perkins]
I looked into this, and it appears to be the intended behaviour. There
are two ways to achieve the correct translation:
* Specifying a schema in the `fromDataStream` method, as described in my
previous message.
* Creating a TypeInformation for the class `MyRecord`. You can do this by
providing a
[TypeInfoFactory|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#defining-type-information-using-a-factory].
The factory should return a `TupleTypeInfo` for this use case.
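A rough sketch of what such a factory could look like (the class name, field
order, and the unchecked cast are my assumptions, based on the reproduction
code in this issue):

```java
import java.lang.reflect.Type;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;

// Hypothetical factory for MyRecord; returning a TupleTypeInfo as
// suggested above requires a raw/unchecked cast.
public class MyRecordTypeInfoFactory extends TypeInfoFactory<MyRecord> {

    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public TypeInformation<MyRecord> createTypeInfo(
            Type t, Map<String, TypeInformation<?>> genericParameters) {
        // id -> STRING, headers -> MAP<STRING, byte[]>
        return (TypeInformation) new TupleTypeInfo(
                Types.STRING,
                Types.MAP(Types.STRING, Types.PRIMITIVE_ARRAY(Types.BYTE)));
    }
}
```

The factory is then wired up by annotating the class with
`@TypeInfo(MyRecordTypeInfoFactory.class)`.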
The first way is much more straightforward.
Let me know if you face any further issues.
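For reference, the first approach applied to the reproduction code below would
look roughly like this (a sketch reusing `tableEnv` and `mappedStream` from
the issue's `MyJob`):

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;

// Pass an explicit schema so `headers` keeps its MAP<STRING, BYTES> type
// instead of falling back to RAW('java.util.Map', ...).
Table outputTable = tableEnv.fromDataStream(
        mappedStream,
        Schema.newBuilder()
                .column("id", DataTypes.STRING())
                .column("headers",
                        DataTypes.MAP(DataTypes.STRING(), DataTypes.BYTES()))
                .build());
```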
> Data Stream to Table API converts Map to RAW 'java.util.Map'
> ------------------------------------------------------------
>
> Key: FLINK-35726
> URL: https://issues.apache.org/jira/browse/FLINK-35726
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.17.2
> Reporter: David Perkins
> Assignee: Santwana Verma
> Priority: Major
>
> We have a use case where we convert from the Table API to a Data Stream with
> a class, perform some operations, and then convert back to the Table API.
> When the data contains a Map, the conversion back to the Table API converts
> the Map to {{RAW('java.util.Map', '...')}}. This causes an 'Incompatible
> types for sink column' exception.
> In this particular case, the Map contains the Kafka headers, which we need to
> preserve and write to the output topic. Both topics/table definitions use the
> same schema. We have set a {{DataTypeHint}} annotation on the Map field in
> the Java class. We are currently working around this issue by using a UDF to
> simply perform a type conversion from the RAW Java Map to the Table API Map.
> One note is that if no operations are performed on the stream, it works
> correctly, but adding a simple identity map causes the exception.
> Here's a simple example to reproduce the problem.
> CREATE TABLE Source (
> id STRING,
> headers MAP<STRING, BYTES> METADATA
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'source',
> 'properties.bootstrap.servers' = 'kafka-bootstrap-server',
> 'format' = 'json'
> );
> CREATE TABLE Target (
> id STRING,
> headers MAP<STRING, BYTES> METADATA
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'target',
> 'properties.bootstrap.servers' = 'kafka-bootstrap-server',
> 'format' = 'json'
> );
> public class MyRecord {
>   private String id;
>   @DataTypeHint(value = "MAP<STRING, BYTES>")
>   private Map<String, byte[]> headers;
>   ...
> }
> public class MyJob {
>   public static void main(String[] args) throws Exception {
>     final StreamExecutionEnvironment streamEnv =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     final StreamTableEnvironment tableEnv =
>         StreamTableEnvironment.create(streamEnv);
>     Table sourceTable = tableEnv.from("Source");
>     var sourceStream = tableEnv.toDataStream(sourceTable, MyRecord.class);
>     var mappedStream = sourceStream.map(row -> row);
>     Table outputTable = tableEnv.fromDataStream(mappedStream);
>     tableEnv.createStatementSet()
>         .add(outputTable.insertInto("Target"))
>         .attachAsDataStream();
>     streamEnv.executeAsync("Table Datastream test");
>   }
> }
--
This message was sent by Atlassian Jira
(v8.20.10#820010)