This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new eaa9cc7c4 [GOBBLIN-1678] Refactor git flowgraph component to be
extensible (#3536)
eaa9cc7c4 is described below
commit eaa9cc7c411ae48dbfb29000cd38acacd4140a9c
Author: William Lo <[email protected]>
AuthorDate: Wed Aug 31 11:55:06 2022 -0700
[GOBBLIN-1678] Refactor git flowgraph component to be extensible (#3536)
* Refactor git flowgraph component to be extensible
* Move files to appropriate modules
* Cleanup and add javadocs
* Cleanup, add missing javadocs
* Address review and import order
* Fix findbugs
* Use java sort instead of collections
---
.../gobblin/configuration/ConfigurationKeys.java | 8 +
.../apache/gobblin/service/ServiceConfigKeys.java | 1 +
.../modules/core/GobblinServiceGuiceModule.java | 1 +
.../modules/core/GobblinServiceManager.java | 1 +
.../service/modules/flow/MultiHopFlowCompiler.java | 19 +-
.../BaseFlowGraphListener.java} | 205 ++++++---------------
.../modules/flowgraph/FlowGraphMonitor.java | 34 ++++
.../GitConfigListener.java} | 96 ++++------
.../service/monitoring/GitConfigMonitor.java | 83 +++++++++
.../service/monitoring/GitDiffListener.java | 31 ++++
.../service/monitoring/GitFlowGraphListener.java | 77 ++++++++
.../service/monitoring/GitFlowGraphMonitor.java | 137 ++++++++++++++
.../core => monitoring}/GitMonitoringService.java | 73 +++-----
.../gobblin/service/GobblinServiceManagerTest.java | 2 +-
.../modules/flow/MultiHopFlowCompilerTest.java | 2 +-
.../core => monitoring}/GitConfigMonitorTest.java | 2 +-
.../GitFlowGraphMonitorTest.java | 2 +-
.../util/filesystem/PathAlterationObserver.java | 2 +-
18 files changed, 520 insertions(+), 256 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index ec85883ae..464a575ba 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -1015,6 +1015,14 @@ public class ConfigurationKeys {
public static final String DATASET_COMBINE_KEY =
"gobblin.flow.dataset.combine";
public static final String WHITELISTED_EDGE_IDS =
"gobblin.flow.whitelistedEdgeIds";
public static final String GOBBLIN_OUTPUT_JOB_LEVEL_METRICS =
"gobblin.job.outputJobLevelMetrics";
+
+ /**
+ * Configuration properties related to flowGraphs
+ */
+
+ public static final String JAVA_PROPS_EXTENSIONS = "javaPropsExtensions";
+ public static final String HOCON_FILE_EXTENSIONS = "hoconFileExtensions";
+
/***
* Configuration properties related to TopologySpec Store
*/
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index b4f04acd9..c1536e037 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -47,6 +47,7 @@ public class ServiceConfigKeys {
public static final String HELIX_INSTANCE_NAME_OPTION_NAME =
"helix_instance_name";
public static final String HELIX_INSTANCE_NAME_KEY = GOBBLIN_SERVICE_PREFIX
+ "helixInstanceName";
public static final String GOBBLIN_SERVICE_FLOWSPEC = GOBBLIN_SERVICE_PREFIX
+ "flowSpec";
+ public static final String GOBBLIN_SERVICE_FLOWGRAPH_CLASS_KEY =
GOBBLIN_SERVICE_PREFIX + "flowGraph.class";
// Helix message sub types for FlowSpec
public static final String HELIX_FLOWSPEC_ADD = "FLOWSPEC_ADD";
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index efb03339c..68170a129 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.service.modules.core;
import java.util.Objects;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.monitoring.GitConfigMonitor;
import org.apache.helix.HelixManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index d35eef64b..76f568495 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -32,6 +32,7 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.monitoring.GitConfigMonitor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index d76b9123a..3e6eb13dc 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -33,6 +33,7 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphMonitor;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
@@ -59,7 +60,7 @@ import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.core.GitFlowGraphMonitor;
+import org.apache.gobblin.service.monitoring.GitFlowGraphMonitor;
import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.DataNode;
@@ -87,7 +88,7 @@ public class MultiHopFlowCompiler extends
BaseFlowToJobSpecCompiler {
@Getter
private CountDownLatch initComplete = new CountDownLatch(1);
- private GitFlowGraphMonitor gitFlowGraphMonitor;
+ private FlowGraphMonitor flowGraphMonitor;
private ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
@@ -148,8 +149,14 @@ public class MultiHopFlowCompiler extends
BaseFlowToJobSpecCompiler {
throw new RuntimeException(e);
}
- this.gitFlowGraphMonitor = new GitFlowGraphMonitor(gitFlowGraphConfig,
flowTemplateCatalog, this.flowGraph, this.topologySpecMap,
this.getInitComplete());
- this.serviceManager = new
ServiceManager(Lists.newArrayList(this.gitFlowGraphMonitor,
flowTemplateCatalog.get()));
+ try {
+ String flowGraphMonitorClassName = ConfigUtils.getString(this.config,
ServiceConfigKeys.GOBBLIN_SERVICE_FLOWGRAPH_CLASS_KEY,
GitFlowGraphMonitor.class.getCanonicalName());
+ this.flowGraphMonitor = (FlowGraphMonitor)
ConstructorUtils.invokeConstructor(Class.forName(new
ClassAliasResolver<>(FlowGraphMonitor.class).resolve(
+ flowGraphMonitorClassName)), gitFlowGraphConfig, flowTemplateCatalog,
this.flowGraph, this.topologySpecMap, this.getInitComplete());
+ } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException | InstantiationException | ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ this.serviceManager = new
ServiceManager(Lists.newArrayList(this.flowGraphMonitor,
flowTemplateCatalog.get()));
addShutdownHook();
//Start the git flow graph monitor
try {
@@ -175,8 +182,8 @@ public class MultiHopFlowCompiler extends
BaseFlowToJobSpecCompiler {
@Override
public void setActive(boolean active) {
super.setActive(active);
- if (this.gitFlowGraphMonitor != null) {
- this.gitFlowGraphMonitor.setActive(active);
+ if (this.flowGraphMonitor != null) {
+ this.flowGraphMonitor.setActive(active);
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphListener.java
similarity index 58%
rename from
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
rename to
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphListener.java
index 6df5b99c9..fa249c4cd 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphListener.java
@@ -15,172 +15,89 @@
* limitations under the License.
*/
-package org.apache.gobblin.service.modules.core;
+package org.apache.gobblin.service.modules.flowgraph;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CountDownLatch;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.eclipse.jgit.api.errors.GitAPIException;
-import org.eclipse.jgit.diff.DiffEntry;
+import lombok.extern.slf4j.Slf4j;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
import com.google.common.io.Files;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
-import lombok.extern.slf4j.Slf4j;
-
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.TopologySpec;
-import org.apache.gobblin.service.modules.flowgraph.DataNode;
-import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
-import org.apache.gobblin.service.modules.flowgraph.FlowEdgeFactory;
-import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
-import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
import
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.PullFileLoader;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
/**
- * Service that monitors for changes to {@link
org.apache.gobblin.service.modules.flowgraph.FlowGraph} from a git repository.
- * The git repository must have an inital commit that has no files since that
is used as a base for getting
- * the change list.
- * The {@link DataNode}s and {@link FlowEdge}s in FlowGraph need to be
organized with the following directory structure on git:
- * <root_flowGraph_dir>/<nodeName>/<nodeName>.properties
- * <root_flowGraph_dir>/<nodeName1>/<nodeName2>/<edgeName>.properties
+ * Provides the common functionality needed by listeners of {@link FlowGraphMonitor} to read changes in files and
+ * apply them to a {@link FlowGraph}.
+ * Assumes that all flowgraph configuration files follow the same directory structure.
*/
@Slf4j
-public class GitFlowGraphMonitor extends GitMonitoringService {
- public static final String GIT_FLOWGRAPH_MONITOR_PREFIX =
"gobblin.service.gitFlowGraphMonitor";
-
- private static final String PROPERTIES_EXTENSIONS = "properties";
- private static final String CONF_EXTENSIONS = "conf";
+public abstract class BaseFlowGraphListener {
+ protected FlowGraph flowGraph;
+ protected static final int NODE_FILE_DEPTH = 3;
+ protected static final int EDGE_FILE_DEPTH = 4;
private static final String FLOW_EDGE_LABEL_JOINER_CHAR = "_";
- private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_REPO_DIR =
"git-flowgraph";
- private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR =
"gobblin-flowgraph";
- private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_BRANCH_NAME =
"master";
-
- private static final int NODE_FILE_DEPTH = 3;
- private static final int EDGE_FILE_DEPTH = 4;
- private static final int DEFAULT_GIT_FLOWGRAPH_MONITOR_POLLING_INTERVAL = 60;
- private static final Config DEFAULT_FALLBACK =
- ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
- .put(ConfigurationKeys.GIT_MONITOR_REPO_DIR,
DEFAULT_GIT_FLOWGRAPH_MONITOR_REPO_DIR)
- .put(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR,
DEFAULT_GIT_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR)
- .put(ConfigurationKeys.GIT_MONITOR_BRANCH_NAME,
DEFAULT_GIT_FLOWGRAPH_MONITOR_BRANCH_NAME)
- .put(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL,
DEFAULT_GIT_FLOWGRAPH_MONITOR_POLLING_INTERVAL)
- .put(JAVA_PROPS_EXTENSIONS, PROPERTIES_EXTENSIONS)
- .put(HOCON_FILE_EXTENSIONS, CONF_EXTENSIONS)
- .put(SHOULD_CHECKPOINT_HASHES, false)
- .build());
+ final String baseDirectory;
+ private final Config emptyConfig = ConfigFactory.empty();
private Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog;
- private FlowGraph flowGraph;
private final Map<URI, TopologySpec> topologySpecMap;
- private final Config emptyConfig = ConfigFactory.empty();
- private final CountDownLatch initComplete;
- public GitFlowGraphMonitor(Config config, Optional<? extends
FSFlowTemplateCatalog> flowTemplateCatalog,
- FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, CountDownLatch
initComplete) {
-
super(config.getConfig(GIT_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK));
+ final String flowGraphFolderName;
+ final PullFileLoader pullFileLoader;
+ final Set<String> javaPropsExtensions;
+ final Set<String> hoconFileExtensions;
+
+ public BaseFlowGraphListener(Optional<? extends FSFlowTemplateCatalog>
flowTemplateCatalog,
+ FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, String
baseDirectory, String flowGraphFolderName,
+ String javaPropsExtentions, String hoconFileExtensions) {
this.flowTemplateCatalog = flowTemplateCatalog;
this.flowGraph = graph;
this.topologySpecMap = topologySpecMap;
- this.initComplete = initComplete;
- }
-
- /**
- * Determine if the service should poll Git. Current behavior is both master
and slave(s) will poll Git for
- * changes to {@link FlowGraph}.
- */
- @Override
- public boolean shouldPollGit() {
- return this.isActive;
- }
-
- /**
- * Sort the changes in a commit so that changes to node files appear before
changes to edge files. This is done so that
- * node related changes are applied to the FlowGraph before edge related
changes. An example where the order matters
- * is the case when a commit adds a new node n2 as well as adds an edge from
an existing node n1 to n2. To ensure that the
- * addition of edge n1->n2 is successful, node n2 must exist in the graph
and so needs to be added first. For deletions,
- * the order does not matter and ordering the changes in the commit will
result in the same FlowGraph state as if the changes
- * were unordered. In other words, deletion of a node deletes all its
incident edges from the FlowGraph. So processing an
- * edge deletion later results in a no-op. Note that node and edge files do
not change depth in case of modifications.
- *
- * If there are multiple commits between successive polls to Git, the
re-ordering of changes across commits should not
- * affect the final state of the FlowGraph. This is because, the order of
changes for a given file type (i.e. node or edge)
- * is preserved.
- */
- @Override
- void processGitConfigChanges() throws GitAPIException, IOException {
- if (flowTemplateCatalog.isPresent() &&
flowTemplateCatalog.get().getAndSetShouldRefreshFlowGraph(false)) {
- log.info("Change to template catalog detected, refreshing FlowGraph");
- this.gitRepo.initRepository();
+ this.baseDirectory = baseDirectory;
+ this.flowGraphFolderName = flowGraphFolderName;
+ Path folderPath = new Path(baseDirectory, this.flowGraphFolderName);
+ this.javaPropsExtensions = Sets.newHashSet(javaPropsExtentions.split(","));
+ this.hoconFileExtensions = Sets.newHashSet(hoconFileExtensions.split(","));
+ try {
+ this.pullFileLoader = new PullFileLoader(folderPath,
+ FileSystem.get(URI.create(ConfigurationKeys.LOCAL_FS_URI), new
Configuration()),
+ this.javaPropsExtensions, this.hoconFileExtensions);
+ } catch (IOException e) {
+ throw new RuntimeException("Could not create pull file loader", e);
}
-
- List<DiffEntry> changes = this.gitRepo.getChanges();
- Collections.sort(changes, (o1, o2) -> {
- Integer o1Depth = (o1.getNewPath() != null) ? (new
Path(o1.getNewPath())).depth() : (new Path(o1.getOldPath())).depth();
- Integer o2Depth = (o2.getNewPath() != null) ? (new
Path(o2.getNewPath())).depth() : (new Path(o2.getOldPath())).depth();
- return o1Depth.compareTo(o2Depth);
- });
- processGitConfigChangesHelper(changes);
- //Decrements the latch count. The countdown latch is initialized to 1. So
after the first time the latch is decremented,
- // the following operation should be a no-op.
- this.initComplete.countDown();
}
-
- /**
- * Add an element (i.e., a {@link DataNode}, or a {@link FlowEdge} to
- * the {@link FlowGraph} for an added, updated or modified node or edge file.
- * @param change
- */
- @Override
- public void addChange(DiffEntry change) {
- Path path = new Path(change.getNewPath());
- if (path.depth() == NODE_FILE_DEPTH) {
- addDataNode(change);
- } else if (path.depth() == EDGE_FILE_DEPTH) {
- addFlowEdge(change);
- }
- }
-
- /**
- * Remove an element (i.e. either a {@link DataNode} or a {@link FlowEdge}
from the {@link FlowGraph} for
- * a renamed or deleted {@link DataNode} or {@link FlowEdge} file.
- * @param change
- */
- @Override
- public void removeChange(DiffEntry change) {
- Path path = new Path(change.getOldPath());
- if (path.depth() == NODE_FILE_DEPTH) {
- removeDataNode(change);
- } else if (path.depth() == EDGE_FILE_DEPTH) {
- removeFlowEdge(change);
- }
- }
-
/**
* Add a {@link DataNode} to the {@link FlowGraph}. The method uses the
{@link FlowGraphConfigurationKeys#DATA_NODE_CLASS} config
* to instantiate a {@link DataNode} from the node config file.
- * @param change
+ * @param path of node to add
*/
- private void addDataNode(DiffEntry change) {
- if (checkFilePath(change.getNewPath(), NODE_FILE_DEPTH)) {
- Path nodeFilePath = new Path(this.repositoryDir, change.getNewPath());
+ protected void addDataNode(String path) {
+ if (checkFilePath(path, NODE_FILE_DEPTH)) {
+ Path nodeFilePath = new Path(this.baseDirectory, path);
try {
Config config = loadNodeFileWithOverrides(nodeFilePath);
Class dataNodeClass = Class.forName(ConfigUtils.getString(config,
FlowGraphConfigurationKeys.DATA_NODE_CLASS,
@@ -192,7 +109,7 @@ public class GitFlowGraphMonitor extends
GitMonitoringService {
log.info("Added Datanode {} to FlowGraph", dataNode.getId());
}
} catch (Exception e) {
- log.warn("Could not add DataNode defined in {} due to exception {}",
change.getNewPath(), e);
+ log.warn("Could not add DataNode defined in {} due to exception {}",
path, e);
}
}
}
@@ -200,11 +117,11 @@ public class GitFlowGraphMonitor extends
GitMonitoringService {
/**
* Remove a {@link DataNode} from the {@link FlowGraph}. The method extracts
the nodeId of the
* {@link DataNode} from the node config file and uses it to delete the
associated {@link DataNode}.
- * @param change
+ * @param path of node to delete
*/
- private void removeDataNode(DiffEntry change) {
- if (checkFilePath(change.getOldPath(), NODE_FILE_DEPTH)) {
- Path nodeFilePath = new Path(this.repositoryDir, change.getOldPath());
+ protected void removeDataNode(String path) {
+ if (checkFilePath(path, NODE_FILE_DEPTH)) {
+ Path nodeFilePath = new Path(this.baseDirectory, path);
Config config = getNodeConfigWithOverrides(ConfigFactory.empty(),
nodeFilePath);
String nodeId =
config.getString(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY);
if (!this.flowGraph.deleteDataNode(nodeId)) {
@@ -218,11 +135,11 @@ public class GitFlowGraphMonitor extends
GitMonitoringService {
/**
* Add a {@link FlowEdge} to the {@link FlowGraph}. The method uses the
{@link FlowEdgeFactory} instance
* provided by the {@link FlowGraph} to build a {@link FlowEdge} from the
edge config file.
- * @param change
+ * @param path of edge to add
*/
- private void addFlowEdge(DiffEntry change) {
- if (checkFilePath(change.getNewPath(), EDGE_FILE_DEPTH)) {
- Path edgeFilePath = new Path(this.repositoryDir, change.getNewPath());
+ protected void addFlowEdge(String path) {
+ if (checkFilePath(path, EDGE_FILE_DEPTH)) {
+ Path edgeFilePath = new Path(this.baseDirectory, path);
try {
Config edgeConfig = loadEdgeFileWithOverrides(edgeFilePath);
List<SpecExecutor> specExecutors = getSpecExecutors(edgeConfig);
@@ -237,10 +154,10 @@ public class GitFlowGraphMonitor extends
GitMonitoringService {
log.info("Added edge {} to FlowGraph", edge.getId());
}
} else {
- log.warn("Could not add edge defined in {} to FlowGraph as
FlowTemplateCatalog is absent", change.getNewPath());
+ log.warn("Could not add edge defined in {} to FlowGraph as
FlowTemplateCatalog is absent", path);
}
} catch (Exception e) {
- log.warn("Could not add edge defined in {} due to exception {}",
change.getNewPath(), e.getMessage());
+ log.warn("Could not add edge defined in {} due to exception {}", path,
e.getMessage());
}
}
}
@@ -249,11 +166,11 @@ public class GitFlowGraphMonitor extends
GitMonitoringService {
* Remove a {@link FlowEdge} from the {@link FlowGraph}. The method uses
{@link FlowEdgeFactory}
* to construct the edgeId of the {@link FlowEdge} from the config file and
uses it to delete the associated
* {@link FlowEdge}.
- * @param change
+ * @param path of edge to delete
*/
- private void removeFlowEdge(DiffEntry change) {
- if (checkFilePath(change.getOldPath(), EDGE_FILE_DEPTH)) {
- Path edgeFilePath = new Path(this.repositoryDir, change.getOldPath());
+ protected void removeFlowEdge(String path) {
+ if (checkFilePath(path, EDGE_FILE_DEPTH)) {
+ Path edgeFilePath = new Path(this.baseDirectory, path);
try {
Config config = getEdgeConfigWithOverrides(ConfigFactory.empty(),
edgeFilePath);
String edgeId =
config.getString(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY);
@@ -303,7 +220,7 @@ public class GitFlowGraphMonitor extends
GitMonitoringService {
for (int i = 0; i < depth - 1; i++) {
path = path.getParent();
}
- if (!path.getName().equals(folderName)) {
+ if (!path.getName().equals(flowGraphFolderName)) {
return false;
}
return true;
@@ -315,7 +232,7 @@ public class GitFlowGraphMonitor extends
GitMonitoringService {
* @param nodeFilePath path of the node file
* @return config with overridden data.node.id
*/
- private Config getNodeConfigWithOverrides(Config nodeConfig, Path
nodeFilePath) {
+ protected Config getNodeConfigWithOverrides(Config nodeConfig, Path
nodeFilePath) {
String nodeId = nodeFilePath.getParent().getName();
return nodeConfig.withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY,
ConfigValueFactory.fromAnyRef(nodeId));
}
@@ -342,8 +259,7 @@ public class GitFlowGraphMonitor extends
GitMonitoringService {
* @param edgeConfig containing the logical names of SpecExecutors for this
edge.
* @return a {@link List<SpecExecutor>}s for this edge.
*/
- private List<SpecExecutor> getSpecExecutors(Config edgeConfig)
- throws URISyntaxException {
+ private List<SpecExecutor> getSpecExecutors(Config edgeConfig) throws
URISyntaxException {
//Get the logical names of SpecExecutors where the FlowEdge can be
executed.
List<String> specExecutorNames = ConfigUtils.getStringList(edgeConfig,
FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY);
//Load all the SpecExecutor configurations for this FlowEdge from the
SpecExecutor Catalog.
@@ -361,18 +277,19 @@ public class GitFlowGraphMonitor extends
GitMonitoringService {
* @return the configuration object
* @throws IOException
*/
- private Config loadNodeFileWithOverrides(Path filePath) throws IOException {
+ protected Config loadNodeFileWithOverrides(Path filePath) throws IOException
{
Config nodeConfig = this.pullFileLoader.loadPullFile(filePath,
emptyConfig, false, false);
return getNodeConfigWithOverrides(nodeConfig, filePath);
}
+
/**
* Load the edge file.
* @param filePath path of the edge file relative to the repository root
* @return the configuration object
* @throws IOException
*/
- private Config loadEdgeFileWithOverrides(Path filePath) throws IOException {
+ protected Config loadEdgeFileWithOverrides(Path filePath) throws IOException
{
Config edgeConfig = this.pullFileLoader.loadPullFile(filePath,
emptyConfig, false, false);
return getEdgeConfigWithOverrides(edgeConfig, filePath);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphMonitor.java
new file mode 100644
index 000000000..7c39b0157
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphMonitor.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import com.google.common.util.concurrent.Service;
+
+
+/**
+ * A service that listens to an external service or filesystem (Git, FS) and
applies changes to the flowgraph without requiring a GaaS restart.
+ */
+public interface FlowGraphMonitor extends Service {
+
+ /**
+ * Sets whether this monitor is active, i.e. whether GaaS is ready for it to load the flowgraph.
+ * @param value true if GaaS is ready, false otherwise
+ */
+ void setActive(boolean value);
+
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitConfigListener.java
similarity index 64%
rename from
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
rename to
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitConfigListener.java
index 101d9f46d..3c6488d8d 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitConfigListener.java
@@ -14,23 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gobblin.service.modules.core;
+
+package org.apache.gobblin.service.monitoring;
import java.io.IOException;
+import java.net.URI;
+import java.util.Set;
-import org.apache.hadoop.fs.Path;
+import lombok.extern.slf4j.Slf4j;
import org.eclipse.jgit.diff.DiffEntry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
import com.google.common.io.Files;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
-import javax.inject.Inject;
-import javax.inject.Singleton;
-import lombok.extern.slf4j.Slf4j;
-
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.FlowSpec;
@@ -38,59 +40,41 @@ import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_store.FSSpecStore;
import org.apache.gobblin.util.PullFileLoader;
+
/**
- * Service that monitors for jobs from a git repository.
- * The git repository must have an initial commit that has no config files
since that is used as a base for getting
- * the change list.
- * The config needs to be organized with the following structure:
- * <root_config_dir>/<flowGroup>/<flowName>.(pull|job|json|conf)
- * The <flowGroup> and <flowName> is used to generate the URI used to store
the config in the {@link FlowCatalog}
+ * Listener for {@link GitConfigMonitor} to apply changes from Git to a {@link
FlowCatalog} for adding and removing jobs
*/
@Slf4j
-@Singleton
-public class GitConfigMonitor extends GitMonitoringService {
- public static final String GIT_CONFIG_MONITOR_PREFIX =
"gobblin.service.gitConfigMonitor";
-
+public class GitConfigListener implements GitDiffListener {
+ private static final int CONFIG_FILE_DEPTH = 3;
private static final String SPEC_DESCRIPTION = "Git-based flow config";
private static final String SPEC_VERSION = FlowSpec.Builder.DEFAULT_VERSION;
- private static final String PROPERTIES_EXTENSIONS = "pull,job";
- private static final String CONF_EXTENSIONS = "json,conf";
- private static final String DEFAULT_GIT_CONFIG_MONITOR_REPO_DIR =
"git-flow-config";
- private static final String DEFAULT_GIT_CONFIG_MONITOR_CONFIG_DIR =
"gobblin-config";
- private static final String DEFAULT_GIT_CONFIG_MONITOR_BRANCH_NAME =
"master";
-
- private static final int CONFIG_FILE_DEPTH = 3;
- private static final int DEFAULT_GIT_CONFIG_MONITOR_POLLING_INTERVAL = 60;
-
- private static final Config DEFAULT_FALLBACK =
- ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
- .put(ConfigurationKeys.GIT_MONITOR_REPO_DIR,
DEFAULT_GIT_CONFIG_MONITOR_REPO_DIR)
- .put(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR,
DEFAULT_GIT_CONFIG_MONITOR_CONFIG_DIR)
- .put(ConfigurationKeys.GIT_MONITOR_BRANCH_NAME,
DEFAULT_GIT_CONFIG_MONITOR_BRANCH_NAME)
- .put(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL,
DEFAULT_GIT_CONFIG_MONITOR_POLLING_INTERVAL)
- .put(GitMonitoringService.JAVA_PROPS_EXTENSIONS,
PROPERTIES_EXTENSIONS)
- .put(GitMonitoringService.HOCON_FILE_EXTENSIONS, CONF_EXTENSIONS)
- .build());
-
private final FlowCatalog flowCatalog;
+
+ final String repositoryDir;
+ final String configBaseFolderName;
+ final PullFileLoader pullFileLoader;
+ final Set<String> javaPropsExtensions;
+ final Set<String> hoconFileExtensions;
private final Config emptyConfig = ConfigFactory.empty();
- @Inject
- GitConfigMonitor(Config config, FlowCatalog flowCatalog) {
-
super(config.getConfig(GIT_CONFIG_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK));
- this.flowCatalog = flowCatalog;
- }
- @Override
- public boolean shouldPollGit() {
- // if not active or if the flow catalog is not up yet then can't process
config changes
- if (!isActive || !this.flowCatalog.isRunning()) {
- log.warn("GitConfigMonitor: skip poll since the JobCatalog is not yet
running. isActive = {}", this.isActive);
- return false;
+ public GitConfigListener(FlowCatalog flowCatalog, String repositoryDir,
String configBaseFolderName, String javaPropsExtentions, String
hoconFileExtentions) {
+ this.flowCatalog = flowCatalog;
+ this.configBaseFolderName = configBaseFolderName;
+ this.repositoryDir = repositoryDir;
+
+ Path folderPath = new Path(repositoryDir, configBaseFolderName);
+ this.javaPropsExtensions = Sets.newHashSet(javaPropsExtentions.split(","));
+ this.hoconFileExtensions = Sets.newHashSet(hoconFileExtentions.split(","));
+ try {
+ this.pullFileLoader = new PullFileLoader(folderPath,
+ FileSystem.get(URI.create(ConfigurationKeys.LOCAL_FS_URI), new
Configuration()),
+ this.javaPropsExtensions, this.hoconFileExtensions);
+ } catch (IOException e) {
+ throw new RuntimeException("Could not create pull file loader", e);
}
- return true;
}
-
/**
* Add a {@link FlowSpec} for an added, updated, or modified flow config
* @param change
@@ -137,23 +121,23 @@ public class GitConfigMonitor extends
GitMonitoringService {
.withDescription(SPEC_DESCRIPTION)
.build();
- this.flowCatalog.remove(spec.getUri());
+ this.flowCatalog.remove(spec.getUri());
}
}
- /**
- * check whether the file has the proper naming and hierarchy
- * @param configFilePath the relative path from the repo root
- * @return false if the file does not conform
- */
+ /**
+ * check whether the file has the proper naming and hierarchy
+ * @param configFilePath the relative path from the repo root
+ * @return false if the file does not conform
+ */
private boolean checkConfigFilePath(String configFilePath) {
// The config needs to stored at
configDir/flowGroup/flowName.(pull|job|json|conf)
Path configFile = new Path(configFilePath);
String fileExtension = Files.getFileExtension(configFile.getName());
if (configFile.depth() != CONFIG_FILE_DEPTH
- || !configFile.getParent().getParent().getName().equals(folderName)
+ ||
!configFile.getParent().getParent().getName().equals(configBaseFolderName)
||
!(PullFileLoader.DEFAULT_JAVA_PROPS_PULL_FILE_EXTENSIONS.contains(fileExtension)
||
PullFileLoader.DEFAULT_JAVA_PROPS_PULL_FILE_EXTENSIONS.contains(fileExtension)))
{
log.warn("Changed file does not conform to directory structure and file
name format, skipping: "
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitConfigMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitConfigMonitor.java
new file mode 100644
index 000000000..e7f639ce8
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitConfigMonitor.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.monitoring;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+
+/**
+ * Service that monitors for jobs from a git repository.
+ * The git repository must have an initial commit that has no config files since that is used as a base for getting
+ * the change list.
+ * The config needs to be organized with the following structure:
+ * {@code <root_config_dir>/<flowGroup>/<flowName>.(pull|job|json|conf)}
+ * The {@code <flowGroup>} and {@code <flowName>} are used to generate the URI used to store the config in the
+ * {@link FlowCatalog}. Changes are applied by a {@link GitConfigListener} registered in the constructor.
+ */
+@Slf4j
+@Singleton
+public class GitConfigMonitor extends GitMonitoringService {
+  public static final String GIT_CONFIG_MONITOR_PREFIX = "gobblin.service.gitConfigMonitor";
+
+  private static final String PROPERTIES_EXTENSIONS = "pull,job";
+  private static final String CONF_EXTENSIONS = "json,conf";
+  private static final String DEFAULT_GIT_CONFIG_MONITOR_REPO_DIR = "git-flow-config";
+  private static final String DEFAULT_GIT_CONFIG_MONITOR_CONFIG_DIR = "gobblin-config";
+  private static final String DEFAULT_GIT_CONFIG_MONITOR_BRANCH_NAME = "master";
+
+  private static final int DEFAULT_GIT_CONFIG_MONITOR_POLLING_INTERVAL = 60;
+
+  private static final Config DEFAULT_FALLBACK =
+      ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+          .put(ConfigurationKeys.GIT_MONITOR_REPO_DIR, DEFAULT_GIT_CONFIG_MONITOR_REPO_DIR)
+          .put(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR, DEFAULT_GIT_CONFIG_MONITOR_CONFIG_DIR)
+          .put(ConfigurationKeys.GIT_MONITOR_BRANCH_NAME, DEFAULT_GIT_CONFIG_MONITOR_BRANCH_NAME)
+          .put(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, DEFAULT_GIT_CONFIG_MONITOR_POLLING_INTERVAL)
+          .put(ConfigurationKeys.JAVA_PROPS_EXTENSIONS, PROPERTIES_EXTENSIONS)
+          .put(ConfigurationKeys.HOCON_FILE_EXTENSIONS, CONF_EXTENSIONS)
+          .build());
+
+  private final FlowCatalog flowCatalog;
+
+  /**
+   * @param config service config; the sub-tree under {@link #GIT_CONFIG_MONITOR_PREFIX} is used, with
+   *               {@code DEFAULT_FALLBACK} supplying any key it does not set
+   * @param flowCatalog catalog that the registered {@link GitConfigListener} applies flow config changes to
+   */
+  @Inject
+  GitConfigMonitor(Config config, FlowCatalog flowCatalog) {
+    super(resolveMonitorConfig(config));
+    this.flowCatalog = flowCatalog;
+    // Same resolution as handed to super(), so the listener and the base service see identical settings.
+    Config configWithFallbacks = resolveMonitorConfig(config);
+    this.listeners.add(new GitConfigListener(flowCatalog,
+        configWithFallbacks.getString(ConfigurationKeys.GIT_MONITOR_REPO_DIR),
+        configWithFallbacks.getString(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR),
+        configWithFallbacks.getString(ConfigurationKeys.JAVA_PROPS_EXTENSIONS),
+        configWithFallbacks.getString(ConfigurationKeys.HOCON_FILE_EXTENSIONS)));
+  }
+
+  /**
+   * Returns the {@link #GIT_CONFIG_MONITOR_PREFIX} sub-config of {@code config} with {@code DEFAULT_FALLBACK} applied.
+   */
+  private static Config resolveMonitorConfig(Config config) {
+    return config.getConfig(GIT_CONFIG_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK);
+  }
+
+  /**
+   * Poll git only while this instance is active and the {@link FlowCatalog} is running; config changes cannot be
+   * applied before the catalog is up.
+   */
+  @Override
+  public boolean shouldPollGit() {
+    if (!this.isActive || !this.flowCatalog.isRunning()) {
+      log.warn("GitConfigMonitor: skip poll since the monitor is not active or the FlowCatalog is not yet running. "
+          + "isActive = {}", this.isActive);
+      return false;
+    }
+    return true;
+  }
+
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitDiffListener.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitDiffListener.java
new file mode 100644
index 000000000..bfb098b35
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitDiffListener.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import org.eclipse.jgit.diff.DiffEntry;
+
+
+/**
+ * Listener for {@link GitMonitoringService} to apply changes detected from Git.
+ * A monitoring service may register several listeners; every listener is handed every detected change, in the
+ * order the service chose for that poll.
+ */
+public interface GitDiffListener {
+
+  /**
+   * Apply a file whose new contents should be reflected downstream. Invoked for
+   * {@link DiffEntry.ChangeType#ADD} and {@link DiffEntry.ChangeType#MODIFY} entries, and right after
+   * {@link #removeChange(DiffEntry)} for {@link DiffEntry.ChangeType#RENAME} entries.
+   * The relevant location is {@link DiffEntry#getNewPath()}.
+   * @param change the git diff entry to apply
+   */
+  void addChange(DiffEntry change);
+
+  /**
+   * Drop whatever was previously applied for a file. Invoked for {@link DiffEntry.ChangeType#DELETE} entries,
+   * and right before {@link #addChange(DiffEntry)} for {@link DiffEntry.ChangeType#RENAME} entries.
+   * The relevant location is {@link DiffEntry#getOldPath()}.
+   * @param change the git diff entry to apply
+   */
+  void removeChange(DiffEntry change);
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphListener.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphListener.java
new file mode 100644
index 000000000..1f5a16e7d
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphListener.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.eclipse.jgit.diff.DiffEntry;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraphListener;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+
+
+/**
+ * {@link GitDiffListener} used by {@link GitFlowGraphMonitor} to mirror node and edge files from git into a
+ * {@link FlowGraph}. A file is classified by its path depth: {@code NODE_FILE_DEPTH} means a {@link DataNode},
+ * {@code EDGE_FILE_DEPTH} means a {@link FlowEdge}; files at any other depth are ignored.
+ */
+public class GitFlowGraphListener extends BaseFlowGraphListener implements GitDiffListener {
+
+  public GitFlowGraphListener(Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog,
+      FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, String baseDirectory, String folderName,
+      String javaPropsExtentions, String hoconFileExtentions) {
+    super(flowTemplateCatalog, graph, topologySpecMap, baseDirectory, folderName, javaPropsExtentions,
+        hoconFileExtentions);
+  }
+
+  /**
+   * Apply an added, modified or renamed-to node/edge file to the {@link FlowGraph}, using the change's new path.
+   * @param change the diff entry describing the file
+   */
+  @Override
+  public void addChange(DiffEntry change) {
+    String newPath = change.getNewPath();
+    int depth = new Path(newPath).depth();
+    if (depth == NODE_FILE_DEPTH) {
+      addDataNode(newPath);
+      return;
+    }
+    if (depth == EDGE_FILE_DEPTH) {
+      addFlowEdge(newPath);
+    }
+  }
+
+  /**
+   * Drop the node/edge backed by a deleted or renamed-from file from the {@link FlowGraph}, using the change's
+   * old path.
+   * @param change the diff entry describing the file
+   */
+  @Override
+  public void removeChange(DiffEntry change) {
+    String oldPath = change.getOldPath();
+    int depth = new Path(oldPath).depth();
+    if (depth == NODE_FILE_DEPTH) {
+      removeDataNode(oldPath);
+      return;
+    }
+    if (depth == EDGE_FILE_DEPTH) {
+      removeFlowEdge(oldPath);
+    }
+  }
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitor.java
new file mode 100644
index 000000000..bfe811fa0
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitor.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.fs.Path;
+import org.eclipse.jgit.api.errors.GitAPIException;
+import org.eclipse.jgit.diff.DiffEntry;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphMonitor;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+
+/**
+ * Service that monitors for changes to {@link FlowGraph} from a git repository.
+ * The git repository must have an initial commit that has no files since that is used as a base for getting
+ * the change list.
+ * The {@link DataNode}s and {@link FlowEdge}s in FlowGraph need to be organized with the following directory
+ * structure on git:
+ * {@code <root_flowGraph_dir>/<nodeName>/<nodeName>.properties}
+ * {@code <root_flowGraph_dir>/<nodeName1>/<nodeName2>/<edgeName>.properties}
+ * Changes are applied to the {@link FlowGraph} by a {@link GitFlowGraphListener} registered in the constructor.
+ */
+@Slf4j
+public class GitFlowGraphMonitor extends GitMonitoringService implements FlowGraphMonitor {
+  public static final String GIT_FLOWGRAPH_MONITOR_PREFIX = "gobblin.service.gitFlowGraphMonitor";
+  private static final String PROPERTIES_EXTENSIONS = "properties";
+  private static final String CONF_EXTENSIONS = "conf";
+  private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_REPO_DIR = "git-flowgraph";
+  private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR = "gobblin-flowgraph";
+  private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_BRANCH_NAME = "master";
+  static final String SHOULD_CHECKPOINT_HASHES = "shouldCheckpointHashes";
+
+  private static final int DEFAULT_GIT_FLOWGRAPH_MONITOR_POLLING_INTERVAL = 60;
+
+  private static final Config DEFAULT_FALLBACK = ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+      .put(ConfigurationKeys.GIT_MONITOR_REPO_DIR, DEFAULT_GIT_FLOWGRAPH_MONITOR_REPO_DIR)
+      .put(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR, DEFAULT_GIT_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR)
+      .put(ConfigurationKeys.GIT_MONITOR_BRANCH_NAME, DEFAULT_GIT_FLOWGRAPH_MONITOR_BRANCH_NAME)
+      .put(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, DEFAULT_GIT_FLOWGRAPH_MONITOR_POLLING_INTERVAL)
+      .put(ConfigurationKeys.JAVA_PROPS_EXTENSIONS, PROPERTIES_EXTENSIONS)
+      .put(ConfigurationKeys.HOCON_FILE_EXTENSIONS, CONF_EXTENSIONS)
+      .put(SHOULD_CHECKPOINT_HASHES, false)
+      .build());
+
+  private final Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog;
+  private final CountDownLatch initComplete;
+
+  /**
+   * @param config service config; the sub-tree under {@link #GIT_FLOWGRAPH_MONITOR_PREFIX} is used, with
+   *               {@code DEFAULT_FALLBACK} supplying any key it does not set
+   * @param flowTemplateCatalog optional template catalog; when it reports a pending refresh, the git repository
+   *                            is re-initialized on the next poll
+   * @param graph the {@link FlowGraph} kept in sync with git by the registered {@link GitFlowGraphListener}
+   * @param topologySpecMap topology specs handed to the {@link GitFlowGraphListener}
+   * @param initComplete latch counted down after every successful processing of git changes
+   */
+  public GitFlowGraphMonitor(Config config, Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog,
+      FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, CountDownLatch initComplete) {
+    super(resolveMonitorConfig(config));
+    // Same resolution as handed to super(), so the listener and the base service see identical settings.
+    Config configWithFallbacks = resolveMonitorConfig(config);
+    this.flowTemplateCatalog = flowTemplateCatalog;
+    this.initComplete = initComplete;
+    this.listeners.add(new GitFlowGraphListener(
+        flowTemplateCatalog, graph, topologySpecMap,
+        configWithFallbacks.getString(ConfigurationKeys.GIT_MONITOR_REPO_DIR),
+        configWithFallbacks.getString(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR),
+        configWithFallbacks.getString(ConfigurationKeys.JAVA_PROPS_EXTENSIONS),
+        configWithFallbacks.getString(ConfigurationKeys.HOCON_FILE_EXTENSIONS)));
+  }
+
+  /**
+   * Returns the {@link #GIT_FLOWGRAPH_MONITOR_PREFIX} sub-config of {@code config} with {@code DEFAULT_FALLBACK}
+   * applied.
+   */
+  private static Config resolveMonitorConfig(Config config) {
+    return config.getConfig(GIT_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK);
+  }
+
+  /**
+   * Determine if the service should poll Git. Current behavior is both leaders and followers(s) will poll Git for
+   * changes to {@link FlowGraph}.
+   */
+  @Override
+  public boolean shouldPollGit() {
+    return this.isActive;
+  }
+
+  /**
+   * Sort the changes in a commit so that changes to node files appear before changes to edge files. This is done so
+   * that node related changes are applied to the FlowGraph before edge related changes. An example where the order
+   * matters is the case when a commit adds a new node n2 as well as adds an edge from an existing node n1 to n2. To
+   * ensure that the addition of edge n1->n2 is successful, node n2 must exist in the graph and so needs to be added
+   * first. For deletions, the order does not matter and ordering the changes in the commit will result in the same
+   * FlowGraph state as if the changes were unordered. In other words, deletion of a node deletes all its incident
+   * edges from the FlowGraph. So processing an edge deletion later results in a no-op. Note that node and edge files
+   * do not change depth in case of modifications.
+   *
+   * If there are multiple commits between successive polls to Git, the re-ordering of changes across commits should
+   * not affect the final state of the FlowGraph. This is because, the order of changes for a given file type (i.e.
+   * node or edge) is preserved.
+   */
+  @Override
+  void processGitConfigChanges() throws GitAPIException, IOException {
+    if (flowTemplateCatalog.isPresent() && flowTemplateCatalog.get().getAndSetShouldRefreshFlowGraph(false)) {
+      log.info("Change to template catalog detected, refreshing FlowGraph");
+      this.gitRepo.initRepository();
+    }
+
+    List<DiffEntry> changes = this.gitRepo.getChanges();
+    // List.sort is stable, so changes of the same depth keep their relative order.
+    changes.sort(new GitFlowgraphComparator());
+    processGitConfigChangesHelper(changes);
+    //Decrements the latch count. The countdown latch is initialized to 1. So after the first time the latch is
+    // decremented, the following operation should be a no-op.
+    this.initComplete.countDown();
+  }
+
+  /** Orders {@link DiffEntry}s by the path depth of the file they refer to (shallower first). */
+  static class GitFlowgraphComparator implements Comparator<DiffEntry>, Serializable {
+    @Override
+    public int compare(DiffEntry o1, DiffEntry o2) {
+      return Integer.compare(depthOf(o1), depthOf(o2));
+    }
+
+    /**
+     * Depth of the file a change refers to. JGit reports {@link DiffEntry#DEV_NULL} (not {@code null}) as the new
+     * path of a deleted file, so in that case the old path is measured instead.
+     */
+    private static int depthOf(DiffEntry change) {
+      String newPath = change.getNewPath();
+      boolean hasNewPath = newPath != null && !DiffEntry.DEV_NULL.equals(newPath);
+      return new Path(hasNewPath ? newPath : change.getOldPath()).depth();
+    }
+  }
+
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitMonitoringService.java
similarity index 91%
rename from
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
rename to
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitMonitoringService.java
index 05d7b9a6c..878d432e5 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/GitMonitoringService.java
@@ -15,26 +15,22 @@
* limitations under the License.
*/
-package org.apache.gobblin.service.modules.core;
+package org.apache.gobblin.service.monitoring;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.net.URI;
import java.nio.charset.Charset;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.eclipse.jgit.api.Git;
import org.eclipse.jgit.api.ResetCommand;
import org.eclipse.jgit.api.TransportConfigCallback;
@@ -53,14 +49,13 @@ import
org.eclipse.jgit.transport.UsernamePasswordCredentialsProvider;
import org.eclipse.jgit.treewalk.CanonicalTreeParser;
import org.eclipse.jgit.util.FS;
+import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
-import com.google.common.collect.Sets;
import com.google.common.io.Files;
-import com.google.common.util.concurrent.AbstractIdleService;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
@@ -73,16 +68,19 @@ import org.apache.gobblin.password.PasswordManager;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.Either;
import org.apache.gobblin.util.ExecutorsUtils;
-import org.apache.gobblin.util.PullFileLoader;
+/**
+ * Base monitoring service that polls git and applies the detected changes to each of its listeners.
+ * Implementation classes should register one or more {@link GitDiffListener}s in {@code listeners} to apply
+ * changes from git. Multiple listeners may watch the same repository and each can apply its own changes, but
+ * detecting changes and deciding their order is centrally controlled by {@code processGitConfigChanges()}.
+ */
@Slf4j
public abstract class GitMonitoringService extends AbstractIdleService {
private static final String REMOTE_NAME = "origin";
private static final int TERMINATION_TIMEOUT = 30;
- static final String JAVA_PROPS_EXTENSIONS = "javaPropsExtensions";
- static final String HOCON_FILE_EXTENSIONS = "hoconFileExtensions";
static final String SHOULD_CHECKPOINT_HASHES = "shouldCheckpointHashes";
private final Integer pollingInterval;
@@ -97,12 +95,10 @@ public abstract class GitMonitoringService extends
AbstractIdleService {
private String knownHostsFile;
final GitMonitoringService.GitRepository gitRepo;
- final String repositoryDir;
- final String folderName;
- final PullFileLoader pullFileLoader;
- final Set<String> javaPropsExtensions;
- final Set<String> hoconFileExtensions;
+ protected final String repositoryDir;
+ protected final String folderName;
+ protected List<GitDiffListener> listeners = new ArrayList<>();
protected volatile boolean isActive = false;
GitMonitoringService(Config config) {
@@ -168,17 +164,6 @@ public abstract class GitMonitoringService extends
AbstractIdleService {
throw new RuntimeException("Could not open git repository", e);
}
- Path folderPath = new Path(this.repositoryDir, this.folderName);
- this.javaPropsExtensions =
Sets.newHashSet(config.getString(JAVA_PROPS_EXTENSIONS).split(","));
- this.hoconFileExtensions =
Sets.newHashSet(config.getString(HOCON_FILE_EXTENSIONS).split(","));
- try {
- this.pullFileLoader = new PullFileLoader(folderPath,
- FileSystem.get(URI.create(ConfigurationKeys.LOCAL_FS_URI), new
Configuration()),
- this.javaPropsExtensions, this.hoconFileExtensions);
- } catch (IOException e) {
- throw new RuntimeException("Could not create pull file loader", e);
- }
-
this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor(
ExecutorsUtils.newThreadFactory(Optional.of(log),
Optional.of("FetchGitConfExecutor")));
}
@@ -234,20 +219,22 @@ public abstract class GitMonitoringService extends
AbstractIdleService {
*/
void processGitConfigChangesHelper(List<DiffEntry> changes) throws
IOException {
for (DiffEntry change : changes) {
- switch (change.getChangeType()) {
- case ADD:
- case MODIFY:
- addChange(change);
- break;
- case DELETE:
- removeChange(change);
- break;
- case RENAME:
- removeChange(change);
- addChange(change);
- break;
- default:
- throw new RuntimeException("Unsupported change type " +
change.getChangeType());
+ for (GitDiffListener listener: this.listeners) {
+ switch (change.getChangeType()) {
+ case ADD:
+ case MODIFY:
+ listener.addChange(change);
+ break;
+ case DELETE:
+ listener.removeChange(change);
+ break;
+ case RENAME:
+ listener.removeChange(change);
+ listener.addChange(change);
+ break;
+ default:
+ throw new RuntimeException("Unsupported change type " +
change.getChangeType());
+ }
}
}
@@ -487,8 +474,4 @@ public abstract class GitMonitoringService extends
AbstractIdleService {
}
+ /**
+ * Whether this service should poll git right now; returning {@code false} skips the poll. Implementations
+ * gate this on their own readiness, e.g. whether the instance is active or a downstream catalog is running.
+ */
public abstract boolean shouldPollGit();
-
- public abstract void addChange(DiffEntry change);
-
- public abstract void removeChange(DiffEntry change);
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index baac020fe..a66717c56 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -60,7 +60,7 @@ import
org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
-import org.apache.gobblin.service.modules.core.GitConfigMonitor;
+import org.apache.gobblin.service.monitoring.GitConfigMonitor;
import org.apache.gobblin.service.modules.core.GobblinServiceManager;
import org.apache.gobblin.service.modules.flow.MockedSpecCompiler;
import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index 7e675495d..0f5a66753 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -75,7 +75,7 @@ import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor;
import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.core.GitFlowGraphMonitor;
+import org.apache.gobblin.service.monitoring.GitFlowGraphMonitor;
import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GitConfigMonitorTest.java
similarity index 99%
rename from
gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
rename to
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GitConfigMonitorTest.java
index 3fddd4aab..7b735a63a 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GitConfigMonitorTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.gobblin.service.modules.core;
+package org.apache.gobblin.service.monitoring;
import java.io.File;
import java.io.IOException;
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitorTest.java
similarity index 99%
rename from
gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
rename to
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitorTest.java
index 501b4bcf0..efa34e1ec 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitorTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/GitFlowGraphMonitorTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.gobblin.service.modules.core;
+package org.apache.gobblin.service.monitoring;
import java.io.File;
import java.io.IOException;
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationObserver.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationObserver.java
index 94f7d151e..6fcb44374 100644
---
a/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationObserver.java
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationObserver.java
@@ -161,7 +161,7 @@ public class PathAlterationObserver {
}
/**
- * Check whether the file and its chlidren have been created, modified or
deleted.
+ * Check whether the file and its children have been created, modified or
deleted.
*/
public void checkAndNotify()
throws IOException {