This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9d0c816  [Bug][worker] task throw ConcurrentModifiedException #5528 (#5530)
9d0c816 is described below

commit 9d0c816cee102edbba2ac080f483c8a73a0b7b30
Author: ruanwenjun <[email protected]>
AuthorDate: Tue May 25 04:53:56 2021 +0800

    [Bug][worker] task throw ConcurrentModifiedException #5528 (#5530)
    
    * [Bug][worker] task throw ConcurrentModifiedException #5528
    
    * fix code smell
---
 .../server/worker/task/AbstractTask.java           |  7 +++++-
 .../server/worker/task/sqoop/SqoopTaskTest.java    | 25 +++++++++++++++++++++-
 2 files changed, 30 insertions(+), 2 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
index 7454f49..45b94d2 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 
 import java.util.List;
+import java.util.StringJoiner;
 
 import org.slf4j.Logger;
 
@@ -130,7 +131,11 @@ public abstract class AbstractTask {
         if (logs.contains(FINALIZE_SESSION_MARKER.toString())) {
             logger.info(FINALIZE_SESSION_MARKER, FINALIZE_SESSION_MARKER.toString());
         } else {
-            logger.info(" -> {}", String.join("\n\t", logs));
+            // note: if the logs is a SynchronizedList and will be modified concurrently,
+            // we should must use foreach to iterate the element, otherwise will throw a ConcurrentModifiedException(#issue 5528)
+            StringJoiner joiner = new StringJoiner("\n\t");
+            logs.forEach(joiner::add);
+            logger.info(" -> {}", joiner);
         }
     }
 
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java
index 5787907..222c355 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java
@@ -25,7 +25,10 @@ import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGe
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
+import java.util.List;
 
 import org.junit.Assert;
 import org.junit.Before;
@@ -37,7 +40,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.context.ApplicationContext;
 
-
 /**
  * sqoop task test
  */
@@ -201,4 +203,25 @@ public class SqoopTaskTest {
         }
     }
 
+    @Test
+    public void testLogHandler() throws InterruptedException {
+        List<String> list = Collections.synchronizedList(new ArrayList<>());
+        Thread thread1 = new Thread(() -> {
+            for (int i = 0; i < 10; i++) {
+                list.add("test add log");
+            }
+        });
+        Thread thread2 = new Thread(() -> {
+            for (int i = 0; i < 10; i++) {
+                sqoopTask.logHandle(list);
+            }
+        });
+        thread1.start();
+        thread2.start();
+        thread1.join();
+        thread2.join();
+        // if no exception throw, assert true
+        Assert.assertTrue(true);
+    }
+
 }

Reply via email to