This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 19d1620d8 [GOBBLIN-2039] Handle failure scenarios multileader
compilation startup (#3918)
19d1620d8 is described below
commit 19d1620d873ff5a5cf3f6b1993fff5ddf340653d
Author: William Lo <[email protected]>
AuthorDate: Thu Apr 11 16:23:39 2024 -0400
[GOBBLIN-2039] Handle failure scenarios multileader compilation startup
(#3918)
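Fix NPEs that occur when the multihop flow compiler fails to compile a flow (returning a null DAG), and catch per-action errors during DagActionStoreChangeMonitor startup so one failing dag action no longer aborts initialization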
---
.../runtime/DagActionStoreChangeMonitorTest.java | 81 +++++++++++++++++++---
.../utils/FlowCompilationValidationHelper.java | 2 +-
.../monitoring/DagActionStoreChangeMonitor.java | 7 +-
.../orchestration/MysqlDagActionStoreTest.java | 23 +++---
4 files changed, 92 insertions(+), 21 deletions(-)
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
index 0575a99bd..f93836978 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
@@ -17,9 +17,14 @@
package org.apache.gobblin.runtime;
+import java.io.IOException;
import java.net.URI;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.typesafe.config.Config;
@@ -28,13 +33,17 @@ import com.typesafe.config.ConfigValueFactory;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.monitoring.DagActionStoreChangeEvent;
import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
@@ -56,12 +65,19 @@ public class DagActionStoreChangeMonitorTest {
public static final String TOPIC =
DagActionStoreChangeEvent.class.getSimpleName();
private final int PARTITION = 1;
private final int OFFSET = 1;
+
+ private static final String USER = "testUser";
+ private static final String PASSWORD = "testPassword";
+ private static final String TABLE = "dag_action_store";
+
private final String FLOW_GROUP = "flowGroup";
private final String FLOW_NAME = "flowName";
private final String FLOW_EXECUTION_ID = "123";
private MockDagActionStoreChangeMonitor mockDagActionStoreChangeMonitor;
private int txidCounter = 0;
+ private ITestMetastoreDatabase testDb;
+
/**
* Note: The class methods are wrapped in a test specific method because the
original methods are package protected
* and cannot be accessed by this class.
@@ -70,13 +86,17 @@ public class DagActionStoreChangeMonitorTest {
public MockDagActionStoreChangeMonitor(String topic, Config config, int
numThreads,
boolean isMultiActiveSchedulerEnabled) {
- super(topic, config, mock(DagManager.class), numThreads,
mock(FlowCatalog.class), mock(Orchestrator.class),
- mock(DagActionStore.class), isMultiActiveSchedulerEnabled);
+ this(topic, config, numThreads, isMultiActiveSchedulerEnabled,
mock(DagActionStore.class), mock(DagManager.class), mock(FlowCatalog.class),
mock(Orchestrator.class));
+ }
+
+ public MockDagActionStoreChangeMonitor(String topic, Config config, int
numThreads, boolean isMultiActiveSchedulerEnabled,
+ DagActionStore dagActionStore, DagManager dagManager, FlowCatalog
flowCatalog, Orchestrator orchestrator) {
+ super(topic, config, dagManager, numThreads, flowCatalog, orchestrator,
+ dagActionStore, isMultiActiveSchedulerEnabled);
}
protected void processMessageForTest(DecodeableKafkaRecord record) {
super.processMessage(record);
-
}
protected void startUpForTest() {
@@ -93,18 +113,28 @@ public class DagActionStoreChangeMonitorTest {
}
// Called at start of every test so the count of each method being called is
reset to 0
- public void setup() {
+ @BeforeMethod
+ public void setupMockMonitor() {
mockDagActionStoreChangeMonitor = createMockDagActionStoreChangeMonitor();
mockDagActionStoreChangeMonitor.startUpForTest();
}
+ @BeforeClass
+ public void setUp() throws Exception {
+ this.testDb = TestMetastoreDatabaseFactory.get();
+ }
+
+ @AfterClass
+ public void tearDown() throws IOException {
+ this.testDb.close();
+ }
+
/**
* Ensure no NPE results from passing a HEARTBEAT type message with a null
{@link DagActionValue} and the message is
* filtered out since it's a heartbeat type so no methods are called.
*/
@Test
public void testProcessMessageWithHeartbeatAndNullDagAction() throws
SpecNotFoundException {
- setup();
Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, "", "", "",
null);
mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
@@ -120,7 +150,6 @@ public class DagActionStoreChangeMonitorTest {
*/
@Test (dependsOnMethods = "testProcessMessageWithHeartbeatAndNullDagAction")
public void testProcessMessageWithHeartbeatAndFlowInfo() throws
SpecNotFoundException {
- setup();
Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, FLOW_GROUP,
FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.RESUME);
mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
@@ -134,7 +163,6 @@ public class DagActionStoreChangeMonitorTest {
*/
@Test (dependsOnMethods = "testProcessMessageWithHeartbeatAndFlowInfo")
public void testProcessMessageWithInsertLaunchType() throws
SpecNotFoundException {
- setup();
Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
wrapDagActionStoreChangeEvent(OperationType.INSERT, FLOW_GROUP,
FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.LAUNCH);
mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
@@ -149,7 +177,6 @@ public class DagActionStoreChangeMonitorTest {
*/
@Test (dependsOnMethods = "testProcessMessageWithInsertLaunchType")
public void testProcessMessageWithInsertResumeType() throws
SpecNotFoundException {
- setup();
Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
wrapDagActionStoreChangeEvent(OperationType.INSERT, FLOW_GROUP,
FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.RESUME);
mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
@@ -163,7 +190,6 @@ public class DagActionStoreChangeMonitorTest {
*/
@Test (dependsOnMethods = "testProcessMessageWithInsertResumeType")
public void testProcessMessageWithInsertKillType() throws
SpecNotFoundException {
- setup();
Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
wrapDagActionStoreChangeEvent(OperationType.INSERT, FLOW_GROUP,
FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.KILL);
mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
@@ -178,7 +204,6 @@ public class DagActionStoreChangeMonitorTest {
*/
@Test (dependsOnMethods = "testProcessMessageWithInsertKillType")
public void testProcessMessageWithUpdate() throws SpecNotFoundException {
- setup();
Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
wrapDagActionStoreChangeEvent(OperationType.UPDATE, FLOW_GROUP,
FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.LAUNCH);
mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
@@ -192,7 +217,6 @@ public class DagActionStoreChangeMonitorTest {
*/
@Test (dependsOnMethods = "testProcessMessageWithUpdate")
public void testProcessMessageWithDelete() throws SpecNotFoundException {
- setup();
Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
wrapDagActionStoreChangeEvent(OperationType.DELETE, FLOW_GROUP,
FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.LAUNCH);
mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
@@ -201,6 +225,41 @@ public class DagActionStoreChangeMonitorTest {
verify(mockDagActionStoreChangeMonitor.getFlowCatalog(),
times(0)).getSpecs(any(URI.class));
}
+ @Test (dependsOnMethods = "testProcessMessageWithDelete")
+ public void testStartupSequenceHandlesFailures() throws Exception {
+ Config config = ConfigBuilder.create()
+ .addPrimitive("MysqlDagActionStore." +
ConfigurationKeys.STATE_STORE_DB_URL_KEY, this.testDb.getJdbcUrl())
+ .addPrimitive("MysqlDagActionStore." +
ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
+ .addPrimitive("MysqlDagActionStore." +
ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
+ .addPrimitive("MysqlDagActionStore." +
ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
+ .build();
+ String flowGroup = "testFlowGroup";
+ String flowName = "testFlowName";
+ String jobName = "testJobName";
+ String flowExecutionId = "12345677";
+
+ MysqlDagActionStore mysqlDagActionStore = new MysqlDagActionStore(config);
+ mysqlDagActionStore.addJobDagAction(flowGroup, flowName, flowExecutionId,
jobName, DagActionStore.DagActionType.LAUNCH);
+
+ Config monitorConfig =
ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS,
ConfigValueFactory.fromAnyRef("localhost:0000"))
+
.withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY,
ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
+ .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
ConfigValueFactory.fromAnyRef("/tmp/fakeStateStore"))
+ .withValue("zookeeper.connect",
ConfigValueFactory.fromAnyRef("localhost:2121"));
+ DagManager mockDagManager = mock(DagManager.class);
+ FlowCatalog mockFlowCatalog = mock(FlowCatalog.class);
+ Orchestrator mockOrchestrator = mock(Orchestrator.class);
+ // Throw an uncaught exception during startup sequence
+ when(mockFlowCatalog.getSpecs(any(URI.class))).thenThrow(new
RuntimeException("Uncaught exception"));
+ mockDagActionStoreChangeMonitor = new
MockDagActionStoreChangeMonitor("dummyTopic", monitorConfig, 5,
+ true, mysqlDagActionStore, mockDagManager, mockFlowCatalog,
mockOrchestrator);
+ try {
+ mockDagActionStoreChangeMonitor.setActive();
+ } catch (Exception e) {
+ verify(mockFlowCatalog.getSpecs(), times(1));
+ Assert.fail();
+ }
+ }
+
/**
* Util to create a general DagActionStoreChange type event
*/
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
index 4e9f48e5b..590fedb2b 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
@@ -136,7 +136,7 @@ public class FlowCompilationValidationHelper {
ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION,
String.valueOf(this.isFlowConcurrencyEnabled)));
Dag<JobExecutionPlan> jobExecutionPlanDag =
specCompiler.compileFlow(flowSpec);
- if (jobExecutionPlanDag.isEmpty()) {
+ if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
return Optional.absent();
}
addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index a3abb6454..814236eeb 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -144,7 +144,12 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
}
// TODO: make this multi-threaded to add parallelism
for (DagActionStore.DagAction action : dagActions) {
- handleDagAction(action, true);
+ try {
+ handleDagAction(action, true);
+ } catch (Exception e) {
+ log.error("Unexpected error initializing from DagActionStore changes,
upon {}", action, e);
+ this.unexpectedErrors.mark();
+ }
}
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
index da0bb737d..d88e19b30 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.HashSet;
import org.testng.Assert;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -46,10 +47,11 @@ public class MysqlDagActionStoreTest {
private static final String flowExecutionId_3 = "12345679";
private MysqlDagActionStore mysqlDagActionStore;
+ private ITestMetastoreDatabase testDb;
+
@BeforeClass
public void setUp() throws Exception {
- ITestMetastoreDatabase testDb = TestMetastoreDatabaseFactory.get();
-
+ this.testDb = TestMetastoreDatabaseFactory.get();
Config config = ConfigBuilder.create()
.addPrimitive("MysqlDagActionStore." +
ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
.addPrimitive("MysqlDagActionStore." +
ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
@@ -60,6 +62,11 @@ public class MysqlDagActionStoreTest {
this.mysqlDagActionStore = new MysqlDagActionStore(config);
}
+ @AfterClass
+ public void tearDown() throws IOException {
+ this.testDb.close();
+ }
+
@Test
public void testAddAction() throws Exception {
this.mysqlDagActionStore.addJobDagAction(flowGroup, flowName,
flowExecutionId, jobName, DagActionStore.DagActionType.KILL);
@@ -94,12 +101,12 @@ public class MysqlDagActionStoreTest {
@Test(dependsOnMethods = "testGetActions")
public void testDeleteAction() throws IOException, SQLException {
- this.mysqlDagActionStore.deleteDagAction(
- new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
jobName, DagActionStore.DagActionType.KILL));
- Assert.assertEquals(this.mysqlDagActionStore.getDagActions().size(), 2);
- Assert.assertFalse(this.mysqlDagActionStore.exists(flowGroup, flowName,
flowExecutionId, jobName, DagActionStore.DagActionType.KILL));
- Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName,
flowExecutionId, jobName, DagActionStore.DagActionType.RESUME));
- Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName,
flowExecutionId_2, jobName, DagActionStore.DagActionType.KILL));
+ this.mysqlDagActionStore.deleteDagAction(
+ new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
jobName, DagActionStore.DagActionType.KILL));
+ Assert.assertEquals(this.mysqlDagActionStore.getDagActions().size(), 2);
+ Assert.assertFalse(this.mysqlDagActionStore.exists(flowGroup, flowName,
flowExecutionId, jobName, DagActionStore.DagActionType.KILL));
+ Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName,
flowExecutionId, jobName, DagActionStore.DagActionType.RESUME));
+ Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName,
flowExecutionId_2, jobName, DagActionStore.DagActionType.KILL));
}
}
\ No newline at end of file