This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8dd5eb8  [GOBBLIN-781] Skeleton for GaaS DR mode clean transition
8dd5eb8 is described below

commit 8dd5eb8a503cf1f161e2b9b4bc693d9c967b9b86
Author: autumnust <[email protected]>
AuthorDate: Tue Jun 11 09:16:25 2019 -0700

    [GOBBLIN-781] Skeleton for GaaS DR mode clean transition
    
    Closes #2647 from autumnust/DRStateCleaning
---
 .../apache/gobblin/service/ServiceConfigKeys.java  |  3 --
 .../apache/gobblin/runtime/api/SpecCatalog.java    |  5 +-
 .../org/apache/gobblin/runtime/api/SpecStore.java  | 12 +++++
 .../gobblin/runtime/spec_catalog/FlowCatalog.java  | 56 +++++++++++++++-----
 .../gobblin/runtime/spec_store/FSSpecStore.java    |  5 ++
 .../gobblin/runtime/spec_store/MysqlSpecStore.java | 60 +++++++++++++++-------
 .../runtime/spec_store/MysqlSpecStoreTest.java     | 32 ++++++++++++
 .../modules/core/GobblinServiceManager.java        | 26 +++++++---
 .../modules/orchestration/Orchestrator.java        |  3 +-
 .../scheduler/GobblinServiceJobScheduler.java      | 58 +++++++++++++++++----
 10 files changed, 208 insertions(+), 52 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index 3528cdb..c984b54 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -63,9 +63,6 @@ public class ServiceConfigKeys {
   public static final String FLOW_SOURCE_IDENTIFIER_KEY = 
"gobblin.flow.sourceIdentifier";
   public static final String FLOW_DESTINATION_IDENTIFIER_KEY = 
"gobblin.flow.destinationIdentifier";
 
-  // Command line options
-  public static final String SERVICE_NAME_OPTION_NAME = "service_name";
-
   // Topology Factory Keys (for overall factory)
   public static final String TOPOLOGY_FACTORY_PREFIX = "topologySpecFactory.";
   public static final String DEFAULT_TOPOLOGY_SPEC_FACTORY =
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
index 05f63a0..d5c1e35 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
@@ -45,7 +45,10 @@ import org.apache.gobblin.util.ConfigUtils;
 
 
 public interface SpecCatalog extends SpecCatalogListenersContainer, 
Instrumentable, StandardMetricsBridge {
-  /** Returns an immutable {@link Collection} of {@link Spec}s that are known 
to the catalog. */
+  /**
+   * Returns an immutable {@link Collection} of {@link Spec}s that are known 
to the catalog.
+   * This method should only be used for a short list of {@link Spec}s; 
otherwise it risks excessive memory use.
+   * */
   Collection<Spec> getSpecs();
 
   /** Metrics for the spec catalog; null if
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
index 5bfeba9..fa8a713 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
@@ -114,6 +114,18 @@ public interface SpecStore {
   Iterator<URI> getSpecURIs() throws IOException;
 
   /**
+   * Return an iterator of Spec URIs with a certain tag.
+   * The tag is an implementation detail; as an example, consider 
{@link org.apache.gobblin.runtime.spec_store.MysqlSpecStore}:
+   * a tag column in the MySQL table stores a value that makes filtering 
convenient at the SQL statement level:
+   * SELECT * FROM <TABLE> WHERE tag = ?
+   *
+   * This type of filtering is needed when we want to opt some specs out of 
loading, or to whitelist only
+   * certain specs for loading, etc.
+   *
+   */
+  Iterator<URI> getSpecURIsWithTag(String tag) throws IOException;
+
+  /**
    * @return A URI to identify the SpecStore itself.
    * e.g. For File-System based implementation of {@link SpecStore}, the URI 
will be associated
    * with root-level FileSystem directory.
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 9b4c00c..5076092 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -144,8 +144,13 @@ public class FlowCatalog extends AbstractIdleService 
implements SpecCatalog, Mut
    /**************************************************/
 
   protected void notifyAllListeners() {
-    for (Spec spec : getSpecsWithTimeUpdate()) {
-      this.listeners.onAddSpec(spec);
+    try {
+      Iterator<URI> uriIterator = getSpecURIs();
+      while (uriIterator.hasNext()) {
+        this.listeners.onAddSpec(getSpecWrapper(uriIterator.next()));
+      }
+    } catch (SpecSerDeException ssde) {
+      log.error("Cannot retrieve specs from catalog:", ssde);
     }
   }
 
@@ -155,9 +160,15 @@ public class FlowCatalog extends AbstractIdleService 
implements SpecCatalog, Mut
     this.listeners.addListener(specListener);
 
     if (state() == State.RUNNING) {
-      for (Spec spec : getSpecsWithTimeUpdate()) {
-        SpecCatalogListener.AddSpecCallback addJobCallback = new 
SpecCatalogListener.AddSpecCallback(spec);
-        this.listeners.callbackOneListener(addJobCallback, specListener);
+      try {
+        Iterator<URI> uriIterator = getSpecURIs();
+        while (uriIterator.hasNext()) {
+          SpecCatalogListener.AddSpecCallback addJobCallback =
+              new 
SpecCatalogListener.AddSpecCallback(getSpecWrapper(uriIterator.next()));
+          this.listeners.callbackOneListener(addJobCallback, specListener);
+        }
+      } catch (SpecSerDeException ssde) {
+        log.error("Cannot retrieve specs from catalog:", ssde);
       }
     }
   }
@@ -219,9 +230,21 @@ public class FlowCatalog extends AbstractIdleService 
implements SpecCatalog, Mut
     }
   }
 
+  public Iterator<URI> getSpecURISWithTag(String tag) throws 
SpecSerDeException {
+    try {
+      return specStore.getSpecURIsWithTag(tag);
+    } catch (IOException ioe) {
+      throw new SpecSerDeException( String.format("Cannot retrieve Specs' URI 
with tag %s from Spec Store", tag),
+          specStore.getSpecStoreURI().get(), ioe);
+    }
+  }
+
   /**
    * Get all specs from {@link SpecStore}
+   * Not recommended for {@link FlowCatalog}, where the total space occupied 
by all {@link FlowSpec}s
+   * can be large and the loading process slow.
    */
+  @Deprecated
   @Override
   public Collection<Spec> getSpecs() {
     try {
@@ -232,13 +255,6 @@ public class FlowCatalog extends AbstractIdleService 
implements SpecCatalog, Mut
     }
   }
 
-  public Collection<Spec> getSpecsWithTimeUpdate() {
-    long startTime = System.currentTimeMillis();
-    Collection<Spec> specs = this.getSpecs();
-    this.metrics.updateGetSpecTime(startTime);
-    return specs;
-  }
-
   public boolean exists(URI uri) {
     try {
       return specStore.exists(uri);
@@ -257,6 +273,22 @@ public class FlowCatalog extends AbstractIdleService 
implements SpecCatalog, Mut
   }
 
   /**
+   * A wrapper around getSpec that handles {@link SpecNotFoundException} by 
logging it and returning null.
+   * This is the most common way to fetch a {@link Spec}. Callers that need 
custom exception handling should
+   * call getSpec directly and implement their own catch-block logic.
+   */
+  public Spec getSpecWrapper(URI uri) {
+    Spec spec = null;
+    try {
+      spec = getSpec(uri);
+    } catch (SpecNotFoundException snfe) {
+      log.error(String.format("The URI %s discovered in SpecStore is missing 
in FlowCatlog"
+          + ", suspecting current modification on SpecStore", uri), snfe);
+    }
+    return spec;
+  }
+
+  /**
    * Persist {@link Spec} into {@link SpecStore} and notify {@link 
SpecCatalogListener} if triggerListener
    * is set to true.
    *
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
index 344901b..feee4a9 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
@@ -264,6 +264,11 @@ public class FSSpecStore implements SpecStore {
   }
 
   @Override
+  public Iterator<URI> getSpecURIsWithTag(String tag) throws IOException {
+    throw new UnsupportedOperationException("Loading specs with tag is not 
supported in FS-Implementation of SpecStore");
+  }
+
+  @Override
   public Optional<URI> getSpecStoreURI() {
     return Optional.of(this.fsSpecStoreDirPath.toUri());
   }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
index 6206a3f..f2c1fd9 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
@@ -47,32 +47,36 @@ import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.api.SpecSerDe;
 import org.apache.gobblin.runtime.api.SpecStore;
-import org.apache.gobblin.util.ConfigUtils;
+
 
 
 /**
  * Implementation of {@link SpecStore} that stores specs as serialized java 
objects in MySQL. Note that versions are not
  * supported, so the version parameter will be ignored in methods that have it.
+ *
+ * A tag column is added to the implementation to serve certain filtering 
purposes in the MySQL-based SpecStore.
+ * For example, in DR mode of GaaS, we would only want certain {@link Spec}s 
to be eligible for orchestration
+ * by alternative GaaS instances. Another example is whitelisting/blacklisting 
{@link Spec}s temporarily
+ * without removing them from the {@link SpecStore}.
  */
 @Slf4j
 public class MysqlSpecStore implements SpecStore {
   public static final String CONFIG_PREFIX = "mysqlSpecStore";
-  public static final String SPEC_STORE_SOURCE = "source";
-  public static final String DEFAULT_SPEC_STORE_SOURCE = "default_source";
+  public static final String DEFAULT_TAG_VALUE = "";
 
   private static final String CREATE_TABLE_STATEMENT =
-      "CREATE TABLE IF NOT EXISTS %s (spec_uri VARCHAR(128) NOT NULL, 
spec_source VARCHAR(128) NOT NULL, spec LONGBLOB, PRIMARY KEY (spec_uri, 
spec_source))";
+      "CREATE TABLE IF NOT EXISTS %s (spec_uri VARCHAR(128) NOT NULL, tag 
VARCHAR(128) NOT NULL, spec LONGBLOB, PRIMARY KEY (spec_uri))";
   private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM 
%s WHERE spec_uri = ?)";
-  private static final String INSERT_STATEMENT = "INSERT INTO %s (spec_uri, 
spec_source, spec) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE spec = 
VALUES(spec)";
+  private static final String INSERT_STATEMENT = "INSERT INTO %s (spec_uri, 
tag, spec) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE spec = VALUES(spec)";
   private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE 
spec_uri = ?";
   private static final String GET_STATEMENT = "SELECT spec FROM %s WHERE 
spec_uri = ?";
   private static final String GET_ALL_STATEMENT = "SELECT spec_uri, spec FROM 
%s";
+  private static final String GET_ALL_STATEMENT_WITH_TAG = "SELECT spec_uri, 
spec FROM %s WHERE tag = ?";
 
   private final DataSource dataSource;
   private final String tableName;
   private final URI specStoreURI;
   private final SpecSerDe specSerDe;
-  private final String specStoreSource;
 
   public MysqlSpecStore(Config config, SpecSerDe specSerDe) throws IOException 
{
     if (config.hasPath(CONFIG_PREFIX)) {
@@ -83,7 +87,6 @@ public class MysqlSpecStore implements SpecStore {
     this.tableName = 
config.getString(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY);
     this.specStoreURI = 
URI.create(config.getString(ConfigurationKeys.STATE_STORE_DB_URL_KEY));
     this.specSerDe = specSerDe;
-    this.specStoreSource = ConfigUtils.getString(config, SPEC_STORE_SOURCE, 
DEFAULT_SPEC_STORE_SOURCE);
 
     try (Connection connection = this.dataSource.getConnection();
         PreparedStatement statement = 
connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, 
this.tableName))) {
@@ -109,11 +112,18 @@ public class MysqlSpecStore implements SpecStore {
 
   @Override
   public void addSpec(Spec spec) throws IOException {
+    this.addSpec(spec, DEFAULT_TAG_VALUE);
+  }
+
+  /**
+   * Temporarily only used for testing since tag is not exposed in the endpoint of 
{@link org.apache.gobblin.runtime.api.FlowSpec}
+   */
+  public void addSpec(Spec spec, String tagValue) throws IOException{
     try (Connection connection = this.dataSource.getConnection();
         PreparedStatement statement = 
connection.prepareStatement(String.format(INSERT_STATEMENT, this.tableName))) {
 
       statement.setString(1, spec.getUri().toString());
-      statement.setString(2, this.specStoreSource);
+      statement.setString(2, tagValue);
       statement.setBlob(3, new 
ByteArrayInputStream(this.specSerDe.serialize(spec)));
       statement.executeUpdate();
 
@@ -210,22 +220,36 @@ public class MysqlSpecStore implements SpecStore {
   public Iterator<URI> getSpecURIs() throws IOException {
     try (Connection connection = this.dataSource.getConnection();
         PreparedStatement statement = 
connection.prepareStatement(String.format(GET_ALL_STATEMENT, this.tableName))) {
+      return getURIIteratorByQuery(statement);
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
+  }
 
-      List<URI> specs = new ArrayList<>();
-
-      try (ResultSet rs = statement.executeQuery()) {
-        while (rs.next()) {
-          URI specURI = URI.create(rs.getString(1));
-          specs.add(specURI);
-        }
-      }
-
-      return specs.iterator();
+  @Override
+  public Iterator<URI> getSpecURIsWithTag(String tag) throws IOException {
+    try (Connection connection = this.dataSource.getConnection();
+        PreparedStatement statement = 
connection.prepareStatement(String.format(GET_ALL_STATEMENT_WITH_TAG, 
this.tableName))) {
+      statement.setString(1, tag);
+      return getURIIteratorByQuery(statement);
     } catch (SQLException e) {
       throw new IOException(e);
     }
   }
 
+  private Iterator<URI> getURIIteratorByQuery(PreparedStatement statement) 
throws SQLException {
+    List<URI> specs = new ArrayList<>();
+
+    try (ResultSet rs = statement.executeQuery()) {
+      while (rs.next()) {
+        URI specURI = URI.create(rs.getString(1));
+        specs.add(specURI);
+      }
+    }
+
+    return specs.iterator();
+  }
+
   @Override
   public Optional<URI> getSpecStoreURI() {
     return Optional.of(this.specStoreURI);
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
index 4d3c89d..50c4aa3 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
@@ -18,9 +18,11 @@
 package org.apache.gobblin.runtime.spec_store;
 
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 
+import java.util.List;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.SerializationUtils;
 import org.testng.Assert;
@@ -106,6 +108,36 @@ public class MysqlSpecStoreTest {
   }
 
   @Test
+  public void testGetSpecWithTag() throws Exception {
+
+    //Creating and inserting flowspecs with tags
+    URI uri4 = URI.create("flowspec4");
+    FlowSpec flowSpec4 = FlowSpec.builder(uri4)
+        .withConfig(ConfigBuilder.create().addPrimitive("key4", 
"value4").build())
+        .withDescription("Test flow spec 4")
+        .withVersion("Test version 4")
+        .build();
+
+    URI uri5 = URI.create("flowspec5");
+    FlowSpec flowSpec5 = FlowSpec.builder(uri5)
+        .withConfig(ConfigBuilder.create().addPrimitive("key5", 
"value5").build())
+        .withDescription("Test flow spec 5")
+        .withVersion("Test version 5")
+        .build();
+
+    this.specStore.addSpec(flowSpec3);
+    this.specStore.addSpec(flowSpec4, "dr");
+    this.specStore.addSpec(flowSpec5, "dr");
+
+    Assert.assertTrue(this.specStore.exists(uri3));
+    Assert.assertTrue(this.specStore.exists(uri4));
+    Assert.assertTrue(this.specStore.exists(uri5));
+    List<URI> result = new ArrayList();
+    this.specStore.getSpecURIsWithTag("dr").forEachRemaining(result::add);
+    Assert.assertEquals(result.size(), 2);
+  }
+
+  @Test
   public void testGetCorruptedSpec() throws Exception {
     this.specStore.addSpec(this.flowSpec1);
     this.specStore.addSpec(this.flowSpec2);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index f0419ea..05e5143 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.service.modules.core;
 
+import com.typesafe.config.ConfigValueFactory;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
@@ -104,6 +105,11 @@ import 
org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 @Alpha
 public class GobblinServiceManager implements ApplicationLauncher, 
StandardMetricsBridge {
 
+  // Command line options
+  // These two options are required to launch GobblinServiceManager.
+  public static final String SERVICE_NAME_OPTION_NAME = "service_name";
+  public static final String SERVICE_ID_OPTION_NAME = "service_id";
+
   private static final Logger LOGGER = 
LoggerFactory.getLogger(GobblinServiceManager.class);
   private static final String JOB_STATUS_RETRIEVER_CLASS_KEY = 
"jobStatusRetriever.class";
 
@@ -163,9 +169,8 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
     if (!properties.contains(ServiceBasedAppLauncher.APP_STOP_TIME_SECONDS)) {
       properties.setProperty(ServiceBasedAppLauncher.APP_STOP_TIME_SECONDS, 
Long.toString(300));
     }
-    this.config = config;
     this.metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(config), 
this.getClass());
-    this.metrics = new Metrics(this.metricContext, this.config);
+    this.metrics = new Metrics(this.metricContext, config);
     this.serviceName = serviceName;
     this.serviceId = serviceId;
     this.serviceLauncher = new ServiceBasedAppLauncher(properties, 
serviceName);
@@ -544,13 +549,15 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
     }
   }
 
-  private static String getServiceId() {
-    return "1";
+  private static String getServiceId(CommandLine cmd) {
+    return cmd.getOptionValue(SERVICE_ID_OPTION_NAME) == null ? "1" : 
cmd.getOptionValue(SERVICE_ID_OPTION_NAME);
   }
 
   private static Options buildOptions() {
     Options options = new Options();
-    options.addOption("a", ServiceConfigKeys.SERVICE_NAME_OPTION_NAME, true, 
"Gobblin application name");
+    options.addOption("a", SERVICE_NAME_OPTION_NAME, true, "Gobblin Service 
application's name");
+    options.addOption("i", SERVICE_ID_OPTION_NAME, true, "Gobblin Service 
application's ID, "
+        + "this needs to be globally unique");
     return options;
   }
 
@@ -563,11 +570,16 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
     Options options = buildOptions();
     try {
       CommandLine cmd = new DefaultParser().parse(options, args);
-      if (!cmd.hasOption(ServiceConfigKeys.SERVICE_NAME_OPTION_NAME)) {
+      if (!cmd.hasOption(SERVICE_NAME_OPTION_NAME)) {
         printUsage(options);
         System.exit(1);
       }
 
+      if (!cmd.hasOption(SERVICE_ID_OPTION_NAME)) {
+        printUsage(options);
+        LOGGER.warn("Please assign globally unique ID for a 
GobblinServiceManager instance, or it will use default ID");
+      }
+
       boolean isTestMode = false;
       if (cmd.hasOption("test_mode")) {
         isTestMode = Boolean.parseBoolean(cmd.getOptionValue("test_mode", 
"false"));
@@ -575,7 +587,7 @@ public class GobblinServiceManager implements 
ApplicationLauncher, StandardMetri
 
       Config config = ConfigFactory.load();
       try (GobblinServiceManager gobblinServiceManager = new 
GobblinServiceManager(
-          cmd.getOptionValue(ServiceConfigKeys.SERVICE_NAME_OPTION_NAME), 
getServiceId(),
+          cmd.getOptionValue(SERVICE_NAME_OPTION_NAME), getServiceId(cmd),
           config, Optional.<Path>absent())) {
         
gobblinServiceManager.getOrchestrator().setFlowStatusGenerator(gobblinServiceManager.buildFlowStatusGenerator(config));
         gobblinServiceManager.start();
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index f3a4f06..c371b51 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -318,7 +318,8 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
   }
 
   /**
-   * Check if the flow instance is allowed to run.
+   * Check if a FlowSpec instance is allowed to run.
+   *
    * @param flowName
    * @param flowGroup
    * @param allowConcurrentExecution
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index fde21bf..e14ce4c 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -35,7 +35,6 @@ import org.apache.gobblin.runtime.JobException;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecCatalogListener;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.api.SpecSerDeException;
 import org.apache.gobblin.runtime.listeners.JobListener;
 import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
@@ -61,6 +60,8 @@ import org.quartz.UnableToInterruptJobException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.gobblin.service.ServiceConfigKeys.*;
+
 
 /**
  * An extension to {@link JobScheduler} that is also a {@link 
SpecCatalogListener}.
@@ -69,6 +70,11 @@ import org.slf4j.LoggerFactory;
  */
 @Alpha
 public class GobblinServiceJobScheduler extends JobScheduler implements 
SpecCatalogListener {
+
+  // Scheduler related configuration
+  // Configuration key indicating whether the current instance will handle DR 
traffic.
+  public static final String GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED = 
GOBBLIN_SERVICE_PREFIX + "drNominatedInstance";
+
   protected final Logger _log;
 
   protected final Optional<FlowCatalog> flowCatalog;
@@ -80,6 +86,21 @@ public class GobblinServiceJobScheduler extends JobScheduler 
implements SpecCata
   private volatile boolean isActive;
   private String serviceName;
 
+  /**
+   * Whether the current instance is nominated as a handler for DR traffic from a 
down GaaS instance.
+   * Note that this is currently different from leadership change/fail-over 
handling: here the traffic could come
+   * from a GaaS instance outside the current GaaS cluster.
+   * E.g. in a multi-datacenter deployment of GaaS clusters, 
intra-datacenter fail-over could be handled by the
+   * leadership change mechanism, while inter-datacenter fail-over would be 
handled by the DR handling mechanism.
+   */
+  private boolean isNominatedDRHandler;
+
+  /**
+   * Use this to tag all DR-applicable FlowSpec entries in {@link 
org.apache.gobblin.runtime.api.SpecStore}
+   * so only they would be loaded during DR handling.
+   */
+  public static final String DR_FILTER_TAG = "dr";
+
   public GobblinServiceJobScheduler(String serviceName, Config config, 
Optional<HelixManager> helixManager,
       Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> 
topologyCatalog, Orchestrator orchestrator,
       SchedulerService schedulerService, Optional<Logger> log) throws 
Exception {
@@ -91,6 +112,8 @@ public class GobblinServiceJobScheduler extends JobScheduler 
implements SpecCata
     this.helixManager = helixManager;
     this.orchestrator = orchestrator;
     this.scheduledFlowSpecs = Maps.newHashMap();
+    this.isNominatedDRHandler = 
config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED)
+        && config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED);
   }
 
   public GobblinServiceJobScheduler(String serviceName, Config config, 
Optional<HelixManager> helixManager,
@@ -135,6 +158,7 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
    * Load all {@link FlowSpec}s from {@link FlowCatalog} as one of the 
initialization step,
    * and make schedulers be aware of that.
    *
+   * If this instance is newly brought up as the DR handler, it will load additional 
FlowSpecs and handle the transition properly.
    */
   private void scheduleSpecsFromCatalog() {
     Iterator<URI> specUris = null;
@@ -142,21 +166,22 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
 
     try {
       specUris = this.flowCatalog.get().getSpecURIs();
+
+      // If the current instance is nominated as DR handler, take additional 
URIs from FlowCatalog.
+      if (isNominatedDRHandler) {
+        // Synchronously clean the execution state for DR-applicable 
FlowSpecs
+        // before rescheduling them in the nominated DR handler.
+        Iterator<URI> drUris = 
this.flowCatalog.get().getSpecURISWithTag(DR_FILTER_TAG);
+        clearRunningFlowState(drUris);
+      }
+
     } catch (SpecSerDeException ssde) {
       throw new RuntimeException("Failed to get the iterator of all Spec 
URIS", ssde);
     }
 
-
     try {
       while (specUris.hasNext()) {
-        Spec spec = null;
-        try {
-          spec = this.flowCatalog.get().getSpec(specUris.next());
-        } catch (SpecNotFoundException snfe) {
-          _log.error(String.format("The URI %s discovered in SpecStore is 
missing in FlowCatlog"
-              + ", suspecting current modification on SpecStore", 
specUris.next()), snfe);
-        }
-
+        Spec spec = this.flowCatalog.get().getSpecWrapper(specUris.next());
         //Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change
         if (spec instanceof FlowSpec) {
           Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) 
spec);
@@ -170,6 +195,19 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
     }
   }
 
+  /**
+   * In DR mode, all running {@link FlowSpec}s will be cancelled and 
rescheduled.
+   * We need to make sure that the running {@link FlowSpec}s' state is 
cleared, and the corresponding running jobs are
+   * killed, before rescheduling them.
+   * @param drUris The URIs applicable for DR, as discovered from FlowCatalog.
+   */
+  private void clearRunningFlowState(Iterator<URI> drUris) {
+    while (drUris.hasNext()) {
+      // TODO: Instead of simply calling onDeleteSpec (the callback invoked when a FlowSpec 
is deleted from FlowCatalog), this should also kill the Azkaban Flow from 
AzkabanSpecProducer.
+      onDeleteSpec(drUris.next(), FlowSpec.Builder.DEFAULT_VERSION);
+    }
+  }
+
   @VisibleForTesting
   protected static Spec disableFlowRunImmediatelyOnStart(FlowSpec spec) {
     Properties properties = spec.getConfigAsProperties();

Reply via email to