This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4e238b7 [GOBBLIN-756] Add flow catalog that updates when filesystem
is modified
4e238b7 is described below
commit 4e238b7b823304cc3640db872498e3ab12d45aae
Author: Jack Moseley <[email protected]>
AuthorDate: Wed May 1 11:49:26 2019 -0700
[GOBBLIN-756] Add flow catalog that updates when filesystem is modified
Closes #2620 from jack-moseley/dynamic-flow-
template
---
.../runtime/job_catalog/ImmutableFSJobCatalog.java | 11 +-
.../service/modules/core/GitFlowGraphMonitor.java | 13 ++-
.../service/modules/flow/MultiHopFlowCompiler.java | 18 ++-
.../service/modules/flowgraph/BaseFlowEdge.java | 38 ++++++-
.../service/modules/flowgraph/BaseFlowGraph.java | 4 +-
.../ObservingFSFlowEdgeTemplateCatalog.java | 121 +++++++++++++++++++++
.../FSFlowTemplateCatalogTest.java | 4 +-
.../ObservingFSFlowEdgeTemplateCatalogTest.java | 115 ++++++++++++++++++++
8 files changed, 300 insertions(+), 24 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java
index fb08bc0..86e2b96 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_catalog/ImmutableFSJobCatalog.java
@@ -52,6 +52,7 @@ import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.JobSpecNotFoundException;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.PullFileLoader;
+import org.apache.gobblin.util.filesystem.PathAlterationListener;
import org.apache.gobblin.util.filesystem.PathAlterationObserverScheduler;
import org.apache.gobblin.util.filesystem.PathAlterationObserver;
@@ -127,13 +128,15 @@ public class ImmutableFSJobCatalog extends JobCatalogBase
implements JobCatalog
// If absent, the Optional object will be created automatically by
addPathAlterationObserver
Optional<PathAlterationObserver> observerOptional =
Optional.fromNullable(observer);
- FSPathAlterationListenerAdaptor configFilelistener =
- new FSPathAlterationListenerAdaptor(this.jobConfDirPath,
this.loader, this.sysConfig, this.listeners,
- this.converter);
-
this.pathAlterationDetector.addPathAlterationObserver(configFilelistener,
observerOptional, this.jobConfDirPath);
+ this.pathAlterationDetector.addPathAlterationObserver(getListener(),
observerOptional, this.jobConfDirPath);
}
}
+ protected PathAlterationListener getListener() {
+ return new FSPathAlterationListenerAdaptor(this.jobConfDirPath,
this.loader, this.sysConfig, this.listeners,
+ this.converter);
+ }
+
@Override
protected void startUp()
throws IOException {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
index 024c9a1..5a69371 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
@@ -88,15 +88,16 @@ public class GitFlowGraphMonitor extends
GitMonitoringService {
.put(SHOULD_CHECKPOINT_HASHES, false)
.build());
- private Optional<FSFlowTemplateCatalog> flowCatalog;
+ private Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog;
private FlowGraph flowGraph;
private final Map<URI, TopologySpec> topologySpecMap;
private final Config emptyConfig = ConfigFactory.empty();
private final CountDownLatch initComplete;
- public GitFlowGraphMonitor(Config config, Optional<FSFlowTemplateCatalog>
flowCatalog, FlowGraph graph, Map<URI, TopologySpec> topologySpecMap,
CountDownLatch initComplete) {
+ public GitFlowGraphMonitor(Config config, Optional<? extends
FSFlowTemplateCatalog> flowTemplateCatalog,
+ FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, CountDownLatch
initComplete) {
super(config.getConfig(GIT_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK));
- this.flowCatalog = flowCatalog;
+ this.flowTemplateCatalog = flowTemplateCatalog;
this.flowGraph = graph;
this.topologySpecMap = topologySpecMap;
this.initComplete = initComplete;
@@ -224,15 +225,15 @@ public class GitFlowGraphMonitor extends
GitMonitoringService {
Class flowEdgeFactoryClass =
Class.forName(ConfigUtils.getString(edgeConfig,
FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS,
FlowGraphConfigurationKeys.DEFAULT_FLOW_EDGE_FACTORY_CLASS));
FlowEdgeFactory flowEdgeFactory = (FlowEdgeFactory)
GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass,
edgeConfig);
- if (flowCatalog.isPresent()) {
- FlowEdge edge = flowEdgeFactory.createFlowEdge(edgeConfig,
flowCatalog.get(), specExecutors);
+ if (flowTemplateCatalog.isPresent()) {
+ FlowEdge edge = flowEdgeFactory.createFlowEdge(edgeConfig,
flowTemplateCatalog.get(), specExecutors);
if (!this.flowGraph.addFlowEdge(edge)) {
log.warn("Could not add edge {} to FlowGraph; skipping",
edge.getId());
} else {
log.info("Added edge {} to FlowGraph", edge.getId());
}
} else {
- log.warn("Could not add edge defined in {} to FlowGraph as
FlowCatalog is absent", change.getNewPath());
+ log.warn("Could not add edge defined in {} to FlowGraph as
FlowTemplateCatalog is absent", change.getNewPath());
}
} catch (Exception e) {
log.warn("Could not add edge defined in {} due to exception {}",
change.getNewPath(), e.getMessage());
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index 347235a..f067344 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -23,9 +23,10 @@ import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang3.StringUtils;
-import
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
import org.slf4j.Logger;
import com.google.common.annotations.VisibleForTesting;
@@ -54,6 +55,7 @@ import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
import org.apache.gobblin.service.modules.flowgraph.pathfinder.PathFinder;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
+import
org.apache.gobblin.service.modules.template_catalog.ObservingFSFlowEdgeTemplateCatalog;
import org.apache.gobblin.util.ConfigUtils;
@@ -73,6 +75,8 @@ public class MultiHopFlowCompiler extends
BaseFlowToJobSpecCompiler {
private GitFlowGraphMonitor gitFlowGraphMonitor;
+ private ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
+
public MultiHopFlowCompiler(Config config) {
this(config, true);
}
@@ -88,11 +92,11 @@ public class MultiHopFlowCompiler extends
BaseFlowToJobSpecCompiler {
public MultiHopFlowCompiler(Config config, Optional<Logger> log, boolean
instrumentationEnabled) {
super(config, log, instrumentationEnabled);
this.flowGraph = new BaseFlowGraph();
- Optional<FSFlowTemplateCatalog> flowCatalog = Optional.absent();
+ Optional<ObservingFSFlowEdgeTemplateCatalog> flowTemplateCatalog =
Optional.absent();
if
(config.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)
&&
StringUtils.isNotBlank(config.getString(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)))
{
try {
- flowCatalog = Optional.of(new FSFlowTemplateCatalog(config));
+ flowTemplateCatalog = Optional.of(new
ObservingFSFlowEdgeTemplateCatalog(config, rwLock));
} catch (IOException e) {
throw new RuntimeException("Cannot instantiate " +
getClass().getName(), e);
}
@@ -105,8 +109,8 @@ public class MultiHopFlowCompiler extends
BaseFlowToJobSpecCompiler {
gitFlowGraphConfig = this.config
.withValue(GitFlowGraphMonitor.GIT_FLOWGRAPH_MONITOR_PREFIX + "." +
ConfigurationKeys.ENCRYPT_KEY_LOC,
config.getValue(ConfigurationKeys.ENCRYPT_KEY_LOC));
}
- this.gitFlowGraphMonitor = new GitFlowGraphMonitor(gitFlowGraphConfig,
flowCatalog, this.flowGraph, this.topologySpecMap, this.getInitComplete());
- this.serviceManager = new
ServiceManager(Lists.newArrayList(this.gitFlowGraphMonitor));
+ this.gitFlowGraphMonitor = new GitFlowGraphMonitor(gitFlowGraphConfig,
flowTemplateCatalog, this.flowGraph, this.topologySpecMap,
this.getInitComplete());
+ this.serviceManager = new
ServiceManager(Lists.newArrayList(this.gitFlowGraphMonitor,
flowTemplateCatalog.get()));
addShutdownHook();
//Start the git flow graph monitor
try {
@@ -167,6 +171,7 @@ public class MultiHopFlowCompiler extends
BaseFlowToJobSpecCompiler {
Dag<JobExecutionPlan> jobExecutionPlanDag;
try {
+ this.rwLock.readLock().lock();
//Compute the path from source to destination.
FlowGraphPath flowGraphPath = flowGraph.findPath(flowSpec);
//Convert the path into a Dag of JobExecutionPlans.
@@ -183,9 +188,12 @@ public class MultiHopFlowCompiler extends
BaseFlowToJobSpecCompiler {
.format("Exception encountered while compiling flow for source:
%s and destination: %s", source, destination),
e);
return null;
+ } finally {
+ this.rwLock.readLock().unlock();
}
Instrumented.markMeter(flowCompilationSuccessFulMeter);
Instrumented.updateTimer(flowCompilationTimer, System.nanoTime() -
startTime, TimeUnit.NANOSECONDS);
+
return jobExecutionPlanDag;
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
index 0c1023e..3d65f6e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java
@@ -17,10 +17,11 @@
package org.apache.gobblin.service.modules.flowgraph;
+import java.io.IOException;
import java.net.URI;
+import java.net.URISyntaxException;
import java.util.List;
-import
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
import org.apache.hadoop.security.UserGroupInformation;
import com.google.common.base.Preconditions;
@@ -29,17 +30,23 @@ import com.typesafe.config.Config;
import joptsimple.internal.Strings;
import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.JobTemplate;
import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.service.modules.template.FlowTemplate;
+import
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
import org.apache.gobblin.util.ConfigUtils;
/**
- * An implementation of {@link FlowEdge}.
+ * An implementation of {@link FlowEdge}. If a {@link FSFlowTemplateCatalog}
is specified in the constructor,
+ * {@link #flowTemplate} is reloaded when {@link #getFlowTemplate()} is called.
*/
@Alpha
+@Slf4j
public class BaseFlowEdge implements FlowEdge {
@Getter
protected String src;
@@ -47,7 +54,6 @@ public class BaseFlowEdge implements FlowEdge {
@Getter
protected String dest;
- @Getter
protected FlowTemplate flowTemplate;
@Getter
@@ -62,8 +68,16 @@ public class BaseFlowEdge implements FlowEdge {
@Getter
private boolean active;
+ private final FSFlowTemplateCatalog flowTemplateCatalog;
+
//Constructor
- public BaseFlowEdge(List<String> endPoints, String edgeId, FlowTemplate
flowTemplate, List<SpecExecutor> executors, Config properties, boolean active) {
+ public BaseFlowEdge(List<String> endPoints, String edgeId, FlowTemplate
flowTemplate, List<SpecExecutor> executors,
+ Config properties, boolean active) {
+ this(endPoints, edgeId, flowTemplate, executors, properties, active, null);
+ }
+
+ public BaseFlowEdge(List<String> endPoints, String edgeId, FlowTemplate
flowTemplate, List<SpecExecutor> executors,
+ Config properties, boolean active, FSFlowTemplateCatalog
flowTemplateCatalog) {
this.src = endPoints.get(0);
this.dest = endPoints.get(1);
this.flowTemplate = flowTemplate;
@@ -71,6 +85,20 @@ public class BaseFlowEdge implements FlowEdge {
this.active = active;
this.config = properties;
this.id = edgeId;
+ this.flowTemplateCatalog = flowTemplateCatalog;
+ }
+
+ @Override
+ public FlowTemplate getFlowTemplate() {
+ try {
+ if (this.flowTemplateCatalog != null) {
+ this.flowTemplate =
this.flowTemplateCatalog.getFlowTemplate(this.flowTemplate.getUri());
+ }
+ } catch (SpecNotFoundException | JobTemplate.TemplateException |
IOException | URISyntaxException e) {
+ // If loading template fails, use the template that was successfully
loaded on construction
+ log.warn("Failed to get flow template at " + this.flowTemplate.getUri()
+ ", using in-memory flow template");
+ }
+ return this.flowTemplate;
}
@Override
@@ -148,7 +176,7 @@ public class BaseFlowEdge implements FlowEdge {
boolean isActive = ConfigUtils.getBoolean(edgeProps,
FlowGraphConfigurationKeys.FLOW_EDGE_IS_ACTIVE_KEY, true);
FlowTemplate flowTemplate = flowTemplateCatalog.getFlowTemplate(new
URI(flowTemplateDirUri));
- return new BaseFlowEdge(endPoints, edgeId, flowTemplate,
specExecutors, edgeProps, isActive);
+ return new BaseFlowEdge(endPoints, edgeId, flowTemplate,
specExecutors, edgeProps, isActive, flowTemplateCatalog);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
index e2b256f..83a9a58 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java
@@ -24,6 +24,8 @@ import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.service.modules.flow.FlowGraphPath;
@@ -31,8 +33,6 @@ import
org.apache.gobblin.service.modules.flowgraph.pathfinder.PathFinder;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
-import lombok.extern.slf4j.Slf4j;
-
/**
* A thread-safe implementation of {@link FlowGraph}. The implementation
maintains the following data structures:
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java
new file mode 100644
index 0000000..9b68024
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.template_catalog;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+
+import org.apache.hadoop.fs.Path;
+
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.modules.template.FlowTemplate;
+import org.apache.gobblin.util.filesystem.PathAlterationListener;
+import org.apache.gobblin.util.filesystem.PathAlterationListenerAdaptor;
+
+
+/**
+ * {@link FSFlowTemplateCatalog} that keeps a cache of flow and job templates.
It has a
+ * {@link org.apache.gobblin.util.filesystem.PathAlterationListener} on the
root path, and clears the cache when a change
+ * is detected so that the next call to {@link #getFlowTemplate(URI)} will use
the updated files.
+ */
+@Slf4j
+public class ObservingFSFlowEdgeTemplateCatalog extends FSFlowTemplateCatalog {
+ private Map<URI, FlowTemplate> flowTemplateMap = new ConcurrentHashMap<>();
+ private Map<URI, List<JobTemplate>> jobTemplateMap = new
ConcurrentHashMap<>();
+ private ReadWriteLock rwLock;
+
+ public ObservingFSFlowEdgeTemplateCatalog(Config sysConfig, ReadWriteLock
rwLock) throws IOException {
+ super(sysConfig);
+ this.rwLock = rwLock;
+ }
+
+ @Override
+ public FlowTemplate getFlowTemplate(URI flowTemplateDirURI)
+ throws SpecNotFoundException, JobTemplate.TemplateException,
IOException, URISyntaxException {
+ FlowTemplate flowTemplate =
flowTemplateMap.getOrDefault(flowTemplateDirURI, null);
+
+ if (flowTemplate == null) {
+ flowTemplate = super.getFlowTemplate(flowTemplateDirURI);
+ flowTemplateMap.put(flowTemplateDirURI, flowTemplate);
+ }
+
+ return flowTemplate;
+ }
+
+ @Override
+ public List<JobTemplate> getJobTemplatesForFlow(URI flowTemplateDirURI)
+ throws IOException, SpecNotFoundException,
JobTemplate.TemplateException, URISyntaxException {
+ List<JobTemplate> jobTemplates =
jobTemplateMap.getOrDefault(flowTemplateDirURI, null);
+
+ if (jobTemplates == null) {
+ jobTemplates = super.getJobTemplatesForFlow(flowTemplateDirURI);
+ jobTemplateMap.put(flowTemplateDirURI, jobTemplates);
+ }
+
+ return jobTemplates;
+ }
+
+ @Override
+ protected PathAlterationListener getListener() {
+ return new FlowCatalogPathAlterationListener();
+ }
+
+ @Override
+ protected void startUp() throws IOException {
+ if (this.pathAlterationDetector != null) {
+ this.pathAlterationDetector.start();
+ }
+ }
+
+ /**
+ * Clear cached templates so they will be reloaded next time {@link
#getFlowTemplate(URI)} is called.
+ */
+ private void clearTemplates() {
+ this.rwLock.writeLock().lock();
+ log.info("Change detected, reloading flow templates.");
+ flowTemplateMap.clear();
+ jobTemplateMap.clear();
+ this.rwLock.writeLock().unlock();
+ }
+
+ /**
+ * {@link org.apache.gobblin.util.filesystem.PathAlterationListener} that
clears flow/job template cache if a file is
+ * created or updated.
+ */
+ private class FlowCatalogPathAlterationListener extends
PathAlterationListenerAdaptor {
+ @Override
+ public void onFileCreate(Path path) {
+ clearTemplates();
+ }
+
+ @Override
+ public void onFileChange(Path path) {
+ clearTemplates();
+ }
+ }
+}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalogTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalogTest.java
index 550c97d..f8cdf79 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalogTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalogTest.java
@@ -44,8 +44,8 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class FSFlowTemplateCatalogTest {
- private static final String TEST_TEMPLATE_NAME = "flowEdgeTemplate";
- private static final String TEST_TEMPLATE_DIR_URI = "FS:///" +
TEST_TEMPLATE_NAME;
+ public static final String TEST_TEMPLATE_NAME = "flowEdgeTemplate";
+ public static final String TEST_TEMPLATE_DIR_URI = "FS:///" +
TEST_TEMPLATE_NAME;
@Test
public void testGetFlowTemplate() throws Exception {
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalogTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalogTest.java
new file mode 100644
index 0000000..b1de754
--- /dev/null
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalogTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.template_catalog;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.io.FileUtils;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.ServiceManager;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.JobTemplate;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.template.FlowTemplate;
+import org.apache.gobblin.testing.AssertWithBackoff;
+
+
+@Slf4j
+public class ObservingFSFlowEdgeTemplateCatalogTest {
+
+ private File templateDir;
+ private Config templateCatalogCfg;
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ URI flowTemplateCatalogUri =
this.getClass().getClassLoader().getResource("template_catalog").toURI();
+ this.templateDir = Files.createTempDir();
+ FileUtils.forceDeleteOnExit(templateDir);
+ FileUtils.copyDirectory(new File(flowTemplateCatalogUri.getPath()),
templateDir);
+ Properties properties = new Properties();
+
properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY,
templateDir.toURI().toString());
+
properties.put(ConfigurationKeys.JOB_CONFIG_FILE_MONITOR_POLLING_INTERVAL_KEY,
"1000");
+ Config config = ConfigFactory.parseProperties(properties);
+ this.templateCatalogCfg =
config.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+
config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+ }
+
+ @Test
+ public void testModifyFlowTemplate() throws Exception {
+ ObservingFSFlowEdgeTemplateCatalog catalog = new
ObservingFSFlowEdgeTemplateCatalog(this.templateCatalogCfg, new
ReentrantReadWriteLock());
+ ServiceManager serviceManager = new
ServiceManager(Lists.newArrayList(catalog));
+ serviceManager.startAsync().awaitHealthy(5, TimeUnit.SECONDS);
+
+ // Check cached flow template is returned
+ FlowTemplate flowTemplate1 = catalog.getFlowTemplate(new
URI(FSFlowTemplateCatalogTest.TEST_TEMPLATE_DIR_URI));
+ FlowTemplate flowTemplate2 = catalog.getFlowTemplate(new
URI(FSFlowTemplateCatalogTest.TEST_TEMPLATE_DIR_URI));
+ Assert.assertSame(flowTemplate1, flowTemplate2);
+
+ // Update a file flow catalog and check that the getFlowTemplate returns
the new value
+ Path flowConfPath = new File(new File(this.templateDir,
FSFlowTemplateCatalogTest.TEST_TEMPLATE_NAME), "flow.conf").toPath();
+ List<String> lines = java.nio.file.Files.readAllLines(flowConfPath);
+ for (int i = 0; i < lines.size(); i++) {
+ if
(lines.get(i).equals("gobblin.flow.edge.input.dataset.descriptor.0.format=avro"))
{
+ lines.set(i,
"gobblin.flow.edge.input.dataset.descriptor.0.format=any");
+ break;
+ }
+ }
+ java.nio.file.Files.write(flowConfPath, lines);
+
+ Function testFunction = new GetFlowTemplateConfigFunction(new
URI(FSFlowTemplateCatalogTest.TEST_TEMPLATE_DIR_URI), catalog,
+ "gobblin.flow.edge.input.dataset.descriptor.0.format");
+ AssertWithBackoff.create().timeoutMs(10000).assertEquals(testFunction,
"any", "flow template updated");
+ }
+
+ @AllArgsConstructor
+ private class GetFlowTemplateConfigFunction implements Function<Void,
String> {
+ private URI flowTemplateCatalogUri;
+ private FSFlowTemplateCatalog flowTemplateCatalog;
+ private String configKey;
+
+ @Override
+ public String apply(Void input) {
+ try {
+ return
this.flowTemplateCatalog.getFlowTemplate(this.flowTemplateCatalogUri).getRawTemplateConfig().getString(this.configKey);
+ } catch (SpecNotFoundException | JobTemplate.TemplateException |
IOException | URISyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
\ No newline at end of file