This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.8 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 462828752700724f9d48300f7e312c59f8cca389 Author: wgcn <[email protected]> AuthorDate: Fri Jun 7 18:37:36 2024 +0800 [core] Fix nested row type convert to serializer loop (#3484) --- .../java/org/apache/paimon/data/BinaryWriter.java | 30 +++++++++++++++------- .../data/serializer/InternalRowSerializer.java | 2 +- .../apache/paimon/flink/ReadWriteTableITCase.java | 28 ++++++++++++++++++++ 3 files changed, 50 insertions(+), 10 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java b/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java index 266e409e8..8c890be52 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java @@ -24,6 +24,7 @@ import org.apache.paimon.data.serializer.InternalRowSerializer; import org.apache.paimon.data.serializer.InternalSerializers; import org.apache.paimon.data.serializer.Serializer; import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeRoot; import org.apache.paimon.types.DecimalType; import org.apache.paimon.types.LocalZonedTimestampType; import org.apache.paimon.types.TimestampType; @@ -150,6 +151,11 @@ public interface BinaryWriter { * @param elementType the element type */ static ValueSetter createValueSetter(DataType elementType) { + return createValueSetter(elementType, null); + } + + static ValueSetter createValueSetter(DataType elementType, Serializer<?> serializer) { + Serializer<?> finalSerializer = createSerializerIfNeed(elementType, serializer); // ordered by type root definition switch (elementType.getTypeRoot()) { case CHAR: @@ -184,31 +190,37 @@ public interface BinaryWriter { return (writer, pos, value) -> writer.writeTimestamp(pos, (Timestamp) value, timestampPrecision); case ARRAY: - final Serializer<InternalArray> arraySerializer = - InternalSerializers.create(elementType); return (writer, pos, value) -> writer.writeArray( pos, 
(InternalArray) value, - (InternalArraySerializer) arraySerializer); + (InternalArraySerializer) finalSerializer); case MULTISET: case MAP: - final Serializer<InternalMap> mapSerializer = - InternalSerializers.create(elementType); return (writer, pos, value) -> writer.writeMap( - pos, (InternalMap) value, (InternalMapSerializer) mapSerializer); + pos, (InternalMap) value, (InternalMapSerializer) finalSerializer); case ROW: - final Serializer<InternalRow> rowSerializer = - InternalSerializers.create(elementType); return (writer, pos, value) -> writer.writeRow( - pos, (InternalRow) value, (InternalRowSerializer) rowSerializer); + pos, (InternalRow) value, (InternalRowSerializer) finalSerializer); default: throw new IllegalArgumentException(); } } + static Serializer<?> createSerializerIfNeed(DataType elementType, Serializer<?> serializer) { + Serializer<?> finalSerializer = serializer; + DataTypeRoot typeRoot = elementType.getTypeRoot(); + if (finalSerializer == null + && (typeRoot == DataTypeRoot.MAP + || typeRoot == DataTypeRoot.ROW + || typeRoot == DataTypeRoot.ARRAY)) { + finalSerializer = InternalSerializers.create(elementType); + } + return finalSerializer; + } + /** Accessor for setting the elements of a binary writer during runtime. 
*/ interface ValueSetter extends Serializable { void setValue(BinaryWriter writer, int pos, Object value); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java index 8b4810f57..97a5ae31d 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java @@ -73,7 +73,7 @@ public class InternalRowSerializer extends AbstractRowDataSerializer<InternalRow for (int i = 0; i < types.length; i++) { DataType type = types[i]; fieldGetters[i] = InternalRow.createFieldGetter(type, i); - valueSetters[i] = BinaryWriter.createValueSetter(type); + valueSetters[i] = BinaryWriter.createValueSetter(type, fieldSerializers[i]); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java index 4b28bbc40..fc141ae84 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java @@ -35,7 +35,9 @@ import org.apache.flink.api.dag.Transformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.transformations.PartitionTransformation; +import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.connector.sink.DataStreamSinkProvider; @@ 
-101,6 +103,7 @@ import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.validateStream import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.warehouse; import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Paimon reading and writing IT cases. */ @@ -1019,6 +1022,31 @@ public class ReadWriteTableITCase extends AbstractTestBase { .isEqualTo(66); } + @Test + void testConvertRowType2Serializer() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = + StreamTableEnvironment.create( + env, EnvironmentSettings.newInstance().inBatchMode().build()); + tEnv.executeSql( + "CREATE CATALOG my_catalog WITH (\n" + + " 'type' = 'paimon',\n" + + " 'warehouse' = '" + + getTempDirPath() + + "'\n" + + ")"); + tEnv.executeSql("USE CATALOG my_catalog"); + tEnv.executeSql( + "CREATE TABLE tmp (\n" + + "execution\n" + + "ROW<`execution_server` STRING, `execution_insertion` ARRAY<ROW<`platform_id` BIGINT, `user_info` ROW<`user_id` STRING, `log_user_id` STRING, `is_internal_user` BOOLEAN, `ignore_usage` BOOLEAN, `anon_user_id` STRING, `retained_user_id` STRING>, `timing` ROW<`client_log_timestamp` BIGINT, `event_api_timestamp` BIGINT, `log_timestamp` BIGINT, `processing_timestamp` BIGINT>, `client_info` ROW<`client_type` STRING, `traffic_type` STRING>, `insertion_id` STRING, `req [...] 
+ assertThatCode( + () -> + tEnv.executeSql( + "INSERT INTO tmp VALUES (CAST(NULL AS ROW<`execution_server` STRING, `execution_insertion` ARRAY<ROW<`platform_id` BIGINT, `user_info` ROW<`user_id` STRING, `log_user_id` STRING, `is_internal_user` BOOLEAN, `ignore_usage` BOOLEAN, `anon_user_id` STRING, `retained_user_id` STRING>, `timing` ROW<`client_log_timestamp` BIGINT, `event_api_timestamp` BIGINT, `log_timestamp` BIGINT, `processing_timestamp` BIGINT>, `client_info` ROW<`client_type` STRING, [...] + .doesNotThrowAnyException(); + } + @Test public void testInferParallelism() throws Exception { String table =
