[ https://issues.apache.org/jira/browse/GOBBLIN-1696?focusedWorklogId=811077&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-811077 ]
ASF GitHub Bot logged work on GOBBLIN-1696:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 22/Sep/22 07:45
Start Date: 22/Sep/22 07:45
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3548:
URL: https://github.com/apache/gobblin/pull/3548#discussion_r977194076
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java:
##########
@@ -82,7 +85,7 @@
@Slf4j
public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
@Getter
- private final FlowGraph flowGraph;
+ private final AtomicReference<FlowGraph> flowGraph;
Review Comment:
`@Getter` may break encapsulation more than desired: let's not invite
callers to adjust our reference. better perhaps to define an accessor
returning `AtomicReference.get` to them.
in order to handle atomic updates, define a setter yourself (Lombok's
`@Setter` won't quite work)
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java:
##########
@@ -168,7 +171,7 @@ public MultiHopFlowCompiler(Config config, Optional<Logger> log, boolean instrum
}
@VisibleForTesting
- MultiHopFlowCompiler(Config config, FlowGraph flowGraph) {
+ MultiHopFlowCompiler(Config config, AtomicReference<FlowGraph> flowGraph) {
Review Comment:
similar to comments above around encapsulation, more expected would be a
`FlowGraph` param for the MHFC to store internally into the `AtomicReference`.
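A minimal sketch of the encapsulation these two comments point toward. `FlowGraph` here is a hypothetical one-method stand-in and `CompilerSketch` a hypothetical stand-in for `MultiHopFlowCompiler`; neither is Gobblin's actual API. The `AtomicReference` stays private, the constructor takes a plain graph, callers only ever receive the current graph, and replacement goes through a hand-written setter.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for Gobblin's FlowGraph interface, reduced to one method. */
interface FlowGraph {
  boolean hasNode(String nodeId);
}

/** Hypothetical stand-in for MultiHopFlowCompiler, showing only how it holds its graph. */
class CompilerSketch {
  // Private: callers can neither read nor swap the reference itself.
  private final AtomicReference<FlowGraph> flowGraph;

  // Accepts a plain FlowGraph and wraps it internally, instead of taking an AtomicReference.
  CompilerSketch(FlowGraph initialGraph) {
    this.flowGraph = new AtomicReference<>(Objects.requireNonNull(initialGraph));
  }

  /** Hand-written accessor returning the current graph, in place of Lombok's @Getter on the field. */
  FlowGraph getFlowGraph() {
    return this.flowGraph.get();
  }

  /** Hand-written setter: atomically publishes a replacement graph to subsequent readers. */
  void setFlowGraph(FlowGraph newGraph) {
    this.flowGraph.set(Objects.requireNonNull(newGraph));
  }
}
```

Since the setter is the only way to replace the graph, a swap is a single `set` call; a caller that already fetched the old graph keeps using it until its next `getFlowGraph()`.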
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java:
##########
@@ -277,32 +282,64 @@ private List<SpecExecutor> getSpecExecutors(Config edgeConfig) throws URISyntaxE
* @return the configuration object
* @throws IOException
*/
- protected Config loadNodeFileWithOverrides(Path filePath) throws IOException {
+ protected Config loadNodeFileWithOverrides(Path filePath)
+ throws IOException {
Config nodeConfig = this.pullFileLoader.loadPullFile(filePath, emptyConfig, false, false);
return getNodeConfigWithOverrides(nodeConfig, filePath);
}
-
/**
* Load the edge file.
* @param filePath path of the edge file relative to the repository root
* @return the configuration object
* @throws IOException
*/
- protected Config loadEdgeFileWithOverrides(Path filePath) throws IOException {
+ protected Config loadEdgeFileWithOverrides(Path filePath)
+ throws IOException {
Config edgeConfig = this.pullFileLoader.loadPullFile(filePath, emptyConfig, false, false);
return getEdgeConfigWithOverrides(edgeConfig, filePath);
}
+ /**
+ * Loads the entire flowgraph from the path configured in {@link org.apache.gobblin.configuration.ConfigurationKeys.FLOWGRAPH_BASE_DIR }
+ * Expects nodes to be in the format of /flowGraphName/nodeA/nodeA.properties
+ * Expects edges to be in the format of /flowGraphName/nodeA/nodeB/edgeAB.properties
+ * The current flowgraph will be swapped atomically with the new flowgraph that is loaded
+ */
+ public void populateFlowGraphAtomically(AtomicReference<FlowGraph> flowGraphReference) {
+ FlowGraph newFlowGraph = new BaseFlowGraph();
+ java.nio.file.Path graphPath = new File(this.baseDirectory).toPath();
+ try {
+ List<Path> edges = new ArrayList<>();
+ // All nodes must be added first before edges, otherwise edges may have a missing source or destination.
+ // Need to convert files to Hadoop Paths to be compatible with FileAlterationListener
+ java.nio.file.Files.walk(graphPath).forEach(fileName -> {
+ if (!java.nio.file.Files.isDirectory(fileName)) {
+ if (checkFileLevelRelativeToRoot(new Path(fileName.toString()), NODE_FILE_DEPTH)) {
+ addDataNode(newFlowGraph, fileName.toString());
+ } else if (checkFileLevelRelativeToRoot(new Path(fileName.toString()), EDGE_FILE_DEPTH)) {
+ edges.add(new Path(fileName.toString()));
+ }
Review Comment:
I wonder what might be silently skipped here... whether logging might assist
in debugging violated presumptions
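One way to surface what gets skipped, sketched with a hypothetical `FlowGraphFileClassifier` that is not part of the PR: classify each file by its depth below the graph root, keep node and edge files apart so all nodes can be added before any edge, and log every file that fits neither layout. The depths (2 for `nodeA/nodeA.properties`, 3 for `nodeA/nodeB/edgeAB.properties`, counted from the `flowGraphName` directory) are assumptions read off the javadoc examples, and a `Consumer<String>` stands in for the SLF4J logger. Separately, the stream returned by `Files.walk` holds open directory handles and should be closed, which the `listRegularFiles` helper does with try-with-resources.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper: sorts flowgraph files into node and edge files and reports the rest. */
final class FlowGraphFileClassifier {
  // Assumed depths below the graph root: nodeA/nodeA.properties and nodeA/nodeB/edgeAB.properties.
  static final int NODE_FILE_DEPTH = 2;
  static final int EDGE_FILE_DEPTH = 3;

  final List<Path> nodeFiles = new ArrayList<>();
  final List<Path> edgeFiles = new ArrayList<>();

  /** Walks root and returns its regular files; try-with-resources closes the Files.walk stream. */
  static List<Path> listRegularFiles(Path root) throws IOException {
    try (Stream<Path> walk = Files.walk(root)) {
      return walk.filter(Files::isRegularFile).collect(Collectors.toList());
    }
  }

  /** Nodes and edges are kept apart so every node can be added before any edge. */
  static FlowGraphFileClassifier classify(Path root, List<Path> files, Consumer<String> log) {
    FlowGraphFileClassifier result = new FlowGraphFileClassifier();
    for (Path file : files) {
      int depth = root.relativize(file).getNameCount();
      if (depth == NODE_FILE_DEPTH) {
        result.nodeFiles.add(file);
      } else if (depth == EDGE_FILE_DEPTH) {
        result.edgeFiles.add(file);
      } else {
        // Log instead of silently skipping, so a misplaced file is visible when debugging.
        log.accept("Skipping " + file + ": depth " + depth + " matches neither a node nor an edge file");
      }
    }
    return result;
  }
}
```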
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java:
##########
@@ -277,32 +282,64 @@ private List<SpecExecutor> getSpecExecutors(Config edgeConfig) throws URISyntaxE
+ public void populateFlowGraphAtomically(AtomicReference<FlowGraph> flowGraphReference) {
Review Comment:
rather than "handing over the keys", and combining responsibility for
loading w/ managing orderly updates, why not merely return the loaded
`FlowGraph` and leave it to the caller to handle update/replacement?
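A sketch of that split, using hypothetical names (`SimpleFlowGraph`, `FlowGraphLoader`, `FlowGraphOwner`) and a list of node ids in place of real node and edge files: the loader only builds and returns a fresh graph, and the owner alone decides when to publish it through its `AtomicReference`.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical minimal graph: an unmodifiable set of node ids. */
final class SimpleFlowGraph {
  private final Set<String> nodes;

  SimpleFlowGraph(Collection<String> nodes) {
    this.nodes = Collections.unmodifiableSet(new LinkedHashSet<>(nodes));
  }

  Set<String> getNodes() {
    return nodes;
  }
}

/** Hypothetical loader: builds and returns a graph, with no knowledge of how it is published. */
final class FlowGraphLoader {
  SimpleFlowGraph load(List<String> nodeIds) {
    return new SimpleFlowGraph(nodeIds);
  }
}

/** Hypothetical owner of the reference: the only place where a swap happens. */
final class FlowGraphOwner {
  private final AtomicReference<SimpleFlowGraph> current =
      new AtomicReference<>(new SimpleFlowGraph(Collections.emptyList()));
  private final FlowGraphLoader loader = new FlowGraphLoader();

  void reload(List<String> nodeIds) {
    SimpleFlowGraph fresh = loader.load(nodeIds); // fully built before anyone can see it
    current.set(fresh);                           // one atomic swap
  }

  SimpleFlowGraph get() {
    return current.get();
  }
}
```

Keeping the loader free of the reference also makes it testable on its own: it is just a function from inputs to a graph.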
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java:
##########
@@ -84,26 +84,27 @@ public BaseFlowGraphListener(Optional<? extends FSFlowTemplateCatalog> flowTempl
this.hoconFileExtensions = Sets.newHashSet(hoconFileExtensions.split(","));
try {
this.pullFileLoader = new PullFileLoader(folderPath,
- FileSystem.get(URI.create(ConfigurationKeys.LOCAL_FS_URI), new Configuration()),
- this.javaPropsExtensions, this.hoconFileExtensions);
+ FileSystem.get(URI.create(ConfigurationKeys.LOCAL_FS_URI), new Configuration()), this.javaPropsExtensions,
+ this.hoconFileExtensions);
} catch (IOException e) {
throw new RuntimeException("Could not create pull file loader", e);
}
}
+
/**
* Add a {@link DataNode} to the {@link FlowGraph}. The method uses the {@link FlowGraphConfigurationKeys#DATA_NODE_CLASS} config
* to instantiate a {@link DataNode} from the node config file.
* @param path of node to add
*/
- protected void addDataNode(String path) {
+ public void addDataNode(FlowGraph graph, String path) {
if (checkFilePath(path, NODE_FILE_DEPTH)) {
Path nodeFilePath = new Path(this.baseDirectory, path);
try {
Config config = loadNodeFileWithOverrides(nodeFilePath);
Class dataNodeClass = Class.forName(ConfigUtils.getString(config, FlowGraphConfigurationKeys.DATA_NODE_CLASS,
FlowGraphConfigurationKeys.DEFAULT_DATA_NODE_CLASS));
DataNode dataNode = (DataNode) GobblinConstructorUtils.invokeLongestConstructor(dataNodeClass, config);
- if (!this.flowGraph.addDataNode(dataNode)) {
+ if (!graph.addDataNode(dataNode)) {
Review Comment:
overall I don't want to slow down the refactoring, so it's up to you whether
or how to address this...
this helper's methods all follow the same pattern, which seems a clunky
abstraction: each converts a `path` into whatever argument a `flowGraph`
method expects, then delegates to that method. it's as if we wished the
`FlowGraph` methods to take a `String path` as their lone arg, and have that
magically work.
Since the gap boils down merely to conversion (and the `FlowGraph` itself is
not involved there), I'd propose a thinner, more cohesive abstraction: a few
converter/factory methods, each taking a `path` and returning a `DataNode`, a
`FlowEdge`, or a `String flowId`.
impl-wise, all would leverage one core method to resolve a `String path` to a
`Config`. that core method is what motivates the instance state of the pull
loader and the base dir. that's a "path-based config resolver".
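A rough sketch of such a resolver, under loud assumptions: `java.util.Properties` stands in for Typesafe `Config`, `SketchDataNode` and `SketchFlowEdge` are hypothetical stand-ins for `DataNode` and `FlowEdge`, and the path layout (`nodeA/nodeA.properties`, `nodeA/nodeB/edgeAB.properties`) follows the javadoc examples in this PR. One core `resolve` method turns a path into config; the factory methods only add the path-to-identity conversion. The file reader is injectable so the factories can be exercised without touching disk, and a `String flowId` factory would follow the same shape.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;
import java.util.function.Function;

/** Hypothetical stand-in for DataNode (illustration only). */
final class SketchDataNode {
  final String id;
  final Properties config;
  SketchDataNode(String id, Properties config) { this.id = id; this.config = config; }
}

/** Hypothetical stand-in for FlowEdge (illustration only). */
final class SketchFlowEdge {
  final String src, dest, id;
  final Properties config;
  SketchFlowEdge(String src, String dest, String id, Properties config) {
    this.src = src; this.dest = dest; this.id = id; this.config = config;
  }
}

/** Hypothetical "path-based config resolver": one core method maps a path to its config. */
final class PathBasedConfigResolver {
  private final Path baseDir;
  private final Function<Path, Properties> loader;

  PathBasedConfigResolver(Path baseDir, Function<Path, Properties> loader) {
    this.baseDir = baseDir;
    this.loader = loader;
  }

  /** Convenience constructor that reads Java properties files from disk. */
  PathBasedConfigResolver(Path baseDir) {
    this(baseDir, PathBasedConfigResolver::readProperties);
  }

  private static Properties readProperties(Path file) {
    Properties props = new Properties();
    try (Reader reader = Files.newBufferedReader(file)) {
      props.load(reader);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return props;
  }

  /** Core method: resolve a path relative to the base dir into its config. */
  Properties resolve(String relativePath) {
    return loader.apply(baseDir.resolve(relativePath));
  }

  /** "nodeA/nodeA.properties" becomes node "nodeA", named after its directory. */
  SketchDataNode toDataNode(String relativePath) {
    Path p = Paths.get(relativePath);
    return new SketchDataNode(p.getParent().getFileName().toString(), resolve(relativePath));
  }

  /** "nodeA/nodeB/edgeAB.properties" becomes edge "edgeAB" from nodeA to nodeB. */
  SketchFlowEdge toFlowEdge(String relativePath) {
    Path p = Paths.get(relativePath);
    int n = p.getNameCount();
    String fileName = p.getFileName().toString();
    int dot = fileName.lastIndexOf('.');
    String id = dot < 0 ? fileName : fileName.substring(0, dot);
    return new SketchFlowEdge(p.getName(n - 3).toString(), p.getName(n - 2).toString(), id,
        resolve(relativePath));
  }
}
```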
separately, if we wish to support the pattern of:
```
if (!flowGraph.op(arg)) {
  log("op problem w/ {}", arg.id);
} else {
  log("it worked w/ {}", arg.id);
}
```
write a logging `FlowGraph` Decorator that wraps each such method.
edit: see further comments below...
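A minimal sketch of such a decorator, assuming a hypothetical two-method `MiniFlowGraph` interface (the real `FlowGraph` has more operations, and edges here are reduced to an id). Each wrapped method delegates, then logs success or failure in one place, so call sites no longer repeat the `if (!flowGraph.op(arg))` pattern. The default constructor logs through `java.util.logging` only to stay dependency-free; the codebase itself uses SLF4J.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;
import java.util.logging.Logger;

/** Hypothetical, pared-down graph interface; the real FlowGraph has more methods. */
interface MiniFlowGraph {
  boolean addDataNode(String nodeId);
  boolean addFlowEdge(String edgeId);
}

/** Toy in-memory implementation, for illustration only. */
final class InMemoryMiniFlowGraph implements MiniFlowGraph {
  private final Set<String> nodes = new HashSet<>();
  private final Set<String> edges = new HashSet<>();
  @Override public boolean addDataNode(String nodeId) { return nodes.add(nodeId); }
  @Override public boolean addFlowEdge(String edgeId) { return edges.add(edgeId); }
}

/** Decorator: delegates each call, then logs the outcome in one place. */
final class LoggingMiniFlowGraph implements MiniFlowGraph {
  private final MiniFlowGraph delegate;
  private final Consumer<String> log;

  LoggingMiniFlowGraph(MiniFlowGraph delegate, Consumer<String> log) {
    this.delegate = delegate;
    this.log = log;
  }

  LoggingMiniFlowGraph(MiniFlowGraph delegate) {
    this(delegate, Logger.getLogger(LoggingMiniFlowGraph.class.getName())::info);
  }

  @Override
  public boolean addDataNode(String nodeId) {
    return logged(delegate.addDataNode(nodeId), "addDataNode", nodeId);
  }

  @Override
  public boolean addFlowEdge(String edgeId) {
    return logged(delegate.addFlowEdge(edgeId), "addFlowEdge", edgeId);
  }

  // Shared reporting logic: the only place success/failure messages are produced.
  private boolean logged(boolean ok, String op, String id) {
    log.accept(ok ? op + " worked w/ " + id : op + " problem w/ " + id);
    return ok;
  }
}
```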
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraph.java:
##########
@@ -47,10 +47,10 @@
public class BaseFlowGraph implements FlowGraph {
private final ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
Review Comment:
while you're documenting the rationale for `AtomicReference` (suggested by
zihan), let's also clarify what particular instance state this lock means to
protect.
...at this point, I don't see non-`final` fields in the object... although I
do note the maps are not immutable.
that last part is significant: even with careful atomic access and update
via the reference, the FG can mutate out from under whoever is using it!
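A small illustration of the hazard, with a plain `Map` as a hypothetical stand-in for the graph's internal maps: the `AtomicReference` makes replacing the reference atomic, but does nothing to stop mutation of the object it points to. Publishing an unmodifiable copy, here via `Collections.unmodifiableMap` over a fresh `HashMap`, closes that hole at the cost of a copy.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical demo of publishing mutable vs. immutable state through an AtomicReference. */
final class PublicationDemo {
  /** Publishes the writer's own mutable map, so readers and the writer share one instance. */
  static Map<String, String> publishMutable(AtomicReference<Map<String, String>> ref) {
    Map<String, String> nodes = new HashMap<>();
    nodes.put("nodeA", "configA");
    ref.set(nodes);
    return nodes; // the writer keeps a handle it can still mutate
  }

  /** Publishes an unmodifiable copy, so later writes to source stay invisible to readers. */
  static Map<String, String> publishSnapshot(
      AtomicReference<Map<String, String>> ref, Map<String, String> source) {
    Map<String, String> snapshot = Collections.unmodifiableMap(new HashMap<>(source));
    ref.set(snapshot);
    return snapshot;
  }
}
```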
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitor.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraphHelper;
+import org.apache.gobblin.service.modules.flowgraph.FSPathAlterationFlowGraphListener;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphMonitor;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import org.apache.gobblin.util.filesystem.PathAlterationObserver;
+import org.apache.gobblin.util.filesystem.PathAlterationObserverScheduler;
+
+
+@Slf4j
+public class FsFlowGraphMonitor extends AbstractIdleService implements FlowGraphMonitor {
+ public static final String FS_FLOWGRAPH_MONITOR_PREFIX = "gobblin.service.fsFlowGraphMonitor";
+ private static final long DEFAULT_FLOWGRAPH_POLLING_INTERVAL = 60;
+ private static final String DEFAULT_FS_FLOWGRAPH_MONITOR_REPO_DIR = "git-flowgraph";
+ private static final String DEFAULT_FS_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR = "gobblin-flowgraph";
+ private volatile boolean isActive = false;
+ private final long pollingInterval;
+ private BaseFlowGraphHelper flowGraphHelper;
+ private final PathAlterationObserverScheduler pathAlterationDetector;
+ private final FSPathAlterationFlowGraphListener listener;
+ private final PathAlterationObserver observer;
+ private final Path flowGraphPath;
+ private final AtomicReference<FlowGraph> flowGraph;
+ private final CountDownLatch initComplete;
+ private static final Config DEFAULT_FALLBACK = ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+ .put(ConfigurationKeys.FLOWGRAPH_REPO_DIR, DEFAULT_FS_FLOWGRAPH_MONITOR_REPO_DIR)
+ .put(ConfigurationKeys.FLOWGRAPH_BASE_DIR, DEFAULT_FS_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR)
+ .put(ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL, DEFAULT_FLOWGRAPH_POLLING_INTERVAL)
+ .put(ConfigurationKeys.JAVA_PROPS_EXTENSIONS, ConfigurationKeys.DEFAULT_PROPERTIES_EXTENSIONS)
+ .put(ConfigurationKeys.HOCON_FILE_EXTENSIONS, ConfigurationKeys.DEFAULT_CONF_EXTENSIONS).build());
+
+ public FsFlowGraphMonitor(Config config, Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog,
+ AtomicReference<FlowGraph> graph, Map<URI, TopologySpec> topologySpecMap, CountDownLatch initComplete)
+ throws IOException {
+ Config configWithFallbacks = config.getConfig(FS_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK);
+ this.pollingInterval =
+ TimeUnit.SECONDS.toMillis(configWithFallbacks.getLong(ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL));
+ this.flowGraphPath = new Path(configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_REPO_DIR));
+ this.observer = new PathAlterationObserver(flowGraphPath);
+ this.flowGraphHelper = new BaseFlowGraphHelper(flowTemplateCatalog, topologySpecMap, flowGraphPath.toString(),
+ configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_BASE_DIR), configWithFallbacks.getString(ConfigurationKeys.JAVA_PROPS_EXTENSIONS),
+ configWithFallbacks.getString(ConfigurationKeys.HOCON_FILE_EXTENSIONS));
+ this.listener = new FSPathAlterationFlowGraphListener(flowTemplateCatalog, graph, flowGraphPath.toString(), this.flowGraphHelper);
+
+ this.flowGraph = graph;
+ this.initComplete = initComplete;
+
+ if (pollingInterval == ConfigurationKeys.DISABLED_JOB_CONFIG_FILE_MONITOR_POLLING_INTERVAL) {
+ this.pathAlterationDetector = null;
+ } else {
+ this.pathAlterationDetector = new PathAlterationObserverScheduler(pollingInterval);
+ Optional<PathAlterationObserver> observerOptional = Optional.fromNullable(observer);
+ this.pathAlterationDetector.addPathAlterationObserver(this.listener, observerOptional,
+ this.flowGraphPath);
+ }
+ }
+
+ @Override
+ protected void startUp()
+ throws IOException {
+ }
+
+ @Override
+ public synchronized void setActive(boolean isActive) {
+ log.info("Setting the flow graph monitor to be " + isActive + " from " + this.isActive);
+ if (this.isActive == isActive) {
+ // No-op if already in correct state
+ return;
+ } else if (isActive) {
+ if (this.pathAlterationDetector != null) {
+ log.info("Starting the " + getClass().getSimpleName());
+ log.info("Polling flowgraph folder with interval {} ", this.pollingInterval);
+ try {
+ this.pathAlterationDetector.start();
+ // Manually instantiate flowgraph when the monitor becomes active
+ this.flowGraphHelper.populateFlowGraphAtomically(this.flowGraph);
Review Comment:
once I read these FlowGraphMonitors, I realized why
`BaseFlowGraphHelper.addDataNode()` and the other methods are `public`. in
essence, the `flowGraphHelper` favors this FS monitor over the Git one: it
provides this method only for the former, with no equivalent for the latter.
on the Git side, the similar logic that discerns path nesting depth lives
within `GitFlowGraphListener` instead.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/GitFlowGraphListener.java:
##########
@@ -53,10 +47,10 @@ public GitFlowGraphListener(Optional<? extends FSFlowTemplateCatalog> flowTempla
@Override
public void addChange(DiffEntry change) {
Path path = new Path(change.getNewPath());
- if (path.depth() == NODE_FILE_DEPTH) {
- addDataNode(change.getNewPath());
- } else if (path.depth() == EDGE_FILE_DEPTH) {
- addFlowEdge(change.getNewPath());
+ if (path.depth() == BaseFlowGraphHelper.NODE_FILE_DEPTH) {
+ this.baseFlowGraphHelper.addDataNode(this.flowGraph.get(), change.getNewPath());
Review Comment:
the code here illustrates the challenge: even though the `FlowGraph` is
mediated by an `AtomicReference`, it remains mutable. one irresponsible actor
could bring ruin to everyone!
ideally instead of updating the *same* flow graph, we'd rather create an
updated *copy* of it, reflecting the differences.
both because that has cost and because we'd like to enjoy the atomic
semantics we're working so hard for, we wouldn't do this on each
addition/removal, but rather one time per invocation of
`GitFlowGraphMonitor.processGitConfigChanges()`.
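A minimal sketch of one copy-and-swap per batch, assuming a hypothetical immutable `GraphSnapshot` (nodes only, for brevity) and a hypothetical `Change` type in place of JGit's `DiffEntry`. All changes in the batch are applied to a private working copy, and the result is published once, so readers never observe a half-applied diff.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical immutable graph holding only node ids. */
final class GraphSnapshot {
  final Set<String> nodes;
  GraphSnapshot(Set<String> nodes) {
    this.nodes = Collections.unmodifiableSet(new LinkedHashSet<>(nodes));
  }
}

/** Hypothetical change record, standing in for a Git DiffEntry. */
final class Change {
  final boolean isAdd;
  final String nodeId;
  Change(boolean isAdd, String nodeId) { this.isAdd = isAdd; this.nodeId = nodeId; }
}

final class BatchUpdater {
  /** Copies the current graph once, applies every change, then publishes the result in one step. */
  static GraphSnapshot applyBatch(AtomicReference<GraphSnapshot> ref, List<Change> changes) {
    return ref.updateAndGet(current -> {
      Set<String> working = new LinkedHashSet<>(current.nodes); // private, mutable copy
      for (Change c : changes) {
        if (c.isAdd) {
          working.add(c.nodeId);
        } else {
          working.remove(c.nodeId);
        }
      }
      return new GraphSnapshot(working);
    });
  }
}
```

`AtomicReference.updateAndGet` may re-run its function when another thread updates the reference concurrently, so the function must be side-effect free, as it is here; if only a single monitor thread ever writes, a plain `get` followed by `set` would also do.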
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FSPathAlterationFlowGraphListener.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph;
+
+import java.io.File;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import org.apache.gobblin.util.filesystem.PathAlterationListener;
+import org.apache.gobblin.util.filesystem.PathAlterationObserver;
+
+
+/**
+ * An implementation of {@link PathAlterationListener} to listen for changes in a directory and apply it to a GaaS FlowGraph
+ * Is invoked by {@link PathAlterationObserver} which would check a folder and perform recursive comparisons on files compared to
+ * their last polled state. On any detected differences in files when a check is done, the {@link FlowGraph} will be updated.
+ *
+ * Unlike the {@link GitFlowGraphListener}, this class will reload the entire flowgraph on any detected change, instead loading only the diffs.
+ */
+@Slf4j
+public class FSPathAlterationFlowGraphListener implements PathAlterationListener {
+ private final AtomicReference<FlowGraph> flowGraph;
Review Comment:
I'm surprised to see another `AtomicReference` over here. is the same one
from the MHFC... basically entrusted over to this class? I recommend instead
that this listener hold a reference to the MHFC itself and invoke a setter
method for encapsulating the `AtomicReference`.
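A sketch of that wiring with hypothetical stand-ins: `GraphCompiler` for the MHFC, `GraphListener` for `FSPathAlterationFlowGraphListener`, a `List<String>` of node ids for the graph, and a `Supplier` for whatever actually reads the graph from disk. The listener never touches an `AtomicReference`; it only asks the compiler to adopt a newly loaded graph.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical stand-in for MultiHopFlowCompiler; owns the reference privately. */
final class GraphCompiler {
  private final AtomicReference<List<String>> flowGraph;

  GraphCompiler(List<String> initial) {
    this.flowGraph = new AtomicReference<>(initial);
  }

  List<String> getFlowGraph() {
    return flowGraph.get();
  }

  void setFlowGraph(List<String> graph) {
    flowGraph.set(graph);
  }
}

/** Hypothetical stand-in for the FS listener: reloads on change and hands the result over. */
final class GraphListener {
  private final GraphCompiler compiler;
  private final Supplier<List<String>> loader;

  GraphListener(GraphCompiler compiler, Supplier<List<String>> loader) {
    this.compiler = compiler;
    this.loader = loader;
  }

  /** Called when the observer detects any change: load fully, then publish via the setter. */
  void onChangeDetected() {
    compiler.setFlowGraph(loader.get());
  }
}
```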
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitor.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.modules.flowgraph.FSPathAlterationFlowGraphListener;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphMonitor;
+import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import org.apache.gobblin.util.filesystem.PathAlterationObserver;
+import org.apache.gobblin.util.filesystem.PathAlterationObserverScheduler;
+
+
+@Slf4j
+public class FsFlowGraphMonitor extends AbstractIdleService implements FlowGraphMonitor {
+ public static final String FS_FLOWGRAPH_MONITOR_PREFIX = "gobblin.service.fsFlowGraphMonitor";
+ private static long DEFAULT_FLOWGRAPH_POLLING_INTERVAL = 60;
+ private static final String DEFAULT_FS_FLOWGRAPH_MONITOR_REPO_DIR = "git-flowgraph";
+ private static final String DEFAULT_FS_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR = "gobblin-flowgraph";
+ private boolean isActive;
+ private long pollingInterval;
+ private PathAlterationObserverScheduler pathAlterationDetector;
+ private PathAlterationObserver observer;
+ private Path flowGraphPath;
+
+ private static final Config DEFAULT_FALLBACK = ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+ .put(ConfigurationKeys.FLOWGRAPH_REPO_DIR, DEFAULT_FS_FLOWGRAPH_MONITOR_REPO_DIR)
+ .put(ConfigurationKeys.FLOWGRAPH_BASE_DIR, DEFAULT_FS_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR)
+ .put(ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL, DEFAULT_FLOWGRAPH_POLLING_INTERVAL)
+ .put(ConfigurationKeys.JAVA_PROPS_EXTENSIONS, ConfigurationKeys.DEFAULT_PROPERTIES_EXTENSIONS)
+ .put(ConfigurationKeys.HOCON_FILE_EXTENSIONS, ConfigurationKeys.DEFAULT_CONF_EXTENSIONS)
+ .build());
+
+ public FsFlowGraphMonitor(Config config, Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog,
+ FlowGraph graph, Map<URI, TopologySpec> topologySpecMap, CountDownLatch initComplete) throws IOException {
+ Config configWithFallbacks = config.getConfig(FS_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK);
+ this.pollingInterval = TimeUnit.SECONDS.toMillis(configWithFallbacks.getLong(ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL));
+ this.flowGraphPath = new Path(configWithFallbacks.getString(ConfigurationKeys.FLOWGRAPH_REPO_DIR));
+ this.observer = new PathAlterationObserver(flowGraphPath);
+ this.observer = new PathAlterationObserver(flowGraphPath);
Review Comment:
yes, probably better to create config keys specific to this
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FSPathAlterationFlowGraphListener.java:
##########
@@ -0,0 +1,94 @@
+ * Unlike the {@link GitFlowGraphListener}, this class will reload the entire flowgraph on any detected change, instead loading only the diffs.
Review Comment:
you may mean "instead *of* loading only the diffs"
##########
gobblin-utility/src/main/java/org/apache/gobblin/util/filesystem/PathAlterationObserver.java:
##########
@@ -43,9 +43,9 @@ public class PathAlterationObserver {
private final PathFilter pathFilter;
private final Comparator<Path> comparator;
private final FileSystem fs;
-
private final Path[] EMPTY_PATH_ARRAY = new Path[0];
+ private boolean changeApplied = false;
Review Comment:
that's true: if all the others are called only within `checkThenNotify`,
which is `synchronized`, then it's not necessary to synchronize them in
addition. I hadn't grasped earlier whether that's the case.
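To illustrate the reasoning with a hypothetical `ObserverSketch` (not the actual `PathAlterationObserver` code): when a private helper touching `changeApplied` is only ever called from inside a `synchronized` method, the caller already holds the object's monitor, so marking the helper `synchronized` too would be redundant. Java's intrinsic locks are reentrant, so doing so would be harmless but add nothing. The guarantee only holds as long as no other entry point reaches the helper without the lock.

```java
/** Hypothetical sketch: all access to changeApplied happens while holding this object's monitor. */
final class ObserverSketch {
  private boolean changeApplied = false;
  private int notifications = 0;

  /** The only entry point that touches changeApplied; synchronized, so helpers run under its lock. */
  public synchronized void checkThenNotify(boolean fileChanged) {
    changeApplied = false;
    if (fileChanged) {
      onChange();
    }
    if (changeApplied) {
      notifications++;
    }
  }

  // Not synchronized: only reachable from checkThenNotify, which already holds the lock.
  private void onChange() {
    changeApplied = true;
  }

  public synchronized int getNotifications() {
    return notifications;
  }
}
```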
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.java:
##########
@@ -277,32 +282,64 @@ private List<SpecExecutor> getSpecExecutors(Config edgeConfig) throws URISyntaxE
+ public void populateFlowGraphAtomically(AtomicReference<FlowGraph> flowGraphReference) {
Review Comment:
also, I suggested earlier retooling the abstraction to avoid wrapping the
`FlowGraph` method invocations. seeing this method, I finally appreciate that
it is the core abstraction--basically you've got a flow graph
initializer/loader/factory. I was thrown off earlier by the other methods
being `public`, when they really seem subordinate to this one.
if you made them protected or private, renamed this class, and placed this
method first, I don't find much need to rework the abstraction.
that said, this method seems very tied to the FS impl, and not relevant to
the Git one. if so, that might be an argument for the thinner abstraction of a
"path-based config loader" shared between both. this functionality here would
be FS-specific, so not within the same class.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/GitFlowGraphListener.java:
##########
@@ -53,10 +47,10 @@ public GitFlowGraphListener(Optional<? extends FSFlowTemplateCatalog> flowTempla
@Override
public void addChange(DiffEntry change) {
Path path = new Path(change.getNewPath());
- if (path.depth() == NODE_FILE_DEPTH) {
- addDataNode(change.getNewPath());
- } else if (path.depth() == EDGE_FILE_DEPTH) {
- addFlowEdge(change.getNewPath());
+ if (path.depth() == BaseFlowGraphHelper.NODE_FILE_DEPTH) {
+ this.baseFlowGraphHelper.addDataNode(this.flowGraph.get(), change.getNewPath());
Review Comment:
the difficulty here is that the Java collections library has such weak support
for immutable collections, specifically for sharing common parts between
instances (consider instead the purely functional data structures of Scala or
Clojure). what this means is that copy-on-write is more expensive...
still, the approach works, just with a "higher tax".
really we have two choices:
a. keep the flow graph itself immutable and allow it to be cloned into a
builder. once all adjustments are made, the builder's `.build()` renders a new
immutable instance.
b. have a scratch flow graph, not directly used by the MHFC, that is mutable.
its 'add' and 'remove' methods would be `synchronized`. additionally have a
`synchronized` `createSnapshot()` method that creates a copy of the current
state, which is an immutable flow graph.
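A minimal sketch of option b, with hypothetical names and a graph reduced to node ids plus an edge map. Every mutator and `createSnapshot()` is `synchronized` on the scratch graph, so a snapshot can never capture a half-applied change, and the snapshot itself wraps fresh copies in unmodifiable views. The copy made on every snapshot is exactly the cost noted above, since `java.util` offers no structure-sharing persistent collections.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical immutable view handed to readers such as the compiler. */
final class FlowGraphSnapshot {
  final Set<String> nodes;
  final Map<String, String> edges; // edgeId -> "src->dest"

  FlowGraphSnapshot(Set<String> nodes, Map<String, String> edges) {
    this.nodes = Collections.unmodifiableSet(new HashSet<>(nodes));
    this.edges = Collections.unmodifiableMap(new HashMap<>(edges));
  }
}

/** Hypothetical mutable scratch graph; never read directly by the compiler. */
final class ScratchFlowGraph {
  private final Set<String> nodes = new HashSet<>();
  private final Map<String, String> edges = new HashMap<>();

  synchronized boolean addDataNode(String nodeId) {
    return nodes.add(nodeId);
  }

  synchronized boolean deleteDataNode(String nodeId) {
    return nodes.remove(nodeId);
  }

  synchronized boolean addFlowEdge(String edgeId, String src, String dest) {
    // Mirrors the "nodes before edges" rule: refuse edges whose endpoints are missing.
    if (!nodes.contains(src) || !nodes.contains(dest)) {
      return false;
    }
    edges.put(edgeId, src + "->" + dest);
    return true;
  }

  /** Copies the current state under the same lock, so the snapshot is never half-updated. */
  synchronized FlowGraphSnapshot createSnapshot() {
    return new FlowGraphSnapshot(nodes, edges);
  }
}
```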
Issue Time Tracking
-------------------
Worklog Id: (was: 811077)
Time Spent: 4h (was: 3h 50m)
> Build a file-based flowgraph that watches for changes and updates
> -----------------------------------------------------------------
>
> Key: GOBBLIN-1696
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1696
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-service
> Reporter: William Lo
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 4h
> Remaining Estimate: 0h
>
> Gobblin-as-a-Service only has a Git based flowgraph, which is difficult to
> build CI/CD around. We can provide an alternate flowgraph that is just based
> off files. This flowgraph should update atomically.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)