This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 80fda5a626f read-partitioned-iceberg-without-partition-path #25503 
(#26369)
80fda5a626f is described below

commit 80fda5a626f1a78583a8baa24a480d50f95b7738
Author: wuwenchi <[email protected]>
AuthorDate: Fri Nov 3 16:21:40 2023 +0800

    read-partitioned-iceberg-without-partition-path #25503 (#26369)
---
 be/src/vec/exec/format/table/iceberg_reader.cpp    | 12 +++++++++++-
 .../planner/external/iceberg/IcebergScanNode.java  | 22 +++++++++++++---------
 .../planner/external/iceberg/IcebergSplit.java     |  5 +++--
 ...est_external_catalog_iceberg_hadoop_catalog.out |  9 +++++++++
 ..._external_catalog_iceberg_hadoop_catalog.groovy |  6 ++++++
 5 files changed, 42 insertions(+), 12 deletions(-)

diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp 
b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 95994723ae1..9f98a0ae3f4 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -561,9 +561,19 @@ void IcebergTableReader::_gen_file_col_names() {
         auto name = _file_col_names[i];
         auto iter = _table_col_to_file_col.find(name);
         if (iter == _table_col_to_file_col.end()) {
-            _all_required_col_names.emplace_back(name);
+            // If the user creates the iceberg table and directly appends 
parquet files that already exist,
+            // there is no 'iceberg.schema' field in the parquet footer, so 
'_table_col_to_file_col' may be empty.
+            // Because case is ignored, the name is converted to lowercase 
here.
+            auto name_low = to_lower(name);
+            _all_required_col_names.emplace_back(name_low);
             if (_has_iceberg_schema) {
                 _not_in_file_col_names.emplace_back(name);
+            } else {
+                _table_col_to_file_col.emplace(name, name_low);
+                _file_col_to_table_col.emplace(name_low, name);
+                if (name != name_low) {
+                    _has_schema_change = true;
+                }
             }
         } else {
             _all_required_col_names.emplace_back(iter->second);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
index 85a68aa785e..c8a9437e243 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -62,8 +62,8 @@ import org.apache.iceberg.HistoryEntry;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.expressions.Expression;
@@ -86,7 +86,6 @@ import java.util.stream.Collectors;
 public class IcebergScanNode extends FileQueryScanNode {
 
     public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
-    public static final String DEFAULT_DATA_PATH = "/data/";
     private static final String TOTAL_RECORDS = "total-records";
     private static final String TOTAL_POSITION_DELETES = 
"total-position-deletes";
     private static final String TOTAL_EQUALITY_DELETES = 
"total-equality-deletes";
@@ -201,8 +200,6 @@ public class IcebergScanNode extends FileQueryScanNode {
         // Min split size is DEFAULT_SPLIT_SIZE(128MB).
         long splitSize = 
Math.max(ConnectContext.get().getSessionVariable().getFileSplitSize(), 
DEFAULT_SPLIT_SIZE);
         HashSet<String> partitionPathSet = new HashSet<>();
-        String dataPath = normalizeLocation(icebergTable.location()) + 
icebergTable.properties()
-                .getOrDefault(TableProperties.WRITE_DATA_LOCATION, 
DEFAULT_DATA_PATH);
         boolean isPartitionedTable = icebergTable.spec().isPartitioned();
 
         CloseableIterable<FileScanTask> fileScanTasks = 
TableScanUtil.splitFiles(scan.planFiles(), splitSize);
@@ -211,12 +208,18 @@ public class IcebergScanNode extends FileQueryScanNode {
             combinedScanTasks.forEach(taskGrp -> 
taskGrp.files().forEach(splitTask -> {
                 String dataFilePath = 
normalizeLocation(splitTask.file().path().toString());
 
-                // Counts the number of partitions read
+                List<String> partitionValues = new ArrayList<>();
                 if (isPartitionedTable) {
-                    int last = dataFilePath.lastIndexOf("/");
-                    if (last > 0) {
-                        
partitionPathSet.add(dataFilePath.substring(dataPath.length(), last));
+                    StructLike structLike = splitTask.file().partition();
+
+                    // set partitionValue for this IcebergSplit
+                    for (int i = 0; i < structLike.size(); i++) {
+                        String partition = String.valueOf(structLike.get(i, 
Object.class));
+                        partitionValues.add(partition);
                     }
+
+                    // Counts the number of partitions read
+                    partitionPathSet.add(structLike.toString());
                 }
 
                 Path finalDataFilePath = 
S3Util.toScanRangeLocation(dataFilePath, source.getCatalog().getProperties());
@@ -227,7 +230,8 @@ public class IcebergScanNode extends FileQueryScanNode {
                         splitTask.file().fileSizeInBytes(),
                         new String[0],
                         formatVersion,
-                        source.getCatalog().getProperties());
+                        source.getCatalog().getProperties(),
+                        partitionValues);
                 if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
                     
split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
                 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
index 29deb293b3d..b58514dcf38 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
@@ -30,8 +30,9 @@ public class IcebergSplit extends FileSplit {
 
     // File path will be changed if the file is modified, so there's no need 
to get modification time.
     public IcebergSplit(Path file, long start, long length, long fileLength, 
String[] hosts,
-                        Integer formatVersion, Map<String, String> config) {
-        super(file, start, length, fileLength, hosts, null);
+                        Integer formatVersion, Map<String, String> config,
+                        List<String> partitionList) {
+        super(file, start, length, fileLength, hosts, partitionList);
         this.formatVersion = formatVersion;
         this.config = config;
     }
diff --git 
a/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.out
 
b/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.out
index fa1a58f6f19..aaa9037977d 100644
--- 
a/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.out
+++ 
b/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.out
@@ -15,3 +15,12 @@
 1      Customer#000000001      j5JsirBM9P      MOROCCO  0      MOROCCO AFRICA  
25-989-741-2988 BUILDING
 3      Customer#000000003      fkRGN8n ARGENTINA7      ARGENTINA       AMERICA 
11-719-748-3364 AUTOMOBILE
 5      Customer#000000005      hwBtxkoBF qSW4KrI       CANADA   5      CANADA  
AMERICA 13-750-942-6364 HOUSEHOLD
+
+-- !q04 --
+1      1970-01-03 09:02:03.000001      a
+1      1970-01-03 09:02:03.000001      b
+2      1970-01-03 09:02:04.000001      c
+2      1970-01-03 09:02:04.000001      d
+
+-- !q05 --
+463870
diff --git 
a/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.groovy
 
b/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.groovy
index b35a799b28d..396254b168f 100644
--- 
a/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.groovy
+++ 
b/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.groovy
@@ -37,7 +37,13 @@ suite("test_external_catalog_iceberg_hadoop_catalog", 
"p2,external,iceberg,exter
             qt_q03 """ select * from iceberg_hadoop_catalog order by c_custkey 
limit 3 """
         }
 
+        def q02 = {
+            qt_q04 """ select * from multi_partition2 order by val """
+            qt_q05 """ select count(*) from table_with_append_file where 
MAN_ID is not null """
+        }
+
         sql """ use `multi_catalog`; """
         q01()
+        q02()
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to