wuchong commented on a change in pull request #14444:
URL: https://github.com/apache/flink/pull/14444#discussion_r547049153



##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java
##########
@@ -85,9 +88,27 @@ public AvroRowDataDeserializationSchema(
                        DeserializationSchema<GenericRecord> nestedSchema,
                        AvroToRowDataConverters.AvroToRowDataConverter runtimeConverter,
                        TypeInformation<RowData> typeInfo) {
+               this(nestedSchema, runtimeConverter, typeInfo, false);
+       }
+
+       /**
+        * Creates an Avro deserialization schema for the given logical type.
+        *
+        * @param nestedSchema     Deserialization schema to deserialize as {@link GenericRecord}
+        * @param runtimeConverter Converter that transforms a {@link GenericRecord} into {@link RowData}
+        * @param typeInfo         The TypeInformation to be used by
+        *                         {@link AvroRowDataDeserializationSchema#getProducedType()}
+        * @param ignoreParseError Indicates whether to skip rows with parsing errors
+        */
+        */
+       public AvroRowDataDeserializationSchema(

Review comment:
       I would suggest refactoring this class to use the builder pattern. Adding a new constructor every time a new parameter is introduced is hard to maintain.
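A minimal sketch of what the builder could look like, assuming the three existing constructor arguments stay required and `ignoreParseErrors` becomes an optional setting with a default. `RowDataDeserializerSketch`, `builder(...)` and `setIgnoreParseErrors(...)` are hypothetical names, and the dependencies are typed as `Object` so the sketch compiles without Flink; it is not the class's actual API.

```java
import java.util.Objects;

/**
 * Hypothetical, simplified stand-in for AvroRowDataDeserializationSchema. The three
 * required dependencies (nested schema, runtime converter, type info) are typed as
 * Object here so the sketch compiles without Flink on the classpath.
 */
final class RowDataDeserializerSketch {
    private final Object nestedSchema;
    private final Object runtimeConverter;
    private final Object typeInfo;
    private final boolean ignoreParseErrors;

    // Private: instances can only be created through the builder.
    private RowDataDeserializerSketch(Builder builder) {
        this.nestedSchema = builder.nestedSchema;
        this.runtimeConverter = builder.runtimeConverter;
        this.typeInfo = builder.typeInfo;
        this.ignoreParseErrors = builder.ignoreParseErrors;
    }

    /** Required arguments go here; optional ones are set on the returned builder. */
    static Builder builder(Object nestedSchema, Object runtimeConverter, Object typeInfo) {
        return new Builder(nestedSchema, runtimeConverter, typeInfo);
    }

    boolean isIgnoreParseErrors() {
        return ignoreParseErrors;
    }

    static final class Builder {
        private final Object nestedSchema;
        private final Object runtimeConverter;
        private final Object typeInfo;
        // Optional settings carry defaults, so adding one later does not
        // require another constructor overload.
        private boolean ignoreParseErrors = false;

        private Builder(Object nestedSchema, Object runtimeConverter, Object typeInfo) {
            this.nestedSchema = Objects.requireNonNull(nestedSchema, "nestedSchema");
            this.runtimeConverter = Objects.requireNonNull(runtimeConverter, "runtimeConverter");
            this.typeInfo = Objects.requireNonNull(typeInfo, "typeInfo");
        }

        Builder setIgnoreParseErrors(boolean ignoreParseErrors) {
            this.ignoreParseErrors = ignoreParseErrors;
            return this;
        }

        RowDataDeserializerSketch build() {
            return new RowDataDeserializerSketch(this);
        }
    }
}
```

Usage would then read `RowDataDeserializerSketch.builder(schema, converter, typeInfo).setIgnoreParseErrors(true).build()`, and a future option only needs one more setter on the builder.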

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java
##########
@@ -59,18 +59,21 @@
         */
        private final AvroToRowDataConverters.AvroToRowDataConverter runtimeConverter;
 
+       private final Boolean ignoreParseError;

Review comment:
       Use the primitive type `boolean`; otherwise, the field may be `null`.
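A small illustration of the risk, using a hypothetical `FlagFieldDemo` class rather than the PR's code: a boxed `Boolean` field that is never assigned (or is assigned `null`) throws a `NullPointerException` as soon as it is auto-unboxed, whereas a primitive `boolean` simply defaults to `false`.

```java
/** Hypothetical holder contrasting a boxed and a primitive flag field. */
final class FlagFieldDemo {
    // Boxed field: defaults to null unless explicitly assigned.
    private Boolean boxedIgnoreParseErrors;
    // Primitive field: defaults to false and can never be null.
    private boolean primitiveIgnoreParseErrors;

    boolean readPrimitive() {
        return primitiveIgnoreParseErrors;
    }

    boolean readBoxed() {
        // Auto-unboxing calls booleanValue(), which throws NullPointerException on null.
        return boxedIgnoreParseErrors;
    }
}
```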

##########
File path: flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
##########
@@ -205,4 +207,26 @@ public void testSpecificType() throws Exception {
                Assert.assertEquals("12:12:12", DataFormatConverters.LocalTimeConverter.INSTANCE.toExternal(
                                rowData.getInt(2)).toString());
        }
+
+       @Test
+       public void testIgnoreParseErrors() throws Exception {
+               DataType serDataType = ROW(FIELD("type_int", INT().notNull())).notNull();
+               RowType serRowType = (RowType) serDataType.getLogicalType();
+               AvroRowDataSerializationSchema serializationSchema = new AvroRowDataSerializationSchema(serRowType);
+               serializationSchema.open(null);
+               RowData input = GenericRowData.of(1);
+               byte[] byteData = serializationSchema.serialize(input);
+
+               DataType deserDataType = ROW(FIELD("type_int", DOUBLE().notNull())).notNull();
+               RowType deserRowType = (RowType) deserDataType.getLogicalType();
+               AvroRowDataDeserializationSchema deserializationSchema = new AvroRowDataDeserializationSchema(
+                               AvroDeserializationSchema.forGeneric(AvroSchemaConverter.convertToSchema(deserRowType)),
+                               AvroToRowDataConverters.createRowConverter(deserRowType),
+                               InternalTypeInfo.of(deserRowType),
+                               true);
+
+               deserializationSchema.open(null);
+               RowData actual = deserializationSchema.deserialize(byteData);
+               assertThat(actual, org.hamcrest.core.IsNull.nullValue(RowData.class));

Review comment:
       Why not use `assertNull(actual)` here?
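The test above expects `deserialize` to return `null` when `ignoreParseErrors` is enabled, which is exactly what `assertNull(actual)` would state. A minimal sketch of that contract, assuming a generic `Function<byte[], T>` in place of Flink's nested schema and converter; `LenientDeserializerSketch` is a hypothetical name, not the PR's implementation.

```java
import java.io.IOException;
import java.util.function.Function;

/**
 * Hypothetical sketch of the contract testIgnoreParseErrors checks: when
 * ignoreParseErrors is true, a record that fails conversion yields null
 * instead of an exception. The converter is a plain Function, not Flink's API.
 */
final class LenientDeserializerSketch<T> {
    private final Function<byte[], T> converter;
    private final boolean ignoreParseErrors;

    LenientDeserializerSketch(Function<byte[], T> converter, boolean ignoreParseErrors) {
        this.converter = converter;
        this.ignoreParseErrors = ignoreParseErrors;
    }

    T deserialize(byte[] message) throws IOException {
        try {
            return converter.apply(message);
        } catch (RuntimeException e) {
            if (ignoreParseErrors) {
                // Skip the bad record: callers see null instead of a failure.
                return null;
            }
            throw new IOException("Failed to deserialize record.", e);
        }
    }
}
```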

##########
File path: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java
##########
@@ -85,9 +88,27 @@ public AvroRowDataDeserializationSchema(
                        DeserializationSchema<GenericRecord> nestedSchema,
                        AvroToRowDataConverters.AvroToRowDataConverter runtimeConverter,
                        TypeInformation<RowData> typeInfo) {
+               this(nestedSchema, runtimeConverter, typeInfo, false);
+       }
+
+       /**
+        * Creates an Avro deserialization schema for the given logical type.
+        *
+        * @param nestedSchema     Deserialization schema to deserialize as {@link GenericRecord}
+        * @param runtimeConverter Converter that transforms a {@link GenericRecord} into {@link RowData}
+        * @param typeInfo         The TypeInformation to be used by
+        *                         {@link AvroRowDataDeserializationSchema#getProducedType()}
+        * @param ignoreParseError Indicates whether to skip rows with parsing errors
+        */
+        */
+       public AvroRowDataDeserializationSchema(

Review comment:
       Besides, this class should be `@Internal` and not `@PublicEvolving`. 

##########
File path: flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
##########
@@ -205,4 +207,26 @@ public void testSpecificType() throws Exception {
                Assert.assertEquals("12:12:12", DataFormatConverters.LocalTimeConverter.INSTANCE.toExternal(
                                rowData.getInt(2)).toString());
        }
+
+       @Test
+       public void testIgnoreParseErrors() throws Exception {
+               DataType serDataType = ROW(FIELD("type_int", INT().notNull())).notNull();
+               RowType serRowType = (RowType) serDataType.getLogicalType();
+               AvroRowDataSerializationSchema serializationSchema = new AvroRowDataSerializationSchema(serRowType);
+               serializationSchema.open(null);
+               RowData input = GenericRowData.of(1);
+               byte[] byteData = serializationSchema.serialize(input);

Review comment:
       It would be better to verify the parse error by deserializing a string field as a numeric type. For example, deserializing `[int: 1, string: abc]` with the schema `(int, long)` should yield `Row(1, null)`.
   
   I don't think deserializing a null field is a strong enough test.
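To make the expected outcome of the suggested test concrete, here is a plain-Java sketch of the field-level semantics the comment describes: converting the values `[1, "abc"]` to the target types `(int, long)` keeps the valid field and turns the unparsable one into `null`. It does not perform Avro decoding, and `FieldLevelParseSketch` and its methods are hypothetical; whether the real schema nulls a single field or the whole row is a decision for this PR.

```java
/**
 * Hypothetical illustration of field-level error handling: each field is
 * converted independently, and a field that cannot be parsed becomes null.
 * Plain Java only; this is not how Avro decodes binary records.
 */
final class FieldLevelParseSketch {
    static Object[] convert(Object[] values, Class<?>[] targetTypes) {
        Object[] result = new Object[values.length];
        for (int i = 0; i < values.length; i++) {
            result[i] = convertField(values[i], targetTypes[i]);
        }
        return result;
    }

    private static Object convertField(Object value, Class<?> target) {
        if (value == null) {
            return null;
        }
        try {
            if (target == Integer.class) {
                return value instanceof Number ? ((Number) value).intValue() : Integer.parseInt(value.toString());
            }
            if (target == Long.class) {
                return value instanceof Number ? ((Number) value).longValue() : Long.parseLong(value.toString());
            }
            throw new IllegalArgumentException("Unsupported target type: " + target);
        } catch (NumberFormatException e) {
            // Parse error on this field only: the field becomes null, the row survives.
            return null;
        }
    }
}
```

With this sketch, `convert(new Object[] {1, "abc"}, new Class<?>[] {Integer.class, Long.class})` returns `[1, null]`, matching the `Row(1, null)` expectation above.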




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
