This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ec3b69e647 [INLONG-11819][Sort] KV/CSV format support keep escape,
using line delimiter and call back when parse field has exception (#11820)
ec3b69e647 is described below
commit ec3b69e647e68234cf1b691d709afd33baa6e535
Author: Mingyu Bao <[email protected]>
AuthorDate: Tue Apr 15 14:07:03 2025 +0800
[INLONG-11819][Sort] KV/CSV format support keep escape, using line
delimiter and call back when parse field has exception (#11820)
---
.../inlong/sort/formats/base/TableFormatUtils.java | 7 +-
.../sort/formats/base/TextFormatBuilder.java | 7 ++
.../sort/formats/inlongmsg/FailureHandler.java | 14 +++
.../formats/inlongmsg/IgnoreFailureHandler.java | 9 ++
.../sort/formats/inlongmsg/NoOpFailureHandler.java | 9 ++
.../inlong/sort/formats/util/StringUtils.java | 30 ++++--
.../sort/formats/common/StringUtilsTest.java | 84 ++++++++++------
.../sort/formats/base/TableFormatUtilsTest.java | 28 +++---
.../sort/formats/csv/CsvDeserializationSchema.java | 2 +-
.../sort/formats/inlongmsg/row/InLongMsgUtils.java | 2 +
.../InLongMsgBinlogFormatDeserializer.java | 3 +-
.../inlongmsgbinlog/InLongMsgBinlogUtils.java | 7 +-
.../InLongMsgCsvFormatDeserializer.java | 17 +++-
.../InLongMsgCsvMixedFormatConverter.java | 2 +-
.../formats/inlongmsgcsv/InLongMsgCsvUtils.java | 9 +-
.../InLongMsgCsvFormatDeserializerTest.java | 6 ++
.../inlongmsgkv/InLongMsgKvFormatDeserializer.java | 2 +-
.../InLongMsgKvMixedFormatConverter.java | 2 +-
.../sort/formats/inlongmsgkv/InLongMsgKvUtils.java | 8 +-
.../InLongMsgKvFormatDeserializerTest.java | 6 ++
.../inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java | 6 +-
.../InLongMsgTlogCsvFormatDeserializerTest.java | 6 ++
.../inlongmsgtlogkv/InLongMsgTlogKvUtils.java | 6 +-
.../InLongMsgTlogKvFormatDeserializerTest.java | 6 ++
.../sort/formats/kv/KvDeserializationSchema.java | 2 +-
.../sort/formats/inlongmsg/InLongMsgUtils.java | 5 +
.../InLongMsgBinlogFormatDeserializer.java | 3 +-
.../inlongmsgbinlog/InLongMsgBinlogUtils.java | 20 ++--
.../InLongMsgCsvFormatDeserializer.java | 52 ++++++++--
.../formats/inlongmsgcsv/InLongMsgCsvUtils.java | 13 ++-
.../InLongMsgCsvFormatDeserializerTest.java | 10 +-
.../inlongmsgkv/InLongMsgKvFormatDeserializer.java | 18 +++-
.../sort/formats/inlongmsgkv/InLongMsgKvUtils.java | 15 ++-
.../InLongMsgKvFormatDeserializerTest.java | 9 +-
.../InLongMsgTlogCsvFormatDeserializer.java | 111 +++++++++++++++++++--
.../inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java | 44 +++++---
.../InLongMsgTlogCsvFormatDeserializerTest.java | 15 ++-
.../InLongMsgTlogKvFormatDeserializer.java | 71 ++++++++++---
.../inlongmsgtlogkv/InLongMsgTlogKvUtils.java | 28 ++++--
.../InLongMsgTlogKvFormatDeserializerTest.java | 38 ++++---
.../sort/formats/base/TableFormatUtilsTest.java | 28 +++---
.../csv/CsvRowDataDeserializationSchema.java | 27 ++++-
.../formats/kv/KvRowDataDeserializationSchema.java | 53 ++++++++--
.../apache/inlong/sort/formats/kv/KvUtilsTest.java | 100 ++++++++++++++++++-
44 files changed, 748 insertions(+), 192 deletions(-)
diff --git
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
index f68a0085f0..31e3cdaf09 100644
---
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
+++
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
@@ -58,6 +58,7 @@ import
org.apache.inlong.common.pojo.sort.dataflow.field.format.TimestampTypeInf
import org.apache.inlong.common.pojo.sort.dataflow.field.format.TypeInfo;
import
org.apache.inlong.common.pojo.sort.dataflow.field.format.VarBinaryFormatInfo;
import
org.apache.inlong.common.pojo.sort.dataflow.field.format.VarCharFormatInfo;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
@@ -547,7 +548,8 @@ public class TableFormatUtils {
String fieldName,
FormatInfo fieldFormatInfo,
String fieldText,
- String nullLiteral) {
+ String nullLiteral,
+ FailureHandler failureHandler) throws Exception {
checkState(fieldFormatInfo instanceof BasicFormatInfo);
if (fieldText == null) {
@@ -573,6 +575,9 @@ public class TableFormatUtils {
} catch (Exception e) {
LOG.warn("Could not properly deserialize the " + "text "
+ fieldText + " for field " + fieldName + ".", e);
+ if (failureHandler != null) {
+ failureHandler.onConvertingFieldFailure(fieldName, fieldText,
fieldFormatInfo, e);
+ }
}
return null;
}
diff --git
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TextFormatBuilder.java
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TextFormatBuilder.java
index b329c79149..12576d3e76 100644
---
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TextFormatBuilder.java
+++
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TextFormatBuilder.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sort.formats.base;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
import org.apache.flink.table.descriptors.DescriptorProperties;
@@ -44,6 +45,7 @@ public abstract class TextFormatBuilder<T extends
TextFormatBuilder> {
protected Character quoteChar = DEFAULT_QUOTE_CHARACTER;
protected String nullLiteral = DEFAULT_NULL_LITERAL;
protected boolean ignoreErrors = DEFAULT_IGNORE_ERRORS;
+ protected FailureHandler failureHandler;
protected TextFormatBuilder(RowFormatInfo rowFormatInfo) {
this.rowFormatInfo = rowFormatInfo;
@@ -79,6 +81,11 @@ public abstract class TextFormatBuilder<T extends
TextFormatBuilder> {
return (T) this;
}
+ public T setFailureHandler(FailureHandler failureHandler) {
+ this.failureHandler = failureHandler;
+ return (T) this;
+ }
+
@SuppressWarnings("unchecked")
public T configure(DescriptorProperties descriptorProperties) {
diff --git
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java
index 6bdbf36103..e07b9fc815 100644
---
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java
+++
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sort.formats.inlongmsg;
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
+
import java.io.Serializable;
/**
@@ -71,4 +73,16 @@ public interface FailureHandler extends Serializable {
*/
void onConvertingRowFailure(InLongMsgHead head, InLongMsgBody body,
Exception exception) throws Exception;
+ /**
+ * This method is called when a failure occurs while converting a field
of a row.
+ *
+ * @param fieldName the field name
+ * @param fieldText the field text that failed to be converted
+ * @param formatInfo the target format info of the field
+ * @param exception the thrown exception
+ * @throws Exception the exception
+ */
+ void onConvertingFieldFailure(String fieldName, String fieldText,
FormatInfo formatInfo,
+ Exception exception) throws Exception;
+
}
diff --git
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/IgnoreFailureHandler.java
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/IgnoreFailureHandler.java
index b27fefc891..2afa460def 100644
---
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/IgnoreFailureHandler.java
+++
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/IgnoreFailureHandler.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sort.formats.inlongmsg;
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +49,13 @@ public class IgnoreFailureHandler implements FailureHandler {
LOG.warn("Cannot properly convert the InLongMsg ({}, {})", head, body,
exception);
}
+ @Override
+ public void onConvertingFieldFailure(String fieldName, String fieldText,
FormatInfo formatInfo,
+ Exception exception) throws Exception {
+ LOG.warn("Cannot convert the InLongMsg Filed (fieldName = {},
formatInfo = {}, fieldText = {}),",
+ fieldName, formatInfo, fieldText, exception);
+ }
+
@Override
public boolean isIgnoreFailure() {
return true;
diff --git
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/NoOpFailureHandler.java
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/NoOpFailureHandler.java
index d049920be1..9c4c764c49 100644
---
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/NoOpFailureHandler.java
+++
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/NoOpFailureHandler.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sort.formats.inlongmsg;
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,6 +53,13 @@ public class NoOpFailureHandler implements FailureHandler {
throw exception;
}
+ @Override
+ public void onConvertingFieldFailure(String fieldName, String fieldText,
FormatInfo formatInfo,
+ Exception exception) throws Exception {
+ LOG.warn("Cannot convert the InLongMsg Filed (fieldName = {},
formatInfo = {}, fieldText = {}),",
+ fieldName, formatInfo, fieldText, exception);
+ }
+
@Override
public boolean isIgnoreFailure() {
return false;
diff --git
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/util/StringUtils.java
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/util/StringUtils.java
index 000d7a7175..807bbe00c4 100644
---
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/util/StringUtils.java
+++
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/util/StringUtils.java
@@ -39,7 +39,7 @@ public class StringUtils {
private static final int STATE_QUOTING = 16;
/**
- * @see StringUtils#splitKv(String, Character, Character,
Character,Character, Character)
+ * @see StringUtils#splitKv(String, Character, Character,
Character, Character, Character, boolean)
*/
public static Map<String, String> splitKv(
@Nonnull String text,
@@ -48,7 +48,8 @@ public class StringUtils {
@Nullable Character escapeChar,
@Nullable Character quoteChar) {
List<Map<String, String>> lines =
- splitKv(text, entryDelimiter, kvDelimiter, escapeChar,
quoteChar, null);
+ splitKv(text, entryDelimiter, kvDelimiter, escapeChar,
quoteChar, null,
+ true);
if (lines.size() == 0) {
return new HashMap<>();
}
@@ -77,7 +78,8 @@ public class StringUtils {
@Nonnull Character kvDelimiter,
@Nullable Character escapeChar,
@Nullable Character quoteChar,
- @Nullable Character lineDelimiter) {
+ @Nullable Character lineDelimiter,
+ @Nullable boolean isDeleteEscapeChar) {
Map<String, String> fields = new HashMap<>();
List<Map<String, String>> lines = new ArrayList<>();
@@ -158,6 +160,9 @@ public class StringUtils {
case STATE_VALUE:
kvState = state;
state = STATE_ESCAPING;
+ if (!isDeleteEscapeChar) {
+ stringBuilder.append(ch);
+ }
break;
case STATE_ESCAPING:
stringBuilder.append(ch);
@@ -369,7 +374,7 @@ public class StringUtils {
/**
* Splits a single line of csv text.
*
- * @see StringUtils#splitCsv(String, Character, Character, Character,
Character, boolean)
+ * @see StringUtils#splitCsv(String, Character, Character, Character,
Character)
*/
public static String[] splitCsv(
@Nonnull String text,
@@ -384,7 +389,7 @@ public class StringUtils {
}
/**
- * @see StringUtils#splitCsv(String, Character, Character, Character,
Character, boolean)
+ * @see StringUtils#splitCsv(String, Character, Character, Character,
Character, boolean, boolean)
*/
public static String[][] splitCsv(
@Nonnull String text,
@@ -392,11 +397,12 @@ public class StringUtils {
@Nullable Character escapeChar,
@Nullable Character quoteChar,
@Nullable Character lineDelimiter) {
- return splitCsv(text, delimiter, escapeChar, quoteChar, lineDelimiter,
false);
+ return splitCsv(text, delimiter, escapeChar, quoteChar, lineDelimiter,
+ false, true);
}
/**
- * @see StringUtils#splitCsv(String, Character, Character, Character,
Character, boolean, Integer)
+ * @see StringUtils#splitCsv(String, Character, Character, Character,
Character, boolean, boolean, Integer)
*/
public static String[][] splitCsv(
@Nonnull String text,
@@ -404,8 +410,10 @@ public class StringUtils {
@Nullable Character escapeChar,
@Nullable Character quoteChar,
@Nullable Character lineDelimiter,
- boolean deleteHeadDelimiter) {
- return splitCsv(text, delimiter, escapeChar, quoteChar, lineDelimiter,
deleteHeadDelimiter, null);
+ boolean deleteHeadDelimiter,
+ boolean isDeleteEscapeChar) {
+ return splitCsv(text, delimiter, escapeChar, quoteChar, lineDelimiter,
+ deleteHeadDelimiter, isDeleteEscapeChar, null);
}
/**
@@ -434,6 +442,7 @@ public class StringUtils {
@Nullable Character quoteChar,
@Nullable Character lineDelimiter,
boolean deleteHeadDelimiter,
+ boolean isDeleteEscapeChar,
@Nullable Integer maxFieldSize) {
if (maxFieldSize != null && maxFieldSize <= 0) {
return new String[0][];
@@ -481,6 +490,9 @@ public class StringUtils {
switch (state) {
case STATE_NORMAL:
state = STATE_ESCAPING;
+ if (!isDeleteEscapeChar) {
+ stringBuilder.append(ch);
+ }
break;
case STATE_ESCAPING:
stringBuilder.append(ch);
diff --git
a/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/StringUtilsTest.java
b/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/StringUtilsTest.java
index b9c88ed788..5da3dfab7c 100644
---
a/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/StringUtilsTest.java
+++
b/inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/StringUtilsTest.java
@@ -34,27 +34,27 @@ public class StringUtilsTest {
public void testSplitKvString() {
String kvString1 = "name=n&age=10";
- Map<String, String> map1 = StringUtils.splitKv(kvString1, '&',
- '=', '\\', '\'');
- assertEquals("n", map1.get("name"));
- assertEquals("10", map1.get("age"));
+ List<Map<String, String>> listMap1 = StringUtils.splitKv(kvString1,
'&',
+ '=', '\\', '\'', '\n', true);
+ assertEquals("n", listMap1.get(0).get("name"));
+ assertEquals("10", listMap1.get(0).get("age"));
String kvString2 = "name=&age=20&";
- Map<String, String> map2 = StringUtils.splitKv(kvString2, '&',
- '=', '\\', '\'');
- assertEquals("", map2.get("name"));
- assertEquals("20&", map2.get("age"));
+ List<Map<String, String>> listMap2 = StringUtils.splitKv(kvString2,
'&',
+ '=', '\\', '\'', '\n', true);
+ assertEquals("", listMap2.get(0).get("name"));
+ assertEquals("20&", listMap2.get(0).get("age"));
String kvString3 = "name==&age=20&&&value=aaa&dddd&";
- Map<String, String> map3 = StringUtils.splitKv(kvString3, '&',
- '=', '\\', '\'');
- assertEquals("=", map3.get("name"));
- assertEquals("20&&", map3.get("age"));
- assertEquals("aaa&dddd&", map3.get("value"));
+ List<Map<String, String>> listMap3 = StringUtils.splitKv(kvString3,
'&',
+ '=', '\\', '\'', '\n', true);
+ assertEquals("=", listMap3.get(0).get("name"));
+ assertEquals("20&&", listMap3.get(0).get("age"));
+ assertEquals("aaa&dddd&", listMap3.get(0).get("value"));
String kvString4 = "name==&age=20&&\nname1==&age1=20&&";
List<Map<String, String>> map4 = StringUtils.splitKv(kvString4, '&',
- '=', '\\', '\'', '\n');
+ '=', '\\', '\'', '\n', true);
assertEquals("=", map4.get(0).get("name"));
assertEquals("20&&", map4.get(0).get("age"));
assertEquals("=", map4.get(1).get("name1"));
@@ -62,7 +62,7 @@ public class StringUtilsTest {
String kvString5 =
"name==&age=20&&\nname1==&age1=20&&&value=aaa&dddd&";
List<Map<String, String>> map5 = StringUtils.splitKv(kvString5, '&',
- '=', '\\', '\'', '\n');
+ '=', '\\', '\'', '\n', true);
assertEquals("=", map5.get(0).get("name"));
assertEquals("20&&", map5.get(0).get("age"));
assertEquals("=", map5.get(1).get("name1"));
@@ -71,25 +71,25 @@ public class StringUtilsTest {
String kvString6 = "name==&age=20&&\\";
List<Map<String, String>> map6 = StringUtils.splitKv(kvString6, '&',
- '=', '\\', '\'', '\n');
+ '=', '\\', '\'', '\n', true);
assertEquals("=", map6.get(0).get("name"));
assertEquals("20&&", map6.get(0).get("age"));
String kvString7 = "name==&age=20&&'";
List<Map<String, String>> map7 = StringUtils.splitKv(kvString7, '&',
- '=', '\\', '\'', '\n');
+ '=', '\\', '\'', '\n', true);
assertEquals("=", map7.get(0).get("name"));
assertEquals("20&&", map7.get(0).get("age"));
String kvString8 = "name=\\=&age=20&a&'";
List<Map<String, String>> map8 = StringUtils.splitKv(kvString8, '&',
- '=', '\\', '\'', '\n');
+ '=', '\\', '\'', '\n', true);
assertEquals("=", map8.get(0).get("name"));
assertEquals("20&a&", map8.get(0).get("age"));
String kvString9 = "name=\\=&age=20&a\\&'";
List<Map<String, String>> map9 = StringUtils.splitKv(kvString9, '&',
- '=', '\\', '\'', '\n');
+ '=', '\\', '\'', '\n', true);
assertEquals("=", map8.get(0).get("name"));
assertEquals("20&a&", map8.get(0).get("age"));
}
@@ -115,22 +115,46 @@ public class StringUtilsTest {
assertEquals("home", csv1Array2[2][2]);
}
+ @Test
+ public void testSplitCsvStringWhiteEscape() {
+ String csvString1 = "name|age=20\\||&'";
+ String[][] csv1Array1 = StringUtils.splitCsv(csvString1, '|',
+ '\\', '\'', '\n', false, false);
+
+ assertEquals("age=20\\|", csv1Array1[0][1]);
+ assertEquals("&", csv1Array1[0][2]);
+
+ String csvString2 =
"name|age=20\\||&'\n\name|age=20\\||&'\n\n|home|\\home\\";
+ String[][] csv1Array2 = StringUtils.splitCsv(csvString2, '|',
+ '\\', '\'', '\n', false, false);
+
+ assertEquals("name", csv1Array2[0][0]);
+ assertEquals("age=20\\|", csv1Array2[0][1]);
+ assertEquals("&\n\name|age=20\\||&", csv1Array2[0][2]);
+ assertEquals("", csv1Array2[2][0]);
+ assertEquals("home", csv1Array2[2][1]);
+ assertEquals("\\home\\", csv1Array2[2][2]);
+ }
+
@Test
public void testSplitCsvStringWithMaxFields() {
String csvString =
"name|age=20\\||&'\n\name|age=20\\||&'\n\n|home|\\home\\";
String[][] csv1Array0 = StringUtils.splitCsv(csvString, '|',
- '\\', '\'', '\n', false, 0);
+ '\\', '\'', '\n', false, true,
+ 0);
assertEquals(0, csv1Array0.length);
String[][] csv1Array1 = StringUtils.splitCsv(csvString, '|',
- '\\', '\'', '\n', false, 1);
+ '\\', '\'', '\n', false, true,
+ 1);
assertEquals("name|age=20\\||&'\n\name|age=20\\||&'",
csv1Array1[0][0]);
assertEquals("", csv1Array1[1][0]);
assertEquals("|home|\\home\\", csv1Array1[2][0]);
String[][] csv1Array2 = StringUtils.splitCsv(csvString, '|',
- '\\', '\'', '\n', false, 2);
+ '\\', '\'', '\n', false, true,
+ 2);
assertEquals("name", csv1Array2[0][0]);
assertEquals("age=20\\||&'\n\name|age=20\\||&'", csv1Array2[0][1]);
assertEquals("", csv1Array2[1][0]);
@@ -138,7 +162,8 @@ public class StringUtilsTest {
assertEquals("home|\\home\\", csv1Array2[2][1]);
String[][] csv1Array3 = StringUtils.splitCsv(csvString, '|',
- '\\', '\'', '\n', false, 3);
+ '\\', '\'', '\n', false, true,
+ 3);
assertEquals("name", csv1Array3[0][0]);
assertEquals("age=20|", csv1Array3[0][1]);
assertEquals("&\n\name|age=20\\||&", csv1Array3[0][2]);
@@ -147,7 +172,8 @@ public class StringUtilsTest {
assertEquals("home", csv1Array3[2][2]);
String[][] csv1Array4 = StringUtils.splitCsv(csvString, '|',
- '\\', '\'', '\n', false, 4);
+ '\\', '\'', '\n', false, true,
+ 4);
assertEquals("name", csv1Array4[0][0]);
assertEquals("age=20|", csv1Array4[0][1]);
assertEquals("&\n\name|age=20\\||&", csv1Array4[0][2]);
@@ -159,9 +185,11 @@ public class StringUtilsTest {
@Test
public void testKvScapeCharSplit() {
String text = "k1=v1&\nk\\2=v2\\&&k3=v3";
- Map<String, String> kvMap = splitKv(text, '&', '=', '\\', null);
- Assert.assertTrue(kvMap != null && kvMap.size() == 3);
- Assert.assertTrue(kvMap.get("k3") != null);
- Assert.assertTrue(kvMap.get("\nk2") != null);
+ List<Map<String, String>> kvMapList = splitKv(text, '&', '=', '\\',
+ null, '\n', false);
+ Assert.assertTrue(kvMapList != null && kvMapList.size() == 2);
+ Assert.assertTrue(kvMapList.get(0).get("k3") == null);
+ Assert.assertTrue(kvMapList.get(1).get("\nk2") == null);
+ Assert.assertTrue(kvMapList.get(1).get("k\\2") != null);
}
}
diff --git
a/inlong-sort/sort-formats/format-row/format-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
b/inlong-sort/sort-formats/format-row/format-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
index 1487cea9e2..c47a267e59 100644
---
a/inlong-sort/sort-formats/format-row/format-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
+++
b/inlong-sort/sort-formats/format-row/format-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
@@ -34,13 +34,13 @@ import static org.junit.Assert.assertNull;
public class TableFormatUtilsTest {
@Test
- public void testDeserializeStringWithoutNullLiteral() {
+ public void testDeserializeStringWithoutNullLiteral() throws Exception {
Object result1 =
deserializeBasicField(
"f",
StringFormatInfo.INSTANCE,
"data",
- null);
+ null, null);
assertEquals("data", result1);
Object result2 =
@@ -48,7 +48,7 @@ public class TableFormatUtilsTest {
"f",
StringFormatInfo.INSTANCE,
"",
- null);
+ null, null);
assertEquals("", result2);
}
@@ -72,13 +72,13 @@ public class TableFormatUtilsTest {
}
@Test
- public void testDeserializeStringWithNullLiteral() {
+ public void testDeserializeStringWithNullLiteral() throws Exception {
Object result1 =
deserializeBasicField(
"f",
StringFormatInfo.INSTANCE,
"data",
- "n/a");
+ "n/a", null);
assertEquals("data", result1);
Object result2 =
@@ -86,7 +86,7 @@ public class TableFormatUtilsTest {
"f",
StringFormatInfo.INSTANCE,
"",
- "n/a");
+ "n/a", null);
assertEquals("", result2);
Object result3 =
@@ -94,7 +94,7 @@ public class TableFormatUtilsTest {
"f",
StringFormatInfo.INSTANCE,
"n/a",
- "n/a");
+ "n/a", null);
assertNull(result3);
}
@@ -126,13 +126,13 @@ public class TableFormatUtilsTest {
}
@Test
- public void testDeserializeNumberWithoutNullLiteral() {
+ public void testDeserializeNumberWithoutNullLiteral() throws Exception {
Object result1 =
deserializeBasicField(
"f",
IntFormatInfo.INSTANCE,
"1",
- null);
+ null, null);
assertEquals(1, result1);
Object result2 =
@@ -140,7 +140,7 @@ public class TableFormatUtilsTest {
"f",
IntFormatInfo.INSTANCE,
"",
- null);
+ null, null);
assertNull(result2);
}
@@ -164,13 +164,13 @@ public class TableFormatUtilsTest {
}
@Test
- public void testDeserializeNumberWithNullLiteral() {
+ public void testDeserializeNumberWithNullLiteral() throws Exception {
Object result1 =
deserializeBasicField(
"f",
IntFormatInfo.INSTANCE,
"1",
- "n/a");
+ "n/a", null);
assertEquals(1, result1);
try {
@@ -178,7 +178,7 @@ public class TableFormatUtilsTest {
"f",
IntFormatInfo.INSTANCE,
"",
- "n/a");
+ "n/a", null);
Assert.assertEquals(null, result2);
} catch (Exception e) {
// ignored
@@ -189,7 +189,7 @@ public class TableFormatUtilsTest {
"f",
IntFormatInfo.INSTANCE,
"n/a",
- "n/a");
+ "n/a", null);
assertNull(result3);
}
diff --git
a/inlong-sort/sort-formats/format-row/format-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvDeserializationSchema.java
b/inlong-sort/sort-formats/format-row/format-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvDeserializationSchema.java
index 39bfb5f7a2..2fb51da8ca 100644
---
a/inlong-sort/sort-formats/format-row/format-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvDeserializationSchema.java
+++
b/inlong-sort/sort-formats/format-row/format-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvDeserializationSchema.java
@@ -160,7 +160,7 @@ public final class CsvDeserializationSchema extends
DefaultDeserializationSchema
fieldNames[i],
fieldFormatInfos[i],
fieldTexts[i],
- nullLiteral);
+ nullLiteral, null);
row.setField(i, field);
}
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgUtils.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgUtils.java
index 00391b2911..ab8377d14f 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgUtils.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/row/InLongMsgUtils.java
@@ -79,10 +79,12 @@ public class InLongMsgUtils {
public static final String FORMAT_TIME_FIELD_NAME =
"format.time-field-name";
public static final String FORMAT_ATTRIBUTES_FIELD_NAME =
"format.attributes-field-name";
public static final String FORMAT_RETAIN_PREDEFINED_FIELD =
"format.retain-predefined-field";
+ public static final String FORMAT_APPEND_ESCAPE_FIELD =
"format.append-escape";
public static final String DEFAULT_TIME_FIELD_NAME = "inlongmsg_time";
public static final String DEFAULT_ATTRIBUTES_FIELD_NAME =
"inlongmsg_attributes";
public static final boolean DEFAULT_PREDEFINED_FIELD = true;
+ public static final boolean DEFAULT_APPEND_ESCAPE = false;
public static final TypeInformation<Row> MIXED_ROW_TYPE =
Types.ROW_NAMED(
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
index 5128d51be5..9cb52d3d07 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
@@ -30,7 +30,6 @@ import org.apache.flink.types.Row;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -152,7 +151,7 @@ public final class InLongMsgBinlogFormatDeserializer
extends AbstractInLongMsgFo
}
@Override
- protected List<Row> convertRows(InLongMsgHead head, InLongMsgBody body)
throws IOException {
+ protected List<Row> convertRows(InLongMsgHead head, InLongMsgBody body)
throws Exception {
return InLongMsgBinlogUtils.getRows(
rowFormatInfo,
timeFieldName,
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
index 59dbd84667..4eb3a042f6 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
@@ -29,7 +29,6 @@ import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.types.Row;
-import java.io.IOException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
@@ -186,7 +185,7 @@ public class InLongMsgBinlogUtils {
String metadataFieldName,
Map<String, String> attributes,
byte[] bytes,
- boolean includeUpdateBefore) throws IOException {
+ boolean includeUpdateBefore) throws Exception {
InLongBinlog.RowData rowData = InLongBinlog.RowData.parseFrom(bytes);
@@ -256,7 +255,7 @@ public class InLongMsgBinlogUtils {
Map<String, String> attributes,
InLongBinlog.RowData rowData,
String operation,
- List<InLongBinlog.Column> columns) {
+ List<InLongBinlog.Column> columns) throws Exception {
List<Object> headFields = new ArrayList<>();
if (timeFieldName != null) {
@@ -311,7 +310,7 @@ public class InLongMsgBinlogUtils {
fieldName,
dataFieldFormatInfos[i],
fieldText,
- null);
+ null, null);
row.setField(i + headFields.size(), field);
}
}
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
index ab6a748056..532e71931c 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
@@ -40,9 +40,11 @@ import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_D
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_LINE_DELIMITER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_LINE_DELIMITER;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_APPEND_ESCAPE;
import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_PREDEFINED_FIELD;
import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_APPEND_ESCAPE_FIELD;
import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME;
import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD;
import static
org.apache.inlong.sort.formats.inlongmsg.row.InLongMsgUtils.FORMAT_TIME_FIELD_NAME;
@@ -120,6 +122,11 @@ public final class InLongMsgCsvFormatDeserializer extends
AbstractInLongMsgForma
*/
private boolean retainPredefinedField = true;
+ /**
+ * True to keep (append) the configured escape char in the parsed field text instead of dropping it; default false.
+ */
+ private boolean appendEscapeChar = false;
+
public InLongMsgCsvFormatDeserializer(
@Nonnull RowFormatInfo rowFormatInfo,
@Nullable String timeFieldName,
@@ -223,7 +230,7 @@ public final class InLongMsgCsvFormatDeserializer extends
AbstractInLongMsgForma
}
@Override
- protected List<Row> convertRows(InLongMsgHead head, InLongMsgBody body) {
+ protected List<Row> convertRows(InLongMsgHead head, InLongMsgBody body)
throws Exception {
Row dataRow =
InLongMsgCsvUtils.deserializeRow(
rowFormatInfo,
@@ -252,6 +259,7 @@ public final class InLongMsgCsvFormatDeserializer extends
AbstractInLongMsgForma
private Character lineDelimiter = DEFAULT_LINE_DELIMITER;
private Boolean deleteHeadDelimiter = DEFAULT_DELETE_HEAD_DELIMITER;
private Boolean retainPredefinedField = DEFAULT_PREDEFINED_FIELD;
+ private Boolean appendEscapeChar = DEFAULT_APPEND_ESCAPE;
public Builder(RowFormatInfo rowFormatInfo) {
super(rowFormatInfo);
@@ -287,6 +295,11 @@ public final class InLongMsgCsvFormatDeserializer extends
AbstractInLongMsgForma
return this;
}
+ public Builder setAppendEscapeChar(Boolean appendEscapeChar) {
+ this.appendEscapeChar = appendEscapeChar;
+ return this;
+ }
+
@Override
public Builder configure(DescriptorProperties descriptorProperties) {
super.configure(descriptorProperties);
@@ -303,6 +316,8 @@ public final class InLongMsgCsvFormatDeserializer extends
AbstractInLongMsgForma
.ifPresent(this::setDeleteHeadDelimiter);
descriptorProperties.getOptionalBoolean(FORMAT_RETAIN_PREDEFINED_FIELD)
.ifPresent(this::setRetainPredefinedField);
+ descriptorProperties.getOptionalBoolean(FORMAT_APPEND_ESCAPE_FIELD)
+ .ifPresent(this::setAppendEscapeChar);
return this;
}
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatConverter.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatConverter.java
index 6b4400f548..7b2e30e6f1 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatConverter.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatConverter.java
@@ -96,7 +96,7 @@ public class InLongMsgCsvMixedFormatConverter extends
AbstractInLongMsgMixedForm
Timestamp time,
List<String> predefinedFields,
List<String> fields,
- Map<String, String> entries) {
+ Map<String, String> entries) throws Exception {
Row dataRow =
InLongMsgCsvUtils.deserializeRow(
rowFormatInfo,
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
index c347f5861b..e4e4e5659e 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
@@ -108,7 +108,8 @@ public class InLongMsgCsvUtils {
String bodyStr = new String(bytes, Charset.forName(charset));
String[][] split =
- splitCsv(bodyStr, delimiter, escapeChar, quoteChar,
lineDelimiter, deleteHeadDelimiter);
+ splitCsv(bodyStr, delimiter, escapeChar, quoteChar,
lineDelimiter, deleteHeadDelimiter,
+ true);
return Arrays.stream(split)
.map((line) -> {
@@ -135,7 +136,7 @@ public class InLongMsgCsvUtils {
RowFormatInfo rowFormatInfo,
String nullLiteral,
List<String> predefinedFields,
- List<String> fields) {
+ List<String> fields) throws Exception {
String[] fieldNames = rowFormatInfo.getFieldNames();
FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
@@ -164,7 +165,7 @@ public class InLongMsgCsvUtils {
fieldName,
fieldFormatInfo,
fieldText,
- nullLiteral);
+ nullLiteral, null);
row.setField(i, field);
}
@@ -184,7 +185,7 @@ public class InLongMsgCsvUtils {
fieldName,
fieldFormatInfo,
fieldText,
- nullLiteral);
+ nullLiteral, null);
row.setField(i + predefinedFields.size(), field);
}
for (int i = predefinedFields.size() + fields.size(); i <
fieldNames.length; ++i) {
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
index 8508aaa90e..b915519f66 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
@@ -744,5 +744,11 @@ public class InLongMsgCsvFormatDeserializerTest {
InLongMsgBody body, Exception exception) throws Exception {
rowCount++;
}
+
+ @Override
+ public void onConvertingFieldFailure(String fieldName, String
fieldText, FormatInfo formatInfo,
+ Exception exception) throws Exception {
+ rowCount++;
+ }
}
}
\ No newline at end of file
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
index 1fecb43d5f..b880a4817a 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
@@ -225,7 +225,7 @@ public final class InLongMsgKvFormatDeserializer extends
AbstractInLongMsgFormat
}
@Override
- protected List<Row> convertRows(InLongMsgHead head, InLongMsgBody body) {
+ protected List<Row> convertRows(InLongMsgHead head, InLongMsgBody body)
throws Exception {
Row dataRow =
InLongMsgKvUtils.deserializeRow(
rowFormatInfo,
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
index a70ff5a1a3..ffdba490bd 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
@@ -95,7 +95,7 @@ public class InLongMsgKvMixedFormatConverter extends
AbstractInLongMsgMixedForma
Timestamp time,
List<String> predefinedFields,
List<String> fields,
- Map<String, String> entries) {
+ Map<String, String> entries) throws Exception {
Row dataRow =
InLongMsgKvUtils.deserializeRow(
rowFormatInfo,
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
index faeb11fa18..d8414ca038 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
@@ -101,7 +101,7 @@ public class InLongMsgKvUtils {
kvDelimiter,
escapeChar,
quoteChar,
- lineDelimiter);
+ lineDelimiter, true);
return list.stream().map((line) -> {
return new InLongMsgBody(
@@ -125,7 +125,7 @@ public class InLongMsgKvUtils {
RowFormatInfo rowFormatInfo,
String nullLiteral,
List<String> predefinedFields,
- Map<String, String> entries) {
+ Map<String, String> entries) throws Exception {
String[] fieldNames = rowFormatInfo.getFieldNames();
FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
@@ -147,7 +147,7 @@ public class InLongMsgKvUtils {
fieldName,
fieldFormatInfo,
fieldText,
- nullLiteral);
+ nullLiteral, null);
row.setField(i, field);
}
@@ -163,7 +163,7 @@ public class InLongMsgKvUtils {
fieldName,
fieldFormatInfo,
fieldText,
- nullLiteral);
+ nullLiteral, null);
row.setField(i, field);
}
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
index 045b423f94..1c25f93d82 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
@@ -329,5 +329,11 @@ public class InLongMsgKvFormatDeserializerTest {
Exception exception) throws Exception {
rowCount++;
}
+
+ @Override
+ public void onConvertingFieldFailure(String fieldName, String
fieldText, FormatInfo formatInfo,
+ Exception exception) throws Exception {
+ rowCount++;
+ }
}
}
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
index d9a699b174..82bc8e2ebc 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
@@ -110,7 +110,7 @@ public class InLongMsgTlogCsvUtils {
RowFormatInfo rowFormatInfo,
String nullLiteral,
List<String> predefinedFields,
- List<String> fields) {
+ List<String> fields) throws Exception {
String[] fieldNames = rowFormatInfo.getFieldNames();
FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
@@ -138,7 +138,7 @@ public class InLongMsgTlogCsvUtils {
fieldName,
fieldFormatInfo,
fieldText,
- nullLiteral);
+ nullLiteral, null);
row.setField(i, field);
}
@@ -158,7 +158,7 @@ public class InLongMsgTlogCsvUtils {
fieldName,
fieldFormatInfo,
fieldText,
- nullLiteral);
+ nullLiteral, null);
row.setField(i + predefinedFields.size(), field);
}
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
index 7d66b9ba1f..002ddfe542 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
@@ -285,5 +285,11 @@ public class InLongMsgTlogCsvFormatDeserializerTest {
InLongMsgBody body, Exception exception) throws Exception {
rowCount++;
}
+
+ @Override
+ public void onConvertingFieldFailure(String fieldName, String
fieldText, FormatInfo formatInfo,
+ Exception exception) throws Exception {
+ rowCount++;
+ }
}
}
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
index b2cecd8248..6982b5bea0 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
@@ -110,7 +110,7 @@ public class InLongMsgTlogKvUtils {
RowFormatInfo rowFormatInfo,
String nullLiteral,
List<String> predefinedFields,
- Map<String, String> entries) {
+ Map<String, String> entries) throws Exception {
String[] fieldNames = rowFormatInfo.getFieldNames();
FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
@@ -132,7 +132,7 @@ public class InLongMsgTlogKvUtils {
fieldName,
fieldFormatInfo,
fieldText,
- nullLiteral);
+ nullLiteral, null);
row.setField(i, field);
}
@@ -147,7 +147,7 @@ public class InLongMsgTlogKvUtils {
fieldName,
fieldFormatInfo,
fieldText,
- nullLiteral);
+ nullLiteral, null);
row.setField(i, field);
}
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
index 598d7bd5dd..d70d6f18ba 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
@@ -209,5 +209,11 @@ public class InLongMsgTlogKvFormatDeserializerTest {
InLongMsgBody body, Exception exception) throws Exception {
rowCount++;
}
+
+ @Override
+ public void onConvertingFieldFailure(String fieldName, String
fieldText, FormatInfo formatInfo,
+ Exception exception) throws Exception {
+ rowCount++;
+ }
}
}
diff --git
a/inlong-sort/sort-formats/format-row/format-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvDeserializationSchema.java
b/inlong-sort/sort-formats/format-row/format-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvDeserializationSchema.java
index 0e3f35180e..eb3029db71 100644
---
a/inlong-sort/sort-formats/format-row/format-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvDeserializationSchema.java
+++
b/inlong-sort/sort-formats/format-row/format-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvDeserializationSchema.java
@@ -162,7 +162,7 @@ public final class KvDeserializationSchema extends
DefaultDeserializationSchema<
fieldName,
fieldFormatInfo,
fieldText,
- nullLiteral);
+ nullLiteral, null);
row.setField(i, field);
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
index f48c1c7a0f..f30d916341 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
@@ -76,6 +76,11 @@ public class InLongMsgUtils {
public static final String DEFAULT_TIME_FIELD_NAME = "inlongmsg_time";
public static final String DEFAULT_ATTRIBUTES_FIELD_NAME =
"inlongmsg_attributes";
+ public static final boolean DEFAULT_IS_RETAIN_PREDEFINED_FIELD = false;
+ public static final boolean DEFAULT_IS_DELETE_ESCAPE_CHAR = true;
+ public static final boolean DEFAULT_IS_PATCH_ESCAPE_CHAR = false;
+ public static final boolean DEFAULT_IS_DELETE_HEAD_DELIMITER = false;
+
private static final FieldToRowDataConverters.FieldToRowDataConverter
TIME_FIELD_CONVERTER =
FieldToRowDataConverters.createConverter(new TimestampType());
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
index 53a0ba64f4..fca7ac1cdc 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
@@ -150,7 +150,8 @@ public final class InLongMsgBinlogFormatDeserializer
extends AbstractInLongMsgFo
metadataFieldName,
head.getAttributes(),
body.getData(),
- includeUpdateBefore);
+ includeUpdateBefore,
+ failureHandler);
}
@Override
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
index 16ed2a7056..53c00d0daa 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
@@ -21,6 +21,7 @@ import
org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
import org.apache.inlong.sort.formats.base.TableFormatUtils;
import org.apache.inlong.sort.formats.binlog.InLongBinlog;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
@@ -34,7 +35,6 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarCharType;
-import java.io.IOException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
@@ -145,7 +145,7 @@ public class InLongMsgBinlogUtils {
String metadataFieldName,
Map<String, String> attributes,
byte[] bytes,
- boolean includeUpdateBefore) throws IOException {
+ boolean includeUpdateBefore, FailureHandler failureHandler) throws
Exception {
InLongBinlog.RowData rowData = InLongBinlog.RowData.parseFrom(bytes);
@@ -162,7 +162,8 @@ public class InLongMsgBinlogUtils {
attributes,
rowData,
DBSYNC_OPERATION_INERT,
- rowData.getAfterColumnsList()));
+ rowData.getAfterColumnsList(),
+ failureHandler));
break;
case UPDATE:
if (includeUpdateBefore) {
@@ -175,7 +176,8 @@ public class InLongMsgBinlogUtils {
attributes,
rowData,
DBSYNC_OPERATION_UPDATE_BEFORE,
- rowData.getBeforeColumnsList()));
+ rowData.getBeforeColumnsList(),
+ failureHandler));
}
rows.add(
constructRowData(
@@ -186,7 +188,8 @@ public class InLongMsgBinlogUtils {
attributes,
rowData,
DBSYNC_OPERATION_UPDATE,
- rowData.getAfterColumnsList()));
+ rowData.getAfterColumnsList(),
+ failureHandler));
break;
case DELETE:
rows.add(
@@ -198,7 +201,8 @@ public class InLongMsgBinlogUtils {
attributes,
rowData,
DBSYNC_OPERATION_DELETE,
- rowData.getBeforeColumnsList()));
+ rowData.getBeforeColumnsList(),
+ failureHandler));
break;
default:
return null;
@@ -215,7 +219,7 @@ public class InLongMsgBinlogUtils {
Map<String, String> attributes,
InLongBinlog.RowData rowData,
String operation,
- List<InLongBinlog.Column> columns) {
+ List<InLongBinlog.Column> columns, FailureHandler failureHandler)
throws Exception {
List<Object> headFields = new ArrayList<>();
if (timeFieldName != null) {
@@ -270,7 +274,7 @@ public class InLongMsgBinlogUtils {
fieldName,
dataFieldFormatInfos[i],
fieldText,
- null);
+ null, failureHandler);
row.setField(i + headFields.size(), field);
}
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
index 3e9f0843f5..aac79533ef 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
@@ -44,6 +44,8 @@ import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_D
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_LINE_DELIMITER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_RETAIN_PREDEFINED_FIELD;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_IS_DELETE_ESCAPE_CHAR;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_IS_RETAIN_PREDEFINED_FIELD;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
import static
org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvUtils.DEFAULT_DELETE_HEAD_DELIMITER;
@@ -122,6 +124,11 @@ public final class InLongMsgCsvFormatDeserializer extends
AbstractInLongMsgForma
*/
private boolean retainPredefinedField = true;
+ /**
+ * True if delete escape char while parsing.
+ */
+ private boolean isDeleteEscapeChar = DEFAULT_IS_DELETE_ESCAPE_CHAR;
+
public InLongMsgCsvFormatDeserializer(
@Nonnull RowFormatInfo rowFormatInfo,
@Nullable String timeFieldName,
@@ -148,8 +155,9 @@ public final class InLongMsgCsvFormatDeserializer extends
AbstractInLongMsgForma
nullLiteral,
deleteHeadDelimiter,
metadataKeys,
+ DEFAULT_IS_DELETE_ESCAPE_CHAR,
+ retainPredefinedField,
InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
- this.retainPredefinedField = retainPredefinedField;
}
public InLongMsgCsvFormatDeserializer(
@@ -177,6 +185,8 @@ public final class InLongMsgCsvFormatDeserializer extends
AbstractInLongMsgForma
nullLiteral,
deleteHeadDelimiter,
metadataKeys,
+ DEFAULT_IS_DELETE_ESCAPE_CHAR,
+ DEFAULT_IS_RETAIN_PREDEFINED_FIELD,
InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
}
@@ -192,6 +202,8 @@ public final class InLongMsgCsvFormatDeserializer extends
AbstractInLongMsgForma
@Nullable String nullLiteral,
boolean deleteHeadDelimiter,
List<String> metadataKeys,
+ boolean isDeleteEscapeChar,
+ boolean retainPredefinedField,
@Nonnull FailureHandler failureHandler) {
super(failureHandler);
@@ -206,6 +218,8 @@ public final class InLongMsgCsvFormatDeserializer extends
AbstractInLongMsgForma
this.nullLiteral = nullLiteral;
this.deleteHeadDelimiter = deleteHeadDelimiter;
this.metadataKeys = metadataKeys;
+ this.isDeleteEscapeChar = isDeleteEscapeChar;
+ this.retainPredefinedField = retainPredefinedField;
converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
.map(formatInfo -> FieldToRowDataConverters.createConverter(
@@ -236,17 +250,18 @@ public final class InLongMsgCsvFormatDeserializer extends
AbstractInLongMsgForma
lineDelimiter,
escapeChar,
quoteChar,
- deleteHeadDelimiter);
+ deleteHeadDelimiter,
+ isDeleteEscapeChar);
}
@Override
- protected List<RowData> convertRowDataList(InLongMsgHead head,
InLongMsgBody body) {
+ protected List<RowData> convertRowDataList(InLongMsgHead head,
InLongMsgBody body) throws Exception {
GenericRowData genericRowData = InLongMsgCsvUtils.deserializeRowData(
rowFormatInfo,
nullLiteral,
retainPredefinedField ? head.getPredefinedFields() :
Collections.emptyList(),
body.getFields(),
- converters);
+ converters, failureHandler);
// Decorate result with time and attributes fields if needed
genericRowData = InLongMsgUtils.decorateRowDataWithNeededHeadFields(
@@ -272,6 +287,7 @@ public final class InLongMsgCsvFormatDeserializer extends
AbstractInLongMsgForma
private Boolean deleteHeadDelimiter = DEFAULT_DELETE_HEAD_DELIMITER;
private List<String> metadataKeys = Collections.emptyList();
private Boolean retainPredefinedField =
DEFAULT_RETAIN_PREDEFINED_FIELD;
+ private boolean isDeleteEscapeChar = DEFAULT_IS_DELETE_ESCAPE_CHAR;
public Builder(RowFormatInfo rowFormatInfo) {
super(rowFormatInfo);
@@ -312,7 +328,28 @@ public final class InLongMsgCsvFormatDeserializer extends
AbstractInLongMsgForma
return this;
}
+ public Builder setDeleteEscapeChar(Boolean isDeleteEscapeChar) {
+ this.isDeleteEscapeChar = isDeleteEscapeChar;
+ return this;
+ }
+
public InLongMsgCsvFormatDeserializer build() {
+ if (failureHandler != null) {
+ return new InLongMsgCsvFormatDeserializer(
+ rowFormatInfo,
+ timeFieldName,
+ attributesFieldName,
+ charset,
+ delimiter,
+ lineDelimiter,
+ escapeChar,
+ quoteChar,
+ nullLiteral,
+ deleteHeadDelimiter,
+ metadataKeys,
+ isDeleteEscapeChar,
+ retainPredefinedField, failureHandler);
+ }
return new InLongMsgCsvFormatDeserializer(
rowFormatInfo,
timeFieldName,
@@ -327,6 +364,7 @@ public final class InLongMsgCsvFormatDeserializer extends
AbstractInLongMsgForma
metadataKeys,
ignoreErrors,
retainPredefinedField);
+
}
}
@@ -355,7 +393,8 @@ public final class InLongMsgCsvFormatDeserializer extends
AbstractInLongMsgForma
Objects.equals(escapeChar, that.escapeChar) &&
Objects.equals(quoteChar, that.quoteChar) &&
Objects.equals(nullLiteral, that.nullLiteral) &&
- Objects.equals(metadataKeys, that.metadataKeys);
+ Objects.equals(metadataKeys, that.metadataKeys) &&
+ Objects.equals(isDeleteEscapeChar, that.isDeleteEscapeChar);
}
@@ -363,7 +402,7 @@ public final class InLongMsgCsvFormatDeserializer extends
AbstractInLongMsgForma
public int hashCode() {
return Objects.hash(super.hashCode(), rowFormatInfo, timeFieldName,
attributesFieldName, charset, delimiter, lineDelimiter,
escapeChar, quoteChar,
- nullLiteral, deleteHeadDelimiter, metadataKeys);
+ nullLiteral, deleteHeadDelimiter, metadataKeys,
isDeleteEscapeChar);
}
@Override
@@ -380,6 +419,7 @@ public final class InLongMsgCsvFormatDeserializer extends
AbstractInLongMsgForma
", nullLiteral='" + nullLiteral + '\'' +
", deleteHeadDelimiter=" + deleteHeadDelimiter +
", metadataKeys=" + metadataKeys +
+ ", isDeleteEscapeChar=" + isDeleteEscapeChar +
'}';
}
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
index a7f0fd1905..99a44d33c0 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
@@ -21,6 +21,7 @@ import
org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
import
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
@@ -109,11 +110,12 @@ public class InLongMsgCsvUtils {
Character lineDelimiter,
Character escapeChar,
Character quoteChar,
- boolean deleteHeadDelimiter) {
+ boolean deleteHeadDelimiter, boolean isDeleteEscapeChar) {
String bodyStr = new String(bytes, Charset.forName(charset));
String[][] split =
- splitCsv(bodyStr, delimiter, escapeChar, quoteChar,
lineDelimiter, deleteHeadDelimiter);
+ splitCsv(bodyStr, delimiter, escapeChar, quoteChar,
lineDelimiter,
+ deleteHeadDelimiter, isDeleteEscapeChar);
return Arrays.stream(split)
.map((line) -> {
@@ -132,7 +134,8 @@ public class InLongMsgCsvUtils {
String nullLiteral,
List<String> predefinedFields,
List<String> fields,
- FieldToRowDataConverters.FieldToRowDataConverter[] converters) {
+ FieldToRowDataConverters.FieldToRowDataConverter[] converters,
+ FailureHandler failureHandler) throws Exception {
String[] fieldNames = rowFormatInfo.getFieldNames();
FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
@@ -160,7 +163,7 @@ public class InLongMsgCsvUtils {
fieldName,
fieldFormatInfo,
fieldText,
- nullLiteral));
+ nullLiteral, failureHandler));
rowData.setField(i, field);
}
@@ -180,7 +183,7 @@ public class InLongMsgCsvUtils {
fieldName,
fieldFormatInfo,
fieldText,
- nullLiteral));
+ nullLiteral, failureHandler));
rowData.setField(i + predefinedFields.size(), field);
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
index fb8e00ab41..e7aef2c6b3 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
@@ -278,6 +278,8 @@ public class InLongMsgCsvFormatDeserializerTest {
null,
InLongMsgCsvUtils.DEFAULT_DELETE_HEAD_DELIMITER,
Collections.emptyList(),
+ true,
+ true,
errorHandler);
InLongMsg inLongMsg = InLongMsg.newInLongMsg();
@@ -290,7 +292,7 @@ public class InLongMsgCsvFormatDeserializerTest {
List<RowData> actualRows = new ArrayList<>();
Collector<RowData> collector = new ListCollector<>(actualRows);
deserializer.flatMap(inLongMsg.buildArray(), collector);
- assertEquals(0, errorHandler.getRowCount());
+ assertEquals(1, errorHandler.getRowCount());
InLongMsg inLongMsg1Head = InLongMsg.newInLongMsg();
String abNormalAttrs =
"m=0&streamId=testInterfaceId&__addcol1__=1&__addcol2__=2";
@@ -874,5 +876,11 @@ public class InLongMsgCsvFormatDeserializerTest {
Exception exception) throws Exception {
rowCount++;
}
+
+ @Override
+ public void onConvertingFieldFailure(String fieldName, String
fieldText, FormatInfo formatInfo,
+ Exception exception) throws Exception {
+ rowCount++;
+ }
}
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
index b3dff0dec8..a9caf3bac5 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
@@ -44,6 +44,7 @@ import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_E
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_KV_DELIMITER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_LINE_DELIMITER;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_IS_DELETE_ESCAPE_CHAR;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
/**
@@ -120,6 +121,11 @@ public final class InLongMsgKvFormatDeserializer extends
AbstractInLongMsgFormat
*/
private boolean retainPredefinedField = true;
+ /**
+ * True if delete escape char while parsing.
+ */
+ private boolean isDeleteEscapeChar = DEFAULT_IS_DELETE_ESCAPE_CHAR;
+
public InLongMsgKvFormatDeserializer(
@Nonnull RowFormatInfo rowFormatInfo,
@Nullable String timeFieldName,
@@ -144,6 +150,7 @@ public final class InLongMsgKvFormatDeserializer extends
AbstractInLongMsgFormat
escapeChar,
quoteChar,
nullLiteral,
+ DEFAULT_IS_DELETE_ESCAPE_CHAR,
InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
if (retainPredefinedField != null) {
this.retainPredefinedField = retainPredefinedField;
@@ -173,6 +180,7 @@ public final class InLongMsgKvFormatDeserializer extends
AbstractInLongMsgFormat
escapeChar,
quoteChar,
nullLiteral,
+ DEFAULT_IS_DELETE_ESCAPE_CHAR,
InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
}
@@ -187,6 +195,7 @@ public final class InLongMsgKvFormatDeserializer extends
AbstractInLongMsgFormat
@Nullable Character escapeChar,
@Nullable Character quoteChar,
@Nullable String nullLiteral,
+ @Nullable Boolean isDeleteEscapeChar,
@Nonnull FailureHandler failureHandler) {
super(failureHandler);
@@ -200,6 +209,7 @@ public final class InLongMsgKvFormatDeserializer extends
AbstractInLongMsgFormat
this.escapeChar = escapeChar;
this.quoteChar = quoteChar;
this.nullLiteral = nullLiteral;
+ this.isDeleteEscapeChar = isDeleteEscapeChar;
converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
.map(formatInfo -> FieldToRowDataConverters.createConverter(
@@ -231,17 +241,19 @@ public final class InLongMsgKvFormatDeserializer extends
AbstractInLongMsgFormat
kvDelimiter,
lineDelimiter,
escapeChar,
- quoteChar);
+ quoteChar,
+ isDeleteEscapeChar);
}
@Override
- protected List<RowData> convertRowDataList(InLongMsgHead head,
InLongMsgBody body) {
+ protected List<RowData> convertRowDataList(InLongMsgHead head,
InLongMsgBody body) throws Exception {
GenericRowData genericRowData = InLongMsgKvUtils.deserializeRowData(
rowFormatInfo,
nullLiteral,
retainPredefinedField ? head.getPredefinedFields() :
Collections.emptyList(),
body.getEntries(),
- converters);
+ converters,
+ failureHandler);
// Decorate result with time and attributes fields if needed
return
Collections.singletonList(InLongMsgUtils.decorateRowDataWithNeededHeadFields(
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
index 8bcf77c1ec..703a064384 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.formats.inlongmsgkv;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
import
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
@@ -91,7 +92,8 @@ public class InLongMsgKvUtils {
char kvDelimiter,
Character lineDelimiter,
Character escapeChar,
- Character quoteChar) {
+ Character quoteChar,
+ boolean isDeleteEscapeChar) {
String text = new String(bytes, Charset.forName(charset));
List<Map<String, String>> list =
@@ -101,7 +103,8 @@ public class InLongMsgKvUtils {
kvDelimiter,
escapeChar,
quoteChar,
- lineDelimiter);
+ lineDelimiter,
+ isDeleteEscapeChar);
return list.stream().map((line) -> new InLongMsgBody(
bytes,
@@ -124,7 +127,8 @@ public class InLongMsgKvUtils {
String nullLiteral,
List<String> predefinedFields,
Map<String, String> entries,
- FieldToRowDataConverter[] converters) {
+ FieldToRowDataConverter[] converters,
+ FailureHandler failureHandler) throws Exception {
String[] fieldNames = rowFormatInfo.getFieldNames();
FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
@@ -146,7 +150,7 @@ public class InLongMsgKvUtils {
fieldName,
fieldFormatInfo,
fieldText,
- nullLiteral));
+ nullLiteral, failureHandler));
row.setField(i, field);
}
@@ -161,7 +165,8 @@ public class InLongMsgKvUtils {
fieldName,
fieldFormatInfo,
fieldText,
- nullLiteral));
+ nullLiteral,
+ failureHandler));
row.setField(i, field);
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
index d99882c30f..c98c0c1f6c 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java
@@ -84,6 +84,7 @@ public class InLongMsgKvFormatDeserializerTest {
null,
null,
null,
+ true,
errorHandler);
InLongMsg inLongMsg = InLongMsg.newInLongMsg();
@@ -96,7 +97,7 @@ public class InLongMsgKvFormatDeserializerTest {
List<RowData> actualRows = new ArrayList<>();
Collector<RowData> collector = new ListCollector<>(actualRows);
deserializer.flatMap(inLongMsg.buildArray(), collector);
- assertEquals(0, errorHandler.getRowCount());
+ assertEquals(1, errorHandler.getRowCount());
InLongMsg inLongMsg1 = InLongMsg.newInLongMsg();
String abNormalAttrs =
"m=0&iname=testInterfaceId&__addcol1__=1&__addcol2__=2";
@@ -341,5 +342,11 @@ public class InLongMsgKvFormatDeserializerTest {
InLongMsgBody body, Exception exception) throws Exception {
rowCount++;
}
+
+ @Override
+ public void onConvertingFieldFailure(String fieldName, String
fieldText, FormatInfo formatInfo,
+ Exception exception) throws Exception {
+ rowCount++;
+ }
}
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
index 814466f079..415c1efd64 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
@@ -41,7 +41,10 @@ import java.util.List;
import java.util.Objects;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_LINE_DELIMITER;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_IS_DELETE_ESCAPE_CHAR;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_IS_DELETE_HEAD_DELIMITER;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
/**
@@ -92,6 +95,9 @@ public final class InLongMsgTlogCsvFormatDeserializer extends
AbstractInLongMsgF
@Nullable
private final Character quoteChar;
+ @Nullable
+ private final Character lineDelimiter;
+
@Nonnull
private Boolean isIncludeFirstSegment = false;
/**
@@ -104,6 +110,14 @@ public final class InLongMsgTlogCsvFormatDeserializer
extends AbstractInLongMsgF
private final FieldToRowDataConverter[] converters;
+ /**
+ * True if delete escape char while parsing.
+ */
+ private boolean isDeleteEscapeChar = DEFAULT_IS_DELETE_ESCAPE_CHAR;
+
+ private boolean isDeleteHeadDelimiter = DEFAULT_IS_DELETE_HEAD_DELIMITER;
+
+ @Deprecated
public InLongMsgTlogCsvFormatDeserializer(
@Nonnull RowFormatInfo rowFormatInfo,
@Nullable String timeFieldName,
@@ -123,9 +137,42 @@ public final class InLongMsgTlogCsvFormatDeserializer
extends AbstractInLongMsgF
delimiter,
escapeChar,
quoteChar,
+ DEFAULT_LINE_DELIMITER,
nullLiteral,
metadataKeys,
+ DEFAULT_IS_DELETE_ESCAPE_CHAR,
false,
+ DEFAULT_IS_DELETE_HEAD_DELIMITER,
+ InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
+ }
+
+ @Deprecated
+ public InLongMsgTlogCsvFormatDeserializer(
+ @Nonnull RowFormatInfo rowFormatInfo,
+ @Nullable String timeFieldName,
+ @Nullable String attributesFieldName,
+ @Nonnull String charset,
+ @Nonnull Character delimiter,
+ @Nullable Character escapeChar,
+ @Nullable Character quoteChar,
+ @Nullable Character lineDelimiter,
+ @Nullable String nullLiteral,
+ List<String> metadataKeys,
+ @Nonnull Boolean ignoreErrors) {
+ this(
+ rowFormatInfo,
+ timeFieldName,
+ attributesFieldName,
+ charset,
+ delimiter,
+ escapeChar,
+ quoteChar,
+ lineDelimiter,
+ nullLiteral,
+ metadataKeys,
+ DEFAULT_IS_DELETE_ESCAPE_CHAR,
+ false,
+ DEFAULT_IS_DELETE_HEAD_DELIMITER,
InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
}
@@ -141,6 +188,29 @@ public final class InLongMsgTlogCsvFormatDeserializer
extends AbstractInLongMsgF
List<String> metadataKeys,
@Nonnull Boolean isIncludeFirstSegment,
@Nonnull FailureHandler failureHandler) {
+ this(rowFormatInfo, timeFieldName, attributesFieldName,
+ charset, delimiter, escapeChar, quoteChar,
+ DEFAULT_LINE_DELIMITER,
+ nullLiteral, metadataKeys, DEFAULT_IS_DELETE_ESCAPE_CHAR,
isIncludeFirstSegment,
+ DEFAULT_IS_DELETE_HEAD_DELIMITER, failureHandler);
+
+ }
+
+ public InLongMsgTlogCsvFormatDeserializer(
+ @Nonnull RowFormatInfo rowFormatInfo,
+ @Nullable String timeFieldName,
+ @Nullable String attributesFieldName,
+ @Nonnull String charset,
+ @Nonnull Character delimiter,
+ @Nullable Character escapeChar,
+ @Nullable Character quoteChar,
+ @Nullable Character lineDelimiter,
+ @Nullable String nullLiteral,
+ List<String> metadataKeys,
+ @Nonnull Boolean isDeleteEscapeChar,
+ @Nonnull Boolean isIncludeFirstSegment,
+ @Nonnull Boolean isDeleteHeadDelimiter,
+ @Nonnull FailureHandler failureHandler) {
super(failureHandler);
this.rowFormatInfo = rowFormatInfo;
@@ -150,9 +220,12 @@ public final class InLongMsgTlogCsvFormatDeserializer
extends AbstractInLongMsgF
this.delimiter = delimiter;
this.escapeChar = escapeChar;
this.quoteChar = quoteChar;
+ this.lineDelimiter = lineDelimiter;
this.nullLiteral = nullLiteral;
this.metadataKeys = metadataKeys;
this.isIncludeFirstSegment = isIncludeFirstSegment;
+ this.isDeleteHeadDelimiter = isDeleteHeadDelimiter;
+ this.isDeleteEscapeChar = isDeleteEscapeChar;
converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
.map(formatInfo -> FieldToRowDataConverters.createConverter(
TableFormatUtils.deriveLogicalType(formatInfo)))
@@ -175,9 +248,8 @@ public final class InLongMsgTlogCsvFormatDeserializer
extends AbstractInLongMsgF
@Override
protected List<InLongMsgBody> parseBodyList(byte[] bytes) throws Exception
{
- return Collections.singletonList(
- InLongMsgTlogCsvUtils.parseBody(bytes, charset, delimiter,
escapeChar,
- quoteChar, isIncludeFirstSegment));
+ return InLongMsgTlogCsvUtils.parseBody(bytes, charset, delimiter,
escapeChar,
+ quoteChar, lineDelimiter, isDeleteEscapeChar,
isIncludeFirstSegment, isDeleteHeadDelimiter);
}
@Override
@@ -188,9 +260,9 @@ public final class InLongMsgTlogCsvFormatDeserializer
extends AbstractInLongMsgF
nullLiteral,
head.getPredefinedFields(),
body.getFields(),
- converters);
+ converters, failureHandler);
- GenericRowData genericRowData = (GenericRowData)
InLongMsgUtils.decorateRowDataWithNeededHeadFields(
+ GenericRowData genericRowData =
InLongMsgUtils.decorateRowDataWithNeededHeadFields(
timeFieldName,
attributesFieldName,
head.getTime(),
@@ -208,8 +280,11 @@ public final class InLongMsgTlogCsvFormatDeserializer
extends AbstractInLongMsgF
private String timeFieldName = DEFAULT_TIME_FIELD_NAME;
private String attributesFieldName = DEFAULT_ATTRIBUTES_FIELD_NAME;
private Character delimiter = DEFAULT_DELIMITER;
+ private Character lineDelimiter = DEFAULT_LINE_DELIMITER;
private List<String> metadataKeys = Collections.emptyList();
private boolean isIncludeFirstSegment = false;
+ private boolean isDeleteEscapeChar = DEFAULT_IS_DELETE_ESCAPE_CHAR;
+ private boolean isDeleteHeadDelimiter =
DEFAULT_IS_DELETE_HEAD_DELIMITER;
public Builder(RowFormatInfo rowFormatInfo) {
super(rowFormatInfo);
@@ -230,6 +305,11 @@ public final class InLongMsgTlogCsvFormatDeserializer
extends AbstractInLongMsgF
return this;
}
+ public Builder setLineDelimiter(Character lineDelimiter) {
+ this.lineDelimiter = lineDelimiter;
+ return this;
+ }
+
public Builder setMetadataKeys(List<String> metadataKeys) {
this.metadataKeys = metadataKeys;
return this;
@@ -240,6 +320,16 @@ public final class InLongMsgTlogCsvFormatDeserializer
extends AbstractInLongMsgF
return this;
}
+ public Builder setDeleteEscapeChar(Boolean isDeleteEscapeChar) {
+ this.isDeleteEscapeChar = isDeleteEscapeChar;
+ return this;
+ }
+
+ public Builder setDeleteHeadDelimiter(Boolean isDeleteHeadDelimiter) {
+ this.isDeleteHeadDelimiter = isDeleteHeadDelimiter;
+ return this;
+ }
+
public InLongMsgTlogCsvFormatDeserializer build() {
return new InLongMsgTlogCsvFormatDeserializer(
rowFormatInfo,
@@ -249,9 +339,12 @@ public final class InLongMsgTlogCsvFormatDeserializer
extends AbstractInLongMsgF
delimiter,
escapeChar,
quoteChar,
+ lineDelimiter,
nullLiteral,
metadataKeys,
+ isDeleteEscapeChar,
isIncludeFirstSegment,
+ isDeleteHeadDelimiter,
InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
}
}
@@ -278,15 +371,17 @@ public final class InLongMsgTlogCsvFormatDeserializer
extends AbstractInLongMsgF
delimiter.equals(that.delimiter) &&
Objects.equals(escapeChar, that.escapeChar) &&
Objects.equals(quoteChar, that.quoteChar) &&
+ Objects.equals(lineDelimiter, that.lineDelimiter) &&
Objects.equals(nullLiteral, that.nullLiteral) &&
Objects.equals(metadataKeys, that.metadataKeys) &&
- Objects.equals(isIncludeFirstSegment,
that.isIncludeFirstSegment);
+ Objects.equals(isIncludeFirstSegment,
that.isIncludeFirstSegment) &&
+ Objects.equals(isDeleteHeadDelimiter,
that.isDeleteHeadDelimiter);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), rowFormatInfo, timeFieldName,
- attributesFieldName, charset, delimiter, escapeChar, quoteChar,
- nullLiteral, metadataKeys, isIncludeFirstSegment);
+ attributesFieldName, charset, delimiter, escapeChar,
quoteChar, lineDelimiter,
+ nullLiteral, metadataKeys, isIncludeFirstSegment,
isDeleteHeadDelimiter);
}
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
index a216cf3429..1e72a668b6 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
@@ -29,11 +30,10 @@ import org.slf4j.LoggerFactory;
import java.nio.charset.Charset;
import java.sql.Timestamp;
-import java.util.Arrays;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
import static
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
@@ -75,13 +75,16 @@ public class InLongMsgTlogCsvUtils {
return new InLongMsgHead(attributes, null, time, predefinedFields);
}
- public static InLongMsgBody parseBody(
+ public static List<InLongMsgBody> parseBody(
byte[] bytes,
String charset,
char delimiter,
Character escapeChar,
Character quoteChar,
- boolean isIncludeFirstSegment) {
+ Character lineDelimiter,
+ boolean isDeleteEscapeChar,
+ boolean isIncludeFirstSegment,
+ boolean isDeleteHeadDelimiter) {
String text;
if (bytes[0] == delimiter) {
text = new String(bytes, 1, bytes.length - 1,
Charset.forName(charset));
@@ -89,13 +92,25 @@ public class InLongMsgTlogCsvUtils {
text = new String(bytes, Charset.forName(charset));
}
- String[] segments = splitCsv(text, delimiter, escapeChar, quoteChar);
-
- String tid = segments[0];
- List<String> fields =
- Arrays.stream(segments, (isIncludeFirstSegment ? 0 : 1),
segments.length).collect(Collectors.toList());
-
- return new InLongMsgBody(bytes, tid, fields, Collections.emptyMap());
+ String[][] segments = splitCsv(text, delimiter, escapeChar, quoteChar,
lineDelimiter,
+ isDeleteHeadDelimiter, isDeleteEscapeChar);
+ String tid = "";
+ List<InLongMsgBody> inLongMsgBodies = new ArrayList<>();
+ if (segments.length > 0) {
+ for (int i = 0; i < segments.length; i++) {
+ tid = segments[i][0];
+ List<String> fields = new ArrayList<>();
+ int startIndex = 1;
+ if (isIncludeFirstSegment) {
+ startIndex = 0;
+ }
+ for (int j = startIndex; j < segments[i].length; j++) {
+ fields.add(segments[i][j]);
+ }
+ inLongMsgBodies.add(new InLongMsgBody(null, tid, fields,
Collections.emptyMap()));
+ }
+ }
+ return inLongMsgBodies;
}
/**
@@ -112,7 +127,8 @@ public class InLongMsgTlogCsvUtils {
String nullLiteral,
List<String> predefinedFields,
List<String> fields,
- FieldToRowDataConverters.FieldToRowDataConverter[] converters) {
+ FieldToRowDataConverters.FieldToRowDataConverter[] converters,
+ FailureHandler failureHandler) throws Exception {
String[] fieldNames = rowFormatInfo.getFieldNames();
FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
@@ -140,7 +156,7 @@ public class InLongMsgTlogCsvUtils {
fieldName,
fieldFormatInfo,
fieldText,
- nullLiteral));
+ nullLiteral, failureHandler));
rowData.setField(i, field);
}
@@ -160,7 +176,7 @@ public class InLongMsgTlogCsvUtils {
fieldName,
fieldFormatInfo,
fieldText,
- nullLiteral));
+ nullLiteral, failureHandler));
rowData.setField(i + predefinedFields.size(), field);
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
index 7e83816b4d..5d52981f6d 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializerTest.java
@@ -89,14 +89,16 @@ public class InLongMsgTlogCsvFormatDeserializerTest {
DEFAULT_DELIMITER,
null,
null,
+ '\n',
null,
Collections.emptyList(),
false,
+ false,
+ false,
errorHandler);
-
InLongMsg inLongMsg1 = InLongMsg.newInLongMsg(true);
String attrs = "m=0&dt=1584806400000&__addcol1_=1&__addcol2_=test";
- String body1 = "interfaceId1,field1,field2,field3";
+ String body1 =
"interfaceId1,field1,field2,field3\ninterfaceId1,field1,field2,field3";
String body2 = "interfaceId2,field1,field2,field3";
inLongMsg1.addMsg(attrs, body1.getBytes());
inLongMsg1.addMsg(attrs, body2.getBytes());
@@ -104,7 +106,8 @@ public class InLongMsgTlogCsvFormatDeserializerTest {
List<RowData> actualRows = new ArrayList<>();
Collector<RowData> collector = new ListCollector<>(actualRows);
deserializer.flatMap(inLongMsg1.buildArray(), collector);
- assertEquals(0, errorHandler.getRowCount());
+ assertEquals(3, errorHandler.getRowCount());
+ assertEquals(3, actualRows.size());
InLongMsg inLongMsg1Head = InLongMsg.newInLongMsg();
String abNormalAttrs = "m=0&__addcol1_=1&__addcol2_=test";
@@ -336,5 +339,11 @@ public class InLongMsgTlogCsvFormatDeserializerTest {
throws Exception {
rowCount++;
}
+
+ @Override
+ public void onConvertingFieldFailure(String fieldName, String
fieldText, FormatInfo formatInfo,
+ Exception exception) throws Exception {
+ rowCount++;
+ }
}
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
index dee2a5b791..2d54f9db13 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
@@ -45,12 +45,15 @@ import java.util.Objects;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ENTRY_DELIMITER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_KV_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_LINE_DELIMITER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ATTRIBUTE_FIELD_NAME;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DELIMITER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ENTRY_DELIMITER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_LINE_DELIMITER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_TIME_FIELD_NAME;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_IS_DELETE_ESCAPE_CHAR;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
import static
org.apache.inlong.sort.formats.inlongmsgtlogkv.InLongMsgTlogKvUtils.DEFAULT_INLONGMSG_TLOGKV_CHARSET;
@@ -115,12 +118,17 @@ public final class InLongMsgTlogKvFormatDeserializer
extends AbstractInLongMsgFo
@Nullable
private final Character quoteChar;
+ @Nullable
+ private final Character lineDelimiter;
+
/**
* The literal represented null values, default "".
*/
@Nullable
private final String nullLiteral;
+ private boolean isDeleteEscapeChar = DEFAULT_IS_DELETE_ESCAPE_CHAR;
+
private final FieldToRowDataConverter[] converters;
public InLongMsgTlogKvFormatDeserializer(
@@ -133,6 +141,7 @@ public final class InLongMsgTlogKvFormatDeserializer
extends AbstractInLongMsgFo
@Nonnull Character kvDelimiter,
@Nullable Character escapeChar,
@Nullable Character quoteChar,
+ @Nullable Character lineDelimiter,
@Nullable String nullLiteral,
@Nonnull Boolean ignoreErrors) {
this(
@@ -145,7 +154,9 @@ public final class InLongMsgTlogKvFormatDeserializer
extends AbstractInLongMsgFo
kvDelimiter,
escapeChar,
quoteChar,
+ lineDelimiter,
nullLiteral,
+ DEFAULT_IS_DELETE_ESCAPE_CHAR,
InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
}
@@ -159,7 +170,9 @@ public final class InLongMsgTlogKvFormatDeserializer
extends AbstractInLongMsgFo
@Nonnull Character kvDelimiter,
@Nullable Character escapeChar,
@Nullable Character quoteChar,
+ @Nullable Character lineDelimiter,
@Nullable String nullLiteral,
+ @Nullable Boolean isDeleteEscapeChar,
@Nonnull FailureHandler failureHandler) {
super(failureHandler);
@@ -172,7 +185,9 @@ public final class InLongMsgTlogKvFormatDeserializer
extends AbstractInLongMsgFo
this.kvDelimiter = kvDelimiter;
this.escapeChar = escapeChar;
this.quoteChar = quoteChar;
+ this.lineDelimiter = lineDelimiter;
this.nullLiteral = nullLiteral;
+ this.isDeleteEscapeChar = isDeleteEscapeChar;
this.converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
.map(formatInfo -> FieldToRowDataConverters.createConverter(
@@ -193,15 +208,14 @@ public final class InLongMsgTlogKvFormatDeserializer
extends AbstractInLongMsgFo
@Override
protected List<InLongMsgBody> parseBodyList(byte[] bytes) throws Exception
{
- return Collections.singletonList(
- InLongMsgTlogKvUtils.parseBody(
- bytes,
- charset,
- delimiter,
- entryDelimiter,
- kvDelimiter,
- escapeChar,
- quoteChar));
+ return InLongMsgTlogKvUtils.parseBody(
+ bytes,
+ charset,
+ delimiter,
+ entryDelimiter,
+ kvDelimiter,
+ escapeChar,
+ quoteChar, lineDelimiter, isDeleteEscapeChar);
}
@Override
@@ -211,7 +225,7 @@ public final class InLongMsgTlogKvFormatDeserializer
extends AbstractInLongMsgFo
rowFormatInfo,
nullLiteral,
head.getPredefinedFields(),
- body.getEntries(), converters);
+ body.getEntries(), converters, failureHandler);
RowData rowData = InLongMsgUtils.decorateRowWithNeededHeadFields(
timeFieldName,
@@ -233,6 +247,8 @@ public final class InLongMsgTlogKvFormatDeserializer
extends AbstractInLongMsgFo
private Character delimiter = DEFAULT_DELIMITER;
private Character entryDelimiter = DEFAULT_ENTRY_DELIMITER;
private Character kvDelimiter = DEFAULT_KV_DELIMITER;
+ private Character lineDelimiter = DEFAULT_LINE_DELIMITER;
+ private boolean isDeleteEscapeChar = DEFAULT_IS_DELETE_ESCAPE_CHAR;
public Builder(RowFormatInfo rowFormatInfo) {
super(rowFormatInfo);
@@ -265,6 +281,16 @@ public final class InLongMsgTlogKvFormatDeserializer
extends AbstractInLongMsgFo
return this;
}
+ public Builder setLineDelimiter(Character lineDelimiter) {
+ this.lineDelimiter = lineDelimiter;
+ return this;
+ }
+
+ public Builder setDeleteEscapeChar(boolean isDeleteEscapeChar) {
+ this.isDeleteEscapeChar = isDeleteEscapeChar;
+ return this;
+ }
+
public Builder configure(DescriptorProperties descriptorProperties) {
super.configure(descriptorProperties);
@@ -278,11 +304,29 @@ public final class InLongMsgTlogKvFormatDeserializer
extends AbstractInLongMsgFo
.ifPresent(this::setEntryDelimiter);
descriptorProperties.getOptionalCharacter(FORMAT_KV_DELIMITER)
.ifPresent(this::setKvDelimiter);
+ descriptorProperties.getOptionalCharacter(FORMAT_LINE_DELIMITER)
+ .ifPresent(this::setLineDelimiter);
return this;
}
public InLongMsgTlogKvFormatDeserializer build() {
+ if (failureHandler != null) {
+ return new InLongMsgTlogKvFormatDeserializer(
+ rowFormatInfo,
+ timeFieldName,
+ attributesFieldName,
+ charset,
+ delimiter,
+ entryDelimiter,
+ kvDelimiter,
+ escapeChar,
+ quoteChar,
+ lineDelimiter,
+ nullLiteral,
+ isDeleteEscapeChar,
+ failureHandler);
+ }
return new InLongMsgTlogKvFormatDeserializer(
rowFormatInfo,
timeFieldName,
@@ -293,6 +337,7 @@ public final class InLongMsgTlogKvFormatDeserializer
extends AbstractInLongMsgFo
kvDelimiter,
escapeChar,
quoteChar,
+ lineDelimiter,
nullLiteral,
ignoreErrors);
}
@@ -322,13 +367,15 @@ public final class InLongMsgTlogKvFormatDeserializer
extends AbstractInLongMsgFo
kvDelimiter.equals(that.kvDelimiter) &&
Objects.equals(escapeChar, that.escapeChar) &&
Objects.equals(quoteChar, that.quoteChar) &&
- Objects.equals(nullLiteral, that.nullLiteral);
+ Objects.equals(lineDelimiter, that.lineDelimiter) &&
+ Objects.equals(nullLiteral, that.nullLiteral) &&
+ Objects.equals(isDeleteEscapeChar, that.isDeleteEscapeChar);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), rowFormatInfo, timeFieldName,
attributesFieldName, charset, delimiter, entryDelimiter,
kvDelimiter,
- escapeChar, quoteChar, nullLiteral);
+ escapeChar, quoteChar, lineDelimiter, nullLiteral,
isDeleteEscapeChar);
}
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
index 7e5dc6f864..f42caf3053 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.formats.inlongmsgtlogkv;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
import
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
@@ -27,6 +28,7 @@ import org.apache.flink.table.data.GenericRowData;
import java.nio.charset.Charset;
import java.sql.Timestamp;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -69,14 +71,14 @@ public class InLongMsgTlogKvUtils {
return new InLongMsgHead(attributes, null, time, predefinedFields);
}
- public static InLongMsgBody parseBody(
+ public static List<InLongMsgBody> parseBody(
byte[] bytes,
String charset,
char delimiter,
char entryDelimiter,
char kvDelimiter,
Character escapeChar,
- Character quoteChar) {
+ Character quoteChar, Character lineDelimiter, boolean
isDeleteEscapeChar) {
String text;
if (bytes[0] == delimiter) {
text = new String(bytes, 1, bytes.length - 1,
Charset.forName(charset));
@@ -87,15 +89,18 @@ public class InLongMsgTlogKvUtils {
String[] segments = splitCsv(text, delimiter, escapeChar, quoteChar);
String streamId = segments[0];
-
- Map<String, String> entries;
+ List<InLongMsgBody> inLongMsgBodies = new ArrayList<>();
+ List<Map<String, String>> entries;
if (segments.length > 1) {
- entries = splitKv(segments[1], entryDelimiter, kvDelimiter,
escapeChar, quoteChar);
+ entries = splitKv(segments[1], entryDelimiter, kvDelimiter,
escapeChar, quoteChar,
+ lineDelimiter, isDeleteEscapeChar);
+ for (Map<String, String> maps : entries) {
+ inLongMsgBodies.add(new InLongMsgBody(null, streamId,
Collections.emptyList(), maps));
+ }
} else {
- entries = Collections.emptyMap();
+ inLongMsgBodies.add(new InLongMsgBody(null, streamId,
Collections.emptyList(), Collections.emptyMap()));
}
-
- return new InLongMsgBody(bytes, streamId, Collections.emptyList(),
entries);
+ return inLongMsgBodies;
}
/**
@@ -112,7 +117,8 @@ public class InLongMsgTlogKvUtils {
String nullLiteral,
List<String> predefinedFields,
Map<String, String> entries,
- FieldToRowDataConverter[] converters) {
+ FieldToRowDataConverter[] converters,
+ FailureHandler failureHandler) throws Exception {
String[] fieldNames = rowFormatInfo.getFieldNames();
FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
@@ -133,7 +139,7 @@ public class InLongMsgTlogKvUtils {
fieldName,
fieldFormatInfo,
fieldText,
- nullLiteral));
+ nullLiteral, failureHandler));
rowData.setField(i, field);
}
@@ -146,7 +152,7 @@ public class InLongMsgTlogKvUtils {
fieldName,
fieldFormatInfo,
fieldText,
- nullLiteral));
+ nullLiteral, failureHandler));
rowData.setField(i, field);
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
index f8b4c9c519..490fded392 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/test/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializerTest.java
@@ -47,7 +47,9 @@ import static org.apache.flink.table.api.DataTypes.STRING;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ENTRY_DELIMITER;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_KV_DELIMITER;
+import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_LINE_DELIMITER;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_IS_DELETE_ESCAPE_CHAR;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
import static
org.apache.inlong.sort.formats.inlongmsgtlogkv.InLongMsgTlogKvUtils.DEFAULT_INLONGMSG_TLOGKV_CHARSET;
import static org.junit.Assert.assertEquals;
@@ -85,7 +87,9 @@ public class InLongMsgTlogKvFormatDeserializerTest {
DEFAULT_KV_DELIMITER,
null,
null,
+ DEFAULT_LINE_DELIMITER,
null,
+ DEFAULT_IS_DELETE_ESCAPE_CHAR,
errorHandler);
InLongMsg inLongMsg1 = InLongMsg.newInLongMsg(true);
@@ -98,7 +102,7 @@ public class InLongMsgTlogKvFormatDeserializerTest {
List<RowData> actualRowDatas = new ArrayList<>();
Collector<RowData> collector = new ListCollector<>(actualRowDatas);
deserializer.flatMap(inLongMsg1.buildArray(), collector);
- assertEquals(0, errorHandler.getRowCount());
+ assertEquals(2, errorHandler.getRowCount());
InLongMsg inLongMsg1Head = InLongMsg.newInLongMsg();
String abNormalAttrs = "m=0&__addcol1_=1&__addcol2_=tes";
@@ -113,10 +117,10 @@ public class InLongMsgTlogKvFormatDeserializerTest {
InLongMsg inLongMsg1 = InLongMsg.newInLongMsg(true);
String attrs = "m=0&t=20200322&__addcol1_=1&__addcol2_=2";
- String body1 = "testInterfaceId1,f1=field1&f2=field2&f3=field3";
- String body2 = "f1=field1&f2=field2&f3=field3";
- String body3 = "f1=field1&f2=field2,f1=field1&f2=field2&f3=field3";
- String body4 = ",testInterfaceId1,f1=field1&f2=field2&f3=field3";
+ String body1 = "testInterfaceId1,f1=field11&f2=field12&f3=field13";
+ String body2 = "f1=field21&f2=field22&f3=field23";
+ String body3 = "f1=field3&f2=field2,f1=field31&f2=field32&f3=field33";
+ String body4 = ",testInterfaceId1,f1=field41&f2=field42&f3=field43";
inLongMsg1.addMsg(attrs, body1.getBytes());
inLongMsg1.addMsg(attrs, body2.getBytes());
@@ -134,9 +138,9 @@ public class InLongMsgTlogKvFormatDeserializerTest {
expectRowData1.setField(1, mapConvert.convert(expectedAttributes));
expectRowData1.setField(2, 1);
expectRowData1.setField(3, 2);
- expectRowData1.setField(4, StringData.fromString("field1"));
- expectRowData1.setField(5, StringData.fromString("field2"));
- expectRowData1.setField(6, StringData.fromString("field3"));
+ expectRowData1.setField(4, StringData.fromString("field11"));
+ expectRowData1.setField(5, StringData.fromString("field12"));
+ expectRowData1.setField(6, StringData.fromString("field13"));
GenericRowData expectRowData2 = new GenericRowData(7);
expectRowData2.setField(0,
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
@@ -152,18 +156,18 @@ public class InLongMsgTlogKvFormatDeserializerTest {
expectRowData3.setField(1, mapConvert.convert(expectedAttributes));
expectRowData3.setField(2, 1);
expectRowData3.setField(3, 2);
- expectRowData3.setField(4, StringData.fromString("field1"));
- expectRowData3.setField(5, StringData.fromString("field2"));
- expectRowData3.setField(6, StringData.fromString("field3"));
+ expectRowData3.setField(4, StringData.fromString("field31"));
+ expectRowData3.setField(5, StringData.fromString("field32"));
+ expectRowData3.setField(6, StringData.fromString("field33"));
GenericRowData expectRowData4 = new GenericRowData(7);
expectRowData4.setField(0,
TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
expectRowData4.setField(1, mapConvert.convert(expectedAttributes));
expectRowData4.setField(2, 1);
expectRowData4.setField(3, 2);
- expectRowData4.setField(4, StringData.fromString("field1"));
- expectRowData4.setField(5, StringData.fromString("field2"));
- expectRowData4.setField(6, StringData.fromString("field3"));
+ expectRowData4.setField(4, StringData.fromString("field41"));
+ expectRowData4.setField(5, StringData.fromString("field42"));
+ expectRowData4.setField(6, StringData.fromString("field43"));
testRowDataDeserialization(inLongMsg1.buildArray(),
Arrays.asList(expectRowData1, expectRowData2, expectRowData3,
expectRowData4));
@@ -219,5 +223,11 @@ public class InLongMsgTlogKvFormatDeserializerTest {
InLongMsgBody body, Exception exception) throws Exception {
rowCount++;
}
+
+        /**
+         * Records a field that could not be converted by bumping {@code rowCount}, the same
+         * counter incremented by this handler's other failure callback.
+         */
+        @Override
+        public void onConvertingFieldFailure(String fieldName, String fieldText, FormatInfo formatInfo,
+                Exception exception) throws Exception {
+            rowCount += 1;
+        }
}
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
index 22e5b5cb9e..eae08f62ce 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/test/java/org/apache/inlong/sort/formats/base/TableFormatUtilsTest.java
@@ -47,13 +47,13 @@ import static org.junit.Assert.assertNull;
public class TableFormatUtilsTest {
@Test
- public void testDeserializeStringWithoutNullLiteral() {
+ public void testDeserializeStringWithoutNullLiteral() throws Exception {
Object result1 =
deserializeBasicField(
"f",
StringFormatInfo.INSTANCE,
"data",
- null);
+ null, null);
assertEquals("data", result1);
Object result2 =
@@ -61,7 +61,7 @@ public class TableFormatUtilsTest {
"f",
StringFormatInfo.INSTANCE,
"",
- null);
+ null, null);
assertEquals("", result2);
}
@@ -85,13 +85,13 @@ public class TableFormatUtilsTest {
}
@Test
- public void testDeserializeStringWithNullLiteral() {
+ public void testDeserializeStringWithNullLiteral() throws Exception {
Object result1 =
deserializeBasicField(
"f",
StringFormatInfo.INSTANCE,
"data",
- "n/a");
+ "n/a", null);
assertEquals("data", result1);
Object result2 =
@@ -99,7 +99,7 @@ public class TableFormatUtilsTest {
"f",
StringFormatInfo.INSTANCE,
"",
- "n/a");
+ "n/a", null);
assertEquals("", result2);
Object result3 =
@@ -107,7 +107,7 @@ public class TableFormatUtilsTest {
"f",
StringFormatInfo.INSTANCE,
"n/a",
- "n/a");
+ "n/a", null);
assertNull(result3);
}
@@ -139,13 +139,13 @@ public class TableFormatUtilsTest {
}
@Test
- public void testDeserializeNumberWithoutNullLiteral() {
+ public void testDeserializeNumberWithoutNullLiteral() throws Exception {
Object result1 =
deserializeBasicField(
"f",
IntFormatInfo.INSTANCE,
"1",
- null);
+ null, null);
assertEquals(1, result1);
Object result2 =
@@ -153,7 +153,7 @@ public class TableFormatUtilsTest {
"f",
IntFormatInfo.INSTANCE,
"",
- null);
+ null, null);
assertNull(result2);
}
@@ -177,13 +177,13 @@ public class TableFormatUtilsTest {
}
@Test
- public void testDeserializeNumberWithNullLiteral() {
+ public void testDeserializeNumberWithNullLiteral() throws Exception {
Object result1 =
deserializeBasicField(
"f",
IntFormatInfo.INSTANCE,
"1",
- "n/a");
+ "n/a", null);
assertEquals(1, result1);
try {
@@ -191,7 +191,7 @@ public class TableFormatUtilsTest {
"f",
IntFormatInfo.INSTANCE,
"",
- "n/a");
+ "n/a", null);
assertNull(result2);
} catch (Exception e) {
@@ -203,7 +203,7 @@ public class TableFormatUtilsTest {
"f",
IntFormatInfo.INSTANCE,
"n/a",
- "n/a");
+ "n/a", null);
assertNull(result3);
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
index 2a7182e39b..665b1b2fc5 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
@@ -23,6 +23,7 @@ import
org.apache.inlong.sort.formats.base.DefaultDeserializationSchema;
import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
import
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
import org.apache.inlong.sort.formats.base.TableFormatUtils;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -132,6 +133,30 @@ public final class CsvRowDataDeserializationSchema extends
DefaultDeserializatio
.toArray(FieldToRowDataConverter[]::new);
}
+    /**
+     * Creates a CSV deserialization schema that forwards the given {@link FailureHandler} to the
+     * superclass and builds one field converter per field of {@code rowFormatInfo}.
+     *
+     * @param resultTypeInfo type information of the produced rows
+     * @param rowFormatInfo  row format supplying the per-field format infos
+     * @param charset        name of the charset used to decode input bytes
+     * @param delimiter      field delimiter
+     * @param escapeChar     escape character; may be {@code null}
+     * @param quoteChar      quote character; may be {@code null}
+     * @param nullLiteral    literal denoting a null field; may be {@code null}
+     * @param failureHandler failure handler passed to the superclass constructor
+     */
+    public CsvRowDataDeserializationSchema(
+            @Nonnull TypeInformation<RowData> resultTypeInfo,
+            @Nonnull RowFormatInfo rowFormatInfo,
+            @Nonnull String charset,
+            @Nonnull Character delimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar,
+            @Nullable String nullLiteral,
+            FailureHandler failureHandler) {
+        super(failureHandler);
+
+        // Row layout and decoding settings.
+        this.rowFormatInfo = rowFormatInfo;
+        this.resultTypeInfo = resultTypeInfo;
+        this.charset = charset;
+
+        // Tokenizer settings; escape, quote and null literal are optional.
+        this.delimiter = delimiter;
+        this.quoteChar = quoteChar;
+        this.escapeChar = escapeChar;
+        this.nullLiteral = nullLiteral;
+
+        // One converter per field, in field order, derived from each field's logical type.
+        FieldToRowDataConverter[] fieldConverters =
+                Arrays.stream(rowFormatInfo.getFieldFormatInfos())
+                        .map(formatInfo -> TableFormatUtils.deriveLogicalType(formatInfo))
+                        .map(logicalType -> FieldToRowDataConverters.createConverter(logicalType))
+                        .toArray(FieldToRowDataConverter[]::new);
+        this.converters = fieldConverters;
+    }
+
/**
* A builder for creating a {@link CsvRowDataDeserializationSchema}.
*/
@@ -236,7 +261,7 @@ public final class CsvRowDataDeserializationSchema extends
DefaultDeserializatio
fieldNames[i],
fieldFormatInfos[i],
fieldTexts[i],
- nullLiteral);
+ nullLiteral, failureHandler);
rowData.setField(i, converters[i].convert(field));
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
index 0f4e2b4a63..771cc5bd17 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
@@ -22,6 +22,7 @@ import
org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
import org.apache.inlong.sort.formats.base.DefaultDeserializationSchema;
import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
import org.apache.inlong.sort.formats.base.TableFormatForRowDataUtils;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.GenericRowData;
@@ -33,6 +34,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -144,13 +146,40 @@ public class KvRowDataDeserializationSchema extends
DefaultDeserializationSchema
.toArray(FieldToRowDataConverters.FieldToRowDataConverter[]::new);
}
+    /**
+     * Creates a KV deserialization schema that forwards the given {@link FailureHandler} to the
+     * superclass and builds one field converter per field of {@code rowFormatInfo}.
+     *
+     * @param rowFormatInfo    row format supplying the per-field format infos
+     * @param producedTypeInfo type information of the produced rows
+     * @param charset          name of the charset used to decode input bytes
+     * @param entryDelimiter   delimiter between key/value entries
+     * @param kvDelimiter      delimiter between a key and its value
+     * @param escapeChar       escape character; may be {@code null}
+     * @param quoteChar        quote character; may be {@code null}
+     * @param nullLiteral      literal denoting a null field; may be {@code null}
+     * @param failureHandler   failure handler passed to the superclass constructor; may be {@code null}
+     */
+    public KvRowDataDeserializationSchema(
+            @Nonnull RowFormatInfo rowFormatInfo,
+            @Nonnull TypeInformation<RowData> producedTypeInfo,
+            @Nonnull String charset,
+            @Nonnull Character entryDelimiter,
+            @Nonnull Character kvDelimiter,
+            @Nullable Character escapeChar,
+            @Nullable Character quoteChar,
+            @Nullable String nullLiteral,
+            @Nullable FailureHandler failureHandler) {
+        super(failureHandler);
+
+        // Row layout and decoding settings.
+        this.producedTypeInfo = producedTypeInfo;
+        this.rowFormatInfo = rowFormatInfo;
+        this.charset = charset;
+
+        // Tokenizer settings; escape, quote and null literal are optional.
+        this.entryDelimiter = entryDelimiter;
+        this.kvDelimiter = kvDelimiter;
+        this.quoteChar = quoteChar;
+        this.escapeChar = escapeChar;
+        this.nullLiteral = nullLiteral;
+
+        // One converter per field, in field order, derived from each field's logical type.
+        FieldToRowDataConverters.FieldToRowDataConverter[] fieldConverters =
+                Arrays.stream(rowFormatInfo.getFieldFormatInfos())
+                        .map(formatInfo -> TableFormatForRowDataUtils.deriveLogicalType(formatInfo))
+                        .map(logicalType -> FieldToRowDataConverters.createConverter(logicalType))
+                        .toArray(FieldToRowDataConverters.FieldToRowDataConverter[]::new);
+        this.converters = fieldConverters;
+    }
+
@Override
public RowData deserializeInternal(byte[] bytes) throws Exception {
String text = new String(bytes, Charset.forName(charset));
GenericRowData rowData = null;
try {
- Map<String, String> fieldTexts =
- splitKv(text, entryDelimiter, kvDelimiter, escapeChar,
quoteChar);
+ List<Map<String, String>> fieldTexts =
+ splitKv(text, entryDelimiter, kvDelimiter, escapeChar,
quoteChar, null,
+ true);
String[] fieldNames = rowFormatInfo.getFieldNames();
FormatInfo[] fieldFormatInfos =
rowFormatInfo.getFieldFormatInfos();
@@ -160,13 +189,13 @@ public class KvRowDataDeserializationSchema extends
DefaultDeserializationSchema
String fieldName = fieldNames[i];
FormatInfo fieldFormatInfo = fieldFormatInfos[i];
- String fieldText = fieldTexts.get(fieldName);
+ String fieldText = fieldTexts.get(0).get(fieldName);
Object field = deserializeBasicField(
fieldName,
fieldFormatInfo,
fieldText,
- nullLiteral);
+ nullLiteral, failureHandler);
rowData.setField(i, converters[i].convert(field));
}
return rowData;
@@ -199,6 +228,17 @@ public class KvRowDataDeserializationSchema extends
DefaultDeserializationSchema
}
public KvRowDataDeserializationSchema build() {
+ if (failureHandler != null) {
+ return new KvRowDataDeserializationSchema(
+ rowFormatInfo,
+ producedTypeInfo,
+ charset,
+ entryDelimiter,
+ kvDelimiter,
+ escapeChar,
+ quoteChar,
+ nullLiteral, failureHandler);
+ }
return new KvRowDataDeserializationSchema(
rowFormatInfo,
producedTypeInfo,
@@ -229,7 +269,8 @@ public class KvRowDataDeserializationSchema extends
DefaultDeserializationSchema
kvDelimiter.equals(that.kvDelimiter) &&
Objects.equals(escapeChar, that.escapeChar) &&
Objects.equals(quoteChar, that.quoteChar) &&
- Objects.equals(nullLiteral, that.nullLiteral);
+ Objects.equals(nullLiteral, that.nullLiteral) &&
+ Objects.equals(failureHandler, that.failureHandler);
}
@Override
@@ -242,6 +283,6 @@ public class KvRowDataDeserializationSchema extends
DefaultDeserializationSchema
kvDelimiter,
escapeChar,
quoteChar,
- nullLiteral);
+ nullLiteral, failureHandler);
}
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvUtilsTest.java
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvUtilsTest.java
index 9452395f59..2376ff198f 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvUtilsTest.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/test/java/org/apache/inlong/sort/formats/kv/KvUtilsTest.java
@@ -37,7 +37,8 @@ public class KvUtilsTest {
@Test
public void testSplitNormal() {
List<Map<String, String>> list =
- splitKv("f1=a\nf1=b", '&', '=', '\\', '\"', '\n');
+ splitKv("f1=a\nf1=b", '&', '=', '\\', '\"',
+ '\n', true);
List<Map<String, String>> expectedList = new ArrayList<>();
expectedList.add(new HashMap<String, String>() {
@@ -195,6 +196,103 @@ public class KvUtilsTest {
splitKv("=a&f=", '&', '=', '\\', '\"'));
}
+    /**
+     * Checks {@code splitKv} with escape deletion disabled ({@code isDeleteEscapeChar == false}):
+     * escape characters are kept in keys and values, while quote characters are stripped.
+     * The final case deliberately enables escape deletion together with line splitting.
+     */
+    @Test
+    public void testSplitNormalWithoutEscape() {
+        // Lines separated by '\n' become separate maps, in input order.
+        List<Map<String, String>> list =
+                splitKv("f1=a\nf1=b", '&', '=', '\\', '\"', '\n', false);
+        Map<String, String> firstLine = new HashMap<>();
+        firstLine.put("f1", "a");
+        Map<String, String> secondLine = new HashMap<>();
+        secondLine.put("f1", "b");
+        List<Map<String, String>> expectedList = new ArrayList<>();
+        expectedList.add(firstLine);
+        expectedList.add(secondLine);
+        assertEquals(expectedList, list);
+
+        // expectedMap is cleared and refilled for each single-line case below; expectedListMap
+        // holds a reference to it, so the list always reflects the current expectation.
+        List<Map<String, String>> expectedListMap = new ArrayList<>();
+        Map<String, String> expectedMap = new HashMap<>();
+        expectedListMap.add(expectedMap);
+
+        // An escaped kv delimiter is not split on, and the escape char stays in the key.
+        expectedMap.put("\\=f1", "a");
+        expectedMap.put("f2", "b");
+        expectedMap.put("f3", "c");
+        assertEquals(expectedListMap,
+                splitKv("\\=f1=a&f2=b&f3=c", '&', '=', '\\', null, null, false));
+
+        // An escaped entry delimiter is not split on, and the escape char stays in the key.
+        expectedMap.clear();
+        expectedMap.put("\\&f1", "a");
+        expectedMap.put("f2", "b");
+        expectedMap.put("f3", "c");
+        assertEquals(expectedListMap,
+                splitKv("\\&f1=a&f2=b&f3=c", '&', '=', '\\', null, null, false));
+
+        // A quoted entry delimiter is kept literally; the quotes themselves are removed.
+        expectedMap.clear();
+        expectedMap.put("&f1", "a");
+        expectedMap.put("f2", "b");
+        expectedMap.put("f3", "c");
+        assertEquals(expectedListMap,
+                splitKv("\"&f1\"=a&f2=b&f3=c", '&', '=', '\\', '\"', null, false));
+
+        // An escaped entry delimiter at the end of a value is kept with its escape char.
+        expectedMap.clear();
+        expectedMap.put("f1", "a\\&");
+        expectedMap.put("f2", "b");
+        expectedMap.put("f3", "c");
+        assertEquals(expectedListMap,
+                splitKv("f1=a\\&&f2=b&f3=c", '&', '=', '\\', null, null, false));
+
+        // An escaped escape char is kept as two characters.
+        expectedMap.clear();
+        expectedMap.put("f1", "a\\\\");
+        expectedMap.put("f2", "b");
+        expectedMap.put("f3", "c");
+        assertEquals(expectedListMap,
+                splitKv("f1=a\\\\&f2=b&f3=c", '&', '=', '\\', null, null, false));
+
+        // Delimiters inside a quoted span belong to the value.
+        expectedMap.clear();
+        expectedMap.put("f1", "a&f2=b");
+        expectedMap.put("f3", "c");
+        expectedMap.put("f4", "d");
+        assertEquals(expectedListMap,
+                splitKv("f1=a\"&f2=\"b&f3=c&f4=d", '&', '=', '\\', '\"', null, false));
+
+        // An escape char inside a quoted span is kept.
+        expectedMap.clear();
+        expectedMap.put("f1", "atest\\test");
+        expectedMap.put("f2", "b");
+        expectedMap.put("f3", "c");
+        assertEquals(expectedListMap,
+                splitKv("f1=a\"test\\test\"&f2=b&f3=c", '&', '=', '\\', '\"', null, false));
+
+        // Escaped quote chars are kept together with their escape char.
+        expectedMap.clear();
+        expectedMap.put("f1", "a");
+        expectedMap.put("f2", "\\\"b");
+        expectedMap.put("f3", "c\\\"");
+        expectedMap.put("f4", "d");
+        assertEquals(expectedListMap,
+                splitKv("f1=a&f2=\\\"b&f3=c\\\"&f4=d", '&', '=', '\\', '\"', null, false));
+
+        // Last case: escape deletion enabled (isDeleteEscapeChar == true) plus line splitting,
+        // so "d\\d" becomes "dd" and the second line yields a second map.
+        expectedMap.clear();
+        expectedMap.put("", "a");
+        expectedMap.put("f", "");
+        Map<String, String> expectedMap2 = new HashMap<>();
+        expectedMap2.put("c", "dd");
+        expectedListMap.add(expectedMap2);
+        assertEquals(expectedListMap,
+                splitKv("=a&f=\nc=d\\d", '&', '=', '\\', '\"', '\n', true));
+    }
+
@Test
public void testSplitNestedValue() {
Map<String, String> kvMap = splitKv("f1=a=a&f2=b&f3=c", '&', '=',