This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 829ec2b  [GOBBLIN-1352][GOBBLIN-1319] Ensure Helix workflows are cleaned up on cluster start up in centralized mode
829ec2b is described below

commit 829ec2bb340a07af9d2a838b20233985e94c7a23
Author: suvasude <email address obfuscated in this archived copy>
AuthorDate: Thu Jan 7 20:30:48 2021 -0800

    [GOBBLIN-1352][GOBBLIN-1319] Ensure Helix workflows are cleaned up on cluster start up in centralized mode
    
    Closes #3191 from sv2000/helixJobName
---
 .../cluster/HelixRetriggeringJobCallable.java      | 51 ++++++-------
 .../cluster/HelixRetriggeringJobCallableTest.java  | 88 ++++++++++++++++++++++
 gobblin-cluster/src/test/resources/log4j.xml       |  9 +++
 3 files changed, 118 insertions(+), 30 deletions(-)

diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
index 251cb0e..40241a5 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.io.Closer;
 import com.google.common.util.concurrent.Striped;
 import com.typesafe.config.Config;
@@ -195,33 +197,29 @@ class HelixRetriggeringJobCallable implements Callable {
     }
   }
 
+  @VisibleForTesting
+  static GobblinHelixJobLauncher 
buildJobLauncherForCentralizedMode(GobblinHelixJobScheduler jobScheduler, 
Properties jobProps) throws Exception {
+    //In centralized job launcher mode, the JOB_ID_KEY should be null or 
should not contain the
+    //"ActualJob" substring, which is intended for the distributed job 
launcher mode.
+    //This ensures that workflows in centralized mode are cleaned up properly 
when cluster is restarted.
+    String jobId = jobProps.getProperty(ConfigurationKeys.JOB_ID_KEY);
+    if (jobId != null) {
+      
Preconditions.checkArgument(!jobId.contains(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX),
+          "Job Id should not contain ActualJob in centralized mode.");
+    }
+    return jobScheduler.buildJobLauncher(jobProps);
+  }
+
   /**
-   * <p> In some cases, the job launcher will be early stopped.
-   * It can be due to the large volume of input source data.
-   * In such case, we need to re-launch the same job until
-   * the job launcher determines it is safe to stop.
+   * A method to run a Gobblin job with ability to re-trigger the job if 
neeeded. This method instantiates a
+   * {@link GobblinHelixJobLauncher} and submits the underlying Gobblin job to 
a {link GobblinHelixJobScheduler}.
+   * The method will re-submit the job if it has been terminated "early" e.g. 
before all data has been pulled.
+   * This method should be called only when distributed job launcher mode is 
disabled.
    */
-  private void runJobLauncherLoop() throws JobException {
+   private void runJobLauncherLoop() throws JobException {
     try {
-      // Check if any existing planning job is running
-      Optional<String> actualJobIdFromStore = 
jobsMapping.getActualJobId(this.jobUri);
-
-      if (actualJobIdFromStore.isPresent() && 
!canRun(actualJobIdFromStore.get(), this.jobHelixManager)) {
-        return;
-      }
-
-      String actualJobId = jobProps.containsKey(ConfigurationKeys.JOB_ID_KEY)
-              ? jobProps.getProperty(ConfigurationKeys.JOB_ID_KEY) : 
HelixJobsMapping.createActualJobId(jobProps);
-      log.info("Job {} creates actual job {}", this.jobUri, actualJobId);
-
-      jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, actualJobId);
-
-      /* create the job launcher after setting ConfigurationKeys.JOB_ID_KEY */
-      currentJobLauncher = this.jobScheduler.buildJobLauncher(jobProps);
-
-      this.jobsMapping.setActualJobId(this.jobUri, actualJobId);
-
       while (true) {
+        currentJobLauncher = buildJobLauncherForCentralizedMode(jobScheduler, 
jobProps);
         // in "run once" case, job scheduler will remove current job from the 
scheduler
         boolean isEarlyStopped = this.jobScheduler.runJob(jobProps, 
jobListener, currentJobLauncher);
         boolean isRetriggerEnabled = this.isRetriggeringEnabled();
@@ -231,18 +229,11 @@ class HelixRetriggeringJobCallable implements Callable {
           break;
         }
       }
-
     } catch (Exception e) {
       log.error("Failed to run job {}", 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
       throw new JobException("Failed to run job " + 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
     } finally {
       currentJobLauncher = null;
-      // always cleanup the job mapping for current job name.
-      try {
-        this.jobsMapping.deleteMapping(jobUri);
-      } catch (Exception e) {
-        throw new JobException("Cannot delete jobs mapping for job : " + 
jobUri);
-      }
     }
   }
 
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallableTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallableTest.java
new file mode 100644
index 0000000..779e7dd
--- /dev/null
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallableTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.io.File;
+import java.util.Optional;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.helix.HelixManager;
+import org.assertj.core.util.Lists;
+import org.junit.Assert;
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.MutableJobCatalog;
+import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
+import org.apache.gobblin.scheduler.SchedulerService;
+
+
+public class HelixRetriggeringJobCallableTest {
+  public static final String TMP_DIR = "/tmp/" + 
HelixRetriggeringJobCallable.class.getSimpleName();
+
+  @BeforeClass
+  public void setUp() {
+    File tmpDir = new File(TMP_DIR);
+    if (!tmpDir.exists()) {
+      tmpDir.mkdirs();
+    }
+    tmpDir.deleteOnExit();
+  }
+
+  @Test
+  public void testBuildJobLauncher()
+      throws Exception {
+    Config config = 
ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+        ConfigValueFactory.fromAnyRef(TMP_DIR));
+    MutableJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
+    SchedulerService schedulerService = new SchedulerService(new Properties());
+    Path appWorkDir = new Path(TMP_DIR);
+    GobblinHelixJobScheduler jobScheduler = new 
GobblinHelixJobScheduler(ConfigFactory.empty(), getMockHelixManager(), 
Optional.empty(), null,
+        appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
+    GobblinHelixJobLauncher jobLauncher = 
HelixRetriggeringJobCallable.buildJobLauncherForCentralizedMode(jobScheduler, 
getDummyJob());
+    String jobId = jobLauncher.getJobId();
+    Assert.assertNotNull(jobId);
+    
Assert.assertFalse(jobId.contains(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX));
+  }
+
+  private Properties getDummyJob() {
+    Properties jobProps = new Properties();
+    jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY, "dummyJob");
+    jobProps.setProperty(ConfigurationKeys.JOB_LOCK_ENABLED_KEY, "false");
+    jobProps.setProperty(ConfigurationKeys.STATE_STORE_ENABLED, "false");
+    jobProps.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY, 
DummySource.class.getName());
+    return jobProps;
+  }
+
+  private HelixManager getMockHelixManager() {
+    HelixManager helixManager = Mockito.mock(HelixManager.class);
+    Mockito.when(helixManager.getClusterManagmentTool()).thenReturn(null);
+    Mockito.when(helixManager.getClusterName()).thenReturn(null);
+    Mockito.when(helixManager.getHelixDataAccessor()).thenReturn(null);
+    Mockito.when(helixManager.getHelixPropertyStore()).thenReturn(null);
+
+    return helixManager;
+  }
+}
\ No newline at end of file
diff --git a/gobblin-cluster/src/test/resources/log4j.xml 
b/gobblin-cluster/src/test/resources/log4j.xml
index 12b4ff9..08b3fc0 100644
--- a/gobblin-cluster/src/test/resources/log4j.xml
+++ b/gobblin-cluster/src/test/resources/log4j.xml
@@ -44,6 +44,15 @@
     <appender-ref ref="console" />
   </root>
 
+  <!-- Set log level for Helix/ZK logs to ERROR to avoid Travis build failures 
due to exceeding maximum log length. -->
+  <logger name="org.apache.helix">
+    <level value="ERROR"/>
+  </logger>
+
+  <logger name="org.apache.zookeeper">
+    <level value="ERROR"/>
+  </logger>
+
   <!-- Swallow annoying exceptions when creating a configuration. -->
   <logger name="org.apache.hadoop.conf.Configuration">
     <level value="FATAL"/>

Reply via email to