Repository: incubator-gobblin Updated Branches: refs/heads/master ece2858ec -> d0784cad9
[Gobblin-359] fix task executor logging Closes #2241 from kadaan/GOBBLIN- 359__Fix_TaskExecutor_Logging Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/d0784cad Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/d0784cad Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/d0784cad Branch: refs/heads/master Commit: d0784cad9e809f8f1c737317b2b8096af11aea30 Parents: ece2858 Author: Joel Baranick <[email protected]> Authored: Thu Jan 11 15:31:08 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Thu Jan 11 15:31:08 2018 -0800 ---------------------------------------------------------------------- .../apache/gobblin/runtime/TaskExecutor.java | 34 +++++++++++--------- 1 file changed, 18 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d0784cad/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java index 476282c..be78275 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java @@ -136,28 +136,30 @@ public class TaskExecutor extends AbstractIdleService { Preconditions.checkArgument(queuedTaskTimeMaxAge > 0, "Queued task time max age should be positive"); // Currently a fixed-size thread pool is used to execute tasks. We probably need to revisit this later. - this.taskExecutor = Executors.newScheduledThreadPool( + this.taskExecutor = ExecutorsUtils.loggingDecorator(Executors.newScheduledThreadPool( taskExecutorThreadPoolSize, - ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of("TaskExecutor-%d"))); + ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of("TaskExecutor-%d")))); this.retryIntervalInSeconds = retryIntervalInSeconds; this.queuedTaskTimeMaxSize = queuedTaskTimeMaxSize; this.queuedTaskTimeMaxAge = queuedTaskTimeMaxAge; - this.forkExecutor = new ThreadPoolExecutor( - // The core thread pool size is equal to that of the task executor as there's at least one fork per task - taskExecutorThreadPoolSize, - // The fork executor thread pool size is essentially unbounded. This is to make sure all forks of - // a task get a thread to run so all forks of the task are making progress. This is necessary since - // otherwise the parent task will be blocked if the record queue (bounded) of some fork is full and - // that fork has not yet started to run because of no available thread. The task cannot proceed in - // this case because it has to make sure every records go to every forks. - Integer.MAX_VALUE, - 0L, - TimeUnit.MILLISECONDS, - // The work queue is a SynchronousQueue. This essentially forces a new thread to be created for each fork. - new SynchronousQueue<Runnable>(), - ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of("ForkExecutor-%d"))); + this.forkExecutor = ExecutorsUtils.loggingDecorator( + new ThreadPoolExecutor( + // The core thread pool size is equal to that of the task + // executor as there's at least one fork per task + taskExecutorThreadPoolSize, + // The fork executor thread pool size is essentially unbounded. This is to make sure all forks of + // a task get a thread to run so all forks of the task are making progress. This is necessary since + // otherwise the parent task will be blocked if the record queue (bounded) of some fork is full and + // that fork has not yet started to run because of no available thread. The task cannot proceed in + // this case because it has to make sure every records go to every forks. + Integer.MAX_VALUE, + 0L, + TimeUnit.MILLISECONDS, + // The work queue is a SynchronousQueue. This essentially forces a new thread to be created for each fork. + new SynchronousQueue<Runnable>(), + ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of("ForkExecutor-%d")))); } /**
