This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 19d1620d8 [GOBBLIN-2039] Handle failure scenarios multileader 
compilation startup (#3918)
19d1620d8 is described below

commit 19d1620d873ff5a5cf3f6b1993fff5ddf340653d
Author: William Lo <[email protected]>
AuthorDate: Thu Apr 11 16:23:39 2024 -0400

    [GOBBLIN-2039] Handle failure scenarios multileader compilation startup 
(#3918)
    
    Fix NPEs that occur when compilation issues happen in the multihop flow compiler
---
 .../runtime/DagActionStoreChangeMonitorTest.java   | 81 +++++++++++++++++++---
 .../utils/FlowCompilationValidationHelper.java     |  2 +-
 .../monitoring/DagActionStoreChangeMonitor.java    |  7 +-
 .../orchestration/MysqlDagActionStoreTest.java     | 23 +++---
 4 files changed, 92 insertions(+), 21 deletions(-)

diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
index 0575a99bd..f93836978 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
@@ -17,9 +17,14 @@
 
 package org.apache.gobblin.runtime;
 
+import java.io.IOException;
 import java.net.URI;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import com.typesafe.config.Config;
@@ -28,13 +33,17 @@ import com.typesafe.config.ConfigValueFactory;
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.service.modules.orchestration.DagActionStore;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.service.monitoring.DagActionStoreChangeEvent;
 import org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor;
@@ -56,12 +65,19 @@ public class DagActionStoreChangeMonitorTest {
   public static final String TOPIC = 
DagActionStoreChangeEvent.class.getSimpleName();
   private final int PARTITION = 1;
   private final int OFFSET = 1;
+
+  private static final String USER = "testUser";
+  private static final String PASSWORD = "testPassword";
+  private static final String TABLE = "dag_action_store";
+
   private final String FLOW_GROUP = "flowGroup";
   private final String FLOW_NAME = "flowName";
   private final String FLOW_EXECUTION_ID = "123";
   private MockDagActionStoreChangeMonitor mockDagActionStoreChangeMonitor;
   private int txidCounter = 0;
 
+  private ITestMetastoreDatabase testDb;
+
   /**
    * Note: The class methods are wrapped in a test specific method because the 
original methods are package protected
    * and cannot be accessed by this class.
@@ -70,13 +86,17 @@ public class DagActionStoreChangeMonitorTest {
 
     public MockDagActionStoreChangeMonitor(String topic, Config config, int 
numThreads,
         boolean isMultiActiveSchedulerEnabled) {
-      super(topic, config, mock(DagManager.class), numThreads, 
mock(FlowCatalog.class), mock(Orchestrator.class),
-          mock(DagActionStore.class), isMultiActiveSchedulerEnabled);
+      this(topic, config, numThreads, isMultiActiveSchedulerEnabled, 
mock(DagActionStore.class), mock(DagManager.class), mock(FlowCatalog.class), 
mock(Orchestrator.class));
+    }
+
+    public MockDagActionStoreChangeMonitor(String topic, Config config, int 
numThreads, boolean isMultiActiveSchedulerEnabled,
+        DagActionStore dagActionStore, DagManager dagManager, FlowCatalog 
flowCatalog, Orchestrator orchestrator) {
+      super(topic, config, dagManager, numThreads, flowCatalog, orchestrator,
+          dagActionStore, isMultiActiveSchedulerEnabled);
     }
 
     protected void processMessageForTest(DecodeableKafkaRecord record) {
       super.processMessage(record);
-
     }
 
     protected void startUpForTest() {
@@ -93,18 +113,28 @@ public class DagActionStoreChangeMonitorTest {
   }
 
   // Called at start of every test so the count of each method being called is 
reset to 0
-  public void setup() {
+  @BeforeMethod
+  public void setupMockMonitor() {
      mockDagActionStoreChangeMonitor = createMockDagActionStoreChangeMonitor();
      mockDagActionStoreChangeMonitor.startUpForTest();
   }
 
+  @BeforeClass
+  public void setUp() throws Exception {
+    this.testDb = TestMetastoreDatabaseFactory.get();
+  }
+
+  @AfterClass
+  public void tearDown() throws IOException {
+    this.testDb.close();
+  }
+
   /**
    * Ensure no NPE results from passing a HEARTBEAT type message with a null 
{@link DagActionValue} and the message is
    * filtered out since it's a heartbeat type so no methods are called.
    */
   @Test
   public void testProcessMessageWithHeartbeatAndNullDagAction() throws 
SpecNotFoundException {
-    setup();
     Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
         wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, "", "", "", 
null);
     mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
@@ -120,7 +150,6 @@ public class DagActionStoreChangeMonitorTest {
    */
   @Test (dependsOnMethods = "testProcessMessageWithHeartbeatAndNullDagAction")
   public void testProcessMessageWithHeartbeatAndFlowInfo() throws 
SpecNotFoundException {
-    setup();
     Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
         wrapDagActionStoreChangeEvent(OperationType.HEARTBEAT, FLOW_GROUP, 
FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.RESUME);
     mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
@@ -134,7 +163,6 @@ public class DagActionStoreChangeMonitorTest {
    */
   @Test (dependsOnMethods = "testProcessMessageWithHeartbeatAndFlowInfo")
   public void testProcessMessageWithInsertLaunchType() throws 
SpecNotFoundException {
-    setup();
     Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
         wrapDagActionStoreChangeEvent(OperationType.INSERT, FLOW_GROUP, 
FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.LAUNCH);
     mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
@@ -149,7 +177,6 @@ public class DagActionStoreChangeMonitorTest {
    */
   @Test (dependsOnMethods = "testProcessMessageWithInsertLaunchType")
   public void testProcessMessageWithInsertResumeType() throws 
SpecNotFoundException {
-    setup();
     Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
         wrapDagActionStoreChangeEvent(OperationType.INSERT, FLOW_GROUP, 
FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.RESUME);
     mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
@@ -163,7 +190,6 @@ public class DagActionStoreChangeMonitorTest {
    */
   @Test (dependsOnMethods = "testProcessMessageWithInsertResumeType")
   public void testProcessMessageWithInsertKillType() throws 
SpecNotFoundException {
-    setup();
     Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
         wrapDagActionStoreChangeEvent(OperationType.INSERT, FLOW_GROUP, 
FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.KILL);
     mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
@@ -178,7 +204,6 @@ public class DagActionStoreChangeMonitorTest {
    */
   @Test (dependsOnMethods = "testProcessMessageWithInsertKillType")
   public void testProcessMessageWithUpdate() throws SpecNotFoundException {
-    setup();
     Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
         wrapDagActionStoreChangeEvent(OperationType.UPDATE, FLOW_GROUP, 
FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.LAUNCH);
     mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
@@ -192,7 +217,6 @@ public class DagActionStoreChangeMonitorTest {
    */
   @Test (dependsOnMethods = "testProcessMessageWithUpdate")
   public void testProcessMessageWithDelete() throws SpecNotFoundException {
-    setup();
     Kafka09ConsumerClient.Kafka09ConsumerRecord consumerRecord =
         wrapDagActionStoreChangeEvent(OperationType.DELETE, FLOW_GROUP, 
FLOW_NAME, FLOW_EXECUTION_ID, DagActionValue.LAUNCH);
     mockDagActionStoreChangeMonitor.processMessageForTest(consumerRecord);
@@ -201,6 +225,41 @@ public class DagActionStoreChangeMonitorTest {
     verify(mockDagActionStoreChangeMonitor.getFlowCatalog(), 
times(0)).getSpecs(any(URI.class));
   }
 
+  @Test (dependsOnMethods = "testProcessMessageWithDelete")
+  public void testStartupSequenceHandlesFailures() throws Exception {
+    Config config = ConfigBuilder.create()
+        .addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_URL_KEY, this.testDb.getJdbcUrl())
+        .addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
+        .addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
+        .addPrimitive("MysqlDagActionStore." + ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
+        .build();
+    String flowGroup = "testFlowGroup";
+    String flowName = "testFlowName";
+    String jobName = "testJobName";
+    String flowExecutionId = "12345677";
+
+    MysqlDagActionStore mysqlDagActionStore = new MysqlDagActionStore(config);
+    mysqlDagActionStore.addJobDagAction(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH);
+
+    Config monitorConfig = ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("localhost:0000"))
+        .withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
+        .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef("/tmp/fakeStateStore"))
+        .withValue("zookeeper.connect", ConfigValueFactory.fromAnyRef("localhost:2121"));
+    DagManager mockDagManager = mock(DagManager.class);
+    FlowCatalog mockFlowCatalog = mock(FlowCatalog.class);
+    Orchestrator mockOrchestrator = mock(Orchestrator.class);
+    // Make every spec lookup fail so handling the stored LAUNCH action throws during the startup sequence
+    when(mockFlowCatalog.getSpecs(any(URI.class))).thenThrow(new RuntimeException("Uncaught exception"));
+    mockDagActionStoreChangeMonitor = new MockDagActionStoreChangeMonitor("dummyTopic", monitorConfig, 5,
+        true, mysqlDagActionStore, mockDagManager, mockFlowCatalog, mockOrchestrator);
+    // The monitor must catch and log the per-action failure itself: any exception escaping
+    // setActive() fails this test, so no try/catch is needed around the call.
+    mockDagActionStoreChangeMonitor.setActive();
+    // verify() must be given the mock itself (not a call result); the failing lookup for the
+    // stored LAUNCH action is expected to have been attempted exactly once despite the exception.
+    verify(mockFlowCatalog, times(1)).getSpecs(any(URI.class));
+  }
+
   /**
    * Util to create a general DagActionStoreChange type event
    */
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
index 4e9f48e5b..590fedb2b 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
@@ -136,7 +136,7 @@ public class FlowCompilationValidationHelper {
         ConfigurationKeys.FLOW_ALLOW_CONCURRENT_EXECUTION, 
String.valueOf(this.isFlowConcurrencyEnabled)));
 
     Dag<JobExecutionPlan> jobExecutionPlanDag = 
specCompiler.compileFlow(flowSpec);
-    if (jobExecutionPlanDag.isEmpty()) {
+    if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {  // compileFlow may return null when compilation fails (e.g. in the multihop compiler)
       return Optional.absent();
     }
     addFlowExecutionIdIfAbsent(flowMetadata, jobExecutionPlanDag);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index a3abb6454..814236eeb 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -144,7 +144,12 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
     }
     // TODO: make this multi-threaded to add parallelism
     for (DagActionStore.DagAction action : dagActions) {
-      handleDagAction(action, true);
+      try {
+        handleDagAction(action, true);
+      } catch (Exception e) {  // isolate failures per action so the remaining stored actions are still handled
+        log.error("Unexpected error initializing from DagActionStore changes, 
upon {}", action, e);
+        this.unexpectedErrors.mark();
+      }
     }
   }
 
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
index da0bb737d..d88e19b30 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.HashSet;
 
 import org.testng.Assert;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -46,10 +47,11 @@ public class MysqlDagActionStoreTest {
   private static final String flowExecutionId_3 = "12345679";
   private MysqlDagActionStore mysqlDagActionStore;
 
+  private ITestMetastoreDatabase testDb;
+
   @BeforeClass
   public void setUp() throws Exception {
-    ITestMetastoreDatabase testDb = TestMetastoreDatabaseFactory.get();
-
+    this.testDb = TestMetastoreDatabaseFactory.get();
     Config config = ConfigBuilder.create()
         .addPrimitive("MysqlDagActionStore." + 
ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
         .addPrimitive("MysqlDagActionStore." + 
ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
@@ -60,6 +62,11 @@ public class MysqlDagActionStoreTest {
     this.mysqlDagActionStore = new MysqlDagActionStore(config);
   }
 
+  @AfterClass
+  public void tearDown() throws IOException {
+    this.testDb.close();
+  }
+
   @Test
   public void testAddAction() throws Exception {
     this.mysqlDagActionStore.addJobDagAction(flowGroup, flowName, 
flowExecutionId, jobName, DagActionStore.DagActionType.KILL);
@@ -94,12 +101,12 @@ public class MysqlDagActionStoreTest {
 
   @Test(dependsOnMethods = "testGetActions")
   public void testDeleteAction() throws IOException, SQLException {
-   this.mysqlDagActionStore.deleteDagAction(
-       new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.KILL));
-   Assert.assertEquals(this.mysqlDagActionStore.getDagActions().size(), 2);
-   Assert.assertFalse(this.mysqlDagActionStore.exists(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.KILL));
-    Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.RESUME));
-   Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName, flowExecutionId_2, jobName, DagActionStore.DagActionType.KILL));
+    this.mysqlDagActionStore.deleteDagAction(
+        new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.KILL));
+    Assert.assertEquals(this.mysqlDagActionStore.getDagActions().size(), 2);
+    Assert.assertFalse(this.mysqlDagActionStore.exists(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.KILL));
+    Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.RESUME));
+    Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName, flowExecutionId_2, jobName, DagActionStore.DagActionType.KILL));
   }
 
 }
\ No newline at end of file

Reply via email to