This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 8dd5eb8 [GOBBLIN-781] Skeleton for GaaS DR mode clean transition
8dd5eb8 is described below
commit 8dd5eb8a503cf1f161e2b9b4bc693d9c967b9b86
Author: autumnust <[email protected]>
AuthorDate: Tue Jun 11 09:16:25 2019 -0700
[GOBBLIN-781] Skeleton for GaaS DR mode clean transition
Closes #2647 from autumnust/DRStateCleaning
---
.../apache/gobblin/service/ServiceConfigKeys.java | 3 --
.../apache/gobblin/runtime/api/SpecCatalog.java | 5 +-
.../org/apache/gobblin/runtime/api/SpecStore.java | 12 +++++
.../gobblin/runtime/spec_catalog/FlowCatalog.java | 56 +++++++++++++++-----
.../gobblin/runtime/spec_store/FSSpecStore.java | 5 ++
.../gobblin/runtime/spec_store/MysqlSpecStore.java | 60 +++++++++++++++-------
.../runtime/spec_store/MysqlSpecStoreTest.java | 32 ++++++++++++
.../modules/core/GobblinServiceManager.java | 26 +++++++---
.../modules/orchestration/Orchestrator.java | 3 +-
.../scheduler/GobblinServiceJobScheduler.java | 58 +++++++++++++++++----
10 files changed, 208 insertions(+), 52 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index 3528cdb..c984b54 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -63,9 +63,6 @@ public class ServiceConfigKeys {
public static final String FLOW_SOURCE_IDENTIFIER_KEY =
"gobblin.flow.sourceIdentifier";
public static final String FLOW_DESTINATION_IDENTIFIER_KEY =
"gobblin.flow.destinationIdentifier";
- // Command line options
- public static final String SERVICE_NAME_OPTION_NAME = "service_name";
-
// Topology Factory Keys (for overall factory)
public static final String TOPOLOGY_FACTORY_PREFIX = "topologySpecFactory.";
public static final String DEFAULT_TOPOLOGY_SPEC_FACTORY =
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
index 05f63a0..d5c1e35 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
@@ -45,7 +45,10 @@ import org.apache.gobblin.util.ConfigUtils;
public interface SpecCatalog extends SpecCatalogListenersContainer,
Instrumentable, StandardMetricsBridge {
- /** Returns an immutable {@link Collection} of {@link Spec}s that are known
to the catalog. */
+ /**
+ * Returns an immutable {@link Collection} of {@link Spec}s that are known
to the catalog.
+ * This method should only be used for a short list of {@link Spec}s;
otherwise it risks excessive memory usage.
+ * */
Collection<Spec> getSpecs();
/** Metrics for the spec catalog; null if
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
index 5bfeba9..fa8a713 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
@@ -114,6 +114,18 @@ public interface SpecStore {
Iterator<URI> getSpecURIs() throws IOException;
/**
+ * Return an iterator of Spec URIs with a certain tag.
+ * The tag is an implementation detail; as an example, consider
{@link org.apache.gobblin.runtime.spec_store.MysqlSpecStore}:
+ * A tag column in the MySQL table stores a value that makes it convenient to
filter at the MySQL statement level:
+ * SELECT * FROM <TABLE> WHERE tag = ?
+ *
+ * This type of filtering is needed when we want to opt some specs out of
loading, or when we want to
+ * whitelist only several specs for loading, etc.
+ *
+ */
+ Iterator<URI> getSpecURIsWithTag(String tag) throws IOException;
+
+ /**
* @return A URI to identify the SpecStore itself.
* e.g. For File-System based implementation of {@link SpecStore}, the URI
will be associated
* with root-level FileSystem directory.
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 9b4c00c..5076092 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -144,8 +144,13 @@ public class FlowCatalog extends AbstractIdleService
implements SpecCatalog, Mut
/**************************************************/
protected void notifyAllListeners() {
- for (Spec spec : getSpecsWithTimeUpdate()) {
- this.listeners.onAddSpec(spec);
+ try {
+ Iterator<URI> uriIterator = getSpecURIs();
+ while (uriIterator.hasNext()) {
+ this.listeners.onAddSpec(getSpecWrapper(uriIterator.next()));
+ }
+ } catch (SpecSerDeException ssde) {
+ log.error("Cannot retrieve specs from catalog:", ssde);
}
}
@@ -155,9 +160,15 @@ public class FlowCatalog extends AbstractIdleService
implements SpecCatalog, Mut
this.listeners.addListener(specListener);
if (state() == State.RUNNING) {
- for (Spec spec : getSpecsWithTimeUpdate()) {
- SpecCatalogListener.AddSpecCallback addJobCallback = new
SpecCatalogListener.AddSpecCallback(spec);
- this.listeners.callbackOneListener(addJobCallback, specListener);
+ try {
+ Iterator<URI> uriIterator = getSpecURIs();
+ while (uriIterator.hasNext()) {
+ SpecCatalogListener.AddSpecCallback addJobCallback =
+ new
SpecCatalogListener.AddSpecCallback(getSpecWrapper(uriIterator.next()));
+ this.listeners.callbackOneListener(addJobCallback, specListener);
+ }
+ } catch (SpecSerDeException ssde) {
+ log.error("Cannot retrieve specs from catalog:", ssde);
}
}
}
@@ -219,9 +230,21 @@ public class FlowCatalog extends AbstractIdleService
implements SpecCatalog, Mut
}
}
+ public Iterator<URI> getSpecURISWithTag(String tag) throws
SpecSerDeException {
+ try {
+ return specStore.getSpecURIsWithTag(tag);
+ } catch (IOException ioe) {
+ throw new SpecSerDeException( String.format("Cannot retrieve Specs' URI
with tag %s from Spec Store", tag),
+ specStore.getSpecStoreURI().get(), ioe);
+ }
+ }
+
/**
* Get all specs from {@link SpecStore}
+ * Not recommended for {@link FlowCatalog}, where the total amount of space
occupied by all {@link FlowSpec}s
+ * can be large and the loading process is slow.
*/
+ @Deprecated
@Override
public Collection<Spec> getSpecs() {
try {
@@ -232,13 +255,6 @@ public class FlowCatalog extends AbstractIdleService
implements SpecCatalog, Mut
}
}
- public Collection<Spec> getSpecsWithTimeUpdate() {
- long startTime = System.currentTimeMillis();
- Collection<Spec> specs = this.getSpecs();
- this.metrics.updateGetSpecTime(startTime);
- return specs;
- }
-
public boolean exists(URI uri) {
try {
return specStore.exists(uri);
@@ -257,6 +273,22 @@ public class FlowCatalog extends AbstractIdleService
implements SpecCatalog, Mut
}
/**
+ * A wrapper of getSpec that logs a {@link SpecNotFoundException} and returns null instead of throwing.
+ * This is the most common way to fetch a {@link Spec}. For a customized way to
deal with the exception, one will
+ * need to call getSpec directly and implement specific catch-block logic.
+ */
+ public Spec getSpecWrapper(URI uri) {
+ Spec spec = null;
+ try {
+ spec = getSpec(uri);
+ } catch (SpecNotFoundException snfe) {
+ log.error(String.format("The URI %s discovered in SpecStore is missing
in FlowCatlog"
+ + ", suspecting current modification on SpecStore", uri), snfe);
+ }
+ return spec;
+ }
+
+ /**
* Persist {@link Spec} into {@link SpecStore} and notify {@link
SpecCatalogListener} if triggerListener
* is set to true.
*
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
index 344901b..feee4a9 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
@@ -264,6 +264,11 @@ public class FSSpecStore implements SpecStore {
}
@Override
+ public Iterator<URI> getSpecURIsWithTag(String tag) throws IOException {
+ throw new UnsupportedOperationException("Loading specs with tag is not
supported in FS-Implementation of SpecStore");
+ }
+
+ @Override
public Optional<URI> getSpecStoreURI() {
return Optional.of(this.fsSpecStoreDirPath.toUri());
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
index 6206a3f..f2c1fd9 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
@@ -47,32 +47,36 @@ import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.api.SpecSerDe;
import org.apache.gobblin.runtime.api.SpecStore;
-import org.apache.gobblin.util.ConfigUtils;
+
/**
* Implementation of {@link SpecStore} that stores specs as serialized java
objects in MySQL. Note that versions are not
* supported, so the version parameter will be ignored in methods that have it.
+ *
+ * A tag column is added to the implementation to serve certain filtering
purposes in the MySQL-based SpecStore.
+ * For example, in DR mode of GaaS, we would only want certain {@link Spec}s
to be eligible to be orchestrated
+ * by alternative GaaS instances. Another example is whitelisting/blacklisting
{@link Spec}s temporarily
+ * without removing them from the {@link SpecStore}.
*/
@Slf4j
public class MysqlSpecStore implements SpecStore {
public static final String CONFIG_PREFIX = "mysqlSpecStore";
- public static final String SPEC_STORE_SOURCE = "source";
- public static final String DEFAULT_SPEC_STORE_SOURCE = "default_source";
+ public static final String DEFAULT_TAG_VALUE = "";
private static final String CREATE_TABLE_STATEMENT =
- "CREATE TABLE IF NOT EXISTS %s (spec_uri VARCHAR(128) NOT NULL,
spec_source VARCHAR(128) NOT NULL, spec LONGBLOB, PRIMARY KEY (spec_uri,
spec_source))";
+ "CREATE TABLE IF NOT EXISTS %s (spec_uri VARCHAR(128) NOT NULL, tag
VARCHAR(128) NOT NULL, spec LONGBLOB, PRIMARY KEY (spec_uri))";
private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM
%s WHERE spec_uri = ?)";
- private static final String INSERT_STATEMENT = "INSERT INTO %s (spec_uri,
spec_source, spec) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE spec =
VALUES(spec)";
+ private static final String INSERT_STATEMENT = "INSERT INTO %s (spec_uri,
tag, spec) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE spec = VALUES(spec)";
private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE
spec_uri = ?";
private static final String GET_STATEMENT = "SELECT spec FROM %s WHERE
spec_uri = ?";
private static final String GET_ALL_STATEMENT = "SELECT spec_uri, spec FROM
%s";
+ private static final String GET_ALL_STATEMENT_WITH_TAG = "SELECT spec_uri,
spec FROM %s WHERE tag = ?";
private final DataSource dataSource;
private final String tableName;
private final URI specStoreURI;
private final SpecSerDe specSerDe;
- private final String specStoreSource;
public MysqlSpecStore(Config config, SpecSerDe specSerDe) throws IOException
{
if (config.hasPath(CONFIG_PREFIX)) {
@@ -83,7 +87,6 @@ public class MysqlSpecStore implements SpecStore {
this.tableName =
config.getString(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY);
this.specStoreURI =
URI.create(config.getString(ConfigurationKeys.STATE_STORE_DB_URL_KEY));
this.specSerDe = specSerDe;
- this.specStoreSource = ConfigUtils.getString(config, SPEC_STORE_SOURCE,
DEFAULT_SPEC_STORE_SOURCE);
try (Connection connection = this.dataSource.getConnection();
PreparedStatement statement =
connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT,
this.tableName))) {
@@ -109,11 +112,18 @@ public class MysqlSpecStore implements SpecStore {
@Override
public void addSpec(Spec spec) throws IOException {
+ this.addSpec(spec, DEFAULT_TAG_VALUE);
+ }
+
+ /**
+ * Currently only used for testing, since the tag is not exposed in the endpoint of
{@link org.apache.gobblin.runtime.api.FlowSpec}
+ */
+ public void addSpec(Spec spec, String tagValue) throws IOException{
try (Connection connection = this.dataSource.getConnection();
PreparedStatement statement =
connection.prepareStatement(String.format(INSERT_STATEMENT, this.tableName))) {
statement.setString(1, spec.getUri().toString());
- statement.setString(2, this.specStoreSource);
+ statement.setString(2, tagValue);
statement.setBlob(3, new
ByteArrayInputStream(this.specSerDe.serialize(spec)));
statement.executeUpdate();
@@ -210,22 +220,36 @@ public class MysqlSpecStore implements SpecStore {
public Iterator<URI> getSpecURIs() throws IOException {
try (Connection connection = this.dataSource.getConnection();
PreparedStatement statement =
connection.prepareStatement(String.format(GET_ALL_STATEMENT, this.tableName))) {
+ return getURIIteratorByQuery(statement);
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
- List<URI> specs = new ArrayList<>();
-
- try (ResultSet rs = statement.executeQuery()) {
- while (rs.next()) {
- URI specURI = URI.create(rs.getString(1));
- specs.add(specURI);
- }
- }
-
- return specs.iterator();
+ @Override
+ public Iterator<URI> getSpecURIsWithTag(String tag) throws IOException {
+ try (Connection connection = this.dataSource.getConnection();
+ PreparedStatement statement =
connection.prepareStatement(String.format(GET_ALL_STATEMENT_WITH_TAG,
this.tableName))) {
+ statement.setString(1, tag);
+ return getURIIteratorByQuery(statement);
} catch (SQLException e) {
throw new IOException(e);
}
}
+ private Iterator<URI> getURIIteratorByQuery(PreparedStatement statement)
throws SQLException {
+ List<URI> specs = new ArrayList<>();
+
+ try (ResultSet rs = statement.executeQuery()) {
+ while (rs.next()) {
+ URI specURI = URI.create(rs.getString(1));
+ specs.add(specURI);
+ }
+ }
+
+ return specs.iterator();
+ }
+
@Override
public Optional<URI> getSpecStoreURI() {
return Optional.of(this.specStoreURI);
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
index 4d3c89d..50c4aa3 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
@@ -18,9 +18,11 @@
package org.apache.gobblin.runtime.spec_store;
import java.net.URI;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
+import java.util.List;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.SerializationUtils;
import org.testng.Assert;
@@ -106,6 +108,36 @@ public class MysqlSpecStoreTest {
}
@Test
+ public void testGetSpecWithTag() throws Exception {
+
+ //Creating and inserting flowspecs with tags
+ URI uri4 = URI.create("flowspec4");
+ FlowSpec flowSpec4 = FlowSpec.builder(uri4)
+ .withConfig(ConfigBuilder.create().addPrimitive("key4",
"value4").build())
+ .withDescription("Test flow spec 4")
+ .withVersion("Test version 4")
+ .build();
+
+ URI uri5 = URI.create("flowspec5");
+ FlowSpec flowSpec5 = FlowSpec.builder(uri5)
+ .withConfig(ConfigBuilder.create().addPrimitive("key5",
"value5").build())
+ .withDescription("Test flow spec 5")
+ .withVersion("Test version 5")
+ .build();
+
+ this.specStore.addSpec(flowSpec3);
+ this.specStore.addSpec(flowSpec4, "dr");
+ this.specStore.addSpec(flowSpec5, "dr");
+
+ Assert.assertTrue(this.specStore.exists(uri3));
+ Assert.assertTrue(this.specStore.exists(uri4));
+ Assert.assertTrue(this.specStore.exists(uri5));
+ List<URI> result = new ArrayList();
+ this.specStore.getSpecURIsWithTag("dr").forEachRemaining(result::add);
+ Assert.assertEquals(result.size(), 2);
+ }
+
+ @Test
public void testGetCorruptedSpec() throws Exception {
this.specStore.addSpec(this.flowSpec1);
this.specStore.addSpec(this.flowSpec2);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index f0419ea..05e5143 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.service.modules.core;
+import com.typesafe.config.ConfigValueFactory;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
@@ -104,6 +105,11 @@ import
org.apache.gobblin.util.reflection.GobblinConstructorUtils;
@Alpha
public class GobblinServiceManager implements ApplicationLauncher,
StandardMetricsBridge {
+ // Command line options
+ // service_name is required to launch GobblinServiceManager; service_id is optional and defaults to "1".
+ public static final String SERVICE_NAME_OPTION_NAME = "service_name";
+ public static final String SERVICE_ID_OPTION_NAME = "service_id";
+
private static final Logger LOGGER =
LoggerFactory.getLogger(GobblinServiceManager.class);
private static final String JOB_STATUS_RETRIEVER_CLASS_KEY =
"jobStatusRetriever.class";
@@ -163,9 +169,8 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
if (!properties.contains(ServiceBasedAppLauncher.APP_STOP_TIME_SECONDS)) {
properties.setProperty(ServiceBasedAppLauncher.APP_STOP_TIME_SECONDS,
Long.toString(300));
}
- this.config = config;
this.metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(config),
this.getClass());
- this.metrics = new Metrics(this.metricContext, this.config);
+ this.metrics = new Metrics(this.metricContext, config);
this.serviceName = serviceName;
this.serviceId = serviceId;
this.serviceLauncher = new ServiceBasedAppLauncher(properties,
serviceName);
@@ -544,13 +549,15 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
}
}
- private static String getServiceId() {
- return "1";
+ private static String getServiceId(CommandLine cmd) {
+ return cmd.getOptionValue(SERVICE_ID_OPTION_NAME) == null ? "1" :
cmd.getOptionValue(SERVICE_ID_OPTION_NAME);
}
private static Options buildOptions() {
Options options = new Options();
- options.addOption("a", ServiceConfigKeys.SERVICE_NAME_OPTION_NAME, true,
"Gobblin application name");
+ options.addOption("a", SERVICE_NAME_OPTION_NAME, true, "Gobblin Service
application's name");
+ options.addOption("i", SERVICE_ID_OPTION_NAME, true, "Gobblin Service
application's ID, "
+ + "this needs to be globally unique");
return options;
}
@@ -563,11 +570,16 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
Options options = buildOptions();
try {
CommandLine cmd = new DefaultParser().parse(options, args);
- if (!cmd.hasOption(ServiceConfigKeys.SERVICE_NAME_OPTION_NAME)) {
+ if (!cmd.hasOption(SERVICE_NAME_OPTION_NAME)) {
printUsage(options);
System.exit(1);
}
+ if (!cmd.hasOption(SERVICE_ID_OPTION_NAME)) {
+ printUsage(options);
+ LOGGER.warn("Please assign globally unique ID for a
GobblinServiceManager instance, or it will use default ID");
+ }
+
boolean isTestMode = false;
if (cmd.hasOption("test_mode")) {
isTestMode = Boolean.parseBoolean(cmd.getOptionValue("test_mode",
"false"));
@@ -575,7 +587,7 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
Config config = ConfigFactory.load();
try (GobblinServiceManager gobblinServiceManager = new
GobblinServiceManager(
- cmd.getOptionValue(ServiceConfigKeys.SERVICE_NAME_OPTION_NAME),
getServiceId(),
+ cmd.getOptionValue(SERVICE_NAME_OPTION_NAME), getServiceId(cmd),
config, Optional.<Path>absent())) {
gobblinServiceManager.getOrchestrator().setFlowStatusGenerator(gobblinServiceManager.buildFlowStatusGenerator(config));
gobblinServiceManager.start();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index f3a4f06..c371b51 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -318,7 +318,8 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
}
/**
- * Check if the flow instance is allowed to run.
+ * Check if a FlowSpec instance is allowed to run.
+ *
* @param flowName
* @param flowGroup
* @param allowConcurrentExecution
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index fde21bf..e14ce4c 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -35,7 +35,6 @@ import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecCatalogListener;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.api.SpecSerDeException;
import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
@@ -61,6 +60,8 @@ import org.quartz.UnableToInterruptJobException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.gobblin.service.ServiceConfigKeys.*;
+
/**
* An extension to {@link JobScheduler} that is also a {@link
SpecCatalogListener}.
@@ -69,6 +70,11 @@ import org.slf4j.LoggerFactory;
*/
@Alpha
public class GobblinServiceJobScheduler extends JobScheduler implements
SpecCatalogListener {
+
+ // Scheduler related configuration
+ // Configuration key indicating whether the current instance will handle DR traffic
or not.
+ public static final String GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED =
GOBBLIN_SERVICE_PREFIX + "drNominatedInstance";
+
protected final Logger _log;
protected final Optional<FlowCatalog> flowCatalog;
@@ -80,6 +86,21 @@ public class GobblinServiceJobScheduler extends JobScheduler
implements SpecCata
private volatile boolean isActive;
private String serviceName;
+ /**
+ * Whether the current instance is nominated as a handler for DR traffic from a down
GaaS instance.
+ * Note that this is currently different from leadership change/fail-over
handling: DR traffic can come
+ * from a GaaS instance outside of the current GaaS cluster.
+ * E.g. in a multi-datacenter deployment of GaaS clusters,
intra-datacenter fail-over can be handled by the
+ * leadership change mechanism, while inter-datacenter fail-over is
handled by the DR handling mechanism.
+ */
+ private boolean isNominatedDRHandler;
+
+ /**
+ * Use this to tag all DR-applicable FlowSpec entries in {@link
org.apache.gobblin.runtime.api.SpecStore}
+ * so only they would be loaded during DR handling.
+ */
+ public static final String DR_FILTER_TAG = "dr";
+
public GobblinServiceJobScheduler(String serviceName, Config config,
Optional<HelixManager> helixManager,
Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog>
topologyCatalog, Orchestrator orchestrator,
SchedulerService schedulerService, Optional<Logger> log) throws
Exception {
@@ -91,6 +112,8 @@ public class GobblinServiceJobScheduler extends JobScheduler
implements SpecCata
this.helixManager = helixManager;
this.orchestrator = orchestrator;
this.scheduledFlowSpecs = Maps.newHashMap();
+ this.isNominatedDRHandler =
config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED)
+ && config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED);
}
public GobblinServiceJobScheduler(String serviceName, Config config,
Optional<HelixManager> helixManager,
@@ -135,6 +158,7 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
* Load all {@link FlowSpec}s from {@link FlowCatalog} as one of the
initialization step,
* and make schedulers be aware of that.
*
+ * If it is newly brought up as the DR handler, will load additional
FlowSpecs and handle transition properly.
*/
private void scheduleSpecsFromCatalog() {
Iterator<URI> specUris = null;
@@ -142,21 +166,22 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
try {
specUris = this.flowCatalog.get().getSpecURIs();
+
+ // If the current instance is nominated as DR handler, it will take additional
URIs from FlowCatalog.
+ if (isNominatedDRHandler) {
+ // Synchronously clean the execution state for DR-applicable
FlowSpecs
+ // before rescheduling them in the nominated DR handler.
+ Iterator<URI> drUris =
this.flowCatalog.get().getSpecURISWithTag(DR_FILTER_TAG);
+ clearRunningFlowState(drUris);
+ }
+
} catch (SpecSerDeException ssde) {
throw new RuntimeException("Failed to get the iterator of all Spec
URIS", ssde);
}
-
try {
while (specUris.hasNext()) {
- Spec spec = null;
- try {
- spec = this.flowCatalog.get().getSpec(specUris.next());
- } catch (SpecNotFoundException snfe) {
- _log.error(String.format("The URI %s discovered in SpecStore is
missing in FlowCatlog"
- + ", suspecting current modification on SpecStore",
specUris.next()), snfe);
- }
-
+ Spec spec = this.flowCatalog.get().getSpecWrapper(specUris.next());
//Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change
if (spec instanceof FlowSpec) {
Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec)
spec);
@@ -170,6 +195,19 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
}
}
+ /**
+ * In DR mode, all running {@link FlowSpec}s will be cancelled and
rescheduled.
+ * We need to make sure that the running {@link FlowSpec}s' states are
cleared, and the corresponding running jobs are
+ * killed before rescheduling them.
+ * @param drUris The URIs applicable for DR that were discovered from FlowCatalog.
+ */
+ private void clearRunningFlowState(Iterator<URI> drUris) {
+ while (drUris.hasNext()) {
+ // TODO: Instead of simply call onDeleteSpec, a callback when FlowSpec
is deleted from FlowCatalog, should also kill Azkaban Flow from
AzkabanSpecProducer.
+ onDeleteSpec(drUris.next(), FlowSpec.Builder.DEFAULT_VERSION);
+ }
+ }
+
@VisibleForTesting
protected static Spec disableFlowRunImmediatelyOnStart(FlowSpec spec) {
Properties properties = spec.getConfigAsProperties();