morningman commented on code in PR #53399:
URL: https://github.com/apache/doris/pull/53399#discussion_r2241050669


##########
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java:
##########
@@ -371,4 +379,68 @@ public static <T> String encodeObjectToString(T t) {
         }
     }
 
+    public static Map<String, String> getPartitionInfoMap(Table table, 
BinaryRow partitionValues, String timeZone) {
+        Map<String, String> partitionInfoMap = new HashMap<>();
+        List<String> partitionKeys = table.partitionKeys();
+        RowType partitionType = table.rowType().project(partitionKeys);
+        RowDataToObjectArrayConverter toObjectArrayConverter = new 
RowDataToObjectArrayConverter(
+                partitionType);
+        Object[] partitionValuesArray = 
toObjectArrayConverter.convert(partitionValues);
+        for (int i = 0; i < partitionKeys.size(); i++) {
+            try {
+                String partitionValue = 
serializePartitionValue(partitionType.getFields().get(i).type(),
+                        partitionValuesArray[i], timeZone);
+                partitionInfoMap.put(partitionKeys.get(i), partitionValue);
+            } catch (UnsupportedOperationException e) {
+                LOG.warn("Failed to serialize partition value for key {}: {}", 
partitionKeys.get(i), e.getMessage());
+                return null;
+            }
+        }
+        return partitionInfoMap;
+    }
+
+    private static String 
serializePartitionValue(org.apache.paimon.types.DataType type, Object value,
+            String timeZone) {
+        if (value == null) {
+            return "\\N";

Review Comment:
   ```suggestion
               return FeConstants.null_string;
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java:
##########
@@ -371,4 +379,68 @@ public static <T> String encodeObjectToString(T t) {
         }
     }
 
+    public static Map<String, String> getPartitionInfoMap(Table table, 
BinaryRow partitionValues, String timeZone) {
+        Map<String, String> partitionInfoMap = new HashMap<>();
+        List<String> partitionKeys = table.partitionKeys();
+        RowType partitionType = table.rowType().project(partitionKeys);
+        RowDataToObjectArrayConverter toObjectArrayConverter = new 
RowDataToObjectArrayConverter(
+                partitionType);
+        Object[] partitionValuesArray = 
toObjectArrayConverter.convert(partitionValues);
+        for (int i = 0; i < partitionKeys.size(); i++) {
+            try {
+                String partitionValue = 
serializePartitionValue(partitionType.getFields().get(i).type(),
+                        partitionValuesArray[i], timeZone);
+                partitionInfoMap.put(partitionKeys.get(i), partitionValue);
+            } catch (UnsupportedOperationException e) {
+                LOG.warn("Failed to serialize partition value for key {}: {}", 
partitionKeys.get(i), e.getMessage());

Review Comment:
   Please add the table name to this log message to make troubleshooting easier.



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java:
##########
@@ -602,6 +609,72 @@ public static Type 
icebergTypeToDorisType(org.apache.iceberg.types.Type type) {
         }
     }
 
+    public static Map<String, String> getPartitionInfoMap(PartitionData 
partitionData, String timeZone) {
+        Map<String, String> partitionInfoMap = new HashMap<>();
+        List<NestedField> fields = 
partitionData.getPartitionType().asNestedType().fields();
+        for (int i = 0; i < fields.size(); i++) {
+            NestedField field = fields.get(i);
+            Object value = partitionData.get(i);
+            try {
+                String partitionString = serializePartitionValue(field.type(), 
value, timeZone);
+                partitionInfoMap.put(field.name(), partitionString);
+            } catch (UnsupportedOperationException e) {
+                LOG.warn("Failed to serialize partition value for field {}: 
{}", field.name(), e.getMessage());

Review Comment:
   Please add the table name to this log message as well.



##########
be/src/vec/exec/scan/file_scanner.h:
##########
@@ -192,6 +192,15 @@ class FileScanner : public Scanner {
             _partition_col_descs;
     std::unordered_map<std::string, VExprContextSPtr> _missing_col_descs;
 
+    // store all slot descriptors, used for initializing runtime filter 
partition prune context
+    std::unordered_map<std::string, const SlotDescriptor*> 
_all_col_name_to_slot_desc;

Review Comment:
   we should explain more about `_partition_slot_descs` and 
`_all_col_name_to_slot_desc`



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java:
##########
@@ -602,6 +609,72 @@ public static Type 
icebergTypeToDorisType(org.apache.iceberg.types.Type type) {
         }
     }
 
+    public static Map<String, String> getPartitionInfoMap(PartitionData 
partitionData, String timeZone) {
+        Map<String, String> partitionInfoMap = new HashMap<>();
+        List<NestedField> fields = 
partitionData.getPartitionType().asNestedType().fields();
+        for (int i = 0; i < fields.size(); i++) {
+            NestedField field = fields.get(i);
+            Object value = partitionData.get(i);
+            try {
+                String partitionString = serializePartitionValue(field.type(), 
value, timeZone);
+                partitionInfoMap.put(field.name(), partitionString);
+            } catch (UnsupportedOperationException e) {
+                LOG.warn("Failed to serialize partition value for field {}: 
{}", field.name(), e.getMessage());
+                return null;
+            }
+        }
+        return partitionInfoMap;
+    }
+
+    private static String 
serializePartitionValue(org.apache.iceberg.types.Type type, Object value, 
String timeZone) {
+        if (value == null) {
+            return "\\N";

Review Comment:
   ```suggestion
            return FeConstants.null_string;
   ```



##########
be/src/vec/exec/scan/file_scanner.cpp:
##########
@@ -913,26 +916,32 @@ Status FileScanner::_get_next_reader() {
         const TFileRangeDesc& range = _current_range;
         _current_range_path = range.path;
 
-        if (!_partition_slot_descs.empty()) {
-            // we need get partition columns first for runtime filter 
partition pruning
-            RETURN_IF_ERROR(_generate_parititon_columns());
+        // try to get the partition columns from the range
+        RETURN_IF_ERROR(_generate_partition_columns());
+        // try to get the data lake partition columns from the range
+        RETURN_IF_ERROR(_generate_data_lake_partition_columns());
 
-            if (_state->query_options().enable_runtime_filter_partition_prune) 
{
-                // if enable_runtime_filter_partition_prune is true, we need 
to check whether this range can be filtered out
-                // by runtime filter partition prune
-                if (_push_down_conjuncts.size() < _conjuncts.size()) {
-                    // there are new runtime filters, need to re-init runtime 
filter partition pruning ctxs
-                    _init_runtime_filter_partition_prune_ctxs();
-                }
+        const auto& partition_col_descs = !_partition_col_descs.empty()

Review Comment:
   I think we can merge `_partition_col_descs` and 
`_data_lake_partition_col_descs`, because we only ever need one of them. The 
only difference is that one is generated from the file path while the other is 
generated from the metadata.



##########
be/src/vec/exec/scan/file_scanner.cpp:
##########
@@ -1480,6 +1489,21 @@ Status FileScanner::_generate_parititon_columns() {
     return Status::OK();
 }
 
+Status FileScanner::_generate_data_lake_partition_columns() {
+    _data_lake_partition_col_descs.clear();
+    if (_current_range.__isset.data_lake_partition_values) {
+        const auto& partition_values = 
_current_range.data_lake_partition_values;
+        for (const auto& [key, value] : partition_values) {
+            if (_all_col_name_to_slot_desc.find(key) == 
_all_col_name_to_slot_desc.end()) {

Review Comment:
   `_all_col_name_to_slot_desc.find(key)` looks up the key, and 
`_all_col_name_to_slot_desc[key]` performs the same lookup again; this should 
be optimized to a single lookup (e.g. reuse the iterator returned by `find`).



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java:
##########
@@ -384,6 +384,9 @@ private void getPartitionSplits(HivePartition partition, 
List<Split> splits) thr
                 HudiSplit hudiSplit = new HudiSplit(locationPath, 0, fileSize, 
fileSize,
                         new String[0], partition.getPartitionValues());
                 hudiSplit.setTableFormatType(TableFormatType.HUDI);
+                if (sessionVariable.isEnableRuntimeFilterPartitionPrune()) {
+                    
hudiSplit.setHudiPartitionValues(HudiUtils.getPartitionInfoMap(hmsTable, 
partition));

Review Comment:
   This is unresolved?



##########
be/src/vec/exec/scan/file_scanner.cpp:
##########
@@ -1480,6 +1489,21 @@ Status FileScanner::_generate_parititon_columns() {
     return Status::OK();
 }
 
+Status FileScanner::_generate_data_lake_partition_columns() {
+    _data_lake_partition_col_descs.clear();
+    if (_current_range.__isset.data_lake_partition_values) {

Review Comment:
   Could `data_lake_partition_values` be null here?
   Because on the FE side, if we fail to deserialize the partition value, null 
is returned and set to `data_lake_partition_values`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to