This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cca9eb  [GOBBLIN-1043] Implement a Helix assigned participant check 
as a Commi…
6cca9eb is described below

commit 6cca9eb55c9b64556d75de999ba762b5c886378b
Author: sv2000 <[email protected]>
AuthorDate: Fri Feb 7 11:00:26 2020 -0800

    [GOBBLIN-1043] Implement a Helix assigned participant check as a Commi…
    
    Closes #2883 from sv2000/helixSplitBrain
---
 .../apache/gobblin/commit/CommitStepException.java |  30 +++++
 .../cluster/GobblinClusterConfigurationKeys.java   |  10 ++
 .../apache/gobblin/cluster/GobblinHelixTask.java   |  39 +++++-
 .../gobblin/cluster/GobblinHelixTaskFactory.java   |  27 +++-
 .../apache/gobblin/cluster/GobblinTaskRunner.java  |  11 ++
 .../cluster/HelixAssignedParticipantCheck.java     | 144 +++++++++++++++++++++
 .../gobblin/cluster/TaskRunnerSuiteBase.java       |  12 ++
 .../gobblin/cluster/ClusterIntegrationTest.java    |   4 +-
 .../gobblin/cluster/GobblinHelixTaskTest.java      |  24 +++-
 .../cluster/HelixAssignedParticipantCheckTest.java | 124 ++++++++++++++++++
 10 files changed, 414 insertions(+), 11 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/commit/CommitStepException.java 
b/gobblin-api/src/main/java/org/apache/gobblin/commit/CommitStepException.java
new file mode 100644
index 0000000..ef3f90e
--- /dev/null
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/commit/CommitStepException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.commit;
+
+import java.io.IOException;
+
+
+public class CommitStepException extends IOException {
+  public CommitStepException(String message, Throwable t) {
+    super(message, t);
+  }
+
+  public CommitStepException(String message) {
+    super(message);
+  }
+}
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 787130e..27e909d 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -144,10 +144,12 @@ public class GobblinClusterConfigurationKeys {
   public static final long DEFAULT_HELIX_JOB_STOP_TIMEOUT_SECONDS = 10L;
   public static final String TASK_RUNNER_SUITE_BUILDER = 
GOBBLIN_CLUSTER_PREFIX + "taskRunnerSuite.builder";
 
+  public static final String HELIX_JOB_NAME_KEY = GOBBLIN_CLUSTER_PREFIX + 
"helixJobName";
   public static final String HELIX_JOB_TIMEOUT_ENABLED_KEY = 
"helix.job.timeout.enabled";
   public static final String DEFAULT_HELIX_JOB_TIMEOUT_ENABLED = "false";
   public static final String HELIX_JOB_TIMEOUT_SECONDS = 
"helix.job.timeout.seconds";
   public static final String DEFAULT_HELIX_JOB_TIMEOUT_SECONDS = "10800";
+  public static final String HELIX_TASK_NAME_KEY = GOBBLIN_CLUSTER_PREFIX + 
"helixTaskName";
   public static final String HELIX_TASK_TIMEOUT_SECONDS = 
"helix.task.timeout.seconds";
   public static final String HELIX_TASK_MAX_ATTEMPTS_KEY = 
"helix.task.maxAttempts";
 
@@ -183,4 +185,12 @@ public class GobblinClusterConfigurationKeys {
   public static final boolean DEFAULT_IS_HELIX_CLUSTER_MANAGED = false;
 
   public static final String HADOOP_CONFIG_OVERRIDES_PREFIX = 
GOBBLIN_CLUSTER_PREFIX + "hadoop.inject";
+
+  //Configurations that will be set dynamically when a 
GobblinTaskRunner/GobblinHelixTask is instantiated.
+  public static final String GOBBLIN_HELIX_PREFIX = "gobblin.helix.";
+  public static final String HELIX_JOB_ID_KEY = GOBBLIN_HELIX_PREFIX + "jobId";
+  public static final String HELIX_TASK_ID_KEY = GOBBLIN_HELIX_PREFIX + 
"taskId";
+  public static final String HELIX_PARTITION_ID_KEY = GOBBLIN_HELIX_PREFIX + 
"partitionId" ;
+  public static final String TASK_RUNNER_HOST_NAME_KEY = GOBBLIN_HELIX_PREFIX 
+ "hostName";
+  public static final String CONTAINER_ID_KEY = GOBBLIN_HELIX_PREFIX + 
"containerId";
 }
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
index e124aca..122a8d5 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
@@ -17,19 +17,22 @@
 
 package org.apache.gobblin.cluster;
 
-import java.io.IOException;
 import java.util.Map;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.helix.task.JobContext;
 import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskResult;
 import org.slf4j.MDC;
 
 import com.google.common.base.Throwables;
 import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -67,18 +70,23 @@ public class GobblinHelixTask implements Task {
 
   private String jobName;
   private String jobId;
+  private String helixJobId;
   private String jobKey;
   private String taskId;
   private Path workUnitFilePath;
   private GobblinHelixTaskMetrics taskMetrics;
   private SingleTask task;
+  private String helixTaskId;
 
   public GobblinHelixTask(TaskRunnerSuiteBase.Builder builder,
                           TaskCallbackContext taskCallbackContext,
                           TaskAttemptBuilder taskAttemptBuilder,
                           StateStores stateStores,
-                          GobblinHelixTaskMetrics taskMetrics) {
+                          GobblinHelixTaskMetrics taskMetrics,
+                          TaskDriver taskDriver)
+  {
     this.taskConfig = taskCallbackContext.getTaskConfig();
+    this.helixJobId = taskCallbackContext.getJobConfig().getJobId();
     this.applicationName = builder.getApplicationName();
     this.instanceName = builder.getInstanceName();
     this.taskMetrics = taskMetrics;
@@ -89,19 +97,35 @@ public class GobblinHelixTask implements Task {
                              builder.getAppWorkPath(),
                              this.jobId);
 
+    Config dynamicConfig = builder.getDynamicConfig()
+        .withValue(GobblinClusterConfigurationKeys.TASK_RUNNER_HOST_NAME_KEY, 
ConfigValueFactory.fromAnyRef(builder.getHostName()))
+        .withValue(GobblinClusterConfigurationKeys.CONTAINER_ID_KEY, 
ConfigValueFactory.fromAnyRef(builder.getContainerId()))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY, 
ConfigValueFactory.fromAnyRef(builder.getInstanceName()))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_JOB_ID_KEY, 
ConfigValueFactory.fromAnyRef(this.helixJobId))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_TASK_ID_KEY, 
ConfigValueFactory.fromAnyRef(this.helixTaskId));
+
+    Integer partitionNum = getPartitionForHelixTask(taskDriver);
+
+    if (partitionNum == null) {
+      throw new IllegalStateException(String.format("Task %s, job %s on 
instance %s has no partition assigned",
+          this.helixTaskId, builder.getInstanceName(), this.helixJobId));
+    }
+
+    dynamicConfig = 
dynamicConfig.withValue(GobblinClusterConfigurationKeys.HELIX_PARTITION_ID_KEY, 
ConfigValueFactory.fromAnyRef(partitionNum));
     this.task = new SingleTask(this.jobId,
                                this.workUnitFilePath,
                                jobStateFilePath,
                                builder.getFs(),
                                taskAttemptBuilder,
                                stateStores,
-                               builder.getDynamicConfig());
+                               dynamicConfig);
   }
 
   private void getInfoFromTaskConfig() {
     Map<String, String> configMap = this.taskConfig.getConfigMap();
     this.jobName = configMap.get(ConfigurationKeys.JOB_NAME_KEY);
     this.jobId = configMap.get(ConfigurationKeys.JOB_ID_KEY);
+    this.helixTaskId = this.taskConfig.getId();
     this.jobKey = Long.toString(Id.parse(this.jobId).getSequence());
     this.taskId = configMap.get(ConfigurationKeys.TASK_ID_KEY);
     this.workUnitFilePath =
@@ -135,6 +159,15 @@ public class GobblinHelixTask implements Task {
     }
   }
 
+  private Integer getPartitionForHelixTask(TaskDriver taskDriver) {
+    //Get Helix partition id for this task
+    JobContext jobContext = taskDriver.getJobContext(this.helixJobId);
+    if (jobContext != null) {
+      return jobContext.getTaskIdPartitionMap().get(this.helixTaskId);
+    }
+    return null;
+  }
+
   @Override
   public void cancel() {
     log.warn("Gobblin helix task cancellation invoked.");
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
index 6bf3d5f..fdf8b27 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
@@ -21,11 +21,13 @@ import org.apache.hadoop.fs.Path;
 import org.apache.helix.HelixManager;
 import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskCallbackContext;
+import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.codahale.metrics.Counter;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.typesafe.config.Config;
 
@@ -54,6 +56,7 @@ public class GobblinHelixTaskFactory implements TaskFactory {
 
   private final Optional<ContainerMetrics> containerMetrics;
   private final HelixManager helixManager;
+  private Optional<TaskDriver> taskDriver;
   private TaskRunnerSuiteBase.Builder builder;
 
   /**
@@ -70,9 +73,23 @@ public class GobblinHelixTaskFactory implements TaskFactory {
   private final TaskAttemptBuilder taskAttemptBuilder;
 
   public GobblinHelixTaskFactory(TaskRunnerSuiteBase.Builder builder,
+      MetricContext metricContext,
+      TaskStateTracker taskStateTracker,
+      Config stateStoreConfig) {
+    this(builder, metricContext, taskStateTracker, stateStoreConfig, 
Optional.absent());
+  }
+
+  /**
+   * Constructor that allows passing in a {@link TaskDriver} instance. This 
constructor is exposed purely for
+   * testing purposes to allow passing in a mock {@link TaskDriver} (e.g. see 
GobblinHelixTaskTest). For other cases, use
+   * the constructor {@link 
#GobblinHelixTaskFactory(TaskRunnerSuiteBase.Builder, MetricContext, 
TaskStateTracker, Config)}.
+   */
+  @VisibleForTesting
+  public GobblinHelixTaskFactory(TaskRunnerSuiteBase.Builder builder,
                                  MetricContext metricContext,
                                  TaskStateTracker taskStateTracker,
-                                 Config stateStoreConfig) {
+                                 Config stateStoreConfig,
+                                 Optional<TaskDriver> taskDriver) {
 
     // initialize task related metrics
     int windowSizeInMin = ConfigUtils.getInt(builder.getConfig(),
@@ -98,6 +115,7 @@ public class GobblinHelixTaskFactory implements TaskFactory {
         appWorkDir,
         GobblinClusterConfigurationKeys.JOB_STATE_DIR_NAME);
     this.taskAttemptBuilder = createTaskAttemptBuilder();
+    this.taskDriver = taskDriver;
   }
 
   private TaskAttemptBuilder createTaskAttemptBuilder() {
@@ -113,6 +131,11 @@ public class GobblinHelixTaskFactory implements 
TaskFactory {
     if (this.newTasksCounter.isPresent()) {
       this.newTasksCounter.get().inc();
     }
-    return new GobblinHelixTask(builder, context, this.taskAttemptBuilder, 
this.stateStores, this.taskMetrics);
+
+    if (!this.taskDriver.isPresent()) {
+      this.taskDriver = Optional.of(new TaskDriver(context.getManager()));
+    }
+
+    return new GobblinHelixTask(builder, context, this.taskAttemptBuilder, 
this.stateStores, this.taskMetrics, this.taskDriver.get());
   }
 }
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 22c21bf..9693227 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -18,7 +18,9 @@
 package org.apache.gobblin.cluster;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.URI;
+import java.net.UnknownHostException;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -180,6 +182,13 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
         GobblinClusterConfigurationKeys.TASK_RUNNER_SUITE_BUILDER,
         TaskRunnerSuiteBase.Builder.class.getName());
 
+    String hostName = "";
+    try {
+      hostName = InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException e) {
+      logger.warn("Cannot find host name for Helix instance: {}", 
this.helixInstanceName);
+    }
+
     TaskRunnerSuiteBase.Builder builder = 
GobblinConstructorUtils.<TaskRunnerSuiteBase.Builder>invokeLongestConstructor(
           new ClassAliasResolver(TaskRunnerSuiteBase.Builder.class)
               .resolveClass(builderStr), this.config);
@@ -191,6 +200,8 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
         .setApplicationId(applicationId)
         .setApplicationName(applicationName)
         .setInstanceName(helixInstanceName)
+        .setContainerId(taskRunnerId)
+        .setHostName(hostName)
         .build();
 
     this.taskStateModelFactory = 
createTaskStateModelFactory(suite.getTaskFactoryMap());
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java
new file mode 100644
index 0000000..065eb6f
--- /dev/null
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.cluster;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.task.JobContext;
+import org.apache.helix.task.TaskDriver;
+
+import com.github.rholder.retry.AttemptTimeLimiters;
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alias;
+import org.apache.gobblin.commit.CommitStep;
+import org.apache.gobblin.commit.CommitStepException;
+
+
+/**
+ * A {@link CommitStep} that checks with Helix if a particular Helix instance 
is still the assigned participant for a given
+ * Helix Partition. This {@link CommitStep} implementation is a safety check 
against Helix and is intended to be used
+ * before data is published and state is committed. The primary motivation 
for this {@link CommitStep} is to avoid a "split-brain"
+ * scenario where a runaway Helix task continues to process a partition even 
though Helix has assigned the same
+ * partition to a different Helix task. This can happen due to inconsistency 
between the state of a task as maintained
+ * by Helix on ZK vs the local state of the task.
+ */
+@Slf4j
+@Alias (value = "HelixParticipantCheck")
+public class HelixAssignedParticipantCheck implements CommitStep {
+  private static volatile HelixManager helixManager = null;
+  private static volatile Retryer<Boolean> retryer = 
RetryerBuilder.<Boolean>newBuilder()
+      .retryIfException()
+      .withStopStrategy(StopStrategies.stopAfterAttempt(3))
+      .withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(3000, 
TimeUnit.MILLISECONDS)).build();
+
+  private final String helixInstanceName;
+  private final String helixJob;
+  private final int partitionNum;
+
+  private boolean isCompleted;
+
+  /**
+   * A method that uses the Singleton pattern to instantiate a {@link 
HelixManager} instance.
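+   * @param config configuration supplying the ZK connection string and the Helix cluster name
+   * @return the shared {@link HelixManager} instance, created on the first call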
+   */
+  public static HelixManager getHelixManager(Config config) {
+    if (helixManager == null) {
+      synchronized (HelixAssignedParticipantCheck.class) {
+        if (helixManager == null) {
+          String zkConnectString = 
config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+          String clusterName = 
config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+          helixManager = HelixManagerFactory.getZKHelixManager(clusterName, 
HelixAssignedParticipantCheck.class.getSimpleName(),
+              InstanceType.SPECTATOR, zkConnectString);
+        }
+      }
+    }
+    return helixManager;
+  }
+
+  public HelixAssignedParticipantCheck(Config config) {
+    getHelixManager(config);
+    this.helixInstanceName = 
config.getString(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY);
+    this.helixJob = 
config.getString(GobblinClusterConfigurationKeys.HELIX_JOB_ID_KEY);
+    this.partitionNum = 
config.getInt(GobblinClusterConfigurationKeys.HELIX_PARTITION_ID_KEY);
+  }
+
+  /**
+   * Determine whether the commit step has been completed.
+   */
+  @Override
+  public boolean isCompleted()
+      throws IOException {
+    return isCompleted;
+  }
+
+  /**
+   * Execute the commit step.
+   */
+  @Override
+  public void execute() throws CommitStepException {
+    if (!helixManager.isConnected()) {
+      try {
+        helixManager.connect();
+      } catch (Exception e) {
+        throw new CommitStepException(String.format("Helix instance %s unable 
to connect to Helix/ZK", helixInstanceName));
+      }
+    }
+    TaskDriver taskDriver = new TaskDriver(helixManager);
+    log.info(String.format("HelixParticipantCheck step called for Helix 
Instance: %s, Helix job: %s, Helix partition: %d",
+        this.helixInstanceName, this.helixJob, this.partitionNum));
+
+    //Query Helix to get the currently assigned participant for the Helix 
partitionNum
+    Callable callable = () -> {
+      JobContext jobContext = taskDriver.getJobContext(helixJob);
+      if (jobContext != null) {
+        String participant = jobContext.getAssignedParticipant(partitionNum);
+        if (participant != null) {
+          return participant.equalsIgnoreCase(helixInstanceName);
+        }
+      }
+      return false;
+    };
+
+    boolean isParticipant;
+    try {
+      isParticipant = retryer.call(callable);
+    } catch (ExecutionException | RetryException e) {
+      log.error("Cannot complete participant assignment check within the retry 
limit due to: {}", e);
+      //Set isParticipant to true; since we cannot verify the status of the 
Helix Participant at this time.
+      isParticipant = true;
+    }
+
+    this.isCompleted = true;
+    if (!isParticipant) {
+      throw new CommitStepException(String.format("Helix instance %s not the 
assigned participant for partition %d",this.helixInstanceName, 
this.partitionNum));
+    }
+  }
+}
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
index 3488785..2175b57 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java
@@ -91,6 +91,8 @@ public abstract class TaskRunnerSuiteBase {
     private String applicationId;
     private String applicationName;
     private String instanceName;
+    private String hostName;
+    private String containerId;
 
     public Builder(Config config) {
       this.dynamicConfig = GobblinClusterUtils.getDynamicConfig(config);
@@ -112,6 +114,16 @@ public abstract class TaskRunnerSuiteBase {
       return this;
     }
 
+    public Builder setContainerId (String containerId) {
+      this.containerId = containerId;
+      return this;
+    }
+
+    public Builder setHostName(String hostName) {
+      this.hostName = hostName;
+      return this;
+    }
+
     public Builder setApplicationId(String applicationId) {
       this.applicationId = applicationId;
       return this;
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
index 95a6cb8..e85413f 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java
@@ -180,11 +180,11 @@ public class ClusterIntegrationTest {
     }, "Waiting for Workflow TargetState to be START");
   }
 
-  private Predicate<Void> isTaskStarted(HelixManager helixManager, String 
jobId) {
+  public static Predicate<Void> isTaskStarted(HelixManager helixManager, 
String jobId) {
     return input -> TaskDriver.getWorkflowContext(helixManager, jobId) != null;
   }
 
-  private Predicate<Void> isTaskRunning(String taskStateFileName) {
+  public static Predicate<Void> isTaskRunning(String taskStateFileName) {
     return input -> {
       File taskStateFile = new File(taskStateFileName);
       return taskStateFile.exists();
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
index 9386bf6..24c597e 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
@@ -27,8 +27,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.helix.HelixManager;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobContext;
 import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskResult;
 import org.mockito.Mockito;
 import org.testng.Assert;
@@ -36,7 +39,9 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.typesafe.config.ConfigFactory;
 
@@ -48,10 +53,8 @@ import org.apache.gobblin.runtime.AbstractJobLauncher;
 import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.runtime.TaskExecutor;
 import org.apache.gobblin.source.workunit.WorkUnit;
-import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.Id;
 import org.apache.gobblin.util.SerializationUtils;
-import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 import org.apache.gobblin.writer.AvroDataWriterBuilder;
 import org.apache.gobblin.writer.Destination;
 import org.apache.gobblin.writer.WriterOutputFormat;
@@ -102,7 +105,8 @@ public class GobblinHelixTaskTest {
   }
 
   @Test
-  public void testPrepareTask() throws IOException {
+  public void testPrepareTask()
+      throws IOException, InterruptedException {
     // Serialize the JobState that will be read later in GobblinHelixTask
     Path jobStateFilePath =
         new Path(appWorkDir, TestHelper.TEST_JOB_ID + "." + 
AbstractJobLauncher.JOB_STATE_FILE_NAME);
@@ -141,7 +145,17 @@ public class GobblinHelixTaskTest {
     TaskCallbackContext taskCallbackContext = 
Mockito.mock(TaskCallbackContext.class);
     Mockito.when(taskCallbackContext.getTaskConfig()).thenReturn(taskConfig);
     
Mockito.when(taskCallbackContext.getManager()).thenReturn(this.helixManager);
+    String helixJobId = Joiner.on("_").join(TestHelper.TEST_JOB_ID, 
TestHelper.TEST_JOB_ID);
+    JobConfig jobConfig = Mockito.mock(JobConfig.class);
 
+    Mockito.when(jobConfig.getJobId()).thenReturn(helixJobId);
+    Mockito.when(taskCallbackContext.getJobConfig()).thenReturn(jobConfig);
+    JobContext mockJobContext = Mockito.mock(JobContext.class);
+    Map<String, Integer> taskIdPartitionMap = 
ImmutableMap.of(taskConfig.getId(), 0);
+    
Mockito.when(mockJobContext.getTaskIdPartitionMap()).thenReturn(taskIdPartitionMap);
+
+    TaskDriver taskDriver = Mockito.mock(TaskDriver.class);
+    
Mockito.when(taskDriver.getJobContext(Mockito.anyString())).thenReturn(mockJobContext);
 
     TaskRunnerSuiteBase.Builder builder = new 
TaskRunnerSuiteBase.Builder(ConfigFactory.empty());
     TaskRunnerSuiteBase sb = builder.setInstanceName("TestInstance")
@@ -157,9 +171,11 @@ public class GobblinHelixTaskTest {
         new GobblinHelixTaskFactory(builder,
                                     sb.metricContext,
                                     this.taskStateTracker,
-                                    ConfigFactory.empty());
+                                    ConfigFactory.empty(),
+                                    Optional.of(taskDriver));
 
     this.gobblinHelixTask = (GobblinHelixTask) 
gobblinHelixTaskFactory.createNewTask(taskCallbackContext);
+    Thread.sleep(1000);
   }
 
   @Test(dependsOnMethods = "testPrepareTask")
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java
new file mode 100644
index 0000000..eec52b1
--- /dev/null
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.cluster;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.cluster.suite.IntegrationBasicSuite;
+import org.apache.gobblin.commit.CommitStepException;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.testing.AssertWithBackoff;
+
+
+public class HelixAssignedParticipantCheckTest {
+  private IntegrationJobSuite suite;
+  private HelixManager helixManager;
+  private Config helixConfig;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    //Set up a Gobblin Helix cluster
+    suite = new IntegrationJobSuite();
+
+    helixConfig = suite.getManagerConfig();
+    String clusterName = 
helixConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+    String zkConnectString = 
helixConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+    helixManager = HelixManagerFactory.getZKHelixManager(clusterName, 
"TestManager",
+        InstanceType.SPECTATOR, zkConnectString);
+  }
+
+  @Test (groups = {"disabledOnTravis"})
+  //Test disabled on Travis because cluster integration tests are generally 
flaky on Travis.
+  public void testExecute() throws Exception {
+    suite.startCluster();
+
+    //Connect to the previously started Helix cluster
+    helixManager.connect();
+
+    //Ensure that Helix has created a workflow
+    AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1).
+        assertTrue(ClusterIntegrationTest.isTaskStarted(helixManager, 
IntegrationJobSuite.JOB_ID), "Waiting for the job to start...");
+
+    //Instantiate config for HelixAssignedParticipantCheck
+    String helixJobId = Joiner.on("_").join(IntegrationJobSuite.JOB_ID, 
IntegrationJobSuite.JOB_ID);
+    helixConfig = 
helixConfig.withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY,
+        ConfigValueFactory.fromAnyRef(IntegrationBasicSuite.WORKER_INSTANCE_0))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_JOB_ID_KEY, 
ConfigValueFactory.fromAnyRef(helixJobId))
+        .withValue(GobblinClusterConfigurationKeys.HELIX_PARTITION_ID_KEY, 
ConfigValueFactory.fromAnyRef(0));
+    HelixAssignedParticipantCheck check = new 
HelixAssignedParticipantCheck(helixConfig);
+
+    //Ensure that the SleepingTask is running
+    
AssertWithBackoff.create().maxSleepMs(100).timeoutMs(2000).backoffFactor(1).
+        
assertTrue(ClusterIntegrationTest.isTaskRunning(IntegrationJobSuite.TASK_STATE_FILE),"Waiting
 for the task to enter running state");
+
+    //Run the check. Ensure that the configured Helix instance is indeed the 
assigned participant
+    // (i.e. no exceptions thrown).
+    check.execute();
+
+    //Create Helix config with invalid partition num. Ensure 
HelixAssignedParticipantCheck fails.
+    helixConfig = 
helixConfig.withValue(GobblinClusterConfigurationKeys.HELIX_PARTITION_ID_KEY, 
ConfigValueFactory.fromAnyRef(1));
+    check = new HelixAssignedParticipantCheck(helixConfig);
+
+    try {
+      check.execute();
+      Assert.fail("Expected to throw CommitStepException");
+    } catch (CommitStepException e) {
+      //Expected to throw CommitStepException
+      Assert.assertTrue(e.getClass().equals(CommitStepException.class));
+    }
+  }
+
+  public void tearDown() throws IOException, InterruptedException {
+    //Shutdown cluster
+    suite.shutdownCluster();
+    if (helixManager.isConnected()) {
+      helixManager.disconnect();
+    }
+  }
+
+  public static class IntegrationJobSuite extends IntegrationBasicSuite {
+    public static final String JOB_ID = "job_testJob_345";
+    public static final String TASK_STATE_FILE = "/tmp/" + 
IntegrationJobSuite.class.getSimpleName() + "/taskState/_RUNNING";
+
+
+    @Override
+    protected Map<String, Config> overrideJobConfigs(Config rawJobConfig) {
+      Config newConfig = ConfigFactory.parseMap(ImmutableMap.of(
+          ConfigurationKeys.SOURCE_CLASS_KEY, 
"org.apache.gobblin.cluster.SleepingCustomTaskSource",
+          ConfigurationKeys.JOB_ID_KEY, JOB_ID,
+          GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, 
Boolean.TRUE,
+          GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, 10L, 
SleepingTask.TASK_STATE_FILE_KEY, TASK_STATE_FILE))
+          .withFallback(rawJobConfig);
+      return ImmutableMap.of(JOB_NAME, newConfig);
+    }
+  }
+}
\ No newline at end of file

Reply via email to