This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new ae62d77 [GOBBLIN-1441] separate delete and cancel specs in
KafkaJobMonitor
ae62d77 is described below
commit ae62d77a0b3b0d123cea811b00bd363b6c6d30f3
Author: Arjun <[email protected]>
AuthorDate: Mon May 10 17:17:01 2021 -0700
[GOBBLIN-1441] separate delete and cancel specs in KafkaJobMonitor
Closes #3276 from
arjun4084346/cancelDeleteInCluster
---
.../gobblin/cluster/GobblinClusterManager.java | 4 --
.../gobblin/runtime/KafkaAvroJobMonitorTest.java | 12 ++---
.../runtime/SLAEventKafkaJobMonitorTest.java | 8 ++--
.../gobblin/service/SimpleKafkaSpecConsumer.java | 7 ++-
.../gobblin/service/SimpleKafkaSpecExecutor.java | 3 --
.../apache/gobblin/runtime/api/FsSpecConsumer.java | 3 +-
.../apache/gobblin/runtime/api/FsSpecProducer.java | 6 +--
.../org/apache/gobblin/runtime/api/JobSpec.java | 3 +-
.../job_monitor/AvroJobSpecKafkaJobMonitor.java | 14 ++----
.../runtime/job_monitor/KafkaAvroJobMonitor.java | 7 +--
.../runtime/job_monitor/KafkaJobMonitor.java | 52 ++++++++++++++++------
.../job_monitor/SLAEventKafkaJobMonitor.java | 4 +-
.../runtime/job_monitor/MockedKafkaJobMonitor.java | 20 +++++----
13 files changed, 73 insertions(+), 70 deletions(-)
diff --git
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index c2415e6..384176b 100644
---
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -379,10 +379,6 @@ public class GobblinClusterManager implements
ApplicationLauncher, StandardMetri
* Build the {@link JobConfigurationManager} for the Application Master.
*/
private JobConfigurationManager buildJobConfigurationManager(Config config) {
- return create(config);
- }
-
- private JobConfigurationManager create(Config config) {
try {
List<Object> argumentList = (this.jobCatalog != null)?
ImmutableList.of(this.eventBus, config, this.jobCatalog, this.fs) :
ImmutableList.of(this.eventBus, config, this.fs);
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobMonitorTest.java
index 2e63cc2..92f95a3 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobMonitorTest.java
@@ -57,7 +57,7 @@ public class KafkaAvroJobMonitorTest {
new AvroBinarySerializer<>(GobblinTrackingEvent.SCHEMA$, new
NoopSchemaVersionWriter());
GobblinTrackingEvent event = new GobblinTrackingEvent(0L, "namespace",
"event", Maps.<String, String>newHashMap());
- Collection<Either<JobSpec, URI>> results =
monitor.parseJobSpec(serializer.serializeRecord(event));
+ Collection<JobSpec> results =
monitor.parseJobSpec(serializer.serializeRecord(event));
Assert.assertEquals(results.size(), 1);
Assert.assertEquals(monitor.events.size(), 1);
Assert.assertEquals(monitor.events.get(0), event);
@@ -76,7 +76,7 @@ public class KafkaAvroJobMonitorTest {
new AvroBinarySerializer<>(MetricReport.SCHEMA$, new
NoopSchemaVersionWriter());
MetricReport event = new MetricReport(Maps.<String, String>newHashMap(),
0L, Lists.<Metric>newArrayList());
- Collection<Either<JobSpec, URI>> results =
monitor.parseJobSpec(serializer.serializeRecord(event));
+ Collection<JobSpec> results =
monitor.parseJobSpec(serializer.serializeRecord(event));
Assert.assertEquals(results.size(), 0);
Assert.assertEquals(monitor.events.size(), 0);
@@ -96,7 +96,7 @@ public class KafkaAvroJobMonitorTest {
new AvroBinarySerializer<>(GobblinTrackingEvent.SCHEMA$, new
FixedSchemaVersionWriter());
GobblinTrackingEvent event = new GobblinTrackingEvent(0L, "namespace",
"event", Maps.<String, String>newHashMap());
- Collection<Either<JobSpec, URI>> results =
monitor.parseJobSpec(serializer.serializeRecord(event));
+ Collection<JobSpec> results =
monitor.parseJobSpec(serializer.serializeRecord(event));
Assert.assertEquals(results.size(), 1);
Assert.assertEquals(monitor.events.size(), 1);
Assert.assertEquals(monitor.events.get(0), event);
@@ -114,7 +114,7 @@ public class KafkaAvroJobMonitorTest {
new AvroBinarySerializer<>(GobblinTrackingEvent.SCHEMA$, new
FixedSchemaVersionWriter());
GobblinTrackingEvent event = new GobblinTrackingEvent(0L, "namespace",
"event", Maps.<String, String>newHashMap());
- Collection<Either<JobSpec, URI>> results =
monitor.parseJobSpec(serializer.serializeRecord(event));
+ Collection<JobSpec> results =
monitor.parseJobSpec(serializer.serializeRecord(event));
Assert.assertEquals(results.size(), 0);
Assert.assertEquals(monitor.events.size(), 0);
Assert.assertEquals(monitor.getMessageParseFailures().getCount(), 1);
@@ -132,9 +132,9 @@ public class KafkaAvroJobMonitorTest {
}
@Override
- public Collection<Either<JobSpec, URI>> parseJobSpec(GobblinTrackingEvent
message) {
+ public Collection<JobSpec> parseJobSpec(GobblinTrackingEvent message) {
this.events.add(message);
- return Lists.newArrayList(Either.<JobSpec,
URI>left(JobSpec.builder(message.getName()).build()));
+ return Lists.newArrayList(JobSpec.builder(message.getName()).build());
}
@Override
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/SLAEventKafkaJobMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/SLAEventKafkaJobMonitorTest.java
index 74564b7..ad74da9 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/SLAEventKafkaJobMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/SLAEventKafkaJobMonitorTest.java
@@ -68,10 +68,10 @@ public class SLAEventKafkaJobMonitorTest {
GobblinTrackingEvent event = createSLAEvent("DatasetPublish", new
URI("/data/myDataset"),
ImmutableMap.of("metadataKey1","value1","key1","value2"));
- Collection<Either<JobSpec, URI>> jobSpecs = monitor.parseJobSpec(event);
+ Collection<JobSpec> jobSpecs = monitor.parseJobSpec(event);
Assert.assertEquals(jobSpecs.size(), 1);
- JobSpec jobSpec = (JobSpec) jobSpecs.iterator().next().get();
+ JobSpec jobSpec = jobSpecs.iterator().next();
Assert.assertEquals(jobSpec.getUri(), new URI("/base/URI/data/myDataset"));
Assert.assertEquals(jobSpec.getTemplateURI().get(), templateURI);
// should insert configuration from metadata
@@ -92,7 +92,7 @@ public class SLAEventKafkaJobMonitorTest {
monitor.buildMetricsContextAndMetrics();
GobblinTrackingEvent event;
- Collection<Either<JobSpec, URI>> jobSpecs;
+ Collection<JobSpec> jobSpecs;
event = createSLAEvent("acceptthis", new URI("/data/myDataset"),
Maps.<String, String>newHashMap());
jobSpecs = monitor.parseJobSpec(event);
@@ -123,7 +123,7 @@ public class SLAEventKafkaJobMonitorTest {
monitor.buildMetricsContextAndMetrics();
GobblinTrackingEvent event;
- Collection<Either<JobSpec, URI>> jobSpecs;
+ Collection<JobSpec> jobSpecs;
event = createSLAEvent("event", new URI("/accept/myDataset"),
Maps.<String, String>newHashMap());
jobSpecs = monitor.parseJobSpec(event);
diff --git
a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java
b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java
index 139d204..19458f1 100644
---
a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java
+++
b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java
@@ -62,7 +62,6 @@ import
org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
import org.apache.gobblin.util.CompletedFuture;
import org.apache.gobblin.util.ConfigUtils;
-import static org.apache.gobblin.service.SimpleKafkaSpecExecutor.VERB_KEY;
import lombok.extern.slf4j.Slf4j;
@@ -193,10 +192,10 @@ public class SimpleKafkaSpecConsumer implements
SpecConsumer<Spec>, Closeable {
jobSpecBuilder.withTemplate(new URI(record.getTemplateUri()));
}
- String verbName = record.getMetadata().get(VERB_KEY);
+ String verbName = record.getMetadata().get(SpecExecutor.VERB_KEY);
SpecExecutor.Verb verb = SpecExecutor.Verb.valueOf(verbName);
- changesSpecs.add(new ImmutablePair<SpecExecutor.Verb, Spec>(verb,
jobSpecBuilder.build()));
+ changesSpecs.add(new ImmutablePair<>(verb, jobSpecBuilder.build()));
} catch (Throwable t) {
log.error("Could not decode record at partition " +
this.currentPartitionIdx +
" offset " + nextValidMessage.getOffset());
@@ -204,7 +203,7 @@ public class SimpleKafkaSpecConsumer implements
SpecConsumer<Spec>, Closeable {
}
}
- return new CompletedFuture(changesSpecs, null);
+ return new CompletedFuture<>(changesSpecs, null);
}
private void initializeWatermarks() {
diff --git
a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java
b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java
index 29e735e..8243d19 100644
---
a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java
+++
b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java
@@ -39,9 +39,6 @@ import org.apache.gobblin.util.CompletedFuture;
public class SimpleKafkaSpecExecutor extends AbstractSpecExecutor {
public static final String SPEC_KAFKA_TOPICS_KEY = "spec.kafka.topics";
-
- protected static final String VERB_KEY = "Verb";
-
private SpecProducer<Spec> specProducer;
public SimpleKafkaSpecExecutor(Config config, Optional<Logger> log) {
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
index 14c4186..ccc0a49 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
@@ -51,7 +51,6 @@ import org.apache.gobblin.util.filters.HiddenFilter;
@Slf4j
public class FsSpecConsumer implements SpecConsumer<Spec> {
public static final String SPEC_PATH_KEY =
"gobblin.cluster.specConsumer.path";
- public static final String VERB_KEY = "Verb";
private final Path specDirPath;
private final FileSystem fs;
@@ -125,7 +124,7 @@ public class FsSpecConsumer implements SpecConsumer<Spec> {
continue;
}
- String verbName = avroJobSpec.getMetadata().get(VERB_KEY);
+ String verbName = avroJobSpec.getMetadata().get(SpecExecutor.VERB_KEY);
SpecExecutor.Verb verb = SpecExecutor.Verb.valueOf(verbName);
JobSpec jobSpec = jobSpecBuilder.build();
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecProducer.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecProducer.java
index e8264eb..c626b20 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecProducer.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecProducer.java
@@ -52,8 +52,6 @@ import org.apache.gobblin.util.HadoopUtils;
*/
@Slf4j
public class FsSpecProducer implements SpecProducer<Spec> {
- protected static final String VERB_KEY = "Verb";
-
private Path specConsumerPath;
private FileSystem fs;
@@ -107,7 +105,7 @@ public class FsSpecProducer implements SpecProducer<Spec> {
@Override
public Future<?> deleteSpec(URI deletedSpecURI, Properties headers) {
AvroJobSpec avroJobSpec =
AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
- .setMetadata(ImmutableMap.of(VERB_KEY,
SpecExecutor.Verb.DELETE.name()))
+ .setMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY,
SpecExecutor.Verb.DELETE.name()))
.setProperties(Maps.fromProperties(headers)).build();
try {
writeAvroJobSpec(avroJobSpec);
@@ -131,7 +129,7 @@ public class FsSpecProducer implements SpecProducer<Spec> {
setTemplateUri("FS:///").
setDescription(jobSpec.getDescription()).
setVersion(jobSpec.getVersion()).
- setMetadata(ImmutableMap.of(VERB_KEY, verb.name())).build();
+ setMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY,
verb.name())).build();
}
private void writeAvroJobSpec(AvroJobSpec jobSpec) throws IOException {
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
index 4043a30..9dad5a1 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java
@@ -78,7 +78,6 @@ public class JobSpec implements Configurable, Spec {
Map<String, String> metadata;
/** A Verb identifies if the Spec is for Insert/Update/Delete */
- public static final String VERB_KEY = "Verb";
private static final String IN_MEMORY_TEMPLATE_URI = "inmemory";
public static Builder builder(URI jobSpecUri) {
@@ -344,7 +343,7 @@ public class JobSpec implements Configurable, Spec {
public Map getDefaultMetadata() {
log.debug("Job Spec Verb is not provided, using type 'UNKNOWN'.");
- return ImmutableMap.of(VERB_KEY, SpecExecutor.Verb.UNKNOWN.name());
+ return ImmutableMap.of(SpecExecutor.VERB_KEY,
SpecExecutor.Verb.UNKNOWN.name());
}
public Map getMetadata() {
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
index d9bc1aa..94a65b5 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java
@@ -58,7 +58,6 @@ public class AvroJobSpecKafkaJobMonitor extends
KafkaAvroJobMonitor<AvroJobSpec>
public static final String CONFIG_PREFIX = "gobblin.jobMonitor.avroJobSpec";
public static final String TOPIC_KEY = "topic";
public static final String SCHEMA_VERSION_READER_CLASS =
"versionReaderClass";
- protected static final String VERB_KEY = "Verb";
private static final Config DEFAULTS =
ConfigFactory.parseMap(ImmutableMap.of(
SCHEMA_VERSION_READER_CLASS, FixedSchemaVersionWriter.class.getName()));
@@ -108,10 +107,10 @@ public class AvroJobSpecKafkaJobMonitor extends
KafkaAvroJobMonitor<AvroJobSpec>
/**
* Creates a {@link JobSpec} or {@link URI} from the {@link AvroJobSpec}
record.
* @param record the record as an {@link AvroJobSpec}
- * @return a {@link JobSpec} or {@link URI} wrapped in a {@link Collection}
of {@link Either}
+ * @return a {@link JobSpec}
*/
@Override
- public Collection<Either<JobSpec, URI>> parseJobSpec(AvroJobSpec record) {
+ public Collection<JobSpec> parseJobSpec(AvroJobSpec record) {
JobSpec.Builder jobSpecBuilder = JobSpec.builder(record.getUri());
Properties props = new Properties();
@@ -127,17 +126,10 @@ public class AvroJobSpecKafkaJobMonitor extends
KafkaAvroJobMonitor<AvroJobSpec>
}
}
- String verbName = record.getMetadata().get(VERB_KEY);
- SpecExecutor.Verb verb = SpecExecutor.Verb.valueOf(verbName);
-
JobSpec jobSpec = jobSpecBuilder.build();
log.info("Parsed job spec " + jobSpec.toString());
- if (verb == Verb.ADD || verb == Verb.UPDATE) {
- return Lists.newArrayList(Either.left(jobSpec));
- } else {
- return Lists.newArrayList(Either.right(jobSpec.getUri()));
- }
+ return Lists.newArrayList(jobSpec);
}
}
\ No newline at end of file
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaAvroJobMonitor.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaAvroJobMonitor.java
index 14ec6cc..8e9c551 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaAvroJobMonitor.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaAvroJobMonitor.java
@@ -21,7 +21,6 @@ import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.net.URI;
import java.util.Collection;
import java.util.List;
@@ -37,13 +36,11 @@ import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
-import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.MutableJobCatalog;
import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
-import org.apache.gobblin.util.Either;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -102,7 +99,7 @@ public abstract class KafkaAvroJobMonitor<T> extends
KafkaJobMonitor {
}
@Override
- public Collection<Either<JobSpec, URI>> parseJobSpec(byte[] message)
+ public Collection<JobSpec> parseJobSpec(byte[] message)
throws IOException {
InputStream is = new ByteArrayInputStream(message);
@@ -126,5 +123,5 @@ public abstract class KafkaAvroJobMonitor<T> extends
KafkaJobMonitor {
/**
* Extract {@link JobSpec}s from the Kafka message.
*/
- public abstract Collection<Either<JobSpec, URI>> parseJobSpec(T message);
+ public abstract Collection<JobSpec> parseJobSpec(T message);
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
index 38c3c68..e36b940 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
@@ -24,6 +24,8 @@ import java.util.Collection;
import com.codahale.metrics.Counter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
import com.typesafe.config.Config;
import lombok.Getter;
@@ -35,10 +37,10 @@ import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.JobSpecMonitor;
import org.apache.gobblin.runtime.api.MutableJobCatalog;
+import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.Either;
/**
@@ -61,11 +63,11 @@ public abstract class KafkaJobMonitor extends
HighLevelConsumer<byte[], byte[]>
protected Counter removedSpecs;
/**
- * @return A collection of either {@link JobSpec}s to add/update or {@link
URI}s to remove from the catalog,
+ * @return A collection of {@link JobSpec}s to add/update/remove from the
catalog,
* parsed from the Kafka message.
* @throws IOException
*/
- public abstract Collection<Either<JobSpec, URI>> parseJobSpec(byte[]
message) throws IOException;
+ public abstract Collection<JobSpec> parseJobSpec(byte[] message) throws
IOException;
public KafkaJobMonitor(String topic, MutableJobCatalog catalog, Config
config) {
super(topic, ConfigUtils.getConfigOrEmpty(config,
KAFKA_JOB_MONITOR_PREFIX), 1);
@@ -100,17 +102,39 @@ public abstract class KafkaJobMonitor extends
HighLevelConsumer<byte[], byte[]>
@Override
protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
try {
- Collection<Either<JobSpec, URI>> parsedCollection =
parseJobSpec(message.getValue());
- for (Either<JobSpec, URI> parsedMessage : parsedCollection) {
- if (parsedMessage instanceof Either.Left) {
- this.newSpecs.inc();
- this.jobCatalog.put(((Either.Left<JobSpec, URI>)
parsedMessage).getLeft());
- } else if (parsedMessage instanceof Either.Right) {
- this.removedSpecs.inc();
- URI jobSpecUri = ((Either.Right<JobSpec, URI>)
parsedMessage).getRight();
- this.jobCatalog.remove(jobSpecUri, true);
- // Delete the job state if it is a delete spec request
- deleteStateStore(jobSpecUri);
+ Collection<JobSpec> parsedCollection = parseJobSpec(message.getValue());
+ for (JobSpec parsedMessage : parsedCollection) {
+ SpecExecutor.Verb verb;
+
+ try {
+ verb =
SpecExecutor.Verb.valueOf(parsedMessage.getMetadata().get(SpecExecutor.VERB_KEY));
+ } catch (IllegalArgumentException | NullPointerException e) {
+ log.error("Unknown verb {} for spec {}",
parsedMessage.getMetadata().get(SpecExecutor.VERB_KEY), parsedMessage.getUri());
+ continue;
+ }
+
+ switch (verb) {
+ case ADD:
+ case UPDATE:
+ this.newSpecs.inc();
+ this.jobCatalog.put(parsedMessage);
+ break;
+ case UNKNOWN: // unknown are considered as add request to maintain
backward compatibility
+ log.warn("Job Spec Verb is 'UNKNOWN', putting this spec in job
catalog anyway.");
+ this.jobCatalog.put(parsedMessage);
+ break;
+ case DELETE:
+ this.removedSpecs.inc();
+ URI jobSpecUri = parsedMessage.getUri();
+ this.jobCatalog.remove(jobSpecUri);
+ // Delete the job state if it is a delete spec request
+ deleteStateStore(jobSpecUri);
+ break;
+ case CANCEL:
+ this.jobCatalog.remove(parsedMessage.getUri(), true);
+ break;
+ default:
+ log.error("Cannot process spec {} with verb {}",
parsedMessage.getUri(), verb);
}
}
} catch (IOException ioe) {
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/SLAEventKafkaJobMonitor.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/SLAEventKafkaJobMonitor.java
index 576b8c4..e6cf0a9 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/SLAEventKafkaJobMonitor.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/SLAEventKafkaJobMonitor.java
@@ -172,7 +172,7 @@ public class SLAEventKafkaJobMonitor extends
KafkaAvroJobMonitor<GobblinTracking
}
@Override
- public Collection<Either<JobSpec, URI>> parseJobSpec(GobblinTrackingEvent
event) {
+ public Collection<JobSpec> parseJobSpec(GobblinTrackingEvent event) {
if (!acceptEvent(event)) {
this.rejectedEvents.inc();
@@ -192,7 +192,7 @@ public class SLAEventKafkaJobMonitor extends
KafkaAvroJobMonitor<GobblinTracking
JobSpec jobSpec =
JobSpec.builder(jobSpecURI).withTemplate(this.template).withConfig(jobConfig).build();
- return Lists.newArrayList(Either.<JobSpec, URI>left(jobSpec));
+ return Lists.newArrayList(jobSpec);
}
/**
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java
index b68bc1a..2d0c64d 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java
@@ -31,19 +31,18 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.common.base.Charsets;
-import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
-import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.MutableJobCatalog;
+import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.testing.AssertWithBackoff;
-import org.apache.gobblin.util.Either;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -90,18 +89,18 @@ public class MockedKafkaJobMonitor extends KafkaJobMonitor {
jobSpecs.remove(uri);
return null;
}
- }).when(jobCatalog).remove(Mockito.any(URI.class), Mockito.anyBoolean());
+ }).when(jobCatalog).remove(Mockito.any(URI.class));
return jobCatalog;
}
@Override
- public Collection<Either<JobSpec, URI>> parseJobSpec(byte[] message)
+ public Collection<JobSpec> parseJobSpec(byte[] message)
throws IOException {
try {
String messageString = new String(message, Charsets.UTF_8);
- List<Either<JobSpec, URI>> jobSpecs = Lists.newArrayList();
+ List<JobSpec> jobSpecs = Lists.newArrayList();
for (String oneInstruction : SPLITTER_COMMA.split(messageString)) {
@@ -109,12 +108,15 @@ public class MockedKafkaJobMonitor extends
KafkaJobMonitor {
if (tokens.get(0).equals(REMOVE)) {
URI uri = new URI(tokens.get(1));
- jobSpecs.add(Either.<JobSpec, URI>right(uri));
+ JobSpec jobSpec = new
JobSpec.Builder(uri).withConfig(ConfigFactory.empty())
+ .withMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY,
SpecExecutor.Verb.DELETE.name())).build();
+ jobSpecs.add(jobSpec);
} else {
URI uri = new URI(tokens.get(0));
String version = tokens.get(1);
- JobSpec jobSpec = new
JobSpec.Builder(uri).withConfig(ConfigFactory.empty()).withVersion(version).build();
- jobSpecs.add(Either.<JobSpec, URI>left(jobSpec));
+ JobSpec jobSpec = new
JobSpec.Builder(uri).withConfig(ConfigFactory.empty()).withVersion(version)
+ .withMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY,
SpecExecutor.Verb.ADD.name())).build();
+ jobSpecs.add(jobSpec);
}
}
return jobSpecs;