libenchao commented on a change in pull request #14376:
URL: https://github.com/apache/flink/pull/14376#discussion_r595734073
##########
File path:
flink-formats/flink-protobuf/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
##########
@@ -0,0 +1 @@
+org.apache.flink.formats.protobuf.PbFormatFactory
Review comment:
Add license header to this file.
##########
File path:
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbRowDataDeserializationSchema.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.deserialize;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.protobuf.PbCodegenException;
+import org.apache.flink.formats.protobuf.PbFormatUtils;
+import org.apache.flink.formats.protobuf.PbSchemaValidator;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Deserialization schema from Protobuf to Flink types.
+ *
+ * <p>Deserializes a <code>byte[]</code> message as a protobuf object and
reads the specified
+ * fields.
+ *
+ * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
+ */
+@PublicEvolving
Review comment:
I think this class is only for internal use and is not a public API, so does it really need the `@PublicEvolving` annotation?
##########
File path:
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbRowDataDeserializationSchema.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.deserialize;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.protobuf.PbCodegenException;
+import org.apache.flink.formats.protobuf.PbFormatUtils;
+import org.apache.flink.formats.protobuf.PbSchemaValidator;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Deserialization schema from Protobuf to Flink types.
+ *
+ * <p>Deserializes a <code>byte[]</code> message as a protobuf object and
reads the specified
+ * fields.
+ *
+ * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
+ */
+@PublicEvolving
+public class PbRowDataDeserializationSchema implements
DeserializationSchema<RowData> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(PbRowDataDeserializationSchema.class);
+ private static final long serialVersionUID = -4040917522067315718L;
+
+ private final RowType rowType;
+ private final TypeInformation<RowData> resultTypeInfo;
+
+ private final String messageClassName;
+ private final boolean ignoreParseErrors;
+ private final boolean readDefaultValues;
+
+ private transient ProtoToRowConverter protoToRowConverter;
+
+ public PbRowDataDeserializationSchema(
+ RowType rowType,
+ TypeInformation<RowData> resultTypeInfo,
+ String messageClassName,
+ boolean ignoreParseErrors,
+ boolean readDefaultValues) {
+ checkNotNull(rowType, "Type information");
+ this.rowType = rowType;
+ this.resultTypeInfo = resultTypeInfo;
+ this.messageClassName = messageClassName;
+ this.ignoreParseErrors = ignoreParseErrors;
+ this.readDefaultValues = readDefaultValues;
+ // do it in client side to report error in the first place
+ new PbSchemaValidator(PbFormatUtils.getDescriptor(messageClassName),
rowType).validate();
+ // this step is only used to validate codegen in client side in the
first place
+ try {
+ // validate converter in client side to early detect errors
+ protoToRowConverter =
+ new ProtoToRowConverter(messageClassName, rowType,
readDefaultValues);
+ } catch (PbCodegenException e) {
+ throw new FlinkRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void open(InitializationContext context) throws Exception {
+ protoToRowConverter = new ProtoToRowConverter(messageClassName,
rowType, readDefaultValues);
+ }
+
+ @Override
+ public RowData deserialize(byte[] message) throws IOException {
+ try {
+ return protoToRowConverter.convertProtoBinaryToRow(message);
+ } catch (Throwable t) {
+ if (ignoreParseErrors) {
+ return null;
+ }
+ LOG.error("Failed to deserialize PB object.", t);
Review comment:
We are already throwing the error to the caller, so we don't need to log it
here as well.
##########
File path:
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbSchemaValidator.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Validation class to verify protobuf definition and flink DDL schema. */
+public class PbSchemaValidator {
+ private Descriptors.Descriptor descriptor;
+ private RowType rowType;
+ private Map<JavaType, List<LogicalTypeRoot>> typeMatchMap = new HashMap();
+
+ public PbSchemaValidator(Descriptors.Descriptor descriptor, RowType
rowType) {
+ this.descriptor = descriptor;
+ this.rowType = rowType;
+ typeMatchMap.put(JavaType.BOOLEAN,
Collections.singletonList(LogicalTypeRoot.BOOLEAN));
+ typeMatchMap.put(
+ JavaType.BYTE_STRING,
+ Arrays.asList(LogicalTypeRoot.BINARY,
LogicalTypeRoot.VARBINARY));
+ typeMatchMap.put(JavaType.DOUBLE,
Collections.singletonList(LogicalTypeRoot.DOUBLE));
+ typeMatchMap.put(JavaType.FLOAT,
Collections.singletonList(LogicalTypeRoot.FLOAT));
+ typeMatchMap.put(
+ JavaType.ENUM, Arrays.asList(LogicalTypeRoot.VARCHAR,
LogicalTypeRoot.CHAR));
+ typeMatchMap.put(
+ JavaType.STRING, Arrays.asList(LogicalTypeRoot.VARCHAR,
LogicalTypeRoot.CHAR));
+ typeMatchMap.put(JavaType.INT,
Collections.singletonList(LogicalTypeRoot.INTEGER));
+ typeMatchMap.put(JavaType.LONG,
Collections.singletonList(LogicalTypeRoot.BIGINT));
+ }
+
+ public Descriptors.Descriptor getDescriptor() {
+ return descriptor;
+ }
+
+ public void setDescriptor(Descriptors.Descriptor descriptor) {
+ this.descriptor = descriptor;
+ }
+
+ public RowType getRowType() {
+ return rowType;
+ }
+
+ public void setRowType(RowType rowType) {
+ this.rowType = rowType;
+ }
+
+ public void validate() {
+ validateTypeMatch(descriptor, rowType);
+ if (!descriptor
+ .getFile()
+ .getOptions()
+ .getJavaPackage()
+ .equals(descriptor.getFile().getPackage())) {
+ throw new IllegalArgumentException(
+ "java_package and package must be the same in proto
definition");
+ }
+ if (!descriptor.getFile().getOptions().getJavaMultipleFiles()) {
Review comment:
Why do you need this limitation (requiring `java_multiple_files` to be true)?
##########
File path:
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenSimpleSerializer.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.serialize;
+
+import org.apache.flink.formats.protobuf.PbCodegenAppender;
+import org.apache.flink.formats.protobuf.PbCodegenVarId;
+import org.apache.flink.formats.protobuf.PbFormatUtils;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
+
+/** Serializer to convert flink simple type data to proto simple type object.
*/
+public class PbCodegenSimpleSerializer implements PbCodegenSerializer {
+ private Descriptors.FieldDescriptor fd;
+ private LogicalType type;
Review comment:
These fields (`fd` and `type`) can be `final`.
##########
File path:
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbRowTypeInformation.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
+
+/** Generate Row type information according to pb descriptors. */
+public class PbRowTypeInformation {
Review comment:
This is a utility class; however, the class name `PbRowTypeInformation`
sounds like a subclass of `TypeInformation`. Could we name it
`PbRowTypeInformationUtil` or `PbRowTypeInformationGenerator` instead?
##########
File path:
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbSchemaValidator.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Validation class to verify protobuf definition and flink DDL schema. */
+public class PbSchemaValidator {
+ private Descriptors.Descriptor descriptor;
+ private RowType rowType;
+ private Map<JavaType, List<LogicalTypeRoot>> typeMatchMap = new HashMap();
+
+ public PbSchemaValidator(Descriptors.Descriptor descriptor, RowType
rowType) {
+ this.descriptor = descriptor;
+ this.rowType = rowType;
+ typeMatchMap.put(JavaType.BOOLEAN,
Collections.singletonList(LogicalTypeRoot.BOOLEAN));
Review comment:
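We could move these initializations into a `static` block (with `typeMatchMap` as a `static final` field), since the mapping does not depend on the constructor arguments.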
##########
File path:
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbSchemaValidator.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Validation class to verify protobuf definition and flink DDL schema. */
+public class PbSchemaValidator {
+ private Descriptors.Descriptor descriptor;
+ private RowType rowType;
+ private Map<JavaType, List<LogicalTypeRoot>> typeMatchMap = new HashMap();
+
+ public PbSchemaValidator(Descriptors.Descriptor descriptor, RowType
rowType) {
+ this.descriptor = descriptor;
+ this.rowType = rowType;
+ typeMatchMap.put(JavaType.BOOLEAN,
Collections.singletonList(LogicalTypeRoot.BOOLEAN));
+ typeMatchMap.put(
+ JavaType.BYTE_STRING,
+ Arrays.asList(LogicalTypeRoot.BINARY,
LogicalTypeRoot.VARBINARY));
+ typeMatchMap.put(JavaType.DOUBLE,
Collections.singletonList(LogicalTypeRoot.DOUBLE));
+ typeMatchMap.put(JavaType.FLOAT,
Collections.singletonList(LogicalTypeRoot.FLOAT));
+ typeMatchMap.put(
+ JavaType.ENUM, Arrays.asList(LogicalTypeRoot.VARCHAR,
LogicalTypeRoot.CHAR));
+ typeMatchMap.put(
+ JavaType.STRING, Arrays.asList(LogicalTypeRoot.VARCHAR,
LogicalTypeRoot.CHAR));
+ typeMatchMap.put(JavaType.INT,
Collections.singletonList(LogicalTypeRoot.INTEGER));
+ typeMatchMap.put(JavaType.LONG,
Collections.singletonList(LogicalTypeRoot.BIGINT));
+ }
+
+ public Descriptors.Descriptor getDescriptor() {
+ return descriptor;
+ }
+
+ public void setDescriptor(Descriptors.Descriptor descriptor) {
+ this.descriptor = descriptor;
+ }
+
+ public RowType getRowType() {
+ return rowType;
+ }
+
+ public void setRowType(RowType rowType) {
+ this.rowType = rowType;
+ }
+
+ public void validate() {
+ validateTypeMatch(descriptor, rowType);
+ if (!descriptor
+ .getFile()
+ .getOptions()
+ .getJavaPackage()
+ .equals(descriptor.getFile().getPackage())) {
+ throw new IllegalArgumentException(
+ "java_package and package must be the same in proto
definition");
+ }
+ if (!descriptor.getFile().getOptions().getJavaMultipleFiles()) {
+ throw new IllegalArgumentException("java_multiple_files must set
to true");
+ }
+ }
+
+ /**
+ * Validate type match of row type.
+ *
+ * @param descriptor
Review comment:
It would be better to give a description for each of the params.
##########
File path:
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbRowDataDeserializationSchema.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.deserialize;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.protobuf.PbCodegenException;
+import org.apache.flink.formats.protobuf.PbFormatUtils;
+import org.apache.flink.formats.protobuf.PbSchemaValidator;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Deserialization schema from Protobuf to Flink types.
+ *
+ * <p>Deserializes a <code>byte[]</code> message as a protobuf object and
reads the specified
+ * fields.
+ *
+ * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
+ */
+@PublicEvolving
+public class PbRowDataDeserializationSchema implements
DeserializationSchema<RowData> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(PbRowDataDeserializationSchema.class);
+ private static final long serialVersionUID = -4040917522067315718L;
+
+ private final RowType rowType;
+ private final TypeInformation<RowData> resultTypeInfo;
+
+ private final String messageClassName;
+ private final boolean ignoreParseErrors;
+ private final boolean readDefaultValues;
+
+ private transient ProtoToRowConverter protoToRowConverter;
+
+ public PbRowDataDeserializationSchema(
+ RowType rowType,
+ TypeInformation<RowData> resultTypeInfo,
+ String messageClassName,
+ boolean ignoreParseErrors,
+ boolean readDefaultValues) {
+ checkNotNull(rowType, "Type information");
Review comment:
Please give a better error message; "Type information" doesn't tell the user that `rowType` was null.
##########
File path:
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbRowDataDeserializationSchema.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.deserialize;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.protobuf.PbCodegenException;
+import org.apache.flink.formats.protobuf.PbFormatUtils;
+import org.apache.flink.formats.protobuf.PbSchemaValidator;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Deserialization schema from Protobuf to Flink types.
+ *
+ * <p>Deserializes a <code>byte[]</code> message as a protobuf object and
reads the specified
+ * fields.
+ *
+ * <p>Failures during deserialization are forwarded as wrapped IOExceptions.
+ */
+@PublicEvolving
+public class PbRowDataDeserializationSchema implements
DeserializationSchema<RowData> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(PbRowDataDeserializationSchema.class);
+ private static final long serialVersionUID = -4040917522067315718L;
Review comment:
We suggest using `1L` for `serialVersionUID`.
##########
File path:
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatUtils.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.table.types.logical.LogicalType;
+
+import com.google.protobuf.Descriptors;
+
+/** Protobuf function util. */
+public class PbFormatUtils {
+
+ // protobuf code has a bug that, f_abc_7d will be convert to fAbc7d, but
actually we need fAbc7D
+ public static String fieldNameToJsonName(String name) {
+ final int length = name.length();
+ StringBuilder result = new StringBuilder(length);
+ boolean isNextUpperCase = false;
+ for (int i = 0; i < length; i++) {
+ char ch = name.charAt(i);
+ if (ch == '_') {
+ isNextUpperCase = true;
+ } else if (isNextUpperCase) {
+ // This closely matches the logic for ASCII characters in:
+ //
http://google3/google/protobuf/descriptor.cc?l=249-251&rcl=228891689
Review comment:
Is this link broken? `http://google3/...` looks like an internal Google URL that is not publicly accessible.
##########
File path:
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbSchemaValidator.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Validation class to verify protobuf definition and flink DDL schema. */
+public class PbSchemaValidator {
+ private Descriptors.Descriptor descriptor;
+ private RowType rowType;
+ private Map<JavaType, List<LogicalTypeRoot>> typeMatchMap = new HashMap();
Review comment:
Maybe we could use an `EnumSet` instead of `List<LogicalTypeRoot>`?
Going a step further, maybe an `EnumMap<JavaType, EnumSet<LogicalTypeRoot>>` would work?
##########
File path:
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbFormatUtils.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.table.types.logical.LogicalType;
+
+import com.google.protobuf.Descriptors;
+
+/** Protobuf function util. */
+public class PbFormatUtils {
+
+ // protobuf code has a bug that, f_abc_7d will be convert to fAbc7d, but
actually we need fAbc7D
+ public static String fieldNameToJsonName(String name) {
+ final int length = name.length();
+ StringBuilder result = new StringBuilder(length);
+ boolean isNextUpperCase = false;
+ for (int i = 0; i < length; i++) {
+ char ch = name.charAt(i);
+ if (ch == '_') {
+ isNextUpperCase = true;
+ } else if (isNextUpperCase) {
+ // This closely matches the logic for ASCII characters in:
+ //
http://google3/google/protobuf/descriptor.cc?l=249-251&rcl=228891689
+ if ('a' <= ch && ch <= 'z') {
+ ch = (char) (ch - 'a' + 'A');
+ isNextUpperCase = false;
+ }
+ result.append(ch);
+ } else {
+ result.append(ch);
+ }
+ }
+ return result.toString();
+ }
+
+ public static boolean isSimpleType(LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case BOOLEAN:
+ case INTEGER:
+ case BIGINT:
+ case FLOAT:
+ case DOUBLE:
+ case CHAR:
+ case VARCHAR:
+ case BINARY:
+ case VARBINARY:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ public static String getStrongCamelCaseJsonName(String name) {
+ String jsonName = fieldNameToJsonName(name);
+ if (jsonName.length() == 1) {
+ return jsonName.toUpperCase();
+ } else {
+ return jsonName.substring(0, 1).toUpperCase() +
jsonName.substring(1);
+ }
+ }
+
+ public static Descriptors.Descriptor getDescriptor(String className) {
+ try {
+ Class<?> pbClass = Class.forName(className);
Review comment:
Should we use the user classloader here?
CC @wuchong
##########
File path:
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbDecodingFormat.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import
org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+/** {@link DecodingFormat} for protobuf decoding. */
+public class PbDecodingFormat implements
DecodingFormat<DeserializationSchema<RowData>> {
+ private String messageClassName;
+ private boolean ignoreParseErrors;
+ private boolean readDefaultValues;
Review comment:
Make these fields `final`.
##########
File path:
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbRowDataSerializationSchema.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.serialize;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.protobuf.PbCodegenException;
+import org.apache.flink.formats.protobuf.PbFormatUtils;
+import org.apache.flink.formats.protobuf.PbSchemaValidator;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import com.google.protobuf.Descriptors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Serialization schema from Flink to Protobuf types.
+ *
+ * <p>Serializes a {@link RowData } to protobuf binary data.
+ *
+ * <p>Failures during deserialization are forwarded as wrapped {@link
FlinkRuntimeException}.
+ */
+@PublicEvolving
Review comment:
Same as for `PbRowDataDeserializationSchema`: I don't think this is a
public API, so it should not be annotated with `@PublicEvolving`.
##########
File path:
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbSchemaValidator.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Validation class to verify protobuf definition and flink DDL schema. */
+public class PbSchemaValidator {
+ private Descriptors.Descriptor descriptor;
+ private RowType rowType;
+ private Map<JavaType, List<LogicalTypeRoot>> typeMatchMap = new HashMap();
+
+ public PbSchemaValidator(Descriptors.Descriptor descriptor, RowType
rowType) {
+ this.descriptor = descriptor;
+ this.rowType = rowType;
+ typeMatchMap.put(JavaType.BOOLEAN,
Collections.singletonList(LogicalTypeRoot.BOOLEAN));
+ typeMatchMap.put(
+ JavaType.BYTE_STRING,
+ Arrays.asList(LogicalTypeRoot.BINARY,
LogicalTypeRoot.VARBINARY));
+ typeMatchMap.put(JavaType.DOUBLE,
Collections.singletonList(LogicalTypeRoot.DOUBLE));
+ typeMatchMap.put(JavaType.FLOAT,
Collections.singletonList(LogicalTypeRoot.FLOAT));
+ typeMatchMap.put(
+ JavaType.ENUM, Arrays.asList(LogicalTypeRoot.VARCHAR,
LogicalTypeRoot.CHAR));
+ typeMatchMap.put(
+ JavaType.STRING, Arrays.asList(LogicalTypeRoot.VARCHAR,
LogicalTypeRoot.CHAR));
+ typeMatchMap.put(JavaType.INT,
Collections.singletonList(LogicalTypeRoot.INTEGER));
+ typeMatchMap.put(JavaType.LONG,
Collections.singletonList(LogicalTypeRoot.BIGINT));
+ }
+
+ public Descriptors.Descriptor getDescriptor() {
+ return descriptor;
+ }
+
+ public void setDescriptor(Descriptors.Descriptor descriptor) {
+ this.descriptor = descriptor;
+ }
+
+ public RowType getRowType() {
+ return rowType;
+ }
+
+ public void setRowType(RowType rowType) {
+ this.rowType = rowType;
+ }
+
+ public void validate() {
+ validateTypeMatch(descriptor, rowType);
+ if (!descriptor
+ .getFile()
+ .getOptions()
+ .getJavaPackage()
+ .equals(descriptor.getFile().getPackage())) {
+ throw new IllegalArgumentException(
+ "java_package and package must be the same in proto
definition");
+ }
+ if (!descriptor.getFile().getOptions().getJavaMultipleFiles()) {
+ throw new IllegalArgumentException("java_multiple_files must set
to true");
+ }
+ }
+
+ /**
+ * Validate type match of row type.
+ *
+ * @param descriptor
+ * @param rowType
+ */
+ public void validateTypeMatch(Descriptors.Descriptor descriptor, RowType
rowType) {
+ rowType.getFields()
+ .forEach(
+ rowField -> {
+ FieldDescriptor fieldDescriptor =
+
descriptor.findFieldByName(rowField.getName());
+ if (null != fieldDescriptor) {
+ validateTypeMatch(fieldDescriptor,
rowField.getType());
+ } else {
+ throw new ValidationException(
+ "Column "
+ + rowField.getName()
+ + " does not exists in
definition of proto class.");
+ }
+ });
+ }
+
+ /**
+ * Validate type match of general type.
+ *
+ * @param fd
+ * @param logicalType
+ */
+ public void validateTypeMatch(FieldDescriptor fd, LogicalType logicalType)
{
+ if (!fd.isRepeated()) {
+ if (fd.getJavaType() != JavaType.MESSAGE) {
+ // simple type
+ validateSimpleType(fd, logicalType.getTypeRoot());
+ } else {
+ // message type
+ validateTypeMatch(fd.getMessageType(), (RowType) logicalType);
Review comment:
If `logicalType` is not a `RowType`, we'll get a `ClassCastException`.
Maybe we can check the type before casting, so we can give a better error message?
##########
File path:
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/PbCodegenRowDeserializer.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.deserialize;
+
+import org.apache.flink.formats.protobuf.PbCodegenAppender;
+import org.apache.flink.formats.protobuf.PbCodegenException;
+import org.apache.flink.formats.protobuf.PbCodegenVarId;
+import org.apache.flink.formats.protobuf.PbFormatUtils;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.google.protobuf.Descriptors.Descriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+
+/** Deserializer to convert proto message type object to flink row type data.
*/
+public class PbCodegenRowDeserializer implements PbCodegenDeserializer {
+ private Descriptor descriptor;
+ private RowType rowType;
+ private boolean readDefaultValues;
+ private PbCodegenAppender appender = new PbCodegenAppender();
+
+ public PbCodegenRowDeserializer(
+ Descriptor descriptor, RowType rowType, boolean readDefaultValues)
{
+ this.rowType = rowType;
+ this.descriptor = descriptor;
+ this.readDefaultValues = readDefaultValues;
+ }
+
+ @Override
+ public String codegen(String returnInternalDataVarName, String pbGetStr)
+ throws PbCodegenException {
+ // The type of messageGetStr is a native pb object,
+ // it should be converted to RowData of flink internal type
+ PbCodegenVarId varUid = PbCodegenVarId.getInstance();
+ int uid = varUid.getAndIncrement();
+ String pbMessageVar = "message" + uid;
+ String rowDataVar = "rowData" + uid;
+
+ int fieldSize = rowType.getFieldNames().size();
+ String pbMessageTypeStr = PbFormatUtils.getFullJavaName(descriptor);
+ appender.appendLine(pbMessageTypeStr + " " + pbMessageVar + " = " +
pbGetStr);
+ appender.appendLine(
+ "GenericRowData " + rowDataVar + " = new GenericRowData(" +
fieldSize + ")");
+ int index = 0;
+ for (String fieldName : rowType.getFieldNames()) {
+ int subUid = varUid.getAndIncrement();
+ String elementDataVar = "elementDataVar" + subUid;
+
+ LogicalType subType =
rowType.getTypeAt(rowType.getFieldIndex(fieldName));
+ FieldDescriptor elementFd = descriptor.findFieldByName(fieldName);
+ String strongCamelFieldName =
PbFormatUtils.getStrongCamelCaseJsonName(fieldName);
+ PbCodegenDeserializer codegen =
+ PbCodegenDeserializeFactory.getPbCodegenDes(
+ elementFd, subType, readDefaultValues);
+ appender.appendLine("Object " + elementDataVar + " = null");
+ if (!readDefaultValues) {
+ // only works in syntax=proto2 and readDefaultValues=false
+ // readDefaultValues must be true in pb3 mode
+ String isMessageNonEmptyStr =
+ isMessageNonEmptyStr(pbMessageVar,
strongCamelFieldName, elementFd);
+ appender.appendSegment("if(" + isMessageNonEmptyStr + "){");
+ }
+ String elementMessageGetStr =
+ pbMessageElementGetStr(pbMessageVar, strongCamelFieldName,
elementFd);
+ if (!elementFd.isRepeated()) {
+ // field is not map or array
+ // this step is needed to convert primitive type to boxed type
to unify the object
Review comment:
Is this step really necessary? All the tests still pass after I remove
this logic.
##########
File path:
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbEncodingFormat.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import
org.apache.flink.formats.protobuf.serialize.PbRowDataSerializationSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+/** {@link EncodingFormat} for protobuf encoding. */
+public class PbEncodingFormat implements
EncodingFormat<SerializationSchema<RowData>> {
+ private String messageClassName;
Review comment:
Make this field `final`.
##########
File path:
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/RowToProtoConverter.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.serialize;
+
+import org.apache.flink.formats.protobuf.PbCodegenAppender;
+import org.apache.flink.formats.protobuf.PbCodegenException;
+import org.apache.flink.formats.protobuf.PbFormatUtils;
+import org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.google.protobuf.AbstractMessage;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Descriptors;
+import org.codehaus.janino.ScriptEvaluator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link RowToProtoConverter} can convert flink row data to binary protobuf
message data by codegen
+ * process.
+ */
+public class RowToProtoConverter {
+ private static final Logger LOG =
LoggerFactory.getLogger(ProtoToRowConverter.class);
+ private ScriptEvaluator se;
Review comment:
Make this field `final`.
##########
File path:
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbRowDataSerializationSchema.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.serialize;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.formats.protobuf.PbCodegenException;
+import org.apache.flink.formats.protobuf.PbFormatUtils;
+import org.apache.flink.formats.protobuf.PbSchemaValidator;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import com.google.protobuf.Descriptors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Serialization schema from Flink to Protobuf types.
+ *
+ * <p>Serializes a {@link RowData } to protobuf binary data.
+ *
+ * <p>Failures during deserialization are forwarded as wrapped {@link
FlinkRuntimeException}.
+ */
+@PublicEvolving
+public class PbRowDataSerializationSchema implements
SerializationSchema<RowData> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(PbRowDataSerializationSchema.class);
Review comment:
Is this field unused?
##########
File path:
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.serialize;
+
+import org.apache.flink.formats.protobuf.PbCodegenAppender;
+import org.apache.flink.formats.protobuf.PbCodegenException;
+import org.apache.flink.formats.protobuf.PbCodegenUtils;
+import org.apache.flink.formats.protobuf.PbCodegenVarId;
+import org.apache.flink.formats.protobuf.PbFormatUtils;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.google.protobuf.Descriptors;
+
+import java.util.List;
+
+/** Serializer to convert flink row type data to proto row type object. */
+public class PbCodegenRowSerializer implements PbCodegenSerializer {
+ private List<Descriptors.FieldDescriptor> fds;
+ private Descriptors.Descriptor descriptor;
+ private RowType rowType;
Review comment:
Make these fields `final`.
##########
File path:
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/PbSchemaValidator.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.FieldDescriptor;
+import com.google.protobuf.Descriptors.FieldDescriptor.JavaType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Validation class to verify protobuf definition and flink DDL schema. */
+public class PbSchemaValidator {
+ private Descriptors.Descriptor descriptor;
+ private RowType rowType;
+ private Map<JavaType, List<LogicalTypeRoot>> typeMatchMap = new HashMap();
+
+ public PbSchemaValidator(Descriptors.Descriptor descriptor, RowType
rowType) {
+ this.descriptor = descriptor;
+ this.rowType = rowType;
+ typeMatchMap.put(JavaType.BOOLEAN,
Collections.singletonList(LogicalTypeRoot.BOOLEAN));
+ typeMatchMap.put(
+ JavaType.BYTE_STRING,
+ Arrays.asList(LogicalTypeRoot.BINARY,
LogicalTypeRoot.VARBINARY));
+ typeMatchMap.put(JavaType.DOUBLE,
Collections.singletonList(LogicalTypeRoot.DOUBLE));
+ typeMatchMap.put(JavaType.FLOAT,
Collections.singletonList(LogicalTypeRoot.FLOAT));
+ typeMatchMap.put(
+ JavaType.ENUM, Arrays.asList(LogicalTypeRoot.VARCHAR,
LogicalTypeRoot.CHAR));
+ typeMatchMap.put(
+ JavaType.STRING, Arrays.asList(LogicalTypeRoot.VARCHAR,
LogicalTypeRoot.CHAR));
+ typeMatchMap.put(JavaType.INT,
Collections.singletonList(LogicalTypeRoot.INTEGER));
+ typeMatchMap.put(JavaType.LONG,
Collections.singletonList(LogicalTypeRoot.BIGINT));
+ }
+
+ public Descriptors.Descriptor getDescriptor() {
+ return descriptor;
+ }
+
+ public void setDescriptor(Descriptors.Descriptor descriptor) {
+ this.descriptor = descriptor;
+ }
+
+ public RowType getRowType() {
+ return rowType;
+ }
+
+ public void setRowType(RowType rowType) {
+ this.rowType = rowType;
+ }
+
+ public void validate() {
+ validateTypeMatch(descriptor, rowType);
+ if (!descriptor
+ .getFile()
+ .getOptions()
+ .getJavaPackage()
+ .equals(descriptor.getFile().getPackage())) {
+ throw new IllegalArgumentException(
+ "java_package and package must be the same in proto
definition");
+ }
+ if (!descriptor.getFile().getOptions().getJavaMultipleFiles()) {
+ throw new IllegalArgumentException("java_multiple_files must set
to true");
+ }
+ }
+
+ /**
+ * Validate type match of row type.
+ *
+ * @param descriptor
+ * @param rowType
+ */
+ public void validateTypeMatch(Descriptors.Descriptor descriptor, RowType
rowType) {
+ rowType.getFields()
+ .forEach(
+ rowField -> {
+ FieldDescriptor fieldDescriptor =
+
descriptor.findFieldByName(rowField.getName());
+ if (null != fieldDescriptor) {
+ validateTypeMatch(fieldDescriptor,
rowField.getType());
+ } else {
+ throw new ValidationException(
+ "Column "
+ + rowField.getName()
+ + " does not exists in
definition of proto class.");
+ }
+ });
+ }
+
+ /**
+ * Validate type match of general type.
+ *
+ * @param fd
+ * @param logicalType
+ */
+ public void validateTypeMatch(FieldDescriptor fd, LogicalType logicalType)
{
+ if (!fd.isRepeated()) {
+ if (fd.getJavaType() != JavaType.MESSAGE) {
+ // simple type
+ validateSimpleType(fd, logicalType.getTypeRoot());
+ } else {
+ // message type
+ validateTypeMatch(fd.getMessageType(), (RowType) logicalType);
Review comment:
There are other type casts here that may throw a `ClassCastException`; please add
a check for them too.
##########
File path:
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/deserialize/ProtoToRowConverter.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.deserialize;
+
+import org.apache.flink.formats.protobuf.PbCodegenAppender;
+import org.apache.flink.formats.protobuf.PbCodegenException;
+import org.apache.flink.formats.protobuf.PbConstant;
+import org.apache.flink.formats.protobuf.PbFormatUtils;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.Descriptors.FileDescriptor.Syntax;
+import org.codehaus.janino.ScriptEvaluator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link ProtoToRowConverter} can convert binary protobuf message data to
flink row data by codegen
+ * process.
+ */
+public class ProtoToRowConverter {
+ private static final Logger LOG =
LoggerFactory.getLogger(ProtoToRowConverter.class);
+ private ScriptEvaluator se;
+ private Method parseFromMethod;
Review comment:
make these fields `final`
##########
File path:
flink-formats/flink-protobuf/src/main/java/org/apache/flink/formats/protobuf/serialize/PbCodegenRowSerializer.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.protobuf.serialize;
+
+import org.apache.flink.formats.protobuf.PbCodegenAppender;
+import org.apache.flink.formats.protobuf.PbCodegenException;
+import org.apache.flink.formats.protobuf.PbCodegenUtils;
+import org.apache.flink.formats.protobuf.PbCodegenVarId;
+import org.apache.flink.formats.protobuf.PbFormatUtils;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.google.protobuf.Descriptors;
+
+import java.util.List;
+
+/** Serializer to convert flink row type data to proto row type object. */
+public class PbCodegenRowSerializer implements PbCodegenSerializer {
+ private List<Descriptors.FieldDescriptor> fds;
Review comment:
Is this field unused?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org