Repository: incubator-gobblin
Updated Branches:
  refs/heads/master ece2858ec -> d0784cad9


[Gobblin-359] fix task executor logging

Closes #2241 from kadaan/GOBBLIN-359__Fix_TaskExecutor_Logging


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/d0784cad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/d0784cad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/d0784cad

Branch: refs/heads/master
Commit: d0784cad9e809f8f1c737317b2b8096af11aea30
Parents: ece2858
Author: Joel Baranick <[email protected]>
Authored: Thu Jan 11 15:31:08 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Thu Jan 11 15:31:08 2018 -0800

----------------------------------------------------------------------
 .../apache/gobblin/runtime/TaskExecutor.java    | 34 +++++++++++---------
 1 file changed, 18 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d0784cad/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
index 476282c..be78275 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/TaskExecutor.java
@@ -136,28 +136,30 @@ public class TaskExecutor extends AbstractIdleService {
     Preconditions.checkArgument(queuedTaskTimeMaxAge > 0, "Queued task time max age should be positive");
 
     // Currently a fixed-size thread pool is used to execute tasks. We probably need to revisit this later.
-    this.taskExecutor = Executors.newScheduledThreadPool(
+    this.taskExecutor = ExecutorsUtils.loggingDecorator(Executors.newScheduledThreadPool(
         taskExecutorThreadPoolSize,
-        ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of("TaskExecutor-%d")));
+        ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of("TaskExecutor-%d"))));
 
     this.retryIntervalInSeconds = retryIntervalInSeconds;
     this.queuedTaskTimeMaxSize = queuedTaskTimeMaxSize;
     this.queuedTaskTimeMaxAge = queuedTaskTimeMaxAge;
 
-    this.forkExecutor = new ThreadPoolExecutor(
-        // The core thread pool size is equal to that of the task executor as there's at least one fork per task
-        taskExecutorThreadPoolSize,
-        // The fork executor thread pool size is essentially unbounded. This is to make sure all forks of
-        // a task get a thread to run so all forks of the task are making progress. This is necessary since
-        // otherwise the parent task will be blocked if the record queue (bounded) of some fork is full and
-        // that fork has not yet started to run because of no available thread. The task cannot proceed in
-        // this case because it has to make sure every records go to every forks.
-        Integer.MAX_VALUE,
-        0L,
-        TimeUnit.MILLISECONDS,
-        // The work queue is a SynchronousQueue. This essentially forces a new thread to be created for each fork.
-        new SynchronousQueue<Runnable>(),
-        ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of("ForkExecutor-%d")));
+    this.forkExecutor = ExecutorsUtils.loggingDecorator(
+        new ThreadPoolExecutor(
+            // The core thread pool size is equal to that of the task
+            // executor as there's at least one fork per task
+            taskExecutorThreadPoolSize,
+            // The fork executor thread pool size is essentially unbounded. This is to make sure all forks of
+            // a task get a thread to run so all forks of the task are making progress. This is necessary since
+            // otherwise the parent task will be blocked if the record queue (bounded) of some fork is full and
+            // that fork has not yet started to run because of no available thread. The task cannot proceed in
+            // this case because it has to make sure every records go to every forks.
+            Integer.MAX_VALUE,
+            0L,
+            TimeUnit.MILLISECONDS,
+            // The work queue is a SynchronousQueue. This essentially forces a new thread to be created for each fork.
+            new SynchronousQueue<Runnable>(),
+            ExecutorsUtils.newThreadFactory(Optional.of(LOG), Optional.of("ForkExecutor-%d"))));
   }
 
   /**

Reply via email to