This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 6aaf48529 [GOBBLIN-1744] Improve handling of null value edge cases when querying Helix (#3603)
6aaf48529 is described below
commit 6aaf4852984f36420cbcbb850ddda7d96393c83f
Author: Matthew Ho <[email protected]>
AuthorDate: Tue Jan 10 17:30:12 2023 -0800
[GOBBLIN-1744] Improve handling of null value edge cases when querying Helix (#3603)
* [GOBBLIN-1744] Improve logging in null cases when querying from Helix
* Throw a descriptive exception when seeing invalid state from Helix / ZooKeeper
---
.../GobblinHelixUnexpectedStateException.java | 29 +++++++
.../cluster/HelixAssignedParticipantCheck.java | 24 ++++--
.../org/apache/gobblin/cluster/HelixUtils.java | 38 ++++++---
.../org/apache/gobblin/cluster/HelixUtilsTest.java | 93 +++++++++++++++++++---
4 files changed, 153 insertions(+), 31 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixUnexpectedStateException.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixUnexpectedStateException.java
new file mode 100644
index 000000000..ec5a74844
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixUnexpectedStateException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+/**
+ * Exception to describe situations where Gobblin sees unexpected state from Helix. Historically, we've seen unexpected
+ * null values, which bubble up as NPE. This exception is explicitly used to differentiate bad Gobblin code from
+ * Helix failures (i.e. an NPE should imply a Gobblin bug, whereas this exception implies bad Helix / ZooKeeper state).
+ */
+public class GobblinHelixUnexpectedStateException extends Exception {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Creates the exception with a {@link String#format(String, Object...)}-style message.
+   *
+   * <p>Building the message never throws. If {@code message} is not a valid pattern for {@code args} (for example it
+   * contains a stray {@code %} or references more args than were given), the raw message is kept and any args are
+   * appended. This way the state problem being reported is not masked by a formatting error.
+   *
+   * @param message format pattern describing the unexpected state; may be {@code null}
+   * @param args arguments referenced by the pattern
+   */
+  public GobblinHelixUnexpectedStateException(String message, Object... args) {
+    super(formatMessage(message, args));
+  }
+
+  /** Formats {@code message} with {@code args}, falling back to the raw text instead of throwing. */
+  private static String formatMessage(String message, Object... args) {
+    if (message == null) {
+      return null;
+    }
+    try {
+      return String.format(message, args);
+    } catch (java.util.IllegalFormatException e) {
+      // Keep the diagnostic information even when the pattern is malformed.
+      return (args == null || args.length == 0) ? message : message + " " + java.util.Arrays.toString(args);
+    }
+  }
+}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java
index 4017143b7..12439c752 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheck.java
@@ -139,16 +139,24 @@ public class HelixAssignedParticipantCheck implements CommitStep {
if (jobContext != null) {
String participant = jobContext.getAssignedParticipant(partitionNum);
- if (participant != null) {
- boolean isAssignedParticipant =
participant.equalsIgnoreCase(helixInstanceName);
- if (!isAssignedParticipant) {
- log.info("The current helix instance is not the assigned
participant. helixInstanceName={}, assignedParticipant={}",
- helixInstanceName, participant);
- }
-
- return isAssignedParticipant;
+ if (participant == null) {
+ log.error("The current assigned participant is null. This implies
that \n"
+ + "\t\t(a)Helix failed to write to zookeeper, which is often
caused by lack of compression leading / exceeding zookeeper jute max buffer
size (Default 1MB)\n"
+ + "\t\t(b)Helix reassigned the task (unlikely if this current
task has been running without issue. Helix does not have code for reassigning
\"running\" tasks)\n"
+ + "\t\tNote: This logic is true as of Helix version 1.0.2 and ZK
version 3.6");
+
+ return false;
+ }
+
+ boolean isAssignedParticipant =
participant.equalsIgnoreCase(helixInstanceName);
+ if (!isAssignedParticipant) {
+ log.info("The current helix instance is not the assigned
participant. helixInstanceName={}, assignedParticipant={}",
+ helixInstanceName, participant);
}
+
+ return isAssignedParticipant;
}
+
return false;
};
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
index ce217ed20..308229b34 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java
@@ -28,10 +28,13 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.util.Id;
import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.helix.HelixAdmin;
@@ -54,13 +57,7 @@ import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
import org.apache.helix.tools.ClusterSetup;
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.JobException;
-import org.apache.gobblin.runtime.listeners.JobListener;
-
-import static org.apache.helix.task.TaskState.STOPPED;
+import static org.apache.helix.task.TaskState.*;
/**
@@ -393,13 +390,28 @@ public class HelixUtils {
*
* @param jobNames a list of Gobblin job names.
* @return a map from jobNames to their Helix Workflow Ids.
+ * @throws GobblinHelixUnexpectedStateException when there is inconsistent
helix state. This implies that we should retry the call
+ * to avoid acting on stale data
*/
- public static Map<String, String> getWorkflowIdsFromJobNames(HelixManager
helixManager, Collection<String> jobNames) {
- Map<String, String> jobNameToWorkflowId = new HashMap<>();
+ public static Map<String, String> getWorkflowIdsFromJobNames(HelixManager
helixManager, Collection<String> jobNames)
+ throws GobblinHelixUnexpectedStateException {
TaskDriver taskDriver = new TaskDriver(helixManager);
+ return getWorkflowIdsFromJobNames(taskDriver, jobNames);
+ }
+
+ public static Map<String, String> getWorkflowIdsFromJobNames(TaskDriver
taskDriver, Collection<String> jobNames)
+ throws GobblinHelixUnexpectedStateException {
+ Map<String, String> jobNameToWorkflowId = new HashMap<>();
Map<String, WorkflowConfig> workflowConfigMap = taskDriver.getWorkflows();
- for (String workflow : workflowConfigMap.keySet()) {
- WorkflowConfig workflowConfig = taskDriver.getWorkflowConfig(workflow);
+ for (Map.Entry<String, WorkflowConfig> entry :
workflowConfigMap.entrySet()) {
+ String workflow = entry.getKey();
+ WorkflowConfig workflowConfig = entry.getValue();
+ if (workflowConfig == null) {
+ // As of Helix 1.0.2 implementation, this in theory shouldn't happen.
But this null check is here in case implementation changes
+ // because the API doesn't technically prohibit null configs, maps
allowing null values is implementation based, and we want to fail loudly with a
clear root cause.
+ // the caller of this API should retry this API call
+ throw new GobblinHelixUnexpectedStateException("Received null workflow
config from Helix. We should not see any null configs when reading all
workflows. workflowId=%s", workflow);
+ }
//Filter out any stale Helix workflows which are not running.
if (workflowConfig.getTargetState() != TargetState.START) {
continue;
@@ -450,4 +462,4 @@ public class HelixUtils {
log.error("Could not drop instance: {} due to: {}", helixInstanceName,
e);
}
}
-}
\ No newline at end of file
+}
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixUtilsTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixUtilsTest.java
index d7841a88b..e3ea1155f 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixUtilsTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixUtilsTest.java
@@ -19,22 +19,36 @@ package org.apache.gobblin.cluster;
import java.io.IOException;
import java.net.URL;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
import java.util.Properties;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.util.ConfigUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobDag;
+import org.apache.helix.task.TargetState;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.WorkflowConfig;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
-import org.apache.gobblin.util.ConfigUtils;
+import static org.testng.Assert.*;
/**
@@ -66,18 +80,77 @@ public class HelixUtilsTest {
Assert.assertNotNull(url, "Could not find resource " + url);
Config config = ConfigFactory.parseURL(url).resolve();
- Assert.assertEquals(config.getString("k1"), "v1");
- Assert.assertEquals(config.getString("k2"), "v1");
- Assert.assertEquals(config.getInt("k3"), 1000);
+ assertEquals(config.getString("k1"), "v1");
+ assertEquals(config.getString("k2"), "v1");
+ assertEquals(config.getInt("k3"), 1000);
Assert.assertTrue(config.getBoolean("k4"));
- Assert.assertEquals(config.getLong("k5"), 10000);
+ assertEquals(config.getLong("k5"), 10000);
Properties properties = ConfigUtils.configToProperties(config);
- Assert.assertEquals(properties.getProperty("k1"), "v1");
- Assert.assertEquals(properties.getProperty("k2"), "v1");
- Assert.assertEquals(properties.getProperty("k3"), "1000");
- Assert.assertEquals(properties.getProperty("k4"), "true");
- Assert.assertEquals(properties.getProperty("k5"), "10000");
+ assertEquals(properties.getProperty("k1"), "v1");
+ assertEquals(properties.getProperty("k2"), "v1");
+ assertEquals(properties.getProperty("k3"), "1000");
+ assertEquals(properties.getProperty("k4"), "true");
+ assertEquals(properties.getProperty("k5"), "10000");
+ }
+
+  @Test
+  public void testGetWorkunitIdForJobNames() throws GobblinHelixUnexpectedStateException {
+    final String helixJobName = "job";
+    final String gobblinJobName = "gobblin-job-name";
+    final String workflowId = "workflow-1";
+
+    /*
+     * Mocks are wired leaf-to-root: task config -> job config -> job DAG -> workflow config -> task driver.
+     *
+     * For reference, a real task config entry ("mapFields") looks like:
+     *   "006d6d2b-4b8b-4c1b-877b-b7fb51d9295c" : {
+     *     "TASK_SUCCESS_OPTIONAL" : "true",
+     *     "job.id" : "job_KafkaHdfsStreamingTracking_1668738617409",
+     *     "job.name" : "KafkaHdfsStreamingTracking",
+     *     "task.id" : "task_KafkaHdfsStreamingTracking_1668738617409_179",
+     *     "gobblin.cluster.work.unit.file.path" : "<SOME PATH>",
+     *     "TASK_ID" : "006d6d2b-4b8b-4c1b-877b-b7fb51d9295c"
+     *   },
+     */
+    TaskConfig taskCfg = Mockito.mock(TaskConfig.class);
+    Mockito.when(taskCfg.getConfigMap()).thenReturn(ImmutableMap.of(ConfigurationKeys.JOB_NAME_KEY, gobblinJobName));
+
+    JobConfig jobCfg = Mockito.mock(JobConfig.class);
+    Mockito.when(jobCfg.getTaskConfigMap()).thenReturn(ImmutableMap.of("stub-guid", taskCfg));
+
+    JobDag dag = Mockito.mock(JobDag.class);
+    Mockito.when(dag.getAllNodes()).thenReturn(new HashSet<>(Arrays.asList(helixJobName)));
+
+    WorkflowConfig workflowCfg = Mockito.mock(WorkflowConfig.class);
+    Mockito.when(workflowCfg.getTargetState()).thenReturn(TargetState.START);
+    Mockito.when(workflowCfg.getJobDag()).thenReturn(dag);
+
+    TaskDriver driver = Mockito.mock(TaskDriver.class);
+    Mockito.when(driver.getWorkflows()).thenReturn(ImmutableMap.of(workflowId, workflowCfg));
+    Mockito.when(driver.getJobConfig(helixJobName)).thenReturn(jobCfg);
+
+    // The Gobblin job name found in the task config should map back to the workflow that contains it.
+    Map<String, String> actual = HelixUtils.getWorkflowIdsFromJobNames(driver, Arrays.asList(gobblinJobName));
+    Map<String, String> expected = ImmutableMap.of(gobblinJobName, workflowId);
+    assertEquals(actual, expected);
+  }
+
+  @Test(expectedExceptions = GobblinHelixUnexpectedStateException.class,
+      expectedExceptionsMessageRegExp = ".*workflowId=null-workflow-to-throw-exception.*")
+  public void testGetWorkunitIdForJobNamesWithInvalidHelixState() throws GobblinHelixUnexpectedStateException {
+    final String gobblinJobName = "gobblin-job-name";
+
+    // Helix's API does not forbid null values in the workflow map. A null config must surface as the descriptive
+    // checked exception (naming the offending workflow id), not as an NPE.
+    Map<String, WorkflowConfig> workflowConfigMap = new HashMap<>();
+    workflowConfigMap.put("null-workflow-to-throw-exception", null);
+
+    TaskDriver driver = Mockito.mock(TaskDriver.class);
+    Mockito.when(driver.getWorkflows()).thenReturn(workflowConfigMap);
+
+    // TestNG asserts both the exception type and its message; there is no need to catch, print and rethrow.
+    HelixUtils.getWorkflowIdsFromJobNames(driver, Arrays.asList(gobblinJobName));
}
@AfterClass