EMsnap commented on code in PR #6585:
URL: https://github.com/apache/inlong/pull/6585#discussion_r1031190852
##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java:
##########
@@ -189,11 +237,126 @@ public Map<String, String> getLabels() {
return labels;
}
+ /**
+ * register sub metrics group from metric state
+ *
+ * @param metricState MetricState
+ */
+ public void registerSubMetricsGroup(MetricState metricState) {
+ if (metricState == null || metricState.getSubMetricStateMap() == null
+ || metricState.getSubMetricStateMap().isEmpty()) {
+ return;
+ }
+
+ Map<String, MetricState> subMetricStateMap =
metricState.getSubMetricStateMap();
+ for (Entry<String, MetricState> subMetricStateEntry :
subMetricStateMap.entrySet()) {
+ String schemaIdentify = subMetricStateEntry.getKey();
+ SourceRecordSchemaInfo sourceRecordSchemaInfo = new
SourceRecordSchemaInfo(schemaIdentify);
+ final MetricState subMetricState = subMetricStateEntry.getValue();
+ SourceMetricData subSourceMetricData =
buildSubSourceMetricData(sourceRecordSchemaInfo,
+ subMetricState, this);
+ subSourceMetricMap.put(subMetricStateEntry.getKey(),
subSourceMetricData);
+ }
+ LOGGER.info("register subMetricsGroup from metricState,sub metric map
size:{}", subSourceMetricMap.size());
+ }
+
+ /**
+ * build sub source metric data
+ *
+ * @param recordSchemaInfo source record schema info
+ * @param subMetricState sub metric state
+ * @param sourceMetricData source metric data
+ * @return sub source metric data
+ */
+ private SourceMetricData buildSubSourceMetricData(SourceRecordSchemaInfo
recordSchemaInfo,
+ MetricState subMetricState, SourceMetricData sourceMetricData) {
+ if (sourceMetricData == null || recordSchemaInfo == null) {
+ return null;
+ }
+
+ // build sub metricGroup labels
+ String metricGroupLabels = this.labels.entrySet().stream().map(entry
-> entry.getKey() + "=" + entry.getValue())
+ .collect(Collectors.joining(DELIMITER));
+
+ StringBuilder labelStringBuilder = new
StringBuilder(metricGroupLabels);
+ String topicName = recordSchemaInfo.getTopicName();
+ if (StringUtils.isNotBlank(topicName)) {
+ // judging only the topic
+
labelStringBuilder.append(DELIMITER).append(Constants.TOPIC_NAME).append("=").append(topicName);
+ } else {
+ // judge The case of database.schema.table or database.table
+ String databaseName = recordSchemaInfo.getDatabaseName();
+
labelStringBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(databaseName);
+ String schemaName = recordSchemaInfo.getSchemaName();
+ if (StringUtils.isNotBlank(schemaName)) {
+
labelStringBuilder.append(DELIMITER).append(Constants.SCHEMA_NAME).append("=").append(schemaName);
+ }
+ String tableName = recordSchemaInfo.getTableName();
+
labelStringBuilder.append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(tableName);
+ }
+
+ // build option labels
+ MetricOption metricOption = MetricOption.builder()
+ .withInitRecords(subMetricState != null ?
subMetricState.getMetricValue(NUM_RECORDS_IN) : 0L)
+ .withInitBytes(subMetricState != null ?
subMetricState.getMetricValue(NUM_BYTES_IN) : 0L)
+ .withReadPhase(subMetricState != null ?
subMetricState.getMetricValue(READ_PHASE) : 0L)
+ .withInlongLabels(labelStringBuilder.toString())
+ .withRegisterMetric(RegisteredMetric.NORMAL)
+ .build();
+ return new SourceMetricData(metricOption,
sourceMetricData.getMetricGroup());
+ }
+
+ /**
+ * build record schema identify
+ *
+ * @param recordSchemaInfo source record schema info
+ * @return record schema identify
+ */
+ public String buildSchemaIdentify(SourceRecordSchemaInfo recordSchemaInfo)
{
+ String database = recordSchemaInfo.getDatabaseName();
+ String topicName = recordSchemaInfo.getTopicName();
+ // Judging only the topic
+ if (StringUtils.isNotBlank(topicName)) {
+ return topicName;
+ }
+ // judge The case of database.schema.table or database.table
+ String table = recordSchemaInfo.getTableName();
+ String schema = recordSchemaInfo.getSchemaName();
+ StringBuilder identifyBuilder = new StringBuilder();
+ identifyBuilder.append(database).append(Constants.SEMICOLON);
+ if (StringUtils.isNotBlank(schema)) {
+ identifyBuilder.append(schema).append(Constants.SEMICOLON);
+ }
+ identifyBuilder.append(table);
+ return identifyBuilder.toString();
+ }
+
public void outputMetricsWithEstimate(Object o) {
long size = o.toString().getBytes(StandardCharsets.UTF_8).length;
outputMetrics(1, size);
}
+ public void outputMetricsWithEstimate(SourceRecordSchemaInfo
recordSchemaInfo, Object o) {
Review Comment:
Object data
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]