SreeramGarlapati commented on a change in pull request #2660: URL: https://github.com/apache/iceberg/pull/2660#discussion_r650082097
########## File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java ########## @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DataOperations; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MicroBatches; +import org.apache.iceberg.MicroBatches.MicroBatch; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotSummary; +import org.apache.iceberg.Table; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask; +import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.iceberg.util.TableScanUtil; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.connector.read.InputPartition; +import org.apache.spark.sql.connector.read.PartitionReaderFactory; +import org.apache.spark.sql.connector.read.streaming.MicroBatchStream; +import org.apache.spark.sql.connector.read.streaming.Offset; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK; +import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT; +import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST; +import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT; +import static org.apache.iceberg.TableProperties.SPLIT_SIZE; +import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT; + +public class SparkMicroBatchStream implements MicroBatchStream { + private static final Logger LOG = 
LoggerFactory.getLogger(SparkMicroBatchStream.class); + + private final JavaSparkContext sparkContext; + private final Table table; + private final boolean caseSensitive; + private final Schema expectedSchema; + private final Long splitSize; + private final Integer splitLookback; + private final Long splitOpenFileCost; + private final boolean localityPreferred; + private final InitialOffsetStore initialOffsetStore; + private final StreamingOffset initialOffset; + + SparkMicroBatchStream(JavaSparkContext sparkContext, + Table table, boolean caseSensitive, Schema expectedSchema, + CaseInsensitiveStringMap options, String checkpointLocation) { + this.sparkContext = sparkContext; + this.table = table; + this.caseSensitive = caseSensitive; + this.expectedSchema = expectedSchema; + this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), table.location(), options); + this.splitSize = Optional.ofNullable(Spark3Util.propertyAsLong(options, SparkReadOptions.SPLIT_SIZE, null)) + .orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(), SPLIT_SIZE, SPLIT_SIZE_DEFAULT)); + this.splitLookback = Optional.ofNullable(Spark3Util.propertyAsInt(options, SparkReadOptions.LOOKBACK, null)) + .orElseGet(() -> PropertyUtil.propertyAsInt(table.properties(), SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT)); + this.splitOpenFileCost = Optional.ofNullable( + Spark3Util.propertyAsLong(options, SparkReadOptions.FILE_OPEN_COST, null)) + .orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(), SPLIT_OPEN_FILE_COST, + SPLIT_OPEN_FILE_COST_DEFAULT)); + this.initialOffsetStore = InitialOffsetStore.getInstance(table.io(), checkpointLocation); + this.initialOffset = getOrWriteInitialOffset(table, initialOffsetStore); + } + + @Override + public Offset latestOffset() { + table.refresh(); + Snapshot latestSnapshot = table.currentSnapshot(); + if (latestSnapshot == null) { + return StreamingOffset.START_OFFSET; + } + + String addedFilesValue = latestSnapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP); + long addedFiles = addedFilesValue == null ? + Iterables.size(latestSnapshot.addedFiles()) : + Long.parseLong(addedFilesValue); + + // a readStream on an Iceberg table can be started from 2 types of snapshots + // 1. a valid starting Snapshot: + // when this valid starting Snapshot is the initialOffset - then, scanAllFiles must be set to true; + // for all StreamingOffsets following this - scanAllFiles must be set to false + // 2. START_OFFSET: + // if the stream started on the table from START_OFFSET - it implies - that all the subsequent Snapshots added + // will have all files as net New manifests & hence scanAllFiles can be false. 
+ boolean scanAllFiles = !StreamingOffset.START_OFFSET.equals(initialOffset) && + latestSnapshot.snapshotId() == initialOffset.snapshotId(); + return new StreamingOffset( + latestSnapshot.snapshotId(), + addedFiles, + scanAllFiles); + } + + @Override + public InputPartition[] planInputPartitions(Offset start, Offset end) { + if (end.equals(StreamingOffset.START_OFFSET)) { + return new InputPartition[0]; + } + + // broadcast the table metadata as input partitions will be sent to executors + Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table)); + String expectedSchemaString = SchemaParser.toJson(expectedSchema); + + Preconditions.checkState( + end instanceof StreamingOffset, + "The end offset passed to planInputPartitions() is not an instance of StreamingOffset."); + + Preconditions.checkState( + start instanceof StreamingOffset, + "The start offset passed to planInputPartitions() is not an instance of StreamingOffset."); + + StreamingOffset endOffset = (StreamingOffset) end; + StreamingOffset startOffset = (StreamingOffset) start; + + List<FileScanTask> fileScanTasks = getFileScanTasks(startOffset, endOffset); + + CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles( + CloseableIterable.withNoopClose(fileScanTasks), + splitSize); + List<CombinedScanTask> combinedScanTasks = Lists.newArrayList( + TableScanUtil.planTasks(splitTasks, splitSize, splitLookback, splitOpenFileCost)); + InputPartition[] readTasks = new InputPartition[combinedScanTasks.size()]; + + for (int i = 0; i < combinedScanTasks.size(); i++) { + readTasks[i] = new ReadTask( + combinedScanTasks.get(i), tableBroadcast, expectedSchemaString, + caseSensitive, localityPreferred); + } + + return readTasks; + } + + @Override + public PartitionReaderFactory createReaderFactory() { + int batchSizeValueToDisableColumnarReads = 0; + return new ReaderFactory(batchSizeValueToDisableColumnarReads); + } + + @Override + public Offset initialOffset() { + return initialOffset; + } + + @Override + public Offset deserializeOffset(String json) { + return StreamingOffset.fromJson(json); + } + + @Override + public void commit(Offset end) { + } + + @Override + public void stop() { + } + + private static StreamingOffset getOrWriteInitialOffset(Table table, InitialOffsetStore initialOffsetStore) { + if (initialOffsetStore.isOffsetStoreInitialized()) { + return initialOffsetStore.getInitialOffset(); + } + + table.refresh(); + StreamingOffset offset = table.currentSnapshot() == null ? + StreamingOffset.START_OFFSET : + new StreamingOffset(SnapshotUtil.oldestSnapshot(table).snapshotId(), 0, true); + initialOffsetStore.addInitialOffset(offset); + + return offset; + } + + private List<FileScanTask> getFileScanTasks(StreamingOffset startOffset, StreamingOffset endOffset) { Review comment: renamed to `calculate`. Naming this was a little bit of a head scratcher :) calculate / resolve / plan all fit the bill - but I finally felt calculate is good enough to give a gist of what's going on in this method.

########## File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java ########## @@ -62,6 +63,7 @@ private static final Logger LOG = LoggerFactory.getLogger(SparkBatchScan.class); private final JavaSparkContext sparkContext; + private final SparkSession spark; Review comment: fixed this.
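To make the two start cases described above concrete, here is a small, purely illustrative sketch of how the `scanAllFiles` flag is expected to evolve across offsets; the snapshot IDs are invented and the statements are a fragment, not code from this PR:

```java
// Hypothetical illustration only - snapshot IDs are made up.

// Case 1: the stream starts from an existing snapshot. Only offsets that still point at
// that initial snapshot scan every data file reachable from it; later offsets read just
// the files added by their own snapshot.
StreamingOffset initial = new StreamingOffset(1L, 0L, true);   // scanAllFiles = true
StreamingOffset second = new StreamingOffset(2L, 0L, false);   // added files only

// Case 2: the stream starts from START_OFFSET (the table had no snapshot yet), so every
// snapshot observed afterwards contributes only net-new manifests and scanAllFiles stays false.
StreamingOffset afterFirstCommit = new StreamingOffset(3L, 0L, false);
```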
########## File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java ########## @@ -108,6 +111,12 @@ public Batch toBatch() { return this; } + @Override + public MicroBatchStream toMicroBatchStream(String checkpointLocation) { Review comment: yes. I guess that is the answer for my question here too: https://github.com/apache/iceberg/pull/2611#discussion_r635010805

########## File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java ########## @@ -213,10 +222,10 @@ public String description() { return String.format("%s [filters=%s]", table, filters); } - private static class ReaderFactory implements PartitionReaderFactory { + public static class ReaderFactory implements PartitionReaderFactory { Review comment: @rdblue - addressed the visibility feedback. SparkBatchScan could use a bit of refactoring - I will follow this up with a refactor-only PR & address this suggestion as well. The rationale for separating this into refactor-only PRs: * changes to the codebase stay easily auditable * cognitive overload on reviewers: mixing a refactor with an actual code change makes it very hard for any reviewer to make sense of the diff. With that, I am resolving this comment. Please feel free to reopen if you feel a strong need to refactor in the current PR.

########## File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java ########## @@ -108,6 +109,12 @@ public Batch toBatch() { return this; } + @Override + public MicroBatchStream toMicroBatchStream(String checkpointLocation) { + return new SparkMicroBatchStream( Review comment: good point @aokolnychyi. `stats estimation` is something that I felt will evolve differently for micro-batch streaming vs. batch reads - batch reads rely only on the latest snapshot, whereas the data returned by micro-batch streaming spans multiple snapshots. Given the cognitive overload of the current PR & the sheer number of things to drive consensus on, I want to take that up in a different PR. Added to the **Whats Next** section of this PR. Once the PR concludes, I will file those as issues & track them. Please let me know if that makes sense.

########## File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java ########## @@ -0,0 +1,313 @@ +public class SparkMicroBatchStream implements MicroBatchStream { + private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStream.class); Review comment: Given that 1. this is **Streaming** - I envision that most use cases will be high-traffic tables, which could result in chatty logs - and 2. Spark already logs the batches returned, checkpointed, etc., I wasn't adding logs unless I felt a strong need. Can you please point me to specific places where you feel they would be helpful?
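As one concrete example of the kind of log line being asked about - purely illustrative, not something this PR adds - a parameterized slf4j statement at the end of `latestOffset()` stays cheap on high-traffic tables because the message is only formatted when debug logging is enabled:

```java
// Hypothetical debug log; the values come from the surrounding latestOffset() implementation.
LOG.debug("latestOffset for {}: snapshotId={}, addedFiles={}, scanAllFiles={}",
    table, latestSnapshot.snapshotId(), addedFiles, scanAllFiles);
```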
########## File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java ########## @@ -0,0 +1,313 @@ + private final Long splitOpenFileCost; + private final boolean localityPreferred; + private final InitialOffsetStore initialOffsetStore; Review comment: fixed this.

########## File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java ########## @@ -0,0 +1,313 @@ + this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), table.location(), options); + this.splitSize = Optional.ofNullable(Spark3Util.propertyAsLong(options, SparkReadOptions.SPLIT_SIZE, null)) Review comment: nice, adopted that formatting.
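For readers skimming the thread: the formatting discussed here resolves each split-planning value as read option first, then table property, then built-in default. A minimal sketch of that order, using a hypothetical helper name but the same utility calls that appear in the diff:

```java
// Hypothetical helper showing the resolution order: a per-query read option wins over the
// table property, which wins over the default.
private static long resolveSplitSize(CaseInsensitiveStringMap options, Table table) {
  long tableSplitSize = PropertyUtil.propertyAsLong(table.properties(), SPLIT_SIZE, SPLIT_SIZE_DEFAULT);
  return Spark3Util.propertyAsLong(options, SparkReadOptions.SPLIT_SIZE, tableSplitSize);
}
```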
########## File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java ########## @@ -0,0 +1,313 @@ + private static StreamingOffset getOrWriteInitialOffset(InitialOffsetStore initialOffsetStore) { + if (initialOffsetStore.isOffsetStoreInitialized()) { + return initialOffsetStore.getInitialOffset(); + } + + return initialOffsetStore.addInitialOffset(); + } + + private List<FileScanTask> calculateFileScanTasks(StreamingOffset startOffset, StreamingOffset endOffset) { + List<FileScanTask> fileScanTasks = new ArrayList<>(); + MicroBatch latestMicroBatch = null; + StreamingOffset batchStartOffset = StreamingOffset.START_OFFSET.equals(startOffset) ? + new StreamingOffset(SnapshotUtil.oldestSnapshot(table).snapshotId(), 0, false) : + startOffset; + + do { + final StreamingOffset currentOffset = + latestMicroBatch != null && latestMicroBatch.lastIndexOfSnapshot() ? + getNextAvailableSnapshot(latestMicroBatch.snapshotId()) : + batchStartOffset; + + latestMicroBatch = MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io()) + .caseSensitive(caseSensitive) + .specsById(table.specs()) + .generate(currentOffset.position(), Long.MAX_VALUE, currentOffset.shouldScanAllFiles()); + + fileScanTasks.addAll(latestMicroBatch.tasks()); + } while (latestMicroBatch.snapshotId() != endOffset.snapshotId()); + + return fileScanTasks; + } + + private StreamingOffset getNextAvailableSnapshot(long snapshotId) { + Snapshot previousSnapshot = table.snapshot(snapshotId); + Snapshot pointer = table.currentSnapshot(); + while (pointer != null && previousSnapshot.snapshotId() != pointer.parentId()) { + Preconditions.checkState(pointer.operation().equals(DataOperations.APPEND), + "Encountered Snapshot DataOperation other than APPEND."); + + pointer = table.snapshot(pointer.parentId()); + } + + Preconditions.checkState(pointer != null, + "Cannot read data from snapshot which has already expired: %s", snapshotId); + + return new StreamingOffset(pointer.snapshotId(), 0L, false); + } + + interface InitialOffsetStore { Review comment: this `interface` really is serving one purpose - popping out those specific methods/operations/expectations from the InitialOffsetStore. the underlying impl. class could have several methods within it - but this interface is what will be used in our code.
########## File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java ########## @@ -0,0 +1,314 @@ + @Override + public Offset deserializeOffset(String json) { + return StreamingOffset.fromJson(json); + } + + @Override + public void commit(Offset end) { Review comment: yes, that is correct.

########## File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java ########## @@ -108,6 +111,12 @@ public Batch toBatch() { return this; } + @Override + public MicroBatchStream toMicroBatchStream(String checkpointLocation) { Review comment: added this to my **Next Steps** list in the PR description. Once the PR concludes, I will create issues for these.
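To show where `toMicroBatchStream(checkpointLocation)` enters the picture, here is a hypothetical end-to-end streaming read; the table name, sink, and paths are invented. Spark resolves the Iceberg table as a streaming source and hands the query's checkpoint location down to the scan, which is where the initial offset gets persisted:

```java
// Hypothetical usage sketch; names and paths are made up.
Dataset<Row> events = spark.readStream()
    .format("iceberg")
    .load("db.events");

StreamingQuery query = events.writeStream()
    .format("console")
    .option("checkpointLocation", "/tmp/checkpoints/db.events")
    .trigger(Trigger.ProcessingTime("30 seconds"))
    .start();
```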
########## File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java ########## @@ -0,0 +1,314 @@ + SparkMicroBatchStream(JavaSparkContext sparkContext, + Table table, boolean caseSensitive, Schema expectedSchema, + CaseInsensitiveStringMap options, String checkpointLocation) { + this.sparkContext = sparkContext; + this.table = table; + this.caseSensitive = caseSensitive; + this.expectedSchema = expectedSchema; + this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), table.location(), options); + + long tableSplitSize = PropertyUtil.propertyAsLong(table.properties(), SPLIT_SIZE, SPLIT_SIZE_DEFAULT); + this.splitSize = Spark3Util.propertyAsLong(options, SparkReadOptions.SPLIT_SIZE, tableSplitSize); + + int tableSplitLookback = PropertyUtil.propertyAsInt(table.properties(), SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT); + this.splitLookback = Spark3Util.propertyAsInt(options, SparkReadOptions.LOOKBACK, tableSplitLookback); + + long tableSplitOpenFileCost = PropertyUtil.propertyAsLong( + table.properties(), SPLIT_OPEN_FILE_COST, SPLIT_OPEN_FILE_COST_DEFAULT); + this.splitOpenFileCost = Spark3Util.propertyAsLong(options, SPLIT_OPEN_FILE_COST, tableSplitOpenFileCost); + + InitialOffsetStore initialOffsetStore = InitialOffsetStore.getInstance(table, checkpointLocation); + this.initialOffset = getOrWriteInitialOffset(initialOffsetStore); + } + + @Override + public Offset latestOffset() { Review comment: yes, this is a limitation of Spark's current `MicroBatchStream` interface: it asks the streaming source for only 2 values, `latest` and `initial`, and builds a micro-batch out of them. Let me explore the limit API a little bit more - but yes, this is a very good call-out; we must leverage the limit API, if it solves that problem, as soon as we switch to 3.1.

########## File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java ########## @@ -0,0 +1,314 @@ + @Override + public Offset latestOffset() { + table.refresh(); + Snapshot latestSnapshot = table.currentSnapshot(); + if (latestSnapshot == null) { + return StreamingOffset.START_OFFSET; + } + + boolean scanAllFiles = !StreamingOffset.START_OFFSET.equals(initialOffset) && + latestSnapshot.snapshotId() == initialOffset.snapshotId(); + + String positionValue = scanAllFiles ? + latestSnapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP) : + latestSnapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP); + + return new StreamingOffset( + latestSnapshot.snapshotId(), positionValue != null ? Long.parseLong(positionValue) : 0, scanAllFiles); Review comment: This is currently unused - but per my debugging, snapshots of type `Delete`/`Replace` don't set `ADDED_FILES_PROP`, at which point it defaults to 0, i.e. there are no files to read.
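On the limit API mentioned above: Spark exposes it through the `SupportsAdmissionControl` interface. A rough, partial sketch of how this source might adopt it is below - only the two admission-control methods are shown, the read-limit policy is a placeholder, and none of this is part of the current PR:

```java
// Partial sketch only: rate limiting via Spark's admission-control API, so latestOffset
// can be capped per micro-batch instead of always jumping to the table's current snapshot.
public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissionControl {

  @Override
  public ReadLimit getDefaultReadLimit() {
    return ReadLimit.allAvailable();  // placeholder; a file- or row-based cap is the eventual goal
  }

  @Override
  public Offset latestOffset(Offset startOffset, ReadLimit limit) {
    // TODO: walk snapshots forward from startOffset and stop once `limit` is reached,
    // instead of unconditionally returning the offset of the current table snapshot.
    return latestOffset();
  }
}
```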
##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -0,0 +1,314 @@
+  @Override
+  public Offset latestOffset() {
Review comment:
This is one of the primary reasons I listed `rate limiting` as an item to address in the **Next Steps** section of the PR description. I will add this special case, since I imagine a lot of users will run into it.

##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
##########
@@ -108,6 +109,12 @@ public Batch toBatch() {
     return this;
   }
 
+  @Override
+  public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
+    return new SparkMicroBatchStream(
Review comment:
I am resolving the comments where I felt we reached reasonable consensus, or that are tracked in the PR's **What next** section, so that **the ones needing more discussion stand out**. Please do reopen this thread or start a new comment if you feel more discussion is needed here.
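On the rate-limiting point above, a rough sketch of what capping a single micro-batch could look like, assuming a hypothetical `max-files-per-micro-batch` setting (not an existing read option in this PR) and covering only the case where the start and latest offsets point at the same snapshot:

```java
// Sketch, not part of this PR: clamp how far a single trigger advances within one snapshot.
// maxFilesPerMicroBatch is a hypothetical, user-configured limit.
private StreamingOffset clampToRateLimit(StreamingOffset start, StreamingOffset latest,
                                         long maxFilesPerMicroBatch) {
  if (start.snapshotId() != latest.snapshotId()) {
    // limiting across snapshots needs extra bookkeeping; out of scope for this sketch
    return latest;
  }
  long cappedPosition = Math.min(latest.position(), start.position() + maxFilesPerMicroBatch);
  return new StreamingOffset(latest.snapshotId(), cappedPosition, latest.shouldScanAllFiles());
}
```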
##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -0,0 +1,314 @@
+    boolean scanAllFiles = !StreamingOffset.START_OFFSET.equals(initialOffset) &&
+        latestSnapshot.snapshotId() == initialOffset.snapshotId();
+
+    String positionValue = scanAllFiles ?
Review comment:
@aokolnychyi - is the concern here that the value is `NOT PRESENT` in `Snapshot.summary()`, or that the value is present but could be wrong?
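Either way, one possible belt-and-braces approach is to fall back to the snapshot's own manifest data when the summary property is missing or not trusted. A sketch, assuming `Snapshot.addedFiles()` is available in the Iceberg version used here, and accepting that it is costlier than the summary lookup since it reads manifests (`Iterables` is the relocated Guava class already imported in this module):

```java
// Sketch: prefer the summary property, but derive the count from the snapshot's added
// files when the property is absent. Assumes Snapshot.addedFiles() exists in this version.
private static long addedFilesCount(Snapshot snapshot) {
  String added = snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP);
  if (added != null) {
    return Long.parseLong(added);
  }
  return Iterables.size(snapshot.addedFiles());
}
```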
##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -0,0 +1,314 @@
+    return new StreamingOffset(
+        latestSnapshot.snapshotId(), positionValue != null ? Long.parseLong(positionValue) : 0, scanAllFiles);
Review comment:
I am resolving the comments where I felt we reached reasonable consensus, or that are tracked in the PR's **What next** section, so that **the ones needing more discussion stand out**. Please do reopen this thread or start a new comment if you feel more discussion is needed here.
##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -0,0 +1,314 @@
+  @Override
+  public Offset deserializeOffset(String json) {
+    return StreamingOffset.fromJson(json);
+  }
+
+  @Override
+  public void commit(Offset end) {
Review comment:
I am resolving the comments where I felt we reached reasonable consensus, or that are tracked in the PR's **What next** section, so that **the ones needing more discussion stand out**. Please do reopen this thread or start a new comment if you feel more discussion is needed here.

##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -0,0 +1,313 @@
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
+import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
+import static
org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST; +import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT; +import static org.apache.iceberg.TableProperties.SPLIT_SIZE; +import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT; + +public class SparkMicroBatchStream implements MicroBatchStream { + private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStream.class); + + private final JavaSparkContext sparkContext; + private final Table table; + private final boolean caseSensitive; + private final Schema expectedSchema; + private final Long splitSize; + private final Integer splitLookback; + private final Long splitOpenFileCost; + private final boolean localityPreferred; + private final InitialOffsetStore initialOffsetStore; + private final StreamingOffset initialOffset; + + SparkMicroBatchStream(JavaSparkContext sparkContext, + Table table, boolean caseSensitive, Schema expectedSchema, + CaseInsensitiveStringMap options, String checkpointLocation) { + this.sparkContext = sparkContext; + this.table = table; + this.caseSensitive = caseSensitive; + this.expectedSchema = expectedSchema; + this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), table.location(), options); + this.splitSize = Optional.ofNullable(Spark3Util.propertyAsLong(options, SparkReadOptions.SPLIT_SIZE, null)) + .orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(), SPLIT_SIZE, SPLIT_SIZE_DEFAULT)); + this.splitLookback = Optional.ofNullable(Spark3Util.propertyAsInt(options, SparkReadOptions.LOOKBACK, null)) + .orElseGet(() -> PropertyUtil.propertyAsInt(table.properties(), SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT)); + this.splitOpenFileCost = Optional.ofNullable( + Spark3Util.propertyAsLong(options, SparkReadOptions.FILE_OPEN_COST, null)) + .orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(), SPLIT_OPEN_FILE_COST, + SPLIT_OPEN_FILE_COST_DEFAULT)); + this.initialOffsetStore = InitialOffsetStore.getInstance(table, checkpointLocation); + this.initialOffset = getOrWriteInitialOffset(initialOffsetStore); + } + + @Override + public Offset latestOffset() { + table.refresh(); + Snapshot latestSnapshot = table.currentSnapshot(); + if (latestSnapshot == null) { + return StreamingOffset.START_OFFSET; + } + + // a readStream on an Iceberg table can be started from 2 types of snapshots + // 1. a valid starting Snapshot: + // when this valid starting Snapshot is the initialOffset - then, scanAllFiles must be set to true; + // for all StreamingOffsets following this - scanAllFiles must be set to false + // 2. START_OFFSET: + // if the stream started on the table from START_OFFSET - it implies - that all the subsequent Snapshots added + // will have all files as net New manifests & hence scanAllFiles can be false. + boolean scanAllFiles = !StreamingOffset.START_OFFSET.equals(initialOffset) && + latestSnapshot.snapshotId() == initialOffset.snapshotId(); + + String positionValue = scanAllFiles ? + latestSnapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP) : + latestSnapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP); + + return new StreamingOffset( + latestSnapshot.snapshotId(), positionValue != null ? 
Long.parseLong(positionValue) : 0, scanAllFiles); + } + + @Override + public InputPartition[] planInputPartitions(Offset start, Offset end) { + if (end.equals(StreamingOffset.START_OFFSET)) { + return new InputPartition[0]; + } + + // broadcast the table metadata as input partitions will be sent to executors + Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table)); + String expectedSchemaString = SchemaParser.toJson(expectedSchema); + + Preconditions.checkState( + end instanceof StreamingOffset, + "The end offset passed to planInputPartitions() is not an instance of StreamingOffset."); + + Preconditions.checkState( + start instanceof StreamingOffset, + "The start offset passed to planInputPartitions() is not an instance of StreamingOffset."); + + StreamingOffset endOffset = (StreamingOffset) end; + StreamingOffset startOffset = (StreamingOffset) start; + + List<FileScanTask> fileScanTasks = calculateFileScanTasks(startOffset, endOffset); + + CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles( + CloseableIterable.withNoopClose(fileScanTasks), + splitSize); + List<CombinedScanTask> combinedScanTasks = Lists.newArrayList( + TableScanUtil.planTasks(splitTasks, splitSize, splitLookback, splitOpenFileCost)); + InputPartition[] readTasks = new InputPartition[combinedScanTasks.size()]; + + for (int i = 0; i < combinedScanTasks.size(); i++) { + readTasks[i] = new ReadTask( + combinedScanTasks.get(i), tableBroadcast, expectedSchemaString, + caseSensitive, localityPreferred); + } + + return readTasks; + } + + @Override + public PartitionReaderFactory createReaderFactory() { + return new ReaderFactory(0); + } + + @Override + public Offset initialOffset() { + return initialOffset; + } + + @Override + public Offset deserializeOffset(String json) { + return StreamingOffset.fromJson(json); + } + + @Override + public void commit(Offset end) { + } + + @Override + public void stop() { + } + + private static StreamingOffset getOrWriteInitialOffset(InitialOffsetStore initialOffsetStore) { + if (initialOffsetStore.isOffsetStoreInitialized()) { + return initialOffsetStore.getInitialOffset(); + } + + return initialOffsetStore.addInitialOffset(); + } + + private List<FileScanTask> calculateFileScanTasks(StreamingOffset startOffset, StreamingOffset endOffset) { + List<FileScanTask> fileScanTasks = new ArrayList<>(); + MicroBatch latestMicroBatch = null; + StreamingOffset batchStartOffset = StreamingOffset.START_OFFSET.equals(startOffset) ? + new StreamingOffset(SnapshotUtil.oldestSnapshot(table).snapshotId(), 0, false) : + startOffset; + + do { + final StreamingOffset currentOffset = + latestMicroBatch != null && latestMicroBatch.lastIndexOfSnapshot() ? 
+          getNextAvailableSnapshot(latestMicroBatch.snapshotId()) :
+          batchStartOffset;
+
+      latestMicroBatch = MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io())
+          .caseSensitive(caseSensitive)
+          .specsById(table.specs())
+          .generate(currentOffset.position(), Long.MAX_VALUE, currentOffset.shouldScanAllFiles());
+
+      fileScanTasks.addAll(latestMicroBatch.tasks());
+    } while (latestMicroBatch.snapshotId() != endOffset.snapshotId());
+
+    return fileScanTasks;
+  }
+
+  private StreamingOffset getNextAvailableSnapshot(long snapshotId) {
+    Snapshot previousSnapshot = table.snapshot(snapshotId);
+    Snapshot pointer = table.currentSnapshot();
+    while (pointer != null && previousSnapshot.snapshotId() != pointer.parentId()) {
+      Preconditions.checkState(pointer.operation().equals(DataOperations.APPEND),
+          "Encountered Snapshot DataOperation other than APPEND.");
+
+      pointer = table.snapshot(pointer.parentId());
+    }
+
+    Preconditions.checkState(pointer != null,
+        "Cannot read data from snapshot which has already expired: %s", snapshotId);
+
+    return new StreamingOffset(pointer.snapshotId(), 0L, false);
+  }
+
+  interface InitialOffsetStore {
Review comment:
Also, I am not fully certain that `getOrWriteInitialOffset` will persist as this class matures to support more options. So, conceptually, I wanted to keep this separation: a tangible `InitialOffsetStore` that is responsible for storing and checking the `initialOffset`. Does this make sense?
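For context, a minimal sketch of what a file-based implementation of this `InitialOffsetStore` could look like, built from the types already imported in this file (`FileIO`, `InputFile`, `OutputFile`, `BufferedWriter`, `OutputStreamWriter`, `StandardCharsets`) plus `BufferedReader`, `InputStreamReader`, `UncheckedIOException`, and `Collectors`. The class name, checkpoint-file layout, and the offset chosen by `addInitialOffset()` are assumptions for illustration, not the PR's actual implementation:

```java
// A minimal sketch, not the PR's implementation: persist the initial StreamingOffset as
// JSON under the query's checkpoint location using Iceberg's FileIO abstractions.
class FileBasedInitialOffsetStore implements InitialOffsetStore {
  private final Table table;
  private final FileIO io;
  private final String initialOffsetLocation;

  FileBasedInitialOffsetStore(Table table, String checkpointLocation) {
    this.table = table;
    this.io = table.io();
    this.initialOffsetLocation = checkpointLocation + "/offsets/0"; // hypothetical layout
  }

  @Override
  public boolean isOffsetStoreInitialized() {
    // the offset file exists only if a previous run already wrote the initial offset
    return io.newInputFile(initialOffsetLocation).exists();
  }

  @Override
  public StreamingOffset getInitialOffset() {
    try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(io.newInputFile(initialOffsetLocation).newStream(), StandardCharsets.UTF_8))) {
      return StreamingOffset.fromJson(reader.lines().collect(Collectors.joining("\n")));
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to read initial offset: " + initialOffsetLocation, e);
    }
  }

  @Override
  public StreamingOffset addInitialOffset() {
    table.refresh();
    Snapshot current = table.currentSnapshot();
    // assumption: start from the current snapshot and scan all of its files;
    // an empty table starts from START_OFFSET
    StreamingOffset offset = current == null
        ? StreamingOffset.START_OFFSET
        : new StreamingOffset(current.snapshotId(), 0, true);

    OutputFile outputFile = io.newOutputFile(initialOffsetLocation);
    try (BufferedWriter writer = new BufferedWriter(
        new OutputStreamWriter(outputFile.createOrOverwrite(), StandardCharsets.UTF_8))) {
      writer.write(offset.json());
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to write initial offset: " + initialOffsetLocation, e);
    }
    return offset;
  }
}
```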
##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -0,0 +1,314 @@
+    boolean scanAllFiles = !StreamingOffset.START_OFFSET.equals(initialOffset) &&
+        latestSnapshot.snapshotId() == initialOffset.snapshotId();
+
+    String positionValue = scanAllFiles ?
Review comment:
@aokolnychyi - is the concern that the value is `NOT PRESENT` in `Snapshot.summary()`, or that the value is present but could be wrong? Or both :)
##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
##########
@@ -0,0 +1,314 @@
+  @Override
+  public Offset latestOffset() {
Review comment:
I am resolving the comments where I felt we reached reasonable consensus, or that are tracked in the PR's **What next** section, so that **the ones needing more discussion stand out**. Please do reopen this thread or start a new comment if you feel more discussion is needed here.
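For reference, this is roughly how the stream would be consumed end to end once `toMicroBatchStream` is wired up; the table identifier, checkpoint path, and trigger interval below are placeholders, and an Iceberg catalog is assumed to be configured on the `SparkSession`:

```java
// Sketch: reading an Iceberg table as a Structured Streaming source and writing to console.
Dataset<Row> stream = spark.readStream()
    .format("iceberg")
    .load("db.table");                       // placeholder table identifier

StreamingQuery query = stream.writeStream()
    .format("console")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/checkpoints/iceberg-stream")  // placeholder path
    .trigger(Trigger.ProcessingTime("30 seconds"))
    .start();
```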
+ getNextAvailableSnapshot(latestMicroBatch.snapshotId()) : + batchStartOffset; + + latestMicroBatch = MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io()) + .caseSensitive(caseSensitive) + .specsById(table.specs()) + .generate(currentOffset.position(), Long.MAX_VALUE, currentOffset.shouldScanAllFiles()); + + fileScanTasks.addAll(latestMicroBatch.tasks()); + } while (latestMicroBatch.snapshotId() != endOffset.snapshotId()); + + return fileScanTasks; + } + + private StreamingOffset getNextAvailableSnapshot(long snapshotId) { + Snapshot previousSnapshot = table.snapshot(snapshotId); + Snapshot pointer = table.currentSnapshot(); + while (pointer != null && previousSnapshot.snapshotId() != pointer.parentId()) { + Preconditions.checkState(pointer.operation().equals(DataOperations.APPEND), + "Encountered Snapshot DataOperation other than APPEND."); + + pointer = table.snapshot(pointer.parentId()); + } + + Preconditions.checkState(pointer != null, + "Cannot read data from snapshot which has already expired: %s", snapshotId); + + return new StreamingOffset(pointer.snapshotId(), 0L, false); + } + + interface InitialOffsetStore { Review comment: Also, I am not fully certain that `getOrWriteInitialOffset` will still exist after a few iterations, as this class matures to support more options. So, conceptually, I wanted to keep this separation: a tangible `InitialOffsetStore` that is responsible for storing and checking the `initialOffset`. Does this make sense? ########## File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java ########## @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.iceberg.spark.source; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DataOperations; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MicroBatches; +import org.apache.iceberg.MicroBatches.MicroBatch; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotSummary; +import org.apache.iceberg.Table; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask; +import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.iceberg.util.TableScanUtil; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.connector.read.InputPartition; +import org.apache.spark.sql.connector.read.PartitionReaderFactory; +import org.apache.spark.sql.connector.read.streaming.MicroBatchStream; +import org.apache.spark.sql.connector.read.streaming.Offset; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK; +import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT; +import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST; +import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT; +import static org.apache.iceberg.TableProperties.SPLIT_SIZE; +import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT; + +public class SparkMicroBatchStream implements MicroBatchStream { + private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStream.class); Review comment: Hmm, this is probably why `SparkBatchScan` also has only one debug log. Let me remove this unused variable for now. ########## File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java ########## @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DataOperations; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MicroBatches; +import org.apache.iceberg.MicroBatches.MicroBatch; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotSummary; +import org.apache.iceberg.Table; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask; +import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.iceberg.util.TableScanUtil; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.connector.read.InputPartition; +import org.apache.spark.sql.connector.read.PartitionReaderFactory; +import org.apache.spark.sql.connector.read.streaming.MicroBatchStream; +import org.apache.spark.sql.connector.read.streaming.Offset; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK; +import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT; +import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST; +import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT; +import static org.apache.iceberg.TableProperties.SPLIT_SIZE; +import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT; + +public class SparkMicroBatchStream implements MicroBatchStream { + private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStream.class); Review comment: I did a scan of the file and couldn't really find a useful place to log. Around these interface methods, Spark is already logging, which might be the reason the other batch source files don't have a single info log either. I removed this unused variable for now; please feel free to point me at specific places or reopen this as you see fit.
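Separately from the logging question: several comments in this thread revolve around how the constructor shown earlier resolves `SparkReadOptions.SPLIT_SIZE`, `LOOKBACK`, and `FILE_OPEN_COST` against the table properties. Below is a hedged, reader-side sketch of how those options could be supplied once this micro-batch stream is available. The option keys come from `SparkReadOptions` ("split-size", "lookback", "file-open-cost"); the example class, the table identifier, and the values are placeholders and not part of this PR.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Hypothetical example class; assumes a Spark 3 session with an Iceberg catalog
// configured and a table reachable as "db.table" (placeholder identifier).
public class IcebergStreamReadExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().getOrCreate();

    Dataset<Row> stream = spark.readStream()
        .format("iceberg")
        // per-query override of the read.split.target-size table property
        .option("split-size", 64L * 1024 * 1024)
        // per-query override of read.split.planning-lookback
        .option("lookback", "10")
        // per-query override of read.split.open-file-cost
        .option("file-open-cost", 4L * 1024 * 1024)
        .load("db.table");

    // `stream` would normally be handed to writeStream(); omitted in this sketch.
  }
}
```

If an option is not set on the query, the constructor falls back to the corresponding table property, and finally to the built-in default, which is exactly the `Optional.ofNullable(...).orElseGet(...)` chain visible in the quoted diff.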
########## File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java ########## @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DataOperations; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MicroBatches; +import org.apache.iceberg.MicroBatches.MicroBatch; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotSummary; +import org.apache.iceberg.Table; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask; +import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.iceberg.util.TableScanUtil; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.connector.read.InputPartition; +import org.apache.spark.sql.connector.read.PartitionReaderFactory; +import org.apache.spark.sql.connector.read.streaming.MicroBatchStream; +import org.apache.spark.sql.connector.read.streaming.Offset; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK; +import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT; +import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST; +import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT; +import static org.apache.iceberg.TableProperties.SPLIT_SIZE; +import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT; + +public class SparkMicroBatchStream implements MicroBatchStream { + private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStream.class); + + private final JavaSparkContext sparkContext; + private 
final Table table; + private final boolean caseSensitive; + private final Schema expectedSchema; + private final Long splitSize; + private final Integer splitLookback; + private final Long splitOpenFileCost; + private final boolean localityPreferred; + private final StreamingOffset initialOffset; + + SparkMicroBatchStream(JavaSparkContext sparkContext, + Table table, boolean caseSensitive, Schema expectedSchema, + CaseInsensitiveStringMap options, String checkpointLocation) { + this.sparkContext = sparkContext; + this.table = table; + this.caseSensitive = caseSensitive; + this.expectedSchema = expectedSchema; + this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), table.location(), options); + + long tableSplitSize = PropertyUtil.propertyAsLong(table.properties(), SPLIT_SIZE, SPLIT_SIZE_DEFAULT); + this.splitSize = Spark3Util.propertyAsLong(options, SparkReadOptions.SPLIT_SIZE, tableSplitSize); + + int tableSplitLookback = PropertyUtil.propertyAsInt(table.properties(), SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT); + this.splitLookback = Spark3Util.propertyAsInt(options, SparkReadOptions.LOOKBACK, tableSplitLookback); + + long tableSplitOpenFileCost = PropertyUtil.propertyAsLong( + table.properties(), SPLIT_OPEN_FILE_COST, SPLIT_OPEN_FILE_COST_DEFAULT); + this.splitOpenFileCost = Spark3Util.propertyAsLong(options, SPLIT_OPEN_FILE_COST, tableSplitOpenFileCost); + + InitialOffsetStore initialOffsetStore = InitialOffsetStore.getInstance(table, checkpointLocation); + this.initialOffset = getOrWriteInitialOffset(initialOffsetStore); + } + + @Override + public Offset latestOffset() { + table.refresh(); + Snapshot latestSnapshot = table.currentSnapshot(); + if (latestSnapshot == null) { + return StreamingOffset.START_OFFSET; + } + + // a readStream on an Iceberg table can be started from 2 types of snapshots + // 1. a valid starting Snapshot: + // when this valid starting Snapshot is the initialOffset - then, scanAllFiles must be set to true; + // for all StreamingOffsets following this - scanAllFiles must be set to false + // 2. START_OFFSET: + // if the stream started on the table from START_OFFSET - it implies - that all the subsequent Snapshots added + // will have all files as net New manifests & hence scanAllFiles can be false. + boolean scanAllFiles = !StreamingOffset.START_OFFSET.equals(initialOffset) && + latestSnapshot.snapshotId() == initialOffset.snapshotId(); + + String positionValue = scanAllFiles ? Review comment: As @aokolnychyi expressed a concern about the reliability of the snapshot summary props, I switched the code to use the actual manifest stats (which are the actual **source of truth** for this data); a rough sketch of that fallback appears at the end of this thread. @RussellSpitzer - I will add this potential optimization to the **What Next** list, which I will translate into an issue once this PR makes it in. ########## File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java ########## @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import org.apache.hadoop.fs.Path; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DataOperations; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MicroBatches; +import org.apache.iceberg.MicroBatches.MicroBatch; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotSummary; +import org.apache.iceberg.Table; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask; +import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.SnapshotUtil; +import org.apache.iceberg.util.TableScanUtil; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.sql.connector.read.InputPartition; +import org.apache.spark.sql.connector.read.PartitionReaderFactory; +import org.apache.spark.sql.connector.read.streaming.MicroBatchStream; +import org.apache.spark.sql.connector.read.streaming.Offset; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK; +import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT; +import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST; +import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT; +import static org.apache.iceberg.TableProperties.SPLIT_SIZE; +import static org.apache.iceberg.TableProperties.SPLIT_SIZE_DEFAULT; + +public class SparkMicroBatchStream implements MicroBatchStream { + private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStream.class); + + private final JavaSparkContext sparkContext; + private final Table table; + private final boolean caseSensitive; + private final Schema expectedSchema; + private final Long splitSize; + private final Integer splitLookback; + private final Long splitOpenFileCost; + private final boolean localityPreferred; + private final InitialOffsetStore initialOffsetStore; + private final StreamingOffset initialOffset; + + SparkMicroBatchStream(JavaSparkContext sparkContext, + Table table, boolean caseSensitive, Schema expectedSchema, + CaseInsensitiveStringMap 
options, String checkpointLocation) { + this.sparkContext = sparkContext; + this.table = table; + this.caseSensitive = caseSensitive; + this.expectedSchema = expectedSchema; + this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), table.location(), options); + this.splitSize = Optional.ofNullable(Spark3Util.propertyAsLong(options, SparkReadOptions.SPLIT_SIZE, null)) + .orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(), SPLIT_SIZE, SPLIT_SIZE_DEFAULT)); + this.splitLookback = Optional.ofNullable(Spark3Util.propertyAsInt(options, SparkReadOptions.LOOKBACK, null)) + .orElseGet(() -> PropertyUtil.propertyAsInt(table.properties(), SPLIT_LOOKBACK, SPLIT_LOOKBACK_DEFAULT)); + this.splitOpenFileCost = Optional.ofNullable( + Spark3Util.propertyAsLong(options, SparkReadOptions.FILE_OPEN_COST, null)) + .orElseGet(() -> PropertyUtil.propertyAsLong(table.properties(), SPLIT_OPEN_FILE_COST, + SPLIT_OPEN_FILE_COST_DEFAULT)); + this.initialOffsetStore = InitialOffsetStore.getInstance(table, checkpointLocation); + this.initialOffset = getOrWriteInitialOffset(initialOffsetStore); + } + + @Override + public Offset latestOffset() { + table.refresh(); + Snapshot latestSnapshot = table.currentSnapshot(); + if (latestSnapshot == null) { + return StreamingOffset.START_OFFSET; + } + + // a readStream on an Iceberg table can be started from 2 types of snapshots + // 1. a valid starting Snapshot: + // when this valid starting Snapshot is the initialOffset - then, scanAllFiles must be set to true; + // for all StreamingOffsets following this - scanAllFiles must be set to false + // 2. START_OFFSET: + // if the stream started on the table from START_OFFSET - it implies - that all the subsequent Snapshots added + // will have all files as net New manifests & hence scanAllFiles can be false. + boolean scanAllFiles = !StreamingOffset.START_OFFSET.equals(initialOffset) && + latestSnapshot.snapshotId() == initialOffset.snapshotId(); + + String positionValue = scanAllFiles ? + latestSnapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP) : + latestSnapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP); + + return new StreamingOffset( + latestSnapshot.snapshotId(), positionValue != null ? 
Long.parseLong(positionValue) : 0, scanAllFiles); + } + + @Override + public InputPartition[] planInputPartitions(Offset start, Offset end) { + if (end.equals(StreamingOffset.START_OFFSET)) { + return new InputPartition[0]; + } + + // broadcast the table metadata as input partitions will be sent to executors + Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table)); + String expectedSchemaString = SchemaParser.toJson(expectedSchema); + + Preconditions.checkState( + end instanceof StreamingOffset, + "The end offset passed to planInputPartitions() is not an instance of StreamingOffset."); + + Preconditions.checkState( + start instanceof StreamingOffset, + "The start offset passed to planInputPartitions() is not an instance of StreamingOffset."); + + StreamingOffset endOffset = (StreamingOffset) end; + StreamingOffset startOffset = (StreamingOffset) start; + + List<FileScanTask> fileScanTasks = calculateFileScanTasks(startOffset, endOffset); + + CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles( + CloseableIterable.withNoopClose(fileScanTasks), + splitSize); + List<CombinedScanTask> combinedScanTasks = Lists.newArrayList( + TableScanUtil.planTasks(splitTasks, splitSize, splitLookback, splitOpenFileCost)); + InputPartition[] readTasks = new InputPartition[combinedScanTasks.size()]; + + for (int i = 0; i < combinedScanTasks.size(); i++) { + readTasks[i] = new ReadTask( + combinedScanTasks.get(i), tableBroadcast, expectedSchemaString, + caseSensitive, localityPreferred); + } + + return readTasks; + } + + @Override + public PartitionReaderFactory createReaderFactory() { + return new ReaderFactory(0); + } + + @Override + public Offset initialOffset() { + return initialOffset; + } + + @Override + public Offset deserializeOffset(String json) { + return StreamingOffset.fromJson(json); + } + + @Override + public void commit(Offset end) { + } + + @Override + public void stop() { + } + + private static StreamingOffset getOrWriteInitialOffset(InitialOffsetStore initialOffsetStore) { + if (initialOffsetStore.isOffsetStoreInitialized()) { + return initialOffsetStore.getInitialOffset(); + } + + return initialOffsetStore.addInitialOffset(); + } + + private List<FileScanTask> calculateFileScanTasks(StreamingOffset startOffset, StreamingOffset endOffset) { + List<FileScanTask> fileScanTasks = new ArrayList<>(); + MicroBatch latestMicroBatch = null; + StreamingOffset batchStartOffset = StreamingOffset.START_OFFSET.equals(startOffset) ? + new StreamingOffset(SnapshotUtil.oldestSnapshot(table).snapshotId(), 0, false) : + startOffset; + + do { + final StreamingOffset currentOffset = + latestMicroBatch != null && latestMicroBatch.lastIndexOfSnapshot() ? 
+ getNextAvailableSnapshot(latestMicroBatch.snapshotId()) : + batchStartOffset; + + latestMicroBatch = MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io()) + .caseSensitive(caseSensitive) + .specsById(table.specs()) + .generate(currentOffset.position(), Long.MAX_VALUE, currentOffset.shouldScanAllFiles()); + + fileScanTasks.addAll(latestMicroBatch.tasks()); + } while (latestMicroBatch.snapshotId() != endOffset.snapshotId()); + + return fileScanTasks; + } + + private StreamingOffset getNextAvailableSnapshot(long snapshotId) { + Snapshot previousSnapshot = table.snapshot(snapshotId); + Snapshot pointer = table.currentSnapshot(); + while (pointer != null && previousSnapshot.snapshotId() != pointer.parentId()) { + Preconditions.checkState(pointer.operation().equals(DataOperations.APPEND), + "Encountered Snapshot DataOperation other than APPEND."); + + pointer = table.snapshot(pointer.parentId()); + } + + Preconditions.checkState(pointer != null, + "Cannot read data from snapshot which has already expired: %s", snapshotId); + + return new StreamingOffset(pointer.snapshotId(), 0L, false); + } + + interface InitialOffsetStore { Review comment: Resolving this, as it is a refactoring / code-readability question and I have given a reasonable explanation above; a rough sketch of the intended interface shape follows below. Please definitely reopen, or ping me on Slack, if you feel strongly that we should not do this.
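For readers following along, a minimal sketch of the separation discussed above. The method names match the calls already visible in this diff (`getInstance`, `isOffsetStoreInitialized`, `getInitialOffset`, `addInitialOffset`), but the checkpoint-backed implementation behind `getInstance` is only implied by the `checkpointLocation` parameter and the `java.io` imports, so treat this as an outline rather than the PR's final code.

```java
import org.apache.iceberg.Table;

// Sketch of the interface shape only. StreamingOffset is the package-private offset
// class introduced by this PR; the real getInstance(...) is expected to return an
// implementation that persists the initial offset under the stream's checkpoint location.
interface InitialOffsetStore {
  static InitialOffsetStore getInstance(Table table, String checkpointLocation) {
    throw new UnsupportedOperationException("checkpoint-backed implementation omitted in this sketch");
  }

  // true once an initial offset has been written for this checkpoint location
  boolean isOffsetStoreInitialized();

  // read back the offset written by a previous run of the same query
  StreamingOffset getInitialOffset();

  // compute the initial offset from the table's current state and persist it
  StreamingOffset addInitialOffset();
}
```

The constructor only ever goes through `getOrWriteInitialOffset`, so the interface is the single place that decides whether a stream resumes from a stored offset or writes a fresh one.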

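Following up on the earlier comment about not relying on the snapshot summary properties alone, here is a hedged sketch of what "use the actual manifest stats" could look like for the added-files position. `Snapshot#addedFiles()`, `SnapshotSummary.ADDED_FILES_PROP`, `PropertyUtil`, and the relocated `Iterables` all exist in Iceberg, but the helper below (`addedFilesCount` in a hypothetical holder class) illustrates the direction rather than the exact code that landed.

```java
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.util.PropertyUtil;

// Hypothetical holder class for the sketch.
final class OffsetPositions {
  private OffsetPositions() {
  }

  // Prefer the summary property when present, but fall back to counting the data files
  // the snapshot actually recorded, since summary stats are advisory and may be absent.
  static long addedFilesCount(Snapshot snapshot) {
    long fromSummary = PropertyUtil.propertyAsLong(
        snapshot.summary(), SnapshotSummary.ADDED_FILES_PROP, -1L);
    return fromSummary >= 0 ? fromSummary : Iterables.size(snapshot.addedFiles());
  }
}
```

The `scanAllFiles` branch (which uses `TOTAL_DATA_FILES_PROP`) would need an analogous fallback that walks all of the snapshot's manifests, which is more expensive; that cost is presumably the optimization trade-off deferred to the follow-up issue mentioned in the comment.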