This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4cc0cff [GOBBLIN-1297] Register/unregister eventbus with dagManager on leadership change
4cc0cff is described below
commit 4cc0cffa1791218dee07889e2da64eb091fe8a31
Author: Jack Moseley <[email protected]>
AuthorDate: Fri Oct 23 12:56:19 2020 -0700
[GOBBLIN-1297] Register/unregister eventbus with dagManager on leadership change
Closes #3135 from jack-moseley/eventbus-leadership-change
---
.../modules/core/GobblinServiceManager.java | 7 ++--
.../service/modules/core/GobblinServiceHATest.java | 39 ++++++++++++++++++----
2 files changed, 38 insertions(+), 8 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index cb9e161..60781f9 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -123,7 +123,7 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
// An EventBus used for communications between services running in the
ApplicationMaster
@Getter
- protected final EventBus eventBus = new
EventBus(GobblinServiceManager.class.getSimpleName());
+ protected EventBus eventBus = new
EventBus(GobblinServiceManager.class.getSimpleName());
protected final FileSystem fs;
protected final Path serviceWorkDir;
@@ -136,7 +136,7 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
protected final boolean isRestLIServerEnabled;
protected final boolean isTopologySpecFactoryEnabled;
protected final boolean isGitConfigMonitorEnabled;
- protected final boolean isDagManagerEnabled;
+ protected boolean isDagManagerEnabled;
protected final boolean isJobStatusMonitorEnabled;
protected TopologyCatalog topologyCatalog;
@@ -161,6 +161,7 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
protected GitConfigMonitor gitConfigMonitor;
+ @Getter
protected DagManager dagManager;
protected KafkaJobStatusMonitor jobStatusMonitor;
@@ -417,6 +418,7 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
//Activate DagManager only if TopologyCatalog is initialized. If not;
skip activation.
if (this.topologyCatalog.getInitComplete().getCount() == 0) {
this.dagManager.setActive(true);
+ this.eventBus.register(this.dagManager);
}
}
} else if (this.helixManager.isPresent()) {
@@ -438,6 +440,7 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
if (this.isDagManagerEnabled) {
this.dagManager.setActive(false);
+ this.eventBus.unregister(this.dagManager);
}
}
}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
index d8bc99e..cd10a82 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
@@ -23,9 +23,11 @@ import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
-import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.hadoop.fs.Path;
import org.eclipse.jetty.http.HttpStatus;
+import org.jetbrains.annotations.Nullable;
+import org.mockito.Mockito;
+import org.mockito.exceptions.base.MockitoAssertionError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -35,24 +37,26 @@ import org.testng.annotations.Test;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
+import com.google.common.eventbus.EventBus;
import com.linkedin.data.template.StringMap;
-import com.linkedin.r2.transport.common.Client;
-import com.linkedin.r2.transport.common.bridge.client.TransportClientAdapter;
import com.linkedin.r2.transport.http.client.HttpClientFactory;
-import com.linkedin.restli.client.RestClient;
import com.linkedin.restli.client.RestLiResponseException;
+import com.typesafe.config.Config;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.MysqlJobStatusStateStoreFactory;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.FlowConfig;
import org.apache.gobblin.service.FlowConfigClient;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.Schedule;
import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.utils.HelixUtils;
import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
+import org.apache.gobblin.testing.AssertWithBackoff;
import org.apache.gobblin.util.ConfigUtils;
@Test
@@ -162,12 +166,12 @@ public class GobblinServiceHATest {
node2ServiceCoreProperties.put(QUARTZ_THREAD_POOL_COUNT, 3);
// Start Node 1
- this.node1GobblinServiceManager = new
GobblinServiceManager("CoreService1", "1",
+ this.node1GobblinServiceManager = new
TestGobblinServiceManager("CoreService1", "1",
ConfigUtils.propertiesToConfig(node1ServiceCoreProperties),
Optional.of(new Path(NODE_1_SERVICE_WORK_DIR)));
this.node1GobblinServiceManager.start();
// Start Node 2
- this.node2GobblinServiceManager = new
GobblinServiceManager("CoreService2", "2",
+ this.node2GobblinServiceManager = new
TestGobblinServiceManager("CoreService2", "2",
ConfigUtils.propertiesToConfig(node2ServiceCoreProperties),
Optional.of(new Path(NODE_2_SERVICE_WORK_DIR)));
this.node2GobblinServiceManager.start();
@@ -546,6 +550,29 @@ public class GobblinServiceHATest {
Assert.assertTrue(assertSuccess, "New master should take over all old
master jobs.");
+ // Check eventbus was registered with new leader
+ AssertWithBackoff assertWithBackoff =
AssertWithBackoff.create().logger(LoggerFactory.getLogger("checkEventbusRegistered")).timeoutMs(20000);
+ assertWithBackoff.assertTrue(new com.google.common.base.Predicate<Void>() {
+ @Override
+ public boolean apply(@Nullable Void input) {
+ try {
+ Mockito.verify(secondary.getEventBus(),
Mockito.atLeastOnce()).register(secondary.getDagManager());
+ return true;
+ } catch (MockitoAssertionError e) {
+ return false;
+ }
+ }
+ }, "Checking eventBus was registered");
+
logger.info("+++++++++++++++++++ testKillNode END");
}
+
+ public class TestGobblinServiceManager extends GobblinServiceManager {
+ public TestGobblinServiceManager(String serviceName, String serviceId,
Config config, Optional<Path> serviceWorkDirOptional) throws Exception {
+ super(serviceName, serviceId, config, serviceWorkDirOptional);
+ this.isDagManagerEnabled = true;
+ this.eventBus = Mockito.mock(EventBus.class);
+ this.dagManager = Mockito.mock(DagManager.class);
+ }
+ }
}
\ No newline at end of file