This is an automated email from the ASF dual-hosted git repository.
zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 90c52dc7994 [enhancement](plsql) Support select * from routines
(#32866)
90c52dc7994 is described below
commit 90c52dc7994022f437ff39be331f54e2aaf5ab05
Author: Vallish Pai <[email protected]>
AuthorDate: Thu Mar 28 13:52:50 2024 +0530
[enhancement](plsql) Support select * from routines (#32866)
Support listing PL/SQL stored procedures with `select * from routines`.
---
be/src/exec/schema_scanner.cpp | 3 +
.../exec/schema_scanner/schema_routine_scanner.cpp | 172 +++++++++++++++++++++
.../exec/schema_scanner/schema_routine_scanner.h | 52 +++++++
.../apache/doris/plsql/metastore/PlsqlManager.java | 16 +-
.../apache/doris/service/FrontendServiceImpl.java | 73 +++++----
.../doris/tablefunction/MetadataGenerator.java | 96 ++++++++++--
gensrc/thrift/FrontendService.thrift | 1 +
7 files changed, 367 insertions(+), 46 deletions(-)
diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp
index 32197e37e1e..908bb7b6f09 100644
--- a/be/src/exec/schema_scanner.cpp
+++ b/be/src/exec/schema_scanner.cpp
@@ -37,6 +37,7 @@
#include "exec/schema_scanner/schema_partitions_scanner.h"
#include "exec/schema_scanner/schema_processlist_scanner.h"
#include "exec/schema_scanner/schema_profiling_scanner.h"
+#include "exec/schema_scanner/schema_routine_scanner.h"
#include "exec/schema_scanner/schema_rowsets_scanner.h"
#include "exec/schema_scanner/schema_schema_privileges_scanner.h"
#include "exec/schema_scanner/schema_schemata_scanner.h"
@@ -161,6 +162,8 @@ std::unique_ptr<SchemaScanner>
SchemaScanner::create(TSchemaTableType::type type
return SchemaWorkloadGroupsScanner::create_unique();
case TSchemaTableType::SCH_PROCESSLIST:
return SchemaProcessListScanner::create_unique();
+ case TSchemaTableType::SCH_PROCEDURES:
+ return SchemaRoutinesScanner::create_unique();
default:
return SchemaDummyScanner::create_unique();
break;
diff --git a/be/src/exec/schema_scanner/schema_routine_scanner.cpp
b/be/src/exec/schema_scanner/schema_routine_scanner.cpp
new file mode 100644
index 00000000000..7db46ada650
--- /dev/null
+++ b/be/src/exec/schema_scanner/schema_routine_scanner.cpp
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/schema_scanner/schema_routine_scanner.h"
+
+#include "runtime/client_cache.h"
+#include "runtime/exec_env.h"
+#include "runtime/runtime_state.h"
+#include "util/thrift_rpc_helper.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/data_types/data_type_factory.hpp"
+
+namespace doris {
+std::vector<SchemaScanner::ColumnDesc> SchemaRoutinesScanner::_s_tbls_columns
= { // Column layout of the routines schema table; get_block_from_fe() sends these names to FE and builds one nullable column per entry.
+ {"SPECIFIC_NAME", TYPE_VARCHAR, sizeof(StringRef), true},
+ {"ROUTINE_CATALOG", TYPE_VARCHAR, sizeof(StringRef), true},
+ {"ROUTINE_SCHEMA", TYPE_VARCHAR, sizeof(StringRef), true},
+ {"ROUTINE_NAME", TYPE_VARCHAR, sizeof(StringRef), true},
+ {"ROUTINE_TYPE", TYPE_VARCHAR, sizeof(StringRef), true},
+ {"DTD_IDENTIFIER", TYPE_VARCHAR, sizeof(StringRef), true},
+ {"ROUTINE_BODY", TYPE_VARCHAR, sizeof(StringRef), true},
+ {"ROUTINE_DEFINITION", TYPE_VARCHAR, sizeof(StringRef), true},
+ {"EXTERNAL_NAME", TYPE_VARCHAR, sizeof(StringRef), true},
+ {"EXTERNAL_LANGUAGE", TYPE_VARCHAR, sizeof(StringRef), true},
+ {"PARAMETER_STYLE", TYPE_VARCHAR, sizeof(StringRef), true},
+ {"IS_DETERMINISTIC", TYPE_VARCHAR, sizeof(StringRef), true},
+ {"SQL_DATA_ACCESS", TYPE_VARCHAR, sizeof(StringRef), true},
+ {"SQL_PATH", TYPE_VARCHAR, sizeof(StringRef), true},
+ {"SECURITY_TYPE", TYPE_VARCHAR, sizeof(StringRef), true},
+ {"CREATED", TYPE_DATETIME, sizeof(int64_t), true}, // FE sends a string; get_block_from_fe() parses it with VecDateTimeValue::from_date_str
+ {"LAST_ALTERED", TYPE_DATETIME, sizeof(int64_t), true}, // parsed the same way as CREATED
+ {"SQL_MODE", TYPE_VARCHAR, sizeof(StringRef), true},
+ {"ROUTINE_COMMENT", TYPE_VARCHAR, sizeof(StringRef), true},
+ {"DEFINER", TYPE_VARCHAR, sizeof(StringRef), true},
+ {"CHARACTER_SET_CLIENT", TYPE_VARCHAR, sizeof(StringRef), true},
+ {"COLLATION_CONNECTION", TYPE_VARCHAR, sizeof(StringRef), true},
+ {"DATABASE_COLLATION", TYPE_VARCHAR, sizeof(StringRef), true},
+};
+// Hands the routines column layout to the base scanner under the SCH_PROCEDURES table type.
+SchemaRoutinesScanner::SchemaRoutinesScanner()
+ : SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_PROCEDURES) {}
+
+// Captures the per-query limits from the runtime state; no rows are fetched here.
+Status SchemaRoutinesScanner::start(RuntimeState* state) {
+    // execution_timeout() scaled by 1000 and later passed to the FE RPC
+    // (presumably seconds -> milliseconds; confirm against RuntimeState).
+    _rpc_timeout = state->execution_timeout() * 1000;
+    // Upper bound on the rows copied out per get_next_block() call.
+    _block_rows_limit = state->batch_size();
+    return Status::OK();
+}
+
+// Fetches every routine row from the master FE with one fetchSchemaTableData RPC and
+// converts the reply into a block with one nullable column per _s_tbls_columns entry.
+//
+// Returns:
+//   - the RPC error if the call itself fails,
+//   - the status carried in the FE reply if it is not OK,
+//   - InternalError if ANY returned row does not carry exactly one cell per column
+//     (the cells are indexed by column position below, so a short row must be rejected),
+//   - OK otherwise.
+//
+// _routines_block is only assigned after the whole reply has been converted, so a failed
+// call leaves it null and the caller does not mistake a half-built block for an empty result.
+Status SchemaRoutinesScanner::get_block_from_fe() {
+    TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address;
+
+    // Ask FE for exactly the columns this scanner exposes, on behalf of the current user.
+    TSchemaTableRequestParams schema_table_request_params;
+    schema_table_request_params.__isset.columns_name = true;
+    schema_table_request_params.columns_name.reserve(_s_tbls_columns.size());
+    for (const auto& column : _s_tbls_columns) {
+        schema_table_request_params.columns_name.emplace_back(column.name);
+    }
+    schema_table_request_params.__set_current_user_ident(*_param->common_param->current_user_ident);
+
+    TFetchSchemaTableDataRequest request;
+    request.__set_schema_table_name(TSchemaTableName::ROUTINES_INFO);
+    request.__set_schema_table_params(schema_table_request_params);
+
+    TFetchSchemaTableDataResult result;
+    RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
+            master_addr.hostname, master_addr.port,
+            [&request, &result](FrontendServiceConnection& client) {
+                client->fetchSchemaTableData(result, request);
+            },
+            _rpc_timeout));
+
+    Status status(Status::create(result.status));
+    if (!status.ok()) {
+        LOG(WARNING) << "fetch routines from FE failed, errmsg=" << status;
+        return status;
+    }
+
+    // Borrow the reply rows instead of copying the whole batch.
+    const std::vector<TRow>& result_data = result.data_batch;
+    const size_t column_count = _s_tbls_columns.size();
+
+    // Validate every row up front: the conversion loop indexes column_value by position.
+    for (const TRow& row : result_data) {
+        if (row.column_value.size() != column_count) {
+            return Status::InternalError<false>("routine table schema is not match for FE and BE");
+        }
+    }
+
+    // Build into a local block; it is published to the member only on success.
+    auto routines_block = vectorized::Block::create_unique();
+    for (const auto& column : _s_tbls_columns) {
+        TypeDescriptor descriptor(column.type);
+        auto data_type = vectorized::DataTypeFactory::instance().create_data_type(descriptor, true);
+        routines_block->insert(vectorized::ColumnWithTypeAndName(data_type->create_column(),
+                                                                 data_type, column.name));
+    }
+    routines_block->reserve(_block_rows_limit);
+
+    // Appends one non-null string cell to the nullable string column at col_index.
+    auto insert_string_value = [](size_t col_index, const std::string& str_val,
+                                  vectorized::Block* block) {
+        vectorized::MutableColumnPtr mutable_col_ptr =
+                std::move(*block->get_by_position(col_index).column).assume_mutable();
+        auto* nullable_column =
+                reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
+        vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
+        reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(str_val.data(),
+                                                                          str_val.size());
+        nullable_column->get_null_map_data().emplace_back(0);
+    };
+
+    // Parses str_val as a datetime and appends it to the nullable datetime column at
+    // col_index. A value that from_date_str() rejects is stored as NULL rather than as
+    // whatever the unparsed VecDateTimeValue happens to contain.
+    auto insert_datetime_value = [](size_t col_index, const std::string& str_val,
+                                    vectorized::Block* block) {
+        vectorized::MutableColumnPtr mutable_col_ptr =
+                std::move(*block->get_by_position(col_index).column).assume_mutable();
+        auto* nullable_column =
+                reinterpret_cast<vectorized::ColumnNullable*>(mutable_col_ptr.get());
+        vectorized::IColumn* col_ptr = &nullable_column->get_nested_column();
+        VecDateTimeValue value;
+        if (value.from_date_str(str_val.data(), str_val.size())) {
+            reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data(
+                    reinterpret_cast<char*>(&value), 0);
+            nullable_column->get_null_map_data().emplace_back(0);
+        } else {
+            col_ptr->insert_default();
+            nullable_column->get_null_map_data().emplace_back(1);
+        }
+    };
+
+    for (const TRow& row : result_data) {
+        for (size_t j = 0; j < column_count; ++j) {
+            // FE sends every cell, datetime included, in stringVal.
+            const std::string& cell = row.column_value[j].stringVal;
+            if (_s_tbls_columns[j].type == TYPE_DATETIME) {
+                insert_datetime_value(j, cell, routines_block.get());
+            } else {
+                insert_string_value(j, cell, routines_block.get());
+            }
+        }
+    }
+
+    _routines_block = std::move(routines_block);
+    return Status::OK();
+}
+
+// Copies the next batch of routine rows into `block`, at most _block_rows_limit per call.
+// The first call fetches the complete result from FE via get_block_from_fe(); *eos is set
+// to true once the last cached row has been handed out.
+// Returns InternalError when the scanner is not initialized or an argument is null.
+Status SchemaRoutinesScanner::get_next_block(vectorized::Block* block, bool* eos) {
+    if (!_is_init) {
+        return Status::InternalError("Used before initialized.");
+    }
+    if (block == nullptr || eos == nullptr) {
+        return Status::InternalError("input pointer is nullptr.");
+    }
+
+    // Lazily pull the whole table from FE on first use.
+    if (!_routines_block) {
+        RETURN_IF_ERROR(get_block_from_fe());
+        _total_rows = _routines_block->rows();
+    }
+
+    const int remaining_rows = _total_rows - _row_idx;
+    if (remaining_rows == 0) {
+        *eos = true;
+        return Status::OK();
+    }
+
+    const int batch_rows =
+            remaining_rows < _block_rows_limit ? remaining_rows : _block_rows_limit;
+    auto mblock = vectorized::MutableBlock::build_mutable_block(block);
+    mblock.add_rows(_routines_block.get(), _row_idx, batch_rows);
+    _row_idx += batch_rows;
+
+    *eos = (_row_idx == _total_rows);
+    return Status::OK();
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/exec/schema_scanner/schema_routine_scanner.h
b/be/src/exec/schema_scanner/schema_routine_scanner.h
new file mode 100644
index 00000000000..543f9e8e8f6
--- /dev/null
+++ b/be/src/exec/schema_scanner/schema_routine_scanner.h
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <vector>
+
+#include "common/status.h"
+#include "exec/schema_scanner.h"
+
+namespace doris {
+class RuntimeState;
+namespace vectorized {
+class Block;
+} // namespace vectorized
+// Schema scanner for the routines table: fetches all rows from FE once and returns them in batches.
+class SchemaRoutinesScanner : public SchemaScanner {
+ ENABLE_FACTORY_CREATOR(SchemaRoutinesScanner);
+
+public:
+ SchemaRoutinesScanner();
+ ~SchemaRoutinesScanner() override = default;
+
+ Status start(RuntimeState* state) override; // reads batch size and RPC timeout from the runtime state
+ Status get_next_block(vectorized::Block* block, bool* eos) override; // emits the next batch; sets *eos after the last row
+
+ static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns; // column layout of the routines table
+
+private:
+ Status get_block_from_fe(); // one fetchSchemaTableData RPC to the master FE; fills _routines_block
+
+ int _block_rows_limit = 4096; // max rows per get_next_block() call; replaced by state->batch_size() in start()
+ int _row_idx = 0; // index of the next row of _routines_block to emit
+ int _total_rows = 0; // row count of _routines_block, set after the fetch
+ std::unique_ptr<vectorized::Block> _routines_block = nullptr; // rows fetched from FE; null until the first get_next_block()
+ int _rpc_timeout = 3000; // timeout passed to the FE RPC; replaced by execution_timeout() * 1000 in start()
+};
+}; // namespace doris
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/plsql/metastore/PlsqlManager.java
b/fe/fe-core/src/main/java/org/apache/doris/plsql/metastore/PlsqlManager.java
index 87cbd7b58a3..dc91d344f63 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/plsql/metastore/PlsqlManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/plsql/metastore/PlsqlManager.java
@@ -22,6 +22,7 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
@@ -34,7 +35,20 @@ import java.util.Map;
public class PlsqlManager implements Writable {
private static final Logger LOG = LogManager.getLogger(PlsqlManager.class);
-
+    // Column titles of the routines schema table. MetadataGenerator maps each lower-cased
+    // title to its position in this list, so the order here is significant.
+    public static final ImmutableList<String> ROUTINE_INFO_TITLE_NAMES = ImmutableList.of(
+            "SPECIFIC_NAME", "ROUTINE_CATALOG", "ROUTINE_SCHEMA", "ROUTINE_NAME",
+            "ROUTINE_TYPE", "DTD_IDENTIFIER", "ROUTINE_BODY", "ROUTINE_DEFINITION",
+            "EXTERNAL_NAME", "EXTERNAL_LANGUAGE", "PARAMETER_STYLE", "IS_DETERMINISTIC",
+            "SQL_DATA_ACCESS", "SQL_PATH", "SECURITY_TYPE", "CREATED",
+            "LAST_ALTERED", "SQL_MODE", "ROUTINE_COMMENT", "DEFINER",
+            "CHARACTER_SET_CLIENT", "COLLATION_CONNECTION", "DATABASE_COLLATION");
@SerializedName(value = "nameToStoredProcedures")
Map<PlsqlProcedureKey, PlsqlStoredProcedure> nameToStoredProcedures =
Maps.newConcurrentMap();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index f3858e18d37..60ae26df757 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -285,9 +285,9 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
private MasterImpl masterImpl;
private ExecuteEnv exeEnv;
- // key is txn id,value is index of plan fragment instance, it's used by
multi table request plan
- private ConcurrentHashMap<Long, Integer>
multiTableFragmentInstanceIdIndexMap =
- new ConcurrentHashMap<>(64);
+ // key is txn id,value is index of plan fragment instance, it's used by
multi
+ // table request plan
+ private ConcurrentHashMap<Long, Integer>
multiTableFragmentInstanceIdIndexMap = new ConcurrentHashMap<>(64);
private final Map<TUniqueId, ConnectContext> proxyQueryIdToConnCtx =
new ConcurrentHashMap<>(64);
@@ -352,8 +352,9 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
}
// check cooldownMetaId of all replicas are the same
List<Replica> replicas =
Env.getCurrentEnv().getTabletInvertedIndex().getReplicas(info.tablet_id);
- // FIXME(plat1ko): We only delete remote files when tablet is
under a stable state: enough replicas and
- // all replicas are alive. Are these conditions really sufficient
or necessary?
+ // FIXME(plat1ko): We only delete remote files when tablet is
under a stable
+ // state: enough replicas and
+ // all replicas are alive. Are these conditions really sufficient
or necessary?
if (replicas.size() < replicaNum) {
LOG.info("num replicas are not enough, tablet={}",
info.tablet_id);
return;
@@ -669,7 +670,6 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
}
PatternMatcher finalMatcher = matcher;
-
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> future = executor.submit(() -> {
@@ -1013,7 +1013,8 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
LOG.debug("receive forwarded stmt {} from FE: {}",
params.getStmtId(), params.getClientNodeHost());
}
ConnectContext context = new ConnectContext(null, true);
- // Set current connected FE to the client address, so that we can know
where this request come from.
+ // Set current connected FE to the client address, so that we can know
where
+ // this request come from.
context.setCurrentConnectedFEIp(params.getClientNodeHost());
if (Config.isCloudMode() &&
!Strings.isNullOrEmpty(params.getCloudCluster())) {
context.setCloudCluster(params.getCloudCluster());
@@ -1359,7 +1360,8 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
OlapTable table = (OlapTable) db.getTableOrMetaException(tbl,
TableType.OLAP);
tables.add(table);
}
- // if it has multi table, use multi table and update multi table
running transaction table ids
+ // if it has multi table, use multi table and update multi table
running
+ // transaction table ids
if (CollectionUtils.isNotEmpty(request.getTbls())) {
List<Long> multiTableIds =
tables.stream().map(Table::getId).collect(Collectors.toList());
Env.getCurrentGlobalTransactionMgr()
@@ -1555,7 +1557,8 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
return result;
}
- // return true if commit success and publish success, return false if
publish timeout
+ // return true if commit success and publish success, return false if
publish
+ // timeout
private boolean loadTxnCommitImpl(TLoadTxnCommitRequest request) throws
UserException {
if (request.isSetAuthCode()) {
// TODO(cmy): find a way to check
@@ -1633,7 +1636,8 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
return result;
}
- // return true if commit success and publish success, return false if
publish timeout
+ // return true if commit success and publish success, return false if
publish
+ // timeout
private boolean commitTxnImpl(TCommitTxnRequest request) throws
UserException {
/// Check required arg: user, passwd, db, txn_id, commit_infos
if (!request.isSetUser()) {
@@ -1698,7 +1702,6 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
// Step 4: get timeout
long timeoutMs = request.isSetThriftRpcTimeoutMs() ?
request.getThriftRpcTimeoutMs() / 2 : 5000;
-
// Step 5: commit and publish
return Env.getCurrentGlobalTransactionMgr()
.commitAndPublishTransaction(db, tableList,
@@ -1949,7 +1952,8 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
}
/**
- * For first-class multi-table scenarios, we should store the mapping
between Txn and data source type in a common
+ * For first-class multi-table scenarios, we should store the mapping
between
+ * Txn and data source type in a common
* place. Since there is only Kafka now, we should do this first.
*/
private void buildMultiTableStreamLoadTask(StreamLoadTask baseTaskInfo,
long txnId) {
@@ -2002,7 +2006,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
ctx.setCurrentUserIdentity(currentUser.get(0));
}
LOG.info("one stream multi table load use cloud cluster {}",
request.getCloudCluster());
- //ctx.setCloudCluster();
+ // ctx.setCloudCluster();
if (!Strings.isNullOrEmpty(request.getCloudCluster())) {
if (Strings.isNullOrEmpty(request.getUser())) {
ctx.setCloudCluster(request.getCloudCluster());
@@ -2055,7 +2059,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
RoutineLoadJob routineLoadJob =
Env.getCurrentEnv().getRoutineLoadManager()
.getRoutineLoadJobByMultiLoadTaskTxnId(request.getTxnId());
routineLoadJob.updateState(JobState.PAUSED, new
ErrorReason(InternalErrorCode.CANNOT_RESUME_ERR,
- "failed to get stream load plan, " +
exception.getMessage()), false);
+ "failed to get stream load plan, " +
exception.getMessage()), false);
} catch (UserException e) {
LOG.warn("catch update routine load job error.", e);
}
@@ -2112,9 +2116,9 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
coord.setQueryType(TQueryType.LOAD);
TableIf table = httpStreamParams.getTable();
if (table instanceof OlapTable) {
- boolean isEnableMemtableOnSinkNode =
- ((OlapTable)
table).getTableProperty().getUseSchemaLightChange()
- ?
coord.getQueryOptions().isEnableMemtableOnSinkNode() : false;
+ boolean isEnableMemtableOnSinkNode = ((OlapTable)
table).getTableProperty().getUseSchemaLightChange()
+ ? coord.getQueryOptions().isEnableMemtableOnSinkNode()
+ : false;
coord.getQueryOptions().setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
}
httpStreamParams.setParams(coord.getStreamLoadPlan());
@@ -2239,7 +2243,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
try {
if (!((OlapTable)
table).getTableProperty().getUseSchemaLightChange()
&& (request.getGroupCommitMode() != null
- && !request.getGroupCommitMode().equals("off_mode"))) {
+ &&
!request.getGroupCommitMode().equals("off_mode"))) {
throw new UserException(
"table light_schema_change is false, can't do stream
load with group commit mode");
}
@@ -2906,7 +2910,6 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
throw new UserException("prev_commit_seq is not set");
}
-
// step 1: check auth
if (Strings.isNullOrEmpty(request.getToken())) {
checkSingleTablePasswordAndPrivs(request.getUser(),
request.getPasswd(), request.getDb(),
@@ -2999,7 +3002,8 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
// getSnapshotImpl
private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request,
String clientIp)
throws UserException {
- // Step 1: Check all required arg: user, passwd, db, label_name,
snapshot_name, snapshot_type
+ // Step 1: Check all required arg: user, passwd, db, label_name,
snapshot_name,
+ // snapshot_type
if (!request.isSetUser()) {
throw new UserException("user is not set");
}
@@ -3080,7 +3084,8 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
// restoreSnapshotImpl
private TRestoreSnapshotResult restoreSnapshotImpl(TRestoreSnapshotRequest
request, String clientIp)
throws UserException {
- // Step 1: Check all required arg: user, passwd, db, label_name,
repo_name, meta, info
+ // Step 1: Check all required arg: user, passwd, db, label_name,
repo_name,
+ // meta, info
if (!request.isSetUser()) {
throw new UserException("user is not set");
}
@@ -3347,7 +3352,6 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
throw new UserException("prev_commit_seq is not set");
}
-
// step 1: check auth
if (Strings.isNullOrEmpty(request.getToken())) {
checkSingleTablePasswordAndPrivs(request.getUser(),
request.getPasswd(), request.getDb(),
@@ -3642,8 +3646,10 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
List<String> allReqPartNames; // all request partitions
try {
taskLock.lock();
- // we dont lock the table. other thread in this txn will be
controled by taskLock.
- // if we have already replaced. dont do it again, but acquire the
recorded new partition directly.
+ // we dont lock the table. other thread in this txn will be
controled by
+ // taskLock.
+ // if we have already replaced. dont do it again, but acquire the
recorded new
+ // partition directly.
// if not by this txn, just let it fail naturally is ok.
List<Long> replacedPartIds =
overwriteManager.tryReplacePartitionIds(taskGroupId, partitionIds);
// here if replacedPartIds still have null. this will throw
exception.
@@ -3653,7 +3659,8 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
.filter(i -> partitionIds.get(i) ==
replacedPartIds.get(i)) // equal means not replaced
.mapToObj(partitionIds::get)
.collect(Collectors.toList());
- // from here we ONLY deal the pending partitions. not include the
dealed(by others).
+ // from here we ONLY deal the pending partitions. not include the
dealed(by
+ // others).
if (!pendingPartitionIds.isEmpty()) {
// below two must have same order inner.
List<String> pendingPartitionNames =
olapTable.uncheckedGetPartNamesById(pendingPartitionIds);
@@ -3664,7 +3671,8 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
overwriteManager.registerTaskInGroup(taskGroupId, taskId);
InsertOverwriteUtil.addTempPartitions(olapTable,
pendingPartitionNames, tempPartitionNames);
InsertOverwriteUtil.replacePartition(olapTable,
pendingPartitionNames, tempPartitionNames);
- // now temp partitions are bumped up and use new names. we get
their ids and record them.
+ // now temp partitions are bumped up and use new names. we get
their ids and
+ // record them.
List<Long> newPartitionIds = new ArrayList<Long>();
for (String newPartName : pendingPartitionNames) {
newPartitionIds.add(olapTable.getPartition(newPartName).getId());
@@ -3686,8 +3694,10 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
taskLock.unlock();
}
- // build partition & tablets. now all partitions in allReqPartNames
are replaced an recorded.
- // so they won't be changed again. if other transaction changing it.
just let it fail.
+ // build partition & tablets. now all partitions in allReqPartNames
are replaced
+ // an recorded.
+ // so they won't be changed again. if other transaction changing it.
just let it
+ // fail.
List<TOlapTablePartition> partitions = Lists.newArrayList();
List<TTabletLocation> tablets = Lists.newArrayList();
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
@@ -3796,7 +3806,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
private TGetMetaResult getMetaImpl(TGetMetaRequest request, String
clientIp)
throws Exception {
- // Step 1: check fields
+ // Step 1: check fields
if (!request.isSetUser()) {
throw new UserException("user is not set");
}
@@ -3869,7 +3879,6 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
}
}
-
@Override
public TGetColumnInfoResult getColumnInfo(TGetColumnInfoRequest request) {
TGetColumnInfoResult result = new TGetColumnInfoResult();
@@ -3990,12 +3999,12 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
// FE only has one master, this should not be a problem
if (!Env.getCurrentEnv().isMaster()) {
LOG.error("failed to handle load stats report: not master,
backend:{}",
- clientAddr);
+ clientAddr);
return new TStatus(TStatusCode.NOT_MASTER);
}
LOG.info("receive load stats report request: {}, backend: {}, dbId:
{}, txnId: {}, label: {}",
- request, clientAddr, request.getDbId(), request.getTxnId(),
request.getLabel());
+ request, clientAddr, request.getDbId(), request.getTxnId(),
request.getLabel());
try {
byte[] receivedProtobufBytes = request.getPayload();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index c84939ffbe6..7908bb1bb4b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -19,6 +19,7 @@ package org.apache.doris.tablefunction;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.SchemaTable;
@@ -38,6 +39,9 @@ import org.apache.doris.job.extensions.mtmv.MTMVJob;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.mtmv.MTMVPartitionUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.plsql.metastore.PlsqlManager;
+import org.apache.doris.plsql.metastore.PlsqlProcedureKey;
+import org.apache.doris.plsql.metastore.PlsqlStoredProcedure;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
@@ -92,6 +96,8 @@ public class MetadataGenerator {
private static final ImmutableMap<String, Integer>
WORKLOAD_GROUPS_COLUMN_TO_INDEX;
+ private static final ImmutableMap<String, Integer>
ROUTINE_INFO_COLUMN_TO_INDEX;
+
static {
ImmutableMap.Builder<String, Integer> activeQueriesbuilder = new
ImmutableMap.Builder();
List<Column> activeQueriesColList =
SchemaTable.TABLE_MAP.get("active_queries").getFullSchema();
@@ -105,6 +111,12 @@ public class MetadataGenerator {
workloadGroupBuilder.put(WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.get(i).toLowerCase(),
i);
}
WORKLOAD_GROUPS_COLUMN_TO_INDEX = workloadGroupBuilder.build();
+
+ ImmutableMap.Builder<String, Integer> routineInfoBuilder = new
ImmutableMap.Builder();
+ for (int i = 0; i < PlsqlManager.ROUTINE_INFO_TITLE_NAMES.size(); i++)
{
+
routineInfoBuilder.put(PlsqlManager.ROUTINE_INFO_TITLE_NAMES.get(i).toLowerCase(),
i);
+ }
+ ROUTINE_INFO_COLUMN_TO_INDEX = routineInfoBuilder.build();
}
public static TFetchSchemaTableDataResult
getMetadataTable(TFetchSchemaTableDataRequest request) throws TException {
@@ -167,6 +179,10 @@ public class MetadataGenerator {
result = workloadGroupsMetadataResult(schemaTableParams);
columnIndex = WORKLOAD_GROUPS_COLUMN_TO_INDEX;
break;
+ case ROUTINES_INFO:
+ result = routineInfoMetadataResult(schemaTableParams);
+ columnIndex = ROUTINE_INFO_COLUMN_TO_INDEX;
+ break;
default:
return errorResult("invalid schema table name.");
}
@@ -189,7 +205,7 @@ public class MetadataGenerator {
return errorResult("Iceberg metadata params is not set.");
}
- TIcebergMetadataParams icebergMetadataParams =
params.getIcebergMetadataParams();
+ TIcebergMetadataParams icebergMetadataParams =
params.getIcebergMetadataParams();
TIcebergQueryType icebergQueryType =
icebergMetadataParams.getIcebergQueryType();
IcebergMetadataCache icebergMetadataCache =
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache();
List<TRow> dataBatch = Lists.newArrayList();
@@ -376,7 +392,7 @@ public class MetadataGenerator {
private static TFetchSchemaTableDataResult
catalogsMetadataResult(TMetadataTableRequestParams params) {
TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
- List<CatalogIf> info =
Env.getCurrentEnv().getCatalogMgr().listCatalogs();
+ List<CatalogIf> info =
Env.getCurrentEnv().getCatalogMgr().listCatalogs();
List<TRow> dataBatch = Lists.newArrayList();
for (CatalogIf catalog : info) {
@@ -417,15 +433,15 @@ public class MetadataGenerator {
List<TRow> dataBatch = Lists.newArrayList();
for (List<String> rGroupsInfo : workloadGroupsInfo) {
TRow trow = new TRow();
- trow.addToColumnValue(new
TCell().setLongVal(Long.valueOf(rGroupsInfo.get(0)))); // id
- trow.addToColumnValue(new
TCell().setStringVal(rGroupsInfo.get(1))); // name
+ trow.addToColumnValue(new
TCell().setLongVal(Long.valueOf(rGroupsInfo.get(0)))); // id
+ trow.addToColumnValue(new
TCell().setStringVal(rGroupsInfo.get(1))); // name
trow.addToColumnValue(new
TCell().setLongVal(Long.valueOf(rGroupsInfo.get(2)))); // cpu_share
- trow.addToColumnValue(new
TCell().setStringVal(rGroupsInfo.get(3))); // mem_limit
- trow.addToColumnValue(new
TCell().setStringVal(rGroupsInfo.get(4))); // mem overcommit
+ trow.addToColumnValue(new
TCell().setStringVal(rGroupsInfo.get(3))); // mem_limit
+ trow.addToColumnValue(new
TCell().setStringVal(rGroupsInfo.get(4))); // mem overcommit
trow.addToColumnValue(new
TCell().setLongVal(Long.valueOf(rGroupsInfo.get(5)))); // max concurrent
trow.addToColumnValue(new
TCell().setLongVal(Long.valueOf(rGroupsInfo.get(6)))); // max queue size
trow.addToColumnValue(new
TCell().setLongVal(Long.valueOf(rGroupsInfo.get(7)))); // queue timeout
- trow.addToColumnValue(new
TCell().setStringVal(rGroupsInfo.get(8))); // cpu hard limit
+ trow.addToColumnValue(new
TCell().setStringVal(rGroupsInfo.get(8))); // cpu hard limit
trow.addToColumnValue(new
TCell().setLongVal(Long.valueOf(rGroupsInfo.get(9)))); // scan thread num
// max remote scan thread num
trow.addToColumnValue(new
TCell().setLongVal(Long.valueOf(rGroupsInfo.get(10))));
@@ -455,11 +471,11 @@ public class MetadataGenerator {
List<TRow> dataBatch = Lists.newArrayList();
for (List<String> policyRow : workloadPolicyList) {
TRow trow = new TRow();
- trow.addToColumnValue(new
TCell().setLongVal(Long.valueOf(policyRow.get(0)))); // id
- trow.addToColumnValue(new TCell().setStringVal(policyRow.get(1)));
// name
- trow.addToColumnValue(new TCell().setStringVal(policyRow.get(2)));
// condition
- trow.addToColumnValue(new TCell().setStringVal(policyRow.get(3)));
// action
- trow.addToColumnValue(new
TCell().setIntVal(Integer.valueOf(policyRow.get(4)))); // priority
+ trow.addToColumnValue(new
TCell().setLongVal(Long.valueOf(policyRow.get(0)))); // id
+ trow.addToColumnValue(new TCell().setStringVal(policyRow.get(1)));
// name
+ trow.addToColumnValue(new TCell().setStringVal(policyRow.get(2)));
// condition
+ trow.addToColumnValue(new TCell().setStringVal(policyRow.get(3)));
// action
+ trow.addToColumnValue(new
TCell().setIntVal(Integer.valueOf(policyRow.get(4)))); // priority
trow.addToColumnValue(new
TCell().setBoolVal(Boolean.valueOf(policyRow.get(5)))); // enabled
trow.addToColumnValue(new
TCell().setIntVal(Integer.valueOf(policyRow.get(6)))); // version
dataBatch.add(trow);
@@ -552,7 +568,7 @@ public class MetadataGenerator {
List<TFetchSchemaTableDataResult> relayResults =
forwardToOtherFrontends(replayFetchSchemaTableReq);
relayResults
.forEach(rs -> rs.getDataBatch()
- .forEach(row -> dataBatch.add(row)));
+ .forEach(row -> dataBatch.add(row)));
}
result.setDataBatch(dataBatch);
@@ -753,4 +769,58 @@ public class MetadataGenerator {
result.setStatus(new TStatus(TStatusCode.OK));
return result;
}
+
+    /**
+     * Builds the routines schema-table rows: one all-string row per PL/SQL stored procedure in PlsqlManager.
+     * Returns an error result when the request carries no current user identity.
+     */
+    private static TFetchSchemaTableDataResult routineInfoMetadataResult(TSchemaTableRequestParams params) {
+        if (!params.isSetCurrentUserIdent()) {
+            return errorResult("current user ident is not set.");
+        }
+
+        PlsqlManager plsqlManager = Env.getCurrentEnv().getPlsqlManager();
+        Map<PlsqlProcedureKey, PlsqlStoredProcedure> procedures = plsqlManager.getAllPlsqlStoredProcedures();
+        List<TRow> rows = Lists.newArrayList();
+        for (PlsqlStoredProcedure procedure : procedures.values()) {
+            // ROUTINE_SCHEMA falls back to an empty string when the catalog or database cannot be resolved.
+            CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(procedure.getCatalogId());
+            DatabaseIf database = catalog == null ? null : catalog.getDbNullable(procedure.getDbId());
+            String schemaName = database == null ? "" : database.getFullName();
+            String[] cells = {
+                    procedure.getName(), // SPECIFIC_NAME
+                    Long.toString(procedure.getCatalogId()), // ROUTINE_CATALOG
+                    schemaName, // ROUTINE_SCHEMA
+                    procedure.getName(), // ROUTINE_NAME
+                    "PROCEDURE", // ROUTINE_TYPE
+                    "", // DTD_IDENTIFIER
+                    procedure.getSource(), // ROUTINE_BODY
+                    "", // ROUTINE_DEFINITION
+                    "NULL", // EXTERNAL_NAME
+                    "", // EXTERNAL_LANGUAGE
+                    "SQL", // PARAMETER_STYLE
+                    "", // IS_DETERMINISTIC
+                    "", // SQL_DATA_ACCESS
+                    "NULL", // SQL_PATH
+                    "DEFINER", // SECURITY_TYPE
+                    procedure.getCreateTime(), // CREATED
+                    procedure.getModifyTime(), // LAST_ALTERED
+                    "", // SQL_MODE
+                    "", // ROUTINE_COMMENT
+                    procedure.getOwnerName(), // DEFINER
+                    "", // CHARACTER_SET_CLIENT
+                    "", // COLLATION_CONNECTION
+                    "", // DATABASE_COLLATION
+            };
+            TRow row = new TRow();
+            for (String cell : cells) {
+                row.addToColumnValue(new TCell().setStringVal(cell));
+            }
+            rows.add(row);
+        }
+        TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
+        result.setDataBatch(rows);
+        result.setStatus(new TStatus(TStatusCode.OK));
+        return result;
+    }
}
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 568b174c11c..ef90e97b41b 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -925,6 +925,7 @@ enum TSchemaTableName {
METADATA_TABLE = 1, // tvf
ACTIVE_QUERIES = 2, // db information_schema's table
WORKLOAD_GROUPS = 3, // db information_schema's table
+ ROUTINES_INFO = 4, // db information_schema's table
}
struct TMetadataTableRequestParams {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]