This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 5301316bc fix helix job wait completion bug when job goes to STOPPING
state (#3556)
5301316bc is described below
commit 5301316bc98e68ac88f49f749ae73db876b2704b
Author: Hanghang Nate Liu <[email protected]>
AuthorDate: Fri Sep 9 11:07:10 2022 -0700
fix helix job wait completion bug when job goes to STOPPING state (#3556)
address comments
update stoppingStateEndTime with currentTime
update test cases
---
.../org/apache/gobblin/cluster/HelixUtils.java | 13 +-
.../cluster/GobblinHelixJobLauncherTest.java | 2 +-
.../cluster/GobblinHelixJobSchedulerTest.java | 198 +++++++++++++++++++++
.../resources/GobblinHelixJobSchedulerTest.conf | 34 ++++
4 files changed, 241 insertions(+), 6 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index 63149c419..185726c6b 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -254,13 +254,13 @@ public class HelixUtils {
Optional<Long> timeoutInSeconds, Long stoppingStateTimeoutInSeconds)
throws InterruptedException, TimeoutException {
log.info("Waiting for job {} to complete...", jobName);
long endTime = 0;
- long currentTimeMillis = System.currentTimeMillis();
+ long jobStartTimeMillis = System.currentTimeMillis();
if (timeoutInSeconds.isPresent()) {
- endTime = currentTimeMillis + timeoutInSeconds.get() * 1000;
+ endTime = jobStartTimeMillis + timeoutInSeconds.get() * 1000;
}
- long stoppingStateEndTime = currentTimeMillis +
stoppingStateTimeoutInSeconds * 1000;
+ Long stoppingStateEndTime = null;
while (!timeoutInSeconds.isPresent() || System.currentTimeMillis() <=
endTime) {
WorkflowContext workflowContext =
TaskDriver.getWorkflowContext(helixManager, workFlowName);
@@ -278,13 +278,16 @@ public class HelixUtils {
case STOPPING:
log.info("Waiting for job {} to complete... State - {}", jobName,
jobState);
Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
+ if (stoppingStateEndTime == null) {
+ stoppingStateEndTime = System.currentTimeMillis() +
stoppingStateTimeoutInSeconds * 1000;
+ }
// Workaround for a Helix bug where a job may be stuck in the
STOPPING state due to an unresponsive task.
if (System.currentTimeMillis() > stoppingStateEndTime) {
- log.info("Deleting workflow {}", workFlowName);
+ log.info("Deleting workflow {} since it stuck in STOPPING state
for more than {} seconds", workFlowName, stoppingStateTimeoutInSeconds);
new TaskDriver(helixManager).delete(workFlowName);
log.info("Deleted workflow {}", workFlowName);
+ return;
}
- return;
default:
log.info("Waiting for job {} to complete... State - {}", jobName,
jobState);
Thread.sleep(TimeUnit.SECONDS.toMillis(10L));
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
index 2aed6f562..e171591d5 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
@@ -175,7 +175,7 @@ public class GobblinHelixJobLauncherTest {
this.thread.start();
}
- private Properties generateJobProperties(Config baseConfig, String
jobNameSuffix, String jobIdSuffix) {
+ static Properties generateJobProperties(Config baseConfig, String
jobNameSuffix, String jobIdSuffix) {
Properties properties = ConfigUtils.configToProperties(baseConfig);
String jobName = properties.getProperty(ConfigurationKeys.JOB_NAME_KEY) +
jobNameSuffix;
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java
new file mode 100644
index 000000000..498e2530a
--- /dev/null
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.assertj.core.util.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent;
+import org.apache.gobblin.cluster.event.UpdateJobConfigArrivalEvent;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
+import org.apache.gobblin.scheduler.SchedulerService;
+
+
+/**
+ * Unit tests for {@link org.apache.gobblin.cluster.GobblinHelixJobScheduler}.
+ *
+ */
+@Test(groups = {"gobblin.cluster"})
+public class GobblinHelixJobSchedulerTest {
+ public final static Logger LOG =
LoggerFactory.getLogger(GobblinHelixJobSchedulerTest.class);
+
+ private HelixManager helixManager;
+ private FileSystem localFs;
+ private Path appWorkDir;
+ private final Closer closer = Closer.create();
+ private Config baseConfig;
+
+ private GobblinTaskRunner gobblinTaskRunner;
+
+ private Thread thread;
+
+ private final String workflowIdSuffix1 = "_1504201348471";
+ private final String workflowIdSuffix2 = "_1504201348472";
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ TestingServer testingZKServer = this.closer.register(new
TestingServer(-1));
+ LOG.info("Testing ZK Server listening on: " +
testingZKServer.getConnectString());
+
+ URL url = GobblinHelixJobSchedulerTest.class.getClassLoader()
+ .getResource(GobblinHelixJobSchedulerTest.class.getSimpleName() +
".conf");
+ Assert.assertNotNull(url, "Could not find resource " + url);
+
+ this.appWorkDir = new
Path(GobblinHelixJobSchedulerTest.class.getSimpleName());
+
+ // Prepare the source Json file
+ File sourceJsonFile = new File(this.appWorkDir.toString(),
TestHelper.TEST_JOB_NAME + ".json");
+ TestHelper.createSourceJsonFile(sourceJsonFile);
+
+ baseConfig =
ConfigFactory.parseURL(url).withValue("gobblin.cluster.zk.connection.string",
+ ConfigValueFactory.fromAnyRef(testingZKServer.getConnectString()))
+ .withValue(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL,
+ ConfigValueFactory.fromAnyRef(sourceJsonFile.getAbsolutePath()))
+ .withValue(ConfigurationKeys.JOB_STATE_IN_STATE_STORE,
ConfigValueFactory.fromAnyRef("true")).resolve();
+
+ String zkConnectingString =
baseConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+ String helixClusterName =
baseConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+
+ HelixUtils.createGobblinHelixCluster(zkConnectingString, helixClusterName);
+
+ this.helixManager = HelixManagerFactory
+ .getZKHelixManager(helixClusterName,
TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+ zkConnectingString);
+ this.closer.register(() -> helixManager.disconnect());
+ this.helixManager.connect();
+
+ this.localFs = FileSystem.getLocal(new Configuration());
+
+ this.closer.register(() -> {
+ if (localFs.exists(appWorkDir)) {
+ localFs.delete(appWorkDir, true);
+ }
+ });
+
+ this.closer.register(() -> {
+ if (localFs.exists(appWorkDir)) {
+ localFs.delete(appWorkDir, true);
+ }
+ });
+
+ this.gobblinTaskRunner =
+ new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME,
TestHelper.TEST_HELIX_INSTANCE_NAME,
+ TestHelper.TEST_APPLICATION_ID, TestHelper.TEST_TASK_RUNNER_ID,
baseConfig, Optional.of(appWorkDir));
+
+ this.thread = new Thread(() -> gobblinTaskRunner.start());
+ this.thread.start();
+ }
+
+ @Test
+ public void testNewJobAndUpdate()
+ throws Exception {
+ Config config =
ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+ ConfigValueFactory.fromAnyRef("/tmp/" +
GobblinHelixJobScheduler.class.getSimpleName()));
+ SchedulerService schedulerService = new SchedulerService(new Properties());
+ NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
+ jobCatalog.startAsync();
+ GobblinHelixJobScheduler jobScheduler =
+ new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager,
java.util.Optional.empty(),
+ new EventBus(), appWorkDir, Lists.emptyList(), schedulerService,
jobCatalog);
+
+ final Properties properties1 =
+ GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig,
"1", workflowIdSuffix1);
+
properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE,
"true");
+
+ NewJobConfigArrivalEvent newJobConfigArrivalEvent =
+ new
NewJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1);
+ jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+ properties1.setProperty(ConfigurationKeys.JOB_ID_KEY,
+ "job_" + properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY) +
workflowIdSuffix2);
+ Map<String, String> workflowIdMap;
+ this.helixManager.connect();
+
+ String workFlowId = null;
+ long endTime = System.currentTimeMillis() + 30000;
+ while (System.currentTimeMillis() < endTime) {
+ workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
+ Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
+ if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
+ workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
+ break;
+ }
+ Thread.sleep(100);
+ }
+ Assert.assertNotNull(workFlowId);
+ Assert.assertTrue(workFlowId.endsWith(workflowIdSuffix1));
+
+ jobScheduler.handleUpdateJobConfigArrival(
+ new
UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY),
properties1));
+ this.helixManager.connect();
+ endTime = System.currentTimeMillis() + 30000;
+ while (System.currentTimeMillis() < endTime) {
+ workflowIdMap = HelixUtils.getWorkflowIdsFromJobNames(this.helixManager,
+ Collections.singletonList(newJobConfigArrivalEvent.getJobName()));
+ if (workflowIdMap.containsKey(newJobConfigArrivalEvent.getJobName())) {
+ workFlowId = workflowIdMap.get(newJobConfigArrivalEvent.getJobName());
+ break;
+ }
+ Thread.sleep(100);
+ }
+ Assert.assertTrue(workFlowId.endsWith(workflowIdSuffix2));
+ }
+
+ @AfterClass
+ public void tearDown()
+ throws IOException {
+ try {
+ this.gobblinTaskRunner.stop();
+ this.thread.join();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ } finally {
+ this.closer.close();
+ }
+ }
+}
diff --git a/gobblin-cluster/src/test/resources/GobblinHelixJobSchedulerTest.conf b/gobblin-cluster/src/test/resources/GobblinHelixJobSchedulerTest.conf
new file mode 100644
index 000000000..a684c1d19
--- /dev/null
+++ b/gobblin-cluster/src/test/resources/GobblinHelixJobSchedulerTest.conf
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+include "reference"
+
+# Cluster / Helix configuration properties
+gobblin.cluster.helix.cluster.name=GobblinHelixJobSchedulerTest
+gobblin.cluster.work.dir=GobblinHelixJobSchedulerTest
+gobblin.cluster.zk.connection.string="localhost:3088"
+
+# Gobblin job configuration properties
+job.name=GobblinHelixJobSchedulerTest
+job.group=test
+source.class=org.apache.gobblin.example.simplejson.SimpleJsonSource
+converter.classes=org.apache.gobblin.example.simplejson.SimpleJsonConverter
+writer.file.name="foo.avro"
+writer.file.path=avro
+writer.builder.class=org.apache.gobblin.writer.AvroDataWriterBuilder
+source.schema="{\"namespace\":\"example.avro\", \"type\":\"record\",
\"name\":\"User\", \"fields\":[{\"name\":\"name\", \"type\":\"string\"},
{\"name\":\"favorite_number\", \"type\":\"int\"},
{\"name\":\"favorite_color\", \"type\":\"string\"}]}"
+metrics.enabled=true
\ No newline at end of file