This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4f522d6e9 [INLONG-7061][Sort] Support table level metrics for Apache
Doris connector and add dirty metrics (#7062)
4f522d6e9 is described below
commit 4f522d6e999b34bd694d6c1398b6a2cd27b1a51e
Author: Yizhou Yang <[email protected]>
AuthorDate: Tue Jan 3 14:06:22 2023 +0800
[INLONG-7061][Sort] Support table level metrics for Apache Doris connector
and add dirty metrics (#7062)
---
.../sort/base/dirty/sink/s3/S3DirtySink.java | 3 +-
.../inlong/sort/base/dirty/sink/s3/S3Helper.java | 2 +-
.../sort/base/metric/sub/SinkTableMetricData.java | 28 +++++++++++++
.../table/DorisDynamicSchemaOutputFormat.java | 48 +++++++++++++++++++++-
4 files changed, 78 insertions(+), 3 deletions(-)
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
index ab8fc9464..b8f1f5f10 100644
---
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
@@ -22,6 +22,7 @@ import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import
org.apache.flink.formats.json.RowDataToJsonConverters.RowDataToJsonConverter;
@@ -244,7 +245,7 @@ public class S3DirtySink<T> implements DirtySink<T> {
}
String content = null;
try {
- content = StringUtils.join(values, s3Options.getLineDelimiter());
+ content = StringUtils.join(values,
StringEscapeUtils.unescapeJava(s3Options.getLineDelimiter()));
s3Helper.upload(identifier, content);
LOGGER.info("Write {} records to s3 of identifier: {}",
values.size(), identifier);
writeOutNum.addAndGet(values.size());
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Helper.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Helper.java
index d79b8aecd..f925d76e8 100644
---
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Helper.java
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Helper.java
@@ -39,7 +39,7 @@ public class S3Helper implements Serializable {
private static final DateTimeFormatter DATE_TIME_FORMAT =
DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
private static final int SEQUENCE_LENGTH = 4;
- private static final String ESCAPE_PATTERN = "[\\pP\\p{Punct}\\s]";
+ private static final String ESCAPE_PATTERN = "[,,+=: ;()()。/.;]";
private static final String FILE_NAME_SUFFIX = ".txt";
private final Random r = new Random();
private final AmazonS3 s3Client;
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
index cf5285b13..a5690a5b5 100644
---
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
@@ -196,6 +196,34 @@ public class SinkTableMetricData extends SinkMetricData
implements SinkSubMetric
subSinkMetricData.invoke(rowCount, rowSize);
}
+ /**
+ * output dirty metrics with estimate
+ *
+ * @param database the database name of record
+ * @param schema the schema name of record
+ * @param table the table name of record
+ * @param rowCount the row count of records
+ * @param rowSize the row size of records
+ */
+ public void outputDirtyMetricsWithEstimate(String database, String table,
long rowCount,
+ long rowSize) {
+ if (StringUtils.isBlank(database) || StringUtils.isBlank(table)) {
+ invokeDirty(rowCount, rowSize);
+ return;
+ }
+ String identify = buildSchemaIdentify(database, null, table);
+ SinkMetricData subSinkMetricData;
+ if (subSinkMetricMap.containsKey(identify)) {
+ subSinkMetricData = subSinkMetricMap.get(identify);
+ } else {
+ subSinkMetricData = buildSubSinkMetricData(new String[]{database,
table}, this);
+ subSinkMetricMap.put(identify, subSinkMetricData);
+ }
+ // sink metric and sub sink metric output metrics
+ this.invokeDirty(rowCount, rowSize);
+ subSinkMetricData.invokeDirty(rowCount, rowSize);
+ }
+
public void outputMetricsWithEstimate(Object data) {
long size = data.toString().getBytes(StandardCharsets.UTF_8).length;
invoke(1, size);
diff --git
a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index 1b9bc4c48..efa04a630 100644
---
a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++
b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -76,6 +76,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
@@ -141,7 +143,6 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
private transient MetricState metricState;
private final String[] fieldNames;
private volatile boolean jsonFormat;
- private String keysType;
private volatile RowData.FieldGetter[] fieldGetters;
private String fieldDelimiter;
private String lineDelimiter;
@@ -267,6 +268,8 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
.withInlongAudit(auditHostAndPorts)
.withInitRecords(metricState != null ?
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
.withInitBytes(metricState != null ?
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+ .withInitDirtyRecords(metricState != null ?
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
+ .withInitDirtyBytes(metricState != null ?
metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
.withRegisterMetric(MetricOption.RegisteredMetric.ALL)
.build();
if (metricOption != null) {
@@ -486,6 +489,12 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
}
throw ex;
}
+
+ if (multipleSink) {
+ handleMultipleDirtyData(dirtyData, dirtyType, e);
+ return;
+ }
+
if (dirtySink != null) {
DirtyData.Builder<Object> builder = DirtyData.builder();
try {
@@ -503,6 +512,43 @@ public class DorisDynamicSchemaOutputFormat<T> extends
RichOutputFormat<T> {
LOG.warn("Dirty sink failed", ex);
}
}
+ metricData.invokeDirty(1,
dirtyData.toString().getBytes(StandardCharsets.UTF_8).length);
+ }
+
+ private void handleMultipleDirtyData(Object dirtyData, DirtyType
dirtyType, Exception e) {
+ JsonNode rootNode;
+ try {
+ rootNode = jsonDynamicSchemaFormat.deserialize(((RowData)
dirtyData).getBinary(0));
+ } catch (Exception ex) {
+ handleDirtyData(dirtyData, DirtyType.DESERIALIZE_ERROR, e);
+ return;
+ }
+
+ if (dirtySink != null) {
+ DirtyData.Builder<Object> builder = DirtyData.builder();
+ try {
+ builder.setData(dirtyData)
+ .setDirtyType(dirtyType)
+ .setLabels(jsonDynamicSchemaFormat.parse(rootNode,
dirtyOptions.getLabels()))
+ .setLogTag(jsonDynamicSchemaFormat.parse(rootNode,
dirtyOptions.getLogTag()))
+ .setDirtyMessage(e.getMessage())
+ .setIdentifier(jsonDynamicSchemaFormat.parse(rootNode,
dirtyOptions.getIdentifier()));
+ dirtySink.invoke(builder.build());
+ } catch (Exception ex) {
+ if (!dirtyOptions.ignoreSideOutputErrors()) {
+ throw new RuntimeException(ex);
+ }
+ LOG.warn("Dirty sink failed", ex);
+ }
+ }
+ try {
+ metricData.outputDirtyMetricsWithEstimate(
+ jsonDynamicSchemaFormat.parse(rootNode, databasePattern),
+ jsonDynamicSchemaFormat.parse(rootNode, tablePattern), 1,
+ ((RowData) dirtyData).getBinary(0).length);
+ } catch (Exception ex) {
+ metricData.invokeDirty(1,
dirtyData.toString().getBytes(StandardCharsets.UTF_8).length);
+ }
}
private void handleColumnsChange(String tableIdentifier, JsonNode
rootNode, JsonNode physicalData) {