This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 64348055a1 [improvement](iceberg) Optimize the split to the
user-specified size #22078
64348055a1 is described below
commit 64348055a1b449b018018123b4828c5974591838
Author: wuwenchi <[email protected]>
AuthorDate: Mon Jul 24 08:48:10 2023 +0800
[improvement](iceberg) Optimize the split to the user-specified size #22078
Split tasks are now merged according to the user-specified split size, so that
each resulting task stays close to the expected size.
---
.../planner/external/iceberg/IcebergScanNode.java | 24 ++++++++++++++++------
1 file changed, 18 insertions(+), 6 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
index 2de2f8291c..3d3634fb66 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -47,6 +47,7 @@ import org.apache.doris.thrift.TTableFormatFileDesc;
import avro.shaded.com.google.common.base.Preconditions;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
@@ -57,8 +58,11 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.util.TableScanUtil;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
@@ -179,21 +183,29 @@ public class IcebergScanNode extends FileQueryScanNode {
int formatVersion = ((BaseTable)
table).operations().current().formatVersion();
// Min split size is DEFAULT_SPLIT_SIZE(128MB).
long splitSize =
Math.max(ConnectContext.get().getSessionVariable().getFileSplitSize(),
DEFAULT_SPLIT_SIZE);
- for (FileScanTask task : scan.planFiles()) {
- long fileSize = task.file().fileSizeInBytes();
- for (FileScanTask splitTask : task.split(splitSize)) {
+ CloseableIterable<FileScanTask> fileScanTasks =
TableScanUtil.splitFiles(scan.planFiles(), splitSize);
+ try (CloseableIterable<CombinedScanTask> combinedScanTasks =
+ TableScanUtil.planTasks(fileScanTasks, splitSize, 1, 0)) {
+ combinedScanTasks.forEach(taskGrp ->
taskGrp.files().forEach(splitTask -> {
String dataFilePath = splitTask.file().path().toString();
Path finalDataFilePath =
S3Util.toScanRangeLocation(dataFilePath);
- IcebergSplit split = new IcebergSplit(finalDataFilePath,
splitTask.start(),
- splitTask.length(), fileSize, new String[0]);
+ IcebergSplit split = new IcebergSplit(
+ finalDataFilePath,
+ splitTask.start(),
+ splitTask.length(),
+ splitTask.file().fileSizeInBytes(),
+ new String[0]);
split.setFormatVersion(formatVersion);
if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
}
split.setTableFormatType(TableFormatType.ICEBERG);
splits.add(split);
- }
+ }));
+ } catch (IOException e) {
+ throw new UserException(e.getMessage(), e.getCause());
}
+
return splits;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]