e-mhui commented on code in PR #6808:
URL: https://github.com/apache/inlong/pull/6808#discussion_r1046720761
##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java:
##########
@@ -122,9 +122,14 @@ private SourceMetricData buildSubSourceMetricData(String[]
schemaInfoArray, Metr
String metricGroupLabels = labels.entrySet().stream().map(entry ->
entry.getKey() + "=" + entry.getValue())
.collect(Collectors.joining(DELIMITER));
StringBuilder labelBuilder = new StringBuilder(metricGroupLabels);
-
labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0])
-
.append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[1]);
-
+ if (schemaInfoArray.length == 2) {
+
labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0])
+
.append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[1]);
+ } else if (schemaInfoArray.length == 3) {
+
labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0])
+
.append(DELIMITER).append(Constants.SCHEMA_NAME).append("=").append(schemaInfoArray[1])
+
.append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[2]);
+ }
Review Comment:
There is no need to validate here. The `schemaInfoArray` has been validated
before it's initialization. The code is as follows:
```java
public void outputMetricsWithEstimate(String database, String schema,
String table,
boolean isSnapshotRecord, Object data) {
if (StringUtils.isBlank(database) || StringUtils.isBlank(schema) ||
StringUtils.isBlank(table)) {
outputMetricsWithEstimate(data);
return;
}
String identify = buildSchemaIdentify(database, schema, table);
SourceMetricData subSourceMetricData;
if (subSourceMetricMap.containsKey(identify)) {
subSourceMetricData = subSourceMetricMap.get(identify);
} else {
subSourceMetricData = buildSubSourceMetricData(new
String[]{database, schema, table}, this);
subSourceMetricMap.put(identify, subSourceMetricData);
}
// source metric and sub source metric output metrics
long rowCountSize = 1L;
long rowDataSize =
data.toString().getBytes(StandardCharsets.UTF_8).length;
this.outputMetrics(rowCountSize, rowDataSize);
subSourceMetricData.outputMetrics(rowCountSize, rowDataSize);
// output read phase metric
outputReadPhaseMetrics((isSnapshotRecord) ? ReadPhase.SNAPSHOT_PHASE
: ReadPhase.INCREASE_PHASE);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]