kbendick commented on a change in pull request #2105:
URL: https://github.com/apache/iceberg/pull/2105#discussion_r736786937
##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/assigner/GetSplitResult.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.assigner;
+
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+
+@Internal
+public class GetSplitResult {
+
+  public enum Status {
+
+    AVAILABLE,
+
+    /**
+     * There are pending splits. But they can't be assigned
+     * due to constraints (like event time alignment)
+     */
+    CONSTRAINED,
+
+    /**
+     * Assigner doesn't have pending splits.
+     */
+    UNAVAILABLE
+  }
+
+  private final Status status;
+  @Nullable
+  private final IcebergSourceSplit split;
+
+  public GetSplitResult(Status status) {
+    this(status, null);
+  }
+
+  public GetSplitResult(Status status, @Nullable IcebergSourceSplit split) {
+    if (null == split && status == Status.AVAILABLE) {
+      throw new IllegalArgumentException("Available status must have a non-null split");

Review comment:
       Nit: this error message is a bit confusing to me. Maybe `Splits cannot be null if their result is marked as Available`?

       I think my confusion stems from the phrasing `Available status must...`, which doesn't tell the user much about what `Available status` is. Though anyone who hits this error is probably already digging through the code enough to find out.
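For illustration, here is one way the reworded check could look. This is only a sketch of the suggested message, not the PR's code; the trailing field assignments are assumed, since the diff above is truncated at the `throw`:

```java
public GetSplitResult(Status status, @Nullable IcebergSourceSplit split) {
  if (split == null && status == Status.AVAILABLE) {
    // Rephrased along the lines suggested above; exception type unchanged.
    throw new IllegalArgumentException(
        "Split cannot be null when the result status is AVAILABLE");
  }
  this.status = status; // assumed: not visible in the truncated diff
  this.split = split;   // assumed: not visible in the truncated diff
}
```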
##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -40,4 +40,10 @@ private FlinkConfigOptions() {
       .intType()
       .defaultValue(100)
       .withDescription("Sets max infer parallelism for source operator.");
+
+  public static final ConfigOption<Integer> SOURCE_READER_FETCH_BATCH_SIZE = ConfigOptions
+      .key("source.iceberg.reader.fetch-batch-size")
+      .intType()
+      .defaultValue(2048)
+      .withDescription("The target batch size for split reader fetch.");

Review comment:
       Is this target batch size a number of records, a number of bytes, or something else?

       I figure I'll find out further on in my reading, but it might be good to add that to the description regardless 🙂
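As a purely illustrative sketch of the kind of description the comment asks for, assuming the batch size is a record count (the diff itself does not state the unit):

```java
public static final ConfigOption<Integer> SOURCE_READER_FETCH_BATCH_SIZE = ConfigOptions
    .key("source.iceberg.reader.fetch-batch-size")
    .intType()
    .defaultValue(2048)
    // Hypothetical wording; spells out the unit so users don't have to guess.
    .withDescription("The target number of records per fetch batch in the split reader.");
```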
##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
##########
@@ -37,20 +38,46 @@
  */
 @Internal
 public class DataIterator<T> implements CloseableIterator<T> {
-  private final FileScanTaskReader<T> fileScanTaskReader;
-  private final InputFilesDecryptor inputFilesDecryptor;
+  private final CombinedScanTask combinedTask;
+  private Iterator<FileScanTask> tasks;
   private CloseableIterator<T> currentIterator;
+  private Position position;
 
   public DataIterator(FileScanTaskReader<T> fileScanTaskReader, CombinedScanTask task,
                       FileIO io, EncryptionManager encryption) {
     this.fileScanTaskReader = fileScanTaskReader;
-    this.inputFilesDecryptor = new InputFilesDecryptor(task, io, encryption);
+    this.combinedTask = task;
+    this.tasks = task.files().iterator();
     this.currentIterator = CloseableIterator.empty();
+    // fileOffset starts at -1 because we started
+    // from an empty iterator that is not from the split files.
+    this.position = new Position(-1L, 0L);

Review comment:
       Nit: Maybe we can make this a named constant?
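A minimal sketch of the nit, with invented constant names (not from the PR):

```java
// Sentinel for "no file opened yet": iteration begins from an empty iterator
// that does not correspond to any file in the split.
private static final long STARTING_FILE_OFFSET = -1L;
private static final long STARTING_RECORD_OFFSET = 0L;

// In the constructor, replacing the magic numbers:
this.position = new Position(STARTING_FILE_OFFSET, STARTING_RECORD_OFFSET);
```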
##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/assigner/GetSplitResult.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.assigner;
+
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Internal;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+
+@Internal

Review comment:
       +1 to the annotations.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
##########
@@ -68,6 +68,9 @@
   private static final ConfigOption<Duration> MONITOR_INTERVAL =
       ConfigOptions.key("monitor-interval").durationType().defaultValue(Duration.ofSeconds(10));
 
+  private static final ConfigOption<Boolean> INCLUDE_COLUMN_STATS =
+      ConfigOptions.key("include-column-stats").booleanType().defaultValue(false);

Review comment:
       What's the reason for defaulting this to `false`? Will it throw if we use that and there aren't column stats?

##########
File path: flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
##########
@@ -40,4 +40,10 @@ private FlinkConfigOptions() {
       .intType()
       .defaultValue(100)
       .withDescription("Sets max infer parallelism for source operator.");
+
+  public static final ConfigOption<Integer> SOURCE_READER_FETCH_BATCH_SIZE = ConfigOptions
+      .key("source.iceberg.reader.fetch-batch-size")

Review comment:
       Eventually, we should consider making these constants. But that's unrelated to this PR.
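One reading of "making these constants" is hoisting the raw key strings into named constants. A hedged sketch of that follow-up (out of scope for this PR, as the comment says; the constant name is invented):

```java
// Hypothetical: name the key string so tests and docs can reference it
// without duplicating the literal.
private static final String FETCH_BATCH_SIZE_KEY = "source.iceberg.reader.fetch-batch-size";

public static final ConfigOption<Integer> SOURCE_READER_FETCH_BATCH_SIZE = ConfigOptions
    .key(FETCH_BATCH_SIZE_KEY)
    .intType()
    .defaultValue(2048)
    .withDescription("The target batch size for split reader fetch.");
```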
##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java
##########
@@ -48,7 +48,6 @@
 @Internal
 public class RowDataFileScanTaskReader implements FileScanTaskReader<RowData> {
-

Review comment:
       Nit: Unnecessary diff.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.source.assigner.SplitAssigner;
+import org.apache.iceberg.flink.source.assigner.SplitAssignerFactory;
+import org.apache.iceberg.flink.source.enumerator.ContinuousIcebergEnumerator;
+import org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlanner;
+import org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlannerImpl;
+import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorConfig;
+import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorState;
+import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer;
+import org.apache.iceberg.flink.source.enumerator.StaticIcebergEnumerator;
+import org.apache.iceberg.flink.source.reader.IcebergSourceReader;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
+
+@Experimental
+public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEnumeratorState> {
+
+  private final TableLoader tableLoader;
+  private final ScanContext scanContext;
+  private final BulkFormat<T, IcebergSourceSplit> bulkFormat;
+  private final SplitAssignerFactory assignerFactory;
+  private final IcebergEnumeratorConfig enumeratorConfig;
+
+  IcebergSource(
+      TableLoader tableLoader,
+      ScanContext scanContext,
+      BulkFormat<T, IcebergSourceSplit> bulkFormat,
+      SplitAssignerFactory assignerFactory,
+      IcebergEnumeratorConfig enumeratorConfig) {
+
+    this.tableLoader = tableLoader;
+    this.enumeratorConfig = enumeratorConfig;
+    this.scanContext = scanContext;
+    this.bulkFormat = bulkFormat;
+    this.assignerFactory = assignerFactory;
+  }
+
+  private static Table loadTable(TableLoader tableLoader) {
+    tableLoader.open();
+    try (TableLoader loader = tableLoader) {
+      return loader.loadTable();
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to close table loader", e);
+    }
+  }
+
+  @Override
+  public Boundedness getBoundedness() {
+    return enumeratorConfig.splitDiscoveryInterval() == null ?

Review comment:
       Yeah, agree with Steven that it's not always intuitive, but it does fall in line with their definition.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.source.assigner.SplitAssigner;
+import org.apache.iceberg.flink.source.assigner.SplitAssignerFactory;
+import org.apache.iceberg.flink.source.enumerator.ContinuousIcebergEnumerator;
+import org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlanner;
+import org.apache.iceberg.flink.source.enumerator.ContinuousSplitPlannerImpl;
+import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorConfig;
+import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorState;
+import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer;
+import org.apache.iceberg.flink.source.enumerator.StaticIcebergEnumerator;
+import org.apache.iceberg.flink.source.reader.IcebergSourceReader;
+import org.apache.iceberg.flink.source.reader.IcebergSourceReaderMetrics;
+import org.apache.iceberg.flink.source.reader.ReaderFunction;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Experimental
+public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEnumeratorState> {
+  private static final Logger LOG = LoggerFactory.getLogger(IcebergSource.class);
+
+  private final TableLoader tableLoader;
+  private final ScanContext scanContext;
+  private final ReaderFunction<T> readerFunction;
+  private final SplitAssignerFactory assignerFactory;
+  private final IcebergEnumeratorConfig enumeratorConfig;
+
+  IcebergSource(
+      TableLoader tableLoader,
+      ScanContext scanContext,
+      ReaderFunction<T> readerFunction,
+      SplitAssignerFactory assignerFactory,
+      IcebergEnumeratorConfig enumeratorConfig) {
+
+    this.tableLoader = tableLoader;
+    this.enumeratorConfig = enumeratorConfig;
+    this.scanContext = scanContext;
+    this.readerFunction = readerFunction;
+    this.assignerFactory = assignerFactory;
+  }
+
+  private static Table loadTable(TableLoader tableLoader) {
+    tableLoader.open();
+    try (TableLoader loader = tableLoader) {

Review comment:
       Is there a reason not to put the call to `.open()` in the try-with-resources block? Like so:

       ```java
       try (TableLoader loader = tableLoader.open()) {
         ...
       }
       ```
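Spelled out over the whole helper, the suggestion would look roughly like this. It assumes `TableLoader.open()` returns the loader itself; if `open()` returns `void`, the two-step form in the PR is the only option:

```java
private static Table loadTable(TableLoader tableLoader) {
  // Assumes open() returns the (Closeable) loader, so opening and closing
  // are both scoped to the try-with-resources block.
  try (TableLoader loader = tableLoader.open()) {
    return loader.loadTable();
  } catch (IOException e) {
    throw new RuntimeException("Failed to close table loader", e);
  }
}
```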
##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/assigner/SimpleSplitAssigner.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source.assigner;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplitStatus;
+
+/**
+ * This assigner hands out splits without any guarantee in order or locality.
+ * <p>
+ * Since all methods are called in the source coordinator thread by enumerator,
+ * there is no need for locking.
+ */
+public class SimpleSplitAssigner implements SplitAssigner {
+
+  private final Deque<IcebergSourceSplit> pendingSplits;
+  private CompletableFuture<Void> availableFuture;
+
+  SimpleSplitAssigner(Deque<IcebergSourceSplit> pendingSplits) {
+    this.pendingSplits = pendingSplits;
+  }
+
+  public SimpleSplitAssigner() {
+    this(new ArrayDeque<>());
+  }
+
+  public SimpleSplitAssigner(Map<IcebergSourceSplit, IcebergSourceSplitStatus> state) {
+    this(new ArrayDeque<>(state.keySet()));
+  }
+
+  @Override
+  public GetSplitResult getNext(@Nullable String hostname) {
+    if (pendingSplits.isEmpty()) {
+      return new GetSplitResult(GetSplitResult.Status.UNAVAILABLE);
+    } else {
+      IcebergSourceSplit split = pendingSplits.poll();
+      return new GetSplitResult(GetSplitResult.Status.AVAILABLE, split);
+    }
+  }
+
+  @Override
+  public void onDiscoveredSplits(Collection<IcebergSourceSplit> splits) {
+    pendingSplits.addAll(splits);
+    completeAvailableFuturesIfNeeded();
+  }
+
+  @Override
+  public void onUnassignedSplits(Collection<IcebergSourceSplit> splits) {
+    pendingSplits.addAll(splits);
+    completeAvailableFuturesIfNeeded();
+  }
+
+  /**
+   * Simple assigner only tracks unassigned splits

Review comment:
       Question: How does the simple assigner handle `CONSTRAINED` splits?

       In this model (simple, non-location-aware tracking and assigning), is `CONSTRAINED` synonymous with `UNASSIGNED`?
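One way to read the contract, purely as illustration (none of this is in the PR): `CONSTRAINED` describes the assigner's answer at request time rather than a per-split state, so a FIFO assigner like this one simply never returns it. An alignment-aware assigner might, along these lines, where `splitTimestamp` and `maxAllowedTimestamp` are invented helpers:

```java
@Override
public GetSplitResult getNext(@Nullable String hostname) {
  if (pendingSplits.isEmpty()) {
    return new GetSplitResult(GetSplitResult.Status.UNAVAILABLE);
  }
  IcebergSourceSplit split = pendingSplits.peek();
  if (splitTimestamp(split) > maxAllowedTimestamp()) {
    // Splits are pending, but event-time alignment blocks handing them out.
    return new GetSplitResult(GetSplitResult.Status.CONSTRAINED);
  }
  return new GetSplitResult(GetSplitResult.Status.AVAILABLE, pendingSplits.poll());
}
```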
