twalthr commented on a change in pull request #18182:
URL: https://github.com/apache/flink/pull/18182#discussion_r779454540



##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCodeGeneratorCastRule.java
##########
@@ -208,36 +213,59 @@ public String declareVariable(String type, String 
variablePrefix) {
 
         @Override
         public String declareTypeSerializer(LogicalType type) {
-            return typeSerializers
-                    .computeIfAbsent(
-                            type,
-                            t -> {
-                                Map.Entry<String, TypeSerializer<?>> e =
-                                        new SimpleImmutableEntry<>(
-                                                "typeSerializer$" + 
variableIndex,
-                                                InternalSerializers.create(t));
-                                variableIndex++;
-                                return e;
-                            })
-                    .getKey();
+            return "this."
+                    + typeSerializers
+                            .computeIfAbsent(
+                                    type,
+                                    t -> {
+                                        String term = "typeSerializer$" + 
variableIndex;
+                                        TypeSerializer<?> serializer =
+                                                InternalSerializers.create(t);
+                                        this.classFields.add(
+                                                "private final "
+                                                        + 
className(serializer.getClass())
+                                                        + " "
+                                                        + term
+                                                        + ";");
+                                        this.constructorArguments.add(
+                                                new 
SimpleImmutableEntry<>(term, serializer));
+
+                                        variableIndex++;
+                                        return new 
SimpleImmutableEntry<>(term, serializer);
+                                    })
+                            .getKey();
         }
 
         @Override
-        public String declareClassField(String type, String name, String 
initialization) {
-            this.classFields.add(type + " " + name + " = " + initialization + 
";");
-            return "this." + name;
-        }
+        public String declareDataStructureConverter(LogicalType logicalType) {
+            return "this."
+                    + dataStructureConverters
+                            .computeIfAbsent(
+                                    logicalType,
+                                    t -> {
+                                        String term = 
"dataStructureConverter$" + variableIndex;
+                                        DataStructureConverter<Object, Object> 
converter =
+                                                
DataStructureConverters.getConverter(
+                                                        DataTypes.of(t));

Review comment:
       This is dangerous for structured types. `StructuredType` -> `DataType` 
cannot always work properly and might require input from the user to perform 
the conversion. This is esp. the case when the POJO is generated and cannot be 
enriched with `@DataTypeHint` annotations. 

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/AbstractCodeGeneratorCastRule.java
##########
@@ -208,36 +213,59 @@ public String declareVariable(String type, String 
variablePrefix) {
 
         @Override
         public String declareTypeSerializer(LogicalType type) {
-            return typeSerializers
-                    .computeIfAbsent(
-                            type,
-                            t -> {
-                                Map.Entry<String, TypeSerializer<?>> e =
-                                        new SimpleImmutableEntry<>(
-                                                "typeSerializer$" + 
variableIndex,
-                                                InternalSerializers.create(t));
-                                variableIndex++;
-                                return e;
-                            })
-                    .getKey();
+            return "this."
+                    + typeSerializers
+                            .computeIfAbsent(
+                                    type,
+                                    t -> {
+                                        String term = "typeSerializer$" + 
variableIndex;
+                                        TypeSerializer<?> serializer =
+                                                InternalSerializers.create(t);
+                                        this.classFields.add(
+                                                "private final "
+                                                        + 
className(serializer.getClass())
+                                                        + " "
+                                                        + term
+                                                        + ";");
+                                        this.constructorArguments.add(
+                                                new 
SimpleImmutableEntry<>(term, serializer));
+
+                                        variableIndex++;
+                                        return new 
SimpleImmutableEntry<>(term, serializer);
+                                    })
+                            .getKey();
         }
 
         @Override
-        public String declareClassField(String type, String name, String 
initialization) {
-            this.classFields.add(type + " " + name + " = " + initialization + 
";");
-            return "this." + name;
-        }
+        public String declareDataStructureConverter(LogicalType logicalType) {
+            return "this."
+                    + dataStructureConverters
+                            .computeIfAbsent(
+                                    logicalType,
+                                    t -> {
+                                        String term = 
"dataStructureConverter$" + variableIndex;
+                                        DataStructureConverter<Object, Object> 
converter =
+                                                
DataStructureConverters.getConverter(
+                                                        DataTypes.of(t));

Review comment:
       Try:
   ```
       private static final DataType MY_STRUCTURED_TYPE =
               STRUCTURED(
                       MyStructuredType.class,
                       FIELD("a", BIGINT().notNull()),
                       FIELD("b", TIMESTAMP_LTZ(3).bridgedTo(Long.class)),
                       FIELD("c", STRING()),
                       FIELD("d", ARRAY(STRING())));
   ```
   
   To reproduce the issue. Let's always go with the map representation in Table 
API. People can still implement a UDF if needed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to