szehon-ho commented on a change in pull request #3001:
URL: https://github.com/apache/iceberg/pull/3001#discussion_r694439042
##########
File path:
flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
##########
@@ -285,34 +292,35 @@ private void commitDeltaTxn(NavigableMap<Long,
WriteResult> pendingResults, Stri
RowDelta rowDelta = table.newRowDelta()
.validateDataFilesExist(ImmutableList.copyOf(result.referencedDataFiles()))
.validateDeletedFiles();
-
- int numDataFiles = result.dataFiles().length;
Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
-
- int numDeleteFiles = result.deleteFiles().length;
Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
-
- commitOperation(rowDelta, numDataFiles, numDeleteFiles, "rowDelta",
newFlinkJobId, e.getKey());
+ commitOperation(rowDelta, stats, "rowDelta", newFlinkJobId,
e.getKey());
}
}
}
- private void commitOperation(SnapshotUpdate<?> operation, int numDataFiles,
int numDeleteFiles, String description,
+ private void commitOperation(SnapshotUpdate<?> operation, CommitStats stats,
String description,
String newFlinkJobId, long checkpointId) {
- LOG.info("Committing {} with {} data files and {} delete files to table
{}", description, numDataFiles,
- numDeleteFiles, table);
+ LOG.info("Committing {} to table {}: {}", description, table.name(),
stats);
operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
operation.set(FLINK_JOB_ID, newFlinkJobId);
- long start = System.currentTimeMillis();
+ long startNano = System.nanoTime();
operation.commit(); // abort is automatically called if this fails.
- long duration = System.currentTimeMillis() - start;
- LOG.info("Committed in {} ms", duration);
+ long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
startNano);
+ LOG.info("Committed {} to table {} in {} ms: {}", description,
table.name(), durationMs, stats);
+ committerMetrics.commitDuration(durationMs);
}
@Override
public void processElement(StreamRecord<WriteResult> element) {
this.writeResultsOfCurrentCkpt.add(element.getValue());
+ /**
Review comment:
Style: no need for extra javadoc style comment (I think rest of code
uses normal comment even for multiline)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]