cshuo commented on code in PR #18762:
URL: https://github.com/apache/hudi/pull/18762#discussion_r3256211212


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/DynamicBucketAssignFunction.java:
##########
@@ -119,7 +123,9 @@ private boolean isRecordKeyOfThisTask(String recordKey) {
   public void processElement(HoodieFlinkInternalRow record, Context ctx, 
Collector<HoodieFlinkInternalRow> out) throws Exception {
     String partitionPath = record.getPartitionPath();
     String recordKey = record.getRecordKey();
+    metrics.startIndexLookup();

Review Comment:
   This is the per-record hot write path, and collecting metrics here is too 
frequent. `HoodieFlinkMetrics#startTimer` and `HoodieFlinkMetrics#endTimer` both 
call `System.currentTimeMillis()` and do about 2 map operations. We should avoid 
collecting such metrics on the per-record hot path.
   
   Maybe we can focus on metrics for the global RLI write path in this PR, and 
add metrics only in `MinibatchBucketAssignFunction`.
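
   A minimal sketch of the idea, timing once per mini-batch instead of once 
per record. `BatchLookupTimer` and its methods are hypothetical names used only 
for illustration, not existing Hudi classes, and `System.nanoTime()` is my 
choice here rather than what `HoodieFlinkMetrics` does:

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper (not part of Hudi): reads the clock twice per batch,
// no matter how many records the batch holds.
final class BatchLookupTimer {
  private long totalNanos;
  private long batches;
  private long records;

  <T> void timeBatch(List<T> batch, Consumer<? super T> perRecordWork) {
    long start = System.nanoTime();
    for (T record : batch) {
      // The per-record work itself carries no metric calls.
      perRecordWork.accept(record);
    }
    totalNanos += System.nanoTime() - start;
    batches++;
    records += batch.size();
  }

  long totalNanos() {
    return totalNanos;
  }

  long batches() {
    return batches;
  }

  long records() {
    return records;
  }
}
```

   With this shape the timer overhead is amortized over the batch size, which 
is why instrumenting only the mini-batch path is cheaper than instrumenting 
`processElement`.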



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java:
##########
@@ -125,13 +130,18 @@ private void 
processBufferedRecords(Collector<HoodieFlinkInternalRow> out) throw
     if (isChangingRecords) {
       List<String> recordKeys = 
recordBuffer.stream().map(HoodieFlinkInternalRow::getRecordKey).collect(Collectors.toList());
       MinibatchIndexBackend minibatchIndexBackend = (MinibatchIndexBackend) 
delegateFunction.getIndexBackend();
+      delegateFunction.getMetrics().startIndexLookup();

Review Comment:
   Can we split the index lookup time into `local_index_lookup` and 
`remote_index_lookup`, and move the metric instrumentation to 
`GlobalRecordLevelIndexBackend#get`? Besides, we should also add the following 
metrics at the same time:
   * `local_lookup_keys_num`
   * `remote_lookup_keys_num`
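
   Roughly what I have in mind, as a self-contained sketch. `TwoTierIndexLookup` 
is a hypothetical stand-in for the backend, not the real 
`GlobalRecordLevelIndexBackend`, and I am assuming that `local_lookup_keys_num` 
counts keys served from the local cache while `remote_lookup_keys_num` counts 
keys that had to be fetched remotely:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical two-tier lookup (not Hudi code): local cache first, then one
// batched remote call for the misses, with each tier timed separately.
final class TwoTierIndexLookup {
  private final Map<String, String> localCache = new HashMap<>();
  private final Function<List<String>, Map<String, String>> remoteLookup;

  long localLookupNanos;     // would feed local_index_lookup
  long remoteLookupNanos;    // would feed remote_index_lookup
  long localLookupKeysNum;   // would feed local_lookup_keys_num
  long remoteLookupKeysNum;  // would feed remote_lookup_keys_num

  TwoTierIndexLookup(Function<List<String>, Map<String, String>> remoteLookup) {
    this.remoteLookup = remoteLookup;
  }

  Map<String, String> get(List<String> keys) {
    Map<String, String> result = new HashMap<>();
    List<String> misses = new ArrayList<>();

    // Local tier: one timer pair for the whole key list.
    long localStart = System.nanoTime();
    for (String key : keys) {
      String location = localCache.get(key);
      if (location != null) {
        result.put(key, location);
      } else {
        misses.add(key);
      }
    }
    localLookupNanos += System.nanoTime() - localStart;
    localLookupKeysNum += keys.size() - misses.size();

    // Remote tier: only entered, and only timed, when there are misses.
    if (!misses.isEmpty()) {
      long remoteStart = System.nanoTime();
      Map<String, String> fetched = remoteLookup.apply(misses);
      remoteLookupNanos += System.nanoTime() - remoteStart;
      remoteLookupKeysNum += misses.size();
      localCache.putAll(fetched);
      result.putAll(fetched);
    }
    return result;
  }
}
```

   Keeping the two timers inside `get` means callers such as 
`MinibatchBucketAssignFunction` do not need to know which tier served a key.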



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java:
##########
@@ -125,13 +130,18 @@ private void 
processBufferedRecords(Collector<HoodieFlinkInternalRow> out) throw
     if (isChangingRecords) {
       List<String> recordKeys = 
recordBuffer.stream().map(HoodieFlinkInternalRow::getRecordKey).collect(Collectors.toList());
       MinibatchIndexBackend minibatchIndexBackend = (MinibatchIndexBackend) 
delegateFunction.getIndexBackend();
+      delegateFunction.getMetrics().startIndexLookup();
       // load the record location mapping into the record index cache.
       minibatchIndexBackend.get(recordKeys);
+      delegateFunction.getMetrics().endIndexLookup();
     }
     for (HoodieFlinkInternalRow record: recordBuffer) {
-      delegateFunction.processRecord(record, record.getRecordKey(), out);
+      delegateFunction.processRecord(record, record.getRecordKey(), out, 
false);
     }
 
+    // Record how long the oldest record in the batch was buffered
+    delegateFunction.getMetrics().endRecordBuffering();

Review Comment:
   Buffering time means 'Time records spend buffered before being 
processed', so `endRecordBuffering()` should be called before processing the 
minibatch buffer, e.g. at the beginning of the `processBufferedRecords` method.
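
   To illustrate the ordering, here is a small sketch with an injectable clock. 
`MiniBatchBuffer` and its members are hypothetical names, not the actual 
`MinibatchBucketAssignFunction` code; the point is only that the buffering 
timer stops before the records are processed:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical buffer (not Hudi code) that measures how long the oldest
// record waited before the batch started being processed.
final class MiniBatchBuffer<T> {
  private final List<T> buffer = new ArrayList<>();
  private final LongSupplier clockMillis;
  private long bufferingStartMillis = -1L;
  private long lastBufferingMillis = -1L;

  MiniBatchBuffer(LongSupplier clockMillis) {
    this.clockMillis = clockMillis;
  }

  void add(T record) {
    if (buffer.isEmpty()) {
      // The first record of a batch starts the buffering timer.
      bufferingStartMillis = clockMillis.getAsLong();
    }
    buffer.add(record);
  }

  void flush(Consumer<? super T> processor) {
    if (buffer.isEmpty()) {
      return;
    }
    // Stop the buffering timer first, so processing time is not included.
    lastBufferingMillis = clockMillis.getAsLong() - bufferingStartMillis;
    for (T record : buffer) {
      processor.accept(record);
    }
    buffer.clear();
  }

  long lastBufferingMillis() {
    return lastBufferingMillis;
  }
}
```

   If the timer were stopped after the loop instead, the reported value would 
also contain index lookup and per-record processing time.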



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java:
##########
@@ -166,7 +176,13 @@ protected void processRecord(HoodieFlinkInternalRow 
record, String recordKey, Co
       // Only changing records need looking up the index for the location,
       // append only records are always recognized as INSERT.
       // Structured as Tuple(partition, fileId, instantTime).
+      if (recordIndexMetrics) {

Review Comment:
   This is the per-record hot write path, and collecting metrics here is too 
frequent. `HoodieFlinkMetrics#startTimer` and `HoodieFlinkMetrics#endTimer` both 
call `System.currentTimeMillis()` and do about 2 map operations. We should avoid 
collecting such metrics on the per-record hot path.
   
   Maybe we can focus on metrics for the global RLI write path in this PR, and 
add metrics only in `MinibatchBucketAssignFunction`.


