This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit ae482da8257a332a4ab1b10470fcc2cfb8f995ad
Author: lihangyu <[email protected]>
AuthorDate: Tue Feb 6 18:14:18 2024 +0800

    [Fix](Variant) Support materialized views on variant columns and access to variant subcolumns (#30603)
    
    * [Fix](Variant) Support materialized views on variant columns and access to variant subcolumns
    1. Fix schema change losing the column path, which caused invalid data to be read
    2. Support the element_at function on the BE side, using simdjson to parse the data
    3. Fix expressions that reference multiple slots
---
 be/src/olap/rowset/segment_v2/segment.cpp          |   4 +-
 be/src/vec/columns/column_object.cpp               |   5 +
 be/src/vec/columns/column_object.h                 |   2 +
 be/src/vec/functions/function_variant_element.cpp  | 178 +++++++++++++++++++++
 be/src/vec/functions/simple_function_factory.h     |  10 +-
 .../main/java/org/apache/doris/common/Config.java  |   2 -
 .../java/org/apache/doris/analysis/Analyzer.java   |   2 +-
 .../glue/translator/PhysicalPlanTranslator.java    |  31 ++--
 .../rules/expression/ExpressionOptimization.java   |   4 +-
 .../rules/expression/rules/ElementAtToSlot.java    |  89 -----------
 .../rules/expression/rules/FunctionBinder.java     |   5 +-
 .../expressions/functions/scalar/ElementAt.java    |   4 +-
 .../scalar/PushDownToProjectionFunction.java       |  50 +++++-
 .../java/org/apache/doris/qe/SessionVariable.java  |   5 +
 .../doris/rewrite/ElementAtToSlotRefRule.java      |   6 -
 regression-test/data/variant_p0/mv/multi_slot.out  |  43 +++++
 .../suites/variant_p0/mv/multi_slot.groovy         |  93 +++++++++++
 .../variant_p0/schema_change/schema_change.groovy  |   4 +-
 18 files changed, 408 insertions(+), 129 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/segment.cpp 
b/be/src/olap/rowset/segment_v2/segment.cpp
index b7d3a4ca0f9..1ae3f7acc7f 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -449,8 +449,8 @@ Status Segment::new_column_iterator_with_path(const 
TabletColumn& tablet_column,
         // Alter table operation should read the whole variant column, since 
it does not aware of
         // subcolumns of variant during processing rewriting rowsets.
         // This is slow, since it needs to read all sub columns and merge them 
into a single column
-        RETURN_IF_ERROR(HierarchicalDataReader::create(iter, 
tablet_column.path_info(), node, root,
-                                                       output_as_raw_json));
+        RETURN_IF_ERROR(
+                HierarchicalDataReader::create(iter, root_path, node, root, 
output_as_raw_json));
         return Status::OK();
     }
 
diff --git a/be/src/vec/columns/column_object.cpp 
b/be/src/vec/columns/column_object.cpp
index aff38c56a80..552ad31809a 100644
--- a/be/src/vec/columns/column_object.cpp
+++ b/be/src/vec/columns/column_object.cpp
@@ -586,6 +586,11 @@ ColumnObject::ColumnObject(bool is_nullable_, bool 
create_root_)
     }
 }
 
+ColumnObject::ColumnObject(bool is_nullable_, DataTypePtr type, 
MutableColumnPtr&& column)
+        : is_nullable(is_nullable_) {
+    add_sub_column({}, std::move(column), type);
+}
+
 ColumnObject::ColumnObject(Subcolumns&& subcolumns_, bool is_nullable_)
         : is_nullable(is_nullable_),
           subcolumns(std::move(subcolumns_)),
diff --git a/be/src/vec/columns/column_object.h 
b/be/src/vec/columns/column_object.h
index 7f328992a25..407419ff5c4 100644
--- a/be/src/vec/columns/column_object.h
+++ b/be/src/vec/columns/column_object.h
@@ -231,6 +231,8 @@ public:
 
     explicit ColumnObject(bool is_nullable_, bool create_root = true);
 
+    explicit ColumnObject(bool is_nullable_, DataTypePtr type, 
MutableColumnPtr&& column);
+
     ColumnObject(Subcolumns&& subcolumns_, bool is_nullable_);
 
     ~ColumnObject() override = default;
diff --git a/be/src/vec/functions/function_variant_element.cpp 
b/be/src/vec/functions/function_variant_element.cpp
new file mode 100644
index 00000000000..89256635279
--- /dev/null
+++ b/be/src/vec/functions/function_variant_element.cpp
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <glog/logging.h>
+#include <stddef.h>
+
+#include <memory>
+#include <ostream>
+#include <string>
+#include <string_view>
+#include <utility>
+#include <vector>
+
+#include "common/status.h"
+#include "exprs/json_functions.h"
+#include "simdjson.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_object.h"
+#include "vec/columns/column_string.h"
+#include "vec/common/assert_cast.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_nothing.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/data_types/data_type_object.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/functions/function.h"
+#include "vec/functions/function_helpers.h"
+#include "vec/functions/simple_function_factory.h"
+
+namespace doris::vectorized {
+
+class FunctionVariantElement : public IFunction {
+public:
+    static constexpr auto name = "element_at";
+    static FunctionPtr create() { return 
std::make_shared<FunctionVariantElement>(); }
+
+    // Get function name.
+    String get_name() const override { return name; }
+
+    bool use_default_implementation_for_nulls() const override { return true; }
+
+    size_t get_number_of_arguments() const override { return 2; }
+
+    ColumnNumbers get_arguments_that_are_always_constant() const override { 
return {1}; }
+
+    DataTypes get_variadic_argument_types_impl() const override {
+        return {std::make_shared<vectorized::DataTypeObject>(), 
std::make_shared<DataTypeString>()};
+    }
+
+    DataTypePtr get_return_type_impl(const DataTypes& arguments) const 
override {
+        
DCHECK((WhichDataType(remove_nullable(arguments[0]))).is_variant_type())
+                << "First argument for function: " << name
+                << " should be DataTypeObject but it has type " << 
arguments[0]->get_name() << ".";
+        DCHECK(is_string(arguments[1]))
+                << "Second argument for function: " << name << " should be 
String but it has type "
+                << arguments[1]->get_name() << ".";
+        return make_nullable(std::make_shared<DataTypeObject>());
+    }
+
+    Status execute_impl(FunctionContext* context, Block& block, const 
ColumnNumbers& arguments,
+                        size_t result, size_t input_rows_count) const override 
{
+        const auto* variant_col = check_and_get_column<ColumnObject>(
+                block.get_by_position(arguments[0]).column.get());
+        if (!variant_col) {
+            return Status::RuntimeError(
+                    fmt::format("unsupported types for function {}({}, {})", 
get_name(),
+                                
block.get_by_position(arguments[0]).type->get_name(),
+                                
block.get_by_position(arguments[1]).type->get_name()));
+        }
+        if (block.empty()) {
+            block.replace_by_position(result, 
block.get_by_position(result).type->create_column());
+            return Status::OK();
+        }
+
+        auto index_column = block.get_by_position(arguments[1]).column;
+        ColumnPtr result_column;
+        RETURN_IF_ERROR(get_element_column(*variant_col, index_column, 
&result_column));
+        block.replace_by_position(result, result_column);
+        return Status::OK();
+    }
+
+private:
+    static Status get_element_column(const ColumnObject& src, const ColumnPtr& 
index_column,
+                                     ColumnPtr* result) {
+        std::string field_name = index_column->get_data_at(0).to_string();
+        if (src.empty()) {
+            *result = ColumnObject::create(true);
+            return Status::OK();
+        }
+        if (src.is_scalar_variant() &&
+            
WhichDataType(remove_nullable(src.get_root_type())).is_string_or_fixed_string())
 {
+            // use parser to extract from root
+            auto type = std::make_shared<DataTypeString>();
+            MutableColumnPtr result_column = type->create_column();
+            const ColumnString& docs =
+                    
*check_and_get_column<ColumnString>(remove_nullable(src.get_root()).get());
+            simdjson::ondemand::parser parser;
+            std::vector<JsonPath> parsed_paths;
+            if (field_name.empty() || field_name[0] != '$') {
+                field_name = "$." + field_name;
+            }
+            JsonFunctions::parse_json_paths(field_name, &parsed_paths);
+            for (size_t i = 0; i < docs.size(); ++i) {
+                if (!extract_from_document(parser, docs.get_data_at(i), 
parsed_paths,
+                                           
assert_cast<ColumnString*>(result_column.get()))) {
+                    VLOG_DEBUG << "failed to parse " << docs.get_data_at(i) << 
", field "
+                               << field_name;
+                    result_column->insert_default();
+                }
+            }
+            *result = ColumnObject::create(true, type, 
std::move(result_column));
+            return Status::OK();
+        } else {
+            return Status::NotSupported("Not support element_at with none 
scalar variant {}",
+                                        src.debug_string());
+        }
+    }
+
+    static Status extract_from_document(simdjson::ondemand::parser& parser, 
const StringRef& doc,
+                                        const std::vector<JsonPath>& paths, 
ColumnString* column) {
+        try {
+            simdjson::padded_string json_str {doc.data, doc.size};
+            simdjson::ondemand::document doc = parser.iterate(json_str);
+            simdjson::ondemand::object object = doc.get_object();
+            simdjson::ondemand::value value;
+            RETURN_IF_ERROR(JsonFunctions::extract_from_object(object, paths, 
&value));
+            _write_data_to_column(value, column);
+        } catch (simdjson::simdjson_error& e) {
+            VLOG_DEBUG << "simdjson parse exception: " << e.what();
+            return Status::DataQualityError("simdjson parse exception {}", 
e.what());
+        }
+        return Status::OK();
+    }
+
+    static void _write_data_to_column(simdjson::ondemand::value& value, 
ColumnString* column) {
+        switch (value.type()) {
+        case simdjson::ondemand::json_type::null: {
+            column->insert_default();
+            break;
+        }
+        case simdjson::ondemand::json_type::boolean: {
+            if (value.get_bool()) {
+                column->insert_data("1", 1);
+            } else {
+                column->insert_data("0", 1);
+            }
+            break;
+        }
+        default: {
+            auto value_str = simdjson::to_json_string(value).value();
+            column->insert_data(value_str.data(), value_str.length());
+        }
+        }
+    }
+};
+
+void register_function_variant_element(SimpleFunctionFactory& factory) {
+    factory.register_function<FunctionVariantElement>();
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/functions/simple_function_factory.h 
b/be/src/vec/functions/simple_function_factory.h
index 5c47198fd6b..9bedc204cb0 100644
--- a/be/src/vec/functions/simple_function_factory.h
+++ b/be/src/vec/functions/simple_function_factory.h
@@ -90,6 +90,7 @@ void register_function_array(SimpleFunctionFactory& factory);
 void register_function_map(SimpleFunctionFactory& factory);
 void register_function_struct(SimpleFunctionFactory& factory);
 void register_function_struct_element(SimpleFunctionFactory& factory);
+void register_function_variant_element(SimpleFunctionFactory& factory);
 void register_function_geo(SimpleFunctionFactory& factory);
 void register_function_multi_string_position(SimpleFunctionFactory& factory);
 void register_function_multi_string_search(SimpleFunctionFactory& factory);
@@ -178,8 +179,12 @@ public:
 
         auto iter = function_creators.find(key_str);
         if (iter == function_creators.end()) {
-            LOG(WARNING) << fmt::format("Function signature {} is not found", 
key_str);
-            return nullptr;
+            // use original name as signature without variadic arguments
+            iter = function_creators.find(name);
+            if (iter == function_creators.end()) {
+                LOG(WARNING) << fmt::format("Function signature {} is not 
found", key_str);
+                return nullptr;
+            }
         }
 
         return iter->second()->build(arguments, return_type);
@@ -280,6 +285,7 @@ public:
             register_function_ip(instance);
             register_function_tokenize(instance);
             register_function_ignore(instance);
+            register_function_variant_element(instance);
         });
         return instance;
     }
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index be58c139852..a7f642944ed 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2521,8 +2521,6 @@ public class Config extends ConfigBase {
     @ConfField
     public static String cloud_sql_server_cluster_id = 
"RESERVED_CLUSTER_ID_FOR_SQL_SERVER";
 
-    @ConfField(mutable = true)
-    public static boolean enable_variant_access_in_original_planner = false;
     
//==========================================================================
     //                      end of cloud config
     
//==========================================================================
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
index 38b7403960c..0b4317f8714 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
@@ -1039,7 +1039,7 @@ public class Analyzer {
 
         LOG.debug("register column ref table {}, colName {}, col {}", tblName, 
colName, col.toSql());
         if (col.getType().isVariantType() || (subColNames != null && 
!subColNames.isEmpty())) {
-            if (!Config.enable_variant_access_in_original_planner
+            if (getContext() != null && 
!getContext().getSessionVariable().enableVariantAccessInOriginalPlanner
                     && (subColNames != null && !subColNames.isEmpty())) {
                 ErrorReport.reportAnalysisException("Variant sub-column access 
is disabled in original planner,"
                         + "set enable_variant_access_in_original_planner = 
true in session variable");
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index cc8b2483ff8..94b2f78cf11 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -1657,28 +1657,31 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         return inputFragment;
     }
 
-    // Get top most PushDownToProjectionFunction from expression
-    private Expression getOriginalFunctionForRewritten(NamedExpression 
expression) {
-        List<Expression> targetExpr = 
expression.collectFirst(PushDownToProjectionFunction.class::isInstance);
-        if (!targetExpr.isEmpty()) {
-            return targetExpr.get(0);
-        }
-        return null;
+    // collect all valid PushDownToProjectionFunction from expression
+    private List<Expression> 
getPushDownToProjectionFunctionForRewritten(NamedExpression expression) {
+        List<Expression> targetExprList = 
expression.collectToList(PushDownToProjectionFunction.class::isInstance);
+        return targetExprList.stream()
+                .filter(PushDownToProjectionFunction::validToPushDown)
+                .collect(Collectors.toList());
     }
 
     // register rewritten slots from original PushDownToProjectionFunction
     private void registerRewrittenSlot(PhysicalProject<? extends Plan> 
project, OlapScanNode olapScanNode) {
         // register slots that are rewritten from element_at/etc..
-        for (NamedExpression expr : project.getProjects()) {
+        List<Expression> allPushDownProjectionFunctions = 
project.getProjects().stream()
+                .map(this::getPushDownToProjectionFunctionForRewritten)
+                .flatMap(List::stream)
+                .collect(Collectors.toList());
+        for (Expression expr : allPushDownProjectionFunctions) {
+            PushDownToProjectionFunction function = 
(PushDownToProjectionFunction) expr;
             if (context != null
                     && context.getConnectContext() != null
                     && context.getConnectContext().getStatementContext() != 
null) {
-                Slot rewrittenSlot = context.getConnectContext()
-                        
.getStatementContext().getRewrittenSlotRefByOriginalExpr(getOriginalFunctionForRewritten(expr));
-                if (rewrittenSlot != null) {
-                    TupleDescriptor tupleDescriptor = 
context.getTupleDesc(olapScanNode.getTupleId());
-                    context.createSlotDesc(tupleDescriptor, (SlotReference) 
rewrittenSlot);
-                }
+                Slot argumentSlot = 
function.getInputSlots().stream().findFirst().get();
+                Expression rewrittenSlot = 
PushDownToProjectionFunction.rewriteToSlot(
+                        function, (SlotReference) argumentSlot);
+                TupleDescriptor tupleDescriptor = 
context.getTupleDesc(olapScanNode.getTupleId());
+                context.createSlotDesc(tupleDescriptor, (SlotReference) 
rewrittenSlot);
             }
         }
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/ExpressionOptimization.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/ExpressionOptimization.java
index 6064a8d210a..e7b3a308f0f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/ExpressionOptimization.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/ExpressionOptimization.java
@@ -21,7 +21,6 @@ import 
org.apache.doris.nereids.rules.expression.rules.ArrayContainToArrayOverla
 import org.apache.doris.nereids.rules.expression.rules.CaseWhenToIf;
 import org.apache.doris.nereids.rules.expression.rules.DateFunctionRewrite;
 import org.apache.doris.nereids.rules.expression.rules.DistinctPredicatesRule;
-import org.apache.doris.nereids.rules.expression.rules.ElementAtToSlot;
 import org.apache.doris.nereids.rules.expression.rules.ExtractCommonFactorRule;
 import org.apache.doris.nereids.rules.expression.rules.OrToIn;
 import 
org.apache.doris.nereids.rules.expression.rules.SimplifyComparisonPredicate;
@@ -49,8 +48,7 @@ public class ExpressionOptimization extends ExpressionRewrite 
{
             OrToIn.INSTANCE,
             ArrayContainToArrayOverlap.INSTANCE,
             CaseWhenToIf.INSTANCE,
-            TopnToMax.INSTANCE,
-            ElementAtToSlot.INSTANCE
+            TopnToMax.INSTANCE
     );
     private static final ExpressionRuleExecutor EXECUTOR = new 
ExpressionRuleExecutor(OPTIMIZE_REWRITE_RULES);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/ElementAtToSlot.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/ElementAtToSlot.java
deleted file mode 100644
index adc050c9871..00000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/ElementAtToSlot.java
+++ /dev/null
@@ -1,89 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.nereids.rules.expression.rules;
-
-import org.apache.doris.nereids.StatementContext;
-import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext;
-import org.apache.doris.nereids.rules.expression.ExpressionRewriteRule;
-import org.apache.doris.nereids.trees.expressions.Expression;
-import org.apache.doris.nereids.trees.expressions.SlotReference;
-import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
-import org.apache.doris.nereids.trees.expressions.functions.scalar.ElementAt;
-import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
-import 
org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter;
-import org.apache.doris.qe.ConnectContext;
-
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-/**
- * Transform element_at function to SlotReference for variant sub-column 
access.
- * This optimization will help query engine to prune as many sub columns as 
possible
- * to speed up query.
- * eg: element_at(element_at(v, "a"), "b") -> SlotReference(column=v, 
subColLabels=["a", "b"])
- */
-public class ElementAtToSlot extends 
DefaultExpressionRewriter<ExpressionRewriteContext> implements
-        ExpressionRewriteRule<ExpressionRewriteContext> {
-
-    public static final ElementAtToSlot INSTANCE = new ElementAtToSlot();
-
-    @Override
-    public Expression rewrite(Expression expr, ExpressionRewriteContext ctx) {
-        return expr.accept(this, ctx);
-    }
-
-    /**
-     * Rewrites an {@link ElementAt} instance to a {@link SlotReference}.
-     * This method is used to transform an ElementAt expr into a SlotReference,
-     * based on the provided topColumnSlot and the context of the statement.
-     *
-     * @param elementAt The {@link ElementAt} instance to be rewritten.
-     * @param topColumnSlot The {@link SlotReference} that represents the top 
column slot.
-     * @return A {@link SlotReference} that represents the rewritten element.
-     *         If a target column slot is found in the context, it is returned 
to avoid duplicates.
-     *         Otherwise, a new SlotReference is created and added to the 
context.
-     */
-    public static Expression rewriteToSlot(ElementAt elementAt, SlotReference 
topColumnSlot) {
-        // rewrite to slotRef
-        StatementContext ctx = ConnectContext.get().getStatementContext();
-        List<String> fullPaths = elementAt.collectToList(node -> node 
instanceof VarcharLiteral).stream()
-                .map(node -> ((VarcharLiteral) node).getValue())
-                .collect(Collectors.toList());
-        SlotReference targetColumnSlot = ctx.getPathSlot(topColumnSlot, 
fullPaths);
-        if (targetColumnSlot != null) {
-            // avoid duplicated slots
-            return targetColumnSlot;
-        }
-        SlotReference slotRef = new 
SlotReference(StatementScopeIdGenerator.newExprId(),
-                topColumnSlot.getName(), topColumnSlot.getDataType(),
-                topColumnSlot.nullable(), topColumnSlot.getQualifier(), 
topColumnSlot.getTable().get(),
-                topColumnSlot.getColumn().get(), 
Optional.of(topColumnSlot.getInternalName()),
-                fullPaths);
-        ctx.addPathSlotRef(topColumnSlot, fullPaths, slotRef, elementAt);
-        ctx.addSlotToRelation(slotRef, ctx.getRelationBySlot(topColumnSlot));
-
-        return slotRef;
-    }
-
-    @Override
-    public Expression visitElementAt(ElementAt elementAt, 
ExpressionRewriteContext context) {
-        // todo
-        return elementAt;
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/FunctionBinder.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/FunctionBinder.java
index f60f38f7649..bf0812cdc21 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/FunctionBinder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/FunctionBinder.java
@@ -205,15 +205,14 @@ public class FunctionBinder extends 
AbstractExpressionRewriteRule {
             if (ConnectContext.get() != null
                     && ConnectContext.get().getSessionVariable() != null
                     && 
!ConnectContext.get().getSessionVariable().isEnableRewriteElementAtToSlot()) {
-                throw new AnalysisException(
-                        "set enable_rewrite_element_at_to_slot=true when using 
element_at function for variant type");
+                return boundFunction;
             }
             Slot slot = elementAt.getInputSlots().stream().findFirst().get();
             if (slot.hasUnbound()) {
                 slot = (Slot) super.visit(slot, context);
             }
             // rewrite to slot and bound this slot
-            return ElementAtToSlot.rewriteToSlot(elementAt, (SlotReference) 
slot);
+            return PushDownToProjectionFunction.rewriteToSlot(elementAt, 
(SlotReference) slot);
         }
         return boundFunction;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/ElementAt.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/ElementAt.java
index 6bd5f1bd8e9..d4fe6d50438 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/ElementAt.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/ElementAt.java
@@ -39,8 +39,8 @@ import java.util.List;
 /**
  * ScalarFunction 'element_at'. This class is generated by GenerateFunction.
  */
-public class ElementAt extends ScalarFunction
-        implements BinaryExpression, ExplicitlyCastableSignature, 
AlwaysNullable, PushDownToProjectionFunction {
+public class ElementAt extends PushDownToProjectionFunction
+        implements BinaryExpression, ExplicitlyCastableSignature, 
AlwaysNullable {
 
     public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
             FunctionSignature.ret(new FollowToAnyDataType(0))
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/PushDownToProjectionFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/PushDownToProjectionFunction.java
index e2473ea0954..362a84bc943 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/PushDownToProjectionFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/PushDownToProjectionFunction.java
@@ -17,24 +17,68 @@
 
 package org.apache.doris.nereids.trees.expressions.functions.scalar;
 
+import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
+import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
+import org.apache.doris.qe.ConnectContext;
 
+import java.util.List;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 /**
  * Function that could be rewritten and pushed down to projection
  */
-public interface PushDownToProjectionFunction {
+public abstract class PushDownToProjectionFunction extends ScalarFunction {
+    public PushDownToProjectionFunction(String name, Expression... arguments) {
+        super(name, arguments);
+    }
+
     /**
      * check if specified function could be pushed down to project
      * @param pushDownExpr expr to check
      * @return if it is valid to push down input expr
      */
-    static boolean validToPushDown(Expression pushDownExpr) {
+    public static boolean validToPushDown(Expression pushDownExpr) {
         // Currently only element at for variant type could be pushed down
-        return !pushDownExpr.collectToList(
+        return pushDownExpr != null && !pushDownExpr.collectToList(
                     
PushDownToProjectionFunction.class::isInstance).stream().filter(
                             x -> ((Expression) 
x).getDataType().isVariantType()).collect(
                     Collectors.toList()).isEmpty();
     }
+
+    /**
+     * Rewrites an {@link PushDownToProjectionFunction} instance to a {@link 
SlotReference}.
+     * This method is used to transform an PushDownToProjectionFunction expr 
into a SlotReference,
+     * based on the provided topColumnSlot and the context of the statement.
+     *
+     * @param pushedFunction The {@link PushDownToProjectionFunction} instance 
to be rewritten.
+     * @param topColumnSlot The {@link SlotReference} that represents the top 
column slot.
+     * @return A {@link SlotReference} that represents the rewritten element.
+     *         If a target column slot is found in the context, it is returned 
to avoid duplicates.
+     *         Otherwise, a new SlotReference is created and added to the 
context.
+     */
+    public static Expression rewriteToSlot(PushDownToProjectionFunction 
pushedFunction, SlotReference topColumnSlot) {
+        // rewrite to slotRef
+        StatementContext ctx = ConnectContext.get().getStatementContext();
+        List<String> fullPaths = pushedFunction.collectToList(node -> node 
instanceof VarcharLiteral).stream()
+                .map(node -> ((VarcharLiteral) node).getValue())
+                .collect(Collectors.toList());
+        SlotReference targetColumnSlot = ctx.getPathSlot(topColumnSlot, 
fullPaths);
+        if (targetColumnSlot != null) {
+            // avoid duplicated slots
+            return targetColumnSlot;
+        }
+        SlotReference slotRef = new 
SlotReference(StatementScopeIdGenerator.newExprId(),
+                topColumnSlot.getName(), topColumnSlot.getDataType(),
+                topColumnSlot.nullable(), topColumnSlot.getQualifier(), 
topColumnSlot.getTable().get(),
+                topColumnSlot.getColumn().get(), 
Optional.of(topColumnSlot.getInternalName()),
+                fullPaths);
+        ctx.addPathSlotRef(topColumnSlot, fullPaths, slotRef, pushedFunction);
+        ctx.addSlotToRelation(slotRef, ctx.getRelationBySlot(topColumnSlot));
+
+        return slotRef;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 93a592d64a9..df394ec2810 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -186,6 +186,8 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String DELETE_WITHOUT_PARTITION = 
"delete_without_partition";
 
+    // Name of the session variable backing the enableVariantAccessInOriginalPlanner field.
+    public static final String ENABLE_VARIANT_ACCESS_IN_ORIGINAL_PLANNER = 
"enable_variant_access_in_original_planner";
+
     // set the default parallelism for send batch when execute InsertStmt 
operation,
     // if the value for parallelism exceed 
`max_send_batch_parallelism_per_job` in BE config,
     // then the coordinator be will use the value of 
`max_send_batch_parallelism_per_job`
@@ -793,6 +795,9 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = SEND_BATCH_PARALLELISM, needForward = true)
     public int sendBatchParallelism = 1;
 
+    // Session toggle "enable_variant_access_in_original_planner", default false.
+    // Presumably gates variant subcolumn access (v['k'] / element_at) in the legacy
+    // (non-Nereids) planner — TODO confirm against its use in Analyzer.
+    // NOTE(review): unlike the neighbouring SEND_BATCH_PARALLELISM and EXTRACT_WIDE_RANGE_EXPR
+    // attributes, this VarAttr does not set needForward = true; confirm that not forwarding it
+    // with statements sent to the master FE is intended.
+    @VariableMgr.VarAttr(name = ENABLE_VARIANT_ACCESS_IN_ORIGINAL_PLANNER)
+    public boolean enableVariantAccessInOriginalPlanner = false;
+
     @VariableMgr.VarAttr(name = EXTRACT_WIDE_RANGE_EXPR, needForward = true)
     public boolean extractWideRangeExpr = true;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rewrite/ElementAtToSlotRefRule.java b/fe/fe-core/src/main/java/org/apache/doris/rewrite/ElementAtToSlotRefRule.java
index e6234b002f1..e1d32530ec9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rewrite/ElementAtToSlotRefRule.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rewrite/ElementAtToSlotRefRule.java
@@ -42,12 +42,6 @@ public class ElementAtToSlotRefRule implements ExprRewriteRule  {
 
     @Override
     public Expr apply(Expr expr, Analyzer analyzer, ClauseType clauseType) 
throws AnalysisException {
-        // Only check element at of variant all rewrited to slots
-        List<Expr> elementAtFunctions = Lists.newArrayList();
-        getElementAtFunction(expr, elementAtFunctions);
-        if (!elementAtFunctions.isEmpty()) {
-            throw new AnalysisException("element_at should not appear in 
common rewrite stage");
-        }
+        // No-op: the expression is returned unchanged; element_at calls are no longer
+        // rejected at this stage. NOTE(review): getElementAtFunction and the List/Lists
+        // imports may now be unused in this file — confirm and remove if so.
         return expr;
     }
 
diff --git a/regression-test/data/variant_p0/mv/multi_slot.out b/regression-test/data/variant_p0/mv/multi_slot.out
new file mode 100644
index 00000000000..2499e282895
--- /dev/null
+++ b/regression-test/data/variant_p0/mv/multi_slot.out
@@ -0,0 +1,43 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select_star --
+\N     \N
+456    \N
+
+-- !select_star --
+\N     \N
+\N     \N
+1      \N
+1      \N
+1      \N
+3      \N
+5      \N
+
+-- !select_star --
+-4     {"k1":-4,"k2":-4,"k3":"d"}
+-5     {"k1":-4,"k2":-4,"k4":"d"}
+-6     {"k1":-4,"k2":-4,"k4":{"k44":789}}
+1      {"k1":1,"k2":1,"k3":"a"}
+2      {"k1":2,"k2":2,"k3":"b"}
+3      {"k1":-3,"k3":"c"}
+4      {"k1":-3,"k4":{"k44":456}}
+
+-- !select_star --
+\N     \N
+456    \N
+789    \N
+
+-- !select_mv --
+\N     \N
+1      \N
+3      \N
+5      \N
+
+-- !select_mv --
+\N     \N
+\N     \N
+1      \N
+1      \N
+1      \N
+3      \N
+5      \N
+
diff --git a/regression-test/suites/variant_p0/mv/multi_slot.groovy b/regression-test/suites/variant_p0/mv/multi_slot.groovy
new file mode 100644
index 00000000000..98b8f7f549e
--- /dev/null
+++ b/regression-test/suites/variant_p0/mv/multi_slot.groovy
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Materialized views over expressions on variant subcolumns (v['k1'], v['k4']['k44'], ...):
+// creates a duplicate-key table with a variant column, builds three MVs on it, inserts more
+// rows, then compares query results against multi_slot.out and checks which index EXPLAIN picks.
+suite ("multi_slot") {
+    sql """ DROP TABLE IF EXISTS multi_slot; """
+
+    sql """
+            create table multi_slot(
+                k int null,
+                v variant null
+            )
+            duplicate key (k)
+            distributed BY hash(k) buckets 3
+            properties("replication_num" = "1");
+        """
+
+    sql """insert into multi_slot select 1,'{"k1" : 1, "k2" : 1, "k3" : "a"}';"""
+    sql """insert into multi_slot select 2,'{"k1" : 2, "k2" : 2, "k3" : "b"}';"""
+    sql """insert into multi_slot select 3,'{"k1" : -3, "k2" : null, "k3" : "c"}';"""
+    sql """insert into multi_slot select 4,'{"k1" : -3, "k2" : null, "k4" : {"k44" : 456}}';"""
+    order_qt_select_star "select abs(cast(v['k4']['k44'] as int)), sum(abs(cast(v['k2'] as int)+2)+cast(v['k3'] as int)+3) from multi_slot group by abs(cast(v['k4']['k44'] as int))"
+
+    createMV ("create materialized view k1a2p2ap3p as select abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1,abs(cast(v['k2'] as int)+2)+cast(v['k3'] as int)+3 from multi_slot;")
+
+    createMV("create materialized view k1a2p2ap3ps as select abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1,sum(abs(cast(v['k2'] as int)+2)+cast(v['k3'] as int)+3) from multi_slot group by abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1;")
+
+    createMV("create materialized view k1a2p2ap3psp as select abs(cast(v['k4']['k44'] as int)), sum(abs(cast(v['k2'] as int)+2)+cast(v['k3'] as int)+3) from multi_slot group by abs(cast(v['k4']['k44'] as int));")
+
+    sql """insert into multi_slot select -4,'{"k1" : -4, "k2" : -4, "k3" : "d"}';"""
+    sql """insert into multi_slot select -5,'{"k1" : -4, "k2" : -4, "k4" : "d"}';"""
+    sql """insert into multi_slot select -6,'{"k1" : -4, "k2" : -4, "k4" : {"k44" : 789}}';"""
+
+    sql "SET experimental_enable_nereids_planner=true"
+    sql "SET enable_fallback_to_original_planner=false"
+
+
+    order_qt_select_star "select abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1,abs(cast(v['k2'] as int)+2)+cast(v['k3'] as int)+3 from multi_slot;"
+    order_qt_select_star "select * from multi_slot order by cast(v['k1'] as int);"
+    // TODO fix and remove enable_rewrite_element_at_to_slot
+    order_qt_select_star "select /*+SET_VAR(enable_rewrite_element_at_to_slot=false) */ abs(cast(v['k4']['k44'] as int)), sum(abs(cast(v['k2'] as int)+2)+cast(v['k3'] as int)+3) from multi_slot group by abs(cast(v['k4']['k44'] as int))"
+
+    // Retry EXPLAIN until it reports an expected index (MV building may still be in progress);
+    // fail after retry_times attempts.
+    def retry_times = 60
+    for (def i = 0; i < retry_times; ++i) {
+        boolean is_k1a2p2ap3p = false
+        boolean is_k1a2p2ap3ps = false
+        boolean is_d_table = false
+        explain {
+            sql("select  /*+SET_VAR(enable_rewrite_element_at_to_slot=false) */ abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1,sum(abs(cast(v['k2'] as int)+2)+cast(v['k3'] as int)+3) from multi_slot group by abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1 order by abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1")
+            check { explainStr, ex, startTime, endTime ->
+                if (ex != null) {
+                    throw ex;
+                }
+                logger.info("explain result: ${explainStr}".toString())
+                is_k1a2p2ap3p = explainStr.contains("(k1a2p2ap3p)")
+                is_k1a2p2ap3ps = explainStr.contains("(k1a2p2ap3ps)")
+                is_d_table = explainStr.contains("(multi_slot)")
+                // No assert here: asserting would fail the suite on the first miss and make the
+                // retry/sleep below unreachable. A miss falls through to the retry instead.
+            }
+        }
+        // FIXME: the MV selector may keep choosing the base table when multiple MVs exist,
+        //        so this test also treats selecting the base table as success.
+        //        is_d_table should be removed in the future.
+        if (is_d_table || is_k1a2p2ap3p || is_k1a2p2ap3ps) {
+            break
+        }
+        if (i + 1 == retry_times) {
+            throw new IllegalStateException("retry and failed too much")
+        }
+        sleep(1000)
+    }
+    order_qt_select_mv "select  /*+SET_VAR(enable_rewrite_element_at_to_slot=false) */ abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1,sum(abs(cast(v['k2'] as int)+2)+cast(v['k3'] as int)+3) from multi_slot group by abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1 order by abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1;"
+
+    explain {
+        sql("select abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1,abs(cast(v['k2'] as int)+2)+cast(v['k3'] as int)+3 from multi_slot order by abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1,abs(cast(v['k2'] as int)+2)+cast(v['k3'] as int)+3")
+        contains "(k1a2p2ap3p)"
+    }
+    order_qt_select_mv "select abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1,abs(cast(v['k2'] as int)+2)+cast(v['k3'] as int)+3 from multi_slot order by abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1,abs(cast(v['k2'] as int)+2)+cast(v['k3'] as int)+3;"
+
+}
diff --git a/regression-test/suites/variant_p0/schema_change/schema_change.groovy b/regression-test/suites/variant_p0/schema_change/schema_change.groovy
index fe593553fda..42cef32c8e5 100644
--- a/regression-test/suites/variant_p0/schema_change/schema_change.groovy
+++ b/regression-test/suites/variant_p0/schema_change/schema_change.groovy
@@ -77,6 +77,6 @@ suite("regression_test_variant_schema_change", "variant_type"){
     sql """INSERT INTO ${table_name} SELECT k, v, v from ${table_name} limit 
8101"""
     sql """DROP MATERIALIZED VIEW var_cnt ON ${table_name}"""
     sql """INSERT INTO ${table_name} SELECT k, v,v  from ${table_name} limit 
1111"""
-    // TODO support select from mv
-    // qt_sql """select v['k1'], cast(v['k2'] as string) from ${table_name} 
order by k desc limit 10"""
+    // select from mv
+    qt_sql """select v['k1'], cast(v['k2'] as string) from ${table_name} order 
by k desc limit 10"""
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to