github-actions[bot] commented on code in PR #61382:
URL: https://github.com/apache/doris/pull/61382#discussion_r2968404992
##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java:
##########
@@ -251,6 +255,22 @@ public class MetadataGenerator {
authenticationIntegrationsColList.get(i).getName().toLowerCase(), i);
}
AUTHENTICATION_INTEGRATIONS_COLUMN_TO_INDEX =
authenticationIntegrationsBuilder.build();
+
+ ImmutableMap.Builder<String, Integer> streamsBuilder = new
ImmutableMap.Builder();
+ List<Column> streamsBuilderColList =
SchemaTable.TABLE_MAP.get("streams")
+ .getFullSchema();
Review Comment:
**Bug (Critical):** This line uses `streamsBuilder` instead of
`streamConsumptionBuilder`. `streamsBuilder` was already `.build()`-ed on
line 265; Guava's `ImmutableMap.Builder` can be reused after `build()`, so the
extra `.put()` calls are accepted silently and the entries land in the wrong
builder, whose map has already been built. As a result,
`STREAM_CONSUMPTION_COLUMN_TO_INDEX` is always empty because
`streamConsumptionBuilder` is never populated.
Fix:
```java
streamConsumptionBuilder.put(streamConsumptionBuilderColList.get(i).getName().toLowerCase(),
i);
```
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/BaseTableInfo.java:
##########
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog.stream;
+
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.InternalCatalog;
+
+import com.google.common.base.Objects;
+import com.google.gson.annotations.SerializedName;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Optional;
+
+public class BaseTableInfo {
+ private static final Logger LOG =
LogManager.getLogger(BaseTableInfo.class);
+ // for internal table we use id as identifier otherwise use name instead
+ @SerializedName("ci")
+ private long ctlId;
+ @SerializedName("di")
+ private long dbId;
+ @SerializedName("ti")
+ private long tableId;
+ @SerializedName("tn")
+ private String tableName;
+ @SerializedName("dn")
+ private String dbName;
+ @SerializedName("cn")
+ private String ctlName;
+
+ public BaseTableInfo(TableIf table) {
+ java.util.Objects.requireNonNull(table, "table is null");
+ DatabaseIf database = table.getDatabase();
+ java.util.Objects.requireNonNull(database, "database is null");
+ CatalogIf catalog = database.getCatalog();
+ java.util.Objects.requireNonNull(catalog, "catalog is null");
+ this.tableId = table.getId();
+ this.dbId = database.getId();
+ this.ctlId = catalog.getId();
+ this.tableName = table.getName();
+ this.dbName = database.getFullName();
+ this.ctlName = catalog.getName();
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public String getDbName() {
+ return dbName;
+ }
+
+ public String getCtlName() {
+ return ctlName;
+ }
+
+ public long getTableId() {
+ return tableId;
+ }
+
+ public long getDbId() {
+ return dbId;
+ }
+
+ public long getCtlId() {
+ return ctlId;
+ }
+
+ public boolean isInternalTable() {
+ return InternalCatalog.INTERNAL_CATALOG_ID == ctlId;
+ }
+
+ public TableIf getTableNullable() {
+ CatalogIf catalog =
Env.getCurrentEnv().getCatalogMgr().getCatalog(ctlId);
+ if (catalog != null) {
+ if (isInternalTable()) {
+ Optional<DatabaseIf> db = catalog.getDb(dbId);
+ if (db.isPresent()) {
+ Optional<TableIf> table = db.get().getTable(tableId);
+ if (table.isPresent()) {
+ return table.get();
+ }
+ }
+ } else {
+ Optional<DatabaseIf> db = catalog.getDb(dbName);
+ if (db.isPresent()) {
+ Optional<TableIf> table = db.get().getTable(tableName);
+ if (table.isPresent()) {
+ return table.get();
+ }
+ }
+ }
+ }
+ LOG.warn("invalid base table: {}", this);
+ return null;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
Review Comment:
**Issue (Low):** `equals()` is overridden without `hashCode()`, which
violates the Java `equals`/`hashCode` contract. If `BaseTableInfo` objects are
ever placed in a `HashSet` or used as `HashMap` keys, they will behave
incorrectly. Consider adding a `hashCode()` implementation consistent with the
`equals()` logic.
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/StreamManager.java:
##########
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog.stream;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
+import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.thrift.TCell;
+import org.apache.doris.thrift.TRow;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.annotations.SerializedName;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+public class StreamManager implements Writable {
+ private static final Logger LOG =
LogManager.getLogger(StreamManager.class);
+ @SerializedName(value = "dbStreamMap")
+ private Map<Long, Set<Long>> dbStreamMap;
+ protected MonitoredReentrantReadWriteLock rwLock;
+
Review Comment:
**Bug (Critical):** `rwLock` is (correctly) not a `@SerializedName` field, so
it is never persisted, and nothing explicitly reinitializes it after GSON
deserialization. When `StreamManager.read()` calls `GsonUtils.GSON.fromJson()`,
`rwLock` is non-null only if GSON instantiates the object through the no-arg
constructor; whenever GSON falls back to `Unsafe.allocateInstance()`, which
bypasses constructors, `rwLock` will be `null`, and any subsequent call to
`addStream()`, `removeStream()`, `getStreamIds()`, or `fillStreamValues*()`
will NPE.
**Fix:** `StreamManager` should implement `GsonPostProcessable` and
reinitialize `rwLock` in `gsonPostProcess()`:
```java
public class StreamManager implements Writable, GsonPostProcessable {
    ...
    @Override
    public void gsonPostProcess() throws IOException {
        this.rwLock = new MonitoredReentrantReadWriteLock(true);
    }
}
```
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/StreamManager.java:
##########
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog.stream;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
+import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.thrift.TCell;
+import org.apache.doris.thrift.TRow;
+
+import com.google.common.base.Preconditions;
+import com.google.gson.annotations.SerializedName;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+public class StreamManager implements Writable {
+ private static final Logger LOG =
LogManager.getLogger(StreamManager.class);
+ @SerializedName(value = "dbStreamMap")
+ private Map<Long, Set<Long>> dbStreamMap;
+ protected MonitoredReentrantReadWriteLock rwLock;
+
+ public StreamManager() {
+ this.rwLock = new MonitoredReentrantReadWriteLock(true);
+ this.dbStreamMap = new HashMap<>();
+ }
+
+ public void addStream(BaseStream stream) {
+ rwLock.writeLock().lock();
+ try {
+ dbStreamMap.computeIfAbsent(stream.getDatabase().getId(), k -> new
HashSet<>()).add(stream.getId());
+ } finally {
+ rwLock.writeLock().unlock();
+ }
+ }
+
+ public void removeStream(BaseStream stream) {
+ rwLock.writeLock().lock();
+ try {
+ Optional.ofNullable(dbStreamMap.get(stream.getDatabase().getId()))
+ .ifPresent(set -> set.remove(stream.getId()));
+ } finally {
+ rwLock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ String json = GsonUtils.GSON.toJson(this);
+ Text.writeString(out, json);
+ }
+
+ public static StreamManager read(DataInput in) throws IOException {
+ String json = Text.readString(in);
+ return GsonUtils.GSON.fromJson(json, StreamManager.class);
+ }
+
+ public Set<Long> getStreamIds(DatabaseIf db) {
+ Set<Long> result = new HashSet<>();
+ rwLock.readLock().lock();
+ try {
+ result.addAll(dbStreamMap.getOrDefault(db.getId(), new
HashSet<>()));
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ return result;
+ }
+
+ public void fillStreamValuesMetadataResult(List<TRow> dataBatch) {
+ Map<Long, Set<Long>> copiedMap = new HashMap<>();
+ rwLock.readLock().lock();
+ try {
+ for (Map.Entry<Long, Set<Long>> e : dbStreamMap.entrySet()) {
+ copiedMap.put(e.getKey(), new HashSet<>(e.getValue()));
+ }
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ for (Map.Entry<Long, Set<Long>> entry : copiedMap.entrySet()) {
+ Optional<Database> db =
Env.getCurrentInternalCatalog().getDb(entry.getKey());
+ if (db.isPresent()) {
+ for (Long tableId : entry.getValue()) {
+ Optional<Table> table = db.get().getTable(tableId);
+ if (!table.isPresent()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.warn("invalid stream id: {}, db: {}", tableId,
db.get().getFullName());
+ }
Review Comment:
**Bug (Medium):** `LOG.warn()` is guarded by `if (LOG.isDebugEnabled())`.
This means the warning about an invalid stream ID will never be logged unless
debug logging is enabled, which defeats the purpose of a WARN-level message.
Either:
- Remove the `isDebugEnabled()` guard, or
- Change `LOG.warn` to `LOG.debug`
Same issue at the second occurrence in
`fillStreamConsumptionValuesMetadataResult()`.
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/BaseStream.java:
##########
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog.stream;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.util.PropertyAnalyzer;
+import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.thrift.TRow;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public abstract class BaseStream extends Table {
+ public enum StreamConsumeType {
+ DEFAULT,
+ APPEND_ONLY,
+ MIN_DELTA,
+ UNKNOWN;
+ public static StreamConsumeType getType(String typeName) {
+ if (typeName == null) {
+ return UNKNOWN;
+ }
+ typeName = typeName.toLowerCase();
+ switch (typeName) {
+ case "default":
+ return DEFAULT;
+ case "append_only":
+ return APPEND_ONLY;
+ case "min_delta":
+ return MIN_DELTA;
+ default:
+ return UNKNOWN;
+ }
+ }
+ }
+
+ private static ImmutableList<TableType> supportedTableTypeList =
ImmutableList.of(TableType.OLAP);
+
+ @SerializedName("streamType")
+ protected StreamConsumeType streamConsumeType = StreamConsumeType.DEFAULT;
Review Comment:
**Naming concern (Low):** The serialized name `"streamType"` stores
`StreamConsumeType`, not the stream type (which would be e.g.
`OLAP_TABLE_STREAM`). This is confusing and could cause compatibility issues if
a true "stream type" field is added later. Consider renaming to
`"streamConsumeType"` or `"consumeType"` before this becomes a persisted format
commitment.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]