This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d47670f61 [core] Fix nested row type convert to serializer loop (#3484)
d47670f61 is described below

commit d47670f61942bd3a73a84fd93f90f974d8feba88
Author: wgcn <[email protected]>
AuthorDate: Fri Jun 7 18:37:36 2024 +0800

    [core] Fix nested row type convert to serializer loop (#3484)
---
 .../java/org/apache/paimon/data/BinaryWriter.java  | 30 +++++++++++++++-------
 .../data/serializer/InternalRowSerializer.java     |  2 +-
 .../apache/paimon/flink/ReadWriteTableITCase.java  | 28 ++++++++++++++++++++
 3 files changed, 50 insertions(+), 10 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java b/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java
index 266e409e8..8c890be52 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.data.serializer.InternalSerializers;
 import org.apache.paimon.data.serializer.Serializer;
 import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeRoot;
 import org.apache.paimon.types.DecimalType;
 import org.apache.paimon.types.LocalZonedTimestampType;
 import org.apache.paimon.types.TimestampType;
@@ -150,6 +151,11 @@ public interface BinaryWriter {
      * @param elementType the element type
      */
     static ValueSetter createValueSetter(DataType elementType) {
+        return createValueSetter(elementType, null);
+    }
+
+    static ValueSetter createValueSetter(DataType elementType, Serializer<?> 
serializer) {
+        Serializer<?> finalSerializer = createSerializerIfNeed(elementType, 
serializer);
         // ordered by type root definition
         switch (elementType.getTypeRoot()) {
             case CHAR:
@@ -184,31 +190,37 @@ public interface BinaryWriter {
                 return (writer, pos, value) ->
                         writer.writeTimestamp(pos, (Timestamp) value, 
timestampPrecision);
             case ARRAY:
-                final Serializer<InternalArray> arraySerializer =
-                        InternalSerializers.create(elementType);
                 return (writer, pos, value) ->
                         writer.writeArray(
                                 pos,
                                 (InternalArray) value,
-                                (InternalArraySerializer) arraySerializer);
+                                (InternalArraySerializer) finalSerializer);
             case MULTISET:
             case MAP:
-                final Serializer<InternalMap> mapSerializer =
-                        InternalSerializers.create(elementType);
                 return (writer, pos, value) ->
                         writer.writeMap(
-                                pos, (InternalMap) value, 
(InternalMapSerializer) mapSerializer);
+                                pos, (InternalMap) value, 
(InternalMapSerializer) finalSerializer);
             case ROW:
-                final Serializer<InternalRow> rowSerializer =
-                        InternalSerializers.create(elementType);
                 return (writer, pos, value) ->
                         writer.writeRow(
-                                pos, (InternalRow) value, 
(InternalRowSerializer) rowSerializer);
+                                pos, (InternalRow) value, 
(InternalRowSerializer) finalSerializer);
             default:
                 throw new IllegalArgumentException();
         }
     }
 
+    static Serializer<?> createSerializerIfNeed(DataType elementType, 
Serializer<?> serializer) {
+        Serializer<?> finalSerializer = serializer;
+        DataTypeRoot typeRoot = elementType.getTypeRoot();
+        if (finalSerializer == null
+                && (typeRoot == DataTypeRoot.MAP
+                        || typeRoot == DataTypeRoot.ROW
+                        || typeRoot == DataTypeRoot.ARRAY)) {
+            finalSerializer = InternalSerializers.create(elementType);
+        }
+        return finalSerializer;
+    }
+
     /** Accessor for setting the elements of a binary writer during runtime. */
     interface ValueSetter extends Serializable {
         void setValue(BinaryWriter writer, int pos, Object value);
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
index 8b4810f57..97a5ae31d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
@@ -73,7 +73,7 @@ public class InternalRowSerializer extends AbstractRowDataSerializer<InternalRow
         for (int i = 0; i < types.length; i++) {
             DataType type = types[i];
             fieldGetters[i] = InternalRow.createFieldGetter(type, i);
-            valueSetters[i] = BinaryWriter.createValueSetter(type);
+            valueSetters[i] = BinaryWriter.createValueSetter(type, 
fieldSerializers[i]);
         }
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index 3fa9d4fb4..d1e9b23e1 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -35,7 +35,9 @@ import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
@@ -101,6 +103,7 @@ import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.validateStream
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.warehouse;
 import static 
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Paimon reading and writing IT cases. */
@@ -1037,6 +1040,31 @@ public class ReadWriteTableITCase extends AbstractTestBase {
                 .isEqualTo(66);
     }
 
+    @Test
+    void testConvertRowType2Serializer() {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv =
+                StreamTableEnvironment.create(
+                        env, 
EnvironmentSettings.newInstance().inBatchMode().build());
+        tEnv.executeSql(
+                "CREATE CATALOG my_catalog WITH (\n"
+                        + "    'type' = 'paimon',\n"
+                        + "    'warehouse' = '"
+                        + getTempDirPath()
+                        + "'\n"
+                        + ")");
+        tEnv.executeSql("USE CATALOG my_catalog");
+        tEnv.executeSql(
+                "CREATE TABLE tmp (\n"
+                        + "execution\n"
+                        + "ROW<`execution_server` STRING, 
`execution_insertion` ARRAY<ROW<`platform_id` BIGINT, `user_info` ROW<`user_id` 
STRING, `log_user_id` STRING, `is_internal_user` BOOLEAN, `ignore_usage` 
BOOLEAN, `anon_user_id` STRING, `retained_user_id` STRING>, `timing` 
ROW<`client_log_timestamp` BIGINT, `event_api_timestamp` BIGINT, 
`log_timestamp` BIGINT, `processing_timestamp` BIGINT>, `client_info` 
ROW<`client_type` STRING, `traffic_type` STRING>, `insertion_id` STRING, `req 
[...]
+        assertThatCode(
+                        () ->
+                                tEnv.executeSql(
+                                        "INSERT INTO tmp VALUES (CAST(NULL AS 
ROW<`execution_server` STRING, `execution_insertion` ARRAY<ROW<`platform_id` 
BIGINT, `user_info` ROW<`user_id` STRING, `log_user_id` STRING, 
`is_internal_user` BOOLEAN, `ignore_usage` BOOLEAN, `anon_user_id` STRING, 
`retained_user_id` STRING>, `timing` ROW<`client_log_timestamp` BIGINT, 
`event_api_timestamp` BIGINT, `log_timestamp` BIGINT, `processing_timestamp` 
BIGINT>, `client_info` ROW<`client_type` STRING,  [...]
+                .doesNotThrowAnyException();
+    }
+
     @Test
     public void testInferParallelism() throws Exception {
         String table =

Reply via email to