This is an automated email from the ASF dual-hosted git repository. suvasude pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push: new 9a5e222 [GOBBLIN-1084] Refresh flowgraph when templates are modified 9a5e222 is described below commit 9a5e222f87a0a44903e6c2c30aaec365cc579979 Author: Jack Moseley <jmose...@linkedin.com> AuthorDate: Tue Mar 17 18:18:34 2020 -0700 [GOBBLIN-1084] Refresh flowgraph when templates are modified Closes #2924 from jack-moseley/refresh-flowgraph --- .../gobblin/service/modules/core/GitFlowGraphMonitor.java | 6 ++++++ .../gobblin/service/modules/core/GitMonitoringService.java | 2 +- .../modules/template_catalog/FSFlowTemplateCatalog.java | 4 ++++ .../template_catalog/ObservingFSFlowEdgeTemplateCatalog.java | 10 ++++++++++ .../gobblin/service/modules/core/GitFlowGraphMonitorTest.java | 1 + 5 files changed, 22 insertions(+), 1 deletion(-) diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java index 44dd01a..9c1d105 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java @@ -25,6 +25,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.locks.ReadWriteLock; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.Path; @@ -127,6 +128,11 @@ public class GitFlowGraphMonitor extends GitMonitoringService { */ @Override void processGitConfigChanges() throws GitAPIException, IOException { + if (flowTemplateCatalog.isPresent() && flowTemplateCatalog.get().getAndSetShouldRefreshFlowGraph(false)) { + log.info("Change to template catalog detected, refreshing FlowGraph"); + this.gitRepo.initRepository(); + } + List<DiffEntry> changes = this.gitRepo.getChanges(); Collections.sort(changes, (o1, o2) -> { Integer o1Depth = (o1.getNewPath() != null) ? (new Path(o1.getNewPath())).depth() : (new Path(o1.getOldPath())).depth(); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java index d577bdd..d455ad8 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java @@ -304,7 +304,7 @@ public abstract class GitMonitoringService extends AbstractIdleService { * @throws GitAPIException * @throws IOException */ - private void initRepository() throws GitAPIException, IOException { + void initRepository() throws GitAPIException, IOException { File repoDirFile = new File(this.repoDir); try { diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java index 24e3ff2..e309026 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalog.java @@ -185,4 +185,8 @@ public class FSFlowTemplateCatalog extends FSJobCatalog implements FlowCatalogWi return true; } + + public boolean getAndSetShouldRefreshFlowGraph(boolean value) { + return false; + } } diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java index 9b68024..5c47388 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template_catalog/ObservingFSFlowEdgeTemplateCatalog.java @@ -23,6 +23,7 @@ import java.net.URISyntaxException; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import org.apache.hadoop.fs.Path; @@ -49,6 +50,8 @@ public class ObservingFSFlowEdgeTemplateCatalog extends FSFlowTemplateCatalog { private Map<URI, List<JobTemplate>> jobTemplateMap = new ConcurrentHashMap<>(); private ReadWriteLock rwLock; + private AtomicBoolean shouldRefreshFlowGraph = new AtomicBoolean(false); + public ObservingFSFlowEdgeTemplateCatalog(Config sysConfig, ReadWriteLock rwLock) throws IOException { super(sysConfig); this.rwLock = rwLock; @@ -92,14 +95,21 @@ public class ObservingFSFlowEdgeTemplateCatalog extends FSFlowTemplateCatalog { } } + @Override + public boolean getAndSetShouldRefreshFlowGraph(boolean value) { + return this.shouldRefreshFlowGraph.getAndSet(value); + } + /** * Clear cached templates so they will be reloaded next time {@link #getFlowTemplate(URI)} is called. + * Also refresh git flow graph in case any edges that failed to be added on startup are successful now. */ private void clearTemplates() { this.rwLock.writeLock().lock(); log.info("Change detected, reloading flow templates."); flowTemplateMap.clear(); jobTemplateMap.clear(); + getAndSetShouldRefreshFlowGraph(true); this.rwLock.writeLock().unlock(); } diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java index c07b56e..e7a6863 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java @@ -26,6 +26,7 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.SystemUtils;