This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 678a3af [GOBBLIN-1542] Integrate with Helix API to add/remove task
from a running helix job (#3393)
678a3af is described below
commit 678a3afa19b6f0ba88d370171993a04a518503d4
Author: Zihan Li <[email protected]>
AuthorDate: Wed Oct 13 14:09:36 2021 -0700
[GOBBLIN-1542] Integrate with Helix API to add/remove task from a running
helix job (#3393)
* [GOBBLIN-1542] Integrate with Helix API to add/remove task from a running
helix job
* fix error when deleting task
* fix some flaky test
* remove unintentional change
* fix test class path for gobblin-service
---
.../org/apache/gobblin/source/InfiniteSource.java | 40 ++++
.../apache/gobblin/stream/WorkUnitChangeEvent.java | 37 +++
gobblin-cluster/build.gradle | 4 +-
.../gobblin/cluster/GobblinHelixJobLauncher.java | 257 ++++++++++++++-------
.../org/apache/gobblin/cluster/HelixUtils.java | 28 +++
.../gobblin/cluster/ClusterIntegrationTest.java | 6 +-
.../gobblin/cluster/GobblinTaskRunnerTest.java | 3 +-
.../cluster/HelixAssignedParticipantCheckTest.java | 8 +-
.../org/apache/gobblin/metastore/ZkStateStore.java | 2 +-
.../extract/kafka/KafkaStreamingExtractor.java | 4 +-
.../extract/kafka/UniversalKafkaSource.java | 22 +-
.../gobblin/runtime/AbstractJobLauncher.java | 36 ++-
.../apache/gobblin/runtime/SourceDecorator.java | 11 +
gobblin-service/build.gradle | 4 +-
gobblin-yarn/build.gradle | 4 +-
.../yarn/YarnServiceTestWithExpiration.java | 20 +-
gradle/scripts/dependencyDefinitions.gradle | 2 +-
17 files changed, 376 insertions(+), 112 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/source/InfiniteSource.java
b/gobblin-api/src/main/java/org/apache/gobblin/source/InfiniteSource.java
new file mode 100644
index 0000000..51eaf68
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/source/InfiniteSource.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source;
+
+import com.google.common.eventbus.EventBus;
+import org.apache.gobblin.annotation.Alpha;
+
+
+/**
+ * An interface for infinite source, where source should be able to detect the
work unit change
+ * and post the change through eventBus
+ *
+ * @author Zihan Li
+ *
+ * @param <S> output schema type
+ * @param <D> output record type
+ */
+@Alpha
+public interface InfiniteSource<S, D> extends Source<S, D>{
+
+ /**
+ * Return the eventBus where it will post {@link
org.apache.gobblin.stream.WorkUnitChangeEvent} when workUnit change
+ */
+ EventBus getEventBus();
+
+}
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/stream/WorkUnitChangeEvent.java
b/gobblin-api/src/main/java/org/apache/gobblin/stream/WorkUnitChangeEvent.java
new file mode 100644
index 0000000..f977a4c
--- /dev/null
+++
b/gobblin-api/src/main/java/org/apache/gobblin/stream/WorkUnitChangeEvent.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.stream;
+
+import java.util.List;
+import lombok.Getter;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+/**
+ * The event for {@link org.apache.gobblin.source.InfiniteSource} to indicate
there is a change in work units
+ * Job launcher should then be able to handle this event
+ */
+public class WorkUnitChangeEvent {
+ @Getter
+ private final List<String> oldTaskIds;
+ @Getter
+ private final List<WorkUnit> newWorkUnits;
+ public WorkUnitChangeEvent(List<String> oldTaskIds, List<WorkUnit>
newWorkUnits) {
+ this.oldTaskIds = oldTaskIds;
+ this.newWorkUnits = newWorkUnits;
+ }
+}
diff --git a/gobblin-cluster/build.gradle b/gobblin-cluster/build.gradle
index c49be50..5b574eb 100644
--- a/gobblin-cluster/build.gradle
+++ b/gobblin-cluster/build.gradle
@@ -47,7 +47,9 @@ dependencies {
compile externalDependency.hadoopCommon
compile externalDependency.avroMapredH2
compile externalDependency.findBugsAnnotations
- compile externalDependency.helix
+ compile (externalDependency.helix) {
+ exclude group: 'io.dropwizard.metrics', module: 'metrics-core'
+ }
runtimeOnly project(":gobblin-modules:gobblin-service-kafka")
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index f50dfb4..4d48fcd 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -17,14 +17,20 @@
package org.apache.gobblin.cluster;
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
import java.io.IOException;
import java.net.URI;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
@@ -127,13 +133,12 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
private final Config jobConfig;
private final long workFlowExpiryTimeSeconds;
private final long helixJobStopTimeoutSeconds;
+ private Map<String, TaskConfig> workUnitToHelixConfig;
+ private Retryer<Boolean> taskRetryer;
- public GobblinHelixJobLauncher (Properties jobProps,
- final HelixManager helixManager,
- Path appWorkDir,
- List<? extends Tag<?>> metadataTags,
- ConcurrentHashMap<String, Boolean>
runningMap,
- Optional<GobblinHelixMetrics> helixMetrics)
throws Exception {
+ public GobblinHelixJobLauncher(Properties jobProps, final HelixManager
helixManager, Path appWorkDir,
+ List<? extends Tag<?>> metadataTags, ConcurrentHashMap<String, Boolean>
runningMap,
+ Optional<GobblinHelixMetrics> helixMetrics) throws Exception {
super(jobProps, addAdditionalMetadataTags(jobProps, metadataTags));
LOGGER.debug("GobblinHelixJobLauncher: jobProps {}, appWorkDir {}",
jobProps, appWorkDir);
@@ -142,8 +147,8 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
this.runningMap = runningMap;
this.appWorkDir = appWorkDir;
this.inputWorkUnitDir = new Path(appWorkDir,
GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME);
- this.outputTaskStateDir = new Path(this.appWorkDir,
GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME
- + Path.SEPARATOR + this.jobContext.getJobId());
+ this.outputTaskStateDir = new Path(this.appWorkDir,
+ GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME +
Path.SEPARATOR + this.jobContext.getJobId());
this.helixWorkFlowName = this.jobContext.getJobId();
this.jobContext.getJobState().setJobLauncherType(LauncherTypeEnum.CLUSTER);
@@ -152,36 +157,36 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
Integer.toString(ParallelRunner.DEFAULT_PARALLEL_RUNNER_THREADS)));
jobConfig = ConfigUtils.propertiesToConfig(jobProps);
- this.workFlowExpiryTimeSeconds = ConfigUtils.getLong(jobConfig,
- GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS,
-
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS);
+ this.workFlowExpiryTimeSeconds =
+ ConfigUtils.getLong(jobConfig,
GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS,
+
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS);
- this.helixJobStopTimeoutSeconds = ConfigUtils.getLong(jobConfig,
- GobblinClusterConfigurationKeys.HELIX_JOB_STOP_TIMEOUT_SECONDS,
-
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_STOP_TIMEOUT_SECONDS);
+ this.helixJobStopTimeoutSeconds =
+ ConfigUtils.getLong(jobConfig,
GobblinClusterConfigurationKeys.HELIX_JOB_STOP_TIMEOUT_SECONDS,
+
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_STOP_TIMEOUT_SECONDS);
Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(jobProps)
.withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY,
ConfigValueFactory.fromAnyRef(
- new URI(appWorkDir.toUri().getScheme(), null,
appWorkDir.toUri().getHost(),
- appWorkDir.toUri().getPort(), "/", null, null).toString()));
+ new URI(appWorkDir.toUri().getScheme(), null,
appWorkDir.toUri().getHost(), appWorkDir.toUri().getPort(),
+ "/", null, null).toString()));
- this.stateStores = new StateStores(stateStoreJobConfig, appWorkDir,
- GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME, appWorkDir,
- GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME, appWorkDir,
- GobblinClusterConfigurationKeys.JOB_STATE_DIR_NAME);
+ this.stateStores =
+ new StateStores(stateStoreJobConfig, appWorkDir,
GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME,
+ appWorkDir,
GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME, appWorkDir,
+ GobblinClusterConfigurationKeys.JOB_STATE_DIR_NAME);
URI fsUri = URI.create(jobProps.getProperty(ConfigurationKeys.FS_URI_KEY,
ConfigurationKeys.LOCAL_FS_URI));
this.fs = FileSystem.get(fsUri, new Configuration());
- this.taskStateCollectorService = new TaskStateCollectorService(jobProps,
- this.jobContext.getJobState(),
- this.eventBus,
- this.eventSubmitter,
- this.stateStores.getTaskStateStore(),
- this.outputTaskStateDir,
- this.getIssueRepository());
+ this.taskStateCollectorService =
+ new TaskStateCollectorService(jobProps, this.jobContext.getJobState(),
this.eventBus, this.eventSubmitter,
+ this.stateStores.getTaskStateStore(), this.outputTaskStateDir,
this.getIssueRepository());
this.helixMetrics = helixMetrics;
+ this.workUnitToHelixConfig = new HashMap<>();
+ this.taskRetryer = RetryerBuilder.<Boolean>newBuilder()
+ .retryIfException()
+ .withStopStrategy(StopStrategies.stopAfterAttempt(3)).build();
startCancellationExecutor();
}
@@ -269,6 +274,64 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
}
}
+ protected void removeTasksFromCurrentJob(List<String> workUnitIdsToRemove)
throws IOException, ExecutionException,
+
RetryException {
+ String jobName = this.jobContext.getJobId();
+ try (ParallelRunner stateSerDeRunner = new
ParallelRunner(this.stateSerDeRunnerThreads, this.fs)) {
+ for (String workUnitId : workUnitIdsToRemove) {
+ taskRetryer.call(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ String taskId = workUnitToHelixConfig.get(workUnitId).getId();
+ boolean remove =
+ HelixUtils.deleteTaskFromHelixJob(helixWorkFlowName, jobName,
taskId, helixTaskDriver);
+ if (remove) {
+ log.info(String.format("Removed helix task %s with gobblin task
id %s from helix job %s:%s ", taskId,
+ workUnitId, helixWorkFlowName, jobName));
+ } else {
+ throw new IOException(
+ String.format("Cannot remove task %s from helix job %s:%s",
workUnitId,
+ helixWorkFlowName, jobName));
+ }
+ return true;
+ }
+ });
+ deleteWorkUnitFromStateStore(workUnitId, stateSerDeRunner);
+ log.info(String.format("remove task state for %s in state store",
workUnitId));
+ this.workUnitToHelixConfig.remove(workUnitId);
+ }
+ }
+ }
+
+ protected void addTasksToCurrentJob(List<WorkUnit> workUnitsToAdd) throws
IOException, ExecutionException,
+
RetryException {
+ String jobName = this.jobContext.getJobId();
+ try (ParallelRunner stateSerDeRunner = new
ParallelRunner(this.stateSerDeRunnerThreads, this.fs)) {
+ for (WorkUnit workunit : workUnitsToAdd) {
+ TaskConfig taskConfig = getTaskConfig(workunit, stateSerDeRunner);
+ this.taskRetryer.call(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ boolean added =
+ HelixUtils.addTaskToHelixJob(helixWorkFlowName, jobName,
taskConfig, helixTaskDriver);
+ if (added) {
+ log.info(
+ String.format("Added task %s to helix job %s:%s ",
workunit.getId(), helixWorkFlowName, jobName));
+ } else {
+ log.error(
+ String.format("Failed to add task %s to helix job %s:%s ",
workunit.getId(), helixWorkFlowName,
+ jobName));
+ throw new IOException(
+ String.format("Cannot add task %s to helix job %s:%s",
workunit.getId(),
+ helixWorkFlowName, jobName));
+ }
+ return true;
+ }
+ });
+ }
+ }
+ }
+
/**
* Create a job from a given batch of {@link WorkUnit}s.
*/
@@ -289,8 +352,8 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
// write the job.state using the state store if present, otherwise
serialize directly to the file
if (this.stateStores.haveJobStateStore()) {
jobStateFilePath = GobblinClusterUtils.getJobStateFilePath(true,
this.appWorkDir, this.jobContext.getJobId());
-
this.stateStores.getJobStateStore().put(jobStateFilePath.getParent().getName(),
jobStateFilePath.getName(),
- this.jobContext.getJobState());
+ this.stateStores.getJobStateStore()
+ .put(jobStateFilePath.getParent().getName(),
jobStateFilePath.getName(), this.jobContext.getJobState());
} else {
jobStateFilePath = GobblinClusterUtils.getJobStateFilePath(false,
this.appWorkDir, this.jobContext.getJobId());
SerializationUtils.serializeState(this.fs, jobStateFilePath,
this.jobContext.getJobState());
@@ -315,23 +378,22 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
JobConfig.Builder jobConfigBuilder = new JobConfig.Builder();
// Helix task attempts = retries + 1 (fallback to general task retry for
backward compatibility)
- jobConfigBuilder.setMaxAttemptsPerTask(gobblinJobState.getPropAsInt(
- GobblinClusterConfigurationKeys.HELIX_TASK_MAX_ATTEMPTS_KEY,
gobblinJobState.getPropAsInt(
- ConfigurationKeys.MAX_TASK_RETRIES_KEY,
- ConfigurationKeys.DEFAULT_MAX_TASK_RETRIES)) + 1);
+ jobConfigBuilder.setMaxAttemptsPerTask(
+
gobblinJobState.getPropAsInt(GobblinClusterConfigurationKeys.HELIX_TASK_MAX_ATTEMPTS_KEY,
+
gobblinJobState.getPropAsInt(ConfigurationKeys.MAX_TASK_RETRIES_KEY,
+ ConfigurationKeys.DEFAULT_MAX_TASK_RETRIES)) + 1);
// Helix task timeout (fallback to general task timeout for backward
compatibility)
- jobConfigBuilder.setTimeoutPerTask(gobblinJobState.getPropAsLong(
- GobblinClusterConfigurationKeys.HELIX_TASK_TIMEOUT_SECONDS,
- gobblinJobState.getPropAsLong(
- ConfigurationKeys.TASK_TIMEOUT_SECONDS,
- ConfigurationKeys.DEFAULT_TASK_TIMEOUT_SECONDS)) * 1000);
+ jobConfigBuilder.setTimeoutPerTask(
+
gobblinJobState.getPropAsLong(GobblinClusterConfigurationKeys.HELIX_TASK_TIMEOUT_SECONDS,
+
gobblinJobState.getPropAsLong(ConfigurationKeys.TASK_TIMEOUT_SECONDS,
+ ConfigurationKeys.DEFAULT_TASK_TIMEOUT_SECONDS)) * 1000);
jobConfigBuilder.setFailureThreshold(workUnits.size());
jobConfigBuilder.addTaskConfigMap(taskConfigMap).setCommand(GobblinTaskRunner.GOBBLIN_TASK_FACTORY_NAME);
-
jobConfigBuilder.setNumConcurrentTasksPerInstance(ConfigUtils.getInt(jobConfig,
- GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY,
-
GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY_DEFAULT));
+ jobConfigBuilder.setNumConcurrentTasksPerInstance(
+ ConfigUtils.getInt(jobConfig,
GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY,
+
GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY_DEFAULT));
if
(this.jobConfig.hasPath(GobblinClusterConfigurationKeys.HELIX_JOB_TAG_KEY)) {
String jobTag =
this.jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_TAG_KEY);
@@ -349,8 +411,9 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
jobConfigBuilder.setRebalanceRunningTask(true);
}
- jobConfigBuilder.setExpiry(gobblinJobState.getPropAsLong(
- GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS,
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS));
+ jobConfigBuilder.setExpiry(
+
gobblinJobState.getPropAsLong(GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS,
+
GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS));
return jobConfigBuilder;
}
@@ -359,16 +422,11 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
* Submit a job to run.
*/
private void submitJobToHelix(JobConfig.Builder jobConfigBuilder) throws
Exception {
- HelixUtils.submitJobToWorkFlow(jobConfigBuilder,
- this.helixWorkFlowName,
- this.jobContext.getJobId(),
- this.helixTaskDriver,
- this.helixManager,
- this.workFlowExpiryTimeSeconds);
+ HelixUtils.submitJobToWorkFlow(jobConfigBuilder, this.helixWorkFlowName,
this.jobContext.getJobId(),
+ this.helixTaskDriver, this.helixManager,
this.workFlowExpiryTimeSeconds);
}
- public void launchJob(@Nullable JobListener jobListener)
- throws JobException {
+ public void launchJob(@Nullable JobListener jobListener) throws JobException
{
this.jobListener = jobListener;
boolean isLaunched = false;
this.runningMap.putIfAbsent(this.jobContext.getJobName(), false);
@@ -376,11 +434,11 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
Throwable errorInJobLaunching = null;
try {
if (this.runningMap.replace(this.jobContext.getJobName(), false, true)) {
- LOGGER.info ("Job {} will be executed, add into running map.",
this.jobContext.getJobId());
+ LOGGER.info("Job {} will be executed, add into running map.",
this.jobContext.getJobId());
isLaunched = true;
super.launchJob(jobListener);
} else {
- LOGGER.warn ("Job {} will not be executed because other jobs are still
running.", this.jobContext.getJobId());
+ LOGGER.warn("Job {} will not be executed because other jobs are still
running.", this.jobContext.getJobId());
}
// TODO: Better error handling
} catch (Throwable t) {
@@ -388,23 +446,19 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
} finally {
if (isLaunched) {
if (this.runningMap.replace(this.jobContext.getJobName(), true,
false)) {
- LOGGER.info ("Job {} is done, remove from running map.",
this.jobContext.getJobId());
+ LOGGER.info("Job {} is done, remove from running map.",
this.jobContext.getJobId());
} else {
- throw errorInJobLaunching == null ? new IllegalStateException("A
launched job should have running state equal to true in the running map.")
+ throw errorInJobLaunching == null ? new IllegalStateException(
+ "A launched job should have running state equal to true in the
running map.")
: new RuntimeException("Failure in launching job:",
errorInJobLaunching);
}
}
}
}
- /**
- * Add a single {@link WorkUnit} (flattened) to persistent storage so that
worker can fetch that based on information
- * fetched in Helix task.
- */
- private void addWorkUnit(WorkUnit workUnit, ParallelRunner stateSerDeRunner,
- Map<String, TaskConfig> taskConfigMap) throws IOException {
- String workUnitFilePath = persistWorkUnit(
- new Path(this.inputWorkUnitDir, this.jobContext.getJobId()), workUnit,
stateSerDeRunner);
+ private TaskConfig getTaskConfig(WorkUnit workUnit, ParallelRunner
stateSerDeRunner) throws IOException {
+ String workUnitFilePath =
+ persistWorkUnit(new Path(this.inputWorkUnitDir,
this.jobContext.getJobId()), workUnit, stateSerDeRunner);
Map<String, String> rawConfigMap = Maps.newHashMap();
rawConfigMap.put(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH,
workUnitFilePath);
@@ -412,8 +466,42 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
rawConfigMap.put(ConfigurationKeys.JOB_ID_KEY, this.jobContext.getJobId());
rawConfigMap.put(ConfigurationKeys.TASK_ID_KEY, workUnit.getId());
rawConfigMap.put(GobblinClusterConfigurationKeys.TASK_SUCCESS_OPTIONAL_KEY,
"true");
+ TaskConfig taskConfig = TaskConfig.Builder.from(rawConfigMap);
+ workUnitToHelixConfig.put(workUnit.getId(), taskConfig);
+ return taskConfig;
+ }
+
+ /**
+ * Add a single {@link WorkUnit} (flattened) to persistent storage so that
worker can fetch that based on information
+ * fetched in Helix task.
+ */
+ private void addWorkUnit(WorkUnit workUnit, ParallelRunner stateSerDeRunner,
Map<String, TaskConfig> taskConfigMap)
+ throws IOException {
+ taskConfigMap.put(workUnit.getId(), getTaskConfig(workUnit,
stateSerDeRunner));
+ }
- taskConfigMap.put(workUnit.getId(), TaskConfig.Builder.from(rawConfigMap));
+ /**
+ * Delete a single {@link WorkUnit} (flattened) from state store.
+ */
+ private void deleteWorkUnitFromStateStore(String workUnitId, ParallelRunner
stateSerDeRunner) {
+ String workUnitFilePath =
+
workUnitToHelixConfig.get(workUnitId).getConfigMap().get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH);
+ final StateStore stateStore;
+ Path workUnitFile = new Path(workUnitFilePath);
+ final String fileName = workUnitFile.getName();
+ final String storeName = workUnitFile.getParent().getName();
+ if (fileName.endsWith(MULTI_WORK_UNIT_FILE_EXTENSION)) {
+ stateStore = stateStores.getMwuStateStore();
+ } else {
+ stateStore = stateStores.getWuStateStore();
+ }
+ stateSerDeRunner.submitCallable(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ stateStore.delete(storeName, fileName);
+ return null;
+ }
+ }, "Delete state " + fileName + " from store " + storeName);
}
/**
@@ -446,28 +534,23 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
return workUnitFile.toString();
}
- private void waitForJobCompletion() throws InterruptedException {
- boolean timeoutEnabled = Boolean.parseBoolean(this.jobProps.getProperty(
- GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY,
- GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED));
- long timeoutInSeconds = Long.parseLong(this.jobProps.getProperty(
- GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS,
- GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_SECONDS));
+ private void waitForJobCompletion() throws InterruptedException {
+ boolean timeoutEnabled = Boolean.parseBoolean(
+
this.jobProps.getProperty(GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY,
+
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED));
+ long timeoutInSeconds = Long.parseLong(
+
this.jobProps.getProperty(GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS,
+
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_SECONDS));
- long stoppingStateTimeoutInSeconds = PropertiesUtils
- .getPropAsLong(this.jobProps,
GobblinClusterConfigurationKeys.HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS,
-
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS);
+ long stoppingStateTimeoutInSeconds =
PropertiesUtils.getPropAsLong(this.jobProps,
+
GobblinClusterConfigurationKeys.HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS,
+
GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS);
try {
- HelixUtils.waitJobCompletion(
- this.helixManager,
- this.helixWorkFlowName,
- this.jobContext.getJobId(),
- timeoutEnabled? Optional.of(timeoutInSeconds) : Optional.empty(),
- stoppingStateTimeoutInSeconds);
+ HelixUtils.waitJobCompletion(this.helixManager, this.helixWorkFlowName,
this.jobContext.getJobId(),
+ timeoutEnabled ? Optional.of(timeoutInSeconds) : Optional.empty(),
stoppingStateTimeoutInSeconds);
} catch (TimeoutException te) {
- HelixUtils.handleJobTimeout(helixWorkFlowName, jobContext.getJobId(),
- helixManager, this, this.jobListener);
+ HelixUtils.handleJobTimeout(helixWorkFlowName, jobContext.getJobId(),
helixManager, this, this.jobListener);
}
}
@@ -486,7 +569,8 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
if (this.stateStores.haveJobStateStore()) {
this.stateStores.getJobStateStore().delete(this.jobContext.getJobId());
} else {
- Path jobStateFilePath = GobblinClusterUtils.getJobStateFilePath(false,
this.appWorkDir, this.jobContext.getJobId());
+ Path jobStateFilePath =
+ GobblinClusterUtils.getJobStateFilePath(false, this.appWorkDir,
this.jobContext.getJobId());
this.fs.delete(jobStateFilePath, false);
}
}
@@ -497,7 +581,8 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
* @param inputTags list of metadata tags
* @return
*/
- private static List<? extends Tag<?>> addAdditionalMetadataTags(Properties
jobProps, List<? extends Tag<?>> inputTags) {
+ private static List<? extends Tag<?>> addAdditionalMetadataTags(Properties
jobProps,
+ List<? extends Tag<?>> inputTags) {
List<Tag<?>> metadataTags = Lists.newArrayList(inputTags);
String jobId;
@@ -515,8 +600,8 @@ public class GobblinHelixJobLauncher extends
AbstractJobLauncher {
if (jobProps.containsKey(ConfigurationKeys.FLOW_NAME_KEY)) {
metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
jobProps.getProperty(ConfigurationKeys.FLOW_GROUP_KEY, "")));
- metadataTags.add(
- new Tag<>(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY)));
+ metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
+ jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY)));
// use job execution id if flow execution id is not present
metadataTags.add(new
Tag<>(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index fe23f20..80962b4 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -36,6 +36,7 @@ import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
import org.apache.helix.task.TargetState;
import org.apache.helix.task.TaskConfig;
import org.apache.helix.task.TaskDriver;
@@ -145,6 +146,33 @@ public class HelixUtils {
log.info("Work flow {} initialized", workFlowName);
}
+ protected static boolean deleteTaskFromHelixJob(String workFlowName,
+ String jobName, String taskID, TaskDriver helixTaskDriver) {
+ try {
+ log.info(String.format("try to delete task %s from workflow %s, job %s",
taskID, workFlowName, jobName));
+ helixTaskDriver.deleteTask(workFlowName, jobName, taskID);
+ } catch (Exception e) {
+ e.printStackTrace();
+ return
!helixTaskDriver.getJobConfig(TaskUtil.getNamespacedJobName(workFlowName,
jobName)).getMapConfigs().containsKey(taskID);
+ }
+ return true;
+ }
+
+ protected static boolean addTaskToHelixJob(String workFlowName,
+ String jobName, TaskConfig taskConfig, TaskDriver helixTaskDriver) {
+ String taskId = taskConfig.getId();
+ try {
+ log.info(String.format("try to add task %s to workflow %s, job %s",
taskId, workFlowName, jobName));
+ helixTaskDriver.addTask(workFlowName, jobName, taskConfig);
+ } catch (Exception e) {
+ e.printStackTrace();
+ JobContext jobContext =
+
helixTaskDriver.getJobContext(TaskUtil.getNamespacedJobName(workFlowName,
jobName));
+ return jobContext.getTaskIdPartitionMap().containsKey(taskId);
+ }
+ return true;
+ }
+
public static void submitJobToWorkFlow(JobConfig.Builder jobConfigBuilder,
String workFlowName,
String jobName,
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
index fb5b8e5..a24e627 100644
---
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
@@ -29,12 +29,12 @@ import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ChainedPathZkSerializer;
-import org.apache.helix.manager.zk.PathBasedZkSerializer;
import org.apache.helix.manager.zk.ZNRecordStreamingSerializer;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.task.TargetState;
import org.apache.helix.task.TaskDriver;
+import org.apache.helix.zookeeper.datamodel.serializer.ChainedPathZkSerializer;
+import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
@@ -89,8 +89,8 @@ public class ClusterIntegrationTest {
IntegrationJobCancelSuite.TASK_STATE_FILE)
.withValue(SleepingTask.SLEEP_TIME_IN_SECONDS,
ConfigValueFactory.fromAnyRef(100));
this.suite = new IntegrationJobCancelSuite(jobConfigOverrides);
- HelixManager helixManager = getHelixManager();
suite.startCluster();
+ HelixManager helixManager = getHelixManager();
helixManager.connect();
ExecutorService executor = Executors.newSingleThreadExecutor();
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
index 448a91d..904c552 100644
---
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java
@@ -210,12 +210,13 @@ public class GobblinTaskRunnerTest {
Config jobConfigOverrides =
ClusterIntegrationTestUtils.buildSleepingJob(JOB_ID, TASK_STATE_FILE);
this.suite = new TaskAssignmentAfterConnectionRetry(jobConfigOverrides);
+ suite.startCluster();
+
String zkConnectString =
suite.getManagerConfig().getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
String clusterName =
suite.getManagerConfig().getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
//A test manager instance for observing the state of the cluster
HelixManager helixManager =
HelixManagerFactory.getZKHelixManager(clusterName, "TestManager",
InstanceType.SPECTATOR, zkConnectString);
- suite.startCluster();
helixManager.connect();
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java
index 57d9fd6..f38631a 100644
---
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java
@@ -50,16 +50,16 @@ public class HelixAssignedParticipantCheckTest {
suite = new IntegrationBasicSuite(jobConfigOverrides);
helixConfig = suite.getManagerConfig();
- String clusterName =
helixConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
- String zkConnectString =
helixConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
- helixManager = HelixManagerFactory.getZKHelixManager(clusterName,
"TestManager",
- InstanceType.SPECTATOR, zkConnectString);
}
@Test (groups = {"disabledOnCI"})
//Test disabled on Travis because cluster integration tests are generally
flaky on Travis.
public void testExecute() throws Exception {
suite.startCluster();
+ String clusterName =
helixConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+ String zkConnectString =
helixConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+ helixManager = HelixManagerFactory.getZKHelixManager(clusterName,
"TestManager",
+ InstanceType.SPECTATOR, zkConnectString);
//Connect to the previously started Helix cluster
helixManager.connect();
diff --git
a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
index e383060..ea321a3 100644
---
a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
+++
b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
@@ -32,7 +32,6 @@ import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.io.Text;
import org.apache.helix.AccessOption;
@@ -47,6 +46,7 @@ import com.google.common.collect.Lists;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.util.io.StreamUtils;
+import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
/**
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
index bc2904b..a29bd0f 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
@@ -291,7 +291,7 @@ public class KafkaStreamingExtractor<S> extends
FlushingExtractor<S, DecodeableK
return longWatermarkMap;
}
- private List<KafkaPartition> getTopicPartitionsFromWorkUnit(WorkUnitState
state) {
+ public static List<KafkaPartition>
getTopicPartitionsFromWorkUnit(WorkUnitState state) {
// what topic partitions are we responsible for?
List<KafkaPartition> topicPartitions = new ArrayList<>();
@@ -302,7 +302,7 @@ public class KafkaStreamingExtractor<S> extends
FlushingExtractor<S, DecodeableK
for (int i = 0; i < numOfPartitions; ++i) {
if (workUnit.getProp(topicNameProp, null) == null) {
- log.warn("There's no topic.name property being set in workunt which
could be an illegal state");
+ log.warn("There's no topic.name property being set in workunit which
could be an illegal state");
break;
}
String topicName = workUnit.getProp(topicNameProp);
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/UniversalKafkaSource.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/UniversalKafkaSource.java
index b227636..ad80334 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/UniversalKafkaSource.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/UniversalKafkaSource.java
@@ -17,11 +17,17 @@
package org.apache.gobblin.source.extractor.extract.kafka;
+import com.google.common.eventbus.EventBus;
import java.io.IOException;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.InfiniteSource;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.extractor.extract.EventBasedExtractor;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.stream.WorkUnitChangeEvent;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
@@ -32,9 +38,11 @@ import com.google.common.base.Preconditions;
* A {@link KafkaSource} to use with arbitrary {@link EventBasedExtractor}.
Specify the extractor to use with key
* {@link #EXTRACTOR_TYPE}.
*/
-public class UniversalKafkaSource<S, D> extends KafkaSource<S, D> {
+@Slf4j
+public class UniversalKafkaSource<S, D> extends KafkaSource<S, D> implements
InfiniteSource<S, D> {
public static final String EXTRACTOR_TYPE =
"gobblin.source.kafka.extractorType";
+ private final EventBus eventBus = new
EventBus(this.getClass().getSimpleName());
@Override
public Extractor<S, D> getExtractor(WorkUnitState state)
@@ -50,4 +58,16 @@ public class UniversalKafkaSource<S, D> extends
KafkaSource<S, D> {
throw new RuntimeException(e);
}
}
+
+ public void onWorkUnitUpdate(List<String> oldTaskIds, List<WorkUnit>
newWorkUnits) {
+ if (this.eventBus != null) {
+ log.info("post workunit change event");
+ this.eventBus.post(new WorkUnitChangeEvent(oldTaskIds, newWorkUnits));
+ }
+ }
+
+ @Override
+ public EventBus getEventBus() {
+ return this.eventBus;
+ }
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 684a00a..aa40bce 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -17,7 +17,10 @@
package org.apache.gobblin.runtime;
+import com.github.rholder.retry.RetryException;
+import com.google.common.eventbus.Subscribe;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
import java.net.Authenticator;
import java.net.URI;
import java.net.URISyntaxException;
@@ -27,11 +30,15 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.source.InfiniteSource;
+import org.apache.gobblin.stream.WorkUnitChangeEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -92,7 +99,6 @@ import
org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
import
org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory;
import org.apache.gobblin.runtime.troubleshooter.IssueRepository;
import org.apache.gobblin.runtime.util.JobMetrics;
-import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.source.Source;
import org.apache.gobblin.source.WorkUnitStreamSource;
import org.apache.gobblin.source.extractor.JobCommitPolicy;
@@ -260,6 +266,27 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
}
/**
+ * Handle {@link WorkUnitChangeEvent}, by default it will do nothing
+ */
+ @Subscribe
+ public void handleWorkUnitChangeEvent(WorkUnitChangeEvent
workUnitChangeEvent)
+ throws InvocationTargetException {
+ LOG.info("start to handle workunit change event");
+ try {
+ this.removeTasksFromCurrentJob(workUnitChangeEvent.getOldTaskIds());
+ this.addTasksToCurrentJob(workUnitChangeEvent.getNewWorkUnits());
+ } catch (Exception e) {
+ //todo: emit some event to indicate there is an error handling this
event that may cause starvation
+ throw new InvocationTargetException(e);
+ }
+ }
+
+ protected void removeTasksFromCurrentJob(List<String> taskIdsToRemove)
throws IOException, ExecutionException,
+
RetryException {}
+ protected void addTasksToCurrentJob(List<WorkUnit> workUnitsToAdd) throws
IOException, ExecutionException,
+
RetryException {}
+
+ /**
* To supporting 'gobblin.template.uri' in any types of jobLauncher, place
this resolution as a public-static method
* to make it accessible for all implementation of JobLauncher and
**AzkabanJobLauncher**.
*
@@ -427,6 +454,13 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
TimingEvent workUnitsCreationTimer =
this.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.WORK_UNITS_CREATION);
Source<?, ?> source = this.jobContext.getSource();
+ if (source instanceof InfiniteSource) {
+ ((InfiniteSource) source).getEventBus().register(this);
+ } else if (source instanceof SourceDecorator) {
+ if (((SourceDecorator<?, ?>) source).getEventBus() != null) {
+ ((SourceDecorator<?, ?>) source).getEventBus().register(this);
+ }
+ }
WorkUnitStream workUnitStream;
if (source instanceof WorkUnitStreamSource) {
workUnitStream = ((WorkUnitStreamSource)
source).getWorkunitStream(jobState);
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java
index 4558325..f1e8928 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SourceDecorator.java
@@ -17,10 +17,12 @@
package org.apache.gobblin.runtime;
+import com.google.common.eventbus.EventBus;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import org.apache.gobblin.source.InfiniteSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,6 +74,15 @@ public class SourceDecorator<S, D> implements
WorkUnitStreamSource<S, D>, Decora
}
}
+ public EventBus getEventBus() {
+ if (this.getDecoratedObject() instanceof InfiniteSource) {
+ return ((InfiniteSource) this.getDecoratedObject()).getEventBus();
+ } else if (this.getDecoratedObject() instanceof SourceDecorator) {
+ return ((SourceDecorator) this.getDecoratedObject()).getEventBus();
+ }
+ return null;
+ }
+
@Override
public WorkUnitStream getWorkunitStream(SourceState state) {
try {
diff --git a/gobblin-service/build.gradle b/gobblin-service/build.gradle
index 68f3ac7..6fb986e 100644
--- a/gobblin-service/build.gradle
+++ b/gobblin-service/build.gradle
@@ -53,7 +53,9 @@ dependencies {
compile externalDependency.guiceMultibindings
compile externalDependency.hadoopClientCommon
compile externalDependency.hadoopCommon
- compile externalDependency.helix
+ compile (externalDependency.helix) {
+ exclude group: 'io.dropwizard.metrics', module: 'metrics-core'
+ }
compile externalDependency.hiveCommon
compile externalDependency.httpclient
compile externalDependency.httpcore
diff --git a/gobblin-yarn/build.gradle b/gobblin-yarn/build.gradle
index 290dab3..1c01e55 100644
--- a/gobblin-yarn/build.gradle
+++ b/gobblin-yarn/build.gradle
@@ -56,7 +56,9 @@ dependencies {
compile externalDependency.hadoopYarnClient
compile externalDependency.avroMapredH2
compile externalDependency.findBugsAnnotations
- compile externalDependency.helix
+ compile (externalDependency.helix) {
+ exclude group: 'io.dropwizard.metrics', module: 'metrics-core'
+ }
testCompile project(path: ':gobblin-cluster', configuration: 'tests')
testCompile project(":gobblin-example")
diff --git
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
index 124d0f5..1934ece 100644
---
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
+++
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
@@ -203,14 +203,16 @@ public class YarnServiceTestWithExpiration {
Assert.assertFalse(this.expiredYarnService.getMatchingRequestsList(64,
1).isEmpty());
Assert.assertEquals(this.expiredYarnService.getNumRequestedContainers(),
10);
- try {
- Thread.sleep(20000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- //Since it may retry to request the container and start again, so the
number may lager than 10
- Assert.assertTrue(this.expiredYarnService.completedContainers.size() >=
10);
- Assert.assertTrue(this.expiredYarnService.startErrorContainers.size() >=
10);
+
AssertWithBackoff.create().logger(LOG).timeoutMs(60000).maxSleepMs(2000).backoffFactor(1.5)
+ .assertTrue(new Predicate<Void>() {
+ @Override
+ public boolean apply(Void input) {
+            //Since it may retry to request the container and start again, so
the number may be larger than 10
+ return expiredYarnService.completedContainers.size() >= 10
+ && expiredYarnService.startErrorContainers.size() >= 10;
+ }
+ }, "Waiting for container completed");
+
}
private static class TestExpiredYarnService extends
YarnServiceTest.TestYarnService {
@@ -240,7 +242,7 @@ public class YarnServiceTestWithExpiration {
Thread.currentThread().interrupt();
}
return BuilderUtils.newContainerLaunchContext(Collections.emptyMap(),
Collections.emptyMap(),
- Arrays.asList("sleep", "60000"), Collections.emptyMap(), null,
Collections.emptyMap());
+ Arrays.asList("sleep", "600"), Collections.emptyMap(), null,
Collections.emptyMap());
}
private class TestNMClientCallbackHandler extends
YarnService.NMClientCallbackHandler {
@Override
diff --git a/gradle/scripts/dependencyDefinitions.gradle
b/gradle/scripts/dependencyDefinitions.gradle
index cfe9646..56562e5 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -67,7 +67,7 @@ ext.externalDependency = [
"hadoopAnnotations": "org.apache.hadoop:hadoop-annotations:" +
hadoopVersion,
"hadoopAws": "org.apache.hadoop:hadoop-aws:2.6.0",
"hdrHistogram": "org.hdrhistogram:HdrHistogram:2.1.11",
- "helix": "org.apache.helix:helix-core:0.9.4",
+ "helix": "org.apache.helix:helix-core:1.0.2",
"hiveCommon": "org.apache.hive:hive-common:" + hiveVersion,
"hiveService": "org.apache.hive:hive-service:" + hiveVersion,
"hiveJdbc": "org.apache.hive:hive-jdbc:" + hiveVersion,