openinx commented on a change in pull request #2105:
URL: https://github.com/apache/iceberg/pull/2105#discussion_r568303231
##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
##########
@@ -79,6 +83,27 @@ InputFile getInputFile(String location) {
return inputFiles.get(location);
}
+ public void seek(CheckpointedPosition checkpointedPosition) {
Review comment:
Currently we put the two levels of iteration inside a single
DataIterator, which makes the code a bit complex to read and understand. I'd
prefer to split this into two separate iterators (a rough sketch follows this list):
1. FileRecordIterator, which seeks to the provided row offset and then
continues reading the following records from that file.
2. CombinedTaskRecordIterator, which holds multiple
`FileRecordIterator`s; it locates the most recently opened `FileRecordIterator`
and seeks it to the given row offset to read the following records.
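
A rough sketch of that split, only to illustrate the idea: the class names, the
`seek` signatures, and the generic record type are assumptions here, not the
existing DataIterator API.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Reads a single data file and can skip ahead to a row offset within that file (illustrative).
interface FileRecordIterator<T> extends Iterator<T>, Closeable {
  void seek(long rowOffset);
}

// Iterates over all files of one combined scan task; recovery seeks by (fileOffset, rowOffset).
class CombinedTaskRecordIterator<T> implements Iterator<T>, Closeable {
  private final List<FileRecordIterator<T>> fileIterators;
  private int currentFile = 0;

  CombinedTaskRecordIterator(List<FileRecordIterator<T>> fileIterators) {
    this.fileIterators = fileIterators;
  }

  // Locate the file iterator that was open when the checkpoint was taken,
  // then skip to the recorded row offset inside that file.
  void seek(int fileOffset, long rowOffset) {
    this.currentFile = fileOffset;
    fileIterators.get(currentFile).seek(rowOffset);
  }

  @Override
  public boolean hasNext() {
    while (currentFile < fileIterators.size() && !fileIterators.get(currentFile).hasNext()) {
      currentFile += 1;
    }
    return currentFile < fileIterators.size();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return fileIterators.get(currentFile).next();
  }

  @Override
  public void close() throws IOException {
    for (FileRecordIterator<T> iterator : fileIterators) {
      iterator.close();
    }
  }
}
```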
##########
File path: flink/src/main/java/org/apache/iceberg/flink/TableInfo.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+
+/**
+ * This util class holds the serializable parts of {@link org.apache.iceberg.Table}.
+ */
+public class TableInfo implements Serializable {
Review comment:
I remember there's a PR that has already made `Table`
serializable: https://github.com/apache/iceberg/pull/2046. So we may not
need this serializable `TableInfo` any more.
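
A hypothetical sketch of what that would look like, assuming the PR above does
make `Table` instances serializable; the holder class below is illustrative, not an
existing Iceberg or Flink class. Everything `TableInfo` copies out is already
reachable through the `Table` interface.

```java
import java.io.Serializable;
import java.util.Map;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;

class SerializableTableHolder implements Serializable {
  // Serializing this field is the assumption; it only works if Table itself is serializable.
  private final Table table;

  SerializableTableHolder(Table table) {
    this.table = table;
  }

  // The same state TableInfo copies out, read back from the table handle on the task side.
  Schema schema() {
    return table.schema();
  }

  PartitionSpec spec() {
    return table.spec();
  }

  FileIO io() {
    return table.io();
  }

  EncryptionManager encryption() {
    return table.encryption();
  }

  Map<String, String> properties() {
    return table.properties();
  }
}
```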
##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java
##########
@@ -21,29 +21,46 @@
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.ArrayList;
import java.util.List;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-class FlinkSplitGenerator {
+public class FlinkSplitGenerator {
private FlinkSplitGenerator() {
}
static FlinkInputSplit[] createInputSplits(Table table, ScanContext context) {
- List<CombinedScanTask> tasks = tasks(table, context);
- FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
- for (int i = 0; i < tasks.size(); i++) {
- splits[i] = new FlinkInputSplit(i, tasks.get(i));
+    try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context)) {
+ List<CombinedScanTask> tasks = Lists.newArrayList(tasksIterable);
+ FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()];
+ for (int i = 0; i < tasks.size(); i++) {
+ splits[i] = new FlinkInputSplit(i, tasks.get(i));
+ }
+ return splits;
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to process tasks iterable", e);
+ }
+ }
+
+ public static List<IcebergSourceSplit> planIcebergSourceSplits(
+ Table table, ScanContext context) {
+    try (CloseableIterable<CombinedScanTask> tasksIterable = planTasks(table, context)) {
+ List<IcebergSourceSplit> splits = new ArrayList<>();
Review comment:
Nit: `Lists.newArrayList`?
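
For reference, a minimal illustration of the nit; the class and element type below
are placeholders, the point is only the swap from the JDK constructor to the
relocated Guava factory this file already imports.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

class SplitListExamples {
  static List<String> withJdkConstructor() {
    return new ArrayList<>();      // current code: plain JDK constructor
  }

  static List<String> withGuavaFactory() {
    return Lists.newArrayList();   // suggested nit: same behavior, consistent with the file's Lists import
  }
}
```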