This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 88abaaef33 [fix](alter table property) fix alter property if rpc
failed, branch 2.0 #22845 (#23116)
88abaaef33 is described below
commit 88abaaef33d3f506a262e3a52e838eb5dc58a51c
Author: Chenyang Sun <[email protected]>
AuthorDate: Thu Aug 17 18:16:51 2023 +0800
[fix](alter table property) fix alter property if rpc failed, branch 2.0
#22845 (#23116)
---
.../main/java/org/apache/doris/alter/Alter.java | 22 +-----
.../apache/doris/alter/SchemaChangeHandler.java | 91 ++--------------------
.../main/java/org/apache/doris/catalog/Env.java | 6 +-
.../doris/task/UpdateTabletMetaInfoTask.java | 6 +-
.../test_table_level_compaction_policy.groovy | 29 ++++++-
5 files changed, 45 insertions(+), 109 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
index f8c90b8a52..8a423e5894 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
@@ -211,27 +211,13 @@ public class Alter {
} else if (currentAlterOps.checkIsBeingSynced(alterClauses)) {
olapTable.setIsBeingSynced(currentAlterOps.isBeingSynced(alterClauses));
needProcessOutsideTableLock = true;
- } else if (currentAlterOps.checkCompactionPolicy(alterClauses)
- && currentAlterOps.getCompactionPolicy(alterClauses) !=
olapTable.getCompactionPolicy()) {
-
olapTable.setCompactionPolicy(currentAlterOps.getCompactionPolicy(alterClauses));
+ } else if (currentAlterOps.checkCompactionPolicy(alterClauses)) {
needProcessOutsideTableLock = true;
- } else if
(currentAlterOps.checkTimeSeriesCompactionGoalSizeMbytes(alterClauses)
- &&
currentAlterOps.getTimeSeriesCompactionGoalSizeMbytes(alterClauses)
- !=
olapTable.getTimeSeriesCompactionGoalSizeMbytes()) {
- olapTable.setTimeSeriesCompactionGoalSizeMbytes(currentAlterOps
-
.getTimeSeriesCompactionGoalSizeMbytes(alterClauses));
+ } else if
(currentAlterOps.checkTimeSeriesCompactionGoalSizeMbytes(alterClauses)) {
needProcessOutsideTableLock = true;
- } else if
(currentAlterOps.checkTimeSeriesCompactionFileCountThreshold(alterClauses)
- &&
currentAlterOps.getTimeSeriesCompactionFileCountThreshold(alterClauses)
- !=
olapTable.getTimeSeriesCompactionFileCountThreshold()) {
- olapTable.setTimeSeriesCompactionFileCountThreshold(currentAlterOps
-
.getTimeSeriesCompactionFileCountThreshold(alterClauses));
+ } else if
(currentAlterOps.checkTimeSeriesCompactionFileCountThreshold(alterClauses)) {
needProcessOutsideTableLock = true;
- } else if
(currentAlterOps.checkTimeSeriesCompactionTimeThresholdSeconds(alterClauses)
- &&
currentAlterOps.getTimeSeriesCompactionTimeThresholdSeconds(alterClauses)
- !=
olapTable.getTimeSeriesCompactionTimeThresholdSeconds()) {
-
olapTable.setTimeSeriesCompactionTimeThresholdSeconds(currentAlterOps
-
.getTimeSeriesCompactionTimeThresholdSeconds(alterClauses));
+ } else if
(currentAlterOps.checkTimeSeriesCompactionTimeThresholdSeconds(alterClauses)) {
needProcessOutsideTableLock = true;
} else if (currentAlterOps.checkBinlogConfigChange(alterClauses)) {
if (!Config.enable_feature_binlog) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index 9130555414..b555516bc2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -2218,7 +2218,8 @@ public class SchemaChangeHandler extends AlterHandler {
for (String partitionName : partitionNames) {
try {
- updatePartitionProperties(db, olapTable.getName(),
partitionName, storagePolicyId, isInMemory, null);
+ updatePartitionProperties(db, olapTable.getName(),
partitionName, storagePolicyId,
+
isInMemory, null, null, null);
} catch (Exception e) {
String errMsg = "Failed to update partition[" + partitionName
+ "]'s 'in_memory' property. "
+ "The reason is [" + e.getMessage() + "]";
@@ -2232,7 +2233,8 @@ public class SchemaChangeHandler extends AlterHandler {
* This operation may return partial successfully, with an exception to
inform user to retry
*/
public void updatePartitionProperties(Database db, String tableName,
String partitionName, long storagePolicyId,
- int isInMemory, BinlogConfig
binlogConfig) throws UserException {
+ int isInMemory, BinlogConfig
binlogConfig, String compactionPolicy,
+ Map<String, Long>
timeSeriesCompactionConfig) throws UserException {
// be id -> <tablet id,schemaHash>
Map<Long, Set<Pair<Long, Integer>>> beIdToTabletIdWithHash =
Maps.newHashMap();
OlapTable olapTable = (OlapTable)
db.getTableOrMetaException(tableName, Table.TableType.OLAP);
@@ -2264,7 +2266,7 @@ public class SchemaChangeHandler extends AlterHandler {
for (Map.Entry<Long, Set<Pair<Long, Integer>>> kv :
beIdToTabletIdWithHash.entrySet()) {
countDownLatch.addMark(kv.getKey(), kv.getValue());
UpdateTabletMetaInfoTask task = new
UpdateTabletMetaInfoTask(kv.getKey(), kv.getValue(), isInMemory,
- storagePolicyId, binlogConfig, countDownLatch);
+ storagePolicyId, binlogConfig, countDownLatch,
compactionPolicy, timeSeriesCompactionConfig);
batchTask.addTask(task);
}
if (!FeConstants.runningUnitTest) {
@@ -2307,86 +2309,6 @@ public class SchemaChangeHandler extends AlterHandler {
}
}
- /**
- * Update one specified partition's properties by partition name of table
- * This operation may return partial successfully, with an exception to
inform user to retry
- */
- public void updatePartitionProperties(Database db, String tableName,
String partitionName, long storagePolicyId,
- int isInMemory, BinlogConfig
binlogConfig, String compactionPolicy,
- Map<String, Long>
timeSeriesCompactionConfig) throws UserException {
- // be id -> <tablet id,schemaHash>
- Map<Long, Set<Pair<Long, Integer>>> beIdToTabletIdWithHash =
Maps.newHashMap();
- OlapTable olapTable = (OlapTable)
db.getTableOrMetaException(tableName, Table.TableType.OLAP);
- olapTable.readLock();
- try {
- Partition partition = olapTable.getPartition(partitionName);
- if (partition == null) {
- throw new DdlException(
- "Partition[" + partitionName + "] does not exist in
table[" + olapTable.getName() + "]");
- }
-
- for (MaterializedIndex index :
partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
- int schemaHash =
olapTable.getSchemaHashByIndexId(index.getId());
- for (Tablet tablet : index.getTablets()) {
- for (Replica replica : tablet.getReplicas()) {
- Set<Pair<Long, Integer>> tabletIdWithHash =
beIdToTabletIdWithHash.computeIfAbsent(
- replica.getBackendId(), k ->
Sets.newHashSet());
- tabletIdWithHash.add(Pair.of(tablet.getId(),
schemaHash));
- }
- }
- }
- } finally {
- olapTable.readUnlock();
- }
-
- int totalTaskNum = beIdToTabletIdWithHash.keySet().size();
- MarkedCountDownLatch<Long, Set<Pair<Long, Integer>>> countDownLatch =
new MarkedCountDownLatch<>(totalTaskNum);
- AgentBatchTask batchTask = new AgentBatchTask();
- for (Map.Entry<Long, Set<Pair<Long, Integer>>> kv :
beIdToTabletIdWithHash.entrySet()) {
- countDownLatch.addMark(kv.getKey(), kv.getValue());
- UpdateTabletMetaInfoTask task = new
UpdateTabletMetaInfoTask(kv.getKey(), kv.getValue(), isInMemory,
- storagePolicyId, binlogConfig, countDownLatch,
compactionPolicy, timeSeriesCompactionConfig);
- batchTask.addTask(task);
- }
- if (!FeConstants.runningUnitTest) {
- // send all tasks and wait them finished
- AgentTaskQueue.addBatchTask(batchTask);
- AgentTaskExecutor.submit(batchTask);
- LOG.info("send update tablet meta task for table {}, partitions
{}, number: {}", tableName, partitionName,
- batchTask.getTaskNum());
-
- // estimate timeout
- long timeout = DbUtil.getCreateReplicasTimeoutMs(totalTaskNum);
- boolean ok = false;
- try {
- ok = countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- LOG.warn("InterruptedException: ", e);
- }
-
- if (!ok || !countDownLatch.getStatus().ok()) {
- String errMsg = "Failed to update partition[" + partitionName
+ "]. tablet meta.";
- // clear tasks
- AgentTaskQueue.removeBatchTask(batchTask,
TTaskType.UPDATE_TABLET_META_INFO);
-
- if (!countDownLatch.getStatus().ok()) {
- errMsg += " Error: " +
countDownLatch.getStatus().getErrorMsg();
- } else {
- List<Map.Entry<Long, Set<Pair<Long, Integer>>>>
unfinishedMarks = countDownLatch.getLeftMarks();
- // only show at most 3 results
- List<Map.Entry<Long, Set<Pair<Long, Integer>>>> subList =
unfinishedMarks.subList(0,
- Math.min(unfinishedMarks.size(), 3));
- if (!subList.isEmpty()) {
- errMsg += " Unfinished mark: " + Joiner.on(",
").join(subList);
- }
- }
- errMsg += ". This operation maybe partial successfully, You
should retry until success.";
- LOG.warn(errMsg);
- throw new DdlException(errMsg);
- }
- }
- }
-
@Override
public void cancel(CancelStmt stmt) throws DdlException {
CancelAlterTableStmt cancelAlterTableStmt = (CancelAlterTableStmt)
stmt;
@@ -2988,7 +2910,8 @@ public class SchemaChangeHandler extends AlterHandler {
for (Partition partition : partitions) {
- updatePartitionProperties(db, olapTable.getName(),
partition.getName(), -1, -1, newBinlogConfig);
+ updatePartitionProperties(db, olapTable.getName(),
partition.getName(), -1, -1,
+ newBinlogConfig, null, null);
}
olapTable.writeLockOrDdlException();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 7d56ba8882..a1fd73a140 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -4504,7 +4504,11 @@ public class Env {
}
tableProperty.buildInMemory()
.buildStoragePolicy()
- .buildIsBeingSynced();
+ .buildIsBeingSynced()
+ .buildCompactionPolicy()
+ .buildTimeSeriesCompactionGoalSizeMbytes()
+ .buildTimeSeriesCompactionFileCountThreshold()
+ .buildTimeSeriesCompactionTimeThresholdSeconds();
// need to update partition info meta
for (Partition partition : table.getPartitions()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java
index 6b12c34e0d..7b368cbf7c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java
@@ -83,11 +83,7 @@ public class UpdateTabletMetaInfoTask extends AgentTask {
MarkedCountDownLatch<Long, Set<Pair<Long,
Integer>>> latch,
String compactionPolicy,
Map<String, Long>
timeSeriesCompactionConfig) {
- this(backendId, tableIdWithSchemaHash);
- this.storagePolicyId = storagePolicyId;
- this.inMemory = inMemory;
- this.binlogConfig = binlogConfig;
- this.latch = latch;
+ this(backendId, tableIdWithSchemaHash, inMemory, storagePolicyId,
binlogConfig, latch);
this.compactionPolicy = compactionPolicy;
this.timeSeriesCompactionConfig = timeSeriesCompactionConfig;
}
diff --git
a/regression-test/suites/compaction/test_table_level_compaction_policy.groovy
b/regression-test/suites/compaction/test_table_level_compaction_policy.groovy
index 5bb33eebf2..e0b606af9d 100644
---
a/regression-test/suites/compaction/test_table_level_compaction_policy.groovy
+++
b/regression-test/suites/compaction/test_table_level_compaction_policy.groovy
@@ -65,7 +65,7 @@ suite("test_table_level_compaction_policy") {
logger.info("${showResult3}")
assertTrue(showResult3.toString().containsIgnoreCase('"time_series_compaction_file_count_threshold"
= "6000"'))
- sql """
+ sql """
alter table ${tableName} set
("time_series_compaction_time_threshold_seconds" = "3000")
"""
sql """sync"""
@@ -74,6 +74,33 @@ suite("test_table_level_compaction_policy") {
logger.info("${showResult4}")
assertTrue(showResult4.toString().containsIgnoreCase('"time_series_compaction_time_threshold_seconds"
= "3000"'))
+ sql """
+ alter table ${tableName} set
("time_series_compaction_goal_size_mbytes" = "1024")
+ """
+ sql """sync"""
+
+ def showResult6 = sql """show create table ${tableName}"""
+ logger.info("${showResult6}")
+
assertTrue(showResult6.toString().containsIgnoreCase('"time_series_compaction_goal_size_mbytes"
= "1024"'))
+
+ sql """
+ alter table ${tableName} set
("time_series_compaction_file_count_threshold" = "6000")
+ """
+ sql """sync"""
+
+ def showResult7 = sql """show create table ${tableName}"""
+ logger.info("${showResult7}")
+
assertTrue(showResult7.toString().containsIgnoreCase('"time_series_compaction_file_count_threshold"
= "6000"'))
+
+ sql """
+ alter table ${tableName} set
("time_series_compaction_time_threshold_seconds" = "3000")
+ """
+ sql """sync"""
+
+ def showResult8 = sql """show create table ${tableName}"""
+ logger.info("${showResult8}")
+
assertTrue(showResult8.toString().containsIgnoreCase('"time_series_compaction_time_threshold_seconds"
= "3000"'))
+
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """sync"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]