stevenzwu commented on a change in pull request #3001:
URL: https://github.com/apache/iceberg/pull/3001#discussion_r694483500
##########
File path:
flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -285,34 +292,35 @@ private void commitDeltaTxn(NavigableMap<Long,
WriteResult> pendingResults, Stri
RowDelta rowDelta = table.newRowDelta()
.validateDataFilesExist(ImmutableList.copyOf(result.referencedDataFiles()))
.validateDeletedFiles();
-
- int numDataFiles = result.dataFiles().length;
Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
-
- int numDeleteFiles = result.deleteFiles().length;
Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
-
- commitOperation(rowDelta, numDataFiles, numDeleteFiles, "rowDelta",
newFlinkJobId, e.getKey());
+ commitOperation(rowDelta, stats, "rowDelta", newFlinkJobId,
e.getKey());
}
}
}
- private void commitOperation(SnapshotUpdate<?> operation, int numDataFiles,
int numDeleteFiles, String description,
+ private void commitOperation(SnapshotUpdate<?> operation, CommitStats stats,
String description,
String newFlinkJobId, long checkpointId) {
- LOG.info("Committing {} with {} data files and {} delete files to table
{}", description, numDataFiles,
- numDeleteFiles, table);
+ LOG.info("Committing {} to table {}: {}", description, table.name(),
stats);
operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
operation.set(FLINK_JOB_ID, newFlinkJobId);
- long start = System.currentTimeMillis();
+ long startNano = System.nanoTime();
operation.commit(); // abort is automatically called if this fails.
- long duration = System.currentTimeMillis() - start;
- LOG.info("Committed in {} ms", duration);
+ long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
startNano);
+ LOG.info("Committed {} to table {} in {} ms: {}", description,
table.name(), durationMs, stats);
+ committerMetrics.commitDuration(durationMs);
}
@Override
public void processElement(StreamRecord<WriteResult> element) {
this.writeResultsOfCurrentCkpt.add(element.getValue());
+ /**
Review comment:
I did it mainly to use the `{@link CommitStats}`. Let me move the
Javadoc to the method level (NOT inside the method).
##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -249,6 +251,19 @@ public Builder uidPrefix(String newPrefix) {
return this;
}
+ /**
+ * Set the {@link SlidingWindowReservoir} size (number of measurements
stored)
+ * for the two histogram metrics of data file and delete file size
distributions.
+ *
+ * @param newReservoirSize
+ default is 128, which only adds a small memory overhead of 1 KB (128 x
8B) per histogram. For large
+ use cases with a lot of files, a larger reservoir size can produce a more
accurate histogram distribution.
+ */
+ public Builder fileSizeHistogramReservoirSize(int newReservoirSize) {
+ this.fileSizeHistogramReservoirSize = newReservoirSize;
Review comment:
will do
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]