This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4cc0cff  [GOBBLIN-1297] Register/unregister eventbus with dagManager on leadership change
4cc0cff is described below

commit 4cc0cffa1791218dee07889e2da64eb091fe8a31
Author: Jack Moseley <[email protected]>
AuthorDate: Fri Oct 23 12:56:19 2020 -0700

    [GOBBLIN-1297] Register/unregister eventbus with dagManager on leadership change
    
    Closes #3135 from jack-moseley/eventbus-leadership-change
---
 .../modules/core/GobblinServiceManager.java        |  7 ++--
 .../service/modules/core/GobblinServiceHATest.java | 39 ++++++++++++++++++----
 2 files changed, 38 insertions(+), 8 deletions(-)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index cb9e161..60781f9 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -123,7 +123,7 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
 
   // An EventBus used for communications between services running in the 
ApplicationMaster
   @Getter
-  protected final EventBus eventBus = new 
EventBus(GobblinServiceManager.class.getSimpleName());
+  protected EventBus eventBus = new 
EventBus(GobblinServiceManager.class.getSimpleName());
 
   protected final FileSystem fs;
   protected final Path serviceWorkDir;
@@ -136,7 +136,7 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
   protected final boolean isRestLIServerEnabled;
   protected final boolean isTopologySpecFactoryEnabled;
   protected final boolean isGitConfigMonitorEnabled;
-  protected final boolean isDagManagerEnabled;
+  protected boolean isDagManagerEnabled;
   protected final boolean isJobStatusMonitorEnabled;
 
   protected TopologyCatalog topologyCatalog;
@@ -161,6 +161,7 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
 
   protected GitConfigMonitor gitConfigMonitor;
 
+  @Getter
   protected DagManager dagManager;
 
   protected KafkaJobStatusMonitor jobStatusMonitor;
@@ -417,6 +418,7 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
         //Activate DagManager only if TopologyCatalog is initialized. If not; 
skip activation.
         if (this.topologyCatalog.getInitComplete().getCount() == 0) {
           this.dagManager.setActive(true);
+          this.eventBus.register(this.dagManager);
         }
       }
     } else if (this.helixManager.isPresent()) {
@@ -438,6 +440,7 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
 
       if (this.isDagManagerEnabled) {
         this.dagManager.setActive(false);
+        this.eventBus.unregister(this.dagManager);
       }
     }
   }
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
index d8bc99e..cd10a82 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
@@ -23,9 +23,11 @@ import java.util.UUID;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
-import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.hadoop.fs.Path;
 import org.eclipse.jetty.http.HttpStatus;
+import org.jetbrains.annotations.Nullable;
+import org.mockito.Mockito;
+import org.mockito.exceptions.base.MockitoAssertionError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -35,24 +37,26 @@ import org.testng.annotations.Test;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Maps;
+import com.google.common.eventbus.EventBus;
 import com.linkedin.data.template.StringMap;
-import com.linkedin.r2.transport.common.Client;
-import com.linkedin.r2.transport.common.bridge.client.TransportClientAdapter;
 import com.linkedin.r2.transport.http.client.HttpClientFactory;
-import com.linkedin.restli.client.RestClient;
 import com.linkedin.restli.client.RestLiResponseException;
+import com.typesafe.config.Config;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.MysqlJobStatusStateStoreFactory;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.service.FlowConfig;
 import org.apache.gobblin.service.FlowConfigClient;
 import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.Schedule;
 import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.utils.HelixUtils;
 import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
+import org.apache.gobblin.testing.AssertWithBackoff;
 import org.apache.gobblin.util.ConfigUtils;
 
 @Test
@@ -162,12 +166,12 @@ public class GobblinServiceHATest {
     node2ServiceCoreProperties.put(QUARTZ_THREAD_POOL_COUNT, 3);
 
     // Start Node 1
-    this.node1GobblinServiceManager = new 
GobblinServiceManager("CoreService1", "1",
+    this.node1GobblinServiceManager = new 
TestGobblinServiceManager("CoreService1", "1",
         ConfigUtils.propertiesToConfig(node1ServiceCoreProperties), 
Optional.of(new Path(NODE_1_SERVICE_WORK_DIR)));
     this.node1GobblinServiceManager.start();
 
     // Start Node 2
-    this.node2GobblinServiceManager = new 
GobblinServiceManager("CoreService2", "2",
+    this.node2GobblinServiceManager = new 
TestGobblinServiceManager("CoreService2", "2",
         ConfigUtils.propertiesToConfig(node2ServiceCoreProperties), 
Optional.of(new Path(NODE_2_SERVICE_WORK_DIR)));
     this.node2GobblinServiceManager.start();
 
@@ -546,6 +550,29 @@ public class GobblinServiceHATest {
 
     Assert.assertTrue(assertSuccess, "New master should take over all old 
master jobs.");
 
+    // Check eventbus was registered with new leader
+    AssertWithBackoff assertWithBackoff = 
AssertWithBackoff.create().logger(LoggerFactory.getLogger("checkEventbusRegistered")).timeoutMs(20000);
+    assertWithBackoff.assertTrue(new com.google.common.base.Predicate<Void>() {
+      @Override
+      public boolean apply(@Nullable Void input) {
+        try {
+          Mockito.verify(secondary.getEventBus(), 
Mockito.atLeastOnce()).register(secondary.getDagManager());
+          return true;
+        } catch (MockitoAssertionError e) {
+          return false;
+        }
+      }
+    }, "Checking eventBus was registered");
+
     logger.info("+++++++++++++++++++ testKillNode END");
   }
+
+  public class TestGobblinServiceManager extends GobblinServiceManager {
+    public TestGobblinServiceManager(String serviceName, String serviceId, 
Config config, Optional<Path> serviceWorkDirOptional) throws Exception {
+      super(serviceName, serviceId, config, serviceWorkDirOptional);
+      this.isDagManagerEnabled = true;
+      this.eventBus = Mockito.mock(EventBus.class);
+      this.dagManager = Mockito.mock(DagManager.class);
+    }
+  }
 }
\ No newline at end of file

Reply via email to