This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 46d88ede02 [Refactor](Metadata tvf) Reconstruct Metadata table-value
function into a more general framework. (#17590)
46d88ede02 is described below
commit 46d88ede024d1cb8c31837c54c1ac721b35e6966
Author: Tiewei Fang <[email protected]>
AuthorDate: Fri Mar 17 19:54:50 2023 +0800
[Refactor](Metadata tvf) Reconstruct Metadata table-value function into a
more general framework. (#17590)
---
be/src/vec/exec/scan/vmeta_scanner.cpp | 62 +++---
be/src/vec/exec/scan/vmeta_scanner.h | 5 +-
.../org/apache/doris/planner/DataGenScanNode.java | 5 +-
.../doris/planner/external/MetadataScanNode.java | 30 +--
.../apache/doris/service/FrontendServiceImpl.java | 211 +-----------------
.../tablefunction/DataGenTableValuedFunction.java | 8 +
.../tablefunction/IcebergTableValuedFunction.java | 57 +++--
.../doris/tablefunction/MetadataGenerator.java | 248 +++++++++++++++++++++
.../tablefunction/MetadataTableValuedFunction.java | 18 +-
.../tablefunction/NumbersTableValuedFunction.java | 9 -
gensrc/thrift/FrontendService.thrift | 10 +-
gensrc/thrift/PlanNodes.thrift | 15 +-
gensrc/thrift/Types.thrift | 8 +
13 files changed, 368 insertions(+), 318 deletions(-)
diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp
b/be/src/vec/exec/scan/vmeta_scanner.cpp
index 86e60edf11..50877d5558 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.cpp
+++ b/be/src/vec/exec/scan/vmeta_scanner.cpp
@@ -31,7 +31,6 @@ VMetaScanner::VMetaScanner(RuntimeState* state,
VMetaScanNode* parent, int64_t t
const TScanRangeParams& scan_range, int64_t limit,
RuntimeProfile* profile)
: VScanner(state, static_cast<VScanNode*>(parent), limit, profile),
- _parent(parent),
_meta_eos(false),
_tuple_id(tuple_id),
_scan_range(scan_range.scan_range) {}
@@ -46,11 +45,7 @@ Status VMetaScanner::prepare(RuntimeState* state,
VExprContext** vconjunct_ctx_p
VLOG_CRITICAL << "VMetaScanner::prepare";
RETURN_IF_ERROR(VScanner::prepare(_state, vconjunct_ctx_ptr));
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
- if (_scan_range.meta_scan_range.__isset.iceberg_params) {
- RETURN_IF_ERROR(_fetch_iceberg_metadata_batch());
- } else {
- _meta_eos = true;
- }
+ RETURN_IF_ERROR(_fetch_metadata(_scan_range.meta_scan_range));
return Status::OK();
}
@@ -158,37 +153,28 @@ Status VMetaScanner::_fill_block_with_remote_data(const
std::vector<MutableColum
return Status::OK();
}
-Status VMetaScanner::_fetch_iceberg_metadata_batch() {
- VLOG_CRITICAL << "VMetaScanner::_fetch_iceberg_metadata_batch";
+Status VMetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) {
+ VLOG_CRITICAL << "VMetaScanner::_fetch_metadata";
TFetchSchemaTableDataRequest request;
- request.cluster_name = "";
- request.__isset.cluster_name = true;
- request.schema_table_name = TSchemaTableName::ICEBERG_TABLE_META;
- request.__isset.schema_table_name = true;
- auto scan_params = _parent->scan_params();
- TMetadataTableRequestParams meta_table_params =
TMetadataTableRequestParams();
- meta_table_params.catalog = scan_params.catalog;
- meta_table_params.__isset.catalog = true;
- meta_table_params.database = scan_params.database;
- meta_table_params.__isset.database = true;
- meta_table_params.table = scan_params.table;
- meta_table_params.__isset.table = true;
-
- meta_table_params.iceberg_metadata_params =
_scan_range.meta_scan_range.iceberg_params;
- meta_table_params.__isset.iceberg_metadata_params = true;
-
- request.metada_table_params = meta_table_params;
- request.__isset.metada_table_params = true;
+ switch (meta_scan_range.metadata_type) {
+ case TMetadataType::ICEBERG:
+ RETURN_IF_ERROR(_build_iceberg_metadata_request(meta_scan_range,
&request));
+ break;
+ default:
+ _meta_eos = true;
+ return Status::OK();
+ }
+ // _state->query_timeout() is seconds, change to milliseconds
+ int time_out = _state->query_timeout() * 1000;
TNetworkAddress master_addr =
ExecEnv::GetInstance()->master_info()->network_address;
TFetchSchemaTableDataResult result;
-
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, &result](FrontendServiceConnection& client) {
client->fetchSchemaTableData(result, request);
},
- config::txn_commit_rpc_timeout_ms));
+ time_out));
Status status(result.status);
if (!status.ok()) {
@@ -199,6 +185,26 @@ Status VMetaScanner::_fetch_iceberg_metadata_batch() {
return Status::OK();
}
+Status VMetaScanner::_build_iceberg_metadata_request(const TMetaScanRange&
meta_scan_range,
+
TFetchSchemaTableDataRequest* request) {
+ VLOG_CRITICAL << "VMetaScanner::_build_iceberg_metadata_request";
+ if (!meta_scan_range.__isset.iceberg_params) {
+ return Status::InternalError("Can not find TIcebergMetadataParams from
meta_scan_range.");
+ }
+
+ // create request
+ request->__set_cluster_name("");
+ request->__set_schema_table_name(TSchemaTableName::METADATA_TABLE);
+
+ // create TMetadataTableRequestParams
+ TMetadataTableRequestParams metadata_table_params;
+ metadata_table_params.__set_metadata_type(TMetadataType::ICEBERG);
+
metadata_table_params.__set_iceberg_metadata_params(meta_scan_range.iceberg_params);
+
+ request->__set_metada_table_params(metadata_table_params);
+ return Status::OK();
+}
+
Status VMetaScanner::close(RuntimeState* state) {
VLOG_CRITICAL << "VMetaScanner::close";
RETURN_IF_ERROR(VScanner::close(state));
diff --git a/be/src/vec/exec/scan/vmeta_scanner.h
b/be/src/vec/exec/scan/vmeta_scanner.h
index 63cd50fe10..ffb6ff8d6c 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.h
+++ b/be/src/vec/exec/scan/vmeta_scanner.h
@@ -35,10 +35,11 @@ public:
protected:
Status _get_block_impl(RuntimeState* state, Block* block, bool* eos)
override;
Status _fill_block_with_remote_data(const std::vector<MutableColumnPtr>&
columns);
- Status _fetch_iceberg_metadata_batch();
+ Status _fetch_metadata(const TMetaScanRange& meta_scan_range);
+ Status _build_iceberg_metadata_request(const TMetaScanRange&
meta_scan_range,
+ TFetchSchemaTableDataRequest*
request);
private:
- VMetaScanNode* _parent;
bool _meta_eos;
TupleId _tuple_id;
const TupleDescriptor* _tuple_desc;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
index 94c5c07028..cee9007b27 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
@@ -48,9 +48,8 @@ public class DataGenScanNode extends ScanNode {
private DataGenTableValuedFunction tvf;
private boolean isFinalized = false;
- public DataGenScanNode(PlanNodeId id, TupleDescriptor desc,
- String planNodeName,
DataGenTableValuedFunction tvf) {
- super(id, desc, planNodeName,
StatisticalType.TABLE_VALUED_FUNCTION_NODE);
+ public DataGenScanNode(PlanNodeId id, TupleDescriptor desc,
DataGenTableValuedFunction tvf) {
+ super(id, desc, "DataGenScanNode",
StatisticalType.TABLE_VALUED_FUNCTION_NODE);
this.tvf = tvf;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java
index aa9c840197..bcb34b93ec 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/MetadataScanNode.java
@@ -24,12 +24,8 @@ import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.system.Backend;
-import org.apache.doris.tablefunction.IcebergTableValuedFunction;
import org.apache.doris.tablefunction.MetadataTableValuedFunction;
-import org.apache.doris.thrift.TIcebergMetadataParams;
-import org.apache.doris.thrift.TIcebergMetadataType;
import org.apache.doris.thrift.TMetaScanNode;
-import org.apache.doris.thrift.TMetaScanRange;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
@@ -64,10 +60,8 @@ public class MetadataScanNode extends ScanNode {
protected void toThrift(TPlanNode planNode) {
planNode.setNodeType(TPlanNodeType.META_SCAN_NODE);
TMetaScanNode metaScanNode = new TMetaScanNode();
- metaScanNode.setCatalog(tvf.getMetadataTableName().getCtl());
- metaScanNode.setDatabase(tvf.getMetadataTableName().getDb());
- metaScanNode.setTable(tvf.getMetadataTableName().getTbl());
metaScanNode.setTupleId(desc.getId().asInt());
+ metaScanNode.setMetadataType(this.tvf.getMetadataType());
planNode.setMetaScanNode(metaScanNode);
}
@@ -83,28 +77,18 @@ public class MetadataScanNode extends ScanNode {
@Override
public boolean needToCheckColumnPriv() {
- return super.needToCheckColumnPriv();
+ return false;
}
private void buildScanRanges() {
- if (tvf.getMetaType() == MetadataTableValuedFunction.MetaType.ICEBERG)
{
- IcebergTableValuedFunction icebergTvf =
(IcebergTableValuedFunction) tvf;
- // todo: split
- TScanRangeLocations locations =
createIcebergTvfLocations(icebergTvf);
- scanRangeLocations.add(locations);
- }
+ // todo: split
+ TScanRangeLocations locations = createMetaDataTvfLocations();
+ scanRangeLocations.add(locations);
}
- private TScanRangeLocations
createIcebergTvfLocations(IcebergTableValuedFunction icebergTvf) {
+ private TScanRangeLocations createMetaDataTvfLocations() {
TScanRange scanRange = new TScanRange();
- TMetaScanRange metaScanRange = new TMetaScanRange();
- // set iceberg metadata params
- TIcebergMetadataParams icebergMetadataParams = new
TIcebergMetadataParams();
- int metadataType = icebergTvf.getMetaQueryType().ordinal();
-
icebergMetadataParams.setMetadataType(TIcebergMetadataType.findByValue(metadataType));
-
- metaScanRange.setIcebergParams(icebergMetadataParams);
- scanRange.setMetaScanRange(metaScanRange);
+ scanRange.setMetaScanRange(tvf.getMetaScanRange());
// set location
TScanRangeLocation location = new TScanRangeLocation();
Backend backend = backendPolicy.getNextBe();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 911f2b5f15..58c2eb479e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -17,7 +17,6 @@
package org.apache.doris.service;
-import org.apache.doris.alter.DecommissionType;
import org.apache.doris.alter.SchemaChangeHandler;
import org.apache.doris.analysis.AddColumnsClause;
import org.apache.doris.analysis.ColumnDef;
@@ -29,7 +28,6 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.HMSResource;
import org.apache.doris.catalog.Index;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Replica;
@@ -39,7 +37,6 @@ import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.catalog.external.ExternalDatabase;
-import org.apache.doris.cluster.Cluster;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.AuthenticationException;
@@ -56,11 +53,9 @@ import org.apache.doris.common.ThriftServerEventProcessor;
import org.apache.doris.common.UserException;
import org.apache.doris.common.Version;
import org.apache.doris.common.annotation.LogException;
-import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.cooldown.CooldownDelete;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.ExternalCatalog;
-import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.master.MasterImpl;
import org.apache.doris.mysql.privilege.AccessControllerManager;
@@ -71,15 +66,14 @@ import org.apache.doris.qe.ConnectProcessor;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.VariableMgr;
-import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.tablefunction.MetadataGenerator;
import org.apache.doris.task.StreamLoadTask;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.FrontendServiceVersion;
import org.apache.doris.thrift.TAddColumnsRequest;
import org.apache.doris.thrift.TAddColumnsResult;
-import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TCheckAuthRequest;
import org.apache.doris.thrift.TCheckAuthResult;
import org.apache.doris.thrift.TColumn;
@@ -104,7 +98,6 @@ import org.apache.doris.thrift.TGetDbsParams;
import org.apache.doris.thrift.TGetDbsResult;
import org.apache.doris.thrift.TGetTablesParams;
import org.apache.doris.thrift.TGetTablesResult;
-import org.apache.doris.thrift.TIcebergMetadataType;
import org.apache.doris.thrift.TInitExternalCtlMetaRequest;
import org.apache.doris.thrift.TInitExternalCtlMetaResult;
import org.apache.doris.thrift.TListPrivilegesResult;
@@ -120,7 +113,6 @@ import org.apache.doris.thrift.TLoadTxnRollbackResult;
import org.apache.doris.thrift.TMasterOpRequest;
import org.apache.doris.thrift.TMasterOpResult;
import org.apache.doris.thrift.TMasterResult;
-import org.apache.doris.thrift.TMetadataTableRequestParams;
import org.apache.doris.thrift.TMySqlLoadAcquireTokenResult;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPrivilegeCtrl;
@@ -130,7 +122,6 @@ import org.apache.doris.thrift.TPrivilegeType;
import org.apache.doris.thrift.TReportExecStatusParams;
import org.apache.doris.thrift.TReportExecStatusResult;
import org.apache.doris.thrift.TReportRequest;
-import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TShowVariableRequest;
import org.apache.doris.thrift.TShowVariableResult;
import org.apache.doris.thrift.TSnapshotLoaderReportRequest;
@@ -150,22 +141,14 @@ import
org.apache.doris.transaction.TransactionState.TxnSourceType;
import org.apache.doris.transaction.TxnCommitAttachment;
import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
-import org.jetbrains.annotations.NotNull;
import java.time.Instant;
-import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
@@ -1352,197 +1335,13 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
public TFetchSchemaTableDataResult
fetchSchemaTableData(TFetchSchemaTableDataRequest request) throws TException {
switch (request.getSchemaTableName()) {
case BACKENDS:
- return getBackendsSchemaTable(request);
- case ICEBERG_TABLE_META:
- return getIcebergMetadataTable(request);
+ return MetadataGenerator.getBackendsSchemaTable(request);
+ case METADATA_TABLE:
+ return MetadataGenerator.getMetadataTable(request);
default:
break;
}
- TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
- result.setStatus(new TStatus(TStatusCode.INTERNAL_ERROR));
- return result;
- }
-
- private TFetchSchemaTableDataResult
getIcebergMetadataTable(TFetchSchemaTableDataRequest request) {
- if (!request.isSetMetadaTableParams()) {
- return errorResult("Metadata table params is not set. ");
- }
- TMetadataTableRequestParams params = request.getMetadaTableParams();
- if (!params.isSetIcebergMetadataParams()) {
- return errorResult("Iceberg metadata params is not set. ");
- }
-
- HMSExternalCatalog catalog = (HMSExternalCatalog)
Env.getCurrentEnv().getCatalogMgr()
- .getCatalog(params.getCatalog());
- org.apache.iceberg.Table table;
- try {
- table = getIcebergTable(catalog, params.getDatabase(),
params.getTable());
- } catch (MetaNotFoundException e) {
- return errorResult(e.getMessage());
- }
- TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
- List<TRow> dataBatch = Lists.newArrayList();
- TIcebergMetadataType metadataType =
params.getIcebergMetadataParams().getMetadataType();
- switch (metadataType) {
- case SNAPSHOTS:
- for (Snapshot snapshot : table.snapshots()) {
- TRow trow = new TRow();
- LocalDateTime committedAt =
LocalDateTime.ofInstant(Instant.ofEpochMilli(
- snapshot.timestampMillis()),
TimeUtils.getTimeZone().toZoneId());
- long encodedDatetime =
convertToDateTimeV2(committedAt.getYear(), committedAt.getMonthValue(),
- committedAt.getDayOfMonth(), committedAt.getHour(),
- committedAt.getMinute(), committedAt.getSecond());
-
- trow.addToColumnValue(new
TCell().setLongVal(encodedDatetime));
- trow.addToColumnValue(new
TCell().setLongVal(snapshot.snapshotId()));
- if (snapshot.parentId() == null) {
- trow.addToColumnValue(new TCell().setLongVal(-1L));
- } else {
- trow.addToColumnValue(new
TCell().setLongVal(snapshot.parentId()));
- }
- trow.addToColumnValue(new
TCell().setStringVal(snapshot.operation()));
- trow.addToColumnValue(new
TCell().setStringVal(snapshot.manifestListLocation()));
- dataBatch.add(trow);
- }
- break;
- default:
- return errorResult("Unsupported metadata inspect type: " +
metadataType);
- }
- result.setDataBatch(dataBatch);
- result.setStatus(new TStatus(TStatusCode.OK));
- return result;
- }
-
- public static long convertToDateTimeV2(int year, int month, int day, int
hour, int minute, int second) {
- return (long) second << 20 | (long) minute << 26 | (long) hour << 32
- | (long) day << 37 | (long) month << 42 | (long) year << 46;
- }
-
- @NotNull
- private TFetchSchemaTableDataResult errorResult(String msg) {
- TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
- result.setStatus(new TStatus(TStatusCode.INTERNAL_ERROR));
- result.status.addToErrorMsgs(msg);
- return result;
- }
-
- private org.apache.iceberg.Table getIcebergTable(HMSExternalCatalog
catalog, String db, String tbl)
- throws MetaNotFoundException {
- org.apache.iceberg.hive.HiveCatalog hiveCatalog = new
org.apache.iceberg.hive.HiveCatalog();
- Configuration conf = new HdfsConfiguration();
- Map<String, String> properties =
catalog.getCatalogProperty().getHadoopProperties();
- for (Map.Entry<String, String> entry : properties.entrySet()) {
- conf.set(entry.getKey(), entry.getValue());
- }
- hiveCatalog.setConf(conf);
- Map<String, String> catalogProperties = new HashMap<>();
- catalogProperties.put(HMSResource.HIVE_METASTORE_URIS,
catalog.getHiveMetastoreUris());
- catalogProperties.put("uri", catalog.getHiveMetastoreUris());
- hiveCatalog.initialize("hive", catalogProperties);
- return hiveCatalog.loadTable(TableIdentifier.of(db, tbl));
- }
-
- private TFetchSchemaTableDataResult
getBackendsSchemaTable(TFetchSchemaTableDataRequest request) {
- final SystemInfoService clusterInfoService =
Env.getCurrentSystemInfo();
- TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
- List<Long> backendIds = null;
- if (!Strings.isNullOrEmpty(request.cluster_name)) {
- final Cluster cluster =
Env.getCurrentEnv().getCluster(request.cluster_name);
- // root not in any cluster
- if (null == cluster) {
- return result;
- }
- backendIds = cluster.getBackendIdList();
- } else {
- backendIds = clusterInfoService.getBackendIds(false);
- if (backendIds == null) {
- return result;
- }
- }
-
- long start = System.currentTimeMillis();
- Stopwatch watch = Stopwatch.createUnstarted();
-
- List<TRow> dataBatch = Lists.newArrayList();
- for (long backendId : backendIds) {
- Backend backend = clusterInfoService.getBackend(backendId);
- if (backend == null) {
- continue;
- }
-
- watch.start();
- Integer tabletNum =
Env.getCurrentInvertedIndex().getTabletNumByBackendId(backendId);
- watch.stop();
-
- TRow trow = new TRow();
- trow.addToColumnValue(new TCell().setLongVal(backendId));
- trow.addToColumnValue(new
TCell().setStringVal(backend.getOwnerClusterName()));
- trow.addToColumnValue(new TCell().setStringVal(backend.getIp()));
- if (Strings.isNullOrEmpty(request.cluster_name)) {
- trow.addToColumnValue(new
TCell().setIntVal(backend.getHeartbeatPort()));
- trow.addToColumnValue(new
TCell().setIntVal(backend.getBePort()));
- trow.addToColumnValue(new
TCell().setIntVal(backend.getHttpPort()));
- trow.addToColumnValue(new
TCell().setIntVal(backend.getBrpcPort()));
- }
- trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(backend.getLastStartTime())));
- trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(backend.getLastUpdateMs())));
- trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(backend.isAlive())));
- if (backend.isDecommissioned() && backend.getDecommissionType() ==
DecommissionType.ClusterDecommission) {
- trow.addToColumnValue(new TCell().setStringVal("false"));
- trow.addToColumnValue(new TCell().setStringVal("true"));
- } else if (backend.isDecommissioned()
- && backend.getDecommissionType() ==
DecommissionType.SystemDecommission) {
- trow.addToColumnValue(new TCell().setStringVal("true"));
- trow.addToColumnValue(new TCell().setStringVal("false"));
- } else {
- trow.addToColumnValue(new TCell().setStringVal("false"));
- trow.addToColumnValue(new TCell().setStringVal("false"));
- }
- trow.addToColumnValue(new TCell().setLongVal(tabletNum));
-
- // capacity
- // data used
- trow.addToColumnValue(new
TCell().setLongVal(backend.getDataUsedCapacityB()));
-
- // available
- long availB = backend.getAvailableCapacityB();
- trow.addToColumnValue(new TCell().setLongVal(availB));
-
- // total
- long totalB = backend.getTotalCapacityB();
- trow.addToColumnValue(new TCell().setLongVal(totalB));
-
- // used percent
- double used = 0.0;
- if (totalB <= 0) {
- used = 0.0;
- } else {
- used = (double) (totalB - availB) * 100 / totalB;
- }
- trow.addToColumnValue(new TCell().setDoubleVal(used));
- trow.addToColumnValue(new
TCell().setDoubleVal(backend.getMaxDiskUsedPct() * 100));
-
- // remote used capacity
- trow.addToColumnValue(new
TCell().setLongVal(backend.getRemoteUsedCapacityB()));
-
- // tags
- trow.addToColumnValue(new
TCell().setStringVal(backend.getTagMapString()));
- // err msg
- trow.addToColumnValue(new
TCell().setStringVal(backend.getHeartbeatErrMsg()));
- // version
- trow.addToColumnValue(new
TCell().setStringVal(backend.getVersion()));
- // status
- trow.addToColumnValue(new TCell().setStringVal(new
Gson().toJson(backend.getBackendStatus())));
- dataBatch.add(trow);
- }
-
- // backends proc node get result too slow, add log to observer.
- LOG.debug("backends proc get tablet num cost: {}, total cost: {}",
- watch.elapsed(TimeUnit.MILLISECONDS),
(System.currentTimeMillis() - start));
-
- result.setDataBatch(dataBatch);
- result.setStatus(new TStatus(TStatusCode.OK));
- return result;
+ return MetadataGenerator.errorResult("Fetch schema table name is not
set");
}
private TNetworkAddress getClientAddr() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
index ab3579e9d8..fc2a6d6dd3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
@@ -17,7 +17,11 @@
package org.apache.doris.tablefunction;
+import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.planner.DataGenScanNode;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanNode;
import org.apache.doris.thrift.TDataGenFunctionName;
import java.util.List;
@@ -27,4 +31,8 @@ public abstract class DataGenTableValuedFunction extends
TableValuedFunctionIf {
public abstract TDataGenFunctionName getDataGenFunctionName();
+ @Override
+ public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
+ return new DataGenScanNode(id, desc, this);
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
index 9b58119ddd..38d11496e3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
@@ -26,6 +26,10 @@ import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.TIcebergMetadataParams;
+import org.apache.doris.thrift.TIcebergQueryType;
+import org.apache.doris.thrift.TMetaScanRange;
+import org.apache.doris.thrift.TMetadataType;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
@@ -40,8 +44,6 @@ import java.util.Map;
*/
public class IcebergTableValuedFunction extends MetadataTableValuedFunction {
- public enum MetadataType { SNAPSHOTS }
-
public static final String NAME = "iceberg_meta";
private static final String TABLE = "table";
private static final String QUERY_TYPE = "query_type";
@@ -51,54 +53,71 @@ public class IcebergTableValuedFunction extends
MetadataTableValuedFunction {
.add(QUERY_TYPE)
.build();
- private final MetadataType queryType;
- private final TableName tableName;
+ private TIcebergQueryType queryType;
+
+ // here tableName represents the name of a table in Iceberg.
+ private final TableName icebergTableName;
public IcebergTableValuedFunction(Map<String, String> params) throws
AnalysisException {
- super(MetaType.ICEBERG);
Map<String, String> validParams = Maps.newHashMap();
for (String key : params.keySet()) {
if (!PROPERTIES_SET.contains(key.toLowerCase())) {
throw new AnalysisException("'" + key + "' is invalid
property");
}
- // check ctl db tbl
+ // check ctl, db, tbl
validParams.put(key.toLowerCase(), params.get(key));
}
String tableName = validParams.get(TABLE);
- String queryType = validParams.get(QUERY_TYPE);
- if (tableName == null || queryType == null) {
+ String queryTypeString = validParams.get(QUERY_TYPE);
+ if (tableName == null || queryTypeString == null) {
throw new AnalysisException("Invalid iceberg metadata query");
}
String[] names = tableName.split("\\.");
if (names.length != 3) {
throw new AnalysisException("The iceberg table name contains the
catalogName, databaseName, and tableName");
}
- this.tableName = new TableName(names[0], names[1], names[2]);
+ this.icebergTableName = new TableName(names[0], names[1], names[2]);
// check auth
if (!Env.getCurrentEnv().getAccessManager()
- .checkTblPriv(ConnectContext.get(), this.tableName,
PrivPredicate.SELECT)) {
+ .checkTblPriv(ConnectContext.get(), this.icebergTableName,
PrivPredicate.SELECT)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR,
"SELECT",
ConnectContext.get().getQualifiedUser(),
ConnectContext.get().getRemoteIP(),
- this.tableName.getDb() + ": " + this.tableName.getTbl());
+ this.icebergTableName.getDb() + ": " +
this.icebergTableName.getTbl());
}
try {
- this.queryType = MetadataType.valueOf(queryType.toUpperCase());
+ // TODO(ftw): check here
+ this.queryType =
TIcebergQueryType.valueOf(queryTypeString.toUpperCase());
} catch (IllegalArgumentException e) {
throw new AnalysisException("Unsupported iceberg metadata query
type: " + queryType);
}
}
+ public TIcebergQueryType getIcebergQueryType() {
+ return queryType;
+ }
+
@Override
- public String getTableName() {
- return "IcebergMetadataTableValuedFunction";
+ public TMetadataType getMetadataType() {
+ return TMetadataType.ICEBERG;
}
- public TableName getMetadataTableName() {
- return tableName;
+ @Override
+ public TMetaScanRange getMetaScanRange() {
+ TMetaScanRange metaScanRange = new TMetaScanRange();
+ metaScanRange.setMetadataType(TMetadataType.ICEBERG);
+ // set iceberg metadata params
+ TIcebergMetadataParams icebergMetadataParams = new
TIcebergMetadataParams();
+ icebergMetadataParams.setIcebergQueryType(queryType);
+ icebergMetadataParams.setCatalog(icebergTableName.getCtl());
+ icebergMetadataParams.setDatabase(icebergTableName.getDb());
+ icebergMetadataParams.setTable(icebergTableName.getTbl());
+ metaScanRange.setIcebergParams(icebergMetadataParams);
+ return metaScanRange;
}
- public MetadataType getMetaQueryType() {
- return queryType;
+ @Override
+ public String getTableName() {
+ return "IcebergMetadataTableValuedFunction";
}
/**
@@ -110,7 +129,7 @@ public class IcebergTableValuedFunction extends
MetadataTableValuedFunction {
@Override
public List<Column> getTableColumns() throws AnalysisException {
List<Column> resColumns = new ArrayList<>();
- if (queryType == MetadataType.SNAPSHOTS) {
+ if (queryType == TIcebergQueryType.SNAPSHOTS) {
resColumns.add(new Column("committed_at",
PrimitiveType.DATETIMEV2, false));
resColumns.add(new Column("snapshot_id", PrimitiveType.BIGINT,
false));
resColumns.add(new Column("parent_id", PrimitiveType.BIGINT,
false));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
new file mode 100644
index 0000000000..20a5cd3a00
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -0,0 +1,248 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.alter.DecommissionType;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.HMSResource;
+import org.apache.doris.cluster.Cluster;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.HMSExternalCatalog;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TCell;
+import org.apache.doris.thrift.TFetchSchemaTableDataRequest;
+import org.apache.doris.thrift.TFetchSchemaTableDataResult;
+import org.apache.doris.thrift.TIcebergMetadataParams;
+import org.apache.doris.thrift.TIcebergQueryType;
+import org.apache.doris.thrift.TMetadataTableRequestParams;
+import org.apache.doris.thrift.TRow;
+import org.apache.doris.thrift.TStatus;
+import org.apache.doris.thrift.TStatusCode;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class MetadataGenerator {
+ private static final Logger LOG =
LogManager.getLogger(MetadataGenerator.class);
+
+ public static TFetchSchemaTableDataResult
getMetadataTable(TFetchSchemaTableDataRequest request) {
+ if (!request.isSetMetadaTableParams()) {
+ return errorResult("Metadata table params is not set. ");
+ }
+ switch (request.getMetadaTableParams().getMetadataType()) {
+ case ICEBERG:
+ return icebergMetadataResult(request.getMetadaTableParams());
+ default:
+ break;
+ }
+ return errorResult("Metadata table params is not set. ");
+ }
+
+ public static TFetchSchemaTableDataResult
getBackendsSchemaTable(TFetchSchemaTableDataRequest request) {
+ final SystemInfoService clusterInfoService =
Env.getCurrentSystemInfo();
+ List<Long> backendIds = null;
+ if (!Strings.isNullOrEmpty(request.cluster_name)) {
+ final Cluster cluster =
Env.getCurrentEnv().getCluster(request.cluster_name);
+ // root not in any cluster
+ if (null == cluster) {
+ return errorResult("Cluster is not existed.");
+ }
+ backendIds = cluster.getBackendIdList();
+ } else {
+ backendIds = clusterInfoService.getBackendIds(false);
+ }
+
+ TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
+ long start = System.currentTimeMillis();
+ Stopwatch watch = Stopwatch.createUnstarted();
+
+ List<TRow> dataBatch = Lists.newArrayList();
+ for (long backendId : backendIds) {
+ Backend backend = clusterInfoService.getBackend(backendId);
+ if (backend == null) {
+ continue;
+ }
+
+ watch.start();
+ Integer tabletNum =
Env.getCurrentInvertedIndex().getTabletNumByBackendId(backendId);
+ watch.stop();
+
+ TRow trow = new TRow();
+ trow.addToColumnValue(new TCell().setLongVal(backendId));
+ trow.addToColumnValue(new
TCell().setStringVal(backend.getOwnerClusterName()));
+ trow.addToColumnValue(new TCell().setStringVal(backend.getIp()));
+ if (Strings.isNullOrEmpty(request.cluster_name)) {
+ trow.addToColumnValue(new
TCell().setIntVal(backend.getHeartbeatPort()));
+ trow.addToColumnValue(new
TCell().setIntVal(backend.getBePort()));
+ trow.addToColumnValue(new
TCell().setIntVal(backend.getHttpPort()));
+ trow.addToColumnValue(new
TCell().setIntVal(backend.getBrpcPort()));
+ }
+ trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(backend.getLastStartTime())));
+ trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(backend.getLastUpdateMs())));
+ trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(backend.isAlive())));
+ if (backend.isDecommissioned() && backend.getDecommissionType() ==
DecommissionType.ClusterDecommission) {
+ trow.addToColumnValue(new TCell().setStringVal("false"));
+ trow.addToColumnValue(new TCell().setStringVal("true"));
+ } else if (backend.isDecommissioned()
+ && backend.getDecommissionType() ==
DecommissionType.SystemDecommission) {
+ trow.addToColumnValue(new TCell().setStringVal("true"));
+ trow.addToColumnValue(new TCell().setStringVal("false"));
+ } else {
+ trow.addToColumnValue(new TCell().setStringVal("false"));
+ trow.addToColumnValue(new TCell().setStringVal("false"));
+ }
+ trow.addToColumnValue(new TCell().setLongVal(tabletNum));
+
+ // capacity
+ // data used
+ trow.addToColumnValue(new
TCell().setLongVal(backend.getDataUsedCapacityB()));
+
+ // available
+ long availB = backend.getAvailableCapacityB();
+ trow.addToColumnValue(new TCell().setLongVal(availB));
+
+ // total
+ long totalB = backend.getTotalCapacityB();
+ trow.addToColumnValue(new TCell().setLongVal(totalB));
+
+ // used percent
+ double used = 0.0;
+ if (totalB <= 0) {
+ used = 0.0;
+ } else {
+ used = (double) (totalB - availB) * 100 / totalB;
+ }
+ trow.addToColumnValue(new TCell().setDoubleVal(used));
+ trow.addToColumnValue(new
TCell().setDoubleVal(backend.getMaxDiskUsedPct() * 100));
+
+ // remote used capacity
+ trow.addToColumnValue(new
TCell().setLongVal(backend.getRemoteUsedCapacityB()));
+
+ // tags
+ trow.addToColumnValue(new
TCell().setStringVal(backend.getTagMapString()));
+ // err msg
+ trow.addToColumnValue(new
TCell().setStringVal(backend.getHeartbeatErrMsg()));
+ // version
+ trow.addToColumnValue(new
TCell().setStringVal(backend.getVersion()));
+ // status
+ trow.addToColumnValue(new TCell().setStringVal(new
Gson().toJson(backend.getBackendStatus())));
+ dataBatch.add(trow);
+ }
+
+ // backends proc node get result too slow, add log to observer.
+ LOG.debug("backends proc get tablet num cost: {}, total cost: {}",
+ watch.elapsed(TimeUnit.MILLISECONDS),
(System.currentTimeMillis() - start));
+
+ result.setDataBatch(dataBatch);
+ result.setStatus(new TStatus(TStatusCode.OK));
+ return result;
+ }
+
+ @NotNull
+ public static TFetchSchemaTableDataResult errorResult(String msg) {
+ TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
+ result.setStatus(new TStatus(TStatusCode.INTERNAL_ERROR));
+ result.status.addToErrorMsgs(msg);
+ return result;
+ }
+
+ private static TFetchSchemaTableDataResult
icebergMetadataResult(TMetadataTableRequestParams params) {
+ if (!params.isSetIcebergMetadataParams()) {
+ return errorResult("Iceberg metadata params is not set. ");
+ }
+ TIcebergMetadataParams icebergMetadataParams =
params.getIcebergMetadataParams();
+ HMSExternalCatalog catalog = (HMSExternalCatalog)
Env.getCurrentEnv().getCatalogMgr()
+ .getCatalog(icebergMetadataParams.getCatalog());
+ org.apache.iceberg.Table table;
+ try {
+ table = getIcebergTable(catalog,
icebergMetadataParams.getDatabase(), icebergMetadataParams.getTable());
+ } catch (MetaNotFoundException e) {
+ return errorResult(e.getMessage());
+ }
+ TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
+ List<TRow> dataBatch = Lists.newArrayList();
+ TIcebergQueryType icebergQueryType =
icebergMetadataParams.getIcebergQueryType();
+ switch (icebergQueryType) {
+ case SNAPSHOTS:
+ for (Snapshot snapshot : table.snapshots()) {
+ TRow trow = new TRow();
+ LocalDateTime committedAt =
LocalDateTime.ofInstant(Instant.ofEpochMilli(
+ snapshot.timestampMillis()),
TimeUtils.getTimeZone().toZoneId());
+ long encodedDatetime =
convertToDateTimeV2(committedAt.getYear(), committedAt.getMonthValue(),
+ committedAt.getDayOfMonth(), committedAt.getHour(),
+ committedAt.getMinute(), committedAt.getSecond());
+
+ trow.addToColumnValue(new
TCell().setLongVal(encodedDatetime));
+ trow.addToColumnValue(new
TCell().setLongVal(snapshot.snapshotId()));
+ if (snapshot.parentId() == null) {
+ trow.addToColumnValue(new TCell().setLongVal(-1L));
+ } else {
+ trow.addToColumnValue(new
TCell().setLongVal(snapshot.parentId()));
+ }
+ trow.addToColumnValue(new
TCell().setStringVal(snapshot.operation()));
+ trow.addToColumnValue(new
TCell().setStringVal(snapshot.manifestListLocation()));
+ dataBatch.add(trow);
+ }
+ break;
+ default:
+ return errorResult("Unsupported iceberg inspect type: " +
icebergQueryType);
+ }
+ result.setDataBatch(dataBatch);
+ result.setStatus(new TStatus(TStatusCode.OK));
+ return result;
+ }
+
+ private static org.apache.iceberg.Table getIcebergTable(HMSExternalCatalog
catalog, String db, String tbl)
+ throws MetaNotFoundException {
+ org.apache.iceberg.hive.HiveCatalog hiveCatalog = new
org.apache.iceberg.hive.HiveCatalog();
+ Configuration conf = new HdfsConfiguration();
+ Map<String, String> properties =
catalog.getCatalogProperty().getHadoopProperties();
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ conf.set(entry.getKey(), entry.getValue());
+ }
+ hiveCatalog.setConf(conf);
+ Map<String, String> catalogProperties = new HashMap<>();
+ catalogProperties.put(HMSResource.HIVE_METASTORE_URIS,
catalog.getHiveMetastoreUris());
+ catalogProperties.put("uri", catalog.getHiveMetastoreUris());
+ hiveCatalog.initialize("hive", catalogProperties);
+ return hiveCatalog.loadTable(TableIdentifier.of(db, tbl));
+ }
+
+ private static long convertToDateTimeV2(int year, int month, int day, int
hour, int minute, int second) {
+ return (long) second << 20 | (long) minute << 26 | (long) hour << 32
+ | (long) day << 37 | (long) month << 42 | (long) year << 46;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
index fd83c59957..0b30ea18c7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
@@ -17,27 +17,17 @@
package org.apache.doris.tablefunction;
-import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.planner.external.MetadataScanNode;
+import org.apache.doris.thrift.TMetaScanRange;
+import org.apache.doris.thrift.TMetadataType;
public abstract class MetadataTableValuedFunction extends
TableValuedFunctionIf {
+ public abstract TMetadataType getMetadataType();
- public enum MetaType { ICEBERG }
-
- private final MetaType metaType;
-
- public MetadataTableValuedFunction(MetaType metaType) {
- this.metaType = metaType;
- }
-
- public MetaType getMetaType() {
- return metaType;
- }
-
- public abstract TableName getMetadataTableName();
+ public abstract TMetaScanRange getMetaScanRange();
@Override
public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/NumbersTableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/NumbersTableValuedFunction.java
index 3e0e92b5b5..639dfeef35 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/NumbersTableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/NumbersTableValuedFunction.java
@@ -17,14 +17,10 @@
package org.apache.doris.tablefunction;
-import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.AnalysisException;
-import org.apache.doris.planner.DataGenScanNode;
-import org.apache.doris.planner.PlanNodeId;
-import org.apache.doris.planner.ScanNode;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TDataGenFunctionName;
import org.apache.doris.thrift.TDataGenScanRange;
@@ -141,9 +137,4 @@ public class NumbersTableValuedFunction extends
DataGenTableValuedFunction {
}
return res;
}
-
- @Override
- public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
- return new DataGenScanNode(id, desc, "DataGenScanNode", this);
- }
}
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 54b12fd8ac..1336d5e81d 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -715,16 +715,14 @@ struct TInitExternalCtlMetaResult {
2: optional string status;
}
-enum TSchemaTableName{
+enum TSchemaTableName {
BACKENDS = 0,
- ICEBERG_TABLE_META = 1,
+ METADATA_TABLE = 1,
}
struct TMetadataTableRequestParams {
- 1: optional PlanNodes.TIcebergMetadataParams iceberg_metadata_params
- 2: optional string catalog
- 3: optional string database
- 4: optional string table
+ 1: optional Types.TMetadataType metadata_type
+ 2: optional PlanNodes.TIcebergMetadataParams iceberg_metadata_params
}
struct TFetchSchemaTableDataRequest {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 978c90e103..e80372eb69 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -377,16 +377,17 @@ struct TDataGenScanRange {
1: optional TTVFNumbersScanRange numbers_params
}
-enum TIcebergMetadataType {
- SNAPSHOTS = 0,
-}
struct TIcebergMetadataParams {
- 1: optional TIcebergMetadataType metadata_type
+ 1: optional Types.TIcebergQueryType iceberg_query_type
+ 2: optional string catalog
+ 3: optional string database
+ 4: optional string table
}
struct TMetaScanRange {
- 1: optional TIcebergMetadataParams iceberg_params
+ 1: optional Types.TMetadataType metadata_type
+ 2: optional TIcebergMetadataParams iceberg_params
}
// Specification of an individual data range which is held in its entirety
@@ -532,9 +533,7 @@ struct TSchemaScanNode {
struct TMetaScanNode {
1: required Types.TTupleId tuple_id
- 2: optional string catalog
- 3: optional string database
- 4: optional string table
+ 2: optional Types.TMetadataType metadata_type
}
struct TTestExternalScanNode {
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index aa1e00de98..e81f7fd002 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -657,6 +657,14 @@ enum TSortType {
ZORDER,
}
+enum TMetadataType {
+ ICEBERG
+}
+
+enum TIcebergQueryType {
+ SNAPSHOTS
+}
+
// represent a user identity
struct TUserIdentity {
1: optional string username
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]