This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f3963ba3ee [INLONG-9491][Sort] CSV format support ignore trailing
unmappable fields (#9492)
f3963ba3ee is described below
commit f3963ba3eeeb2ad94782ea2f3a96ec0f7c9e4be2
Author: vernedeng <[email protected]>
AuthorDate: Tue Jan 9 10:11:01 2024 +0800
[INLONG-9491][Sort] CSV format support ignore trailing unmappable fields
(#9492)
---
.../sort-formats/format-inlongmsg-base/pom.xml | 2 +-
.../formats/inlongmsg/InLongMsgDecodingFormat.java | 64 ++++++++++-
.../formats/inlongmsg/InLongMsgFormatFactory.java | 12 +-
.../sort/formats/inlongmsg/InLongMsgOptions.java | 26 +++++
.../inlongmsg/InLongMsgRowDataSerDeTest.java | 123 +++++++++++++++++++++
5 files changed, 220 insertions(+), 7 deletions(-)
diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml
b/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml
index 8a3f720721..8846aa0446 100644
--- a/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml
+++ b/inlong-sort/sort-formats/format-inlongmsg-base/pom.xml
@@ -94,7 +94,7 @@
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
- <scope>test</scope>
+ <scope>provided</scope>
</dependency>
</dependencies>
diff --git
a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
index 0f67bbc072..93e970aa85 100644
---
a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
+++
b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
@@ -19,8 +19,17 @@ package org.apache.inlong.sort.formats.inlongmsg;
import
org.apache.inlong.sort.formats.inlongmsg.InLongMsgDeserializationSchema.MetadataConverter;
+import com.google.common.annotations.VisibleForTesting;
+import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.formats.csv.CsvRowDataDeserializationSchema;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvParser;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
@@ -31,6 +40,7 @@ import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
+import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -40,6 +50,12 @@ import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.CSV_EMPTY_STRING_AS_NULL;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.CSV_IGNORE_TRAILING_UNMAPPABLE;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.CSV_INSERT_NULLS_FOR_MISSING_COLUMNS;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.IGNORE_PARSE_ERRORS;
+
+@Slf4j
public class InLongMsgDecodingFormat implements
DecodingFormat<DeserializationSchema<RowData>> {
private final String innerFormatMetaPrefix;
@@ -50,14 +66,23 @@ public class InLongMsgDecodingFormat implements
DecodingFormat<DeserializationSc
private final boolean ignoreErrors;
+ private final boolean ignoreTrailingUnmappable;
+
+ private final boolean insertNullsForMissingColumns;
+
+ private final boolean emptyStringAsNull;
+
+ /**
+ * Creates a decoding format that delegates the payload decoding to {@code innerDecodingFormat}.
+ *
+ * <p>All option values are read once, here, from {@code formatOptions}: {@code IGNORE_PARSE_ERRORS}
+ * plus the three CSV parser options. The CSV options only have an effect when the runtime decoder
+ * produced by the inner format is a Flink {@code CsvRowDataDeserializationSchema}; for any other
+ * inner format they are ignored.
+ *
+ * @param innerDecodingFormat the wrapped inner decoding format
+ * @param innerFormatMetaPrefix stored as-is; presumably the inner format's option/metadata prefix
+ * (e.g. {@code csv.}) — confirm against the factory
+ * @param formatOptions options of this format (replaces the former {@code ignoreErrors} flag)
+ */
public InLongMsgDecodingFormat(
DecodingFormat<DeserializationSchema<RowData>> innerDecodingFormat,
String innerFormatMetaPrefix,
- boolean ignoreErrors) {
+ ReadableConfig formatOptions) {
this.innerDecodingFormat = innerDecodingFormat;
this.innerFormatMetaPrefix = innerFormatMetaPrefix;
this.metadataKeys = Collections.emptyList();
- this.ignoreErrors = ignoreErrors;
+ this.ignoreErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
+ this.ignoreTrailingUnmappable =
formatOptions.get(CSV_IGNORE_TRAILING_UNMAPPABLE);
+ this.insertNullsForMissingColumns =
formatOptions.get(CSV_INSERT_NULLS_FOR_MISSING_COLUMNS);
+ this.emptyStringAsNull = formatOptions.get(CSV_EMPTY_STRING_AS_NULL);
}
@Override
@@ -83,8 +108,15 @@ public class InLongMsgDecodingFormat implements
DecodingFormat<DeserializationSc
final TypeInformation<RowData> producedTypeInfo =
context.createTypeInformation(producedDataType);
+ DeserializationSchema<RowData> innerSchema =
+ innerDecodingFormat.createRuntimeDecoder(context,
physicalDataType);
+ if (innerSchema instanceof CsvRowDataDeserializationSchema) {
+ configCsvInnerFormat(innerSchema, ignoreTrailingUnmappable,
+ insertNullsForMissingColumns, emptyStringAsNull);
+ }
+
return new InLongMsgDeserializationSchema(
- innerDecodingFormat.createRuntimeDecoder(context,
physicalDataType),
+ innerSchema,
metadataConverters,
producedTypeInfo,
ignoreErrors);
@@ -190,4 +222,30 @@ public class InLongMsgDecodingFormat implements
DecodingFormat<DeserializationSc
this.converter = converter;
}
}
+
+    /**
+     * Enables Jackson CSV parser features on a Flink {@link CsvRowDataDeserializationSchema}.
+     *
+     * <p>Flink's CSV format does not expose these {@link CsvParser.Feature}s, so this method
+     * replaces the schema's private {@code objectReader} field via reflection. The replacement
+     * reader is built from a fresh {@link CsvMapper} with the requested features and reuses the
+     * {@link CsvSchema} taken from the old reader's Jackson-internal {@code _schema} field.
+     *
+     * <p>This is best-effort: when no feature is requested the inner schema is left untouched.
+     * When the reflective rewrite fails (e.g. a Flink or Jackson version with a different
+     * internal layout), an error is logged and the inner schema keeps its original reader.
+     * In that case the requested options silently do not take effect.
+     *
+     * @param innerSchema the inner deserialization schema; expected to be a
+     *     {@link CsvRowDataDeserializationSchema}
+     * @param ignoreTrailingUnmappable value for {@link CsvParser.Feature#IGNORE_TRAILING_UNMAPPABLE}
+     * @param insertNullsForMissingColumns value for
+     *     {@link CsvParser.Feature#INSERT_NULLS_FOR_MISSING_COLUMNS}
+     * @param emptyStringAsNull value for {@link CsvParser.Feature#EMPTY_STRING_AS_NULL}
+     */
+    @VisibleForTesting
+    static void configCsvInnerFormat(
+            DeserializationSchema<RowData> innerSchema,
+            boolean ignoreTrailingUnmappable,
+            boolean insertNullsForMissingColumns,
+            boolean emptyStringAsNull) {
+        if (!ignoreTrailingUnmappable && !insertNullsForMissingColumns && !emptyStringAsNull) {
+            // Nothing to enable: keep Flink's own reader and skip the reflective rewrite, so users
+            // who never set these options cannot hit reflection errors.
+            return;
+        }
+        try {
+            Field readerField = CsvRowDataDeserializationSchema.class.getDeclaredField("objectReader");
+            readerField.setAccessible(true);
+            ObjectReader oldReader = (ObjectReader) readerField.get(innerSchema);
+
+            // ObjectReader keeps its FormatSchema in a non-public field.
+            Field schemaField = ObjectReader.class.getDeclaredField("_schema");
+            schemaField.setAccessible(true);
+            CsvSchema oldSchema = (CsvSchema) schemaField.get(oldReader);
+
+            ObjectReader newReader = new CsvMapper()
+                    .configure(CsvParser.Feature.IGNORE_TRAILING_UNMAPPABLE, ignoreTrailingUnmappable)
+                    .configure(CsvParser.Feature.INSERT_NULLS_FOR_MISSING_COLUMNS, insertNullsForMissingColumns)
+                    .configure(CsvParser.Feature.EMPTY_STRING_AS_NULL, emptyStringAsNull)
+                    .readerFor(JsonNode.class)
+                    .with(oldSchema);
+            readerField.set(innerSchema, newReader);
+        } catch (ReflectiveOperationException | RuntimeException e) {
+            // Best-effort: keep the original reader rather than failing the job.
+            log.error("failed to apply csv parser features to the csv inner format "
+                    + "(ignoreTrailingUnmappable={}, insertNullsForMissingColumns={}, emptyStringAsNull={}); "
+                    + "these options will not take effect",
+                    ignoreTrailingUnmappable, insertNullsForMissingColumns, emptyStringAsNull, e);
+        }
+    }
}
diff --git
a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java
b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java
index c9b368a364..c7caa43290 100644
---
a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java
+++
b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgFormatFactory.java
@@ -34,6 +34,10 @@ import
org.apache.flink.table.factories.SerializationFormatFactory;
import java.util.HashSet;
import java.util.Set;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.CSV_EMPTY_STRING_AS_NULL;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.CSV_IGNORE_PARSE_ERRORS;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.CSV_IGNORE_TRAILING_UNMAPPABLE;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.CSV_INSERT_NULLS_FOR_MISSING_COLUMNS;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.IGNORE_PARSE_ERRORS;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.INNER_FORMAT;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.validateDecodingFormatOptions;
@@ -64,9 +68,7 @@ public final class InLongMsgFormatFactory
String innerFormatPrefix = INLONG_PREFIX + innerFormatMetaPrefix;
DecodingFormat<DeserializationSchema<RowData>> innerFormat =
innerFactory.createDecodingFormat(context, new
DelegatingConfiguration(allOptions, innerFormatPrefix));
- boolean ignoreErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
-
- return new InLongMsgDecodingFormat(innerFormat, innerFormatMetaPrefix,
ignoreErrors);
+ return new InLongMsgDecodingFormat(innerFormat, innerFormatMetaPrefix,
formatOptions);
}
@Override
@@ -91,6 +93,10 @@ public final class InLongMsgFormatFactory
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(IGNORE_PARSE_ERRORS);
+ // CSV parser options; InLongMsgDecodingFormat applies them only when the inner runtime
+ // decoder is a Flink CsvRowDataDeserializationSchema.
+ options.add(CSV_IGNORE_TRAILING_UNMAPPABLE);
+ options.add(CSV_INSERT_NULLS_FOR_MISSING_COLUMNS);
+ options.add(CSV_EMPTY_STRING_AS_NULL);
+ // NOTE(review): CSV_IGNORE_PARSE_ERRORS is not read by InLongMsgDecodingFormat in this change;
+ // presumably it is registered only so that "inlong-msg.csv.ignore-parse-errors" passes
+ // validation and reaches the inner csv format — confirm.
+ options.add(CSV_IGNORE_PARSE_ERRORS);
return options;
}
}
diff --git
a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java
b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java
index 92a2ebc0b7..be0a9354fd 100644
---
a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java
+++
b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgOptions.java
@@ -41,6 +41,32 @@ public class InLongMsgOptions {
.withDescription("Optional flag to skip fields and rows
with parse errors instead of failing;\n"
+ "fields are set to null in case of errors");
+    /**
+     * Flag for the inner CSV format to skip fields and rows with parse errors instead of failing.
+     *
+     * <p>NOTE(review): not read by {@code InLongMsgDecodingFormat} in this change; presumably it is
+     * consumed by the inner csv format through the {@code inlong-msg.csv.} option prefix — confirm.
+     */
+    public static final ConfigOption<Boolean> CSV_IGNORE_PARSE_ERRORS =
+            ConfigOptions.key("csv.ignore-parse-errors")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Optional flag to skip fields and rows with parse errors instead of failing;\n"
+                            + "fields are set to null in case of errors");
+
+ /**
+ * Whether the CSV inner format drops trailing columns that have no matching field in the table
+ * schema instead of failing. Applied as Jackson's {@code CsvParser.Feature.IGNORE_TRAILING_UNMAPPABLE}.
+ */
+ public static final ConfigOption<Boolean> CSV_IGNORE_TRAILING_UNMAPPABLE =
+ ConfigOptions.key("csv.ignore-trailing-unmappable")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Allows the case that real size exceeds
the expected size.\n "
+ + "The extra column will be skipped");
+
+ /**
+ * Whether the CSV inner format fills columns missing from a record with null.
+ * Applied as Jackson's {@code CsvParser.Feature.INSERT_NULLS_FOR_MISSING_COLUMNS}.
+ */
+ public static final ConfigOption<Boolean>
CSV_INSERT_NULLS_FOR_MISSING_COLUMNS =
+ ConfigOptions.key("csv.insert-nulls-for-missing-columns")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("For missing columns, insert null.");
+
+ /**
+ * Whether the CSV inner format turns empty field values into null.
+ * Applied as Jackson's {@code CsvParser.Feature.EMPTY_STRING_AS_NULL}.
+ */
+ public static final ConfigOption<Boolean> CSV_EMPTY_STRING_AS_NULL =
+ ConfigOptions.key("csv.empty-string-as-null")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("if the string value is empty, make it as
null");
+
public static void validateDecodingFormatOptions(ReadableConfig config) {
String innerFormat = config.get(INNER_FORMAT);
if (innerFormat == null) {
diff --git
a/inlong-sort/sort-formats/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgRowDataSerDeTest.java
b/inlong-sort/sort-formats/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgRowDataSerDeTest.java
index 9f8ddbdaf8..64985985ab 100644
---
a/inlong-sort/sort-formats/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgRowDataSerDeTest.java
+++
b/inlong-sort/sort-formats/format-inlongmsg-base/src/test/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgRowDataSerDeTest.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.formats.inlongmsg;
import org.apache.inlong.common.msg.InLongMsg;
+import com.google.common.collect.ImmutableList;
import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.Configuration;
@@ -34,6 +35,7 @@ import org.apache.flink.table.factories.utils.FactoryMocks;
import
org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.junit.Test;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.ArrayList;
@@ -90,6 +92,127 @@ public class InLongMsgRowDataSerDeTest {
assertEquals(exceptedOutput, deData);
}
+    @Test
+    public void testIgnoreTrailing() throws IOException {
+        ResolvedSchema schema = ResolvedSchema.of(
+                Column.physical("id", DataTypes.BIGINT()),
+                Column.physical("f1", DataTypes.STRING()),
+                Column.physical("f2", DataTypes.STRING()));
+        List<RowData> expectedOutput = ImmutableList.of(
+                GenericRowData.of(1L, BinaryStringData.fromString("asdqw"), BinaryStringData.fromString("heihei")));
+
+        // the trailing "2" has no matching column and must be dropped
+        List<RowData> deData = deserializeCsvWithOption(
+                "1,asdqw,heihei,2", "inlong-msg.csv.ignore-trailing-unmappable", schema);
+
+        assertEquals(expectedOutput, deData);
+    }
+
+    @Test
+    public void testEmptyFieldValueAsNull() throws IOException {
+        ResolvedSchema schema = ResolvedSchema.of(
+                Column.physical("id", DataTypes.BIGINT()),
+                Column.physical("f1", DataTypes.STRING()),
+                Column.physical("f2", DataTypes.STRING()),
+                Column.physical("f3", DataTypes.BIGINT()));
+        List<RowData> expectedOutput = ImmutableList.of(
+                GenericRowData.of(1L, BinaryStringData.fromString("asdqw"), null, 2L));
+
+        // the empty third field must become null rather than ""
+        List<RowData> deData = deserializeCsvWithOption(
+                "1,asdqw,,2", "inlong-msg.csv.empty-string-as-null", schema);
+
+        assertEquals(expectedOutput, deData);
+    }
+
+    @Test
+    public void testWrongTypeFieldAsNull() throws IOException {
+        ResolvedSchema schema = ResolvedSchema.of(
+                Column.physical("id", DataTypes.BIGINT()),
+                Column.physical("f1", DataTypes.STRING()),
+                Column.physical("f2", DataTypes.STRING()),
+                Column.physical("f3", DataTypes.BIGINT()));
+        List<RowData> expectedOutput = ImmutableList.of(
+                GenericRowData.of(null, BinaryStringData.fromString("asdqw"), BinaryStringData.fromString("zdf"), 2L));
+
+        // "adf" cannot be parsed as BIGINT and must become null instead of failing the row
+        List<RowData> deData = deserializeCsvWithOption(
+                "adf,asdqw,zdf,2", "inlong-msg.csv.ignore-parse-errors", schema);
+
+        assertEquals(expectedOutput, deData);
+    }
+
+    @Test
+    public void testInserNullForMissingColumn() throws IOException {
+        ResolvedSchema schema = ResolvedSchema.of(
+                Column.physical("id", DataTypes.BIGINT()),
+                Column.physical("f1", DataTypes.STRING()),
+                Column.physical("f2", DataTypes.STRING()),
+                Column.physical("f3", DataTypes.BIGINT()),
+                Column.physical("f4", DataTypes.BIGINT()));
+        List<RowData> expectedOutput = ImmutableList.of(
+                GenericRowData.of(1L, BinaryStringData.fromString("asdqw"),
+                        BinaryStringData.fromString("123"), 2L, null));
+
+        // the record has 4 values for 5 columns; the missing f4 must be null
+        List<RowData> deData = deserializeCsvWithOption(
+                "1,asdqw,123,2", "inlong-msg.csv.insert-nulls-for-missing-columns", schema);
+
+        assertEquals(expectedOutput, deData);
+    }
+
+    /**
+     * Wraps {@code csvPayload} into a single-message InLongMsg and decodes it with the inlong-msg
+     * format. The inner format is {@code csv} and the boolean option {@code optionKey} is set to "true".
+     *
+     * @param csvPayload one CSV record used as the message body
+     * @param optionKey full table option key to enable, e.g. {@code inlong-msg.csv.empty-string-as-null}
+     * @param schema physical schema of the table
+     * @return the rows emitted by the deserialization schema
+     */
+    private static List<RowData> deserializeCsvWithOption(
+            String csvPayload, String optionKey, ResolvedSchema schema) throws IOException {
+        InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+        inLongMsg.addMsg("streamId=HAHA&t=202201011112", csvPayload.getBytes(StandardCharsets.UTF_8));
+        byte[] input = inLongMsg.buildArray();
+
+        final Map<String, String> tableOptions =
+                InLongMsgFormatFactoryTest.getModifiedOptions(opts -> {
+                    opts.put("inlong-msg.inner.format", "csv");
+                    opts.put(optionKey, "true");
+                });
+        DeserializationSchema<RowData> inLongMsgDeserializationSchema =
+                InLongMsgFormatFactoryTest.createDeserializationSchema(tableOptions, schema);
+
+        List<RowData> rows = new ArrayList<>();
+        inLongMsgDeserializationSchema.deserialize(input, new ListCollector<>(rows));
+        return rows;
+    }
+
@Test
public void testDeserializeInLongMsgWithError() throws Exception {
// mock data