This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty/TableModelGrammar by this
push:
new 180e8fb422b Fixed the bug that a table may be concurrently altered and
cause unordered operations on dataNode table cache
180e8fb422b is described below
commit 180e8fb422bb46b4a772e17da820068d6cdf794b
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jul 9 09:21:35 2024 +0800
Fixed the bug that a table may be concurrently altered and cause unordered
operations on dataNode table cache
---
.../iotdb/confignode/manager/ProcedureManager.java | 183 ++++++++++++++++-----
.../impl/schema/DeleteDatabaseProcedure.java | 4 +
2 files changed, 145 insertions(+), 42 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 7e5b74163ce..9f0f7765f87 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -212,16 +212,67 @@ public class ProcedureManager {
}
public TSStatus deleteDatabases(
- List<TDatabaseSchema> deleteSgSchemaList, boolean isGeneratedByPipe) {
- List<Long> procedureIds = new ArrayList<>();
- for (TDatabaseSchema storageGroupSchema : deleteSgSchemaList) {
- DeleteDatabaseProcedure deleteDatabaseProcedure =
- new DeleteDatabaseProcedure(storageGroupSchema, isGeneratedByPipe);
- long procedureId =
this.executor.submitProcedure(deleteDatabaseProcedure);
- procedureIds.add(procedureId);
+ final List<TDatabaseSchema> deleteSgSchemaList, final boolean
isGeneratedByPipe) {
+ final List<Long> procedureIds = new ArrayList<>();
+ final long startCheckTimeForProcedures = System.currentTimeMillis();
+ for (final TDatabaseSchema databaseSchema : deleteSgSchemaList) {
+ final String database = databaseSchema.getName();
+ boolean hasOverlappedTask = false;
+ synchronized (this) {
+ while (executor.isRunning()
+ && System.currentTimeMillis() - startCheckTimeForProcedures <
PROCEDURE_WAIT_TIME_OUT) {
+ ProcedureType type;
+ for (final Procedure<?> procedure :
executor.getProcedures().values()) {
+ type = ProcedureFactory.getProcedureType(procedure);
+ if (type == null) {
+ continue;
+ }
+ // A table shall not be concurrently operated or else the dataNode
cache
+ // may record fake values
+ switch (type) {
+ case CREATE_TABLE_PROCEDURE:
+ final CreateTableProcedure createTableProcedure =
(CreateTableProcedure) procedure;
+ if
(databaseSchema.getName().equals(createTableProcedure.getDatabase())) {
+ hasOverlappedTask = true;
+ break;
+ }
+ break;
+ case ADD_TABLE_COLUMN_PROCEDURE:
+ final AddTableColumnProcedure addTableColumnProcedure =
+ (AddTableColumnProcedure) procedure;
+ if (database.equals(addTableColumnProcedure.getDatabase())) {
+ hasOverlappedTask = true;
+ break;
+ }
+ break;
+ default:
+ break;
+ }
+ }
+ if (!hasOverlappedTask) {
+ final DeleteDatabaseProcedure deleteDatabaseProcedure =
+ new DeleteDatabaseProcedure(databaseSchema, isGeneratedByPipe);
+ final long procedureId =
this.executor.submitProcedure(deleteDatabaseProcedure);
+ procedureIds.add(procedureId);
+ break;
+ }
+ try {
+ wait(PROCEDURE_WAIT_RETRY_TIMEOUT);
+ } catch (final InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ if (hasOverlappedTask) {
+ return RpcUtils.getStatus(
+ TSStatusCode.OVERLAP_WITH_EXISTING_TASK,
+ String.format(
+ "Some other task is operating table under the database %s,
please retry after the procedure finishes.",
+ database));
+ }
}
- List<TSStatus> procedureStatus = new ArrayList<>();
- boolean isSucceed = waitingProcedureFinished(procedureIds,
procedureStatus);
+ final List<TSStatus> procedureStatus = new ArrayList<>();
+ final boolean isSucceed = waitingProcedureFinished(procedureIds,
procedureStatus);
// Clear the previously deleted regions
final PartitionManager partitionManager =
getConfigManager().getPartitionManager();
partitionManager.getRegionMaintainer().submit(partitionManager::maintainRegionReplicas);
@@ -1251,27 +1302,51 @@ public class ProcedureManager {
}
}
- public TSStatus createTable(String database, TsTable table) {
+ public TSStatus createTable(final String database, final TsTable table) {
long procedureId = -1;
synchronized (this) {
boolean hasOverlappedTask = false;
ProcedureType type;
- CreateTableProcedure createTableProcedure;
- for (Procedure<?> procedure : executor.getProcedures().values()) {
+ for (final Procedure<?> procedure : executor.getProcedures().values()) {
type = ProcedureFactory.getProcedureType(procedure);
- if (type == null ||
!type.equals(ProcedureType.CREATE_TABLE_PROCEDURE)) {
+ if (type == null) {
continue;
}
- createTableProcedure = (CreateTableProcedure) procedure;
- if (database.equals(createTableProcedure.getDatabase())
- && table.equals(createTableProcedure.getTable())) {
- procedureId = createTableProcedure.getProcId();
- break;
- }
- if (database.equals(createTableProcedure.getDatabase())
- &&
table.getTableName().equals(createTableProcedure.getTable().getTableName())) {
- hasOverlappedTask = true;
- break;
+ // A table shall not be concurrently operated or else the dataNode
cache
+ // may record fake values
+ switch (type) {
+ case CREATE_TABLE_PROCEDURE:
+ final CreateTableProcedure createTableProcedure =
(CreateTableProcedure) procedure;
+ if (database.equals(createTableProcedure.getDatabase())
+ && table.equals(createTableProcedure.getTable())) {
+ procedureId = createTableProcedure.getProcId();
+ break;
+ }
+ if (database.equals(createTableProcedure.getDatabase())
+ &&
table.getTableName().equals(createTableProcedure.getTable().getTableName())) {
+ hasOverlappedTask = true;
+ break;
+ }
+ break;
+ case ADD_TABLE_COLUMN_PROCEDURE:
+ final AddTableColumnProcedure addTableColumnProcedure =
+ (AddTableColumnProcedure) procedure;
+ if (database.equals(addTableColumnProcedure.getDatabase())
+ &&
table.getTableName().equals(addTableColumnProcedure.getTableName())) {
+ hasOverlappedTask = true;
+ break;
+ }
+ break;
+ case DELETE_DATABASE_PROCEDURE:
+ final DeleteDatabaseProcedure deleteDatabaseProcedure =
+ (DeleteDatabaseProcedure) procedure;
+ if (database.equals(deleteDatabaseProcedure.getDatabase())) {
+ hasOverlappedTask = true;
+ break;
+ }
+ break;
+ default:
+ break;
}
}
@@ -1279,12 +1354,12 @@ public class ProcedureManager {
if (hasOverlappedTask) {
return RpcUtils.getStatus(
TSStatusCode.OVERLAP_WITH_EXISTING_TASK,
- "Some other task is creating table with same name.");
+ "Some other task is operating table with same name.");
}
procedureId = this.executor.submitProcedure(new
CreateTableProcedure(database, table));
}
}
- List<TSStatus> procedureStatus = new ArrayList<>();
+ final List<TSStatus> procedureStatus = new ArrayList<>();
boolean isSucceed =
waitingProcedureFinished(Collections.singletonList(procedureId),
procedureStatus);
if (isSucceed) {
@@ -1294,32 +1369,56 @@ public class ProcedureManager {
}
}
- public TSStatus alterTableAddColumn(TAlterTableReq req) {
- String database = req.database;
- String tableName = req.tableName;
- String queryId = req.queryId;
- List<TsTableColumnSchema> columnSchemaList =
+ public TSStatus alterTableAddColumn(final TAlterTableReq req) {
+ final String database = req.database;
+ final String tableName = req.tableName;
+ final String queryId = req.queryId;
+ final List<TsTableColumnSchema> columnSchemaList =
TsTableColumnSchemaUtil.deserializeColumnSchemaList(req.updateInfo);
long procedureId = -1;
synchronized (this) {
boolean hasOverlappedTask = false;
ProcedureType type;
- AddTableColumnProcedure addTableColumnProcedure;
- for (Procedure<?> procedure : executor.getProcedures().values()) {
+ for (final Procedure<?> procedure : executor.getProcedures().values()) {
type = ProcedureFactory.getProcedureType(procedure);
- if (type == null ||
!type.equals(ProcedureType.ADD_TABLE_COLUMN_PROCEDURE)) {
+ if (type == null) {
continue;
}
- addTableColumnProcedure = (AddTableColumnProcedure) procedure;
- if (queryId.equals(addTableColumnProcedure.getQueryId())) {
- procedureId = addTableColumnProcedure.getProcId();
- break;
- }
- if (database.equals(addTableColumnProcedure.getDatabase())
- && tableName.equals(addTableColumnProcedure.getTableName())) {
- hasOverlappedTask = true;
- break;
+ // A table shall not be concurrently operated or else the dataNode
cache
+ // may record fake values
+ switch (type) {
+ case CREATE_TABLE_PROCEDURE:
+ final CreateTableProcedure createTableProcedure =
(CreateTableProcedure) procedure;
+ if (database.equals(createTableProcedure.getDatabase())
+ &&
tableName.equals(createTableProcedure.getTable().getTableName())) {
+ hasOverlappedTask = true;
+ break;
+ }
+ break;
+ case ADD_TABLE_COLUMN_PROCEDURE:
+ final AddTableColumnProcedure addTableColumnProcedure =
+ (AddTableColumnProcedure) procedure;
+ if (queryId.equals(addTableColumnProcedure.getQueryId())) {
+ procedureId = addTableColumnProcedure.getProcId();
+ break;
+ }
+ if (database.equals(addTableColumnProcedure.getDatabase())
+ && tableName.equals(addTableColumnProcedure.getTableName())) {
+ hasOverlappedTask = true;
+ break;
+ }
+ break;
+ case DELETE_DATABASE_PROCEDURE:
+ final DeleteDatabaseProcedure deleteDatabaseProcedure =
+ (DeleteDatabaseProcedure) procedure;
+ if (database.equals(deleteDatabaseProcedure.getDatabase())) {
+ hasOverlappedTask = true;
+ break;
+ }
+ break;
+ default:
+ break;
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
index d47e575a3ad..7bd107c8450 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
@@ -277,6 +277,10 @@ public class DeleteDatabaseProcedure
return DeleteStorageGroupState.PRE_DELETE_DATABASE;
}
+ public String getDatabase() {
+ return deleteDatabaseSchema.getName();
+ }
+
@Override
public void serialize(DataOutputStream stream) throws IOException {
stream.writeShort(