This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d47670f61 [core] Fix nested row type convert to serializer loop (#3484)
d47670f61 is described below
commit d47670f61942bd3a73a84fd93f90f974d8feba88
Author: wgcn <[email protected]>
AuthorDate: Fri Jun 7 18:37:36 2024 +0800
[core] Fix nested row type convert to serializer loop (#3484)
---
.../java/org/apache/paimon/data/BinaryWriter.java | 30 +++++++++++++++-------
.../data/serializer/InternalRowSerializer.java | 2 +-
.../apache/paimon/flink/ReadWriteTableITCase.java | 28 ++++++++++++++++++++
3 files changed, 50 insertions(+), 10 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java b/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java
index 266e409e8..8c890be52 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BinaryWriter.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.data.serializer.InternalSerializers;
import org.apache.paimon.data.serializer.Serializer;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.TimestampType;
@@ -150,6 +151,11 @@ public interface BinaryWriter {
* @param elementType the element type
*/
static ValueSetter createValueSetter(DataType elementType) {
+ return createValueSetter(elementType, null);
+ }
+
+ static ValueSetter createValueSetter(DataType elementType, Serializer<?>
serializer) {
+ Serializer<?> finalSerializer = createSerializerIfNeed(elementType,
serializer);
// ordered by type root definition
switch (elementType.getTypeRoot()) {
case CHAR:
@@ -184,31 +190,37 @@ public interface BinaryWriter {
return (writer, pos, value) ->
writer.writeTimestamp(pos, (Timestamp) value,
timestampPrecision);
case ARRAY:
- final Serializer<InternalArray> arraySerializer =
- InternalSerializers.create(elementType);
return (writer, pos, value) ->
writer.writeArray(
pos,
(InternalArray) value,
- (InternalArraySerializer) arraySerializer);
+ (InternalArraySerializer) finalSerializer);
case MULTISET:
case MAP:
- final Serializer<InternalMap> mapSerializer =
- InternalSerializers.create(elementType);
return (writer, pos, value) ->
writer.writeMap(
- pos, (InternalMap) value,
(InternalMapSerializer) mapSerializer);
+ pos, (InternalMap) value,
(InternalMapSerializer) finalSerializer);
case ROW:
- final Serializer<InternalRow> rowSerializer =
- InternalSerializers.create(elementType);
return (writer, pos, value) ->
writer.writeRow(
- pos, (InternalRow) value,
(InternalRowSerializer) rowSerializer);
+ pos, (InternalRow) value,
(InternalRowSerializer) finalSerializer);
default:
throw new IllegalArgumentException();
}
}
+
+ /**
+ * Returns the serializer that {@link #createValueSetter(DataType, Serializer)} should use
+ * for {@code elementType}.
+ *
+ * <p>A non-null {@code serializer} supplied by the caller is returned as-is. This way, a caller
+ * that already built it (e.g. {@code InternalRowSerializer} passing its field serializer)
+ * does not trigger a second, nested construction. Otherwise, a serializer is created via
+ * {@link InternalSerializers#create(DataType)} for the container types whose value
+ * setters need one: ARRAY, MAP, MULTISET and ROW. MULTISET must be included because its
+ * value setter writes through {@code writeMap} with a map serializer, exactly like MAP.
+ * Leaving it out would hand {@code null} to {@code writeMap}.
+ *
+ * @param elementType the type of the value to be written
+ * @param serializer an existing serializer for {@code elementType}, or {@code null}
+ * @return the serializer to use, or {@code null} if {@code elementType} needs none
+ */
+ static Serializer<?> createSerializerIfNeed(DataType elementType, Serializer<?> serializer) {
+ if (serializer != null) {
+ return serializer;
+ }
+ DataTypeRoot typeRoot = elementType.getTypeRoot();
+ if (typeRoot == DataTypeRoot.ARRAY
+ || typeRoot == DataTypeRoot.MAP
+ || typeRoot == DataTypeRoot.MULTISET
+ || typeRoot == DataTypeRoot.ROW) {
+ return InternalSerializers.create(elementType);
+ }
+ return null;
+ }
+
/** Accessor for setting the elements of a binary writer during runtime. */
interface ValueSetter extends Serializable {
void setValue(BinaryWriter writer, int pos, Object value);
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
index 8b4810f57..97a5ae31d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
@@ -73,7 +73,7 @@ public class InternalRowSerializer extends AbstractRowDataSerializer<InternalRow
for (int i = 0; i < types.length; i++) {
DataType type = types[i];
fieldGetters[i] = InternalRow.createFieldGetter(type, i);
- valueSetters[i] = BinaryWriter.createValueSetter(type);
+ valueSetters[i] = BinaryWriter.createValueSetter(type,
fieldSerializers[i]);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index 3fa9d4fb4..d1e9b23e1 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -35,7 +35,9 @@ import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
@@ -101,6 +103,7 @@ import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.validateStream
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.warehouse;
import static
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Paimon reading and writing IT cases. */
@@ -1037,6 +1040,31 @@ public class ReadWriteTableITCase extends AbstractTestBase {
.isEqualTo(66);
}
+ @Test
+ void testConvertRowType2Serializer() {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv =
+ StreamTableEnvironment.create(
+ env,
EnvironmentSettings.newInstance().inBatchMode().build());
+ tEnv.executeSql(
+ "CREATE CATALOG my_catalog WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '"
+ + getTempDirPath()
+ + "'\n"
+ + ")");
+ tEnv.executeSql("USE CATALOG my_catalog");
+ tEnv.executeSql(
+ "CREATE TABLE tmp (\n"
+ + "execution\n"
+ + "ROW<`execution_server` STRING,
`execution_insertion` ARRAY<ROW<`platform_id` BIGINT, `user_info` ROW<`user_id`
STRING, `log_user_id` STRING, `is_internal_user` BOOLEAN, `ignore_usage`
BOOLEAN, `anon_user_id` STRING, `retained_user_id` STRING>, `timing`
ROW<`client_log_timestamp` BIGINT, `event_api_timestamp` BIGINT,
`log_timestamp` BIGINT, `processing_timestamp` BIGINT>, `client_info`
ROW<`client_type` STRING, `traffic_type` STRING>, `insertion_id` STRING, `req
[...]
+ assertThatCode(
+ () ->
+ tEnv.executeSql(
+ "INSERT INTO tmp VALUES (CAST(NULL AS
ROW<`execution_server` STRING, `execution_insertion` ARRAY<ROW<`platform_id`
BIGINT, `user_info` ROW<`user_id` STRING, `log_user_id` STRING,
`is_internal_user` BOOLEAN, `ignore_usage` BOOLEAN, `anon_user_id` STRING,
`retained_user_id` STRING>, `timing` ROW<`client_log_timestamp` BIGINT,
`event_api_timestamp` BIGINT, `log_timestamp` BIGINT, `processing_timestamp`
BIGINT>, `client_info` ROW<`client_type` STRING, [...]
+ .doesNotThrowAnyException();
+ }
+
@Test
public void testInferParallelism() throws Exception {
String table =