[ 
https://issues.apache.org/jira/browse/GOBBLIN-1696?focusedWorklogId=808479&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-808479
 ]

ASF GitHub Bot logged work on GOBBLIN-1696:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Sep/22 00:00
            Start Date: 14/Sep/22 00:00
    Worklog Time Spent: 10m 
      Work Description: Will-Lo commented on code in PR #3548:
URL: https://github.com/apache/gobblin/pull/3548#discussion_r970178249


##########
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+import 
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import org.apache.hadoop.fs.Path;
+import org.eclipse.jgit.transport.RefSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class FsFlowGraphMonitorTest {
+  private static final Logger logger = 
LoggerFactory.getLogger(GitFlowGraphMonitor.class);
+  private final File TEST_DIR = new File(FileUtils.getTempDirectory(), 
"fsFlowGraphTestDir");
+  private final File flowGraphDir = new File(TEST_DIR, "/gobblin-flowgraph");
+  private static final String NODE_1_FILE = "node1.properties";
+  private final File node1Dir = new File(FileUtils.getTempDirectory(), 
"node1");
+  private final File node1File = new File(node1Dir, NODE_1_FILE);
+  private static final String NODE_2_FILE = "node2.properties";
+  private final File node2Dir = new File(FileUtils.getTempDirectory(), 
"node2");
+  private final File node2File = new File(node2Dir, NODE_2_FILE);
+  private final File edge1Dir = new File(node1Dir, "node2");
+  private final File edge1File = new File(edge1Dir, "edge1.properties");
+
+  private RefSpec masterRefSpec = new RefSpec("master");
+  private Optional<FSFlowTemplateCatalog> flowCatalog;
+  private Config config;
+  private BaseFlowGraph flowGraph;
+  private FsFlowGraphMonitor flowGraphMonitor;
+  private Map<URI, TopologySpec> topologySpecMap;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    cleanUpDir(TEST_DIR.toString());
+
+
+    URI topologyCatalogUri = 
this.getClass().getClassLoader().getResource("topologyspec_catalog").toURI();
+    this.topologySpecMap = 
MultiHopFlowCompilerTest.buildTopologySpecMap(topologyCatalogUri);
+
+    this.config = ConfigBuilder.create()
+        .addPrimitive(FsFlowGraphMonitor.FS_FLOWGRAPH_MONITOR_PREFIX + "."
+            + ConfigurationKeys.FLOWGRAPH_REPO_DIR, 
this.flowGraphDir.getAbsolutePath())
+        .addPrimitive(FsFlowGraphMonitor.FS_FLOWGRAPH_MONITOR_PREFIX + "." + 
ConfigurationKeys.FLOWGRAPH_BASE_DIR, "gobblin-flowgraph")
+        .addPrimitive(FsFlowGraphMonitor.FS_FLOWGRAPH_MONITOR_PREFIX + "." + 
ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL, 2)
+        .build();
+
+    // Create a FSFlowTemplateCatalog instance
+    URI flowTemplateCatalogUri = 
this.getClass().getClassLoader().getResource("template_catalog").toURI();
+    Properties properties = new Properties();
+    this.flowGraphDir.mkdirs();
+    
properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, 
flowTemplateCatalogUri.toString());
+    Config config = ConfigFactory.parseProperties(properties);
+    Config templateCatalogCfg = config
+        .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+            
config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+    this.flowCatalog = Optional.of(new 
FSFlowTemplateCatalog(templateCatalogCfg));
+
+    //Create a FlowGraph instance with defaults
+    this.flowGraph = new BaseFlowGraph();
+
+    this.flowGraphMonitor = new FsFlowGraphMonitor(this.config, 
this.flowCatalog, this.flowGraph, topologySpecMap, new CountDownLatch(1));
+    this.flowGraphMonitor.startUp();
+    this.flowGraphMonitor.setActive(true);
+  }
+
+  @Test
+  public void testAddNode() throws Exception {
+    String file1Contents = FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY 
+ "=true\nparam1=value1\n";
+    String file2Contents = FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY 
+ "=true\nparam2=value2\n";
+
+    addNode(this.node1Dir, this.node1File, file1Contents);
+    addNode(this.node2Dir, this.node2File, file2Contents);
+
+    // Let the monitor pick up the nodes that were recently added
+    Thread.sleep(3000);
+    for (int i = 0; i < 1; i++) {

Review Comment:
   Ah shoot I think I set the loop differently to test something else but 
forgot to fix, thanks for the callout.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 808479)
    Time Spent: 1h 50m  (was: 1h 40m)

> Build a file-based flowgraph that watches for changes and updates
> -----------------------------------------------------------------
>
>                 Key: GOBBLIN-1696
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1696
>             Project: Apache Gobblin
>          Issue Type: New Feature
>          Components: gobblin-service
>            Reporter: William Lo
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> Gobblin-as-a-Service only has a Git based flowgraph, which is difficult to 
> build CI/CD around. We can provide an alternate flowgraph that is just based 
> off files. This flowgraph should update atomically.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to