wuchong commented on a change in pull request #14444:
URL: https://github.com/apache/flink/pull/14444#discussion_r547049153
##########
File path:
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java
##########
@@ -85,9 +88,27 @@ public AvroRowDataDeserializationSchema(
DeserializationSchema<GenericRecord> nestedSchema,
AvroToRowDataConverters.AvroToRowDataConverter
runtimeConverter,
TypeInformation<RowData> typeInfo) {
+ this(nestedSchema, runtimeConverter, typeInfo, false);
+ }
+
+ /**
+ * Creates a Avro deserialization schema for the given logical type.
+ *
+ * @param nestedSchema Deserialization schema to deserialize as
{@link GenericRecord}
+ * @param runtimeConverter Converter that transforms a {@link
GenericRecord} into {@link RowData}
+ * @param typeInfo The TypeInformation to be used by
+ * {@link
AvroRowDataDeserializationSchema#getProducedType()}
+ * @param ignoreParseError Indicate whether to skip rows with parsing
error
+ */
+ public AvroRowDataDeserializationSchema(
Review comment:
I would suggest refactoring this class to use the builder pattern. Adding
a new constructor every time a new parameter is introduced is hard to maintain.
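A minimal sketch of what such a builder could look like. `SchemaSketch` is a
hypothetical, simplified stand-in for `AvroRowDataDeserializationSchema`; the
`String` fields replace the real Flink types only to keep the example
self-contained, and the method names are assumptions, not the PR's API.

```java
import java.util.Objects;

// Hypothetical stand-in for AvroRowDataDeserializationSchema (not the real class).
final class SchemaSketch {
    private final String nestedSchema;      // stands in for DeserializationSchema<GenericRecord>
    private final String runtimeConverter;  // stands in for AvroToRowDataConverter
    private final String typeInfo;          // stands in for TypeInformation<RowData>
    private final boolean ignoreParseError; // primitive, so it can never be null

    private SchemaSketch(Builder builder) {
        this.nestedSchema = builder.nestedSchema;
        this.runtimeConverter = builder.runtimeConverter;
        this.typeInfo = builder.typeInfo;
        this.ignoreParseError = builder.ignoreParseError;
    }

    // Required arguments go through the factory method; optional ones get setters.
    static Builder builder(String nestedSchema, String runtimeConverter, String typeInfo) {
        return new Builder(nestedSchema, runtimeConverter, typeInfo);
    }

    boolean isIgnoreParseError() {
        return ignoreParseError;
    }

    static final class Builder {
        private final String nestedSchema;
        private final String runtimeConverter;
        private final String typeInfo;
        private boolean ignoreParseError = false; // default matches the existing constructor

        private Builder(String nestedSchema, String runtimeConverter, String typeInfo) {
            this.nestedSchema = Objects.requireNonNull(nestedSchema);
            this.runtimeConverter = Objects.requireNonNull(runtimeConverter);
            this.typeInfo = Objects.requireNonNull(typeInfo);
        }

        Builder setIgnoreParseError(boolean ignoreParseError) {
            this.ignoreParseError = ignoreParseError;
            return this;
        }

        SchemaSketch build() {
            return new SchemaSketch(this);
        }
    }
}
```

With this shape, a future option only needs one more setter on the builder,
and existing call sites stay unchanged.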
##########
File path:
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java
##########
@@ -59,18 +59,21 @@
*/
private final AvroToRowDataConverters.AvroToRowDataConverter
runtimeConverter;
+ private final Boolean ignoreParseError;
Review comment:
Use the primitive type `boolean`; otherwise it may be `null`.
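To illustrate the risk, here is a small standalone sketch (the class and
method names are hypothetical, not part of the PR): a boxed `Boolean` that is
`null` throws a `NullPointerException` as soon as it is unboxed in a condition.

```java
// Hypothetical demo class, not part of Flink.
final class BoxedFlagDemo {
    // Returns true if evaluating the boxed flag in a condition throws an NPE.
    static boolean throwsOnUnboxing(Boolean flag) {
        try {
            if (flag) { // auto-unboxing calls flag.booleanValue()
                return false;
            }
            return false;
        } catch (NullPointerException e) {
            return true; // reached only when flag is null
        }
    }
}
```

A primitive `boolean` field rules this case out at compile time.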
##########
File path:
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
##########
@@ -205,4 +207,26 @@ public void testSpecificType() throws Exception {
Assert.assertEquals("12:12:12",
DataFormatConverters.LocalTimeConverter.INSTANCE.toExternal(
rowData.getInt(2)).toString());
}
+
+ @Test
+ public void testIgnoreParseErrors() throws Exception {
+ DataType serDataType = ROW(FIELD("type_int",
INT().notNull())).notNull();
+ RowType serRowType = (RowType) serDataType.getLogicalType();
+ AvroRowDataSerializationSchema serializationSchema = new
AvroRowDataSerializationSchema(serRowType);
+ serializationSchema.open(null);
+ RowData input = GenericRowData.of(1);
+ byte[] byteData = serializationSchema.serialize(input);
+
+ DataType deserDataType = ROW(FIELD("type_int",
DOUBLE().notNull())).notNull();
+ RowType deserRowType = (RowType) deserDataType.getLogicalType();
+ AvroRowDataDeserializationSchema deserializationSchema = new
AvroRowDataDeserializationSchema(
+
AvroDeserializationSchema.forGeneric(AvroSchemaConverter.convertToSchema(deserRowType)),
+
AvroToRowDataConverters.createRowConverter(deserRowType),
+ InternalTypeInfo.of(deserRowType),
+ true);
+
+ deserializationSchema.open(null);
+ RowData actual = deserializationSchema.deserialize(byteData);
+ assertThat(actual,
org.hamcrest.core.IsNull.nullValue(RowData.class));
Review comment:
Why not use `assertNull(actual)` here?
##########
File path:
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDataDeserializationSchema.java
##########
@@ -85,9 +88,27 @@ public AvroRowDataDeserializationSchema(
DeserializationSchema<GenericRecord> nestedSchema,
AvroToRowDataConverters.AvroToRowDataConverter
runtimeConverter,
TypeInformation<RowData> typeInfo) {
+ this(nestedSchema, runtimeConverter, typeInfo, false);
+ }
+
+ /**
+ * Creates a Avro deserialization schema for the given logical type.
+ *
+ * @param nestedSchema Deserialization schema to deserialize as
{@link GenericRecord}
+ * @param runtimeConverter Converter that transforms a {@link
GenericRecord} into {@link RowData}
+ * @param typeInfo The TypeInformation to be used by
+ * {@link
AvroRowDataDeserializationSchema#getProducedType()}
+ * @param ignoreParseError Indicate whether to skip rows with parsing
error
+ */
+ public AvroRowDataDeserializationSchema(
Review comment:
Besides, this class should be `@Internal` and not `@PublicEvolving`.
##########
File path:
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDataDeSerializationSchemaTest.java
##########
@@ -205,4 +207,26 @@ public void testSpecificType() throws Exception {
Assert.assertEquals("12:12:12",
DataFormatConverters.LocalTimeConverter.INSTANCE.toExternal(
rowData.getInt(2)).toString());
}
+
+ @Test
+ public void testIgnoreParseErrors() throws Exception {
+ DataType serDataType = ROW(FIELD("type_int",
INT().notNull())).notNull();
+ RowType serRowType = (RowType) serDataType.getLogicalType();
+ AvroRowDataSerializationSchema serializationSchema = new
AvroRowDataSerializationSchema(serRowType);
+ serializationSchema.open(null);
+ RowData input = GenericRowData.of(1);
+ byte[] byteData = serializationSchema.serialize(input);
Review comment:
It would be better to deserialize a string value with a numeric type to
verify the parse error. For example:
deserializing `[int: 1, string: abc]` with schema `(int, long)` should get
`Row(1, null)`.
I don't think deserializing a null field is a strong enough test.
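A plain-Java sketch of the behaviour I have in mind. It parses text fields
rather than Avro binary data, and `LenientParseSketch` and its methods are
hypothetical names used only for illustration; it shows a field that fails to
parse becoming `null` while the rest of the row is kept.

```java
import java.util.function.Function;

// Hypothetical illustration; this is not how the Avro format is implemented.
final class LenientParseSketch {
    // Parses one field; on a number format error returns null if errors are ignored.
    static <T> T parseField(String raw, Function<String, T> parser, boolean ignoreParseErrors) {
        try {
            return parser.apply(raw);
        } catch (NumberFormatException e) {
            if (ignoreParseErrors) {
                return null;
            }
            throw e;
        }
    }

    // Reads two raw fields with the target schema (int, long).
    static Object[] parseIntLongRow(String rawInt, String rawLong, boolean ignoreParseErrors) {
        return new Object[] {
            parseField(rawInt, Integer::valueOf, ignoreParseErrors),
            parseField(rawLong, Long::valueOf, ignoreParseErrors)
        };
    }
}
```

A test along these lines would assert that the first field is still `1` and
only the unparsable second field is `null`.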