morrySnow commented on code in PR #63850:
URL: https://github.com/apache/doris/pull/63850#discussion_r3378411702


##########
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/OlapTableStream.java:
##########
@@ -169,21 +185,16 @@ public boolean hasData(Partition partition) {
         // if all available visible data has been consumed, return false
         // todo(TsukiokaKogane): change offset from partition version to 
commit tso

Review Comment:
   remove this TODO



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/OlapTableStreamWrapper.java:
##########
@@ -32,18 +33,27 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 // runtime-only class for unified query/insert experience, created when bind 
relation with OlapTableStream
 public class OlapTableStreamWrapper extends OlapTable {
     private OlapTableStream stream;
     private OlapTable baseTable;
     protected Map<Long, Pair<Long, Long>> outputUpdateMap = Maps.newHashMap();
+    private final KeysType keysType;
 
     public OlapTableStreamWrapper(OlapTableStream stream, OlapTable baseTable) 
{
         super(stream.getId(), stream.getName(), stream.getFullSchema(), 
baseTable.getKeysType(),
                 baseTable.getPartitionInfo(), 
baseTable.getDefaultDistributionInfo());
+        // Inherit base table's qualifiedDbName so that wrapper.getDatabase() 
can resolve the
+        // owning Database via 
Env.getCurrentInternalCatalog().getDbNullable(qualifiedDbName).
+        // Otherwise downstream consumers (e.g. QueryPartitionCollector, 
partition routing,
+        // MV partition compensation) treat the wrapper as having no database 
and silently
+        // fall back to empty results when scanning the stream.
+        setQualifiedDbName(baseTable.getQualifiedDbName());

Review Comment:
   这块没太理解,是说基于 stream 创建的异步物化视图会有问题?什么场景下会基于 stream 创建物化视图?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java:
##########
@@ -447,6 +475,74 @@ private Optional<LogicalPlan> handleMetaTable(TableIf 
table, UnboundRelation unb
         return Optional.of(new 
LogicalTVFRelation(unboundRelation.getRelationId(), tvf, ImmutableList.of()));
     }
 
+    private TBinlogScanType checkChangeScanCondition(OlapTable olapTable, 
TableScanParams scanParams)
+            throws AnalysisException {
+        if (!olapTable.needRowBinlog()) {
+            throw new AnalysisException("INCR query requires ROW binlog 
enabled on base table "
+                    + 
"(PROPERTIES('binlog.enable'='true','binlog.format'='ROW')). "
+                    + "Table " + olapTable.getQualifiedName() + " doesn't 
enable row binlog.");
+        }
+        HashSet<String> keys = new HashSet(scanParams.getMapParams().keySet());
+        keys.remove(OlapScanNode.OLAP_INCREMENT_TYPE);
+        keys.remove(OlapScanNode.OLAP_START_TIMESTAMP);
+        keys.remove(OlapScanNode.OLAP_END_TIMESTAMP);
+        if (!keys.isEmpty()) {
+            throw new ParseException("Unsupported parameter in incr query: " + 
keys);
+        }
+        TBinlogScanType binlogScanType = 
OlapScanNode.parseBinlogScanType(scanParams, olapTable);

Review Comment:
   Thrift types should not appear here; use a Nereids enum instead and translate
it to Thrift when constructing the fragment parameters



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/OlapTableStreamWrapper.java:
##########
@@ -163,4 +173,32 @@ public List<Long> 
selectNonEmptyPartitionIds(Collection<Long> partitionIds) {
         }
         return nonEmptyIds;
     }
+
+    public List<Column> getRowBinlogSchema() {
+        return baseTable.getRowBinlogMeta().getSchema();
+    }
+
+    public List<Long> filterHistoryPartitionIds(List<Long> partitionIds) {
+        return partitionIds.stream()
+                .filter(partitionId -> stream.hasHistoricalData(partitionId))
+                .collect(Collectors.toList());
+    }
+
+    public List<Long> filterIncrementalPartitionIds(List<Long> partitionIds) {
+        return partitionIds.stream()
+                .filter(partitionId -> !stream.hasHistoricalData(partitionId)
+                        && stream.hasData(getPartition(partitionId)))
+                .collect(Collectors.toList());
+    }
+
+    public OlapTable getBaseTable() {
+        return baseTable;
+    }
+
+    public BaseTableStream.StreamScanType getStreamScanType() {
+        if (keysType == KeysType.DUP_KEYS) {
+            return BaseTableStream.StreamScanType.APPEND_ONLY;
+        }
+        return stream.getStreamScanType();

Review Comment:
   这里为什么不是在stream层做处理,而是在wrapper层做处理?



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/TableStreamManager.java:
##########
@@ -38,24 +43,34 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
-public class TableStreamManager implements Writable, GsonPostProcessable {
+public class TableStreamManager extends MasterDaemon implements Writable, 
GsonPostProcessable {
     private static final Logger LOG = 
LogManager.getLogger(TableStreamManager.class);
     @SerializedName(value = "dbStreamMap")
     private Map<Long, Set<Long>> dbStreamMap;
     protected MonitoredReentrantReadWriteLock rwLock;
 
     public TableStreamManager() {
+        super("table-stream-cleanup", 
Config.table_stream_partition_offset_cleanup_interval_second * 1000L);

Review Comment:
   线程名字和类名保持一致?



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java:
##########


Review Comment:
   这里最后是什么结论?



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/TableStreamManager.java:
##########
@@ -97,7 +128,129 @@ public Set<Long> getTableStreamIds(DatabaseIf db) {
         return result;
     }
 
-    public void fillTableStreamValuesMetadataResult(List<TRow> dataBatch) {
+    public void cleanupStalePartitionOffsets() {
+        List<Long> staleDbIds = new ArrayList<>();
+        List<Pair<Long, Long>> staleStreamIds = new ArrayList<>();
+        List<PruneTableStreamPartitionOffsetInfo.Entry> pruneEntries = new 
ArrayList<>();
+        for (Map.Entry<Long, Set<Long>> entry : copyDbStreamMap().entrySet()) {
+            Optional<Database> db = 
Env.getCurrentInternalCatalog().getDb(entry.getKey());
+            if (!db.isPresent()) {
+                staleDbIds.add(entry.getKey());
+                continue;
+            }
+            for (Long tableId : entry.getValue()) {
+                Optional<Table> table = db.get().getTable(tableId);
+                if (!table.isPresent()) {
+                    staleStreamIds.add(Pair.of(db.get().getId(), tableId));
+                    continue;
+                }
+                if (!(table.get() instanceof OlapTableStream)) {
+                    staleStreamIds.add(Pair.of(db.get().getId(), tableId));
+                    continue;
+                }
+                cleanupStalePartitionOffsets((OlapTableStream) 
table.get()).ifPresent(pruneEntries::add);
+            }
+        }
+        removeStaleDbAndStream(staleDbIds, staleStreamIds);

Review Comment:
   如果follower一直不重启,会有内存泄漏,这怎么处理?



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/RowBinlogTableWrapper.java:
##########
@@ -33,6 +36,14 @@ public RowBinlogTableWrapper(OlapTable originTable) {
         this.setBaseIndexId(rowBinlogMeta.getIndexId());
     }
 
+    public RowBinlogTableWrapper(OlapTable originTable, OlapTableStreamWrapper 
parent) {
+        super(originTable, originTable.getName(), 
originTable.getRowBinlogMeta().getSchema(), KeysType.DUP_KEYS);

Review Comment:
   这是指的binlog支持存算分离那个事情?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java:
##########
@@ -888,12 +896,12 @@ private PlanFragment 
computePhysicalOlapScan(PhysicalOlapScan olapScan, PlanTran
         }
 
         // generate base index tuple because this fragment partitioned expr 
relay on slots of based index
-        if (olapScan.getSelectedIndexId() != 
olapScan.getTable().getBaseIndexId()) {
+        if (!olapScan.isIncrementalScan() && olapScan.getSelectedIndexId() != 
olapScan.getTable().getBaseIndexId()) {

Review Comment:
   yes,如果后续的binlog支持存算分离,会让这个if改回去的话,这里保持现状也可以



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/TableStreamManager.java:
##########
@@ -97,7 +128,129 @@ public Set<Long> getTableStreamIds(DatabaseIf db) {
         return result;
     }
 
-    public void fillTableStreamValuesMetadataResult(List<TRow> dataBatch) {
+    public void cleanupStalePartitionOffsets() {
+        List<Long> staleDbIds = new ArrayList<>();
+        List<Pair<Long, Long>> staleStreamIds = new ArrayList<>();
+        List<PruneTableStreamPartitionOffsetInfo.Entry> pruneEntries = new 
ArrayList<>();
+        for (Map.Entry<Long, Set<Long>> entry : copyDbStreamMap().entrySet()) {
+            Optional<Database> db = 
Env.getCurrentInternalCatalog().getDb(entry.getKey());
+            if (!db.isPresent()) {
+                staleDbIds.add(entry.getKey());
+                continue;
+            }
+            for (Long tableId : entry.getValue()) {
+                Optional<Table> table = db.get().getTable(tableId);
+                if (!table.isPresent()) {
+                    staleStreamIds.add(Pair.of(db.get().getId(), tableId));
+                    continue;
+                }
+                if (!(table.get() instanceof OlapTableStream)) {
+                    staleStreamIds.add(Pair.of(db.get().getId(), tableId));
+                    continue;
+                }
+                cleanupStalePartitionOffsets((OlapTableStream) 
table.get()).ifPresent(pruneEntries::add);
+            }
+        }
+        removeStaleDbAndStream(staleDbIds, staleStreamIds);
+        if (!pruneEntries.isEmpty()) {
+            
Env.getCurrentEnv().getEditLog().logPruneTableStreamPartitionOffsets(
+                    new PruneTableStreamPartitionOffsetInfo(pruneEntries));
+        }
+    }
+
+    private Optional<PruneTableStreamPartitionOffsetInfo.Entry> 
cleanupStalePartitionOffsets(OlapTableStream stream) {
+        if (stream.isDisabled() || stream.isStale()) {
+            return Optional.empty();
+        }
+        OlapTable baseTable = stream.getBaseTableNullable();
+        if (baseTable == null) {
+            return Optional.empty();
+        }
+        Set<Long> validPartitionIds;
+        if (!baseTable.tryReadLock(Table.TRY_LOCK_TIMEOUT_MS, 
TimeUnit.MILLISECONDS)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("skip cleaning stream {} because base table {} read 
lock is busy",
+                        stream.getName(), baseTable.getName());
+            }
+            return Optional.empty();
+        }
+        try {
+            if (baseTable.isDropped) {
+                return Optional.empty();
+            }
+            validPartitionIds = new HashSet<>(baseTable.getPartitionIds());
+        } finally {
+            baseTable.readUnlock();
+        }
+        if (!stream.tryWriteLockIfExist(Table.TRY_LOCK_TIMEOUT_MS, 
TimeUnit.MILLISECONDS)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("skip cleaning stream {} because stream write lock 
is busy", stream.getName());
+            }
+            return Optional.empty();
+        }
+        try {
+            if (stream.isDisabled() || stream.isStale()) {
+                return Optional.empty();
+            }
+            Set<Long> stalePartitionIds = 
stream.unprotectedCollectStalePartitionOffsetIds(validPartitionIds);
+            if (stalePartitionIds.isEmpty()) {
+                return Optional.empty();
+            }
+            int removedPartitionCount = 
stream.unprotectedPrunePartitionOffsets(stalePartitionIds);
+            if (removedPartitionCount > 0) {
+                LOG.info("cleaned {} stale partition offset entries from 
stream {}.{} ({})",
+                        removedPartitionCount, 
stream.getDatabase().getFullName(), stream.getName(), stream.getId());
+            }
+            return Optional.of(new PruneTableStreamPartitionOffsetInfo.Entry(
+                    stream.getDatabase().getId(), stream.getId(), 
stalePartitionIds));
+        } finally {
+            stream.writeUnlock();
+        }
+    }
+
+    public void 
replayPruneTableStreamPartitionOffsets(PruneTableStreamPartitionOffsetInfo 
info) {
+        if (info == null || info.getEntries() == null || 
info.getEntries().isEmpty()) {
+            return;
+        }
+        for (PruneTableStreamPartitionOffsetInfo.Entry entry : 
info.getEntries()) {
+            replayPruneTableStreamPartitionOffsets(entry);
+        }
+    }
+
+    private void 
replayPruneTableStreamPartitionOffsets(PruneTableStreamPartitionOffsetInfo.Entry
 entry) {
+        if (entry == null || entry.getPartitionIds() == null || 
entry.getPartitionIds().isEmpty()) {
+            return;
+        }
+        Optional<Database> db = 
Env.getCurrentInternalCatalog().getDb(entry.getDbId());
+        if (!db.isPresent()) {
+            LOG.info("skip replay pruning partition offsets because db {} does 
not exist", entry.getDbId());
+            return;
+        }
+        Optional<Table> table = db.get().getTable(entry.getStreamId());
+        if (!table.isPresent()) {
+            LOG.info("skip replay pruning partition offsets because stream 
{}.{} does not exist",
+                    entry.getDbId(), entry.getStreamId());
+            return;
+        }
+        if (!(table.get() instanceof OlapTableStream)) {
+            LOG.info("skip replay pruning partition offsets because table 
{}.{} is not an olap table stream",
+                    entry.getDbId(), entry.getStreamId());
+            return;
+        }
+        OlapTableStream stream = (OlapTableStream) table.get();
+        if (!stream.tryWriteLockIfExist(Table.TRY_LOCK_TIMEOUT_MS, 
TimeUnit.MILLISECONDS)) {

Review Comment:
   没删掉不会有内存泄漏吗?



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/stream/TableStreamManager.java:
##########
@@ -97,7 +128,129 @@ public Set<Long> getTableStreamIds(DatabaseIf db) {
         return result;
     }
 
-    public void fillTableStreamValuesMetadataResult(List<TRow> dataBatch) {
+    public void cleanupStalePartitionOffsets() {
+        List<Long> staleDbIds = new ArrayList<>();
+        List<Pair<Long, Long>> staleStreamIds = new ArrayList<>();
+        List<PruneTableStreamPartitionOffsetInfo.Entry> pruneEntries = new 
ArrayList<>();
+        for (Map.Entry<Long, Set<Long>> entry : copyDbStreamMap().entrySet()) {
+            Optional<Database> db = 
Env.getCurrentInternalCatalog().getDb(entry.getKey());
+            if (!db.isPresent()) {
+                staleDbIds.add(entry.getKey());
+                continue;
+            }
+            for (Long tableId : entry.getValue()) {
+                Optional<Table> table = db.get().getTable(tableId);
+                if (!table.isPresent()) {
+                    staleStreamIds.add(Pair.of(db.get().getId(), tableId));
+                    continue;
+                }
+                if (!(table.get() instanceof OlapTableStream)) {
+                    staleStreamIds.add(Pair.of(db.get().getId(), tableId));
+                    continue;
+                }
+                cleanupStalePartitionOffsets((OlapTableStream) 
table.get()).ifPresent(pruneEntries::add);
+            }
+        }
+        removeStaleDbAndStream(staleDbIds, staleStreamIds);
+        if (!pruneEntries.isEmpty()) {
+            
Env.getCurrentEnv().getEditLog().logPruneTableStreamPartitionOffsets(

Review Comment:
   锁加在什么地方了?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to