libenchao commented on code in PR #23162: URL: https://github.com/apache/flink/pull/23162#discussion_r1383199488
########## flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbCodeSplitter.java: ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.protobuf; + +import org.apache.flink.formats.protobuf.util.PbCodegenAppender; +import org.apache.flink.formats.protobuf.util.PbCodegenVarId; + +import java.util.ArrayList; +import java.util.List; + +/** + * PbCodeSplitter to split the serialization and deserialization code, for RowType each + * element,codeSpiltter will merge code segments of multiple elements, and split the code segment + * into a method if the code segment exceeds the threshold, and store the method in the + * splitMethodStack. + */ +public class PbCodeSplitter { + private final List<String> splitMethodStack = new ArrayList<>(); + + public PbCodeSplitter() {} Review Comment: No need to add a blank public default constructor. ########## flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigPbRowToProtoTest.java: ########## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.protobuf; + +import org.apache.flink.formats.protobuf.serialize.PbRowDataSerializationSchema; +import org.apache.flink.formats.protobuf.testproto.BigPbClass; +import org.apache.flink.formats.protobuf.util.PbToRowTypeUtil; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.logical.RowType; + +import com.google.protobuf.ByteString; +import org.junit.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Test class for below case + * + * <PRE> + * syntax = "proto3"; + * package org.apache.flink.formats.protobuf.testproto; + * option java_package = "org.apache.flink.formats.protobuf.testproto"; + * option java_outer_classname = "BigPbClass"; + * import "google/protobuf/descriptor.proto"; + * message BigPbMessage { + * </PRE> + * + * <p>It is valid proto definition. 
+ */ +public class BigPbRowToProtoTest { + + @Test + public void testSimple() throws Exception { + GenericRowData rowData = new GenericRowData(34); + rowData.setField(7, 20); + rowData.setField(8, StringData.fromString("test1")); + rowData.setField(9, false); + rowData.setField(10, 1F); + rowData.setField(11, 2D); + rowData.setField(12, new byte[] {1, 2, 3}); Review Comment: Can you set for all values, then we can be confident that splitting does not affect correctness. ########## flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigPbProtoToRowTest.java: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.protobuf; + +import org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema; +import org.apache.flink.formats.protobuf.testproto.BigPbClass; +import org.apache.flink.formats.protobuf.util.PbToRowTypeUtil; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.RowType; + +import com.google.protobuf.ByteString; +import org.junit.Test; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + +/** + * Test class for below case + * + * <PRE> + * syntax = "proto3"; + * package org.apache.flink.formats.protobuf.testproto; + * option java_package = "org.apache.flink.formats.protobuf.testproto"; + * option java_outer_classname = "BigPbClass"; + * import "google/protobuf/descriptor.proto"; + * message BigPbMessage { + * </PRE> + * + * <p>It is valid proto definition. + */ +public class BigPbProtoToRowTest { Review Comment: All the comments on `BigPbRowToProtoTest` also apply here. ########## flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigPbRowToProtoTest.java: ########## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.protobuf; + +import org.apache.flink.formats.protobuf.serialize.PbRowDataSerializationSchema; +import org.apache.flink.formats.protobuf.testproto.BigPbClass; +import org.apache.flink.formats.protobuf.util.PbToRowTypeUtil; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.logical.RowType; + +import com.google.protobuf.ByteString; +import org.junit.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Test class for below case + * + * <PRE> + * syntax = "proto3"; + * package org.apache.flink.formats.protobuf.testproto; + * option java_package = "org.apache.flink.formats.protobuf.testproto"; + * option java_outer_classname = "BigPbClass"; + * import "google/protobuf/descriptor.proto"; + * message BigPbMessage { + * </PRE> + * + * <p>It is valid proto definition. Review Comment: I think this comment is also not needed, the protoc maven plugin will help us to compile it to java classes. 
A proper comment for this test class may be: `Test for huge proto definition, which may trigger some special optimizations such as code splitting.` ########## flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java: ########## @@ -104,10 +106,19 @@ public ProtoToRowConverter(RowType rowType, PbFormatConfig formatConfig) PbCodegenDeserializer codegenDes = PbCodegenDeserializeFactory.getPbCodegenTopRowDes( descriptor, rowType, pbFormatContext); - String genCode = codegenDes.codegen("rowData", "message", 0); + // if codgen generate code size over threshod then split the code + PbCodeSplitter pbCodeSplitter = new PbCodeSplitter(); + LOG.info("Fast-pb generate split deserialize code"); Review Comment: I'm not sure why we need this log message ########## flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbCodeSplitter.java: ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.protobuf; + +import org.apache.flink.formats.protobuf.util.PbCodegenAppender; +import org.apache.flink.formats.protobuf.util.PbCodegenVarId; + +import java.util.ArrayList; +import java.util.List; + +/** + * PbCodeSplitter to split the serialization and deserialization code, for RowType each + * element,codeSpiltter will merge code segments of multiple elements, and split the code segment + * into a method if the code segment exceeds the threshold, and store the method in the + * splitMethodStack. + */ +public class PbCodeSplitter { + private final List<String> splitMethodStack = new ArrayList<>(); + + public PbCodeSplitter() {} + + public String splitDeserializerRowTypeMethod( + String rowDataVar, String messageTypeStr, String messageTypeVar, String code) { + int uid = PbCodegenVarId.getInstance().getAndIncrement(); + String splitMethodName = "split" + uid; + PbCodegenAppender pbCodegenAppender = new PbCodegenAppender(); + pbCodegenAppender.appendSegment( + String.format( + "private static void %s (GenericRowData %s, %s %s) {\n %s \n}", + splitMethodName, rowDataVar, messageTypeStr, messageTypeVar, code)); + splitMethodStack.add(pbCodegenAppender.code()); + return String.format("%s(%s, %s);", splitMethodName, rowDataVar, messageTypeVar); + } + + public String splitSerializerRowTypeMethod( Review Comment: `splitSerializerRowTypeMethod` and `splitDeserializerRowTypeMethod` share most of codes, hence I'm wondering if we can reuse them. Further more, I think these two methods are actually not necessary, and `PbCodeSplitter` is kind of confusing. Can we just use `PbFormatContext` with: * Add a `final List<String> splitMethods = new ArrayList()` * Add a method `addCodeIntoMethod(String code)` And leave others to the caller, since there is only one caller of these two methods. Then we can avoid introducing `PbCodeSplitter` everywhere. 
########## flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbCodegenUtils.java: ########## @@ -203,26 +204,29 @@ public static String pbDefaultValueCode( } /** - * This method will be called from serializer of flink array/map type because flink contains - * both array/map type in array format. Map/Array cannot contain null value in pb object then we - * must do conversion in case of null values in map/array type. + * Used to split array/map type java code segment This method will be called from serializer of Review Comment: Add period after a complete sentence. What's more, I don't think the comment should be changed, the sentence `Used to split array/map type java code segment` adds no information. ########## flink-formats/flink-protobuf/src/test/proto/test_big_pb.proto: ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +syntax = "proto3"; + +package org.apache.flink.formats.protobuf.testproto; + +option java_package = "org.apache.flink.formats.protobuf.testproto"; +option java_outer_classname = "BigPbClass"; + +message BigPbMessage { + repeated string field = 1; + int32 int_field = 2; + float float_field = 3; + double double_field = 4; + bool bool_field = 5; + string string_field = 6; + bytes bytes_field = 7; + int32 a_field_1 = 8; + string a_field_2 = 9 ; + bool a_field_3 = 10; + float b_field_1 = 11; + double b_field_2 = 12; + bytes b_field_3 = 13; + int64 c_field_1 = 14; + uint32 c_field_2 = 15; + uint64 c_field_3 = 16; + int32 e_field_1 = 17; + float e_field_2 = 18; + string e_field_3 = 19; + bool e_field_4 = 20; + bytes e_field_5 = 21; + double f_field_1 = 22; + uint32 f_field_2 = 23; + uint64 f_field_3 = 24; + fixed32 f_field_4 = 25; + fixed64 f_field_5 = 26; + sfixed32 f_field_6 = 27; + sfixed64 f_field_7 = 28; + float f_field_8 = 29; + bool f_field_9 = 30; + string f_field_10 = 31; + bytes f_field_11 = 32; Review Comment: The field naming is not consistent (int_field, a_field_n, map_field); can you normalize it to one pattern? ########## flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java: ########## @@ -129,4 +140,8 @@ public RowData convertProtoBinaryToRow(byte[] data) throws Exception { Object messageObj = parseFromMethod.invoke(null, data); return (RowData) decodeMethod.invoke(null, messageObj); } + + public boolean isCodeSplit() { Review Comment: Add the `@VisibleForTesting` annotation to all `isCodeSplit` methods, since they are only used for testing purposes, and change their modifier to `protected`. ########## flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenSerializer.java: ########## @@ -28,13 +29,18 @@ * </PRE> */ public interface PbCodegenSerializer { + /** * @param resultVar the final var name that is calculated by codegen. 
This var name will be used - * by outsider codegen environment. {@code resultVariable} should be protobuf object - * @param flinkObjectCode may be a variable or expression. Current codegen environment can use - * this literal name directly to access the input. {@code flinkObject} should be a flink - * internal object. + * by outsider codegen environment. {@code returnPbVarName} should be protobuf object + * @param internalDataGetStr may be a variable or expression. Current codegen environment can + * use this literal name directly to access the input. {@code internalDataGetStr} is a value + * coming from flink object. + * @param pbCodeSplitter when encode/decode method body over 4K, use PbCodeSplitter to Split Review Comment: Comments in `PbCodegenDeserializer` also apply here. ########## flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenDeserializer.java: ########## @@ -28,12 +29,19 @@ * </PRE> */ public interface PbCodegenDeserializer { + /** * @param resultVar the final var name that is calculated by codegen. This var name will be used - * by outsider codegen environment. {@code resultVariable} should be flink object + * by outsider codegen environment. {@code returnInternalDataVarName} should be flink data + * object * @param pbObjectCode may be a variable or expression. Current codegen environment can use this - * literal name directly to access the input. {@code pbObject} should be a protobuf object + * literal name directly to access the input. {@code pbGetStr} is a value coming from + * protobuf object + * @param pbCodeSplitter when encode/decode method body over 4K, use PbCodeSplitter to Split Review Comment: Actually it's `PbConstant.CODEGEN_SPLIT_THRESHOLD` ########## flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigPbRowToProtoTest.java: ########## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.protobuf; + +import org.apache.flink.formats.protobuf.serialize.PbRowDataSerializationSchema; +import org.apache.flink.formats.protobuf.testproto.BigPbClass; +import org.apache.flink.formats.protobuf.util.PbToRowTypeUtil; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.logical.RowType; + +import com.google.protobuf.ByteString; +import org.junit.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Test class for below case + * + * <PRE> + * syntax = "proto3"; + * package org.apache.flink.formats.protobuf.testproto; + * option java_package = "org.apache.flink.formats.protobuf.testproto"; + * option java_outer_classname = "BigPbClass"; + * import "google/protobuf/descriptor.proto"; + * message BigPbMessage { + * </PRE> + * + * <p>It is valid proto definition. 
+ */ +public class BigPbRowToProtoTest { + + @Test + public void testSimple() throws Exception { + GenericRowData rowData = new GenericRowData(34); + rowData.setField(7, 20); + rowData.setField(8, StringData.fromString("test1")); + rowData.setField(9, false); + rowData.setField(10, 1F); + rowData.setField(11, 2D); + rowData.setField(12, new byte[] {1, 2, 3}); + + byte[] bytes = ProtobufTestHelper.rowToPbBytes(rowData, BigPbClass.BigPbMessage.class); + + BigPbClass.BigPbMessage bigPbMessage = BigPbClass.BigPbMessage.parseFrom(bytes); + + assertEquals(20, bigPbMessage.getAField1()); + assertEquals("test1", bigPbMessage.getAField2()); + assertFalse(bigPbMessage.getAField3()); + assertEquals(1F, bigPbMessage.getBField1()); + assertEquals(2D, bigPbMessage.getBField2()); + assertEquals(ByteString.copyFrom(new byte[] {1, 2, 3}), bigPbMessage.getBField3()); + } + + /* + * Flink-Protobuf serialize codegen code size is 13999, over code threshold. + * So pbCodeSplitter split the code. Review Comment: I think the comment is not really needed since the test name and body have already explained it. Besides, 13999 could go stale easily in the future's iteration. ########## flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/util/PbCodegenUtils.java: ########## @@ -203,26 +204,29 @@ public static String pbDefaultValueCode( } /** - * This method will be called from serializer of flink array/map type because flink contains - * both array/map type in array format. Map/Array cannot contain null value in pb object then we - * must do conversion in case of null values in map/array type. + * Used to split array/map type java code segment This method will be called from serializer of + * flink array/map type because flink contains both array/map type in array format. Map/Array + * cannot contain null value in pb object then we must do conversion in case of null values in + * map/array type. 
* * @param flinkArrDataVar code phrase represent arrayData of arr type or keyData/valueData in * map type. * @param iVar the index in arrDataVar * @param resultPbVar the returned pb variable name in codegen. * @param elementPbFd {@link FieldDescriptor} of element type in proto object * @param elementDataType {@link LogicalType} of element type in flink object + * @param pbCodeSplitter protobuf code splitter * @return The java code segment which represents field value retrieval. */ - public static String convertFlinkArrayElementToPbWithDefaultValueCode( + public static String convertFlinkArrayElementToPbWithDefaultValueCodeSplit( Review Comment: The method name is already long enough, we don't need to add `split` into it again. (Actually, the java doc for it is good enough) ########## flink-formats/flink-protobuf/src/test/java/org/apache/flink/formats/protobuf/BigPbRowToProtoTest.java: ########## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.protobuf; + +import org.apache.flink.formats.protobuf.serialize.PbRowDataSerializationSchema; +import org.apache.flink.formats.protobuf.testproto.BigPbClass; +import org.apache.flink.formats.protobuf.util.PbToRowTypeUtil; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.logical.RowType; + +import com.google.protobuf.ByteString; +import org.junit.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Test class for below case + * + * <PRE> + * syntax = "proto3"; + * package org.apache.flink.formats.protobuf.testproto; + * option java_package = "org.apache.flink.formats.protobuf.testproto"; + * option java_outer_classname = "BigPbClass"; + * import "google/protobuf/descriptor.proto"; + * message BigPbMessage { + * </PRE> + * + * <p>It is valid proto definition. + */ +public class BigPbRowToProtoTest { + + @Test + public void testSimple() throws Exception { + GenericRowData rowData = new GenericRowData(34); + rowData.setField(7, 20); + rowData.setField(8, StringData.fromString("test1")); + rowData.setField(9, false); + rowData.setField(10, 1F); + rowData.setField(11, 2D); + rowData.setField(12, new byte[] {1, 2, 3}); + + byte[] bytes = ProtobufTestHelper.rowToPbBytes(rowData, BigPbClass.BigPbMessage.class); + + BigPbClass.BigPbMessage bigPbMessage = BigPbClass.BigPbMessage.parseFrom(bytes); + + assertEquals(20, bigPbMessage.getAField1()); + assertEquals("test1", bigPbMessage.getAField2()); + assertFalse(bigPbMessage.getAField3()); + assertEquals(1F, bigPbMessage.getBField1()); + assertEquals(2D, bigPbMessage.getBField2()); + assertEquals(ByteString.copyFrom(new byte[] {1, 2, 3}), bigPbMessage.getBField3()); + } + + /* + * Flink-Protobuf serialize codegen code size is 13999, over code threshold. 
+ * So pbCodeSplitter split the code. + */ + @Test + public void testSerializeSplit() throws Exception { Review Comment: How about `testSplitInSerialization`? ########## flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenDeserializer.java: ########## @@ -28,12 +29,19 @@ * </PRE> */ public interface PbCodegenDeserializer { + /** * @param resultVar the final var name that is calculated by codegen. This var name will be used - * by outsider codegen environment. {@code resultVariable} should be flink object + * by outsider codegen environment. {@code returnInternalDataVarName} should be flink data + * object * @param pbObjectCode may be a variable or expression. Current codegen environment can use this - * literal name directly to access the input. {@code pbObject} should be a protobuf object + * literal name directly to access the input. {@code pbGetStr} is a value coming from Review Comment: I'm not sure why we should reference something not in the method signature, and why we should change it from `resultVariable` to `returnInternalDataVarName`, and `pbObject` to `pbGetStr`. ########## flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenSimpleSerializer.java: ########## @@ -41,8 +42,8 @@ public PbCodegenSimpleSerializer( this.formatContext = formatContext; } - @Override - public String codegen(String resultVar, String flinkObjectCode, int indent) + public String codegenSplit( Review Comment: `@Override` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
