This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 88abaaef33 [fix](alter table property) fix alter property if rpc failed, branch 2.0 #22845 (#23116)
88abaaef33 is described below

commit 88abaaef33d3f506a262e3a52e838eb5dc58a51c
Author: Chenyang Sun <[email protected]>
AuthorDate: Thu Aug 17 18:16:51 2023 +0800

    [fix](alter table property) fix alter property if rpc failed, branch 2.0 #22845 (#23116)
---
 .../main/java/org/apache/doris/alter/Alter.java    | 22 +-----
 .../apache/doris/alter/SchemaChangeHandler.java    | 91 ++--------------------
 .../main/java/org/apache/doris/catalog/Env.java    |  6 +-
 .../doris/task/UpdateTabletMetaInfoTask.java       |  6 +-
 .../test_table_level_compaction_policy.groovy      | 29 ++++++-
 5 files changed, 45 insertions(+), 109 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
index f8c90b8a52..8a423e5894 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
@@ -211,27 +211,13 @@ public class Alter {
         } else if (currentAlterOps.checkIsBeingSynced(alterClauses)) {
             
olapTable.setIsBeingSynced(currentAlterOps.isBeingSynced(alterClauses));
             needProcessOutsideTableLock = true;
-        } else if (currentAlterOps.checkCompactionPolicy(alterClauses)
-                    && currentAlterOps.getCompactionPolicy(alterClauses) != 
olapTable.getCompactionPolicy()) {
-            
olapTable.setCompactionPolicy(currentAlterOps.getCompactionPolicy(alterClauses));
+        } else if (currentAlterOps.checkCompactionPolicy(alterClauses)) {
             needProcessOutsideTableLock = true;
-        } else if 
(currentAlterOps.checkTimeSeriesCompactionGoalSizeMbytes(alterClauses)
-                    && 
currentAlterOps.getTimeSeriesCompactionGoalSizeMbytes(alterClauses)
-                                                != 
olapTable.getTimeSeriesCompactionGoalSizeMbytes()) {
-            olapTable.setTimeSeriesCompactionGoalSizeMbytes(currentAlterOps
-                                            
.getTimeSeriesCompactionGoalSizeMbytes(alterClauses));
+        } else if 
(currentAlterOps.checkTimeSeriesCompactionGoalSizeMbytes(alterClauses)) {
             needProcessOutsideTableLock = true;
-        } else if 
(currentAlterOps.checkTimeSeriesCompactionFileCountThreshold(alterClauses)
-                    && 
currentAlterOps.getTimeSeriesCompactionFileCountThreshold(alterClauses)
-                                                != 
olapTable.getTimeSeriesCompactionFileCountThreshold()) {
-            olapTable.setTimeSeriesCompactionFileCountThreshold(currentAlterOps
-                                            
.getTimeSeriesCompactionFileCountThreshold(alterClauses));
+        } else if 
(currentAlterOps.checkTimeSeriesCompactionFileCountThreshold(alterClauses)) {
             needProcessOutsideTableLock = true;
-        } else if 
(currentAlterOps.checkTimeSeriesCompactionTimeThresholdSeconds(alterClauses)
-                    && 
currentAlterOps.getTimeSeriesCompactionTimeThresholdSeconds(alterClauses)
-                                                != 
olapTable.getTimeSeriesCompactionTimeThresholdSeconds()) {
-            
olapTable.setTimeSeriesCompactionTimeThresholdSeconds(currentAlterOps
-                                            
.getTimeSeriesCompactionTimeThresholdSeconds(alterClauses));
+        } else if 
(currentAlterOps.checkTimeSeriesCompactionTimeThresholdSeconds(alterClauses)) {
             needProcessOutsideTableLock = true;
         } else if (currentAlterOps.checkBinlogConfigChange(alterClauses)) {
             if (!Config.enable_feature_binlog) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index 9130555414..b555516bc2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -2218,7 +2218,8 @@ public class SchemaChangeHandler extends AlterHandler {
 
         for (String partitionName : partitionNames) {
             try {
-                updatePartitionProperties(db, olapTable.getName(), 
partitionName, storagePolicyId, isInMemory, null);
+                updatePartitionProperties(db, olapTable.getName(), 
partitionName, storagePolicyId,
+                                                                               
     isInMemory, null, null, null);
             } catch (Exception e) {
                 String errMsg = "Failed to update partition[" + partitionName 
+ "]'s 'in_memory' property. "
                         + "The reason is [" + e.getMessage() + "]";
@@ -2232,7 +2233,8 @@ public class SchemaChangeHandler extends AlterHandler {
      * This operation may return partial successfully, with an exception to 
inform user to retry
      */
     public void updatePartitionProperties(Database db, String tableName, 
String partitionName, long storagePolicyId,
-                                          int isInMemory, BinlogConfig 
binlogConfig) throws UserException {
+                                          int isInMemory, BinlogConfig 
binlogConfig, String compactionPolicy,
+                                          Map<String, Long> 
timeSeriesCompactionConfig) throws UserException {
         // be id -> <tablet id,schemaHash>
         Map<Long, Set<Pair<Long, Integer>>> beIdToTabletIdWithHash = 
Maps.newHashMap();
         OlapTable olapTable = (OlapTable) 
db.getTableOrMetaException(tableName, Table.TableType.OLAP);
@@ -2264,7 +2266,7 @@ public class SchemaChangeHandler extends AlterHandler {
         for (Map.Entry<Long, Set<Pair<Long, Integer>>> kv : 
beIdToTabletIdWithHash.entrySet()) {
             countDownLatch.addMark(kv.getKey(), kv.getValue());
             UpdateTabletMetaInfoTask task = new 
UpdateTabletMetaInfoTask(kv.getKey(), kv.getValue(), isInMemory,
-                    storagePolicyId, binlogConfig, countDownLatch);
+                    storagePolicyId, binlogConfig, countDownLatch, 
compactionPolicy, timeSeriesCompactionConfig);
             batchTask.addTask(task);
         }
         if (!FeConstants.runningUnitTest) {
@@ -2307,86 +2309,6 @@ public class SchemaChangeHandler extends AlterHandler {
         }
     }
 
-    /**
-     * Update one specified partition's properties by partition name of table
-     * This operation may return partial successfully, with an exception to 
inform user to retry
-     */
-    public void updatePartitionProperties(Database db, String tableName, 
String partitionName, long storagePolicyId,
-                                          int isInMemory, BinlogConfig 
binlogConfig, String compactionPolicy,
-                                          Map<String, Long> 
timeSeriesCompactionConfig) throws UserException {
-        // be id -> <tablet id,schemaHash>
-        Map<Long, Set<Pair<Long, Integer>>> beIdToTabletIdWithHash = 
Maps.newHashMap();
-        OlapTable olapTable = (OlapTable) 
db.getTableOrMetaException(tableName, Table.TableType.OLAP);
-        olapTable.readLock();
-        try {
-            Partition partition = olapTable.getPartition(partitionName);
-            if (partition == null) {
-                throw new DdlException(
-                        "Partition[" + partitionName + "] does not exist in 
table[" + olapTable.getName() + "]");
-            }
-
-            for (MaterializedIndex index : 
partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
-                int schemaHash = 
olapTable.getSchemaHashByIndexId(index.getId());
-                for (Tablet tablet : index.getTablets()) {
-                    for (Replica replica : tablet.getReplicas()) {
-                        Set<Pair<Long, Integer>> tabletIdWithHash = 
beIdToTabletIdWithHash.computeIfAbsent(
-                                replica.getBackendId(), k -> 
Sets.newHashSet());
-                        tabletIdWithHash.add(Pair.of(tablet.getId(), 
schemaHash));
-                    }
-                }
-            }
-        } finally {
-            olapTable.readUnlock();
-        }
-
-        int totalTaskNum = beIdToTabletIdWithHash.keySet().size();
-        MarkedCountDownLatch<Long, Set<Pair<Long, Integer>>> countDownLatch = 
new MarkedCountDownLatch<>(totalTaskNum);
-        AgentBatchTask batchTask = new AgentBatchTask();
-        for (Map.Entry<Long, Set<Pair<Long, Integer>>> kv : 
beIdToTabletIdWithHash.entrySet()) {
-            countDownLatch.addMark(kv.getKey(), kv.getValue());
-            UpdateTabletMetaInfoTask task = new 
UpdateTabletMetaInfoTask(kv.getKey(), kv.getValue(), isInMemory,
-                    storagePolicyId, binlogConfig, countDownLatch, 
compactionPolicy, timeSeriesCompactionConfig);
-            batchTask.addTask(task);
-        }
-        if (!FeConstants.runningUnitTest) {
-            // send all tasks and wait them finished
-            AgentTaskQueue.addBatchTask(batchTask);
-            AgentTaskExecutor.submit(batchTask);
-            LOG.info("send update tablet meta task for table {}, partitions 
{}, number: {}", tableName, partitionName,
-                    batchTask.getTaskNum());
-
-            // estimate timeout
-            long timeout = DbUtil.getCreateReplicasTimeoutMs(totalTaskNum);
-            boolean ok = false;
-            try {
-                ok = countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
-            } catch (InterruptedException e) {
-                LOG.warn("InterruptedException: ", e);
-            }
-
-            if (!ok || !countDownLatch.getStatus().ok()) {
-                String errMsg = "Failed to update partition[" + partitionName 
+ "]. tablet meta.";
-                // clear tasks
-                AgentTaskQueue.removeBatchTask(batchTask, 
TTaskType.UPDATE_TABLET_META_INFO);
-
-                if (!countDownLatch.getStatus().ok()) {
-                    errMsg += " Error: " + 
countDownLatch.getStatus().getErrorMsg();
-                } else {
-                    List<Map.Entry<Long, Set<Pair<Long, Integer>>>> 
unfinishedMarks = countDownLatch.getLeftMarks();
-                    // only show at most 3 results
-                    List<Map.Entry<Long, Set<Pair<Long, Integer>>>> subList = 
unfinishedMarks.subList(0,
-                            Math.min(unfinishedMarks.size(), 3));
-                    if (!subList.isEmpty()) {
-                        errMsg += " Unfinished mark: " + Joiner.on(", 
").join(subList);
-                    }
-                }
-                errMsg += ". This operation maybe partial successfully, You 
should retry until success.";
-                LOG.warn(errMsg);
-                throw new DdlException(errMsg);
-            }
-        }
-    }
-
     @Override
     public void cancel(CancelStmt stmt) throws DdlException {
         CancelAlterTableStmt cancelAlterTableStmt = (CancelAlterTableStmt) 
stmt;
@@ -2988,7 +2910,8 @@ public class SchemaChangeHandler extends AlterHandler {
 
 
         for (Partition partition : partitions) {
-            updatePartitionProperties(db, olapTable.getName(), 
partition.getName(), -1, -1, newBinlogConfig);
+            updatePartitionProperties(db, olapTable.getName(), 
partition.getName(), -1, -1,
+                                                newBinlogConfig, null, null);
         }
 
         olapTable.writeLockOrDdlException();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 7d56ba8882..a1fd73a140 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -4504,7 +4504,11 @@ public class Env {
         }
         tableProperty.buildInMemory()
                 .buildStoragePolicy()
-                .buildIsBeingSynced();
+                .buildIsBeingSynced()
+                .buildCompactionPolicy()
+                .buildTimeSeriesCompactionGoalSizeMbytes()
+                .buildTimeSeriesCompactionFileCountThreshold()
+                .buildTimeSeriesCompactionTimeThresholdSeconds();
 
         // need to update partition info meta
         for (Partition partition : table.getPartitions()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java
index 6b12c34e0d..7b368cbf7c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java
@@ -83,11 +83,7 @@ public class UpdateTabletMetaInfoTask extends AgentTask {
                                     MarkedCountDownLatch<Long, Set<Pair<Long, 
Integer>>> latch,
                                     String compactionPolicy,
                                     Map<String, Long> 
timeSeriesCompactionConfig) {
-        this(backendId, tableIdWithSchemaHash);
-        this.storagePolicyId = storagePolicyId;
-        this.inMemory = inMemory;
-        this.binlogConfig = binlogConfig;
-        this.latch = latch;
+        this(backendId, tableIdWithSchemaHash, inMemory, storagePolicyId, 
binlogConfig, latch);
         this.compactionPolicy = compactionPolicy;
         this.timeSeriesCompactionConfig = timeSeriesCompactionConfig;
     }
diff --git 
a/regression-test/suites/compaction/test_table_level_compaction_policy.groovy 
b/regression-test/suites/compaction/test_table_level_compaction_policy.groovy
index 5bb33eebf2..e0b606af9d 100644
--- 
a/regression-test/suites/compaction/test_table_level_compaction_policy.groovy
+++ 
b/regression-test/suites/compaction/test_table_level_compaction_policy.groovy
@@ -65,7 +65,7 @@ suite("test_table_level_compaction_policy") {
     logger.info("${showResult3}")
     
assertTrue(showResult3.toString().containsIgnoreCase('"time_series_compaction_file_count_threshold"
 = "6000"'))
 
-     sql """
+    sql """
         alter table ${tableName} set 
("time_series_compaction_time_threshold_seconds" = "3000")
         """
     sql """sync"""
@@ -74,6 +74,33 @@ suite("test_table_level_compaction_policy") {
     logger.info("${showResult4}")
     
assertTrue(showResult4.toString().containsIgnoreCase('"time_series_compaction_time_threshold_seconds"
 = "3000"'))
 
+    sql """
+        alter table ${tableName} set 
("time_series_compaction_goal_size_mbytes" = "1024")
+        """
+    sql """sync"""
+
+    def showResult6 = sql """show create table ${tableName}"""
+    logger.info("${showResult6}")
+    
assertTrue(showResult6.toString().containsIgnoreCase('"time_series_compaction_goal_size_mbytes"
 = "1024"'))
+
+    sql """
+        alter table ${tableName} set 
("time_series_compaction_file_count_threshold" = "6000")
+        """
+    sql """sync"""
+
+    def showResult7 = sql """show create table ${tableName}"""
+    logger.info("${showResult7}")
+    
assertTrue(showResult7.toString().containsIgnoreCase('"time_series_compaction_file_count_threshold"
 = "6000"'))
+
+    sql """
+        alter table ${tableName} set 
("time_series_compaction_time_threshold_seconds" = "3000")
+        """
+    sql """sync"""
+
+    def showResult8 = sql """show create table ${tableName}"""
+    logger.info("${showResult8}")
+    
assertTrue(showResult8.toString().containsIgnoreCase('"time_series_compaction_time_threshold_seconds"
 = "3000"'))
+
     sql """ DROP TABLE IF EXISTS ${tableName} """
     sql """sync"""
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to