[
https://issues.apache.org/jira/browse/GOBBLIN-1696?focusedWorklogId=808479&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-808479
]
ASF GitHub Bot logged work on GOBBLIN-1696:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 14/Sep/22 00:00
Start Date: 14/Sep/22 00:00
Worklog Time Spent: 10m
Work Description: Will-Lo commented on code in PR #3548:
URL: https://github.com/apache/gobblin/pull/3548#discussion_r970178249
##########
gobblin-service/src/test/java/org/apache/gobblin/service/monitoring/FsFlowGraphMonitorTest.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.monitoring;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.flow.MultiHopFlowCompilerTest;
+import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.flowgraph.FlowEdge;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+import
org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
+import org.apache.hadoop.fs.Path;
+import org.eclipse.jgit.transport.RefSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class FsFlowGraphMonitorTest {
+ private static final Logger logger =
LoggerFactory.getLogger(GitFlowGraphMonitor.class);
+ private final File TEST_DIR = new File(FileUtils.getTempDirectory(),
"fsFlowGraphTestDir");
+ private final File flowGraphDir = new File(TEST_DIR, "/gobblin-flowgraph");
+ private static final String NODE_1_FILE = "node1.properties";
+ private final File node1Dir = new File(FileUtils.getTempDirectory(),
"node1");
+ private final File node1File = new File(node1Dir, NODE_1_FILE);
+ private static final String NODE_2_FILE = "node2.properties";
+ private final File node2Dir = new File(FileUtils.getTempDirectory(),
"node2");
+ private final File node2File = new File(node2Dir, NODE_2_FILE);
+ private final File edge1Dir = new File(node1Dir, "node2");
+ private final File edge1File = new File(edge1Dir, "edge1.properties");
+
+ private RefSpec masterRefSpec = new RefSpec("master");
+ private Optional<FSFlowTemplateCatalog> flowCatalog;
+ private Config config;
+ private BaseFlowGraph flowGraph;
+ private FsFlowGraphMonitor flowGraphMonitor;
+ private Map<URI, TopologySpec> topologySpecMap;
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ cleanUpDir(TEST_DIR.toString());
+
+
+ URI topologyCatalogUri =
this.getClass().getClassLoader().getResource("topologyspec_catalog").toURI();
+ this.topologySpecMap =
MultiHopFlowCompilerTest.buildTopologySpecMap(topologyCatalogUri);
+
+ this.config = ConfigBuilder.create()
+ .addPrimitive(FsFlowGraphMonitor.FS_FLOWGRAPH_MONITOR_PREFIX + "."
+ + ConfigurationKeys.FLOWGRAPH_REPO_DIR,
this.flowGraphDir.getAbsolutePath())
+ .addPrimitive(FsFlowGraphMonitor.FS_FLOWGRAPH_MONITOR_PREFIX + "." +
ConfigurationKeys.FLOWGRAPH_BASE_DIR, "gobblin-flowgraph")
+ .addPrimitive(FsFlowGraphMonitor.FS_FLOWGRAPH_MONITOR_PREFIX + "." +
ConfigurationKeys.FLOWGRAPH_POLLING_INTERVAL, 2)
+ .build();
+
+ // Create a FSFlowTemplateCatalog instance
+ URI flowTemplateCatalogUri =
this.getClass().getClassLoader().getResource("template_catalog").toURI();
+ Properties properties = new Properties();
+ this.flowGraphDir.mkdirs();
+
properties.put(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY,
flowTemplateCatalogUri.toString());
+ Config config = ConfigFactory.parseProperties(properties);
+ Config templateCatalogCfg = config
+ .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY,
+
config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY));
+ this.flowCatalog = Optional.of(new
FSFlowTemplateCatalog(templateCatalogCfg));
+
+ //Create a FlowGraph instance with defaults
+ this.flowGraph = new BaseFlowGraph();
+
+ this.flowGraphMonitor = new FsFlowGraphMonitor(this.config,
this.flowCatalog, this.flowGraph, topologySpecMap, new CountDownLatch(1));
+ this.flowGraphMonitor.startUp();
+ this.flowGraphMonitor.setActive(true);
+ }
+
+ @Test
+ public void testAddNode() throws Exception {
+ String file1Contents = FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY
+ "=true\nparam1=value1\n";
+ String file2Contents = FlowGraphConfigurationKeys.DATA_NODE_IS_ACTIVE_KEY
+ "=true\nparam2=value2\n";
+
+ addNode(this.node1Dir, this.node1File, file1Contents);
+ addNode(this.node2Dir, this.node2File, file2Contents);
+
+ // Let the monitor pick up the nodes that were recently added
+ Thread.sleep(3000);
+ for (int i = 0; i < 1; i++) {
Review Comment:
Ah shoot I think I set the loop differently to test something else but
forgot to fix, thanks for the callout.
Issue Time Tracking
-------------------
Worklog Id: (was: 808479)
Time Spent: 1h 50m (was: 1h 40m)
> Build a file-based flowgraph that watches for changes and updates
> -----------------------------------------------------------------
>
> Key: GOBBLIN-1696
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1696
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-service
> Reporter: William Lo
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
> Gobblin-as-a-Service only has a Git based flowgraph, which is difficult to
> build CI/CD around. We can provide an alternate flowgraph that is just based
> off files. This flowgraph should update atomically.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)