Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 5c678d9b6 -> f89505702


[GOBBLIN-406] delete job state store on spec delete request

Closes #2281 from arjun4084346/deleteInCluster


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f8950570
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f8950570
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f8950570

Branch: refs/heads/master
Commit: f89505702a3a130a7e73c617b33a285ea228bd8c
Parents: 5c678d9
Author: Arjun <ab...@linkedin.com>
Authored: Fri Feb 9 16:04:10 2018 -0800
Committer: Hung Tran <hut...@linkedin.com>
Committed: Fri Feb 9 16:04:10 2018 -0800

----------------------------------------------------------------------
 .../runtime/job_monitor/KafkaJobMonitor.java    | 25 ++++++++++++++++-
 .../job_monitor/KafkaJobMonitorTest.java        | 24 +++++++++++++++--
 .../runtime/kafka/HighLevelConsumerTest.java    |  7 +++++
 .../modules/flow/BaseFlowToJobSpecCompiler.java | 28 +++++++++++++++++++-
 .../flow/MultiHopsFlowToJobSpecCompiler.java    |  8 ++++--
 5 files changed, 86 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f8950570/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
index ba79305..0bb4f14 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
@@ -26,6 +26,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
 import com.typesafe.config.Config;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.DatasetStateStore;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.JobSpecMonitor;
 import org.apache.gobblin.runtime.api.MutableJobCatalog;
@@ -50,8 +52,9 @@ public abstract class KafkaJobMonitor extends 
HighLevelConsumer<byte[], byte[]>
   public static final String KAFKA_AUTO_OFFSET_RESET_KEY = 
KAFKA_JOB_MONITOR_PREFIX + ".auto.offset.reset";
   public static final String KAFKA_AUTO_OFFSET_RESET_SMALLEST = "smallest";
   public static final String KAFKA_AUTO_OFFSET_RESET_LARGEST = "largest";
-
+  private DatasetStateStore datasetStateStore;
   private final MutableJobCatalog jobCatalog;
+
   @Getter
   private Counter newSpecs;
   @Getter
@@ -67,6 +70,15 @@ public abstract class KafkaJobMonitor extends 
HighLevelConsumer<byte[], byte[]>
   public KafkaJobMonitor(String topic, MutableJobCatalog catalog, Config 
config) {
     super(topic, ConfigUtils.getConfigOrEmpty(config, 
KAFKA_JOB_MONITOR_PREFIX), 1);
     this.jobCatalog = catalog;
+    try {
+      if (config.hasPath(ConfigurationKeys.STATE_STORE_ENABLED) &&
+          config.getBoolean(ConfigurationKeys.STATE_STORE_ENABLED) &&
+          config.hasPath(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY)) {
+        this.datasetStateStore = 
DatasetStateStore.buildDatasetStateStore(config);
+      }
+    } catch (IOException e) {
+      log.warn("DatasetStateStore could not be created.");
+    }
   }
 
   @Override
@@ -100,6 +112,17 @@ public abstract class KafkaJobMonitor extends 
HighLevelConsumer<byte[], byte[]>
         } else if (parsedMessage instanceof Either.Right) {
           this.remmovedSpecs.inc();
           this.jobCatalog.remove(((Either.Right<JobSpec, URI>) 
parsedMessage).getRight());
+
+          // Refer to FlowConfigsResources:delete to understand the pattern of the 
flow URI
+          // FlowToJobSpec Compilers use the flowSpecURI to derive jobSpecURI
+          String[] uriTokens = ((URI)(((Either.Right) 
parsedMessage).getRight())).getPath().split("/");
+          if (uriTokens.length == 3) {
+            String jobName = uriTokens[2];
+            // Delete the job state if it is a delete spec request
+            if (this.datasetStateStore != null) {
+              this.datasetStateStore.delete(jobName);
+            }
+          }
         }
       }
     } catch (IOException ioe) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f8950570/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java
index 1dd90b1..57f99a9 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java
@@ -19,11 +19,16 @@ package org.apache.gobblin.runtime.job_monitor;
 
 import java.net.URI;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Optional;
+import com.typesafe.config.Config;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.kafka.HighLevelConsumerTest;
 
 
@@ -32,8 +37,11 @@ public class KafkaJobMonitorTest {
   @Test
   public void test() throws Exception {
 
-    MockedKafkaJobMonitor monitor =
-        
MockedKafkaJobMonitor.create(HighLevelConsumerTest.getSimpleConfig(Optional.of(KafkaJobMonitor.KAFKA_JOB_MONITOR_PREFIX)));
+    Config config = 
HighLevelConsumerTest.getSimpleConfig(Optional.of(KafkaJobMonitor.KAFKA_JOB_MONITOR_PREFIX));
+    String stateStoreRootDir = 
config.getString(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY);
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+
+    MockedKafkaJobMonitor monitor = MockedKafkaJobMonitor.create(config);
     monitor.startAsync();
 
     monitor.getMockKafkaStream().pushToStream("job1:1");
@@ -58,6 +66,18 @@ public class KafkaJobMonitorTest {
     Assert.assertTrue(monitor.getJobSpecs().containsKey(new URI("job2")));
     Assert.assertEquals(monitor.getJobSpecs().get(new 
URI("job2")).getVersion(), "2");
 
+    monitor.getMockKafkaStream().pushToStream("/flow3/job3:1");
+    monitor.awaitExactlyNSpecs(3);
+    Assert.assertTrue(monitor.getJobSpecs().containsKey(new 
URI("/flow3/job3")));
+
+    // TODO: Currently, state stores are not categorized by flow name.
+    //       This can lead to one job overwriting other jobs' job state.
+    fs.create(new Path(stateStoreRootDir, "job3"));
+    Assert.assertTrue(fs.exists(new Path(stateStoreRootDir, "job3")));
+    monitor.getMockKafkaStream().pushToStream(MockedKafkaJobMonitor.REMOVE + 
":/flow3/job3");
+    monitor.awaitExactlyNSpecs(2);
+    Assert.assertFalse(fs.exists(new Path(stateStoreRootDir, "job3")));
+
     monitor.shutDown();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f8950570/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/HighLevelConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/HighLevelConsumerTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/HighLevelConsumerTest.java
index 6e8d7a2..e8d4e6c 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/HighLevelConsumerTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/HighLevelConsumerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.runtime.kafka;
 
+import java.io.File;
 import java.util.Properties;
 import java.util.concurrent.TimeoutException;
 
@@ -24,9 +25,11 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Optional;
+import com.google.common.io.Files;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.job_monitor.MockKafkaStream;
 
 
@@ -35,6 +38,10 @@ public class HighLevelConsumerTest {
   public static Config getSimpleConfig(Optional<String> prefix) {
     Properties properties = new Properties();
     properties.put(getConfigKey(prefix, "zookeeper.connect"), "zookeeper");
+    properties.put(ConfigurationKeys.STATE_STORE_ENABLED, "true");
+    File tmpDir = Files.createTempDir();
+    tmpDir.deleteOnExit();
+    properties.put(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, 
tmpDir.toString());
 
     return ConfigFactory.parseProperties(properties);
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f8950570/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index db92ef9..855d692 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -211,7 +211,7 @@ public abstract class BaseFlowToJobSpecCompiler implements 
SpecCompiler {
    */
   protected JobSpec jobSpecGenerator(FlowSpec flowSpec) {
     JobSpec jobSpec;
-    JobSpec.Builder jobSpecBuilder = JobSpec.builder(flowSpec.getUri())
+    JobSpec.Builder jobSpecBuilder = 
JobSpec.builder(jobSpecURIGenerator(flowSpec))
         .withConfig(flowSpec.getConfig())
         .withDescription(flowSpec.getDescription())
         .withVersion(flowSpec.getVersion());
@@ -254,6 +254,32 @@ public abstract class BaseFlowToJobSpecCompiler implements 
SpecCompiler {
   }
 
   /**
+   * Accepts a variable number of parameters, as needed to generate a unique 
URI.
+   * The implementation is flowSpecCompiler dependent.
+   * This method should return a URI that has the job name in third place when 
split by "/"
+   * e.g. /flowGroup/flowName
+   *      /flowGroup/flowName/sourceNode-targetNode
+   * SafeDatasetCommit creates the state store using this name and
+   * {@link org.apache.gobblin.runtime.job_monitor.KafkaJobMonitor} extracts 
the job name to find the state store path.
+   * @param objects
+   * @return
+   */
+  public  URI jobSpecURIGenerator(Object... objects) {
+    return ((FlowSpec)objects[0]).getUri();
+  }
+
+  /**
+   * Returns the template URI for the job.
+   * This method can be overridden by derived FlowToJobSpecCompiler classes.
+   * @param flowSpec
+   * @return template URI
+   */
+  protected URI jobSpecTemplateURIGenerator(FlowSpec flowSpec) {
+    // For now only first template uri will be honored for Identity
+    return flowSpec.getTemplateURIs().get().iterator().next();
+  }
+
+  /**
    * Ideally each edge has its own eligible template repository(Based on 
{@link SpecExecutor})
    * to pick templates from.
    *

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f8950570/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
index ba5c203..544ca42 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java
@@ -323,14 +323,18 @@ public class MultiHopsFlowToJobSpecCompiler extends 
BaseFlowToJobSpecCompiler {
   private URI getTemplateURI (ServiceNode sourceNode, ServiceNode targetNode, 
FlowSpec flowSpec, FlowEdge flowEdge) {
     URI firstTemplateURI =
         (edgeTemplateMap != null && 
edgeTemplateMap.containsKey(flowEdge.getEdgeIdentity())) ? edgeTemplateMap.get(
-            flowEdge.getEdgeIdentity()).get(0) : 
jobSpecGenerator(flowSpec).getTemplateURI().orNull();
+            flowEdge.getEdgeIdentity()).get(0) : 
jobSpecTemplateURIGenerator(flowSpec);
     return firstTemplateURI;
   }
 
   /**
    * A naive implementation of generating a jobSpec's URI within a multi-hop 
logical Flow.
    */
-  public static URI jobSpecURIGenerator(FlowSpec flowSpec, ServiceNode 
sourceNode, ServiceNode targetNode) {
+  @Override
+  public URI jobSpecURIGenerator(Object... objects) {
+    FlowSpec flowSpec = (FlowSpec) objects[0];
+    ServiceNode sourceNode = (ServiceNode) objects[1];
+    ServiceNode targetNode = (ServiceNode) objects[2];
     try {
       return new URI(JobSpec.Builder.DEFAULT_JOB_CATALOG_SCHEME, 
flowSpec.getUri().getAuthority(),
           
StringUtils.appendIfMissing(StringUtils.prependIfMissing(flowSpec.getUri().getPath(),
 "/"),"/")

Reply via email to