This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch 1.3.7-prepare_#5783
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit a05ba8150bc9a92ebcc7649349c32d09273e8fc3
Author: CalvinKirs <[email protected]>
AuthorDate: Mon Jul 19 16:28:07 2021 +0800

    [1.3.7-prepare#5775][Improvement][Worker] Task log may be lost
    Issue #5775
    Pr #5783
---
 .../worker/task/AbstractCommandExecutor.java       | 59 ++++++----------------
 .../server/worker/task/AbstractTask.java           | 11 +++-
 .../server/worker/task/PythonCommandExecutor.java  |  3 +-
 .../server/worker/task/ShellCommandExecutor.java   |  3 +-
 .../server/worker/task/sqoop/SqoopTaskTest.java    | 40 ++++++++++++---
 5 files changed, 61 insertions(+), 55 deletions(-)

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
index 4baf8af..a0d1016 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
@@ -41,6 +41,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.regex.Matcher;
@@ -67,7 +68,7 @@ public abstract class AbstractCommandExecutor {
     /**
      *  log handler
      */
-    protected Consumer<List<String>> logHandler;
+    protected Consumer<LinkedBlockingQueue<String>> logHandler;
 
     /**
      *  logger
@@ -75,9 +76,11 @@ public abstract class AbstractCommandExecutor {
     protected Logger logger;
 
     /**
-     *  log list
+     * log collection
      */
-    protected final List<String> logBuffer;
+    protected final LinkedBlockingQueue<String> logBuffer;
+
+    protected boolean logOutputIsScuccess = false;
 
     /**
      * taskExecutionContext
@@ -92,13 +95,13 @@ public abstract class AbstractCommandExecutor {
      */
     private TaskExecutionContextCacheManager taskExecutionContextCacheManager;
 
-    public AbstractCommandExecutor(Consumer<List<String>> logHandler,
+    public AbstractCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
                                    TaskExecutionContext taskExecutionContext ,
                                    Logger logger){
         this.logHandler = logHandler;
         this.taskExecutionContext = taskExecutionContext;
         this.logger = logger;
-        this.logBuffer = Collections.synchronizedList(new ArrayList<>());
+        this.logBuffer = new LinkedBlockingQueue<>();
         this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
     }
 
@@ -335,15 +338,14 @@ public abstract class AbstractCommandExecutor {
      */
     private void clear() {
 
-        List<String> markerList = new ArrayList<>();
-        markerList.add(ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER.toString());
+        LinkedBlockingQueue<String> markerLog = new LinkedBlockingQueue<>();
+        markerLog.add(ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER.toString());
 
         if (!logBuffer.isEmpty()) {
             // log handle
             logHandler.accept(logBuffer);
-            logBuffer.clear();
         }
-        logHandler.accept(markerList);
+        logHandler.accept(markerLog);
     }
 
     /**
@@ -354,9 +356,7 @@ public abstract class AbstractCommandExecutor {
         String threadLoggerInfoName = String.format(LoggerUtils.TASK_LOGGER_THREAD_NAME + "-%s", taskExecutionContext.getTaskAppId());
         ExecutorService getOutputLogService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName + "-" + "getOutputLogService");
         getOutputLogService.submit(() -> {
-            BufferedReader inReader = null;
-            try {
-                inReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+            try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
                 String line;
                 while ((line = inReader.readLine()) != null) {
                     logBuffer.add(line);
@@ -364,8 +364,7 @@ public abstract class AbstractCommandExecutor {
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);
             } finally {
-                logOutputIsSuccess = true;
-                close(inReader);
+                logOutputIsScuccess = true;
             }
         });
         getOutputLogService.shutdown();
@@ -454,31 +453,20 @@ public abstract class AbstractCommandExecutor {
      * @return line list
      */
     private List<String> convertFile2List(String filename) {
-        List lineList = new ArrayList<String>(100);
+        List<String> lineList = new ArrayList<>(100);
         File file=new File(filename);
 
         if (!file.exists()){
             return lineList;
         }
 
-        BufferedReader br = null;
-        try {
-            br = new BufferedReader(new InputStreamReader(new FileInputStream(filename), StandardCharsets.UTF_8));
+        try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(filename), StandardCharsets.UTF_8))) {
             String line = null;
             while ((line = br.readLine()) != null) {
                 lineList.add(line);
             }
         } catch (Exception e) {
             logger.error(String.format("read file: %s failed : ",filename),e);
-        } finally {
-            if(br != null){
-                try {
-                    br.close();
-                } catch (IOException e) {
-                    logger.error(e.getMessage(),e);
-                }
-            }
-
         }
         return lineList;
     }
@@ -555,27 +543,10 @@ public abstract class AbstractCommandExecutor {
             lastFlushTime = now;
             /** log handle */
             logHandler.accept(logBuffer);
-
-            logBuffer.clear();
         }
         return lastFlushTime;
     }
 
-    /**
-     * close buffer reader
-     *
-     * @param inReader in reader
-     */
-    private void close(BufferedReader inReader) {
-        if (inReader != null) {
-            try {
-                inReader.close();
-            } catch (IOException e) {
-                logger.error(e.getMessage(), e);
-            }
-        }
-    }
-
     protected List<String> commandOptions() {
         return Collections.emptyList();
     }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
index 84c5052..a85acdc 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
@@ -44,6 +44,8 @@ import org.apache.commons.lang.StringUtils;
 
 import java.util.List;
 import java.util.Map;
+import java.util.StringJoiner;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.slf4j.Logger;
 
@@ -119,14 +121,19 @@ public abstract class AbstractTask {
 
     /**
      * log handle
+     *
      * @param logs log list
      */
-    public void logHandle(List<String> logs) {
+    public void logHandle(LinkedBlockingQueue<String> logs) {
         // note that the "new line" is added here to facilitate log parsing
         if (logs.contains(FINALIZE_SESSION_MARKER.toString())) {
             logger.info(FINALIZE_SESSION_MARKER, FINALIZE_SESSION_MARKER.toString());
         } else {
-            logger.info(" -> {}", String.join("\n\t", logs));
+            StringJoiner joiner = new StringJoiner("\n\t");
+            while (!logs.isEmpty()) {
+                joiner.add(logs.poll());
+            }
+            logger.info(" -> {}", joiner);
         }
     }
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java
index 344d00f..2ea5710 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java
@@ -30,6 +30,7 @@ import java.nio.file.Paths;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Consumer;
 
 /**
@@ -54,7 +55,7 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
      * @param taskExecutionContext       taskExecutionContext
      * @param logger        logger
      */
-    public PythonCommandExecutor(Consumer<List<String>> logHandler,
+    public PythonCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
                                  TaskExecutionContext taskExecutionContext,
                                  Logger logger) {
         super(logHandler,taskExecutionContext,logger);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
index 5e297ab..7a552c4 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
@@ -27,6 +27,7 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Consumer;
 
 /**
@@ -50,7 +51,7 @@ public class ShellCommandExecutor extends AbstractCommandExecutor {
      * @param taskExecutionContext taskExecutionContext
      * @param logger logger
      */
-    public ShellCommandExecutor(Consumer<List<String>> logHandler,
+    public ShellCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
                                 TaskExecutionContext taskExecutionContext,
                                 Logger logger) {
         super(logHandler,taskExecutionContext,logger);
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java
index 5a15a21..fecb31e 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java
@@ -25,7 +25,11 @@ import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGe
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.junit.Assert;
 import org.junit.Before;
@@ -137,18 +141,19 @@ public class SqoopTaskTest {
         //import mysql to hive
         String mysqlToHive =
             
"{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\","
-                + 
"\"hadoopCustomParams\":[{\"prop\":\"mapreduce.map.memory.mb\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"2048\"},{\"prop\":\"mapreduce.reduce.memory.mb\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"2048\"}],"
-                + 
"\"sqoopAdvancedParams\":[{\"prop\":\"--delete-target-dir\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"},{\"prop\":\"--direct\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}],"
                 + 
"\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\","
                 + "\\\"srcQuerySql\\\":\\\"SELECT * FROM 
person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],"
-                + 
"\\\"mapColumnHive\\\":[{\\\"prop\\\":\\\"create_time\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"string\\\"},{\\\"prop\\\":\\\"update_time\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"string\\\"}],"
-                + 
"\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"create_time\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"java.sql.Date\\\"},{\\\"prop\\\":\\\"update_time\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"java.sql.Date\\\"}]}\","
+                + 
"\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\","
                 + 
"\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,"
-                + 
"\\\"hiveOverWrite\\\":true,\\\"hiveTargetDir\\\":\\\"/tmp/sqoop_import_hive\\\",\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}";
+                + 
"\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}";
         SqoopParameters mysqlToHiveParams = JSONUtils.parseObject(mysqlToHive, SqoopParameters.class);
         String mysqlToHiveScript = generator.generateSqoopJob(mysqlToHiveParams, mysqlTaskExecutionContext);
-        String mysqlToHiveExpected ="sqoop import -D 
mapred.job.name=sqoop_import -D mapreduce.map.memory.mb=2048 -D 
mapreduce.reduce.memory.mb=2048 --delete-target-dir  --direct  -m 1 --connect 
\"jdbc:mysql://192.168.0.111:3306/test?allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false\"
 --username kylo --password \"123456\" --query \"SELECT * FROM person_2 WHERE 
\\$CONDITIONS\" --map-column-hive create_time=string,update_time=string --map-c 
[...]
-
+        String mysqlToHiveExpected =
+            "sqoop import -D mapred.job.name=sqoop_import -m 1 --connect 
\"jdbc:mysql://192.168.0.111:3306/"
+                    + 
"test?allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false\"
 "
+                    + "--username kylo --password \"123456\" "
+                + "--query \"SELECT * FROM person_2 WHERE \\$CONDITIONS\" 
--map-column-java id=Integer --hive-import --hive-database stg --hive-table 
person_internal_2 "
+                + "--create-hive-table --hive-overwrite --delete-target-dir 
--hive-partition-key date --hive-partition-value 2020-02-16";
                       Assert.assertEquals(mysqlToHiveExpected, 
mysqlToHiveScript);
 
         //sqoop CUSTOM job
@@ -199,4 +204,25 @@ public class SqoopTaskTest {
         }
     }
 
+    @Test
+    public void testLogHandler() throws InterruptedException {
+        LinkedBlockingQueue<String> loggerBuffer = new LinkedBlockingQueue<>();
+        Thread thread1 = new Thread(() -> {
+            for (int i = 0; i < 10; i++) {
+                loggerBuffer.add("test add log");
+            }
+        });
+        Thread thread2 = new Thread(() -> {
+            for (int i = 0; i < 10; i++) {
+                sqoopTask.logHandle(loggerBuffer);
+            }
+        });
+        thread1.start();
+        thread2.start();
+        thread1.join();
+        thread2.join();
+        // if no exception throw, assert true
+        Assert.assertTrue(true);
+    }
+
 }

Reply via email to