morningman commented on code in PR #43255:
URL: https://github.com/apache/doris/pull/43255#discussion_r1835712649


##########
be/src/vec/exec/format/orc/vorc_reader.cpp:
##########
@@ -558,190 +562,316 @@ std::tuple<bool, orc::Literal> 
convert_to_orc_literal(const orc::Type* type, con
     }
 }
 
-template <PrimitiveType primitive_type>
-std::vector<OrcPredicate> value_range_to_predicate(
-        const ColumnValueRange<primitive_type>& col_val_range, const 
orc::Type* type,
-        std::vector<orc::TypeKind>* unsupported_pushdown_types) {
-    std::vector<OrcPredicate> predicates;
-
-    PrimitiveType src_type = OrcReader::convert_to_doris_type(type).type;
-    if (src_type != primitive_type) {
-        if (!(is_string_type(src_type) && is_string_type(primitive_type))) {
-            // not support schema change
-            return predicates;
-        }
-    }
-
-    if (unsupported_pushdown_types != nullptr) {
-        for (vector<orc::TypeKind>::iterator it = 
unsupported_pushdown_types->begin();
-             it != unsupported_pushdown_types->end(); ++it) {
-            if (*it == type->getKind()) {
-                // Unsupported type
-                return predicates;
-            }
-        }
+std::tuple<bool, orc::Literal, orc::PredicateDataType> 
OrcReader::_make_orc_leteral(
+        const VSlotRef* slot_ref, const VLiteral* literal) {
+    auto literal_data = literal->get_column_ptr()->get_data_at(0);
+    auto* slot = _tuple_descriptor->slots()[slot_ref->column_id()];
+    auto slot_type = slot->type();
+    const auto* orc_type = 
_type_map[_col_name_to_file_col_name[slot->col_name()]];
+    const auto predicate_type = 
TYPEKIND_TO_PREDICATE_TYPE[orc_type->getKind()];
+    switch (slot_type.type) {
+#define M(NAME)                                                                
\
+    case TYPE_##NAME: {                                                        
\
+        auto [valid, orc_literal] = convert_to_orc_literal<TYPE_##NAME>(       
\
+                orc_type, literal_data, slot_type.precision, slot_type.scale); 
\
+        return std::make_tuple(valid, orc_literal, predicate_type);            
\
+    }
+#define APPLY_FOR_PRIMITIVE_TYPE(M) \
+    M(TINYINT)                      \
+    M(SMALLINT)                     \
+    M(INT)                          \
+    M(BIGINT)                       \
+    M(LARGEINT)                     \
+    M(CHAR)                         \
+    M(DATE)                         \
+    M(DATETIME)                     \
+    M(DATEV2)                       \
+    M(DATETIMEV2)                   \
+    M(VARCHAR)                      \
+    M(STRING)                       \
+    M(HLL)                          \
+    M(DECIMAL32)                    \
+    M(DECIMAL64)                    \
+    M(DECIMAL128I)                  \
+    M(DECIMAL256)                   \
+    M(DECIMALV2)                    \
+    M(BOOLEAN)                      \
+    M(IPV4)                         \
+    M(IPV6)
+        APPLY_FOR_PRIMITIVE_TYPE(M)
+#undef M
+    default: {
+        VLOG_CRITICAL << "Unsupported Convert Orc Literal [ColName=" << 
slot->col_name() << "]";
+        return std::make_tuple(false, orc::Literal(false), predicate_type);
     }
-
-    orc::PredicateDataType predicate_data_type;
-    auto type_it = TYPEKIND_TO_PREDICATE_TYPE.find(type->getKind());
-    if (type_it == TYPEKIND_TO_PREDICATE_TYPE.end()) {
-        // Unsupported type
-        return predicates;
-    } else {
-        predicate_data_type = type_it->second;
     }
+}
 
-    if (col_val_range.is_fixed_value_range()) {
-        OrcPredicate in_predicate;
-        in_predicate.col_name = col_val_range.column_name();
-        in_predicate.data_type = predicate_data_type;
-        in_predicate.op = SQLFilterOp::FILTER_IN;
-        for (const auto& value : col_val_range.get_fixed_value_set()) {
-            auto [valid, literal] = convert_to_orc_literal<primitive_type>(
-                    type, &value, col_val_range.precision(), 
col_val_range.scale());
-            if (valid) {
-                in_predicate.literals.push_back(literal);
-            }
-        }
-        if (!in_predicate.literals.empty()) {
-            predicates.emplace_back(in_predicate);
+// check if the expr can be pushed down to orc reader
+bool OrcReader::_check_expr_can_push_down(const VExprSPtr& expr) {
+    DCHECK_NOTNULL(expr);
+    switch (expr->op()) {
+    case TExprOpcode::COMPOUND_AND:
+    case TExprOpcode::COMPOUND_OR:

Review Comment:
   I have a question. Take `A or B` as an example: if `B` cannot be pushed 
down, then `A or B` as a whole cannot be pushed down either.
   So why does the comment say that, for `OR`, it is enough that "at least one 
child can be pushed down" to return true?



##########
be/src/vec/exec/format/orc/vorc_reader.cpp:
##########
@@ -558,190 +562,316 @@ std::tuple<bool, orc::Literal> 
convert_to_orc_literal(const orc::Type* type, con
     }
 }
 
-template <PrimitiveType primitive_type>
-std::vector<OrcPredicate> value_range_to_predicate(
-        const ColumnValueRange<primitive_type>& col_val_range, const 
orc::Type* type,
-        std::vector<orc::TypeKind>* unsupported_pushdown_types) {
-    std::vector<OrcPredicate> predicates;
-
-    PrimitiveType src_type = OrcReader::convert_to_doris_type(type).type;
-    if (src_type != primitive_type) {
-        if (!(is_string_type(src_type) && is_string_type(primitive_type))) {
-            // not support schema change
-            return predicates;
-        }
-    }
-
-    if (unsupported_pushdown_types != nullptr) {
-        for (vector<orc::TypeKind>::iterator it = 
unsupported_pushdown_types->begin();
-             it != unsupported_pushdown_types->end(); ++it) {
-            if (*it == type->getKind()) {
-                // Unsupported type
-                return predicates;
-            }
-        }
+std::tuple<bool, orc::Literal, orc::PredicateDataType> 
OrcReader::_make_orc_leteral(
+        const VSlotRef* slot_ref, const VLiteral* literal) {
+    auto literal_data = literal->get_column_ptr()->get_data_at(0);
+    auto* slot = _tuple_descriptor->slots()[slot_ref->column_id()];
+    auto slot_type = slot->type();
+    const auto* orc_type = 
_type_map[_col_name_to_file_col_name[slot->col_name()]];
+    const auto predicate_type = 
TYPEKIND_TO_PREDICATE_TYPE[orc_type->getKind()];
+    switch (slot_type.type) {
+#define M(NAME)                                                                
\
+    case TYPE_##NAME: {                                                        
\
+        auto [valid, orc_literal] = convert_to_orc_literal<TYPE_##NAME>(       
\
+                orc_type, literal_data, slot_type.precision, slot_type.scale); 
\
+        return std::make_tuple(valid, orc_literal, predicate_type);            
\
+    }
+#define APPLY_FOR_PRIMITIVE_TYPE(M) \
+    M(TINYINT)                      \
+    M(SMALLINT)                     \
+    M(INT)                          \
+    M(BIGINT)                       \
+    M(LARGEINT)                     \
+    M(CHAR)                         \
+    M(DATE)                         \
+    M(DATETIME)                     \
+    M(DATEV2)                       \
+    M(DATETIMEV2)                   \
+    M(VARCHAR)                      \
+    M(STRING)                       \
+    M(HLL)                          \
+    M(DECIMAL32)                    \
+    M(DECIMAL64)                    \
+    M(DECIMAL128I)                  \
+    M(DECIMAL256)                   \
+    M(DECIMALV2)                    \
+    M(BOOLEAN)                      \
+    M(IPV4)                         \
+    M(IPV6)
+        APPLY_FOR_PRIMITIVE_TYPE(M)
+#undef M
+    default: {
+        VLOG_CRITICAL << "Unsupported Convert Orc Literal [ColName=" << 
slot->col_name() << "]";
+        return std::make_tuple(false, orc::Literal(false), predicate_type);
     }
-
-    orc::PredicateDataType predicate_data_type;
-    auto type_it = TYPEKIND_TO_PREDICATE_TYPE.find(type->getKind());
-    if (type_it == TYPEKIND_TO_PREDICATE_TYPE.end()) {
-        // Unsupported type
-        return predicates;
-    } else {
-        predicate_data_type = type_it->second;
     }
+}
 
-    if (col_val_range.is_fixed_value_range()) {
-        OrcPredicate in_predicate;
-        in_predicate.col_name = col_val_range.column_name();
-        in_predicate.data_type = predicate_data_type;
-        in_predicate.op = SQLFilterOp::FILTER_IN;
-        for (const auto& value : col_val_range.get_fixed_value_set()) {
-            auto [valid, literal] = convert_to_orc_literal<primitive_type>(
-                    type, &value, col_val_range.precision(), 
col_val_range.scale());
-            if (valid) {
-                in_predicate.literals.push_back(literal);
-            }
-        }
-        if (!in_predicate.literals.empty()) {
-            predicates.emplace_back(in_predicate);
+// check if the expr can be pushed down to orc reader
+bool OrcReader::_check_expr_can_push_down(const VExprSPtr& expr) {
+    DCHECK_NOTNULL(expr);
+    switch (expr->op()) {
+    case TExprOpcode::COMPOUND_AND:
+    case TExprOpcode::COMPOUND_OR:
+    case TExprOpcode::COMPOUND_NOT:
+        // at least one child can be pushed down
+        return std::ranges::any_of(expr->children(), [this](const auto& child) 
{
+            return _check_expr_can_push_down(child);
+        });
+    case TExprOpcode::GE:
+    case TExprOpcode::GT:
+    case TExprOpcode::LE:
+    case TExprOpcode::LT:
+    case TExprOpcode::EQ:
+    case TExprOpcode::NE:
+    case TExprOpcode::FILTER_IN:
+    case TExprOpcode::FILTER_NOT_IN: {
+        // check if the slot is partition column
+        const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+        return 
!_lazy_read_ctx.predicate_partition_columns.contains(slot_ref->expr_name());
+    }
+    case TExprOpcode::INVALID_OPCODE:
+        if (expr->node_type() == TExprNodeType::FUNCTION_CALL) {
+            auto fn_name = expr->fn().name.function_name;
+            // only support is_null_pred and is_not_null_pred
+            if (fn_name == "is_null_pred" || fn_name == "is_not_null_pred") {
+                return true;
+            }
+            LOG(WARNING) << "Unsupported function [funciton=" << fn_name << 
"]";
         }
-        return predicates;
+        return false;
+    default:
+        VLOG_CRITICAL << "Unsupported Opcode [OpCode=" << expr->op() << "]";
+        return false;
     }
+}
 
-    const auto& high_value = col_val_range.get_range_max_value();
-    const auto& low_value = col_val_range.get_range_min_value();
-    const auto& high_op = col_val_range.get_range_high_op();
-    const auto& low_op = col_val_range.get_range_low_op();
+// convert expr to sargs recursively
+bool OrcReader::_build_search_argument(const VExprSPtr& expr,
+                                       
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
+    if (expr == nullptr) {
+        return false;
+    }
 
-    // orc can only push down is_null. When col_value_range._contain_null = 
true, only indicating that
-    // value can be null, not equals null, so ignore _contain_null in 
col_value_range
-    if (col_val_range.is_high_value_maximum() && high_op == 
SQLFilterOp::FILTER_LESS_OR_EQUAL &&
-        col_val_range.is_low_value_mininum() && low_op == 
SQLFilterOp::FILTER_LARGER_OR_EQUAL) {
-        return predicates;
+    // if expr can not be pushed down, skip it and continue to next expr
+    if (!_check_expr_can_push_down(expr)) {
+        return true;
     }
 
-    if (low_value < high_value) {
-        if (!col_val_range.is_low_value_mininum() ||
-            SQLFilterOp::FILTER_LARGER_OR_EQUAL != low_op) {
-            auto [valid, low_literal] = convert_to_orc_literal<primitive_type>(
-                    type, &low_value, col_val_range.precision(), 
col_val_range.scale());
-            if (valid) {
-                OrcPredicate low_predicate;
-                low_predicate.col_name = col_val_range.column_name();
-                low_predicate.data_type = predicate_data_type;
-                low_predicate.op = low_op;
-                low_predicate.literals.emplace_back(low_literal);
-                predicates.emplace_back(low_predicate);
+    switch (expr->op()) {
+    case TExprOpcode::COMPOUND_AND:
+        builder->startAnd();
+        for (const auto& child : expr->children()) {
+            if (!_build_search_argument(child, builder)) {
+                return false;
             }
         }
-        if (!col_val_range.is_high_value_maximum() ||
-            SQLFilterOp::FILTER_LESS_OR_EQUAL != high_op) {
-            auto [valid, high_literal] = 
convert_to_orc_literal<primitive_type>(
-                    type, &high_value, col_val_range.precision(), 
col_val_range.scale());
-            if (valid) {
-                OrcPredicate high_predicate;
-                high_predicate.col_name = col_val_range.column_name();
-                high_predicate.data_type = predicate_data_type;
-                high_predicate.op = high_op;
-                high_predicate.literals.emplace_back(high_literal);
-                predicates.emplace_back(high_predicate);
+        builder->end();
+        break;
+    case TExprOpcode::COMPOUND_OR:
+        builder->startOr();
+        for (const auto& child : expr->children()) {
+            if (!_build_search_argument(child, builder)) {
+                return false;
             }
         }
+        builder->end();
+        break;
+    case TExprOpcode::COMPOUND_NOT:
+        builder->startNot();
+        DCHECK_EQ(expr->children().size(), 1);
+        if (!_build_search_argument(expr->children()[0], builder)) {
+            return false;
+        }
+        builder->end();
+        break;
+    case TExprOpcode::GE: {
+        builder->startNot();
+        DCHECK(expr->children().size() == 2);
+        DCHECK(expr->children()[0]->is_slot_ref());
+        DCHECK(expr->children()[1]->is_literal());
+        const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+        const auto* literal = static_cast<const 
VLiteral*>(expr->children()[1].get());
+        auto [valid, orc_literal, predicate_type] = 
_make_orc_leteral(slot_ref, literal);
+        if (!valid) {
+            return false;
+        }
+        builder->lessThan(slot_ref->expr_name(), predicate_type, orc_literal);
+        builder->end();
+        break;
     }
-    return predicates;
-}
-
-bool static build_search_argument(std::vector<OrcPredicate>& predicates, int 
index,
-                                  std::unique_ptr<orc::SearchArgumentBuilder>& 
builder) {
-    if (index >= predicates.size()) {
-        return false;
-    }
-    if (index < predicates.size() - 1) {
-        builder->startAnd();
+    case TExprOpcode::GT: {
+        builder->startNot();
+        DCHECK(expr->children().size() == 2);
+        DCHECK(expr->children()[0]->is_slot_ref());
+        DCHECK(expr->children()[1]->is_literal());
+        const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+        const auto* literal = static_cast<const 
VLiteral*>(expr->children()[1].get());
+        auto [valid, orc_literal, predicate_type] = 
_make_orc_leteral(slot_ref, literal);
+        if (!valid) {
+            return false;
+        }
+        builder->lessThanEquals(slot_ref->expr_name(), predicate_type, 
orc_literal);
+        builder->end();
+        break;
     }
-    OrcPredicate& predicate = predicates[index];
-    switch (predicate.op) {
-    case SQLFilterOp::FILTER_IN: {
-        if (predicate.literals.size() == 1) {
-            builder->equals(predicate.col_name, predicate.data_type, 
predicate.literals[0]);
-        } else {
-            builder->in(predicate.col_name, predicate.data_type, 
predicate.literals);
+    case TExprOpcode::LE: {
+        DCHECK(expr->children().size() == 2);
+        DCHECK(expr->children()[0]->is_slot_ref());
+        DCHECK(expr->children()[1]->is_literal());
+        const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+        const auto* literal = static_cast<const 
VLiteral*>(expr->children()[1].get());
+        auto [valid, orc_literal, predicate_type] = 
_make_orc_leteral(slot_ref, literal);
+        if (!valid) {
+            return false;
         }
+        builder->lessThanEquals(slot_ref->expr_name(), predicate_type, 
orc_literal);
         break;
     }
-    case SQLFilterOp::FILTER_LESS:
-        builder->lessThan(predicate.col_name, predicate.data_type, 
predicate.literals[0]);
+    case TExprOpcode::LT: {
+        DCHECK(expr->children().size() == 2);
+        DCHECK(expr->children()[0]->is_slot_ref());
+        DCHECK(expr->children()[1]->is_literal());
+        const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+        const auto* literal = static_cast<const 
VLiteral*>(expr->children()[1].get());
+        auto [valid, orc_literal, predicate_type] = 
_make_orc_leteral(slot_ref, literal);
+        if (!valid) {
+            return false;
+        }
+        builder->lessThan(slot_ref->expr_name(), predicate_type, orc_literal);
         break;
-    case SQLFilterOp::FILTER_LESS_OR_EQUAL:
-        builder->lessThanEquals(predicate.col_name, predicate.data_type, 
predicate.literals[0]);
+    }
+    case TExprOpcode::EQ: {
+        DCHECK(expr->children().size() == 2);
+        DCHECK(expr->children()[0]->is_slot_ref());
+        DCHECK(expr->children()[1]->is_literal());
+        const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+        const auto* literal = static_cast<const 
VLiteral*>(expr->children()[1].get());
+        auto [valid, orc_literal, predicate_type] = 
_make_orc_leteral(slot_ref, literal);
+        if (!valid) {
+            return false;
+        }
+        builder->equals(slot_ref->expr_name(), predicate_type, orc_literal);
         break;
-    case SQLFilterOp::FILTER_LARGER: {
+    }
+    case TExprOpcode::NE: {
         builder->startNot();
-        builder->lessThanEquals(predicate.col_name, predicate.data_type, 
predicate.literals[0]);
+        DCHECK(expr->children().size() == 2);
+        DCHECK(expr->children()[0]->is_slot_ref());
+        DCHECK(expr->children()[1]->is_literal());
+        const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+        const auto* literal = static_cast<const 
VLiteral*>(expr->children()[1].get());
+        auto [valid, orc_literal, predicate_type] = 
_make_orc_leteral(slot_ref, literal);
+        if (!valid) {
+            return false;
+        }
+        builder->equals(slot_ref->expr_name(), predicate_type, orc_literal);
         builder->end();
         break;
     }
-    case SQLFilterOp::FILTER_LARGER_OR_EQUAL: {
-        builder->startNot();
-        builder->lessThan(predicate.col_name, predicate.data_type, 
predicate.literals[0]);
-        builder->end();
+    case TExprOpcode::FILTER_IN: {
+        DCHECK(expr->children()[0]->is_slot_ref());
+        const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+        std::vector<orc::Literal> literals;
+        orc::PredicateDataType predicate_type;
+        for (size_t i = 1; i < expr->children().size(); ++i) {
+            DCHECK(expr->children()[i]->is_literal());
+            const auto* literal = static_cast<const 
VLiteral*>(expr->children()[i].get());
+            auto [valid, orc_literal, type] = _make_orc_leteral(slot_ref, 
literal);
+            if (!valid) {
+                return false;
+            }
+            literals.emplace_back(orc_literal);
+            predicate_type = type;
+        }
+        if (!literals.empty()) {
+            builder->in(slot_ref->expr_name(), predicate_type, literals);
+        }
         break;
     }
-    default:
-        return false;
+    case TExprOpcode::FILTER_NOT_IN: {
+        DCHECK(expr->children()[0]->is_slot_ref());
+        const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+        std::vector<orc::Literal> literals;
+        orc::PredicateDataType predicate_type;
+        for (size_t i = 1; i < expr->children().size(); ++i) {
+            DCHECK(expr->children()[i]->is_literal());
+            const auto* literal = static_cast<const 
VLiteral*>(expr->children()[i].get());
+            auto [valid, orc_literal, type] = _make_orc_leteral(slot_ref, 
literal);
+            if (!valid) {
+                return false;
+            }
+            literals.emplace_back(orc_literal);
+            predicate_type = type;
+        }
+        if (!literals.empty()) {
+            builder->startNot();
+            builder->in(slot_ref->expr_name(), predicate_type, literals);
+            builder->end();
+        }
+        break;
     }
-    if (index < predicates.size() - 1) {
-        bool can_build = build_search_argument(predicates, index + 1, builder);
-        if (!can_build) {
-            return false;
+    // is null and is not null is represented as function call
+    case TExprOpcode::INVALID_OPCODE: {
+        DCHECK(expr->node_type() == TExprNodeType::FUNCTION_CALL);
+        if (expr->fn().name.function_name == "is_null_pred") {
+            DCHECK(expr->children().size() == 1);
+            DCHECK(expr->children()[0]->is_slot_ref());
+            const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+            auto* slot = _tuple_descriptor->slots()[slot_ref->column_id()];
+            const auto* orc_type = 
_type_map[_col_name_to_file_col_name[slot->col_name()]];
+            const auto predicate_type = 
TYPEKIND_TO_PREDICATE_TYPE[orc_type->getKind()];
+            builder->isNull(slot_ref->expr_name(), predicate_type);
+        } else if (expr->fn().name.function_name == "is_not_null_pred") {
+            DCHECK(expr->children().size() == 1);
+            DCHECK(expr->children()[0]->is_slot_ref());
+            const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+            auto* slot = _tuple_descriptor->slots()[slot_ref->column_id()];
+            const auto* orc_type = 
_type_map[_col_name_to_file_col_name[slot->col_name()]];
+            const auto predicate_type = 
TYPEKIND_TO_PREDICATE_TYPE[orc_type->getKind()];
+            builder->startNot();
+            builder->isNull(slot_ref->expr_name(), predicate_type);
+            builder->end();
+        } else {
+            __builtin_unreachable();
         }
-        builder->end();
+        break;
+    }
+    default: {
+        // should not reach here, because _check_expr_can_push_down has 
already checked
+        __builtin_unreachable();
+    }
     }
     return true;
 }
 
-bool OrcReader::_init_search_argument(
-        std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range) {
-    if ((!_enable_filter_by_min_max) || colname_to_value_range->empty()) {
+bool OrcReader::_init_search_argument(const VExprContextSPtrs& conjuncts) {
+    if (!_enable_filter_by_min_max) {
         return false;
     }
-    std::vector<OrcPredicate> predicates;
-    auto& root_type = _reader->getType();
-    std::unordered_map<std::string, const orc::Type*> type_map;
-    for (int i = 0; i < root_type.getSubtypeCount(); ++i) {
-        type_map.emplace(get_field_name_lower_case(&root_type, i), 
root_type.getSubtype(i));
+
+    // if no conjuncts can be pushed down, return false
+    if (!std::ranges::any_of(conjuncts, [this](const auto& expr_ctx) {
+            return _check_expr_can_push_down(expr_ctx->root());
+        })) {
+        return false;
     }
-    for (auto& col_name : _lazy_read_ctx.all_read_columns) {
-        auto iter = colname_to_value_range->find(col_name);
-        if (iter == colname_to_value_range->end()) {
-            continue;
-        }
-        auto type_it = type_map.find(_col_name_to_file_col_name[col_name]);
-        if (type_it == type_map.end()) {
-            continue;
+
+    auto sargBuilder = orc::SearchArgumentFactory::newBuilder();

Review Comment:
   ```suggestion
       auto sarg_builder = orc::SearchArgumentFactory::newBuilder();
   ```



##########
be/src/vec/exec/format/orc/vorc_reader.cpp:
##########
@@ -558,190 +562,316 @@ std::tuple<bool, orc::Literal> 
convert_to_orc_literal(const orc::Type* type, con
     }
 }
 
-template <PrimitiveType primitive_type>
-std::vector<OrcPredicate> value_range_to_predicate(
-        const ColumnValueRange<primitive_type>& col_val_range, const 
orc::Type* type,
-        std::vector<orc::TypeKind>* unsupported_pushdown_types) {
-    std::vector<OrcPredicate> predicates;
-
-    PrimitiveType src_type = OrcReader::convert_to_doris_type(type).type;
-    if (src_type != primitive_type) {
-        if (!(is_string_type(src_type) && is_string_type(primitive_type))) {
-            // not support schema change
-            return predicates;
-        }
-    }
-
-    if (unsupported_pushdown_types != nullptr) {
-        for (vector<orc::TypeKind>::iterator it = 
unsupported_pushdown_types->begin();
-             it != unsupported_pushdown_types->end(); ++it) {
-            if (*it == type->getKind()) {
-                // Unsupported type
-                return predicates;
-            }
-        }
+std::tuple<bool, orc::Literal, orc::PredicateDataType> 
OrcReader::_make_orc_leteral(
+        const VSlotRef* slot_ref, const VLiteral* literal) {
+    auto literal_data = literal->get_column_ptr()->get_data_at(0);
+    auto* slot = _tuple_descriptor->slots()[slot_ref->column_id()];
+    auto slot_type = slot->type();
+    const auto* orc_type = 
_type_map[_col_name_to_file_col_name[slot->col_name()]];
+    const auto predicate_type = 
TYPEKIND_TO_PREDICATE_TYPE[orc_type->getKind()];
+    switch (slot_type.type) {
+#define M(NAME)                                                                
\
+    case TYPE_##NAME: {                                                        
\
+        auto [valid, orc_literal] = convert_to_orc_literal<TYPE_##NAME>(       
\
+                orc_type, literal_data, slot_type.precision, slot_type.scale); 
\
+        return std::make_tuple(valid, orc_literal, predicate_type);            
\
+    }
+#define APPLY_FOR_PRIMITIVE_TYPE(M) \
+    M(TINYINT)                      \
+    M(SMALLINT)                     \
+    M(INT)                          \
+    M(BIGINT)                       \
+    M(LARGEINT)                     \
+    M(CHAR)                         \
+    M(DATE)                         \
+    M(DATETIME)                     \
+    M(DATEV2)                       \
+    M(DATETIMEV2)                   \
+    M(VARCHAR)                      \
+    M(STRING)                       \
+    M(HLL)                          \
+    M(DECIMAL32)                    \
+    M(DECIMAL64)                    \
+    M(DECIMAL128I)                  \
+    M(DECIMAL256)                   \
+    M(DECIMALV2)                    \
+    M(BOOLEAN)                      \
+    M(IPV4)                         \
+    M(IPV6)
+        APPLY_FOR_PRIMITIVE_TYPE(M)
+#undef M
+    default: {
+        VLOG_CRITICAL << "Unsupported Convert Orc Literal [ColName=" << 
slot->col_name() << "]";
+        return std::make_tuple(false, orc::Literal(false), predicate_type);
     }
-
-    orc::PredicateDataType predicate_data_type;
-    auto type_it = TYPEKIND_TO_PREDICATE_TYPE.find(type->getKind());
-    if (type_it == TYPEKIND_TO_PREDICATE_TYPE.end()) {
-        // Unsupported type
-        return predicates;
-    } else {
-        predicate_data_type = type_it->second;
     }
+}
 
-    if (col_val_range.is_fixed_value_range()) {
-        OrcPredicate in_predicate;
-        in_predicate.col_name = col_val_range.column_name();
-        in_predicate.data_type = predicate_data_type;
-        in_predicate.op = SQLFilterOp::FILTER_IN;
-        for (const auto& value : col_val_range.get_fixed_value_set()) {
-            auto [valid, literal] = convert_to_orc_literal<primitive_type>(
-                    type, &value, col_val_range.precision(), 
col_val_range.scale());
-            if (valid) {
-                in_predicate.literals.push_back(literal);
-            }
-        }
-        if (!in_predicate.literals.empty()) {
-            predicates.emplace_back(in_predicate);
+// check if the expr can be pushed down to orc reader
+bool OrcReader::_check_expr_can_push_down(const VExprSPtr& expr) {
+    DCHECK_NOTNULL(expr);
+    switch (expr->op()) {
+    case TExprOpcode::COMPOUND_AND:
+    case TExprOpcode::COMPOUND_OR:
+    case TExprOpcode::COMPOUND_NOT:
+        // at least one child can be pushed down
+        return std::ranges::any_of(expr->children(), [this](const auto& child) 
{
+            return _check_expr_can_push_down(child);
+        });
+    case TExprOpcode::GE:
+    case TExprOpcode::GT:
+    case TExprOpcode::LE:
+    case TExprOpcode::LT:
+    case TExprOpcode::EQ:
+    case TExprOpcode::NE:
+    case TExprOpcode::FILTER_IN:
+    case TExprOpcode::FILTER_NOT_IN: {
+        // check if the slot is partition column
+        const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+        return 
!_lazy_read_ctx.predicate_partition_columns.contains(slot_ref->expr_name());
+    }
+    case TExprOpcode::INVALID_OPCODE:
+        if (expr->node_type() == TExprNodeType::FUNCTION_CALL) {
+            auto fn_name = expr->fn().name.function_name;
+            // only support is_null_pred and is_not_null_pred
+            if (fn_name == "is_null_pred" || fn_name == "is_not_null_pred") {
+                return true;
+            }
+            LOG(WARNING) << "Unsupported function [funciton=" << fn_name << 
"]";
         }
-        return predicates;
+        return false;
+    default:
+        VLOG_CRITICAL << "Unsupported Opcode [OpCode=" << expr->op() << "]";
+        return false;
     }
+}
 
-    const auto& high_value = col_val_range.get_range_max_value();
-    const auto& low_value = col_val_range.get_range_min_value();
-    const auto& high_op = col_val_range.get_range_high_op();
-    const auto& low_op = col_val_range.get_range_low_op();
+// convert expr to sargs recursively
+bool OrcReader::_build_search_argument(const VExprSPtr& expr,
+                                       
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
+    if (expr == nullptr) {
+        return false;
+    }
 
-    // orc can only push down is_null. When col_value_range._contain_null = 
true, only indicating that
-    // value can be null, not equals null, so ignore _contain_null in 
col_value_range
-    if (col_val_range.is_high_value_maximum() && high_op == 
SQLFilterOp::FILTER_LESS_OR_EQUAL &&
-        col_val_range.is_low_value_mininum() && low_op == 
SQLFilterOp::FILTER_LARGER_OR_EQUAL) {
-        return predicates;
+    // if expr can not be pushed down, skip it and continue to next expr
+    if (!_check_expr_can_push_down(expr)) {
+        return true;
     }
 
-    if (low_value < high_value) {
-        if (!col_val_range.is_low_value_mininum() ||
-            SQLFilterOp::FILTER_LARGER_OR_EQUAL != low_op) {
-            auto [valid, low_literal] = convert_to_orc_literal<primitive_type>(
-                    type, &low_value, col_val_range.precision(), 
col_val_range.scale());
-            if (valid) {
-                OrcPredicate low_predicate;
-                low_predicate.col_name = col_val_range.column_name();
-                low_predicate.data_type = predicate_data_type;
-                low_predicate.op = low_op;
-                low_predicate.literals.emplace_back(low_literal);
-                predicates.emplace_back(low_predicate);
+    switch (expr->op()) {
+    case TExprOpcode::COMPOUND_AND:
+        builder->startAnd();
+        for (const auto& child : expr->children()) {
+            if (!_build_search_argument(child, builder)) {
+                return false;
             }
         }
-        if (!col_val_range.is_high_value_maximum() ||
-            SQLFilterOp::FILTER_LESS_OR_EQUAL != high_op) {
-            auto [valid, high_literal] = 
convert_to_orc_literal<primitive_type>(
-                    type, &high_value, col_val_range.precision(), 
col_val_range.scale());
-            if (valid) {
-                OrcPredicate high_predicate;
-                high_predicate.col_name = col_val_range.column_name();
-                high_predicate.data_type = predicate_data_type;
-                high_predicate.op = high_op;
-                high_predicate.literals.emplace_back(high_literal);
-                predicates.emplace_back(high_predicate);
+        builder->end();
+        break;
+    case TExprOpcode::COMPOUND_OR:
+        builder->startOr();
+        for (const auto& child : expr->children()) {
+            if (!_build_search_argument(child, builder)) {
+                return false;
             }
         }
+        builder->end();
+        break;
+    case TExprOpcode::COMPOUND_NOT:
+        builder->startNot();
+        DCHECK_EQ(expr->children().size(), 1);
+        if (!_build_search_argument(expr->children()[0], builder)) {
+            return false;
+        }
+        builder->end();
+        break;
+    case TExprOpcode::GE: {
+        builder->startNot();
+        DCHECK(expr->children().size() == 2);
+        DCHECK(expr->children()[0]->is_slot_ref());

Review Comment:
   How can we be sure that the first child is always a slot and the second is always a literal?



##########
be/src/vec/exec/format/orc/vorc_reader.cpp:
##########
@@ -558,190 +562,316 @@ std::tuple<bool, orc::Literal> 
convert_to_orc_literal(const orc::Type* type, con
     }
 }
 
-template <PrimitiveType primitive_type>
-std::vector<OrcPredicate> value_range_to_predicate(
-        const ColumnValueRange<primitive_type>& col_val_range, const 
orc::Type* type,
-        std::vector<orc::TypeKind>* unsupported_pushdown_types) {
-    std::vector<OrcPredicate> predicates;
-
-    PrimitiveType src_type = OrcReader::convert_to_doris_type(type).type;
-    if (src_type != primitive_type) {
-        if (!(is_string_type(src_type) && is_string_type(primitive_type))) {
-            // not support schema change
-            return predicates;
-        }
-    }
-
-    if (unsupported_pushdown_types != nullptr) {
-        for (vector<orc::TypeKind>::iterator it = 
unsupported_pushdown_types->begin();
-             it != unsupported_pushdown_types->end(); ++it) {
-            if (*it == type->getKind()) {
-                // Unsupported type
-                return predicates;
-            }
-        }
+std::tuple<bool, orc::Literal, orc::PredicateDataType> 
OrcReader::_make_orc_leteral(
+        const VSlotRef* slot_ref, const VLiteral* literal) {
+    auto literal_data = literal->get_column_ptr()->get_data_at(0);
+    auto* slot = _tuple_descriptor->slots()[slot_ref->column_id()];
+    auto slot_type = slot->type();
+    const auto* orc_type = 
_type_map[_col_name_to_file_col_name[slot->col_name()]];
+    const auto predicate_type = 
TYPEKIND_TO_PREDICATE_TYPE[orc_type->getKind()];
+    switch (slot_type.type) {
+#define M(NAME)                                                                
\
+    case TYPE_##NAME: {                                                        
\
+        auto [valid, orc_literal] = convert_to_orc_literal<TYPE_##NAME>(       
\
+                orc_type, literal_data, slot_type.precision, slot_type.scale); 
\
+        return std::make_tuple(valid, orc_literal, predicate_type);            
\
+    }
+#define APPLY_FOR_PRIMITIVE_TYPE(M) \
+    M(TINYINT)                      \
+    M(SMALLINT)                     \
+    M(INT)                          \
+    M(BIGINT)                       \
+    M(LARGEINT)                     \
+    M(CHAR)                         \
+    M(DATE)                         \
+    M(DATETIME)                     \
+    M(DATEV2)                       \
+    M(DATETIMEV2)                   \
+    M(VARCHAR)                      \
+    M(STRING)                       \
+    M(HLL)                          \
+    M(DECIMAL32)                    \
+    M(DECIMAL64)                    \
+    M(DECIMAL128I)                  \
+    M(DECIMAL256)                   \
+    M(DECIMALV2)                    \
+    M(BOOLEAN)                      \
+    M(IPV4)                         \
+    M(IPV6)
+        APPLY_FOR_PRIMITIVE_TYPE(M)
+#undef M
+    default: {
+        VLOG_CRITICAL << "Unsupported Convert Orc Literal [ColName=" << 
slot->col_name() << "]";
+        return std::make_tuple(false, orc::Literal(false), predicate_type);
     }
-
-    orc::PredicateDataType predicate_data_type;
-    auto type_it = TYPEKIND_TO_PREDICATE_TYPE.find(type->getKind());
-    if (type_it == TYPEKIND_TO_PREDICATE_TYPE.end()) {
-        // Unsupported type
-        return predicates;
-    } else {
-        predicate_data_type = type_it->second;
     }
+}
 
-    if (col_val_range.is_fixed_value_range()) {
-        OrcPredicate in_predicate;
-        in_predicate.col_name = col_val_range.column_name();
-        in_predicate.data_type = predicate_data_type;
-        in_predicate.op = SQLFilterOp::FILTER_IN;
-        for (const auto& value : col_val_range.get_fixed_value_set()) {
-            auto [valid, literal] = convert_to_orc_literal<primitive_type>(
-                    type, &value, col_val_range.precision(), 
col_val_range.scale());
-            if (valid) {
-                in_predicate.literals.push_back(literal);
-            }
-        }
-        if (!in_predicate.literals.empty()) {
-            predicates.emplace_back(in_predicate);
+// check if the expr can be pushed down to orc reader
+bool OrcReader::_check_expr_can_push_down(const VExprSPtr& expr) {
+    DCHECK_NOTNULL(expr);
+    switch (expr->op()) {
+    case TExprOpcode::COMPOUND_AND:
+    case TExprOpcode::COMPOUND_OR:
+    case TExprOpcode::COMPOUND_NOT:
+        // at least one child can be pushed down
+        return std::ranges::any_of(expr->children(), [this](const auto& child) 
{
+            return _check_expr_can_push_down(child);
+        });
+    case TExprOpcode::GE:
+    case TExprOpcode::GT:
+    case TExprOpcode::LE:
+    case TExprOpcode::LT:
+    case TExprOpcode::EQ:
+    case TExprOpcode::NE:
+    case TExprOpcode::FILTER_IN:
+    case TExprOpcode::FILTER_NOT_IN: {
+        // check if the slot is partition column
+        const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+        return 
!_lazy_read_ctx.predicate_partition_columns.contains(slot_ref->expr_name());
+    }
+    case TExprOpcode::INVALID_OPCODE:
+        if (expr->node_type() == TExprNodeType::FUNCTION_CALL) {
+            auto fn_name = expr->fn().name.function_name;
+            // only support is_null_pred and is_not_null_pred
+            if (fn_name == "is_null_pred" || fn_name == "is_not_null_pred") {
+                return true;
+            }
+            LOG(WARNING) << "Unsupported function [funciton=" << fn_name << 
"]";
         }
-        return predicates;
+        return false;
+    default:
+        VLOG_CRITICAL << "Unsupported Opcode [OpCode=" << expr->op() << "]";
+        return false;
     }
+}
 
-    const auto& high_value = col_val_range.get_range_max_value();
-    const auto& low_value = col_val_range.get_range_min_value();
-    const auto& high_op = col_val_range.get_range_high_op();
-    const auto& low_op = col_val_range.get_range_low_op();
+// convert expr to sargs recursively
+bool OrcReader::_build_search_argument(const VExprSPtr& expr,
+                                       
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
+    if (expr == nullptr) {
+        return false;
+    }
 
-    // orc can only push down is_null. When col_value_range._contain_null = 
true, only indicating that
-    // value can be null, not equals null, so ignore _contain_null in 
col_value_range
-    if (col_val_range.is_high_value_maximum() && high_op == 
SQLFilterOp::FILTER_LESS_OR_EQUAL &&
-        col_val_range.is_low_value_mininum() && low_op == 
SQLFilterOp::FILTER_LARGER_OR_EQUAL) {
-        return predicates;
+    // if expr can not be pushed down, skip it and continue to next expr
+    if (!_check_expr_can_push_down(expr)) {
+        return true;
     }
 
-    if (low_value < high_value) {
-        if (!col_val_range.is_low_value_mininum() ||
-            SQLFilterOp::FILTER_LARGER_OR_EQUAL != low_op) {
-            auto [valid, low_literal] = convert_to_orc_literal<primitive_type>(
-                    type, &low_value, col_val_range.precision(), 
col_val_range.scale());
-            if (valid) {
-                OrcPredicate low_predicate;
-                low_predicate.col_name = col_val_range.column_name();
-                low_predicate.data_type = predicate_data_type;
-                low_predicate.op = low_op;
-                low_predicate.literals.emplace_back(low_literal);
-                predicates.emplace_back(low_predicate);
+    switch (expr->op()) {
+    case TExprOpcode::COMPOUND_AND:
+        builder->startAnd();
+        for (const auto& child : expr->children()) {
+            if (!_build_search_argument(child, builder)) {
+                return false;
             }
         }
-        if (!col_val_range.is_high_value_maximum() ||
-            SQLFilterOp::FILTER_LESS_OR_EQUAL != high_op) {
-            auto [valid, high_literal] = 
convert_to_orc_literal<primitive_type>(
-                    type, &high_value, col_val_range.precision(), 
col_val_range.scale());
-            if (valid) {
-                OrcPredicate high_predicate;
-                high_predicate.col_name = col_val_range.column_name();
-                high_predicate.data_type = predicate_data_type;
-                high_predicate.op = high_op;
-                high_predicate.literals.emplace_back(high_literal);
-                predicates.emplace_back(high_predicate);
+        builder->end();
+        break;
+    case TExprOpcode::COMPOUND_OR:
+        builder->startOr();
+        for (const auto& child : expr->children()) {
+            if (!_build_search_argument(child, builder)) {
+                return false;
             }
         }
+        builder->end();
+        break;
+    case TExprOpcode::COMPOUND_NOT:
+        builder->startNot();
+        DCHECK_EQ(expr->children().size(), 1);
+        if (!_build_search_argument(expr->children()[0], builder)) {
+            return false;
+        }
+        builder->end();
+        break;
+    case TExprOpcode::GE: {
+        builder->startNot();
+        DCHECK(expr->children().size() == 2);
+        DCHECK(expr->children()[0]->is_slot_ref());
+        DCHECK(expr->children()[1]->is_literal());
+        const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+        const auto* literal = static_cast<const 
VLiteral*>(expr->children()[1].get());
+        auto [valid, orc_literal, predicate_type] = 
_make_orc_leteral(slot_ref, literal);
+        if (!valid) {
+            return false;
+        }
+        builder->lessThan(slot_ref->expr_name(), predicate_type, orc_literal);
+        builder->end();
+        break;
     }
-    return predicates;
-}
-
-bool static build_search_argument(std::vector<OrcPredicate>& predicates, int 
index,
-                                  std::unique_ptr<orc::SearchArgumentBuilder>& 
builder) {
-    if (index >= predicates.size()) {
-        return false;
-    }
-    if (index < predicates.size() - 1) {
-        builder->startAnd();
+    case TExprOpcode::GT: {
+        builder->startNot();
+        DCHECK(expr->children().size() == 2);
+        DCHECK(expr->children()[0]->is_slot_ref());
+        DCHECK(expr->children()[1]->is_literal());
+        const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+        const auto* literal = static_cast<const 
VLiteral*>(expr->children()[1].get());
+        auto [valid, orc_literal, predicate_type] = 
_make_orc_leteral(slot_ref, literal);
+        if (!valid) {
+            return false;
+        }
+        builder->lessThanEquals(slot_ref->expr_name(), predicate_type, 
orc_literal);
+        builder->end();
+        break;
     }
-    OrcPredicate& predicate = predicates[index];
-    switch (predicate.op) {
-    case SQLFilterOp::FILTER_IN: {
-        if (predicate.literals.size() == 1) {
-            builder->equals(predicate.col_name, predicate.data_type, 
predicate.literals[0]);
-        } else {
-            builder->in(predicate.col_name, predicate.data_type, 
predicate.literals);
+    case TExprOpcode::LE: {
+        DCHECK(expr->children().size() == 2);
+        DCHECK(expr->children()[0]->is_slot_ref());
+        DCHECK(expr->children()[1]->is_literal());
+        const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+        const auto* literal = static_cast<const 
VLiteral*>(expr->children()[1].get());
+        auto [valid, orc_literal, predicate_type] = 
_make_orc_leteral(slot_ref, literal);
+        if (!valid) {
+            return false;
         }
+        builder->lessThanEquals(slot_ref->expr_name(), predicate_type, 
orc_literal);
         break;
     }
-    case SQLFilterOp::FILTER_LESS:
-        builder->lessThan(predicate.col_name, predicate.data_type, 
predicate.literals[0]);
+    case TExprOpcode::LT: {
+        DCHECK(expr->children().size() == 2);
+        DCHECK(expr->children()[0]->is_slot_ref());
+        DCHECK(expr->children()[1]->is_literal());
+        const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+        const auto* literal = static_cast<const 
VLiteral*>(expr->children()[1].get());
+        auto [valid, orc_literal, predicate_type] = 
_make_orc_leteral(slot_ref, literal);
+        if (!valid) {
+            return false;
+        }
+        builder->lessThan(slot_ref->expr_name(), predicate_type, orc_literal);
         break;
-    case SQLFilterOp::FILTER_LESS_OR_EQUAL:
-        builder->lessThanEquals(predicate.col_name, predicate.data_type, 
predicate.literals[0]);
+    }
+    case TExprOpcode::EQ: {
+        DCHECK(expr->children().size() == 2);
+        DCHECK(expr->children()[0]->is_slot_ref());
+        DCHECK(expr->children()[1]->is_literal());
+        const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+        const auto* literal = static_cast<const 
VLiteral*>(expr->children()[1].get());
+        auto [valid, orc_literal, predicate_type] = 
_make_orc_leteral(slot_ref, literal);
+        if (!valid) {
+            return false;
+        }
+        builder->equals(slot_ref->expr_name(), predicate_type, orc_literal);
         break;
-    case SQLFilterOp::FILTER_LARGER: {
+    }
+    case TExprOpcode::NE: {
         builder->startNot();
-        builder->lessThanEquals(predicate.col_name, predicate.data_type, 
predicate.literals[0]);
+        DCHECK(expr->children().size() == 2);
+        DCHECK(expr->children()[0]->is_slot_ref());
+        DCHECK(expr->children()[1]->is_literal());
+        const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+        const auto* literal = static_cast<const 
VLiteral*>(expr->children()[1].get());
+        auto [valid, orc_literal, predicate_type] = 
_make_orc_leteral(slot_ref, literal);
+        if (!valid) {
+            return false;
+        }
+        builder->equals(slot_ref->expr_name(), predicate_type, orc_literal);
         builder->end();
         break;
     }
-    case SQLFilterOp::FILTER_LARGER_OR_EQUAL: {
-        builder->startNot();
-        builder->lessThan(predicate.col_name, predicate.data_type, 
predicate.literals[0]);
-        builder->end();
+    case TExprOpcode::FILTER_IN: {
+        DCHECK(expr->children()[0]->is_slot_ref());
+        const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+        std::vector<orc::Literal> literals;
+        orc::PredicateDataType predicate_type;
+        for (size_t i = 1; i < expr->children().size(); ++i) {
+            DCHECK(expr->children()[i]->is_literal());
+            const auto* literal = static_cast<const 
VLiteral*>(expr->children()[i].get());
+            auto [valid, orc_literal, type] = _make_orc_leteral(slot_ref, 
literal);
+            if (!valid) {
+                return false;
+            }
+            literals.emplace_back(orc_literal);
+            predicate_type = type;
+        }
+        if (!literals.empty()) {
+            builder->in(slot_ref->expr_name(), predicate_type, literals);
+        }
         break;
     }
-    default:
-        return false;
+    case TExprOpcode::FILTER_NOT_IN: {
+        DCHECK(expr->children()[0]->is_slot_ref());
+        const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+        std::vector<orc::Literal> literals;
+        orc::PredicateDataType predicate_type;
+        for (size_t i = 1; i < expr->children().size(); ++i) {
+            DCHECK(expr->children()[i]->is_literal());
+            const auto* literal = static_cast<const 
VLiteral*>(expr->children()[i].get());
+            auto [valid, orc_literal, type] = _make_orc_leteral(slot_ref, 
literal);
+            if (!valid) {
+                return false;
+            }
+            literals.emplace_back(orc_literal);
+            predicate_type = type;
+        }
+        if (!literals.empty()) {
+            builder->startNot();
+            builder->in(slot_ref->expr_name(), predicate_type, literals);
+            builder->end();
+        }
+        break;
     }
-    if (index < predicates.size() - 1) {
-        bool can_build = build_search_argument(predicates, index + 1, builder);
-        if (!can_build) {
-            return false;
+    // is null and is not null is represented as function call
+    case TExprOpcode::INVALID_OPCODE: {
+        DCHECK(expr->node_type() == TExprNodeType::FUNCTION_CALL);
+        if (expr->fn().name.function_name == "is_null_pred") {
+            DCHECK(expr->children().size() == 1);
+            DCHECK(expr->children()[0]->is_slot_ref());
+            const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+            auto* slot = _tuple_descriptor->slots()[slot_ref->column_id()];
+            const auto* orc_type = 
_type_map[_col_name_to_file_col_name[slot->col_name()]];
+            const auto predicate_type = 
TYPEKIND_TO_PREDICATE_TYPE[orc_type->getKind()];
+            builder->isNull(slot_ref->expr_name(), predicate_type);
+        } else if (expr->fn().name.function_name == "is_not_null_pred") {
+            DCHECK(expr->children().size() == 1);
+            DCHECK(expr->children()[0]->is_slot_ref());
+            const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+            auto* slot = _tuple_descriptor->slots()[slot_ref->column_id()];
+            const auto* orc_type = 
_type_map[_col_name_to_file_col_name[slot->col_name()]];
+            const auto predicate_type = 
TYPEKIND_TO_PREDICATE_TYPE[orc_type->getKind()];
+            builder->startNot();
+            builder->isNull(slot_ref->expr_name(), predicate_type);
+            builder->end();
+        } else {
+            __builtin_unreachable();
         }
-        builder->end();
+        break;
+    }
+    default: {
+        // should not reach here, because _check_expr_can_push_down has 
already checked
+        __builtin_unreachable();
+    }
     }
     return true;
 }
 
-bool OrcReader::_init_search_argument(
-        std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range) {
-    if ((!_enable_filter_by_min_max) || colname_to_value_range->empty()) {
+bool OrcReader::_init_search_argument(const VExprContextSPtrs& conjuncts) {

Review Comment:
   For a given query, the conjuncts are identical in all orc readers.
   So I think we can just init the search argument once and reuse the result for 
every orc reader?



##########
be/src/vec/exec/format/orc/vorc_reader.cpp:
##########
@@ -558,190 +562,316 @@ std::tuple<bool, orc::Literal> 
convert_to_orc_literal(const orc::Type* type, con
     }
 }
 
-template <PrimitiveType primitive_type>
-std::vector<OrcPredicate> value_range_to_predicate(
-        const ColumnValueRange<primitive_type>& col_val_range, const 
orc::Type* type,
-        std::vector<orc::TypeKind>* unsupported_pushdown_types) {
-    std::vector<OrcPredicate> predicates;
-
-    PrimitiveType src_type = OrcReader::convert_to_doris_type(type).type;
-    if (src_type != primitive_type) {
-        if (!(is_string_type(src_type) && is_string_type(primitive_type))) {
-            // not support schema change
-            return predicates;
-        }
-    }
-
-    if (unsupported_pushdown_types != nullptr) {
-        for (vector<orc::TypeKind>::iterator it = 
unsupported_pushdown_types->begin();
-             it != unsupported_pushdown_types->end(); ++it) {
-            if (*it == type->getKind()) {
-                // Unsupported type
-                return predicates;
-            }
-        }
+std::tuple<bool, orc::Literal, orc::PredicateDataType> 
OrcReader::_make_orc_leteral(

Review Comment:
   ```suggestion
   std::tuple<bool, orc::Literal, orc::PredicateDataType> 
OrcReader::_make_orc_literal(
   ```



##########
be/src/vec/exec/format/orc/vorc_reader.cpp:
##########
@@ -558,190 +562,316 @@ std::tuple<bool, orc::Literal> 
convert_to_orc_literal(const orc::Type* type, con
     }
 }
 
-template <PrimitiveType primitive_type>
-std::vector<OrcPredicate> value_range_to_predicate(
-        const ColumnValueRange<primitive_type>& col_val_range, const 
orc::Type* type,
-        std::vector<orc::TypeKind>* unsupported_pushdown_types) {
-    std::vector<OrcPredicate> predicates;
-
-    PrimitiveType src_type = OrcReader::convert_to_doris_type(type).type;
-    if (src_type != primitive_type) {
-        if (!(is_string_type(src_type) && is_string_type(primitive_type))) {
-            // not support schema change
-            return predicates;
-        }
-    }
-
-    if (unsupported_pushdown_types != nullptr) {
-        for (vector<orc::TypeKind>::iterator it = 
unsupported_pushdown_types->begin();
-             it != unsupported_pushdown_types->end(); ++it) {
-            if (*it == type->getKind()) {
-                // Unsupported type
-                return predicates;
-            }
-        }
+std::tuple<bool, orc::Literal, orc::PredicateDataType> 
OrcReader::_make_orc_leteral(
+        const VSlotRef* slot_ref, const VLiteral* literal) {
+    auto literal_data = literal->get_column_ptr()->get_data_at(0);
+    auto* slot = _tuple_descriptor->slots()[slot_ref->column_id()];
+    auto slot_type = slot->type();
+    const auto* orc_type = 
_type_map[_col_name_to_file_col_name[slot->col_name()]];
+    const auto predicate_type = 
TYPEKIND_TO_PREDICATE_TYPE[orc_type->getKind()];
+    switch (slot_type.type) {
+#define M(NAME)                                                                
\
+    case TYPE_##NAME: {                                                        
\
+        auto [valid, orc_literal] = convert_to_orc_literal<TYPE_##NAME>(       
\
+                orc_type, literal_data, slot_type.precision, slot_type.scale); 
\
+        return std::make_tuple(valid, orc_literal, predicate_type);            
\
+    }
+#define APPLY_FOR_PRIMITIVE_TYPE(M) \
+    M(TINYINT)                      \
+    M(SMALLINT)                     \
+    M(INT)                          \
+    M(BIGINT)                       \
+    M(LARGEINT)                     \
+    M(CHAR)                         \
+    M(DATE)                         \
+    M(DATETIME)                     \
+    M(DATEV2)                       \
+    M(DATETIMEV2)                   \
+    M(VARCHAR)                      \
+    M(STRING)                       \
+    M(HLL)                          \
+    M(DECIMAL32)                    \
+    M(DECIMAL64)                    \
+    M(DECIMAL128I)                  \
+    M(DECIMAL256)                   \
+    M(DECIMALV2)                    \
+    M(BOOLEAN)                      \
+    M(IPV4)                         \
+    M(IPV6)
+        APPLY_FOR_PRIMITIVE_TYPE(M)
+#undef M
+    default: {
+        VLOG_CRITICAL << "Unsupported Convert Orc Literal [ColName=" << 
slot->col_name() << "]";
+        return std::make_tuple(false, orc::Literal(false), predicate_type);
     }
-
-    orc::PredicateDataType predicate_data_type;
-    auto type_it = TYPEKIND_TO_PREDICATE_TYPE.find(type->getKind());
-    if (type_it == TYPEKIND_TO_PREDICATE_TYPE.end()) {
-        // Unsupported type
-        return predicates;
-    } else {
-        predicate_data_type = type_it->second;
     }
+}
 
-    if (col_val_range.is_fixed_value_range()) {
-        OrcPredicate in_predicate;
-        in_predicate.col_name = col_val_range.column_name();
-        in_predicate.data_type = predicate_data_type;
-        in_predicate.op = SQLFilterOp::FILTER_IN;
-        for (const auto& value : col_val_range.get_fixed_value_set()) {
-            auto [valid, literal] = convert_to_orc_literal<primitive_type>(
-                    type, &value, col_val_range.precision(), 
col_val_range.scale());
-            if (valid) {
-                in_predicate.literals.push_back(literal);
-            }
-        }
-        if (!in_predicate.literals.empty()) {
-            predicates.emplace_back(in_predicate);
+// check if the expr can be pushed down to orc reader
+bool OrcReader::_check_expr_can_push_down(const VExprSPtr& expr) {
+    DCHECK_NOTNULL(expr);
+    switch (expr->op()) {
+    case TExprOpcode::COMPOUND_AND:
+    case TExprOpcode::COMPOUND_OR:
+    case TExprOpcode::COMPOUND_NOT:
+        // at least one child can be pushed down
+        return std::ranges::any_of(expr->children(), [this](const auto& child) 
{
+            return _check_expr_can_push_down(child);
+        });
+    case TExprOpcode::GE:
+    case TExprOpcode::GT:
+    case TExprOpcode::LE:
+    case TExprOpcode::LT:
+    case TExprOpcode::EQ:
+    case TExprOpcode::NE:
+    case TExprOpcode::FILTER_IN:
+    case TExprOpcode::FILTER_NOT_IN: {
+        // check if the slot is partition column
+        const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+        return 
!_lazy_read_ctx.predicate_partition_columns.contains(slot_ref->expr_name());
+    }
+    case TExprOpcode::INVALID_OPCODE:
+        if (expr->node_type() == TExprNodeType::FUNCTION_CALL) {
+            auto fn_name = expr->fn().name.function_name;
+            // only support is_null_pred and is_not_null_pred
+            if (fn_name == "is_null_pred" || fn_name == "is_not_null_pred") {
+                return true;
+            }
+            LOG(WARNING) << "Unsupported function [funciton=" << fn_name << 
"]";
         }
-        return predicates;
+        return false;
+    default:
+        VLOG_CRITICAL << "Unsupported Opcode [OpCode=" << expr->op() << "]";
+        return false;
     }
+}
 
-    const auto& high_value = col_val_range.get_range_max_value();
-    const auto& low_value = col_val_range.get_range_min_value();
-    const auto& high_op = col_val_range.get_range_high_op();
-    const auto& low_op = col_val_range.get_range_low_op();
+// convert expr to sargs recursively
+bool OrcReader::_build_search_argument(const VExprSPtr& expr,
+                                       
std::unique_ptr<orc::SearchArgumentBuilder>& builder) {
+    if (expr == nullptr) {
+        return false;
+    }
 
-    // orc can only push down is_null. When col_value_range._contain_null = 
true, only indicating that
-    // value can be null, not equals null, so ignore _contain_null in 
col_value_range
-    if (col_val_range.is_high_value_maximum() && high_op == 
SQLFilterOp::FILTER_LESS_OR_EQUAL &&
-        col_val_range.is_low_value_mininum() && low_op == 
SQLFilterOp::FILTER_LARGER_OR_EQUAL) {
-        return predicates;
+    // if expr can not be pushed down, skip it and continue to next expr
+    if (!_check_expr_can_push_down(expr)) {
+        return true;
     }
 
-    if (low_value < high_value) {
-        if (!col_val_range.is_low_value_mininum() ||
-            SQLFilterOp::FILTER_LARGER_OR_EQUAL != low_op) {
-            auto [valid, low_literal] = convert_to_orc_literal<primitive_type>(
-                    type, &low_value, col_val_range.precision(), 
col_val_range.scale());
-            if (valid) {
-                OrcPredicate low_predicate;
-                low_predicate.col_name = col_val_range.column_name();
-                low_predicate.data_type = predicate_data_type;
-                low_predicate.op = low_op;
-                low_predicate.literals.emplace_back(low_literal);
-                predicates.emplace_back(low_predicate);
+    switch (expr->op()) {
+    case TExprOpcode::COMPOUND_AND:
+        builder->startAnd();
+        for (const auto& child : expr->children()) {
+            if (!_build_search_argument(child, builder)) {
+                return false;
             }
         }
-        if (!col_val_range.is_high_value_maximum() ||
-            SQLFilterOp::FILTER_LESS_OR_EQUAL != high_op) {
-            auto [valid, high_literal] = 
convert_to_orc_literal<primitive_type>(
-                    type, &high_value, col_val_range.precision(), 
col_val_range.scale());
-            if (valid) {
-                OrcPredicate high_predicate;
-                high_predicate.col_name = col_val_range.column_name();
-                high_predicate.data_type = predicate_data_type;
-                high_predicate.op = high_op;
-                high_predicate.literals.emplace_back(high_literal);
-                predicates.emplace_back(high_predicate);
+        builder->end();
+        break;
+    case TExprOpcode::COMPOUND_OR:
+        builder->startOr();
+        for (const auto& child : expr->children()) {
+            if (!_build_search_argument(child, builder)) {
+                return false;
             }
         }
+        builder->end();
+        break;
+    case TExprOpcode::COMPOUND_NOT:
+        builder->startNot();
+        DCHECK_EQ(expr->children().size(), 1);
+        if (!_build_search_argument(expr->children()[0], builder)) {
+            return false;
+        }
+        builder->end();
+        break;
+    case TExprOpcode::GE: {
+        builder->startNot();
+        DCHECK(expr->children().size() == 2);
+        DCHECK(expr->children()[0]->is_slot_ref());
+        DCHECK(expr->children()[1]->is_literal());
+        const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+        const auto* literal = static_cast<const 
VLiteral*>(expr->children()[1].get());
+        auto [valid, orc_literal, predicate_type] = 
_make_orc_leteral(slot_ref, literal);
+        if (!valid) {
+            return false;
+        }
+        builder->lessThan(slot_ref->expr_name(), predicate_type, orc_literal);
+        builder->end();
+        break;
     }
-    return predicates;
-}
-
-bool static build_search_argument(std::vector<OrcPredicate>& predicates, int 
index,
-                                  std::unique_ptr<orc::SearchArgumentBuilder>& 
builder) {
-    if (index >= predicates.size()) {
-        return false;
-    }
-    if (index < predicates.size() - 1) {
-        builder->startAnd();
+    case TExprOpcode::GT: {
+        builder->startNot();
+        DCHECK(expr->children().size() == 2);
+        DCHECK(expr->children()[0]->is_slot_ref());
+        DCHECK(expr->children()[1]->is_literal());
+        const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+        const auto* literal = static_cast<const 
VLiteral*>(expr->children()[1].get());
+        auto [valid, orc_literal, predicate_type] = 
_make_orc_leteral(slot_ref, literal);
+        if (!valid) {
+            return false;
+        }
+        builder->lessThanEquals(slot_ref->expr_name(), predicate_type, 
orc_literal);
+        builder->end();
+        break;
     }
-    OrcPredicate& predicate = predicates[index];
-    switch (predicate.op) {
-    case SQLFilterOp::FILTER_IN: {
-        if (predicate.literals.size() == 1) {
-            builder->equals(predicate.col_name, predicate.data_type, 
predicate.literals[0]);
-        } else {
-            builder->in(predicate.col_name, predicate.data_type, 
predicate.literals);
+    case TExprOpcode::LE: {
+        DCHECK(expr->children().size() == 2);
+        DCHECK(expr->children()[0]->is_slot_ref());
+        DCHECK(expr->children()[1]->is_literal());
+        const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+        const auto* literal = static_cast<const 
VLiteral*>(expr->children()[1].get());
+        auto [valid, orc_literal, predicate_type] = 
_make_orc_leteral(slot_ref, literal);
+        if (!valid) {
+            return false;
         }
+        builder->lessThanEquals(slot_ref->expr_name(), predicate_type, 
orc_literal);
         break;
     }
-    case SQLFilterOp::FILTER_LESS:
-        builder->lessThan(predicate.col_name, predicate.data_type, 
predicate.literals[0]);
+    case TExprOpcode::LT: {
+        DCHECK(expr->children().size() == 2);
+        DCHECK(expr->children()[0]->is_slot_ref());
+        DCHECK(expr->children()[1]->is_literal());
+        const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+        const auto* literal = static_cast<const 
VLiteral*>(expr->children()[1].get());
+        auto [valid, orc_literal, predicate_type] = 
_make_orc_leteral(slot_ref, literal);
+        if (!valid) {
+            return false;
+        }
+        builder->lessThan(slot_ref->expr_name(), predicate_type, orc_literal);
         break;
-    case SQLFilterOp::FILTER_LESS_OR_EQUAL:
-        builder->lessThanEquals(predicate.col_name, predicate.data_type, 
predicate.literals[0]);
+    }
+    case TExprOpcode::EQ: {
+        DCHECK(expr->children().size() == 2);
+        DCHECK(expr->children()[0]->is_slot_ref());
+        DCHECK(expr->children()[1]->is_literal());
+        const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+        const auto* literal = static_cast<const 
VLiteral*>(expr->children()[1].get());
+        auto [valid, orc_literal, predicate_type] = 
_make_orc_leteral(slot_ref, literal);
+        if (!valid) {
+            return false;
+        }
+        builder->equals(slot_ref->expr_name(), predicate_type, orc_literal);
         break;
-    case SQLFilterOp::FILTER_LARGER: {
+    }
+    case TExprOpcode::NE: {
         builder->startNot();
-        builder->lessThanEquals(predicate.col_name, predicate.data_type, 
predicate.literals[0]);
+        DCHECK(expr->children().size() == 2);
+        DCHECK(expr->children()[0]->is_slot_ref());
+        DCHECK(expr->children()[1]->is_literal());
+        const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+        const auto* literal = static_cast<const 
VLiteral*>(expr->children()[1].get());
+        auto [valid, orc_literal, predicate_type] = 
_make_orc_leteral(slot_ref, literal);
+        if (!valid) {
+            return false;
+        }
+        builder->equals(slot_ref->expr_name(), predicate_type, orc_literal);
         builder->end();
         break;
     }
-    case SQLFilterOp::FILTER_LARGER_OR_EQUAL: {
-        builder->startNot();
-        builder->lessThan(predicate.col_name, predicate.data_type, 
predicate.literals[0]);
-        builder->end();
+    case TExprOpcode::FILTER_IN: {
+        DCHECK(expr->children()[0]->is_slot_ref());
+        const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+        std::vector<orc::Literal> literals;
+        orc::PredicateDataType predicate_type;
+        for (size_t i = 1; i < expr->children().size(); ++i) {
+            DCHECK(expr->children()[i]->is_literal());
+            const auto* literal = static_cast<const 
VLiteral*>(expr->children()[i].get());
+            auto [valid, orc_literal, type] = _make_orc_leteral(slot_ref, 
literal);
+            if (!valid) {
+                return false;
+            }
+            literals.emplace_back(orc_literal);
+            predicate_type = type;
+        }
+        if (!literals.empty()) {
+            builder->in(slot_ref->expr_name(), predicate_type, literals);
+        }
         break;
     }
-    default:
-        return false;
+    case TExprOpcode::FILTER_NOT_IN: {
+        DCHECK(expr->children()[0]->is_slot_ref());
+        const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+        std::vector<orc::Literal> literals;
+        orc::PredicateDataType predicate_type;
+        for (size_t i = 1; i < expr->children().size(); ++i) {
+            DCHECK(expr->children()[i]->is_literal());
+            const auto* literal = static_cast<const 
VLiteral*>(expr->children()[i].get());
+            auto [valid, orc_literal, type] = _make_orc_leteral(slot_ref, 
literal);
+            if (!valid) {
+                return false;
+            }
+            literals.emplace_back(orc_literal);
+            predicate_type = type;
+        }
+        if (!literals.empty()) {
+            builder->startNot();
+            builder->in(slot_ref->expr_name(), predicate_type, literals);
+            builder->end();
+        }
+        break;
     }
-    if (index < predicates.size() - 1) {
-        bool can_build = build_search_argument(predicates, index + 1, builder);
-        if (!can_build) {
-            return false;
+    // is null and is not null is represented as function call
+    case TExprOpcode::INVALID_OPCODE: {
+        DCHECK(expr->node_type() == TExprNodeType::FUNCTION_CALL);
+        if (expr->fn().name.function_name == "is_null_pred") {
+            DCHECK(expr->children().size() == 1);
+            DCHECK(expr->children()[0]->is_slot_ref());
+            const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+            auto* slot = _tuple_descriptor->slots()[slot_ref->column_id()];
+            const auto* orc_type = 
_type_map[_col_name_to_file_col_name[slot->col_name()]];
+            const auto predicate_type = 
TYPEKIND_TO_PREDICATE_TYPE[orc_type->getKind()];
+            builder->isNull(slot_ref->expr_name(), predicate_type);
+        } else if (expr->fn().name.function_name == "is_not_null_pred") {
+            DCHECK(expr->children().size() == 1);
+            DCHECK(expr->children()[0]->is_slot_ref());
+            const auto* slot_ref = static_cast<const 
VSlotRef*>(expr->children()[0].get());
+            auto* slot = _tuple_descriptor->slots()[slot_ref->column_id()];
+            const auto* orc_type = 
_type_map[_col_name_to_file_col_name[slot->col_name()]];
+            const auto predicate_type = 
TYPEKIND_TO_PREDICATE_TYPE[orc_type->getKind()];
+            builder->startNot();
+            builder->isNull(slot_ref->expr_name(), predicate_type);
+            builder->end();
+        } else {
+            __builtin_unreachable();
         }
-        builder->end();
+        break;
+    }
+    default: {
+        // should not reach here, because _check_expr_can_push_down has 
already checked
+        __builtin_unreachable();
+    }
     }
     return true;
 }
 
-bool OrcReader::_init_search_argument(
-        std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range) {
-    if ((!_enable_filter_by_min_max) || colname_to_value_range->empty()) {
+bool OrcReader::_init_search_argument(const VExprContextSPtrs& conjuncts) {
+    if (!_enable_filter_by_min_max) {
         return false;
     }
-    std::vector<OrcPredicate> predicates;
-    auto& root_type = _reader->getType();
-    std::unordered_map<std::string, const orc::Type*> type_map;
-    for (int i = 0; i < root_type.getSubtypeCount(); ++i) {
-        type_map.emplace(get_field_name_lower_case(&root_type, i), 
root_type.getSubtype(i));
+
+    // if no conjuncts can be pushed down, return false
+    if (!std::ranges::any_of(conjuncts, [this](const auto& expr_ctx) {
+            return _check_expr_can_push_down(expr_ctx->root());
+        })) {
+        return false;
     }
-    for (auto& col_name : _lazy_read_ctx.all_read_columns) {
-        auto iter = colname_to_value_range->find(col_name);
-        if (iter == colname_to_value_range->end()) {
-            continue;
-        }
-        auto type_it = type_map.find(_col_name_to_file_col_name[col_name]);
-        if (type_it == type_map.end()) {
-            continue;
+
+    auto sargBuilder = orc::SearchArgumentFactory::newBuilder();
+    sargBuilder->startAnd();
+    for (const auto& expr_ctx : conjuncts) {
+        if (!_build_search_argument(expr_ctx->root(), sargBuilder)) {

Review Comment:
   If `conjuncts` contains A, B, and C, and `_build_search_argument(A)` returns 
false, does that mean we no longer try to push down B and C?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to