This is an automated email from the ASF dual-hosted git repository.

jakevin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3961b8df76 [refactor](Nereids) mv top-n two phase read rule from post 
processor to rewriter (#22487)
3961b8df76 is described below

commit 3961b8df7696fea608d0475d6a4153b5a7e36c1e
Author: morrySnow <[email protected]>
AuthorDate: Thu Aug 3 14:28:13 2023 +0800

    [refactor](Nereids) mv top-n two phase read rule from post processor to 
rewriter (#22487)
    
    Use three new plan nodes to represent deferred materialization of TopN.
    Example:
    
    ```
    -- SQL
    select * from t1 order by c1 limit 10;
    
    -- PLAN
    +------------------------------------------+
    | Explain String                           |
    +------------------------------------------+
    | PhysicalDeferMaterializeResultSink       |
    | --PhysicalDeferMaterializeTopN           |
    | ----PhysicalDistribute                   |
    | ------PhysicalDeferMaterializeTopN       |
    | --------PhysicalDeferMaterializeOlapScan |
    +------------------------------------------+
    ```
---
 .../java/org/apache/doris/catalog/OlapTable.java   |  42 +++++
 .../org/apache/doris/nereids/cost/CostModelV1.java |  14 ++
 .../glue/translator/PhysicalPlanTranslator.java    | 182 ++++++++-------------
 .../doris/nereids/jobs/executor/Rewriter.java      |   4 +
 .../nereids/processor/post/PlanPostProcessors.java |   1 -
 .../doris/nereids/processor/post/TopNScanOpt.java  |  23 ++-
 .../nereids/processor/post/TwoPhaseReadOpt.java    | 163 ------------------
 .../properties/ChildOutputPropertyDeriver.java     |   7 +
 .../nereids/properties/RequestPropertyDeriver.java |   9 +
 .../org/apache/doris/nereids/rules/RuleSet.java    |   6 +
 .../org/apache/doris/nereids/rules/RuleType.java   |   6 +
 ...OlapScanToPhysicalDeferMaterializeOlapScan.java |  45 +++++
 ...ltSinkToPhysicalDeferMaterializeResultSink.java |  48 ++++++
 ...erializeTopNToPhysicalDeferMaterializeTopN.java |  53 ++++++
 .../rules/rewrite/DeferMaterializeTopNResult.java  | 113 +++++++++++++
 .../doris/nereids/stats/StatsCalculator.java       |  27 +++
 .../trees/copier/LogicalPlanDeepCopier.java        |  29 ++++
 .../logical/LogicalDeferMaterializeOlapScan.java   | 168 +++++++++++++++++++
 .../logical/LogicalDeferMaterializeResultSink.java | 146 +++++++++++++++++
 .../plans/logical/LogicalDeferMaterializeTopN.java | 171 +++++++++++++++++++
 .../trees/plans/logical/LogicalOlapScan.java       |   1 -
 .../trees/plans/logical/LogicalResultSink.java     |   8 +-
 .../nereids/trees/plans/logical/LogicalTopN.java   |  10 +-
 .../physical/PhysicalDeferMaterializeOlapScan.java | 157 ++++++++++++++++++
 .../PhysicalDeferMaterializeResultSink.java        | 167 +++++++++++++++++++
 .../physical/PhysicalDeferMaterializeTopN.java     | 176 ++++++++++++++++++++
 .../trees/plans/physical/PhysicalFileSink.java     |   5 +-
 .../trees/plans/physical/PhysicalOlapScan.java     |   2 -
 .../plans/physical/PhysicalOlapTableSink.java      |  22 +--
 .../trees/plans/physical/PhysicalRelation.java     |   6 -
 .../trees/plans/physical/PhysicalResultSink.java   |  24 +--
 .../nereids/trees/plans/physical/PhysicalSink.java |  12 --
 .../nereids/trees/plans/physical/PhysicalTopN.java |   6 +-
 .../nereids/trees/plans/visitor/PlanVisitor.java   |  10 ++
 .../trees/plans/visitor/RelationVisitor.java       |  12 ++
 .../nereids/trees/plans/visitor/SinkVisitor.java   |  12 ++
 .../org/apache/doris/planner/ExchangeNode.java     |   4 +
 .../org/apache/doris/planner/OlapScanNode.java     |  23 +--
 .../org/apache/doris/planner/OriginalPlanner.java  |  13 +-
 .../nereids/postprocess/TopNRuntimeFilterTest.java |  22 ++-
 40 files changed, 1559 insertions(+), 390 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 03df41cbf6..1d12547203 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -57,8 +57,11 @@ import org.apache.doris.statistics.MVAnalysisTask;
 import org.apache.doris.statistics.OlapAnalysisTask;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TColumn;
 import org.apache.doris.thrift.TCompressionType;
+import org.apache.doris.thrift.TFetchOption;
 import org.apache.doris.thrift.TOlapTable;
+import org.apache.doris.thrift.TPrimitiveType;
 import org.apache.doris.thrift.TSortType;
 import org.apache.doris.thrift.TStorageFormat;
 import org.apache.doris.thrift.TStorageMedium;
@@ -2234,4 +2237,43 @@ public class OlapTable extends Table {
     public AutoIncrementGenerator getAutoIncrementGenerator() {
         return autoIncrementGenerator;
     }
+
+    /**
+     * generate two phase read fetch option from this olap table.
+     *
+     * @param selectedIndexId the id of the index to scan
+     */
+    public TFetchOption generateTwoPhaseReadOption(long selectedIndexId) {
+        TFetchOption fetchOption = new TFetchOption();
+        fetchOption.setFetchRowStore(this.storeRowColumn());
+        fetchOption.setUseTwoPhaseFetch(true);
+        fetchOption.setNodesInfo(SystemInfoService.createAliveNodesInfo());
+        if (!this.storeRowColumn()) {
+            List<TColumn> columnsDesc = Lists.newArrayList();
+            getColumnDesc(selectedIndexId, columnsDesc, null, null);
+            fetchOption.setColumnDesc(columnsDesc);
+        }
+        return fetchOption;
+    }
+
+    public void getColumnDesc(long selectedIndexId, List<TColumn> columnsDesc, 
List<String> keyColumnNames,
+            List<TPrimitiveType> keyColumnTypes) {
+        if (selectedIndexId != -1) {
+            for (Column col : this.getSchemaByIndexId(selectedIndexId, true)) {
+                TColumn tColumn = col.toThrift();
+                col.setIndexFlag(tColumn, this);
+                if (columnsDesc != null) {
+                    columnsDesc.add(tColumn);
+                }
+                if ((Util.showHiddenColumns() || (!Util.showHiddenColumns() && 
col.isVisible())) && col.isKey()) {
+                    if (keyColumnNames != null) {
+                        keyColumnNames.add(col.getName());
+                    }
+                    if (keyColumnTypes != null) {
+                        keyColumnTypes.add(col.getDataType().toThrift());
+                    }
+                }
+            }
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModelV1.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModelV1.java
index 0627d366fa..d579d03764 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModelV1.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModelV1.java
@@ -24,6 +24,8 @@ import 
org.apache.doris.nereids.properties.DistributionSpecHash;
 import org.apache.doris.nereids.properties.DistributionSpecReplicated;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
+import 
org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeOlapScan;
+import 
org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
@@ -99,6 +101,12 @@ class CostModelV1 extends PlanVisitor<Cost, PlanContext> {
         return CostV1.ofCpu(statistics.getRowCount());
     }
 
+    @Override
+    public Cost 
visitPhysicalDeferMaterializeOlapScan(PhysicalDeferMaterializeOlapScan 
deferMaterializeOlapScan,
+            PlanContext context) {
+        return 
visitPhysicalOlapScan(deferMaterializeOlapScan.getPhysicalOlapScan(), context);
+    }
+
     public Cost visitPhysicalSchemaScan(PhysicalSchemaScan physicalSchemaScan, 
PlanContext context) {
         Statistics statistics = context.getStatisticsWithCheck();
         return CostV1.ofCpu(statistics.getRowCount());
@@ -167,6 +175,12 @@ class CostModelV1 extends PlanVisitor<Cost, PlanContext> {
                 childStatistics.getRowCount());
     }
 
+    @Override
+    public Cost 
visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN<? extends Plan> 
topN,
+            PlanContext context) {
+        return visitPhysicalTopN(topN.getPhysicalTopN(), context);
+    }
+
     @Override
     public Cost visitPhysicalPartitionTopN(PhysicalPartitionTopN<? extends 
Plan> partitionTopN, PlanContext context) {
         Statistics statistics = context.getStatisticsWithCheck();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 81e6a96b6b..41be06be44 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -91,6 +91,9 @@ import 
org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer;
+import 
org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeOlapScan;
+import 
org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeResultSink;
+import 
org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan;
@@ -165,9 +168,7 @@ import 
org.apache.doris.planner.external.iceberg.IcebergScanNode;
 import org.apache.doris.planner.external.jdbc.JdbcScanNode;
 import org.apache.doris.planner.external.paimon.PaimonScanNode;
 import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.tablefunction.TableValuedFunctionIf;
-import org.apache.doris.thrift.TColumn;
 import org.apache.doris.thrift.TFetchOption;
 import org.apache.doris.thrift.TPartitionType;
 import org.apache.doris.thrift.TPushAggOp;
@@ -228,19 +229,10 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
      */
     public PlanFragment translatePlan(PhysicalPlan physicalPlan) {
         PlanFragment rootFragment = physicalPlan.accept(this, context);
-
-        // TODO: why we need if? we should always set output expr?
-        //   OlapSink? maybe OlapSink should not set output exprs by it self
-        if (rootFragment.getOutputExprs() == null) {
-            List<Expr> outputExprs = Lists.newArrayList();
-            physicalPlan.getOutput().stream().map(Slot::getExprId)
-                    .forEach(exprId -> 
outputExprs.add(context.findSlotRef(exprId)));
-            rootFragment.setOutputExprs(outputExprs);
-        }
-        for (PlanFragment fragment : context.getPlanFragments()) {
-            fragment.finalize(null);
-        }
-        setResultSinkFetchOptionIfNeed();
+        List<Expr> outputExprs = Lists.newArrayList();
+        physicalPlan.getOutput().stream().map(Slot::getExprId)
+                .forEach(exprId -> 
outputExprs.add(context.findSlotRef(exprId)));
+        rootFragment.setOutputExprs(outputExprs);
         Collections.reverse(context.getPlanFragments());
         // TODO: maybe we need to trans nullable directly? and then we could 
remove call computeMemLayout
         context.getDescTable().computeMemLayout();
@@ -311,6 +303,9 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         } else {
             inputFragment.setDestination(exchangeNode);
             inputFragment.setOutputPartition(dataPartition);
+            DataStreamSink streamSink = new 
DataStreamSink(exchangeNode.getId());
+            streamSink.setOutputPartition(dataPartition);
+            inputFragment.setSink(streamSink);
         }
 
         context.addPlanFragment(parentFragment);
@@ -324,13 +319,26 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
     @Override
     public PlanFragment visitPhysicalResultSink(PhysicalResultSink<? extends 
Plan> physicalResultSink,
             PlanTranslatorContext context) {
-        return physicalResultSink.child().accept(this, context);
+        PlanFragment planFragment = physicalResultSink.child().accept(this, 
context);
+        planFragment.setSink(new 
ResultSink(planFragment.getPlanRoot().getId()));
+        return planFragment;
+    }
+
+    @Override
+    public PlanFragment visitPhysicalDeferMaterializeResultSink(
+            PhysicalDeferMaterializeResultSink<? extends Plan> sink,
+            PlanTranslatorContext context) {
+        PlanFragment planFragment = 
visitPhysicalResultSink(sink.getPhysicalResultSink(), context);
+        TFetchOption fetchOption = 
sink.getOlapTable().generateTwoPhaseReadOption(sink.getSelectedIndexId());
+        ((ResultSink) planFragment.getSink()).setFetchOption(fetchOption);
+        return planFragment;
     }
 
     @Override
     public PlanFragment visitPhysicalOlapTableSink(PhysicalOlapTableSink<? 
extends Plan> olapTableSink,
             PlanTranslatorContext context) {
         PlanFragment rootFragment = olapTableSink.child().accept(this, 
context);
+        rootFragment.setOutputPartition(DataPartition.UNPARTITIONED);
 
         TupleDescriptor olapTuple = context.generateTupleDesc();
         List<Column> targetTableColumns = 
olapTableSink.getTargetTable().getFullSchema();
@@ -341,26 +349,21 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             slotDesc.setColumn(column);
             slotDesc.setIsNullable(column.isAllowNull());
         }
-
         OlapTableSink sink = new OlapTableSink(
                 olapTableSink.getTargetTable(),
                 olapTuple,
                 olapTableSink.getPartitionIds().isEmpty() ? null : 
olapTableSink.getPartitionIds(),
                 olapTableSink.isSingleReplicaLoad()
         );
-
         if (olapTableSink.isPartialUpdate()) {
-            HashSet<String> partialUpdateCols = new HashSet<String>();
+            HashSet<String> partialUpdateCols = new HashSet<>();
             for (Column col : olapTableSink.getCols()) {
                 partialUpdateCols.add(col.getName());
             }
             sink.setPartialUpdateInputColumns(true, partialUpdateCols);
         }
-
         rootFragment.setSink(sink);
 
-        rootFragment.setOutputPartition(DataPartition.UNPARTITIONED);
-
         return rootFragment;
     }
 
@@ -379,6 +382,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                 .forEach(exprId -> 
outputExprs.add(context.findSlotRef(exprId)));
         rootFragment.setOutputExprs(outputExprs);
 
+        // TODO: should not call legacy planner analyze in Nereids
         try {
             outFile.analyze(null, outputExprs,
                     
fileSink.getOutput().stream().map(NamedExpression::getName).collect(Collectors.toList()));
@@ -515,24 +519,13 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
 
     @Override
     public PlanFragment visitPhysicalOlapScan(PhysicalOlapScan olapScan, 
PlanTranslatorContext context) {
-        // deferred materialized slots used for topn opt.
-        Set<ExprId> deferredMaterializedExprIds = olapScan
-                .getMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS)
-                .map(s -> (Set<ExprId>) s)
-                .orElse(Collections.emptySet());
-
         List<Slot> slots = olapScan.getOutput();
         OlapTable olapTable = olapScan.getTable();
         // generate real output tuple
-        TupleDescriptor tupleDescriptor = generateTupleDesc(slots, olapTable, 
deferredMaterializedExprIds, context);
+        TupleDescriptor tupleDescriptor = generateTupleDesc(slots, olapTable, 
context);
         // generate base index tuple because this fragment partitioned expr 
relay on slots of based index
         if (olapScan.getSelectedIndexId() != 
olapScan.getTable().getBaseIndexId()) {
-            generateTupleDesc(olapScan.getBaseOutputs(), olapTable, 
deferredMaterializedExprIds, context);
-        }
-
-        // TODO: remove this, we should add this column in Nereids
-        if 
(olapScan.getMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS).isPresent())
 {
-            injectRowIdColumnSlot(tupleDescriptor);
+            generateTupleDesc(olapScan.getBaseOutputs(), olapTable, context);
         }
 
         OlapScanNode olapScanNode = new OlapScanNode(context.nextPlanNodeId(), 
tupleDescriptor, "OlapScanNode");
@@ -590,6 +583,22 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         return planFragment;
     }
 
+    @Override
+    public PlanFragment visitPhysicalDeferMaterializeOlapScan(
+            PhysicalDeferMaterializeOlapScan deferMaterializeOlapScan, 
PlanTranslatorContext context) {
+        PlanFragment planFragment = 
visitPhysicalOlapScan(deferMaterializeOlapScan.getPhysicalOlapScan(), context);
+        OlapScanNode olapScanNode = (OlapScanNode) planFragment.getPlanRoot();
+        TupleDescriptor tupleDescriptor = 
context.getTupleDesc(olapScanNode.getTupleId());
+        for (SlotDescriptor slotDescriptor : tupleDescriptor.getSlots()) {
+            if (deferMaterializeOlapScan.getDeferMaterializeSlotIds()
+                    .contains(context.findExprId(slotDescriptor.getId()))) {
+                slotDescriptor.setNeedMaterialize(false);
+            }
+        }
+        context.createSlotDesc(tupleDescriptor, 
deferMaterializeOlapScan.getColumnIdSlot());
+        return planFragment;
+    }
+
     @Override
     public PlanFragment visitPhysicalOneRowRelation(PhysicalOneRowRelation 
oneRowRelation,
             PlanTranslatorContext context) {
@@ -1734,6 +1743,26 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         return inputFragment;
     }
 
+    @Override
+    public PlanFragment 
visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN<? extends Plan> 
topN,
+            PlanTranslatorContext context) {
+
+        PlanFragment planFragment = visitPhysicalTopN(topN.getPhysicalTopN(), 
context);
+        if (planFragment.getPlanRoot() instanceof SortNode) {
+            SortNode sortNode = (SortNode) planFragment.getPlanRoot();
+            sortNode.setUseTwoPhaseReadOpt(true);
+            sortNode.getSortInfo().setUseTwoPhaseRead();
+            TupleDescriptor tupleDescriptor = 
sortNode.getSortInfo().getSortTupleDescriptor();
+            for (SlotDescriptor slotDescriptor : tupleDescriptor.getSlots()) {
+                if (topN.getDeferMaterializeSlotIds()
+                        .contains(context.findExprId(slotDescriptor.getId()))) 
{
+                    slotDescriptor.setNeedMaterialize(false);
+                }
+            }
+        }
+        return planFragment;
+    }
+
     @Override
     public PlanFragment visitPhysicalRepeat(PhysicalRepeat<? extends Plan> 
repeat, PlanTranslatorContext context) {
         PlanFragment inputPlanFragment = repeat.child(0).accept(this, context);
@@ -1907,12 +1936,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
 
     private SortNode translateSortNode(AbstractPhysicalSort<? extends Plan> 
sort, PlanNode childNode,
             PlanTranslatorContext context) {
-        Set<ExprId> deferredMaterializedExprIds = sort
-                .getMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS)
-                .map(s -> (Set<ExprId>) s)
-                .orElse(Collections.emptySet());
-        TupleDescriptor sortTuple = generateTupleDesc(sort.child().getOutput(),
-                null, deferredMaterializedExprIds, context);
+        TupleDescriptor sortTuple = 
generateTupleDesc(sort.child().getOutput(), null, context);
         List<Expr> orderingExprs = Lists.newArrayList();
         List<Boolean> ascOrders = Lists.newArrayList();
         List<Boolean> nullsFirstParams = Lists.newArrayList();
@@ -1924,11 +1948,6 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         });
         SortInfo sortInfo = new SortInfo(orderingExprs, ascOrders, 
nullsFirstParams, sortTuple);
         SortNode sortNode = new SortNode(context.nextPlanNodeId(), childNode, 
sortInfo, sort instanceof PhysicalTopN);
-        if 
(sort.getMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS).isPresent())
 {
-            sortNode.setUseTwoPhaseReadOpt(true);
-            sortNode.getSortInfo().setUseTwoPhaseRead();
-            
injectRowIdColumnSlot(sortNode.getSortInfo().getSortTupleDescriptor());
-        }
         if (sort.getStats() != null) {
             sortNode.setCardinality((long) sort.getStats().getRowCount());
         }
@@ -1974,19 +1993,6 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         updateLegacyPlanIdToPhysicalPlan(planNode, filter);
     }
 
-    private TupleDescriptor generateTupleDesc(List<Slot> slotList, TableIf 
table,
-            Set<ExprId> deferredMaterializedExprIds, PlanTranslatorContext 
context) {
-        TupleDescriptor tupleDescriptor = context.generateTupleDesc();
-        tupleDescriptor.setTable(table);
-        for (Slot slot : slotList) {
-            SlotDescriptor slotDescriptor = 
context.createSlotDesc(tupleDescriptor, (SlotReference) slot, table);
-            if (deferredMaterializedExprIds.contains(slot.getExprId())) {
-                slotDescriptor.setNeedMaterialize(false);
-            }
-        }
-        return tupleDescriptor;
-    }
-
     private TupleDescriptor generateTupleDesc(List<Slot> slotList, TableIf 
table, PlanTranslatorContext context) {
         TupleDescriptor tupleDescriptor = context.generateTupleDesc();
         tupleDescriptor.setTable(table);
@@ -2172,71 +2178,19 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         slotDesc.setIsMaterialized(true);
     }
 
-    /**
-     * We use two phase read to optimize sql like: select * from tbl [where 
xxx = ???] [order by column1] [limit n]
-     * in the first phase, we add an extra column `RowId` to Block, and sort 
blocks in TopN nodes
-     * in the second phase, we have n rows, we do a fetch rpc to get all 
rowids data for the n rows
-     * and reconstruct the final block
-     */
-    private void setResultSinkFetchOptionIfNeed() {
-        boolean needFetch = false;
-        // Only single olap table should be fetched
-        OlapTable fetchOlapTable = null;
-        OlapScanNode scanNode = null;
-        for (PlanFragment fragment : context.getPlanFragments()) {
-            PlanNode node = fragment.getPlanRoot();
-            PlanNode parent = null;
-            // OlapScanNode is the last node.
-            // So, just get the last two node and check if they are SortNode 
and OlapScan.
-            while (node.getChildren().size() != 0) {
-                parent = node;
-                node = node.getChildren().get(0);
-            }
-
-            // case1: general topn optimized query
-            if ((node instanceof OlapScanNode) && (parent instanceof 
SortNode)) {
-                SortNode sortNode = (SortNode) parent;
-                scanNode = (OlapScanNode) node;
-                if (sortNode.getUseTwoPhaseReadOpt()) {
-                    needFetch = true;
-                    fetchOlapTable = scanNode.getOlapTable();
-                    break;
-                }
-            }
-        }
-        for (PlanFragment fragment : context.getPlanFragments()) {
-            if (needFetch && fragment.getSink() instanceof ResultSink) {
-                TFetchOption fetchOption = new TFetchOption();
-                fetchOption.setFetchRowStore(fetchOlapTable.storeRowColumn());
-                fetchOption.setUseTwoPhaseFetch(true);
-                
fetchOption.setNodesInfo(SystemInfoService.createAliveNodesInfo());
-                if (!fetchOlapTable.storeRowColumn()) {
-                    // Set column desc for each column
-                    List<TColumn> columnsDesc = new ArrayList<>();
-                    scanNode.getColumnDesc(columnsDesc, null, null);
-                    fetchOption.setColumnDesc(columnsDesc);
-                }
-                ((ResultSink) fragment.getSink()).setFetchOption(fetchOption);
-                break;
-            }
-        }
-    }
-
     /**
      * topN opt: using storage data ordering to accelerate topn operation.
      * refer pr: optimize topn query if order by columns is prefix of sort 
keys of table (#10694)
      */
     private boolean checkPushSort(SortNode sortNode, OlapTable olapTable) {
-        // Ensure limit is less then threshold
+        // Ensure limit is less than threshold
         if (sortNode.getLimit() <= 0
                 || sortNode.getLimit() > 
ConnectContext.get().getSessionVariable().topnOptLimitThreshold) {
             return false;
         }
 
-        // Ensure all isAscOrder is same, ande length != 0.
-        // Can't be zorder.
-        if (sortNode.getSortInfo().getIsAscOrder().stream().distinct().count() 
!= 1
-                || olapTable.isZOrderSort()) {
+        // Ensure all isAscOrder values are the same and the list is non-empty. Can't be z-order.
+        if (sortNode.getSortInfo().getIsAscOrder().stream().distinct().count() 
!= 1 || olapTable.isZOrderSort()) {
             return false;
         }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
index 0c1507ae2b..606d9746cf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
@@ -46,6 +46,7 @@ import 
org.apache.doris.nereids.rules.rewrite.CollectProjectAboveConsumer;
 import org.apache.doris.nereids.rules.rewrite.ColumnPruning;
 import org.apache.doris.nereids.rules.rewrite.ConvertInnerOrCrossJoin;
 import org.apache.doris.nereids.rules.rewrite.CountDistinctRewrite;
+import org.apache.doris.nereids.rules.rewrite.DeferMaterializeTopNResult;
 import org.apache.doris.nereids.rules.rewrite.EliminateAggregate;
 import org.apache.doris.nereids.rules.rewrite.EliminateDedupJoinCondition;
 import org.apache.doris.nereids.rules.rewrite.EliminateFilter;
@@ -283,6 +284,9 @@ public class Rewriter extends AbstractBatchJobExecutor {
                     bottomUp(RuleSet.PUSH_DOWN_FILTERS),
                     custom(RuleType.ELIMINATE_UNNECESSARY_PROJECT, 
EliminateUnnecessaryProject::new)
             ),
+            topic("topn optimize",
+                    topDown(new DeferMaterializeTopNResult())
+            ),
             // this rule batch must keep at the end of rewrite to do some plan 
check
             topic("Final rewrite and check",
                     custom(RuleType.ENSURE_PROJECT_ON_TOP_JOIN, 
EnsureProjectOnTopJoin::new),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
index 40891e82a4..1c927fb89d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
@@ -71,7 +71,6 @@ public class PlanPostProcessors {
         builder.add(new Validator());
         builder.add(new RecomputeLogicalPropertiesProcessor());
         builder.add(new TopNScanOpt());
-        builder.add(new TwoPhaseReadOpt());
         return builder.build();
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
index a938a231ed..0c5d8d8ce6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
@@ -22,22 +22,23 @@ import 
org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.SortPhase;
 import org.apache.doris.nereids.trees.plans.algebra.Filter;
+import org.apache.doris.nereids.trees.plans.algebra.OlapScan;
 import org.apache.doris.nereids.trees.plans.algebra.Project;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import 
org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
 import org.apache.doris.qe.ConnectContext;
 
 /**
  * topN opt
  * refer to:
- * https://github.com/apache/doris/pull/15558
- * https://github.com/apache/doris/pull/15663
+ * <a href="https://github.com/apache/doris/pull/15558">...</a>
+ * <a href="https://github.com/apache/doris/pull/15663">...</a>
  */
 
 public class TopNScanOpt extends PlanPostProcessor {
 
     @Override
-    public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, 
CascadesContext ctx) {
+    public PhysicalTopN<? extends Plan> visitPhysicalTopN(PhysicalTopN<? 
extends Plan> topN, CascadesContext ctx) {
         topN.child().accept(this, ctx);
         Plan child = topN.child();
         if (topN.getSortPhase() != SortPhase.LOCAL_SORT) {
@@ -52,7 +53,7 @@ public class TopNScanOpt extends PlanPostProcessor {
         if (topNOptLimitThreshold == -1 || topN.getLimit() > 
topNOptLimitThreshold) {
             return topN;
         }
-        // if firstKey's column is not present, it means the firstKey is not a 
original column from scan node
+        // if firstKey's column is not present, it means the firstKey is not 
an original column from scan node
         // for example: "select cast(k1 as INT) as id from tbl1 order by id 
limit 2;" the firstKey "id" is
         // a cast expr which is not from tbl1 and its column is not present.
         // On the other hand "select k1 as id from tbl1 order by id limit 2;" 
the firstKey "id" is just an alias of k1
@@ -68,14 +69,14 @@ public class TopNScanOpt extends PlanPostProcessor {
             return topN;
         }
 
-        PhysicalOlapScan olapScan;
+        OlapScan olapScan;
         while (child instanceof Project || child instanceof Filter) {
             child = child.child(0);
         }
-        if (!(child instanceof PhysicalOlapScan)) {
+        if (!(child instanceof OlapScan)) {
             return topN;
         }
-        olapScan = (PhysicalOlapScan) child;
+        olapScan = (OlapScan) child;
 
         if (olapScan.getTable().isDupKeysOrMergeOnWrite()) {
             topN.setMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER, true);
@@ -84,6 +85,12 @@ public class TopNScanOpt extends PlanPostProcessor {
         return topN;
     }
 
+    @Override
+    public Plan 
visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN<? extends Plan> 
topN,
+            CascadesContext context) {
+        return topN.withPhysicalTopN(visitPhysicalTopN(topN.getPhysicalTopN(), 
context));
+    }
+
     private long getTopNOptLimitThreshold() {
         if (ConnectContext.get() != null && 
ConnectContext.get().getSessionVariable() != null) {
             return 
ConnectContext.get().getSessionVariable().topnOptLimitThreshold;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java
deleted file mode 100644
index 93dd579e70..0000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TwoPhaseReadOpt.java
+++ /dev/null
@@ -1,163 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.nereids.processor.post;
-
-import org.apache.doris.nereids.CascadesContext;
-import org.apache.doris.nereids.properties.OrderKey;
-import org.apache.doris.nereids.trees.expressions.Alias;
-import org.apache.doris.nereids.trees.expressions.ExprId;
-import org.apache.doris.nereids.trees.expressions.Expression;
-import org.apache.doris.nereids.trees.expressions.NamedExpression;
-import org.apache.doris.nereids.trees.expressions.Slot;
-import org.apache.doris.nereids.trees.plans.Plan;
-import org.apache.doris.nereids.trees.plans.SortPhase;
-import org.apache.doris.nereids.trees.plans.algebra.Filter;
-import org.apache.doris.nereids.trees.plans.algebra.Project;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
-import org.apache.doris.qe.ConnectContext;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-
-/**
- * two phase read opt
- * refer to:
- * https://github.com/apache/doris/pull/15642
- * https://github.com/apache/doris/pull/16460
- * https://github.com/apache/doris/pull/16848
- */
-
-public class TwoPhaseReadOpt extends PlanPostProcessor {
-
-    @Override
-    public Plan processRoot(Plan plan, CascadesContext ctx) {
-        if (plan instanceof PhysicalTopN) {
-            PhysicalTopN<Plan> physicalTopN = (PhysicalTopN<Plan>) plan;
-            if (physicalTopN.getSortPhase() == SortPhase.MERGE_SORT) {
-                return plan.accept(this, ctx);
-            }
-        }
-        return plan;
-    }
-
-    @Override
-    public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> 
mergeTopN, CascadesContext ctx) {
-        mergeTopN.child().accept(this, ctx);
-        if (mergeTopN.getSortPhase() != SortPhase.MERGE_SORT || 
!(mergeTopN.child() instanceof PhysicalDistribute)) {
-            return mergeTopN;
-        }
-        PhysicalDistribute<Plan> distribute = (PhysicalDistribute<Plan>) 
mergeTopN.child();
-        if (!(distribute.child() instanceof PhysicalTopN)) {
-            return mergeTopN;
-        }
-        PhysicalTopN<Plan> localTopN = (PhysicalTopN<Plan>) distribute.child();
-
-        if (localTopN.getOrderKeys().isEmpty()) {
-            return mergeTopN;
-        }
-
-        // topn opt
-        long topNOptLimitThreshold = getTopNOptLimitThreshold();
-        if (topNOptLimitThreshold < 0 || mergeTopN.getLimit() > 
topNOptLimitThreshold) {
-            return mergeTopN;
-        }
-        if 
(!localTopN.getOrderKeys().stream().map(OrderKey::getExpr).allMatch(Expression::isColumnFromTable))
 {
-            return mergeTopN;
-        }
-
-        PhysicalOlapScan olapScan;
-        PhysicalProject<Plan> project = null;
-        PhysicalFilter<Plan> filter = null;
-        Plan child = localTopN.child();
-        while (child instanceof Project || child instanceof Filter) {
-            if (child instanceof Filter) {
-                filter = (PhysicalFilter<Plan>) child;
-            }
-            if (child instanceof Project) {
-                project = (PhysicalProject<Plan>) child;
-                // TODO: remove this after fix two phase read on project core
-                return mergeTopN;
-            }
-            child = child.child(0);
-        }
-        if (!(child instanceof PhysicalOlapScan)) {
-            return mergeTopN;
-        }
-        olapScan = (PhysicalOlapScan) child;
-
-        // all order key must column from table
-        if (!olapScan.getTable().getEnableLightSchemaChange()) {
-            return mergeTopN;
-        }
-
-        Map<ExprId, ExprId> projectRevertedMap = Maps.newHashMap();
-        if (project != null) {
-            for (Expression e : project.getProjects()) {
-                if (e.isSlot()) {
-                    Slot slot = (Slot) e;
-                    projectRevertedMap.put(slot.getExprId(), slot.getExprId());
-                } else if (e instanceof Alias) {
-                    Alias alias = (Alias) e;
-                    if (alias.child().isSlot()) {
-                        Slot slot = (Slot) alias.child();
-                        projectRevertedMap.put(alias.getExprId(), 
slot.getExprId());
-                    }
-                }
-            }
-        }
-        Set<ExprId> deferredMaterializedExprIds = 
Sets.newHashSet(olapScan.getOutputExprIdSet());
-        if (filter != null) {
-            filter.getConjuncts().forEach(e -> 
deferredMaterializedExprIds.removeAll(e.getInputSlotExprIds()));
-        }
-        localTopN.getOrderKeys().stream()
-                .map(OrderKey::getExpr)
-                .map(Slot.class::cast)
-                .map(NamedExpression::getExprId)
-                .map(projectRevertedMap::get)
-                .filter(Objects::nonNull)
-                .forEach(deferredMaterializedExprIds::remove);
-        localTopN.getOrderKeys().stream()
-                .map(OrderKey::getExpr)
-                .map(Slot.class::cast)
-                .map(NamedExpression::getExprId)
-                .forEach(deferredMaterializedExprIds::remove);
-        olapScan.setMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS, 
deferredMaterializedExprIds);
-        
localTopN.setMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS, 
deferredMaterializedExprIds);
-        
mergeTopN.setMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS, 
deferredMaterializedExprIds);
-
-        return mergeTopN;
-    }
-
-    private long getTopNOptLimitThreshold() {
-        if (ConnectContext.get() != null && 
ConnectContext.get().getSessionVariable() != null) {
-            if 
(!ConnectContext.get().getSessionVariable().enableTwoPhaseReadOpt) {
-                return -1;
-            }
-            return 
ConnectContext.get().getSessionVariable().topnOptLimitThreshold;
-        }
-        return -1;
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
index 2ebc3acc5f..97c197faa1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
@@ -34,6 +34,7 @@ import 
org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer;
+import 
org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeOlapScan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan;
@@ -141,6 +142,12 @@ public class ChildOutputPropertyDeriver extends 
PlanVisitor<PhysicalProperties,
         return new PhysicalProperties(olapScan.getDistributionSpec());
     }
 
+    /**
+     * The output properties of a defer materialize olap scan are derived from the olap scan it wraps.
+     */
+    @Override
+    public PhysicalProperties visitPhysicalDeferMaterializeOlapScan(
+            PhysicalDeferMaterializeOlapScan deferMaterializeOlapScan, PlanContext context) {
+        PhysicalOlapScan wrappedScan = deferMaterializeOlapScan.getPhysicalOlapScan();
+        return visitPhysicalOlapScan(wrappedScan, context);
+    }
+
     @Override
     public PhysicalProperties 
visitPhysicalOneRowRelation(PhysicalOneRowRelation oneRowRelation, PlanContext 
context) {
         return PhysicalProperties.GATHER;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
index 9b5603fe86..6ca028f469 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
@@ -29,6 +29,7 @@ import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor;
+import 
org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeResultSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
@@ -109,6 +110,14 @@ public class RequestPropertyDeriver extends 
PlanVisitor<Void, PlanContext> {
         return null;
     }
 
+    /**
+     * Request the GATHER physical property from the children of a defer materialize result sink.
+     */
+    @Override
+    public Void visitPhysicalDeferMaterializeResultSink(
+            PhysicalDeferMaterializeResultSink<? extends Plan> sink,
+            PlanContext context) {
+        addRequestPropertyToChildren(PhysicalProperties.GATHER);
+        return null;
+    }
+
     /* 
********************************************************************************************
      * Other Node, in lexicographical order
      * 
********************************************************************************************
 */
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
index 4ed887193f..a5e872d3c5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java
@@ -45,6 +45,9 @@ import 
org.apache.doris.nereids.rules.implementation.LogicalAssertNumRowsToPhysi
 import 
org.apache.doris.nereids.rules.implementation.LogicalCTEAnchorToPhysicalCTEAnchor;
 import 
org.apache.doris.nereids.rules.implementation.LogicalCTEConsumerToPhysicalCTEConsumer;
 import 
org.apache.doris.nereids.rules.implementation.LogicalCTEProducerToPhysicalCTEProducer;
+import 
org.apache.doris.nereids.rules.implementation.LogicalDeferMaterializeOlapScanToPhysicalDeferMaterializeOlapScan;
+import 
org.apache.doris.nereids.rules.implementation.LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink;
+import 
org.apache.doris.nereids.rules.implementation.LogicalDeferMaterializeTopNToPhysicalDeferMaterializeTopN;
 import 
org.apache.doris.nereids.rules.implementation.LogicalEmptyRelationToPhysicalEmptyRelation;
 import 
org.apache.doris.nereids.rules.implementation.LogicalEsScanToPhysicalEsScan;
 import 
org.apache.doris.nereids.rules.implementation.LogicalExceptToPhysicalExcept;
@@ -143,6 +146,7 @@ public class RuleSet {
             .add(new LogicalJoinToHashJoin())
             .add(new LogicalJoinToNestedLoopJoin())
             .add(new LogicalOlapScanToPhysicalOlapScan())
+            .add(new 
LogicalDeferMaterializeOlapScanToPhysicalDeferMaterializeOlapScan())
             .add(new LogicalSchemaScanToPhysicalSchemaScan())
             .add(new LogicalFileScanToPhysicalFileScan())
             .add(new LogicalJdbcScanToPhysicalJdbcScan())
@@ -152,6 +156,7 @@ public class RuleSet {
             .add(new LogicalWindowToPhysicalWindow())
             .add(new LogicalSortToPhysicalQuickSort())
             .add(new LogicalTopNToPhysicalTopN())
+            .add(new 
LogicalDeferMaterializeTopNToPhysicalDeferMaterializeTopN())
             .add(new LogicalPartitionTopNToPhysicalPartitionTopN())
             .add(new LogicalAssertNumRowsToPhysicalAssertNumRows())
             .add(new LogicalOneRowRelationToPhysicalOneRowRelation())
@@ -165,6 +170,7 @@ public class RuleSet {
             .add(new LogicalOlapTableSinkToPhysicalOlapTableSink())
             .add(new LogicalFileSinkToPhysicalFileSink())
             .add(new LogicalResultSinkToPhysicalResultSink())
+            .add(new 
LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink())
             .build();
 
     public static final List<Rule> ZIG_ZAG_TREE_JOIN_REORDER = 
planRuleFactories()
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index d43438ee85..15dd9f37a9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -248,6 +248,9 @@ public enum RuleType {
     COLLECT_PROJECT_ABOVE_FILTER_CONSUMER(RuleTypeClass.REWRITE),
     REWRITE_SENTINEL(RuleTypeClass.REWRITE),
 
+    // topn opts
+    DEFER_MATERIALIZE_TOP_N_RESULT(RuleTypeClass.REWRITE),
+
     // exploration rules
     TEST_EXPLORATION(RuleTypeClass.EXPLORATION),
     OR_EXPANSION(RuleTypeClass.EXPLORATION),
@@ -295,16 +298,19 @@ public enum RuleType {
     
LOGICAL_CTE_ANCHOR_TO_PHYSICAL_CTE_ANCHOR_RULE(RuleTypeClass.IMPLEMENTATION),
     LOGICAL_SORT_TO_PHYSICAL_QUICK_SORT_RULE(RuleTypeClass.IMPLEMENTATION),
     LOGICAL_TOP_N_TO_PHYSICAL_TOP_N_RULE(RuleTypeClass.IMPLEMENTATION),
+    
LOGICAL_DEFER_MATERIALIZE_TOP_N_TO_PHYSICAL_DEFER_MATERIALIZE_TOP_N_RULE(RuleTypeClass.IMPLEMENTATION),
     
LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE(RuleTypeClass.IMPLEMENTATION),
     
LOGICAL_EMPTY_RELATION_TO_PHYSICAL_EMPTY_RELATION_RULE(RuleTypeClass.IMPLEMENTATION),
     LOGICAL_LIMIT_TO_PHYSICAL_LIMIT_RULE(RuleTypeClass.IMPLEMENTATION),
     LOGICAL_OLAP_SCAN_TO_PHYSICAL_OLAP_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
+    
LOGICAL_DEFER_MATERIALIZE_OLAP_SCAN_TO_PHYSICAL_DEFER_MATERIALIZE_OLAP_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
     
LOGICAL_SCHEMA_SCAN_TO_PHYSICAL_SCHEMA_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
     LOGICAL_FILE_SCAN_TO_PHYSICAL_FILE_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
     LOGICAL_JDBC_SCAN_TO_PHYSICAL_JDBC_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
     LOGICAL_ES_SCAN_TO_PHYSICAL_ES_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
     
LOGICAL_OLAP_TABLE_SINK_TO_PHYSICAL_OLAP_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
     
LOGICAL_RESULT_SINK_TO_PHYSICAL_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
+    
LOGICAL_DEFER_MATERIALIZE_RESULT_SINK_TO_PHYSICAL_DEFER_MATERIALIZE_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
     LOGICAL_FILE_SINK_TO_PHYSICAL_FILE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
     
LOGICAL_ASSERT_NUM_ROWS_TO_PHYSICAL_ASSERT_NUM_ROWS(RuleTypeClass.IMPLEMENTATION),
     STORAGE_LAYER_AGGREGATE_WITHOUT_PROJECT(RuleTypeClass.IMPLEMENTATION),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalDeferMaterializeOlapScanToPhysicalDeferMaterializeOlapScan.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalDeferMaterializeOlapScanToPhysicalDeferMaterializeOlapScan.java
new file mode 100644
index 0000000000..8d30f32dbc
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalDeferMaterializeOlapScanToPhysicalDeferMaterializeOlapScan.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import 
org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeOlapScan;
+import 
org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+
+import java.util.Optional;
+
+/**
+ * Implementation rule that turns a LogicalDeferMaterializeOlapScan into a PhysicalDeferMaterializeOlapScan.
+ */
+public class LogicalDeferMaterializeOlapScanToPhysicalDeferMaterializeOlapScan extends OneImplementationRuleFactory {
+    @Override
+    public Rule build() {
+        return logicalDeferMaterializeOlapScan().thenApply(ctx -> {
+            LogicalDeferMaterializeOlapScan deferScan = ctx.root;
+            // the wrapped olap scan is implemented by the ordinary olap scan rule; its first result is used
+            Rule olapScanRule = new LogicalOlapScanToPhysicalOlapScan().build();
+            PhysicalOlapScan implementedScan = (PhysicalOlapScan) olapScanRule
+                    .transform(deferScan.getLogicalOlapScan(), ctx.cascadesContext)
+                    .get(0);
+            return new PhysicalDeferMaterializeOlapScan(
+                    implementedScan,
+                    deferScan.getDeferMaterializeSlotIds(),
+                    deferScan.getColumnIdSlot(),
+                    Optional.empty(),
+                    deferScan.getLogicalProperties());
+        }).toRule(RuleType.LOGICAL_DEFER_MATERIALIZE_OLAP_SCAN_TO_PHYSICAL_DEFER_MATERIALIZE_OLAP_SCAN_RULE);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink.java
new file mode 100644
index 0000000000..0c02841e2b
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink.java
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import 
org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeResultSink;
+import 
org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeResultSink;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
+
+import java.util.Optional;
+
+/**
+ * Implementation rule that turns a LogicalDeferMaterializeResultSink into a PhysicalDeferMaterializeResultSink.
+ */
+public class LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink
+        extends OneImplementationRuleFactory {
+    @Override
+    public Rule build() {
+        return logicalDeferMaterializeResultSink().thenApply(ctx -> {
+            LogicalDeferMaterializeResultSink<? extends Plan> deferSink = ctx.root;
+            // the wrapped result sink is implemented by the ordinary result sink rule; its first result is used
+            Rule resultSinkRule = new LogicalResultSinkToPhysicalResultSink().build();
+            PhysicalResultSink<? extends Plan> implementedSink = (PhysicalResultSink<? extends Plan>) resultSinkRule
+                    .transform(deferSink.getLogicalResultSink(), ctx.cascadesContext)
+                    .get(0);
+            return new PhysicalDeferMaterializeResultSink<>(
+                    implementedSink,
+                    deferSink.getOlapTable(),
+                    deferSink.getSelectedIndexId(),
+                    Optional.empty(),
+                    deferSink.getLogicalProperties(),
+                    deferSink.child());
+        }).toRule(RuleType.LOGICAL_DEFER_MATERIALIZE_RESULT_SINK_TO_PHYSICAL_DEFER_MATERIALIZE_RESULT_SINK_RULE);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalDeferMaterializeTopNToPhysicalDeferMaterializeTopN.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalDeferMaterializeTopNToPhysicalDeferMaterializeTopN.java
new file mode 100644
index 0000000000..9ad6b73d1c
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalDeferMaterializeTopNToPhysicalDeferMaterializeTopN.java
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.implementation;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import 
org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeTopN;
+import 
org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
+
+import java.util.Optional;
+
+/**
+ * Implementation rule that turns a LogicalDeferMaterializeTopN into PhysicalDeferMaterializeTopN nodes.
+ */
+public class LogicalDeferMaterializeTopNToPhysicalDeferMaterializeTopN extends OneImplementationRuleFactory {
+    @Override
+    public Rule build() {
+        return logicalDeferMaterializeTopN().thenApply(ctx -> {
+            LogicalDeferMaterializeTopN<? extends Plan> topN = ctx.root;
+            // implement the wrapped top n with the ordinary top n rule and take its first result
+            Rule topNRule = new LogicalTopNToPhysicalTopN().build();
+            PhysicalTopN<? extends Plan> upperTopN = (PhysicalTopN<? extends Plan>) topNRule
+                    .transform(topN.getLogicalTopN(), ctx.cascadesContext)
+                    .get(0);
+            // the child of the implemented top n is cast to a PhysicalTopN as well; both levels get wrapped
+            PhysicalTopN<? extends Plan> lowerTopN = (PhysicalTopN<? extends Plan>) upperTopN.child();
+            PhysicalDeferMaterializeTopN<? extends Plan> deferLowerTopN = wrap(lowerTopN, topN, lowerTopN.child());
+            return wrap(upperTopN, topN, deferLowerTopN);
+        }).toRule(RuleType.LOGICAL_DEFER_MATERIALIZE_TOP_N_TO_PHYSICAL_DEFER_MATERIALIZE_TOP_N_RULE);
+    }
+
+    /**
+     * Build a PhysicalDeferMaterializeTopN around physicalTopN, taking the deferred slot ids, the column id slot
+     * and the logical properties from logicalWrapped and using child as the new node's child.
+     */
+    private PhysicalDeferMaterializeTopN<? extends Plan> wrap(PhysicalTopN<? extends Plan> physicalTopN,
+            LogicalDeferMaterializeTopN<? extends Plan> logicalWrapped, Plan child) {
+        return new PhysicalDeferMaterializeTopN<>(
+                physicalTopN,
+                logicalWrapped.getDeferMaterializeSlotIds(),
+                logicalWrapped.getColumnIdSlot(),
+                Optional.empty(),
+                logicalWrapped.getLogicalProperties(),
+                child);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DeferMaterializeTopNResult.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DeferMaterializeTopNResult.java
new file mode 100644
index 0000000000..4cd41bd509
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/DeferMaterializeTopNResult.java
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.nereids.properties.OrderKey;
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.Plan;
+import 
org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeOlapScan;
+import 
org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeResultSink;
+import 
org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeTopN;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
+import org.apache.doris.nereids.trees.plans.logical.LogicalTopN;
+import org.apache.doris.qe.ConnectContext;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Rewrite a simple top-n query so that slots used neither by the sort nor by the filter are materialized late.
+ *
+ * <p>Two shapes are matched: ResultSink(TopN(OlapScan)) and ResultSink(TopN(Filter(OlapScan))).
+ * On a match the sink, the top n and the scan are each wrapped in their defer materialize counterpart.
+ */
+public class DeferMaterializeTopNResult implements RewriteRuleFactory {
+
+    @Override
+    public List<Rule> buildRules() {
+        return ImmutableList.of(
+                RuleType.DEFER_MATERIALIZE_TOP_N_RESULT.build(
+                        logicalResultSink(logicalTopN(logicalOlapScan()))
+                                .when(r -> canDeferMaterialize(r.child()))
+                                .when(r -> r.child().child().getTable().getEnableLightSchemaChange())
+                                .then(r -> deferMaterialize(r, r.child(), Optional.empty(), r.child().child()))
+                ),
+                RuleType.DEFER_MATERIALIZE_TOP_N_RESULT.build(
+                        logicalResultSink(logicalTopN(logicalFilter(logicalOlapScan())))
+                                .when(r -> canDeferMaterialize(r.child()))
+                                .when(r -> r.child().child().child().getTable().getEnableLightSchemaChange())
+                                .then(r -> {
+                                    LogicalFilter<LogicalOlapScan> filter = r.child().child();
+                                    return deferMaterialize(r, r.child(), Optional.of(filter), filter.child());
+                                })
+                )
+        );
+    }
+
+    /**
+     * Check the top n specific preconditions shared by both patterns.
+     *
+     * @param topN the matched top n
+     * @return true if the limit does not exceed the threshold, there is at least one order key
+     *         and every order key is a column from table
+     */
+    private boolean canDeferMaterialize(LogicalTopN<? extends Plan> topN) {
+        // The bound is inclusive: the former post processor only skipped the optimization when
+        // limit > threshold. A disabled optimization yields a threshold of -1, which no limit satisfies.
+        return topN.getLimit() <= getTopNOptLimitThreshold()
+                // allMatch is true on an empty stream, so the emptiness check must stay explicit
+                && !topN.getOrderKeys().isEmpty()
+                && topN.getOrderKeys().stream().map(OrderKey::getExpr).allMatch(Expression::isColumnFromTable);
+    }
+
+    /**
+     * Build the defer materialize plan for a matched pattern.
+     *
+     * @param logicalResultSink the matched result sink
+     * @param logicalTopN the matched top n
+     * @param logicalFilter the filter between top n and scan, if the pattern contains one
+     * @param logicalOlapScan the matched olap scan
+     * @return a LogicalDeferMaterializeResultSink on top of the rewritten plan
+     */
+    private Plan deferMaterialize(LogicalResultSink<? extends Plan> logicalResultSink,
+            LogicalTopN<? extends Plan> logicalTopN, Optional<LogicalFilter<? extends Plan>> logicalFilter,
+            LogicalOlapScan logicalOlapScan) {
+        // slot for the row id column (Column.ROWID_COL), handed to the defer materialize scan and top n
+        Column rowId = new Column(Column.ROWID_COL, Type.STRING, false, null, false, "", "rowid column");
+        SlotReference columnId = SlotReference.fromColumn(rowId, logicalOlapScan.getQualifier());
+        // start from every output of the scan, then drop what the filter and the sort need
+        Set<ExprId> deferredMaterializedExprIds = Sets.newHashSet(logicalOlapScan.getOutputExprIdSet());
+        logicalFilter.ifPresent(filter -> filter.getConjuncts()
+                .forEach(e -> deferredMaterializedExprIds.removeAll(e.getInputSlotExprIds())));
+        logicalTopN.getOrderKeys().stream()
+                .map(OrderKey::getExpr)
+                .map(Slot.class::cast)
+                .map(NamedExpression::getExprId)
+                .filter(Objects::nonNull)
+                .forEach(deferredMaterializedExprIds::remove);
+        // rebuild the plan bottom-up: scan -> (filter) -> top n -> result sink
+        LogicalDeferMaterializeOlapScan deferOlapScan = new LogicalDeferMaterializeOlapScan(
+                logicalOlapScan, deferredMaterializedExprIds, columnId);
+        Plan root = logicalFilter.map(f -> f.withChildren(deferOlapScan)).orElse(deferOlapScan);
+        root = new LogicalDeferMaterializeTopN<>((LogicalTopN<? extends Plan>) logicalTopN.withChildren(root),
+                deferredMaterializedExprIds, columnId);
+        root = logicalResultSink.withChildren(root);
+        return new LogicalDeferMaterializeResultSink<>((LogicalResultSink<? extends Plan>) root,
+                logicalOlapScan.getTable(), logicalOlapScan.getSelectedIndexId());
+    }
+
+    /**
+     * Read the limit threshold of the optimization from the session.
+     *
+     * @return the session variable topnOptLimitThreshold, or -1 if there is no connect context,
+     *         no session variable, or enableTwoPhaseReadOpt is off
+     */
+    private long getTopNOptLimitThreshold() {
+        if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) {
+            if (!ConnectContext.get().getSessionVariable().enableTwoPhaseReadOpt) {
+                return -1;
+            }
+            return ConnectContext.get().getSessionVariable().topnOptLimitThreshold;
+        }
+        return -1;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
index 34eef3f433..d25a328c17 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java
@@ -57,6 +57,8 @@ import 
org.apache.doris.nereids.trees.plans.logical.LogicalAssertNumRows;
 import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor;
 import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
 import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer;
+import 
org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeOlapScan;
+import 
org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeTopN;
 import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
 import org.apache.doris.nereids.trees.plans.logical.LogicalEsScan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalExcept;
@@ -83,6 +85,8 @@ import 
org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer;
+import 
org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeOlapScan;
+import 
org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan;
@@ -283,6 +287,12 @@ public class StatsCalculator extends 
DefaultPlanVisitor<Statistics, Void> {
         return computeCatalogRelation(olapScan);
     }
 
+    @Override
+    public Statistics 
visitLogicalDeferMaterializeOlapScan(LogicalDeferMaterializeOlapScan 
deferMaterializeOlapScan,
+            Void context) {
+        return 
computeCatalogRelation(deferMaterializeOlapScan.getLogicalOlapScan());
+    }
+
     @Override
     public Statistics visitLogicalSchemaScan(LogicalSchemaScan schemaScan, 
Void context) {
         return computeCatalogRelation(schemaScan);
@@ -326,6 +336,11 @@ public class StatsCalculator extends 
DefaultPlanVisitor<Statistics, Void> {
         return computeTopN(topN);
     }
 
+    /**
+     * Computes statistics for a logical defer materialize top-n from the top-n it wraps.
+     */
+    @Override
+    public Statistics visitLogicalDeferMaterializeTopN(LogicalDeferMaterializeTopN<? extends Plan> topN, Void context) {
+        LogicalTopN<? extends Plan> wrappedTopN = topN.getLogicalTopN();
+        return computeTopN(wrappedTopN);
+    }
+
     @Override
     public Statistics visitLogicalPartitionTopN(LogicalPartitionTopN<? extends 
Plan> partitionTopN, Void context) {
         return computePartitionTopN(partitionTopN);
@@ -410,6 +425,12 @@ public class StatsCalculator extends 
DefaultPlanVisitor<Statistics, Void> {
         return computeCatalogRelation(olapScan);
     }
 
+    @Override
+    public Statistics 
visitPhysicalDeferMaterializeOlapScan(PhysicalDeferMaterializeOlapScan 
deferMaterializeOlapScan,
+            Void context) {
+        return 
computeCatalogRelation(deferMaterializeOlapScan.getPhysicalOlapScan());
+    }
+
     @Override
     public Statistics visitPhysicalSchemaScan(PhysicalSchemaScan schemaScan, 
Void context) {
         return computeCatalogRelation(schemaScan);
@@ -451,6 +472,12 @@ public class StatsCalculator extends 
DefaultPlanVisitor<Statistics, Void> {
         return computeTopN(topN);
     }
 
+    /**
+     * Computes statistics for a physical defer materialize top-n from the top-n it wraps.
+     */
+    @Override
+    public Statistics visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN<? extends Plan> topN,
+            Void context) {
+        PhysicalTopN<? extends Plan> wrappedTopN = topN.getPhysicalTopN();
+        return computeTopN(wrappedTopN);
+    }
+
     @Override
     public Statistics visitPhysicalHashJoin(
             PhysicalHashJoin<? extends Plan, ? extends Plan> hashJoin, Void 
context) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java
index 400d27e71a..5727279ccf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java
@@ -25,6 +25,7 @@ import 
org.apache.doris.nereids.trees.expressions.MarkJoinSlotReference;
 import org.apache.doris.nereids.trees.expressions.NamedExpression;
 import org.apache.doris.nereids.trees.expressions.OrderExpression;
 import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
 import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
 import org.apache.doris.nereids.trees.expressions.SubqueryExpr;
 import org.apache.doris.nereids.trees.expressions.functions.Function;
@@ -35,6 +36,8 @@ import 
org.apache.doris.nereids.trees.plans.logical.LogicalAssertNumRows;
 import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor;
 import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
 import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer;
+import 
org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeOlapScan;
+import 
org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeTopN;
 import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
 import org.apache.doris.nereids.trees.plans.logical.LogicalEsScan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalExcept;
@@ -171,6 +174,19 @@ public class LogicalPlanDeepCopier extends 
DefaultPlanRewriter<DeepCopierContext
         return newOlapScan;
     }
 
+    /**
+     * Deep copies a defer materialize olap scan: the wrapped scan is copied first, then the deferred slot ids
+     * are translated through {@code context.exprIdReplaceMap}, and finally the column id slot is copied.
+     */
+    @Override
+    public Plan visitLogicalDeferMaterializeOlapScan(LogicalDeferMaterializeOlapScan deferMaterializeOlapScan,
+            DeepCopierContext context) {
+        // the scan is copied before the ids are looked up, same order as the lookups below rely on
+        Plan copiedPlan = visitLogicalOlapScan(deferMaterializeOlapScan.getLogicalOlapScan(), context);
+        LogicalOlapScan copiedScan = (LogicalOlapScan) copiedPlan;
+
+        ImmutableSet.Builder<ExprId> copiedSlotIds = ImmutableSet.builder();
+        for (ExprId originalId : deferMaterializeOlapScan.getDeferMaterializeSlotIds()) {
+            copiedSlotIds.add(context.exprIdReplaceMap.get(originalId));
+        }
+        Set<ExprId> newSlotIds = copiedSlotIds.build();
+
+        SlotReference copiedRowId = (SlotReference) ExpressionDeepCopier.INSTANCE
+                .deepCopy(deferMaterializeOlapScan.getColumnIdSlot(), context);
+        return new LogicalDeferMaterializeOlapScan(copiedScan, newSlotIds, copiedRowId);
+    }
+
     @Override
     public Plan visitLogicalSchemaScan(LogicalSchemaScan schemaScan, 
DeepCopierContext context) {
         if 
(context.getRelationReplaceMap().containsKey(schemaScan.getRelationId())) {
@@ -263,6 +279,19 @@ public class LogicalPlanDeepCopier extends 
DefaultPlanRewriter<DeepCopierContext
         return new LogicalTopN<>(orderKeys, topN.getLimit(), topN.getOffset(), 
child);
     }
 
+    /**
+     * Deep copies a defer materialize top-n: the wrapped top-n is copied first, then the deferred slot ids
+     * are translated through {@code context.exprIdReplaceMap}, and finally the column id slot is copied.
+     */
+    @Override
+    public Plan visitLogicalDeferMaterializeTopN(LogicalDeferMaterializeTopN<? extends Plan> topN,
+            DeepCopierContext context) {
+        // the wrapped top-n is copied before the ids are looked up
+        Plan copiedPlan = visitLogicalTopN(topN.getLogicalTopN(), context);
+        LogicalTopN<? extends Plan> copiedTopN = (LogicalTopN<? extends Plan>) copiedPlan;
+
+        ImmutableSet.Builder<ExprId> copiedSlotIds = ImmutableSet.builder();
+        for (ExprId originalId : topN.getDeferMaterializeSlotIds()) {
+            copiedSlotIds.add(context.exprIdReplaceMap.get(originalId));
+        }
+        Set<ExprId> newSlotIds = copiedSlotIds.build();
+
+        SlotReference copiedRowId = (SlotReference) ExpressionDeepCopier.INSTANCE
+                .deepCopy(topN.getColumnIdSlot(), context);
+        return new LogicalDeferMaterializeTopN<>(copiedTopN, newSlotIds, copiedRowId);
+    }
+
     @Override
     public Plan visitLogicalPartitionTopN(LogicalPartitionTopN<? extends Plan> 
partitionTopN,
             DeepCopierContext context) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalDeferMaterializeOlapScan.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalDeferMaterializeOlapScan.java
new file mode 100644
index 0000000000..a49e37d8df
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalDeferMaterializeOlapScan.java
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.logical;
+
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.algebra.OlapScan;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * use for defer materialize top n
+ */
+public class LogicalDeferMaterializeOlapScan extends LogicalCatalogRelation 
implements OlapScan {
+
+    private final LogicalOlapScan logicalOlapScan;
+
+    ///////////////////////////////////////////////////////////////////////////
+    // Members for defer materialize for top-n opt.
+    ///////////////////////////////////////////////////////////////////////////
+    private final Set<ExprId> deferMaterializeSlotIds;
+    private final SlotReference columnIdSlot;
+
+    public LogicalDeferMaterializeOlapScan(LogicalOlapScan logicalOlapScan,
+            Set<ExprId> deferMaterializeSlotIds, SlotReference columnIdSlot) {
+        this(logicalOlapScan, deferMaterializeSlotIds, columnIdSlot,
+                logicalOlapScan.getGroupExpression(), Optional.empty());
+    }
+
+    /**
+     * constructor
+     */
+    public LogicalDeferMaterializeOlapScan(LogicalOlapScan logicalOlapScan,
+            Set<ExprId> deferMaterializeSlotIds, SlotReference columnIdSlot,
+            Optional<GroupExpression> groupExpression, 
Optional<LogicalProperties> logicalProperties) {
+        super(logicalOlapScan.getRelationId(), logicalOlapScan.getType(), 
logicalOlapScan.getTable(),
+                logicalOlapScan.getQualifier(), groupExpression, 
logicalProperties);
+        this.logicalOlapScan = Objects.requireNonNull(logicalOlapScan, 
"logicalOlapScan can not be null");
+        this.deferMaterializeSlotIds = 
ImmutableSet.copyOf(Objects.requireNonNull(deferMaterializeSlotIds,
+                "deferMaterializeSlotIds can not be null"));
+        this.columnIdSlot = Objects.requireNonNull(columnIdSlot, "columnIdSlot 
can not be null");
+    }
+
+    public LogicalOlapScan getLogicalOlapScan() {
+        return logicalOlapScan;
+    }
+
+    public Set<ExprId> getDeferMaterializeSlotIds() {
+        return deferMaterializeSlotIds;
+    }
+
+    public SlotReference getColumnIdSlot() {
+        return columnIdSlot;
+    }
+
+    @Override
+    public OlapTable getTable() {
+        return logicalOlapScan.getTable();
+    }
+
+    @Override
+    public long getSelectedIndexId() {
+        return logicalOlapScan.getSelectedIndexId();
+    }
+
+    @Override
+    public List<Long> getSelectedPartitionIds() {
+        return logicalOlapScan.getSelectedPartitionIds();
+    }
+
+    @Override
+    public List<Long> getSelectedTabletIds() {
+        return logicalOlapScan.getSelectedPartitionIds();
+    }
+
+    @Override
+    public List<Slot> computeOutput() {
+        return ImmutableList.<Slot>builder()
+                .addAll(logicalOlapScan.getOutput())
+                .add(columnIdSlot)
+                .build();
+    }
+
+    @Override
+    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) 
{
+        return new LogicalDeferMaterializeOlapScan(logicalOlapScan, 
deferMaterializeSlotIds, columnIdSlot,
+                groupExpression, Optional.of(getLogicalProperties()));
+    }
+
+    @Override
+    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> 
groupExpression,
+            Optional<LogicalProperties> logicalProperties, List<Plan> 
children) {
+        Preconditions.checkArgument(children.isEmpty(), 
"LogicalDeferMaterializeOlapScan should have no child");
+        return new LogicalDeferMaterializeOlapScan(logicalOlapScan, 
deferMaterializeSlotIds, columnIdSlot,
+                groupExpression, logicalProperties);
+    }
+
+    @Override
+    public Plan withChildren(List<Plan> children) {
+        Preconditions.checkArgument(children.isEmpty(), 
"LogicalDeferMaterializeOlapScan should have no child");
+        return this;
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitLogicalDeferMaterializeOlapScan(this, context);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        LogicalDeferMaterializeOlapScan that = 
(LogicalDeferMaterializeOlapScan) o;
+        return Objects.equals(logicalOlapScan, that.logicalOlapScan) && 
Objects.equals(
+                deferMaterializeSlotIds, that.deferMaterializeSlotIds) && 
Objects.equals(columnIdSlot,
+                that.columnIdSlot);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), logicalOlapScan, 
deferMaterializeSlotIds, columnIdSlot);
+    }
+
+    @Override
+    public String toString() {
+        return Utils.toSqlString("LogicalDeferMaterializeOlapScan[" + 
id.asInt() + "]",
+                "olapScan", logicalOlapScan,
+                "deferMaterializeSlotIds", deferMaterializeSlotIds,
+                "columnIdSlot", columnIdSlot
+        );
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalDeferMaterializeResultSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalDeferMaterializeResultSink.java
new file mode 100644
index 0000000000..48ea072045
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalDeferMaterializeResultSink.java
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.logical;
+
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.algebra.Sink;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * use for defer materialize top n
+ */
+public class LogicalDeferMaterializeResultSink<CHILD_TYPE extends Plan>
+        extends LogicalSink<CHILD_TYPE> implements Sink {
+
+    private final LogicalResultSink<? extends Plan> logicalResultSink;
+    private final OlapTable olapTable;
+    private final long selectedIndexId;
+
+    public LogicalDeferMaterializeResultSink(LogicalResultSink<CHILD_TYPE> 
logicalResultSink,
+            OlapTable olapTable, long selectedIndexId) {
+        this(logicalResultSink, olapTable, selectedIndexId,
+                Optional.empty(), Optional.empty(), logicalResultSink.child());
+    }
+
+    public LogicalDeferMaterializeResultSink(LogicalResultSink<? extends Plan> 
logicalResultSink,
+            OlapTable olapTable, long selectedIndexId,
+            Optional<GroupExpression> groupExpression, 
Optional<LogicalProperties> logicalProperties,
+            CHILD_TYPE child) {
+        super(logicalResultSink.getType(), groupExpression, logicalProperties, 
child);
+        this.logicalResultSink = logicalResultSink;
+        this.olapTable = olapTable;
+        this.selectedIndexId = selectedIndexId;
+    }
+
+    public LogicalResultSink<? extends Plan> getLogicalResultSink() {
+        return logicalResultSink;
+    }
+
+    public OlapTable getOlapTable() {
+        return olapTable;
+    }
+
+    public long getSelectedIndexId() {
+        return selectedIndexId;
+    }
+
+    @Override
+    public LogicalDeferMaterializeResultSink<Plan> withChildren(List<Plan> 
children) {
+        Preconditions.checkArgument(children.size() == 1,
+                "LogicalDeferMaterializeResultSink only accepts one child");
+        return new LogicalDeferMaterializeResultSink<>(
+                
logicalResultSink.withChildren(ImmutableList.of(children.get(0))),
+                olapTable, selectedIndexId, Optional.empty(), 
Optional.empty(), children.get(0));
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitLogicalDeferMaterializeResultSink(this, context);
+    }
+
+    @Override
+    public List<? extends Expression> getExpressions() {
+        return logicalResultSink.getExpressions();
+    }
+
+    @Override
+    public LogicalDeferMaterializeResultSink<Plan> 
withGroupExpression(Optional<GroupExpression> groupExpression) {
+        return new LogicalDeferMaterializeResultSink<>(logicalResultSink, 
olapTable, selectedIndexId,
+                groupExpression, Optional.of(getLogicalProperties()), child());
+    }
+
+    @Override
+    public LogicalDeferMaterializeResultSink<Plan> 
withGroupExprLogicalPropChildren(
+            Optional<GroupExpression> groupExpression,
+            Optional<LogicalProperties> logicalProperties,
+            List<Plan> children) {
+        Preconditions.checkArgument(children.size() == 1,
+                "LogicalDeferMaterializeResultSink only accepts one child");
+        return new LogicalDeferMaterializeResultSink<>(
+                
logicalResultSink.withChildren(ImmutableList.of(children.get(0))),
+                olapTable, selectedIndexId, groupExpression, 
logicalProperties, children.get(0));
+    }
+
+    @Override
+    public List<Slot> computeOutput() {
+        return child().getOutput();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        LogicalDeferMaterializeResultSink<?> that = 
(LogicalDeferMaterializeResultSink<?>) o;
+        return selectedIndexId == that.selectedIndexId && 
Objects.equals(logicalResultSink,
+                that.logicalResultSink) && Objects.equals(olapTable, 
that.olapTable);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), logicalResultSink, olapTable, 
selectedIndexId);
+    }
+
+    @Override
+    public String toString() {
+        return Utils.toSqlString("LogicalDeferMaterializeResultSink[" + 
id.asInt() + "]",
+                "logicalResultSink", logicalResultSink,
+                "olapTable", olapTable,
+                "selectedIndexId", selectedIndexId
+        );
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalDeferMaterializeTopN.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalDeferMaterializeTopN.java
new file mode 100644
index 0000000000..b775cb1db2
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalDeferMaterializeTopN.java
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.logical;
+
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.properties.OrderKey;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.algebra.TopN;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Logical top-n used by the defer materialize top-n optimization. It wraps a {@link LogicalTopN}, records the
+ * expr ids of the slots whose materialization is deferred, and leaves {@code columnIdSlot} out of the output
+ * it computes.
+ */
+public class LogicalDeferMaterializeTopN<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYPE> implements TopN {
+
+    private final LogicalTopN<? extends Plan> logicalTopN;
+
+    ///////////////////////////////////////////////////////////////////////////
+    // Members for defer materialize for top-n opt.
+    ///////////////////////////////////////////////////////////////////////////
+    private final Set<ExprId> deferMaterializeSlotIds;
+    private final SlotReference columnIdSlot;
+
+    /**
+     * Wraps {@code logicalTopN}, reusing its group expression, logical properties and child.
+     *
+     * @param logicalTopN the top-n to wrap
+     * @param deferMaterializeSlotIds expr ids of the slots whose materialization is deferred
+     * @param columnIdSlot slot that is filtered out of this node's computed output
+     * @throws NullPointerException if any argument is null
+     */
+    public LogicalDeferMaterializeTopN(LogicalTopN<CHILD_TYPE> logicalTopN,
+            Set<ExprId> deferMaterializeSlotIds, SlotReference columnIdSlot) {
+        super(PlanType.LOGICAL_TOP_N, logicalTopN.getGroupExpression(),
+                Optional.of(logicalTopN.getLogicalProperties()), logicalTopN.child());
+        this.logicalTopN = logicalTopN;
+        // fail at construction time, like LogicalDeferMaterializeOlapScan does, instead of on first use
+        this.deferMaterializeSlotIds = Objects.requireNonNull(deferMaterializeSlotIds,
+                "deferMaterializeSlotIds can not be null");
+        this.columnIdSlot = Objects.requireNonNull(columnIdSlot, "columnIdSlot can not be null");
+    }
+
+    /**
+     * constructor
+     *
+     * @throws NullPointerException if the top-n, the slot id set or the column id slot is null
+     */
+    public LogicalDeferMaterializeTopN(LogicalTopN<? extends Plan> logicalTopN,
+            Set<ExprId> deferMaterializeSlotIds, SlotReference columnIdSlot,
+            Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties,
+            CHILD_TYPE child) {
+        super(PlanType.LOGICAL_TOP_N, groupExpression, logicalProperties, child);
+        this.logicalTopN = Objects.requireNonNull(logicalTopN, "logicalTopN can not be null");
+        this.deferMaterializeSlotIds = Objects.requireNonNull(deferMaterializeSlotIds,
+                "deferMaterializeSlotIds can not be null");
+        this.columnIdSlot = Objects.requireNonNull(columnIdSlot, "columnIdSlot can not be null");
+    }
+
+    public LogicalTopN<? extends Plan> getLogicalTopN() {
+        return logicalTopN;
+    }
+
+    public Set<ExprId> getDeferMaterializeSlotIds() {
+        return deferMaterializeSlotIds;
+    }
+
+    public SlotReference getColumnIdSlot() {
+        return columnIdSlot;
+    }
+
+    @Override
+    public List<OrderKey> getOrderKeys() {
+        return logicalTopN.getOrderKeys();
+    }
+
+    @Override
+    public long getOffset() {
+        return logicalTopN.getOffset();
+    }
+
+    @Override
+    public long getLimit() {
+        return logicalTopN.getLimit();
+    }
+
+    /**
+     * Expressions of the wrapped top-n followed by {@code columnIdSlot}.
+     */
+    @Override
+    public List<? extends Expression> getExpressions() {
+        return ImmutableList.<Expression>builder()
+                .addAll(logicalTopN.getExpressions())
+                .add(columnIdSlot).build();
+    }
+
+    /**
+     * Output of the wrapped top-n with the slot sharing {@code columnIdSlot}'s expr id removed.
+     */
+    @Override
+    public List<Slot> computeOutput() {
+        return logicalTopN.getOutput().stream()
+                .filter(s -> !(s.getExprId().equals(columnIdSlot.getExprId())))
+                .collect(ImmutableList.toImmutableList());
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitLogicalDeferMaterializeTopN(this, context);
+    }
+
+    @Override
+    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
+        return new LogicalDeferMaterializeTopN<>(logicalTopN, deferMaterializeSlotIds, columnIdSlot,
+                groupExpression, Optional.of(getLogicalProperties()), child());
+    }
+
+    @Override
+    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
+            Optional<LogicalProperties> logicalProperties, List<Plan> children) {
+        Preconditions.checkArgument(children.size() == 1,
+                "LogicalDeferMaterializeTopN should have 1 child, but input is %s", children.size());
+        // the wrapped top-n is rebuilt on the same child so both nodes stay in step
+        return new LogicalDeferMaterializeTopN<>(logicalTopN.withChildren(ImmutableList.of(children.get(0))),
+                deferMaterializeSlotIds, columnIdSlot, groupExpression, logicalProperties, children.get(0));
+    }
+
+    @Override
+    public Plan withChildren(List<Plan> children) {
+        Preconditions.checkArgument(children.size() == 1,
+                "LogicalDeferMaterializeTopN should have 1 child, but input is %s", children.size());
+        return new LogicalDeferMaterializeTopN<>(logicalTopN.withChildren(ImmutableList.of(children.get(0))),
+                deferMaterializeSlotIds, columnIdSlot, Optional.empty(), Optional.empty(), children.get(0));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        LogicalDeferMaterializeTopN<?> that = (LogicalDeferMaterializeTopN<?>) o;
+        return Objects.equals(logicalTopN, that.logicalTopN)
+                && Objects.equals(deferMaterializeSlotIds, that.deferMaterializeSlotIds)
+                && Objects.equals(columnIdSlot, that.columnIdSlot);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), logicalTopN, deferMaterializeSlotIds, columnIdSlot);
+    }
+
+    @Override
+    public String toString() {
+        return Utils.toSqlString("LogicalDeferMaterializeTopN[" + id.asInt() + "]",
+                "logicalTopN", logicalTopN,
+                "deferMaterializeSlotIds", deferMaterializeSlotIds,
+                "columnIdSlot", columnIdSlot
+        );
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java
index 6d58ba1f71..60458eb2a2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java
@@ -141,7 +141,6 @@ public class LogicalOlapScan extends LogicalCatalogRelation 
implements OlapScan
             List<Long> selectedTabletIds, long selectedIndexId, boolean 
indexSelected,
             PreAggStatus preAggStatus, List<Long> specifiedPartitions,
             List<String> hints, Map<String, Slot> cacheSlotWithSlotName) {
-
         super(id, PlanType.LOGICAL_OLAP_SCAN, table, qualifier,
                 groupExpression, logicalProperties);
         Preconditions.checkArgument(selectedPartitionIds != null, 
"selectedPartitionIds can not be null");
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalResultSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalResultSink.java
index c21422e858..eb7b2556d3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalResultSink.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalResultSink.java
@@ -59,7 +59,7 @@ public class LogicalResultSink<CHILD_TYPE extends Plan> 
extends LogicalSink<CHIL
     }
 
     @Override
-    public Plan withChildren(List<Plan> children) {
+    public LogicalResultSink<Plan> withChildren(List<Plan> children) {
         Preconditions.checkArgument(children.size() == 1,
                 "LogicalResultSink's children size must be 1, but real is %s", 
children.size());
         return new LogicalResultSink<>(outputExprs, children.get(0));
@@ -76,14 +76,14 @@ public class LogicalResultSink<CHILD_TYPE extends Plan> 
extends LogicalSink<CHIL
     }
 
     @Override
-    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) 
{
+    public LogicalResultSink<Plan> 
withGroupExpression(Optional<GroupExpression> groupExpression) {
         return new LogicalResultSink<>(outputExprs, groupExpression, 
Optional.of(getLogicalProperties()), child());
     }
 
     @Override
-    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> 
groupExpression,
+    public LogicalResultSink<Plan> 
withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
             Optional<LogicalProperties> logicalProperties, List<Plan> 
children) {
-        Preconditions.checkArgument(children.size() == 1, "UnboundResultSink 
only accepts one child");
+        Preconditions.checkArgument(children.size() == 1, "LogicalResultSink 
only accepts one child");
         return new LogicalResultSink<>(outputExprs, groupExpression, 
logicalProperties, children.get(0));
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTopN.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTopN.java
index 348a709176..80de9d6215 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTopN.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTopN.java
@@ -64,14 +64,17 @@ public class LogicalTopN<CHILD_TYPE extends Plan> extends 
LogicalUnary<CHILD_TYP
         return child().getOutput();
     }
 
+    @Override
     public List<OrderKey> getOrderKeys() {
         return orderKeys;
     }
 
+    @Override
     public long getOffset() {
         return offset;
     }
 
+    @Override
     public long getLimit() {
         return limit;
     }
@@ -93,7 +96,7 @@ public class LogicalTopN<CHILD_TYPE extends Plan> extends 
LogicalUnary<CHILD_TYP
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
-        LogicalTopN that = (LogicalTopN) o;
+        LogicalTopN<?> that = (LogicalTopN<?>) o;
         return this.offset == that.offset && this.limit == that.limit && 
Objects.equals(this.orderKeys, that.orderKeys);
     }
 
@@ -104,7 +107,7 @@ public class LogicalTopN<CHILD_TYPE extends Plan> extends 
LogicalUnary<CHILD_TYP
 
     @Override
     public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
-        return visitor.visitLogicalTopN((LogicalTopN<Plan>) this, context);
+        return visitor.visitLogicalTopN(this, context);
     }
 
     @Override
@@ -121,7 +124,8 @@ public class LogicalTopN<CHILD_TYPE extends Plan> extends 
LogicalUnary<CHILD_TYP
 
     @Override
     public LogicalTopN<Plan> withChildren(List<Plan> children) {
-        Preconditions.checkArgument(children.size() == 1);
+        Preconditions.checkArgument(children.size() == 1,
+                "LogicalTopN should have 1 child, but input is %s", 
children.size());
         return new LogicalTopN<>(orderKeys, limit, offset, children.get(0));
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDeferMaterializeOlapScan.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDeferMaterializeOlapScan.java
new file mode 100644
index 0000000000..f82bd6dbec
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDeferMaterializeOlapScan.java
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.physical;
+
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.algebra.OlapScan;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.statistics.Statistics;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Physical olap scan used by the top-n defer-materialize optimization.
+ *
+ * <p>This leaf node wraps an ordinary {@link PhysicalOlapScan} and adds two members:
+ * {@code deferMaterializeSlotIds} and {@code columnIdSlot}. The relation id, plan type, table and
+ * qualifier handed to the super constructor are all read from the wrapped scan, and
+ * {@link #getTable()}, {@link #getSelectedIndexId()}, {@link #getSelectedPartitionIds()} and
+ * {@link #getSelectedTabletIds()} delegate to it.
+ *
+ * <p>The {@code with*} copy methods keep the wrapped scan and both defer-materialize members and
+ * replace only the group expression, logical properties, physical properties or statistics.
+ * {@code withGroupExprLogicalPropChildren} ignores its {@code children} argument and calls
+ * {@code logicalProperties.get()} unconditionally, so the Optional passed in must be present.
+ *
+ * <p>{@code equals} and {@code hashCode} combine the superclass result with the wrapped scan and
+ * both defer-materialize members.
+ *
+ * <p>NOTE(review): judging by the names only, {@code deferMaterializeSlotIds} holds the ExprIds of
+ * the slots whose materialization is deferred and {@code columnIdSlot} is the extra slot used to
+ * locate rows later; this class does not establish either, confirm against the rewrite rule.
+ */
+public class PhysicalDeferMaterializeOlapScan extends PhysicalCatalogRelation 
implements OlapScan {
+
+    private final PhysicalOlapScan physicalOlapScan;
+
+    ///////////////////////////////////////////////////////////////////////////
+    // Members for defer materialize for top-n opt.
+    ///////////////////////////////////////////////////////////////////////////
+    private final Set<ExprId> deferMaterializeSlotIds;
+    private final SlotReference columnIdSlot;
+
+    public PhysicalDeferMaterializeOlapScan(PhysicalOlapScan physicalOlapScan,
+            Set<ExprId> deferMaterializeSlotIds, SlotReference columnIdSlot,
+            Optional<GroupExpression> groupExpression, LogicalProperties 
logicalProperties) {
+        this(physicalOlapScan, deferMaterializeSlotIds, columnIdSlot, 
groupExpression, logicalProperties, null, null);
+    }
+
+    /**
+     * Full constructor.
+     *
+     * @param physicalOlapScan wrapped scan; supplies relation id, plan type, table and qualifier
+     * @param deferMaterializeSlotIds stored as-is (no defensive copy is made)
+     * @param columnIdSlot stored as-is
+     * @param groupExpression forwarded to the super constructor
+     * @param logicalProperties forwarded to the super constructor
+     * @param physicalProperties forwarded to the super constructor; the short constructor passes null
+     * @param statistics forwarded to the super constructor; the short constructor passes null
+     */
+    public PhysicalDeferMaterializeOlapScan(PhysicalOlapScan physicalOlapScan,
+            Set<ExprId> deferMaterializeSlotIds, SlotReference columnIdSlot,
+            Optional<GroupExpression> groupExpression, LogicalProperties 
logicalProperties,
+            PhysicalProperties physicalProperties, Statistics statistics) {
+        super(physicalOlapScan.getRelationId(), physicalOlapScan.getType(),
+                physicalOlapScan.getTable(), physicalOlapScan.getQualifier(),
+                groupExpression, logicalProperties, physicalProperties, 
statistics);
+        this.physicalOlapScan = physicalOlapScan;
+        this.deferMaterializeSlotIds = deferMaterializeSlotIds;
+        this.columnIdSlot = columnIdSlot;
+    }
+
+    public PhysicalOlapScan getPhysicalOlapScan() {
+        return physicalOlapScan;
+    }
+
+    public Set<ExprId> getDeferMaterializeSlotIds() {
+        return deferMaterializeSlotIds;
+    }
+
+    public SlotReference getColumnIdSlot() {
+        return columnIdSlot;
+    }
+
+    @Override
+    public OlapTable getTable() {
+        return physicalOlapScan.getTable();
+    }
+
+    @Override
+    public long getSelectedIndexId() {
+        return physicalOlapScan.getSelectedIndexId();
+    }
+
+    @Override
+    public List<Long> getSelectedPartitionIds() {
+        return physicalOlapScan.getSelectedPartitionIds();
+    }
+
+    @Override
+    public List<Long> getSelectedTabletIds() {
+        return physicalOlapScan.getSelectedTabletIds();
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitPhysicalDeferMaterializeOlapScan(this, context);
+    }
+
+    @Override
+    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) 
{
+        return new PhysicalDeferMaterializeOlapScan(physicalOlapScan, 
deferMaterializeSlotIds, columnIdSlot,
+                groupExpression, getLogicalProperties(), physicalProperties, 
statistics);
+    }
+
+    @Override
+    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> 
groupExpression,
+            Optional<LogicalProperties> logicalProperties, List<Plan> 
children) {
+        return new PhysicalDeferMaterializeOlapScan(physicalOlapScan, 
deferMaterializeSlotIds, columnIdSlot,
+                groupExpression, logicalProperties.get(), physicalProperties, 
statistics);
+    }
+
+    @Override
+    public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties 
physicalProperties, Statistics statistics) {
+        return new PhysicalDeferMaterializeOlapScan(physicalOlapScan, 
deferMaterializeSlotIds, columnIdSlot,
+                groupExpression, getLogicalProperties(), physicalProperties, 
statistics);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        PhysicalDeferMaterializeOlapScan that = 
(PhysicalDeferMaterializeOlapScan) o;
+        return Objects.equals(physicalOlapScan, that.physicalOlapScan) && 
Objects.equals(
+                deferMaterializeSlotIds, that.deferMaterializeSlotIds) && 
Objects.equals(columnIdSlot,
+                that.columnIdSlot);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), physicalOlapScan, 
deferMaterializeSlotIds, columnIdSlot);
+    }
+
+    @Override
+    public String toString() {
+        return Utils.toSqlString("PhysicalDeferMaterializeOlapScan[" + 
id.asInt() + "]",
+                "physicalOlapScan", physicalOlapScan,
+                "deferMaterializeSlotIds", deferMaterializeSlotIds,
+                "columnIdSlot", columnIdSlot
+            );
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDeferMaterializeResultSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDeferMaterializeResultSink.java
new file mode 100644
index 0000000000..ee07133006
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDeferMaterializeResultSink.java
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.physical;
+
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.algebra.Sink;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.statistics.Statistics;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Physical result sink used by the top-n defer-materialize optimization.
+ *
+ * <p>This unary node wraps an ordinary {@link PhysicalResultSink} and additionally carries an
+ * {@link OlapTable} and a selected index id. The plan type passed to the super constructor is
+ * read from the wrapped sink, {@link #getExpressions()} delegates to it, and
+ * {@link #computeOutput()} converts the wrapped sink's output expressions to slots with
+ * {@code NamedExpression::toSlot}.
+ *
+ * <p>Constructors: the six-argument form delegates to the eight-argument form with
+ * {@code PhysicalProperties.GATHER} as physical properties and {@code null} statistics.
+ *
+ * <p>Copy methods: {@code withChildren} and {@code withGroupExprLogicalPropChildren} require
+ * exactly one child (checked with {@code Preconditions.checkArgument}) and rebuild the wrapped
+ * sink on that child via {@code physicalResultSink.withChildren(...)}, so wrapper and wrapped
+ * sink share the same child. {@code withGroupExpression}, {@code withPhysicalPropertiesAndStats}
+ * and {@code resetLogicalProperties} reuse the wrapped sink and the current child unchanged;
+ * {@code resetLogicalProperties} passes {@code null} as logical properties.
+ * {@code withGroupExprLogicalPropChildren} calls {@code logicalProperties.get()} unconditionally,
+ * so the Optional passed in must be present.
+ *
+ * <p>{@code equals} and {@code hashCode} combine the superclass result with the wrapped sink,
+ * the table and the selected index id.
+ *
+ * <p>NOTE(review): presumably {@code olapTable} and {@code selectedIndexId} identify the table
+ * and index from which the deferred columns are fetched; this class only stores and exposes
+ * them, confirm against the plan translator.
+ */
+public class PhysicalDeferMaterializeResultSink<CHILD_TYPE extends Plan>
+        extends PhysicalSink<CHILD_TYPE> implements Sink {
+
+    private final PhysicalResultSink<? extends Plan> physicalResultSink;
+    private final OlapTable olapTable;
+    private final long selectedIndexId;
+
+    public PhysicalDeferMaterializeResultSink(PhysicalResultSink<? extends 
Plan> physicalResultSink,
+            OlapTable olapTable, long selectedIndexId,
+            Optional<GroupExpression> groupExpression, LogicalProperties 
logicalProperties,
+            CHILD_TYPE child) {
+        this(physicalResultSink, olapTable, selectedIndexId,
+                groupExpression, logicalProperties, PhysicalProperties.GATHER, 
null, child);
+    }
+
+    public PhysicalDeferMaterializeResultSink(PhysicalResultSink<? extends 
Plan> physicalResultSink,
+            OlapTable olapTable, long selectedIndexId,
+            Optional<GroupExpression> groupExpression, LogicalProperties 
logicalProperties,
+            @Nullable PhysicalProperties physicalProperties, Statistics 
statistics,
+            CHILD_TYPE child) {
+        super(physicalResultSink.getType(), groupExpression, 
logicalProperties, physicalProperties, statistics, child);
+        this.physicalResultSink = physicalResultSink;
+        this.olapTable = olapTable;
+        this.selectedIndexId = selectedIndexId;
+    }
+
+    public PhysicalResultSink<? extends Plan> getPhysicalResultSink() {
+        return physicalResultSink;
+    }
+
+    public OlapTable getOlapTable() {
+        return olapTable;
+    }
+
+    public long getSelectedIndexId() {
+        return selectedIndexId;
+    }
+
+    @Override
+    public Plan withChildren(List<Plan> children) {
+        Preconditions.checkArgument(children.size() == 1,
+                "PhysicalDeferMaterializeResultSink's children size must be 1, 
but real is %s", children.size());
+        return new PhysicalDeferMaterializeResultSink<>(
+                
physicalResultSink.withChildren(ImmutableList.of(children.get(0))),
+                olapTable, selectedIndexId, groupExpression, 
getLogicalProperties(),
+                physicalProperties, statistics, children.get(0));
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitPhysicalDeferMaterializeResultSink(this, context);
+    }
+
+    @Override
+    public List<? extends Expression> getExpressions() {
+        return physicalResultSink.getExpressions();
+    }
+
+    @Override
+    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) 
{
+        return new PhysicalDeferMaterializeResultSink<>(physicalResultSink, 
olapTable, selectedIndexId,
+                groupExpression, getLogicalProperties(), physicalProperties, 
statistics, child());
+    }
+
+    @Override
+    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> 
groupExpression,
+            Optional<LogicalProperties> logicalProperties, List<Plan> 
children) {
+        Preconditions.checkArgument(children.size() == 1,
+                "PhysicalDeferMaterializeResultSink's children size must be 1, 
but real is %s", children.size());
+        return new PhysicalDeferMaterializeResultSink<>(
+                
physicalResultSink.withChildren(ImmutableList.of(children.get(0))),
+                olapTable, selectedIndexId, groupExpression, 
logicalProperties.get(),
+                physicalProperties, statistics, children.get(0));
+    }
+
+    @Override
+    public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties 
physicalProperties, Statistics statistics) {
+        return new PhysicalDeferMaterializeResultSink<>(physicalResultSink, 
olapTable, selectedIndexId,
+                groupExpression, getLogicalProperties(), physicalProperties, 
statistics, child());
+    }
+
+    @Override
+    public List<Slot> computeOutput() {
+        return physicalResultSink.getOutputExprs().stream()
+                .map(NamedExpression::toSlot)
+                .collect(ImmutableList.toImmutableList());
+    }
+
+    @Override
+    public PhysicalDeferMaterializeResultSink<CHILD_TYPE> 
resetLogicalProperties() {
+        return new PhysicalDeferMaterializeResultSink<>(physicalResultSink, 
olapTable, selectedIndexId,
+                groupExpression, null, physicalProperties, statistics, 
child());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        PhysicalDeferMaterializeResultSink<?> that = 
(PhysicalDeferMaterializeResultSink<?>) o;
+        return selectedIndexId == that.selectedIndexId && 
Objects.equals(physicalResultSink,
+                that.physicalResultSink) && Objects.equals(olapTable, 
that.olapTable);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), physicalResultSink, olapTable, 
selectedIndexId);
+    }
+
+    @Override
+    public String toString() {
+        return Utils.toSqlString("PhysicalDeferMaterializeResultSink[" + 
id.asInt() + "]",
+                "physicalResultSink", physicalResultSink,
+                "olapTable", olapTable,
+                "selectedIndexId", selectedIndexId
+        );
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDeferMaterializeTopN.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDeferMaterializeTopN.java
new file mode 100644
index 0000000000..2c2a53761a
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalDeferMaterializeTopN.java
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.physical;
+
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.properties.PhysicalProperties;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.algebra.TopN;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.statistics.Statistics;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Physical top-n used by the top-n defer-materialize optimization.
+ *
+ * <p>This unary node wraps an ordinary {@link PhysicalTopN} and adds two members:
+ * {@code deferMaterializeSlotIds} and {@code columnIdSlot}. The plan type, order keys and sort
+ * phase passed to the super constructor are read from the wrapped top-n, and
+ * {@link #getOffset()} and {@link #getLimit()} delegate to it. {@link #computeOutput()} returns
+ * the child's output unchanged.
+ *
+ * <p>Constructors: the six-argument form delegates to the eight-argument form with {@code null}
+ * physical properties and {@code null} statistics.
+ *
+ * <p>Copy methods: {@code withChildren} and {@code withGroupExprLogicalPropChildren} require
+ * exactly one child (checked with {@code Preconditions.checkArgument}) and rebuild the wrapped
+ * top-n on that child via {@code physicalTopN.withChildren(...)}. {@code withPhysicalTopN}
+ * swaps in the given top-n and uses that top-n's own child as the child of the new node.
+ * {@code withGroupExpression}, {@code withPhysicalPropertiesAndStats} and
+ * {@code resetLogicalProperties} reuse the wrapped top-n and the current child unchanged;
+ * {@code resetLogicalProperties} passes {@code null} as logical properties.
+ * {@code withGroupExprLogicalPropChildren} calls {@code logicalProperties.get()} unconditionally,
+ * so the Optional passed in must be present.
+ *
+ * <p>{@code equals} and {@code hashCode} combine the superclass result with the wrapped top-n
+ * and both defer-materialize members.
+ *
+ * <p>NOTE(review): judging by the names only, {@code deferMaterializeSlotIds} holds the ExprIds of
+ * the slots whose materialization is deferred and {@code columnIdSlot} is the extra slot used to
+ * locate rows later; this class does not establish either, confirm against the rewrite rule.
+ */
+public class PhysicalDeferMaterializeTopN<CHILD_TYPE extends Plan>
+        extends AbstractPhysicalSort<CHILD_TYPE> implements TopN {
+
+    private final PhysicalTopN<? extends Plan> physicalTopN;
+
+    ///////////////////////////////////////////////////////////////////////////
+    // Members for defer materialize for top-n opt.
+    ///////////////////////////////////////////////////////////////////////////
+    private final Set<ExprId> deferMaterializeSlotIds;
+    private final SlotReference columnIdSlot;
+
+    public PhysicalDeferMaterializeTopN(PhysicalTopN<? extends Plan> 
physicalTopN,
+            Set<ExprId> deferMaterializeSlotIds, SlotReference columnIdSlot,
+            Optional<GroupExpression> groupExpression, LogicalProperties 
logicalProperties, CHILD_TYPE child) {
+        this(physicalTopN, deferMaterializeSlotIds, columnIdSlot,
+                groupExpression, logicalProperties, null, null, child);
+    }
+
+    public PhysicalDeferMaterializeTopN(PhysicalTopN<? extends Plan> 
physicalTopN,
+            Set<ExprId> deferMaterializeSlotIds, SlotReference columnIdSlot,
+            Optional<GroupExpression> groupExpression, LogicalProperties 
logicalProperties,
+            PhysicalProperties physicalProperties, Statistics statistics, 
CHILD_TYPE child) {
+        super(physicalTopN.getType(), physicalTopN.getOrderKeys(), 
physicalTopN.getSortPhase(),
+                groupExpression, logicalProperties, physicalProperties, 
statistics, child);
+        this.physicalTopN = physicalTopN;
+        this.deferMaterializeSlotIds = deferMaterializeSlotIds;
+        this.columnIdSlot = columnIdSlot;
+    }
+
+    public PhysicalTopN<? extends Plan> getPhysicalTopN() {
+        return physicalTopN;
+    }
+
+    public Set<ExprId> getDeferMaterializeSlotIds() {
+        return deferMaterializeSlotIds;
+    }
+
+    public SlotReference getColumnIdSlot() {
+        return columnIdSlot;
+    }
+
+    @Override
+    public long getOffset() {
+        return physicalTopN.getOffset();
+    }
+
+    @Override
+    public long getLimit() {
+        return physicalTopN.getLimit();
+    }
+
+    public Plan withPhysicalTopN(PhysicalTopN<? extends Plan> physicalTopN) {
+        return new PhysicalDeferMaterializeTopN<>(physicalTopN, 
deferMaterializeSlotIds, columnIdSlot, groupExpression,
+                getLogicalProperties(), physicalProperties, statistics, 
physicalTopN.child());
+    }
+
+    @Override
+    public Plan withChildren(List<Plan> children) {
+        Preconditions.checkArgument(children.size() == 1,
+                "PhysicalDeferMaterializeTopN's children size must be 1, but 
real is %s", children.size());
+        return new 
PhysicalDeferMaterializeTopN<>(physicalTopN.withChildren(ImmutableList.of(children.get(0))),
+                deferMaterializeSlotIds, columnIdSlot, groupExpression, 
getLogicalProperties(),
+                physicalProperties, statistics, children.get(0));
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitPhysicalDeferMaterializeTopN(this, context);
+    }
+
+    @Override
+    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) 
{
+        return new PhysicalDeferMaterializeTopN<>(physicalTopN, 
deferMaterializeSlotIds, columnIdSlot,
+                groupExpression, getLogicalProperties(), physicalProperties, 
statistics, child());
+    }
+
+    @Override
+    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> 
groupExpression,
+            Optional<LogicalProperties> logicalProperties, List<Plan> 
children) {
+        Preconditions.checkArgument(children.size() == 1,
+                "PhysicalDeferMaterializeTopN's children size must be 1, but 
real is %s", children.size());
+        return new 
PhysicalDeferMaterializeTopN<>(physicalTopN.withChildren(ImmutableList.of(children.get(0))),
+                deferMaterializeSlotIds, columnIdSlot, groupExpression, 
logicalProperties.get(),
+                physicalProperties, statistics, children.get(0));
+    }
+
+    @Override
+    public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties 
physicalProperties, Statistics statistics) {
+        return new PhysicalDeferMaterializeTopN<>(physicalTopN, 
deferMaterializeSlotIds, columnIdSlot,
+                groupExpression, getLogicalProperties(), physicalProperties, 
statistics, child());
+    }
+
+    @Override
+    public List<Slot> computeOutput() {
+        return child().getOutput();
+    }
+
+    @Override
+    public PhysicalDeferMaterializeTopN<CHILD_TYPE> resetLogicalProperties() {
+        return new PhysicalDeferMaterializeTopN<>(physicalTopN, 
deferMaterializeSlotIds, columnIdSlot,
+                groupExpression, null, physicalProperties, statistics, 
child());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        PhysicalDeferMaterializeTopN<?> that = 
(PhysicalDeferMaterializeTopN<?>) o;
+        return Objects.equals(physicalTopN, that.physicalTopN) && 
Objects.equals(
+                deferMaterializeSlotIds, that.deferMaterializeSlotIds) && 
Objects.equals(columnIdSlot,
+                that.columnIdSlot);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), physicalTopN, 
deferMaterializeSlotIds, columnIdSlot);
+    }
+
+    @Override
+    public String toString() {
+        return Utils.toSqlString("PhysicalDeferMaterializeTopN[" + id.asInt() 
+ "]",
+                "physicalTopN", physicalTopN,
+                "deferMaterializeSlotIds", deferMaterializeSlotIds,
+                "columnIdSlot", columnIdSlot
+        );
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalFileSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalFileSink.java
index 8d33bc367c..56ff9aff08 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalFileSink.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalFileSink.java
@@ -53,10 +53,7 @@ public class PhysicalFileSink<CHILD_TYPE extends Plan> 
extends PhysicalSink<CHIL
     public PhysicalFileSink(String filePath, String format, Map<String, 
String> properties,
             Optional<GroupExpression> groupExpression, LogicalProperties 
logicalProperties,
             CHILD_TYPE child) {
-        super(PlanType.PHYSICAL_FILE_SINK, groupExpression, logicalProperties, 
child);
-        this.filePath = filePath;
-        this.format = format;
-        this.properties = properties;
+        this(filePath, format, properties, groupExpression, logicalProperties, 
PhysicalProperties.GATHER, null, child);
     }
 
     public PhysicalFileSink(String filePath, String format, Map<String, 
String> properties,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
index 90bc9093cb..8462705c54 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
@@ -45,8 +45,6 @@ import java.util.Optional;
  */
 public class PhysicalOlapScan extends PhysicalCatalogRelation implements 
OlapScan {
 
-    public static final String DEFERRED_MATERIALIZED_SLOTS = 
"deferred_materialized_slots";
-
     private final DistributionSpec distributionSpec;
     private final long selectedIndexId;
     private final ImmutableList<Long> selectedTabletIds;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java
index 3ad20dcb80..7327c7b6af 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java
@@ -57,26 +57,14 @@ public class PhysicalOlapTableSink<CHILD_TYPE extends Plan> 
extends PhysicalSink
     private final boolean singleReplicaLoad;
     private final boolean isPartialUpdate;
 
-    public PhysicalOlapTableSink(Database database, OlapTable targetTable, 
List<Long> partitionIds,
-            List<Column> cols, boolean singleReplicaLoad, boolean 
isPartialUpdate, LogicalProperties logicalProperties,
-            CHILD_TYPE child) {
-        this(database, targetTable, partitionIds, cols, singleReplicaLoad, 
isPartialUpdate,
-                Optional.empty(), logicalProperties, child);
-    }
-
     /**
      * Constructor
      */
-    public PhysicalOlapTableSink(Database database, OlapTable targetTable, 
List<Long> partitionIds,
-            List<Column> cols, boolean singleReplicaLoad, boolean 
isPartialUpdate,
-            Optional<GroupExpression> groupExpression, LogicalProperties 
logicalProperties, CHILD_TYPE child) {
-        super(PlanType.PHYSICAL_OLAP_TABLE_SINK, groupExpression, 
logicalProperties, child);
-        this.database = Objects.requireNonNull(database, "database != null in 
PhysicalOlapTableSink");
-        this.targetTable = Objects.requireNonNull(targetTable, "targetTable != 
null in PhysicalOlapTableSink");
-        this.cols = Utils.copyRequiredList(cols);
-        this.partitionIds = Utils.copyRequiredList(partitionIds);
-        this.singleReplicaLoad = singleReplicaLoad;
-        this.isPartialUpdate = isPartialUpdate;
+    public PhysicalOlapTableSink(Database database, OlapTable targetTable, 
List<Long> partitionIds, List<Column> cols,
+            boolean singleReplicaLoad, boolean isPartialUpdate, 
Optional<GroupExpression> groupExpression,
+            LogicalProperties logicalProperties, CHILD_TYPE child) {
+        this(database, targetTable, partitionIds, cols, singleReplicaLoad, 
isPartialUpdate,
+                groupExpression, logicalProperties, PhysicalProperties.GATHER, 
null, child);
     }
 
     /**
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRelation.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRelation.java
index cb1750d8c1..7dacd28db6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRelation.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRelation.java
@@ -24,7 +24,6 @@ import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.RelationId;
 import org.apache.doris.nereids.trees.plans.algebra.Relation;
-import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 import org.apache.doris.statistics.Statistics;
 
 import com.google.common.collect.ImmutableList;
@@ -76,11 +75,6 @@ public abstract class PhysicalRelation extends PhysicalLeaf 
implements Relation
         return Objects.hash(relationId);
     }
 
-    @Override
-    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
-        return visitor.visitPhysicalRelation(this, context);
-    }
-
     @Override
     public List<? extends Expression> getExpressions() {
         return ImmutableList.of();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java
index c960274f2d..9126ccc3ba 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java
@@ -45,16 +45,9 @@ public class PhysicalResultSink<CHILD_TYPE extends Plan> 
extends PhysicalSink<CH
 
     private final List<NamedExpression> outputExprs;
 
-    public PhysicalResultSink(List<NamedExpression> outputExprs, 
LogicalProperties logicalProperties,
-            CHILD_TYPE child) {
-        super(PlanType.PHYSICAL_RESULT_SINK, logicalProperties, child);
-        this.outputExprs = outputExprs;
-    }
-
     public PhysicalResultSink(List<NamedExpression> outputExprs, 
Optional<GroupExpression> groupExpression,
             LogicalProperties logicalProperties, CHILD_TYPE child) {
-        super(PlanType.PHYSICAL_RESULT_SINK, groupExpression, 
logicalProperties, child);
-        this.outputExprs = outputExprs;
+        this(outputExprs, groupExpression, logicalProperties, 
PhysicalProperties.GATHER, null, child);
     }
 
     public PhysicalResultSink(List<NamedExpression> outputExprs, 
Optional<GroupExpression> groupExpression,
@@ -64,11 +57,16 @@ public class PhysicalResultSink<CHILD_TYPE extends Plan> 
extends PhysicalSink<CH
         this.outputExprs = outputExprs;
     }
 
+    public List<NamedExpression> getOutputExprs() {
+        return outputExprs;
+    }
+
     @Override
     public PhysicalResultSink<Plan> withChildren(List<Plan> children) {
         Preconditions.checkArgument(children.size() == 1,
                 "PhysicalResultSink's children size must be 1, but real is 
%s", children.size());
-        return new PhysicalResultSink<>(outputExprs, groupExpression, 
getLogicalProperties(), children.get(0));
+        return new PhysicalResultSink<>(outputExprs, groupExpression, 
getLogicalProperties(),
+                physicalProperties, statistics, children.get(0));
     }
 
     @Override
@@ -83,13 +81,17 @@ public class PhysicalResultSink<CHILD_TYPE extends Plan> 
extends PhysicalSink<CH
 
     @Override
     public PhysicalResultSink<Plan> 
withGroupExpression(Optional<GroupExpression> groupExpression) {
-        return new PhysicalResultSink<>(outputExprs, groupExpression, 
getLogicalProperties(), child());
+        return new PhysicalResultSink<>(outputExprs, groupExpression, 
getLogicalProperties(),
+                physicalProperties, statistics, child());
     }
 
     @Override
     public PhysicalResultSink<Plan> 
withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
             Optional<LogicalProperties> logicalProperties, List<Plan> 
children) {
-        return new PhysicalResultSink<>(outputExprs, groupExpression, 
logicalProperties.get(), child());
+        Preconditions.checkArgument(children.size() == 1,
+                "PhysicalResultSink's children size must be 1, but real is 
%s", children.size());
+        return new PhysicalResultSink<>(outputExprs, groupExpression, 
logicalProperties.get(),
+                physicalProperties, statistics, children.get(0));
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalSink.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalSink.java
index be6837b0a5..d9dd45d208 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalSink.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalSink.java
@@ -30,18 +30,6 @@ import java.util.Optional;
 
 /** abstract physical sink */
 public abstract class PhysicalSink<CHILD_TYPE extends Plan> extends 
PhysicalUnary<CHILD_TYPE> {
-
-    public PhysicalSink(PlanType type,
-            LogicalProperties logicalProperties, CHILD_TYPE child) {
-        super(type, logicalProperties, child);
-    }
-
-    public PhysicalSink(PlanType type,
-            Optional<GroupExpression> groupExpression,
-            LogicalProperties logicalProperties, CHILD_TYPE child) {
-        super(type, groupExpression, logicalProperties, child);
-    }
-
     public PhysicalSink(PlanType type,
             Optional<GroupExpression> groupExpression,
             LogicalProperties logicalProperties,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
index 0f1d0069b3..7df18fd010 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
@@ -109,7 +109,8 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends 
AbstractPhysicalSort<
 
     @Override
     public PhysicalTopN<Plan> withChildren(List<Plan> children) {
-        Preconditions.checkArgument(children.size() == 1);
+        Preconditions.checkArgument(children.size() == 1,
+                "PhysicalTopN's children size must be 1, but real is %s", 
children.size());
         return new PhysicalTopN<>(orderKeys, limit, offset, phase, 
groupExpression,
                 getLogicalProperties(), physicalProperties, statistics, 
children.get(0));
     }
@@ -122,7 +123,8 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends 
AbstractPhysicalSort<
     @Override
     public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> 
groupExpression,
             Optional<LogicalProperties> logicalProperties, List<Plan> 
children) {
-        Preconditions.checkArgument(children.size() == 1);
+        Preconditions.checkArgument(children.size() == 1,
+                "PhysicalTopN's children size must be 1, but real is %s", 
children.size());
         return new PhysicalTopN<>(orderKeys, limit, offset, phase, 
groupExpression, logicalProperties.get(),
                 children.get(0));
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
index c1077f7a7e..0382948185 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
@@ -28,6 +28,7 @@ import 
org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor;
 import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
 import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer;
 import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
+import 
org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeTopN;
 import org.apache.doris.nereids.trees.plans.logical.LogicalExcept;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
 import org.apache.doris.nereids.trees.plans.logical.LogicalGenerate;
@@ -53,6 +54,7 @@ import 
org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer;
+import 
org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalExcept;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
@@ -225,6 +227,10 @@ public abstract class PlanVisitor<R, C> implements 
CommandVisitor<R, C>, Relatio
         return visit(topN, context);
     }
 
+    public R visitLogicalDeferMaterializeTopN(LogicalDeferMaterializeTopN<? 
extends Plan> topN, C context) {
+        return visit(topN, context);
+    }
+
     public R visitLogicalWindow(LogicalWindow<? extends Plan> window, C 
context) {
         return visit(window, context);
     }
@@ -323,6 +329,10 @@ public abstract class PlanVisitor<R, C> implements 
CommandVisitor<R, C>, Relatio
         return visitAbstractPhysicalSort(topN, context);
     }
 
+    public R visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN<? 
extends Plan> topN, C context) {
+        return visitAbstractPhysicalSort(topN, context);
+    }
+
     public R visitPhysicalWindow(PhysicalWindow<? extends Plan> window, C 
context) {
         return visit(window, context);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/RelationVisitor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/RelationVisitor.java
index af65f43d9d..65a0350502 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/RelationVisitor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/RelationVisitor.java
@@ -20,6 +20,7 @@ package org.apache.doris.nereids.trees.plans.visitor;
 import org.apache.doris.nereids.analyzer.UnboundOneRowRelation;
 import org.apache.doris.nereids.analyzer.UnboundRelation;
 import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import 
org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeOlapScan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
 import org.apache.doris.nereids.trees.plans.logical.LogicalEsScan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
@@ -29,6 +30,7 @@ import 
org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
 import org.apache.doris.nereids.trees.plans.logical.LogicalRelation;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSchemaScan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalTVFRelation;
+import 
org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeOlapScan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
@@ -92,6 +94,11 @@ public interface RelationVisitor<R, C> {
         return visitLogicalRelation(olapScan, context);
     }
 
+    default R visitLogicalDeferMaterializeOlapScan(
+            LogicalDeferMaterializeOlapScan deferMaterializeOlapScan, C 
context) {
+        return visitLogicalRelation(deferMaterializeOlapScan, context);
+    }
+
     default R visitLogicalOneRowRelation(LogicalOneRowRelation oneRowRelation, 
C context) {
         return visitLogicalRelation(oneRowRelation, context);
     }
@@ -128,6 +135,11 @@ public interface RelationVisitor<R, C> {
         return visitPhysicalRelation(olapScan, context);
     }
 
+    default R visitPhysicalDeferMaterializeOlapScan(
+            PhysicalDeferMaterializeOlapScan deferMaterializeOlapScan, C 
context) {
+        return visitPhysicalRelation(deferMaterializeOlapScan, context);
+    }
+
     default R visitPhysicalOneRowRelation(PhysicalOneRowRelation 
oneRowRelation, C context) {
         return visitPhysicalRelation(oneRowRelation, context);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java
index 63f371aecb..df790fddd2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java
@@ -20,10 +20,12 @@ package org.apache.doris.nereids.trees.plans.visitor;
 import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
 import org.apache.doris.nereids.analyzer.UnboundResultSink;
 import org.apache.doris.nereids.trees.plans.Plan;
+import 
org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeResultSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
+import 
org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeResultSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
@@ -70,6 +72,11 @@ public interface SinkVisitor<R, C> {
         return visitLogicalSink(logicalResultSink, context);
     }
 
+    default R visitLogicalDeferMaterializeResultSink(
+            LogicalDeferMaterializeResultSink<? extends Plan> 
logicalDeferMaterializeResultSink, C context) {
+        return visitLogicalSink(logicalDeferMaterializeResultSink, context);
+    }
+
     // *******************************
     // physical
     // *******************************
@@ -85,4 +92,9 @@ public interface SinkVisitor<R, C> {
     default R visitPhysicalResultSink(PhysicalResultSink<? extends Plan> 
physicalResultSink, C context) {
         return visitPhysicalSink(physicalResultSink, context);
     }
+
+    default R visitPhysicalDeferMaterializeResultSink(
+            PhysicalDeferMaterializeResultSink<? extends Plan> sink, C 
context) {
+        return visitPhysicalSink(sink, context);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
index f040af0042..bde891a835 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
@@ -147,6 +147,10 @@ public class ExchangeNode extends PlanNode {
         }
     }
 
+    public SortInfo getMergeInfo() {
+        return mergeInfo;
+    }
+
     /**
      * Set the parameters used to merge sorted input streams. This can be 
called
      * after init().
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 7b3e95c3a0..7fdf7eda2d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -1281,33 +1281,12 @@ public class OlapScanNode extends ScanNode {
         return shouldColoScan;
     }
 
-    public void getColumnDesc(List<TColumn> columnsDesc, List<String> 
keyColumnNames,
-                        List<TPrimitiveType> keyColumnTypes) {
-        if (selectedIndexId != -1) {
-            for (Column col : olapTable.getSchemaByIndexId(selectedIndexId, 
true)) {
-                TColumn tColumn = col.toThrift();
-                col.setIndexFlag(tColumn, olapTable);
-                if (columnsDesc != null) {
-                    columnsDesc.add(tColumn);
-                }
-                if ((Util.showHiddenColumns() || (!Util.showHiddenColumns() && 
col.isVisible())) && col.isKey()) {
-                    if (keyColumnNames != null) {
-                        keyColumnNames.add(col.getName());
-                    }
-                    if (keyColumnTypes != null) {
-                        keyColumnTypes.add(col.getDataType().toThrift());
-                    }
-                }
-            }
-        }
-    }
-
     @Override
     protected void toThrift(TPlanNode msg) {
         List<String> keyColumnNames = new ArrayList<String>();
         List<TPrimitiveType> keyColumnTypes = new ArrayList<TPrimitiveType>();
         List<TColumn> columnsDesc = new ArrayList<TColumn>();
-        getColumnDesc(columnsDesc, keyColumnNames, keyColumnTypes);
+        olapTable.getColumnDesc(selectedIndexId, columnsDesc, keyColumnNames, 
keyColumnTypes);
         List<TOlapTableIndex> indexDesc = Lists.newArrayList();
 
         // Add extra row id column
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
index d5a3b7e6e6..08af81afbb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
@@ -51,7 +51,6 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
 import org.apache.doris.statistics.query.StatsDelta;
-import org.apache.doris.thrift.TColumn;
 import org.apache.doris.thrift.TFetchOption;
 import org.apache.doris.thrift.TQueryOptions;
 import org.apache.doris.thrift.TRuntimeFilterMode;
@@ -501,17 +500,7 @@ public class OriginalPlanner extends Planner {
         }
         for (PlanFragment fragment : fragments) {
             if (injected && fragment.getSink() instanceof ResultSink) {
-                TFetchOption fetchOption = new TFetchOption();
-                fetchOption.setFetchRowStore(olapTable.storeRowColumn());
-                fetchOption.setUseTwoPhaseFetch(true);
-                
fetchOption.setNodesInfo(Env.getCurrentSystemInfo().createAliveNodesInfo());
-                // TODO for row store used seperate more faster path for wide 
tables
-                if (!olapTable.storeRowColumn()) {
-                    // Set column desc for each column
-                    List<TColumn> columnsDesc = new ArrayList<TColumn>();
-                    scanNode.getColumnDesc(columnsDesc, null, null);
-                    fetchOption.setColumnDesc(columnsDesc);
-                }
+                TFetchOption fetchOption = 
olapTable.generateTwoPhaseReadOption(scanNode.getSelectedIndexId());
                 ((ResultSink) fragment.getSink()).setFetchOption(fetchOption);
                 break;
             }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java
index b38f4876e3..944ebcf3e8 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/TopNRuntimeFilterTest.java
@@ -19,6 +19,8 @@ package org.apache.doris.nereids.postprocess;
 
 import org.apache.doris.nereids.datasets.ssb.SSBTestBase;
 import org.apache.doris.nereids.processor.post.PlanPostProcessors;
+import org.apache.doris.nereids.trees.plans.Plan;
+import 
org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
 import org.apache.doris.nereids.util.PlanChecker;
@@ -39,10 +41,12 @@ public class TopNRuntimeFilterTest extends SSBTestBase {
                 .rewrite()
                 .implement();
         PhysicalPlan plan = checker.getPhysicalPlan();
-        plan = new 
PlanPostProcessors(checker.getCascadesContext()).process(plan);
-        Assertions.assertTrue(plan.children().get(0).child(0) instanceof 
PhysicalTopN);
-        PhysicalTopN localTopN = (PhysicalTopN) 
plan.children().get(0).child(0);
-        
Assertions.assertTrue(localTopN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent());
+        new PlanPostProcessors(checker.getCascadesContext()).process(plan);
+        Assertions.assertTrue(plan.children().get(0).child(0) instanceof 
PhysicalDeferMaterializeTopN);
+        PhysicalDeferMaterializeTopN<? extends Plan> localTopN
+                = (PhysicalDeferMaterializeTopN<? extends Plan>) 
plan.child(0).child(0);
+        Assertions.assertTrue(localTopN.getPhysicalTopN()
+                
.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent());
     }
 
     // topn rf do not apply on string-like and float column
@@ -53,9 +57,11 @@ public class TopNRuntimeFilterTest extends SSBTestBase {
                 .rewrite()
                 .implement();
         PhysicalPlan plan = checker.getPhysicalPlan();
-        plan = new 
PlanPostProcessors(checker.getCascadesContext()).process(plan);
-        Assertions.assertTrue(plan.children().get(0) instanceof PhysicalTopN);
-        PhysicalTopN localTopN = (PhysicalTopN) plan.children().get(0);
-        
Assertions.assertFalse(localTopN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent());
+        new PlanPostProcessors(checker.getCascadesContext()).process(plan);
+        Assertions.assertTrue(plan.children().get(0).child(0) instanceof 
PhysicalDeferMaterializeTopN);
+        PhysicalDeferMaterializeTopN<? extends Plan> localTopN
+                = (PhysicalDeferMaterializeTopN<? extends Plan>) 
plan.child(0).child(0);
+        Assertions.assertFalse(localTopN.getPhysicalTopN()
+                
.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent());
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to