This is an automated email from the ASF dual-hosted git repository.
bryanck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new b6de7acdb2 Spark 4.1: Refactor SparkMicroBatchStream to SyncPlanner (#15298)
b6de7acdb2 is described below
commit b6de7acdb23704ef5165fb674d6814060f60e754
Author: Ruijing Li <[email protected]>
AuthorDate: Sat Feb 14 18:48:51 2026 -0800
Spark 4.1: Refactor SparkMicroBatchStream to SyncPlanner (#15298)
---
.../spark/source/BaseSparkMicroBatchPlanner.java | 137 ++++++++
.../iceberg/spark/source/MicroBatchUtils.java | 67 ++++
.../spark/source/SparkMicroBatchPlanner.java | 47 +++
.../spark/source/SparkMicroBatchStream.java | 347 ++-------------------
.../spark/source/SyncSparkMicroBatchPlanner.java | 240 ++++++++++++++
.../spark/source/TestMicroBatchPlanningUtils.java | 100 ++++++
6 files changed, 619 insertions(+), 319 deletions(-)
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java
new file mode 100644
index 0000000000..70c0129484
--- /dev/null
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Locale;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.connector.read.streaming.CompositeReadLimit;
+import org.apache.spark.sql.connector.read.streaming.ReadLimit;
+import org.apache.spark.sql.connector.read.streaming.ReadMaxFiles;
+import org.apache.spark.sql.connector.read.streaming.ReadMaxRows;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+abstract class BaseSparkMicroBatchPlanner implements SparkMicroBatchPlanner {
+ private static final Logger LOG = LoggerFactory.getLogger(BaseSparkMicroBatchPlanner.class);
+ private final Table table;
+ private final SparkReadConf readConf;
+
+ BaseSparkMicroBatchPlanner(Table table, SparkReadConf readConf) {
+ this.table = table;
+ this.readConf = readConf;
+ }
+
+ protected Table table() {
+ return table;
+ }
+
+ protected SparkReadConf readConf() {
+ return readConf;
+ }
+
+ protected boolean shouldProcess(Snapshot snapshot) {
+ String op = snapshot.operation();
+ switch (op) {
+ case DataOperations.APPEND:
+ return true;
+ case DataOperations.REPLACE:
+ return false;
+ case DataOperations.DELETE:
+ Preconditions.checkState(
+ readConf.streamingSkipDeleteSnapshots(),
+ "Cannot process delete snapshot: %s, to ignore deletes, set
%s=true",
+ snapshot.snapshotId(),
+ SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS);
+ return false;
+ case DataOperations.OVERWRITE:
+ Preconditions.checkState(
+ readConf.streamingSkipOverwriteSnapshots(),
+ "Cannot process overwrite snapshot: %s, to ignore overwrites, set
%s=true",
+ snapshot.snapshotId(),
+ SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS);
+ return false;
+ default:
+ throw new IllegalStateException(
+ String.format(
+ "Cannot process unknown snapshot operation: %s (snapshot id
%s)",
+ op.toLowerCase(Locale.ROOT), snapshot.snapshotId()));
+ }
+ }
+
+ /**
+ * Get the next snapshot skipping over rewrite and delete snapshots.
+ *
+ * @param curSnapshot the current snapshot
+ * @return the next valid snapshot (not a rewrite or delete snapshot), returns null if all
+ * remaining snapshots should be skipped.
+ */
+ protected Snapshot nextValidSnapshot(Snapshot curSnapshot) {
+ Snapshot nextSnapshot = SnapshotUtil.snapshotAfter(table, curSnapshot.snapshotId());
+
+ // skip over rewrite and delete snapshots
+ while (!shouldProcess(nextSnapshot)) {
+ LOG.debug("Skipping snapshot: {}", nextSnapshot);
+ // if the currentSnapShot was also the mostRecentSnapshot then break
+ // avoids snapshotAfter throwing exception since there are no more snapshots to process
+ if (nextSnapshot.snapshotId() == table.currentSnapshot().snapshotId()) {
+ return null;
+ }
+ nextSnapshot = SnapshotUtil.snapshotAfter(table, nextSnapshot.snapshotId());
+ }
+ return nextSnapshot;
+ }
+
+ static class UnpackedLimits {
+ private long maxRows = Integer.MAX_VALUE;
+ private long maxFiles = Integer.MAX_VALUE;
+
+ UnpackedLimits(ReadLimit limit) {
+ if (limit instanceof CompositeReadLimit) {
+ ReadLimit[] compositeLimits = ((CompositeReadLimit) limit).getReadLimits();
+ for (ReadLimit individualLimit : compositeLimits) {
+ if (individualLimit instanceof ReadMaxRows) {
+ ReadMaxRows readMaxRows = (ReadMaxRows) individualLimit;
+ this.maxRows = Math.min(this.maxRows, readMaxRows.maxRows());
+ } else if (individualLimit instanceof ReadMaxFiles) {
+ ReadMaxFiles readMaxFiles = (ReadMaxFiles) individualLimit;
+ this.maxFiles = Math.min(this.maxFiles, readMaxFiles.maxFiles());
+ }
+ }
+ } else if (limit instanceof ReadMaxRows) {
+ this.maxRows = ((ReadMaxRows) limit).maxRows();
+ } else if (limit instanceof ReadMaxFiles) {
+ this.maxFiles = ((ReadMaxFiles) limit).maxFiles();
+ }
+ }
+
+ public long getMaxRows() {
+ return maxRows;
+ }
+
+ public long getMaxFiles() {
+ return maxFiles;
+ }
+ }
+}
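The nested UnpackedLimits helper collapses whatever ReadLimit Spark passes in, either a single ReadMaxRows/ReadMaxFiles or a CompositeReadLimit, into one max-files/max-rows pair, keeping the smallest value when a composite carries several limits of the same kind. A minimal usage sketch, not part of the diff above, assuming same-package access and placeholder limit values:

    // Hypothetical composite limit; Spark normally derives these from the stream's
    // max-files / max-rows per micro-batch settings.
    ReadLimit composite =
        ReadLimit.compositeLimit(
            new ReadLimit[] {ReadLimit.maxRows(1000L), ReadLimit.maxFiles(50)});

    BaseSparkMicroBatchPlanner.UnpackedLimits limits =
        new BaseSparkMicroBatchPlanner.UnpackedLimits(composite);

    long maxFiles = limits.getMaxFiles(); // 50
    long maxRows = limits.getMaxRows(); // 1000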
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/MicroBatchUtils.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/MicroBatchUtils.java
new file mode 100644
index 0000000000..360e0e8fb6
--- /dev/null
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/MicroBatchUtils.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+
+class MicroBatchUtils {
+
+ private MicroBatchUtils() {}
+
+ static StreamingOffset determineStartingOffset(Table table, long fromTimestamp) {
+ if (table.currentSnapshot() == null) {
+ return StreamingOffset.START_OFFSET;
+ }
+
+ if (fromTimestamp == Long.MIN_VALUE) {
+ // start from the oldest snapshot, since default value is MIN_VALUE
+ // avoids looping to find first snapshot
+ return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false);
+ }
+
+ if (table.currentSnapshot().timestampMillis() < fromTimestamp) {
+ return StreamingOffset.START_OFFSET;
+ }
+
+ try {
+ Snapshot snapshot = SnapshotUtil.oldestAncestorAfter(table, fromTimestamp);
+ if (snapshot != null) {
+ return new StreamingOffset(snapshot.snapshotId(), 0, false);
+ } else {
+ return StreamingOffset.START_OFFSET;
+ }
+ } catch (IllegalStateException e) {
+ // could not determine the first snapshot after the timestamp. use the oldest ancestor instead
+ return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false);
+ }
+ }
+
+ static long addedFilesCount(Table table, Snapshot snapshot) {
+ long addedFilesCount =
+ PropertyUtil.propertyAsLong(snapshot.summary(), SnapshotSummary.ADDED_FILES_PROP, -1);
+ return addedFilesCount == -1
+ ? Iterables.size(snapshot.addedDataFiles(table.io()))
+ : addedFilesCount;
+ }
+}
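MicroBatchUtils.determineStartingOffset resolves where a new stream begins: START_OFFSET when the table has no snapshots or every snapshot predates the requested timestamp, the oldest ancestor snapshot when no start timestamp was configured (the Long.MIN_VALUE default), and otherwise the oldest snapshot committed after the timestamp. A short sketch of how a caller might use it, illustrative only, with table and readConf assumed to be in scope:

    // streamFromTimestamp() defaults to Long.MIN_VALUE when no start timestamp is set,
    // which makes determineStartingOffset fall back to the oldest ancestor snapshot.
    long fromTimestamp = readConf.streamFromTimestamp();
    StreamingOffset start = MicroBatchUtils.determineStartingOffset(table, fromTimestamp);
    if (StreamingOffset.START_OFFSET.equals(start)) {
      // the table has no snapshots yet, or every existing snapshot predates fromTimestamp
    }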
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchPlanner.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchPlanner.java
new file mode 100644
index 0000000000..1986ddac5d
--- /dev/null
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchPlanner.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import org.apache.iceberg.FileScanTask;
+import org.apache.spark.sql.connector.read.streaming.ReadLimit;
+
+interface SparkMicroBatchPlanner {
+ /**
+ * Return the {@link FileScanTask}s for data added between the start and end offsets.
+ *
+ * @param startOffset the offset to start planning from
+ * @param endOffset the offset to plan up to
+ * @return file scan tasks for data in the offset range
+ */
+ List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffset endOffset);
+
+ /**
+ * Return the latest offset the stream can advance to from {@code startOffset}, respecting the
+ * given {@link ReadLimit}.
+ *
+ * @param startOffset the current offset of the stream
+ * @param limit the read limit bounding how far ahead to advance
+ * @return the latest available offset, or {@code null} if no new data is available
+ */
+ StreamingOffset latestOffset(StreamingOffset startOffset, ReadLimit limit);
+
+ /** Stop the planner and release any resources. */
+ void stop();
+}
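The interface splits the stream's two planning duties: latestOffset decides how far the next micro-batch may advance under a ReadLimit, and planFiles produces the FileScanTasks for an already-decided offset range. A rough sketch of the expected call pattern, illustrative only, with placeholder variable names:

    // Passing null for the third argument means no Trigger.AvailableNow cap.
    SparkMicroBatchPlanner planner = new SyncSparkMicroBatchPlanner(table, readConf, null);

    StreamingOffset start =
        MicroBatchUtils.determineStartingOffset(table, readConf.streamFromTimestamp());
    StreamingOffset end = planner.latestOffset(start, ReadLimit.maxFiles(100));

    if (end != null) { // null means no new data beyond 'start'
      List<FileScanTask> tasks = planner.planFiles(start, end);
      // the stream splits these tasks and turns them into InputPartitions
    }

    planner.stop();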
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 4c1b4b65a8..036a205c97 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -26,44 +26,29 @@ import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
-import java.util.Locale;
import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.DataOperations;
import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.ManifestFile;
-import org.apache.iceberg.MicroBatches;
-import org.apache.iceberg.MicroBatches.MicroBatch;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkReadConf;
-import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.Pair;
-import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
-import org.apache.spark.sql.connector.read.streaming.CompositeReadLimit;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.connector.read.streaming.ReadLimit;
-import org.apache.spark.sql.connector.read.streaming.ReadMaxFiles;
-import org.apache.spark.sql.connector.read.streaming.ReadMaxRows;
import org.apache.spark.sql.connector.read.streaming.SupportsTriggerAvailableNow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,6 +59,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
private static final Types.StructType EMPTY_GROUPING_KEY_TYPE = Types.StructType.of();
private final Table table;
+ private final SparkReadConf readConf;
private final boolean caseSensitive;
private final String expectedSchema;
private final Broadcast<Table> tableBroadcast;
@@ -82,12 +68,11 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
private final long splitOpenFileCost;
private final boolean localityPreferred;
private final StreamingOffset initialOffset;
- private final boolean skipDelete;
- private final boolean skipOverwrite;
private final long fromTimestamp;
private final int maxFilesPerMicroBatch;
private final int maxRecordsPerMicroBatch;
private final boolean cacheDeleteFilesOnExecutors;
+ private SparkMicroBatchPlanner planner;
private StreamingOffset lastOffsetForTriggerAvailableNow;
SparkMicroBatchStream(
@@ -97,6 +82,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
Schema expectedSchema,
String checkpointLocation) {
this.table = table;
+ this.readConf = readConf;
this.caseSensitive = readConf.caseSensitive();
this.expectedSchema = SchemaParser.toJson(expectedSchema);
this.localityPreferred = readConf.localityEnabled();
@@ -112,9 +98,6 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
InitialOffsetStore initialOffsetStore =
new InitialOffsetStore(table, checkpointLocation, fromTimestamp);
this.initialOffset = initialOffsetStore.initialOffset();
-
- this.skipDelete = readConf.streamingSkipDeleteSnapshots();
- this.skipOverwrite = readConf.streamingSkipOverwriteSnapshots();
}
@Override
@@ -130,7 +113,8 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
Snapshot latestSnapshot = table.currentSnapshot();
- return new StreamingOffset(latestSnapshot.snapshotId(), addedFilesCount(latestSnapshot), false);
+ return new StreamingOffset(
+ latestSnapshot.snapshotId(), MicroBatchUtils.addedFilesCount(table, latestSnapshot), false);
}
@Override
@@ -149,7 +133,12 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
StreamingOffset endOffset = (StreamingOffset) end;
StreamingOffset startOffset = (StreamingOffset) start;
- List<FileScanTask> fileScanTasks = planFiles(startOffset, endOffset);
+ // Initialize planner if not already done (for resume scenarios)
+ if (planner == null) {
+ initializePlanner(startOffset, endOffset);
+ }
+
+ List<FileScanTask> fileScanTasks = planner.planFiles(startOffset, endOffset);
CloseableIterable<FileScanTask> splitTasks =
TableScanUtil.splitFiles(CloseableIterable.withNoopClose(fileScanTasks), splitSize);
@@ -198,164 +187,18 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
public void commit(Offset end) {}
@Override
- public void stop() {}
-
- private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffset endOffset) {
- List<FileScanTask> fileScanTasks = Lists.newArrayList();
- StreamingOffset batchStartOffset =
- StreamingOffset.START_OFFSET.equals(startOffset)
- ? determineStartingOffset(table, fromTimestamp)
- : startOffset;
-
- StreamingOffset currentOffset = null;
-
- // [(startOffset : startFileIndex), (endOffset : endFileIndex) )
- do {
- long endFileIndex;
- if (currentOffset == null) {
- currentOffset = batchStartOffset;
- } else {
- Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table, currentOffset.snapshotId());
- // it may happen that we need to read this snapshot partially in case it's equal to
- // endOffset.
- if (currentOffset.snapshotId() != endOffset.snapshotId()) {
- currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, false);
- } else {
- currentOffset = endOffset;
- }
- }
-
- Snapshot snapshot = table.snapshot(currentOffset.snapshotId());
-
- validateCurrentSnapshotExists(snapshot, currentOffset);
-
- if (!shouldProcess(snapshot)) {
- LOG.debug("Skipping snapshot: {} of table {}",
currentOffset.snapshotId(), table.name());
- continue;
- }
-
- Snapshot currentSnapshot = table.snapshot(currentOffset.snapshotId());
- if (currentOffset.snapshotId() == endOffset.snapshotId()) {
- endFileIndex = endOffset.position();
- } else {
- endFileIndex = addedFilesCount(currentSnapshot);
- }
-
- MicroBatch latestMicroBatch =
- MicroBatches.from(currentSnapshot, table.io())
- .caseSensitive(caseSensitive)
- .specsById(table.specs())
- .generate(
- currentOffset.position(),
- endFileIndex,
- Long.MAX_VALUE,
- currentOffset.shouldScanAllFiles());
-
- fileScanTasks.addAll(latestMicroBatch.tasks());
- } while (currentOffset.snapshotId() != endOffset.snapshotId());
-
- return fileScanTasks;
- }
-
- private boolean shouldProcess(Snapshot snapshot) {
- String op = snapshot.operation();
- switch (op) {
- case DataOperations.APPEND:
- return true;
- case DataOperations.REPLACE:
- return false;
- case DataOperations.DELETE:
- Preconditions.checkState(
- skipDelete,
- "Cannot process delete snapshot: %s, to ignore deletes, set
%s=true",
- snapshot.snapshotId(),
- SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS);
- return false;
- case DataOperations.OVERWRITE:
- Preconditions.checkState(
- skipOverwrite,
- "Cannot process overwrite snapshot: %s, to ignore overwrites, set
%s=true",
- snapshot.snapshotId(),
- SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS);
- return false;
- default:
- throw new IllegalStateException(
- String.format(
- "Cannot process unknown snapshot operation: %s (snapshot id
%s)",
- op.toLowerCase(Locale.ROOT), snapshot.snapshotId()));
+ public void stop() {
+ if (planner != null) {
+ planner.stop();
}
}
- private static StreamingOffset determineStartingOffset(Table table, Long fromTimestamp) {
- if (table.currentSnapshot() == null) {
- return StreamingOffset.START_OFFSET;
- }
-
- if (fromTimestamp == null) {
- // match existing behavior and start from the oldest snapshot
- return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false);
- }
-
- if (table.currentSnapshot().timestampMillis() < fromTimestamp) {
- return StreamingOffset.START_OFFSET;
- }
-
- try {
- Snapshot snapshot = SnapshotUtil.oldestAncestorAfter(table, fromTimestamp);
- if (snapshot != null) {
- return new StreamingOffset(snapshot.snapshotId(), 0, false);
- } else {
- return StreamingOffset.START_OFFSET;
- }
- } catch (IllegalStateException e) {
- // could not determine the first snapshot after the timestamp. use the oldest ancestor instead
- return new StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false);
- }
- }
-
- private static int getMaxFiles(ReadLimit readLimit) {
- if (readLimit instanceof ReadMaxFiles) {
- return ((ReadMaxFiles) readLimit).maxFiles();
- }
-
- if (readLimit instanceof CompositeReadLimit) {
- // We do not expect a CompositeReadLimit to contain a nested CompositeReadLimit.
- // In fact, it should only be a composite of two or more of ReadMinRows, ReadMaxRows and
- // ReadMaxFiles, with no more than one of each.
- ReadLimit[] limits = ((CompositeReadLimit) readLimit).getReadLimits();
- for (ReadLimit limit : limits) {
- if (limit instanceof ReadMaxFiles) {
- return ((ReadMaxFiles) limit).maxFiles();
- }
- }
- }
-
- // there is no ReadMaxFiles, so return the default
- return Integer.MAX_VALUE;
- }
-
- private static int getMaxRows(ReadLimit readLimit) {
- if (readLimit instanceof ReadMaxRows) {
- long maxRows = ((ReadMaxRows) readLimit).maxRows();
- return Math.toIntExact(maxRows);
- }
-
- if (readLimit instanceof CompositeReadLimit) {
- ReadLimit[] limits = ((CompositeReadLimit) readLimit).getReadLimits();
- for (ReadLimit limit : limits) {
- if (limit instanceof ReadMaxRows) {
- long maxRows = ((ReadMaxRows) limit).maxRows();
- return Math.toIntExact(maxRows);
- }
- }
- }
-
- // there is no ReadMaxRows, so return the default
- return Integer.MAX_VALUE;
+ private void initializePlanner(StreamingOffset startOffset, StreamingOffset endOffset) {
+ this.planner =
+ new SyncSparkMicroBatchPlanner(table, readConf, lastOffsetForTriggerAvailableNow);
}
@Override
- @SuppressWarnings("checkstyle:CyclomaticComplexity")
public Offset latestOffset(Offset startOffset, ReadLimit limit) {
// calculate end offset get snapshotId from the startOffset
Preconditions.checkArgument(
@@ -363,152 +206,12 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
"Invalid start offset: %s is not a StreamingOffset",
startOffset);
- table.refresh();
- if (table.currentSnapshot() == null) {
- return StreamingOffset.START_OFFSET;
- }
-
- if (table.currentSnapshot().timestampMillis() < fromTimestamp) {
- return StreamingOffset.START_OFFSET;
- }
-
- // end offset can expand to multiple snapshots
- StreamingOffset startingOffset = (StreamingOffset) startOffset;
-
- if (startOffset.equals(StreamingOffset.START_OFFSET)) {
- startingOffset = determineStartingOffset(table, fromTimestamp);
- }
-
- Snapshot curSnapshot = table.snapshot(startingOffset.snapshotId());
- validateCurrentSnapshotExists(curSnapshot, startingOffset);
-
- // Use the pre-computed snapshotId when Trigger.AvailableNow is enabled.
- long latestSnapshotId =
- lastOffsetForTriggerAvailableNow != null
- ? lastOffsetForTriggerAvailableNow.snapshotId()
- : table.currentSnapshot().snapshotId();
-
- int startPosOfSnapOffset = (int) startingOffset.position();
-
- boolean scanAllFiles = startingOffset.shouldScanAllFiles();
-
- boolean shouldContinueReading = true;
- int curFilesAdded = 0;
- long curRecordCount = 0;
- int curPos = 0;
-
- // Note : we produce nextOffset with pos as non-inclusive
- while (shouldContinueReading) {
- // generate manifest index for the curSnapshot
- List<Pair<ManifestFile, Integer>> indexedManifests =
- MicroBatches.skippedManifestIndexesFromSnapshot(
- table.io(), curSnapshot, startPosOfSnapOffset, scanAllFiles);
- // this is under assumption we will be able to add at-least 1 file in the new offset
- for (int idx = 0; idx < indexedManifests.size() && shouldContinueReading; idx++) {
- // be rest assured curPos >= startFileIndex
- curPos = indexedManifests.get(idx).second();
- try (CloseableIterable<FileScanTask> taskIterable =
- MicroBatches.openManifestFile(
- table.io(),
- table.specs(),
- caseSensitive,
- curSnapshot,
- indexedManifests.get(idx).first(),
- scanAllFiles);
- CloseableIterator<FileScanTask> taskIter = taskIterable.iterator()) {
- while (taskIter.hasNext()) {
- FileScanTask task = taskIter.next();
- if (curPos >= startPosOfSnapOffset) {
- if ((curFilesAdded + 1) > getMaxFiles(limit)) {
- // On including the file it might happen that we might exceed, the configured
- // soft limit on the number of records, since this is a soft limit its acceptable.
- shouldContinueReading = false;
- break;
- }
-
- curFilesAdded += 1;
- curRecordCount += task.file().recordCount();
-
- if (curRecordCount >= getMaxRows(limit)) {
- // we included the file, so increment the number of files
- // read in the current snapshot.
- ++curPos;
- shouldContinueReading = false;
- break;
- }
- }
- ++curPos;
- }
- } catch (IOException ioe) {
- LOG.warn("Failed to close task iterable", ioe);
- }
- }
- // if the currentSnapShot was also the latestSnapshot then break
- if (curSnapshot.snapshotId() == latestSnapshotId) {
- break;
- }
-
- // if everything was OK and we consumed complete snapshot then move to next snapshot
- if (shouldContinueReading) {
- Snapshot nextValid = nextValidSnapshot(curSnapshot);
- if (nextValid == null) {
- // nextValid implies all the remaining snapshots should be skipped.
- break;
- }
- // we found the next available snapshot, continue from there.
- curSnapshot = nextValid;
- startPosOfSnapOffset = -1;
- // if anyhow we are moving to next snapshot we should only scan addedFiles
- scanAllFiles = false;
- }
+ // Initialize planner if not already done
+ if (planner == null) {
+ initializePlanner((StreamingOffset) startOffset, null);
}
- StreamingOffset latestStreamingOffset =
- new StreamingOffset(curSnapshot.snapshotId(), curPos, scanAllFiles);
-
- // if no new data arrived, then return null.
- return latestStreamingOffset.equals(startingOffset) ? null : latestStreamingOffset;
- }
-
- /**
- * Get the next snapshot skiping over rewrite and delete snapshots.
- *
- * @param curSnapshot the current snapshot
- * @return the next valid snapshot (not a rewrite or delete snapshot), returns null if all
- * remaining snapshots should be skipped.
- */
- private Snapshot nextValidSnapshot(Snapshot curSnapshot) {
- Snapshot nextSnapshot = SnapshotUtil.snapshotAfter(table, curSnapshot.snapshotId());
- // skip over rewrite and delete snapshots
- while (!shouldProcess(nextSnapshot)) {
- LOG.debug("Skipping snapshot: {} of table {}",
nextSnapshot.snapshotId(), table.name());
- // if the currentSnapShot was also the mostRecentSnapshot then break
- if (nextSnapshot.snapshotId() == table.currentSnapshot().snapshotId()) {
- return null;
- }
- nextSnapshot = SnapshotUtil.snapshotAfter(table, nextSnapshot.snapshotId());
- }
- return nextSnapshot;
- }
-
- private long addedFilesCount(Snapshot snapshot) {
- long addedFilesCount =
- PropertyUtil.propertyAsLong(snapshot.summary(), SnapshotSummary.ADDED_FILES_PROP, -1);
- // If snapshotSummary doesn't have SnapshotSummary.ADDED_FILES_PROP,
- // iterate through addedFiles iterator to find addedFilesCount.
- return addedFilesCount == -1
- ? Iterables.size(snapshot.addedDataFiles(table.io()))
- : addedFilesCount;
- }
-
- private void validateCurrentSnapshotExists(Snapshot snapshot, StreamingOffset currentOffset) {
- if (snapshot == null) {
- throw new IllegalStateException(
- String.format(
- Locale.ROOT,
- "Cannot load current offset at snapshot %d, the snapshot was
expired or removed",
- currentOffset.snapshotId()));
- }
+ return planner.latestOffset((StreamingOffset) startOffset, limit);
}
@Override
@@ -536,6 +239,12 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
(StreamingOffset) latestOffset(initialOffset, ReadLimit.allAvailable());
LOG.info("lastOffset for Trigger.AvailableNow is {}", lastOffsetForTriggerAvailableNow.json());
+
+ // Reset planner so it gets recreated with the cap on next call
+ if (planner != null) {
+ planner.stop();
+ planner = null;
+ }
}
private static class InitialOffsetStore {
@@ -558,7 +267,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
}
table.refresh();
- StreamingOffset offset = determineStartingOffset(table, fromTimestamp);
+ StreamingOffset offset = MicroBatchUtils.determineStartingOffset(table, fromTimestamp);
OutputFile outputFile = io.newOutputFile(initialOffsetLocation);
writeOffset(offset, outputFile);
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SyncSparkMicroBatchPlanner.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SyncSparkMicroBatchPlanner.java
new file mode 100644
index 0000000000..fe662e2b83
--- /dev/null
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SyncSparkMicroBatchPlanner.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.connector.read.streaming.ReadLimit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class SyncSparkMicroBatchPlanner extends BaseSparkMicroBatchPlanner {
+ private static final Logger LOG = LoggerFactory.getLogger(SyncSparkMicroBatchPlanner.class);
+
+ private final boolean caseSensitive;
+ private final long fromTimestamp;
+ private final StreamingOffset lastOffsetForTriggerAvailableNow;
+
+ SyncSparkMicroBatchPlanner(
+ Table table, SparkReadConf readConf, StreamingOffset lastOffsetForTriggerAvailableNow) {
+ super(table, readConf);
+ this.caseSensitive = readConf().caseSensitive();
+ this.fromTimestamp = readConf().streamFromTimestamp();
+ this.lastOffsetForTriggerAvailableNow = lastOffsetForTriggerAvailableNow;
+ }
+
+ @Override
+ public List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffset endOffset) {
+ List<FileScanTask> fileScanTasks = Lists.newArrayList();
+ StreamingOffset batchStartOffset =
+ StreamingOffset.START_OFFSET.equals(startOffset)
+ ? MicroBatchUtils.determineStartingOffset(table(), fromTimestamp)
+ : startOffset;
+
+ StreamingOffset currentOffset = null;
+
+ // [(startOffset : startFileIndex), (endOffset : endFileIndex) )
+ do {
+ long endFileIndex;
+ if (currentOffset == null) {
+ currentOffset = batchStartOffset;
+ } else {
+ Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table(), currentOffset.snapshotId());
+ // it may happen that we need to read this snapshot partially in case it's equal to
+ // endOffset.
+ if (currentOffset.snapshotId() != endOffset.snapshotId()) {
+ currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, false);
+ } else {
+ currentOffset = endOffset;
+ }
+ }
+
+ Snapshot snapshot = table().snapshot(currentOffset.snapshotId());
+
+ validateCurrentSnapshotExists(snapshot, currentOffset);
+
+ if (!shouldProcess(snapshot)) {
+ LOG.debug("Skipping snapshot: {} of table {}",
currentOffset.snapshotId(), table().name());
+ continue;
+ }
+
+ Snapshot currentSnapshot = table().snapshot(currentOffset.snapshotId());
+ if (currentOffset.snapshotId() == endOffset.snapshotId()) {
+ endFileIndex = endOffset.position();
+ } else {
+ endFileIndex = MicroBatchUtils.addedFilesCount(table(), currentSnapshot);
+ }
+
+ MicroBatch latestMicroBatch =
+ MicroBatches.from(currentSnapshot, table().io())
+ .caseSensitive(caseSensitive)
+ .specsById(table().specs())
+ .generate(
+ currentOffset.position(),
+ endFileIndex,
+ Long.MAX_VALUE,
+ currentOffset.shouldScanAllFiles());
+
+ fileScanTasks.addAll(latestMicroBatch.tasks());
+ } while (currentOffset.snapshotId() != endOffset.snapshotId());
+
+ return fileScanTasks;
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ public StreamingOffset latestOffset(StreamingOffset startOffset, ReadLimit limit) {
+ table().refresh();
+ if (table().currentSnapshot() == null) {
+ return StreamingOffset.START_OFFSET;
+ }
+
+ if (table().currentSnapshot().timestampMillis() < fromTimestamp) {
+ return StreamingOffset.START_OFFSET;
+ }
+
+ // end offset can expand to multiple snapshots
+ StreamingOffset startingOffset = startOffset;
+
+ if (startOffset.equals(StreamingOffset.START_OFFSET)) {
+ startingOffset = MicroBatchUtils.determineStartingOffset(table(), fromTimestamp);
+ }
+
+ Snapshot curSnapshot = table().snapshot(startingOffset.snapshotId());
+ validateCurrentSnapshotExists(curSnapshot, startingOffset);
+
+ // Use the pre-computed snapshotId when Trigger.AvailableNow is enabled.
+ long latestSnapshotId =
+ lastOffsetForTriggerAvailableNow != null
+ ? lastOffsetForTriggerAvailableNow.snapshotId()
+ : table().currentSnapshot().snapshotId();
+
+ int startPosOfSnapOffset = (int) startingOffset.position();
+
+ boolean scanAllFiles = startingOffset.shouldScanAllFiles();
+
+ boolean shouldContinueReading = true;
+ int curFilesAdded = 0;
+ long curRecordCount = 0;
+ int curPos = 0;
+
+ // Extract limits once to avoid repeated calls in tight loop
+ UnpackedLimits unpackedLimits = new UnpackedLimits(limit);
+ long maxFiles = unpackedLimits.getMaxFiles();
+ long maxRows = unpackedLimits.getMaxRows();
+
+ // Note : we produce nextOffset with pos as non-inclusive
+ while (shouldContinueReading) {
+ // generate manifest index for the curSnapshot
+ List<Pair<ManifestFile, Integer>> indexedManifests =
+ MicroBatches.skippedManifestIndexesFromSnapshot(
+ table().io(), curSnapshot, startPosOfSnapOffset, scanAllFiles);
+ // this is under assumption we will be able to add at-least 1 file in the new offset
+ for (int idx = 0; idx < indexedManifests.size() && shouldContinueReading; idx++) {
+ // be rest assured curPos >= startFileIndex
+ curPos = indexedManifests.get(idx).second();
+ try (CloseableIterable<FileScanTask> taskIterable =
+ MicroBatches.openManifestFile(
+ table().io(),
+ table().specs(),
+ caseSensitive,
+ curSnapshot,
+ indexedManifests.get(idx).first(),
+ scanAllFiles);
+ CloseableIterator<FileScanTask> taskIter = taskIterable.iterator()) {
+ while (taskIter.hasNext()) {
+ FileScanTask task = taskIter.next();
+ if (curPos >= startPosOfSnapOffset) {
+ if ((curFilesAdded + 1) > maxFiles) {
+ // On including the file it might happen that we might exceed, the configured
+ // soft limit on the number of records, since this is a soft limit its acceptable.
+ shouldContinueReading = false;
+ break;
+ }
+
+ curFilesAdded += 1;
+ curRecordCount += task.file().recordCount();
+
+ if (curRecordCount >= maxRows) {
+ // we included the file, so increment the number of files
+ // read in the current snapshot.
+ ++curPos;
+ shouldContinueReading = false;
+ break;
+ }
+ }
+ ++curPos;
+ }
+ } catch (IOException ioe) {
+ LOG.warn("Failed to close task iterable", ioe);
+ }
+ }
+ // if the currentSnapShot was also the latestSnapshot then break
+ if (curSnapshot.snapshotId() == latestSnapshotId) {
+ break;
+ }
+
+ // if everything was OK and we consumed complete snapshot then move to next snapshot
+ if (shouldContinueReading) {
+ Snapshot nextValid = nextValidSnapshot(curSnapshot);
+ if (nextValid == null) {
+ // nextValid implies all the remaining snapshots should be skipped.
+ break;
+ }
+ // we found the next available snapshot, continue from there.
+ curSnapshot = nextValid;
+ startPosOfSnapOffset = -1;
+ // if anyhow we are moving to next snapshot we should only scan addedFiles
+ scanAllFiles = false;
+ }
+ }
+
+ StreamingOffset latestStreamingOffset =
+ new StreamingOffset(curSnapshot.snapshotId(), curPos, scanAllFiles);
+
+ // if no new data arrived, then return null.
+ return latestStreamingOffset.equals(startingOffset) ? null : latestStreamingOffset;
+ }
+
+ @Override
+ public void stop() {}
+
+ private void validateCurrentSnapshotExists(Snapshot snapshot, StreamingOffset currentOffset) {
+ if (snapshot == null) {
+ throw new IllegalStateException(
+ String.format(
+ Locale.ROOT,
+ "Cannot load current offset at snapshot %d, the snapshot was
expired or removed",
+ currentOffset.snapshotId()));
+ }
+ }
+}
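When Trigger.AvailableNow is active, SparkMicroBatchStream hands the planner a pre-computed cap (lastOffsetForTriggerAvailableNow), so latestOffset never advances past the snapshot that existed when the trigger fired. A small sketch of that bounding behavior, illustrative only; the cap value mirrors how the stream builds an offset from the table's current snapshot:

    // Cap at the table's current snapshot; position = number of files added by that snapshot.
    Snapshot current = table.currentSnapshot();
    StreamingOffset cap =
        new StreamingOffset(
            current.snapshotId(), MicroBatchUtils.addedFilesCount(table, current), false);

    SparkMicroBatchPlanner bounded = new SyncSparkMicroBatchPlanner(table, readConf, cap);
    StreamingOffset next =
        bounded.latestOffset(StreamingOffset.START_OFFSET, ReadLimit.allAvailable());
    // 'next', if non-null, never refers to a snapshot committed after 'cap'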
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestMicroBatchPlanningUtils.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestMicroBatchPlanningUtils.java
new file mode 100644
index 0000000000..a9ce340fd4
--- /dev/null
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestMicroBatchPlanningUtils.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.CatalogTestBase;
+import org.apache.spark.sql.connector.read.streaming.ReadLimit;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestMicroBatchPlanningUtils extends CatalogTestBase {
+
+ private Table table;
+
+ @BeforeEach
+ public void setupTable() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ sql(
+ "CREATE TABLE %s "
+ + "(id INT, data STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (bucket(3, id))",
+ tableName);
+ this.table = validationCatalog.loadTable(tableIdent);
+ }
+
+ @AfterEach
+ public void dropTable() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ }
+
+ @TestTemplate
+ public void testUnpackedLimitsCompositeChoosesMinimum() {
+ ReadLimit[] limits =
+ new ReadLimit[] {
+ ReadLimit.maxRows(10), ReadLimit.maxRows(4), ReadLimit.maxFiles(8), ReadLimit.maxFiles(2)
+ };
+
+ ReadLimit composite = ReadLimit.compositeLimit(limits);
+
+ BaseSparkMicroBatchPlanner.UnpackedLimits unpacked =
+ new BaseSparkMicroBatchPlanner.UnpackedLimits(composite);
+
+ assertThat(unpacked.getMaxRows()).isEqualTo(4);
+ assertThat(unpacked.getMaxFiles()).isEqualTo(2);
+ }
+
+ @TestTemplate
+ public void testDetermineStartingOffsetWithTimestampBetweenSnapshots() {
+ sql("INSERT INTO %s VALUES (1, 'one')", tableName);
+ table.refresh();
+ long snapshot1Time = table.currentSnapshot().timestampMillis();
+
+ sql("INSERT INTO %s VALUES (2, 'two')", tableName);
+ table.refresh();
+ long snapshot2Id = table.currentSnapshot().snapshotId();
+
+ StreamingOffset offset = MicroBatchUtils.determineStartingOffset(table, snapshot1Time + 1);
+
+ assertThat(offset.snapshotId()).isEqualTo(snapshot2Id);
+ assertThat(offset.position()).isEqualTo(0L);
+ assertThat(offset.shouldScanAllFiles()).isFalse();
+ }
+
+ @TestTemplate
+ public void testAddedFilesCountUsesSummaryWhenPresent() {
+ sql("INSERT INTO %s VALUES (1, 'one')", tableName);
+ table.refresh();
+
+ long expectedAddedFiles =
+ Long.parseLong(table.currentSnapshot().summary().get(SnapshotSummary.ADDED_FILES_PROP));
+
+ long actual = MicroBatchUtils.addedFilesCount(table, table.currentSnapshot());
+
+ assertThat(actual).isEqualTo(expectedAddedFiles);
+ }
+}