This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new df03ada10e2 [FLINK-33611][flink-protobuf] Split last segment only when size exceeds split threshold limit in deserializer
df03ada10e2 is described below

commit df03ada10e226053780cb2e5e9742add4536289c
Author: dsaisharath <[email protected]>
AuthorDate: Wed Dec 13 14:14:38 2023 -0800

    [FLINK-33611][flink-protobuf] Split last segment only when size exceeds split threshold limit in deserializer
    
    Close apache/flink#23937
---
 .../deserialize/PbCodegenRowDeserializer.java      |  5 +-
 .../protobuf/serialize/PbCodegenRowSerializer.java |  8 +-
 .../formats/protobuf/VeryBigPbProtoToRowTest.java  | 39 +++++++++
 .../formats/protobuf/VeryBigPbRowToProtoTest.java  | 39 +++++++++
 .../src/test/proto/test_very_big_pb.proto          | 98 ++++++++++++++++++++++
 5 files changed, 178 insertions(+), 11 deletions(-)

diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java
index f3cf414f540..541b1f83839 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java
@@ -109,10 +109,7 @@ public class PbCodegenRowDeserializer implements PbCodegenDeserializer {
             index += 1;
         }
         if (!splitAppender.code().isEmpty()) {
-            String splitMethod =
-                    formatContext.splitDeserializerRowTypeMethod(
-                            flinkRowDataVar, pbMessageTypeStr, pbMessageVar, splitAppender.code());
-            appender.appendSegment(splitMethod);
+            appender.appendSegment(splitAppender.code());
         }
         appender.appendLine(resultVar + " = " + flinkRowDataVar);
         return appender.code();
diff --git a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java
index 342117cd272..6f72ab83b81 100644
--- a/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java
+++ b/flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java
@@ -130,13 +130,7 @@ public class PbCodegenRowSerializer implements PbCodegenSerializer {
             index += 1;
         }
         if (!splitAppender.code().isEmpty()) {
-            String splitMethod =
-                    formatContext.splitSerializerRowTypeMethod(
-                            flinkRowDataVar,
-                            pbMessageTypeStr + ".Builder",
-                            messageBuilderVar,
-                            splitAppender.code());
-            appender.appendSegment(splitMethod);
+            appender.appendSegment(splitAppender.code());
         }
         appender.appendLine(resultVar + " = " + messageBuilderVar + ".build()");
         return appender.code();
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/VeryBigPbProtoToRowTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/VeryBigPbProtoToRowTest.java
new file mode 100644
index 00000000000..a3e09ff34c8
--- /dev/null
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/VeryBigPbProtoToRowTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.testproto.VeryBigPbClass;
+
+import org.junit.Test;
+
+/**
+ * Test for very huge proto definition, which may trigger some special optimizations such as code
+ * splitting and java constant pool size optimization.
+ */
+public class VeryBigPbProtoToRowTest {
+
+    @Test
+    public void testSimple() throws Exception {
+        VeryBigPbClass.VeryBigPbMessage veryBigPbMessage =
+                VeryBigPbClass.VeryBigPbMessage.newBuilder().build();
+        // test generated code can be compiled
+        ProtobufTestHelper.pbBytesToRow(
+                VeryBigPbClass.VeryBigPbMessage.class, veryBigPbMessage.toByteArray());
+    }
+}
diff --git a/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/VeryBigPbRowToProtoTest.java b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/VeryBigPbRowToProtoTest.java
new file mode 100644
index 00000000000..57e0eb71506
--- /dev/null
+++ b/flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/VeryBigPbRowToProtoTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.formats.protobuf.testproto.VeryBigPbClass;
+import org.apache.flink.table.data.GenericRowData;
+
+import org.junit.Test;
+
+/**
+ * Test for very huge proto definition, which may trigger some special optimizations such as code
+ * splitting and java constant pool size optimization.
+ */
+public class VeryBigPbRowToProtoTest {
+
+    @Test
+    public void testSimple() throws Exception {
+        GenericRowData rowData = new GenericRowData(4);
+
+        // test generated code can be compiled
+        ProtobufTestHelper.rowToPbBytes(rowData, VeryBigPbClass.VeryBigPbMessage.class);
+    }
+}
diff --git a/flink-formats/flink-protobuf/src/test/proto/test_very_big_pb.proto b/flink-formats/flink-protobuf/src/test/proto/test_very_big_pb.proto
new file mode 100644
index 00000000000..19a937ea2be
--- /dev/null
+++ b/flink-formats/flink-protobuf/src/test/proto/test_very_big_pb.proto
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package org.apache.flink.formats.protobuf.testproto;
+
+option java_package = "org.apache.flink.formats.protobuf.testproto";
+option java_outer_classname = "VeryBigPbClass";
+
+message VeryBigPbMessage {
+  NestedType1 nested_field1 = 1;
+  NestedType1 nested_field2 = 2;
+}
+
+message NestedType1 {
+  NestedType2 nested_field1 = 1;
+  NestedType2 nested_field2 = 2;
+}
+
+message NestedType2 {
+  NestedType3 nested_field1 = 1;
+  NestedType3 nested_field2 = 2;
+}
+
+message NestedType3 {
+  NestedType4 nested_field1 = 1;
+  NestedType4 nested_field2 = 2;
+}
+
+message NestedType4 {
+  NestedType5 nested_field1 = 1;
+  NestedType5 nested_field2 = 2;
+}
+
+message NestedType5 {
+  NestedType6 nested_field1 = 1;
+  NestedType6 nested_field2 = 2;
+}
+
+message NestedType6 {
+  NestedType7 nested_field1 = 1;
+  NestedType7 nested_field2 = 2;
+}
+
+message NestedType7 {
+  NestedType8 nested_field1 = 1;
+  NestedType8 nested_field2 = 2;
+}
+
+message NestedType8 {
+  NestedType9 nested_field1 = 1;
+  NestedType9 nested_field2 = 2;
+}
+
+message NestedType9 {
+  NestedType10 nested_field1 = 1;
+  NestedType10 nested_field2 = 2;
+}
+
+message NestedType10 {
+  NestedType11 nested_field1 = 1;
+  NestedType11 nested_field2 = 2;
+}
+
+message NestedType11 {
+  NestedType12 nested_field1 = 1;
+  NestedType12 nested_field2 = 2;
+}
+
+message NestedType12 {
+  NestedType13 nested_field1 = 1;
+  NestedType13 nested_field2 = 2;
+}
+
+message NestedType13 {
+  NestedType14 nested_field1 = 1;
+  NestedType14 nested_field2 = 2;
+}
+
+message NestedType14 {
+  int32 nested_field1 = 1;
+}

Reply via email to