github-actions[bot] commented on code in PR #61366:
URL: https://github.com/apache/doris/pull/61366#discussion_r2939776980
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java:
##########
@@ -290,29 +353,40 @@ private synchronized void eraseDatabase(long
currentTimeMs, int keepNum) {
}
}
- private synchronized void eraseDatabaseWithSameName(String dbName, long
currentTimeMs,
+ private void eraseDatabaseWithSameName(String dbName, long currentTimeMs,
int
maxSameNameTrashNum, List<Long> sameNameDbIdList) {
- List<Long> dbIdToErase =
getIdListToEraseByRecycleTime(sameNameDbIdList, maxSameNameTrashNum);
+ List<Long> dbIdToErase;
+ readLock();
+ try {
+ dbIdToErase = getIdListToEraseByRecycleTime(sameNameDbIdList,
maxSameNameTrashNum);
+ } finally {
+ readUnlock();
+ }
for (Long dbId : dbIdToErase) {
- RecycleDatabaseInfo dbInfo = idToDatabase.get(dbId);
- if (!isExpireMinLatency(dbId, currentTimeMs)) {
- continue;
- }
- eraseAllTables(dbInfo);
- idToDatabase.remove(dbId);
- idToRecycleTime.remove(dbId);
+ writeLock();
+ try {
+ RecycleDatabaseInfo dbInfo = idToDatabase.get(dbId);
+ if (dbInfo == null || !isExpireMinLatency(dbId,
currentTimeMs)) {
+ continue;
+ }
+ eraseAllTables(dbInfo);
+ idToDatabase.remove(dbId);
+ idToRecycleTime.remove(dbId);
- dbNameToIds.computeIfPresent(dbName, (k, v) -> {
- v.remove(dbId);
- return v.isEmpty() ? null : v;
- });
+ dbNameToIds.computeIfPresent(dbName, (k, v) -> {
+ v.remove(dbId);
+ return v.isEmpty() ? null : v;
+ });
- Env.getCurrentEnv().eraseDatabase(dbId, true);
- LOG.info("erase database[{}] name: {}", dbId, dbName);
+ Env.getCurrentEnv().eraseDatabase(dbId, true);
+ LOG.info("erase database[{}] name: {}", dbId, dbName);
+ } finally {
+ writeUnlock();
+ }
}
}
- private synchronized boolean isExpireMinLatency(long id, long
currentTimeMs) {
+ private boolean isExpireMinLatency(long id, long currentTimeMs) {
return (currentTimeMs - idToRecycleTime.get(id)) > minEraseLatency ||
FeConstants.runningUnitTest;
}
Review Comment:
**[Medium]** Same NPE risk as `isExpire` — `idToRecycleTime.get(id)` can
return a boxed `null` if the entry was concurrently removed, and the
subtraction then throws `NullPointerException` during auto-unboxing. Suggest
using `getOrDefault`:
```java
return (currentTimeMs - idToRecycleTime.getOrDefault(id, currentTimeMs)) >
minEraseLatency || FeConstants.runningUnitTest;
```
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java:
##########
@@ -290,29 +353,40 @@ private synchronized void eraseDatabase(long
currentTimeMs, int keepNum) {
}
}
- private synchronized void eraseDatabaseWithSameName(String dbName, long
currentTimeMs,
+ private void eraseDatabaseWithSameName(String dbName, long currentTimeMs,
int
maxSameNameTrashNum, List<Long> sameNameDbIdList) {
- List<Long> dbIdToErase =
getIdListToEraseByRecycleTime(sameNameDbIdList, maxSameNameTrashNum);
+ List<Long> dbIdToErase;
+ readLock();
+ try {
+ dbIdToErase = getIdListToEraseByRecycleTime(sameNameDbIdList,
maxSameNameTrashNum);
+ } finally {
+ readUnlock();
+ }
for (Long dbId : dbIdToErase) {
- RecycleDatabaseInfo dbInfo = idToDatabase.get(dbId);
- if (!isExpireMinLatency(dbId, currentTimeMs)) {
- continue;
- }
- eraseAllTables(dbInfo);
- idToDatabase.remove(dbId);
- idToRecycleTime.remove(dbId);
+ writeLock();
+ try {
+ RecycleDatabaseInfo dbInfo = idToDatabase.get(dbId);
+ if (dbInfo == null || !isExpireMinLatency(dbId,
currentTimeMs)) {
+ continue;
+ }
+ eraseAllTables(dbInfo);
+ idToDatabase.remove(dbId);
Review Comment:
**[Medium]** `eraseAllTables(dbInfo)` is called inside a `writeLock` block,
but the method itself iterates all tables in the database, calling
`onEraseOlapTable()` (potential cloud RPC with retries) and `logEraseTable()`
(journal I/O) for each table — all while holding the write lock.
This is exactly the O(N*T) pattern the PR description says it's fixing. For
databases with many tables, this path still holds the write lock for the entire
duration. Consider applying the same microbatch pattern here: collect table IDs
under readLock, then erase each under writeLock with release between items.
##########
fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogRecycleBinTest.java:
##########
@@ -835,5 +843,182 @@ public void recycleAllTables(Database db,
CatalogRecycleBin recycleBin) {
db.unregisterTable(CatalogTestUtil.testEsTableId1);
recycleBin.recycleTable(CatalogTestUtil.testDbId1, esTable, false,
false, 0);
}
+
+ @Test
+ public void testConcurrentReadsDoNotBlock() throws Exception {
+ CatalogRecycleBin recycleBin = Env.getCurrentRecycleBin();
+
+ // Recycle several partitions
+ for (int i = 1; i <= 50; i++) {
+ MaterializedIndex index = new MaterializedIndex(2000 + i,
IndexState.NORMAL);
+ RandomDistributionInfo dist = new RandomDistributionInfo(1);
+ Partition partition = new Partition(3000 + i, "part_" + i, index,
dist);
+ recycleBin.recyclePartition(
+ CatalogTestUtil.testDbId1, CatalogTestUtil.testTableId1,
+ CatalogTestUtil.testTable1, partition, null, null,
+ new DataProperty(TStorageMedium.HDD), new
ReplicaAllocation((short) 3),
+ false, false);
+ }
+
+ // Multiple reader threads should run concurrently without blocking
each other
+ int numReaders = 10;
+ CyclicBarrier barrier = new CyclicBarrier(numReaders);
+ ExecutorService executor = Executors.newFixedThreadPool(numReaders);
+ List<Future<Boolean>> futures = new ArrayList<>();
+
+ for (int i = 0; i < numReaders; i++) {
+ futures.add(executor.submit(() -> {
+ barrier.await(5, TimeUnit.SECONDS);
+ // Perform various read operations concurrently
+ for (int j = 1; j <= 50; j++) {
+ recycleBin.isRecyclePartition(CatalogTestUtil.testDbId1,
+ CatalogTestUtil.testTableId1, 3000 + j);
+ recycleBin.getRecycleTimeById(3000 + j);
+ }
+ Set<Long> dbIds = Sets.newHashSet();
+ Set<Long> tableIds = Sets.newHashSet();
+ Set<Long> partIds = Sets.newHashSet();
+ recycleBin.getRecycleIds(dbIds, tableIds, partIds);
+ return true;
+ }));
+ }
+
+ executor.shutdown();
+ Assert.assertTrue(executor.awaitTermination(30, TimeUnit.SECONDS));
+ for (Future<Boolean> f : futures) {
+ Assert.assertTrue(f.get());
+ }
+ }
+
+ @Test
+ public void testConcurrentRecycleAndRead() throws Exception {
+ CatalogRecycleBin recycleBin = Env.getCurrentRecycleBin();
+
+ AtomicBoolean readerError = new AtomicBoolean(false);
+ AtomicBoolean writerDone = new AtomicBoolean(false);
+ CountDownLatch startLatch = new CountDownLatch(1);
+
+ // Writer thread: continuously recycles partitions
+ Thread writer = new Thread(() -> {
+ try {
+ startLatch.await();
+ for (int i = 1; i <= 100; i++) {
+ MaterializedIndex index = new MaterializedIndex(4000 + i,
IndexState.NORMAL);
+ RandomDistributionInfo dist = new
RandomDistributionInfo(1);
+ Partition partition = new Partition(5000 + i, "cpart_" +
i, index, dist);
+ recycleBin.recyclePartition(
+ CatalogTestUtil.testDbId1,
CatalogTestUtil.testTableId1,
+ CatalogTestUtil.testTable1, partition, null, null,
+ new DataProperty(TStorageMedium.HDD), new
ReplicaAllocation((short) 3),
+ false, false);
+ }
+ } catch (Exception e) {
+ readerError.set(true);
+ } finally {
+ writerDone.set(true);
+ }
+ });
+
+ // Reader threads: continuously read while writer is active
+ List<Thread> readers = new ArrayList<>();
+ for (int r = 0; r < 5; r++) {
+ Thread reader = new Thread(() -> {
+ try {
+ startLatch.await();
+ while (!writerDone.get()) {
+ // These should never throw
ConcurrentModificationException
+ Set<Long> dbIds = Sets.newHashSet();
+ Set<Long> tableIds = Sets.newHashSet();
+ Set<Long> partIds = Sets.newHashSet();
+ recycleBin.getRecycleIds(dbIds, tableIds, partIds);
+
recycleBin.isRecyclePartition(CatalogTestUtil.testDbId1,
+ CatalogTestUtil.testTableId1, 5001);
+ }
+ } catch (Exception e) {
+ readerError.set(true);
+ }
+ });
+ readers.add(reader);
+ }
+
+ writer.start();
+ readers.forEach(Thread::start);
+ startLatch.countDown();
+
+ writer.join(30_000);
+ for (Thread reader : readers) {
+ reader.join(30_000);
+ }
+
+ Assert.assertFalse("Reader or writer thread encountered an error",
readerError.get());
+ // Verify all 100 partitions were recycled
+ for (int i = 1; i <= 100; i++) {
+
Assert.assertTrue(recycleBin.isRecyclePartition(CatalogTestUtil.testDbId1,
+ CatalogTestUtil.testTableId1, 5000 + i));
+ }
+ }
+
+ @Test
+ public void testMicrobatchEraseReleasesLockBetweenItems() throws Exception
{
+ CatalogRecycleBin recycleBin = Env.getCurrentRecycleBin();
+
+ // Recycle many partitions
+ int numPartitions = 50;
+ for (int i = 1; i <= numPartitions; i++) {
+ MaterializedIndex index = new MaterializedIndex(6000 + i,
IndexState.NORMAL);
+ RandomDistributionInfo dist = new RandomDistributionInfo(1);
+ Partition partition = new Partition(7000 + i, "epart_" + i, index,
dist);
+ recycleBin.recyclePartition(
+ CatalogTestUtil.testDbId1, CatalogTestUtil.testTableId1,
+ CatalogTestUtil.testTable1, partition, null, null,
+ new DataProperty(TStorageMedium.HDD), new
ReplicaAllocation((short) 3),
+ false, false);
+ }
+
+ // Verify all were recycled
+ Set<Long> dbIds = Sets.newHashSet();
+ Set<Long> tableIds = Sets.newHashSet();
+ Set<Long> partitionIds = Sets.newHashSet();
+ recycleBin.getRecycleIds(dbIds, tableIds, partitionIds);
+ Assert.assertEquals(numPartitions, partitionIds.size());
+
+ // Now run erase daemon which should process items one at a time
+ // While erase is running, a concurrent recyclePartition should be
able to
+ // proceed between items (not blocked for the entire erase duration)
+ AtomicBoolean recycleCompleted = new AtomicBoolean(false);
+ AtomicBoolean eraseStarted = new AtomicBoolean(false);
+
+ Thread eraseThread = new Thread(() -> {
+ eraseStarted.set(true);
+ recycleBin.runAfterCatalogReady();
+ });
+
+ eraseThread.start();
+
+ // Wait briefly for erase to start, then try to recycle a new partition
+ Thread.sleep(50);
+ if (eraseStarted.get()) {
Review Comment:
**[Low]** This `Thread.sleep(50)` makes the test timing-dependent. The test
can pass trivially if the erase thread finishes all 50 partitions within 50ms
(very likely in unit tests with no real I/O), meaning `recyclePartition` runs
after erase completes with no contention — proving nothing about microbatch
lock release.
A more robust approach: inject a per-item callback or use a
`CountDownLatch`/`Phaser` so the erase thread signals after processing at least
one item (proving it's mid-iteration), and the test thread signals back that
`recyclePartition` succeeded before erase finishes all items.
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java:
##########
@@ -141,130 +166,158 @@ private void addRecycledTabletsForPartition(Set<Long>
recycledTabletSet, Partiti
}
}
- public synchronized boolean recycleDatabase(Database db, Set<String>
tableNames, Set<Long> tableIds,
+ public boolean recycleDatabase(Database db, Set<String> tableNames,
Set<Long> tableIds,
boolean isReplay, boolean
isForceDrop, long replayRecycleTime) {
- long recycleTime = 0;
- if (idToDatabase.containsKey(db.getId())) {
- LOG.error("db[{}] already in recycle bin.", db.getId());
- return false;
- }
-
- // db should be empty. all tables are recycled before
- if (!db.getTableIds().isEmpty()) {
- throw new IllegalStateException("Database " + db.getFullName() + "
is not empty. Contains tables: "
- +
db.getTableIds().stream().collect(Collectors.toSet()));
- }
-
- // recycle db
- RecycleDatabaseInfo databaseInfo = new RecycleDatabaseInfo(db,
tableNames, tableIds);
- idToDatabase.put(db.getId(), databaseInfo);
- if (isForceDrop) {
- // The 'force drop' database should be recycle immediately.
- recycleTime = 0;
- } else if (!isReplay || replayRecycleTime == 0) {
- recycleTime = System.currentTimeMillis();
- } else {
- recycleTime = replayRecycleTime;
- }
- idToRecycleTime.put(db.getId(), recycleTime);
- dbNameToIds.computeIfAbsent(db.getFullName(), k ->
ConcurrentHashMap.newKeySet()).add(db.getId());
- LOG.info("recycle db[{}-{}], is force drop: {}", db.getId(),
db.getFullName(), isForceDrop);
- return true;
+ writeLock();
+ try {
+ long recycleTime = 0;
+ if (idToDatabase.containsKey(db.getId())) {
+ LOG.error("db[{}] already in recycle bin.", db.getId());
+ return false;
+ }
+
+ // db should be empty. all tables are recycled before
+ if (!db.getTableIds().isEmpty()) {
+ throw new IllegalStateException("Database " + db.getFullName()
+ " is not empty. Contains tables: "
+ +
db.getTableIds().stream().collect(Collectors.toSet()));
+ }
+
+ // recycle db
+ RecycleDatabaseInfo databaseInfo = new RecycleDatabaseInfo(db,
tableNames, tableIds);
+ idToDatabase.put(db.getId(), databaseInfo);
+ if (isForceDrop) {
+ // The 'force drop' database should be recycle immediately.
+ recycleTime = 0;
+ } else if (!isReplay || replayRecycleTime == 0) {
+ recycleTime = System.currentTimeMillis();
+ } else {
+ recycleTime = replayRecycleTime;
+ }
+ idToRecycleTime.put(db.getId(), recycleTime);
+ dbNameToIds.computeIfAbsent(db.getFullName(), k ->
ConcurrentHashMap.newKeySet()).add(db.getId());
+ LOG.info("recycle db[{}-{}], is force drop: {}", db.getId(),
db.getFullName(), isForceDrop);
+ return true;
+ } finally {
+ writeUnlock();
+ }
}
- public synchronized boolean recycleTable(long dbId, Table table, boolean
isReplay,
+ public boolean recycleTable(long dbId, Table table, boolean isReplay,
boolean isForceDrop, long
replayRecycleTime) {
- long recycleTime = 0;
- if (idToTable.containsKey(table.getId())) {
- LOG.error("table[{}] already in recycle bin.", table.getId());
- return false;
- }
-
- // recycle table
- RecycleTableInfo tableInfo = new RecycleTableInfo(dbId, table);
- if (isForceDrop) {
- // The 'force drop' table should be recycle immediately.
- recycleTime = 0;
- } else if (!isReplay || replayRecycleTime == 0) {
- recycleTime = System.currentTimeMillis();
- } else {
- recycleTime = replayRecycleTime;
- }
- idToRecycleTime.put(table.getId(), recycleTime);
- idToTable.put(table.getId(), tableInfo);
- dbIdTableNameToIds.computeIfAbsent(Pair.of(dbId, table.getName()),
- k -> ConcurrentHashMap.newKeySet()).add(table.getId());
- LOG.info("recycle table[{}-{}], is force drop: {}", table.getId(),
table.getName(), isForceDrop);
- return true;
+ writeLock();
+ try {
+ long recycleTime = 0;
+ if (idToTable.containsKey(table.getId())) {
+ LOG.error("table[{}] already in recycle bin.", table.getId());
+ return false;
+ }
+
+ // recycle table
+ RecycleTableInfo tableInfo = new RecycleTableInfo(dbId, table);
+ if (isForceDrop) {
+ // The 'force drop' table should be recycle immediately.
+ recycleTime = 0;
+ } else if (!isReplay || replayRecycleTime == 0) {
+ recycleTime = System.currentTimeMillis();
+ } else {
+ recycleTime = replayRecycleTime;
+ }
+ idToRecycleTime.put(table.getId(), recycleTime);
+ idToTable.put(table.getId(), tableInfo);
+ dbIdTableNameToIds.computeIfAbsent(Pair.of(dbId, table.getName()),
+ k -> ConcurrentHashMap.newKeySet()).add(table.getId());
+ LOG.info("recycle table[{}-{}], is force drop: {}", table.getId(),
table.getName(), isForceDrop);
+ return true;
+ } finally {
+ writeUnlock();
+ }
}
- public synchronized boolean recyclePartition(long dbId, long tableId,
String tableName, Partition partition,
+ public boolean recyclePartition(long dbId, long tableId, String tableName,
Partition partition,
Range<PartitionKey> range,
PartitionItem listPartitionItem,
DataProperty dataProperty,
ReplicaAllocation replicaAlloc,
boolean isInMemory, boolean
isMutable) {
- if (idToPartition.containsKey(partition.getId())) {
- LOG.error("partition[{}] already in recycle bin.",
partition.getId());
- return false;
- }
-
- // recycle partition
- RecyclePartitionInfo partitionInfo = new RecyclePartitionInfo(dbId,
tableId, partition,
- range, listPartitionItem, dataProperty, replicaAlloc,
isInMemory, isMutable);
- idToRecycleTime.put(partition.getId(), System.currentTimeMillis());
- idToPartition.put(partition.getId(), partitionInfo);
- dbTblIdPartitionNameToIds.computeIfAbsent(Pair.of(dbId, tableId), k ->
new ConcurrentHashMap<>())
- .computeIfAbsent(partition.getName(), k ->
ConcurrentHashMap.newKeySet()).add(partition.getId());
- LOG.info("recycle partition[{}-{}] of table [{}-{}]",
partition.getId(), partition.getName(),
- tableId, tableName);
- return true;
+ writeLock();
+ try {
+ if (idToPartition.containsKey(partition.getId())) {
+ LOG.error("partition[{}] already in recycle bin.",
partition.getId());
+ return false;
+ }
+
+ // recycle partition
+ RecyclePartitionInfo partitionInfo = new
RecyclePartitionInfo(dbId, tableId, partition,
+ range, listPartitionItem, dataProperty, replicaAlloc,
isInMemory, isMutable);
+ idToRecycleTime.put(partition.getId(), System.currentTimeMillis());
+ idToPartition.put(partition.getId(), partitionInfo);
+ dbTblIdPartitionNameToIds.computeIfAbsent(Pair.of(dbId, tableId),
k -> new ConcurrentHashMap<>())
+ .computeIfAbsent(partition.getName(), k ->
ConcurrentHashMap.newKeySet()).add(partition.getId());
+ LOG.info("recycle partition[{}-{}] of table [{}-{}]",
partition.getId(), partition.getName(),
+ tableId, tableName);
+ return true;
+ } finally {
+ writeUnlock();
+ }
}
- public synchronized Long getRecycleTimeById(long id) {
+ public Long getRecycleTimeById(long id) {
return idToRecycleTime.get(id);
}
- public synchronized void setRecycleTimeByIdForReplay(long id, Long
recycleTime) {
+ public void setRecycleTimeByIdForReplay(long id, Long recycleTime) {
idToRecycleTime.put(id, recycleTime);
}
- public synchronized boolean isRecycleDatabase(long dbId) {
+ public boolean isRecycleDatabase(long dbId) {
return idToDatabase.containsKey(dbId);
}
- public synchronized boolean isRecycleTable(long dbId, long tableId) {
+ public boolean isRecycleTable(long dbId, long tableId) {
return isRecycleDatabase(dbId) || idToTable.containsKey(tableId);
}
- public synchronized boolean isRecyclePartition(long dbId, long tableId,
long partitionId) {
+ public boolean isRecyclePartition(long dbId, long tableId, long
partitionId) {
return isRecycleTable(dbId, tableId) ||
idToPartition.containsKey(partitionId);
}
- public synchronized void getRecycleIds(Set<Long> dbIds, Set<Long>
tableIds, Set<Long> partitionIds) {
+ public void getRecycleIds(Set<Long> dbIds, Set<Long> tableIds, Set<Long>
partitionIds) {
dbIds.addAll(idToDatabase.keySet());
tableIds.addAll(idToTable.keySet());
partitionIds.addAll(idToPartition.keySet());
}
- private synchronized boolean isExpire(long id, long currentTimeMs) {
+ private boolean isExpire(long id, long currentTimeMs) {
long latency = currentTimeMs - idToRecycleTime.get(id);
return (Config.catalog_trash_ignore_min_erase_latency || latency >
minEraseLatency)
Review Comment:
**[Medium]** `idToRecycleTime.get(id)` returns boxed `Long`. If the id has
been concurrently removed from `idToRecycleTime`, this returns `null` and the
subtraction throws `NullPointerException` during auto-unboxing.
While callers currently hold readLock/writeLock, which provides some
protection, the companion method `getIdListToEraseByRecycleTime` already uses
`getOrDefault(y, 0L)` defensively for exactly this scenario. For consistency
and robustness, suggest:
```java
long latency = currentTimeMs - idToRecycleTime.getOrDefault(id,
currentTimeMs);
```
This returns `latency=0` (not expired) for missing entries, which is safe.
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java:
##########
@@ -141,130 +166,158 @@ private void addRecycledTabletsForPartition(Set<Long>
recycledTabletSet, Partiti
}
}
- public synchronized boolean recycleDatabase(Database db, Set<String>
tableNames, Set<Long> tableIds,
+ public boolean recycleDatabase(Database db, Set<String> tableNames,
Set<Long> tableIds,
boolean isReplay, boolean
isForceDrop, long replayRecycleTime) {
- long recycleTime = 0;
- if (idToDatabase.containsKey(db.getId())) {
- LOG.error("db[{}] already in recycle bin.", db.getId());
- return false;
- }
-
- // db should be empty. all tables are recycled before
- if (!db.getTableIds().isEmpty()) {
- throw new IllegalStateException("Database " + db.getFullName() + "
is not empty. Contains tables: "
- +
db.getTableIds().stream().collect(Collectors.toSet()));
- }
-
- // recycle db
- RecycleDatabaseInfo databaseInfo = new RecycleDatabaseInfo(db,
tableNames, tableIds);
- idToDatabase.put(db.getId(), databaseInfo);
- if (isForceDrop) {
- // The 'force drop' database should be recycle immediately.
- recycleTime = 0;
- } else if (!isReplay || replayRecycleTime == 0) {
- recycleTime = System.currentTimeMillis();
- } else {
- recycleTime = replayRecycleTime;
- }
- idToRecycleTime.put(db.getId(), recycleTime);
- dbNameToIds.computeIfAbsent(db.getFullName(), k ->
ConcurrentHashMap.newKeySet()).add(db.getId());
- LOG.info("recycle db[{}-{}], is force drop: {}", db.getId(),
db.getFullName(), isForceDrop);
- return true;
+ writeLock();
+ try {
+ long recycleTime = 0;
+ if (idToDatabase.containsKey(db.getId())) {
+ LOG.error("db[{}] already in recycle bin.", db.getId());
+ return false;
+ }
+
+ // db should be empty. all tables are recycled before
+ if (!db.getTableIds().isEmpty()) {
+ throw new IllegalStateException("Database " + db.getFullName()
+ " is not empty. Contains tables: "
+ +
db.getTableIds().stream().collect(Collectors.toSet()));
+ }
+
+ // recycle db
+ RecycleDatabaseInfo databaseInfo = new RecycleDatabaseInfo(db,
tableNames, tableIds);
+ idToDatabase.put(db.getId(), databaseInfo);
+ if (isForceDrop) {
+ // The 'force drop' database should be recycle immediately.
+ recycleTime = 0;
+ } else if (!isReplay || replayRecycleTime == 0) {
+ recycleTime = System.currentTimeMillis();
+ } else {
+ recycleTime = replayRecycleTime;
+ }
+ idToRecycleTime.put(db.getId(), recycleTime);
+ dbNameToIds.computeIfAbsent(db.getFullName(), k ->
ConcurrentHashMap.newKeySet()).add(db.getId());
+ LOG.info("recycle db[{}-{}], is force drop: {}", db.getId(),
db.getFullName(), isForceDrop);
+ return true;
+ } finally {
+ writeUnlock();
+ }
}
- public synchronized boolean recycleTable(long dbId, Table table, boolean
isReplay,
+ public boolean recycleTable(long dbId, Table table, boolean isReplay,
boolean isForceDrop, long
replayRecycleTime) {
- long recycleTime = 0;
- if (idToTable.containsKey(table.getId())) {
- LOG.error("table[{}] already in recycle bin.", table.getId());
- return false;
- }
-
- // recycle table
- RecycleTableInfo tableInfo = new RecycleTableInfo(dbId, table);
- if (isForceDrop) {
- // The 'force drop' table should be recycle immediately.
- recycleTime = 0;
- } else if (!isReplay || replayRecycleTime == 0) {
- recycleTime = System.currentTimeMillis();
- } else {
- recycleTime = replayRecycleTime;
- }
- idToRecycleTime.put(table.getId(), recycleTime);
- idToTable.put(table.getId(), tableInfo);
- dbIdTableNameToIds.computeIfAbsent(Pair.of(dbId, table.getName()),
- k -> ConcurrentHashMap.newKeySet()).add(table.getId());
- LOG.info("recycle table[{}-{}], is force drop: {}", table.getId(),
table.getName(), isForceDrop);
- return true;
+ writeLock();
+ try {
+ long recycleTime = 0;
+ if (idToTable.containsKey(table.getId())) {
+ LOG.error("table[{}] already in recycle bin.", table.getId());
+ return false;
+ }
+
+ // recycle table
+ RecycleTableInfo tableInfo = new RecycleTableInfo(dbId, table);
+ if (isForceDrop) {
+ // The 'force drop' table should be recycle immediately.
+ recycleTime = 0;
+ } else if (!isReplay || replayRecycleTime == 0) {
+ recycleTime = System.currentTimeMillis();
+ } else {
+ recycleTime = replayRecycleTime;
+ }
+ idToRecycleTime.put(table.getId(), recycleTime);
+ idToTable.put(table.getId(), tableInfo);
+ dbIdTableNameToIds.computeIfAbsent(Pair.of(dbId, table.getName()),
+ k -> ConcurrentHashMap.newKeySet()).add(table.getId());
+ LOG.info("recycle table[{}-{}], is force drop: {}", table.getId(),
table.getName(), isForceDrop);
+ return true;
+ } finally {
+ writeUnlock();
+ }
}
- public synchronized boolean recyclePartition(long dbId, long tableId,
String tableName, Partition partition,
+ public boolean recyclePartition(long dbId, long tableId, String tableName,
Partition partition,
Range<PartitionKey> range,
PartitionItem listPartitionItem,
DataProperty dataProperty,
ReplicaAllocation replicaAlloc,
boolean isInMemory, boolean
isMutable) {
- if (idToPartition.containsKey(partition.getId())) {
- LOG.error("partition[{}] already in recycle bin.",
partition.getId());
- return false;
- }
-
- // recycle partition
- RecyclePartitionInfo partitionInfo = new RecyclePartitionInfo(dbId,
tableId, partition,
- range, listPartitionItem, dataProperty, replicaAlloc,
isInMemory, isMutable);
- idToRecycleTime.put(partition.getId(), System.currentTimeMillis());
- idToPartition.put(partition.getId(), partitionInfo);
- dbTblIdPartitionNameToIds.computeIfAbsent(Pair.of(dbId, tableId), k ->
new ConcurrentHashMap<>())
- .computeIfAbsent(partition.getName(), k ->
ConcurrentHashMap.newKeySet()).add(partition.getId());
- LOG.info("recycle partition[{}-{}] of table [{}-{}]",
partition.getId(), partition.getName(),
- tableId, tableName);
- return true;
+ writeLock();
+ try {
+ if (idToPartition.containsKey(partition.getId())) {
+ LOG.error("partition[{}] already in recycle bin.",
partition.getId());
+ return false;
+ }
+
+ // recycle partition
+ RecyclePartitionInfo partitionInfo = new
RecyclePartitionInfo(dbId, tableId, partition,
+ range, listPartitionItem, dataProperty, replicaAlloc,
isInMemory, isMutable);
+ idToRecycleTime.put(partition.getId(), System.currentTimeMillis());
+ idToPartition.put(partition.getId(), partitionInfo);
+ dbTblIdPartitionNameToIds.computeIfAbsent(Pair.of(dbId, tableId),
k -> new ConcurrentHashMap<>())
+ .computeIfAbsent(partition.getName(), k ->
ConcurrentHashMap.newKeySet()).add(partition.getId());
+ LOG.info("recycle partition[{}-{}] of table [{}-{}]",
partition.getId(), partition.getName(),
+ tableId, tableName);
+ return true;
+ } finally {
+ writeUnlock();
+ }
}
- public synchronized Long getRecycleTimeById(long id) {
+ public Long getRecycleTimeById(long id) {
return idToRecycleTime.get(id);
}
- public synchronized void setRecycleTimeByIdForReplay(long id, Long
recycleTime) {
+ public void setRecycleTimeByIdForReplay(long id, Long recycleTime) {
idToRecycleTime.put(id, recycleTime);
}
- public synchronized boolean isRecycleDatabase(long dbId) {
+ public boolean isRecycleDatabase(long dbId) {
return idToDatabase.containsKey(dbId);
}
- public synchronized boolean isRecycleTable(long dbId, long tableId) {
+ public boolean isRecycleTable(long dbId, long tableId) {
return isRecycleDatabase(dbId) || idToTable.containsKey(tableId);
}
- public synchronized boolean isRecyclePartition(long dbId, long tableId,
long partitionId) {
+ public boolean isRecyclePartition(long dbId, long tableId, long
partitionId) {
return isRecycleTable(dbId, tableId) ||
idToPartition.containsKey(partitionId);
}
- public synchronized void getRecycleIds(Set<Long> dbIds, Set<Long>
tableIds, Set<Long> partitionIds) {
+ public void getRecycleIds(Set<Long> dbIds, Set<Long> tableIds, Set<Long>
partitionIds) {
dbIds.addAll(idToDatabase.keySet());
Review Comment:
**[Low]** This method reads three separate ConcurrentHashMaps without any
lock. The original `synchronized` version provided a consistent snapshot across
all three maps. Now, a concurrent recycle/erase between the three `addAll`
calls can produce an inconsistent view (e.g., a partition ID appears but its
parent table ID doesn't).
If callers rely on cross-map consistency, this needs a readLock. If they
only need approximate membership checks, the current approach is fine. Worth
documenting the relaxed semantics.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]