This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new dfecabc76c CP 12988 to Spark version (#13412)
dfecabc76c is described below
commit dfecabc76c5b82ecdde7d61abdf540d43ee14f59
Author: Prashant Singh <[email protected]>
AuthorDate: Tue Jul 1 16:55:14 2025 -0700
CP 12988 to Spark version (#13412)
Co-authored-by: Prashant Singh <[email protected]>
---
.../spark/source/SparkMicroBatchStream.java | 17 +++-
.../spark/source/TestStructuredStreamingRead3.java | 109 +++++++++------------
.../spark/source/SparkMicroBatchStream.java | 15 ++-
.../spark/source/TestStructuredStreamingRead3.java | 102 +++++++++----------
4 files changed, 119 insertions(+), 124 deletions(-)
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 859f1b23bf..fa18b6edd8 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -349,7 +349,7 @@ public class SparkMicroBatchStream implements
MicroBatchStream, SupportsAdmissio
}
}
- // there is no ReadMaxRows, so return the default
+ // There is no ReadMaxRows, so return the default
return Integer.MAX_VALUE;
}
@@ -388,7 +388,7 @@ public class SparkMicroBatchStream implements
MicroBatchStream, SupportsAdmissio
boolean shouldContinueReading = true;
int curFilesAdded = 0;
int curRecordCount = 0;
- int curPos = 0;
+ long curPos = 0;
// Note : we produce nextOffset with pos as non-inclusive
while (shouldContinueReading) {
@@ -412,14 +412,23 @@ public class SparkMicroBatchStream implements
MicroBatchStream, SupportsAdmissio
while (taskIter.hasNext()) {
FileScanTask task = taskIter.next();
if (curPos >= startPosOfSnapOffset) {
- if ((curFilesAdded + 1) > getMaxFiles(limit)
- || (curRecordCount + task.file().recordCount()) >
getMaxRows(limit)) {
+ if ((curFilesAdded + 1) > getMaxFiles(limit)) {
+            // On including the file we might exceed the configured
+            // soft limit on the number of records; since this is a soft limit, it's acceptable.
shouldContinueReading = false;
break;
}
curFilesAdded += 1;
curRecordCount += task.file().recordCount();
+
+ if (curRecordCount >= getMaxRows(limit)) {
+            // We included the file, so advance curPos past it
+            // within the current snapshot.
+ ++curPos;
+ shouldContinueReading = false;
+ break;
+ }
}
++curPos;
}
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index 5ce8cb6645..4f51819312 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -157,50 +157,42 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
public void testReadStreamWithMaxFiles1() throws Exception {
appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
- assertThat(
- microBatchCount(
-
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")))
- .isEqualTo(6);
+ assertMicroBatchRecordSizes(
+ ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
"1"),
+ List.of(1L, 2L, 1L, 1L, 1L, 1L));
}
@TestTemplate
public void testReadStreamWithMaxFiles2() throws Exception {
appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
- assertThat(
- microBatchCount(
-
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "2")))
- .isEqualTo(3);
+ assertMicroBatchRecordSizes(
+ ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
"2"),
+ List.of(3L, 2L, 2L));
}
@TestTemplate
public void testReadStreamWithMaxRows1() throws Exception {
appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+ assertMicroBatchRecordSizes(
+ ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
"1"),
+ List.of(1L, 2L, 1L, 1L, 1L, 1L));
- // only 1 micro-batch will be formed and we will read data partially
- assertThat(
- microBatchCount(
-
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1")))
- .isEqualTo(1);
+    // A soft limit of 1 is being enforced; the stream is not blocked.
+ StreamingQuery query =
startStream(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1");
- StreamingQuery query =
-
startStream(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
"1"));
-
- // check answer correctness only 1 record read the micro-batch will be
stuck
List<SimpleRecord> actual = rowsAvailable(query);
assertThat(actual)
- .containsExactlyInAnyOrderElementsOf(
- Lists.newArrayList(TEST_DATA_MULTIPLE_SNAPSHOTS.get(0).get(0)));
+
.containsExactlyInAnyOrderElementsOf(Iterables.concat(TEST_DATA_MULTIPLE_SNAPSHOTS));
}
@TestTemplate
public void testReadStreamWithMaxRows2() throws Exception {
appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
- assertThat(
- microBatchCount(
-
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2")))
- .isEqualTo(4);
+ assertMicroBatchRecordSizes(
+ ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
"2"),
+ List.of(3L, 2L, 2L));
StreamingQuery query =
startStream(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
"2"));
@@ -214,22 +206,21 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
public void testReadStreamWithMaxRows4() throws Exception {
appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
- assertThat(
- microBatchCount(
-
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "4")))
- .isEqualTo(2);
+ assertMicroBatchRecordSizes(
+ ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
"4"), List.of(4L, 3L));
}
@TestTemplate
public void testReadStreamWithCompositeReadLimit() throws Exception {
appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
- assertThat(
- microBatchCount(
- ImmutableMap.of(
- SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1",
- SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2")))
- .isEqualTo(6);
+ assertMicroBatchRecordSizes(
+ ImmutableMap.of(
+ SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
+ "4",
+ SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
+ "1"),
+ List.of(1L, 2L, 1L, 1L, 1L, 1L));
}
@TestTemplate
@@ -542,10 +533,9 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
makeRewriteDataFiles();
- assertThat(
- microBatchCount(
-
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")))
- .isEqualTo(6);
+ assertMicroBatchRecordSizes(
+ ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
"1"),
+ List.of(1L, 2L, 1L, 1L, 1L, 1L));
}
@TestTemplate
@@ -557,10 +547,9 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
makeRewriteDataFiles();
- assertThat(
- microBatchCount(
-
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "4")))
- .isEqualTo(2);
+ assertMicroBatchRecordSizes(
+ ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
"4"),
+ List.of(5L, 2L));
}
@TestTemplate
@@ -571,15 +560,13 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
appendDataAsMultipleSnapshots(expected);
makeRewriteDataFiles();
-
- assertThat(
- microBatchCount(
- ImmutableMap.of(
- SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
- "4",
- SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
- "1")))
- .isEqualTo(6);
+ assertMicroBatchRecordSizes(
+ ImmutableMap.of(
+ SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
+ "4",
+ SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
+ "1"),
+ List.of(1L, 2L, 1L, 1L, 1L, 1L));
}
@TestTemplate
@@ -591,10 +578,9 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
makeRewriteDataFiles();
makeRewriteDataFiles();
- assertThat(
- microBatchCount(
-
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")))
- .isEqualTo(6);
+ assertMicroBatchRecordSizes(
+ ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
"1"),
+ List.of(1L, 2L, 1L, 1L, 1L, 1L));
}
@TestTemplate
@@ -608,10 +594,9 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
appendDataAsMultipleSnapshots(expected);
- assertThat(
- microBatchCount(
-
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")))
- .isEqualTo(12);
+ assertMicroBatchRecordSizes(
+ ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
"1"),
+ List.of(1L, 2L, 1L, 1L, 1L, 1L, 1L, 2L, 1L, 1L, 1L, 1L));
}
@TestTemplate
@@ -778,8 +763,11 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
ImmutableMap.of(key, value,
SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1"));
}
- private int microBatchCount(Map<String, String> options) throws
TimeoutException {
+ private void assertMicroBatchRecordSizes(
+ Map<String, String> options, List<Long> expectedMicroBatchRecordSize)
+ throws TimeoutException {
Dataset<Row> ds =
spark.readStream().options(options).format("iceberg").load(tableName);
+ List<Long> syncList = Collections.synchronizedList(Lists.newArrayList());
ds.writeStream()
.options(options)
@@ -787,12 +775,13 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
(VoidFunction2<Dataset<Row>, Long>)
(dataset, batchId) -> {
microBatches.getAndIncrement();
+ syncList.add(dataset.count());
})
.start()
.processAllAvailable();
stopStreams();
- return microBatches.get();
+
assertThat(syncList).containsExactlyInAnyOrderElementsOf(expectedMicroBatchRecordSize);
}
private List<SimpleRecord> rowsAvailable(StreamingQuery query) {
diff --git
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index d348c3cc57..f4468fe156 100644
---
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -412,14 +412,23 @@ public class SparkMicroBatchStream implements
MicroBatchStream, SupportsAdmissio
while (taskIter.hasNext()) {
FileScanTask task = taskIter.next();
if (curPos >= startPosOfSnapOffset) {
- if ((curFilesAdded + 1) > getMaxFiles(limit)
- || (curRecordCount + task.file().recordCount()) >
getMaxRows(limit)) {
+ if ((curFilesAdded + 1) > getMaxFiles(limit)) {
+            // On including the file we might exceed the configured
+            // soft limit on the number of records; since this is a soft limit, it's acceptable.
shouldContinueReading = false;
break;
}
curFilesAdded += 1;
curRecordCount += task.file().recordCount();
+
+ if (curRecordCount >= getMaxRows(limit)) {
+            // We included the file, so advance curPos past it
+            // within the current snapshot.
+ ++curPos;
+ shouldContinueReading = false;
+ break;
+ }
}
++curPos;
}
@@ -436,7 +445,7 @@ public class SparkMicroBatchStream implements
MicroBatchStream, SupportsAdmissio
if (shouldContinueReading) {
Snapshot nextValid = nextValidSnapshot(curSnapshot);
if (nextValid == null) {
- // nextValide implies all the remaining snapshots should be skipped.
+ // nextValid implies all the remaining snapshots should be skipped.
break;
}
// we found the next available snapshot, continue from there.
diff --git
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index 370abfe096..bb664f0c72 100644
---
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -157,50 +157,44 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
public void testReadStreamWithMaxFiles1() throws Exception {
appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
- assertThat(
- microBatchCount(
-
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")))
- .isEqualTo(6);
+ assertMicroBatchRecordSizes(
+ ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
"1"),
+ List.of(1L, 2L, 1L, 1L, 1L, 1L));
}
@TestTemplate
public void testReadStreamWithMaxFiles2() throws Exception {
appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
- assertThat(
- microBatchCount(
-
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "2")))
- .isEqualTo(3);
+ assertMicroBatchRecordSizes(
+ ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
"2"),
+ List.of(3L, 2L, 2L));
}
@TestTemplate
public void testReadStreamWithMaxRows1() throws Exception {
appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
- // only 1 micro-batch will be formed and we will read data partially
- assertThat(
- microBatchCount(
-
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1")))
- .isEqualTo(1);
-
+ assertMicroBatchRecordSizes(
+ ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
"1"),
+ List.of(1L, 2L, 1L, 1L, 1L, 1L));
+    // A soft limit of 1 is being enforced; the stream is not blocked.
StreamingQuery query =
startStream(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
"1"));
// check answer correctness only 1 record read the micro-batch will be
stuck
List<SimpleRecord> actual = rowsAvailable(query);
assertThat(actual)
- .containsExactlyInAnyOrderElementsOf(
- Lists.newArrayList(TEST_DATA_MULTIPLE_SNAPSHOTS.get(0).get(0)));
+
.containsExactlyInAnyOrderElementsOf(Iterables.concat(TEST_DATA_MULTIPLE_SNAPSHOTS));
}
@TestTemplate
public void testReadStreamWithMaxRows2() throws Exception {
appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
- assertThat(
- microBatchCount(
-
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2")))
- .isEqualTo(4);
+ assertMicroBatchRecordSizes(
+ ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
"2"),
+ List.of(3L, 2L, 2L));
StreamingQuery query =
startStream(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
"2"));
@@ -214,22 +208,19 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
public void testReadStreamWithMaxRows4() throws Exception {
appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
- assertThat(
- microBatchCount(
-
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "4")))
- .isEqualTo(2);
+ assertMicroBatchRecordSizes(
+ ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
"4"), List.of(4L, 3L));
}
@TestTemplate
public void testReadStreamWithCompositeReadLimit() throws Exception {
appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
- assertThat(
- microBatchCount(
- ImmutableMap.of(
- SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1",
- SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2")))
- .isEqualTo(6);
+ assertMicroBatchRecordSizes(
+ ImmutableMap.of(
+ SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1",
+ SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2"),
+ List.of(1L, 2L, 1L, 1L, 1L, 1L));
}
@TestTemplate
@@ -542,10 +533,9 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
makeRewriteDataFiles();
- assertThat(
- microBatchCount(
-
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")))
- .isEqualTo(6);
+ assertMicroBatchRecordSizes(
+ ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
"1"),
+ List.of(1L, 2L, 1L, 1L, 1L, 1L));
}
@TestTemplate
@@ -557,10 +547,8 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
makeRewriteDataFiles();
- assertThat(
- microBatchCount(
-
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "4")))
- .isEqualTo(2);
+ assertMicroBatchRecordSizes(
+ ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
"4"), List.of(4L, 3L));
}
@TestTemplate
@@ -572,14 +560,13 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
makeRewriteDataFiles();
- assertThat(
- microBatchCount(
- ImmutableMap.of(
- SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
- "4",
- SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
- "1")))
- .isEqualTo(6);
+ assertMicroBatchRecordSizes(
+ ImmutableMap.of(
+ SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
+ "4",
+ SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
+ "1"),
+ List.of(1L, 2L, 1L, 1L, 1L, 1L));
}
@TestTemplate
@@ -591,10 +578,9 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
makeRewriteDataFiles();
makeRewriteDataFiles();
- assertThat(
- microBatchCount(
-
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")))
- .isEqualTo(6);
+ assertMicroBatchRecordSizes(
+ ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
"1"),
+ List.of(1L, 2L, 1L, 1L, 1L, 1L));
}
@TestTemplate
@@ -608,10 +594,9 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
appendDataAsMultipleSnapshots(expected);
- assertThat(
- microBatchCount(
-
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")))
- .isEqualTo(12);
+ assertMicroBatchRecordSizes(
+ ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
"1"),
+ List.of(1L, 2L, 1L, 1L, 1L, 1L, 1L, 2L, 1L, 1L, 1L, 1L));
}
@TestTemplate
@@ -778,21 +763,24 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
ImmutableMap.of(key, value,
SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1"));
}
- private int microBatchCount(Map<String, String> options) throws
TimeoutException {
+ private void assertMicroBatchRecordSizes(
+ Map<String, String> options, List<Long> expectedMicroBatchRecordSize)
+ throws TimeoutException {
Dataset<Row> ds =
spark.readStream().options(options).format("iceberg").load(tableName);
+ List<Long> syncList = Collections.synchronizedList(Lists.newArrayList());
ds.writeStream()
.options(options)
.foreachBatch(
(VoidFunction2<Dataset<Row>, Long>)
(dataset, batchId) -> {
- microBatches.getAndIncrement();
+ syncList.add(dataset.count());
})
.start()
.processAllAvailable();
stopStreams();
- return microBatches.get();
+
assertThat(syncList).containsExactlyInAnyOrderElementsOf(expectedMicroBatchRecordSize);
}
private List<SimpleRecord> rowsAvailable(StreamingQuery query) {