This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 94c9dce308 [fix](iceberg) fix iceberg's filter expr to filter file 
(#22740)
94c9dce308 is described below

commit 94c9dce308a4815988d9617b6f63cdb60864f684
Author: wuwenchi <[email protected]>
AuthorDate: Thu Aug 10 18:20:57 2023 +0800

    [fix](iceberg) fix iceberg's filter expr to filter file (#22740)
    
    Fix Iceberg's filter expression used to filter files, and add a count of 
the number of partitions read
---
 .../doris/external/iceberg/util/IcebergUtils.java  | 27 ++++------------------
 .../planner/external/iceberg/IcebergScanNode.java  | 27 ++++++++++++++++++----
 2 files changed, 27 insertions(+), 27 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java
 
b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java
index 68e761e710..6b26616b90 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java
@@ -39,6 +39,7 @@ import org.apache.doris.catalog.Type;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.thrift.TExprOpcode;
 
 import com.google.common.base.Preconditions;
@@ -59,12 +60,7 @@ import org.apache.iceberg.types.Types;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.time.format.DateTimeFormatter;
-import java.time.format.DateTimeParseException;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -83,6 +79,7 @@ public class IcebergUtils {
             return 0;
         }
     };
+    static long MILLIS_TO_NANO_TIME = 1000;
 
     /**
      * Create Iceberg schema from Doris ColumnDef.
@@ -341,7 +338,7 @@ public class IcebergUtils {
             }
             List<Object> valueList = new ArrayList<>();
             for (int i = 1; i < inExpr.getChildren().size(); ++i) {
-                if (!(inExpr.getChild(i) instanceof  LiteralExpr)) {
+                if (!(inExpr.getChild(i) instanceof LiteralExpr)) {
                     return null;
                 }
                 LiteralExpr literalExpr = (LiteralExpr) inExpr.getChild(i);
@@ -372,23 +369,7 @@ public class IcebergUtils {
             return boolLiteral.getValue();
         } else if (expr instanceof DateLiteral) {
             DateLiteral dateLiteral = (DateLiteral) expr;
-            DateTimeFormatter formatter = 
DateTimeFormatter.ofPattern("yyyyMMddHHmmss")
-                    .withZone(ZoneId.systemDefault());
-            StringBuilder sb = new StringBuilder();
-            sb.append(dateLiteral.getYear())
-                    .append(dateLiteral.getMonth())
-                    .append(dateLiteral.getDay())
-                    .append(dateLiteral.getHour())
-                    .append(dateLiteral.getMinute())
-                    .append(dateLiteral.getSecond());
-            Date date;
-            try {
-                date = Date.from(
-                        LocalDateTime.parse(sb.toString(), 
formatter).atZone(ZoneId.systemDefault()).toInstant());
-            } catch (DateTimeParseException e) {
-                return null;
-            }
-            return date.getTime();
+            return dateLiteral.unixTimestamp(TimeUtils.getTimeZone()) * 
MILLIS_TO_NANO_TIME;
         } else if (expr instanceof DecimalLiteral) {
             DecimalLiteral decimalLiteral = (DecimalLiteral) expr;
             return decimalLiteral.getValue();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
index ef69536b78..5fa64fa2b6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -56,6 +56,7 @@ import org.apache.iceberg.HistoryEntry;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.expressions.Expression;
@@ -67,6 +68,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.time.Instant;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -76,6 +78,7 @@ import java.util.stream.Collectors;
 public class IcebergScanNode extends FileQueryScanNode {
 
     public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
+    public static final String DEFAULT_DATA_PATH = "/data/";
 
     private IcebergSource source;
     private Table icebergTable;
@@ -181,11 +184,25 @@ public class IcebergScanNode extends FileQueryScanNode {
         int formatVersion = ((BaseTable) 
icebergTable).operations().current().formatVersion();
         // Min split size is DEFAULT_SPLIT_SIZE(128MB).
         long splitSize = 
Math.max(ConnectContext.get().getSessionVariable().getFileSplitSize(), 
DEFAULT_SPLIT_SIZE);
+        HashSet<String> partitionPathSet = new HashSet<>();
+        String dataPath = icebergTable.location() + icebergTable.properties()
+                .getOrDefault(TableProperties.WRITE_DATA_LOCATION, 
DEFAULT_DATA_PATH);
+        boolean isPartitionedTable = icebergTable.spec().isPartitioned();
+
         CloseableIterable<FileScanTask> fileScanTasks = 
TableScanUtil.splitFiles(scan.planFiles(), splitSize);
         try (CloseableIterable<CombinedScanTask> combinedScanTasks =
-                 TableScanUtil.planTasks(fileScanTasks, splitSize, 1, 0)) {
+                TableScanUtil.planTasks(fileScanTasks, splitSize, 1, 0)) {
             combinedScanTasks.forEach(taskGrp -> 
taskGrp.files().forEach(splitTask -> {
                 String dataFilePath = splitTask.file().path().toString();
+
+                // Counts the number of partitions read
+                if (isPartitionedTable) {
+                    int last = dataFilePath.lastIndexOf("/");
+                    if (last > 0) {
+                        
partitionPathSet.add(dataFilePath.substring(dataPath.length(), last));
+                    }
+                }
+
                 Path finalDataFilePath = 
S3Util.toScanRangeLocation(dataFilePath, source.getCatalog().getProperties());
                 IcebergSplit split = new IcebergSplit(
                         finalDataFilePath,
@@ -205,6 +222,8 @@ public class IcebergScanNode extends FileQueryScanNode {
             throw new UserException(e.getMessage(), e.getCause());
         }
 
+        readPartitionNum = partitionPathSet.size();
+
         return splits;
     }
 
@@ -242,7 +261,7 @@ public class IcebergScanNode extends FileQueryScanNode {
         }
         if (latestHistory == null) {
             throw new NotFoundException("No version history at or before "
-                + Instant.ofEpochMilli(asOfTimestamp));
+                    + Instant.ofEpochMilli(asOfTimestamp));
         }
         return latestHistory.snapshotId();
     }
@@ -278,7 +297,7 @@ public class IcebergScanNode extends FileQueryScanNode {
     @Override
     public TFileType getLocationType(String location) throws UserException {
         return getTFileType(location).orElseThrow(() ->
-            new DdlException("Unknown file location " + location + " for 
iceberg table " + icebergTable.name()));
+                new DdlException("Unknown file location " + location + " for 
iceberg table " + icebergTable.name()));
     }
 
     @Override
@@ -303,7 +322,7 @@ public class IcebergScanNode extends FileQueryScanNode {
     @Override
     public List<String> getPathPartitionKeys() throws UserException {
         return 
icebergTable.spec().fields().stream().map(PartitionField::name).map(String::toLowerCase)
-            .collect(Collectors.toList());
+                .collect(Collectors.toList());
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to