Repository: incubator-gobblin Updated Branches: refs/heads/master c103a8f6a -> 1155cdc5e
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java index cddf519..6080e1f 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java @@ -18,12 +18,14 @@ package org.apache.gobblin.cluster; import java.net.URI; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Properties; import org.apache.helix.task.TaskFactory; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.util.concurrent.Service; @@ -39,22 +41,25 @@ import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.PathUtils; /** - * A sub-type of {@link TaskRunnerSuiteBase} suite which runs all tasks in a thread pool. + * A sub-type of {@link TaskRunnerSuiteBase} suite which runs tasks in a thread pool. */ class TaskRunnerSuiteThreadModel extends TaskRunnerSuiteBase { private final TaskExecutor taskExecutor; + private final GobblinTaskRunnerMetrics.TaskExecutionMetrics taskExecutionMetrics; TaskRunnerSuiteThreadModel(TaskRunnerSuiteBase.Builder builder) { super(builder); + + // initialize task related metrics this.taskExecutor = new TaskExecutor(ConfigUtils.configToProperties(builder.getConfig())); + this.taskExecutionMetrics = new GobblinTaskRunnerMetrics.TaskExecutionMetrics(taskExecutor, metricContext); this.taskFactory = generateTaskFactory(taskExecutor, builder); - this.jobFactory = new GobblinHelixJobFactory(builder); - this.taskMetrics = new GobblinTaskRunnerMetrics.InProcessTaskRunnerMetrics(taskExecutor, metricContext); + this.jobFactory = new GobblinHelixJobFactory(builder, this.metricContext); } @Override - protected StandardMetricsBridge.StandardMetrics getTaskMetrics() { - return this.taskMetrics; + protected Collection<StandardMetricsBridge.StandardMetrics> getMetricsCollection() { + return ImmutableList.of(this.taskExecutionMetrics, this.jobFactory.getJobTaskMetrics(), this.jobFactory.getLauncherMetrics()); } @Override @@ -70,7 +75,7 @@ class TaskRunnerSuiteThreadModel extends TaskRunnerSuiteBase { return this.services; } - private TaskFactory generateTaskFactory(TaskExecutor taskExecutor, Builder builder) { + private GobblinHelixTaskFactory generateTaskFactory(TaskExecutor taskExecutor, Builder builder) { Properties properties = ConfigUtils.configToProperties(builder.getConfig()); URI rootPathUri = PathUtils.getRootPath(builder.getAppWorkPath()).toUri(); Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(properties) @@ -84,14 +89,12 @@ class TaskRunnerSuiteThreadModel extends TaskRunnerSuiteBase { services.add(new JMXReportingService( ImmutableMap.of("task.executor", taskExecutor.getTaskExecutorQueueMetricSet()))); - TaskFactory taskFactory = - new GobblinHelixTaskFactory(builder.getContainerMetrics(), + return new GobblinHelixTaskFactory(builder.getContainerMetrics(), taskExecutor, taskStateTracker, builder.getFs(), builder.getAppWorkPath(), stateStoreJobConfig, builder.getHelixManager()); - return taskFactory; } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java index 476747d..155304a 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java @@ -32,6 +32,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.annotation.Alias; import org.apache.gobblin.cluster.suite.IntegrationJobFactorySuite; +import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.runtime.util.StateStores; import org.apache.gobblin.source.extractor.partition.Partitioner; import org.apache.gobblin.util.PropertiesUtils; @@ -41,7 +42,7 @@ public class TaskRunnerSuiteForJobFactoryTest extends TaskRunnerSuiteThreadModel private TaskFactory testJobFactory; public TaskRunnerSuiteForJobFactoryTest(IntegrationJobFactorySuite.TestJobFactorySuiteBuilder builder) { super(builder); - this.testJobFactory = new TestJobFactory(builder); + this.testJobFactory = new TestJobFactory(builder, this.metricContext); } @Override @@ -53,22 +54,32 @@ public class TaskRunnerSuiteForJobFactoryTest extends TaskRunnerSuiteThreadModel } public class TestJobFactory extends GobblinHelixJobFactory { - public TestJobFactory(IntegrationJobFactorySuite.TestJobFactorySuiteBuilder builder) { - super (builder); + public TestJobFactory(IntegrationJobFactorySuite.TestJobFactorySuiteBuilder builder, MetricContext metricContext) { + super (builder, metricContext); this.builder = builder; } @Override public Task createNewTask(TaskCallbackContext context) { - return new TestHelixJobTask(context, stateStores, builder); + return new TestHelixJobTask(context, + stateStores, + builder, + new GobblinHelixJobLauncherMetrics("launcherInJobFactory", metricContext, 5), + new GobblinHelixJobTask.GobblinHelixJobTaskMetrics(metricContext, 5)); } } public class TestHelixJobTask extends GobblinHelixJobTask { public TestHelixJobTask(TaskCallbackContext context, - StateStores stateStores, - TaskRunnerSuiteBase.Builder builder) { - super(context, stateStores, builder); + StateStores stateStores, + TaskRunnerSuiteBase.Builder builder, + GobblinHelixJobLauncherMetrics launcherMetrics, + GobblinHelixJobTaskMetrics jobTaskMetrics) { + super(context, + stateStores, + builder, + launcherMetrics, + jobTaskMetrics); } //TODO: change below to Helix UserConentStore http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java ---------------------------------------------------------------------- diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java index 355139b..8a2b29f 100644 --- a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java +++ b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/StandardMetricsBridge.java @@ -36,7 +36,21 @@ import org.apache.gobblin.metrics.Tag; */ public interface StandardMetricsBridge extends Instrumentable { - StandardMetrics getStandardMetrics(); + /** + * Get a single standard metrics. + * Also see {@link #getStandardMetricsCollection}. + */ + @Deprecated + default StandardMetrics getStandardMetrics() { + throw new UnsupportedOperationException("Deprecated API. Please use getStandardMetricsCollection."); + } + + /** + * Get multiple standard metrics. + */ + default Collection<StandardMetrics> getStandardMetricsCollection() { + return ImmutableList.of(); + } default void switchMetricContext(MetricContext context) { throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java index 6afa94a..6bc9313 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobCatalog.java @@ -18,15 +18,17 @@ package org.apache.gobblin.runtime.api; import java.net.URI; import java.util.Collection; -import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import com.typesafe.config.Config; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.instrumented.GobblinMetricsKeys; @@ -34,15 +36,11 @@ import org.apache.gobblin.instrumented.Instrumentable; import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.instrumented.StandardMetricsBridge; import org.apache.gobblin.metrics.ContextAwareGauge; -import org.apache.gobblin.metrics.ContextAwareMetric; import org.apache.gobblin.metrics.ContextAwareTimer; import org.apache.gobblin.metrics.GobblinTrackingEvent; import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.util.ConfigUtils; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; - /** * A catalog of all the {@link JobSpec}s a Gobblin instance is currently aware of. @@ -60,6 +58,10 @@ public interface JobCatalog extends JobCatalogListenersContainer, Instrumentable return getMetrics(); } + default Collection<StandardMetricsBridge.StandardMetrics> getStandardMetricsCollection() { + return ImmutableList.of(getMetrics()); + } + /** * Get a {@link JobSpec} by uri. * @throws JobSpecNotFoundException if no such JobSpec exists http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java index e414cf3..f60fa54 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobExecutionLauncher.java @@ -20,13 +20,13 @@ import java.util.concurrent.Future; import com.codahale.metrics.Gauge; +import lombok.Getter; + import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.instrumented.Instrumentable; import org.apache.gobblin.metrics.ContextAwareCounter; import org.apache.gobblin.metrics.ContextAwareGauge; -import lombok.Getter; - /** * A factory for {@link JobExecutionDriver}s. */ @@ -59,28 +59,15 @@ public interface JobExecutionLauncher extends Instrumentable { public static final String NUM_JOBS_FAILED = "numJobsFailed"; public static final String NUM_JOBS_CANCELLED = "numJobsCancelled"; public static final String NUM_JOBS_RUNNING = "numJobsRunning"; - public static final String TIMER_FOR_COMPLETED_JOBS = "timeForCompletedJobs"; public static final String TIMER_FOR_FAILED_JOBS = "timeForFailedJobs"; public static final String TIMER_FOR_COMMITTED_JOBS = "timerForCommittedJobs"; - public static final String TIMER_BEFORE_JOB_SCHEDULING = "timerBeforeJobScheduling"; - public static final String TIMER_BEFORE_JOB_LAUNCHING = "timerBeforeJobLaunching"; - public static final String TIMER_BETWEEN_JOB_SCHEDULING_AND_LAUNCHING = "timerBetwenJobSchedulingAndLaunching"; public static final String EXECUTOR_ACTIVE_COUNT = "executorActiveCount"; public static final String EXECUTOR_MAX_POOL_SIZE = "executorMaximumPoolSize"; public static final String EXECUTOR_POOL_SIZE = "executorPoolSize"; public static final String EXECUTOR_CORE_POOL_SIZE = "executorCorePoolSize"; public static final String EXECUTOR_QUEUE_SIZE = "executorQueueSize"; - - public static final String TRACKING_EVENT_NAME = "JobExecutionLauncherEvent"; - public static final String JOB_EXECID_META = "jobExecId"; - public static final String JOB_LAUNCHED_OPERATION_TYPE = "JobLaunched"; - public static final String JOB_COMPLETED_OPERATION_TYPE = "JobCompleted"; - public static final String JOB_COMMITED_OPERATION_TYPE = "JobCommitted"; - public static final String JOB_FAILED_OPERATION_TYPE = "JobFailed"; - public static final String JOB_CANCELLED_OPERATION_TYPE = "JobCancelled"; - @Getter private final ContextAwareCounter numJobsLaunched; @Getter private final ContextAwareCounter numJobsCompleted; @Getter private final ContextAwareCounter numJobsCommitted; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java index 024c20c..7905a54 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java @@ -24,9 +24,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.typesafe.config.Config; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.instrumented.GobblinMetricsKeys; import org.apache.gobblin.instrumented.Instrumented; @@ -37,9 +41,6 @@ import org.apache.gobblin.metrics.GobblinTrackingEvent; import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.util.ConfigUtils; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; - public interface SpecCatalog extends SpecCatalogListenersContainer, StandardMetricsBridge { /** Returns an immutable {@link Collection} of {@link Spec}s that are known to the catalog. */ @@ -53,6 +54,10 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, StandardMetr return this.getMetrics(); } + default Collection<StandardMetricsBridge.StandardMetrics> getStandardMetricsCollection() { + return ImmutableList.of(this.getMetrics()); + } + /** * Get a {@link Spec} by uri. * @throws SpecNotFoundException if no such Spec exists http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java index 243f7e6..f836573 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java @@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.eventbus.EventBus; @@ -495,6 +496,11 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri return this.metrics; } + @Override + public Collection<StandardMetrics> getStandardMetricsCollection() { + return ImmutableList.of(this.metrics); + } + @Nonnull @Override public MetricContext getMetricContext() {
