Repository: incubator-gobblin
Updated Branches:
  refs/heads/master c103a8f6a -> 1155cdc5e


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
index cddf519..6080e1f 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java
@@ -18,12 +18,14 @@
 package org.apache.gobblin.cluster;
 
 import java.net.URI;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
 import org.apache.helix.task.TaskFactory;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.Service;
@@ -39,22 +41,25 @@ import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PathUtils;
 
 /**
- * A sub-type of {@link TaskRunnerSuiteBase} suite which runs all tasks in a 
thread pool.
+ * A sub-type of {@link TaskRunnerSuiteBase} suite which runs tasks in a 
thread pool.
  */
 class TaskRunnerSuiteThreadModel extends TaskRunnerSuiteBase {
   private final TaskExecutor taskExecutor;
+  private final GobblinTaskRunnerMetrics.TaskExecutionMetrics 
taskExecutionMetrics;
 
   TaskRunnerSuiteThreadModel(TaskRunnerSuiteBase.Builder builder) {
     super(builder);
+
+    // initialize task related metrics
     this.taskExecutor = new 
TaskExecutor(ConfigUtils.configToProperties(builder.getConfig()));
+    this.taskExecutionMetrics = new 
GobblinTaskRunnerMetrics.TaskExecutionMetrics(taskExecutor, metricContext);
     this.taskFactory = generateTaskFactory(taskExecutor, builder);
-    this.jobFactory = new GobblinHelixJobFactory(builder);
-    this.taskMetrics = new 
GobblinTaskRunnerMetrics.InProcessTaskRunnerMetrics(taskExecutor, 
metricContext);
+    this.jobFactory = new GobblinHelixJobFactory(builder, this.metricContext);
   }
 
   @Override
-  protected StandardMetricsBridge.StandardMetrics getTaskMetrics() {
-    return this.taskMetrics;
+  protected Collection<StandardMetricsBridge.StandardMetrics> 
getMetricsCollection() {
+    return ImmutableList.of(this.taskExecutionMetrics, 
this.jobFactory.getJobTaskMetrics(), this.jobFactory.getLauncherMetrics());
   }
 
   @Override
@@ -70,7 +75,7 @@ class TaskRunnerSuiteThreadModel extends TaskRunnerSuiteBase {
     return this.services;
   }
 
-  private TaskFactory generateTaskFactory(TaskExecutor taskExecutor, Builder 
builder) {
+  private GobblinHelixTaskFactory generateTaskFactory(TaskExecutor 
taskExecutor, Builder builder) {
     Properties properties = 
ConfigUtils.configToProperties(builder.getConfig());
     URI rootPathUri = PathUtils.getRootPath(builder.getAppWorkPath()).toUri();
     Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(properties)
@@ -84,14 +89,12 @@ class TaskRunnerSuiteThreadModel extends 
TaskRunnerSuiteBase {
     services.add(new JMXReportingService(
         ImmutableMap.of("task.executor", 
taskExecutor.getTaskExecutorQueueMetricSet())));
 
-    TaskFactory taskFactory =
-        new GobblinHelixTaskFactory(builder.getContainerMetrics(),
+    return new GobblinHelixTaskFactory(builder.getContainerMetrics(),
             taskExecutor,
             taskStateTracker,
             builder.getFs(),
             builder.getAppWorkPath(),
             stateStoreJobConfig,
             builder.getHelixManager());
-    return taskFactory;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
index 476747d..155304a 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java
@@ -32,6 +32,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alias;
 import org.apache.gobblin.cluster.suite.IntegrationJobFactorySuite;
+import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.runtime.util.StateStores;
 import org.apache.gobblin.source.extractor.partition.Partitioner;
 import org.apache.gobblin.util.PropertiesUtils;
@@ -41,7 +42,7 @@ public class TaskRunnerSuiteForJobFactoryTest extends 
TaskRunnerSuiteThreadModel
   private TaskFactory testJobFactory;
   public 
TaskRunnerSuiteForJobFactoryTest(IntegrationJobFactorySuite.TestJobFactorySuiteBuilder
 builder) {
     super(builder);
-    this.testJobFactory = new TestJobFactory(builder);
+    this.testJobFactory = new TestJobFactory(builder, this.metricContext);
   }
 
   @Override
@@ -53,22 +54,32 @@ public class TaskRunnerSuiteForJobFactoryTest extends 
TaskRunnerSuiteThreadModel
   }
 
   public class TestJobFactory extends GobblinHelixJobFactory {
-    public 
TestJobFactory(IntegrationJobFactorySuite.TestJobFactorySuiteBuilder builder) {
-      super (builder);
+    public 
TestJobFactory(IntegrationJobFactorySuite.TestJobFactorySuiteBuilder builder, 
MetricContext metricContext) {
+      super (builder, metricContext);
       this.builder = builder;
     }
 
     @Override
     public Task createNewTask(TaskCallbackContext context) {
-      return new TestHelixJobTask(context, stateStores, builder);
+      return new TestHelixJobTask(context,
+          stateStores,
+          builder,
+          new GobblinHelixJobLauncherMetrics("launcherInJobFactory", 
metricContext, 5),
+          new GobblinHelixJobTask.GobblinHelixJobTaskMetrics(metricContext, 
5));
     }
   }
 
   public class TestHelixJobTask extends GobblinHelixJobTask {
     public TestHelixJobTask(TaskCallbackContext context,
-        StateStores stateStores,
-        TaskRunnerSuiteBase.Builder builder) {
-      super(context, stateStores, builder);
+                            StateStores stateStores,
+                            TaskRunnerSuiteBase.Builder builder,
+                            GobblinHelixJobLauncherMetrics launcherMetrics,
+                            GobblinHelixJobTaskMetrics jobTaskMetrics) {
+      super(context,
+            stateStores,
+            builder,
+            launcherMetrics,
+            jobTaskMetrics);
     }
 
     //TODO: change below to Helix UserConentStore

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java
----------------------------------------------------------------------
diff --git 
a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java
 
b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java
index 355139b..8a2b29f 100644
--- 
a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java
+++ 
b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java
@@ -36,7 +36,21 @@ import org.apache.gobblin.metrics.Tag;
  */
 public interface StandardMetricsBridge extends Instrumentable {
 
-  StandardMetrics getStandardMetrics();
+  /**
+   * Get a single standard metrics.
+   * Also see {@link #getStandardMetricsCollection}.
+   */
+  @Deprecated
+  default StandardMetrics getStandardMetrics() {
+    throw new UnsupportedOperationException("Deprecated API. Please use 
getStandardMetricsCollection.");
+  }
+
+  /**
+   * Get multiple standard metrics.
+   */
+  default Collection<StandardMetrics> getStandardMetricsCollection() {
+    return ImmutableList.of();
+  }
 
   default void switchMetricContext(MetricContext context) {
     throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
index 6afa94a..6bc9313 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java
@@ -18,15 +18,17 @@ package org.apache.gobblin.runtime.api;
 
 import java.net.URI;
 import java.util.Collection;
-import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.GobblinMetricsKeys;
@@ -34,15 +36,11 @@ import org.apache.gobblin.instrumented.Instrumentable;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.instrumented.StandardMetricsBridge;
 import org.apache.gobblin.metrics.ContextAwareGauge;
-import org.apache.gobblin.metrics.ContextAwareMetric;
 import org.apache.gobblin.metrics.ContextAwareTimer;
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.util.ConfigUtils;
 
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
 
 /**
  * A catalog of all the {@link JobSpec}s a Gobblin instance is currently aware 
of.
@@ -60,6 +58,10 @@ public interface JobCatalog extends 
JobCatalogListenersContainer, Instrumentable
     return getMetrics();
   }
 
+  default Collection<StandardMetricsBridge.StandardMetrics> 
getStandardMetricsCollection() {
+    return ImmutableList.of(getMetrics());
+  }
+
   /**
    * Get a {@link JobSpec} by uri.
    * @throws JobSpecNotFoundException if no such JobSpec exists

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
index e414cf3..f60fa54 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java
@@ -20,13 +20,13 @@ import java.util.concurrent.Future;
 
 import com.codahale.metrics.Gauge;
 
+import lombok.Getter;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.instrumented.Instrumentable;
 import org.apache.gobblin.metrics.ContextAwareCounter;
 import org.apache.gobblin.metrics.ContextAwareGauge;
 
-import lombok.Getter;
-
 /**
  * A factory for {@link JobExecutionDriver}s.
  */
@@ -59,28 +59,15 @@ public interface JobExecutionLauncher extends 
Instrumentable {
     public static final String NUM_JOBS_FAILED = "numJobsFailed";
     public static final String NUM_JOBS_CANCELLED = "numJobsCancelled";
     public static final String NUM_JOBS_RUNNING = "numJobsRunning";
-
     public static final String TIMER_FOR_COMPLETED_JOBS = 
"timeForCompletedJobs";
     public static final String TIMER_FOR_FAILED_JOBS = "timeForFailedJobs";
     public static final String TIMER_FOR_COMMITTED_JOBS = 
"timerForCommittedJobs";
-    public static final String TIMER_BEFORE_JOB_SCHEDULING = 
"timerBeforeJobScheduling";
-    public static final String TIMER_BEFORE_JOB_LAUNCHING = 
"timerBeforeJobLaunching";
-    public static final String TIMER_BETWEEN_JOB_SCHEDULING_AND_LAUNCHING = 
"timerBetwenJobSchedulingAndLaunching";
 
     public static final String EXECUTOR_ACTIVE_COUNT = "executorActiveCount";
     public static final String EXECUTOR_MAX_POOL_SIZE = 
"executorMaximumPoolSize";
     public static final String EXECUTOR_POOL_SIZE = "executorPoolSize";
     public static final String EXECUTOR_CORE_POOL_SIZE = 
"executorCorePoolSize";
     public static final String EXECUTOR_QUEUE_SIZE = "executorQueueSize";
-
-    public static final String TRACKING_EVENT_NAME = 
"JobExecutionLauncherEvent";
-    public static final String JOB_EXECID_META = "jobExecId";
-    public static final String JOB_LAUNCHED_OPERATION_TYPE = "JobLaunched";
-    public static final String JOB_COMPLETED_OPERATION_TYPE = "JobCompleted";
-    public static final String JOB_COMMITED_OPERATION_TYPE = "JobCommitted";
-    public static final String JOB_FAILED_OPERATION_TYPE = "JobFailed";
-    public static final String JOB_CANCELLED_OPERATION_TYPE = "JobCancelled";
-
     @Getter private final ContextAwareCounter numJobsLaunched;
     @Getter private final ContextAwareCounter numJobsCompleted;
     @Getter private final ContextAwareCounter numJobsCommitted;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
index 024c20c..7905a54 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
@@ -24,9 +24,13 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
 
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.GobblinMetricsKeys;
 import org.apache.gobblin.instrumented.Instrumented;
@@ -37,9 +41,6 @@ import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.util.ConfigUtils;
 
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
 
 public interface SpecCatalog extends SpecCatalogListenersContainer, 
StandardMetricsBridge {
   /** Returns an immutable {@link Collection} of {@link Spec}s that are known 
to the catalog. */
@@ -53,6 +54,10 @@ public interface SpecCatalog extends 
SpecCatalogListenersContainer, StandardMetr
     return this.getMetrics();
   }
 
+  default Collection<StandardMetricsBridge.StandardMetrics> 
getStandardMetricsCollection() {
+    return ImmutableList.of(this.getMetrics());
+  }
+
   /**
    * Get a {@link Spec} by uri.
    * @throws SpecNotFoundException if no such Spec exists

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 243f7e6..f836573 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.eventbus.EventBus;
@@ -495,6 +496,11 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
     return this.metrics;
   }
 
+  @Override
+  public Collection<StandardMetrics> getStandardMetricsCollection() {
+    return ImmutableList.of(this.metrics);
+  }
+
   @Nonnull
   @Override
   public MetricContext getMetricContext() {

Reply via email to