This is an automated email from the ASF dual-hosted git repository.
libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new df03ada10e2 [FLINK-33611][flink-protobuf] Split last segment only when
size exceeds split threshold limit in deserializer
df03ada10e2 is described below
commit df03ada10e226053780cb2e5e9742add4536289c
Author: dsaisharath <[email protected]>
AuthorDate: Wed Dec 13 14:14:38 2023 -0800
[FLINK-33611][flink-protobuf] Split last segment only when size exceeds
split threshold limit in deserializer
Close apache/flink#23937
---
.../deserialize/PbCodegenRowDeserializer.java | 5 +-
.../protobuf/serialize/PbCodegenRowSerializer.java | 8 +-
.../formats/protobuf/VeryBigPbProtoToRowTest.java | 39 +++++++++
.../formats/protobuf/VeryBigPbRowToProtoTest.java | 39 +++++++++
.../src/test/proto/test_very_big_pb.proto | 98 ++++++++++++++++++++++
5 files changed, 178 insertions(+), 11 deletions(-)
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java
index f3cf414f540..541b1f83839 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java
@@ -109,10 +109,7 @@ public class PbCodegenRowDeserializer implements PbCodegenDeserializer {
index += 1;
}
if (!splitAppender.code().isEmpty()) {
- String splitMethod =
- formatContext.splitDeserializerRowTypeMethod(
- flinkRowDataVar, pbMessageTypeStr, pbMessageVar,
splitAppender.code());
- appender.appendSegment(splitMethod);
+ appender.appendSegment(splitAppender.code());
}
appender.appendLine(resultVar + " = " + flinkRowDataVar);
return appender.code();
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java
index 342117cd272..6f72ab83b81 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java
@@ -130,13 +130,7 @@ public class PbCodegenRowSerializer implements PbCodegenSerializer {
index += 1;
}
if (!splitAppender.code().isEmpty()) {
- String splitMethod =
- formatContext.splitSerializerRowTypeMethod(
- flinkRowDataVar,
- pbMessageTypeStr + ".Builder",
- messageBuilderVar,
- splitAppender.code());
- appender.appendSegment(splitMethod);
+ appender.appendSegment(splitAppender.code());
}
appender.appendLine(resultVar + " = " + messageBuilderVar +
".build()");
return appender.code();
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/VeryBigPbProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/VeryBigPbProtoToRowTest.java
new file mode 100644
index 00000000000..a3e09ff34c8
--- /dev/null
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/VeryBigPbProtoToRowTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.testproto.VeryBigPbClass;
+
+import org.junit.Test;
+
+/**
+ * Test for very huge proto definition, which may trigger some special
optimizations such as code
+ * splitting and java constant pool size optimization.
+ */
+public class VeryBigPbProtoToRowTest {
+
+ @Test
+ public void testSimple() throws Exception {
+ VeryBigPbClass.VeryBigPbMessage veryBigPbMessage =
+ VeryBigPbClass.VeryBigPbMessage.newBuilder().build();
+ // test generated code can be compiled
+ ProtobufTestHelper.pbBytesToRow(
+ VeryBigPbClass.VeryBigPbMessage.class,
veryBigPbMessage.toByteArray());
+ }
+}
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/VeryBigPbRowToProtoTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/VeryBigPbRowToProtoTest.java
new file mode 100644
index 00000000000..57e0eb71506
--- /dev/null
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/VeryBigPbRowToProtoTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.testproto.VeryBigPbClass;
+import org.apache.flink.table.data.GenericRowData;
+
+import org.junit.Test;
+
+/**
+ * Verifies that row-to-protobuf conversion code generated for a very large proto schema compiles.
+ * Its size may trigger special handling such as code splitting and constant pool optimizations.
+ */
+public class VeryBigPbRowToProtoTest {
+
+    @Test
+    public void testSimple() throws Exception {
+        // Arity 2 matches VeryBigPbMessage's two top-level fields; every field is left null.
+        GenericRowData rowData = new GenericRowData(2);
+        // Verifies the generated serializer compiles; the produced bytes are not inspected.
+        ProtobufTestHelper.rowToPbBytes(rowData, VeryBigPbClass.VeryBigPbMessage.class);
+    }
+}
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_very_big_pb.proto b/flink-formats/flink-protobuf/src/test/proto/test_very_big_pb.proto
new file mode 100644
index 00000000000..19a937ea2be
--- /dev/null
+++ b/flink-formats/flink-protobuf/src/test/proto/test_very_big_pb.proto
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package org.apache.flink.formats.protobuf.testproto;
+
+option java_package = "org.apache.flink.formats.protobuf.testproto";
+option java_outer_classname = "VeryBigPbClass";
+
+message VeryBigPbMessage { // Root. It and NestedType1..13 each have two fields of the next type,
+ NestedType1 nested_field1 = 1; // so the fully expanded field tree has 2^14 = 16384 int32 leaves;
+ NestedType1 nested_field2 = 2; // presumably sized to make the generated (de)serializers huge.
+}
+
+message NestedType1 { // Appears 2^1 = 2 times in the VeryBigPbMessage field tree.
+ NestedType2 nested_field1 = 1;
+ NestedType2 nested_field2 = 2;
+}
+
+message NestedType2 { // Appears 2^2 = 4 times in the VeryBigPbMessage field tree.
+ NestedType3 nested_field1 = 1;
+ NestedType3 nested_field2 = 2;
+}
+
+message NestedType3 { // Appears 2^3 = 8 times in the VeryBigPbMessage field tree.
+ NestedType4 nested_field1 = 1;
+ NestedType4 nested_field2 = 2;
+}
+
+message NestedType4 { // Appears 2^4 = 16 times in the VeryBigPbMessage field tree.
+ NestedType5 nested_field1 = 1;
+ NestedType5 nested_field2 = 2;
+}
+
+message NestedType5 { // Appears 2^5 = 32 times in the VeryBigPbMessage field tree.
+ NestedType6 nested_field1 = 1;
+ NestedType6 nested_field2 = 2;
+}
+
+message NestedType6 { // Appears 2^6 = 64 times in the VeryBigPbMessage field tree.
+ NestedType7 nested_field1 = 1;
+ NestedType7 nested_field2 = 2;
+}
+
+message NestedType7 { // Appears 2^7 = 128 times in the VeryBigPbMessage field tree.
+ NestedType8 nested_field1 = 1;
+ NestedType8 nested_field2 = 2;
+}
+
+message NestedType8 { // Appears 2^8 = 256 times in the VeryBigPbMessage field tree.
+ NestedType9 nested_field1 = 1;
+ NestedType9 nested_field2 = 2;
+}
+
+message NestedType9 { // Appears 2^9 = 512 times in the VeryBigPbMessage field tree.
+ NestedType10 nested_field1 = 1;
+ NestedType10 nested_field2 = 2;
+}
+
+message NestedType10 { // Appears 2^10 = 1024 times in the VeryBigPbMessage field tree.
+ NestedType11 nested_field1 = 1;
+ NestedType11 nested_field2 = 2;
+}
+
+message NestedType11 { // Appears 2^11 = 2048 times in the VeryBigPbMessage field tree.
+ NestedType12 nested_field1 = 1;
+ NestedType12 nested_field2 = 2;
+}
+
+message NestedType12 { // Appears 2^12 = 4096 times in the VeryBigPbMessage field tree.
+ NestedType13 nested_field1 = 1;
+ NestedType13 nested_field2 = 2;
+}
+
+message NestedType13 { // Appears 2^13 = 8192 times in the VeryBigPbMessage field tree.
+ NestedType14 nested_field1 = 1;
+ NestedType14 nested_field2 = 2;
+}
+
+message NestedType14 { // Leaf type: appears 2^14 = 16384 times in the VeryBigPbMessage field tree.
+ int32 nested_field1 = 1;
+}