bryanck commented on code in PR #15059:
URL: https://github.com/apache/iceberg/pull/15059#discussion_r2705985998
##########
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -201,100 +210,18 @@ public Offset deserializeOffset(String json) {
public void commit(Offset end) {}
@Override
- public void stop() {}
-
- private List<FileScanTask> planFiles(StreamingOffset startOffset,
StreamingOffset endOffset) {
- List<FileScanTask> fileScanTasks = Lists.newArrayList();
- StreamingOffset batchStartOffset =
- StreamingOffset.START_OFFSET.equals(startOffset)
- ? determineStartingOffset(table, fromTimestamp)
- : startOffset;
-
- StreamingOffset currentOffset = null;
-
- // [(startOffset : startFileIndex), (endOffset : endFileIndex) )
- do {
- long endFileIndex;
- if (currentOffset == null) {
- currentOffset = batchStartOffset;
- } else {
- Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table,
currentOffset.snapshotId());
- // it may happen that we need to read this snapshot partially in case
it's equal to
- // endOffset.
- if (currentOffset.snapshotId() != endOffset.snapshotId()) {
- currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L,
false);
- } else {
- currentOffset = endOffset;
- }
- }
-
- Snapshot snapshot = table.snapshot(currentOffset.snapshotId());
-
- validateCurrentSnapshotExists(snapshot, currentOffset);
-
- if (!shouldProcess(snapshot)) {
- LOG.debug("Skipping snapshot: {} of table {}",
currentOffset.snapshotId(), table.name());
- continue;
- }
-
- Snapshot currentSnapshot = table.snapshot(currentOffset.snapshotId());
- if (currentOffset.snapshotId() == endOffset.snapshotId()) {
- endFileIndex = endOffset.position();
- } else {
- endFileIndex = addedFilesCount(currentSnapshot);
- }
-
- MicroBatch latestMicroBatch =
- MicroBatches.from(currentSnapshot, table.io())
- .caseSensitive(caseSensitive)
- .specsById(table.specs())
- .generate(
- currentOffset.position(),
- endFileIndex,
- Long.MAX_VALUE,
- currentOffset.shouldScanAllFiles());
-
- fileScanTasks.addAll(latestMicroBatch.tasks());
- } while (currentOffset.snapshotId() != endOffset.snapshotId());
-
- return fileScanTasks;
- }
-
- private boolean shouldProcess(Snapshot snapshot) {
- String op = snapshot.operation();
- switch (op) {
- case DataOperations.APPEND:
- return true;
- case DataOperations.REPLACE:
- return false;
- case DataOperations.DELETE:
- Preconditions.checkState(
- skipDelete,
- "Cannot process delete snapshot: %s, to ignore deletes, set
%s=true",
- snapshot.snapshotId(),
- SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS);
- return false;
- case DataOperations.OVERWRITE:
- Preconditions.checkState(
- skipOverwrite,
- "Cannot process overwrite snapshot: %s, to ignore overwrites, set
%s=true",
- snapshot.snapshotId(),
- SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS);
- return false;
- default:
- throw new IllegalStateException(
- String.format(
- "Cannot process unknown snapshot operation: %s (snapshot id
%s)",
- op.toLowerCase(Locale.ROOT), snapshot.snapshotId()));
+ public void stop() {
+ if (microBatchPlanner != null) {
+ microBatchPlanner.stop();
}
}
- private static StreamingOffset determineStartingOffset(Table table, Long fromTimestamp) {
+ static StreamingOffset determineStartingOffset(Table table, long fromTimestamp) {
Review Comment:
IMO we should move `determineStartingOffset` somewhere else so the planners
don't need to reference `SparkMicroBatchStream`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]