This is an automated email from the ASF dual-hosted git repository. kirs pushed a commit to branch 1.3.7-prepare_#5783 in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
commit a05ba8150bc9a92ebcc7649349c32d09273e8fc3 Author: CalvinKirs <[email protected]> AuthorDate: Mon Jul 19 16:28:07 2021 +0800 [1.3.7-prepare#5775][Improvement][Worker] Task log may be lost Issue #5775 Pr #5783 --- .../worker/task/AbstractCommandExecutor.java | 59 ++++++---------------- .../server/worker/task/AbstractTask.java | 11 +++- .../server/worker/task/PythonCommandExecutor.java | 3 +- .../server/worker/task/ShellCommandExecutor.java | 3 +- .../server/worker/task/sqoop/SqoopTaskTest.java | 40 ++++++++++++--- 5 files changed, 61 insertions(+), 55 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java index 4baf8af..a0d1016 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java @@ -41,6 +41,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.regex.Matcher; @@ -67,7 +68,7 @@ public abstract class AbstractCommandExecutor { /** * log handler */ - protected Consumer<List<String>> logHandler; + protected Consumer<LinkedBlockingQueue<String>> logHandler; /** * logger @@ -75,9 +76,11 @@ public abstract class AbstractCommandExecutor { protected Logger logger; /** - * log list + * log collection */ - protected final List<String> logBuffer; + protected final LinkedBlockingQueue<String> logBuffer; + + protected boolean logOutputIsScuccess = false; /** * taskExecutionContext @@ -92,13 +95,13 @@ public abstract class AbstractCommandExecutor { */ private TaskExecutionContextCacheManager taskExecutionContextCacheManager; - public AbstractCommandExecutor(Consumer<List<String>> logHandler, + public AbstractCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler, TaskExecutionContext taskExecutionContext , Logger logger){ this.logHandler = logHandler; this.taskExecutionContext = taskExecutionContext; this.logger = logger; - this.logBuffer = Collections.synchronizedList(new ArrayList<>()); + this.logBuffer = new LinkedBlockingQueue<>(); this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class); } @@ -335,15 +338,14 @@ public abstract class AbstractCommandExecutor { */ private void clear() { - List<String> markerList = new ArrayList<>(); - markerList.add(ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER.toString()); + LinkedBlockingQueue<String> markerLog = new LinkedBlockingQueue<>(); + markerLog.add(ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER.toString()); if (!logBuffer.isEmpty()) { // log handle logHandler.accept(logBuffer); - logBuffer.clear(); } - logHandler.accept(markerList); + logHandler.accept(markerLog); } /** @@ -354,9 +356,7 @@ public abstract class AbstractCommandExecutor { String threadLoggerInfoName = String.format(LoggerUtils.TASK_LOGGER_THREAD_NAME + "-%s", taskExecutionContext.getTaskAppId()); ExecutorService getOutputLogService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName + "-" + "getOutputLogService"); getOutputLogService.submit(() -> { - BufferedReader inReader = null; - try { - inReader = new BufferedReader(new InputStreamReader(process.getInputStream())); + try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { String line; while ((line = inReader.readLine()) != null) { logBuffer.add(line); @@ -364,8 +364,7 @@ public abstract class AbstractCommandExecutor { } catch (Exception e) { logger.error(e.getMessage(), e); } finally { - logOutputIsSuccess = true; - close(inReader); + logOutputIsScuccess = true; } }); getOutputLogService.shutdown(); @@ -454,31 +453,20 @@ public abstract class AbstractCommandExecutor { * @return line list */ private List<String> convertFile2List(String filename) { - List lineList = new ArrayList<String>(100); + List<String> lineList = new ArrayList<>(100); File file=new File(filename); if (!file.exists()){ return lineList; } - BufferedReader br = null; - try { - br = new BufferedReader(new InputStreamReader(new FileInputStream(filename), StandardCharsets.UTF_8)); + try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(filename), StandardCharsets.UTF_8))) { String line = null; while ((line = br.readLine()) != null) { lineList.add(line); } } catch (Exception e) { logger.error(String.format("read file: %s failed : ",filename),e); - } finally { - if(br != null){ - try { - br.close(); - } catch (IOException e) { - logger.error(e.getMessage(),e); - } - } - } return lineList; } @@ -555,27 +543,10 @@ public abstract class AbstractCommandExecutor { lastFlushTime = now; /** log handle */ logHandler.accept(logBuffer); - - logBuffer.clear(); } return lastFlushTime; } - /** - * close buffer reader - * - * @param inReader in reader - */ - private void close(BufferedReader inReader) { - if (inReader != null) { - try { - inReader.close(); - } catch (IOException e) { - logger.error(e.getMessage(), e); - } - } - } - protected List<String> commandOptions() { return Collections.emptyList(); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java index 84c5052..a85acdc 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java @@ -44,6 +44,8 @@ import org.apache.commons.lang.StringUtils; import java.util.List; import java.util.Map; +import java.util.StringJoiner; +import java.util.concurrent.LinkedBlockingQueue; import org.slf4j.Logger; @@ -119,14 +121,19 @@ public abstract class AbstractTask { /** * log handle + * * @param logs log list */ - public void logHandle(List<String> logs) { + public void logHandle(LinkedBlockingQueue<String> logs) { // note that the "new line" is added here to facilitate log parsing if (logs.contains(FINALIZE_SESSION_MARKER.toString())) { logger.info(FINALIZE_SESSION_MARKER, FINALIZE_SESSION_MARKER.toString()); } else { - logger.info(" -> {}", String.join("\n\t", logs)); + StringJoiner joiner = new StringJoiner("\n\t"); + while (!logs.isEmpty()) { + joiner.add(logs.poll()); + } + logger.info(" -> {}", joiner); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java index 344d00f..2ea5710 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java @@ -30,6 +30,7 @@ import java.nio.file.Paths; import java.util.Collections; import java.util.Date; import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; import java.util.function.Consumer; /** @@ -54,7 +55,7 @@ public class PythonCommandExecutor extends AbstractCommandExecutor { * @param taskExecutionContext taskExecutionContext * @param logger logger */ - public PythonCommandExecutor(Consumer<List<String>> logHandler, + public PythonCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler, TaskExecutionContext taskExecutionContext, Logger logger) { super(logHandler,taskExecutionContext,logger); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java index 5e297ab..7a552c4 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java @@ -27,6 +27,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; import java.util.function.Consumer; /** @@ -50,7 +51,7 @@ public class ShellCommandExecutor extends AbstractCommandExecutor { * @param taskExecutionContext taskExecutionContext * @param logger logger */ - public ShellCommandExecutor(Consumer<List<String>> logHandler, + public ShellCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler, TaskExecutionContext taskExecutionContext, Logger logger) { super(logHandler,taskExecutionContext,logger); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java index 5a15a21..fecb31e 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java @@ -25,7 +25,11 @@ import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGe import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; +import java.util.ArrayList; +import java.util.Collections; import java.util.Date; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; import org.junit.Assert; import org.junit.Before; @@ -137,18 +141,19 @@ public class SqoopTaskTest { //import mysql to hive String mysqlToHive = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\"," - + "\"hadoopCustomParams\":[{\"prop\":\"mapreduce.map.memory.mb\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"2048\"},{\"prop\":\"mapreduce.reduce.memory.mb\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"2048\"}]," - + "\"sqoopAdvancedParams\":[{\"prop\":\"--delete-target-dir\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"},{\"prop\":\"--direct\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]," + "\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\"," + "\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[]," - + "\\\"mapColumnHive\\\":[{\\\"prop\\\":\\\"create_time\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"string\\\"},{\\\"prop\\\":\\\"update_time\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"string\\\"}]," - + "\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"create_time\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"java.sql.Date\\\"},{\\\"prop\\\":\\\"update_time\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"java.sql.Date\\\"}]}\"," + + "\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\"," + "\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false," - + "\\\"hiveOverWrite\\\":true,\\\"hiveTargetDir\\\":\\\"/tmp/sqoop_import_hive\\\",\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}"; + + "\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}"; SqoopParameters mysqlToHiveParams = JSONUtils.parseObject(mysqlToHive, SqoopParameters.class); String mysqlToHiveScript = generator.generateSqoopJob(mysqlToHiveParams, mysqlTaskExecutionContext); - String mysqlToHiveExpected ="sqoop import -D mapred.job.name=sqoop_import -D mapreduce.map.memory.mb=2048 -D mapreduce.reduce.memory.mb=2048 --delete-target-dir --direct -m 1 --connect \"jdbc:mysql://192.168.0.111:3306/test?allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false\" --username kylo --password \"123456\" --query \"SELECT * FROM person_2 WHERE \\$CONDITIONS\" --map-column-hive create_time=string,update_time=string --map-c [...] - + String mysqlToHiveExpected = + "sqoop import -D mapred.job.name=sqoop_import -m 1 --connect \"jdbc:mysql://192.168.0.111:3306/" + + "test?allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false\" " + + "--username kylo --password \"123456\" " + + "--query \"SELECT * FROM person_2 WHERE \\$CONDITIONS\" --map-column-java id=Integer --hive-import --hive-database stg --hive-table person_internal_2 " + + "--create-hive-table --hive-overwrite --delete-target-dir --hive-partition-key date --hive-partition-value 2020-02-16"; Assert.assertEquals(mysqlToHiveExpected, mysqlToHiveScript); //sqoop CUSTOM job @@ -199,4 +204,25 @@ public class SqoopTaskTest { } } + @Test + public void testLogHandler() throws InterruptedException { + LinkedBlockingQueue<String> loggerBuffer = new LinkedBlockingQueue<>(); + Thread thread1 = new Thread(() -> { + for (int i = 0; i < 10; i++) { + loggerBuffer.add("test add log"); + } + }); + Thread thread2 = new Thread(() -> { + for (int i = 0; i < 10; i++) { + sqoopTask.logHandle(loggerBuffer); + } + }); + thread1.start(); + thread2.start(); + thread1.join(); + thread2.join(); + // if no exception throw, assert true + Assert.assertTrue(true); + } + }
