danny0405 commented on code in PR #9118:
URL: https://github.com/apache/hudi/pull/9118#discussion_r1353905841


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java:
##########
@@ -385,16 +393,24 @@ private String getBucketID(HoodieRecord<?> record) {
    * @param value HoodieRecord
    */
   protected void bufferRecord(HoodieRecord<?> value) {
+    writeMetrics.markRecordIn();
     final String bucketID = getBucketID(value);
 
     DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
-        k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), value));
+        k -> {
+          // create a new bucket and update metrics
+          writeMetrics.increaseNumOfOpenHandle();
+          return new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), value);

Review Comment:
   This is just an in-memory bucket; it does not open a file handle, so it should not be counted by `increaseNumOfOpenHandle()`.


