This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 6cca9eb [GOBBLIN-1043] Implement a Helix assigned participant check
as a CommitStep
6cca9eb is described below
commit 6cca9eb55c9b64556d75de999ba762b5c886378b
Author: sv2000 <[email protected]>
AuthorDate: Fri Feb 7 11:00:26 2020 -0800
[GOBBLIN-1043] Implement a Helix assigned participant check as a CommitStep
Closes #2883 from sv2000/helixSplitBrain
---
.../apache/gobblin/commit/CommitStepException.java | 30 +++++
.../cluster/GobblinClusterConfigurationKeys.java | 10 ++
.../apache/gobblin/cluster/GobblinHelixTask.java | 39 +++++-
.../gobblin/cluster/GobblinHelixTaskFactory.java | 27 +++-
.../apache/gobblin/cluster/GobblinTaskRunner.java | 11 ++
.../cluster/HelixAssignedParticipantCheck.java | 144 +++++++++++++++++++++
.../gobblin/cluster/TaskRunnerSuiteBase.java | 12 ++
.../gobblin/cluster/ClusterIntegrationTest.java | 4 +-
.../gobblin/cluster/GobblinHelixTaskTest.java | 24 +++-
.../cluster/HelixAssignedParticipantCheckTest.java | 124 ++++++++++++++++++
10 files changed, 414 insertions(+), 11 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/commit/CommitStepException.java
b/gobblin-api/src/main/java/org/apache/gobblin/commit/CommitStepException.java
new file mode 100644
index 0000000..ef3f90e
--- /dev/null
+++
b/gobblin-api/src/main/java/org/apache/gobblin/commit/CommitStepException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.commit;
+
+import java.io.IOException;
+
+
+/**
+ * An {@link IOException} raised when a commit step cannot be executed or when its verification does not pass
+ * (for example, a pre-publish safety check that rejects the commit).
+ */
+public class CommitStepException extends IOException {
+
+  /**
+   * @param message description of the failure
+   */
+  public CommitStepException(String message) {
+    super(message);
+  }
+
+  /**
+   * @param message description of the failure
+   * @param t the underlying cause of the failure
+   */
+  public CommitStepException(String message, Throwable t) {
+    super(message, t);
+  }
+}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 787130e..27e909d 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -144,10 +144,12 @@ public class GobblinClusterConfigurationKeys {
public static final long DEFAULT_HELIX_JOB_STOP_TIMEOUT_SECONDS = 10L;
public static final String TASK_RUNNER_SUITE_BUILDER =
GOBBLIN_CLUSTER_PREFIX + "taskRunnerSuite.builder";
+ public static final String HELIX_JOB_NAME_KEY = GOBBLIN_CLUSTER_PREFIX +
"helixJobName";
public static final String HELIX_JOB_TIMEOUT_ENABLED_KEY =
"helix.job.timeout.enabled";
public static final String DEFAULT_HELIX_JOB_TIMEOUT_ENABLED = "false";
public static final String HELIX_JOB_TIMEOUT_SECONDS =
"helix.job.timeout.seconds";
public static final String DEFAULT_HELIX_JOB_TIMEOUT_SECONDS = "10800";
+ public static final String HELIX_TASK_NAME_KEY = GOBBLIN_CLUSTER_PREFIX +
"helixTaskName";
public static final String HELIX_TASK_TIMEOUT_SECONDS =
"helix.task.timeout.seconds";
public static final String HELIX_TASK_MAX_ATTEMPTS_KEY =
"helix.task.maxAttempts";
@@ -183,4 +185,12 @@ public class GobblinClusterConfigurationKeys {
public static final boolean DEFAULT_IS_HELIX_CLUSTER_MANAGED = false;
public static final String HADOOP_CONFIG_OVERRIDES_PREFIX =
GOBBLIN_CLUSTER_PREFIX + "hadoop.inject";
+
+ //Configurations that will be set dynamically when a
GobblinTaskRunner/GobblinHelixTask are instantiated.
+ public static final String GOBBLIN_HELIX_PREFIX = "gobblin.helix.";
+ public static final String HELIX_JOB_ID_KEY = GOBBLIN_HELIX_PREFIX + "jobId";
+ public static final String HELIX_TASK_ID_KEY = GOBBLIN_HELIX_PREFIX +
"taskId";
+ public static final String HELIX_PARTITION_ID_KEY = GOBBLIN_HELIX_PREFIX +
"partitionId" ;
+ public static final String TASK_RUNNER_HOST_NAME_KEY = GOBBLIN_HELIX_PREFIX
+ "hostName";
+ public static final String CONTAINER_ID_KEY = GOBBLIN_HELIX_PREFIX +
"containerId";
}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
index e124aca..122a8d5 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
@@ -17,19 +17,22 @@
package org.apache.gobblin.cluster;
-import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.helix.task.JobContext;
import org.apache.helix.task.Task;
import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskResult;
import org.slf4j.MDC;
import com.google.common.base.Throwables;
import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
import lombok.extern.slf4j.Slf4j;
@@ -67,18 +70,23 @@ public class GobblinHelixTask implements Task {
private String jobName;
private String jobId;
+ private String helixJobId;
private String jobKey;
private String taskId;
private Path workUnitFilePath;
private GobblinHelixTaskMetrics taskMetrics;
private SingleTask task;
+ private String helixTaskId;
public GobblinHelixTask(TaskRunnerSuiteBase.Builder builder,
TaskCallbackContext taskCallbackContext,
TaskAttemptBuilder taskAttemptBuilder,
StateStores stateStores,
- GobblinHelixTaskMetrics taskMetrics) {
+ GobblinHelixTaskMetrics taskMetrics,
+ TaskDriver taskDriver)
+ {
this.taskConfig = taskCallbackContext.getTaskConfig();
+ this.helixJobId = taskCallbackContext.getJobConfig().getJobId();
this.applicationName = builder.getApplicationName();
this.instanceName = builder.getInstanceName();
this.taskMetrics = taskMetrics;
@@ -89,19 +97,35 @@ public class GobblinHelixTask implements Task {
builder.getAppWorkPath(),
this.jobId);
+ Config dynamicConfig = builder.getDynamicConfig()
+ .withValue(GobblinClusterConfigurationKeys.TASK_RUNNER_HOST_NAME_KEY,
ConfigValueFactory.fromAnyRef(builder.getHostName()))
+ .withValue(GobblinClusterConfigurationKeys.CONTAINER_ID_KEY,
ConfigValueFactory.fromAnyRef(builder.getContainerId()))
+ .withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY,
ConfigValueFactory.fromAnyRef(builder.getInstanceName()))
+ .withValue(GobblinClusterConfigurationKeys.HELIX_JOB_ID_KEY,
ConfigValueFactory.fromAnyRef(this.helixJobId))
+ .withValue(GobblinClusterConfigurationKeys.HELIX_TASK_ID_KEY,
ConfigValueFactory.fromAnyRef(this.helixTaskId));
+
+ Integer partitionNum = getPartitionForHelixTask(taskDriver);
+
+ // Fail fast: the partition id must be published in the dynamic config below (HELIX_PARTITION_ID_KEY is
+ // read via config.getInt by HelixAssignedParticipantCheck).
+ if (partitionNum == null) {
+ throw new IllegalStateException(String.format("Task %s, job %s on instance %s has no partition assigned",
+ this.helixTaskId, this.helixJobId, builder.getInstanceName()));
+ }
+
+ dynamicConfig =
dynamicConfig.withValue(GobblinClusterConfigurationKeys.HELIX_PARTITION_ID_KEY,
ConfigValueFactory.fromAnyRef(partitionNum));
this.task = new SingleTask(this.jobId,
this.workUnitFilePath,
jobStateFilePath,
builder.getFs(),
taskAttemptBuilder,
stateStores,
- builder.getDynamicConfig());
+ dynamicConfig);
}
private void getInfoFromTaskConfig() {
Map<String, String> configMap = this.taskConfig.getConfigMap();
this.jobName = configMap.get(ConfigurationKeys.JOB_NAME_KEY);
this.jobId = configMap.get(ConfigurationKeys.JOB_ID_KEY);
+ this.helixTaskId = this.taskConfig.getId();
this.jobKey = Long.toString(Id.parse(this.jobId).getSequence());
this.taskId = configMap.get(ConfigurationKeys.TASK_ID_KEY);
this.workUnitFilePath =
@@ -135,6 +159,15 @@ public class GobblinHelixTask implements Task {
}
}
+  /**
+   * Looks up the Helix partition that Helix has mapped to this task's Helix task id.
+   *
+   * @param taskDriver driver used to fetch the {@link JobContext} of this task's Helix job
+   * @return the partition number, or {@code null} when the job context is unavailable or holds no entry for this task
+   */
+  private Integer getPartitionForHelixTask(TaskDriver taskDriver) {
+    JobContext helixJobContext = taskDriver.getJobContext(this.helixJobId);
+    if (helixJobContext == null) {
+      return null;
+    }
+    return helixJobContext.getTaskIdPartitionMap().get(this.helixTaskId);
+  }
+
@Override
public void cancel() {
log.warn("Gobblin helix task cancellation invoked.");
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
index 6bf3d5f..fdf8b27 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
@@ -21,11 +21,13 @@ import org.apache.hadoop.fs.Path;
import org.apache.helix.HelixManager;
import org.apache.helix.task.Task;
import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.Counter;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.typesafe.config.Config;
@@ -54,6 +56,7 @@ public class GobblinHelixTaskFactory implements TaskFactory {
private final Optional<ContainerMetrics> containerMetrics;
private final HelixManager helixManager;
+ private Optional<TaskDriver> taskDriver;
private TaskRunnerSuiteBase.Builder builder;
/**
@@ -70,9 +73,23 @@ public class GobblinHelixTaskFactory implements TaskFactory {
private final TaskAttemptBuilder taskAttemptBuilder;
public GobblinHelixTaskFactory(TaskRunnerSuiteBase.Builder builder,
+ MetricContext metricContext,
+ TaskStateTracker taskStateTracker,
+ Config stateStoreConfig) {
+ // No TaskDriver is supplied here; createNewTask() lazily builds one from the task context's HelixManager.
+ this(builder, metricContext, taskStateTracker, stateStoreConfig,
Optional.absent());
+ }
+
+ /**
+ * Constructor that allows passing in a {@link TaskDriver} instance. This
constructor is exposed purely for
+ * testing purposes to allow passing in a mock {@link TaskDriver} (e.g. see
GobblinHelixTaskTest). For other cases, use
+ * the constructor {@link
#GobblinHelixTaskFactory(TaskRunnerSuiteBase.Builder, MetricContext,
TaskStateTracker, Config)}.
+ */
+ @VisibleForTesting
+ public GobblinHelixTaskFactory(TaskRunnerSuiteBase.Builder builder,
MetricContext metricContext,
TaskStateTracker taskStateTracker,
- Config stateStoreConfig) {
+ Config stateStoreConfig,
+ Optional<TaskDriver> taskDriver) {
// initialize task related metrics
int windowSizeInMin = ConfigUtils.getInt(builder.getConfig(),
@@ -98,6 +115,7 @@ public class GobblinHelixTaskFactory implements TaskFactory {
appWorkDir,
GobblinClusterConfigurationKeys.JOB_STATE_DIR_NAME);
this.taskAttemptBuilder = createTaskAttemptBuilder();
+ this.taskDriver = taskDriver;
}
private TaskAttemptBuilder createTaskAttemptBuilder() {
@@ -113,6 +131,11 @@ public class GobblinHelixTaskFactory implements
TaskFactory {
if (this.newTasksCounter.isPresent()) {
this.newTasksCounter.get().inc();
}
- return new GobblinHelixTask(builder, context, this.taskAttemptBuilder,
this.stateStores, this.taskMetrics);
+
+ if (!this.taskDriver.isPresent()) {
+ this.taskDriver = Optional.of(new TaskDriver(context.getManager()));
+ }
+
+ return new GobblinHelixTask(builder, context, this.taskAttemptBuilder,
this.stateStores, this.taskMetrics, this.taskDriver.get());
}
}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 22c21bf..9693227 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -18,7 +18,9 @@
package org.apache.gobblin.cluster;
import java.io.IOException;
+import java.net.InetAddress;
import java.net.URI;
+import java.net.UnknownHostException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
@@ -180,6 +182,13 @@ public class GobblinTaskRunner implements
StandardMetricsBridge {
GobblinClusterConfigurationKeys.TASK_RUNNER_SUITE_BUILDER,
TaskRunnerSuiteBase.Builder.class.getName());
+ String hostName = "";
+ try {
+ hostName = InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ logger.warn("Cannot find host name for Helix instance: {}",
this.helixInstanceName);
+ }
+
TaskRunnerSuiteBase.Builder builder =
GobblinConstructorUtils.<TaskRunnerSuiteBase.Builder>invokeLongestConstructor(
new ClassAliasResolver(TaskRunnerSuiteBase.Builder.class)
.resolveClass(builderStr), this.config);
@@ -191,6 +200,8 @@ public class GobblinTaskRunner implements
StandardMetricsBridge {
.setApplicationId(applicationId)
.setApplicationName(applicationName)
.setInstanceName(helixInstanceName)
+ .setContainerId(taskRunnerId)
+ .setHostName(hostName)
.build();
this.taskStateModelFactory =
createTaskStateModelFactory(suite.getTaskFactoryMap());
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java
new file mode 100644
index 0000000..065eb6f
--- /dev/null
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.cluster;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskDriver;
+
+import com.github.rholder.retry.AttemptTimeLimiters;
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.commit.CommitStep;
+import org.apache.gobblin.commit.CommitStepException;
+
+
+/**
+ * A {@link CommitStep} that checks with Helix whether a particular Helix instance is still the assigned participant
+ * for a given Helix partition. This {@link CommitStep} implementation is a safety check against Helix and is intended
+ * to be used before data is published and state is committed. The primary motivation for this {@link CommitStep} is
+ * to avoid a "split-brain" scenario where a runaway Helix task continues to process a partition even though Helix has
+ * assigned the same partition to a different Helix task. This can happen due to inconsistency between the state of a
+ * task as maintained by Helix on ZK vs the local state of the task.
+ *
+ * <p>The check is best-effort: if Helix cannot be queried within the retry limit, the instance is assumed to still be
+ * the assigned participant and the step does not fail.
+ */
+@Slf4j
+@Alias(value = "HelixParticipantCheck")
+public class HelixAssignedParticipantCheck implements CommitStep {
+  /** Spectator connection shared by all instances in this JVM; created lazily by {@link #getHelixManager(Config)}. */
+  private static volatile HelixManager helixManager = null;
+
+  /** Retries the Helix lookup on exceptions, up to 3 attempts, each bounded to 3 seconds. */
+  private static final Retryer<Boolean> RETRYER = RetryerBuilder.<Boolean>newBuilder()
+      .retryIfException()
+      .withStopStrategy(StopStrategies.stopAfterAttempt(3))
+      .withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(3000, TimeUnit.MILLISECONDS))
+      .build();
+
+  private final String helixInstanceName;
+  private final String helixJob;
+  private final int partitionNum;
+
+  private boolean isCompleted;
+
+  /**
+   * Returns the shared {@link HelixManager} used for participant checks, creating it on first use under a class
+   * lock (double-checked against the {@code volatile} field).
+   *
+   * <p>Only the config passed on the first successful call is used; later calls return the existing instance. The
+   * manager is not connected here; {@link #execute()} connects it when needed.
+   *
+   * @param config config providing the ZooKeeper connection string and the Helix cluster name
+   * @return the shared spectator-mode {@link HelixManager}
+   */
+  public static HelixManager getHelixManager(Config config) {
+    if (helixManager == null) {
+      synchronized (HelixAssignedParticipantCheck.class) {
+        if (helixManager == null) {
+          String zkConnectString = config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+          String clusterName = config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+          helixManager = HelixManagerFactory.getZKHelixManager(clusterName,
+              HelixAssignedParticipantCheck.class.getSimpleName(), InstanceType.SPECTATOR, zkConnectString);
+        }
+      }
+    }
+    return helixManager;
+  }
+
+  /**
+   * @param config must contain the Helix instance name, Helix job id and Helix partition id keys; the ZK connection
+   *               string and cluster name are also required when no shared {@link HelixManager} exists yet
+   */
+  public HelixAssignedParticipantCheck(Config config) {
+    getHelixManager(config);
+    this.helixInstanceName = config.getString(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY);
+    this.helixJob = config.getString(GobblinClusterConfigurationKeys.HELIX_JOB_ID_KEY);
+    this.partitionNum = config.getInt(GobblinClusterConfigurationKeys.HELIX_PARTITION_ID_KEY);
+  }
+
+  /**
+   * Determine whether the commit step has been completed. Set by {@link #execute()} once the check has run,
+   * whether or not it passed.
+   */
+  @Override
+  public boolean isCompleted()
+      throws IOException {
+    return isCompleted;
+  }
+
+  /**
+   * Execute the commit step: verify that {@code helixInstanceName} is the participant Helix currently assigns to
+   * {@code partitionNum} of {@code helixJob}.
+   *
+   * @throws CommitStepException if the shared {@link HelixManager} cannot connect, or if Helix reports that the
+   *         partition is not assigned to this instance
+   */
+  @Override
+  public void execute() throws CommitStepException {
+    if (!helixManager.isConnected()) {
+      try {
+        helixManager.connect();
+      } catch (Exception e) {
+        // Keep the cause so the underlying ZK/Helix connection error is visible to the caller.
+        throw new CommitStepException(
+            String.format("Helix instance %s unable to connect to Helix/ZK", helixInstanceName), e);
+      }
+    }
+    TaskDriver taskDriver = new TaskDriver(helixManager);
+    log.info("HelixParticipantCheck step called for Helix Instance: {}, Helix job: {}, Helix partition: {}",
+        this.helixInstanceName, this.helixJob, this.partitionNum);
+
+    // Query Helix for the participant currently assigned to the partition. A missing job context or a missing
+    // assignment counts as "not the assigned participant".
+    Callable<Boolean> isAssignedParticipant = () -> {
+      JobContext jobContext = taskDriver.getJobContext(helixJob);
+      if (jobContext != null) {
+        String participant = jobContext.getAssignedParticipant(partitionNum);
+        if (participant != null) {
+          return participant.equalsIgnoreCase(helixInstanceName);
+        }
+      }
+      return false;
+    };
+
+    boolean isParticipant;
+    try {
+      isParticipant = RETRYER.call(isAssignedParticipant);
+    } catch (ExecutionException | RetryException e) {
+      // Pass the exception as the throwable argument (not a placeholder value) so its stack trace is logged.
+      log.error("Cannot complete participant assignment check within the retry limit", e);
+      // Best effort: the participant status cannot be verified right now, so do not fail the commit.
+      isParticipant = true;
+    }
+
+    this.isCompleted = true;
+    if (!isParticipant) {
+      throw new CommitStepException(String.format("Helix instance %s not the assigned participant for partition %d",
+          this.helixInstanceName, this.partitionNum));
+    }
+  }
+}
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
index 3488785..2175b57 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
@@ -91,6 +91,8 @@ public abstract class TaskRunnerSuiteBase {
private String applicationId;
private String applicationName;
private String instanceName;
+ private String hostName;
+ private String containerId;
public Builder(Config config) {
this.dynamicConfig = GobblinClusterUtils.getDynamicConfig(config);
@@ -112,6 +114,16 @@ public abstract class TaskRunnerSuiteBase {
return this;
}
+    /**
+     * Records the id of the container hosting this task runner.
+     *
+     * @param containerId container id to store on the builder
+     * @return this builder, for chaining
+     */
+    public Builder setContainerId(String containerId) {
+      this.containerId = containerId;
+      return this;
+    }
+
+    /**
+     * Records the host name of the machine running this task runner.
+     *
+     * @param hostName host name to store on the builder
+     * @return this builder, for chaining
+     */
+    public Builder setHostName(String hostName) {
+      this.hostName = hostName;
+      return this;
+    }
+
public Builder setApplicationId(String applicationId) {
this.applicationId = applicationId;
return this;
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
index 95a6cb8..e85413f 100644
---
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
@@ -180,11 +180,11 @@ public class ClusterIntegrationTest {
}, "Waiting for Workflow TargetState to be START");
}
- private Predicate<Void> isTaskStarted(HelixManager helixManager, String
jobId) {
+ /**
+  * Holds once Helix has a workflow context for {@code jobId}. Exposed as public static for reuse by other tests
+  * (e.g. HelixAssignedParticipantCheckTest).
+  */
+ public static Predicate<Void> isTaskStarted(HelixManager helixManager,
String jobId) {
return input -> TaskDriver.getWorkflowContext(helixManager, jobId) != null;
}
- private Predicate<Void> isTaskRunning(String taskStateFileName) {
+ public static Predicate<Void> isTaskRunning(String taskStateFileName) {
return input -> {
File taskStateFile = new File(taskStateFileName);
return taskStateFile.exists();
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
index 9386bf6..24c597e 100644
---
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
@@ -27,8 +27,11 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.helix.HelixManager;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
import org.apache.helix.task.TaskCallbackContext;
import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskResult;
import org.mockito.Mockito;
import org.testng.Assert;
@@ -36,7 +39,9 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import com.google.common.base.Joiner;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.typesafe.config.ConfigFactory;
@@ -48,10 +53,8 @@ import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.TaskExecutor;
import org.apache.gobblin.source.workunit.WorkUnit;
-import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.Id;
import org.apache.gobblin.util.SerializationUtils;
-import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.gobblin.writer.AvroDataWriterBuilder;
import org.apache.gobblin.writer.Destination;
import org.apache.gobblin.writer.WriterOutputFormat;
@@ -102,7 +105,8 @@ public class GobblinHelixTaskTest {
}
@Test
- public void testPrepareTask() throws IOException {
+ public void testPrepareTask()
+ throws IOException, InterruptedException {
// Serialize the JobState that will be read later in GobblinHelixTask
Path jobStateFilePath =
new Path(appWorkDir, TestHelper.TEST_JOB_ID + "." +
AbstractJobLauncher.JOB_STATE_FILE_NAME);
@@ -141,7 +145,17 @@ public class GobblinHelixTaskTest {
TaskCallbackContext taskCallbackContext =
Mockito.mock(TaskCallbackContext.class);
Mockito.when(taskCallbackContext.getTaskConfig()).thenReturn(taskConfig);
Mockito.when(taskCallbackContext.getManager()).thenReturn(this.helixManager);
+ String helixJobId = Joiner.on("_").join(TestHelper.TEST_JOB_ID,
TestHelper.TEST_JOB_ID);
+ JobConfig jobConfig = Mockito.mock(JobConfig.class);
+ Mockito.when(jobConfig.getJobId()).thenReturn(helixJobId);
+ Mockito.when(taskCallbackContext.getJobConfig()).thenReturn(jobConfig);
+ JobContext mockJobContext = Mockito.mock(JobContext.class);
+ Map<String, Integer> taskIdPartitionMap =
ImmutableMap.of(taskConfig.getId(), 0);
+
Mockito.when(mockJobContext.getTaskIdPartitionMap()).thenReturn(taskIdPartitionMap);
+
+ TaskDriver taskDriver = Mockito.mock(TaskDriver.class);
+
Mockito.when(taskDriver.getJobContext(Mockito.anyString())).thenReturn(mockJobContext);
TaskRunnerSuiteBase.Builder builder = new
TaskRunnerSuiteBase.Builder(ConfigFactory.empty());
TaskRunnerSuiteBase sb = builder.setInstanceName("TestInstance")
@@ -157,9 +171,11 @@ public class GobblinHelixTaskTest {
new GobblinHelixTaskFactory(builder,
sb.metricContext,
this.taskStateTracker,
- ConfigFactory.empty());
+ ConfigFactory.empty(),
+ Optional.of(taskDriver));
this.gobblinHelixTask = (GobblinHelixTask)
gobblinHelixTaskFactory.createNewTask(taskCallbackContext);
+ Thread.sleep(1000);
}
@Test(dependsOnMethods = "testPrepareTask")
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java
new file mode 100644
index 0000000..eec52b1
--- /dev/null
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.cluster;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.cluster.suite.IntegrationBasicSuite;
+import org.apache.gobblin.commit.CommitStepException;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.testing.AssertWithBackoff;
+
+
+/**
+ * Integration test for {@link HelixAssignedParticipantCheck}. Starts a Gobblin Helix cluster running a
+ * SleepingCustomTaskSource job, then verifies that the check passes for partition 0 on the worker instance and fails
+ * for a partition that is not assigned to it.
+ */
+public class HelixAssignedParticipantCheckTest {
+  private IntegrationJobSuite suite;
+  private HelixManager helixManager;
+  private Config helixConfig;
+  // Set once testExecute has started the cluster, so tearDown only shuts down what was actually started.
+  private boolean clusterStarted = false;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    //Set up a Gobblin Helix cluster
+    suite = new IntegrationJobSuite();
+
+    helixConfig = suite.getManagerConfig();
+    String clusterName = helixConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+    String zkConnectString = helixConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+    helixManager = HelixManagerFactory.getZKHelixManager(clusterName, "TestManager",
+        InstanceType.SPECTATOR, zkConnectString);
+  }
+
+  //Test disabled on Travis because cluster integration tests are generally flaky on Travis.
+  @Test(groups = {"disabledOnTravis"})
+  public void testExecute() throws Exception {
+    suite.startCluster();
+    clusterStarted = true;
+
+    //Connect to the previously started Helix cluster
+    helixManager.connect();
+
+    //Ensure that Helix has created a workflow
+    AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1)
+        .assertTrue(ClusterIntegrationTest.isTaskStarted(helixManager, IntegrationJobSuite.JOB_ID),
+            "Waiting for the job to start...");
+
+    //Instantiate config for HelixAssignedParticipantCheck
+    String helixJobId = Joiner.on("_").join(IntegrationJobSuite.JOB_ID, IntegrationJobSuite.JOB_ID);
+    Config checkConfig = helixConfig
+        .withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY,
+            ConfigValueFactory.fromAnyRef(IntegrationBasicSuite.WORKER_INSTANCE_0))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_JOB_ID_KEY, ConfigValueFactory.fromAnyRef(helixJobId))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_PARTITION_ID_KEY, ConfigValueFactory.fromAnyRef(0));
+    HelixAssignedParticipantCheck check = new HelixAssignedParticipantCheck(checkConfig);
+
+    //Ensure that the SleepingTask is running
+    AssertWithBackoff.create().maxSleepMs(100).timeoutMs(2000).backoffFactor(1)
+        .assertTrue(ClusterIntegrationTest.isTaskRunning(IntegrationJobSuite.TASK_STATE_FILE),
+            "Waiting for the task to enter running state");
+
+    //Run the check. Ensure that the configured Helix instance is indeed the assigned participant
+    // (i.e. no exceptions thrown).
+    check.execute();
+
+    //Create Helix config with invalid partition num. Ensure HelixAssignedParticipantCheck fails.
+    checkConfig = checkConfig.withValue(GobblinClusterConfigurationKeys.HELIX_PARTITION_ID_KEY,
+        ConfigValueFactory.fromAnyRef(1));
+    check = new HelixAssignedParticipantCheck(checkConfig);
+
+    try {
+      check.execute();
+      Assert.fail("Expected to throw CommitStepException");
+    } catch (CommitStepException e) {
+      //Expected to throw CommitStepException
+      Assert.assertEquals(e.getClass(), CommitStepException.class);
+    }
+  }
+
+  // Without @AfterClass this method was never invoked, leaving the cluster running and the spectator connected.
+  @AfterClass
+  public void tearDown() throws IOException, InterruptedException {
+    // Disconnect the test's spectator before the cluster it is connected to is shut down.
+    if (helixManager != null && helixManager.isConnected()) {
+      helixManager.disconnect();
+    }
+    //Shutdown cluster
+    if (clusterStarted) {
+      suite.shutdownCluster();
+    }
+  }
+
+  /**
+   * Job suite running SleepingCustomTaskSource with a fixed job id; the test waits for {@link #TASK_STATE_FILE} to
+   * exist as the signal that the sleeping task is running.
+   */
+  public static class IntegrationJobSuite extends IntegrationBasicSuite {
+    public static final String JOB_ID = "job_testJob_345";
+    public static final String TASK_STATE_FILE =
+        "/tmp/" + IntegrationJobSuite.class.getSimpleName() + "/taskState/_RUNNING";
+
+    @Override
+    protected Map<String, Config> overrideJobConfigs(Config rawJobConfig) {
+      Config newConfig = ConfigFactory.parseMap(ImmutableMap.of(
+          ConfigurationKeys.SOURCE_CLASS_KEY, "org.apache.gobblin.cluster.SleepingCustomTaskSource",
+          ConfigurationKeys.JOB_ID_KEY, JOB_ID,
+          GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, Boolean.TRUE,
+          GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, 10L,
+          SleepingTask.TASK_STATE_FILE_KEY, TASK_STATE_FILE))
+          .withFallback(rawJobConfig);
+      return ImmutableMap.of(JOB_NAME, newConfig);
+    }
+  }
+}
\ No newline at end of file