rdblue commented on a change in pull request #2660:
URL: https://github.com/apache/iceberg/pull/2660#discussion_r655707282
##########
File path: core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
##########
@@ -107,4 +120,19 @@ public static boolean ancestorOf(Table table, long snapshotId, long ancestorSnap
return newFiles;
}
+
+ /**
+ * traverses through the table's snapshots and finds the {@link Table}'s
+ * Snapshot that is committed after the specified snapshotId
+ * @return null if passed in snapshot expired, else the snapshot after the
passed in snapshot
+ */
+ public static Snapshot snapshotAfter(Table table, long snapshotId) {
+ Snapshot previousSnapshot = table.snapshot(snapshotId);
+ Snapshot pointer = table.currentSnapshot();
Review comment:
Nit: other methods in this file use `current` instead of `pointer`. It
would be good to rename.
##########
File path: core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
##########
@@ -107,4 +120,19 @@ public static boolean ancestorOf(Table table, long snapshotId, long ancestorSnap
return newFiles;
}
+
+ /**
+ * traverses through the table's snapshots and finds the {@link Table}'s
+ * Snapshot that is committed after the specified snapshotId
+ * @return null if passed in snapshot expired, else the snapshot after the
passed in snapshot
Review comment:
I don't think this quite captures what the method is doing because it
doesn't mention that this is with respect to the table's current state. How
about this?
> Traverses the history of the table's current snapshot and finds the
snapshot with the given snapshot id as its parent.
I think this should also document the behavior when:
* The given snapshot ID is not found in the table
(`IllegalArgumentException`?)
* The given snapshot ID is not an ancestor of the current table state
(`IllegalStateException`?)
Nit: We also prefer to use sentence case for documentation.
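For example, the Javadoc could read roughly like this (wording is only a sketch):
```java
/**
 * Traverses the history of the table's current snapshot and finds the snapshot with the given
 * snapshot id as its parent.
 *
 * @return the snapshot whose parent id is the given snapshot id
 * @throws IllegalArgumentException when the given snapshot id is not found in the table
 * @throws IllegalStateException when the given snapshot id is not an ancestor of the current table state
 */
```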
##########
File path: core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
##########
@@ -63,6 +63,19 @@ public static boolean ancestorOf(Table table, long snapshotId, long ancestorSnap
return ancestorIds(table.currentSnapshot(), table::snapshot);
}
+ /**
+ * Find the oldest Snapshot of a {@link Table}.
Review comment:
I think this should be more clear, similar to my comments on the docs
about the method below. Basically, it should state that the "oldest" is
determined by traversing the history of the current table state.
##########
File path:
spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
+import org.apache.spark.sql.connector.read.streaming.Offset;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+public class SparkMicroBatchStream implements MicroBatchStream {
+ private static final Joiner SLASH = Joiner.on("/");
+
+ private final Table table;
+ private final boolean caseSensitive;
+ private final String expectedSchema;
+ private final Broadcast<Table> tableBroadcast;
+ private final Long splitSize;
+ private final Integer splitLookback;
+ private final Long splitOpenFileCost;
+ private final boolean localityPreferred;
+ private final StreamingOffset initialOffset;
+
+ SparkMicroBatchStream(JavaSparkContext sparkContext, Table table, boolean
caseSensitive,
+ Schema expectedSchema, CaseInsensitiveStringMap
options, String checkpointLocation) {
+ this.table = table;
+ this.caseSensitive = caseSensitive;
+ this.expectedSchema = SchemaParser.toJson(expectedSchema);
+ this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(),
table.location(), options);
+ this.tableBroadcast =
sparkContext.broadcast(SerializableTable.copyOf(table));
+
+ long tableSplitSize = PropertyUtil.propertyAsLong(table.properties(),
SPLIT_SIZE, SPLIT_SIZE_DEFAULT);
+ this.splitSize = Spark3Util.propertyAsLong(options,
SparkReadOptions.SPLIT_SIZE, tableSplitSize);
+
+ int tableSplitLookback = PropertyUtil.propertyAsInt(table.properties(),
SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT);
+ this.splitLookback = Spark3Util.propertyAsInt(options,
SparkReadOptions.LOOKBACK, tableSplitLookback);
+
+ long tableSplitOpenFileCost = PropertyUtil.propertyAsLong(
+ table.properties(), SPLIT_OPEN_FILE_COST,
SPLIT_OPEN_FILE_COST_DEFAULT);
+ this.splitOpenFileCost = Spark3Util.propertyAsLong(options,
SPLIT_OPEN_FILE_COST, tableSplitOpenFileCost);
+
+ InitialOffsetStore initialOffsetStore = new InitialOffsetStore(table,
checkpointLocation);
+ this.initialOffset = initialOffsetStore.initialOffset();
+ }
+
+ @Override
+ public Offset latestOffset() {
+ table.refresh();
+ Snapshot latestSnapshot = table.currentSnapshot();
+ if (latestSnapshot == null) {
+ return StreamingOffset.START_OFFSET;
+ }
+
+ return new StreamingOffset(latestSnapshot.snapshotId(),
Iterables.size(latestSnapshot.addedFiles()), false);
+ }
+
+ @Override
+ public InputPartition[] planInputPartitions(Offset start, Offset end) {
+ Preconditions.checkArgument(end instanceof StreamingOffset, "Invalid end
offset: %s is not a StreamingOffset", end);
+ Preconditions.checkArgument(
+ start instanceof StreamingOffset, "Invalid start offset: %s is not a
StreamingOffset", start);
+
+ if (end.equals(StreamingOffset.START_OFFSET)) {
+ return new InputPartition[0];
+ }
+
+ StreamingOffset endOffset = (StreamingOffset) end;
+ StreamingOffset startOffset = (StreamingOffset) start;
+
+ List<FileScanTask> fileScanTasks = planFiles(startOffset, endOffset);
+
+ CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
+ CloseableIterable.withNoopClose(fileScanTasks),
+ splitSize);
+ List<CombinedScanTask> combinedScanTasks = Lists.newArrayList(
+ TableScanUtil.planTasks(splitTasks, splitSize, splitLookback,
splitOpenFileCost));
+ InputPartition[] readTasks = new InputPartition[combinedScanTasks.size()];
+
+ for (int i = 0; i < combinedScanTasks.size(); i++) {
+ readTasks[i] = new ReadTask(
+ combinedScanTasks.get(i), tableBroadcast, expectedSchema,
+ caseSensitive, localityPreferred);
+ }
+
+ return readTasks;
+ }
+
+ @Override
+ public PartitionReaderFactory createReaderFactory() {
+ return new ReaderFactory(0);
+ }
+
+ @Override
+ public Offset initialOffset() {
+ return initialOffset;
+ }
+
+ @Override
+ public Offset deserializeOffset(String json) {
+ return StreamingOffset.fromJson(json);
+ }
+
+ @Override
+ public void commit(Offset end) {
+ }
+
+ @Override
+ public void stop() {
+ }
+
+ private List<FileScanTask> planFiles(StreamingOffset startOffset,
StreamingOffset endOffset) {
+ List<FileScanTask> fileScanTasks = Lists.newArrayList();
+ MicroBatch latestMicroBatch = null;
+ StreamingOffset batchStartOffset =
StreamingOffset.START_OFFSET.equals(startOffset) ?
+ new StreamingOffset(SnapshotUtil.oldestSnapshot(table).snapshotId(),
0, false) :
+ startOffset;
+
+ do {
+ StreamingOffset currentOffset =
+ latestMicroBatch != null && latestMicroBatch.lastIndexOfSnapshot() ?
+ new
StreamingOffset(snapshotAfter(latestMicroBatch.snapshotId()), 0L, false) :
+ batchStartOffset;
+
+ latestMicroBatch =
MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io())
+ .caseSensitive(caseSensitive)
+ .specsById(table.specs())
+ .generate(currentOffset.position(), Long.MAX_VALUE,
currentOffset.shouldScanAllFiles());
+
+ fileScanTasks.addAll(latestMicroBatch.tasks());
+ } while (latestMicroBatch.snapshotId() != endOffset.snapshotId());
+
+ return fileScanTasks;
+ }
+
+ private long snapshotAfter(long snapshotId) {
+ Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table, snapshotId);
+
+ Preconditions.checkState(snapshotAfter != null, "Cannot find next
snapshot: as Snapshot %s expired", snapshotId);
Review comment:
I think this should be moved into the `SnapshotUtil` method.
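For example, roughly this (keeping the existing message for now; the message wording is discussed further down):
```java
// at the end of SnapshotUtil.snapshotAfter, after walking the current snapshot's history,
// fail instead of returning null
Preconditions.checkState(current != null,
    "Cannot find next snapshot: as Snapshot %s expired", snapshotId);
return current;
```
The Spark-side `snapshotAfter(long)` wrapper then reduces to a direct call to `SnapshotUtil.snapshotAfter(table, snapshotId)`.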
##########
File path:
spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask;
+import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.iceberg.util.TableScanUtil;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
+import org.apache.spark.sql.connector.read.streaming.Offset;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT;
+
+public class SparkMicroBatchStream implements MicroBatchStream {
+ private final JavaSparkContext sparkContext;
+ private final Table table;
+ private final boolean caseSensitive;
+ private final Schema expectedSchema;
+ private final Long splitSize;
+ private final Integer splitLookback;
+ private final Long splitOpenFileCost;
+ private final boolean localityPreferred;
+ private final StreamingOffset initialOffset;
+
+ SparkMicroBatchStream(JavaSparkContext sparkContext,
+ Table table, boolean caseSensitive, Schema
expectedSchema,
+ CaseInsensitiveStringMap options, String
checkpointLocation) {
+ this.sparkContext = sparkContext;
+ this.table = table;
+ this.caseSensitive = caseSensitive;
+ this.expectedSchema = expectedSchema;
+ this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(),
table.location(), options);
+
+ long tableSplitSize = PropertyUtil.propertyAsLong(table.properties(),
SPLIT_SIZE, SPLIT_SIZE_DEFAULT);
+ this.splitSize = Spark3Util.propertyAsLong(options,
SparkReadOptions.SPLIT_SIZE, tableSplitSize);
+
+ int tableSplitLookback = PropertyUtil.propertyAsInt(table.properties(),
SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT);
+ this.splitLookback = Spark3Util.propertyAsInt(options,
SparkReadOptions.LOOKBACK, tableSplitLookback);
+
+ long tableSplitOpenFileCost = PropertyUtil.propertyAsLong(
+ table.properties(), SPLIT_OPEN_FILE_COST,
SPLIT_OPEN_FILE_COST_DEFAULT);
+ this.splitOpenFileCost = Spark3Util.propertyAsLong(options,
SPLIT_OPEN_FILE_COST, tableSplitOpenFileCost);
+
+ InitialOffsetStore initialOffsetStore =
InitialOffsetStore.getInstance(table, checkpointLocation);
+ this.initialOffset = getOrWriteInitialOffset(initialOffsetStore);
+ }
+
+ @Override
+ public Offset latestOffset() {
+ table.refresh();
+ Snapshot latestSnapshot = table.currentSnapshot();
+ if (latestSnapshot == null) {
+ return StreamingOffset.START_OFFSET;
+ }
+
+ // a readStream on an Iceberg table can be started from 2 types of
snapshots
+ // 1. a valid starting Snapshot:
+ // when this valid starting Snapshot is the initialOffset - then,
scanAllFiles must be set to true;
+ // for all StreamingOffsets following this - scanAllFiles must be set
to false
+ // 2. START_OFFSET:
+ // if the stream started on the table from START_OFFSET - it implies
- that all the subsequent Snapshots added
+ // will have all files as net New manifests & hence scanAllFiles can
be false.
+ boolean scanAllFiles = !StreamingOffset.START_OFFSET.equals(initialOffset)
&&
+ latestSnapshot.snapshotId() == initialOffset.snapshotId();
+
+ long filesNewlyAddedInLatestSnapshot =
Iterables.size(latestSnapshot.addedFiles());
+ long existingFilesInheritedByLatestSnapshot =
SnapshotUtil.existingDataFiles(latestSnapshot);
+ long positionValue = scanAllFiles ? existingFilesInheritedByLatestSnapshot
+ filesNewlyAddedInLatestSnapshot :
+ filesNewlyAddedInLatestSnapshot;
+
+ return new StreamingOffset(latestSnapshot.snapshotId(), positionValue,
scanAllFiles);
+ }
+
+ @Override
+ public InputPartition[] planInputPartitions(Offset start, Offset end) {
+ if (end.equals(StreamingOffset.START_OFFSET)) {
+ return new InputPartition[0];
+ }
+
+ // broadcast the table metadata as input partitions will be sent to
executors
+ Broadcast<Table> tableBroadcast =
sparkContext.broadcast(SerializableTable.copyOf(table));
+ String expectedSchemaString = SchemaParser.toJson(expectedSchema);
+
+ Preconditions.checkState(
+ end instanceof StreamingOffset,
+ "The end offset passed to planInputPartitions() is not an instance of
StreamingOffset.");
+
+ Preconditions.checkState(
+ start instanceof StreamingOffset,
+ "The start offset passed to planInputPartitions() is not an instance
of StreamingOffset.");
+
+ StreamingOffset endOffset = (StreamingOffset) end;
+ StreamingOffset startOffset = (StreamingOffset) start;
+
+ List<FileScanTask> fileScanTasks = calculateFileScanTasks(startOffset,
endOffset);
+
+ CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
+ CloseableIterable.withNoopClose(fileScanTasks),
+ splitSize);
+ List<CombinedScanTask> combinedScanTasks = Lists.newArrayList(
+ TableScanUtil.planTasks(splitTasks, splitSize, splitLookback,
splitOpenFileCost));
+ InputPartition[] readTasks = new InputPartition[combinedScanTasks.size()];
+
+ for (int i = 0; i < combinedScanTasks.size(); i++) {
+ readTasks[i] = new ReadTask(
+ combinedScanTasks.get(i), tableBroadcast, expectedSchemaString,
+ caseSensitive, localityPreferred);
+ }
+
+ return readTasks;
+ }
+
+ @Override
+ public PartitionReaderFactory createReaderFactory() {
+ return new ReaderFactory(0);
+ }
+
+ @Override
+ public Offset initialOffset() {
+ return initialOffset;
+ }
+
+ @Override
+ public Offset deserializeOffset(String json) {
+ return StreamingOffset.fromJson(json);
+ }
+
+ @Override
+ public void commit(Offset end) {
+ }
+
+ @Override
+ public void stop() {
+ }
+
+ private static StreamingOffset getOrWriteInitialOffset(InitialOffsetStore
initialOffsetStore) {
+ if (initialOffsetStore.isOffsetStoreInitialized()) {
+ return initialOffsetStore.getInitialOffset();
+ }
+
+ return initialOffsetStore.addInitialOffset();
+ }
+
+ private List<FileScanTask> calculateFileScanTasks(StreamingOffset
startOffset, StreamingOffset endOffset) {
+ List<FileScanTask> fileScanTasks = new ArrayList<>();
+ MicroBatch latestMicroBatch = null;
+ StreamingOffset batchStartOffset =
StreamingOffset.START_OFFSET.equals(startOffset) ?
+ new StreamingOffset(SnapshotUtil.oldestSnapshot(table).snapshotId(),
0, false) :
+ startOffset;
+
+ do {
+ final StreamingOffset currentOffset =
+ latestMicroBatch != null && latestMicroBatch.lastIndexOfSnapshot() ?
+ getNextAvailableSnapshot(latestMicroBatch.snapshotId()) :
+ batchStartOffset;
+
+ latestMicroBatch =
MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io())
+ .caseSensitive(caseSensitive)
+ .specsById(table.specs())
+ .generate(currentOffset.position(), Long.MAX_VALUE,
currentOffset.shouldScanAllFiles());
+
+ fileScanTasks.addAll(latestMicroBatch.tasks());
+ } while (latestMicroBatch.snapshotId() != endOffset.snapshotId());
+
+ return fileScanTasks;
+ }
+
+ private StreamingOffset getNextAvailableSnapshot(long snapshotId) {
+ Snapshot previousSnapshot = table.snapshot(snapshotId);
+ Snapshot pointer = table.currentSnapshot();
+ while (pointer != null && previousSnapshot.snapshotId() !=
pointer.parentId()) {
+
Preconditions.checkState(pointer.operation().equals(DataOperations.APPEND),
+ "Encountered Snapshot DataOperation other than APPEND.");
+
+ pointer = table.snapshot(pointer.parentId());
+ }
+
+ Preconditions.checkState(pointer != null,
+ "Cannot read data from snapshot which has already expired: %s",
snapshotId);
+
+ return new StreamingOffset(pointer.snapshotId(), 0L, false);
+ }
+
+ interface InitialOffsetStore {
+ static InitialOffsetStore getInstance(Table table, String
checkpointLocation) {
+ return new InitialOffsetStoreImpl(table, checkpointLocation);
+ }
+
+ StreamingOffset addInitialOffset();
+
+ boolean isOffsetStoreInitialized();
+
+ StreamingOffset getInitialOffset();
+ }
+
+ private static class InitialOffsetStoreImpl implements InitialOffsetStore {
+ private final Table table;
+ private final FileIO io;
+ private final String initialOffsetLocation;
+
+ InitialOffsetStoreImpl(Table table, String checkpointLocation) {
+ this.table = table;
+ this.io = table.io();
+ this.initialOffsetLocation = new Path(checkpointLocation,
"offsets/0").toString();
Review comment:
I'm unresolving this so it is more obvious later if we refer back to
this PR.
##########
File path: core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
##########
@@ -63,6 +63,19 @@ public static boolean ancestorOf(Table table, long snapshotId, long ancestorSnap
return ancestorIds(table.currentSnapshot(), table::snapshot);
}
+ /**
+ * Traverses the history of the table's state and finds the oldest Snapshot.
Review comment:
I think that "table's state" isn't clear enough. How about "history of
the table's current snapshot" like the one below?
##########
File path: core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
##########
@@ -63,6 +63,19 @@ public static boolean ancestorOf(Table table, long snapshotId, long ancestorSnap
return ancestorIds(table.currentSnapshot(), table::snapshot);
}
+ /**
+ * Traverses the history of the table's state and finds the oldest Snapshot.
+ * @return null if the table is empty, else the oldest Snapshot.
Review comment:
What does "empty" mean? I think it means that there is no current
snapshot, but it would be better to say that.
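Combining that with the comment above, something like this would be clearer (wording is only a sketch):
```java
/**
 * Traverses the history of the table's current snapshot and finds the oldest snapshot.
 *
 * @return null if the table has no current snapshot, else the oldest snapshot reachable
 *         from the current snapshot
 */
```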
##########
File path:
spark3/src/main/java/org/apache/iceberg/spark/source/StreamingOffset.java
##########
@@ -62,20 +63,23 @@ static StreamingOffset fromJson(String json) {
try {
JsonNode node = JsonUtil.mapper().readValue(json, JsonNode.class);
- // The version of StreamingOffset. The offset was created with a version
number
- // used to validate when deserializing from json string.
- int version = JsonUtil.getInt(VERSION, node);
- Preconditions.checkArgument(version == CURR_VERSION,
- "Cannot parse offset JSON: offset version %s is not supported",
version);
+ return fromJsonNode(node);
+ } catch (IOException e) {
+ throw new IllegalArgumentException(String.format("Failed to parse
StreamingOffset from JSON string %s", json), e);
Review comment:
I think this should also be `UncheckedIOException`.
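That is, something along these lines for the catch block above (sketch):
```java
} catch (IOException e) {
  throw new UncheckedIOException(
      String.format("Failed to parse StreamingOffset from JSON string %s", json), e);
}
```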
##########
File path:
spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.apache.spark.sql.streaming.Trigger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public final class TestStructuredStreamingRead3 {
+ private static final Configuration CONF = new Configuration();
+ private static final Schema SCHEMA = new Schema(
+ optional(1, "id", Types.IntegerType.get()),
+ optional(2, "data", Types.StringType.get())
+ );
+ private static SparkSession spark = null;
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ @Rule
+ public ExpectedException exceptionRule = ExpectedException.none();
+
+ @BeforeClass
+ public static void startSpark() {
+ TestStructuredStreamingRead3.spark = SparkSession.builder()
+ .master("local[2]")
+ .config("spark.sql.shuffle.partitions", 4)
+ .getOrCreate();
+ }
+
+ @AfterClass
+ public static void stopSpark() {
+ SparkSession currentSpark = TestStructuredStreamingRead3.spark;
+ TestStructuredStreamingRead3.spark = null;
+ currentSpark.stop();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws
IOException, TimeoutException {
+ File parent = temp.newFolder("parent");
+ File location = new File(parent, "test-table");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id",
3).build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ List<List<SimpleRecord>> expected = getTestDataForMultipleSnapshots();
+ appendData(expected, location);
+
+ table.refresh();
+
+ try {
+ Dataset<Row> df = spark.readStream()
+ .format("iceberg")
+ .load(location.toString());
+ List<SimpleRecord> actual = processStreamTillEnd(df);
+
+ Assert.assertEquals(
+ expected.stream().flatMap(List::stream).collect(Collectors.toList()),
+ actual);
+ } finally {
+ for (StreamingQuery query : spark.streams().active()) {
+ query.stop();
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testResumingStreamReadFromCheckpoint() throws IOException,
TimeoutException, StreamingQueryException {
+ File parent = temp.newFolder("parent");
+ File location = new File(parent, "test-table");
+ File writerCheckpoint = new File(parent, "writer-checkpoint");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id",
3).build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+ final String tempView = "microBatchView";
+
+ try {
+ Dataset<Row> df = spark.readStream()
+ .format("iceberg")
+ .load(location.toString());
+ DataStreamWriter<Row> singleBatchWriter = df.writeStream()
+ .trigger(Trigger.Once())
+ .option("checkpointLocation", writerCheckpoint.toString())
+ .foreachBatch((batchDF, batchId) -> {
+ batchDF.createOrReplaceGlobalTempView(tempView);
+ });
+
+ String globalTempView = "global_temp." + tempView;
+
+ List<SimpleRecord> processStreamOnEmptyIcebergTable =
processMicroBatch(singleBatchWriter, globalTempView);
+ Assert.assertEquals(
+ Collections.emptyList(),
+ processStreamOnEmptyIcebergTable);
+
+ for (List<List<SimpleRecord>> expectedCheckpoint :
getTestDataForMultipleWritesWithMultipleSnapshots()) {
+ appendData(expectedCheckpoint, location);
+ table.refresh();
+
+ List<SimpleRecord> actualDataInCurrentMicroBatch =
processMicroBatch(singleBatchWriter, globalTempView);
+ Assert.assertEquals(
+
expectedCheckpoint.stream().flatMap(List::stream).collect(Collectors.toList()),
+ actualDataInCurrentMicroBatch);
+ }
+ } finally {
+ for (StreamingQuery query : spark.streams().active()) {
+ query.stop();
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testParquetOrcAvroDataInOneTable() throws IOException,
TimeoutException {
+ File parent = temp.newFolder("parent");
+ File location = new File(parent, "test-table");
+
+ HadoopTables tables = new HadoopTables(CONF);
+
+ // use partition buckets - so that single list<SimpleRecord> can generate
multiple files per write
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id",
3).build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ List<SimpleRecord> parquetFileRecords = Lists.newArrayList(
+ new SimpleRecord(1, "one"),
+ new SimpleRecord(2, "two"),
+ new SimpleRecord(3, "three"));
+
+ List<SimpleRecord> orcFileRecords = Lists.newArrayList(
+ new SimpleRecord(4, "four"),
+ new SimpleRecord(5, "five"));
+
+ List<SimpleRecord> avroFileRecords = Lists.newArrayList(
+ new SimpleRecord(6, "six"),
+ new SimpleRecord(7, "seven"));
+
+ appendData(parquetFileRecords, location, "parquet");
+ appendData(orcFileRecords, location, "orc");
+ appendData(avroFileRecords, location, "avro");
+
+ table.refresh();
+
+
+ try {
+ Dataset<Row> ds = spark.readStream()
+ .format("iceberg")
+ .load(location.toString());
+ List<SimpleRecord> actual = processStreamTillEnd(ds);
+ List<SimpleRecord> expected = Stream.concat(Stream.concat(
+ parquetFileRecords.stream(),
+ orcFileRecords.stream()),
+ avroFileRecords.stream())
+ .collect(Collectors.toList());
+ Assert.assertEquals(expected, actual);
+ } finally {
+ for (StreamingQuery query : spark.streams().active()) {
+ query.stop();
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadStreamFromEmptyTable() throws IOException,
TimeoutException {
+ File parent = temp.newFolder("parent");
+ File location = new File(parent, "test-table");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id",
3).build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ table.refresh();
+
+ try {
+ Dataset<Row> df = spark.readStream()
+ .format("iceberg")
+ .load(location.toString());
+
+ List<SimpleRecord> actual = processStreamTillEnd(df);
+ Assert.assertEquals(Collections.emptyList(), actual);
+ } finally {
+ for (StreamingQuery query : spark.streams().active()) {
+ query.stop();
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws
IOException, TimeoutException {
+ File parent = temp.newFolder("parent");
+ File location = new File(parent, "test-table");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id",
3).build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ // upgrade table to version 2 - to facilitate creation of Snapshot of type
OVERWRITE.
+ TableOperations ops = ((BaseTable) table).operations();
+ TableMetadata meta = ops.current();
+ ops.commit(meta, meta.upgradeToFormatVersion(2));
+
+ // fill table with some data
+ List<List<SimpleRecord>> dataAcrossSnapshots =
getTestDataForMultipleSnapshots();
+ appendData(dataAcrossSnapshots, location);
+
+ table.refresh();
+
+ Schema deleteRowSchema = table.schema().select("data");
+ Record dataDelete = GenericRecord.create(deleteRowSchema);
+ List<Record> dataDeletes = Lists.newArrayList(
+ dataDelete.copy("data", "one") // id = 1
+ );
+
+ DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
dataDeletes, deleteRowSchema);
+
+ table.newRowDelta()
+ .addDeletes(eqDeletes)
+ .commit();
+
+ // check pre-condition - that the above Delete file write - actually
resulted in snapshot of type OVERWRITE
+ Assert.assertEquals(DataOperations.OVERWRITE,
table.currentSnapshot().operation());
+
+ try {
+ Dataset<Row> df = spark.readStream()
+ .format("iceberg")
+ .load(location.toString());
+ StreamingQuery streamingQuery = df.writeStream()
+ .format("memory")
+ .queryName("testtablewithoverwrites")
+ .outputMode(OutputMode.Append())
+ .start();
+
+ try {
+ streamingQuery.processAllAvailable();
+ Assert.assertTrue(false); // should be unreachable
+ } catch (Exception exception) {
+ Assert.assertTrue(exception instanceof StreamingQueryException);
+ Assert.assertTrue(((StreamingQueryException) exception).cause()
instanceof IllegalStateException);
+ }
+
+ } finally {
+ for (StreamingQuery query : spark.streams().active()) {
+ query.stop();
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadStreamWithSnapshotTypeReplaceErrorsOut() throws
IOException, TimeoutException {
+ File parent = temp.newFolder("parent");
+ File location = new File(parent, "test-table");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ // fill table with some data
+ List<List<SimpleRecord>> dataAcrossSnapshots =
getTestDataForMultipleSnapshots();
+ appendData(dataAcrossSnapshots, location);
+
+ table.refresh();
+
+ // this should create a snapshot with type Replace.
+ table.rewriteManifests()
+ .clusterBy(f -> 1)
+ .commit();
+
+ // check pre-condition
+ Assert.assertEquals(DataOperations.REPLACE,
table.currentSnapshot().operation());
+
+ try {
+ Dataset<Row> df = spark.readStream()
+ .format("iceberg")
+ .load(location.toString());
+ StreamingQuery streamingQuery = df.writeStream()
+ .format("memory")
+ .queryName("testtablewithreplace")
+ .outputMode(OutputMode.Append())
+ .start();
+
+ try {
+ streamingQuery.processAllAvailable();
+ Assert.assertTrue(false); // should be unreachable
+ } catch (Exception exception) {
+ Assert.assertTrue(exception instanceof StreamingQueryException);
+ Assert.assertTrue(((StreamingQueryException) exception).cause()
instanceof IllegalStateException);
+ }
+
+ } finally {
+ for (StreamingQuery query : spark.streams().active()) {
+ query.stop();
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadStreamWithSnapshotTypeDeleteErrorsOut() throws
IOException, TimeoutException {
+ File parent = temp.newFolder("parent");
+ File location = new File(parent, "test-table");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec =
PartitionSpec.builderFor(SCHEMA).identity("id").build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ // fill table with some data
+ List<List<SimpleRecord>> dataAcrossSnapshots =
getTestDataForMultipleSnapshots();
+ appendData(dataAcrossSnapshots, location);
+
+ table.refresh();
+
+ // this should create a snapshot with type delete.
+ table.newDelete()
+ .deleteFromRowFilter(Expressions.equal("id", 4))
+ .commit();
+
+ // check pre-condition - that the above newDelete operation on table
resulted in Snapshot of Type DELETE.
+ Assert.assertEquals(DataOperations.DELETE,
table.currentSnapshot().operation());
+
+ try {
+ Dataset<Row> df = spark.readStream()
+ .format("iceberg")
+ .load(location.toString());
+ StreamingQuery streamingQuery = df.writeStream()
+ .format("memory")
+ .queryName("testtablewithdelete")
+ .outputMode(OutputMode.Append())
+ .start();
+
+ try {
+ streamingQuery.processAllAvailable();
+ Assert.assertTrue(false); // should be unreachable
+ } catch (Exception exception) {
+ Assert.assertTrue(exception instanceof StreamingQueryException);
+ Assert.assertTrue(((StreamingQueryException) exception).cause()
instanceof IllegalStateException);
+ }
+
+ } finally {
+ for (StreamingQuery query : spark.streams().active()) {
+ query.stop();
+ }
+ }
+ }
+
+ private static List<SimpleRecord> processMicroBatch(DataStreamWriter<Row>
singleBatchWriter, String viewName)
+ throws TimeoutException, StreamingQueryException {
+ StreamingQuery streamingQuery = singleBatchWriter.start();
+ streamingQuery.awaitTermination();
+
+ return spark.sql(String.format("select * from %s", viewName))
+ .as(Encoders.bean(SimpleRecord.class))
+ .collectAsList();
+ }
+
+ /**
+ * get test data - a list of records per snapshot
+ */
+ private static List<List<SimpleRecord>> getTestDataForMultipleSnapshots() {
+ return Lists.newArrayList(
+ Lists.newArrayList(
+ new SimpleRecord(1, "one"),
+ new SimpleRecord(2, "two"),
+ new SimpleRecord(3, "three")),
+ Lists.newArrayList(
+ new SimpleRecord(4, "four"),
+ new SimpleRecord(5, "five")),
+ Lists.newArrayList(
+ new SimpleRecord(6, "six"),
+ new SimpleRecord(7, "seven"))
+ );
+ }
+
+ /**
+ * appends each list as a Snapshot on the iceberg table at the given
location.
+ * accepts a list of lists - each list representing data per snapshot.
+ */
+ private static void appendData(List<List<SimpleRecord>> data, File location)
{
+ // generate multiple snapshots
+ for (List<SimpleRecord> l : data) {
+ appendData(l, location, "parquet");
+ }
+ }
+
+ private static void appendData(List<SimpleRecord> data, File location,
String fileFormat) {
+ Dataset<Row> df = spark.createDataFrame(data, SimpleRecord.class);
+ df.select("id", "data").write()
+ .format("iceberg")
+ .option("write-format", fileFormat)
+ .mode("append")
+ .save(location.toString());
+ }
+
+ private static List<SimpleRecord> processStreamTillEnd(Dataset<Row> df)
throws TimeoutException {
+ StreamingQuery streamingQuery = df.writeStream()
+ .format("memory")
+ .queryName("test12")
+ .outputMode(OutputMode.Append())
+ .start();
+ streamingQuery.processAllAvailable();
+ return spark.sql("select * from test12")
+ .as(Encoders.bean(SimpleRecord.class))
+ .collectAsList();
+ }
+
+ /**
+ * gets test data - to be used for multiple write batches
+ * each batch in turn will have multiple snapshots
+ */
+ private static List<List<List<SimpleRecord>>>
getTestDataForMultipleWritesWithMultipleSnapshots() {
Review comment:
Why isn't this a static final variable?
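For example, the same idea applied to the sibling fixture `getTestDataForMultipleSnapshots()` shown above (the constant name is just a suggestion):
```java
private static final List<List<SimpleRecord>> TEST_DATA_MULTIPLE_SNAPSHOTS = Lists.newArrayList(
    Lists.newArrayList(
        new SimpleRecord(1, "one"),
        new SimpleRecord(2, "two"),
        new SimpleRecord(3, "three")),
    Lists.newArrayList(
        new SimpleRecord(4, "four"),
        new SimpleRecord(5, "five")),
    Lists.newArrayList(
        new SimpleRecord(6, "six"),
        new SimpleRecord(7, "seven")));
```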
##########
File path:
spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.apache.spark.sql.streaming.Trigger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public final class TestStructuredStreamingRead3 {
+ private static final Configuration CONF = new Configuration();
+ private static final Schema SCHEMA = new Schema(
+ optional(1, "id", Types.IntegerType.get()),
Review comment:
Nit: continuation indents are 4 spaces (or 2 indents). Can you fix this
file?
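That is, for example:
```java
private static final Schema SCHEMA = new Schema(
    optional(1, "id", Types.IntegerType.get()),
    optional(2, "data", Types.StringType.get())
);
```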
##########
File path: core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
##########
@@ -107,4 +121,26 @@ public static boolean ancestorOf(Table table, long snapshotId, long ancestorSnap
return newFiles;
}
+
+ /**
+ * Traverses the history of the table's current snapshot and finds the
snapshot with the given snapshot id as its
+ * parent.
+ * @return null if the passed in snapshot is not present in the table, else
the snapshot for which the given snapshot
+ * is the parent
+ * @throws IllegalArgumentException when the given SnapshotId is not found
in the table
Review comment:
Why is `SnapshotId` capitalized?
##########
File path: core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
##########
@@ -107,4 +121,26 @@ public static boolean ancestorOf(Table table, long snapshotId, long ancestorSnap
return newFiles;
}
+
+ /**
+ * Traverses the history of the table's current snapshot and finds the
snapshot with the given snapshot id as its
+ * parent.
+ * @return null if the passed in snapshot is not present in the table, else
the snapshot for which the given snapshot
+ * is the parent
+ * @throws IllegalArgumentException when the given SnapshotId is not found
in the table
+ * @throws IllegalStateException when the given snapshot ID is not an
ancestor of the current table state
+ */
+ public static Snapshot snapshotAfter(Table table, long snapshotId) {
+ Snapshot previousSnapshot = table.snapshot(snapshotId);
Review comment:
Minor: You may want to rename this `parent` so that it is clear why the
loop exits.
##########
File path: core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
##########
@@ -107,4 +121,26 @@ public static boolean ancestorOf(Table table, long snapshotId, long ancestorSnap
return newFiles;
}
+
+ /**
+ * Traverses the history of the table's current snapshot and finds the
snapshot with the given snapshot id as its
+ * parent.
+ * @return null if the passed in snapshot is not present in the table, else
the snapshot for which the given snapshot
+ * is the parent
+ * @throws IllegalArgumentException when the given SnapshotId is not found
in the table
+ * @throws IllegalStateException when the given snapshot ID is not an
ancestor of the current table state
+ */
+ public static Snapshot snapshotAfter(Table table, long snapshotId) {
+ Snapshot previousSnapshot = table.snapshot(snapshotId);
+ Preconditions.checkArgument(previousSnapshot != null,
+ "Invalid snapshotId: %s, snapshot not found in the table.",
snapshotId);
Review comment:
The ID isn't invalid, the problem is that the snapshot is not present. I
think it should be `"Cannot find parent snapshot: %s"`
##########
File path: core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
##########
@@ -107,4 +121,26 @@ public static boolean ancestorOf(Table table, long snapshotId, long ancestorSnap
return newFiles;
}
+
+ /**
+ * Traverses the history of the table's current snapshot and finds the
snapshot with the given snapshot id as its
+ * parent.
+ * @return null if the passed in snapshot is not present in the table, else
the snapshot for which the given snapshot
+ * is the parent
+ * @throws IllegalArgumentException when the given SnapshotId is not found
in the table
+ * @throws IllegalStateException when the given snapshot ID is not an
ancestor of the current table state
+ */
+ public static Snapshot snapshotAfter(Table table, long snapshotId) {
+ Snapshot previousSnapshot = table.snapshot(snapshotId);
+ Preconditions.checkArgument(previousSnapshot != null,
+ "Invalid snapshotId: %s, snapshot not found in the table.",
snapshotId);
+
+ Snapshot current = table.currentSnapshot();
+ while (previousSnapshot != null && current != null &&
previousSnapshot.snapshotId() != current.parentId()) {
+ current = table.snapshot(current.parentId());
+ }
+
+ Preconditions.checkState(current != null, "Cannot find next snapshot: as
Snapshot %s expired", snapshotId);
Review comment:
I don't think that this error message is correct. The problem is that
traversing the current snapshot's history didn't find the parent snapshot.
Since the parent is defined (we know this from the check above) we know that
the parent is actually not an ancestor of the current table state. That's a
more descriptive error message: `"Cannot find snapshot after %s: not an
ancestor of the current snapshot"`
##########
File path: core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
##########
@@ -107,4 +121,26 @@ public static boolean ancestorOf(Table table, long snapshotId, long ancestorSnap
return newFiles;
}
+
+ /**
+ * Traverses the history of the table's current snapshot and finds the
snapshot with the given snapshot id as its
+ * parent.
+ * @return null if the passed in snapshot is not present in the table, else
the snapshot for which the given snapshot
+ * is the parent
+ * @throws IllegalArgumentException when the given SnapshotId is not found
in the table
+ * @throws IllegalStateException when the given snapshot ID is not an
ancestor of the current table state
+ */
+ public static Snapshot snapshotAfter(Table table, long snapshotId) {
+ Snapshot previousSnapshot = table.snapshot(snapshotId);
Review comment:
Actually, I think it may be a good idea to change the loop to exit
early. That would be a bit cleaner. And there's no need to keep the parent
around at all, or to check that it is non-null every time through the loop.
```
Preconditions.checkArgument(table.snapshot(snapshotId) != null, ...);
Snapshot current = table.currentSnapshot();
while (current != null) {
  if (current.parentId() == snapshotId) {
    return current;
  }
  current = table.snapshot(current.parentId());
}
throw new IllegalStateException(...);
```
##########
File path:
spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.apache.spark.sql.streaming.Trigger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public final class TestStructuredStreamingRead3 {
+ private static final Configuration CONF = new Configuration();
+ private static final Schema SCHEMA = new Schema(
+ optional(1, "id", Types.IntegerType.get()),
+ optional(2, "data", Types.StringType.get())
+ );
+ private static SparkSession spark = null;
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ @Rule
+ public ExpectedException exceptionRule = ExpectedException.none();
+
+ @BeforeClass
+ public static void startSpark() {
+ TestStructuredStreamingRead3.spark = SparkSession.builder()
+ .master("local[2]")
+ .config("spark.sql.shuffle.partitions", 4)
+ .getOrCreate();
+ }
+
+ @AfterClass
+ public static void stopSpark() {
+ SparkSession currentSpark = TestStructuredStreamingRead3.spark;
+ TestStructuredStreamingRead3.spark = null;
+ currentSpark.stop();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws
IOException, TimeoutException {
Review comment:
For tests, you can just throw `Exception` so that these lists don't get
too long.
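That is (sketch):
```java
@SuppressWarnings("unchecked")
@Test
public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws Exception {
  // ... test body unchanged ...
}
```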
##########
File path:
spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.apache.spark.sql.streaming.Trigger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public final class TestStructuredStreamingRead3 {
+ private static final Configuration CONF = new Configuration();
+ private static final Schema SCHEMA = new Schema(
+ optional(1, "id", Types.IntegerType.get()),
+ optional(2, "data", Types.StringType.get())
+ );
+ private static SparkSession spark = null;
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ @Rule
+ public ExpectedException exceptionRule = ExpectedException.none();
+
+ @BeforeClass
+ public static void startSpark() {
+ TestStructuredStreamingRead3.spark = SparkSession.builder()
+ .master("local[2]")
+ .config("spark.sql.shuffle.partitions", 4)
+ .getOrCreate();
+ }
+
+ @AfterClass
+ public static void stopSpark() {
+ SparkSession currentSpark = TestStructuredStreamingRead3.spark;
+ TestStructuredStreamingRead3.spark = null;
+ currentSpark.stop();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws
IOException, TimeoutException {
+ File parent = temp.newFolder("parent");
+ File location = new File(parent, "test-table");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id",
3).build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ List<List<SimpleRecord>> expected = getTestDataForMultipleSnapshots();
+ appendData(expected, location);
+
+ table.refresh();
+
+ try {
+ Dataset<Row> df = spark.readStream()
+ .format("iceberg")
+ .load(location.toString());
+ List<SimpleRecord> actual = processStreamTillEnd(df);
Review comment:
Looks like `processStreamTillEnd` can't really guarantee an ordering for
the records that are returned, so Spark may do something later that breaks this
test. I think that this is probably a good time to use the new `Assertions`
helper that @nastra recommended so that it is easy to assert that the list that
is returned has a specific size and contains all of the expected records:
```java
Assertions.assertThat(actual).containsExactlyInAnyOrder(Iterables.concat(expected));
```
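One caveat: since `expected` here is a list of lists, the `Iterable`-taking variant may be what's needed, e.g. (assuming AssertJ's `containsExactlyInAnyOrderElementsOf`):
```java
Assertions.assertThat(actual)
    .containsExactlyInAnyOrderElementsOf(Lists.newArrayList(Iterables.concat(expected)));
```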
##########
File path:
spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -0,0 +1,502 @@
+
+ Assert.assertEquals(
+ expected.stream().flatMap(List::stream).collect(Collectors.toList()),
+ actual);
+ } finally {
+ for (StreamingQuery query : spark.streams().active()) {
Review comment:
It looks like this `finally` block should be an `@After` method instead?
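For what it's worth, a minimal sketch of that cleanup (assuming the usual `org.junit.After` import, and that the active streaming queries are the only per-test state to tear down):
```java
@After
public void stopStreamingQueries() throws TimeoutException {
  // Stop any queries a test left running so they don't leak into the next test,
  // even when an assertion fails before a finally block would have run.
  for (StreamingQuery query : spark.streams().active()) {
    query.stop();
  }
}
```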
##########
File path:
spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -0,0 +1,502 @@
+ query.stop();
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testResumingStreamReadFromCheckpoint() throws IOException,
TimeoutException, StreamingQueryException {
+ File parent = temp.newFolder("parent");
+ File location = new File(parent, "test-table");
+ File writerCheckpoint = new File(parent, "writer-checkpoint");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id",
3).build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
Review comment:
Is it possible to refactor table creation into a `@Before` method and
drop the table in `@After`?
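A rough sketch of how that could look, assuming `tableLocation` and `table` become instance fields (the names here are only placeholders) and that `HadoopTables.dropTable` is available for cleanup; the `@Rule` TemporaryFolder is initialized before `@Before` runs, so creating the folder there should be safe:
```java
@Before
public void setupTable() throws IOException {
  // Hypothetical instance fields: File tableLocation; Table table;
  tableLocation = new File(temp.newFolder("parent"), "test-table");
  PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 3).build();
  table = new HadoopTables(CONF).create(SCHEMA, spec, tableLocation.toString());
}

@After
public void dropTable() {
  new HadoopTables(CONF).dropTable(tableLocation.toString());
}
```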
##########
File path:
spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -0,0 +1,502 @@
+ final String tempView = "microBatchView";
+
+ try {
+ Dataset<Row> df = spark.readStream()
+ .format("iceberg")
+ .load(location.toString());
+ DataStreamWriter<Row> singleBatchWriter = df.writeStream()
+ .trigger(Trigger.Once())
+ .option("checkpointLocation", writerCheckpoint.toString())
+ .foreachBatch((batchDF, batchId) -> {
+ batchDF.createOrReplaceGlobalTempView(tempView);
+ });
+
+ String globalTempView = "global_temp." + tempView;
+
+ List<SimpleRecord> processStreamOnEmptyIcebergTable =
processMicroBatch(singleBatchWriter, globalTempView);
+ Assert.assertEquals(
+ Collections.emptyList(),
+ processStreamOnEmptyIcebergTable);
+
+ for (List<List<SimpleRecord>> expectedCheckpoint :
getTestDataForMultipleWritesWithMultipleSnapshots()) {
+ appendData(expectedCheckpoint, location);
+ table.refresh();
+
+ List<SimpleRecord> actualDataInCurrentMicroBatch =
processMicroBatch(singleBatchWriter, globalTempView);
+ Assert.assertEquals(
+
expectedCheckpoint.stream().flatMap(List::stream).collect(Collectors.toList()),
+ actualDataInCurrentMicroBatch);
+ }
+ } finally {
+ for (StreamingQuery query : spark.streams().active()) {
+ query.stop();
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testParquetOrcAvroDataInOneTable() throws IOException,
TimeoutException {
+ File parent = temp.newFolder("parent");
+ File location = new File(parent, "test-table");
+
+ HadoopTables tables = new HadoopTables(CONF);
+
+ // use partition buckets - so that single list<SimpleRecord> can generate
multiple files per write
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id",
3).build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ List<SimpleRecord> parquetFileRecords = Lists.newArrayList(
+ new SimpleRecord(1, "one"),
+ new SimpleRecord(2, "two"),
+ new SimpleRecord(3, "three"));
+
+ List<SimpleRecord> orcFileRecords = Lists.newArrayList(
+ new SimpleRecord(4, "four"),
+ new SimpleRecord(5, "five"));
+
+ List<SimpleRecord> avroFileRecords = Lists.newArrayList(
+ new SimpleRecord(6, "six"),
+ new SimpleRecord(7, "seven"));
+
+ appendData(parquetFileRecords, location, "parquet");
+ appendData(orcFileRecords, location, "orc");
+ appendData(avroFileRecords, location, "avro");
+
+ table.refresh();
+
+
+ try {
+ Dataset<Row> ds = spark.readStream()
+ .format("iceberg")
+ .load(location.toString());
+ List<SimpleRecord> actual = processStreamTillEnd(ds);
+ List<SimpleRecord> expected = Stream.concat(Stream.concat(
Review comment:
You may want to consider using `Iterables` instead of streams since that
will work directly with the lists and is friendly with `Assertions`.
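For example, a sketch of assembling the expected records with the relocated Guava `Iterables` and checking them with AssertJ (assuming imports of `org.assertj.core.api.Assertions` and the relocated `Iterables`), using an order-insensitive assertion since the read order isn't guaranteed:
```java
// Flatten the per-format lists without going through the Stream API.
Iterable<SimpleRecord> expected =
    Iterables.concat(parquetFileRecords, orcFileRecords, avroFileRecords);
Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
```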
##########
File path:
spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -0,0 +1,502 @@
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testParquetOrcAvroDataInOneTable() throws IOException,
TimeoutException {
Review comment:
It's okay to have this, but I don't see much benefit in testing it: file
formats are orthogonal to the streaming support that this PR adds.
##########
File path:
spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -0,0 +1,502 @@
+ parquetFileRecords.stream(),
+ orcFileRecords.stream()),
+ avroFileRecords.stream())
+ .collect(Collectors.toList());
+ Assert.assertEquals(expected, actual);
+ } finally {
+ for (StreamingQuery query : spark.streams().active()) {
+ query.stop();
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadStreamFromEmptyTable() throws IOException,
TimeoutException {
Review comment:
Can you add a new test case (or extend this one) to write a single batch
and make sure that a table that starts with no snapshot works as expected after
data is added?
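A rough sketch of the kind of test being asked for, reusing this file's helpers; it assumes `processStreamTillEnd` (or whatever it ends up being called) can be invoked a second time against the same streaming DataFrame, which may need adjusting depending on how the memory sink is named:
```java
@SuppressWarnings("unchecked")
@Test
public void testReadStreamFromTableWithNoInitialSnapshot() throws IOException, TimeoutException {
  File parent = temp.newFolder("parent");
  File location = new File(parent, "test-table");

  HadoopTables tables = new HadoopTables(CONF);
  PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id", 3).build();
  Table table = tables.create(SCHEMA, spec, location.toString());

  try {
    Dataset<Row> df = spark.readStream()
        .format("iceberg")
        .load(location.toString());

    // No snapshot exists yet, so the first pass should return nothing.
    Assert.assertEquals(Collections.emptyList(), processStreamTillEnd(df));

    // Write a single batch and verify the stream picks it up.
    List<SimpleRecord> batch = Lists.newArrayList(new SimpleRecord(1, "one"));
    appendData(batch, location, "parquet");
    table.refresh();
    Assert.assertEquals(batch, processStreamTillEnd(df));
  } finally {
    for (StreamingQuery query : spark.streams().active()) {
      query.stop();
    }
  }
}
```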
##########
File path:
spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -0,0 +1,502 @@
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadStreamFromEmptyTable() throws IOException,
TimeoutException {
+ File parent = temp.newFolder("parent");
+ File location = new File(parent, "test-table");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id",
3).build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ table.refresh();
+
+ try {
+ Dataset<Row> df = spark.readStream()
+ .format("iceberg")
+ .load(location.toString());
+
+ List<SimpleRecord> actual = processStreamTillEnd(df);
+ Assert.assertEquals(Collections.emptyList(), actual);
+ } finally {
+ for (StreamingQuery query : spark.streams().active()) {
+ query.stop();
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws
IOException, TimeoutException {
+ File parent = temp.newFolder("parent");
+ File location = new File(parent, "test-table");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id",
3).build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+    // upgrade table to version 2 - to facilitate creation of Snapshot of type
OVERWRITE.
+ TableOperations ops = ((BaseTable) table).operations();
+ TableMetadata meta = ops.current();
+ ops.commit(meta, meta.upgradeToFormatVersion(2));
+
+ // fill table with some data
+ List<List<SimpleRecord>> dataAcrossSnapshots =
getTestDataForMultipleSnapshots();
+ appendData(dataAcrossSnapshots, location);
+
+ table.refresh();
+
+ Schema deleteRowSchema = table.schema().select("data");
+ Record dataDelete = GenericRecord.create(deleteRowSchema);
+ List<Record> dataDeletes = Lists.newArrayList(
+ dataDelete.copy("data", "one") // id = 1
+ );
+
+ DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+ table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0),
dataDeletes, deleteRowSchema);
+
+ table.newRowDelta()
+ .addDeletes(eqDeletes)
+ .commit();
+
+ // check pre-condition - that the above Delete file write - actually
resulted in snapshot of type OVERWRITE
+ Assert.assertEquals(DataOperations.OVERWRITE,
table.currentSnapshot().operation());
+
+ try {
+ Dataset<Row> df = spark.readStream()
+ .format("iceberg")
+ .load(location.toString());
+ StreamingQuery streamingQuery = df.writeStream()
+ .format("memory")
+ .queryName("testtablewithoverwrites")
+ .outputMode(OutputMode.Append())
+ .start();
+
+ try {
+ streamingQuery.processAllAvailable();
+ Assert.assertTrue(false); // should be unreachable
+ } catch (Exception exception) {
Review comment:
Please use `AssertHelpers` or `Assertions` for this instead of coding
the try/catch in each case.
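For example, with AssertJ (assuming `org.assertj.core.api.Assertions` is imported and that the wrapped `IllegalStateException` is surfaced through the exception's cause):
```java
Assertions.assertThatThrownBy(streamingQuery::processAllAvailable)
    .isInstanceOf(StreamingQueryException.class)
    .hasCauseInstanceOf(IllegalStateException.class);
```
The same pattern would apply to the REPLACE and DELETE variants of this test below.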
##########
File path:
spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -0,0 +1,502 @@
+ Assert.assertTrue(exception instanceof StreamingQueryException);
+ Assert.assertTrue(((StreamingQueryException) exception).cause()
instanceof IllegalStateException);
+ }
+
+ } finally {
+ for (StreamingQuery query : spark.streams().active()) {
+ query.stop();
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadStreamWithSnapshotTypeReplaceErrorsOut() throws
IOException, TimeoutException {
+ File parent = temp.newFolder("parent");
+ File location = new File(parent, "test-table");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ // fill table with some data
+ List<List<SimpleRecord>> dataAcrossSnapshots =
getTestDataForMultipleSnapshots();
+ appendData(dataAcrossSnapshots, location);
+
+ table.refresh();
+
+ // this should create a snapshot with type Replace.
+ table.rewriteManifests()
+ .clusterBy(f -> 1)
+ .commit();
+
+ // check pre-condition
+ Assert.assertEquals(DataOperations.REPLACE,
table.currentSnapshot().operation());
+
+ try {
+ Dataset<Row> df = spark.readStream()
+ .format("iceberg")
+ .load(location.toString());
+ StreamingQuery streamingQuery = df.writeStream()
+ .format("memory")
+ .queryName("testtablewithreplace")
+ .outputMode(OutputMode.Append())
+ .start();
+
+ try {
+ streamingQuery.processAllAvailable();
+ Assert.assertTrue(false); // should be unreachable
+ } catch (Exception exception) {
+ Assert.assertTrue(exception instanceof StreamingQueryException);
+ Assert.assertTrue(((StreamingQueryException) exception).cause()
instanceof IllegalStateException);
+ }
+
+ } finally {
+ for (StreamingQuery query : spark.streams().active()) {
+ query.stop();
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadStreamWithSnapshotTypeDeleteErrorsOut() throws
IOException, TimeoutException {
+ File parent = temp.newFolder("parent");
+ File location = new File(parent, "test-table");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec =
PartitionSpec.builderFor(SCHEMA).identity("id").build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ // fill table with some data
+ List<List<SimpleRecord>> dataAcrossSnapshots =
getTestDataForMultipleSnapshots();
+ appendData(dataAcrossSnapshots, location);
+
+ table.refresh();
+
+ // this should create a snapshot with type delete.
+ table.newDelete()
+ .deleteFromRowFilter(Expressions.equal("id", 4))
+ .commit();
+
+ // check pre-condition - that the above newDelete operation on table
resulted in Snapshot of Type DELETE.
+ Assert.assertEquals(DataOperations.DELETE,
table.currentSnapshot().operation());
+
+ try {
+ Dataset<Row> df = spark.readStream()
+ .format("iceberg")
+ .load(location.toString());
+ StreamingQuery streamingQuery = df.writeStream()
+ .format("memory")
+ .queryName("testtablewithdelete")
+ .outputMode(OutputMode.Append())
+ .start();
+
+ try {
+ streamingQuery.processAllAvailable();
+ Assert.assertTrue(false); // should be unreachable
+ } catch (Exception exception) {
+ Assert.assertTrue(exception instanceof StreamingQueryException);
+ Assert.assertTrue(((StreamingQueryException) exception).cause()
instanceof IllegalStateException);
+ }
+
+ } finally {
+ for (StreamingQuery query : spark.streams().active()) {
+ query.stop();
+ }
+ }
+ }
+
+ private static List<SimpleRecord> processMicroBatch(DataStreamWriter<Row>
singleBatchWriter, String viewName)
+ throws TimeoutException, StreamingQueryException {
+ StreamingQuery streamingQuery = singleBatchWriter.start();
+ streamingQuery.awaitTermination();
+
+ return spark.sql(String.format("select * from %s", viewName))
+ .as(Encoders.bean(SimpleRecord.class))
+ .collectAsList();
+ }
+
+ /**
+ * get test data - a list of records per snapshot
+ */
+ private static List<List<SimpleRecord>> getTestDataForMultipleSnapshots() {
+ return Lists.newArrayList(
+ Lists.newArrayList(
+ new SimpleRecord(1, "one"),
+ new SimpleRecord(2, "two"),
+ new SimpleRecord(3, "three")),
+ Lists.newArrayList(
+ new SimpleRecord(4, "four"),
+ new SimpleRecord(5, "five")),
+ Lists.newArrayList(
+ new SimpleRecord(6, "six"),
+ new SimpleRecord(7, "seven"))
+ );
+ }
+
+ /**
+ * Appends each list as a separate snapshot on the Iceberg table at the given location.
+ * Accepts a list of lists, each inner list representing the data for one snapshot.
+ */
+ private static void appendData(List<List<SimpleRecord>> data, File location)
{
+ // generate multiple snapshots
+ for (List<SimpleRecord> l : data) {
+ appendData(l, location, "parquet");
+ }
+ }
+
+ private static void appendData(List<SimpleRecord> data, File location,
String fileFormat) {
+ Dataset<Row> df = spark.createDataFrame(data, SimpleRecord.class);
+ df.select("id", "data").write()
+ .format("iceberg")
+ .option("write-format", fileFormat)
+ .mode("append")
+ .save(location.toString());
+ }
+
+ private static List<SimpleRecord> processStreamTillEnd(Dataset<Row> df)
throws TimeoutException {
Review comment:
Looks like this method name is slightly misleading because it isn't processing and ending the stream; it is processing all of the currently available data. How about `processAvailable`?
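
For illustration, the renamed helper could look roughly like this (just a sketch; the memory-sink query name is a placeholder and the body should match whatever the current helper already does):

```java
// Sketch only: process everything currently available on the stream and
// return the rows that landed in the in-memory sink.
private static List<SimpleRecord> processAvailable(Dataset<Row> df) throws TimeoutException {
  StreamingQuery query = df.writeStream()
      .format("memory")
      .queryName("available_rows")  // placeholder sink name
      .outputMode(OutputMode.Append())
      .start();
  try {
    query.processAllAvailable();
    return spark.sql("select * from available_rows")
        .as(Encoders.bean(SimpleRecord.class))
        .collectAsList();
  } finally {
    query.stop();
  }
}
```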
##########
File path:
spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.apache.spark.sql.streaming.Trigger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public final class TestStructuredStreamingRead3 {
+ private static final Configuration CONF = new Configuration();
+ private static final Schema SCHEMA = new Schema(
+ optional(1, "id", Types.IntegerType.get()),
+ optional(2, "data", Types.StringType.get())
+ );
+ private static SparkSession spark = null;
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ @Rule
+ public ExpectedException exceptionRule = ExpectedException.none();
+
+ @BeforeClass
+ public static void startSpark() {
+ TestStructuredStreamingRead3.spark = SparkSession.builder()
+ .master("local[2]")
+ .config("spark.sql.shuffle.partitions", 4)
+ .getOrCreate();
+ }
+
+ @AfterClass
+ public static void stopSpark() {
+ SparkSession currentSpark = TestStructuredStreamingRead3.spark;
+ TestStructuredStreamingRead3.spark = null;
+ currentSpark.stop();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws
IOException, TimeoutException {
+ File parent = temp.newFolder("parent");
+ File location = new File(parent, "test-table");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id",
3).build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ List<List<SimpleRecord>> expected = getTestDataForMultipleSnapshots();
+ appendData(expected, location);
+
+ table.refresh();
+
+ try {
+ Dataset<Row> df = spark.readStream()
+ .format("iceberg")
+ .load(location.toString());
+ List<SimpleRecord> actual = processStreamTillEnd(df);
+
+ Assert.assertEquals(
+ expected.stream().flatMap(List::stream).collect(Collectors.toList()),
+ actual);
+ } finally {
+ for (StreamingQuery query : spark.streams().active()) {
+ query.stop();
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testResumingStreamReadFromCheckpoint() throws IOException,
TimeoutException, StreamingQueryException {
+ File parent = temp.newFolder("parent");
+ File location = new File(parent, "test-table");
+ File writerCheckpoint = new File(parent, "writer-checkpoint");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id",
3).build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+ final String tempView = "microBatchView";
+
+ try {
+ Dataset<Row> df = spark.readStream()
+ .format("iceberg")
+ .load(location.toString());
+ DataStreamWriter<Row> singleBatchWriter = df.writeStream()
+ .trigger(Trigger.Once())
Review comment:
I think that the intent is that `Trigger.Once()` combined with `awaitTermination` in `processMicroBatch` will stop the stream. Is that correct? It would be helpful to have a comment above this explaining how this causes Spark to checkpoint and start a new stream for each call to `processMicroBatch`.
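
For example, something along these lines right above the trigger (the wording is only a suggestion):

```java
DataStreamWriter<Row> singleBatchWriter = df.writeStream()
    // Trigger.Once() runs exactly one micro-batch per start(); awaitTermination()
    // in processMicroBatch waits for that batch to finish and the query to stop.
    // Because each start() reuses the same checkpointLocation, every new query
    // resumes from the offsets committed by the previous single-batch run.
    .trigger(Trigger.Once())
    .option("checkpointLocation", writerCheckpoint.toString())
    .foreachBatch((batchDF, batchId) -> batchDF.createOrReplaceGlobalTempView(tempView));
```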
##########
File path:
spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.apache.spark.sql.streaming.Trigger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public final class TestStructuredStreamingRead3 {
+ private static final Configuration CONF = new Configuration();
+ private static final Schema SCHEMA = new Schema(
+ optional(1, "id", Types.IntegerType.get()),
+ optional(2, "data", Types.StringType.get())
+ );
+ private static SparkSession spark = null;
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ @Rule
+ public ExpectedException exceptionRule = ExpectedException.none();
+
+ @BeforeClass
+ public static void startSpark() {
+ TestStructuredStreamingRead3.spark = SparkSession.builder()
+ .master("local[2]")
+ .config("spark.sql.shuffle.partitions", 4)
+ .getOrCreate();
+ }
+
+ @AfterClass
+ public static void stopSpark() {
+ SparkSession currentSpark = TestStructuredStreamingRead3.spark;
+ TestStructuredStreamingRead3.spark = null;
+ currentSpark.stop();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws
IOException, TimeoutException {
+ File parent = temp.newFolder("parent");
+ File location = new File(parent, "test-table");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id",
3).build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ List<List<SimpleRecord>> expected = getTestDataForMultipleSnapshots();
+ appendData(expected, location);
+
+ table.refresh();
+
+ try {
+ Dataset<Row> df = spark.readStream()
+ .format("iceberg")
+ .load(location.toString());
+ List<SimpleRecord> actual = processStreamTillEnd(df);
+
+ Assert.assertEquals(
+ expected.stream().flatMap(List::stream).collect(Collectors.toList()),
+ actual);
+ } finally {
+ for (StreamingQuery query : spark.streams().active()) {
+ query.stop();
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testResumingStreamReadFromCheckpoint() throws IOException,
TimeoutException, StreamingQueryException {
+ File parent = temp.newFolder("parent");
+ File location = new File(parent, "test-table");
+ File writerCheckpoint = new File(parent, "writer-checkpoint");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id",
3).build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+ final String tempView = "microBatchView";
+
+ try {
+ Dataset<Row> df = spark.readStream()
+ .format("iceberg")
+ .load(location.toString());
+ DataStreamWriter<Row> singleBatchWriter = df.writeStream()
+ .trigger(Trigger.Once())
+ .option("checkpointLocation", writerCheckpoint.toString())
+ .foreachBatch((batchDF, batchId) -> {
+ batchDF.createOrReplaceGlobalTempView(tempView);
+ });
+
+ String globalTempView = "global_temp." + tempView;
+
+ List<SimpleRecord> processStreamOnEmptyIcebergTable =
processMicroBatch(singleBatchWriter, globalTempView);
+ Assert.assertEquals(
+ Collections.emptyList(),
+ processStreamOnEmptyIcebergTable);
+
+ for (List<List<SimpleRecord>> expectedCheckpoint :
getTestDataForMultipleWritesWithMultipleSnapshots()) {
+ appendData(expectedCheckpoint, location);
+ table.refresh();
+
+ List<SimpleRecord> actualDataInCurrentMicroBatch =
processMicroBatch(singleBatchWriter, globalTempView);
+ Assert.assertEquals(
+
expectedCheckpoint.stream().flatMap(List::stream).collect(Collectors.toList()),
+ actualDataInCurrentMicroBatch);
+ }
+ } finally {
+ for (StreamingQuery query : spark.streams().active()) {
+ query.stop();
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testParquetOrcAvroDataInOneTable() throws IOException,
TimeoutException {
+ File parent = temp.newFolder("parent");
+ File location = new File(parent, "test-table");
+
+ HadoopTables tables = new HadoopTables(CONF);
+
+ // Use bucket partitioning so that a single List<SimpleRecord> can generate multiple files per write.
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id",
3).build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ List<SimpleRecord> parquetFileRecords = Lists.newArrayList(
+ new SimpleRecord(1, "one"),
+ new SimpleRecord(2, "two"),
+ new SimpleRecord(3, "three"));
+
+ List<SimpleRecord> orcFileRecords = Lists.newArrayList(
+ new SimpleRecord(4, "four"),
+ new SimpleRecord(5, "five"));
+
+ List<SimpleRecord> avroFileRecords = Lists.newArrayList(
+ new SimpleRecord(6, "six"),
+ new SimpleRecord(7, "seven"));
+
+ appendData(parquetFileRecords, location, "parquet");
+ appendData(orcFileRecords, location, "orc");
+ appendData(avroFileRecords, location, "avro");
+
+ table.refresh();
+
+
+ try {
+ Dataset<Row> ds = spark.readStream()
+ .format("iceberg")
+ .load(location.toString());
+ List<SimpleRecord> actual = processStreamTillEnd(ds);
+ List<SimpleRecord> expected = Stream.concat(Stream.concat(
+ parquetFileRecords.stream(),
+ orcFileRecords.stream()),
+ avroFileRecords.stream())
+ .collect(Collectors.toList());
+ Assert.assertEquals(expected, actual);
+ } finally {
+ for (StreamingQuery query : spark.streams().active()) {
+ query.stop();
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadStreamFromEmptyTable() throws IOException,
TimeoutException {
Review comment:
Nevermind, I think that the checkpoint test covers this.
##########
File path:
spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.apache.spark.sql.streaming.Trigger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public final class TestStructuredStreamingRead3 {
+ private static final Configuration CONF = new Configuration();
+ private static final Schema SCHEMA = new Schema(
+ optional(1, "id", Types.IntegerType.get()),
+ optional(2, "data", Types.StringType.get())
+ );
+ private static SparkSession spark = null;
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ @Rule
+ public ExpectedException exceptionRule = ExpectedException.none();
+
+ @BeforeClass
+ public static void startSpark() {
+ TestStructuredStreamingRead3.spark = SparkSession.builder()
+ .master("local[2]")
+ .config("spark.sql.shuffle.partitions", 4)
+ .getOrCreate();
+ }
+
+ @AfterClass
+ public static void stopSpark() {
+ SparkSession currentSpark = TestStructuredStreamingRead3.spark;
+ TestStructuredStreamingRead3.spark = null;
+ currentSpark.stop();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws
IOException, TimeoutException {
+ File parent = temp.newFolder("parent");
+ File location = new File(parent, "test-table");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id",
3).build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ List<List<SimpleRecord>> expected = getTestDataForMultipleSnapshots();
+ appendData(expected, location);
+
+ table.refresh();
+
+ try {
+ Dataset<Row> df = spark.readStream()
+ .format("iceberg")
+ .load(location.toString());
+ List<SimpleRecord> actual = processStreamTillEnd(df);
+
+ Assert.assertEquals(
+ expected.stream().flatMap(List::stream).collect(Collectors.toList()),
+ actual);
+ } finally {
+ for (StreamingQuery query : spark.streams().active()) {
+ query.stop();
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testResumingStreamReadFromCheckpoint() throws IOException,
TimeoutException, StreamingQueryException {
+ File parent = temp.newFolder("parent");
+ File location = new File(parent, "test-table");
+ File writerCheckpoint = new File(parent, "writer-checkpoint");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id",
3).build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+ final String tempView = "microBatchView";
+
+ try {
+ Dataset<Row> df = spark.readStream()
+ .format("iceberg")
+ .load(location.toString());
+ DataStreamWriter<Row> singleBatchWriter = df.writeStream()
+ .trigger(Trigger.Once())
+ .option("checkpointLocation", writerCheckpoint.toString())
+ .foreachBatch((batchDF, batchId) -> {
+ batchDF.createOrReplaceGlobalTempView(tempView);
+ });
+
+ String globalTempView = "global_temp." + tempView;
+
+ List<SimpleRecord> processStreamOnEmptyIcebergTable =
processMicroBatch(singleBatchWriter, globalTempView);
+ Assert.assertEquals(
+ Collections.emptyList(),
+ processStreamOnEmptyIcebergTable);
+
+ for (List<List<SimpleRecord>> expectedCheckpoint :
getTestDataForMultipleWritesWithMultipleSnapshots()) {
+ appendData(expectedCheckpoint, location);
+ table.refresh();
+
+ List<SimpleRecord> actualDataInCurrentMicroBatch =
processMicroBatch(singleBatchWriter, globalTempView);
+ Assert.assertEquals(
+
expectedCheckpoint.stream().flatMap(List::stream).collect(Collectors.toList()),
+ actualDataInCurrentMicroBatch);
+ }
+ } finally {
+ for (StreamingQuery query : spark.streams().active()) {
+ query.stop();
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testParquetOrcAvroDataInOneTable() throws IOException,
TimeoutException {
+ File parent = temp.newFolder("parent");
+ File location = new File(parent, "test-table");
+
+ HadoopTables tables = new HadoopTables(CONF);
+
+ // Use bucket partitioning so that a single List<SimpleRecord> can generate multiple files per write.
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id",
3).build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ List<SimpleRecord> parquetFileRecords = Lists.newArrayList(
+ new SimpleRecord(1, "one"),
+ new SimpleRecord(2, "two"),
+ new SimpleRecord(3, "three"));
+
+ List<SimpleRecord> orcFileRecords = Lists.newArrayList(
+ new SimpleRecord(4, "four"),
+ new SimpleRecord(5, "five"));
+
+ List<SimpleRecord> avroFileRecords = Lists.newArrayList(
+ new SimpleRecord(6, "six"),
+ new SimpleRecord(7, "seven"));
+
+ appendData(parquetFileRecords, location, "parquet");
+ appendData(orcFileRecords, location, "orc");
+ appendData(avroFileRecords, location, "avro");
+
+ table.refresh();
+
+
+ try {
+ Dataset<Row> ds = spark.readStream()
+ .format("iceberg")
+ .load(location.toString());
+ List<SimpleRecord> actual = processStreamTillEnd(ds);
+ List<SimpleRecord> expected = Stream.concat(Stream.concat(
+ parquetFileRecords.stream(),
+ orcFileRecords.stream()),
+ avroFileRecords.stream())
+ .collect(Collectors.toList());
+ Assert.assertEquals(expected, actual);
+ } finally {
+ for (StreamingQuery query : spark.streams().active()) {
+ query.stop();
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadStreamFromEmptyTable() throws IOException,
TimeoutException {
+ File parent = temp.newFolder("parent");
+ File location = new File(parent, "test-table");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id",
3).build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ table.refresh();
+
+ try {
+ Dataset<Row> df = spark.readStream()
+ .format("iceberg")
+ .load(location.toString());
+
+ List<SimpleRecord> actual = processStreamTillEnd(df);
+ Assert.assertEquals(Collections.emptyList(), actual);
+ } finally {
+ for (StreamingQuery query : spark.streams().active()) {
+ query.stop();
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws
IOException, TimeoutException {
+ File parent = temp.newFolder("parent");
+ File location = new File(parent, "test-table");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("id",
3).build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ // Upgrade the table to format version 2 to allow creation of a snapshot of type OVERWRITE.
+ TableOperations ops = ((BaseTable) table).operations();
+ TableMetadata meta = ops.current();
+ ops.commit(meta, meta.upgradeToFormatVersion(2));
+
+ // fill table with some data
+ List<List<SimpleRecord>> dataAcrossSnapshots =
getTestDataForMultipleSnapshots();
+ appendData(dataAcrossSnapshots, location);
+
+ table.refresh();
+
+ Schema deleteRowSchema = table.schema().select("data");
+ Record dataDelete = GenericRecord.create(deleteRowSchema);
+ List<Record> dataDeletes = Lists.newArrayList(
Review comment:
I think you could just run a `DELETE FROM` or `MERGE INTO` SQL query.
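
Something like this could replace the manual delete-file setup, assuming the test table is reachable through a configured Iceberg catalog (the identifier below is hypothetical; a path-only HadoopTables table would need to be registered in a catalog first):

```java
// Hypothetical identifier: the table would need to be registered in a Spark catalog.
// A row-level DELETE produces the delete/overwrite snapshot this test needs without
// building delete files by hand.
spark.sql("DELETE FROM my_catalog.default.test_table WHERE id = 4");
table.refresh();
```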
##########
File path:
spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
##########
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TestHelpers;
+import org.apache.iceberg.data.FileHelpers;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.apache.spark.sql.streaming.Trigger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public final class TestStructuredStreamingRead3 {
Review comment:
Could you create a variant of these tests based on
`SparkCatalogTestBase`? In addition to testing tables that are identified by
location, we should also validate that everything works with tables tracked in
catalogs.
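
Roughly what I have in mind (a skeleton only; the constructor arguments and helpers of `SparkCatalogTestBase` should be checked against the existing tests that extend it, and whether `load()` accepts a catalog identifier here may need verifying):

```java
// Sketch: catalog-based variant of the streaming read tests.
public class TestStructuredStreamingReadCatalog extends SparkCatalogTestBase {
  public TestStructuredStreamingReadCatalog(String catalogName, String implementation,
                                            Map<String, String> config) {
    super(catalogName, implementation, config);
  }

  @Test
  public void testReadStreamFromCatalogTable() throws TimeoutException {
    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
    sql("INSERT INTO %s VALUES (1, 'one'), (2, 'two')", tableName);

    Dataset<Row> df = spark.readStream()
        .format("iceberg")
        .load(tableName);
    // ... run the stream and assert on the rows, as in the location-based tests above
  }
}
```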
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]