Repository: incubator-gobblin
Updated Branches:
  refs/heads/master ed91dcdae -> ccd7ba769


[GOBBLIN-456] add option to delete state store

add option to delete state store

Closes #2327 from
arjun4084346/addDeleteStateStoreOption


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ccd7ba76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ccd7ba76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ccd7ba76

Branch: refs/heads/master
Commit: ccd7ba769308e720db33ea800d964df43df4e878
Parents: ed91dcd
Author: Arjun <ab...@linkedin.com>
Authored: Mon Apr 9 11:54:31 2018 -0700
Committer: Hung Tran <hut...@linkedin.com>
Committed: Mon Apr 9 11:54:58 2018 -0700

----------------------------------------------------------------------
 .../gobblin/runtime/api/SpecExecutor.java       |  3 +-
 .../gobblin/runtime/api/SpecProducer.java       |  7 ++-
 .../orchestration/AzkabanSpecProducer.java      |  3 +-
 .../orchestration/AzkabanProjectConfigTest.java | 11 ++--
 .../service/SimpleKafkaSpecProducer.java        |  6 +-
 .../service/StreamingKafkaSpecConsumer.java     |  4 +-
 .../gobblin/service/FlowConfigClient.java       | 18 ++++++
 .../gobblin/service/FlowConfigsResource.java    | 19 +++++-
 .../org/apache/gobblin/runtime/api/JobSpec.java | 31 +++++++++-
 .../gobblin/runtime/api/MutableSpecCatalog.java |  7 +--
 .../apache/gobblin/runtime/api/SpecCatalog.java |  6 +-
 .../runtime/api/SpecCatalogListener.java        |  9 ++-
 .../job_monitor/AvroJobSpecKafkaJobMonitor.java | 62 ++++++++++++++++----
 .../runtime/job_monitor/KafkaJobMonitor.java    | 24 ++------
 .../runtime/job_spec/ResolvedJobSpec.java       |  2 +-
 .../runtime/spec_catalog/FlowCatalog.java       | 10 +++-
 .../spec_catalog/SpecCatalogListenersList.java  |  5 +-
 .../runtime/spec_catalog/TopologyCatalog.java   |  9 ++-
 .../InMemorySpecProducer.java                   |  3 +-
 .../job_monitor/KafkaJobMonitorTest.java        | 18 ------
 .../job_monitor/MockedKafkaJobMonitor.java      |  1 +
 .../modules/flow/BaseFlowToJobSpecCompiler.java |  8 ++-
 .../modules/orchestration/Orchestrator.java     | 13 ++--
 .../scheduler/GobblinServiceJobScheduler.java   |  8 ++-
 24 files changed, 198 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java 
b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java
index cb5197a..85ee7af 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java
@@ -58,7 +58,8 @@ public interface SpecExecutor {
   public static enum Verb {
     ADD(1, "add"),
     UPDATE(2, "update"),
-    DELETE(3, "delete");
+    DELETE(3, "delete"),
+    UNKNOWN(4, "unknown");
 
     private int _id;
     private String _verb;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java 
b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java
index 9b9e504..880847d 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.runtime.api;
 
 import java.net.URI;
 import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.Future;
 
 import org.apache.gobblin.annotation.Alpha;
@@ -38,8 +39,12 @@ public interface SpecProducer<V> {
   /** Update a {@link Spec} being executed on {@link SpecExecutor}. */
   Future<?> updateSpec(V updatedSpec);
 
+  default Future<?> deleteSpec(URI deletedSpecURI) {
+    return deleteSpec(deletedSpecURI, new Properties());
+  }
+
   /** Delete a {@link Spec} being executed on {@link SpecExecutor}. */
-  Future<?> deleteSpec(URI deletedSpecURI);
+  Future<?> deleteSpec(URI deletedSpecURI, Properties headers);
 
   /** List all {@link Spec} being executed on {@link SpecExecutor}. */
   Future<? extends List<V>> listSpecs();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java
 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java
index 7b11cef..a1ae133 100644
--- 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java
+++ 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java
@@ -20,6 +20,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.net.URI;
 import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.Future;
 
 import org.apache.commons.codec.EncoderException;
@@ -139,7 +140,7 @@ public class AzkabanSpecProducer implements 
SpecProducer<Spec>, Closeable {
   }
 
   @Override
-  public Future<?> deleteSpec(URI deletedSpecURI) {
+  public Future<?> deleteSpec(URI deletedSpecURI, Properties headers) {
     // Delete project
     JobSpec jobSpec = new JobSpec.Builder(deletedSpecURI).build();
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfigTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfigTest.java
 
b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfigTest.java
index 9e189ab..3a48806 100644
--- 
a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfigTest.java
+++ 
b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfigTest.java
@@ -17,6 +17,7 @@
 package org.apache.gobblin.service.modules.orchestration;
 
 import java.net.URI;
+import java.util.Collections;
 import java.util.Properties;
 
 import lombok.extern.slf4j.Slf4j;
@@ -39,7 +40,7 @@ public class AzkabanProjectConfigTest {
 
     Properties properties = new Properties();
     JobSpec jobSpec = new JobSpec(new URI("uri"), "0.0", "test job spec",
-        ConfigUtils.propertiesToConfig(properties), properties, 
Optional.absent());
+        ConfigUtils.propertiesToConfig(properties), properties, 
Optional.absent(), Collections.EMPTY_MAP);
     AzkabanProjectConfig azkabanProjectConfig = new 
AzkabanProjectConfig(jobSpec);
 
     String actualProjectName = azkabanProjectConfig.getAzkabanProjectName();
@@ -54,7 +55,7 @@ public class AzkabanProjectConfigTest {
     Properties properties = new Properties();
     properties.setProperty("gobblin.service.azkaban.project.namePrefix", 
"randomPrefix");
     JobSpec jobSpec = new JobSpec(new URI("http://localhost:8000/context";), 
"0.0", "test job spec",
-        ConfigUtils.propertiesToConfig(properties), properties, 
Optional.absent());
+        ConfigUtils.propertiesToConfig(properties), properties, 
Optional.absent(), Collections.EMPTY_MAP);
     AzkabanProjectConfig azkabanProjectConfig = new 
AzkabanProjectConfig(jobSpec);
 
     String actualProjectName = azkabanProjectConfig.getAzkabanProjectName();
@@ -69,7 +70,7 @@ public class AzkabanProjectConfigTest {
     Properties properties = new Properties();
     properties.setProperty("gobblin.service.azkaban.project.namePrefix", 
"randomPrefixWithReallyLongName");
     JobSpec jobSpec = new JobSpec(new 
URI("http://localhost:8000/context/that-keeps-expanding-and-explanding";),
-        "0.0", "test job spec", ConfigUtils.propertiesToConfig(properties), 
properties, Optional.absent());
+        "0.0", "test job spec", ConfigUtils.propertiesToConfig(properties), 
properties, Optional.absent(), Collections.EMPTY_MAP);
     AzkabanProjectConfig azkabanProjectConfig = new 
AzkabanProjectConfig(jobSpec);
 
     String actualProjectName = azkabanProjectConfig.getAzkabanProjectName();
@@ -84,7 +85,7 @@ public class AzkabanProjectConfigTest {
     Properties properties = new Properties();
     properties.setProperty("gobblin.service.azkaban.project.namePrefix", 
"randomPrefix");
     JobSpec jobSpec = new JobSpec(new URI("http://localhost:8000/context";), 
"0.0", "test job spec",
-        ConfigUtils.propertiesToConfig(properties), properties, 
Optional.absent());
+        ConfigUtils.propertiesToConfig(properties), properties, 
Optional.absent(), Collections.EMPTY_MAP);
     AzkabanProjectConfig azkabanProjectConfig = new 
AzkabanProjectConfig(jobSpec);
 
     String actualZipFileName = 
azkabanProjectConfig.getAzkabanProjectZipFilename();
@@ -99,7 +100,7 @@ public class AzkabanProjectConfigTest {
     Properties properties = new Properties();
     properties.setProperty("gobblin.service.azkaban.project.namePrefix", 
"randomPrefixWithReallyLongName");
     JobSpec jobSpec = new JobSpec(new 
URI("http://localhost:8000/context/that-keeps-expanding-and-explanding";),
-        "0.0", "test job spec", ConfigUtils.propertiesToConfig(properties), 
properties, Optional.absent());
+        "0.0", "test job spec", ConfigUtils.propertiesToConfig(properties), 
properties, Optional.absent(), Collections.EMPTY_MAP);
     AzkabanProjectConfig azkabanProjectConfig = new 
AzkabanProjectConfig(jobSpec);
 
     String actualZipFileName = 
azkabanProjectConfig.getAzkabanProjectZipFilename();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
 
b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
index a5163db..c56593c 100644
--- 
a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
+++ 
b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java
@@ -23,6 +23,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
 import java.util.List;
 import java.util.concurrent.Future;
+import java.util.Properties;
 
 import org.apache.commons.lang3.reflect.ConstructorUtils;
 import org.slf4j.Logger;
@@ -101,10 +102,11 @@ public class SimpleKafkaSpecProducer implements 
SpecProducer<Spec>, Closeable  {
   }
 
   @Override
-  public Future<?> deleteSpec(URI deletedSpecURI) {
+  public Future<?> deleteSpec(URI deletedSpecURI, Properties headers) {
 
     AvroJobSpec avroJobSpec = 
AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
-        .setMetadata(ImmutableMap.of(VERB_KEY, 
SpecExecutor.Verb.DELETE.name())).build();
+        .setMetadata(ImmutableMap.of(VERB_KEY, 
SpecExecutor.Verb.DELETE.name()))
+        .setProperties(Maps.fromProperties(headers)).build();
 
     log.info("Deleting Spec: " + deletedSpecURI + " using Kafka.");
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
 
b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
index 6d8de39..ef44c7d 100644
--- 
a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
+++ 
b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java
@@ -224,8 +224,8 @@ public class StreamingKafkaSpecConsumer extends 
AbstractIdleService implements S
     }
 
     private long getRemovedSpecs() {
-      return StreamingKafkaSpecConsumer.this._jobMonitor.getRemmovedSpecs() != 
null?
-          
StreamingKafkaSpecConsumer.this._jobMonitor.getRemmovedSpecs().getCount() : 0;
+      return StreamingKafkaSpecConsumer.this._jobMonitor.getRemovedSpecs() != 
null?
+          
StreamingKafkaSpecConsumer.this._jobMonitor.getRemovedSpecs().getCount() : 0;
     }
 
     private long getMessageParseFailures() {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java
----------------------------------------------------------------------
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java
index 28255bb..a1c983e 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java
@@ -52,6 +52,7 @@ public class FlowConfigClient implements Closeable {
   private Optional<HttpClientFactory> _httpClientFactory;
   private Optional<RestClient> _restClient;
   private final FlowconfigsRequestBuilders _flowconfigsRequestBuilders;
+  public static final String DELETE_STATE_STORE_KEY = "delete.state.store";
 
   /**
    * Construct a {@link FlowConfigClient} to communicate with http flow config 
server at URI serverUri
@@ -156,6 +157,23 @@ public class FlowConfigClient implements Closeable {
     response.getResponse();
   }
 
+  /**
+   * Delete a flow configuration
+   * @param flowId identifier of flow configuration to delete
+   * @throws RemoteInvocationException
+   */
+  public void deleteFlowConfigWithStateStore(FlowId flowId)
+      throws RemoteInvocationException {
+    LOG.debug("deleteFlowConfig and state store with groupName " + 
flowId.getFlowGroup() + " flowName " +
+        flowId.getFlowName());
+
+    DeleteRequest<FlowConfig> deleteRequest = 
_flowconfigsRequestBuilders.delete()
+        .id(new ComplexResourceKey<>(flowId, new 
EmptyRecord())).setHeader(DELETE_STATE_STORE_KEY, 
Boolean.TRUE.toString()).build();
+    ResponseFuture<EmptyRecord> response = 
_restClient.get().sendRequest(deleteRequest);
+
+    response.getResponse();
+  }
+
   @Override
   public void close()
       throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
----------------------------------------------------------------------
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
index f0bce17..9074a43 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
@@ -19,7 +19,9 @@ package org.apache.gobblin.service;
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -46,12 +48,17 @@ import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 
+import com.google.common.collect.ImmutableSet;
+
+
 /**
  * Resource for handling flow configuration requests
  */
 @RestLiCollection(name = "flowconfigs", namespace = 
"org.apache.gobblin.service", keyName = "id")
 public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, 
EmptyRecord, FlowConfig> {
   private static final Logger LOG = 
LoggerFactory.getLogger(FlowConfigsResource.class);
+  private static final Set<String> ALLOWED_METADATA = 
ImmutableSet.of("delete.state.store");
+
 
   @edu.umd.cs.findbugs.annotations.SuppressWarnings("MS_SHOULD_BE_FINAL")
   public static FlowCatalog _globalFlowCatalog;
@@ -234,7 +241,7 @@ public class FlowConfigsResource extends 
ComplexKeyResourceTemplate<FlowId, Empt
       flowUri = new URI(flowCatalogURI.getScheme(), 
flowCatalogURI.getAuthority(),
           "/" + flowGroup + "/" + flowName, null, null);
 
-      getFlowCatalog().remove(flowUri);
+      getFlowCatalog().remove(flowUri, getHeaders());
 
       return new UpdateResponse(HttpStatus.S_200_OK);
     } catch (URISyntaxException e) {
@@ -244,6 +251,16 @@ public class FlowConfigsResource extends 
ComplexKeyResourceTemplate<FlowId, Empt
     return null;
   }
 
+  private Properties getHeaders() {
+    Properties headerProperties = new Properties();
+    for (Map.Entry<String, String> entry : 
getContext().getRequestHeaders().entrySet()) {
+      if (ALLOWED_METADATA.contains(entry.getKey())) {
+        headerProperties.put(entry.getKey(), entry.getValue());
+      }
+    }
+    return headerProperties;
+  }
+
   /***
    * This method is to workaround injection issues where Service has only one 
active global FlowCatalog
    * .. and is not able to inject it via RestLI bootstrap. We should remove 
this and make injected

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
index 0ae943e..203ea8d 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
@@ -19,11 +19,13 @@ package org.apache.gobblin.runtime.api;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Map;
 import java.util.Properties;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
@@ -34,8 +36,10 @@ import org.apache.gobblin.util.ConfigUtils;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
 
 
+@Slf4j
 /**
  * Defines a Gobblin Job that can be run once, or multiple times. A {@link 
JobSpec} is
  * {@link Configurable} so it has an associated {@link Config}, along with 
other mandatory
@@ -64,6 +68,12 @@ public class JobSpec implements Configurable, Spec {
   /** URI of {@link org.apache.gobblin.runtime.api.JobTemplate} to use. */
   Optional<URI> templateURI;
 
+  /** Metadata can contain properties which are not a part of config, e.g. 
Verb */
+  Map<String, String> metadata;
+
+  /** A Verb identifies if the Spec is for Insert/Update/Delete */
+  public static final String VERB_KEY = "Verb";
+
   public static Builder builder(URI jobSpecUri) {
     return new Builder(jobSpecUri);
   }
@@ -131,6 +141,7 @@ public class JobSpec implements Configurable, Spec {
     private Optional<String> description = Optional.absent();
     private Optional<URI> jobCatalogURI = Optional.absent();
     private Optional<URI> templateURI = Optional.absent();
+    private Optional<Map> metadata = Optional.absent();
 
     public Builder(URI jobSpecUri) {
       Preconditions.checkNotNull(jobSpecUri);
@@ -156,7 +167,7 @@ public class JobSpec implements Configurable, Spec {
       Preconditions.checkNotNull(this.uri);
       Preconditions.checkNotNull(this.version);
       return new JobSpec(getURI(), getVersion(), getDescription(), getConfig(),
-                         getConfigAsProperties(), getTemplateURI());
+                         getConfigAsProperties(), getTemplateURI(), 
getMetadata());
     }
 
     /** The scheme and authority of the job catalog URI are used to generate 
JobSpec URIs from
@@ -289,6 +300,24 @@ public class JobSpec implements Configurable, Spec {
       this.templateURI = Optional.of(templateURI);
       return this;
     }
+
+    public Map getDefaultMetadata() {
+      log.warn("Job Spec Verb is not provided, using type 'UNKNOWN'.");
+      return ImmutableMap.of(VERB_KEY, SpecExecutor.Verb.UNKNOWN.name());
+    }
+
+    public Map getMetadata() {
+      if (!this.metadata.isPresent()) {
+        this.metadata = Optional.of(getDefaultMetadata());
+      }
+      return this.metadata.get();
+    }
+
+    public Builder withMetadata(Map<String, String> metadata) {
+      Preconditions.checkNotNull(metadata);
+      this.metadata = Optional.of(metadata);
+      return this;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
index 108a324..7a3e946 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
@@ -18,15 +18,12 @@
 package org.apache.gobblin.runtime.api;
 
 import java.net.URI;
-import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.metrics.ContextAwareMetric;
 import org.apache.gobblin.metrics.ContextAwareTimer;
-import org.apache.gobblin.util.ConfigUtils;
 
 import com.google.common.base.Optional;
 import com.typesafe.config.Config;
@@ -51,7 +48,7 @@ public interface MutableSpecCatalog extends SpecCatalog {
    * Removes an existing {@link Spec} with the given URI.
    * Throws SpecNotFoundException if such {@link Spec} does not exist.
    */
-  void remove(URI uri) throws SpecNotFoundException;
+  void remove(URI uri, Properties headers) throws SpecNotFoundException;
 
   @Slf4j
   public static class MutableStandardMetrics extends StandardMetrics {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
index 457be9a..024c20c 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
@@ -19,13 +19,12 @@ package org.apache.gobblin.runtime.api;
 
 import java.net.URI;
 import java.util.Collection;
-import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -33,7 +32,6 @@ import org.apache.gobblin.instrumented.GobblinMetricsKeys;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.instrumented.StandardMetricsBridge;
 import org.apache.gobblin.metrics.ContextAwareGauge;
-import org.apache.gobblin.metrics.ContextAwareMetric;
 import org.apache.gobblin.metrics.ContextAwareTimer;
 import org.apache.gobblin.metrics.GobblinTrackingEvent;
 import org.apache.gobblin.metrics.MetricContext;
@@ -141,7 +139,7 @@ public interface SpecCatalog extends 
SpecCatalogListenersContainer, StandardMetr
     }
 
     @Override
-    public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
+    public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion, 
Properties headers) {
       this.totalDeletedSpecs.incrementAndGet();
       submitTrackingEvent(deletedSpecURI, deletedSpecVersion, 
SPEC_DELETED_OPERATION_TYPE);
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java
index 2b0aa40..1448231 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.runtime.api;
 
 import java.net.URI;
+import java.util.Properties;
 
 import com.google.common.base.Objects;
 
@@ -31,7 +32,7 @@ public interface SpecCatalogListener {
   /**
    * Invoked when a {@link Spec} gets removed from the catalog.
    */
-  public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion);
+  public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion, 
Properties headers);
 
   /**
    * Invoked when the contents of a {@link Spec} gets updated in the catalog.
@@ -56,18 +57,20 @@ public interface SpecCatalogListener {
   public static class DeleteSpecCallback extends Callback<SpecCatalogListener, 
Void> {
     private final URI _deletedSpecURI;
     private final String _deletedSpecVersion;
+    private final Properties _headers;
 
-    public DeleteSpecCallback(URI deletedSpecURI, String deletedSpecVersion) {
+    public DeleteSpecCallback(URI deletedSpecURI, String deletedSpecVersion, 
Properties headers) {
       super(Objects.toStringHelper("onDeleteSpec")
           .add("deletedSpecURI", deletedSpecURI)
           .add("deletedSpecVersion", deletedSpecVersion)
           .toString());
       _deletedSpecURI = deletedSpecURI;
       _deletedSpecVersion = deletedSpecVersion;
+      _headers = headers;
     }
 
     @Override public Void apply(SpecCatalogListener listener) {
-      listener.onDeleteSpec(_deletedSpecURI, _deletedSpecVersion);
+      listener.onDeleteSpec(_deletedSpecURI, _deletedSpecVersion, _headers);
       return null;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
index 59733d3..e035326 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
@@ -23,6 +23,7 @@ import java.net.URISyntaxException;
 import java.util.Collection;
 import java.util.Properties;
 
+import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -36,10 +37,12 @@ import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.JobSpecMonitor;
 import org.apache.gobblin.runtime.api.JobSpecMonitorFactory;
 import org.apache.gobblin.runtime.api.MutableJobCatalog;
-import org.apache.gobblin.runtime.api.SpecExecutor.Verb;
+import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
 import org.apache.gobblin.util.Either;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+import kafka.message.MessageAndMetadata;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
@@ -54,7 +57,7 @@ public class AvroJobSpecKafkaJobMonitor extends 
KafkaAvroJobMonitor<AvroJobSpec>
   public static final String CONFIG_PREFIX = "gobblin.jobMonitor.avroJobSpec";
   public static final String TOPIC_KEY = "topic";
   public static final String SCHEMA_VERSION_READER_CLASS = 
"versionReaderClass";
-  protected static final String VERB_KEY = "Verb";
+  public static final String DELETE_STATE_STORE_KEY = "delete.state.store";
 
   private static final Config DEFAULTS = 
ConfigFactory.parseMap(ImmutableMap.of(
       SCHEMA_VERSION_READER_CLASS, FixedSchemaVersionWriter.class.getName()));
@@ -103,9 +106,9 @@ public class AvroJobSpecKafkaJobMonitor extends 
KafkaAvroJobMonitor<AvroJobSpec>
   }
 
   /**
-   * Creates a {@link JobSpec} or {@link URI} from the {@link AvroJobSpec} 
record.
+   * Creates {@link JobSpec} from the {@link AvroJobSpec} record.
    * @param record the record as an {@link AvroJobSpec}
-   * @return a {@link JobSpec} or {@link URI} wrapped in a {@link Collection} 
of {@link Either}
+   * @return a {@link JobSpec} wrapped in a {@link Collection} of {@link 
Either}
    */
   @Override
   public Collection<Either<JobSpec, URI>> parseJobSpec(AvroJobSpec record) {
@@ -114,7 +117,7 @@ public class AvroJobSpecKafkaJobMonitor extends 
KafkaAvroJobMonitor<AvroJobSpec>
     Properties props = new Properties();
     props.putAll(record.getProperties());
     
jobSpecBuilder.withJobCatalogURI(record.getUri()).withVersion(record.getVersion())
-        
.withDescription(record.getDescription()).withConfigAsProperties(props);
+        
.withDescription(record.getDescription()).withConfigAsProperties(props).withMetadata(record.getMetadata());
 
     if (!record.getTemplateUri().isEmpty()) {
       try {
@@ -124,17 +127,52 @@ public class AvroJobSpecKafkaJobMonitor extends 
KafkaAvroJobMonitor<AvroJobSpec>
       }
     }
 
-    String verbName = record.getMetadata().get(VERB_KEY);
-    Verb verb = Verb.valueOf(verbName);
-
     JobSpec jobSpec = jobSpecBuilder.build();
 
     log.info("Parsed job spec " + jobSpec.toString());
 
-    if (verb == Verb.ADD || verb == Verb.UPDATE) {
-      return Lists.newArrayList(Either.<JobSpec, URI>left(jobSpec));
-    } else {
-      return Lists.newArrayList(Either.<JobSpec, URI>right(jobSpec.getUri()));
+    return Lists.newArrayList(Either.<JobSpec, URI>left(jobSpec));
+  }
+
+  @Override
+  protected void processMessage(MessageAndMetadata<byte[], byte[]> message) {
+    try {
+      Collection<Either<JobSpec, URI>> parsedCollection = 
parseJobSpec(message.message());
+      for (Either<JobSpec, URI> parsedMessage : parsedCollection) {
+        JobSpec jobSpec = ((Either.Left<JobSpec, URI>)parsedMessage).getLeft();
+        if 
(jobSpec.getMetadata().get(JobSpec.VERB_KEY).equalsIgnoreCase(SpecExecutor.Verb.DELETE.name()))
 {
+          this.removedSpecs.inc();
+          URI jobSpecUri = jobSpec.getUri();
+          this.jobCatalog.remove(jobSpecUri);
+
+          // Refer FlowConfigsResources:delete to understand the pattern of 
flow URI
+          // FlowToJobSpec Compilers use the flowSpecURI to derive jobSpecURI
+          if (jobSpec.getConfig().hasPath(DELETE_STATE_STORE_KEY) &&
+              
Boolean.parseBoolean(jobSpec.getConfig().getString(DELETE_STATE_STORE_KEY))) {
+            // Delete the job state if it is a delete spec request
+            String[] uriTokens = jobSpecUri.getPath().split("/");
+            if (null == this.datasetStateStore) {
+              log.warn("Job state store deletion failed as datasetstore is not 
initialized.");
+              continue;
+            }
+            if (uriTokens.length != 3) {
+              log.error("Invalid URI {}.", jobSpecUri);
+              continue;
+            }
+            String jobName = uriTokens[2];
+            this.datasetStateStore.delete(jobName);
+            log.info("JobSpec {} deleted with statestore.", jobSpecUri);
+          } else {
+            log.info("JobSpec {} deleted keeping statestore.", jobSpecUri);
+          }
+        } else {
+          this.newSpecs.inc();
+          this.jobCatalog.put(jobSpec);
+        }
+      }
+    } catch (IOException ioe) {
+      String messageStr = new String(message.message(), Charsets.UTF_8);
+      log.error(String.format("Failed to delete job/jobStateStore or parse 
kafka message with offset %d: %s.", message.offset(), messageStr), ioe);
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
index 6902eae..9d79fc0 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
@@ -26,7 +26,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
 import com.typesafe.config.Config;
 
-import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.DatasetStateStore;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.JobSpecMonitor;
@@ -52,13 +51,13 @@ public abstract class KafkaJobMonitor extends 
HighLevelConsumer<byte[], byte[]>
   public static final String KAFKA_AUTO_OFFSET_RESET_KEY = 
KAFKA_JOB_MONITOR_PREFIX + ".auto.offset.reset";
   public static final String KAFKA_AUTO_OFFSET_RESET_SMALLEST = "smallest";
   public static final String KAFKA_AUTO_OFFSET_RESET_LARGEST = "largest";
-  private DatasetStateStore datasetStateStore;
-  private final MutableJobCatalog jobCatalog;
+  protected DatasetStateStore datasetStateStore;
+  protected final MutableJobCatalog jobCatalog;
 
   @Getter
-  private Counter newSpecs;
+  protected Counter newSpecs;
   @Getter
-  private Counter remmovedSpecs;
+  protected Counter removedSpecs;
 
   /**
    * @return A collection of either {@link JobSpec}s to add/update or {@link 
URI}s to remove from the catalog,
@@ -81,7 +80,7 @@ public abstract class KafkaJobMonitor extends 
HighLevelConsumer<byte[], byte[]>
   protected void createMetrics() {
     super.createMetrics();
     this.newSpecs = 
this.getMetricContext().counter(RuntimeMetrics.GOBBLIN_JOB_MONITOR_KAFKA_NEW_SPECS);
-    this.remmovedSpecs = 
this.getMetricContext().counter(RuntimeMetrics.GOBBLIN_JOB_MONITOR_KAFKA_REMOVED_SPECS);
+    this.removedSpecs = 
this.getMetricContext().counter(RuntimeMetrics.GOBBLIN_JOB_MONITOR_KAFKA_REMOVED_SPECS);
   }
 
   @VisibleForTesting
@@ -106,19 +105,8 @@ public abstract class KafkaJobMonitor extends 
HighLevelConsumer<byte[], byte[]>
           this.newSpecs.inc();
           this.jobCatalog.put(((Either.Left<JobSpec, URI>) 
parsedMessage).getLeft());
         } else if (parsedMessage instanceof Either.Right) {
-          this.remmovedSpecs.inc();
+          this.removedSpecs.inc();
           this.jobCatalog.remove(((Either.Right<JobSpec, URI>) 
parsedMessage).getRight());
-
-          // Refer FlowConfigsResources:delete to understand the pattern of 
flow URI
-          // FlowToJobSpec Compilers use the flowSpecURI to derive jobSpecURI
-          String[] uriTokens = ((URI)(((Either.Right) 
parsedMessage).getRight())).getPath().split("/");
-          if (uriTokens.length == 3) {
-            String jobName = uriTokens[2];
-            // Delete the job state if it is a delete spec request
-            if (this.datasetStateStore != null) {
-              this.datasetStateStore.delete(jobName);
-            }
-          }
         }
       }
     } catch (IOException ioe) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/ResolvedJobSpec.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/ResolvedJobSpec.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/ResolvedJobSpec.java
index 8847467..b64f178 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/ResolvedJobSpec.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/ResolvedJobSpec.java
@@ -59,7 +59,7 @@ public class ResolvedJobSpec extends JobSpec {
   public ResolvedJobSpec(JobSpec other, JobCatalog catalog)
       throws SpecNotFoundException, JobTemplate.TemplateException {
     super(other.getUri(), other.getVersion(), other.getDescription(), 
resolveConfig(other, catalog),
-        ConfigUtils.configToProperties(resolveConfig(other, catalog)), 
other.getTemplateURI());
+        ConfigUtils.configToProperties(resolveConfig(other, catalog)), 
other.getTemplateURI(), other.getMetadata());
     this.originalJobSpec = other;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index f78be47..f9ae420 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -23,6 +23,8 @@ import java.net.URI;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Properties;
+
 import javax.annotation.Nonnull;
 
 import org.apache.commons.lang3.SerializationUtils;
@@ -255,8 +257,12 @@ public class FlowCatalog extends AbstractIdleService 
implements SpecCatalog, Mut
     }
   }
 
-  @Override
   public void remove(URI uri) {
+    remove(uri, new Properties());
+  }
+
+  @Override
+  public void remove(URI uri, Properties headers) {
     try {
       Preconditions.checkState(state() == State.RUNNING, String.format("%s is 
not running.", this.getClass().getName()));
       Preconditions.checkNotNull(uri);
@@ -264,7 +270,7 @@ public class FlowCatalog extends AbstractIdleService 
implements SpecCatalog, Mut
       log.info(String.format("Removing FlowSpec with URI: %s", uri));
       specStore.deleteSpec(uri);
       this.metrics.updateRemoveSpecTime(startTime);
-      this.listeners.onDeleteSpec(uri, FlowSpec.Builder.DEFAULT_VERSION);
+      this.listeners.onDeleteSpec(uri, FlowSpec.Builder.DEFAULT_VERSION, 
headers);
 
     } catch (IOException e) {
       throw new RuntimeException("Cannot delete Spec from Spec store for URI: 
" + uri, e);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java
index cdb9379..f2cd04b 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java
@@ -56,6 +56,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
+import java.util.Properties;
 
 import org.slf4j.Logger;
 
@@ -109,11 +110,11 @@ public class SpecCatalogListenersList implements 
SpecCatalogListener, SpecCatalo
   }
 
   @Override
-  public synchronized void onDeleteSpec(URI deletedSpecURI, String 
deletedSpecVersion) {
+  public synchronized void onDeleteSpec(URI deletedSpecURI, String 
deletedSpecVersion, Properties headers) {
     Preconditions.checkNotNull(deletedSpecURI);
 
     try {
-      _disp.execCallbacks(new 
SpecCatalogListener.DeleteSpecCallback(deletedSpecURI, deletedSpecVersion));
+      _disp.execCallbacks(new 
SpecCatalogListener.DeleteSpecCallback(deletedSpecURI, deletedSpecVersion, 
headers));
     } catch (InterruptedException e) {
       getLog().warn("onDeleteSpec interrupted.");
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
index 5c25a67..a842abd 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
@@ -23,6 +23,7 @@ import java.net.URI;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 
 import javax.annotation.Nonnull;
@@ -241,14 +242,18 @@ public class TopologyCatalog extends AbstractIdleService 
implements SpecCatalog,
     }
   }
 
-  @Override
   public void remove(URI uri) {
+    remove(uri, new Properties());
+  }
+
+    @Override
+  public void remove(URI uri, Properties headers) {
     try {
       Preconditions.checkState(state() == Service.State.RUNNING, 
String.format("%s is not running.", this.getClass().getName()));
       Preconditions.checkNotNull(uri);
 
       log.info(String.format("Removing TopologySpec with URI: %s", uri));
-      this.listeners.onDeleteSpec(uri, FlowSpec.Builder.DEFAULT_VERSION);
+      this.listeners.onDeleteSpec(uri, FlowSpec.Builder.DEFAULT_VERSION, 
headers);
       specStore.deleteSpec(uri);
     } catch (IOException e) {
       throw new RuntimeException("Cannot delete Spec from Spec store for URI: 
" + uri, e);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecProducer.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecProducer.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecProducer.java
index 80f64ec..cc74757 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecProducer.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecProducer.java
@@ -21,6 +21,7 @@ import java.io.Serializable;
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.Future;
 
 import com.google.common.collect.Lists;
@@ -65,7 +66,7 @@ public class InMemorySpecProducer implements 
SpecProducer<Spec>, Serializable {
   }
 
   @Override
-  public Future<?> deleteSpec(URI deletedSpecURI) {
+  public Future<?> deleteSpec(URI deletedSpecURI, Properties headers) {
     if (!provisionedSpecs.containsKey(deletedSpecURI)) {
       throw new RuntimeException("Spec not found: " + deletedSpecURI);
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java
index 57f99a9..c825a12 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java
@@ -19,16 +19,12 @@ package org.apache.gobblin.runtime.job_monitor;
 
 import java.net.URI;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Optional;
 import com.typesafe.config.Config;
 
-import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.kafka.HighLevelConsumerTest;
 
 
@@ -38,8 +34,6 @@ public class KafkaJobMonitorTest {
   public void test() throws Exception {
 
     Config config = 
HighLevelConsumerTest.getSimpleConfig(Optional.of(KafkaJobMonitor.KAFKA_JOB_MONITOR_PREFIX));
-    String stateStoreRootDir = 
config.getString(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY);
-    FileSystem fs = FileSystem.getLocal(new Configuration());
 
     MockedKafkaJobMonitor monitor = MockedKafkaJobMonitor.create(config);
     monitor.startAsync();
@@ -66,18 +60,6 @@ public class KafkaJobMonitorTest {
     Assert.assertTrue(monitor.getJobSpecs().containsKey(new URI("job2")));
     Assert.assertEquals(monitor.getJobSpecs().get(new 
URI("job2")).getVersion(), "2");
 
-    monitor.getMockKafkaStream().pushToStream("/flow3/job3:1");
-    monitor.awaitExactlyNSpecs(3);
-    Assert.assertTrue(monitor.getJobSpecs().containsKey(new 
URI("/flow3/job3")));
-
-    // TODO: Currently, state stores are not categorized by flow name.
-    //       This can lead to one job overwriting other jobs' job state.
-    fs.create(new Path(stateStoreRootDir, "job3"));
-    Assert.assertTrue(fs.exists(new Path(stateStoreRootDir, "job3")));
-    monitor.getMockKafkaStream().pushToStream(MockedKafkaJobMonitor.REMOVE + 
":/flow3/job3");
-    monitor.awaitExactlyNSpecs(2);
-    Assert.assertFalse(fs.exists(new Path(stateStoreRootDir, "job3")));
-
     monitor.shutDown();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java
index 7d3ef37..9e55236 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java
@@ -98,6 +98,7 @@ class MockedKafkaJobMonitor extends KafkaJobMonitor {
     return jobCatalog;
   }
 
+
   @Override
   public Collection<Either<JobSpec, URI>> parseJobSpec(byte[] message)
       throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index 855d692..e4c9ae3 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -22,6 +22,8 @@ import java.net.URI;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
+
 import javax.annotation.Nonnull;
 
 import com.codahale.metrics.Meter;
@@ -158,8 +160,12 @@ public abstract class BaseFlowToJobSpecCompiler implements 
SpecCompiler {
     topologySpecMap.put(addedSpec.getUri(), (TopologySpec) addedSpec);
   }
 
+  public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
+    onDeleteSpec(deletedSpecURI, deletedSpecVersion, new Properties());
+  }
+
   @Override
-  public synchronized void onDeleteSpec(URI deletedSpecURI, String 
deletedSpecVersion) {
+  public synchronized void onDeleteSpec(URI deletedSpecURI, String 
deletedSpecVersion, Properties headers) {
     if (topologySpecMap.containsKey(deletedSpecURI)) {
       topologySpecMap.remove(deletedSpecURI);
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index e2d36aa..1b3907d 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -22,6 +22,7 @@ import java.net.URI;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 
@@ -144,13 +145,17 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     }
   }
 
+  public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
+    onDeleteSpec(deletedSpecURI, deletedSpecVersion, new Properties());
+  }
+
   /** {@inheritDoc} */
   @Override
-  public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
+  public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion, 
Properties headers) {
     _log.info("Spec deletion detected: " + deletedSpecURI + "/" + 
deletedSpecVersion);
 
     if (topologyCatalog.isPresent()) {
-      this.specCompiler.onDeleteSpec(deletedSpecURI, deletedSpecVersion);
+      this.specCompiler.onDeleteSpec(deletedSpecURI, deletedSpecVersion, 
headers);
     }
   }
 
@@ -211,7 +216,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - 
startTime, TimeUnit.NANOSECONDS);
   }
 
-  public void remove(Spec spec) {
+  public void remove(Spec spec, Properties headers) {
     // TODO: Evolve logic to cache and reuse previously compiled JobSpecs
     // .. this will work for Identity compiler but not always for multi-hop.
     // Note: Current logic assumes compilation is consistent between all 
executions
@@ -232,7 +237,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
           Spec jobSpec = specsToDelete.getKey();
 
           _log.info(String.format("Going to delete JobSpec: %s on Executor: 
%s", jobSpec, producer));
-          producer.deleteSpec(jobSpec.getUri());
+          producer.deleteSpec(jobSpec.getUri(), headers);
         } catch(Exception e) {
           _log.error("Cannot successfully delete spec: " + 
specsToDelete.getKey() + " on executor: " + producer +
               " for flow: " + spec, e);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index ae18fc2..328c742 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -237,9 +237,13 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
     }
   }
 
+  public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
+    onDeleteSpec(deletedSpecURI, deletedSpecVersion, new Properties());
+  }
+
   /** {@inheritDoc} */
   @Override
-  public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
+  public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion, 
Properties headers) {
     if (this.helixManager.isPresent() && 
!this.helixManager.get().isConnected()) {
       // Specs in store will be notified when Scheduler is added as listener 
to FlowCatalog, so ignore
       // .. Specs if in cluster mode and Helix is not yet initialized
@@ -259,7 +263,7 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
     try {
       Spec deletedSpec = 
this.scheduledFlowSpecs.get(deletedSpecURI.toString());
       if (null != deletedSpec) {
-        this.orchestrator.remove(deletedSpec);
+        this.orchestrator.remove(deletedSpec, headers);
         this.scheduledFlowSpecs.remove(deletedSpecURI.toString());
         unscheduleJob(deletedSpecURI.toString());
       } else {

Reply via email to