This is an automated email from the ASF dual-hosted git repository.

JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c94c9a393c7 Fix idempotent table cache update handling (#17959)
c94c9a393c7 is described below

commit c94c9a393c7cd20baad5bb3281553eb2bd175f31
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 17 09:05:40 2026 +0800

    Fix idempotent table cache update handling (#17959)
---
 .../db/schemaengine/table/DataNodeTableCache.java  | 37 ++++++++++++++---
 .../schemaengine/table/DataNodeTableCacheTest.java | 48 ++++++++++++++++++++++
 2 files changed, 79 insertions(+), 6 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java
index d8d8f4fe19b..11e80f5a63e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCache.java
@@ -179,7 +179,14 @@ public class DataNodeTableCache implements ITableCache {
       // If rename table
       if (Objects.nonNull(oldName)) {
         // Equals to commit update
-        final TsTable oldTable = preUpdateTableMap.get(database).get(oldName).getLeft();
+        final TsTable oldTable = getTableFromPreUpdateMap(database, oldName);
+        if (Objects.isNull(oldTable)) {
+          LOGGER.info(
+              "Skip rollback renaming old table {}.{} because it has been handled.",
+              database,
+              oldName);
+          return;
+        }
         // Cannot be rolled back, consider:
         // 1. Fetched a written CN table
         // 2. CN rollback because of timeout
@@ -200,24 +207,42 @@ public class DataNodeTableCache implements ITableCache {
   }
 
   private void removeTableFromPreUpdateMap(final String database, final String tableName) {
-    preUpdateTableMap.compute(
+    preUpdateTableMap.computeIfPresent(
         database,
         (k, v) -> {
-          if (v == null) {
-            throw new IllegalStateException();
+          final Pair<TsTable, Long> tableVersionPair = v.get(tableName);
+          if (Objects.nonNull(tableVersionPair)) {
+            tableVersionPair.setLeft(null);
           }
-          v.get(tableName).setLeft(null);
           return v;
         });
   }
 
+  private @Nullable TsTable getTableFromPreUpdateMap(
+      final String database, final String tableName) {
+    final Map<String, Pair<TsTable, Long>> tableMap = preUpdateTableMap.get(database);
+    if (Objects.isNull(tableMap)) {
+      return null;
+    }
+    final Pair<TsTable, Long> tableVersionPair = tableMap.get(tableName);
+    return Objects.nonNull(tableVersionPair) ? tableVersionPair.getLeft() : null;
+  }
+
   @Override
   public void commitUpdateTable(
       String database, final String tableName, final @Nullable String oldName) {
     database = PathUtils.unQualifyDatabaseName(database);
     readWriteLock.writeLock().lock();
     try {
-      final TsTable newTable = preUpdateTableMap.get(database).get(tableName).getLeft();
+      final TsTable newTable = getTableFromPreUpdateMap(database, tableName);
+      if (Objects.isNull(newTable)) {
+        LOGGER.info(
+            "Skip commit-update table {}.{} because it has been handled.", database, tableName);
+        if (Objects.nonNull(oldName)) {
+          removeTableFromPreUpdateMap(database, oldName);
+        }
+        return;
+      }
       // Cannot be committed, consider:
       // 1. Fetched a non-changed CN table
       // 2. CN is changed
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCacheTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCacheTest.java
index 2d6114363a5..4b33991e3d7 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCacheTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/schemaengine/table/DataNodeTableCacheTest.java
@@ -19,6 +19,12 @@
 
 package org.apache.iotdb.db.schemaengine.table;
 
+import org.apache.iotdb.commons.schema.table.TsTable;
+import org.apache.iotdb.commons.schema.table.column.FieldColumnSchema;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -28,6 +34,8 @@ import java.util.concurrent.Semaphore;
 public class DataNodeTableCacheTest {
 
   private static final String DATABASE = "interrupted_fetch_database";
+  private static final String TABLE_CACHE_TEST_DATABASE = "root.table_cache_test";
+  private static final String TABLE_NAME = "table1";
 
   @Test
   public void interruptedFetchDoesNotLeakSemaphorePermit() throws Exception {
@@ -50,9 +58,49 @@ public class DataNodeTableCacheTest {
     }
   }
 
+  @Test
+  public void commitUpdateTableIsIdempotent() {
+    final DataNodeTableCache cache = DataNodeTableCache.getInstance();
+    cache.invalid(TABLE_CACHE_TEST_DATABASE);
+    try {
+      cache.preUpdateTable(TABLE_CACHE_TEST_DATABASE, createTable(TABLE_NAME), null);
+
+      cache.commitUpdateTable(TABLE_CACHE_TEST_DATABASE, TABLE_NAME, null);
+      cache.commitUpdateTable(TABLE_CACHE_TEST_DATABASE, TABLE_NAME, null);
+
+      Assert.assertEquals(
+          TABLE_NAME, cache.getTable(TABLE_CACHE_TEST_DATABASE, TABLE_NAME).getTableName());
+    } finally {
+      cache.invalid(TABLE_CACHE_TEST_DATABASE);
+    }
+  }
+
+  @Test
+  public void commitAfterRollbackUpdateTableIsIgnored() {
+    final DataNodeTableCache cache = DataNodeTableCache.getInstance();
+    cache.invalid(TABLE_CACHE_TEST_DATABASE);
+    try {
+      cache.preUpdateTable(TABLE_CACHE_TEST_DATABASE, createTable(TABLE_NAME), null);
+
+      cache.rollbackUpdateTable(TABLE_CACHE_TEST_DATABASE, TABLE_NAME, null);
+      cache.commitUpdateTable(TABLE_CACHE_TEST_DATABASE, TABLE_NAME, null);
+
+      Assert.assertNull(cache.getTable(TABLE_CACHE_TEST_DATABASE, TABLE_NAME, false));
+    } finally {
+      cache.invalid(TABLE_CACHE_TEST_DATABASE);
+    }
+  }
+
   private Semaphore getFetchTableSemaphore(final DataNodeTableCache cache) throws Exception {
     final Field field = DataNodeTableCache.class.getDeclaredField("fetchTableSemaphore");
     field.setAccessible(true);
     return (Semaphore) field.get(cache);
   }
+
+  private TsTable createTable(final String tableName) {
+    final TsTable table = new TsTable(tableName);
+    table.addColumnSchema(
+        new FieldColumnSchema("s1", TSDataType.INT32, TSEncoding.RLE, CompressionType.GZIP));
+    return table;
+  }
 }

Reply via email to