This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 829ec2b [GOBBLIN-1352][GOBBLIN-1319] Ensure Helix workflows are cleaned up on cluster start up in centralized mode
829ec2b is described below
commit 829ec2bb340a07af9d2a838b20233985e94c7a23
Author: suvasude <[email protected]>
AuthorDate: Thu Jan 7 20:30:48 2021 -0800
[GOBBLIN-1352][GOBBLIN-1319] Ensure Helix workflows are cleaned up on cluster start up in centralized mode
Closes #3191 from sv2000/helixJobName
---
.../cluster/HelixRetriggeringJobCallable.java | 51 ++++++-------
.../cluster/HelixRetriggeringJobCallableTest.java | 88 ++++++++++++++++++++++
gobblin-cluster/src/test/resources/log4j.xml | 9 +++
3 files changed, 118 insertions(+), 30 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
index 251cb0e..40241a5 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.Striped;
import com.typesafe.config.Config;
@@ -195,33 +197,29 @@ class HelixRetriggeringJobCallable implements Callable {
}
}
+ @VisibleForTesting
+ static GobblinHelixJobLauncher
buildJobLauncherForCentralizedMode(GobblinHelixJobScheduler jobScheduler,
Properties jobProps) throws Exception {
+ //In centralized job launcher mode, the JOB_ID_KEY should be null or
should not contain the
+ //"ActualJob" substring, which is intended for the distributed job
launcher mode.
+ //This ensures that workflows in centralized mode are cleaned up properly
when cluster is restarted.
+ String jobId = jobProps.getProperty(ConfigurationKeys.JOB_ID_KEY);
+ if (jobId != null) {
+
Preconditions.checkArgument(!jobId.contains(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX),
+ "Job Id should not contain ActualJob in centralized mode.");
+ }
+ return jobScheduler.buildJobLauncher(jobProps);
+ }
+
/**
- * <p> In some cases, the job launcher will be early stopped.
- * It can be due to the large volume of input source data.
- * In such case, we need to re-launch the same job until
- * the job launcher determines it is safe to stop.
+ * A method to run a Gobblin job with ability to re-trigger the job if
neeeded. This method instantiates a
+ * {@link GobblinHelixJobLauncher} and submits the underlying Gobblin job to
a {link GobblinHelixJobScheduler}.
+ * The method will re-submit the job if it has been terminated "early" e.g.
before all data has been pulled.
+ * This method should be called only when distributed job launcher mode is
disabled.
*/
- private void runJobLauncherLoop() throws JobException {
+ private void runJobLauncherLoop() throws JobException {
try {
- // Check if any existing planning job is running
- Optional<String> actualJobIdFromStore =
jobsMapping.getActualJobId(this.jobUri);
-
- if (actualJobIdFromStore.isPresent() &&
!canRun(actualJobIdFromStore.get(), this.jobHelixManager)) {
- return;
- }
-
- String actualJobId = jobProps.containsKey(ConfigurationKeys.JOB_ID_KEY)
- ? jobProps.getProperty(ConfigurationKeys.JOB_ID_KEY) :
HelixJobsMapping.createActualJobId(jobProps);
- log.info("Job {} creates actual job {}", this.jobUri, actualJobId);
-
- jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, actualJobId);
-
- /* create the job launcher after setting ConfigurationKeys.JOB_ID_KEY */
- currentJobLauncher = this.jobScheduler.buildJobLauncher(jobProps);
-
- this.jobsMapping.setActualJobId(this.jobUri, actualJobId);
-
while (true) {
+ currentJobLauncher = buildJobLauncherForCentralizedMode(jobScheduler,
jobProps);
// in "run once" case, job scheduler will remove current job from the
scheduler
boolean isEarlyStopped = this.jobScheduler.runJob(jobProps,
jobListener, currentJobLauncher);
boolean isRetriggerEnabled = this.isRetriggeringEnabled();
@@ -231,18 +229,11 @@ class HelixRetriggeringJobCallable implements Callable {
break;
}
}
-
} catch (Exception e) {
log.error("Failed to run job {}",
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
throw new JobException("Failed to run job " +
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
} finally {
currentJobLauncher = null;
- // always cleanup the job mapping for current job name.
- try {
- this.jobsMapping.deleteMapping(jobUri);
- } catch (Exception e) {
- throw new JobException("Cannot delete jobs mapping for job : " +
jobUri);
- }
}
}
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallableTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallableTest.java
new file mode 100644
index 0000000..779e7dd
--- /dev/null
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallableTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.io.File;
+import java.util.Optional;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.helix.HelixManager;
+import org.assertj.core.util.Lists;
+import org.junit.Assert;
+import org.mockito.Mockito;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.MutableJobCatalog;
+import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
+import org.apache.gobblin.scheduler.SchedulerService;
+
+
+public class HelixRetriggeringJobCallableTest {
+ public static final String TMP_DIR = "/tmp/" +
HelixRetriggeringJobCallable.class.getSimpleName();
+
+ @BeforeClass
+ public void setUp() {
+ File tmpDir = new File(TMP_DIR);
+ if (!tmpDir.exists()) {
+ tmpDir.mkdirs();
+ }
+ tmpDir.deleteOnExit();
+ }
+
+ @Test
+ public void testBuildJobLauncher()
+ throws Exception {
+ Config config =
ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+ ConfigValueFactory.fromAnyRef(TMP_DIR));
+ MutableJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
+ SchedulerService schedulerService = new SchedulerService(new Properties());
+ Path appWorkDir = new Path(TMP_DIR);
+ GobblinHelixJobScheduler jobScheduler = new
GobblinHelixJobScheduler(ConfigFactory.empty(), getMockHelixManager(),
Optional.empty(), null,
+ appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
+ GobblinHelixJobLauncher jobLauncher =
HelixRetriggeringJobCallable.buildJobLauncherForCentralizedMode(jobScheduler,
getDummyJob());
+ String jobId = jobLauncher.getJobId();
+ Assert.assertNotNull(jobId);
+
Assert.assertFalse(jobId.contains(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX));
+ }
+
+ private Properties getDummyJob() {
+ Properties jobProps = new Properties();
+ jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY, "dummyJob");
+ jobProps.setProperty(ConfigurationKeys.JOB_LOCK_ENABLED_KEY, "false");
+ jobProps.setProperty(ConfigurationKeys.STATE_STORE_ENABLED, "false");
+ jobProps.setProperty(ConfigurationKeys.SOURCE_CLASS_KEY,
DummySource.class.getName());
+ return jobProps;
+ }
+
+ private HelixManager getMockHelixManager() {
+ HelixManager helixManager = Mockito.mock(HelixManager.class);
+ Mockito.when(helixManager.getClusterManagmentTool()).thenReturn(null);
+ Mockito.when(helixManager.getClusterName()).thenReturn(null);
+ Mockito.when(helixManager.getHelixDataAccessor()).thenReturn(null);
+ Mockito.when(helixManager.getHelixPropertyStore()).thenReturn(null);
+
+ return helixManager;
+ }
+}
\ No newline at end of file
diff --git a/gobblin-cluster/src/test/resources/log4j.xml b/gobblin-cluster/src/test/resources/log4j.xml
index 12b4ff9..08b3fc0 100644
--- a/gobblin-cluster/src/test/resources/log4j.xml
+++ b/gobblin-cluster/src/test/resources/log4j.xml
@@ -44,6 +44,15 @@
<appender-ref ref="console" />
</root>
+ <!-- Set log level for Helix/ZK logs to ERROR to avoid Travis build failures
due to exceeding maximum log length. -->
+ <logger name="org.apache.helix">
+ <level value="ERROR"/>
+ </logger>
+
+ <logger name="org.apache.zookeeper">
+ <level value="ERROR"/>
+ </logger>
+
<!-- Swallow annoying exceptions when creating a configuration. -->
<logger name="org.apache.hadoop.conf.Configuration">
<level value="FATAL"/>