stevenzwu commented on a change in pull request #3001:
URL: https://github.com/apache/iceberg/pull/3001#discussion_r694483500



##########
File path: 
flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -285,34 +292,35 @@ private void commitDeltaTxn(NavigableMap<Long, 
WriteResult> pendingResults, Stri
         RowDelta rowDelta = table.newRowDelta()
             
.validateDataFilesExist(ImmutableList.copyOf(result.referencedDataFiles()))
             .validateDeletedFiles();
-
-        int numDataFiles = result.dataFiles().length;
         Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
-
-        int numDeleteFiles = result.deleteFiles().length;
         Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
-
-        commitOperation(rowDelta, numDataFiles, numDeleteFiles, "rowDelta", 
newFlinkJobId, e.getKey());
+        commitOperation(rowDelta, stats, "rowDelta", newFlinkJobId, 
e.getKey());
       }
     }
   }
 
-  private void commitOperation(SnapshotUpdate<?> operation, int numDataFiles, 
int numDeleteFiles, String description,
+  private void commitOperation(SnapshotUpdate<?> operation, CommitStats stats, 
String description,
                                String newFlinkJobId, long checkpointId) {
-    LOG.info("Committing {} with {} data files and {} delete files to table 
{}", description, numDataFiles,
-        numDeleteFiles, table);
+    LOG.info("Committing {} to table {}: {}", description, table.name(), 
stats);
     operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
     operation.set(FLINK_JOB_ID, newFlinkJobId);
 
-    long start = System.currentTimeMillis();
+    long startNano = System.nanoTime();
     operation.commit(); // abort is automatically called if this fails.
-    long duration = System.currentTimeMillis() - start;
-    LOG.info("Committed in {} ms", duration);
+    long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startNano);
+    LOG.info("Committed {} to table {} in {} ms: {}", description, 
table.name(), durationMs, stats);
+    committerMetrics.commitDuration(durationMs);
   }
 
   @Override
   public void processElement(StreamRecord<WriteResult> element) {
     this.writeResultsOfCurrentCkpt.add(element.getValue());
+    /**

Review comment:
      I did it mainly to use the `{@link CommitStats}`. Let me move the 
javadoc to the method level (NOT inside the method).

##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -249,6 +251,19 @@ public Builder uidPrefix(String newPrefix) {
       return this;
     }
 
+    /**
+     * Set the {@link SlidingWindowReservoir} size (number of measurements 
stored)
+     * for the two histogram metrics tracking the data file and delete file 
size distributions.
+     *
+     * @param newReservoirSize
+     * default is 128, which only adds a small memory overhead of 1 KB (128 x 
8B) per histogram. For large
+     * use cases with a lot of files, a larger reservoir size can produce a 
more accurate histogram distribution.
+     */
+    public Builder fileSizeHistogramReservoirSize(int newReservoirSize) {
+      this.fileSizeHistogramReservoirSize = newReservoirSize;

Review comment:
       will do




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to