RussellSpitzer commented on a change in pull request #796:
URL: https://github.com/apache/iceberg/pull/796#discussion_r487131427
##########
File path: core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
##########
@@ -68,8 +69,20 @@ public static boolean ancestorOf(Table table, long
snapshotId, long ancestorSnap
* This method assumes that fromSnapshotId is an ancestor of toSnapshotId
*/
public static List<Long> snapshotIdsBetween(Table table, long
fromSnapshotId, long toSnapshotId) {
+ AtomicBoolean isAncestor = new AtomicBoolean(false);
Review comment:
you can't just have
if (snapshotId == fromSnapshotId) throw new IllegalStateException
?
##########
File path:
spark2/src/main/java/org/apache/iceberg/spark/source/StreamingReader.java
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
+import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+/**
+ * A micro-batch based Spark Structured Streaming reader for Iceberg table. It
will track the added
+ * files and generate tasks per batch to process newly added files. By default
it will process
+ * all the newly added files to the current snapshot in each batch, user could
also set this
+ * configuration "max-files-per-trigger" to control the number of files
processed per batch.
+ */
+class StreamingReader extends Reader implements MicroBatchReader {
+ private static final Logger LOG =
LoggerFactory.getLogger(StreamingReader.class);
+
+ private StreamingOffset startOffset;
+ private StreamingOffset endOffset;
+
+ private final Table table;
+ private final long maxSizePerBatch;
+ private final Long startSnapshotId;
+ private final long splitSize;
+ private final int splitLookback;
+ private final long splitOpenFileCost;
+ private Boolean readUsingBatch = null;
+
+ // Used to cache the pending batches for this streaming batch interval.
+ private Pair<StreamingOffset, List<MicroBatch>> cachedPendingBatches = null;
+
+ StreamingReader(Table table, Broadcast<FileIO> io,
Broadcast<EncryptionManager> encryptionManager,
+ boolean caseSensitive, DataSourceOptions options) {
+ super(table, io, encryptionManager, caseSensitive, options);
+
+ this.table = table;
+ this.maxSizePerBatch =
options.get("max-size-per-batch").map(Long::parseLong).orElse(Long.MAX_VALUE);
+ Preconditions.checkArgument(maxSizePerBatch > 0L,
+ "Option max-size-per-batch '%s' should > 0", maxSizePerBatch);
+
+ this.startSnapshotId =
options.get("starting-snapshot-id").map(Long::parseLong).orElse(null);
+ if (startSnapshotId != null) {
+ if (!SnapshotUtil.ancestorOf(table,
table.currentSnapshot().snapshotId(), startSnapshotId)) {
+ throw new IllegalStateException("The option starting-snapshot-id " +
startSnapshotId +
+ " is not an ancestor of the current snapshot");
+ }
+ }
+
+ this.splitSize = Optional.ofNullable(splitSize())
+ .orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(),
SPLIT_SIZE, SPLIT_SIZE_DEFAULT));
+ this.splitLookback = Optional.ofNullable(splitLookback())
+ .orElseGet(() -> PropertyUtil.propertyAsInt(table.properties(),
SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT));
+ this.splitOpenFileCost = Optional.ofNullable(splitOpenFileCost())
+ .orElseGet(() ->
+ PropertyUtil.propertyAsLong(table.properties(),
SPLIT_OPEN_FILE_COST, SPLIT_OPEN_FILE_COST_DEFAULT));
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
+ table.refresh();
+
+ if (start.isPresent() &&
!StreamingOffset.START_OFFSET.equals(start.get())) {
+ this.startOffset = (StreamingOffset) start.get();
+ this.endOffset = (StreamingOffset) end.orElseGet(() ->
calculateEndOffset(startOffset));
+ } else {
+ // If starting offset is "START_OFFSET" (there's no snapshot in the last
batch), or starting
+ // offset is not set, then we need to calculate the starting offset
again.
+ this.startOffset = calculateStartingOffset();
+ this.endOffset = calculateEndOffset(startOffset);
+ }
+ }
+
+ @Override
+ public Offset getStartOffset() {
+ if (startOffset == null) {
+ throw new IllegalStateException("Start offset is not set");
+ }
+
+ return startOffset;
+ }
+
+ @Override
+ public Offset getEndOffset() {
+ if (endOffset == null) {
+ throw new IllegalStateException("End offset is not set");
+ }
+
+ return endOffset;
+ }
+
+ @Override
+ public Offset deserializeOffset(String json) {
+ return StreamingOffset.fromJson(json);
+ }
+
+ @Override
+ public void commit(Offset end) {
+ // Since all the data and metadata of Iceberg is as it is, nothing needs
to commit when
+ // offset is processed, so no need to implement this method.
+ }
+
+ @Override
+ public void stop() {
+ }
+
+ @Override
+ public boolean enableBatchRead() {
+ return readUsingBatch == null ? false : readUsingBatch;
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "IcebergStreamScan(table=%s, type=%s)", table,
table.schema().asStruct());
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected List<CombinedScanTask> tasks() {
+ if (startOffset.equals(endOffset)) {
+ LOG.info("Start offset {} equals to end offset {}, no data to process",
startOffset, endOffset);
+ return Collections.emptyList();
+ }
+
+ Preconditions.checkState(cachedPendingBatches != null,
+ "pendingBatches is null, which is unexpected as it will be set when
calculating end offset");
+
+ List<MicroBatch> pendingBatches = cachedPendingBatches.second();
+ if (pendingBatches.isEmpty()) {
+ LOG.info("Current start offset {} and end offset {}, there's no task to
process in this batch",
+ startOffset, endOffset);
+ return Collections.emptyList();
+ }
+
+ MicroBatch lastBatch = pendingBatches.get(pendingBatches.size() - 1);
+ Preconditions.checkState(
+ lastBatch.snapshotId() == endOffset.snapshotId() &&
lastBatch.endFileIndex() == endOffset.index(),
+ "The cached pendingBatches doesn't match the current end offset " +
endOffset);
+
+ LOG.info("Processing data from {} to {}", startOffset, endOffset);
+ List<FileScanTask> tasks = pendingBatches.stream()
+ .flatMap(batch -> batch.tasks().stream())
+ .collect(Collectors.toList());
+ CloseableIterable<FileScanTask> splitTasks =
TableScanUtil.splitFiles(CloseableIterable.withNoopClose(tasks),
+ splitSize);
+ List<CombinedScanTask> combinedScanTasks = Lists.newArrayList(
+ TableScanUtil.planTasks(splitTasks, splitSize, splitLookback,
splitOpenFileCost));
+
+ if (readUsingBatch == null) {
+ this.readUsingBatch = checkEnableBatchRead(combinedScanTasks);
+ }
+
+ return combinedScanTasks;
+ }
+
+ private StreamingOffset calculateStartingOffset() {
+ StreamingOffset startingOffset;
+ if (startSnapshotId != null) {
+ startingOffset = new StreamingOffset(startSnapshotId, 0, true, false);
+ } else {
+ List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+ if (snapshotIds.isEmpty()) {
+ // there's no snapshot currently.
+ startingOffset = StreamingOffset.START_OFFSET;
+ } else {
+ startingOffset = new
StreamingOffset(snapshotIds.get(snapshotIds.size() - 1), 0, true, false);
Review comment:
Guava Iterables.getLast could be used instead of
snapshotIds.get(snapshotIds.size -1),
There is also a Iterables.getLast(iterable, default) which could be used to
collapse this if/else branch
##########
File path:
spark2/src/main/java/org/apache/iceberg/spark/source/StreamingReader.java
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
+import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+/**
+ * A micro-batch based Spark Structured Streaming reader for Iceberg table. It
will track the added
+ * files and generate tasks per batch to process newly added files. By default
it will process
+ * all the newly added files to the current snapshot in each batch, user could
also set this
+ * configuration "max-files-per-trigger" to control the number of files
processed per batch.
+ */
+class StreamingReader extends Reader implements MicroBatchReader {
+ private static final Logger LOG =
LoggerFactory.getLogger(StreamingReader.class);
+
+ private StreamingOffset startOffset;
+ private StreamingOffset endOffset;
+
+ private final Table table;
+ private final long maxSizePerBatch;
+ private final Long startSnapshotId;
+ private final long splitSize;
+ private final int splitLookback;
+ private final long splitOpenFileCost;
+ private Boolean readUsingBatch = null;
+
+ // Used to cache the pending batches for this streaming batch interval.
+ private Pair<StreamingOffset, List<MicroBatch>> cachedPendingBatches = null;
+
+ StreamingReader(Table table, Broadcast<FileIO> io,
Broadcast<EncryptionManager> encryptionManager,
+ boolean caseSensitive, DataSourceOptions options) {
+ super(table, io, encryptionManager, caseSensitive, options);
+
+ this.table = table;
+ this.maxSizePerBatch =
options.get("max-size-per-batch").map(Long::parseLong).orElse(Long.MAX_VALUE);
+ Preconditions.checkArgument(maxSizePerBatch > 0L,
+ "Option max-size-per-batch '%s' should > 0", maxSizePerBatch);
+
+ this.startSnapshotId =
options.get("starting-snapshot-id").map(Long::parseLong).orElse(null);
+ if (startSnapshotId != null) {
+ if (!SnapshotUtil.ancestorOf(table,
table.currentSnapshot().snapshotId(), startSnapshotId)) {
+ throw new IllegalStateException("The option starting-snapshot-id " +
startSnapshotId +
+ " is not an ancestor of the current snapshot");
+ }
+ }
+
+ this.splitSize = Optional.ofNullable(splitSize())
+ .orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(),
SPLIT_SIZE, SPLIT_SIZE_DEFAULT));
+ this.splitLookback = Optional.ofNullable(splitLookback())
+ .orElseGet(() -> PropertyUtil.propertyAsInt(table.properties(),
SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT));
+ this.splitOpenFileCost = Optional.ofNullable(splitOpenFileCost())
+ .orElseGet(() ->
+ PropertyUtil.propertyAsLong(table.properties(),
SPLIT_OPEN_FILE_COST, SPLIT_OPEN_FILE_COST_DEFAULT));
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
+ table.refresh();
+
+ if (start.isPresent() &&
!StreamingOffset.START_OFFSET.equals(start.get())) {
+ this.startOffset = (StreamingOffset) start.get();
+ this.endOffset = (StreamingOffset) end.orElseGet(() ->
calculateEndOffset(startOffset));
+ } else {
+ // If starting offset is "START_OFFSET" (there's no snapshot in the last
batch), or starting
+ // offset is not set, then we need to calculate the starting offset
again.
+ this.startOffset = calculateStartingOffset();
+ this.endOffset = calculateEndOffset(startOffset);
+ }
+ }
+
+ @Override
+ public Offset getStartOffset() {
+ if (startOffset == null) {
+ throw new IllegalStateException("Start offset is not set");
+ }
+
+ return startOffset;
+ }
+
+ @Override
+ public Offset getEndOffset() {
+ if (endOffset == null) {
+ throw new IllegalStateException("End offset is not set");
+ }
+
+ return endOffset;
+ }
+
+ @Override
+ public Offset deserializeOffset(String json) {
+ return StreamingOffset.fromJson(json);
+ }
+
+ @Override
+ public void commit(Offset end) {
+ // Since all the data and metadata of Iceberg is as it is, nothing needs
to commit when
+ // offset is processed, so no need to implement this method.
+ }
+
+ @Override
+ public void stop() {
+ }
+
+ @Override
+ public boolean enableBatchRead() {
+ return readUsingBatch == null ? false : readUsingBatch;
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "IcebergStreamScan(table=%s, type=%s)", table,
table.schema().asStruct());
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ protected List<CombinedScanTask> tasks() {
+ if (startOffset.equals(endOffset)) {
+ LOG.info("Start offset {} equals to end offset {}, no data to process",
startOffset, endOffset);
+ return Collections.emptyList();
+ }
+
+ Preconditions.checkState(cachedPendingBatches != null,
+ "pendingBatches is null, which is unexpected as it will be set when
calculating end offset");
+
+ List<MicroBatch> pendingBatches = cachedPendingBatches.second();
+ if (pendingBatches.isEmpty()) {
+ LOG.info("Current start offset {} and end offset {}, there's no task to
process in this batch",
+ startOffset, endOffset);
+ return Collections.emptyList();
+ }
+
+ MicroBatch lastBatch = pendingBatches.get(pendingBatches.size() - 1);
+ Preconditions.checkState(
+ lastBatch.snapshotId() == endOffset.snapshotId() &&
lastBatch.endFileIndex() == endOffset.index(),
+ "The cached pendingBatches doesn't match the current end offset " +
endOffset);
+
+ LOG.info("Processing data from {} to {}", startOffset, endOffset);
+ List<FileScanTask> tasks = pendingBatches.stream()
+ .flatMap(batch -> batch.tasks().stream())
+ .collect(Collectors.toList());
+ CloseableIterable<FileScanTask> splitTasks =
TableScanUtil.splitFiles(CloseableIterable.withNoopClose(tasks),
+ splitSize);
+ List<CombinedScanTask> combinedScanTasks = Lists.newArrayList(
+ TableScanUtil.planTasks(splitTasks, splitSize, splitLookback,
splitOpenFileCost));
+
+ if (readUsingBatch == null) {
+ this.readUsingBatch = checkEnableBatchRead(combinedScanTasks);
+ }
+
+ return combinedScanTasks;
+ }
+
+ private StreamingOffset calculateStartingOffset() {
+ StreamingOffset startingOffset;
+ if (startSnapshotId != null) {
+ startingOffset = new StreamingOffset(startSnapshotId, 0, true, false);
+ } else {
+ List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+ if (snapshotIds.isEmpty()) {
+ // there's no snapshot currently.
+ startingOffset = StreamingOffset.START_OFFSET;
+ } else {
+ startingOffset = new
StreamingOffset(snapshotIds.get(snapshotIds.size() - 1), 0, true, false);
+ }
+ }
+
+ return startingOffset;
+ }
+
+ private StreamingOffset calculateEndOffset(StreamingOffset start) {
+ if (start.equals(StreamingOffset.START_OFFSET)) {
+ return StreamingOffset.START_OFFSET;
+ }
+
+ // Spark will invoke setOffsetRange more than once. If this is already
calulated, use the cached one to avoid
+ // calculating again.
+ if (cachedPendingBatches == null ||
!cachedPendingBatches.first().equals(start)) {
+ this.cachedPendingBatches = Pair.of(start,
getChangesWithRateLimit(start, maxSizePerBatch));
+ }
+
+ List<MicroBatch> batches = cachedPendingBatches.second();
+ MicroBatch lastBatch = batches.isEmpty() ? null :
batches.get(batches.size() - 1);
Review comment:
Could also be Iterables.getLast(batches, null)
##########
File path:
spark2/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead.java
##########
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestStructuredStreamingRead {
+ private static final Configuration CONF = new Configuration();
+ private static final Schema SCHEMA = new Schema(
+ optional(1, "id", Types.IntegerType.get()),
+ optional(2, "data", Types.StringType.get())
+ );
+ private static SparkSession spark = null;
+ private static Path parent = null;
+ private static File tableLocation = null;
+ private static Table table = null;
+ private static List<List<SimpleRecord>> expected = null;
+
+ @Rule
+ public ExpectedException exceptionRule = ExpectedException.none();
+
+ @BeforeClass
+ public static void startSpark() throws Exception {
+ TestStructuredStreamingRead.spark = SparkSession.builder()
+ .master("local[2]")
+ .config("spark.sql.shuffle.partitions", 4)
+ .getOrCreate();
+
+ parent = Files.createTempDirectory("test");
+ tableLocation = new File(parent.toFile(), "table");
+ tableLocation.mkdir();
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec =
PartitionSpec.builderFor(SCHEMA).identity("data").build();
+ table = tables.create(SCHEMA, spec, tableLocation.toString());
+
+ expected = Lists.newArrayList(
+ Lists.newArrayList(new SimpleRecord(1, "1")),
+ Lists.newArrayList(new SimpleRecord(2, "2")),
+ Lists.newArrayList(new SimpleRecord(3, "3")),
+ Lists.newArrayList(new SimpleRecord(4, "4"))
+ );
+
+ // Write records one by one to generate 4 snapshots.
+ for (List<SimpleRecord> l : expected) {
+ Dataset<Row> df = spark.createDataFrame(l, SimpleRecord.class);
+ df.select("id", "data").write()
+ .format("iceberg")
+ .mode("append")
+ .save(tableLocation.toString());
+ }
+ table.refresh();
+ }
+
+ @AfterClass
+ public static void stopSpark() {
+ SparkSession currentSpark = TestStructuredStreamingRead.spark;
+ TestStructuredStreamingRead.spark = null;
+ currentSpark.stop();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testGetChangesFromStart() throws IOException {
+ File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+
+ DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+ "path", tableLocation.toString(),
+ "checkpointLocation", checkpoint.toString()));
+ IcebergSource source = new IcebergSource();
+
+ StreamingReader streamingReader = (StreamingReader)
source.createMicroBatchReader(
+ Optional.empty(), checkpoint.toString(), options);
+
+ List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+ Collections.reverse(snapshotIds);
+ long initialSnapshotId = snapshotIds.get(0);
+
+ // Getting all appends from initial snapshot.
+ List<MicroBatch> pendingBatches = streamingReader.getChangesWithRateLimit(
+ new StreamingOffset(initialSnapshotId, 0, true, false),
Long.MAX_VALUE);
+ Assert.assertEquals("Batches with unlimited size control should have 4
snapshots", 4, pendingBatches.size());
+
+ List<Long> batchSnapshotIds = pendingBatches.stream()
+ .map(MicroBatch::snapshotId)
+ .collect(Collectors.toList());
+ Assert.assertEquals("Snapshot id of each batch should match snapshot id of
table", snapshotIds, batchSnapshotIds);
+
+ // Getting appends from initial snapshot with last index, 1st snapshot
should be an empty batch.
+ List<MicroBatch> pendingBatches1 = streamingReader.getChangesWithRateLimit(
+ new StreamingOffset(initialSnapshotId, 1, true, false),
Long.MAX_VALUE);
+
+ Assert.assertEquals("Batches with unlimited size control from initial id
should have 4 snapshots",
+ 4, pendingBatches1.size());
+ MicroBatch batch = pendingBatches1.get(0);
+ // 1st batch should be empty, since the starting offset is the end of this
snapshot.
+ Assert.assertEquals("1st batch should be empty, have 0 size batch", 0L,
batch.sizeInBytes());
+ Assert.assertEquals("1st batch should be empty, endFileIndex should be
equal to start", 1, batch.endFileIndex());
+ Assert.assertTrue("1st batch should be empty",
Iterables.isEmpty(batch.tasks()));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testGetChangesFrom2ndSnapshot() throws IOException {
+ File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+
+ DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+ "path", tableLocation.toString(),
+ "checkpointLocation", checkpoint.toString()));
+ IcebergSource source = new IcebergSource();
+
+ StreamingReader streamingReader = (StreamingReader)
source.createMicroBatchReader(
+ Optional.empty(), checkpoint.toString(), options);
+
+ List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+ Collections.reverse(snapshotIds);
+ long initialSnapshotId = snapshotIds.get(0);
+
+ // Getting appends from 2nd snapshot, 1st snapshot should be filtered out.
+ List<MicroBatch> pendingBatches = streamingReader.getChangesWithRateLimit(
+ new StreamingOffset(snapshotIds.get(1), 0, false, false),
Long.MAX_VALUE);
+
+ Assert.assertEquals(3, pendingBatches.size());
+ List<Long> batchSnapshotIds = pendingBatches.stream()
+ .map(MicroBatch::snapshotId)
+ .collect(Collectors.toList());
+ Assert.assertFalse("1st snapshot should be filtered",
batchSnapshotIds.contains(initialSnapshotId));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testGetChangesFromLastSnapshot() throws IOException {
+ File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+
+ DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+ "path", tableLocation.toString(),
+ "checkpointLocation", checkpoint.toString()));
+ IcebergSource source = new IcebergSource();
+
+ StreamingReader streamingReader = (StreamingReader)
source.createMicroBatchReader(
+ Optional.empty(), checkpoint.toString(), options);
+
+ List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+ Collections.reverse(snapshotIds);
+
+ // Getting appends from last snapshot with last index, should get an empty
batch.
+ long lastSnapshotId = snapshotIds.get(3);
+ List<MicroBatch> pendingBatches = streamingReader.getChangesWithRateLimit(
+ new StreamingOffset(lastSnapshotId, 1, false, false), Long.MAX_VALUE);
+
+ Assert.assertEquals("Should only have 1 batch with last snapshot", 1,
pendingBatches.size());
+ MicroBatch batch = pendingBatches.get(0);
+ Assert.assertEquals("batch should have 0 size", 0L, batch.sizeInBytes());
+ Assert.assertEquals("batch endFileIndex should be euqal to start", 1,
batch.endFileIndex());
+ Assert.assertTrue("batch should be empty",
Iterables.isEmpty(batch.tasks()));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testGetChangesWithRateLimit1000() throws IOException {
+ File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+
+ IcebergSource source = new IcebergSource();
+ List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+ Collections.reverse(snapshotIds);
+ long initialSnapshotId = snapshotIds.get(0);
+
+ DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+ "path", tableLocation.toString(),
+ "checkpointLocation", checkpoint.toString()));
+ StreamingReader streamingReader = (StreamingReader)
source.createMicroBatchReader(
+ Optional.empty(), checkpoint.toString(), options);
+
+ // The size of each data file is around 600 bytes.
+ // Max size set to 1000. One additional batch will be added because the
left size is less than file size,
+ // MicroBatchBuilder will add one more to avoid stuck.
+ List<MicroBatch> rateLimitedBatches =
streamingReader.getChangesWithRateLimit(
+ new StreamingOffset(initialSnapshotId, 0, true, false), 1000);
+
+ Assert.assertEquals("Should have 2 batches", 2L,
rateLimitedBatches.size());
+ MicroBatch batch = rateLimitedBatches.get(0);
+ Assert.assertEquals("1st batch's endFileIndex should reach to the end of
file indexes", 1, batch.endFileIndex());
+ Assert.assertTrue("1st batch should be the last index of 1st snapshot",
batch.lastIndexOfSnapshot());
+ Assert.assertEquals("1st batch should only have 1 task", 1,
batch.tasks().size());
+ Assert.assertTrue("1st batch's size should be around 600",
batch.sizeInBytes() < 1000 && batch.sizeInBytes() > 0);
+
+ MicroBatch batch1 = rateLimitedBatches.get(1);
+ Assert.assertEquals("2nd batch's endFileIndex should reach to the end of
file indexes", 1, batch1.endFileIndex());
+ Assert.assertTrue("2nd batch should be the last of 2nd snapshot",
batch1.lastIndexOfSnapshot());
+ Assert.assertEquals("2nd batch should only have 1 task", 1,
batch1.tasks().size());
+ Assert.assertTrue("2nd batch's size should be aound 600",
batch1.sizeInBytes() < 1000 && batch1.sizeInBytes() > 0);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testGetChangesWithRateLimit100() throws IOException {
+ File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+
+ IcebergSource source = new IcebergSource();
+ List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+ Collections.reverse(snapshotIds);
+
+ DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+ "path", tableLocation.toString(),
+ "checkpointLocation", checkpoint.toString()));
+ StreamingReader streamingReader = (StreamingReader)
source.createMicroBatchReader(
+ Optional.empty(), checkpoint.toString(), options);
+
+ // Max size less than file size, should have one batch added to avoid
stuck.
+ List<MicroBatch> rateLimitedBatches =
streamingReader.getChangesWithRateLimit(
+ new StreamingOffset(snapshotIds.get(1), 1, false, true), 100);
+
+ Assert.assertEquals("Should only have 1 batch", 1,
rateLimitedBatches.size());
+ MicroBatch batch = rateLimitedBatches.get(0);
+ Assert.assertEquals("Batch's endFileIndex should reach to the end of file
indexes", 1, batch.endFileIndex());
+ Assert.assertTrue("Batch should be the last of 1st snapshot",
batch.lastIndexOfSnapshot());
+ Assert.assertEquals("Batch should have 1 task", 1, batch.tasks().size());
+ Assert.assertTrue("Batch's size should be around 600", batch.sizeInBytes()
< 1000 && batch.sizeInBytes() > 0);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testGetChangesWithRateLimit10000() throws IOException {
+ File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+
+ IcebergSource source = new IcebergSource();
+ List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+ Collections.reverse(snapshotIds);
+
+ DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+ "path", tableLocation.toString(),
+ "checkpointLocation", checkpoint.toString()));
+ StreamingReader streamingReader = (StreamingReader)
source.createMicroBatchReader(
+ Optional.empty(), checkpoint.toString(), options);
+
+ // Max size set to 10000, the last left batch will be added.
+ List<MicroBatch> rateLimitedBatches =
streamingReader.getChangesWithRateLimit(
+ new StreamingOffset(snapshotIds.get(2), 1, false, true), 10000);
+
+ Assert.assertEquals("Should only have 1 batch", 1,
rateLimitedBatches.size());
+ MicroBatch batch = rateLimitedBatches.get(0);
+ Assert.assertEquals("Batch's endFileIndex should reach to the end of file
indexes", 1, batch.endFileIndex());
+ Assert.assertEquals("Batch should have 1 task", 1, batch.tasks().size());
+ Assert.assertTrue("Batch should have 1 task", batch.lastIndexOfSnapshot());
+ Assert.assertTrue("Batch's size should be around 600", batch.sizeInBytes()
< 1000 && batch.sizeInBytes() > 0);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testGetOffsetWithDefaultRateLimit() throws IOException {
+ File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+
+ IcebergSource source = new IcebergSource();
+ List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+ Collections.reverse(snapshotIds);
+
+ // Default max size per batch, this will consume all the data of this
table.
+ DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+ "path", tableLocation.toString(),
+ "checkpointLocation", checkpoint.toString()));
+ StreamingReader streamingReader = (StreamingReader)
source.createMicroBatchReader(
+ Optional.empty(), checkpoint.toString(), options);
+ streamingReader.setOffsetRange(Optional.empty(), Optional.empty());
+
+ StreamingOffset start = (StreamingOffset) streamingReader.getStartOffset();
+ Assert.assertEquals("Start offset's snapshot id should be 1st snapshot id",
+ snapshotIds.get(0).longValue(), start.snapshotId());
+ Assert.assertEquals("Start offset's index should be the start index of 1st
snapshot", 0, start.index());
+ Assert.assertTrue("Start offset's snapshot should do a full table scan",
start.shouldScanAllFiles());
+ Assert.assertFalse("Start offset should not fully process the snapshot",
start.isSnapshotFullyProcessed());
+
+ StreamingOffset end = (StreamingOffset) streamingReader.getEndOffset();
+ Assert.assertEquals("End offset's snapshot should be the last snapshot id",
+ snapshotIds.get(3).longValue(), end.snapshotId());
+ Assert.assertEquals("End offset's index should be the last index", 1,
end.index());
+ Assert.assertFalse("End offset's snapshot should not do a full table
scan", end.shouldScanAllFiles());
+ Assert.assertTrue("End offset should fully process the snapshot",
end.isSnapshotFullyProcessed());
+
+ streamingReader.setOffsetRange(Optional.of(end), Optional.empty());
+ StreamingOffset end1 = (StreamingOffset) streamingReader.getEndOffset();
+ Assert.assertEquals("End offset should be same to start offset since
there's no more batches to consume",
+ end1, end);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testGetOffsetWithRateLimit1000() throws IOException {
+ File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+
+ IcebergSource source = new IcebergSource();
+ List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+ Collections.reverse(snapshotIds);
+
+ // Max size to 1000, this will generate two MicroBatches per consuming.
+ DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+ "path", tableLocation.toString(),
+ "checkpointLocation", checkpoint.toString(),
+ "max-size-per-batch", "1000"));
+ StreamingReader streamingReader = (StreamingReader)
source.createMicroBatchReader(
+ Optional.empty(), checkpoint.toString(), options);
+
+ streamingReader.setOffsetRange(Optional.empty(), Optional.empty());
+ StreamingOffset start = (StreamingOffset) streamingReader.getStartOffset();
+ validateOffset(new StreamingOffset(snapshotIds.get(0), 0, true, false),
start);
+
+ StreamingOffset end = (StreamingOffset) streamingReader.getEndOffset();
+ validateOffset(new StreamingOffset(snapshotIds.get(1), 1, false, true),
end);
+
+ streamingReader.setOffsetRange(Optional.of(end), Optional.empty());
+ StreamingOffset end1 = (StreamingOffset) streamingReader.getEndOffset();
+ validateOffset(new StreamingOffset(snapshotIds.get(3), 1, false, true),
end1);
+
+ streamingReader.setOffsetRange(Optional.of(end1), Optional.empty());
+ StreamingOffset end2 = (StreamingOffset) streamingReader.getEndOffset();
+ validateOffset(end2, end1);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testGetOffsetWithRateLimit100() throws IOException {
+ File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+
+ IcebergSource source = new IcebergSource();
+ List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+ Collections.reverse(snapshotIds);
+
+ // Max size to 100, will generate 1 MicroBatch per consuming.
+ DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+ "path", tableLocation.toString(),
+ "checkpointLocation", checkpoint.toString(),
+ "max-size-per-batch", "100"));
+ StreamingReader streamingReader = (StreamingReader)
source.createMicroBatchReader(
+ Optional.empty(), checkpoint.toString(), options);
+
+ streamingReader.setOffsetRange(Optional.empty(), Optional.empty());
+ StreamingOffset start = (StreamingOffset) streamingReader.getStartOffset();
+ validateOffset(new StreamingOffset(snapshotIds.get(0), 0, true, false),
start);
+
+ StreamingOffset end = (StreamingOffset) streamingReader.getEndOffset();
+ validateOffset(new StreamingOffset(snapshotIds.get(0), 1, true, true),
end);
+
+ streamingReader.setOffsetRange(Optional.of(end), Optional.empty());
+ StreamingOffset end1 = (StreamingOffset) streamingReader.getEndOffset();
+ validateOffset(new StreamingOffset(snapshotIds.get(1), 1, false, true),
end1);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testSpecifyInvalidSnapshotId() throws IOException {
+ File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+ IcebergSource source = new IcebergSource();
+
+ // test invalid snapshot id
+ DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+ "path", tableLocation.toString(),
+ "checkpointLocation", checkpoint.toString(),
+ "starting-snapshot-id", "-1"));
+ AssertHelpers.assertThrows("Test invalid snapshot id",
+ IllegalStateException.class, "The option starting-snapshot-id -1 is
not an ancestor",
+ () -> source.createMicroBatchReader(Optional.empty(),
checkpoint.toString(), options));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testSpecifySnapshotId() throws IOException {
+ File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+
+ IcebergSource source = new IcebergSource();
+ List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+ Collections.reverse(snapshotIds);
+
+ // test specify snapshot-id
+ DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+ "path", tableLocation.toString(),
+ "checkpointLocation", checkpoint.toString(),
+ "starting-snapshot-id", snapshotIds.get(1).toString(),
+ "max-size-per-batch", "1000"));
+ StreamingReader streamingReader = (StreamingReader)
source.createMicroBatchReader(
+ Optional.empty(), checkpoint.toString(), options);
+
+ streamingReader.setOffsetRange(Optional.empty(), Optional.empty());
+ StreamingOffset start = (StreamingOffset) streamingReader.getStartOffset();
+ validateOffset(new StreamingOffset(snapshotIds.get(1), 0, true, false),
start);
+
+ StreamingOffset end = (StreamingOffset) streamingReader.getEndOffset();
+ validateOffset(new StreamingOffset(snapshotIds.get(1), 1, true, false),
end);
+
+ streamingReader.setOffsetRange(Optional.of(end), Optional.empty());
+ StreamingOffset end1 = (StreamingOffset) streamingReader.getEndOffset();
+ validateOffset(new StreamingOffset(snapshotIds.get(2), 1, false, true),
end1);
+
+ streamingReader.setOffsetRange(Optional.of(end1), Optional.empty());
+ StreamingOffset end2 = (StreamingOffset) streamingReader.getEndOffset();
+ validateOffset(new StreamingOffset(snapshotIds.get(3), 1, false, true),
end2);
+
+ streamingReader.setOffsetRange(Optional.of(end2), Optional.empty());
+ StreamingOffset end3 = (StreamingOffset) streamingReader.getEndOffset();
+ validateOffset(end2, end3);
+ }
+
Review comment:
I think this needs a checkpoint recovery test
##########
File path:
spark2/src/main/java/org/apache/iceberg/spark/source/StreamingReader.java
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
+import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+/**
+ * A micro-batch based Spark Structured Streaming reader for Iceberg table. It
will track the added
+ * files and generate tasks per batch to process newly added files. By default
it will process
+ * all the newly added files to the current snapshot in each batch, user could
also set this
+ * configuration "max-files-per-trigger" to control the number of files
processed per batch.
+ */
+class StreamingReader extends Reader implements MicroBatchReader {
+ private static final Logger LOG =
LoggerFactory.getLogger(StreamingReader.class);
+
+ private StreamingOffset startOffset;
+ private StreamingOffset endOffset;
+
+ private final Table table;
+ private final long maxSizePerBatch;
+ private final Long startSnapshotId;
+ private final long splitSize;
+ private final int splitLookback;
+ private final long splitOpenFileCost;
+ private Boolean readUsingBatch = null;
+
+ // Used to cache the pending batches for this streaming batch interval.
+ private Pair<StreamingOffset, List<MicroBatch>> cachedPendingBatches = null;
Review comment:
If my stream fails and I have to restore from a checkpoint, will there
be an issue that some of my offsets are now computed differently than
previously cached?
##########
File path:
spark2/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead.java
##########
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestStructuredStreamingRead {
+ private static final Configuration CONF = new Configuration();
+ private static final Schema SCHEMA = new Schema(
+ optional(1, "id", Types.IntegerType.get()),
+ optional(2, "data", Types.StringType.get())
+ );
+ private static SparkSession spark = null;
+ private static Path parent = null;
+ private static File tableLocation = null;
+ private static Table table = null;
+ private static List<List<SimpleRecord>> expected = null;
+
+ @Rule
+ public ExpectedException exceptionRule = ExpectedException.none();
+
+ @BeforeClass
+ public static void startSpark() throws Exception {
+ TestStructuredStreamingRead.spark = SparkSession.builder()
+ .master("local[2]")
+ .config("spark.sql.shuffle.partitions", 4)
+ .getOrCreate();
+
+ parent = Files.createTempDirectory("test");
+ tableLocation = new File(parent.toFile(), "table");
+ tableLocation.mkdir();
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec =
PartitionSpec.builderFor(SCHEMA).identity("data").build();
+ table = tables.create(SCHEMA, spec, tableLocation.toString());
+
+ expected = Lists.newArrayList(
+ Lists.newArrayList(new SimpleRecord(1, "1")),
+ Lists.newArrayList(new SimpleRecord(2, "2")),
+ Lists.newArrayList(new SimpleRecord(3, "3")),
+ Lists.newArrayList(new SimpleRecord(4, "4"))
+ );
+
+ // Write records one by one to generate 4 snapshots.
+ for (List<SimpleRecord> l : expected) {
+ Dataset<Row> df = spark.createDataFrame(l, SimpleRecord.class);
+ df.select("id", "data").write()
+ .format("iceberg")
+ .mode("append")
+ .save(tableLocation.toString());
+ }
+ table.refresh();
+ }
+
+ @AfterClass
+ public static void stopSpark() {
+ SparkSession currentSpark = TestStructuredStreamingRead.spark;
+ TestStructuredStreamingRead.spark = null;
+ currentSpark.stop();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testGetChangesFromStart() throws IOException {
+ File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+
+ DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+ "path", tableLocation.toString(),
+ "checkpointLocation", checkpoint.toString()));
+ IcebergSource source = new IcebergSource();
+
+ StreamingReader streamingReader = (StreamingReader)
source.createMicroBatchReader(
+ Optional.empty(), checkpoint.toString(), options);
+
+ List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+ Collections.reverse(snapshotIds);
+ long initialSnapshotId = snapshotIds.get(0);
+
+ // Getting all appends from initial snapshot.
+ List<MicroBatch> pendingBatches = streamingReader.getChangesWithRateLimit(
+ new StreamingOffset(initialSnapshotId, 0, true, false),
Long.MAX_VALUE);
+ Assert.assertEquals("Batches with unlimited size control should have 4
snapshots", 4, pendingBatches.size());
+
+ List<Long> batchSnapshotIds = pendingBatches.stream()
+ .map(MicroBatch::snapshotId)
+ .collect(Collectors.toList());
+ Assert.assertEquals("Snapshot id of each batch should match snapshot id of
table", snapshotIds, batchSnapshotIds);
+
+ // Getting appends from initial snapshot with last index, 1st snapshot
should be an empty batch.
+ List<MicroBatch> pendingBatches1 = streamingReader.getChangesWithRateLimit(
+ new StreamingOffset(initialSnapshotId, 1, true, false),
Long.MAX_VALUE);
+
+ Assert.assertEquals("Batches with unlimited size control from initial id
should have 4 snapshots",
+ 4, pendingBatches1.size());
+ MicroBatch batch = pendingBatches1.get(0);
+ // 1st batch should be empty, since the starting offset is the end of this
snapshot.
+ Assert.assertEquals("1st batch should be empty, have 0 size batch", 0L,
batch.sizeInBytes());
+ Assert.assertEquals("1st batch should be empty, endFileIndex should be
equal to start", 1, batch.endFileIndex());
+ Assert.assertTrue("1st batch should be empty",
Iterables.isEmpty(batch.tasks()));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testGetChangesFrom2ndSnapshot() throws IOException {
+ File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+
+ DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+ "path", tableLocation.toString(),
+ "checkpointLocation", checkpoint.toString()));
+ IcebergSource source = new IcebergSource();
+
+ StreamingReader streamingReader = (StreamingReader)
source.createMicroBatchReader(
+ Optional.empty(), checkpoint.toString(), options);
+
+ List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+ Collections.reverse(snapshotIds);
+ long initialSnapshotId = snapshotIds.get(0);
+
+ // Getting appends from 2nd snapshot, 1st snapshot should be filtered out.
+ List<MicroBatch> pendingBatches = streamingReader.getChangesWithRateLimit(
+ new StreamingOffset(snapshotIds.get(1), 0, false, false),
Long.MAX_VALUE);
+
+ Assert.assertEquals(3, pendingBatches.size());
+ List<Long> batchSnapshotIds = pendingBatches.stream()
+ .map(MicroBatch::snapshotId)
+ .collect(Collectors.toList());
+ Assert.assertFalse("1st snapshot should be filtered",
batchSnapshotIds.contains(initialSnapshotId));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testGetChangesFromLastSnapshot() throws IOException {
+ File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+
+ DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+ "path", tableLocation.toString(),
+ "checkpointLocation", checkpoint.toString()));
+ IcebergSource source = new IcebergSource();
+
+ StreamingReader streamingReader = (StreamingReader)
source.createMicroBatchReader(
+ Optional.empty(), checkpoint.toString(), options);
+
+ List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+ Collections.reverse(snapshotIds);
+
+ // Getting appends from last snapshot with last index, should get an empty
batch.
+ long lastSnapshotId = snapshotIds.get(3);
+ List<MicroBatch> pendingBatches = streamingReader.getChangesWithRateLimit(
+ new StreamingOffset(lastSnapshotId, 1, false, false), Long.MAX_VALUE);
+
+ Assert.assertEquals("Should only have 1 batch with last snapshot", 1,
pendingBatches.size());
+ MicroBatch batch = pendingBatches.get(0);
+ Assert.assertEquals("batch should have 0 size", 0L, batch.sizeInBytes());
+ Assert.assertEquals("batch endFileIndex should be euqal to start", 1,
batch.endFileIndex());
+ Assert.assertTrue("batch should be empty",
Iterables.isEmpty(batch.tasks()));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testGetChangesWithRateLimit1000() throws IOException {
+ File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+
+ IcebergSource source = new IcebergSource();
+ List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+ Collections.reverse(snapshotIds);
+ long initialSnapshotId = snapshotIds.get(0);
+
+ DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+ "path", tableLocation.toString(),
+ "checkpointLocation", checkpoint.toString()));
+ StreamingReader streamingReader = (StreamingReader)
source.createMicroBatchReader(
+ Optional.empty(), checkpoint.toString(), options);
+
+ // The size of each data file is around 600 bytes.
+ // Max size set to 1000. One additional batch will be added because the
left size is less than file size,
+ // MicroBatchBuilder will add one more to avoid stuck.
+ List<MicroBatch> rateLimitedBatches =
streamingReader.getChangesWithRateLimit(
+ new StreamingOffset(initialSnapshotId, 0, true, false), 1000);
+
+ Assert.assertEquals("Should have 2 batches", 2L,
rateLimitedBatches.size());
+ MicroBatch batch = rateLimitedBatches.get(0);
+ Assert.assertEquals("1st batch's endFileIndex should reach to the end of
file indexes", 1, batch.endFileIndex());
+ Assert.assertTrue("1st batch should be the last index of 1st snapshot",
batch.lastIndexOfSnapshot());
+ Assert.assertEquals("1st batch should only have 1 task", 1,
batch.tasks().size());
+ Assert.assertTrue("1st batch's size should be around 600",
batch.sizeInBytes() < 1000 && batch.sizeInBytes() > 0);
+
+ MicroBatch batch1 = rateLimitedBatches.get(1);
+ Assert.assertEquals("2nd batch's endFileIndex should reach to the end of
file indexes", 1, batch1.endFileIndex());
+ Assert.assertTrue("2nd batch should be the last of 2nd snapshot",
batch1.lastIndexOfSnapshot());
+ Assert.assertEquals("2nd batch should only have 1 task", 1,
batch1.tasks().size());
+ Assert.assertTrue("2nd batch's size should be aound 600",
batch1.sizeInBytes() < 1000 && batch1.sizeInBytes() > 0);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testGetChangesWithRateLimit100() throws IOException {
+ File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+
+ IcebergSource source = new IcebergSource();
+ List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+ Collections.reverse(snapshotIds);
+
+ DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+ "path", tableLocation.toString(),
+ "checkpointLocation", checkpoint.toString()));
+ StreamingReader streamingReader = (StreamingReader)
source.createMicroBatchReader(
+ Optional.empty(), checkpoint.toString(), options);
+
+ // Max size less than file size, should have one batch added to avoid
stuck.
+ List<MicroBatch> rateLimitedBatches =
streamingReader.getChangesWithRateLimit(
+ new StreamingOffset(snapshotIds.get(1), 1, false, true), 100);
+
+ Assert.assertEquals("Should only have 1 batch", 1,
rateLimitedBatches.size());
+ MicroBatch batch = rateLimitedBatches.get(0);
+ Assert.assertEquals("Batch's endFileIndex should reach to the end of file
indexes", 1, batch.endFileIndex());
+ Assert.assertTrue("Batch should be the last of 1st snapshot",
batch.lastIndexOfSnapshot());
+ Assert.assertEquals("Batch should have 1 task", 1, batch.tasks().size());
+ Assert.assertTrue("Batch's size should be around 600", batch.sizeInBytes()
< 1000 && batch.sizeInBytes() > 0);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testGetChangesWithRateLimit10000() throws IOException {
+ File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+
+ IcebergSource source = new IcebergSource();
+ List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+ Collections.reverse(snapshotIds);
+
+ DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+ "path", tableLocation.toString(),
+ "checkpointLocation", checkpoint.toString()));
+ StreamingReader streamingReader = (StreamingReader)
source.createMicroBatchReader(
+ Optional.empty(), checkpoint.toString(), options);
+
+ // Max size set to 10000, the last left batch will be added.
+ List<MicroBatch> rateLimitedBatches =
streamingReader.getChangesWithRateLimit(
+ new StreamingOffset(snapshotIds.get(2), 1, false, true), 10000);
+
+ Assert.assertEquals("Should only have 1 batch", 1,
rateLimitedBatches.size());
+ MicroBatch batch = rateLimitedBatches.get(0);
+ Assert.assertEquals("Batch's endFileIndex should reach to the end of file
indexes", 1, batch.endFileIndex());
+ Assert.assertEquals("Batch should have 1 task", 1, batch.tasks().size());
+ Assert.assertTrue("Batch should have 1 task", batch.lastIndexOfSnapshot());
+ Assert.assertTrue("Batch's size should be around 600", batch.sizeInBytes()
< 1000 && batch.sizeInBytes() > 0);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testGetOffsetWithDefaultRateLimit() throws IOException {
+ File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+
+ IcebergSource source = new IcebergSource();
+ List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+ Collections.reverse(snapshotIds);
+
+ // Default max size per batch, this will consume all the data of this
table.
+ DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+ "path", tableLocation.toString(),
+ "checkpointLocation", checkpoint.toString()));
+ StreamingReader streamingReader = (StreamingReader)
source.createMicroBatchReader(
+ Optional.empty(), checkpoint.toString(), options);
+ streamingReader.setOffsetRange(Optional.empty(), Optional.empty());
+
+ StreamingOffset start = (StreamingOffset) streamingReader.getStartOffset();
+ Assert.assertEquals("Start offset's snapshot id should be 1st snapshot id",
+ snapshotIds.get(0).longValue(), start.snapshotId());
+ Assert.assertEquals("Start offset's index should be the start index of 1st
snapshot", 0, start.index());
+ Assert.assertTrue("Start offset's snapshot should do a full table scan",
start.shouldScanAllFiles());
+ Assert.assertFalse("Start offset should not fully process the snapshot",
start.isSnapshotFullyProcessed());
+
+ StreamingOffset end = (StreamingOffset) streamingReader.getEndOffset();
+ Assert.assertEquals("End offset's snapshot should be the last snapshot id",
+ snapshotIds.get(3).longValue(), end.snapshotId());
+ Assert.assertEquals("End offset's index should be the last index", 1,
end.index());
+ Assert.assertFalse("End offset's snapshot should not do a full table
scan", end.shouldScanAllFiles());
+ Assert.assertTrue("End offset should fully process the snapshot",
end.isSnapshotFullyProcessed());
+
+ streamingReader.setOffsetRange(Optional.of(end), Optional.empty());
+ StreamingOffset end1 = (StreamingOffset) streamingReader.getEndOffset();
+ Assert.assertEquals("End offset should be same to start offset since
there's no more batches to consume",
+ end1, end);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testGetOffsetWithRateLimit1000() throws IOException {
+ File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+
+ IcebergSource source = new IcebergSource();
+ List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+ Collections.reverse(snapshotIds);
+
+ // Max size to 1000, this will generate two MicroBatches per consuming.
+ DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+ "path", tableLocation.toString(),
+ "checkpointLocation", checkpoint.toString(),
+ "max-size-per-batch", "1000"));
+ StreamingReader streamingReader = (StreamingReader)
source.createMicroBatchReader(
+ Optional.empty(), checkpoint.toString(), options);
+
+ streamingReader.setOffsetRange(Optional.empty(), Optional.empty());
+ StreamingOffset start = (StreamingOffset) streamingReader.getStartOffset();
+ validateOffset(new StreamingOffset(snapshotIds.get(0), 0, true, false),
start);
+
+ StreamingOffset end = (StreamingOffset) streamingReader.getEndOffset();
+ validateOffset(new StreamingOffset(snapshotIds.get(1), 1, false, true),
end);
+
+ streamingReader.setOffsetRange(Optional.of(end), Optional.empty());
+ StreamingOffset end1 = (StreamingOffset) streamingReader.getEndOffset();
+ validateOffset(new StreamingOffset(snapshotIds.get(3), 1, false, true),
end1);
+
+ streamingReader.setOffsetRange(Optional.of(end1), Optional.empty());
+ StreamingOffset end2 = (StreamingOffset) streamingReader.getEndOffset();
+ validateOffset(end2, end1);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testGetOffsetWithRateLimit100() throws IOException {
+ File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+
+ IcebergSource source = new IcebergSource();
+ List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+ Collections.reverse(snapshotIds);
+
+ // Max size to 100, will generate 1 MicroBatch per consuming.
+ DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+ "path", tableLocation.toString(),
+ "checkpointLocation", checkpoint.toString(),
+ "max-size-per-batch", "100"));
+ StreamingReader streamingReader = (StreamingReader)
source.createMicroBatchReader(
+ Optional.empty(), checkpoint.toString(), options);
+
+ streamingReader.setOffsetRange(Optional.empty(), Optional.empty());
+ StreamingOffset start = (StreamingOffset) streamingReader.getStartOffset();
+ validateOffset(new StreamingOffset(snapshotIds.get(0), 0, true, false),
start);
+
+ StreamingOffset end = (StreamingOffset) streamingReader.getEndOffset();
+ validateOffset(new StreamingOffset(snapshotIds.get(0), 1, true, true),
end);
+
+ streamingReader.setOffsetRange(Optional.of(end), Optional.empty());
+ StreamingOffset end1 = (StreamingOffset) streamingReader.getEndOffset();
+ validateOffset(new StreamingOffset(snapshotIds.get(1), 1, false, true),
end1);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testSpecifyInvalidSnapshotId() throws IOException {
+ File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+ IcebergSource source = new IcebergSource();
+
+ // test invalid snapshot id
+ DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+ "path", tableLocation.toString(),
+ "checkpointLocation", checkpoint.toString(),
+ "starting-snapshot-id", "-1"));
+ AssertHelpers.assertThrows("Test invalid snapshot id",
+ IllegalStateException.class, "The option starting-snapshot-id -1 is
not an ancestor",
+ () -> source.createMicroBatchReader(Optional.empty(),
checkpoint.toString(), options));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testSpecifySnapshotId() throws IOException {
+ File checkpoint = Files.createTempDirectory(parent, "checkpoint").toFile();
+
+ IcebergSource source = new IcebergSource();
+ List<Long> snapshotIds = SnapshotUtil.currentAncestors(table);
+ Collections.reverse(snapshotIds);
+
+ // test specify snapshot-id
+ DataSourceOptions options = new DataSourceOptions(ImmutableMap.of(
+ "path", tableLocation.toString(),
+ "checkpointLocation", checkpoint.toString(),
+ "starting-snapshot-id", snapshotIds.get(1).toString(),
+ "max-size-per-batch", "1000"));
+ StreamingReader streamingReader = (StreamingReader)
source.createMicroBatchReader(
+ Optional.empty(), checkpoint.toString(), options);
+
+ streamingReader.setOffsetRange(Optional.empty(), Optional.empty());
+ StreamingOffset start = (StreamingOffset) streamingReader.getStartOffset();
+ validateOffset(new StreamingOffset(snapshotIds.get(1), 0, true, false),
start);
+
+ StreamingOffset end = (StreamingOffset) streamingReader.getEndOffset();
+ validateOffset(new StreamingOffset(snapshotIds.get(1), 1, true, false),
end);
+
+ streamingReader.setOffsetRange(Optional.of(end), Optional.empty());
+ StreamingOffset end1 = (StreamingOffset) streamingReader.getEndOffset();
+ validateOffset(new StreamingOffset(snapshotIds.get(2), 1, false, true),
end1);
+
+ streamingReader.setOffsetRange(Optional.of(end1), Optional.empty());
+ StreamingOffset end2 = (StreamingOffset) streamingReader.getEndOffset();
+ validateOffset(new StreamingOffset(snapshotIds.get(3), 1, false, true),
end2);
+
+ streamingReader.setOffsetRange(Optional.of(end2), Optional.empty());
+ StreamingOffset end3 = (StreamingOffset) streamingReader.getEndOffset();
+ validateOffset(end2, end3);
+ }
+
+ @Test
+ public void testStreamingRead() {
+ Dataset<Row> read = spark.readStream()
+ .format("iceberg")
+ .load(tableLocation.toString());
+ DataStreamWriter<Row> streamWriter = read.writeStream()
+ .format("memory")
+ .outputMode("append")
+ .queryName("memoryStream");
+
+ try {
+ StreamingQuery query = streamWriter.start();
+ query.processAllAvailable();
+
+ List<SimpleRecord> actual = spark.table("memoryStream")
+ .as(Encoders.bean(SimpleRecord.class))
+ .collectAsList();
+ List<SimpleRecord> expectedResult =
expected.stream().flatMap(List::stream).collect(Collectors.toList());
+ validateResult(expectedResult, actual);
+ } finally {
+ for (StreamingQuery query : spark.streams().active()) {
+ query.stop();
+ }
+ }
+ }
+
+ @Test
+ public void testStreamingReadWithSpecifiedSnapshotId() {
Review comment:
There are several other options that can be passed through and we
probably need to verify that all of them work, I counted
SPLIT_SIZE
SPLIT_OPEN_FILE_COST
SPLIT_LOOKBACK
##########
File path:
spark2/src/main/java/org/apache/iceberg/spark/source/StreamingReader.java
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
+import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+/**
+ * A micro-batch based Spark Structured Streaming reader for Iceberg table. It
will track the added
+ * files and generate tasks per batch to process newly added files. By default
it will process
+ * all the newly added files to the current snapshot in each batch, user could
also set this
+ * configuration "max-files-per-trigger" to control the number of files
processed per batch.
+ */
+class StreamingReader extends Reader implements MicroBatchReader {
+ private static final Logger LOG =
LoggerFactory.getLogger(StreamingReader.class);
+
+ private StreamingOffset startOffset;
+ private StreamingOffset endOffset;
+
+ private final Table table;
+ private final long maxSizePerBatch;
+ private final Long startSnapshotId;
+ private final long splitSize;
+ private final int splitLookback;
+ private final long splitOpenFileCost;
+ private Boolean readUsingBatch = null;
+
+ // Used to cache the pending batches for this streaming batch interval.
+ private Pair<StreamingOffset, List<MicroBatch>> cachedPendingBatches = null;
+
+ StreamingReader(Table table, Broadcast<FileIO> io,
Broadcast<EncryptionManager> encryptionManager,
+ boolean caseSensitive, DataSourceOptions options) {
+ super(table, io, encryptionManager, caseSensitive, options);
+
+ this.table = table;
+ this.maxSizePerBatch =
options.get("max-size-per-batch").map(Long::parseLong).orElse(Long.MAX_VALUE);
+ Preconditions.checkArgument(maxSizePerBatch > 0L,
+ "Option max-size-per-batch '%s' should > 0", maxSizePerBatch);
+
+ this.startSnapshotId =
options.get("starting-snapshot-id").map(Long::parseLong).orElse(null);
Review comment:
starting-snapshot-id should have a constant
##########
File path:
spark2/src/main/java/org/apache/iceberg/spark/source/StreamingReader.java
##########
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
+import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+/**
+ * A micro-batch based Spark Structured Streaming reader for Iceberg table. It
will track the added
+ * files and generate tasks per batch to process newly added files. By default
it will process
+ * all the newly added files to the current snapshot in each batch, user could
also set this
+ * configuration "max-files-per-trigger" to control the number of files
processed per batch.
+ */
+class StreamingReader extends Reader implements MicroBatchReader {
+ private static final Logger LOG =
LoggerFactory.getLogger(StreamingReader.class);
+
+ private StreamingOffset startOffset;
+ private StreamingOffset endOffset;
+
+ private final Table table;
+ private final long maxSizePerBatch;
+ private final Long startSnapshotId;
+ private final long splitSize;
+ private final int splitLookback;
+ private final long splitOpenFileCost;
+ private Boolean readUsingBatch = null;
+
+ // Used to cache the pending batches for this streaming batch interval.
+ private Pair<StreamingOffset, List<MicroBatch>> cachedPendingBatches = null;
+
+ StreamingReader(Table table, Broadcast<FileIO> io,
Broadcast<EncryptionManager> encryptionManager,
+ boolean caseSensitive, DataSourceOptions options) {
+ super(table, io, encryptionManager, caseSensitive, options);
+
+ this.table = table;
+ this.maxSizePerBatch =
options.get("max-size-per-batch").map(Long::parseLong).orElse(Long.MAX_VALUE);
Review comment:
max-size-per-batch also needs a constant
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]