This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ty/TableModelGrammar by this push:
     new 180e8fb422b Fixed the bug that a table may be concurrently altered and cause unordered operations on dataNode table cache
180e8fb422b is described below

commit 180e8fb422bb46b4a772e17da820068d6cdf794b
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jul 9 09:21:35 2024 +0800

    Fixed the bug that a table may be concurrently altered and cause unordered operations on dataNode table cache
---
 .../iotdb/confignode/manager/ProcedureManager.java | 183 ++++++++++++++++-----
 .../impl/schema/DeleteDatabaseProcedure.java       |   4 +
 2 files changed, 145 insertions(+), 42 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 7e5b74163ce..9f0f7765f87 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -212,16 +212,67 @@ public class ProcedureManager {
   }
 
   public TSStatus deleteDatabases(
-      List<TDatabaseSchema> deleteSgSchemaList, boolean isGeneratedByPipe) {
-    List<Long> procedureIds = new ArrayList<>();
-    for (TDatabaseSchema storageGroupSchema : deleteSgSchemaList) {
-      DeleteDatabaseProcedure deleteDatabaseProcedure =
-          new DeleteDatabaseProcedure(storageGroupSchema, isGeneratedByPipe);
-      long procedureId = 
this.executor.submitProcedure(deleteDatabaseProcedure);
-      procedureIds.add(procedureId);
+      final List<TDatabaseSchema> deleteSgSchemaList, final boolean 
isGeneratedByPipe) {
+    final List<Long> procedureIds = new ArrayList<>();
+    final long startCheckTimeForProcedures = System.currentTimeMillis();
+    for (final TDatabaseSchema databaseSchema : deleteSgSchemaList) {
+      final String database = databaseSchema.getName();
+      boolean hasOverlappedTask = false;
+      synchronized (this) {
+        while (executor.isRunning()
+            && System.currentTimeMillis() - startCheckTimeForProcedures < 
PROCEDURE_WAIT_TIME_OUT) {
+          ProcedureType type;
+          for (final Procedure<?> procedure : 
executor.getProcedures().values()) {
+            type = ProcedureFactory.getProcedureType(procedure);
+            if (type == null) {
+              continue;
+            }
+            // A table shall not be concurrently operated or else the dataNode 
cache
+            // may record fake values
+            switch (type) {
+              case CREATE_TABLE_PROCEDURE:
+                final CreateTableProcedure createTableProcedure = 
(CreateTableProcedure) procedure;
+                if 
(databaseSchema.getName().equals(createTableProcedure.getDatabase())) {
+                  hasOverlappedTask = true;
+                  break;
+                }
+                break;
+              case ADD_TABLE_COLUMN_PROCEDURE:
+                final AddTableColumnProcedure addTableColumnProcedure =
+                    (AddTableColumnProcedure) procedure;
+                if (database.equals(addTableColumnProcedure.getDatabase())) {
+                  hasOverlappedTask = true;
+                  break;
+                }
+                break;
+              default:
+                break;
+            }
+          }
+          if (!hasOverlappedTask) {
+            final DeleteDatabaseProcedure deleteDatabaseProcedure =
+                new DeleteDatabaseProcedure(databaseSchema, isGeneratedByPipe);
+            final long procedureId = 
this.executor.submitProcedure(deleteDatabaseProcedure);
+            procedureIds.add(procedureId);
+            break;
+          }
+          try {
+            wait(PROCEDURE_WAIT_RETRY_TIMEOUT);
+          } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
+        }
+      }
+      if (hasOverlappedTask) {
+        return RpcUtils.getStatus(
+            TSStatusCode.OVERLAP_WITH_EXISTING_TASK,
+            String.format(
+                "Some other task is operating table under the database %s, 
please retry after the procedure finishes.",
+                database));
+      }
     }
-    List<TSStatus> procedureStatus = new ArrayList<>();
-    boolean isSucceed = waitingProcedureFinished(procedureIds, 
procedureStatus);
+    final List<TSStatus> procedureStatus = new ArrayList<>();
+    final boolean isSucceed = waitingProcedureFinished(procedureIds, 
procedureStatus);
     // Clear the previously deleted regions
     final PartitionManager partitionManager = 
getConfigManager().getPartitionManager();
     
partitionManager.getRegionMaintainer().submit(partitionManager::maintainRegionReplicas);
@@ -1251,27 +1302,51 @@ public class ProcedureManager {
     }
   }
 
-  public TSStatus createTable(String database, TsTable table) {
+  public TSStatus createTable(final String database, final TsTable table) {
     long procedureId = -1;
     synchronized (this) {
       boolean hasOverlappedTask = false;
       ProcedureType type;
-      CreateTableProcedure createTableProcedure;
-      for (Procedure<?> procedure : executor.getProcedures().values()) {
+      for (final Procedure<?> procedure : executor.getProcedures().values()) {
         type = ProcedureFactory.getProcedureType(procedure);
-        if (type == null || 
!type.equals(ProcedureType.CREATE_TABLE_PROCEDURE)) {
+        if (type == null) {
           continue;
         }
-        createTableProcedure = (CreateTableProcedure) procedure;
-        if (database.equals(createTableProcedure.getDatabase())
-            && table.equals(createTableProcedure.getTable())) {
-          procedureId = createTableProcedure.getProcId();
-          break;
-        }
-        if (database.equals(createTableProcedure.getDatabase())
-            && 
table.getTableName().equals(createTableProcedure.getTable().getTableName())) {
-          hasOverlappedTask = true;
-          break;
+        // A table shall not be concurrently operated or else the dataNode 
cache
+        // may record fake values
+        switch (type) {
+          case CREATE_TABLE_PROCEDURE:
+            final CreateTableProcedure createTableProcedure = 
(CreateTableProcedure) procedure;
+            if (database.equals(createTableProcedure.getDatabase())
+                && table.equals(createTableProcedure.getTable())) {
+              procedureId = createTableProcedure.getProcId();
+              break;
+            }
+            if (database.equals(createTableProcedure.getDatabase())
+                && 
table.getTableName().equals(createTableProcedure.getTable().getTableName())) {
+              hasOverlappedTask = true;
+              break;
+            }
+            break;
+          case ADD_TABLE_COLUMN_PROCEDURE:
+            final AddTableColumnProcedure addTableColumnProcedure =
+                (AddTableColumnProcedure) procedure;
+            if (database.equals(addTableColumnProcedure.getDatabase())
+                && 
table.getTableName().equals(addTableColumnProcedure.getTableName())) {
+              hasOverlappedTask = true;
+              break;
+            }
+            break;
+          case DELETE_DATABASE_PROCEDURE:
+            final DeleteDatabaseProcedure deleteDatabaseProcedure =
+                (DeleteDatabaseProcedure) procedure;
+            if (database.equals(deleteDatabaseProcedure.getDatabase())) {
+              hasOverlappedTask = true;
+              break;
+            }
+            break;
+          default:
+            break;
         }
       }
 
@@ -1279,12 +1354,12 @@ public class ProcedureManager {
         if (hasOverlappedTask) {
           return RpcUtils.getStatus(
               TSStatusCode.OVERLAP_WITH_EXISTING_TASK,
-              "Some other task is creating table with same name.");
+              "Some other task is operating table with same name.");
         }
         procedureId = this.executor.submitProcedure(new 
CreateTableProcedure(database, table));
       }
     }
-    List<TSStatus> procedureStatus = new ArrayList<>();
+    final List<TSStatus> procedureStatus = new ArrayList<>();
     boolean isSucceed =
         waitingProcedureFinished(Collections.singletonList(procedureId), 
procedureStatus);
     if (isSucceed) {
@@ -1294,32 +1369,56 @@ public class ProcedureManager {
     }
   }
 
-  public TSStatus alterTableAddColumn(TAlterTableReq req) {
-    String database = req.database;
-    String tableName = req.tableName;
-    String queryId = req.queryId;
-    List<TsTableColumnSchema> columnSchemaList =
+  public TSStatus alterTableAddColumn(final TAlterTableReq req) {
+    final String database = req.database;
+    final String tableName = req.tableName;
+    final String queryId = req.queryId;
+    final List<TsTableColumnSchema> columnSchemaList =
         TsTableColumnSchemaUtil.deserializeColumnSchemaList(req.updateInfo);
 
     long procedureId = -1;
     synchronized (this) {
       boolean hasOverlappedTask = false;
       ProcedureType type;
-      AddTableColumnProcedure addTableColumnProcedure;
-      for (Procedure<?> procedure : executor.getProcedures().values()) {
+      for (final Procedure<?> procedure : executor.getProcedures().values()) {
         type = ProcedureFactory.getProcedureType(procedure);
-        if (type == null || 
!type.equals(ProcedureType.ADD_TABLE_COLUMN_PROCEDURE)) {
+        if (type == null) {
           continue;
         }
-        addTableColumnProcedure = (AddTableColumnProcedure) procedure;
-        if (queryId.equals(addTableColumnProcedure.getQueryId())) {
-          procedureId = addTableColumnProcedure.getProcId();
-          break;
-        }
-        if (database.equals(addTableColumnProcedure.getDatabase())
-            && tableName.equals(addTableColumnProcedure.getTableName())) {
-          hasOverlappedTask = true;
-          break;
+        // A table shall not be concurrently operated or else the dataNode 
cache
+        // may record fake values
+        switch (type) {
+          case CREATE_TABLE_PROCEDURE:
+            final CreateTableProcedure createTableProcedure = 
(CreateTableProcedure) procedure;
+            if (database.equals(createTableProcedure.getDatabase())
+                && 
tableName.equals(createTableProcedure.getTable().getTableName())) {
+              hasOverlappedTask = true;
+              break;
+            }
+            break;
+          case ADD_TABLE_COLUMN_PROCEDURE:
+            final AddTableColumnProcedure addTableColumnProcedure =
+                (AddTableColumnProcedure) procedure;
+            if (queryId.equals(addTableColumnProcedure.getQueryId())) {
+              procedureId = addTableColumnProcedure.getProcId();
+              break;
+            }
+            if (database.equals(addTableColumnProcedure.getDatabase())
+                && tableName.equals(addTableColumnProcedure.getTableName())) {
+              hasOverlappedTask = true;
+              break;
+            }
+            break;
+          case DELETE_DATABASE_PROCEDURE:
+            final DeleteDatabaseProcedure deleteDatabaseProcedure =
+                (DeleteDatabaseProcedure) procedure;
+            if (database.equals(deleteDatabaseProcedure.getDatabase())) {
+              hasOverlappedTask = true;
+              break;
+            }
+            break;
+          default:
+            break;
         }
       }
 
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
index d47e575a3ad..7bd107c8450 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
@@ -277,6 +277,10 @@ public class DeleteDatabaseProcedure
     return DeleteStorageGroupState.PRE_DELETE_DATABASE;
   }
 
+  public String getDatabase() {
+    return deleteDatabaseSchema.getName();
+  }
+
   @Override
   public void serialize(DataOutputStream stream) throws IOException {
     stream.writeShort(

Reply via email to