This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
     new 6550486f0c  Spark 3.4: Structured Streaming read limit support follow-up (#13099)
6550486f0c is described below

commit 6550486f0c640e15bb7c4d6c16b087397656213e
Author: Wing Yew Poon <wyp...@cloudera.com>
AuthorDate: Wed May 21 08:27:38 2025 -0700

    Spark 3.4: Structured Streaming read limit support follow-up (#13099)

    backports #12260 to Spark 3.4
---
 .../spark/source/SparkMicroBatchStream.java         | 52 +++++++++++++++++++---
 .../spark/source/TestStructuredStreamingRead3.java  | 44 ++++++++++++++----
 2 files changed, 82 insertions(+), 14 deletions(-)

diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 5e10719c0c..d0da342c1c 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -58,9 +58,12 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.connector.read.InputPartition;
 import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.streaming.CompositeReadLimit;
 import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
 import org.apache.spark.sql.connector.read.streaming.Offset;
 import org.apache.spark.sql.connector.read.streaming.ReadLimit;
+import org.apache.spark.sql.connector.read.streaming.ReadMaxFiles;
+import org.apache.spark.sql.connector.read.streaming.ReadMaxRows;
 import org.apache.spark.sql.connector.read.streaming.SupportsAdmissionControl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -309,6 +312,47 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio
     }
   }
 
+  private static int getMaxFiles(ReadLimit readLimit) {
+    if (readLimit instanceof ReadMaxFiles) {
+      return ((ReadMaxFiles) readLimit).maxFiles();
+    }
+
+    if (readLimit instanceof CompositeReadLimit) {
+      // We do not expect a CompositeReadLimit to contain a nested CompositeReadLimit.
+      // In fact, it should only be a composite of two or more of ReadMinRows, ReadMaxRows and
+      // ReadMaxFiles, with no more than one of each.
+      ReadLimit[] limits = ((CompositeReadLimit) readLimit).getReadLimits();
+      for (ReadLimit limit : limits) {
+        if (limit instanceof ReadMaxFiles) {
+          return ((ReadMaxFiles) limit).maxFiles();
+        }
+      }
+    }
+
+    // there is no ReadMaxFiles, so return the default
+    return Integer.MAX_VALUE;
+  }
+
+  private static int getMaxRows(ReadLimit readLimit) {
+    if (readLimit instanceof ReadMaxRows) {
+      long maxRows = ((ReadMaxRows) readLimit).maxRows();
+      return Math.toIntExact(maxRows);
+    }
+
+    if (readLimit instanceof CompositeReadLimit) {
+      ReadLimit[] limits = ((CompositeReadLimit) readLimit).getReadLimits();
+      for (ReadLimit limit : limits) {
+        if (limit instanceof ReadMaxRows) {
+          long maxRows = ((ReadMaxRows) limit).maxRows();
+          return Math.toIntExact(maxRows);
+        }
+      }
+    }
+
+    // there is no ReadMaxRows, so return the default
+    return Integer.MAX_VALUE;
+  }
+
   @Override
   @SuppressWarnings("checkstyle:CyclomaticComplexity")
   public Offset latestOffset(Offset startOffset, ReadLimit limit) {
@@ -368,10 +412,8 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio
       while (taskIter.hasNext()) {
         FileScanTask task = taskIter.next();
         if (curPos >= startPosOfSnapOffset) {
-          // TODO : use readLimit provided in function param, the readLimits are derived from
-          // these 2 properties.
-          if ((curFilesAdded + 1) > maxFilesPerMicroBatch
-              || (curRecordCount + task.file().recordCount()) > maxRecordsPerMicroBatch) {
+          if ((curFilesAdded + 1) > getMaxFiles(limit)
+              || (curRecordCount + task.file().recordCount()) > getMaxRows(limit)) {
             shouldContinueReading = false;
             break;
           }
@@ -431,7 +473,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio
         && maxRecordsPerMicroBatch != Integer.MAX_VALUE) {
       ReadLimit[] readLimits = new ReadLimit[2];
       readLimits[0] = ReadLimit.maxFiles(maxFilesPerMicroBatch);
-      readLimits[1] = ReadLimit.maxRows(maxFilesPerMicroBatch);
+      readLimits[1] = ReadLimit.maxRows(maxRecordsPerMicroBatch);
       return ReadLimit.compositeLimit(readLimits);
     } else if (maxFilesPerMicroBatch != Integer.MAX_VALUE) {
       return ReadLimit.maxFiles(maxFilesPerMicroBatch);
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index 2544a8f739..c0e47cbafb 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -151,8 +151,7 @@ public final class TestStructuredStreamingRead3 extends CatalogTestBase {
   }
 
   @TestTemplate
-  public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_1()
-      throws Exception {
+  public void testReadStreamWithMaxFiles1() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
 
     assertThat(
@@ -162,8 +161,7 @@ public final class TestStructuredStreamingRead3 extends CatalogTestBase {
   }
 
   @TestTemplate
-  public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_2()
-      throws Exception {
+  public void testReadStreamWithMaxFiles2() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
 
     assertThat(
@@ -173,8 +171,7 @@ public final class TestStructuredStreamingRead3 extends CatalogTestBase {
   }
 
   @TestTemplate
-  public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_1()
-      throws Exception {
+  public void testReadStreamWithMaxRows1() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
 
     // only 1 micro-batch will be formed and we will read data partially
@@ -183,7 +180,8 @@ public final class TestStructuredStreamingRead3 extends CatalogTestBase {
             ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1")))
         .isEqualTo(1);
 
-    StreamingQuery query = startStream(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1");
+    StreamingQuery query =
+        startStream(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1"));
 
     // check answer correctness only 1 record read the micro-batch will be stuck
     List<SimpleRecord> actual = rowsAvailable(query);
@@ -193,8 +191,24 @@ public final class TestStructuredStreamingRead3 extends CatalogTestBase {
   }
 
   @TestTemplate
-  public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_4()
-      throws Exception {
+  public void testReadStreamWithMaxRows2() throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+
+    assertThat(
+            microBatchCount(
+                ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2")))
+        .isEqualTo(4);
+
+    StreamingQuery query =
+        startStream(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2"));
+
+    List<SimpleRecord> actual = rowsAvailable(query);
+    assertThat(actual)
+        .containsExactlyInAnyOrderElementsOf(Iterables.concat(TEST_DATA_MULTIPLE_SNAPSHOTS));
+  }
+
+  @TestTemplate
+  public void testReadStreamWithMaxRows4() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
 
     assertThat(
@@ -203,6 +217,18 @@ public final class TestStructuredStreamingRead3 extends CatalogTestBase {
         .isEqualTo(2);
   }
 
+  @TestTemplate
+  public void testReadStreamWithCompositeReadLimit() throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+
+    assertThat(
+            microBatchCount(
+                ImmutableMap.of(
+                    SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1",
+                    SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2")))
+        .isEqualTo(6);
+  }
+
   @TestTemplate
   public void testReadStreamOnIcebergThenAddData() throws Exception {
     List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
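
Not part of the commit, purely illustrative: a minimal sketch of how the two read options exercised by the new testReadStreamWithCompositeReadLimit test could be set on a streaming read. With both options configured, the source reports a composite of ReadLimit.maxFiles(...) and ReadLimit.maxRows(...) as built in the hunk above, and the new getMaxFiles/getMaxRows helpers pick out the matching component when forming each micro-batch. The SparkSession setup and the table name db.events below are assumptions made up for this example, not part of the patch.

    import org.apache.iceberg.spark.SparkReadOptions;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class ReadLimitExample {
      public static void main(String[] args) throws Exception {
        // assumes a SparkSession with the Iceberg Spark runtime on the classpath
        SparkSession spark = SparkSession.builder().getOrCreate();

        Dataset<Row> stream =
            spark
                .readStream()
                .format("iceberg")
                // admit at most one data file per micro-batch ...
                .option(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")
                // ... and at most two rows; with both set, a composite read limit is reported
                .option(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2")
                .load("db.events"); // hypothetical table name

        stream.writeStream().format("console").start().awaitTermination();
      }
    }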