This is an automated email from the ASF dual-hosted git repository.
pacinogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d0124270a [INLONG-6899][Sort] StarRocks supports table level metric (#6903)
d0124270a is described below
commit d0124270a7b7778093c33e87288e0c4e06258cac
Author: Liao Rui <[email protected]>
AuthorDate: Wed Dec 21 14:27:47 2022 +0800
[INLONG-6899][Sort] StarRocks supports table level metric (#6903)
---
.../sort/base/metric/sub/SinkSubMetricData.java | 11 +-
.../sort/base/metric/sub/SinkTableMetricData.java | 216 +++++++++++++++++++++
.../sort/base/metric/sub/SinkTopicMetricData.java | 2 +-
.../inlong/sort/base/util/MetricStateUtils.java | 2 +-
.../starrocks/manager/StarRocksSinkManager.java | 13 +-
.../table/sink/StarRocksDynamicSinkFunction.java | 10 +-
6 files changed, 240 insertions(+), 14 deletions(-)
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkSubMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkSubMetricData.java
index 143a925ca..3f863a317 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkSubMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkSubMetricData.java
@@ -17,10 +17,12 @@
package org.apache.inlong.sort.base.metric.sub;
-import org.apache.inlong.sort.base.metric.SinkMetricData;
-
import java.util.Map;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+/**
+ * A collection class for handling sub metrics
+ */
public interface SinkSubMetricData {
/**
@@ -28,6 +30,5 @@ public interface SinkSubMetricData {
*
* @return The sub sink metric map
*/
- Map<String, SinkMetricData> getSubSourceMetricMap();
-
-}
+ Map<String, SinkMetricData> getSubSinkMetricMap();
+}
\ No newline at end of file
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
new file mode 100644
index 000000000..82dcf74f1
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.metric.sub;
+
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+
+import com.google.common.collect.Maps;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A collection class for handling sub metrics at table granularity.
+ *
+ * <p>This object is itself the sink-level {@link SinkMetricData}. In addition, it keeps one sub
+ * {@link SinkMetricData} per record schema identify (see {@link #buildSchemaIdentify}). Records
+ * counted through {@link #outputMetricsWithEstimate(String, String, String, boolean, long, long)}
+ * are added to both this object and the matching sub metric.
+ */
+public class SinkTableMetricData extends SinkMetricData implements SinkSubMetricData {
+
+    public static final Logger LOGGER = LoggerFactory.getLogger(SinkTableMetricData.class);
+
+    /**
+     * The sub sink metric data of this sink metric data, keyed by record schema identify.
+     *
+     * <p>NOTE(review): this is a plain HashMap and is exposed as-is by {@link #getSubSinkMetricMap()}.
+     * If metrics are emitted on a flush thread while the map is iterated on another thread (e.g. during
+     * a state snapshot), a concurrent map may be required — confirm the callers' threading model.
+     */
+    private final Map<String, SinkMetricData> subSinkMetricMap = Maps.newHashMap();
+
+    /**
+     * Creates the sink-level table metric data.
+     *
+     * @param option the metric option (labels, initial counter values, registered metrics)
+     * @param metricGroup the Flink metric group handed to {@link SinkMetricData}
+     */
+    public SinkTableMetricData(MetricOption option, MetricGroup metricGroup) {
+        super(option, metricGroup);
+    }
+
+    /**
+     * Registers sub sink metric data restored from a metric state.
+     *
+     * <p>Each key of {@link MetricState#getSubMetricStateMap()} is treated as a schema identify. A sub
+     * metric is rebuilt for it, with its counters initialised from the stored values. Nothing happens
+     * when the state or its sub metric state map is {@code null}.
+     *
+     * @param metricState the restored metric state, may be {@code null}
+     */
+    public void registerSubMetricsGroup(MetricState metricState) {
+        if (metricState == null || metricState.getSubMetricStateMap() == null) {
+            return;
+        }
+        Map<String, MetricState> subMetricStateMap = metricState.getSubMetricStateMap();
+        for (Entry<String, MetricState> subMetricStateEntry : subMetricStateMap.entrySet()) {
+            String identify = subMetricStateEntry.getKey();
+            SinkMetricData subSinkMetricData =
+                    buildSubSinkMetricData(parseSchemaIdentify(identify), subMetricStateEntry.getValue(), this);
+            subSinkMetricMap.put(identify, subSinkMetricData);
+        }
+        LOGGER.info("register subMetricsGroup from metricState,sub metric map size:{}", subSinkMetricMap.size());
+    }
+
+    /**
+     * Builds a fresh sub sink metric data (all counters start at zero).
+     *
+     * @param schemaInfoArray the schema info of the record, {database, table} or {database, schema, table}
+     * @param sinkMetricData the parent sink metric data whose labels and metric group are inherited
+     * @return the sub sink metric data, or {@code null} if either argument is {@code null}
+     */
+    private SinkMetricData buildSubSinkMetricData(String[] schemaInfoArray, SinkMetricData sinkMetricData) {
+        return buildSubSinkMetricData(schemaInfoArray, null, sinkMetricData);
+    }
+
+    /**
+     * Builds a sub sink metric data, optionally seeded from a restored sub metric state.
+     *
+     * <p>The sub metric inherits all labels of {@code sinkMetricData}. It appends database/table labels
+     * for a two-element array, or database/schema/table labels for a three-element array. Arrays of any
+     * other length add no extra labels.
+     *
+     * @param schemaInfoArray the schema info of the record
+     * @param subMetricState the restored sub metric state, or {@code null} to start all counters at 0
+     * @param sinkMetricData the parent sink metric data whose labels and metric group are inherited
+     * @return the sub sink metric data, or {@code null} if {@code schemaInfoArray} or
+     *         {@code sinkMetricData} is {@code null}
+     */
+    private SinkMetricData buildSubSinkMetricData(String[] schemaInfoArray, MetricState subMetricState,
+            SinkMetricData sinkMetricData) {
+        if (sinkMetricData == null || schemaInfoArray == null) {
+            return null;
+        }
+        // inherit the parent's labels, then append the schema labels of this sub metric
+        Map<String, String> labels = sinkMetricData.getLabels();
+        String metricGroupLabels = labels.entrySet().stream()
+                .map(entry -> entry.getKey() + "=" + entry.getValue())
+                .collect(Collectors.joining(DELIMITER));
+        StringBuilder labelBuilder = new StringBuilder(metricGroupLabels);
+        if (schemaInfoArray.length == 2) {
+            labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0])
+                    .append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[1]);
+        } else if (schemaInfoArray.length == 3) {
+            labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0])
+                    .append(DELIMITER).append(Constants.SCHEMA_NAME).append("=").append(schemaInfoArray[1])
+                    .append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[2]);
+        }
+
+        MetricOption metricOption = MetricOption.builder()
+                .withInitRecords(subMetricState != null ? subMetricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+                .withInitBytes(subMetricState != null ? subMetricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+                .withInitDirtyRecords(subMetricState != null ? subMetricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
+                .withInitDirtyBytes(subMetricState != null ? subMetricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
+                .withInlongLabels(labelBuilder.toString())
+                .withRegisterMetric(RegisteredMetric.ALL)
+                .build();
+        return new SinkTableMetricData(metricOption, sinkMetricData.getMetricGroup());
+    }
+
+    /**
+     * Builds the record schema identify: database and table, with the schema in between when it is
+     * non-null, joined by {@link Constants#SEMICOLON}.
+     *
+     * @param database the database name of record
+     * @param schema the schema name of record, may be {@code null}
+     * @param table the table name of record
+     * @return the record schema identify
+     */
+    public String buildSchemaIdentify(String database, String schema, String table) {
+        if (schema == null) {
+            return database + Constants.SEMICOLON + table;
+        }
+        return database + Constants.SEMICOLON + schema + Constants.SEMICOLON + table;
+    }
+
+    /**
+     * Parses a record schema identify by splitting it with {@link Constants#SPILT_SEMICOLON}.
+     *
+     * @param schemaIdentify the schema identify of record
+     * @return the identify parts, {database, table} or {database, schema, table}
+     */
+    public String[] parseSchemaIdentify(String schemaIdentify) {
+        return schemaIdentify.split(Constants.SPILT_SEMICOLON);
+    }
+
+    /**
+     * Builds the schema info array for a record using the same null rule as {@link #buildSchemaIdentify}.
+     * The schema element is omitted when {@code schema} is {@code null}, so no {@code schema=null}
+     * label is produced.
+     */
+    private String[] buildSchemaInfoArray(String database, String schema, String table) {
+        return schema == null ? new String[]{database, table} : new String[]{database, schema, table};
+    }
+
+    /**
+     * Estimates the byte size of a record as the UTF-8 length of its {@code toString()} form;
+     * {@code null} counts as 0 bytes.
+     */
+    private static long estimateDataSize(Object data) {
+        return data == null ? 0L : data.toString().getBytes(StandardCharsets.UTF_8).length;
+    }
+
+    /**
+     * Outputs the metrics of a single record, estimating its size from {@code data}.
+     *
+     * @param database the database name of record
+     * @param schema the schema name of record, may be {@code null}
+     * @param table the table name of record
+     * @param isSnapshotRecord is it snapshot record (currently not used by this implementation)
+     * @param data the data of record, may be {@code null} (counted as 0 bytes)
+     */
+    public void outputMetricsWithEstimate(String database, String schema, String table, boolean isSnapshotRecord,
+            Object data) {
+        outputMetricsWithEstimate(database, schema, table, isSnapshotRecord, 1L, estimateDataSize(data));
+    }
+
+    /**
+     * Outputs the metrics of a batch of records to this sink metric and to the sub metric of the
+     * record's table.
+     *
+     * <p>When {@code database} or {@code table} is blank, only this sink-level metric is updated. The
+     * sub metric for a table is created lazily on first use.
+     *
+     * @param database the database name of records
+     * @param schema the schema name of records, may be {@code null}
+     * @param table the table name of records
+     * @param isSnapshotRecord is it snapshot record (currently not used by this implementation)
+     * @param rowCount the row count of records
+     * @param rowSize the row size of records
+     */
+    public void outputMetricsWithEstimate(String database, String schema, String table, boolean isSnapshotRecord,
+            long rowCount, long rowSize) {
+        if (StringUtils.isBlank(database) || StringUtils.isBlank(table)) {
+            invoke(rowCount, rowSize);
+            return;
+        }
+        String identify = buildSchemaIdentify(database, schema, table);
+        // build the label array with the same null rule as the identify, so a null schema does not
+        // produce a "schema=null" label
+        SinkMetricData subSinkMetricData = subSinkMetricMap.computeIfAbsent(identify,
+                key -> buildSubSinkMetricData(buildSchemaInfoArray(database, schema, table), this));
+        // sink metric and sub sink metric output metrics
+        this.invoke(rowCount, rowSize);
+        subSinkMetricData.invoke(rowCount, rowSize);
+    }
+
+    /**
+     * Outputs the metrics of a single record to this sink-level metric only.
+     *
+     * @param data the data of record, may be {@code null} (counted as 0 bytes)
+     */
+    public void outputMetricsWithEstimate(Object data) {
+        invoke(1L, estimateDataSize(data));
+    }
+
+    @Override
+    public Map<String, SinkMetricData> getSubSinkMetricMap() {
+        return this.subSinkMetricMap;
+    }
+
+    @Override
+    public String toString() {
+        return "SinkTableMetricData{" + "subSinkMetricMap=" + subSinkMetricMap + '}';
+    }
+}
\ No newline at end of file
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTopicMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTopicMetricData.java
index 98735f053..ebe2048ea 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTopicMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTopicMetricData.java
@@ -96,7 +96,7 @@ public class SinkTopicMetricData extends SinkMetricData implements SinkSubMetric
}
@Override
- public Map<String, SinkMetricData> getSubSourceMetricMap() {
+ public Map<String, SinkMetricData> getSubSinkMetricMap() {
return this.sinkMetricMap;
}
}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java
index 4bf1b8930..95aa3c477 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java
@@ -249,7 +249,7 @@ public class MetricStateUtils {
}
SinkSubMetricData sinkSubMetricData = (SinkSubMetricData)
sinkMetricData;
- Map<String, SinkMetricData> subSinkMetricMap =
sinkSubMetricData.getSubSourceMetricMap();
+ Map<String, SinkMetricData> subSinkMetricMap =
sinkSubMetricData.getSubSinkMetricMap();
if (subSinkMetricMap != null && !subSinkMetricMap.isEmpty()) {
Map<String, MetricState> subMetricStateMap = new HashMap<>();
Set<Entry<String, SinkMetricData>> entries =
subSinkMetricMap.entrySet();
diff --git a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
index a6952eb6d..39d77c472 100644
--- a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
+++ b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
@@ -51,7 +51,7 @@ import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
-import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -115,14 +115,14 @@ public class StarRocksSinkManager implements Serializable {
private final boolean multipleSink;
private final SchemaUpdateExceptionPolicy schemaUpdatePolicy;
- private transient SinkMetricData metricData;
+ private transient SinkTableMetricData metricData;
/**
* If a table writing throws exception, ignore it when receiving data
later again
*/
private Set<String> ignoreWriteTables = new HashSet<>();
- public void setSinkMetricData(SinkMetricData metricData) {
+ public void setSinkMetricData(SinkTableMetricData metricData) {
this.metricData = metricData;
}
@@ -391,7 +391,12 @@ public class StarRocksSinkManager implements Serializable {
updateMetricsFromStreamLoadResult(result);
if (null != metricData) {
- metricData.invoke(flushData.getBatchCount(),
flushData.getBatchSize());
+ if (multipleSink) {
+
metricData.outputMetricsWithEstimate(flushData.getDatabase(), null,
flushData.getTable(),
+ false, flushData.getBatchCount(),
flushData.getBatchSize());
+ } else {
+ metricData.invoke(flushData.getBatchCount(),
flushData.getBatchSize());
+ }
}
}
startScheduler();
diff --git a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
index 93a1363ff..422715d53 100644
--- a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
+++ b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
@@ -63,7 +63,7 @@ import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricState;
-import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
import org.apache.inlong.sort.base.util.MetricStateUtils;
import org.apache.inlong.sort.starrocks.manager.StarRocksSinkManager;
@@ -95,7 +95,7 @@ public class StarRocksDynamicSinkFunction<T> extends RichSinkFunction<T> impleme
private final String tablePattern;
private final String inlongMetric;
- private transient SinkMetricData metricData;
+ private transient SinkTableMetricData metricData;
private transient ListState<MetricState> metricStateListState;
private transient MetricState metricState;
private final String auditHostAndPorts;
@@ -152,7 +152,11 @@ public class StarRocksDynamicSinkFunction<T> extends RichSinkFunction<T> impleme
.withInitBytes(metricState != null ?
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
.withRegisterMetric(MetricOption.RegisteredMetric.ALL).build();
if (metricOption != null) {
- metricData = new SinkMetricData(metricOption,
getRuntimeContext().getMetricGroup());
+ metricData = new SinkTableMetricData(metricOption,
getRuntimeContext().getMetricGroup());
+ if (multipleSink) {
+ // register sub sink metric data from metric state
+ metricData.registerSubMetricsGroup(metricState);
+ }
sinkManager.setSinkMetricData(metricData);
}
}