github-actions[bot] commented on code in PR #61382: URL: https://github.com/apache/doris/pull/61382#discussion_r2985601233
########## fe/fe-core/src/main/java/org/apache/doris/catalog/stream/TableStreamManager.java: ########## @@ -0,0 +1,209 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog.stream; + +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock; +import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.thrift.TCell; +import org.apache.doris.thrift.TRow; + +import com.google.common.base.Preconditions; +import com.google.gson.annotations.SerializedName; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +public class TableStreamManager implements Writable { + private static final Logger LOG = 
LogManager.getLogger(TableStreamManager.class); + @SerializedName(value = "dbStreamMap") + private Map<Long, Set<Long>> dbStreamMap; + protected MonitoredReentrantReadWriteLock rwLock; + + public TableStreamManager() { + this.rwLock = new MonitoredReentrantReadWriteLock(true); + this.dbStreamMap = new HashMap<>(); + } + + public void addTableStream(BaseTableStream stream) { + rwLock.writeLock().lock(); + try { + dbStreamMap.computeIfAbsent(stream.getDatabase().getId(), k -> new HashSet<>()).add(stream.getId()); + } finally { + rwLock.writeLock().unlock(); + } + } + + public void removeTableStream(BaseTableStream stream) { + rwLock.writeLock().lock(); + try { + Optional.ofNullable(dbStreamMap.get(stream.getDatabase().getId())) + .ifPresent(set -> set.remove(stream.getId())); + } finally { + rwLock.writeLock().unlock(); + } + } + + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } + + public static TableStreamManager read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, TableStreamManager.class); + } + + public Set<Long> getTableStreamIds(DatabaseIf db) { + Set<Long> result = new HashSet<>(); + rwLock.readLock().lock(); + try { + result.addAll(dbStreamMap.getOrDefault(db.getId(), new HashSet<>())); + } finally { + rwLock.readLock().unlock(); + } + return result; + } + + public void fillTableStreamValuesMetadataResult(List<TRow> dataBatch) { + Map<Long, Set<Long>> copiedMap = new HashMap<>(); + rwLock.readLock().lock(); + try { + for (Map.Entry<Long, Set<Long>> e : dbStreamMap.entrySet()) { + copiedMap.put(e.getKey(), new HashSet<>(e.getValue())); + } + } finally { + rwLock.readLock().unlock(); + } + for (Map.Entry<Long, Set<Long>> entry : copiedMap.entrySet()) { + Optional<Database> db = Env.getCurrentInternalCatalog().getDb(entry.getKey()); + if (db.isPresent()) { + for (Long tableId : entry.getValue()) { + 
Optional<Table> table = db.get().getTable(tableId); + if (!table.isPresent()) { + if (LOG.isDebugEnabled()) { + LOG.warn("invalid stream id: {}, db: {}", tableId, db.get().getFullName()); Review Comment: Bug: `LOG.warn` inside `LOG.isDebugEnabled()` guard. This means the warning will only be emitted when DEBUG logging is enabled, which is almost never in production. This should either be: - `if (LOG.isDebugEnabled()) { LOG.debug(...) }` — if you want debug-only logging, or - Just `LOG.warn(...)` without the guard — if this is truly a warning-worthy situation (invalid stream ID). Given this indicates stale data in `dbStreamMap`, a plain `LOG.warn(...)` without the `isDebugEnabled` guard seems more appropriate. ########## fe/fe-core/src/main/java/org/apache/doris/catalog/stream/TableStreamManager.java: ########## @@ -0,0 +1,209 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.catalog.stream; + +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock; +import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.thrift.TCell; +import org.apache.doris.thrift.TRow; + +import com.google.common.base.Preconditions; +import com.google.gson.annotations.SerializedName; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +public class TableStreamManager implements Writable { + private static final Logger LOG = LogManager.getLogger(TableStreamManager.class); + @SerializedName(value = "dbStreamMap") + private Map<Long, Set<Long>> dbStreamMap; + protected MonitoredReentrantReadWriteLock rwLock; + + public TableStreamManager() { + this.rwLock = new MonitoredReentrantReadWriteLock(true); + this.dbStreamMap = new HashMap<>(); + } + + public void addTableStream(BaseTableStream stream) { + rwLock.writeLock().lock(); + try { + dbStreamMap.computeIfAbsent(stream.getDatabase().getId(), k -> new HashSet<>()).add(stream.getId()); + } finally { + rwLock.writeLock().unlock(); + } + } + + public void removeTableStream(BaseTableStream stream) { + rwLock.writeLock().lock(); + try { + Optional.ofNullable(dbStreamMap.get(stream.getDatabase().getId())) + .ifPresent(set -> set.remove(stream.getId())); + } finally { + rwLock.writeLock().unlock(); + } + } + + @Override + public void write(DataOutput out) throws IOException { + String json = 
GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } + + public static TableStreamManager read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, TableStreamManager.class); + } + + public Set<Long> getTableStreamIds(DatabaseIf db) { + Set<Long> result = new HashSet<>(); + rwLock.readLock().lock(); + try { + result.addAll(dbStreamMap.getOrDefault(db.getId(), new HashSet<>())); + } finally { + rwLock.readLock().unlock(); + } + return result; + } + + public void fillTableStreamValuesMetadataResult(List<TRow> dataBatch) { + Map<Long, Set<Long>> copiedMap = new HashMap<>(); + rwLock.readLock().lock(); + try { + for (Map.Entry<Long, Set<Long>> e : dbStreamMap.entrySet()) { + copiedMap.put(e.getKey(), new HashSet<>(e.getValue())); + } + } finally { + rwLock.readLock().unlock(); + } + for (Map.Entry<Long, Set<Long>> entry : copiedMap.entrySet()) { + Optional<Database> db = Env.getCurrentInternalCatalog().getDb(entry.getKey()); + if (db.isPresent()) { + for (Long tableId : entry.getValue()) { + Optional<Table> table = db.get().getTable(tableId); + if (!table.isPresent()) { + if (LOG.isDebugEnabled()) { + LOG.warn("invalid stream id: {}, db: {}", tableId, db.get().getFullName()); + } + continue; + } + Preconditions.checkArgument(table.get() instanceof BaseTableStream); + BaseTableStream stream = (BaseTableStream) table.get(); Review Comment: Using `Preconditions.checkArgument` here will crash the FE if a non-stream table ID somehow ends up in `dbStreamMap`. Since this is in a metadata query path (information_schema), crashing is disproportionate. Consider either: - Using a defensive `if (!(table.get() instanceof BaseTableStream)) { LOG.warn(...); continue; }`, or - Using `DORIS_CHECK` equivalent if crash-on-invariant-violation is truly desired. 
The same applies to the other `Preconditions.checkArgument` calls in this class (the `baseTableQualifiers.size() == 3` check, and the one in `fillStreamConsumptionValuesMetadataResult`). ########## fe/fe-core/src/main/java/org/apache/doris/catalog/stream/TableStreamManager.java: ########## @@ -0,0 +1,209 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +package org.apache.doris.catalog.stream; + +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock; +import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.thrift.TCell; +import org.apache.doris.thrift.TRow; + +import com.google.common.base.Preconditions; +import com.google.gson.annotations.SerializedName; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +public class TableStreamManager implements Writable { + private static final Logger LOG = LogManager.getLogger(TableStreamManager.class); + @SerializedName(value = "dbStreamMap") + private Map<Long, Set<Long>> dbStreamMap; + protected MonitoredReentrantReadWriteLock rwLock; + + public TableStreamManager() { + this.rwLock = new MonitoredReentrantReadWriteLock(true); + this.dbStreamMap = new HashMap<>(); + } + + public void addTableStream(BaseTableStream stream) { + rwLock.writeLock().lock(); + try { + dbStreamMap.computeIfAbsent(stream.getDatabase().getId(), k -> new HashSet<>()).add(stream.getId()); + } finally { + rwLock.writeLock().unlock(); + } + } + + public void removeTableStream(BaseTableStream stream) { + rwLock.writeLock().lock(); + try { + Optional.ofNullable(dbStreamMap.get(stream.getDatabase().getId())) + .ifPresent(set -> set.remove(stream.getId())); + } finally { + rwLock.writeLock().unlock(); + } + } + + @Override + public void write(DataOutput out) throws IOException { + String json = 
GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } + + public static TableStreamManager read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, TableStreamManager.class); + } + + public Set<Long> getTableStreamIds(DatabaseIf db) { + Set<Long> result = new HashSet<>(); + rwLock.readLock().lock(); + try { + result.addAll(dbStreamMap.getOrDefault(db.getId(), new HashSet<>())); + } finally { + rwLock.readLock().unlock(); + } + return result; + } + + public void fillTableStreamValuesMetadataResult(List<TRow> dataBatch) { + Map<Long, Set<Long>> copiedMap = new HashMap<>(); + rwLock.readLock().lock(); + try { + for (Map.Entry<Long, Set<Long>> e : dbStreamMap.entrySet()) { + copiedMap.put(e.getKey(), new HashSet<>(e.getValue())); + } + } finally { + rwLock.readLock().unlock(); + } + for (Map.Entry<Long, Set<Long>> entry : copiedMap.entrySet()) { + Optional<Database> db = Env.getCurrentInternalCatalog().getDb(entry.getKey()); + if (db.isPresent()) { + for (Long tableId : entry.getValue()) { + Optional<Table> table = db.get().getTable(tableId); + if (!table.isPresent()) { + if (LOG.isDebugEnabled()) { + LOG.warn("invalid stream id: {}, db: {}", tableId, db.get().getFullName()); + } + continue; + } + Preconditions.checkArgument(table.get() instanceof BaseTableStream); + BaseTableStream stream = (BaseTableStream) table.get(); + if (stream.readLockIfExist()) { + try { + TRow trow = new TRow(); + // DB_NAME + trow.addToColumnValue(new TCell().setStringVal(stream.getDatabase().getFullName())); + // STREAM_NAME + trow.addToColumnValue(new TCell().setStringVal(stream.getName())); + // STREAM_ID + trow.addToColumnValue(new TCell().setLongVal(stream.getId())); + // STREAM_TYPE + trow.addToColumnValue(new TCell().setStringVal(stream.getTableStreamType())); + // CONSUME_TYPE + trow.addToColumnValue(new TCell().setStringVal(stream.getConsumeType())); + // STREAM_COMMENT + trow.addToColumnValue(new 
TCell().setStringVal(stream.getComment())); + TableIf baseTable = stream.getBaseTableNullable(); + if (baseTable == null) { + // BASE_TABLE_NAME + trow.addToColumnValue(new TCell().setStringVal("N/A")); + // BASE_TABLE_DB + trow.addToColumnValue(new TCell().setStringVal("N/A")); + // BASE_TABLE_CTL + trow.addToColumnValue(new TCell().setStringVal("N/A")); + // BASE_TABLE_TYPE + trow.addToColumnValue(new TCell().setStringVal("N/A")); + } else { + List<String> baseTableQualifiers = baseTable.getFullQualifiers(); + Preconditions.checkArgument(baseTableQualifiers.size() == 3); + // BASE_TABLE_NAME + trow.addToColumnValue(new TCell().setStringVal(baseTableQualifiers.get(2))); + // BASE_TABLE_DB + trow.addToColumnValue(new TCell().setStringVal(baseTableQualifiers.get(1))); + // BASE_TABLE_CTL + trow.addToColumnValue(new TCell().setStringVal(baseTableQualifiers.get(0))); + // BASE_TABLE_TYPE + trow.addToColumnValue(new TCell().setStringVal(baseTable.getType().name())); + } + // ENABLED + trow.addToColumnValue(new TCell().setBoolVal(!stream.isDisabled())); + // IS_STALE + trow.addToColumnValue(new TCell().setBoolVal(stream.isStale())); + // STALE_REASON + trow.addToColumnValue(new TCell().setStringVal(stream.getStaleReason())); + dataBatch.add(trow); + } finally { + stream.readUnlock(); + } + } + } + } + } + } + + public void fillStreamConsumptionValuesMetadataResult(List<TRow> dataBatch) { + Map<Long, Set<Long>> copiedMap = new HashMap<>(); + rwLock.readLock().lock(); + try { + for (Map.Entry<Long, Set<Long>> e : dbStreamMap.entrySet()) { + copiedMap.put(e.getKey(), new HashSet<>(e.getValue())); + } + } finally { + rwLock.readLock().unlock(); + } + for (Map.Entry<Long, Set<Long>> entry : copiedMap.entrySet()) { + Optional<Database> db = Env.getCurrentInternalCatalog().getDb(entry.getKey()); + if (db.isPresent()) { + for (Long tableId : entry.getValue()) { + Optional<Table> table = db.get().getTable(tableId); + if (!table.isPresent()) { + if (LOG.isDebugEnabled()) { + 
LOG.warn("invalid stream id: {}, db: {}", tableId, db.get().getFullName()); Review Comment: Same bug as above: `LOG.warn` inside `LOG.isDebugEnabled()` guard. This pattern appears in both `fillTableStreamValuesMetadataResult` and `fillStreamConsumptionValuesMetadataResult`. ########## fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java: ########## @@ -994,6 +1007,9 @@ private void dropTableInternal(Database db, Table table, boolean isView, boolean if (table instanceof OlapTable) { Env.getCurrentEnv().getMtmvService().dropTable(table); } + if (table instanceof BaseTableStream) { + Env.getCurrentEnv().getTableStreamManager().removeTableStream((BaseTableStream) table); + } Review Comment: Issue: Double removal of stream from `TableStreamManager`. `removeTableStream` is called here in `dropTableInternal` (line 1011), but it was already called inside `unprotectDropTable` (line 1045) which is invoked at line 990 above. The second call is a no-op but indicates unclear ownership of this responsibility. Consider removing one of the two calls to avoid confusion — the one in `unprotectDropTable` is the canonical location since it handles both normal and replay paths. ########## fe/fe-core/src/main/java/org/apache/doris/catalog/stream/BaseTableStream.java: ########## @@ -0,0 +1,166 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog.stream; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.util.PropertyAnalyzer; +import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.thrift.TRow; + +import com.google.common.collect.ImmutableList; +import com.google.gson.annotations.SerializedName; + +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public abstract class BaseTableStream extends Table { + public enum StreamConsumeType { + DEFAULT, + APPEND_ONLY, + MIN_DELTA, + UNKNOWN; + public static StreamConsumeType getType(String typeName) { + if (typeName == null) { + return UNKNOWN; + } + typeName = typeName.toLowerCase(); + switch (typeName) { + case "default": + return DEFAULT; + case "append_only": + return APPEND_ONLY; + case "min_delta": + return MIN_DELTA; + default: + return UNKNOWN; + } + } + } + + private static ImmutableList<TableType> supportedTableTypeList = ImmutableList.of(TableType.OLAP); + + @SerializedName("sct") + protected StreamConsumeType streamConsumeType = StreamConsumeType.DEFAULT; + + @SerializedName("sir") + protected boolean showInitialRows; + + @SerializedName("sti") + protected StreamTableInfo streamTableInfo; + + @SerializedName("d") + private boolean disabled; + + @SerializedName("s") + private boolean stale; + + @SerializedName("sr") + private String 
staleReason = "N/A"; + + protected volatile TableIf baseTable; + Review Comment: Minor: The `volatile` on `baseTable` provides visibility across threads but `getBaseTableNullable()` has a check-then-act race: two threads could both see `baseTable == null` and both call `streamTableInfo.getTableNullable()`. Since `getTableNullable()` is idempotent and `TableIf` assignment is atomic for references, this is benign — but worth a comment noting this is intentional (similar to the double-checked locking pattern without synchronization). ########## be/src/information_schema/schema_table_streams_scanner.cpp: ########## @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "information_schema/schema_table_streams_scanner.h" + +#include <cstddef> + +#include "core/block/block.h" +#include "core/data_type/data_type_factory.hpp" +#include "core/data_type/define_primitive_type.h" +#include "core/string_ref.h" +#include "gen_cpp/FrontendService_types.h" +#include "information_schema/schema_helper.h" +#include "runtime/exec_env.h" +#include "runtime/runtime_state.h" +#include "util/client_cache.h" +#include "util/thrift_rpc_helper.h" + +namespace doris { + +std::vector<SchemaScanner::ColumnDesc> SchemaTableStreamsScanner::_s_table_streams_columns = { + {"DB_NAME", TYPE_VARCHAR, sizeof(StringRef), true}, + {"STREAM_NAME", TYPE_VARCHAR, sizeof(StringRef), true}, + {"STREAM_ID", TYPE_BIGINT, sizeof(int64_t), true}, + {"STREAM_TYPE", TYPE_VARCHAR, sizeof(StringRef), true}, + {"CONSUME_TYPE", TYPE_VARCHAR, sizeof(StringRef), true}, + {"STREAM_COMMENT", TYPE_STRING, sizeof(StringRef), true}, + {"BASE_TABLE_NAME", TYPE_VARCHAR, sizeof(StringRef), true}, + {"BASE_TABLE_DB", TYPE_VARCHAR, sizeof(StringRef), true}, + {"BASE_TABLE_CTL", TYPE_VARCHAR, sizeof(StringRef), true}, + {"BASE_TABLE_TYPE", TYPE_VARCHAR, sizeof(StringRef), true}, + {"ENABLED", TYPE_BOOLEAN, sizeof(int8_t), true}, + {"IS_STALE", TYPE_BOOLEAN, sizeof(int8_t), true}, + {"STALE_REASON", TYPE_STRING, sizeof(StringRef), true}, +}; + +SchemaTableStreamsScanner::SchemaTableStreamsScanner() + : SchemaScanner(_s_table_streams_columns, TSchemaTableType::SCH_TABLE_STREAMS) {} + +SchemaTableStreamsScanner::~SchemaTableStreamsScanner() = default; + +Status SchemaTableStreamsScanner::start(RuntimeState* state) { + _block_rows_limit = state->batch_size(); + _rpc_timeout_ms = state->execution_timeout() * 1000; + return Status::OK(); +} + +Status SchemaTableStreamsScanner::_get_table_streams_block_from_fe() { + TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr; + + TSchemaTableRequestParams schema_table_request_params; + 
TFetchSchemaTableDataRequest request; + request.__set_schema_table_name(TSchemaTableName::TABLE_STREAMS); + request.__set_schema_table_params(schema_table_request_params); + + TFetchSchemaTableDataResult result; + + RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( + master_addr.hostname, master_addr.port, + [&request, &result](FrontendServiceConnection& client) { + client->fetchSchemaTableData(result, request); + }, + _rpc_timeout_ms)); + + Status status(Status::create(result.status)); + if (!status.ok()) { + LOG(WARNING) << "fetch transactions from FE failed, errmsg=" << status; + return status; Review Comment: Nit: Copy-paste error in log message. This says `"fetch transactions from FE failed"` but this is fetching **streams**, not transactions. Should be `"fetch table streams from FE failed"`. ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ShowCreateStreamCommand.java: ########## @@ -0,0 +1,124 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.plans.commands; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.catalog.stream.BaseTableStream; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.info.TableNameInfo; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.ShowResultSet; +import org.apache.doris.qe.ShowResultSetMetaData; +import org.apache.doris.qe.StmtExecutor; + +import com.google.common.collect.Lists; + +import java.util.List; + +/** + * Represents the command for SHOW CREATE STREAM. + */ +public class ShowCreateStreamCommand extends ShowCommand { + private static final ShowResultSetMetaData STREAM_META_DATA = + ShowResultSetMetaData.builder() + .addColumn(new Column("Stream", ScalarType.createVarchar(20))) + .addColumn(new Column("Create Stream", ScalarType.createVarchar(30))) + .build(); + + private final TableNameInfo tblNameInfo; + + public ShowCreateStreamCommand(TableNameInfo tableNameInfo) { + super(PlanType.SHOW_CREATE_STREAM_COMMAND); + this.tblNameInfo = tableNameInfo; + } + + private void validate(ConnectContext ctx) throws AnalysisException { + tblNameInfo.analyze(ctx); + + TableIf tableIf = Env.getCurrentEnv().getCatalogMgr() + .getCatalogOrAnalysisException(tblNameInfo.getCtl()) + .getDbOrAnalysisException(tblNameInfo.getDb()).getTableOrAnalysisException(tblNameInfo.getTbl()); + + if (!(tableIf instanceof BaseTableStream)) { + ErrorReport.reportAnalysisException(tblNameInfo.toFullyQualified() + + " is not a stream, type:" + tableIf.getType()); Review 
Comment: Issue: `ErrorReport.reportAnalysisException()` is called with a raw string without an `ErrorCode`. The first argument should be an `ErrorCode` enum value, not a free-form string. Looking at the pattern used for SHOW CREATE TABLE in `ShowCreateTableCommand`, it uses `ErrorCode.ERR_WRONG_OBJECT`. This should follow the same pattern: ```java ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_OBJECT, tblNameInfo.getDb(), tblNameInfo.getTbl(), "STREAM", ...); ``` ########## be/src/information_schema/schema_table_stream_consumption_scanner.cpp: ########## @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "information_schema/schema_table_stream_consumption_scanner.h" + +#include <cstddef> + +#include "core/block/block.h" +#include "core/data_type/data_type_factory.hpp" +#include "core/data_type/define_primitive_type.h" +#include "core/string_ref.h" +#include "gen_cpp/FrontendService_types.h" +#include "information_schema/schema_helper.h" +#include "runtime/exec_env.h" +#include "runtime/runtime_state.h" +#include "util/client_cache.h" +#include "util/thrift_rpc_helper.h" + +namespace doris { + +std::vector<SchemaScanner::ColumnDesc> + SchemaTableStreamConsumptionScanner::_s_table_stream_consumption_columns = { + {"DB_NAME", TYPE_VARCHAR, sizeof(StringRef), true}, + {"STREAM_NAME", TYPE_VARCHAR, sizeof(StringRef), true}, + {"STREAM_ID", TYPE_BIGINT, sizeof(int64_t), true}, + {"UNIT", TYPE_VARCHAR, sizeof(StringRef), true}, + {"CONSUMPTION_STATUS", TYPE_VARCHAR, sizeof(StringRef), true}, + {"LAG", TYPE_VARCHAR, sizeof(StringRef), true}, + {"LAST_CONSUMPTION_TIME", TYPE_BIGINT, sizeof(int64_t), true}, +}; + +SchemaTableStreamConsumptionScanner::SchemaTableStreamConsumptionScanner() + : SchemaScanner(_s_table_stream_consumption_columns, + TSchemaTableType::SCH_TABLE_STREAM_CONSUMPTION) {} + +SchemaTableStreamConsumptionScanner::~SchemaTableStreamConsumptionScanner() = default; + +Status SchemaTableStreamConsumptionScanner::start(RuntimeState* state) { + _block_rows_limit = state->batch_size(); + _rpc_timeout_ms = state->execution_timeout() * 1000; + return Status::OK(); +} + +Status SchemaTableStreamConsumptionScanner::_get_table_stream_consumption_block_from_fe() { + TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr; + + TSchemaTableRequestParams schema_table_request_params; + TFetchSchemaTableDataRequest request; + request.__set_schema_table_name(TSchemaTableName::TABLE_STREAM_CONSUMPTION); + request.__set_schema_table_params(schema_table_request_params); + + TFetchSchemaTableDataResult result; + + 
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( + master_addr.hostname, master_addr.port, + [&request, &result](FrontendServiceConnection& client) { + client->fetchSchemaTableData(result, request); + }, + _rpc_timeout_ms)); + + Status status(Status::create(result.status)); + if (!status.ok()) { + LOG(WARNING) << "fetch transactions from FE failed, errmsg=" << status; + return status; Review Comment: Nit: Same copy-paste error — log message says `"fetch transactions from FE failed"` but should say `"fetch stream consumption from FE failed"`. ########## fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java: ########## @@ -3902,4 +3923,72 @@ public Map<String, Long> getUsedDataQuota() { public void onClose() { Env.getCurrentEnv().getRefreshManager().removeFromRefreshMap(getId()); } + + public void createTableStream(CreateStreamCommand command) throws DdlException { + if (!Config.enable_table_stream) { + throw new DdlException("Table Stream is experimental." + + " Please set enable_table_stream=true to enable it."); + } + CreateStreamInfo createStreamInfo = command.getCreateStreamInfo(); + String dbName = createStreamInfo.getStreamName().getDb(); + String streamName = createStreamInfo.getStreamName().getTbl(); + // check if db exists + Database db = getDbOrDdlException(dbName); + // check if table exists in db + boolean replace = false; + if (db.getTable(streamName).isPresent()) { + if (createStreamInfo.isIfNotExists()) { + LOG.info("create stream[{}] which already exists", streamName); + return; + } else if (createStreamInfo.isOrReplace()) { + replace = true; + LOG.info("stream[{}] already exists, need to replace it", streamName); + } else { + ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, streamName); + } + } + if (replace) { + throw new DdlException("do not support replace currently"); + } else { + // get base table + CatalogIf baseCatalog; + if (Strings.isNullOrEmpty(createStreamInfo.getBaseTableName().getCtl())) { 
+ baseCatalog = this; + } else { + baseCatalog = Env.getCurrentEnv().getCatalogMgr() + .getCatalogOrDdlException(createStreamInfo.getBaseTableName().getCtl()); + } + BaseTableStream newStream; + TableIf baseTable = baseCatalog.getDbOrDdlException(createStreamInfo.getBaseTableName().getDb()) + .getTableOrDdlException(createStreamInfo.getBaseTableName().getTbl()); + // lock base table for stream init + baseTable.readLock(); + try { + Map<String, String> properties = createStreamInfo.getProperties(); + // build new stream + newStream = new TableStreamBuildFactory() + .withName(streamName) + .withBaseTable(baseTable) + .build(); + newStream.setComment(createStreamInfo.getComment()); + try { + newStream.setProperties(properties); + } catch (AnalysisException e) { + throw new DdlException(e.getMessage(), e); + } + if (properties != null && !properties.isEmpty()) { + // before here, all properties should be checked + throw new DdlException("Unknown properties: " + properties); + } + newStream.setId((Env.getCurrentEnv().getNextId())); + } finally { Review Comment: Issue: `newStream.setId()` is called after `baseTable.readUnlock()`, outside the base table read lock but before `db.createTableWithLock()`. While `getNextId()` is atomic, there's a TOCTOU issue: between checking the stream name doesn't exist (line 3939) and actually creating it (line 3987), another thread could create a table/stream with the same name. The `db.createTableWithLock` internally handles this with `ifNotExists`, but without `ifNotExists`, the error message from `createTableWithLock` might be different from the expected `ERR_TABLE_EXISTS_ERROR`. Consider wrapping the existence check and creation under the DB write lock, similar to how `createTable` works for OlapTable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
