This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new ae62d77  [GOBBLIN-1441] separate delete and cancel specs in 
KafkaJobMonitor
ae62d77 is described below

commit ae62d77a0b3b0d123cea811b00bd363b6c6d30f3
Author: Arjun <[email protected]>
AuthorDate: Mon May 10 17:17:01 2021 -0700

    [GOBBLIN-1441] separate delete and cancel specs in KafkaJobMonitor
    
    Closes #3276 from
    arjun4084346/cancelDeleteInCluster
---
 .../gobblin/cluster/GobblinClusterManager.java     |  4 --
 .../gobblin/runtime/KafkaAvroJobMonitorTest.java   | 12 ++---
 .../runtime/SLAEventKafkaJobMonitorTest.java       |  8 ++--
 .../gobblin/service/SimpleKafkaSpecConsumer.java   |  7 ++-
 .../gobblin/service/SimpleKafkaSpecExecutor.java   |  3 --
 .../apache/gobblin/runtime/api/FsSpecConsumer.java |  3 +-
 .../apache/gobblin/runtime/api/FsSpecProducer.java |  6 +--
 .../org/apache/gobblin/runtime/api/JobSpec.java    |  3 +-
 .../job_monitor/AvroJobSpecKafkaJobMonitor.java    | 14 ++----
 .../runtime/job_monitor/KafkaAvroJobMonitor.java   |  7 +--
 .../runtime/job_monitor/KafkaJobMonitor.java       | 52 ++++++++++++++++------
 .../job_monitor/SLAEventKafkaJobMonitor.java       |  4 +-
 .../runtime/job_monitor/MockedKafkaJobMonitor.java | 20 +++++----
 13 files changed, 73 insertions(+), 70 deletions(-)

diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index c2415e6..384176b 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -379,10 +379,6 @@ public class GobblinClusterManager implements 
ApplicationLauncher, StandardMetri
    * Build the {@link JobConfigurationManager} for the Application Master.
    */
   private JobConfigurationManager buildJobConfigurationManager(Config config) {
-    return create(config);
-  }
-
-  private JobConfigurationManager create(Config config) {
     try {
       List<Object> argumentList = (this.jobCatalog != null)? 
ImmutableList.of(this.eventBus, config, this.jobCatalog, this.fs) :
           ImmutableList.of(this.eventBus, config, this.fs);
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobMonitorTest.java
index 2e63cc2..92f95a3 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobMonitorTest.java
@@ -57,7 +57,7 @@ public class KafkaAvroJobMonitorTest {
         new AvroBinarySerializer<>(GobblinTrackingEvent.SCHEMA$, new 
NoopSchemaVersionWriter());
 
     GobblinTrackingEvent event = new GobblinTrackingEvent(0L, "namespace", 
"event", Maps.<String, String>newHashMap());
-    Collection<Either<JobSpec, URI>> results = 
monitor.parseJobSpec(serializer.serializeRecord(event));
+    Collection<JobSpec> results = 
monitor.parseJobSpec(serializer.serializeRecord(event));
     Assert.assertEquals(results.size(), 1);
     Assert.assertEquals(monitor.events.size(), 1);
     Assert.assertEquals(monitor.events.get(0), event);
@@ -76,7 +76,7 @@ public class KafkaAvroJobMonitorTest {
         new AvroBinarySerializer<>(MetricReport.SCHEMA$, new 
NoopSchemaVersionWriter());
 
     MetricReport event = new MetricReport(Maps.<String, String>newHashMap(), 
0L, Lists.<Metric>newArrayList());
-    Collection<Either<JobSpec, URI>> results = 
monitor.parseJobSpec(serializer.serializeRecord(event));
+    Collection<JobSpec> results = 
monitor.parseJobSpec(serializer.serializeRecord(event));
 
     Assert.assertEquals(results.size(), 0);
     Assert.assertEquals(monitor.events.size(), 0);
@@ -96,7 +96,7 @@ public class KafkaAvroJobMonitorTest {
         new AvroBinarySerializer<>(GobblinTrackingEvent.SCHEMA$, new 
FixedSchemaVersionWriter());
 
     GobblinTrackingEvent event = new GobblinTrackingEvent(0L, "namespace", 
"event", Maps.<String, String>newHashMap());
-    Collection<Either<JobSpec, URI>> results = 
monitor.parseJobSpec(serializer.serializeRecord(event));
+    Collection<JobSpec> results = 
monitor.parseJobSpec(serializer.serializeRecord(event));
     Assert.assertEquals(results.size(), 1);
     Assert.assertEquals(monitor.events.size(), 1);
     Assert.assertEquals(monitor.events.get(0), event);
@@ -114,7 +114,7 @@ public class KafkaAvroJobMonitorTest {
         new AvroBinarySerializer<>(GobblinTrackingEvent.SCHEMA$, new 
FixedSchemaVersionWriter());
 
     GobblinTrackingEvent event = new GobblinTrackingEvent(0L, "namespace", 
"event", Maps.<String, String>newHashMap());
-    Collection<Either<JobSpec, URI>> results = 
monitor.parseJobSpec(serializer.serializeRecord(event));
+    Collection<JobSpec> results = 
monitor.parseJobSpec(serializer.serializeRecord(event));
     Assert.assertEquals(results.size(), 0);
     Assert.assertEquals(monitor.events.size(), 0);
     Assert.assertEquals(monitor.getMessageParseFailures().getCount(), 1);
@@ -132,9 +132,9 @@ public class KafkaAvroJobMonitorTest {
     }
 
     @Override
-    public Collection<Either<JobSpec, URI>> parseJobSpec(GobblinTrackingEvent 
message) {
+    public Collection<JobSpec> parseJobSpec(GobblinTrackingEvent message) {
       this.events.add(message);
-      return Lists.newArrayList(Either.<JobSpec, 
URI>left(JobSpec.builder(message.getName()).build()));
+      return Lists.newArrayList(JobSpec.builder(message.getName()).build());
     }
 
     @Override
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/SLAEventKafkaJobMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/SLAEventKafkaJobMonitorTest.java
index 74564b7..ad74da9 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/SLAEventKafkaJobMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/SLAEventKafkaJobMonitorTest.java
@@ -68,10 +68,10 @@ public class SLAEventKafkaJobMonitorTest {
 
     GobblinTrackingEvent event = createSLAEvent("DatasetPublish", new 
URI("/data/myDataset"),
         ImmutableMap.of("metadataKey1","value1","key1","value2"));
-    Collection<Either<JobSpec, URI>> jobSpecs = monitor.parseJobSpec(event);
+    Collection<JobSpec> jobSpecs = monitor.parseJobSpec(event);
 
     Assert.assertEquals(jobSpecs.size(), 1);
-    JobSpec jobSpec = (JobSpec) jobSpecs.iterator().next().get();
+    JobSpec jobSpec = jobSpecs.iterator().next();
     Assert.assertEquals(jobSpec.getUri(), new URI("/base/URI/data/myDataset"));
     Assert.assertEquals(jobSpec.getTemplateURI().get(), templateURI);
     // should insert configuration from metadata
@@ -92,7 +92,7 @@ public class SLAEventKafkaJobMonitorTest {
     monitor.buildMetricsContextAndMetrics();
 
     GobblinTrackingEvent event;
-    Collection<Either<JobSpec, URI>> jobSpecs;
+    Collection<JobSpec> jobSpecs;
 
     event = createSLAEvent("acceptthis", new URI("/data/myDataset"), 
Maps.<String, String>newHashMap());
     jobSpecs = monitor.parseJobSpec(event);
@@ -123,7 +123,7 @@ public class SLAEventKafkaJobMonitorTest {
     monitor.buildMetricsContextAndMetrics();
 
     GobblinTrackingEvent event;
-    Collection<Either<JobSpec, URI>> jobSpecs;
+    Collection<JobSpec> jobSpecs;
 
     event = createSLAEvent("event", new URI("/accept/myDataset"), 
Maps.<String, String>newHashMap());
     jobSpecs = monitor.parseJobSpec(event);
diff --git 
a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java
 
b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java
index 139d204..19458f1 100644
--- 
a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java
+++ 
b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java
@@ -62,7 +62,6 @@ import 
org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
 import org.apache.gobblin.util.CompletedFuture;
 import org.apache.gobblin.util.ConfigUtils;
 
-import static org.apache.gobblin.service.SimpleKafkaSpecExecutor.VERB_KEY;
 import lombok.extern.slf4j.Slf4j;
 
 
@@ -193,10 +192,10 @@ public class SimpleKafkaSpecConsumer implements 
SpecConsumer<Spec>, Closeable {
             jobSpecBuilder.withTemplate(new URI(record.getTemplateUri()));
           }
 
-          String verbName = record.getMetadata().get(VERB_KEY);
+          String verbName = record.getMetadata().get(SpecExecutor.VERB_KEY);
           SpecExecutor.Verb verb = SpecExecutor.Verb.valueOf(verbName);
 
-          changesSpecs.add(new ImmutablePair<SpecExecutor.Verb, Spec>(verb, 
jobSpecBuilder.build()));
+          changesSpecs.add(new ImmutablePair<>(verb, jobSpecBuilder.build()));
         } catch (Throwable t) {
           log.error("Could not decode record at partition " + 
this.currentPartitionIdx +
               " offset " + nextValidMessage.getOffset());
@@ -204,7 +203,7 @@ public class SimpleKafkaSpecConsumer implements 
SpecConsumer<Spec>, Closeable {
       }
     }
 
-    return new CompletedFuture(changesSpecs, null);
+    return new CompletedFuture<>(changesSpecs, null);
   }
 
   private void initializeWatermarks() {
diff --git 
a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java
 
b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java
index 29e735e..8243d19 100644
--- 
a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java
+++ 
b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java
@@ -39,9 +39,6 @@ import org.apache.gobblin.util.CompletedFuture;
 public class SimpleKafkaSpecExecutor extends AbstractSpecExecutor {
   public static final String SPEC_KAFKA_TOPICS_KEY = "spec.kafka.topics";
 
-
-  protected static final String VERB_KEY = "Verb";
-
   private SpecProducer<Spec> specProducer;
 
   public SimpleKafkaSpecExecutor(Config config, Optional<Logger> log) {
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
index 14c4186..ccc0a49 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
@@ -51,7 +51,6 @@ import org.apache.gobblin.util.filters.HiddenFilter;
 @Slf4j
 public class FsSpecConsumer implements SpecConsumer<Spec> {
   public static final String SPEC_PATH_KEY = 
"gobblin.cluster.specConsumer.path";
-  public static final String VERB_KEY = "Verb";
 
   private final Path specDirPath;
   private final FileSystem fs;
@@ -125,7 +124,7 @@ public class FsSpecConsumer implements SpecConsumer<Spec> {
           continue;
         }
 
-        String verbName = avroJobSpec.getMetadata().get(VERB_KEY);
+        String verbName = avroJobSpec.getMetadata().get(SpecExecutor.VERB_KEY);
         SpecExecutor.Verb verb = SpecExecutor.Verb.valueOf(verbName);
 
         JobSpec jobSpec = jobSpecBuilder.build();
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecProducer.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecProducer.java
index e8264eb..c626b20 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecProducer.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecProducer.java
@@ -52,8 +52,6 @@ import org.apache.gobblin.util.HadoopUtils;
  */
 @Slf4j
 public class FsSpecProducer implements SpecProducer<Spec> {
-  protected static final String VERB_KEY = "Verb";
-
   private Path specConsumerPath;
   private FileSystem fs;
 
@@ -107,7 +105,7 @@ public class FsSpecProducer implements SpecProducer<Spec> {
   @Override
   public Future<?> deleteSpec(URI deletedSpecURI, Properties headers) {
     AvroJobSpec avroJobSpec = 
AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
-        .setMetadata(ImmutableMap.of(VERB_KEY, 
SpecExecutor.Verb.DELETE.name()))
+        .setMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY, 
SpecExecutor.Verb.DELETE.name()))
         .setProperties(Maps.fromProperties(headers)).build();
     try {
       writeAvroJobSpec(avroJobSpec);
@@ -131,7 +129,7 @@ public class FsSpecProducer implements SpecProducer<Spec> {
         setTemplateUri("FS:///").
         setDescription(jobSpec.getDescription()).
         setVersion(jobSpec.getVersion()).
-        setMetadata(ImmutableMap.of(VERB_KEY, verb.name())).build();
+        setMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY, 
verb.name())).build();
   }
 
   private void writeAvroJobSpec(AvroJobSpec jobSpec) throws IOException {
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
index 4043a30..9dad5a1 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
@@ -78,7 +78,6 @@ public class JobSpec implements Configurable, Spec {
   Map<String, String> metadata;
 
   /** A Verb identifies if the Spec is for Insert/Update/Delete */
-  public static final String VERB_KEY = "Verb";
   private static final String IN_MEMORY_TEMPLATE_URI = "inmemory";
 
   public static Builder builder(URI jobSpecUri) {
@@ -344,7 +343,7 @@ public class JobSpec implements Configurable, Spec {
 
     public Map getDefaultMetadata() {
       log.debug("Job Spec Verb is not provided, using type 'UNKNOWN'.");
-      return ImmutableMap.of(VERB_KEY, SpecExecutor.Verb.UNKNOWN.name());
+      return ImmutableMap.of(SpecExecutor.VERB_KEY, 
SpecExecutor.Verb.UNKNOWN.name());
     }
 
     public Map getMetadata() {
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
index d9bc1aa..94a65b5 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
@@ -58,7 +58,6 @@ public class AvroJobSpecKafkaJobMonitor extends 
KafkaAvroJobMonitor<AvroJobSpec>
   public static final String CONFIG_PREFIX = "gobblin.jobMonitor.avroJobSpec";
   public static final String TOPIC_KEY = "topic";
   public static final String SCHEMA_VERSION_READER_CLASS = 
"versionReaderClass";
-  protected static final String VERB_KEY = "Verb";
   private static final Config DEFAULTS = 
ConfigFactory.parseMap(ImmutableMap.of(
       SCHEMA_VERSION_READER_CLASS, FixedSchemaVersionWriter.class.getName()));
 
@@ -108,10 +107,10 @@ public class AvroJobSpecKafkaJobMonitor extends 
KafkaAvroJobMonitor<AvroJobSpec>
   /**
    * Creates a {@link JobSpec} or {@link URI} from the {@link AvroJobSpec} 
record.
    * @param record the record as an {@link AvroJobSpec}
-   * @return a {@link JobSpec} or {@link URI} wrapped in a {@link Collection} 
of {@link Either}
+   * @return a {@link JobSpec}
    */
   @Override
-  public Collection<Either<JobSpec, URI>> parseJobSpec(AvroJobSpec record) {
+  public Collection<JobSpec> parseJobSpec(AvroJobSpec record) {
     JobSpec.Builder jobSpecBuilder = JobSpec.builder(record.getUri());
 
     Properties props = new Properties();
@@ -127,17 +126,10 @@ public class AvroJobSpecKafkaJobMonitor extends 
KafkaAvroJobMonitor<AvroJobSpec>
       }
     }
 
-    String verbName = record.getMetadata().get(VERB_KEY);
-    SpecExecutor.Verb verb = SpecExecutor.Verb.valueOf(verbName);
-
     JobSpec jobSpec = jobSpecBuilder.build();
 
     log.info("Parsed job spec " + jobSpec.toString());
 
-    if (verb == Verb.ADD || verb == Verb.UPDATE) {
-      return Lists.newArrayList(Either.left(jobSpec));
-    } else {
-      return Lists.newArrayList(Either.right(jobSpec.getUri()));
-    }
+    return Lists.newArrayList(jobSpec);
   }
 }
\ No newline at end of file
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaAvroJobMonitor.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaAvroJobMonitor.java
index 14ec6cc..8e9c551 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaAvroJobMonitor.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaAvroJobMonitor.java
@@ -21,7 +21,6 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.URI;
 import java.util.Collection;
 import java.util.List;
 
@@ -37,13 +36,11 @@ import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 
-import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.MutableJobCatalog;
 import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
-import org.apache.gobblin.util.Either;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -102,7 +99,7 @@ public abstract class KafkaAvroJobMonitor<T> extends 
KafkaJobMonitor {
   }
 
   @Override
-  public Collection<Either<JobSpec, URI>> parseJobSpec(byte[] message)
+  public Collection<JobSpec> parseJobSpec(byte[] message)
       throws IOException {
 
     InputStream is = new ByteArrayInputStream(message);
@@ -126,5 +123,5 @@ public abstract class KafkaAvroJobMonitor<T> extends 
KafkaJobMonitor {
   /**
    * Extract {@link JobSpec}s from the Kafka message.
    */
-  public abstract Collection<Either<JobSpec, URI>> parseJobSpec(T message);
+  public abstract Collection<JobSpec> parseJobSpec(T message);
 }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
index 38c3c68..e36b940 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
@@ -24,6 +24,8 @@ import java.util.Collection;
 import com.codahale.metrics.Counter;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
 import com.typesafe.config.Config;
 
 import lombok.Getter;
@@ -35,10 +37,10 @@ import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.JobSpecMonitor;
 import org.apache.gobblin.runtime.api.MutableJobCatalog;
+import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
 import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
 import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.Either;
 
 
 /**
@@ -61,11 +63,11 @@ public abstract class KafkaJobMonitor extends 
HighLevelConsumer<byte[], byte[]>
   protected Counter removedSpecs;
 
   /**
-   * @return A collection of either {@link JobSpec}s to add/update or {@link 
URI}s to remove from the catalog,
+   * @return A collection of {@link JobSpec}s to add/update/remove from the 
catalog,
    *        parsed from the Kafka message.
    * @throws IOException
    */
-  public abstract Collection<Either<JobSpec, URI>> parseJobSpec(byte[] 
message) throws IOException;
+  public abstract Collection<JobSpec> parseJobSpec(byte[] message) throws 
IOException;
 
   public KafkaJobMonitor(String topic, MutableJobCatalog catalog, Config 
config) {
     super(topic, ConfigUtils.getConfigOrEmpty(config, 
KAFKA_JOB_MONITOR_PREFIX), 1);
@@ -100,17 +102,39 @@ public abstract class KafkaJobMonitor extends 
HighLevelConsumer<byte[], byte[]>
   @Override
   protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
     try {
-      Collection<Either<JobSpec, URI>> parsedCollection = 
parseJobSpec(message.getValue());
-      for (Either<JobSpec, URI> parsedMessage : parsedCollection) {
-        if (parsedMessage instanceof Either.Left) {
-          this.newSpecs.inc();
-          this.jobCatalog.put(((Either.Left<JobSpec, URI>) 
parsedMessage).getLeft());
-        } else if (parsedMessage instanceof Either.Right) {
-          this.removedSpecs.inc();
-          URI jobSpecUri = ((Either.Right<JobSpec, URI>) 
parsedMessage).getRight();
-          this.jobCatalog.remove(jobSpecUri, true);
-          // Delete the job state if it is a delete spec request
-          deleteStateStore(jobSpecUri);
+      Collection<JobSpec> parsedCollection = parseJobSpec(message.getValue());
+      for (JobSpec parsedMessage : parsedCollection) {
+        SpecExecutor.Verb verb;
+
+        try {
+          verb = 
SpecExecutor.Verb.valueOf(parsedMessage.getMetadata().get(SpecExecutor.VERB_KEY));
+        } catch (IllegalArgumentException | NullPointerException e) {
+          log.error("Unknown verb {} for spec {}", 
parsedMessage.getMetadata().get(SpecExecutor.VERB_KEY), parsedMessage.getUri());
+          continue;
+        }
+
+        switch (verb) {
+          case ADD:
+          case UPDATE:
+            this.newSpecs.inc();
+            this.jobCatalog.put(parsedMessage);
+            break;
+          case UNKNOWN: // unknown are considered as add request to maintain 
backward compatibility
+            log.warn("Job Spec Verb is 'UNKNOWN', putting this spec in job 
catalog anyway.");
+            this.jobCatalog.put(parsedMessage);
+            break;
+          case DELETE:
+            this.removedSpecs.inc();
+            URI jobSpecUri = parsedMessage.getUri();
+            this.jobCatalog.remove(jobSpecUri);
+            // Delete the job state if it is a delete spec request
+            deleteStateStore(jobSpecUri);
+            break;
+          case CANCEL:
+            this.jobCatalog.remove(parsedMessage.getUri(), true);
+            break;
+          default:
+            log.error("Cannot process spec {} with verb {}", 
parsedMessage.getUri(), verb);
         }
       }
     } catch (IOException ioe) {
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/SLAEventKafkaJobMonitor.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/SLAEventKafkaJobMonitor.java
index 576b8c4..e6cf0a9 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/SLAEventKafkaJobMonitor.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/SLAEventKafkaJobMonitor.java
@@ -172,7 +172,7 @@ public class SLAEventKafkaJobMonitor extends 
KafkaAvroJobMonitor<GobblinTracking
   }
 
   @Override
-  public Collection<Either<JobSpec, URI>> parseJobSpec(GobblinTrackingEvent 
event) {
+  public Collection<JobSpec> parseJobSpec(GobblinTrackingEvent event) {
 
     if (!acceptEvent(event)) {
       this.rejectedEvents.inc();
@@ -192,7 +192,7 @@ public class SLAEventKafkaJobMonitor extends 
KafkaAvroJobMonitor<GobblinTracking
 
     JobSpec jobSpec = 
JobSpec.builder(jobSpecURI).withTemplate(this.template).withConfig(jobConfig).build();
 
-    return Lists.newArrayList(Either.<JobSpec, URI>left(jobSpec));
+    return Lists.newArrayList(jobSpec);
   }
 
   /**
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java
index b68bc1a..2d0c64d 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java
@@ -31,19 +31,18 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import com.google.common.base.Charsets;
-import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
-import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.MutableJobCatalog;
+import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.testing.AssertWithBackoff;
-import org.apache.gobblin.util.Either;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -90,18 +89,18 @@ public class MockedKafkaJobMonitor extends KafkaJobMonitor {
         jobSpecs.remove(uri);
         return null;
       }
-    }).when(jobCatalog).remove(Mockito.any(URI.class), Mockito.anyBoolean());
+    }).when(jobCatalog).remove(Mockito.any(URI.class));
 
     return jobCatalog;
   }
 
 
   @Override
-  public Collection<Either<JobSpec, URI>> parseJobSpec(byte[] message)
+  public Collection<JobSpec> parseJobSpec(byte[] message)
       throws IOException {
     try {
       String messageString = new String(message, Charsets.UTF_8);
-      List<Either<JobSpec, URI>> jobSpecs = Lists.newArrayList();
+      List<JobSpec> jobSpecs = Lists.newArrayList();
 
       for (String oneInstruction : SPLITTER_COMMA.split(messageString)) {
 
@@ -109,12 +108,15 @@ public class MockedKafkaJobMonitor extends 
KafkaJobMonitor {
 
         if (tokens.get(0).equals(REMOVE)) {
           URI uri = new URI(tokens.get(1));
-          jobSpecs.add(Either.<JobSpec, URI>right(uri));
+          JobSpec jobSpec = new 
JobSpec.Builder(uri).withConfig(ConfigFactory.empty())
+              .withMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY, 
SpecExecutor.Verb.DELETE.name())).build();
+          jobSpecs.add(jobSpec);
         } else {
           URI uri = new URI(tokens.get(0));
           String version = tokens.get(1);
-          JobSpec jobSpec = new 
JobSpec.Builder(uri).withConfig(ConfigFactory.empty()).withVersion(version).build();
-          jobSpecs.add(Either.<JobSpec, URI>left(jobSpec));
+          JobSpec jobSpec = new 
JobSpec.Builder(uri).withConfig(ConfigFactory.empty()).withVersion(version)
+              .withMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY, 
SpecExecutor.Verb.ADD.name())).build();
+          jobSpecs.add(jobSpec);
         }
       }
       return jobSpecs;

Reply via email to