This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new c5c18915648 branch-3.0: [enhance](mtmv)Only restrict MTMV to not allow 
concurrent insert overwrite execution #48673 (#49964)
c5c18915648 is described below

commit c5c1891564847b45e85e2841868fca30fe00a6d7
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Apr 25 19:15:49 2025 +0800

    branch-3.0: [enhance](mtmv)Only restrict MTMV to not allow concurrent 
insert overwrite execution #48673 (#49964)
    
    Cherry-picked from #48673
    
    Co-authored-by: zhangdong <[email protected]>
---
 .../insertoverwrite/InsertOverwriteManager.java    |  3 +-
 .../doris/insertoverwrite/InsertOverwriteUtil.java |  9 +++++-
 .../InsertOverwriteManagerTest.java                | 30 +++++++++++++++----
 .../insertoverwrite/InsertOverwriteUtilTest.java   | 35 ++++++++++++++++++++++
 .../test_iot_auto_detect_concurrent.groovy         | 14 +++------
 5 files changed, 74 insertions(+), 17 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java
index df16b8f1be2..cb01ff90339 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteManager.java
@@ -19,6 +19,7 @@ package org.apache.doris.insertoverwrite;
 
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MTMV;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.DdlException;
@@ -294,7 +295,7 @@ public class InsertOverwriteManager extends MasterDaemon 
implements Writable {
         // If executed in parallel, it may cause problems such as not being 
able to find temporary partitions.
         // But in terms of external table, we don't care the internal logic of 
execution,
         // so there's no need to keep records
-        if (!(table instanceof OlapTable)) {
+        if (!(table instanceof MTMV)) {
             return;
         }
         long dbId = db.getId();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java
 
b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java
index 49e3e091d57..35bcaa0001e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/insertoverwrite/InsertOverwriteUtil.java
@@ -103,9 +103,16 @@ public class InsertOverwriteUtil {
      * @return
      */
     public static List<String> generateTempPartitionNames(List<String> 
partitionNames) {
+        long threadId = Thread.currentThread().getId();
+        // Adding thread ID as a prefix is to avoid mutual interference
+        // when different threads perform insert overwrite on the same 
partition simultaneously.
+        // Even if the insert overwrite execution fails/cancels,
+        // the generated temporary partition will be deleted,
+        // so there will be no problem generating temporary partitions with 
the same name in a single thread
+        String prefix = "iot_temp_" + threadId + "_";
         List<String> tempPartitionNames = new 
ArrayList<String>(partitionNames.size());
         for (String partitionName : partitionNames) {
-            String tempPartitionName = "iot_temp_" + partitionName;
+            String tempPartitionName = prefix + partitionName;
             if (tempPartitionName.length() > 50) {
                 tempPartitionName = tempPartitionName.substring(0, 30) + 
Math.abs(Objects.hash(tempPartitionName))
                         + "_" + System.currentTimeMillis();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteManagerTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteManagerTest.java
index 026f8213522..607d79b38ca 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteManagerTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteManagerTest.java
@@ -18,6 +18,7 @@
 package org.apache.doris.insertoverwrite;
 
 import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.MTMV;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
@@ -40,6 +41,9 @@ public class InsertOverwriteManagerTest {
     @Mocked
     private HMSExternalTable hmsExternalTable;
 
+    @Mocked
+    private MTMV mtmv;
+
     @Before
     public void setUp()
             throws NoSuchMethodException, SecurityException, 
AnalysisException, DdlException, MetaNotFoundException {
@@ -69,18 +73,26 @@ public class InsertOverwriteManagerTest {
                 hmsExternalTable.getName();
                 minTimes = 0;
                 result = "hmsTable";
+
+                mtmv.getId();
+                minTimes = 0;
+                result = 4L;
+
+                mtmv.getName();
+                minTimes = 0;
+                result = "mtmv1";
             }
         };
     }
 
     @Test
-    public void testParallel() {
+    public void testMTMVParallel() {
         InsertOverwriteManager manager = new InsertOverwriteManager();
-        manager.recordRunningTableOrException(db, table);
+        manager.recordRunningTableOrException(db, mtmv);
         
Assertions.assertThrows(org.apache.doris.nereids.exceptions.AnalysisException.class,
-                () -> manager.recordRunningTableOrException(db, table));
-        manager.dropRunningRecord(db.getId(), table.getId());
-        Assertions.assertDoesNotThrow(() -> 
manager.recordRunningTableOrException(db, table));
+                () -> manager.recordRunningTableOrException(db, mtmv));
+        manager.dropRunningRecord(db.getId(), mtmv.getId());
+        Assertions.assertDoesNotThrow(() -> 
manager.recordRunningTableOrException(db, mtmv));
     }
 
     @Test
@@ -90,4 +102,12 @@ public class InsertOverwriteManagerTest {
         Assertions.assertDoesNotThrow(() -> 
manager.recordRunningTableOrException(db, hmsExternalTable));
         manager.dropRunningRecord(db.getId(), hmsExternalTable.getId());
     }
+
+    @Test
+    public void testOlapTableParallel() {
+        InsertOverwriteManager manager = new InsertOverwriteManager();
+        manager.recordRunningTableOrException(db, table);
+        Assertions.assertDoesNotThrow(() -> 
manager.recordRunningTableOrException(db, table));
+        manager.dropRunningRecord(db.getId(), table.getId());
+    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteUtilTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteUtilTest.java
new file mode 100644
index 00000000000..947e876c53c
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/insertoverwrite/InsertOverwriteUtilTest.java
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.insertoverwrite;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+
+import java.util.List;
+
+public class InsertOverwriteUtilTest {
+
+    @Test
+    public void testGenerateTempPartitionNames() {
+        String regex = "^iot_temp_[0-9]+_p1$";
+        List<String> res = 
InsertOverwriteUtil.generateTempPartitionNames(Lists.newArrayList("p1"));
+        String tempP1Name = res.get(0);
+        Assertions.assertTrue(tempP1Name.matches(regex));
+    }
+}
diff --git 
a/regression-test/suites/insert_overwrite_p1/test_iot_auto_detect_concurrent.groovy
 
b/regression-test/suites/insert_overwrite_p1/test_iot_auto_detect_concurrent.groovy
index 0ce026fb99b..e796edfe5bb 100644
--- 
a/regression-test/suites/insert_overwrite_p1/test_iot_auto_detect_concurrent.groovy
+++ 
b/regression-test/suites/insert_overwrite_p1/test_iot_auto_detect_concurrent.groovy
@@ -26,7 +26,6 @@ suite("test_iot_auto_detect_concurrent") {
     sql new 
File("""${context.file.parent}/ddl/test_iot_auto_detect_concurrent.sql""").text
 
     def success_status = true
-    def err_msg = ""
     def load_data = { range, offset, expect_success ->
         try {
             sql " use test_iot_auto_detect_concurrent; "
@@ -38,7 +37,6 @@ suite("test_iot_auto_detect_concurrent") {
                 success_status = false
                 log.info("fails one")
             }
-            err_msg = e.getMessage()
             log.info("successfully catch the failed insert")
             return
         }
@@ -100,14 +98,10 @@ suite("test_iot_auto_detect_concurrent") {
     thread6.join()
     thread7.join()
     // suppose result: Success to overwrite with a multiple of ten values
-    if (!success_status) {
-        // Not allowed running Insert Overwrite on same table
-        assertTrue(err_msg.contains('same table'))
-    } else {
-        // The execution was fast, resulting in no concurrent execution
-        qt_sql3 " select count(k0) from test_concurrent_write; "
-        qt_sql4 " select count(distinct k0) from test_concurrent_write; "
-    }
+    assertTrue(success_status)
+    qt_sql3 " select count(k0) from test_concurrent_write; "
+    qt_sql4 " select count(distinct k0) from test_concurrent_write; "
+
 
     /// with drop partition concurrently
     success_status = true


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to