e-mhui commented on code in PR #6808:
URL: https://github.com/apache/inlong/pull/6808#discussion_r1046720761


##########
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java:
##########
@@ -122,9 +122,14 @@ private SourceMetricData buildSubSourceMetricData(String[] 
schemaInfoArray, Metr
         String metricGroupLabels = labels.entrySet().stream().map(entry -> 
entry.getKey() + "=" + entry.getValue())
                 .collect(Collectors.joining(DELIMITER));
         StringBuilder labelBuilder = new StringBuilder(metricGroupLabels);
-        
labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0])
-                
.append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[1]);
-
+        if (schemaInfoArray.length == 2) {
+            
labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0])
+                    
.append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[1]);
+        } else if (schemaInfoArray.length == 3) {
+            
labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0])
+                    
.append(DELIMITER).append(Constants.SCHEMA_NAME).append("=").append(schemaInfoArray[1])
+                    
.append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[2]);
+        }

Review Comment:
   There is no need to validate here. The `schemaInfoArray` has been validated 
before it's initialization. The code is as follows:
   
   ```java
       public void outputMetricsWithEstimate(String database, String schema, 
String table,
               boolean isSnapshotRecord, Object data) {
           if (StringUtils.isBlank(database) || StringUtils.isBlank(schema) || 
StringUtils.isBlank(table)) {
               outputMetricsWithEstimate(data);
               return;
           }
           String identify = buildSchemaIdentify(database, schema, table);
           SourceMetricData subSourceMetricData;
           if (subSourceMetricMap.containsKey(identify)) {
               subSourceMetricData = subSourceMetricMap.get(identify);
           } else {
               subSourceMetricData = buildSubSourceMetricData(new 
String[]{database, schema, table}, this);
               subSourceMetricMap.put(identify, subSourceMetricData);
           }
           // source metric and sub source metric output metrics
           long rowCountSize = 1L;
           long rowDataSize = 
data.toString().getBytes(StandardCharsets.UTF_8).length;
           this.outputMetrics(rowCountSize, rowDataSize);
           subSourceMetricData.outputMetrics(rowCountSize, rowDataSize);
   
           // output read phase metric
           outputReadPhaseMetrics((isSnapshotRecord) ? ReadPhase.SNAPSHOT_PHASE 
: ReadPhase.INCREASE_PHASE);
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to