This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9c6908e2b [INLONG-7292][Sort] Accurately detect and archive dirty data
for Doris/StarRocks/JDBC (#7289)
9c6908e2b is described below
commit 9c6908e2b236e35f9b102499243f6f9a191d466a
Author: Yizhou Yang <[email protected]>
AuthorDate: Fri Feb 3 10:30:53 2023 +0800
[INLONG-7292][Sort] Accurately detect and archive dirty data for
Doris/StarRocks/JDBC (#7289)
---
.../inlong/sort/base/dirty/DirtySinkHelper.java | 124 ++++++++++++++++++---
.../inlong/sort/base/dirty/RegexReplaceTest.java | 38 +++++++
.../table/DorisDynamicSchemaOutputFormat.java | 8 +-
.../internal/JdbcMultiBatchingOutputFormat.java | 4 +-
.../starrocks/manager/StarRocksSinkManager.java | 9 +-
5 files changed, 162 insertions(+), 21 deletions(-)
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java
index e815db856..3f3b14b6a 100644
---
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java
@@ -28,7 +28,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.io.Serializable;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
/**
* Dirty sink helper, it helps dirty data sink for {@link DirtySink}
@@ -38,6 +45,7 @@ public class DirtySinkHelper<T> implements Serializable {
private static final long serialVersionUID = 1L;
private static final Logger LOGGER =
LoggerFactory.getLogger(DirtySinkHelper.class);
+ static final Pattern REGEX_PATTERN =
Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}", Pattern.CASE_INSENSITIVE);
private DirtyOptions dirtyOptions;
private final @Nullable DirtySink<T> dirtySink;
@@ -97,10 +105,8 @@ public class DirtySinkHelper<T> implements Serializable {
}
}
- public void invokeMultiple(T dirtyData, DirtyType dirtyType, Throwable e,
+ public void invokeMultiple(String tableIdentifier, T dirtyData, DirtyType
dirtyType, Throwable e,
String sinkMultipleFormat) {
- JsonDynamicSchemaFormat jsonDynamicSchemaFormat =
- (JsonDynamicSchemaFormat)
DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
if (!dirtyOptions.ignoreDirty()) {
RuntimeException ex;
if (e instanceof RuntimeException) {
@@ -110,22 +116,60 @@ public class DirtySinkHelper<T> implements Serializable {
}
throw ex;
}
- if (dirtySink != null) {
- JsonNode rootNode;
- DirtyData.Builder<T> builder = DirtyData.builder();
- try {
+
+ JsonDynamicSchemaFormat jsonDynamicSchemaFormat =
+ (JsonDynamicSchemaFormat)
DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+ JsonNode rootNode = null;
+ String[] actualIdentifier = tableIdentifier.split("\\.");;
+
+ try {
+ // for rowdata where identifier is not the first element, append
identifier and SEPARATOR before it.
+ if (dirtyData instanceof RowData) {
rootNode = jsonDynamicSchemaFormat.deserialize(((RowData)
dirtyData).getBinary(0));
- } catch (Exception ex) {
- invoke(dirtyData, DirtyType.DESERIALIZE_ERROR, e);
- return;
+ handleDirty(dirtyType, e, null, rootNode,
jsonDynamicSchemaFormat, dirtyData);
+ } else if (dirtyData instanceof JsonNode) {
+ rootNode = (JsonNode) dirtyData;
+ handleDirty(dirtyType, e, null, rootNode,
jsonDynamicSchemaFormat, dirtyData);
+ } else if (dirtyData instanceof String) {
+ handleDirty(dirtyType, e, actualIdentifier, null,
jsonDynamicSchemaFormat, dirtyData);
+ } else {
+ throw new Exception("unidentified dirty data " + dirtyData);
}
+ } catch (Exception ex) {
+ LOGGER.warn("parse dirty data {} of class {} failed", dirtyData,
dirtyData.getClass());
+ invoke(dirtyData, DirtyType.DESERIALIZE_ERROR, e);
+ }
+ }
+
+ private void handleDirty(DirtyType dirtyType, Throwable e,
+ String[] actualIdentifier, JsonNode rootNode,
JsonDynamicSchemaFormat jsonDynamicSchemaFormat,
+ T dirtyData) {
+ if (dirtySink != null) {
+ DirtyData.Builder<T> builder = DirtyData.builder();
try {
- builder.setData(dirtyData)
- .setDirtyType(dirtyType)
- .setLabels(jsonDynamicSchemaFormat.parse(rootNode,
dirtyOptions.getLabels()))
- .setLogTag(jsonDynamicSchemaFormat.parse(rootNode,
dirtyOptions.getLogTag()))
- .setDirtyMessage(e.getMessage())
- .setIdentifier(jsonDynamicSchemaFormat.parse(rootNode,
dirtyOptions.getIdentifier()));
+ if (rootNode != null) {
+ String labels = regexReplace(dirtyOptions.getLabels(),
dirtyType, e.getMessage(), null);
+ String logTag = regexReplace(dirtyOptions.getLogTag(),
dirtyType, e.getMessage(), null);
+ String identifier =
regexReplace(dirtyOptions.getIdentifier(), dirtyType, e.getMessage(), null);
+ builder.setData(dirtyData)
+ .setDirtyType(dirtyType)
+ .setLabels(jsonDynamicSchemaFormat.parse(rootNode,
labels))
+ .setLogTag(jsonDynamicSchemaFormat.parse(rootNode,
logTag))
+ .setDirtyMessage(e.getMessage())
+
.setIdentifier(jsonDynamicSchemaFormat.parse(rootNode, identifier));
+ } else {
+ // for dirty data without proper rootnode, parse
completely into string literals
+ String labels = regexReplace(dirtyOptions.getLabels(),
dirtyType, e.getMessage(), actualIdentifier);
+ String logTag = regexReplace(dirtyOptions.getLogTag(),
dirtyType, e.getMessage(), actualIdentifier);
+ String identifier =
+ regexReplace(dirtyOptions.getIdentifier(),
dirtyType, e.getMessage(), actualIdentifier);
+ builder.setData(dirtyData)
+ .setDirtyType(dirtyType)
+ .setLabels(labels)
+ .setLogTag(logTag)
+ .setDirtyMessage(e.getMessage())
+ .setIdentifier(identifier);
+ }
dirtySink.invoke(builder.build());
} catch (Exception ex) {
if (!dirtyOptions.ignoreSideOutputErrors()) {
@@ -136,6 +180,54 @@ public class DirtySinkHelper<T> implements Serializable {
}
}
+ public static String regexReplace(String pattern, DirtyType dirtyType,
+ String dirtyMessage, String[] actualIdentifier) throws IOException
{
+
+ if (pattern == null) {
+ return null;
+ }
+
+ final String DIRTY_TYPE_KEY = "DIRTY_TYPE";
+ final String DIRTY_MESSAGE_KEY = "DIRTY_MESSAGE";
+ final String SYSTEM_TIME_KEY = "SYSTEM_TIME";
+ final DateTimeFormatter DATE_TIME_FORMAT =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+ Map<String, String> paramMap = new HashMap<>();
+ paramMap.put(SYSTEM_TIME_KEY,
DATE_TIME_FORMAT.format(LocalDateTime.now()));
+ paramMap.put(DIRTY_TYPE_KEY, dirtyType.format());
+ paramMap.put(DIRTY_MESSAGE_KEY, dirtyMessage);
+
+ Matcher matcher = REGEX_PATTERN.matcher(pattern);
+
+ // if RootNode is not available, generate a complete paramMap with
{database} {table},etc.
+ if (actualIdentifier != null) {
+ int i = 0;
+ while (matcher.find()) {
+ try {
+ String keyText = matcher.group(1);
+ int finalI = i;
+ paramMap.computeIfAbsent(keyText, k ->
actualIdentifier[finalI]);
+ } catch (Exception e) {
+ throw new IOException("param map replacement failed", e);
+ }
+ i++;
+ }
+ }
+
+ matcher = REGEX_PATTERN.matcher(pattern);
+ StringBuffer sb = new StringBuffer();
+ while (matcher.find()) {
+ String keyText = matcher.group(1);
+ String replacement = paramMap.get(keyText);
+ if (replacement == null) {
+ continue;
+ }
+ matcher.appendReplacement(sb, replacement);
+ }
+ matcher.appendTail(sb);
+ return sb.toString();
+ }
+
public void setDirtyOptions(DirtyOptions dirtyOptions) {
this.dirtyOptions = dirtyOptions;
}
diff --git
a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/RegexReplaceTest.java
b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/RegexReplaceTest.java
new file mode 100644
index 000000000..f6b21ed26
--- /dev/null
+++
b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/RegexReplaceTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+@Slf4j
+public class RegexReplaceTest {
+
+ @Test
+ public void testRegexReplacement() throws IOException {
+ String[] identifier = new String[2];
+ identifier[0] = "yizhouyang";
+ identifier[1] = "table2";
+ String pattern = "${database}-${table}-${DIRTY_MESSAGE}";
+ String answer = DirtySinkHelper.regexReplace(pattern,
DirtyType.BATCH_LOAD_ERROR, "mock message", identifier);
+ Assert.assertEquals("yizhouyang-table2-mock message", answer);
+ }
+}
diff --git
a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index 0878124f5..93e68370a 100644
---
a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++
b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -499,8 +499,12 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
}
if (multipleSink) {
- handleMultipleDirtyData(dirtyData, dirtyType, e);
- return;
+ if (dirtyType == DirtyType.DESERIALIZE_ERROR) {
+ LOG.error("database and table can't be identified, will use
default ${database}${table}");
+ } else {
+ handleMultipleDirtyData(dirtyData, dirtyType, e);
+ return;
+ }
}
if (dirtySink != null) {
diff --git
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
index ead5b58a3..698290f91 100644
---
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
+++
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
@@ -572,7 +572,9 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn,
JdbcExec extends JdbcBatc
outputMetrics(tableIdentifier,
Long.valueOf(tableIdRecordList.size()),
1L, true);
if
(!schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.THROW_WITH_STOP))
{
- dirtySinkHelper.invokeMultiple(record,
DirtyType.RETRY_LOAD_ERROR, tableException,
+ dirtySinkHelper.invokeMultiple(
+ tableIdentifier, record.toString(),
+ DirtyType.RETRY_LOAD_ERROR, tableException,
sinkMultipleFormat);
}
tableExceptionMap.put(tableIdentifier, tableException);
diff --git
a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
index 19c762192..6e10725e8 100644
---
a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
+++
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
@@ -451,12 +451,17 @@ public class StarRocksSinkManager implements Serializable
{
// archive dirty data
if
(StarRocksSinkOptions.StreamLoadFormat.CSV.equals(sinkOptions.getStreamLoadFormat()))
{
for (byte[] row : flushData.getBuffer()) {
- dirtySinkHelper.invokeMultiple(new String(row,
StandardCharsets.UTF_8), DirtyType.BATCH_LOAD_ERROR, e,
+ dirtySinkHelper.invokeMultiple(
+ flushData.getDatabase() + "." + flushData.getTable(),
+ new String(row, StandardCharsets.UTF_8),
+ DirtyType.BATCH_LOAD_ERROR, e,
sinkMultipleFormat);
}
} else if
(StarRocksSinkOptions.StreamLoadFormat.JSON.equals(sinkOptions.getStreamLoadFormat()))
{
for (byte[] row : flushData.getBuffer()) {
- dirtySinkHelper.invokeMultiple(OBJECT_MAPPER.readTree(new
String(row, StandardCharsets.UTF_8)),
+ dirtySinkHelper.invokeMultiple(
+ flushData.getDatabase() + "." + flushData.getTable(),
+ OBJECT_MAPPER.readTree(new String(row,
StandardCharsets.UTF_8)).toString(),
DirtyType.BATCH_LOAD_ERROR, e, sinkMultipleFormat);
}
}