This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 9d0c816 [Bug][worker] task throw ConcurrentModifiedException #5528
(#5530)
9d0c816 is described below
commit 9d0c816cee102edbba2ac080f483c8a73a0b7b30
Author: ruanwenjun <[email protected]>
AuthorDate: Tue May 25 04:53:56 2021 +0800
[Bug][worker] task throw ConcurrentModifiedException #5528 (#5530)
* [Bug][worker] task throw ConcurrentModifiedException #5528
* fix code smell
---
.../server/worker/task/AbstractTask.java | 7 +++++-
.../server/worker/task/sqoop/SqoopTaskTest.java | 25 +++++++++++++++++++++-
2 files changed, 30 insertions(+), 2 deletions(-)
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
index 7454f49..45b94d2 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
@@ -26,6 +26,7 @@ import
org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import java.util.List;
+import java.util.StringJoiner;
import org.slf4j.Logger;
@@ -130,7 +131,11 @@ public abstract class AbstractTask {
if (logs.contains(FINALIZE_SESSION_MARKER.toString())) {
logger.info(FINALIZE_SESSION_MARKER,
FINALIZE_SESSION_MARKER.toString());
} else {
- logger.info(" -> {}", String.join("\n\t", logs));
+ // note: if the logs is a SynchronizedList and will be modified
concurrently,
+ // we should must use foreach to iterate the element, otherwise
will throw a ConcurrentModifiedException(#issue 5528)
+ StringJoiner joiner = new StringJoiner("\n\t");
+ logs.forEach(joiner::add);
+ logger.info(" -> {}", joiner);
}
}
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java
index 5787907..222c355 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java
@@ -25,7 +25,10 @@ import
org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGe
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
+import java.util.List;
import org.junit.Assert;
import org.junit.Before;
@@ -37,7 +40,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
-
/**
* sqoop task test
*/
@@ -201,4 +203,25 @@ public class SqoopTaskTest {
}
}
+ @Test
+ public void testLogHandler() throws InterruptedException {
+ List<String> list = Collections.synchronizedList(new ArrayList<>());
+ Thread thread1 = new Thread(() -> {
+ for (int i = 0; i < 10; i++) {
+ list.add("test add log");
+ }
+ });
+ Thread thread2 = new Thread(() -> {
+ for (int i = 0; i < 10; i++) {
+ sqoopTask.logHandle(list);
+ }
+ });
+ thread1.start();
+ thread2.start();
+ thread1.join();
+ thread2.join();
+ // if no exception throw, assert true
+ Assert.assertTrue(true);
+ }
+
}