This is an automated email from the ASF dual-hosted git repository.

starocean999 pushed a commit to branch dev_rec
in repository https://gitbox.apache.org/repos/asf/doris.git

commit e6f133b2aec9443ede1da66eb0799dd1bd84ea5d
Author: lichi <[email protected]>
AuthorDate: Thu Oct 9 15:14:12 2025 +0800

    recursive cte fe part
---
 .../antlr4/org/apache/doris/nereids/DorisLexer.g4  |   1 +
 .../antlr4/org/apache/doris/nereids/DorisParser.g4 |   2 +-
 .../doris/catalog/RecursiveCteTempTable.java       |  26 ++
 .../java/org/apache/doris/catalog/TableIf.java     |   6 +-
 .../org/apache/doris/nereids/CascadesContext.java  |  37 +-
 .../glue/translator/PhysicalPlanTranslator.java    |  29 ++
 .../doris/nereids/parser/LogicalPlanBuilder.java   |   3 +-
 .../apache/doris/nereids/pattern/MemoPatterns.java |  24 ++
 .../properties/ChildOutputPropertyDeriver.java     |  12 +
 .../org/apache/doris/nereids/rules/RuleSet.java    |   4 +
 .../org/apache/doris/nereids/rules/RuleType.java   |   2 +
 .../doris/nereids/rules/analysis/AnalyzeCTE.java   | 111 +++++-
 .../doris/nereids/rules/analysis/BindRelation.java |  27 +-
 .../nereids/rules/analysis/CollectRelation.java    |  11 +-
 .../nereids/rules/analysis/SubExprAnalyzer.java    |   4 +-
 ...RecursiveCteScanToPhysicalRecursiveCteScan.java |  42 +++
 .../LogicalRecursiveCteToPhysicalRecursiveCte.java |  39 ++
 .../nereids/rules/rewrite/AdjustNullable.java      |  17 +
 .../doris/nereids/rules/rewrite/CTEInline.java     |  31 +-
 .../doris/nereids/rules/rewrite/ColumnPruning.java | 106 ++++++
 .../doris/nereids/stats/StatsCalculator.java       |  29 ++
 .../trees/copier/LogicalPlanDeepCopier.java        |  23 ++
 .../apache/doris/nereids/trees/plans/PlanType.java |   4 +
 .../commands/CreateMaterializedViewCommand.java    |   2 +-
 .../distribute/worker/job/AssignedJobBuilder.java  |  93 +++++
 .../worker/job/UnassignedJobBuilder.java           |  13 +
 .../worker/job/UnassignedRecursiveCteScanJob.java  |  62 +++
 .../trees/plans/logical/LogicalRecursiveCte.java   | 419 +++++++++++++++++++++
 .../plans/logical/LogicalRecursiveCteScan.java     |  66 ++++
 .../trees/plans/logical/LogicalSubQueryAlias.java  |  31 +-
 .../trees/plans/physical/PhysicalRecursiveCte.java | 251 ++++++++++++
 .../plans/physical/PhysicalRecursiveCteScan.java   |  85 +++++
 .../nereids/trees/plans/visitor/PlanVisitor.java   |  10 +
 .../trees/plans/visitor/RelationVisitor.java       |  10 +
 .../org/apache/doris/planner/RecursiveCteNode.java |  52 +++
 .../apache/doris/planner/RecursiveCteScanNode.java | 113 ++++++
 .../apache/doris/statistics/StatisticalType.java   |   2 +
 .../recursive_cte/test_recursive_cte.groovy        | 102 +++++
 38 files changed, 1845 insertions(+), 56 deletions(-)

diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisLexer.g4 
b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisLexer.g4
index 1ae0cc0e075..f2a543b3d0c 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisLexer.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisLexer.g4
@@ -438,6 +438,7 @@ REAL: 'REAL';
 REBALANCE: 'REBALANCE';
 RECENT: 'RECENT';
 RECOVER: 'RECOVER';
+RECURSIVE: 'RECURSIVE';
 RECYCLE: 'RECYCLE';
 REFRESH: 'REFRESH';
 REFERENCES: 'REFERENCES';
diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 
b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index 25c0d9a00ab..3f35084e689 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -1200,7 +1200,7 @@ cte
     ;
 
 aliasQuery
-    : identifier columnAliases? AS LEFT_PAREN query RIGHT_PAREN
+    : RECURSIVE? identifier columnAliases? AS LEFT_PAREN query RIGHT_PAREN
     ;
 
 columnAliases
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/RecursiveCteTempTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/RecursiveCteTempTable.java
new file mode 100644
index 00000000000..9f36b04dfc4
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/RecursiveCteTempTable.java
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import java.util.List;
+
+public class RecursiveCteTempTable extends Table {
+    public RecursiveCteTempTable(String tableName, List<Column> fullSchema) {
+        super(-1, tableName, TableType.RECURSIVE_CTE_TEMP_TABLE, fullSchema);
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index d15fabd6b75..5aa7f9ead76 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -398,7 +398,8 @@ public interface TableIf {
         @Deprecated ICEBERG, @Deprecated HUDI, JDBC,
         TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE, ES_EXTERNAL_TABLE, 
MATERIALIZED_VIEW, JDBC_EXTERNAL_TABLE,
         ICEBERG_EXTERNAL_TABLE, TEST_EXTERNAL_TABLE, PAIMON_EXTERNAL_TABLE, 
MAX_COMPUTE_EXTERNAL_TABLE,
-        HUDI_EXTERNAL_TABLE, TRINO_CONNECTOR_EXTERNAL_TABLE, 
LAKESOUl_EXTERNAL_TABLE, DICTIONARY;
+        HUDI_EXTERNAL_TABLE, TRINO_CONNECTOR_EXTERNAL_TABLE, 
LAKESOUl_EXTERNAL_TABLE, DICTIONARY,
+        RECURSIVE_CTE_TEMP_TABLE;
 
         public String toEngineName() {
             switch (this) {
@@ -437,6 +438,8 @@ public interface TableIf {
                     return "iceberg";
                 case DICTIONARY:
                     return "dictionary";
+                case RECURSIVE_CTE_TEMP_TABLE:
+                    return "RecursiveCteTempTable";
                 default:
                     return null;
             }
@@ -475,6 +478,7 @@ public interface TableIf {
                 case PAIMON_EXTERNAL_TABLE:
                 case MATERIALIZED_VIEW:
                 case TRINO_CONNECTOR_EXTERNAL_TABLE:
+                case RECURSIVE_CTE_TEMP_TABLE:
                     return "BASE TABLE";
                 default:
                     return null;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
index efb53b66822..96b53b0a78d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java
@@ -133,6 +133,8 @@ public class CascadesContext implements ScheduleContext {
     private final boolean isEnableExprTrace;
 
     private int groupExpressionCount = 0;
+    private Optional<String> currentRecursiveCteName;
+    private List<Slot> recursiveCteOutputs;
 
     /**
      * Constructor of OptimizerContext.
@@ -142,7 +144,8 @@ public class CascadesContext implements ScheduleContext {
      */
     private CascadesContext(Optional<CascadesContext> parent, Optional<CTEId> 
currentTree,
             StatementContext statementContext, Plan plan, Memo memo,
-            CTEContext cteContext, PhysicalProperties requireProperties, 
boolean isLeadingDisableJoinReorder) {
+            CTEContext cteContext, PhysicalProperties requireProperties, 
boolean isLeadingDisableJoinReorder,
+            Optional<String> currentRecursiveCteName, List<Slot> 
recursiveCteOutputs) {
         this.parent = Objects.requireNonNull(parent, "parent should not null");
         this.currentTree = Objects.requireNonNull(currentTree, "currentTree 
should not null");
         this.statementContext = Objects.requireNonNull(statementContext, 
"statementContext should not null");
@@ -167,6 +170,8 @@ public class CascadesContext implements ScheduleContext {
             this.isEnableExprTrace = false;
         }
         this.isLeadingDisableJoinReorder = isLeadingDisableJoinReorder;
+        this.currentRecursiveCteName = currentRecursiveCteName;
+        this.recursiveCteOutputs = recursiveCteOutputs;
     }
 
     /** init a temporary context to rewrite expression */
@@ -181,7 +186,7 @@ public class CascadesContext implements ScheduleContext {
         }
         return newContext(Optional.empty(), Optional.empty(),
                 statementContext, DUMMY_PLAN,
-                new CTEContext(), PhysicalProperties.ANY, false);
+                new CTEContext(), PhysicalProperties.ANY, false, 
Optional.empty(), ImmutableList.of());
     }
 
     /**
@@ -190,24 +195,25 @@ public class CascadesContext implements ScheduleContext {
     public static CascadesContext initContext(StatementContext 
statementContext,
             Plan initPlan, PhysicalProperties requireProperties) {
         return newContext(Optional.empty(), Optional.empty(), statementContext,
-                initPlan, new CTEContext(), requireProperties, false);
+                initPlan, new CTEContext(), requireProperties, false, 
Optional.empty(), ImmutableList.of());
     }
 
     /**
      * use for analyze cte. we must pass CteContext from outer since we need 
to get right scope of cte
      */
     public static CascadesContext newContextWithCteContext(CascadesContext 
cascadesContext,
-            Plan initPlan, CTEContext cteContext) {
+            Plan initPlan, CTEContext cteContext, Optional<String> 
currentRecursiveCteName,
+            List<Slot> recursiveCteOutputs) {
         return newContext(Optional.of(cascadesContext), Optional.empty(),
                 cascadesContext.getStatementContext(), initPlan, cteContext, 
PhysicalProperties.ANY,
-                cascadesContext.isLeadingDisableJoinReorder
-        );
+                cascadesContext.isLeadingDisableJoinReorder, 
currentRecursiveCteName, recursiveCteOutputs);
     }
 
     public static CascadesContext newCurrentTreeContext(CascadesContext 
context) {
         return CascadesContext.newContext(context.getParent(), 
context.getCurrentTree(), context.getStatementContext(),
                 context.getRewritePlan(), context.getCteContext(),
-                context.getCurrentJobContext().getRequiredProperties(), 
context.isLeadingDisableJoinReorder);
+                context.getCurrentJobContext().getRequiredProperties(), 
context.isLeadingDisableJoinReorder,
+                Optional.empty(), ImmutableList.of());
     }
 
     /**
@@ -216,14 +222,17 @@ public class CascadesContext implements ScheduleContext {
     public static CascadesContext newSubtreeContext(Optional<CTEId> subtree, 
CascadesContext context,
             Plan plan, PhysicalProperties requireProperties) {
         return CascadesContext.newContext(Optional.of(context), subtree, 
context.getStatementContext(),
-                plan, context.getCteContext(), requireProperties, 
context.isLeadingDisableJoinReorder);
+                plan, context.getCteContext(), requireProperties, 
context.isLeadingDisableJoinReorder, Optional.empty(),
+                ImmutableList.of());
     }
 
     private static CascadesContext newContext(Optional<CascadesContext> 
parent, Optional<CTEId> subtree,
             StatementContext statementContext, Plan initPlan, CTEContext 
cteContext,
-            PhysicalProperties requireProperties, boolean 
isLeadingDisableJoinReorder) {
+            PhysicalProperties requireProperties, boolean 
isLeadingDisableJoinReorder,
+            Optional<String> currentRecursiveCteName, List<Slot> 
recursiveCteOutputs) {
         return new CascadesContext(parent, subtree, statementContext, 
initPlan, null,
-            cteContext, requireProperties, isLeadingDisableJoinReorder);
+                cteContext, requireProperties, isLeadingDisableJoinReorder, 
currentRecursiveCteName,
+                recursiveCteOutputs);
     }
 
     public CascadesContext getRoot() {
@@ -250,6 +259,14 @@ public class CascadesContext implements ScheduleContext {
         return isTimeout;
     }
 
+    public Optional<String> getCurrentRecursiveCteName() {
+        return currentRecursiveCteName;
+    }
+
+    public List<Slot> getRecursiveCteOutputs() {
+        return recursiveCteOutputs;
+    }
+
     /**
      * Init memo with plan
      */
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index a6af723fff8..21f9bbf2126 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -116,6 +116,7 @@ import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PreAggStatus;
 import org.apache.doris.nereids.trees.plans.algebra.Aggregate;
 import org.apache.doris.nereids.trees.plans.algebra.Relation;
+import org.apache.doris.nereids.trees.plans.algebra.SetOperation;
 import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
 import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
@@ -156,6 +157,8 @@ import 
org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalRecursiveCte;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalRecursiveCteScan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalRepeat;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
@@ -205,6 +208,8 @@ import org.apache.doris.planner.OlapTableSink;
 import org.apache.doris.planner.PartitionSortNode;
 import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.planner.PlanNode;
+import org.apache.doris.planner.RecursiveCteNode;
+import org.apache.doris.planner.RecursiveCteScanNode;
 import org.apache.doris.planner.RepeatNode;
 import org.apache.doris.planner.ResultFileSink;
 import org.apache.doris.planner.ResultSink;
@@ -1070,6 +1075,27 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         return planFragment;
     }
 
+    @Override
+    public PlanFragment visitPhysicalRecursiveCteScan(PhysicalRecursiveCteScan 
recursiveCteScan,
+            PlanTranslatorContext context) {
+        TableIf table = recursiveCteScan.getTable();
+        List<Slot> slots = ImmutableList.copyOf(recursiveCteScan.getOutput());
+        TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, 
context);
+
+        RecursiveCteScanNode scanNode = new 
RecursiveCteScanNode(context.nextPlanNodeId(), tupleDescriptor);
+        scanNode.setNereidsId(recursiveCteScan.getId());
+        context.getNereidsIdToPlanNodeIdMap().put(recursiveCteScan.getId(), 
scanNode.getId());
+        Utils.execWithUncheckedException(scanNode::initScanRangeLocations);
+
+        translateRuntimeFilter(recursiveCteScan, scanNode, context);
+
+        context.addScanNode(scanNode, recursiveCteScan);
+        PlanFragment planFragment = createPlanFragment(scanNode, 
DataPartition.RANDOM, recursiveCteScan);
+        context.addPlanFragment(planFragment);
+        updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), 
recursiveCteScan);
+        return planFragment;
+    }
+
     private List<Expr> translateToExprs(List<Expression> expressions, 
PlanTranslatorContext context) {
         List<Expr> exprs = Lists.newArrayListWithCapacity(expressions.size());
         for (Expression expression : expressions) {
@@ -2274,6 +2300,9 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             setOperationNode = new ExceptNode(context.nextPlanNodeId(), 
setTuple.getId());
         } else if (setOperation instanceof PhysicalIntersect) {
             setOperationNode = new IntersectNode(context.nextPlanNodeId(), 
setTuple.getId());
+        } else if (setOperation instanceof PhysicalRecursiveCte) {
+            setOperationNode = new RecursiveCteNode(context.nextPlanNodeId(), 
setTuple.getId(),
+                    
setOperation.getQualifier().equals(SetOperation.Qualifier.ALL));
         } else {
             throw new RuntimeException("not support set operation type " + 
setOperation);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index 925d1e57c9b..da2671b760f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -2145,7 +2145,8 @@ public class LogicalPlanBuilder extends 
DorisParserBaseVisitor<Object> {
                             .map(RuleContext::getText)
                             .collect(ImmutableList.toImmutableList())
             );
-            return new LogicalSubQueryAlias<>(ctx.identifier().getText(), 
columnNames, queryPlan);
+            return new LogicalSubQueryAlias<>(ctx.identifier().getText(), 
columnNames, ctx.RECURSIVE() != null,
+                    queryPlan);
         });
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/MemoPatterns.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/MemoPatterns.java
index fa9d191f5e2..ec53deafaad 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/MemoPatterns.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/MemoPatterns.java
@@ -28,6 +28,7 @@ import 
org.apache.doris.nereids.trees.plans.logical.LogicalExcept;
 import org.apache.doris.nereids.trees.plans.logical.LogicalIntersect;
 import org.apache.doris.nereids.trees.plans.logical.LogicalLeaf;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalRecursiveCte;
 import org.apache.doris.nereids.trees.plans.logical.LogicalRelation;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSetOperation;
 import org.apache.doris.nereids.trees.plans.logical.LogicalUnary;
@@ -205,6 +206,29 @@ public interface MemoPatterns extends Patterns {
                 defaultPromise());
     }
 
+    /**
+     * create a LogicalRecursiveCte pattern.
+     */
+    default PatternDescriptor<LogicalRecursiveCte>
+            logicalRecursiveCte(
+                PatternDescriptor... children) {
+        return new PatternDescriptor(
+                new TypePattern(LogicalRecursiveCte.class,
+                        Arrays.stream(children)
+                                .map(PatternDescriptor::getPattern)
+                                .toArray(Pattern[]::new)),
+                defaultPromise());
+    }
+
+    /**
+     * create a logicalRecursiveCte group.
+     */
+    default PatternDescriptor<LogicalRecursiveCte> logicalRecursiveCte() {
+        return new PatternDescriptor(
+                new TypePattern(LogicalRecursiveCte.class, 
multiGroup().pattern),
+                defaultPromise());
+    }
+
     /**
      * create a logicalExcept pattern.
      */
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
index d1c8bccb421..d3592388dae 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
@@ -52,6 +52,8 @@ import 
org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalRecursiveCte;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalRecursiveCteScan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalRepeat;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
@@ -147,6 +149,11 @@ public class ChildOutputPropertyDeriver extends 
PlanVisitor<PhysicalProperties,
         return PhysicalProperties.STORAGE_ANY;
     }
 
+    @Override
+    public PhysicalProperties 
visitPhysicalRecursiveCteScan(PhysicalRecursiveCteScan cteScan, PlanContext 
context) {
+        return PhysicalProperties.GATHER;
+    }
+
     /**
      * TODO return ANY after refactor coordinator
      * return STORAGE_ANY not ANY, in order to generate distribute on jdbc 
scan.
@@ -463,6 +470,11 @@ public class ChildOutputPropertyDeriver extends 
PlanVisitor<PhysicalProperties,
         return PhysicalProperties.createHash(request, firstType);
     }
 
+    @Override
+    public PhysicalProperties visitPhysicalRecursiveCte(PhysicalRecursiveCte 
recursiveCte, PlanContext context) {
+        return PhysicalProperties.GATHER;
+    }
+
     @Override
     public PhysicalProperties visitPhysicalUnion(PhysicalUnion union, 
PlanContext context) {
         if (union.getConstantExprsList().isEmpty()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
index 45cb22e6b52..122dbee03ef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
@@ -80,6 +80,8 @@ import 
org.apache.doris.nereids.rules.implementation.LogicalOlapTableSinkToPhysi
 import 
org.apache.doris.nereids.rules.implementation.LogicalOneRowRelationToPhysicalOneRowRelation;
 import 
org.apache.doris.nereids.rules.implementation.LogicalPartitionTopNToPhysicalPartitionTopN;
 import 
org.apache.doris.nereids.rules.implementation.LogicalProjectToPhysicalProject;
+import 
org.apache.doris.nereids.rules.implementation.LogicalRecursiveCteScanToPhysicalRecursiveCteScan;
+import 
org.apache.doris.nereids.rules.implementation.LogicalRecursiveCteToPhysicalRecursiveCte;
 import 
org.apache.doris.nereids.rules.implementation.LogicalRepeatToPhysicalRepeat;
 import 
org.apache.doris.nereids.rules.implementation.LogicalResultSinkToPhysicalResultSink;
 import 
org.apache.doris.nereids.rules.implementation.LogicalSchemaScanToPhysicalSchemaScan;
@@ -194,6 +196,7 @@ public class RuleSet {
             .add(new LogicalJdbcScanToPhysicalJdbcScan())
             .add(new LogicalOdbcScanToPhysicalOdbcScan())
             .add(new LogicalEsScanToPhysicalEsScan())
+            .add(new LogicalRecursiveCteScanToPhysicalRecursiveCteScan())
             .add(new LogicalProjectToPhysicalProject())
             .add(new LogicalLimitToPhysicalLimit())
             .add(new LogicalWindowToPhysicalWindow())
@@ -209,6 +212,7 @@ public class RuleSet {
             .add(SplitAggWithoutDistinct.INSTANCE)
             .add(SplitAggMultiPhase.INSTANCE)
             .add(SplitAggMultiPhaseWithoutGbyKey.INSTANCE)
+            .add(new LogicalRecursiveCteToPhysicalRecursiveCte())
             .add(new LogicalUnionToPhysicalUnion())
             .add(new LogicalExceptToPhysicalExcept())
             .add(new LogicalIntersectToPhysicalIntersect())
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index a8bc2052348..c13607a838d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -496,6 +496,7 @@ public enum RuleType {
     LOGICAL_JDBC_SCAN_TO_PHYSICAL_JDBC_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
     LOGICAL_ODBC_SCAN_TO_PHYSICAL_ODBC_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
     LOGICAL_ES_SCAN_TO_PHYSICAL_ES_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
+    
LOGICAL_RECURSIVE_CTE_SCAN_TO_PHYSICAL_RECUSIVE_CTE_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
     
LOGICAL_BLACKHOLE_SINK_TO_PHYSICAL_BLACKHOLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
     
LOGICAL_OLAP_TABLE_SINK_TO_PHYSICAL_OLAP_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
     
LOGICAL_HIVE_TABLE_SINK_TO_PHYSICAL_HIVE_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
@@ -517,6 +518,7 @@ public enum RuleType {
     COUNT_ON_INDEX_WITHOUT_PROJECT(RuleTypeClass.IMPLEMENTATION),
     TWO_PHASE_AGGREGATE_WITH_DISTINCT(RuleTypeClass.IMPLEMENTATION),
     LOGICAL_UNION_TO_PHYSICAL_UNION(RuleTypeClass.IMPLEMENTATION),
+    
LOGICAL_RECURSIVE_CTE_TO_PHYSICAL_RECURSIVE_CTE(RuleTypeClass.IMPLEMENTATION),
     LOGICAL_EXCEPT_TO_PHYSICAL_EXCEPT(RuleTypeClass.IMPLEMENTATION),
     LOGICAL_INTERSECT_TO_PHYSICAL_INTERSECT(RuleTypeClass.IMPLEMENTATION),
     LOGICAL_GENERATE_TO_PHYSICAL_GENERATE(RuleTypeClass.IMPLEMENTATION),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java
index 4287e8ca7de..8ac4234e846 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java
@@ -24,21 +24,29 @@ import 
org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.rules.Rule;
 import org.apache.doris.nereids.rules.RuleType;
 import org.apache.doris.nereids.trees.expressions.CTEId;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
 import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
 import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalCTE;
 import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor;
 import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.logical.LogicalRecursiveCte;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias;
+import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
+import org.apache.doris.nereids.trees.plans.logical.ProjectProcessor;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -70,7 +78,7 @@ public class AnalyzeCTE extends OneAnalysisRuleFactory {
             // step 1. analyzed all cte plan
             Pair<CTEContext, List<LogicalCTEProducer<Plan>>> result = 
analyzeCte(logicalCTE, ctx.cascadesContext);
             CascadesContext outerCascadesCtx = 
CascadesContext.newContextWithCteContext(
-                    ctx.cascadesContext, logicalCTE.child(), result.first);
+                    ctx.cascadesContext, logicalCTE.child(), result.first, 
Optional.empty(), ImmutableList.of());
             
outerCascadesCtx.withPlanProcess(ctx.cascadesContext.showPlanProcess(), () -> {
                 outerCascadesCtx.newAnalyzer().analyze();
             });
@@ -95,25 +103,96 @@ public class AnalyzeCTE extends OneAnalysisRuleFactory {
         List<LogicalCTEProducer<Plan>> cteProducerPlans = new ArrayList<>();
         for (LogicalSubQueryAlias<Plan> aliasQuery : aliasQueries) {
             // we should use a chain to ensure visible of cte
-            LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
-            CascadesContext innerCascadesCtx = 
CascadesContext.newContextWithCteContext(
-                    cascadesContext, parsedCtePlan, outerCteCtx);
-            
innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> {
-                innerCascadesCtx.newAnalyzer().analyze();
-            });
-            
cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses());
-            LogicalPlan analyzedCtePlan = (LogicalPlan) 
innerCascadesCtx.getRewritePlan();
-            checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput());
-            CTEId cteId = StatementScopeIdGenerator.newCTEId();
-            LogicalSubQueryAlias<Plan> logicalSubQueryAlias =
-                    aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan));
-            outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, 
outerCteCtx);
-            outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias);
-            cteProducerPlans.add(new LogicalCTEProducer<>(cteId, 
logicalSubQueryAlias));
+            if (aliasQuery.isRecursiveCte()) {
+                Pair<CTEContext, LogicalCTEProducer<Plan>> result = 
analyzeRecursiveCte(aliasQuery, outerCteCtx,
+                        cascadesContext);
+                outerCteCtx = result.first;
+                cteProducerPlans.add(result.second);
+            } else {
+                LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
+                CascadesContext innerCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                        cascadesContext, parsedCtePlan, outerCteCtx, 
Optional.empty(), ImmutableList.of());
+                
innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> {
+                    innerCascadesCtx.newAnalyzer().analyze();
+                });
+                
cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses());
+                LogicalPlan analyzedCtePlan = (LogicalPlan) 
innerCascadesCtx.getRewritePlan();
+                checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput());
+                CTEId cteId = StatementScopeIdGenerator.newCTEId();
+                LogicalSubQueryAlias<Plan> logicalSubQueryAlias = aliasQuery
+                        .withChildren(ImmutableList.of(analyzedCtePlan));
+                outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, 
outerCteCtx);
+                outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias);
+                cteProducerPlans.add(new LogicalCTEProducer<>(cteId, 
logicalSubQueryAlias));
+            }
         }
         return Pair.of(outerCteCtx, cteProducerPlans);
     }
 
+    private Pair<CTEContext, LogicalCTEProducer<Plan>> 
analyzeRecursiveCte(LogicalSubQueryAlias<Plan> aliasQuery,
+            CTEContext outerCteCtx, CascadesContext cascadesContext) {
+        Preconditions.checkArgument(aliasQuery.isRecursiveCte(), "alias query 
must be recursive cte");
+        LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
+        if (!(parsedCtePlan instanceof LogicalUnion) || 
parsedCtePlan.children().size() != 2) {
+            throw new AnalysisException("recursive cte must be union");
+        }
+        // analyze anchor child, its output list will be recursive cte temp 
table's schema
+        LogicalPlan anchorChild = (LogicalPlan) parsedCtePlan.child(0);
+        CascadesContext innerAnchorCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                cascadesContext, anchorChild, outerCteCtx, 
Optional.of(aliasQuery.getAlias()), ImmutableList.of());
+        
innerAnchorCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> 
{
+            innerAnchorCascadesCtx.newAnalyzer().analyze();
+        });
+        
cascadesContext.addPlanProcesses(innerAnchorCascadesCtx.getPlanProcesses());
+        LogicalPlan analyzedAnchorChild = (LogicalPlan) 
innerAnchorCascadesCtx.getRewritePlan();
+        checkColumnAlias(aliasQuery, analyzedAnchorChild.getOutput());
+
+        // analyze recursive child
+        LogicalPlan recursiveChild = (LogicalPlan) parsedCtePlan.child(1);
+        CascadesContext innerRecursiveCascadesCtx = 
CascadesContext.newContextWithCteContext(
+                cascadesContext, recursiveChild, outerCteCtx, 
Optional.of(aliasQuery.getAlias()),
+                analyzedAnchorChild.getOutput());
+        
innerRecursiveCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () 
-> {
+            innerRecursiveCascadesCtx.newAnalyzer().analyze();
+        });
+        
cascadesContext.addPlanProcesses(innerRecursiveCascadesCtx.getPlanProcesses());
+        LogicalPlan analyzedRecursiveChild = (LogicalPlan) 
innerRecursiveCascadesCtx.getRewritePlan();
+        LogicalUnion logicalUnion = (LogicalUnion) parsedCtePlan;
+
+        // manually bind LogicalRecursiveCte, see bindSetOperation in 
BindExpression.java
+        LogicalRecursiveCte analyzedCtePlan = new LogicalRecursiveCte(
+                logicalUnion.getQualifier(), 
ImmutableList.of(analyzedAnchorChild, analyzedRecursiveChild));
+        List<List<NamedExpression>> childrenProjections = 
analyzedCtePlan.collectChildrenProjections();
+        int childrenProjectionSize = childrenProjections.size();
+        ImmutableList.Builder<List<SlotReference>> childrenOutputs = 
ImmutableList
+                .builderWithExpectedSize(childrenProjectionSize);
+        ImmutableList.Builder<Plan> newChildren = 
ImmutableList.builderWithExpectedSize(childrenProjectionSize);
+        for (int i = 0; i < childrenProjectionSize; i++) {
+            Plan newChild;
+            Plan child = analyzedCtePlan.child(i);
+            if 
(childrenProjections.get(i).stream().allMatch(SlotReference.class::isInstance)) 
{
+                newChild = child;
+            } else {
+                List<NamedExpression> parentProject = 
childrenProjections.get(i);
+                newChild = ProjectProcessor.tryProcessProject(parentProject, 
child)
+                        .orElseGet(() -> new LogicalProject<>(parentProject, 
child));
+            }
+            newChildren.add(newChild);
+            childrenOutputs.add((List<SlotReference>) (List) 
newChild.getOutput());
+        }
+        analyzedCtePlan = (LogicalRecursiveCte) 
analyzedCtePlan.withChildrenAndTheirOutputs(newChildren.build(),
+                childrenOutputs.build());
+        List<NamedExpression> newOutputs = analyzedCtePlan.buildNewOutputs();
+        analyzedCtePlan = analyzedCtePlan.withNewOutputs(newOutputs);
+
+        CTEId cteId = StatementScopeIdGenerator.newCTEId();
+        LogicalSubQueryAlias<Plan> logicalSubQueryAlias = 
aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan));
+        outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, outerCteCtx);
+        outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias);
+        LogicalCTEProducer<Plan> cteProducer = new LogicalCTEProducer<>(cteId, 
logicalSubQueryAlias);
+        return Pair.of(outerCteCtx, cteProducer);
+    }
+
     /**
      * check columnAliases' size and name
      */
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
index 84652b9eed0..4e6be99f290 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java
@@ -27,6 +27,7 @@ import org.apache.doris.catalog.FunctionRegistry;
 import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.RecursiveCteTempTable;
 import org.apache.doris.catalog.SchemaTable;
 import org.apache.doris.catalog.SchemaTable.SchemaColumn;
 import org.apache.doris.catalog.TableIf;
@@ -89,6 +90,7 @@ import 
org.apache.doris.nereids.trees.plans.logical.LogicalJdbcScan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOdbcScan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalRecursiveCteScan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSchemaScan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias;
 import org.apache.doris.nereids.trees.plans.logical.LogicalTVFRelation;
@@ -168,16 +170,27 @@ public class BindRelation extends OneAnalysisRuleFactory {
                 return consumer;
             }
         }
+        LogicalPlan scan;
         List<String> tableQualifier = RelationUtil.getQualifierName(
                 cascadesContext.getConnectContext(), 
unboundRelation.getNameParts());
-        TableIf table = 
cascadesContext.getStatementContext().getAndCacheTable(tableQualifier, 
TableFrom.QUERY,
-                Optional.of(unboundRelation));
+        if 
(tableName.equalsIgnoreCase(cascadesContext.getCurrentRecursiveCteName().orElse("")))
 {
+            ImmutableList.Builder<Column> schema = new 
ImmutableList.Builder<>();
+            for (Slot slot : cascadesContext.getRecursiveCteOutputs()) {
+                schema.add(new Column(slot.getName(), 
slot.getDataType().toCatalogDataType(), slot.nullable()));
+            }
+            RecursiveCteTempTable cteTempTable = new 
RecursiveCteTempTable(tableName, schema.build());
+            scan = new 
LogicalRecursiveCteScan(cascadesContext.getStatementContext().getNextRelationId(),
+                    cteTempTable, tableQualifier);
+        } else {
+            TableIf table = 
cascadesContext.getStatementContext().getAndCacheTable(tableQualifier, 
TableFrom.QUERY,
+                    Optional.of(unboundRelation));
 
-        LogicalPlan scan = getLogicalPlan(table, unboundRelation, 
tableQualifier, cascadesContext);
-        if (cascadesContext.isLeadingJoin()) {
-            LeadingHint leading = (LeadingHint) 
cascadesContext.getHintMap().get("Leading");
-            
leading.putRelationIdAndTableName(Pair.of(unboundRelation.getRelationId(), 
tableName));
-            
leading.getRelationIdToScanMap().put(unboundRelation.getRelationId(), scan);
+            scan = getLogicalPlan(table, unboundRelation, tableQualifier, 
cascadesContext);
+            if (cascadesContext.isLeadingJoin()) {
+                LeadingHint leading = (LeadingHint) 
cascadesContext.getHintMap().get("Leading");
+                
leading.putRelationIdAndTableName(Pair.of(unboundRelation.getRelationId(), 
tableName));
+                
leading.getRelationIdToScanMap().put(unboundRelation.getRelationId(), scan);
+            }
         }
         return scan;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java
index 53650234610..468bc8c7d7c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java
@@ -102,8 +102,10 @@ public class CollectRelation implements 
AnalysisRuleFactory {
         for (LogicalSubQueryAlias<Plan> aliasQuery : aliasQueries) {
             // we should use a chain to ensure visible of cte
             LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child();
+            // 看起来需要在CascadesContext中添加当前CTE的name,以便判断自引用
             CascadesContext innerCascadesCtx = 
CascadesContext.newContextWithCteContext(
-                    cascadesContext, parsedCtePlan, outerCteCtx);
+                    cascadesContext, parsedCtePlan, outerCteCtx, 
aliasQuery.isRecursiveCte()
+                            ? Optional.of(aliasQuery.getAlias()) : 
Optional.empty(), ImmutableList.of());
             innerCascadesCtx.newTableCollector().collect();
             LogicalPlan analyzedCtePlan = (LogicalPlan) 
innerCascadesCtx.getRewritePlan();
             // cteId is not used in CollectTable stage
@@ -122,7 +124,8 @@ public class CollectRelation implements AnalysisRuleFactory 
{
                 if (e instanceof SubqueryExpr) {
                     SubqueryExpr subqueryExpr = (SubqueryExpr) e;
                     CascadesContext subqueryContext = 
CascadesContext.newContextWithCteContext(
-                            ctx.cascadesContext, subqueryExpr.getQueryPlan(), 
ctx.cteContext);
+                            ctx.cascadesContext, subqueryExpr.getQueryPlan(), 
ctx.cteContext, Optional.empty(),
+                            ImmutableList.of());
                     
subqueryContext.keepOrShowPlanProcess(ctx.cascadesContext.showPlanProcess(),
                             () -> 
subqueryContext.newTableCollector().collect());
                     
ctx.cascadesContext.addPlanProcesses(subqueryContext.getPlanProcesses());
@@ -174,6 +177,10 @@ public class CollectRelation implements 
AnalysisRuleFactory {
             List<String> nameParts, TableFrom tableFrom, 
Optional<UnboundRelation> unboundRelation) {
         if (nameParts.size() == 1) {
             String tableName = nameParts.get(0);
+            if (cascadesContext.getCurrentRecursiveCteName().isPresent()
+                    && 
tableName.equalsIgnoreCase(cascadesContext.getCurrentRecursiveCteName().get())) 
{
+                return;
+            }
             // check if it is a CTE's name
             CTEContext cteContext = 
cascadesContext.getCteContext().findCTEContext(tableName).orElse(null);
             if (cteContext != null) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/SubExprAnalyzer.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/SubExprAnalyzer.java
index 69d79934992..05a35dc6ed8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/SubExprAnalyzer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/SubExprAnalyzer.java
@@ -52,6 +52,7 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 
 /**
@@ -227,7 +228,8 @@ class SubExprAnalyzer<T> extends 
DefaultExpressionRewriter<T> {
             throw new IllegalStateException("Missing CascadesContext");
         }
         CascadesContext subqueryContext = 
CascadesContext.newContextWithCteContext(
-                cascadesContext, expr.getQueryPlan(), 
cascadesContext.getCteContext());
+                cascadesContext, expr.getQueryPlan(), 
cascadesContext.getCteContext(), Optional.empty(),
+                ImmutableList.of());
         // don't use `getScope()` because we only need 
`getScope().getOuterScope()` and `getScope().getSlots()`
         // otherwise unexpected errors may occur
         Scope subqueryScope = new Scope(getScope().getOuterScope(),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalRecursiveCteScanToPhysicalRecursiveCteScan.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalRecursiveCteScanToPhysicalRecursiveCteScan.java
new file mode 100644
index 00000000000..8714c280bb9
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalRecursiveCteScanToPhysicalRecursiveCteScan.java
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalRecursiveCteScan;
+
+import java.util.Optional;
+
+/**
+ * Implementation rule that convert logical Recursive Cte Scan to physical 
Recursive Cte Scan.
+ */
+public class LogicalRecursiveCteScanToPhysicalRecursiveCteScan extends 
OneImplementationRuleFactory {
+    @Override
+    public Rule build() {
+        return logicalRecursiveCteScan().then(recursiveCteScan ->
+            new PhysicalRecursiveCteScan(
+                    recursiveCteScan.getRelationId(),
+                    recursiveCteScan.getTable(),
+                    recursiveCteScan.getQualifier(),
+                    Optional.empty(),
+                    recursiveCteScan.getLogicalProperties(),
+                    recursiveCteScan.getOperativeSlots())
+        
).toRule(RuleType.LOGICAL_RECURSIVE_CTE_SCAN_TO_PHYSICAL_RECUSIVE_CTE_SCAN_RULE);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalRecursiveCteToPhysicalRecursiveCte.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalRecursiveCteToPhysicalRecursiveCte.java
new file mode 100644
index 00000000000..c8960aa9e51
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalRecursiveCteToPhysicalRecursiveCte.java
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalRecursiveCte;
+
+/**
+ * Implementation rule that convert logical Recursive Cte to Physical 
Recursive Cte.
+ */
+public class LogicalRecursiveCteToPhysicalRecursiveCte extends 
OneImplementationRuleFactory {
+    @Override
+    public Rule build() {
+        return logicalRecursiveCte().then(recursiveCte ->
+            new PhysicalRecursiveCte(recursiveCte.getQualifier(),
+                    recursiveCte.getOutputs(),
+                    recursiveCte.getRegularChildrenOutputs(),
+                    recursiveCte.getConstantExprsList(),
+                    recursiveCte.getLogicalProperties(),
+                    recursiveCte.children())
+        ).toRule(RuleType.LOGICAL_RECURSIVE_CTE_TO_PHYSICAL_RECURSIVE_CTE);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AdjustNullable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AdjustNullable.java
index 8592d3f22a0..63d63f32b6f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AdjustNullable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AdjustNullable.java
@@ -41,6 +41,7 @@ import 
org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.logical.LogicalRecursiveCte;
 import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSetOperation;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
@@ -330,6 +331,22 @@ public class AdjustNullable extends 
DefaultPlanRewriter<Map<ExprId, Slot>> imple
                     inputNullable.set(j, inputNullable.get(j) || 
constantExprs.get(j).nullable());
                 }
             }
+        } else if (setOperation instanceof LogicalRecursiveCte) {
+            // LogicalRecursiveCte is basically like LogicalUnion, so just do 
same as LogicalUnion
+            LogicalRecursiveCte logicalRecursiveCte = (LogicalRecursiveCte) 
setOperation;
+            if (!logicalRecursiveCte.getConstantExprsList().isEmpty() && 
setOperation.children().isEmpty()) {
+                int outputSize = 
logicalRecursiveCte.getConstantExprsList().get(0).size();
+                // create the inputNullable list and fill it with all FALSE 
values
+                inputNullable = Lists.newArrayListWithCapacity(outputSize);
+                for (int i = 0; i < outputSize; i++) {
+                    inputNullable.add(false);
+                }
+            }
+            for (List<NamedExpression> constantExprs : 
logicalRecursiveCte.getConstantExprsList()) {
+                for (int j = 0; j < constantExprs.size(); j++) {
+                    inputNullable.set(j, inputNullable.get(j) || 
constantExprs.get(j).nullable());
+                }
+            }
         }
         if (inputNullable == null) {
             // this is a fail-safe
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CTEInline.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CTEInline.java
index 22ec72c99c5..30970dee649 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CTEInline.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CTEInline.java
@@ -31,15 +31,19 @@ import 
org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
 import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.logical.LogicalRecursiveCte;
 import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter;
 import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
 import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 /**
  * pull up LogicalCteAnchor to the top of plan to avoid CteAnchor break other 
rewrite rules pattern
@@ -48,9 +52,15 @@ import java.util.List;
  * and put all of them to the top of plan depends on dependency tree of them.
  */
 public class CTEInline extends DefaultPlanRewriter<LogicalCTEProducer<?>> 
implements CustomRewriter {
+    Set<LogicalCTEConsumer> mustInlineCteConsumers = new HashSet<>();
 
     @Override
     public Plan rewriteRoot(Plan plan, JobContext jobContext) {
+        List<LogicalRecursiveCte> recursiveCteList = 
plan.collectToList(LogicalRecursiveCte.class::isInstance);
+        for (LogicalRecursiveCte recursiveCte : recursiveCteList) {
+            
mustInlineCteConsumers.addAll(recursiveCte.collect(LogicalCTEConsumer.class::isInstance));
+        }
+
         Plan root = plan.accept(this, null);
         // collect cte id to consumer
         root.foreach(p -> {
@@ -78,17 +88,24 @@ public class CTEInline extends 
DefaultPlanRewriter<LogicalCTEProducer<?>> implem
                 }
                 return false;
             });
-            ConnectContext connectContext = ConnectContext.get();
-            if (connectContext.getSessionVariable().enableCTEMaterialize
-                    && consumers.size() > 
connectContext.getSessionVariable().inlineCTEReferencedThreshold) {
-                // not inline
-                Plan right = cteAnchor.right().accept(this, null);
-                return cteAnchor.withChildren(cteAnchor.left(), right);
-            } else {
+            if (!Sets.intersection(mustInlineCteConsumers, 
Sets.newHashSet(consumers)).isEmpty()) {
                 // should inline
                 Plan root = cteAnchor.right().accept(this, 
(LogicalCTEProducer<?>) cteAnchor.left());
                 // process child
                 return root.accept(this, null);
+            } else {
+                ConnectContext connectContext = ConnectContext.get();
+                if (connectContext.getSessionVariable().enableCTEMaterialize
+                        && consumers.size() > 
connectContext.getSessionVariable().inlineCTEReferencedThreshold) {
+                    // not inline
+                    Plan right = cteAnchor.right().accept(this, null);
+                    return cteAnchor.withChildren(cteAnchor.left(), right);
+                } else {
+                    // should inline
+                    Plan root = cteAnchor.right().accept(this, 
(LogicalCTEProducer<?>) cteAnchor.left());
+                    // process child
+                    return root.accept(this, null);
+                }
             }
         }
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ColumnPruning.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ColumnPruning.java
index 90e94d578ba..96daceedfe4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ColumnPruning.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ColumnPruning.java
@@ -44,6 +44,7 @@ import 
org.apache.doris.nereids.trees.plans.logical.LogicalExcept;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
 import org.apache.doris.nereids.trees.plans.logical.LogicalIntersect;
 import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.logical.LogicalRecursiveCte;
 import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
@@ -213,6 +214,47 @@ public class ColumnPruning extends 
DefaultPlanRewriter<PruneContext> implements
         return pruneChildren(plan, new RoaringBitmap());
     }
 
+    @Override
+    public Plan visitLogicalRecursiveCte(LogicalRecursiveCte recursiveCte, 
PruneContext context) {
+        // LogicalRecursiveCte is basically like LogicalUnion, so just do same 
as LogicalUnion
+        if (recursiveCte.getQualifier() == Qualifier.DISTINCT) {
+            return skipPruneThisAndFirstLevelChildren(recursiveCte);
+        }
+        LogicalRecursiveCte prunedOutputRecursiveCte = 
pruneRecursiveCteOutput(recursiveCte, context);
+        // start prune children of recursiveCte
+        List<Slot> originOutput = recursiveCte.getOutput();
+        Set<Slot> prunedOutput = prunedOutputRecursiveCte.getOutputSet();
+        List<Integer> prunedOutputIndexes = IntStream.range(0, 
originOutput.size())
+                .filter(index -> 
prunedOutput.contains(originOutput.get(index)))
+                .boxed()
+                .collect(ImmutableList.toImmutableList());
+
+        ImmutableList.Builder<Plan> prunedChildren = ImmutableList.builder();
+        ImmutableList.Builder<List<SlotReference>> prunedChildrenOutputs = 
ImmutableList.builder();
+        for (int i = 0; i < prunedOutputRecursiveCte.arity(); i++) {
+            List<SlotReference> regularChildOutputs = 
prunedOutputRecursiveCte.getRegularChildOutput(i);
+
+            RoaringBitmap prunedChildOutputExprIds = new RoaringBitmap();
+            Builder<SlotReference> prunedChildOutputBuilder
+                    = 
ImmutableList.builderWithExpectedSize(regularChildOutputs.size());
+            for (Integer index : prunedOutputIndexes) {
+                SlotReference slot = regularChildOutputs.get(index);
+                prunedChildOutputBuilder.add(slot);
+                prunedChildOutputExprIds.add(slot.getExprId().asInt());
+            }
+
+            List<SlotReference> prunedChildOutput = 
prunedChildOutputBuilder.build();
+            Plan prunedChild = doPruneChild(
+                    prunedOutputRecursiveCte, 
prunedOutputRecursiveCte.child(i), prunedChildOutputExprIds,
+                    prunedChildOutput, true
+            );
+            prunedChildrenOutputs.add(prunedChildOutput);
+            prunedChildren.add(prunedChild);
+        }
+        return 
prunedOutputRecursiveCte.withChildrenAndTheirOutputs(prunedChildren.build(),
+                prunedChildrenOutputs.build());
+    }
+
     // union can not prune children by the common logic, we must override 
visit method to write special code.
     @Override
     public Plan visitLogicalUnion(LogicalUnion union, PruneContext context) {
@@ -403,6 +445,70 @@ public class ColumnPruning extends 
DefaultPlanRewriter<PruneContext> implements
         }
     }
 
+    private LogicalRecursiveCte pruneRecursiveCteOutput(LogicalRecursiveCte 
recursiveCte, PruneContext context) {
+        List<NamedExpression> originOutput = recursiveCte.getOutputs();
+        if (originOutput.isEmpty()) {
+            return recursiveCte;
+        }
+        List<NamedExpression> prunedOutputs = Lists.newArrayList();
+        List<List<NamedExpression>> constantExprsList = 
recursiveCte.getConstantExprsList();
+        List<List<SlotReference>> regularChildrenOutputs = 
recursiveCte.getRegularChildrenOutputs();
+        List<Plan> children = recursiveCte.children();
+        List<Integer> extractColumnIndex = Lists.newArrayList();
+        for (int i = 0; i < originOutput.size(); i++) {
+            NamedExpression output = originOutput.get(i);
+            if (context.requiredSlotsIds.contains(output.getExprId().asInt())) 
{
+                prunedOutputs.add(output);
+                extractColumnIndex.add(i);
+            }
+        }
+
+        ImmutableList.Builder<List<NamedExpression>> prunedConstantExprsList
+                = 
ImmutableList.builderWithExpectedSize(constantExprsList.size());
+        if (prunedOutputs.isEmpty()) {
+            // process prune all columns
+            NamedExpression originSlot = originOutput.get(0);
+            prunedOutputs = ImmutableList.of(new 
SlotReference(originSlot.getExprId(), originSlot.getName(),
+                    TinyIntType.INSTANCE, false, originSlot.getQualifier()));
+            regularChildrenOutputs = 
Lists.newArrayListWithCapacity(regularChildrenOutputs.size());
+            children = Lists.newArrayListWithCapacity(children.size());
+            for (int i = 0; i < recursiveCte.getArity(); i++) {
+                Plan child = recursiveCte.child(i);
+                List<NamedExpression> newProjectOutput = ImmutableList.of(new 
Alias(new TinyIntLiteral((byte) 1)));
+                LogicalProject<?> project;
+                if (child instanceof LogicalProject) {
+                    LogicalProject<Plan> childProject = (LogicalProject<Plan>) 
child;
+                    List<NamedExpression> mergeProjections = 
PlanUtils.mergeProjections(
+                            childProject.getProjects(), newProjectOutput);
+                    project = new LogicalProject<>(mergeProjections, 
childProject.child());
+                } else {
+                    project = new LogicalProject<>(newProjectOutput, child);
+                }
+                regularChildrenOutputs.add((List) project.getOutput());
+                children.add(project);
+            }
+            for (int i = 0; i < constantExprsList.size(); i++) {
+                prunedConstantExprsList.add(ImmutableList.of(new Alias(new 
TinyIntLiteral((byte) 1))));
+            }
+        } else {
+            int len = extractColumnIndex.size();
+            for (List<NamedExpression> row : constantExprsList) {
+                ImmutableList.Builder<NamedExpression> newRow = 
ImmutableList.builderWithExpectedSize(len);
+                for (int idx : extractColumnIndex) {
+                    newRow.add(row.get(idx));
+                }
+                prunedConstantExprsList.add(newRow.build());
+            }
+        }
+
+        if (prunedOutputs.equals(originOutput) && 
!context.requiredSlotsIds.isEmpty()) {
+            return recursiveCte;
+        } else {
+            return 
recursiveCte.withNewOutputsChildrenAndConstExprsList(prunedOutputs, children,
+                    regularChildrenOutputs, prunedConstantExprsList.build());
+        }
+    }
+
     private LogicalUnion pruneUnionOutput(LogicalUnion union, PruneContext 
context) {
         List<NamedExpression> originOutput = union.getOutputs();
         if (originOutput.isEmpty()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
index f0ca1f1e6ba..a14c2dfc9cb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
@@ -90,6 +90,8 @@ import 
org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN;
 import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.logical.LogicalRecursiveCte;
+import org.apache.doris.nereids.trees.plans.logical.LogicalRecursiveCteScan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSchemaScan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
@@ -123,6 +125,8 @@ import 
org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalRecursiveCte;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalRecursiveCteScan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalRepeat;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalSchemaScan;
@@ -869,6 +873,12 @@ public class StatsCalculator extends 
DefaultPlanVisitor<Statistics, Void> {
         return computeCatalogRelation(esScan);
     }
 
+    @Override
+    public Statistics visitLogicalRecursiveCteScan(LogicalRecursiveCteScan 
recursiveCteScan, Void context) {
+        recursiveCteScan.getExpressions();
+        return computeCatalogRelation(recursiveCteScan);
+    }
+
     @Override
     public Statistics visitLogicalProject(LogicalProject<? extends Plan> 
project, Void context) {
         return computeProject(project, groupExpression.childStatistics(0));
@@ -912,6 +922,14 @@ public class StatsCalculator extends 
DefaultPlanVisitor<Statistics, Void> {
         return computeAssertNumRows(assertNumRows.getAssertNumRowsElement(), 
groupExpression.childStatistics(0));
     }
 
+    @Override
+    public Statistics visitLogicalRecursiveCte(
+            LogicalRecursiveCte recursiveCte, Void context) {
+        return computeUnion(recursiveCte,
+                groupExpression.children()
+                        
.stream().map(Group::getStatistics).collect(Collectors.toList()));
+    }
+
     @Override
     public Statistics visitLogicalUnion(
             LogicalUnion union, Void context) {
@@ -997,6 +1015,11 @@ public class StatsCalculator extends 
DefaultPlanVisitor<Statistics, Void> {
         return computeCatalogRelation(schemaScan);
     }
 
+    @Override
+    public Statistics visitPhysicalRecursiveCteScan(PhysicalRecursiveCteScan 
recursiveCteScan, Void context) {
+        return computeCatalogRelation(recursiveCteScan);
+    }
+
     @Override
     public Statistics visitPhysicalFileScan(PhysicalFileScan fileScan, Void 
context) {
         return computeCatalogRelation(fileScan);
@@ -1083,6 +1106,12 @@ public class StatsCalculator extends 
DefaultPlanVisitor<Statistics, Void> {
         return computeAssertNumRows(assertNumRows.getAssertNumRowsElement(), 
groupExpression.childStatistics(0));
     }
 
+    @Override
+    public Statistics visitPhysicalRecursiveCte(PhysicalRecursiveCte 
recursiveCte, Void context) {
+        return computeUnion(recursiveCte, groupExpression.children()
+                
.stream().map(Group::getStatistics).collect(Collectors.toList()));
+    }
+
     @Override
     public Statistics visitPhysicalUnion(PhysicalUnion union, Void context) {
         return computeUnion(union, groupExpression.children()
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java
index e85c6eb8dae..c9cf877f709 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java
@@ -53,6 +53,7 @@ import 
org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.logical.LogicalRecursiveCte;
 import org.apache.doris.nereids.trees.plans.logical.LogicalRelation;
 import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
@@ -354,6 +355,28 @@ public class LogicalPlanDeepCopier extends 
DefaultPlanRewriter<DeepCopierContext
                 constantExprsList, union.hasPushedFilter(), children);
     }
 
+    @Override
+    public Plan visitLogicalRecursiveCte(LogicalRecursiveCte recursiveCte, 
DeepCopierContext context) {
+        List<Plan> children = recursiveCte.children().stream()
+                .map(c -> c.accept(this, context))
+                .collect(ImmutableList.toImmutableList());
+        List<List<NamedExpression>> constantExprsList = 
recursiveCte.getConstantExprsList().stream()
+                .map(l -> l.stream()
+                        .map(e -> (NamedExpression) 
ExpressionDeepCopier.INSTANCE.deepCopy(e, context))
+                        .collect(ImmutableList.toImmutableList()))
+                .collect(ImmutableList.toImmutableList());
+        List<NamedExpression> outputs = recursiveCte.getOutputs().stream()
+                .map(o -> (NamedExpression) 
ExpressionDeepCopier.INSTANCE.deepCopy(o, context))
+                .collect(ImmutableList.toImmutableList());
+        List<List<SlotReference>> childrenOutputs = 
recursiveCte.getRegularChildrenOutputs().stream()
+                .map(childOutputs -> childOutputs.stream()
+                        .map(o -> (SlotReference) 
ExpressionDeepCopier.INSTANCE.deepCopy(o, context))
+                        .collect(ImmutableList.toImmutableList()))
+                .collect(ImmutableList.toImmutableList());
+        return new LogicalRecursiveCte(recursiveCte.getQualifier(), outputs, 
childrenOutputs,
+                constantExprsList, recursiveCte.hasPushedFilter(), children);
+    }
+
     @Override
     public Plan visitLogicalExcept(LogicalExcept except, DeepCopierContext 
context) {
         List<Plan> children = except.children().stream()
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
index f5ace7cd416..f67558962f4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
@@ -83,6 +83,8 @@ public enum PlanType {
     LOGICAL_PARTITION_TOP_N,
     LOGICAL_PROJECT,
     LOGICAL_QUALIFY,
+    LOGICAL_RECURSIVE_CTE,
+    LOGICAL_RECURSIVE_CTE_SCAN,
     LOGICAL_REPEAT,
     LOGICAL_SELECT_HINT,
     LOGICAL_SUBQUERY_ALIAS,
@@ -107,6 +109,7 @@ public enum PlanType {
     PHYSICAL_OLAP_SCAN,
     PHYSICAL_SCHEMA_SCAN,
     PHYSICAL_TVF_RELATION,
+    PHYSICAL_RECURSIVE_CTE_SCAN,
 
     // physical sinks
     PHYSICAL_FILE_SINK,
@@ -123,6 +126,7 @@ public enum PlanType {
     PHYSICAL_ASSERT_NUM_ROWS,
     PHYSICAL_CTE_PRODUCER,
     PHYSICAL_CTE_ANCHOR,
+    PHYSICAL_RECURSIVE_CTE,
     PHYSICAL_DISTRIBUTE,
     PHYSICAL_EXCEPT,
     PHYSICAL_FILTER,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMaterializedViewCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMaterializedViewCommand.java
index 3d9db687908..3ea481294da 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMaterializedViewCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMaterializedViewCommand.java
@@ -323,7 +323,7 @@ public class CreateMaterializedViewCommand extends Command 
implements ForwardWit
                 }
                 try {
                     Expr defineExpr = translateToLegacyExpr(predicate, 
context.planTranslatorContext);
-                    context.filterItem = new 
MVColumnItem(defineExpr.toSqlWithoutTbl(), defineExpr);
+                    context.filterItem = new MVColumnItem(predicate.toSql(), 
defineExpr);
                 } catch (Exception ex) {
                     throw new AnalysisException(ex.getMessage());
                 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AssignedJobBuilder.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AssignedJobBuilder.java
index 9369acd8d6f..4291ba32732 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AssignedJobBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AssignedJobBuilder.java
@@ -17,19 +17,33 @@
 
 package org.apache.doris.nereids.trees.plans.distribute.worker.job;
 
+import org.apache.doris.analysis.Expr;
 import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
 import 
org.apache.doris.nereids.trees.plans.distribute.worker.BackendDistributedPlanWorkerManager;
+import 
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
 import org.apache.doris.planner.ExchangeNode;
+import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.planner.PlanFragmentId;
+import org.apache.doris.planner.RecursiveCteNode;
+import org.apache.doris.planner.RecursiveCteScanNode;
 import org.apache.doris.thrift.TExplainLevel;
+import org.apache.doris.thrift.TExpr;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TRecCTENode;
+import org.apache.doris.thrift.TRecCTEResetInfo;
+import org.apache.doris.thrift.TRecCTETarget;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
 
 /** AssignedJobBuilder */
 public class AssignedJobBuilder {
@@ -39,6 +53,8 @@ public class AssignedJobBuilder {
             boolean isLoadJob) {
         DistributeContext distributeContext = new 
DistributeContext(workerManager, isLoadJob);
         ListMultimap<PlanFragmentId, AssignedJob> allAssignedJobs = 
ArrayListMultimap.create();
+        Map<PlanFragmentId, TRecCTETarget> fragmentIdToRecCteTargetMap = new 
TreeMap<>();
+        Map<PlanFragmentId, Set<TNetworkAddress>> 
fragmentIdToNetworkAddressMap = new TreeMap<>();
         for (Entry<PlanFragmentId, UnassignedJob> kv : 
unassignedJobs.entrySet()) {
             PlanFragmentId fragmentId = kv.getKey();
             UnassignedJob unassignedJob = kv.getValue();
@@ -55,6 +71,83 @@ public class AssignedJobBuilder {
                         + ", fragment: " + 
unassignedJob.getFragment().getExplainString(TExplainLevel.VERBOSE));
             }
             allAssignedJobs.putAll(fragmentId, fragmentAssignedJobs);
+
+            Set<TNetworkAddress> networkAddresses = new TreeSet<>();
+            for (AssignedJob assignedJob : fragmentAssignedJobs) {
+                DistributedPlanWorker distributedPlanWorker = 
assignedJob.getAssignedWorker();
+                networkAddresses.add(new 
TNetworkAddress(distributedPlanWorker.host(), distributedPlanWorker.port()));
+            }
+            fragmentIdToNetworkAddressMap.put(fragmentId, networkAddresses);
+
+            PlanFragment planFragment = unassignedJob.getFragment();
+            List<RecursiveCteScanNode> recursiveCteScanNodes = 
planFragment.getPlanRoot()
+                    
.collectInCurrentFragment(RecursiveCteScanNode.class::isInstance);
+            if (!recursiveCteScanNodes.isEmpty()) {
+                if (recursiveCteScanNodes.size() != 1) {
+                    throw new IllegalStateException(
+                            String.format("one fragment can only have 1 
recursive cte scan node, but there is %d",
+                                    recursiveCteScanNodes.size()));
+                }
+                if (fragmentAssignedJobs.size() != 1) {
+                    throw new IllegalStateException(String.format(
+                            "fragmentAssignedJobs's size must be 1 for 
recursive cte scan node, but it is %d",
+                            fragmentAssignedJobs.size()));
+                }
+                TRecCTETarget tRecCTETarget = new TRecCTETarget();
+                DistributedPlanWorker distributedPlanWorker = 
fragmentAssignedJobs.get(0).getAssignedWorker();
+                tRecCTETarget.setAddr(new 
TNetworkAddress(distributedPlanWorker.host(), distributedPlanWorker.port()));
+                
tRecCTETarget.setFragmentInstanceId(fragmentAssignedJobs.get(0).instanceId());
+                
tRecCTETarget.setNodeId(recursiveCteScanNodes.get(0).getId().asInt());
+                fragmentIdToRecCteTargetMap.put(fragmentId, tRecCTETarget);
+            }
+
+            List<RecursiveCteNode> recursiveCteNodes = 
planFragment.getPlanRoot()
+                    
.collectInCurrentFragment(RecursiveCteNode.class::isInstance);
+            if (!recursiveCteNodes.isEmpty()) {
+                if (recursiveCteNodes.size() != 1) {
+                    throw new IllegalStateException(
+                            String.format("one fragment can only have 1 
recursive cte node, but there is %d",
+                                    recursiveCteNodes.size()));
+                }
+
+                List<TRecCTETarget> targets = new ArrayList<>();
+                List<TRecCTEResetInfo> fragmentsToReset = new ArrayList<>();
+                // PhysicalPlanTranslator will swap recursiveCteNodes's child 
fragment,
+                // so we get recursive one by 1st child
+                List<PlanFragment> childFragments = new ArrayList<>();
+                
planFragment.getChild(0).collectAll(PlanFragment.class::isInstance, 
childFragments);
+                for (PlanFragment child : childFragments) {
+                    PlanFragmentId childFragmentId = child.getFragmentId();
+                    TRecCTETarget tRecCTETarget = 
fragmentIdToRecCteTargetMap.getOrDefault(childFragmentId, null);
+                    if (tRecCTETarget != null) {
+                        targets.add(tRecCTETarget);
+                    }
+                    Set<TNetworkAddress> tNetworkAddresses = 
fragmentIdToNetworkAddressMap.get(childFragmentId);
+                    if (tNetworkAddresses == null) {
+                        throw new IllegalStateException(
+                                String.format("can't find TNetworkAddress for 
fragment %d", childFragmentId));
+                    }
+                    for (TNetworkAddress address : tNetworkAddresses) {
+                        TRecCTEResetInfo tRecCTEResetInfo = new 
TRecCTEResetInfo();
+                        
tRecCTEResetInfo.setFragmentId(childFragmentId.asInt());
+                        tRecCTEResetInfo.setAddr(address);
+                        fragmentsToReset.add(tRecCTEResetInfo);
+                    }
+                }
+
+                RecursiveCteNode recursiveCteNode = recursiveCteNodes.get(0);
+                List<List<Expr>> materializedResultExprLists = 
recursiveCteNode.getMaterializedResultExprLists();
+                List<List<TExpr>> texprLists = new 
ArrayList<>(materializedResultExprLists.size());
+                for (List<Expr> exprList : materializedResultExprLists) {
+                    texprLists.add(Expr.treesToThrift(exprList));
+                }
+                TRecCTENode tRecCTENode = new TRecCTENode();
+                tRecCTENode.setIsUnionAll(recursiveCteNode.isUnionAll());
+                tRecCTENode.setTargets(targets);
+                tRecCTENode.setFragmentsToReset(fragmentsToReset);
+                tRecCTENode.setResultExprLists(texprLists);
+                recursiveCteNode.settRecCTENode(tRecCTENode);
+            }
         }
         return allAssignedJobs;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java
index bc20d3efa17..1229bc59455 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java
@@ -29,6 +29,7 @@ import org.apache.doris.planner.OlapScanNode;
 import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.planner.PlanFragmentId;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.RecursiveCteScanNode;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.planner.SchemaScanNode;
 import org.apache.doris.thrift.TExplainLevel;
@@ -126,6 +127,10 @@ public class UnassignedJobBuilder {
                 unassignedJob = buildScanMetadataJob(
                         statementContext, planFragment, (SchemaScanNode) 
scanNode, scanWorkerSelector
                 );
+            } else if (scanNode instanceof RecursiveCteScanNode) {
+                unassignedJob = buildScanRecursiveCteJob(
+                        statementContext, planFragment, (RecursiveCteScanNode) 
scanNode, inputJobs, scanWorkerSelector
+                );
             } else {
                 // only scan external tables or cloud tables or table valued 
functions
                 // e,g. select * from numbers('number'='100')
@@ -196,6 +201,14 @@ public class UnassignedJobBuilder {
         return new UnassignedScanMetadataJob(statementContext, fragment, 
schemaScanNode, scanWorkerSelector);
     }
 
+    private UnassignedJob buildScanRecursiveCteJob(
+            StatementContext statementContext, PlanFragment fragment,
+            RecursiveCteScanNode recursiveCteScanNode,
+            ListMultimap<ExchangeNode, UnassignedJob> inputJobs, 
ScanWorkerSelector scanWorkerSelector) {
+        return new UnassignedRecursiveCteScanJob(statementContext, fragment, 
recursiveCteScanNode,
+                inputJobs, scanWorkerSelector);
+    }
+
     private UnassignedJob buildScanRemoteTableJob(
             StatementContext statementContext, PlanFragment planFragment, 
List<ScanNode> scanNodes,
             ListMultimap<ExchangeNode, UnassignedJob> inputJobs,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedRecursiveCteScanJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedRecursiveCteScanJob.java
new file mode 100644
index 00000000000..7fbb3f88e21
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedRecursiveCteScanJob.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.distribute.worker.job;
+
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
+import 
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
+import 
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorkerManager;
+import 
org.apache.doris.nereids.trees.plans.distribute.worker.ScanWorkerSelector;
+import org.apache.doris.planner.ExchangeNode;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.planner.ScanNode;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ListMultimap;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * UnassignedRecursiveCteScanJob
+ */
+public class UnassignedRecursiveCteScanJob extends AbstractUnassignedScanJob {
+    private final ScanWorkerSelector scanWorkerSelector;
+
+    public UnassignedRecursiveCteScanJob(
+            StatementContext statementContext, PlanFragment fragment, ScanNode 
scanNode,
+            ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob, 
ScanWorkerSelector scanWorkerSelector) {
+        super(statementContext, fragment, ImmutableList.of(scanNode), 
exchangeToChildJob);
+        this.scanWorkerSelector = Objects.requireNonNull(scanWorkerSelector, 
"scanWorkerSelector is not null");
+    }
+
+    @Override
+    protected Map<DistributedPlanWorker, UninstancedScanSource> 
multipleMachinesParallelization(
+            DistributeContext distributeContext, ListMultimap<ExchangeNode, 
AssignedJob> inputJobs) {
+        return scanWorkerSelector.selectReplicaAndWorkerWithoutBucket(
+                scanNodes.get(0), statementContext.getConnectContext()
+        );
+    }
+
+    @Override
+    protected List<AssignedJob> fillUpAssignedJobs(List<AssignedJob> 
assignedJobs,
+            DistributedPlanWorkerManager workerManager, 
ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
+        return fillUpSingleEmptyInstance(workerManager);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCte.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCte.java
new file mode 100644
index 00000000000..cfdf317c620
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCte.java
@@ -0,0 +1,419 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.logical;
+
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.DataTrait;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.literal.Literal;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.algebra.Union;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.types.DataType;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.TypeCoercionUtils;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * LogicalRecursiveCte is basically like LogicalUnion
+ */
+public class LogicalRecursiveCte extends LogicalSetOperation implements Union, 
OutputPrunable {
+
+    // in doris, we use union node to present one row relation
+    private final List<List<NamedExpression>> constantExprsList;
+    // When there is an agg on the union and there is a filter on the agg,
+    // it is necessary to keep the filter on the agg and push the filter down 
to each child of the union.
+    private final boolean hasPushedFilter;
+
+    /** LogicalRecursiveCte */
+    public LogicalRecursiveCte(Qualifier qualifier, List<Plan> children) {
+        this(qualifier, ImmutableList.of(), children);
+    }
+
+    /** LogicalRecursiveCte */
+    public LogicalRecursiveCte(Qualifier qualifier, 
List<List<NamedExpression>> constantExprsList,
+            List<Plan> children) {
+        this(qualifier, ImmutableList.of(), ImmutableList.of(), 
constantExprsList, false, children);
+    }
+
+    /** LogicalRecursiveCte */
+    public LogicalRecursiveCte(Qualifier qualifier, List<NamedExpression> 
outputs,
+            List<List<SlotReference>> childrenOutputs,
+            List<List<NamedExpression>> constantExprsList, boolean 
hasPushedFilter, List<Plan> children) {
+        this(qualifier, outputs, childrenOutputs, constantExprsList, 
hasPushedFilter, Optional.empty(),
+                Optional.empty(),
+                children);
+    }
+
+    /** LogicalRecursiveCte */
+    public LogicalRecursiveCte(Qualifier qualifier, List<NamedExpression> 
outputs,
+            List<List<SlotReference>> childrenOutputs,
+            List<List<NamedExpression>> constantExprsList, boolean 
hasPushedFilter,
+            Optional<GroupExpression> groupExpression, 
Optional<LogicalProperties> logicalProperties,
+            List<Plan> children) {
+        super(PlanType.LOGICAL_RECURSIVE_CTE, qualifier, outputs, 
childrenOutputs,
+                groupExpression, logicalProperties, children);
+        this.hasPushedFilter = hasPushedFilter;
+        this.constantExprsList = Utils.fastToImmutableList(
+                Objects.requireNonNull(constantExprsList, "constantExprsList 
should not be null"));
+    }
+
+    public boolean hasPushedFilter() {
+        return hasPushedFilter;
+    }
+
+    public List<List<NamedExpression>> getConstantExprsList() {
+        return constantExprsList;
+    }
+
+    @Override
+    public List<? extends Expression> getExpressions() {
+        return 
constantExprsList.stream().flatMap(List::stream).collect(ImmutableList.toImmutableList());
+    }
+
+    @Override
+    public String toString() {
+        return Utils.toSqlStringSkipNull("LogicalRecursiveCte",
+                "qualifier", qualifier,
+                "outputs", outputs,
+                "regularChildrenOutputs", regularChildrenOutputs,
+                "constantExprsList", constantExprsList,
+                "hasPushedFilter", hasPushedFilter,
+                "stats", statistics);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        LogicalRecursiveCte that = (LogicalRecursiveCte) o;
+        return super.equals(that) && hasPushedFilter == that.hasPushedFilter
+                && Objects.equals(constantExprsList, that.constantExprsList);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), hasPushedFilter, 
constantExprsList);
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitLogicalRecursiveCte(this, context);
+    }
+
+    @Override
+    public LogicalRecursiveCte withChildren(List<Plan> children) {
+        return new LogicalRecursiveCte(qualifier, outputs, 
regularChildrenOutputs,
+                constantExprsList, hasPushedFilter, children);
+    }
+
+    @Override
+    public LogicalSetOperation withChildrenAndTheirOutputs(List<Plan> children,
+            List<List<SlotReference>> childrenOutputs) {
+        Preconditions.checkArgument(children.size() == childrenOutputs.size(),
+                "children size %s is not equals with children outputs size %s",
+                children.size(), childrenOutputs.size());
+        return new LogicalRecursiveCte(qualifier, outputs, childrenOutputs, 
constantExprsList, hasPushedFilter,
+                children);
+    }
+
+    @Override
+    public LogicalRecursiveCte withGroupExpression(Optional<GroupExpression> 
groupExpression) {
+        return new LogicalRecursiveCte(qualifier, outputs, 
regularChildrenOutputs, constantExprsList, hasPushedFilter,
+                groupExpression, Optional.of(getLogicalProperties()), 
children);
+    }
+
+    @Override
+    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> 
groupExpression,
+            Optional<LogicalProperties> logicalProperties, List<Plan> 
children) {
+        return new LogicalRecursiveCte(qualifier, outputs, 
regularChildrenOutputs, constantExprsList, hasPushedFilter,
+                groupExpression, logicalProperties, children);
+    }
+
+    @Override
+    public LogicalRecursiveCte withNewOutputs(List<NamedExpression> 
newOutputs) {
+        return new LogicalRecursiveCte(qualifier, newOutputs, 
regularChildrenOutputs, constantExprsList,
+                hasPushedFilter, Optional.empty(), Optional.empty(), children);
+    }
+
+    public LogicalRecursiveCte 
withNewOutputsAndConstExprsList(List<NamedExpression> newOutputs,
+            List<List<NamedExpression>> constantExprsList) {
+        return new LogicalRecursiveCte(qualifier, newOutputs, 
regularChildrenOutputs, constantExprsList,
+                hasPushedFilter, Optional.empty(), Optional.empty(), children);
+    }
+
+    public LogicalRecursiveCte withChildrenAndConstExprsList(List<Plan> 
children,
+            List<List<SlotReference>> childrenOutputs, 
List<List<NamedExpression>> constantExprsList) {
+        return new LogicalRecursiveCte(qualifier, outputs, childrenOutputs, 
constantExprsList, hasPushedFilter,
+                children);
+    }
+
+    public LogicalRecursiveCte 
withNewOutputsChildrenAndConstExprsList(List<NamedExpression> newOutputs,
+            List<Plan> children,
+            List<List<SlotReference>> childrenOutputs,
+            List<List<NamedExpression>> constantExprsList) {
+        return new LogicalRecursiveCte(qualifier, newOutputs, childrenOutputs, 
constantExprsList,
+                hasPushedFilter, Optional.empty(), Optional.empty(), children);
+    }
+
+    public LogicalRecursiveCte withAllQualifier() {
+        return new LogicalRecursiveCte(Qualifier.ALL, outputs, 
regularChildrenOutputs, constantExprsList,
+                hasPushedFilter,
+                Optional.empty(), Optional.empty(), children);
+    }
+
+    @Override
+    public LogicalRecursiveCte pruneOutputs(List<NamedExpression> 
prunedOutputs) {
+        return withNewOutputs(prunedOutputs);
+    }
+
+    @Override
+    public void computeUnique(DataTrait.Builder builder) {
+        if (qualifier == Qualifier.DISTINCT) {
+            builder.addUniqueSlot(ImmutableSet.copyOf(getOutput()));
+        }
+    }
+
+    @Override
+    public void computeUniform(DataTrait.Builder builder) {
+        final Optional<ExpressionRewriteContext> context = 
ConnectContext.get() == null ? Optional.empty()
+                : Optional.of(new 
ExpressionRewriteContext(CascadesContext.initContext(
+                        ConnectContext.get().getStatementContext(), this, 
PhysicalProperties.ANY)));
+        for (int i = 0; i < getOutputs().size(); i++) {
+            Optional<Literal> value = Optional.empty();
+            if (!constantExprsList.isEmpty()) {
+                value = 
ExpressionUtils.checkConstantExpr(constantExprsList.get(0).get(i), context);
+                if (!value.isPresent()) {
+                    continue;
+                }
+                final int fi = i;
+                Literal literal = value.get();
+                if (constantExprsList.stream()
+                        .map(exprs -> 
ExpressionUtils.checkConstantExpr(exprs.get(fi), context))
+                        .anyMatch(val -> !val.isPresent() || 
!val.get().equals(literal))) {
+                    continue;
+                }
+            }
+            for (int childIdx = 0; childIdx < children.size(); childIdx++) {
+                // TODO: use originOutputs = child(childIdx).getOutput() ?
+                List<? extends Slot> originOutputs = 
regularChildrenOutputs.get(childIdx);
+                Slot slot = originOutputs.get(i);
+                Optional<Expression> childValue = 
child(childIdx).getLogicalProperties()
+                        .getTrait().getUniformValue(slot);
+                if (childValue == null || !childValue.isPresent() || 
!childValue.get().isConstant()) {
+                    value = Optional.empty();
+                    break;
+                }
+                Optional<Literal> constExprOpt = 
ExpressionUtils.checkConstantExpr(childValue.get(), context);
+                if (!constExprOpt.isPresent()) {
+                    value = Optional.empty();
+                    break;
+                }
+                if (!value.isPresent()) {
+                    value = constExprOpt;
+                } else if (!value.equals(constExprOpt)) {
+                    value = Optional.empty();
+                    break;
+                }
+            }
+            if (value.isPresent()) {
+                builder.addUniformSlotAndLiteral(getOutputs().get(i).toSlot(), 
value.get());
+            }
+        }
+    }
+
+    @Override
+    public boolean hasUnboundExpression() {
+        if (!constantExprsList.isEmpty() && children.isEmpty()) {
+            return false;
+        }
+        return super.hasUnboundExpression();
+    }
+
+    private List<BitSet> mapSlotToIndex(Plan plan, List<Set<Slot>> 
equalSlotsList) {
+        Map<Slot, Integer> slotToIndex = new HashMap<>();
+        for (int i = 0; i < plan.getOutput().size(); i++) {
+            slotToIndex.put(plan.getOutput().get(i), i);
+        }
+        List<BitSet> equalSlotIndicesList = new ArrayList<>();
+        for (Set<Slot> equalSlots : equalSlotsList) {
+            BitSet equalSlotIndices = new BitSet();
+            for (Slot slot : equalSlots) {
+                if (slotToIndex.containsKey(slot)) {
+                    equalSlotIndices.set(slotToIndex.get(slot));
+                }
+            }
+            if (equalSlotIndices.cardinality() > 1) {
+                equalSlotIndicesList.add(equalSlotIndices);
+            }
+        }
+        return equalSlotIndicesList;
+    }
+
+    @Override
+    public void computeEqualSet(DataTrait.Builder builder) {
+        if (children.isEmpty()) {
+            return;
+        }
+
+        // Get the list of equal slot sets and their corresponding index 
mappings for the first child
+        List<Set<Slot>> childEqualSlotsList = child(0).getLogicalProperties()
+                .getTrait().calAllEqualSet();
+        List<BitSet> childEqualSlotsIndicesList = mapSlotToIndex(child(0), 
childEqualSlotsList);
+        List<BitSet> unionEqualSlotIndicesList = new 
ArrayList<>(childEqualSlotsIndicesList);
+
+        // Traverse all children and find the equal sets that exist in all 
children
+        for (int i = 1; i < children.size(); i++) {
+            Plan child = children.get(i);
+
+            // Get the equal slot sets for the current child
+            childEqualSlotsList = 
child.getLogicalProperties().getTrait().calAllEqualSet();
+
+            // Map slots to indices for the current child
+            childEqualSlotsIndicesList = mapSlotToIndex(child, 
childEqualSlotsList);
+
+            // Only keep the equal pairs that exist in all children of the 
union
+            // This is done by calculating the intersection of all children's 
equal slot indices
+            for (BitSet unionEqualSlotIndices : unionEqualSlotIndicesList) {
+                BitSet intersect = new BitSet();
+                for (BitSet childEqualSlotIndices : 
childEqualSlotsIndicesList) {
+                    if 
(unionEqualSlotIndices.intersects(childEqualSlotIndices)) {
+                        intersect = childEqualSlotIndices;
+                        break;
+                    }
+                }
+                unionEqualSlotIndices.and(intersect);
+            }
+        }
+
+        // Build the functional dependencies for the output slots
+        List<Slot> outputList = getOutput();
+        for (BitSet equalSlotIndices : unionEqualSlotIndicesList) {
+            if (equalSlotIndices.cardinality() <= 1) {
+                continue;
+            }
+            int first = equalSlotIndices.nextSetBit(0);
+            int next = equalSlotIndices.nextSetBit(first + 1);
+            while (next > 0) {
+                builder.addEqualPair(outputList.get(first), 
outputList.get(next));
+                next = equalSlotIndices.nextSetBit(next + 1);
+            }
+        }
+    }
+
+    @Override
+    public void computeFd(DataTrait.Builder builder) {
+        // don't generate
+    }
+
+    /** castCommonDataTypeAndNullableByConstants */
+    public static Pair<List<List<NamedExpression>>, List<Boolean>> 
castCommonDataTypeAndNullableByConstants(
+            List<List<NamedExpression>> constantExprsList) {
+        int columnCount = constantExprsList.isEmpty() ? 0 : 
constantExprsList.get(0).size();
+        Pair<List<DataType>, List<Boolean>> commonInfo = 
computeCommonDataTypeAndNullable(constantExprsList,
+                columnCount);
+        List<List<NamedExpression>> castedRows = 
castToCommonType(constantExprsList, commonInfo.key(), columnCount);
+        List<Boolean> nullables = commonInfo.second;
+        return Pair.of(castedRows, nullables);
+    }
+
+    private static Pair<List<DataType>, List<Boolean>> 
computeCommonDataTypeAndNullable(
+            List<List<NamedExpression>> constantExprsList, int columnCount) {
+        List<Boolean> nullables = Lists.newArrayListWithCapacity(columnCount);
+        List<DataType> commonDataTypes = 
Lists.newArrayListWithCapacity(columnCount);
+        List<NamedExpression> firstRow = constantExprsList.get(0);
+        for (int columnId = 0; columnId < columnCount; columnId++) {
+            Expression constant = firstRow.get(columnId).child(0);
+            Pair<DataType, Boolean> commonDataTypeAndNullable = 
computeCommonDataTypeAndNullable(constant, columnId,
+                    constantExprsList);
+            commonDataTypes.add(commonDataTypeAndNullable.first);
+            nullables.add(commonDataTypeAndNullable.second);
+        }
+        return Pair.of(commonDataTypes, nullables);
+    }
+
+    private static Pair<DataType, Boolean> computeCommonDataTypeAndNullable(
+            Expression firstRowExpr, int columnId, List<List<NamedExpression>> 
constantExprsList) {
+        DataType commonDataType = firstRowExpr.getDataType();
+        boolean nullable = firstRowExpr.nullable();
+        for (int rowId = 1; rowId < constantExprsList.size(); rowId++) {
+            NamedExpression namedExpression = 
constantExprsList.get(rowId).get(columnId);
+            Expression otherConstant = namedExpression.child(0);
+            nullable |= otherConstant.nullable();
+            DataType otherDataType = otherConstant.getDataType();
+            commonDataType = getAssignmentCompatibleType(commonDataType, 
otherDataType);
+        }
+        return Pair.of(commonDataType, nullable);
+    }
+
+    private static List<List<NamedExpression>> castToCommonType(
+            List<List<NamedExpression>> rows, List<DataType> commonDataTypes, 
int columnCount) {
+        ImmutableList.Builder<List<NamedExpression>> castedConstants = 
ImmutableList
+                .builderWithExpectedSize(rows.size());
+        for (List<NamedExpression> row : rows) {
+            castedConstants.add(castToCommonType(row, commonDataTypes));
+        }
+        return castedConstants.build();
+    }
+
+    private static List<NamedExpression> 
castToCommonType(List<NamedExpression> row, List<DataType> commonTypes) {
+        ImmutableList.Builder<NamedExpression> castedRow = 
ImmutableList.builderWithExpectedSize(row.size());
+        boolean changed = false;
+        for (int columnId = 0; columnId < row.size(); columnId++) {
+            NamedExpression constantAlias = row.get(columnId);
+            Expression constant = constantAlias.child(0);
+            DataType commonType = commonTypes.get(columnId);
+            if (commonType.equals(constant.getDataType())) {
+                castedRow.add(constantAlias);
+            } else {
+                changed = true;
+                Expression expression = 
TypeCoercionUtils.castIfNotSameTypeStrict(constant, commonType);
+                castedRow.add((NamedExpression) 
constantAlias.withChildren(expression));
+            }
+        }
+        return changed ? castedRow.build() : row;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteScan.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteScan.java
new file mode 100644
index 00000000000..7b1fc4862ee
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteScan.java
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.logical;
+
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.RelationId;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * LogicalRecursiveCteScan.
+ */
+public class LogicalRecursiveCteScan extends LogicalCatalogRelation {
+    public LogicalRecursiveCteScan(RelationId relationId, TableIf table, 
List<String> qualifier) {
+        this(relationId, table, qualifier, Optional.empty(), Optional.empty());
+    }
+
+    private LogicalRecursiveCteScan(RelationId relationId, TableIf table, 
List<String> qualifier,
+            Optional<GroupExpression> groupExpression, 
Optional<LogicalProperties> logicalProperties) {
+        super(relationId, PlanType.LOGICAL_RECURSIVE_CTE_SCAN, table, 
qualifier, groupExpression, logicalProperties);
+    }
+
+    @Override
+    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) 
{
+        return new LogicalRecursiveCteScan(relationId, table, qualifier,
+                groupExpression, Optional.ofNullable(getLogicalProperties()));
+    }
+
+    @Override
+    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> 
groupExpression,
+            Optional<LogicalProperties> logicalProperties, List<Plan> 
children) {
+        return new LogicalRecursiveCteScan(relationId, table, qualifier, 
groupExpression, logicalProperties);
+    }
+
+    @Override
+    public LogicalCatalogRelation withRelationId(RelationId relationId) {
+        return new LogicalRecursiveCteScan(relationId, table, qualifier,
+                groupExpression, Optional.ofNullable(getLogicalProperties()));
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitLogicalRecursiveCteScan(this, context);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSubQueryAlias.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSubQueryAlias.java
index 328508fb3c7..e7230b5a31e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSubQueryAlias.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSubQueryAlias.java
@@ -55,28 +55,36 @@ public class LogicalSubQueryAlias<CHILD_TYPE extends Plan> 
extends LogicalUnary<
     private final List<String> qualifier;
     private final Optional<List<String>> columnAliases;
 
+    private final boolean isRecursiveCte;
+
     public LogicalSubQueryAlias(String tableAlias, CHILD_TYPE child) {
-        this(ImmutableList.of(tableAlias), Optional.empty(), Optional.empty(), 
Optional.empty(), child);
+        this(ImmutableList.of(tableAlias), Optional.empty(), false, 
Optional.empty(), Optional.empty(), child);
     }
 
     public LogicalSubQueryAlias(List<String> qualifier, CHILD_TYPE child) {
-        this(qualifier, Optional.empty(), Optional.empty(), Optional.empty(), 
child);
+        this(qualifier, Optional.empty(), false, Optional.empty(), 
Optional.empty(), child);
     }
 
     public LogicalSubQueryAlias(String tableAlias, Optional<List<String>> 
columnAliases, CHILD_TYPE child) {
-        this(ImmutableList.of(tableAlias), columnAliases, Optional.empty(), 
Optional.empty(), child);
+        this(ImmutableList.of(tableAlias), columnAliases, false, 
Optional.empty(), Optional.empty(), child);
+    }
+
+    public LogicalSubQueryAlias(String tableAlias, Optional<List<String>> 
columnAliases, boolean isRecursiveCte,
+            CHILD_TYPE child) {
+        this(ImmutableList.of(tableAlias), columnAliases, isRecursiveCte, 
Optional.empty(), Optional.empty(), child);
     }
 
     public LogicalSubQueryAlias(List<String> qualifier, Optional<List<String>> 
columnAliases, CHILD_TYPE child) {
-        this(qualifier, columnAliases, Optional.empty(), Optional.empty(), 
child);
+        this(qualifier, columnAliases, false, Optional.empty(), 
Optional.empty(), child);
     }
 
-    public LogicalSubQueryAlias(List<String> qualifier, Optional<List<String>> 
columnAliases,
+    public LogicalSubQueryAlias(List<String> qualifier, Optional<List<String>> 
columnAliases, boolean isRecursiveCte,
             Optional<GroupExpression> groupExpression,
             Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) {
         super(PlanType.LOGICAL_SUBQUERY_ALIAS, groupExpression, 
logicalProperties, child);
         this.qualifier = 
ImmutableList.copyOf(Objects.requireNonNull(qualifier, "qualifier is null"));
         this.columnAliases = columnAliases;
+        this.isRecursiveCte = isRecursiveCte;
     }
 
     @Override
@@ -128,10 +136,14 @@ public class LogicalSubQueryAlias<CHILD_TYPE extends 
Plan> extends LogicalUnary<
         return columnAliases;
     }
 
+    public boolean isRecursiveCte() {
+        return isRecursiveCte;
+    }
+
     @Override
     public String toString() {
         return columnAliases.map(strings -> 
Utils.toSqlString("LogicalSubQueryAlias",
-                "qualifier", qualifier,
+                "qualifier", qualifier, "isRecursiveCte", isRecursiveCte,
                 "columnAliases", StringUtils.join(strings, ",")
         )).orElseGet(() -> Utils.toSqlString("LogicalSubQueryAlias",
                 "qualifier", qualifier
@@ -158,7 +170,8 @@ public class LogicalSubQueryAlias<CHILD_TYPE extends Plan> 
extends LogicalUnary<
     @Override
     public LogicalSubQueryAlias<Plan> withChildren(List<Plan> children) {
         Preconditions.checkArgument(children.size() == 1);
-        return new LogicalSubQueryAlias<>(qualifier, columnAliases, 
children.get(0));
+        return new LogicalSubQueryAlias<>(qualifier, columnAliases, 
isRecursiveCte, Optional.empty(), Optional.empty(),
+                children.get(0));
     }
 
     @Override
@@ -173,7 +186,7 @@ public class LogicalSubQueryAlias<CHILD_TYPE extends Plan> 
extends LogicalUnary<
 
     @Override
     public LogicalSubQueryAlias<CHILD_TYPE> 
withGroupExpression(Optional<GroupExpression> groupExpression) {
-        return new LogicalSubQueryAlias<>(qualifier, columnAliases, 
groupExpression,
+        return new LogicalSubQueryAlias<>(qualifier, columnAliases, 
isRecursiveCte, groupExpression,
                 Optional.of(getLogicalProperties()), child());
     }
 
@@ -181,7 +194,7 @@ public class LogicalSubQueryAlias<CHILD_TYPE extends Plan> 
extends LogicalUnary<
     public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> 
groupExpression,
             Optional<LogicalProperties> logicalProperties, List<Plan> 
children) {
         Preconditions.checkArgument(children.size() == 1);
-        return new LogicalSubQueryAlias<>(qualifier, columnAliases, 
groupExpression, logicalProperties,
+        return new LogicalSubQueryAlias<>(qualifier, columnAliases, 
isRecursiveCte, groupExpression, logicalProperties,
                 children.get(0));
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCte.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCte.java
new file mode 100644
index 00000000000..ef0cc98de0a
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCte.java
@@ -0,0 +1,251 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.physical;
+
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.DataTrait;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.algebra.Union;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.statistics.Statistics;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * PhysicalRecursiveCte is basically like PhysicalUnion
+ */
+public class PhysicalRecursiveCte extends PhysicalSetOperation implements 
Union {
+
+    // in doris, we use union node to present one row relation
+    private final List<List<NamedExpression>> constantExprsList;
+
+    /** PhysicalRecursiveCte */
+    public PhysicalRecursiveCte(Qualifier qualifier,
+            List<NamedExpression> outputs,
+            List<List<SlotReference>> childrenOutputs,
+            List<List<NamedExpression>> constantExprsList,
+            LogicalProperties logicalProperties,
+            List<Plan> children) {
+        super(PlanType.PHYSICAL_RECURSIVE_CTE, qualifier, outputs, 
childrenOutputs, logicalProperties, children);
+        this.constantExprsList = ImmutableList.copyOf(
+                Objects.requireNonNull(constantExprsList, "constantExprsList 
should not be null"));
+    }
+
+    /** PhysicalRecursiveCte */
+    public PhysicalRecursiveCte(Qualifier qualifier,
+            List<NamedExpression> outputs,
+            List<List<SlotReference>> childrenOutputs,
+            List<List<NamedExpression>> constantExprsList,
+            Optional<GroupExpression> groupExpression,
+            LogicalProperties logicalProperties,
+            List<Plan> children) {
+        super(PlanType.PHYSICAL_RECURSIVE_CTE, qualifier, outputs, 
childrenOutputs,
+                groupExpression, logicalProperties, children);
+        this.constantExprsList = ImmutableList.copyOf(
+                Objects.requireNonNull(constantExprsList, "constantExprsList 
should not be null"));
+    }
+
+    /** PhysicalRecursiveCte */
+    public PhysicalRecursiveCte(Qualifier qualifier, List<NamedExpression> 
outputs,
+            List<List<SlotReference>> childrenOutputs, 
List<List<NamedExpression>> constantExprsList,
+            Optional<GroupExpression> groupExpression, LogicalProperties 
logicalProperties,
+            PhysicalProperties physicalProperties, Statistics statistics, 
List<Plan> inputs) {
+        super(PlanType.PHYSICAL_RECURSIVE_CTE, qualifier, outputs, 
childrenOutputs,
+                groupExpression, logicalProperties, physicalProperties, 
statistics, inputs);
+        this.constantExprsList = ImmutableList.copyOf(
+                Objects.requireNonNull(constantExprsList, "constantExprsList 
should not be null"));
+    }
+
+    public List<List<NamedExpression>> getConstantExprsList() {
+        return constantExprsList;
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitPhysicalRecursiveCte(this, context);
+    }
+
+    @Override
+    public String toString() {
+        return Utils.toSqlString("PhysicalRecursiveCte" + "[" + id.asInt() + 
"]" + getGroupIdWithPrefix(),
+                "stats", statistics,
+                "qualifier", qualifier,
+                "outputs", outputs,
+                "regularChildrenOutputs", regularChildrenOutputs,
+                "constantExprsList", constantExprsList);
+    }
+
+    @Override
+    public String shapeInfo() {
+        ConnectContext context = ConnectContext.get();
+        if (context != null
+                && 
context.getSessionVariable().getDetailShapePlanNodesSet().contains(getClass().getSimpleName()))
 {
+            StringBuilder builder = new StringBuilder();
+            builder.append(getClass().getSimpleName());
+            builder.append("(constantExprsList=");
+            builder.append(constantExprsList.stream()
+                    .map(exprs -> exprs.stream().map(Expression::shapeInfo)
+                            .collect(Collectors.joining(", ", "[", "]")))
+                    .collect(Collectors.joining(", ", "[", "]")));
+            builder.append(")");
+            return builder.toString();
+        } else {
+            return super.shapeInfo();
+        }
+    }
+
+    @Override
+    public PhysicalRecursiveCte withChildren(List<Plan> children) {
+        return new PhysicalRecursiveCte(qualifier, outputs, 
regularChildrenOutputs, constantExprsList, groupExpression,
+                getLogicalProperties(), children);
+    }
+
+    @Override
+    public PhysicalRecursiveCte withGroupExpression(Optional<GroupExpression> 
groupExpression) {
+        return new PhysicalRecursiveCte(qualifier, outputs, 
regularChildrenOutputs, constantExprsList,
+                groupExpression, getLogicalProperties(), children);
+    }
+
+    @Override
+    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> 
groupExpression,
+            Optional<LogicalProperties> logicalProperties, List<Plan> 
children) {
+        return new PhysicalRecursiveCte(qualifier, outputs, 
regularChildrenOutputs, constantExprsList,
+                groupExpression, logicalProperties.get(), children);
+    }
+
+    @Override
+    public PhysicalRecursiveCte withPhysicalPropertiesAndStats(
+            PhysicalProperties physicalProperties, Statistics statistics) {
+        return new PhysicalRecursiveCte(qualifier, outputs, 
regularChildrenOutputs, constantExprsList,
+                groupExpression, getLogicalProperties(), physicalProperties, 
statistics, children);
+    }
+
+    @Override
+    public PhysicalRecursiveCte resetLogicalProperties() {
+        return new PhysicalRecursiveCte(qualifier, outputs, 
regularChildrenOutputs, constantExprsList,
+                Optional.empty(), null, physicalProperties, statistics, 
children);
+    }
+
+    @Override
+    public void computeUnique(DataTrait.Builder builder) {
+        if (qualifier == Qualifier.DISTINCT) {
+            builder.addUniqueSlot(ImmutableSet.copyOf(getOutput()));
+        }
+    }
+
+    @Override
+    public void computeUniform(DataTrait.Builder builder) {
+        // don't propagate uniform slots
+    }
+
+    private List<BitSet> mapSlotToIndex(Plan plan, List<Set<Slot>> 
equalSlotsList) {
+        Map<Slot, Integer> slotToIndex = new HashMap<>();
+        for (int i = 0; i < plan.getOutput().size(); i++) {
+            slotToIndex.put(plan.getOutput().get(i), i);
+        }
+        List<BitSet> equalSlotIndicesList = new ArrayList<>();
+        for (Set<Slot> equalSlots : equalSlotsList) {
+            BitSet equalSlotIndices = new BitSet();
+            for (Slot slot : equalSlots) {
+                if (slotToIndex.containsKey(slot)) {
+                    equalSlotIndices.set(slotToIndex.get(slot));
+                }
+            }
+            if (equalSlotIndices.cardinality() > 1) {
+                equalSlotIndicesList.add(equalSlotIndices);
+            }
+        }
+        return equalSlotIndicesList;
+    }
+
+    @Override
+    public void computeEqualSet(DataTrait.Builder builder) {
+        if (children.isEmpty()) {
+            return;
+        }
+
+        // Get the list of equal slot sets and their corresponding index 
mappings for the first child
+        List<Set<Slot>> childEqualSlotsList = child(0).getLogicalProperties()
+                .getTrait().calAllEqualSet();
+        List<BitSet> childEqualSlotsIndicesList = mapSlotToIndex(child(0), 
childEqualSlotsList);
+        List<BitSet> unionEqualSlotIndicesList = new 
ArrayList<>(childEqualSlotsIndicesList);
+
+        // Traverse all children and find the equal sets that exist in all 
children
+        for (int i = 1; i < children.size(); i++) {
+            Plan child = children.get(i);
+
+            // Get the equal slot sets for the current child
+            childEqualSlotsList = 
child.getLogicalProperties().getTrait().calAllEqualSet();
+
+            // Map slots to indices for the current child
+            childEqualSlotsIndicesList = mapSlotToIndex(child, 
childEqualSlotsList);
+
+            // Only keep the equal pairs that exist in all children of the 
union
+            // This is done by calculating the intersection of all children's 
equal slot indices
+            for (BitSet unionEqualSlotIndices : unionEqualSlotIndicesList) {
+                BitSet intersect = new BitSet();
+                for (BitSet childEqualSlotIndices : 
childEqualSlotsIndicesList) {
+                    if 
(unionEqualSlotIndices.intersects(childEqualSlotIndices)) {
+                        intersect = childEqualSlotIndices;
+                        break;
+                    }
+                }
+                unionEqualSlotIndices.and(intersect);
+            }
+        }
+
+        // Build the functional dependencies for the output slots
+        List<Slot> outputList = getOutput();
+        for (BitSet equalSlotIndices : unionEqualSlotIndicesList) {
+            if (equalSlotIndices.cardinality() <= 1) {
+                continue;
+            }
+            int first = equalSlotIndices.nextSetBit(0);
+            int next = equalSlotIndices.nextSetBit(first + 1);
+            while (next > 0) {
+                builder.addEqualPair(outputList.get(first), 
outputList.get(next));
+                next = equalSlotIndices.nextSetBit(next + 1);
+            }
+        }
+    }
+
+    @Override
+    public void computeFd(DataTrait.Builder builder) {
+        // don't generate
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCteScan.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCteScan.java
new file mode 100644
index 00000000000..3450ae0de18
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCteScan.java
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.physical;
+
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.RelationId;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.statistics.Statistics;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * PhysicalRecursiveCteScan.
+ */
+public class PhysicalRecursiveCteScan extends PhysicalCatalogRelation {
+    public PhysicalRecursiveCteScan(RelationId relationId, TableIf table, 
List<String> qualifier,
+            Optional<GroupExpression> groupExpression, LogicalProperties 
logicalProperties,
+            Collection<Slot> operativeSlots) {
+        this(relationId, table, qualifier, groupExpression, logicalProperties, 
PhysicalProperties.ANY, null,
+                operativeSlots);
+    }
+
+    public PhysicalRecursiveCteScan(RelationId relationId, TableIf table, 
List<String> qualifier,
+            Optional<GroupExpression> groupExpression, LogicalProperties 
logicalProperties,
+            PhysicalProperties physicalProperties, Statistics statistics, 
Collection<Slot> operativeSlots) {
+        super(relationId, PlanType.PHYSICAL_RECURSIVE_CTE_SCAN, table, 
qualifier, groupExpression, logicalProperties,
+                physicalProperties, statistics, operativeSlots);
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitPhysicalRecursiveCteScan(this, context);
+    }
+
+    @Override
+    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) 
{
+        return new PhysicalRecursiveCteScan(relationId, table, qualifier, 
groupExpression, getLogicalProperties(),
+                physicalProperties, statistics, operativeSlots);
+    }
+
+    @Override
+    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> 
groupExpression,
+            Optional<LogicalProperties> logicalProperties, List<Plan> 
children) {
+        return new PhysicalRecursiveCteScan(relationId, table, qualifier, 
groupExpression, getLogicalProperties(),
+                physicalProperties, statistics, operativeSlots);
+    }
+
+    @Override
+    public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties 
physicalProperties, Statistics statistics) {
+        return new PhysicalRecursiveCteScan(relationId, table, qualifier, 
groupExpression, getLogicalProperties(),
+                physicalProperties, statistics, operativeSlots);
+    }
+
+    @Override
+    public String toString() {
+        return Utils.toSqlString("PhysicalRecursiveCteScan[" + table.getName() 
+ "]" + getGroupIdWithPrefix(),
+                "stats", statistics,
+                "qualified", Utils.qualifiedName(qualifier, table.getName()),
+                "operativeCols", getOperativeSlots());
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
index 291f9ed5d80..d34de256608 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
@@ -44,6 +44,7 @@ import 
org.apache.doris.nereids.trees.plans.logical.LogicalPreAggOnHint;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPreFilter;
 import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
 import org.apache.doris.nereids.trees.plans.logical.LogicalQualify;
+import org.apache.doris.nereids.trees.plans.logical.LogicalRecursiveCte;
 import org.apache.doris.nereids.trees.plans.logical.LogicalRelation;
 import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSelectHint;
@@ -78,6 +79,7 @@ import 
org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalRecursiveCte;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalRepeat;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
@@ -211,6 +213,10 @@ public abstract class PlanVisitor<R, C> implements 
CommandVisitor<R, C>, Relatio
         return visit(join, context);
     }
 
+    public R visitLogicalRecursiveCte(LogicalRecursiveCte recursiveCte, C 
context) {
+        return visit(recursiveCte, context);
+    }
+
     public R visitLogicalLimit(LogicalLimit<? extends Plan> limit, C context) {
         return visit(limit, context);
     }
@@ -376,6 +382,10 @@ public abstract class PlanVisitor<R, C> implements 
CommandVisitor<R, C>, Relatio
         return visitPhysicalSetOperation(union, context);
     }
 
+    public R visitPhysicalRecursiveCte(PhysicalRecursiveCte recursiveCte, C 
context) {
+        return visitPhysicalSetOperation(recursiveCte, context);
+    }
+
     public R visitAbstractPhysicalSort(AbstractPhysicalSort<? extends Plan> 
sort, C context) {
         return visit(sort, context);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/RelationVisitor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/RelationVisitor.java
index fef94ff52f9..b325849b1b1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/RelationVisitor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/RelationVisitor.java
@@ -30,6 +30,7 @@ import 
org.apache.doris.nereids.trees.plans.logical.LogicalJdbcScan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOdbcScan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
+import org.apache.doris.nereids.trees.plans.logical.LogicalRecursiveCteScan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalRelation;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSchemaScan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalTVFRelation;
@@ -45,6 +46,7 @@ import 
org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOdbcScan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalRecursiveCteScan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalSchemaScan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation;
@@ -139,6 +141,10 @@ public interface RelationVisitor<R, C> {
         return visitLogicalCatalogRelation(testScan, context);
     }
 
+    default R visitLogicalRecursiveCteScan(LogicalRecursiveCteScan 
recursiveCteScan, C context) {
+        return visitLogicalCatalogRelation(recursiveCteScan, context);
+    }
+
     // *******************************
     // physical relations
     // *******************************
@@ -176,6 +182,10 @@ public interface RelationVisitor<R, C> {
         return visitPhysicalCatalogRelation(deferMaterializeOlapScan, context);
     }
 
+    default R visitPhysicalRecursiveCteScan(PhysicalRecursiveCteScan 
recursiveCteScan, C context) {
+        return visitPhysicalCatalogRelation(recursiveCteScan, context);
+    }
+
     default R visitPhysicalOneRowRelation(PhysicalOneRowRelation 
oneRowRelation, C context) {
         return visitPhysicalRelation(oneRowRelation, context);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteNode.java
new file mode 100644
index 00000000000..b4075d66663
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteNode.java
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// 
https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/UnionNode.java
+// and modified by Doris
+
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.thrift.TPlanNode;
+import org.apache.doris.thrift.TPlanNodeType;
+import org.apache.doris.thrift.TRecCTENode;
+
+public class RecursiveCteNode extends SetOperationNode {
+
+    private boolean isUnionAll;
+    private TRecCTENode tRecCTENode;
+
+    public RecursiveCteNode(PlanNodeId id, TupleId tupleId, boolean 
isUnionAll) {
+        super(id, tupleId, "RECURSIVE_CTE", 
StatisticalType.RECURSIVE_CTE_NODE);
+        this.isUnionAll = isUnionAll;
+    }
+
+    public boolean isUnionAll() {
+        return isUnionAll;
+    }
+
+    public void settRecCTENode(TRecCTENode tRecCTENode) {
+        this.tRecCTENode = tRecCTENode;
+    }
+
+    @Override
+    protected void toThrift(TPlanNode msg) {
+        msg.node_type = TPlanNodeType.REC_CTE_NODE;
+        msg.rec_cte_node = tRecCTENode;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteScanNode.java
new file mode 100644
index 00000000000..103abd8b790
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteScanNode.java
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.UserException;
+import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.system.Backend;
+import org.apache.doris.thrift.TDataGenScanRange;
+import org.apache.doris.thrift.TExplainLevel;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPlanNode;
+import org.apache.doris.thrift.TPlanNodeType;
+import org.apache.doris.thrift.TScanRange;
+import org.apache.doris.thrift.TScanRangeLocation;
+import org.apache.doris.thrift.TScanRangeLocations;
+
+import com.google.common.collect.Lists;
+
+import java.util.Collections;
+import java.util.List;
+
+// Full scan of recursive cte temp table
+public class RecursiveCteScanNode extends ScanNode {
+
+    public RecursiveCteScanNode(PlanNodeId id, TupleDescriptor desc) {
+        super(id, desc, "RECURSIVE_CTE_SCAN", StatisticalType.CTE_SCAN_NODE);
+    }
+
+    public void initScanRangeLocations() throws UserException {
+        createScanRangeLocations();
+    }
+
+    @Override
+    protected void createScanRangeLocations() throws UserException {
+        scanRangeLocations = Lists.newArrayList();
+        // randomly select 1 backend
+        List<Backend> backendList = Lists.newArrayList();
+        for (Backend be : 
Env.getCurrentSystemInfo().getBackendsByCurrentCluster().values()) {
+            if (be.isAlive()) {
+                backendList.add(be);
+            }
+        }
+        if (backendList.isEmpty()) {
+            throw new UserException("No Alive backends");
+        }
+        Collections.shuffle(backendList);
+        Backend selectedBackend = backendList.get(0);
+
+        // create a dummy scan range
+        TScanRange scanRange = new TScanRange();
+        TDataGenScanRange dataGenScanRange = new TDataGenScanRange();
+        scanRange.setDataGenScanRange(dataGenScanRange);
+
+        // create scan range locations
+        TScanRangeLocations locations = new TScanRangeLocations();
+        TScanRangeLocation location = new TScanRangeLocation();
+        location.setBackendId(selectedBackend.getId());
+        location.setServer(new TNetworkAddress(selectedBackend.getHost(), 
selectedBackend.getBePort()));
+        locations.addToLocations(location);
+        locations.setScanRange(scanRange);
+        scanRangeLocations.add(locations);
+    }
+
+    @Override
+    public List<TScanRangeLocations> getScanRangeLocations(long 
maxScanRangeLength) {
+        return scanRangeLocations;
+    }
+
+    @Override
+    public int getNumInstances() {
+        return 1;
+    }
+
+    @Override
+    public int getScanRangeNum() {
+        return 1;
+    }
+
+    @Override
+    public String getNodeExplainString(String prefix, TExplainLevel 
detailLevel) {
+        StringBuilder output = new StringBuilder();
+        output.append(prefix).append("Recursive Cte: 
").append(getTableIf().getName());
+        output.append("\n");
+        return output.toString();
+    }
+
+    @Override
+    protected void toThrift(TPlanNode msg) {
+        msg.node_type = TPlanNodeType.REC_CTE_SCAN_NODE;
+    }
+
+    @Override
+    public boolean isSerialOperator() {
+        return true;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
index 42a930c2471..a2633b51449 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
@@ -41,6 +41,8 @@ public enum StatisticalType {
     ODBC_SCAN_NODE,
     OLAP_SCAN_NODE,
     PARTITION_TOPN_NODE,
+    RECURSIVE_CTE_NODE,
+    RECURSIVE_CTE_SCAN_NODE,
     REPEAT_NODE,
     SELECT_NODE,
     SET_OPERATION_NODE,
diff --git 
a/regression-test/suites/nereids_p0/recursive_cte/test_recursive_cte.groovy 
b/regression-test/suites/nereids_p0/recursive_cte/test_recursive_cte.groovy
new file mode 100644
index 00000000000..6055f9798cd
--- /dev/null
+++ b/regression-test/suites/nereids_p0/recursive_cte/test_recursive_cte.groovy
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+suite("test_recursive_cte") {
+    sql """drop table if exists t1"""
+    sql """drop table if exists t2"""
+    sql """drop table if exists t3"""
+    sql """drop table if exists t4"""
+
+    sql """
+        create table if not exists t1 (
+        c2 int   ,
+        c1 int   ,
+        c3 int   ,
+        c4 int   ,
+        pk int
+        )
+        distributed by hash(pk) buckets 10
+        properties("replication_num" = "1");
+    """
+
+    sql """
+        create table if not exists t2 (
+        c1 int   ,
+        c2 int   ,
+        c3 int   ,
+        c4 int   ,
+        pk int
+        )
+        distributed by hash(pk) buckets 10
+        properties("replication_num" = "1");
+    """
+
+    sql """
+        create table if not exists t3 (
+        c2 int   ,
+        c1 int   ,
+        c3 int   ,
+        c4 int   ,
+        pk int
+        )
+        distributed by hash(pk) buckets 10
+        properties("replication_num" = "1");
+    """
+
+    sql """
+        create table if not exists t4 (
+        c1 int   ,
+        c2 int   ,
+        c3 int   ,
+        c4 int   ,
+        pk int
+        )
+        distributed by hash(pk) buckets 10
+        properties("replication_num" = "1");
+    """
+
+    sql """
+        insert into t1(pk,c1,c2,c3,c4) values 
(0,7,2,3328056,7),(1,3,5,3,3045349),(2,2130015,0,7,-7116176),(3,4411710,1203314,1,2336164),(4,4,-8001461,0,8),(5,9,3,6,2),(6,-8088092,null,-7256698,-2025142),(7,8,2,5,1),(8,4,4953685,3,null),(9,-6662413,-3845449,4,2),(10,5315281,0,5,null),(11,9,3,7,7),(12,4341905,null,null,8),(13,3,6,5,1),(14,5,9,6541164,3),(15,1,-582319,1,9),(16,5533636,4,39841,0),(17,1,1,null,7),(18,742881,-1420303,6,1),(19,281430,6753011,3,2),(20,7,1,4,-31350),(21,-5663089
 [...]
+    """
+
+    sql """
+        insert into t2(pk,c1,c2,c3,c4) values 
(0,5,4,189864,-7663457),(1,7,null,6,1),(2,null,8,-3362640,9),(3,3,2,5,-2197130),(4,2,3,7160615,1),(5,null,-57834,420441,3),(6,0,null,2,2),(7,1,-3681539,3,4),(8,548866,3,0,5),(9,8,-2824887,0,3246956),(10,5,3,7,2),(11,8,8,6,8),(12,0,2,7,9),(13,8,6,null,null),(14,-4103729,4,5,8),(15,-3659292,2,7,5),(16,8,7,1,null),(17,2526018,4,8069607,5),(18,6,6,5,2802235),(19,9,0,6379201,null),(20,3,null,4,3),(21,0,8,-5506402,2),(22,6,4,3,1),(23,4,5225086,3,1)
 [...]
+    """
+
+    sql """
+        insert into t3(pk,c1,c2,c3,c4) values 
(0,3,2,6,-3164679),(1,-6216443,3437690,-288827,6),(2,4,-5352286,-1005469,4118240),(3,9,6795167,5,1616205),(4,8,-4659990,-4816829,6),(5,0,9,4,8),(6,-4454766,2,2510766,3),(7,7860071,-3434966,8,3),(8,null,0,2,1),(9,8031908,2,-6673194,-5981416),(10,5,6716310,8,2529959),(11,null,-3622116,1,-7891010),(12,null,3527222,7993802,null),(13,null,1,2,1),(14,2,8,7,7),(15,0,9,5,null),(16,7452083,null,-4620796,0),(17,9,9,null,6),(18,3,1,-1578776,5),(19,9,253
 [...]
+    """
+
+    sql """
+        insert into t4(pk,c1,c2,c3,c4) values 
(0,-4263513,null,null,6),(1,1,3,4,null),(2,2460936,6,5,6299003),(3,null,7,7107446,-2366754),(4,6247611,4785035,3,-8014875),(5,0,2,5249218,3),(6,null,253825,4,3),(7,null,2,9,-350785),(8,6,null,null,4),(9,1,3,1,3422691),(10,0,-6596165,1808018,3),(11,2,752342,null,1),(12,-5220927,2676278,9,7),(13,6025864,2,1,4),(14,7,4,4,9),(15,5,9,9,849881),(16,-4253076,null,-4404479,-6365351),(17,null,6,4240023,3),(18,7,1276495,7,6),(19,null,-4459040,178194,-6
 [...]
+    """
+
+    sql """
+        EXPLAIN ALL PLAN WITH XX AS ( 
+            SELECT C1 
+                FROM t1
+        ),
+        RECURSIVE YY AS (
+            SELECT C1 + 1.0 as c2 
+                FROM XX 
+            UNION 
+            SELECT c1 
+                FROM XX t2 join YY 
+                    WHERE t2.c1 = YY.c2
+            )
+        SELECT * FROM YY;
+    """
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to