This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new b98744ae907 [Bug](iceberg)fix read partitioned iceberg without partition path (#25503)
b98744ae907 is described below
commit b98744ae907bf22cbe496473a81c0bf8d22508fa
Author: wuwenchi <[email protected]>
AuthorDate: Tue Oct 31 18:09:53 2023 +0800
[Bug](iceberg)fix read partitioned iceberg without partition path (#25503)
Iceberg does not require partition values to appear in file paths, so we
should get the partition values from `PartitionScanTask.partition` instead
of parsing them out of the path.
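The failing scenario is a data file registered with Iceberg directly, so its
path carries no partition=value segments. A minimal Java sketch of how such a
file can arise (table, path, and partition value are made-up examples;
DataFiles.Builder is the Iceberg API):

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.DataFiles;
    import org.apache.iceberg.Table;

    // Hypothetical: append an existing parquet file to a partitioned table.
    // The partition value ("id=2") is recorded in table metadata only; the
    // file path itself contains no partition directories.
    static void appendFlatFile(Table table) {
        DataFile file = DataFiles.builder(table.spec())
                .withPath("s3://bucket/warehouse/00000-0-data.parquet")
                .withFileSizeInBytes(1024L)
                .withRecordCount(100L)
                .withPartitionPath("id=2")
                .build();
        table.newAppend().appendFile(file).commit();
    }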
---
be/src/vec/exec/format/table/iceberg_reader.cpp | 12 +++++++++++-
.../org/apache/doris/analysis/SlotDescriptor.java | 3 ++-
.../planner/external/iceberg/IcebergScanNode.java | 22 +++++++++++++---------
.../planner/external/iceberg/IcebergSplit.java | 5 +++--
...est_external_catalog_iceberg_hadoop_catalog.out | 9 +++++++++
..._external_catalog_iceberg_hadoop_catalog.groovy | 6 ++++++
6 files changed, 44 insertions(+), 13 deletions(-)
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 01333545658..c4bec00f3d6 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -562,9 +562,19 @@ void IcebergTableReader::_gen_file_col_names() {
auto name = _file_col_names[i];
auto iter = _table_col_to_file_col.find(name);
if (iter == _table_col_to_file_col.end()) {
- _all_required_col_names.emplace_back(name);
+ // If the user created the iceberg table and directly appended an existing parquet file,
+ // there is no 'iceberg.schema' field in the parquet footer, so '_table_col_to_file_col' may be empty.
+ // Column matching is case-insensitive, so the name is converted to lowercase here.
+ auto name_low = to_lower(name);
+ _all_required_col_names.emplace_back(name_low);
if (_has_iceberg_schema) {
_not_in_file_col_names.emplace_back(name);
+ } else {
+ _table_col_to_file_col.emplace(name, name_low);
+ _file_col_to_table_col.emplace(name_low, name);
+ if (name != name_low) {
+ _has_schema_change = true;
+ }
}
} else {
_all_required_col_names.emplace_back(iter->second);
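The fallback above amounts to a case-insensitive identity mapping between
table and file column names. Roughly the same logic, sketched in Java for
readability (the helper name and signature are hypothetical; the C++ hunk
above is authoritative):

    import java.util.Locale;
    import java.util.Map;

    // Hypothetical mirror of the BE fallback: map each table column name to
    // its lowercase form and report whether any name actually changed case.
    static boolean fillIdentityMapping(Iterable<String> tableColNames,
                                       Map<String, String> tableToFile,
                                       Map<String, String> fileToTable) {
        boolean schemaChange = false;
        for (String name : tableColNames) {
            String lower = name.toLowerCase(Locale.ROOT);
            tableToFile.put(name, lower);
            fileToTable.put(lower, name);
            schemaChange |= !name.equals(lower);
        }
        return schemaChange;
    }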
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
index 6384dad8d7b..c5291414b1d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java
@@ -296,7 +296,8 @@ public class SlotDescriptor {
public TSlotDescriptor toThrift() {
// Non-nullable slots will have 0 for the byte offset and -1 for the bit mask
TSlotDescriptor tSlotDescriptor = new TSlotDescriptor(id.asInt(), parent.getId().asInt(), type.toThrift(), -1,
- byteOffset, 0, getIsNullable() ? 0 : -1, ((column != null) ? column.getNonShadowName() : ""), slotIdx,
+ byteOffset, 0, getIsNullable() ? 0 : -1,
+ ((column != null) ? column.getNonShadowName() : ""), slotIdx,
isMaterialized);
tSlotDescriptor.setNeedMaterialize(needMaterialize);
tSlotDescriptor.setIsAutoIncrement(isAutoInc);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
index 85a68aa785e..c8a9437e243 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java
@@ -62,8 +62,8 @@ import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.expressions.Expression;
@@ -86,7 +86,6 @@ import java.util.stream.Collectors;
public class IcebergScanNode extends FileQueryScanNode {
public static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
- public static final String DEFAULT_DATA_PATH = "/data/";
private static final String TOTAL_RECORDS = "total-records";
private static final String TOTAL_POSITION_DELETES = "total-position-deletes";
private static final String TOTAL_EQUALITY_DELETES = "total-equality-deletes";
@@ -201,8 +200,6 @@ public class IcebergScanNode extends FileQueryScanNode {
// Min split size is DEFAULT_SPLIT_SIZE(128MB).
long splitSize = Math.max(ConnectContext.get().getSessionVariable().getFileSplitSize(), DEFAULT_SPLIT_SIZE);
HashSet<String> partitionPathSet = new HashSet<>();
- String dataPath = normalizeLocation(icebergTable.location()) + icebergTable.properties()
- .getOrDefault(TableProperties.WRITE_DATA_LOCATION, DEFAULT_DATA_PATH);
boolean isPartitionedTable = icebergTable.spec().isPartitioned();
CloseableIterable<FileScanTask> fileScanTasks = TableScanUtil.splitFiles(scan.planFiles(), splitSize);
@@ -211,12 +208,18 @@ public class IcebergScanNode extends FileQueryScanNode {
combinedScanTasks.forEach(taskGrp -> taskGrp.files().forEach(splitTask -> {
String dataFilePath = normalizeLocation(splitTask.file().path().toString());
- // Counts the number of partitions read
+ List<String> partitionValues = new ArrayList<>();
if (isPartitionedTable) {
- int last = dataFilePath.lastIndexOf("/");
- if (last > 0) {
- partitionPathSet.add(dataFilePath.substring(dataPath.length(), last));
+ StructLike structLike = splitTask.file().partition();
+
+ // set partitionValue for this IcebergSplit
+ for (int i = 0; i < structLike.size(); i++) {
+ String partition = String.valueOf(structLike.get(i, Object.class));
+ partitionValues.add(partition);
}
+
+ // Counts the number of partitions read
+ partitionPathSet.add(structLike.toString());
}
Path finalDataFilePath = S3Util.toScanRangeLocation(dataFilePath, source.getCatalog().getProperties());
@@ -227,7 +230,8 @@ public class IcebergScanNode extends FileQueryScanNode {
splitTask.file().fileSizeInBytes(),
new String[0],
formatVersion,
- source.getCatalog().getProperties());
+ source.getCatalog().getProperties(),
+ partitionValues);
if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
}
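One note on the loop above: structLike.get(i, Object.class) yields raw
partition values in spec order, without field names. If key=value strings
were wanted instead (e.g. for the partitionPathSet counter), they could be
derived from the partition spec along these lines (a hedged sketch; the
describePartition helper is hypothetical):

    import org.apache.iceberg.PartitionField;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.StructLike;

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical: render a partition tuple as ["k1=v1", "k2=v2", ...]
    // by pairing each value with its field name from the partition spec.
    static List<String> describePartition(PartitionSpec spec, StructLike partition) {
        List<PartitionField> fields = spec.fields();
        List<String> parts = new ArrayList<>();
        for (int i = 0; i < fields.size(); i++) {
            parts.add(fields.get(i).name() + "=" + partition.get(i, Object.class));
        }
        return parts;
    }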
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
index 29deb293b3d..b58514dcf38 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSplit.java
@@ -30,8 +30,9 @@ public class IcebergSplit extends FileSplit {
// File path will be changed if the file is modified, so there's no need to get modification time.
public IcebergSplit(Path file, long start, long length, long fileLength, String[] hosts,
- Integer formatVersion, Map<String, String> config) {
- super(file, start, length, fileLength, hosts, null);
+ Integer formatVersion, Map<String, String> config,
+ List<String> partitionList) {
+ super(file, start, length, fileLength, hosts, partitionList);
this.formatVersion = formatVersion;
this.config = config;
}
diff --git a/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.out b/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.out
index fa1a58f6f19..aaa9037977d 100644
--- a/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.out
+++ b/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.out
@@ -15,3 +15,12 @@
1 Customer#000000001 j5JsirBM9P MOROCCO 0 MOROCCO AFRICA 25-989-741-2988 BUILDING
3 Customer#000000003 fkRGN8n ARGENTINA7 ARGENTINA AMERICA 11-719-748-3364 AUTOMOBILE
5 Customer#000000005 hwBtxkoBF qSW4KrI CANADA 5 CANADA AMERICA 13-750-942-6364 HOUSEHOLD
+
+-- !q04 --
+1 1970-01-03 09:02:03.000001 a
+1 1970-01-03 09:02:03.000001 b
+2 1970-01-03 09:02:04.000001 c
+2 1970-01-03 09:02:04.000001 d
+
+-- !q05 --
+463870
diff --git a/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.groovy b/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.groovy
index b35a799b28d..12ec14992ec 100644
--- a/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.groovy
+++ b/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_hadoop_catalog.groovy
@@ -36,8 +36,14 @@ suite("test_external_catalog_iceberg_hadoop_catalog", "p2,external,iceberg,exter
qt_q02 """ select c_custkey from iceberg_hadoop_catalog group by c_custkey order by c_custkey limit 7 """
qt_q03 """ select * from iceberg_hadoop_catalog order by c_custkey limit 3 """
}
+
+ def q02 = {
+ qt_q04 """ select * from multi_partition2 order by val """
+ qt_q05 """ select count(*) from table_with_append_file where MAN_ID is not null """
+ }
sql """ use `multi_catalog`; """
q01()
+ q02()
}
}