This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new d29bd9f32 [GOBBLIN-1752] Fix race condition where FSTemplateCatalog
would update at the same t… (#3612)
d29bd9f32 is described below
commit d29bd9f32158bbc2cc9c9bbbdb90d565fb34b078
Author: William Lo <[email protected]>
AuthorDate: Thu Dec 8 12:46:14 2022 -0800
[GOBBLIN-1752] Fix race condition where FSTemplateCatalog would update at
the same t… (#3612)
* Fix race condition where FSTemplateCatalog would update at the same time
as the FSFlowGraph if its template files are updated at the same time as the
files watched by the flowgraph monitor
* Add javadoc
* Fix checkstyle
* Address review, simplify tests
---
.../apache/gobblin/service/ServiceConfigKeys.java | 1 +
.../service/modules/flow/MultiHopFlowCompiler.java | 16 ++--
.../FSPathAlterationFlowGraphListener.java | 14 +++-
.../ObservingFSFlowEdgeTemplateCatalog.java | 64 +---------------
...og.java => UpdatableFSFlowTemplateCatalog.java} | 68 +++--------------
.../service/monitoring/FsFlowGraphMonitor.java | 21 ++++--
.../UpdatableFSFFlowTemplateCatalogTest.java | 85 ++++++++++++++++++++++
.../service/monitoring/FsFlowGraphMonitorTest.java | 40 ++++++++--
8 files changed, 170 insertions(+), 139 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index 1aa6e30dc..f80177a8d 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -103,6 +103,7 @@ public class ServiceConfigKeys {
// Template Catalog Keys
public static final String TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY =
GOBBLIN_SERVICE_PREFIX + "templateCatalogs.fullyQualifiedPath";
+ public static final String TEMPLATE_CATALOGS_CLASS_KEY =
GOBBLIN_SERVICE_PREFIX + "templateCatalogs.class";
// Keys related to user-specified policy on route selection.
// Undesired connection to form an executable JobSpec.
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index 1ffef3969..20d2339c7 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -17,7 +17,6 @@
package org.apache.gobblin.service.modules.flow;
-import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URISyntaxException;
import java.util.ArrayList;
@@ -61,6 +60,7 @@ import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.service.ServiceConfigKeys;
+import
org.apache.gobblin.service.modules.template_catalog.UpdatableFSFlowTemplateCatalog;
import org.apache.gobblin.service.monitoring.GitFlowGraphMonitor;
import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
import org.apache.gobblin.service.modules.flowgraph.Dag;
@@ -130,12 +130,16 @@ public class MultiHopFlowCompiler extends
BaseFlowToJobSpecCompiler {
// Use atomic reference to avoid partial flowgraph upgrades during path
compilation.
this.flowGraph = new AtomicReference<>(new
BaseFlowGraph(dataNodeAliasMap));
- Optional<ObservingFSFlowEdgeTemplateCatalog> flowTemplateCatalog =
Optional.absent();
+ Optional<? extends UpdatableFSFlowTemplateCatalog> flowTemplateCatalog;
if
(config.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)
&&
StringUtils.isNotBlank(config.getString(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)))
{
+
try {
- flowTemplateCatalog = Optional.of(new
ObservingFSFlowEdgeTemplateCatalog(config, rwLock));
- } catch (IOException e) {
+ String flowTemplateCatalogClassName =
ConfigUtils.getString(this.config,
ServiceConfigKeys.TEMPLATE_CATALOGS_CLASS_KEY,
ObservingFSFlowEdgeTemplateCatalog.class.getCanonicalName());
+ flowTemplateCatalog = Optional.of(
+ (UpdatableFSFlowTemplateCatalog)
ConstructorUtils.invokeConstructor(Class.forName(new
ClassAliasResolver<>(UpdatableFSFlowTemplateCatalog.class)
+ .resolve(flowTemplateCatalogClassName)), config, rwLock));
+ } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException | InstantiationException | ClassNotFoundException e) {
throw new RuntimeException("Cannot instantiate " +
getClass().getName(), e);
}
} else {
@@ -163,7 +167,9 @@ public class MultiHopFlowCompiler extends
BaseFlowToJobSpecCompiler {
} catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException | InstantiationException | ClassNotFoundException e) {
throw new RuntimeException(e);
}
- this.serviceManager = new
ServiceManager(Lists.newArrayList(this.flowGraphMonitor,
flowTemplateCatalog.get()));
+ this.serviceManager = (flowTemplateCatalog.isPresent() &&
flowTemplateCatalog.get() instanceof ObservingFSFlowEdgeTemplateCatalog) ?
+ new ServiceManager(Lists.newArrayList(this.flowGraphMonitor,
flowTemplateCatalog.get())) : new
ServiceManager(Lists.newArrayList(this.flowGraphMonitor));
+
addShutdownHook();
//Start the git flow graph monitor
try {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FSPathAlterationFlowGraphListener.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FSPathAlterationFlowGraphListener.java
index 1700d8805..1b770194a 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FSPathAlterationFlowGraphListener.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FSPathAlterationFlowGraphListener.java
@@ -26,7 +26,7 @@ import com.google.common.base.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.service.modules.flow.MultiHopFlowCompiler;
-import
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import
org.apache.gobblin.service.modules.template_catalog.UpdatableFSFlowTemplateCatalog;
import org.apache.gobblin.util.filesystem.PathAlterationListener;
import org.apache.gobblin.util.filesystem.PathAlterationObserver;
@@ -41,10 +41,14 @@ import
org.apache.gobblin.util.filesystem.PathAlterationObserver;
public class FSPathAlterationFlowGraphListener implements
PathAlterationListener {
private final MultiHopFlowCompiler compiler;
private final BaseFlowGraphHelper flowGraphHelper;
+ private Optional<UpdatableFSFlowTemplateCatalog> flowTemplateCatalog;
+ private final boolean shouldMonitorTemplateCatalog;
- public FSPathAlterationFlowGraphListener(Optional<? extends
FSFlowTemplateCatalog> flowTemplateCatalog,
- MultiHopFlowCompiler compiler, String baseDirectory, BaseFlowGraphHelper
flowGraphHelper) {
+ public
FSPathAlterationFlowGraphListener(Optional<UpdatableFSFlowTemplateCatalog>
flowTemplateCatalog,
+ MultiHopFlowCompiler compiler, String baseDirectory, BaseFlowGraphHelper
flowGraphHelper, boolean shouldMonitorTemplateCatalog) {
this.flowGraphHelper = flowGraphHelper;
+ this.flowTemplateCatalog = flowTemplateCatalog;
+ this.shouldMonitorTemplateCatalog = shouldMonitorTemplateCatalog;
File graphDir = new File(baseDirectory);
// Populate the flowgraph with any existing files
if (!graphDir.exists()) {
@@ -88,6 +92,10 @@ public class FSPathAlterationFlowGraphListener implements
PathAlterationListener
@Override
public void onCheckDetectedChange() {
log.info("Detecting change in flowgraph files, reloading flowgraph");
+ if (this.shouldMonitorTemplateCatalog) {
+ // Clear template cache as templates are colocated with the flowgraph,
and thus could have been updated too
+ this.flowTemplateCatalog.get().clearTemplates();
+ }
FlowGraph newGraph = this.flowGraphHelper.generateFlowGraph();
if (newGraph != null) {
this.compiler.setFlowGraph(newGraph);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java
index 5c47388bf..baf413a91 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java
@@ -19,22 +19,13 @@ package org.apache.gobblin.service.modules.template_catalog;
import java.io.IOException;
import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
-import org.apache.hadoop.fs.Path;
-
import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.runtime.api.JobTemplate;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.service.modules.template.FlowTemplate;
import org.apache.gobblin.util.filesystem.PathAlterationListener;
import org.apache.gobblin.util.filesystem.PathAlterationListenerAdaptor;
@@ -45,42 +36,12 @@ import
org.apache.gobblin.util.filesystem.PathAlterationListenerAdaptor;
* is detected so that the next call to {@link #getFlowTemplate(URI)} will use
the updated files.
*/
@Slf4j
-public class ObservingFSFlowEdgeTemplateCatalog extends FSFlowTemplateCatalog {
- private Map<URI, FlowTemplate> flowTemplateMap = new ConcurrentHashMap<>();
- private Map<URI, List<JobTemplate>> jobTemplateMap = new
ConcurrentHashMap<>();
- private ReadWriteLock rwLock;
+public class ObservingFSFlowEdgeTemplateCatalog extends
UpdatableFSFlowTemplateCatalog {
private AtomicBoolean shouldRefreshFlowGraph = new AtomicBoolean(false);
public ObservingFSFlowEdgeTemplateCatalog(Config sysConfig, ReadWriteLock
rwLock) throws IOException {
- super(sysConfig);
- this.rwLock = rwLock;
- }
-
- @Override
- public FlowTemplate getFlowTemplate(URI flowTemplateDirURI)
- throws SpecNotFoundException, JobTemplate.TemplateException,
IOException, URISyntaxException {
- FlowTemplate flowTemplate =
flowTemplateMap.getOrDefault(flowTemplateDirURI, null);
-
- if (flowTemplate == null) {
- flowTemplate = super.getFlowTemplate(flowTemplateDirURI);
- flowTemplateMap.put(flowTemplateDirURI, flowTemplate);
- }
-
- return flowTemplate;
- }
-
- @Override
- public List<JobTemplate> getJobTemplatesForFlow(URI flowTemplateDirURI)
- throws IOException, SpecNotFoundException,
JobTemplate.TemplateException, URISyntaxException {
- List<JobTemplate> jobTemplates =
jobTemplateMap.getOrDefault(flowTemplateDirURI, null);
-
- if (jobTemplates == null) {
- jobTemplates = super.getJobTemplatesForFlow(flowTemplateDirURI);
- jobTemplateMap.put(flowTemplateDirURI, jobTemplates);
- }
-
- return jobTemplates;
+ super(sysConfig, rwLock);
}
@Override
@@ -100,32 +61,15 @@ public class ObservingFSFlowEdgeTemplateCatalog extends
FSFlowTemplateCatalog {
return this.shouldRefreshFlowGraph.getAndSet(value);
}
- /**
- * Clear cached templates so they will be reloaded next time {@link
#getFlowTemplate(URI)} is called.
- * Also refresh git flow graph in case any edges that failed to be added on
startup are successful now.
- */
- private void clearTemplates() {
- this.rwLock.writeLock().lock();
- log.info("Change detected, reloading flow templates.");
- flowTemplateMap.clear();
- jobTemplateMap.clear();
- getAndSetShouldRefreshFlowGraph(true);
- this.rwLock.writeLock().unlock();
- }
-
/**
* {@link org.apache.gobblin.util.filesystem.PathAlterationListener} that
clears flow/job template cache if a file is
* created or updated.
*/
private class FlowCatalogPathAlterationListener extends
PathAlterationListenerAdaptor {
@Override
- public void onFileCreate(Path path) {
- clearTemplates();
- }
-
- @Override
- public void onFileChange(Path path) {
+ public void onCheckDetectedChange() {
clearTemplates();
+ getAndSetShouldRefreshFlowGraph(true);
}
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/UpdatableFSFlowTemplateCatalog.java
similarity index 56%
copy from
gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java
copy to
gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/UpdatableFSFlowTemplateCatalog.java
index 5c47388bf..2915c5530 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/UpdatableFSFlowTemplateCatalog.java
@@ -23,36 +23,26 @@ import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
-import org.apache.hadoop.fs.Path;
import com.typesafe.config.Config;
-import lombok.extern.slf4j.Slf4j;
-
import org.apache.gobblin.runtime.api.JobTemplate;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.service.modules.template.FlowTemplate;
-import org.apache.gobblin.util.filesystem.PathAlterationListener;
-import org.apache.gobblin.util.filesystem.PathAlterationListenerAdaptor;
-
/**
- * {@link FSFlowTemplateCatalog} that keeps a cache of flow and job templates.
It has a
- * {@link org.apache.gobblin.util.filesystem.PathAlterationListener} on the
root path, and clears the cache when a change
- * is detected so that the next call to {@link #getFlowTemplate(URI)} will use
the updated files.
+ * {@link FSFlowTemplateCatalog} that keeps a cache of flow and job templates.
It provides a public method clearTemplates()
+ * for other classes to invoke, so that other classes can reload the job
templates before they make a change. E.g. The
+ * {@link org.apache.gobblin.service.monitoring.FsFlowGraphMonitor} has a
configuration to clear the template cache before updating the flowgraph.
*/
-@Slf4j
-public class ObservingFSFlowEdgeTemplateCatalog extends FSFlowTemplateCatalog {
- private Map<URI, FlowTemplate> flowTemplateMap = new ConcurrentHashMap<>();
- private Map<URI, List<JobTemplate>> jobTemplateMap = new
ConcurrentHashMap<>();
- private ReadWriteLock rwLock;
-
- private AtomicBoolean shouldRefreshFlowGraph = new AtomicBoolean(false);
+public class UpdatableFSFlowTemplateCatalog extends FSFlowTemplateCatalog {
+ private final Map<URI, FlowTemplate> flowTemplateMap = new
ConcurrentHashMap<>();
+ private final Map<URI, List<JobTemplate>> jobTemplateMap = new
ConcurrentHashMap<>();
+ private final ReadWriteLock rwLock;
- public ObservingFSFlowEdgeTemplateCatalog(Config sysConfig, ReadWriteLock
rwLock) throws IOException {
+ public UpdatableFSFlowTemplateCatalog(Config sysConfig, ReadWriteLock
rwLock) throws IOException {
super(sysConfig);
this.rwLock = rwLock;
}
@@ -77,55 +67,21 @@ public class ObservingFSFlowEdgeTemplateCatalog extends
FSFlowTemplateCatalog {
if (jobTemplates == null) {
jobTemplates = super.getJobTemplatesForFlow(flowTemplateDirURI);
+ log.info("Loading flow template directly from {} and caching it.",
flowTemplateDirURI);
jobTemplateMap.put(flowTemplateDirURI, jobTemplates);
}
return jobTemplates;
}
- @Override
- protected PathAlterationListener getListener() {
- return new FlowCatalogPathAlterationListener();
- }
-
- @Override
- protected void startUp() throws IOException {
- if (this.pathAlterationDetector != null) {
- this.pathAlterationDetector.start();
- }
- }
-
- @Override
- public boolean getAndSetShouldRefreshFlowGraph(boolean value) {
- return this.shouldRefreshFlowGraph.getAndSet(value);
- }
-
/**
* Clear cached templates so they will be reloaded next time {@link
#getFlowTemplate(URI)} is called.
- * Also refresh git flow graph in case any edges that failed to be added on
startup are successful now.
*/
- private void clearTemplates() {
+ public void clearTemplates() {
this.rwLock.writeLock().lock();
- log.info("Change detected, reloading flow templates.");
+ log.info("Change detected, clearing flow template cache.");
flowTemplateMap.clear();
jobTemplateMap.clear();
- getAndSetShouldRefreshFlowGraph(true);
this.rwLock.writeLock().unlock();
}
-
- /**
- * {@link org.apache.gobblin.util.filesystem.PathAlterationListener} that
clears flow/job template cache if a file is
- * created or updated.
- */
- private class FlowCatalogPathAlterationListener extends
PathAlterationListenerAdaptor {
- @Override
- public void onFileCreate(Path path) {
- clearTemplates();
- }
-
- @Override
- public void onFileChange(Path path) {
- clearTemplates();
- }
- }
-}
+}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitor.java
index bda777ae3..f23bf7b89 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitor.java
@@ -42,7 +42,7 @@ import
org.apache.gobblin.service.modules.flow.MultiHopFlowCompiler;
import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraphHelper;
import
org.apache.gobblin.service.modules.flowgraph.FSPathAlterationFlowGraphListener;
import org.apache.gobblin.service.modules.flowgraph.FlowGraphMonitor;
-import
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import
org.apache.gobblin.service.modules.template_catalog.UpdatableFSFlowTemplateCatalog;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.filesystem.PathAlterationObserver;
@@ -52,6 +52,8 @@ import
org.apache.gobblin.util.filesystem.PathAlterationObserverScheduler;
@Slf4j
public class FsFlowGraphMonitor extends AbstractIdleService implements
FlowGraphMonitor {
public static final String FS_FLOWGRAPH_MONITOR_PREFIX =
"gobblin.service.fsFlowGraphMonitor";
+ public static final String MONITOR_TEMPLATE_CATALOG_CHANGES =
"monitorTemplateChanges";
+
private static final long DEFAULT_FLOWGRAPH_POLLING_INTERVAL = 60;
private static final String DEFAULT_FS_FLOWGRAPH_MONITOR_ABSOLUTE_DIR =
"/tmp/fsFlowgraph";
private static final String DEFAULT_FS_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR =
"gobblin-flowgraph";
@@ -61,7 +63,8 @@ public class FsFlowGraphMonitor extends AbstractIdleService
implements FlowGraph
private final PathAlterationObserverScheduler pathAlterationDetector;
private final FSPathAlterationFlowGraphListener listener;
private final PathAlterationObserver observer;
- private final Path flowGraphPath;
+ private Path flowGraphPath;
+ private Path observedPath;
private final MultiHopFlowCompiler compiler;
private final CountDownLatch initComplete;
private static final Config DEFAULT_FALLBACK =
ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
@@ -69,16 +72,20 @@ public class FsFlowGraphMonitor extends AbstractIdleService
implements FlowGraph
.put(ConfigurationKeys.FLOWGRAPH_BASE_DIR,
DEFAULT_FS_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR)
.put(ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL,
DEFAULT_FLOWGRAPH_POLLING_INTERVAL)
.put(ConfigurationKeys.FLOWGRAPH_JAVA_PROPS_EXTENSIONS,
ConfigurationKeys.DEFAULT_PROPERTIES_EXTENSIONS)
+ .put(MONITOR_TEMPLATE_CATALOG_CHANGES, false)
.put(ConfigurationKeys.FLOWGRAPH_HOCON_FILE_EXTENSIONS,
ConfigurationKeys.DEFAULT_CONF_EXTENSIONS).build());
- public FsFlowGraphMonitor(Config config, Optional<? extends
FSFlowTemplateCatalog> flowTemplateCatalog,
+ public FsFlowGraphMonitor(Config config,
Optional<UpdatableFSFlowTemplateCatalog> flowTemplateCatalog,
MultiHopFlowCompiler compiler, Map<URI, TopologySpec> topologySpecMap,
CountDownLatch initComplete, boolean instrumentationEnabled)
throws IOException {
Config configWithFallbacks =
config.getConfig(FS_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK);
this.pollingInterval =
TimeUnit.SECONDS.toMillis(configWithFallbacks.getLong(ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL));
this.flowGraphPath = new
Path(configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_ABSOLUTE_DIR));
- this.observer = new PathAlterationObserver(flowGraphPath);
+ // If the FSFlowGraphMonitor is also monitoring the templates, assume that
they are colocated under the same parent folder
+ boolean shouldMonitorTemplateCatalog =
configWithFallbacks.getBoolean(MONITOR_TEMPLATE_CATALOG_CHANGES);
+ this.observedPath = shouldMonitorTemplateCatalog ?
this.flowGraphPath.getParent() : this.flowGraphPath;
+ this.observer = new PathAlterationObserver(observedPath);
try {
String helperClassName = ConfigUtils.getString(config,
ServiceConfigKeys.GOBBLIN_SERVICE_FLOWGRAPH_HELPER_KEY,
BaseFlowGraphHelper.class.getCanonicalName());
@@ -88,7 +95,7 @@ public class FsFlowGraphMonitor extends AbstractIdleService
implements FlowGraph
} catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException | InstantiationException | ClassNotFoundException e) {
throw new RuntimeException(e);
}
- this.listener = new FSPathAlterationFlowGraphListener(flowTemplateCatalog,
compiler, flowGraphPath.toString(), this.flowGraphHelper);
+ this.listener = new FSPathAlterationFlowGraphListener(flowTemplateCatalog,
compiler, flowGraphPath.toString(), this.flowGraphHelper,
shouldMonitorTemplateCatalog);
this.compiler = compiler;
this.initComplete = initComplete;
@@ -98,7 +105,7 @@ public class FsFlowGraphMonitor extends AbstractIdleService
implements FlowGraph
this.pathAlterationDetector = new
PathAlterationObserverScheduler(pollingInterval);
Optional<PathAlterationObserver> observerOptional =
Optional.fromNullable(observer);
this.pathAlterationDetector.addPathAlterationObserver(this.listener,
observerOptional,
- this.flowGraphPath);
+ this.observedPath);
}
}
@@ -116,7 +123,7 @@ public class FsFlowGraphMonitor extends AbstractIdleService
implements FlowGraph
} else if (isActive) {
if (this.pathAlterationDetector != null) {
log.info("Starting the " + getClass().getSimpleName());
- log.info("Polling flowgraph folder with interval {} ",
this.pollingInterval);
+ log.info("Polling folder {} with interval {} ", this.observedPath,
this.pollingInterval);
try {
this.pathAlterationDetector.start();
// Manually instantiate flowgraph when the monitor becomes active
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/UpdatableFSFFlowTemplateCatalogTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/UpdatableFSFFlowTemplateCatalogTest.java
new file mode 100644
index 000000000..4a98db887
--- /dev/null
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/UpdatableFSFFlowTemplateCatalogTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.template_catalog;
+
+import java.io.File;
+import java.net.URI;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.io.FileUtils;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.template.FlowTemplate;
+
+
+@Slf4j
+public class UpdatableFSFFlowTemplateCatalogTest {
+
+ private File templateDir;
+ private Config templateCatalogCfg;
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ URI flowTemplateCatalogUri =
this.getClass().getClassLoader().getResource("template_catalog").toURI();
+ this.templateDir = Files.createTempDir();
+ FileUtils.forceDeleteOnExit(templateDir);
+ FileUtils.copyDirectory(new File(flowTemplateCatalogUri.getPath()),
templateDir);
+ Properties properties = new Properties();
+
properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY,
templateDir.toURI().toString());
+ Config config = ConfigFactory.parseProperties(properties);
+ this.templateCatalogCfg =
config.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+
config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+ }
+
+ @Test
+ public void testModifyFlowTemplate() throws Exception {
+ UpdatableFSFlowTemplateCatalog catalog = new
UpdatableFSFlowTemplateCatalog(this.templateCatalogCfg, new
ReentrantReadWriteLock());
+
+ // Check cached flow template is returned
+ FlowTemplate flowTemplate1 = catalog.getFlowTemplate(new
URI(FSFlowTemplateCatalogTest.TEST_TEMPLATE_DIR_URI));
+ FlowTemplate flowTemplate2 = catalog.getFlowTemplate(new
URI(FSFlowTemplateCatalogTest.TEST_TEMPLATE_DIR_URI));
+ Assert.assertSame(flowTemplate1, flowTemplate2);
+
+ // Update a file flow catalog and check that the getFlowTemplate returns
the new value
+ Path flowConfPath = new File(new File(this.templateDir,
FSFlowTemplateCatalogTest.TEST_TEMPLATE_NAME), "flow.conf").toPath();
+ List<String> lines = java.nio.file.Files.readAllLines(flowConfPath);
+ for (int i = 0; i < lines.size(); i++) {
+ if
(lines.get(i).equals("gobblin.flow.edge.input.dataset.descriptor.0.format=avro"))
{
+ lines.set(i,
"gobblin.flow.edge.input.dataset.descriptor.0.format=any");
+ break;
+ }
+ }
+ java.nio.file.Files.write(flowConfPath, lines);
+ catalog.clearTemplates();
+ Assert.assertEquals(catalog.getFlowTemplate(new
URI(FSFlowTemplateCatalogTest.TEST_TEMPLATE_DIR_URI)).
+
getRawTemplateConfig().getString("gobblin.flow.edge.input.dataset.descriptor.0.format"),
"any");
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java
index 7e1a17414..1b3397756 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java
@@ -34,6 +34,7 @@ import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.io.FileUtils;
import org.apache.gobblin.config.ConfigBuilder;
@@ -47,7 +48,7 @@ import org.apache.gobblin.service.modules.flowgraph.DataNode;
import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
-import
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import
org.apache.gobblin.service.modules.template_catalog.UpdatableFSFlowTemplateCatalog;
import org.apache.hadoop.fs.Path;
import org.eclipse.jgit.transport.RefSpec;
import org.slf4j.Logger;
@@ -60,8 +61,9 @@ import org.testng.annotations.Test;
public class FsFlowGraphMonitorTest {
private static final Logger logger =
LoggerFactory.getLogger(FsFlowGraphMonitorTest.class);
- private final File TEST_DIR = new File(FileUtils.getTempDirectory(),
"fsFlowGraphTestDir");
- private final File flowGraphDir = new File(TEST_DIR, "/gobblin-flowgraph");
+ private final File TEST_DIR = new File(FileUtils.getTempDirectory(),
"flowGraphTemplates");
+ private final File flowGraphTestDir = new File(TEST_DIR,
"fsFlowGraphTestDir");
+ private final File flowGraphDir = new File(flowGraphTestDir,
"gobblin-flowgraph");
private static final String NODE_1_FILE = "node1.properties";
private final File node1Dir = new File(FileUtils.getTempDirectory(),
"node1");
private final File node1File = new File(node1Dir, NODE_1_FILE);
@@ -70,14 +72,15 @@ public class FsFlowGraphMonitorTest {
private final File node2File = new File(node2Dir, NODE_2_FILE);
private final File edge1Dir = new File(node1Dir, "node2");
private final File edge1File = new File(edge1Dir, "edge1.properties");
- private final File sharedNodeFolder = new File(TEST_DIR, "nodes");
+ private final File sharedNodeFolder = new File(flowGraphTestDir, "nodes");
private RefSpec masterRefSpec = new RefSpec("master");
- private Optional<FSFlowTemplateCatalog> flowCatalog;
+ private Optional<UpdatableFSFlowTemplateCatalog> flowCatalog;
private Config config;
private AtomicReference<FlowGraph> flowGraph;
private FsFlowGraphMonitor flowGraphMonitor;
private Map<URI, TopologySpec> topologySpecMap;
+ private File flowTemplateCatalogFolder;
@BeforeClass
public void setUp() throws Exception {
@@ -89,21 +92,25 @@ public class FsFlowGraphMonitorTest {
this.config = ConfigBuilder.create()
.addPrimitive(FsFlowGraphMonitor.FS_FLOWGRAPH_MONITOR_PREFIX + "."
- + ConfigurationKeys.FLOWGRAPH_ABSOLUTE_DIR,
TEST_DIR.getAbsolutePath())
+ + ConfigurationKeys.FLOWGRAPH_ABSOLUTE_DIR,
flowGraphTestDir.getAbsolutePath())
.addPrimitive(FsFlowGraphMonitor.FS_FLOWGRAPH_MONITOR_PREFIX + "." +
ConfigurationKeys.FLOWGRAPH_BASE_DIR, "gobblin-flowgraph")
.addPrimitive(FsFlowGraphMonitor.FS_FLOWGRAPH_MONITOR_PREFIX + "." +
ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL, 1)
+ .addPrimitive(FsFlowGraphMonitor.FS_FLOWGRAPH_MONITOR_PREFIX + "." +
FsFlowGraphMonitor.MONITOR_TEMPLATE_CATALOG_CHANGES, true)
.build();
// Create a FSFlowTemplateCatalog instance
URI flowTemplateCatalogUri =
this.getClass().getClassLoader().getResource("template_catalog").toURI();
+ this.flowTemplateCatalogFolder = new File(TEST_DIR, "template_catalog");
+ this.flowTemplateCatalogFolder.mkdirs();
+ FileUtils.copyDirectory(new File(flowTemplateCatalogUri.getPath()),
this.flowTemplateCatalogFolder);
Properties properties = new Properties();
this.flowGraphDir.mkdirs();
-
properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY,
flowTemplateCatalogUri.toString());
+
properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY,
this.flowTemplateCatalogFolder.getAbsolutePath());
Config config = ConfigFactory.parseProperties(properties);
Config templateCatalogCfg = config
.withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
- this.flowCatalog = Optional.of(new
FSFlowTemplateCatalog(templateCatalogCfg));
+ this.flowCatalog = Optional.of(new
UpdatableFSFlowTemplateCatalog(templateCatalogCfg, new
ReentrantReadWriteLock(true)));
//Create a FlowGraph instance with defaults
this.flowGraph = new AtomicReference<>(new BaseFlowGraph());
@@ -227,6 +234,23 @@ public class FsFlowGraphMonitorTest {
}
@Test (dependsOnMethods = "testSharedFlowgraphHelper")
+ public void testUpdateOnlyTemplates() throws Exception {
+ Assert.assertEquals(this.flowGraph.get().getEdges("node1").size(), 1);
+
+ //If deleting all the templates, the cache of flow templates will be
cleared and the flowgraph will be unable to add edges on reload.
+ cleanUpDir(this.flowTemplateCatalogFolder.getAbsolutePath());
+ Thread.sleep(3000);
+ Assert.assertEquals(this.flowGraph.get().getEdges("node1").size(), 0);
+
+ URI flowTemplateCatalogUri =
this.getClass().getClassLoader().getResource("template_catalog").toURI();
+ // Adding the flowtemplates back will make the edges eligible to be added
again on reload.
+ FileUtils.copyDirectory(new File(flowTemplateCatalogUri.getPath()),
this.flowTemplateCatalogFolder);
+
+ Thread.sleep(3000);
+ Assert.assertEquals(this.flowGraph.get().getEdges("node1").size(), 1);
+ }
+
+ @Test (dependsOnMethods = "testUpdateOnlyTemplates")
public void testRemoveEdge() throws Exception {
//Node1 has 1 edge before delete
Collection<FlowEdge> edgeSet = this.flowGraph.get().getEdges("node1");