This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5301316bc fix helix job wait completion bug when job goes to STOPPING state (#3556)
5301316bc is described below

commit 5301316bc98e68ac88f49f749ae73db876b2704b
Author: Hanghang Nate Liu <[email protected]>
AuthorDate: Fri Sep 9 11:07:10 2022 -0700

    fix helix job wait completion bug when job goes to STOPPING state (#3556)
    
    address comments
    
    compute stoppingStateEndTime from the current time when the job first enters STOPPING, not from the wait start time
    
    update test cases
---
 .../org/apache/gobblin/cluster/HelixUtils.java     |  13 +-
 .../cluster/GobblinHelixJobLauncherTest.java       |   2 +-
 .../cluster/GobblinHelixJobSchedulerTest.java      | 198 +++++++++++++++++++++
 .../resources/GobblinHelixJobSchedulerTest.conf    |  34 ++++
 4 files changed, 241 insertions(+), 6 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index 63149c419..185726c6b 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -254,13 +254,13 @@ public class HelixUtils {
       Optional<Long> timeoutInSeconds, Long stoppingStateTimeoutInSeconds) throws InterruptedException, TimeoutException {
     log.info("Waiting for job {} to complete...", jobName);
     long endTime = 0;
-    long currentTimeMillis = System.currentTimeMillis();
+    long jobStartTimeMillis = System.currentTimeMillis();
 
     if (timeoutInSeconds.isPresent()) {
-      endTime = currentTimeMillis + timeoutInSeconds.get() * 1000;
+      endTime = jobStartTimeMillis + timeoutInSeconds.get() * 1000;
     }
 
-    long stoppingStateEndTime = currentTimeMillis + stoppingStateTimeoutInSeconds * 1000;
+    Long stoppingStateEndTime = null;
 
     while (!timeoutInSeconds.isPresent() || System.currentTimeMillis() <= endTime) {
       WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, workFlowName);
@@ -278,13 +278,19 @@ public class HelixUtils {
           case STOPPING:
             log.info("Waiting for job {} to complete... State - {}", jobName, jobState);
             Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
+            if (stoppingStateEndTime == null) {
+              stoppingStateEndTime = System.currentTimeMillis() + stoppingStateTimeoutInSeconds * 1000;
+            }
             // Workaround for a Helix bug where a job may be stuck in the STOPPING state due to an unresponsive task.
             if (System.currentTimeMillis() > stoppingStateEndTime) {
-              log.info("Deleting workflow {}", workFlowName);
+              log.info("Deleting workflow {} since it has been stuck in STOPPING state for more than {} seconds", workFlowName, stoppingStateTimeoutInSeconds);
               new TaskDriver(helixManager).delete(workFlowName);
               log.info("Deleted workflow {}", workFlowName);
+              return;
             }
-            return;
+            // Keep polling at the 1s STOPPING cadence. Without this break the case falls through to default,
+            // logging the state twice and sleeping an extra 10s before the stop timeout is re-checked.
+            break;
           default:
             log.info("Waiting for job {} to complete... State - {}", jobName, jobState);
             Thread.sleep(TimeUnit.SECONDS.toMillis(10L));
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
index 2aed6f562..e171591d5 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
@@ -175,7 +175,8 @@ public class GobblinHelixJobLauncherTest {
     this.thread.start();
   }
 
-  private Properties generateJobProperties(Config baseConfig, String jobNameSuffix, String jobIdSuffix) {
+  // Static and package-private so that GobblinHelixJobSchedulerTest can build job properties the same way.
+  static Properties generateJobProperties(Config baseConfig, String jobNameSuffix, String jobIdSuffix) {
     Properties properties = ConfigUtils.configToProperties(baseConfig);
 
     String jobName = properties.getProperty(ConfigurationKeys.JOB_NAME_KEY) + jobNameSuffix;
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java
new file mode 100644
index 000000000..498e2530a
--- /dev/null
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.assertj.core.util.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.cluster.event.NewJobConfigArrivalEvent;
+import org.apache.gobblin.cluster.event.UpdateJobConfigArrivalEvent;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
+import org.apache.gobblin.scheduler.SchedulerService;
+
+
+/**
+ * Unit tests for {@link org.apache.gobblin.cluster.GobblinHelixJobScheduler}.
+ *
+ * <p>Verifies that a new job config launches a Helix workflow and that an updated config for the same job
+ * results in a workflow carrying the updated job id.
+ */
+@Test(groups = {"gobblin.cluster"})
+public class GobblinHelixJobSchedulerTest {
+  public final static Logger LOG = LoggerFactory.getLogger(GobblinHelixJobSchedulerTest.class);
+
+  /** Maximum time to wait for Helix to report a workflow with the expected id. */
+  private static final long WORKFLOW_WAIT_TIMEOUT_MILLIS = 30000L;
+
+  private HelixManager helixManager;
+  private FileSystem localFs;
+  private Path appWorkDir;
+  private final Closer closer = Closer.create();
+  private Config baseConfig;
+
+  private GobblinTaskRunner gobblinTaskRunner;
+
+  private Thread thread;
+
+  private final String workflowIdSuffix1 = "_1504201348471";
+  private final String workflowIdSuffix2 = "_1504201348472";
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestingServer testingZKServer = this.closer.register(new TestingServer(-1));
+    LOG.info("Testing ZK Server listening on: " + testingZKServer.getConnectString());
+
+    String confResourceName = GobblinHelixJobSchedulerTest.class.getSimpleName() + ".conf";
+    URL url = GobblinHelixJobSchedulerTest.class.getClassLoader().getResource(confResourceName);
+    // Report the resource name rather than the URL, which is null whenever this assertion fails.
+    Assert.assertNotNull(url, "Could not find resource " + confResourceName);
+
+    this.appWorkDir = new Path(GobblinHelixJobSchedulerTest.class.getSimpleName());
+
+    // Prepare the source Json file
+    File sourceJsonFile = new File(this.appWorkDir.toString(), TestHelper.TEST_JOB_NAME + ".json");
+    TestHelper.createSourceJsonFile(sourceJsonFile);
+
+    baseConfig = ConfigFactory.parseURL(url).withValue("gobblin.cluster.zk.connection.string",
+        ConfigValueFactory.fromAnyRef(testingZKServer.getConnectString()))
+        .withValue(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL,
+            ConfigValueFactory.fromAnyRef(sourceJsonFile.getAbsolutePath()))
+        .withValue(ConfigurationKeys.JOB_STATE_IN_STATE_STORE, ConfigValueFactory.fromAnyRef("true")).resolve();
+
+    String zkConnectingString = baseConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+    String helixClusterName = baseConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+
+    HelixUtils.createGobblinHelixCluster(zkConnectingString, helixClusterName);
+
+    this.helixManager = HelixManagerFactory
+        .getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
+            zkConnectingString);
+    this.closer.register(() -> helixManager.disconnect());
+    this.helixManager.connect();
+
+    this.localFs = FileSystem.getLocal(new Configuration());
+
+    // Delete the application work directory (which holds the source Json file) when the closer is closed.
+    this.closer.register(() -> {
+      if (localFs.exists(appWorkDir)) {
+        localFs.delete(appWorkDir, true);
+      }
+    });
+
+    this.gobblinTaskRunner =
+        new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, TestHelper.TEST_HELIX_INSTANCE_NAME,
+            TestHelper.TEST_APPLICATION_ID, TestHelper.TEST_TASK_RUNNER_ID, baseConfig, Optional.of(appWorkDir));
+
+    this.thread = new Thread(() -> gobblinTaskRunner.start());
+    this.thread.start();
+  }
+
+  @Test
+  public void testNewJobAndUpdate()
+      throws Exception {
+    Config config = ConfigFactory.empty().withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+        ConfigValueFactory.fromAnyRef("/tmp/" + GobblinHelixJobScheduler.class.getSimpleName()));
+    SchedulerService schedulerService = new SchedulerService(new Properties());
+    NonObservingFSJobCatalog jobCatalog = new NonObservingFSJobCatalog(config);
+    jobCatalog.startAsync();
+    GobblinHelixJobScheduler jobScheduler =
+        new GobblinHelixJobScheduler(ConfigFactory.empty(), this.helixManager, java.util.Optional.empty(),
+            new EventBus(), appWorkDir, Lists.emptyList(), schedulerService, jobCatalog);
+
+    final Properties properties1 =
+        GobblinHelixJobLauncherTest.generateJobProperties(this.baseConfig, "1", workflowIdSuffix1);
+    properties1.setProperty(GobblinClusterConfigurationKeys.CANCEL_RUNNING_JOB_ON_DELETE, "true");
+
+    NewJobConfigArrivalEvent newJobConfigArrivalEvent =
+        new NewJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1);
+    jobScheduler.handleNewJobConfigArrival(newJobConfigArrivalEvent);
+    String jobName = newJobConfigArrivalEvent.getJobName();
+
+    String workFlowId = waitForWorkflowIdWithSuffix(jobName, workflowIdSuffix1);
+    Assert.assertNotNull(workFlowId, "No workflow found for job " + jobName);
+    Assert.assertTrue(workFlowId.endsWith(workflowIdSuffix1), "Unexpected workflow id " + workFlowId);
+
+    // Change the job id only after the first workflow is visible: properties1 was handed to the scheduler above,
+    // so mutating it earlier could race with the first launch if the scheduler reads it asynchronously.
+    properties1.setProperty(ConfigurationKeys.JOB_ID_KEY,
+        "job_" + properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY) + workflowIdSuffix2);
+    jobScheduler.handleUpdateJobConfigArrival(
+        new UpdateJobConfigArrivalEvent(properties1.getProperty(ConfigurationKeys.JOB_NAME_KEY), properties1));
+
+    // Wait for the workflow id to switch to the new suffix; stopping at the first id seen would usually
+    // return the stale workflow and make the assertion below flaky.
+    workFlowId = waitForWorkflowIdWithSuffix(jobName, workflowIdSuffix2);
+    Assert.assertNotNull(workFlowId, "No workflow found for job " + jobName);
+    Assert.assertTrue(workFlowId.endsWith(workflowIdSuffix2), "Unexpected workflow id " + workFlowId);
+  }
+
+  /**
+   * Polls Helix until the workflow registered for {@code jobName} has an id ending in {@code expectedSuffix}
+   * or {@link #WORKFLOW_WAIT_TIMEOUT_MILLIS} elapses.
+   *
+   * @return the matching workflow id, or on timeout the last id observed (possibly {@code null})
+   */
+  private String waitForWorkflowIdWithSuffix(String jobName, String expectedSuffix)
+      throws Exception {
+    String workFlowId = null;
+    long endTime = System.currentTimeMillis() + WORKFLOW_WAIT_TIMEOUT_MILLIS;
+    while (System.currentTimeMillis() < endTime) {
+      Map<String, String> workflowIdMap =
+          HelixUtils.getWorkflowIdsFromJobNames(this.helixManager, Collections.singletonList(jobName));
+      workFlowId = workflowIdMap.get(jobName);
+      if (workFlowId != null && workFlowId.endsWith(expectedSuffix)) {
+        return workFlowId;
+      }
+      Thread.sleep(100);
+    }
+    return workFlowId;
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    try {
+      this.gobblinTaskRunner.stop();
+      this.thread.join();
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+    } finally {
+      this.closer.close();
+    }
+  }
+}
diff --git a/gobblin-cluster/src/test/resources/GobblinHelixJobSchedulerTest.conf b/gobblin-cluster/src/test/resources/GobblinHelixJobSchedulerTest.conf
new file mode 100644
index 000000000..a684c1d19
--- /dev/null
+++ b/gobblin-cluster/src/test/resources/GobblinHelixJobSchedulerTest.conf
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+include "reference"
+
+# Cluster / Helix configuration properties
+gobblin.cluster.helix.cluster.name=GobblinHelixJobSchedulerTest
+gobblin.cluster.work.dir=GobblinHelixJobSchedulerTest
+# Placeholder only: GobblinHelixJobSchedulerTest overrides this with its embedded TestingServer's connect string.
+gobblin.cluster.zk.connection.string="localhost:3088"
+
+# Gobblin job configuration properties
+job.name=GobblinHelixJobSchedulerTest
+job.group=test
+source.class=org.apache.gobblin.example.simplejson.SimpleJsonSource
+converter.classes=org.apache.gobblin.example.simplejson.SimpleJsonConverter
+writer.file.name="foo.avro"
+writer.file.path=avro
+writer.builder.class=org.apache.gobblin.writer.AvroDataWriterBuilder
+source.schema="{\"namespace\":\"example.avro\", \"type\":\"record\", \"name\":\"User\", \"fields\":[{\"name\":\"name\", \"type\":\"string\"}, {\"name\":\"favorite_number\",  \"type\":\"int\"}, {\"name\":\"favorite_color\", \"type\":\"string\"}]}"
+metrics.enabled=true

Reply via email to