This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 683d64b361 [Refactor](multi catalog)Remove redundant param context for
FileQueryScanNode (#18636)
683d64b361 is described below
commit 683d64b3619a52b29e9f8cf04285060fbabbc296
Author: Jibing-Li <[email protected]>
AuthorDate: Fri Apr 14 20:20:21 2023 +0800
[Refactor](multi catalog)Remove redundant param context for
FileQueryScanNode (#18636)
Remove redundant param context for FileQueryScanNode.
Remove duplicated code for QueryScanProviders.
---
.../org/apache/doris/planner/FileLoadScanNode.java | 28 +++++++-
.../doris/planner/external/ExternalScanNode.java | 6 --
.../doris/planner/external/FileGroupInfo.java | 3 +-
.../doris/planner/external/FileQueryScanNode.java | 82 +++++++++++++---------
.../doris/planner/external/FileScanNode.java | 44 ++++--------
.../doris/planner/external/FileScanProviderIf.java | 11 ++-
.../doris/planner/external/HiveScanProvider.java | 37 ----------
.../doris/planner/external/LoadScanProvider.java | 16 +++--
.../doris/planner/external/QueryScanProvider.java | 32 ++++++---
.../doris/planner/external/TVFScanProvider.java | 34 ---------
.../planner/external/iceberg/IcebergApiSource.java | 35 ---------
.../planner/external/iceberg/IcebergHMSSource.java | 6 --
.../external/iceberg/IcebergScanProvider.java | 6 --
.../planner/external/iceberg/IcebergSource.java | 3 -
.../account_p0/test_nereids_row_policy.groovy | 8 +--
15 files changed, 140 insertions(+), 211 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
index 7bd04e9030..4b8768cc8b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
@@ -62,11 +62,35 @@ import java.util.List;
import java.util.Map;
/**
- * FileLoadScanNode for broker node and stream load.
+ * FileLoadScanNode for broker load and stream load.
*/
public class FileLoadScanNode extends FileScanNode {
private static final Logger LOG =
LogManager.getLogger(FileLoadScanNode.class);
+ public static class ParamCreateContext {
+ public BrokerFileGroup fileGroup;
+ public List<Expr> conjuncts;
+
+ public TupleDescriptor destTupleDescriptor;
+ public Map<String, SlotDescriptor> destSlotDescByName;
+ // === Set when init ===
+ public TupleDescriptor srcTupleDescriptor;
+ public Map<String, SlotDescriptor> srcSlotDescByName;
+ public Map<String, Expr> exprMap;
+ public String timezone;
+ // === Set when init ===
+
+ public TFileScanRangeParams params;
+
+ public void createDestSlotMap() {
+ Preconditions.checkNotNull(destTupleDescriptor);
+ destSlotDescByName = Maps.newHashMap();
+ for (SlotDescriptor slot : destTupleDescriptor.getSlots()) {
+ destSlotDescByName.put(slot.getColumn().getName(), slot);
+ }
+ }
+ }
+
// Save all info about load attributes and files.
// Each DataDescription in a load stmt corresponds to a FileGroupInfo in
this list.
private final List<FileGroupInfo> fileGroupInfos = Lists.newArrayList();
@@ -188,7 +212,7 @@ public class FileLoadScanNode extends FileScanNode {
Preconditions.checkState(contexts.size() == scanProviders.size(),
contexts.size() + " vs. " + scanProviders.size());
for (int i = 0; i < contexts.size(); ++i) {
- FileScanNode.ParamCreateContext context = contexts.get(i);
+ FileLoadScanNode.ParamCreateContext context = contexts.get(i);
FileScanProviderIf scanProvider = scanProviders.get(i);
setDefaultValueExprs(scanProvider, context);
finalizeParamsForLoad(context, analyzer);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java
index 2fb3842b40..24137e1351 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalScanNode.java
@@ -42,12 +42,6 @@ public abstract class ExternalScanNode extends ScanNode {
// set to false means this scan node does not need to check column priv.
protected boolean needCheckColumnPriv;
- // For explain
- protected long inputSplitsNum = 0;
- protected long totalFileSize = 0;
- protected long totalPartitionNum = 0;
- protected long readPartitionNum = 0;
-
// Final output of this file scan node
protected List<TScanRangeLocations> scanRangeLocations =
Lists.newArrayList();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
index ca14046c76..39292c5e4c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileGroupInfo.java
@@ -29,6 +29,7 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.Util;
import org.apache.doris.load.BrokerFileGroup;
+import org.apache.doris.planner.FileLoadScanNode;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TExternalScanRange;
@@ -187,7 +188,7 @@ public class FileGroupInfo {
LOG.info("number instance of file scan node is: {}, bytes per
instance: {}", numInstances, bytesPerInstance);
}
- public void createScanRangeLocations(FileScanNode.ParamCreateContext
context,
+ public void createScanRangeLocations(FileLoadScanNode.ParamCreateContext
context,
FederationBackendPolicy backendPolicy,
List<TScanRangeLocations>
scanRangeLocations) throws UserException {
TScanRangeLocations curLocations = newLocations(context.params,
brokerDesc, backendPolicy);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index de523ea8e5..aa04d54f3c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -41,6 +41,7 @@ import
org.apache.doris.planner.external.iceberg.IcebergSource;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
import org.apache.doris.thrift.TExpr;
+import org.apache.doris.thrift.TFileScanRangeParams;
import org.apache.doris.thrift.TFileScanSlotInfo;
import com.google.common.base.Preconditions;
@@ -54,7 +55,7 @@ import java.util.Map;
import java.util.Set;
/**
- * ExternalFileScanNode for the file access type of catalog, now only support
+ * FileQueryScanNode for querying the file access type of catalog, now only
support
* hive,hudi and iceberg.
*/
public class FileQueryScanNode extends FileScanNode {
@@ -62,8 +63,9 @@ public class FileQueryScanNode extends FileScanNode {
// For query, there is only one FileScanProvider.
private FileScanProviderIf scanProvider;
- // For query, there is only one ParamCreateContext.
- private ParamCreateContext context;
+
+ private Map<String, SlotDescriptor> destSlotDescByName;
+ private TFileScanRangeParams params;
/**
* External file scan node for Query hms table
@@ -92,7 +94,7 @@ public class FileQueryScanNode extends FileScanNode {
}
backendPolicy.init();
numNodes = backendPolicy.numBackends();
- initParamCreateContexts(analyzer);
+ initScanRangeParams();
}
/**
@@ -114,9 +116,7 @@ public class FileQueryScanNode extends FileScanNode {
backendPolicy.init();
numNodes = backendPolicy.numBackends();
- context = scanProvider.createContext(analyzer);
- context.createDestSlotMap();
- context.conjuncts = conjuncts;
+ initScanRangeParams();
}
/**
@@ -126,7 +126,7 @@ public class FileQueryScanNode extends FileScanNode {
@Override
public void updateRequiredSlots(PlanTranslatorContext
planTranslatorContext,
Set<SlotId> requiredByProjectSlotIdSet) throws UserException {
- context.params.unsetRequiredSlots();
+ params.unsetRequiredSlots();
for (SlotDescriptor slot : desc.getSlots()) {
if (!slot.isMaterialized()) {
continue;
@@ -135,7 +135,7 @@ public class FileQueryScanNode extends FileScanNode {
TFileScanSlotInfo slotInfo = new TFileScanSlotInfo();
slotInfo.setSlotId(slot.getId().asInt());
slotInfo.setIsFileSlot(!scanProvider.getPathPartitionKeys().contains(slot.getColumn().getName()));
- context.params.addToRequiredSlots(slotInfo);
+ params.addToRequiredSlots(slotInfo);
}
}
@@ -202,25 +202,41 @@ public class FileQueryScanNode extends FileScanNode {
scanProvider = new TVFScanProvider(table, desc, tvf);
}
- // Create a corresponding ParamCreateContext
- private void initParamCreateContexts(Analyzer analyzer) throws
UserException {
- context = scanProvider.createContext(analyzer);
- context.createDestSlotMap();
- context.conjuncts = conjuncts;
+ // Create a corresponding TFileScanRangeParams
+ private void initScanRangeParams() throws UserException {
+ Preconditions.checkNotNull(desc);
+ destSlotDescByName = Maps.newHashMap();
+ for (SlotDescriptor slot : desc.getSlots()) {
+ destSlotDescByName.put(slot.getColumn().getName(), slot);
+ }
+ params = new TFileScanRangeParams();
+ params.setDestTupleId(desc.getId().asInt());
+ List<String> partitionKeys = scanProvider.getPathPartitionKeys();
+ List<Column> columns = desc.getTable().getBaseSchema(false);
+ params.setNumOfColumnsFromFile(columns.size() - partitionKeys.size());
+ for (SlotDescriptor slot : desc.getSlots()) {
+ if (!slot.isMaterialized()) {
+ continue;
+ }
+ TFileScanSlotInfo slotInfo = new TFileScanSlotInfo();
+ slotInfo.setSlotId(slot.getId().asInt());
+
slotInfo.setIsFileSlot(!partitionKeys.contains(slot.getColumn().getName()));
+ params.addToRequiredSlots(slotInfo);
+ }
}
@Override
public void finalize(Analyzer analyzer) throws UserException {
- setDefaultValueExprs(scanProvider, context);
- setColumnPositionMappingForTextFile(scanProvider, context);
- context.params.setSrcTupleId(-1);
- createScanRangeLocations(context, scanProvider);
+ setDefaultValueExprs();
+ setColumnPositionMappingForTextFile();
+ params.setSrcTupleId(-1);
+ createScanRangeLocations(conjuncts, params, scanProvider);
this.inputSplitsNum += scanProvider.getInputSplitNum();
this.totalFileSize += scanProvider.getInputFileSize();
TableIf table = desc.getTable();
if (table instanceof HMSExternalTable) {
if (((HMSExternalTable)
table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) {
- genSlotToSchemaIdMap(context);
+ genSlotToSchemaIdMap();
}
}
if (scanProvider instanceof HiveScanProvider) {
@@ -231,16 +247,16 @@ public class FileQueryScanNode extends FileScanNode {
@Override
public void finalizeForNereids() throws UserException {
- setDefaultValueExprs(scanProvider, context);
- setColumnPositionMappingForTextFile(scanProvider, context);
- context.params.setSrcTupleId(-1);
- createScanRangeLocations(context, scanProvider);
+ setDefaultValueExprs();
+ setColumnPositionMappingForTextFile();
+ params.setSrcTupleId(-1);
+ createScanRangeLocations(conjuncts, params, scanProvider);
this.inputSplitsNum += scanProvider.getInputSplitNum();
this.totalFileSize += scanProvider.getInputFileSize();
TableIf table = desc.getTable();
if (table instanceof HMSExternalTable) {
if (((HMSExternalTable)
table).getDlaType().equals(HMSExternalTable.DLAType.HIVE)) {
- genSlotToSchemaIdMap(context);
+ genSlotToSchemaIdMap();
}
}
if (scanProvider instanceof HiveScanProvider) {
@@ -249,12 +265,12 @@ public class FileQueryScanNode extends FileScanNode {
}
}
- private void setColumnPositionMappingForTextFile(FileScanProviderIf
scanProvider, ParamCreateContext context)
+ private void setColumnPositionMappingForTextFile()
throws UserException {
TableIf tbl = scanProvider.getTargetTable();
List<Integer> columnIdxs = Lists.newArrayList();
- for (TFileScanSlotInfo slot : context.params.getRequiredSlots()) {
+ for (TFileScanSlotInfo slot : params.getRequiredSlots()) {
if (!slot.isIsFileSlot()) {
continue;
}
@@ -266,10 +282,10 @@ public class FileQueryScanNode extends FileScanNode {
}
columnIdxs.add(idx);
}
- context.params.setColumnIdxs(columnIdxs);
+ params.setColumnIdxs(columnIdxs);
}
- protected void setDefaultValueExprs(FileScanProviderIf scanProvider,
ParamCreateContext context)
+ protected void setDefaultValueExprs()
throws UserException {
TableIf tbl = scanProvider.getTargetTable();
Preconditions.checkNotNull(tbl);
@@ -291,7 +307,7 @@ public class FileQueryScanNode extends FileScanNode {
expr = null;
}
}
- SlotDescriptor slotDesc =
context.destSlotDescByName.get(column.getName());
+ SlotDescriptor slotDesc = destSlotDescByName.get(column.getName());
// if slot desc is null, which mean it is an unrelated slot, just
skip.
// eg:
// (a, b, c) set (x=a, y=b, z=c)
@@ -300,15 +316,15 @@ public class FileQueryScanNode extends FileScanNode {
if (slotDesc != null) {
if (expr != null) {
expr = castToSlot(slotDesc, expr);
-
context.params.putToDefaultValueOfSrcSlot(slotDesc.getId().asInt(),
expr.treeToThrift());
+
params.putToDefaultValueOfSrcSlot(slotDesc.getId().asInt(),
expr.treeToThrift());
} else {
-
context.params.putToDefaultValueOfSrcSlot(slotDesc.getId().asInt(), tExpr);
+
params.putToDefaultValueOfSrcSlot(slotDesc.getId().asInt(), tExpr);
}
}
}
}
- private void genSlotToSchemaIdMap(ParamCreateContext context) {
+ private void genSlotToSchemaIdMap() {
List<Column> baseSchema = desc.getTable().getBaseSchema();
Map<String, Integer> columnNameToPosition = Maps.newHashMap();
for (SlotDescriptor slot : desc.getSlots()) {
@@ -321,6 +337,6 @@ public class FileQueryScanNode extends FileScanNode {
idx += 1;
}
}
- context.params.setSlotNameToSchemaPos(columnNameToPosition);
+ params.setSlotNameToSchemaPos(columnNameToPosition);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java
index 56ea637d1a..18f878e41b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java
@@ -18,10 +18,9 @@
package org.apache.doris.planner.external;
import org.apache.doris.analysis.Expr;
-import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.common.UserException;
-import org.apache.doris.load.BrokerFileGroup;
+import org.apache.doris.planner.FileLoadScanNode;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TExplainLevel;
@@ -32,10 +31,8 @@ import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
import org.apache.doris.thrift.TScanRangeLocations;
-import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -43,7 +40,6 @@ import org.apache.logging.log4j.Logger;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
-import java.util.Map;
/**
* Base class for External File Scan, including external query and load.
@@ -51,29 +47,11 @@ import java.util.Map;
public class FileScanNode extends ExternalScanNode {
private static final Logger LOG = LogManager.getLogger(FileScanNode.class);
- public static class ParamCreateContext {
- public BrokerFileGroup fileGroup;
- public List<Expr> conjuncts;
-
- public TupleDescriptor destTupleDescriptor;
- public Map<String, SlotDescriptor> destSlotDescByName;
- // === Set when init ===
- public TupleDescriptor srcTupleDescriptor;
- public Map<String, SlotDescriptor> srcSlotDescByName;
- public Map<String, Expr> exprMap;
- public String timezone;
- // === Set when init ===
-
- public TFileScanRangeParams params;
-
- public void createDestSlotMap() {
- Preconditions.checkNotNull(destTupleDescriptor);
- destSlotDescByName = Maps.newHashMap();
- for (SlotDescriptor slot : destTupleDescriptor.getSlots()) {
- destSlotDescByName.put(slot.getColumn().getName(), slot);
- }
- }
- }
+ // For explain
+ protected long inputSplitsNum = 0;
+ protected long totalFileSize = 0;
+ protected long totalPartitionNum = 0;
+ protected long readPartitionNum = 0;
protected final FederationBackendPolicy backendPolicy = new
FederationBackendPolicy();
@@ -166,8 +144,16 @@ public class FileScanNode extends ExternalScanNode {
return output.toString();
}
- protected void createScanRangeLocations(ParamCreateContext context,
FileScanProviderIf scanProvider)
+ // TODO: Keep 2 versions of createScanRangeLocations, will fix this while
refactor split and assignment code.
+ protected void
createScanRangeLocations(FileLoadScanNode.ParamCreateContext context,
+ FileScanProviderIf scanProvider)
throws UserException {
scanProvider.createScanRangeLocations(context, backendPolicy,
scanRangeLocations);
}
+
+ protected void createScanRangeLocations(List<Expr> conjuncts,
TFileScanRangeParams params,
+ FileScanProviderIf scanProvider)
+ throws UserException {
+ scanProvider.createScanRangeLocations(conjuncts, params,
backendPolicy, scanRangeLocations);
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
index d20f6cd861..367e5d4e72 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanProviderIf.java
@@ -18,11 +18,14 @@
package org.apache.doris.planner.external;
import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.Expr;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
+import org.apache.doris.planner.FileLoadScanNode;
import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileScanRangeParams;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TScanRangeLocations;
@@ -41,9 +44,13 @@ public interface FileScanProviderIf {
List<String> getPathPartitionKeys() throws DdlException,
MetaNotFoundException;
- FileScanNode.ParamCreateContext createContext(Analyzer analyzer) throws
UserException;
+ FileLoadScanNode.ParamCreateContext createContext(Analyzer analyzer)
throws UserException;
- void createScanRangeLocations(FileScanNode.ParamCreateContext context,
FederationBackendPolicy backendPolicy,
+ void createScanRangeLocations(FileLoadScanNode.ParamCreateContext context,
FederationBackendPolicy backendPolicy,
+ List<TScanRangeLocations>
scanRangeLocations) throws UserException;
+
+ void createScanRangeLocations(List<Expr> conjuncts, TFileScanRangeParams
params,
+ FederationBackendPolicy backendPolicy,
List<TScanRangeLocations>
scanRangeLocations) throws UserException;
int getInputSplitNum();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
index ca776be5f3..99f84a128f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
@@ -17,10 +17,7 @@
package org.apache.doris.planner.external;
-import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.HMSExternalTable;
@@ -28,12 +25,9 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
-import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.planner.ColumnRange;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileFormatType;
-import org.apache.doris.thrift.TFileScanRangeParams;
-import org.apache.doris.thrift.TFileScanSlotInfo;
import org.apache.doris.thrift.TFileTextScanRangeParams;
import org.apache.doris.thrift.TFileType;
@@ -61,9 +55,6 @@ public class HiveScanProvider extends HMSTableScanProvider {
protected final TupleDescriptor desc;
protected Map<String, ColumnRange> columnNameToRange;
- protected int totalPartitionNum = 0;
- protected int readPartitionNum = 0;
-
public HiveScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc,
Map<String, ColumnRange> columnNameToRange) {
this.hmsTable = hmsTable;
@@ -151,34 +142,6 @@ public class HiveScanProvider extends HMSTableScanProvider
{
return
getRemoteHiveTable().getPartitionKeys().stream().map(FieldSchema::getName).collect(Collectors.toList());
}
- @Override
- public FileScanNode.ParamCreateContext createContext(Analyzer analyzer)
throws UserException {
- FileScanNode.ParamCreateContext context = new
FileScanNode.ParamCreateContext();
- context.params = new TFileScanRangeParams();
- context.destTupleDescriptor = desc;
- context.params.setDestTupleId(desc.getId().asInt());
- context.fileGroup = new BrokerFileGroup(hmsTable.getId(),
- hmsTable.getRemoteTable().getSd().getLocation(),
hmsTable.getRemoteTable().getSd().getInputFormat());
-
-
- // Hive table must extract partition value from path and hudi/iceberg
table keep
- // partition field in file.
- List<String> partitionKeys = getPathPartitionKeys();
- List<Column> columns = hmsTable.getBaseSchema(false);
- context.params.setNumOfColumnsFromFile(columns.size() -
partitionKeys.size());
- for (SlotDescriptor slot : desc.getSlots()) {
- if (!slot.isMaterialized()) {
- continue;
- }
-
- TFileScanSlotInfo slotInfo = new TFileScanSlotInfo();
- slotInfo.setSlotId(slot.getId().asInt());
-
slotInfo.setIsFileSlot(!partitionKeys.contains(slot.getColumn().getName()));
- context.params.addToRequiredSlots(slotInfo);
- }
- return context;
- }
-
@Override
public TFileAttributes getFileAttributes() throws UserException {
TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
index 8a5fa8a16b..cac20c78fa 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
@@ -18,6 +18,7 @@
package org.apache.doris.planner.external;
import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.SlotRef;
@@ -35,6 +36,7 @@ import org.apache.doris.common.util.VectorizedUtil;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.Load;
import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.planner.FileLoadScanNode;
import org.apache.doris.task.LoadTaskInfo;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TFileAttributes;
@@ -84,8 +86,8 @@ public class LoadScanProvider implements FileScanProviderIf {
}
@Override
- public FileScanNode.ParamCreateContext createContext(Analyzer analyzer)
throws UserException {
- FileScanNode.ParamCreateContext ctx = new
FileScanNode.ParamCreateContext();
+ public FileLoadScanNode.ParamCreateContext createContext(Analyzer
analyzer) throws UserException {
+ FileLoadScanNode.ParamCreateContext ctx = new
FileLoadScanNode.ParamCreateContext();
ctx.destTupleDescriptor = destTupleDesc;
ctx.fileGroup = fileGroupInfo.getFileGroup();
ctx.timezone = analyzer.getTimezone();
@@ -137,7 +139,7 @@ public class LoadScanProvider implements FileScanProviderIf
{
}
@Override
- public void createScanRangeLocations(FileScanNode.ParamCreateContext
context,
+ public void createScanRangeLocations(FileLoadScanNode.ParamCreateContext
context,
FederationBackendPolicy backendPolicy,
List<TScanRangeLocations>
scanRangeLocations) throws UserException {
Preconditions.checkNotNull(fileGroupInfo);
@@ -145,6 +147,12 @@ public class LoadScanProvider implements
FileScanProviderIf {
fileGroupInfo.createScanRangeLocations(context, backendPolicy,
scanRangeLocations);
}
+ @Override
+ public void createScanRangeLocations(List<Expr> conjuncts,
TFileScanRangeParams params,
+ FederationBackendPolicy backendPolicy,
+ List<TScanRangeLocations>
scanRangeLocations) throws UserException {
+ }
+
@Override
public int getInputSplitNum() {
return fileGroupInfo.getFileStatuses().size();
@@ -169,7 +177,7 @@ public class LoadScanProvider implements FileScanProviderIf
{
* @param context
* @throws UserException
*/
- private void initColumns(FileScanNode.ParamCreateContext context, Analyzer
analyzer) throws UserException {
+ private void initColumns(FileLoadScanNode.ParamCreateContext context,
Analyzer analyzer) throws UserException {
context.srcTupleDescriptor =
analyzer.getDescTbl().createTupleDescriptor();
context.srcSlotDescByName =
Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
context.exprMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
index be7e5bb43e..4f9e1d2e17 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
@@ -17,6 +17,8 @@
package org.apache.doris.planner.external;
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.Expr;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.catalog.HdfsResource;
@@ -24,6 +26,7 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
+import org.apache.doris.planner.FileLoadScanNode;
import org.apache.doris.planner.Split;
import org.apache.doris.planner.Splitter;
import org.apache.doris.planner.external.iceberg.IcebergScanProvider;
@@ -58,22 +61,33 @@ public abstract class QueryScanProvider implements
FileScanProviderIf {
public abstract TFileAttributes getFileAttributes() throws UserException;
@Override
- public void createScanRangeLocations(FileScanNode.ParamCreateContext
context,
+ public void createScanRangeLocations(FileLoadScanNode.ParamCreateContext
context,
+ FederationBackendPolicy backendPolicy,
+ List<TScanRangeLocations>
scanRangeLocations) throws UserException {
+ }
+
+ @Override
+ public FileLoadScanNode.ParamCreateContext createContext(Analyzer
analyzer) throws UserException {
+ return null;
+ }
+
+ @Override
+ public void createScanRangeLocations(List<Expr> conjuncts,
TFileScanRangeParams params,
FederationBackendPolicy backendPolicy,
List<TScanRangeLocations>
scanRangeLocations) throws UserException {
long start = System.currentTimeMillis();
- List<Split> inputSplits = splitter.getSplits(context.conjuncts);
+ List<Split> inputSplits = splitter.getSplits(conjuncts);
this.inputSplitNum = inputSplits.size();
if (inputSplits.isEmpty()) {
return;
}
FileSplit inputSplit = (FileSplit) inputSplits.get(0);
TFileType locationType = getLocationType();
- context.params.setFileType(locationType);
+ params.setFileType(locationType);
TFileFormatType fileFormatType = getFileFormatType();
- context.params.setFormatType(getFileFormatType());
+ params.setFormatType(getFileFormatType());
if (fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN ||
fileFormatType == TFileFormatType.FORMAT_JSON) {
- context.params.setFileAttributes(getFileAttributes());
+ params.setFileAttributes(getFileAttributes());
}
// set hdfs params for hdfs file type.
@@ -92,21 +106,21 @@ public abstract class QueryScanProvider implements
FileScanProviderIf {
}
THdfsParams tHdfsParams =
HdfsResource.generateHdfsParam(locationProperties);
tHdfsParams.setFsName(fsName);
- context.params.setHdfsParams(tHdfsParams);
+ params.setHdfsParams(tHdfsParams);
if (locationType == TFileType.FILE_BROKER) {
FsBroker broker =
Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
if (broker == null) {
throw new UserException("No alive broker.");
}
- context.params.addToBrokerAddresses(new
TNetworkAddress(broker.ip, broker.port));
+ params.addToBrokerAddresses(new TNetworkAddress(broker.ip,
broker.port));
}
} else if (locationType == TFileType.FILE_S3) {
- context.params.setProperties(locationProperties);
+ params.setProperties(locationProperties);
}
for (Split split : inputSplits) {
- TScanRangeLocations curLocations = newLocations(context.params,
backendPolicy);
+ TScanRangeLocations curLocations = newLocations(params,
backendPolicy);
FileSplit fileSplit = (FileSplit) split;
List<String> pathPartitionKeys = getPathPartitionKeys();
List<String> partitionValuesFromPath =
BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java
index cc045f00ae..e4941ffa5a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TVFScanProvider.java
@@ -17,21 +17,15 @@
package org.apache.doris.planner.external;
-import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.FunctionGenTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
-import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileFormatType;
-import org.apache.doris.thrift.TFileScanRangeParams;
-import org.apache.doris.thrift.TFileScanSlotInfo;
import org.apache.doris.thrift.TFileType;
import com.google.common.collect.Lists;
@@ -84,34 +78,6 @@ public class TVFScanProvider extends QueryScanProvider {
return Lists.newArrayList();
}
- @Override
- public FileScanNode.ParamCreateContext createContext(Analyzer analyzer)
throws UserException {
- FileScanNode.ParamCreateContext context = new
FileScanNode.ParamCreateContext();
- context.params = new TFileScanRangeParams();
- context.destTupleDescriptor = desc;
- context.params.setDestTupleId(desc.getId().asInt());
- // no use, only for avoid null exception.
- context.fileGroup = new BrokerFileGroup(tvfTable.getId(), "", "");
-
-
- // Hive table must extract partition value from path and hudi/iceberg
table keep
- // partition field in file.
- List<String> partitionKeys = getPathPartitionKeys();
- List<Column> columns = tvfTable.getBaseSchema(false);
- context.params.setNumOfColumnsFromFile(columns.size() -
partitionKeys.size());
- for (SlotDescriptor slot : desc.getSlots()) {
- if (!slot.isMaterialized()) {
- continue;
- }
-
- TFileScanSlotInfo slotInfo = new TFileScanSlotInfo();
- slotInfo.setSlotId(slot.getId().asInt());
-
slotInfo.setIsFileSlot(!partitionKeys.contains(slot.getColumn().getName()));
- context.params.addToRequiredSlots(slotInfo);
- }
- return context;
- }
-
@Override
public TableIf getTargetTable() {
return tvfTable;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java
index 07ad1fa208..4e4bf30c50 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java
@@ -17,29 +17,20 @@
package org.apache.doris.planner.external.iceberg;
-import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.IcebergExternalTable;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
-import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.planner.ColumnRange;
-import org.apache.doris.planner.external.FileQueryScanNode;
import org.apache.doris.thrift.TFileAttributes;
-import org.apache.doris.thrift.TFileScanRangeParams;
-import org.apache.doris.thrift.TFileScanSlotInfo;
-import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
-import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
/**
* Get metadata from iceberg api (all iceberg table like hive, rest, glue...)
@@ -80,32 +71,6 @@ public class IcebergApiSource implements IcebergSource {
return icebergExtTable;
}
- @Override
- public FileQueryScanNode.ParamCreateContext createContext() throws
UserException {
- FileQueryScanNode.ParamCreateContext context = new
FileQueryScanNode.ParamCreateContext();
- context.params = new TFileScanRangeParams();
- context.destTupleDescriptor = desc;
- context.params.setDestTupleId(desc.getId().asInt());
- context.fileGroup = new BrokerFileGroup(icebergExtTable.getId(),
originTable.location(), getFileFormat());
-
- // Hive table must extract partition value from path and hudi/iceberg
table keep
- // partition field in file.
- List<String> partitionKeys = originTable.spec().fields().stream()
- .map(PartitionField::name).collect(Collectors.toList());
- List<Column> columns = icebergExtTable.getBaseSchema(false);
- context.params.setNumOfColumnsFromFile(columns.size() -
partitionKeys.size());
- for (SlotDescriptor slot : desc.getSlots()) {
- if (!slot.isMaterialized()) {
- continue;
- }
- TFileScanSlotInfo slotInfo = new TFileScanSlotInfo();
- slotInfo.setSlotId(slot.getId().asInt());
-
slotInfo.setIsFileSlot(!partitionKeys.contains(slot.getColumn().getName()));
- context.params.addToRequiredSlots(slotInfo);
- }
- return context;
- }
-
@Override
public TFileAttributes getFileAttributes() throws UserException {
return new TFileAttributes();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java
index 0d99ab49c4..87760f77e1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergHMSSource.java
@@ -26,7 +26,6 @@ import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.planner.ColumnRange;
-import org.apache.doris.planner.external.FileQueryScanNode;
import org.apache.doris.planner.external.HiveScanProvider;
import org.apache.doris.thrift.TFileAttributes;
@@ -63,11 +62,6 @@ public class IcebergHMSSource implements IcebergSource {
return HiveMetaStoreClientHelper.getIcebergTable(hmsTable);
}
- @Override
- public FileQueryScanNode.ParamCreateContext createContext() throws
UserException {
- return hiveScanProvider.createContext(null);
- }
-
@Override
public TableIf getTargetTable() {
return hiveScanProvider.getTargetTable();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java
index 3cb997a7f3..c481362da7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanProvider.java
@@ -23,7 +23,6 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
-import org.apache.doris.planner.external.FileQueryScanNode;
import org.apache.doris.planner.external.IcebergSplitter;
import org.apache.doris.planner.external.QueryScanProvider;
import org.apache.doris.thrift.TFileAttributes;
@@ -145,11 +144,6 @@ public class IcebergScanProvider extends QueryScanProvider
{
return icebergSource.getCatalog().getProperties();
}
- @Override
- public FileQueryScanNode.ParamCreateContext createContext(Analyzer
analyzer) throws UserException {
- return icebergSource.createContext();
- }
-
@Override
public TableIf getTargetTable() {
return icebergSource.getTargetTable();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSource.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSource.java
index e21cfaa48a..2b9dcd23d6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSource.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergSource.java
@@ -23,7 +23,6 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalCatalog;
-import org.apache.doris.planner.external.FileQueryScanNode;
import org.apache.doris.thrift.TFileAttributes;
public interface IcebergSource {
@@ -32,8 +31,6 @@ public interface IcebergSource {
org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException;
- FileQueryScanNode.ParamCreateContext createContext() throws UserException;
-
TableIf getTargetTable();
TFileAttributes getFileAttributes() throws UserException;
diff --git a/regression-test/suites/account_p0/test_nereids_row_policy.groovy
b/regression-test/suites/account_p0/test_nereids_row_policy.groovy
index 5860527306..048364ecac 100644
--- a/regression-test/suites/account_p0/test_nereids_row_policy.groovy
+++ b/regression-test/suites/account_p0/test_nereids_row_policy.groovy
@@ -23,16 +23,16 @@ suite("test_nereids_row_policy") {
def url=tokens[0] + "//" + tokens[2] + "/" + dbName + "?"
def assertQueryResult = { size ->
- def result1 = connect(user=user, password='123456', url=url) {
+ def result1 = connect(user=user, password='123456abc', url=url) {
sql "set enable_nereids_planner = false"
sql "SELECT * FROM ${tableName}"
}
- def result2 = connect(user=user, password='123456', url=url) {
+ def result2 = connect(user=user, password='123456abc', url=url) {
sql "set enable_nereids_planner = true"
sql "set enable_fallback_to_original_planner = false"
sql "SELECT * FROM ${tableName}"
}
- def result3 = connect(user=user, password='123456', url=url) {
+ def result3 = connect(user=user, password='123456abc', url=url) {
sql "set enable_nereids_planner = true"
sql "set enable_fallback_to_original_planner = false"
sql "SELECT * FROM ${viewName}"
@@ -76,7 +76,7 @@ suite("test_nereids_row_policy") {
// create user
sql "DROP USER IF EXISTS ${user}"
- sql "CREATE USER ${user} IDENTIFIED BY '123456'"
+ sql "CREATE USER ${user} IDENTIFIED BY '123456abc'"
sql "GRANT SELECT_PRIV ON internal.${dbName}.${tableName} TO ${user}"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]