This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 94c9dce308 [fix](iceberg) fix iceberg's filter expr to filter file (#22740)
94c9dce308 is described below
commit 94c9dce308a4815988d9617b6f63cdb60864f684
Author: wuwenchi <[email protected]>
AuthorDate: Thu Aug 10 18:20:57 2023 +0800
[fix](iceberg) fix iceberg's filter expr to filter file (#22740)
    Fix iceberg's filter expr to filter file, and add a count of the number of
    partitions read
---
.../doris/external/iceberg/util/IcebergUtils.java | 27 ++++------------------
.../planner/external/iceberg/IcebergScanNode.java | 27 ++++++++++++++++++----
2 files changed, 27 insertions(+), 27 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java
index 68e761e710..6b26616b90 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java
@@ -39,6 +39,7 @@ import org.apache.doris.catalog.Type;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.thrift.TExprOpcode;
import com.google.common.base.Preconditions;
@@ -59,12 +60,7 @@ import org.apache.iceberg.types.Types;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
-import java.time.format.DateTimeParseException;
import java.util.ArrayList;
-import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -83,6 +79,7 @@ public class IcebergUtils {
return 0;
}
};
+ static long MILLIS_TO_NANO_TIME = 1000;
/**
* Create Iceberg schema from Doris ColumnDef.
@@ -341,7 +338,7 @@ public class IcebergUtils {
}
List<Object> valueList = new ArrayList<>();
for (int i = 1; i < inExpr.getChildren().size(); ++i) {
- if (!(inExpr.getChild(i) instanceof LiteralExpr)) {
+ if (!(inExpr.getChild(i) instanceof LiteralExpr)) {
return null;
}
LiteralExpr literalExpr = (LiteralExpr) inExpr.getChild(i);
@@ -372,23 +369,7 @@ public class IcebergUtils {
return boolLiteral.getValue();
} else if (expr instanceof DateLiteral) {
DateLiteral dateLiteral = (DateLiteral) expr;
-            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss")
-                    .withZone(ZoneId.systemDefault());
- StringBuilder sb = new StringBuilder();
- sb.append(dateLiteral.getYear())
- .append(dateLiteral.getMonth())
- .append(dateLiteral.getDay())
- .append(dateLiteral.getHour())
- .append(dateLiteral.getMinute())
- .append(dateLiteral.getSecond());
- Date date;
- try {
-                date = Date.from(
-                        LocalDateTime.parse(sb.toString(), formatter).atZone(ZoneId.systemDefault()).toInstant());
- } catch (DateTimeParseException e) {
- return null;
- }
- return date.getTime();
+            return dateLiteral.unixTimestamp(TimeUtils.getTimeZone()) * MILLIS_TO_NANO_TIME;
} else if (expr instanceof DecimalLiteral) {
DecimalLiteral decimalLiteral = (DecimalLiteral) expr;
return decimalLiteral.getValue();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
index ef69536b78..5fa64fa2b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -56,6 +56,7 @@ import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.expressions.Expression;
@@ -67,6 +68,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -76,6 +78,7 @@ import java.util.stream.Collectors;
public class IcebergScanNode extends FileQueryScanNode {
public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
+ public static final String DEFAULT_DATA_PATH = "/data/";
private IcebergSource source;
private Table icebergTable;
@@ -181,11 +184,25 @@ public class IcebergScanNode extends FileQueryScanNode {
         int formatVersion = ((BaseTable) icebergTable).operations().current().formatVersion();
// Min split size is DEFAULT_SPLIT_SIZE(128MB).
         long splitSize = Math.max(ConnectContext.get().getSessionVariable().getFileSplitSize(), DEFAULT_SPLIT_SIZE);
+ HashSet<String> partitionPathSet = new HashSet<>();
+        String dataPath = icebergTable.location() + icebergTable.properties()
+                .getOrDefault(TableProperties.WRITE_DATA_LOCATION, DEFAULT_DATA_PATH);
+ boolean isPartitionedTable = icebergTable.spec().isPartitioned();
+
         CloseableIterable<FileScanTask> fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), splitSize);
try (CloseableIterable<CombinedScanTask> combinedScanTasks =
- TableScanUtil.planTasks(fileScanTasks, splitSize, 1, 0)) {
+ TableScanUtil.planTasks(fileScanTasks, splitSize, 1, 0)) {
combinedScanTasks.forEach(taskGrp ->
taskGrp.files().forEach(splitTask -> {
String dataFilePath = splitTask.file().path().toString();
+
+ // Counts the number of partitions read
+ if (isPartitionedTable) {
+ int last = dataFilePath.lastIndexOf("/");
+ if (last > 0) {
+                        partitionPathSet.add(dataFilePath.substring(dataPath.length(), last));
+ }
+ }
+
Path finalDataFilePath =
S3Util.toScanRangeLocation(dataFilePath, source.getCatalog().getProperties());
IcebergSplit split = new IcebergSplit(
finalDataFilePath,
@@ -205,6 +222,8 @@ public class IcebergScanNode extends FileQueryScanNode {
throw new UserException(e.getMessage(), e.getCause());
}
+ readPartitionNum = partitionPathSet.size();
+
return splits;
}
@@ -242,7 +261,7 @@ public class IcebergScanNode extends FileQueryScanNode {
}
if (latestHistory == null) {
throw new NotFoundException("No version history at or before "
- + Instant.ofEpochMilli(asOfTimestamp));
+ + Instant.ofEpochMilli(asOfTimestamp));
}
return latestHistory.snapshotId();
}
@@ -278,7 +297,7 @@ public class IcebergScanNode extends FileQueryScanNode {
@Override
public TFileType getLocationType(String location) throws UserException {
return getTFileType(location).orElseThrow(() ->
- new DdlException("Unknown file location " + location + " for
iceberg table " + icebergTable.name()));
+ new DdlException("Unknown file location " + location + " for
iceberg table " + icebergTable.name()));
}
@Override
@@ -303,7 +322,7 @@ public class IcebergScanNode extends FileQueryScanNode {
@Override
public List<String> getPathPartitionKeys() throws UserException {
         return icebergTable.spec().fields().stream().map(PartitionField::name).map(String::toLowerCase)
- .collect(Collectors.toList());
+ .collect(Collectors.toList());
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]