[GOBBLIN-406] Delete job state store on spec delete request. Closes #2281 from arjun4084346/deleteInCluster
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f8950570 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f8950570 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f8950570 Branch: refs/heads/0.12.0 Commit: f89505702a3a130a7e73c617b33a285ea228bd8c Parents: 5c678d9 Author: Arjun <[email protected]> Authored: Fri Feb 9 16:04:10 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Fri Feb 9 16:04:10 2018 -0800 ---------------------------------------------------------------------- .../runtime/job_monitor/KafkaJobMonitor.java | 25 ++++++++++++++++- .../job_monitor/KafkaJobMonitorTest.java | 24 +++++++++++++++-- .../runtime/kafka/HighLevelConsumerTest.java | 7 +++++ .../modules/flow/BaseFlowToJobSpecCompiler.java | 28 +++++++++++++++++++- .../flow/MultiHopsFlowToJobSpecCompiler.java | 8 ++++-- 5 files changed, 86 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f8950570/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java index ba79305..0bb4f14 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java @@ -26,6 +26,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; import com.typesafe.config.Config; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metastore.DatasetStateStore; import 
org.apache.gobblin.runtime.api.JobSpec; import org.apache.gobblin.runtime.api.JobSpecMonitor; import org.apache.gobblin.runtime.api.MutableJobCatalog; @@ -50,8 +52,9 @@ public abstract class KafkaJobMonitor extends HighLevelConsumer<byte[], byte[]> public static final String KAFKA_AUTO_OFFSET_RESET_KEY = KAFKA_JOB_MONITOR_PREFIX + ".auto.offset.reset"; public static final String KAFKA_AUTO_OFFSET_RESET_SMALLEST = "smallest"; public static final String KAFKA_AUTO_OFFSET_RESET_LARGEST = "largest"; - + private DatasetStateStore datasetStateStore; private final MutableJobCatalog jobCatalog; + @Getter private Counter newSpecs; @Getter @@ -67,6 +70,15 @@ public abstract class KafkaJobMonitor extends HighLevelConsumer<byte[], byte[]> public KafkaJobMonitor(String topic, MutableJobCatalog catalog, Config config) { super(topic, ConfigUtils.getConfigOrEmpty(config, KAFKA_JOB_MONITOR_PREFIX), 1); this.jobCatalog = catalog; + try { + if (config.hasPath(ConfigurationKeys.STATE_STORE_ENABLED) && + config.getBoolean(ConfigurationKeys.STATE_STORE_ENABLED) && + config.hasPath(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY)) { + this.datasetStateStore = DatasetStateStore.buildDatasetStateStore(config); + } + } catch (IOException e) { + log.warn("DatasetStateStore could not be created."); + } } @Override @@ -100,6 +112,17 @@ public abstract class KafkaJobMonitor extends HighLevelConsumer<byte[], byte[]> } else if (parsedMessage instanceof Either.Right) { this.remmovedSpecs.inc(); this.jobCatalog.remove(((Either.Right<JobSpec, URI>) parsedMessage).getRight()); + + // Refer FlowConfigsResources:delete to understand the pattern of flow URI + // FlowToJobSpec Compilers use the flowSpecURI to derive jobSpecURI + String[] uriTokens = ((URI)(((Either.Right) parsedMessage).getRight())).getPath().split("/"); + if (uriTokens.length == 3) { + String jobName = uriTokens[2]; + // Delete the job state if it is a delete spec request + if (this.datasetStateStore != null) { + 
this.datasetStateStore.delete(jobName); + } + } } } } catch (IOException ioe) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f8950570/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java index 1dd90b1..57f99a9 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java @@ -19,11 +19,16 @@ package org.apache.gobblin.runtime.job_monitor; import java.net.URI; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.testng.Assert; import org.testng.annotations.Test; import com.google.common.base.Optional; +import com.typesafe.config.Config; +import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.runtime.kafka.HighLevelConsumerTest; @@ -32,8 +37,11 @@ public class KafkaJobMonitorTest { @Test public void test() throws Exception { - MockedKafkaJobMonitor monitor = - MockedKafkaJobMonitor.create(HighLevelConsumerTest.getSimpleConfig(Optional.of(KafkaJobMonitor.KAFKA_JOB_MONITOR_PREFIX))); + Config config = HighLevelConsumerTest.getSimpleConfig(Optional.of(KafkaJobMonitor.KAFKA_JOB_MONITOR_PREFIX)); + String stateStoreRootDir = config.getString(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY); + FileSystem fs = FileSystem.getLocal(new Configuration()); + + MockedKafkaJobMonitor monitor = MockedKafkaJobMonitor.create(config); monitor.startAsync(); monitor.getMockKafkaStream().pushToStream("job1:1"); @@ -58,6 +66,18 @@ public class KafkaJobMonitorTest { 
Assert.assertTrue(monitor.getJobSpecs().containsKey(new URI("job2"))); Assert.assertEquals(monitor.getJobSpecs().get(new URI("job2")).getVersion(), "2"); + monitor.getMockKafkaStream().pushToStream("/flow3/job3:1"); + monitor.awaitExactlyNSpecs(3); + Assert.assertTrue(monitor.getJobSpecs().containsKey(new URI("/flow3/job3"))); + + // TODO: Currently, state stores are not categorized by flow name. + // This can lead to one job overwriting other jobs' job state. + fs.create(new Path(stateStoreRootDir, "job3")); + Assert.assertTrue(fs.exists(new Path(stateStoreRootDir, "job3"))); + monitor.getMockKafkaStream().pushToStream(MockedKafkaJobMonitor.REMOVE + ":/flow3/job3"); + monitor.awaitExactlyNSpecs(2); + Assert.assertFalse(fs.exists(new Path(stateStoreRootDir, "job3"))); + monitor.shutDown(); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f8950570/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/HighLevelConsumerTest.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/HighLevelConsumerTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/HighLevelConsumerTest.java index 6e8d7a2..e8d4e6c 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/HighLevelConsumerTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/kafka/HighLevelConsumerTest.java @@ -17,6 +17,7 @@ package org.apache.gobblin.runtime.kafka; +import java.io.File; import java.util.Properties; import java.util.concurrent.TimeoutException; @@ -24,9 +25,11 @@ import org.testng.Assert; import org.testng.annotations.Test; import com.google.common.base.Optional; +import com.google.common.io.Files; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.runtime.job_monitor.MockKafkaStream; @@ -35,6 +38,10 @@ public class 
HighLevelConsumerTest { public static Config getSimpleConfig(Optional<String> prefix) { Properties properties = new Properties(); properties.put(getConfigKey(prefix, "zookeeper.connect"), "zookeeper"); + properties.put(ConfigurationKeys.STATE_STORE_ENABLED, "true"); + File tmpDir = Files.createTempDir(); + tmpDir.deleteOnExit(); + properties.put(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, tmpDir.toString()); return ConfigFactory.parseProperties(properties); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f8950570/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java index db92ef9..855d692 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java @@ -211,7 +211,7 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler { */ protected JobSpec jobSpecGenerator(FlowSpec flowSpec) { JobSpec jobSpec; - JobSpec.Builder jobSpecBuilder = JobSpec.builder(flowSpec.getUri()) + JobSpec.Builder jobSpecBuilder = JobSpec.builder(jobSpecURIGenerator(flowSpec)) .withConfig(flowSpec.getConfig()) .withDescription(flowSpec.getDescription()) .withVersion(flowSpec.getVersion()); @@ -254,6 +254,32 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler { } /** + * It can receive multiple number of parameters, needed to generate a unique URI. + * Implementation is flowSpecCompiler dependent. + * This method should return URI which has job name at third place, when split by "/" + * e.g. 
/flowGroup/flowName + * /flowGroup/flowName/sourceNode-targetNode + * SafeDatasetCommit creates state store using this name and + * {@link org.apache.gobblin.runtime.job_monitor.KafkaJobMonitor} extract job name to find the state store path. + * @param objects + * @return + */ + public URI jobSpecURIGenerator(Object... objects) { + return ((FlowSpec)objects[0]).getUri(); + } + + /** + * It returns the template uri for job. + * This method can be overridden by derived FlowToJobSpecCompiler classes. + * @param flowSpec + * @return template URI + */ + protected URI jobSpecTemplateURIGenerator(FlowSpec flowSpec) { + // For now only first template uri will be honored for Identity + return flowSpec.getTemplateURIs().get().iterator().next(); + } + + /** * Ideally each edge has its own eligible template repository(Based on {@link SpecExecutor}) * to pick templates from. * http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f8950570/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java index ba5c203..544ca42 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopsFlowToJobSpecCompiler.java @@ -323,14 +323,18 @@ public class MultiHopsFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler { private URI getTemplateURI (ServiceNode sourceNode, ServiceNode targetNode, FlowSpec flowSpec, FlowEdge flowEdge) { URI firstTemplateURI = (edgeTemplateMap != null && edgeTemplateMap.containsKey(flowEdge.getEdgeIdentity())) ? 
edgeTemplateMap.get( - flowEdge.getEdgeIdentity()).get(0) : jobSpecGenerator(flowSpec).getTemplateURI().orNull(); + flowEdge.getEdgeIdentity()).get(0) : jobSpecTemplateURIGenerator(flowSpec); return firstTemplateURI; } /** * A naive implementation of generating a jobSpec's URI within a multi-hop logical Flow. */ - public static URI jobSpecURIGenerator(FlowSpec flowSpec, ServiceNode sourceNode, ServiceNode targetNode) { + @Override + public URI jobSpecURIGenerator(Object... objects) { + FlowSpec flowSpec = (FlowSpec) objects[0]; + ServiceNode sourceNode = (ServiceNode) objects[1]; + ServiceNode targetNode = (ServiceNode) objects[2]; try { return new URI(JobSpec.Builder.DEFAULT_JOB_CATALOG_SCHEME, flowSpec.getUri().getAuthority(), StringUtils.appendIfMissing(StringUtils.prependIfMissing(flowSpec.getUri().getPath(), "/"),"/")
