[GOBBLIN-505] Implement a Git-based FlowGraph Monitor. Closes #2382 from sv2000/gitGraphMonitor
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/6b120185 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/6b120185 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/6b120185 Branch: refs/heads/master Commit: 6b1201852aa71b904da5f8a49a550e8596fe59fc Parents: 2c5e25d Author: suvasude <[email protected]> Authored: Mon Jun 11 14:23:21 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Mon Jun 11 14:23:21 2018 -0700 ---------------------------------------------------------------------- .../configuration/ConfigurationKeys.java | 25 +- .../service/modules/core/GitConfigMonitor.java | 325 ++---------------- .../modules/core/GitFlowGraphMonitor.java | 309 ++++++++++++++++++ .../modules/core/GitMonitoringService.java | 326 +++++++++++++++++++ .../service/modules/flowgraph/BaseFlowEdge.java | 51 +-- .../modules/flowgraph/FlowEdgeFactory.java | 13 +- .../flowgraph/FlowGraphConfigurationKeys.java | 7 +- .../modules/core/GitConfigMonitorTest.java | 21 +- .../modules/core/GobblinServiceManagerTest.java | 14 +- .../modules/flowgraph/BaseFlowGraphTest.java | 46 +-- .../modules/core/GitFlowGraphMonitorTest.java | 314 ++++++++++++++++++ 11 files changed, 1059 insertions(+), 392 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index b6196da..50e6020 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -103,7 +103,7 @@ public class ConfigurationKeys { public static final String JOB_CONFIG_FILE_DIR_KEY = "jobconf.dir"; // Path where all job configuration files stored - public static final String JOB_CONFIG_FILE_GENERAL_PATH_KEY = "jobconf.fullyQualifiedPath" ; + public static final String JOB_CONFIG_FILE_GENERAL_PATH_KEY = "jobconf.fullyQualifiedPath"; // Job configuration file extensions public static final String JOB_CONFIG_FILE_EXTENSIONS_KEY = "jobconf.extensions"; public static final String DEFAULT_JOB_CONFIG_FILE_EXTENSIONS = "pull,job"; @@ -208,13 +208,13 @@ public class ConfigurationKeys { public static final long DEFAULT_QUEUED_TASK_TIME_MAX_AGE = TimeUnit.HOURS.toMillis(1); /** Optional, for user to specified which template to use, inside .job file */ - public static final String JOB_TEMPLATE_PATH = "job.template" ; + public static final String JOB_TEMPLATE_PATH = "job.template"; /** * Configuration property used only for job configuration file's tempalte, inside .template file */ public static final String REQUIRED_ATRRIBUTES_LIST = "gobblin.template.required_attributes"; - public static final String JOB_DEPENDENCIES="dependencies"; + public static final String JOB_DEPENDENCIES = "dependencies"; /** * Configuration for emitting job events @@ -250,7 +250,7 @@ public class ConfigurationKeys { */ // This property is used to specify the URN of a dataset a job or WorkUnit extracts data for public static final String DATASET_URN_KEY = "dataset.urn"; - public static final String GLOBAL_WATERMARK_DATASET_URN="__globalDatasetWatermark"; + public static final String GLOBAL_WATERMARK_DATASET_URN = "__globalDatasetWatermark"; public static final String DEFAULT_DATASET_URN = ""; /** @@ -900,16 +900,11 @@ public class ConfigurationKeys { public static final String CONFIG_BASED_PREFIX = "gobblin.configBased"; /** - * Configuration related to the git flow config monitoring service + 
* Configuration related to the Git based monitoring service */ - public static final String GIT_CONFIG_MONITOR_PREFIX = "gitConfigMonitor."; - public static final String GIT_CONFIG_MONITOR_REPO_URI = GIT_CONFIG_MONITOR_PREFIX + "repositoryUri"; - public static final String GIT_CONFIG_MONITOR_REPO_DIR = GIT_CONFIG_MONITOR_PREFIX + "repositoryDirectory"; - public static final String DEFAULT_GIT_CONFIG_MONITOR_REPO_DIR = "git-flow-config"; - public static final String GIT_CONFIG_MONITOR_CONFIG_DIR = GIT_CONFIG_MONITOR_PREFIX + "configDirectory"; - public static final String DEFAULT_GIT_CONFIG_MONITOR_CONFIG_DIR = "gobblin-config"; - public static final String GIT_CONFIG_MONITOR_POLLING_INTERVAL = GIT_CONFIG_MONITOR_PREFIX + "pollingInterval"; - public static final String GIT_CONFIG_MONITOR_BRANCH_NAME = GIT_CONFIG_MONITOR_PREFIX + "branchName"; - public static final String DEFAULT_GIT_CONFIG_MONITOR_BRANCH_NAME = "master"; - public static final int DEFAULT_GIT_CONFIG_MONITOR_POLLING_INTERVAL = 60; + public static final String GIT_MONITOR_REPO_URI = "repositoryUri"; + public static final String GIT_MONITOR_REPO_DIR = "repositoryDirectory"; + public static final String GIT_MONITOR_CONFIG_BASE_DIR = "configBaseDirectory"; + public static final String GIT_MONITOR_POLLING_INTERVAL = "pollingInterval"; + public static final String GIT_MONITOR_BRANCH_NAME = "branchName"; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java index 09d7bb4..9fd0555 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java +++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java @@ -16,34 +16,13 @@ */ package org.apache.gobblin.service.modules.core; -import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; -import java.net.URI; -import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.eclipse.jgit.api.Git; -import org.eclipse.jgit.api.ResetCommand; -import org.eclipse.jgit.api.errors.GitAPIException; import org.eclipse.jgit.diff.DiffEntry; -import org.eclipse.jgit.errors.RepositoryNotFoundException; -import org.eclipse.jgit.lib.ObjectId; -import org.eclipse.jgit.lib.ObjectReader; -import org.eclipse.jgit.revwalk.RevCommit; -import org.eclipse.jgit.treewalk.CanonicalTreeParser; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Charsets; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import com.google.common.io.Files; -import com.google.common.util.concurrent.AbstractIdleService; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigValueFactory; @@ -55,11 +34,8 @@ import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; import org.apache.gobblin.runtime.spec_store.FSSpecStore; -import org.apache.gobblin.util.ConfigUtils; -import org.apache.gobblin.util.ExecutorsUtils; import org.apache.gobblin.util.PullFileLoader; - /** * Service that monitors for jobs from a git repository. 
* The git repository must have an inital commit that has no config files since that is used as a base for getting @@ -69,148 +45,54 @@ import org.apache.gobblin.util.PullFileLoader; * The <flowGroup> and <flowName> is used to generate the URI used to store the config in the {@link FlowCatalog} */ @Slf4j -public class GitConfigMonitor extends AbstractIdleService { +public class GitConfigMonitor extends GitMonitoringService { + public static final String GIT_CONFIG_MONITOR_PREFIX = "gitConfigMonitor"; + private static final String SPEC_DESCRIPTION = "Git-based flow config"; private static final String SPEC_VERSION = FlowSpec.Builder.DEFAULT_VERSION; - private static final int TERMINATION_TIMEOUT = 30; + private static final String PROPERTIES_EXTENSIONS = "pull,job"; + private static final String CONF_EXTENSIONS = "json,conf"; + private static final String DEFAULT_GIT_CONFIG_MONITOR_REPO_DIR = "git-flow-config"; + private static final String DEFAULT_GIT_CONFIG_MONITOR_CONFIG_DIR = "gobblin-config"; + private static final String DEFAULT_GIT_CONFIG_MONITOR_BRANCH_NAME = "master"; + private static final int CONFIG_FILE_DEPTH = 3; - private static final String REMOTE_NAME = "origin"; + private static final int DEFAULT_GIT_CONFIG_MONITOR_POLLING_INTERVAL = 60; + + private static final Config DEFAULT_FALLBACK = + ConfigFactory.parseMap(ImmutableMap.<String, Object>builder() + .put(ConfigurationKeys.GIT_MONITOR_REPO_DIR, DEFAULT_GIT_CONFIG_MONITOR_REPO_DIR) + .put(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR, DEFAULT_GIT_CONFIG_MONITOR_CONFIG_DIR) + .put(ConfigurationKeys.GIT_MONITOR_BRANCH_NAME, DEFAULT_GIT_CONFIG_MONITOR_BRANCH_NAME) + .put(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, DEFAULT_GIT_CONFIG_MONITOR_POLLING_INTERVAL) + .put(GitMonitoringService.JAVA_PROPS_EXTENSIONS, PROPERTIES_EXTENSIONS) + .put(GitMonitoringService.HOCON_FILE_EXTENSIONS, CONF_EXTENSIONS) + .build()); - private final ScheduledExecutorService scheduledExecutor; - private final GitRepository 
gitRepo; - private final int pollingInterval; - private final String repositoryDir; - private final String configDir; - private final Path configDirPath; private final FlowCatalog flowCatalog; - private final PullFileLoader pullFileLoader; private final Config emptyConfig = ConfigFactory.empty(); - private volatile boolean isActive = false; - /** - * Create a {@link GitConfigMonitor} that monitors a git repository for changes and manages config in a - * {@link FlowCatalog} - * @param config configuration - * @param flowCatalog the flow catalog - */ GitConfigMonitor(Config config, FlowCatalog flowCatalog) { + super(config.getConfig(GIT_CONFIG_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK)); this.flowCatalog = flowCatalog; - - this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor( - ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("FetchGitConfExecutor"))); - - Preconditions.checkArgument(config.hasPath(ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_URI), - ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_URI + " needs to be specified."); - - String repositoryUri = config.getString(ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_URI); - - this.repositoryDir = ConfigUtils.getString(config, ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_DIR, - ConfigurationKeys.DEFAULT_GIT_CONFIG_MONITOR_REPO_DIR); - - this.configDir = ConfigUtils.getString(config, ConfigurationKeys.GIT_CONFIG_MONITOR_CONFIG_DIR, - ConfigurationKeys.DEFAULT_GIT_CONFIG_MONITOR_CONFIG_DIR); - - this.pollingInterval = ConfigUtils.getInt(config, ConfigurationKeys.GIT_CONFIG_MONITOR_POLLING_INTERVAL, - ConfigurationKeys.DEFAULT_GIT_CONFIG_MONITOR_POLLING_INTERVAL); - - String branchName = ConfigUtils.getString(config, ConfigurationKeys.GIT_CONFIG_MONITOR_BRANCH_NAME, - ConfigurationKeys.DEFAULT_GIT_CONFIG_MONITOR_BRANCH_NAME); - - this.configDirPath = new Path(this.repositoryDir, this.configDir); - - try { - this.pullFileLoader = new PullFileLoader(this.configDirPath, - 
FileSystem.get(URI.create(ConfigurationKeys.LOCAL_FS_URI), new Configuration()), - PullFileLoader.DEFAULT_JAVA_PROPS_PULL_FILE_EXTENSIONS, PullFileLoader.DEFAULT_HOCON_PULL_FILE_EXTENSIONS); - } catch (IOException e) { - throw new RuntimeException("Could not create pull file loader", e); - } - - try { - this.gitRepo = new GitRepository(repositoryUri, this.repositoryDir, branchName); - } catch (GitAPIException | IOException e) { - throw new RuntimeException("Could not open git repository", e); - } - } - - /** Start the service. */ - @Override - protected void startUp() throws Exception { - log.info("Starting the " + GitConfigMonitor.class.getSimpleName()); - log.info("Polling git with inteval {} ", this.pollingInterval); - - // Schedule the job config fetch task - this.scheduledExecutor.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - processGitConfigChanges(); - } catch (GitAPIException | IOException e) { - log.error("Failed to process git config changes", e); - // next run will try again since errors could be intermittent - } - } - }, 0, this.pollingInterval, TimeUnit.SECONDS); } - /** Stop the service. 
*/ @Override - protected void shutDown() throws Exception { - this.scheduledExecutor.shutdown(); - this.scheduledExecutor.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS); - } - - public synchronized void setActive(boolean isActive) { - if (this.isActive == isActive) { - // No-op if already in correct state - return; - } - - this.isActive = isActive; - } - - /** - * Fetch the list of changes since the last refresh of the repository and apply the changes to the {@link FlowCatalog} - * @throws GitAPIException - * @throws IOException - */ - @VisibleForTesting - void processGitConfigChanges() throws GitAPIException, IOException { + public boolean shouldPollGit() { // if not active or if the flow catalog is not up yet then can't process config changes if (!isActive || !this.flowCatalog.isRunning()) { - log.info("GitConfigMonitor: skip poll since the JobCatalog is not yet running."); - return; - } - - List<DiffEntry> changes = this.gitRepo.getChanges(); - - for (DiffEntry change : changes) { - switch (change.getChangeType()) { - case ADD: - case MODIFY: - addSpec(change); - break; - case DELETE: - removeSpec(change); - break; - case RENAME: - removeSpec(change); - addSpec(change); - break; - default: - throw new RuntimeException("Unsupported change type " + change.getChangeType()); - } + log.warn("GitConfigMonitor: skip poll since the JobCatalog is not yet running. 
isActive = {}", this.isActive); + return false; } - - // Done processing changes, so checkpoint - this.gitRepo.moveCheckpointAndHashesForward(); + return true; } /** * Add a {@link FlowSpec} for an added, updated, or modified flow config * @param change */ - private void addSpec(DiffEntry change) { + @Override + public void addChange(DiffEntry change) { if (checkConfigFilePath(change.getNewPath())) { Path configFilePath = new Path(this.repositoryDir, change.getNewPath()); @@ -232,7 +114,8 @@ public class GitConfigMonitor extends AbstractIdleService { * remove a {@link FlowSpec} for a deleted or renamed flow config * @param change */ - private void removeSpec(DiffEntry change) { + @Override + public void removeChange(DiffEntry change) { if (checkConfigFilePath(change.getOldPath())) { Path configFilePath = new Path(this.repositoryDir, change.getOldPath()); String flowName = FSSpecStore.getSpecName(configFilePath); @@ -265,10 +148,10 @@ public class GitConfigMonitor extends AbstractIdleService { Path configFile = new Path(configFilePath); String fileExtension = Files.getFileExtension(configFile.getName()); - if (configFile.depth() != CONFIG_FILE_DEPTH || - !configFile.getParent().getParent().getName().equals(configDir) || - !(PullFileLoader.DEFAULT_JAVA_PROPS_PULL_FILE_EXTENSIONS.contains(fileExtension) || - PullFileLoader.DEFAULT_JAVA_PROPS_PULL_FILE_EXTENSIONS.contains(fileExtension))) { + if (configFile.depth() != CONFIG_FILE_DEPTH + || !configFile.getParent().getParent().getName().equals(folderName) + || !(PullFileLoader.DEFAULT_JAVA_PROPS_PULL_FILE_EXTENSIONS.contains(fileExtension) + || PullFileLoader.DEFAULT_JAVA_PROPS_PULL_FILE_EXTENSIONS.contains(fileExtension))) { log.warn("Changed file does not conform to directory structure and file name format, skipping: " + configFilePath); @@ -292,144 +175,4 @@ public class GitConfigMonitor extends AbstractIdleService { return flowConfig.withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName)) 
.withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup)); } - - /** - * Class for managing a git repository - */ - private static class GitRepository { - private final static String CHECKPOINT_FILE = "checkpoint.txt"; - private final static String CHECKPOINT_FILE_TMP = "checkpoint.tmp"; - private final String repoUri; - private final String repoDir; - private final String branchName; - private Git git; - private String lastProcessedGitHash; - private String latestGitHash; - - /** - * Create an object to manage the git repository stored locally at repoDir with a repository URI of repoDir - * @param repoUri URI of repository - * @param repoDir Directory to hold the local copy of the repository - * @param branchName Branch name - * @throws GitAPIException - * @throws IOException - */ - private GitRepository(String repoUri, String repoDir, String branchName) throws GitAPIException, IOException { - this.repoUri = repoUri; - this.repoDir = repoDir; - this.branchName = branchName; - - initRepository(); - } - - /** - * Open the repository if it exists locally, otherwise clone it - * @throws GitAPIException - * @throws IOException - */ - private void initRepository() throws GitAPIException, IOException { - File repoDirFile = new File(this.repoDir); - - try { - this.git = Git.open(repoDirFile); - - String uri = this.git.getRepository().getConfig().getString("remote", REMOTE_NAME, "url"); - - if (!uri.equals(this.repoUri)) { - throw new RuntimeException("Repo at " + this.repoDir + " has uri " + uri + " instead of " + this.repoUri); - } - } catch (RepositoryNotFoundException e) { - // if the repository was not found then clone a new one - this.git = Git.cloneRepository() - .setDirectory(repoDirFile) - .setURI(this.repoUri) - .setBranch(this.branchName) - .call(); - } - - try { - this.lastProcessedGitHash = readCheckpoint(); - } catch (FileNotFoundException e) { - // if no checkpoint is available then start with the first commit - 
Iterable<RevCommit> logs = git.log().call(); - RevCommit lastLog = null; - - for (RevCommit log : logs) { - lastLog = log; - } - - if (lastLog != null) { - this.lastProcessedGitHash = lastLog.getName(); - } - } - - this.latestGitHash = this.lastProcessedGitHash; - } - - /** - * Read the last processed commit githash from the checkpoint file - * @return - * @throws IOException - */ - private String readCheckpoint() throws IOException { - File checkpointFile = new File(this.repoDir, CHECKPOINT_FILE); - return Files.toString(checkpointFile, Charsets.UTF_8); - } - - /** - * Write the last processed commit githash to the checkpoint file - * @param gitHash - * @throws IOException - */ - private void writeCheckpoint(String gitHash) throws IOException { - // write to a temporary name then rename to make the operation atomic when the file system allows a file to be - // replaced - File tmpCheckpointFile = new File(this.repoDir, CHECKPOINT_FILE_TMP); - File checkpointFile = new File(this.repoDir, CHECKPOINT_FILE); - - Files.write(gitHash, tmpCheckpointFile, Charsets.UTF_8); - - Files.move(tmpCheckpointFile, checkpointFile); - } - - private void moveCheckpointAndHashesForward() throws IOException { - this.lastProcessedGitHash = this.latestGitHash; - - writeCheckpoint(this.latestGitHash); - } - - /** - * - * @throws GitAPIException - * @throws IOException - */ - private List<DiffEntry> getChanges() throws GitAPIException, IOException { - // get tree for last processed commit - ObjectId oldHeadTree = git.getRepository().resolve(this.lastProcessedGitHash + "^{tree}"); - - // refresh to latest and reset hard to handle forced pushes - this.git.fetch().setRemote(REMOTE_NAME).call(); - // reset hard to get a clean working set since pull --rebase may leave files around - this.git.reset().setMode(ResetCommand.ResetType.HARD).setRef(REMOTE_NAME + "/" + this.branchName).call(); - - ObjectId head = this.git.getRepository().resolve("HEAD"); - ObjectId headTree = 
this.git.getRepository().resolve("HEAD^{tree}"); - - // remember the hash for the current HEAD. This will be checkpointed after the diff is processed. - latestGitHash = head.getName(); - - // diff old and new heads to find changes - ObjectReader reader = this.git.getRepository().newObjectReader(); - CanonicalTreeParser oldTreeIter = new CanonicalTreeParser(); - oldTreeIter.reset(reader, oldHeadTree); - CanonicalTreeParser newTreeIter = new CanonicalTreeParser(); - newTreeIter.reset(reader, headTree); - - return this.git.diff() - .setNewTree(newTreeIter) - .setOldTree(oldTreeIter) - .setShowNameAndStatusOnly(true) - .call(); - } - } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java new file mode 100644 index 0000000..c7dd226 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitFlowGraphMonitor.java @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.core; + +import java.io.IOException; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.Path; +import org.eclipse.jgit.diff.DiffEntry; + +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.Files; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.service.modules.flowgraph.DataNode; +import org.apache.gobblin.service.modules.flowgraph.FlowEdge; +import org.apache.gobblin.service.modules.flowgraph.FlowEdgeFactory; +import org.apache.gobblin.service.modules.flowgraph.FlowGraph; +import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys; +import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.reflection.GobblinConstructorUtils; + + +/** + * Service that monitors for changes to {@link org.apache.gobblin.service.modules.flowgraph.FlowGraph} from a git repository. + * The git repository must have an inital commit that has no files since that is used as a base for getting + * the change list. 
+ * The {@link DataNode}s and {@link FlowEdge}s in FlowGraph need to be organized with the following directory structure on git: + * <root_flowGraph_dir>/<nodeName>/<nodeName>.properties + * <root_flowGraph_dir>/<nodeName1>/<nodeName2>/<edgeName>.properties + */ +@Slf4j +public class GitFlowGraphMonitor extends GitMonitoringService { + public static final String GIT_FLOWGRAPH_MONITOR_PREFIX = "gitFlowGraphMonitor"; + + private static final String PROPERTIES_EXTENSIONS = "properties"; + private static final String CONF_EXTENSIONS = StringUtils.EMPTY; + private static final String FLOW_EDGE_LABEL_JOINER_CHAR = ":"; + private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_REPO_DIR = "git-flowgraph"; + private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR = "gobblin-flowgraph"; + private static final String DEFAULT_GIT_FLOWGRAPH_MONITOR_BRANCH_NAME = "master"; + + private static final int NODE_FILE_DEPTH = 3; + private static final int EDGE_FILE_DEPTH = 4; + private static final int DEFAULT_GIT_FLOWGRAPH_MONITOR_POLLING_INTERVAL = 60; + + private static final Config DEFAULT_FALLBACK = + ConfigFactory.parseMap(ImmutableMap.<String, Object>builder() + .put(ConfigurationKeys.GIT_MONITOR_REPO_DIR, DEFAULT_GIT_FLOWGRAPH_MONITOR_REPO_DIR) + .put(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR, DEFAULT_GIT_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR) + .put(ConfigurationKeys.GIT_MONITOR_BRANCH_NAME, DEFAULT_GIT_FLOWGRAPH_MONITOR_BRANCH_NAME) + .put(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, DEFAULT_GIT_FLOWGRAPH_MONITOR_POLLING_INTERVAL) + .put(JAVA_PROPS_EXTENSIONS, PROPERTIES_EXTENSIONS) + .put(HOCON_FILE_EXTENSIONS, CONF_EXTENSIONS) + .build()); + + private FSFlowCatalog flowCatalog; + private FlowGraph flowGraph; + private final Config emptyConfig = ConfigFactory.empty(); + + public GitFlowGraphMonitor(Config config, FSFlowCatalog flowCatalog, FlowGraph graph) { + super(config.getConfig(GIT_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK)); + 
this.flowCatalog = flowCatalog; + this.flowGraph = graph; + } + + /** + * Determine if the service should poll Git. + */ + @Override + public boolean shouldPollGit() { + return this.isActive; + } + + /** + * Add an element (i.e., a {@link DataNode}, or a {@link FlowEdge} to + * the {@link FlowGraph} for an added, updated or modified node or edge file. + * @param change + */ + @Override + public void addChange(DiffEntry change) { + Path path = new Path(change.getNewPath()); + if (path.depth() == NODE_FILE_DEPTH) { + addDataNode(change); + } else if (path.depth() == EDGE_FILE_DEPTH) { + addFlowEdge(change); + } + } + + /** + * Remove an element (i.e. either a {@link DataNode} or a {@link FlowEdge} from the {@link FlowGraph} for + * a renamed or deleted {@link DataNode} or {@link FlowEdge} file. + * @param change + */ + @Override + public void removeChange(DiffEntry change) { + Path path = new Path(change.getOldPath()); + if (path.depth() == NODE_FILE_DEPTH) { + removeDataNode(change); + } else if (path.depth() == EDGE_FILE_DEPTH) { + removeFlowEdge(change); + } + } + + /** + * Add a {@link DataNode} to the {@link FlowGraph}. The method uses the {@link FlowGraphConfigurationKeys#DATA_NODE_CLASS} config + * to instantiate a {@link DataNode} from the node config file. 
+ * @param change + */ + private void addDataNode(DiffEntry change) { + if (checkFilePath(change.getNewPath(), NODE_FILE_DEPTH)) { + Path nodeFilePath = new Path(this.repositoryDir, change.getNewPath()); + try { + Config config = loadNodeFileWithOverrides(nodeFilePath); + Class dataNodeClass = Class.forName(ConfigUtils.getString(config, FlowGraphConfigurationKeys.DATA_NODE_CLASS, + FlowGraphConfigurationKeys.DEFAULT_DATA_NODE_CLASS)); + DataNode dataNode = (DataNode) GobblinConstructorUtils.invokeLongestConstructor(dataNodeClass, config); + if (!this.flowGraph.addDataNode(dataNode)) { + log.warn("Could not add DataNode {} to FlowGraph; skipping", dataNode.getId()); + } + } catch (Exception e) { + log.warn("Could not add DataNode defined in {} due to exception {}", change.getNewPath(), e.getMessage()); + } + } + } + + /** + * Remove a {@link DataNode} from the {@link FlowGraph}. The method extracts the nodeId of the + * {@link DataNode} from the node config file and uses it to delete the associated {@link DataNode}. + * @param change + */ + private void removeDataNode(DiffEntry change) { + if (checkFilePath(change.getOldPath(), NODE_FILE_DEPTH)) { + Path nodeFilePath = new Path(this.repositoryDir, change.getOldPath()); + Config config = getNodeConfigWithOverrides(ConfigFactory.empty(), nodeFilePath); + String nodeId = config.getString(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY); + if (!this.flowGraph.deleteDataNode(nodeId)) { + log.warn("Could not remove DataNode {} from FlowGraph; skipping", nodeId); + } + } + } + + /** + * Add a {@link FlowEdge} to the {@link FlowGraph}. The method uses the {@link FlowEdgeFactory} instance + * provided by the {@link FlowGraph} to build a {@link FlowEdge} from the edge config file. 
+ * @param change + */ + private void addFlowEdge(DiffEntry change) { + if (checkFilePath(change.getNewPath(), EDGE_FILE_DEPTH)) { + Path edgeFilePath = new Path(this.repositoryDir, change.getNewPath()); + try { + Config config = loadEdgeFileWithOverrides(edgeFilePath); + Class flowEdgeFactoryClass = Class.forName(ConfigUtils.getString(config, FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS, + FlowGraphConfigurationKeys.DEFAULT_FLOW_EDGE_FACTORY_CLASS)); + FlowEdgeFactory flowEdgeFactory = (FlowEdgeFactory) GobblinConstructorUtils.invokeLongestConstructor(flowEdgeFactoryClass, config); + FlowEdge edge = flowEdgeFactory.createFlowEdge(config, flowCatalog); + if (!this.flowGraph.addFlowEdge(edge)) { + log.warn("Could not add edge {} to FlowGraph; skipping", edge.getId()); + } + } catch (Exception e) { + log.warn("Could not add edge defined in {} due to exception {}", change.getNewPath(), e.getMessage()); + } + } + } + + /** + * Remove a {@link FlowEdge} from the {@link FlowGraph}. The method uses {@link FlowEdgeFactory} + * to construct the edgeId of the {@link FlowEdge} from the config file and uses it to delete the associated + * {@link FlowEdge}. 
+ * @param change + */ + private void removeFlowEdge(DiffEntry change) { + if (checkFilePath(change.getOldPath(), EDGE_FILE_DEPTH)) { + Path edgeFilePath = new Path(this.repositoryDir, change.getOldPath()); + try { + Config config = getEdgeConfigWithOverrides(ConfigFactory.empty(), edgeFilePath); + String edgeId = config.getString(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY); + if (!this.flowGraph.deleteFlowEdge(edgeId)) { + log.warn("Could not remove FlowEdge {} from FlowGraph; skipping", edgeId); + } + } catch (Exception e) { + log.warn("Could not remove edge defined in {} due to exception {}", edgeFilePath, e.getMessage()); + } + } + } + + /** + * check whether the file has the proper naming and hierarchy + * @param file the relative path from the repo root + * @return false if the file does not conform + */ + private boolean checkFilePath(String file, int depth) { + // The file is either a node file or an edge file and needs to be stored at either: + // flowGraphDir/nodeName/nodeName.properties (if it is a node file), or + // flowGraphDir/nodeName/nodeName/edgeName.properties (if it is an edge file) + + Path filePath = new Path(file); + String fileExtension = Files.getFileExtension(filePath.getName()); + if (filePath.depth() != depth || !checkFileLevelRelativeToRoot(filePath, depth) + || !(this.javaPropsExtensions.contains(fileExtension))) { + log.warn("Changed file does not conform to directory structure and file name format, skipping: " + + filePath); + return false; + } + return true; + } + + /** + * Helper to check if a file has proper hierarchy. 
+ * @param filePath path of the node/edge file + * @param depth expected depth of the file + * @return true if the file conforms to the expected hierarchy + */ + private boolean checkFileLevelRelativeToRoot(Path filePath, int depth) { + if(filePath == null) { + return false; + } + Path path = filePath; + for (int i = 0; i < depth - 1; i++) { + path = path.getParent(); + } + if (!path.getName().equals(folderName)) { + return false; + } + return true; + } + + /** + * Helper that overrides the data.node.id property with name derived from the node file path + * @param nodeConfig node config + * @param nodeFilePath path of the node file + * @return config with overridden data.node.id + */ + private Config getNodeConfigWithOverrides(Config nodeConfig, Path nodeFilePath) { + String nodeId = nodeFilePath.getParent().getName(); + return nodeConfig.withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY, ConfigValueFactory.fromAnyRef(nodeId)); + } + + /** + * Helper that overrides the flow edge properties with name derived from the edge file path + * @param edgeConfig edge config + * @param edgeFilePath path of the edge file + * @return config with overridden edge properties + */ + private Config getEdgeConfigWithOverrides(Config edgeConfig, Path edgeFilePath) { + String source = edgeFilePath.getParent().getParent().getName(); + String destination = edgeFilePath.getParent().getName(); + String edgeName = Files.getNameWithoutExtension(edgeFilePath.getName()); + return edgeConfig.withValue(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY, ConfigValueFactory.fromAnyRef(source)) + .withValue(FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY, ConfigValueFactory.fromAnyRef(destination)) + .withValue(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, ConfigValueFactory.fromAnyRef(getEdgeId(source, destination, edgeName))); + } + + /** + * Load the node file. 
+ * @param filePath path of the node file relative to the repository root + * @return the configuration object + * @throws IOException + */ + private Config loadNodeFileWithOverrides(Path filePath) throws IOException { + Config nodeConfig = this.pullFileLoader.loadPullFile(filePath, emptyConfig, false); + return getNodeConfigWithOverrides(nodeConfig, filePath); + } + + /** + * Load the edge file. + * @param filePath path of the edge file relative to the repository root + * @return the configuration object + * @throws IOException + */ + private Config loadEdgeFileWithOverrides(Path filePath) throws IOException { + Config edgeConfig = this.pullFileLoader.loadPullFile(filePath, emptyConfig, false); + return getEdgeConfigWithOverrides(edgeConfig, filePath); + } + + /** + * Get an edge label from the edge properties + * @param source source data node id + * @param destination destination data node id + * @param edgeName simple name of the edge (e.g. file name without extension of the edge file) + * @return a string label identifying the edge + */ + private String getEdgeId(String source, String destination, String edgeName) { + return Joiner.on(FLOW_EDGE_LABEL_JOINER_CHAR).join(source, destination, edgeName); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java new file mode 100644 index 0000000..2361edc --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitMonitoringService.java @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.core; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.eclipse.jgit.api.Git; +import org.eclipse.jgit.api.ResetCommand; +import org.eclipse.jgit.api.errors.GitAPIException; +import org.eclipse.jgit.diff.DiffEntry; +import org.eclipse.jgit.errors.RepositoryNotFoundException; +import org.eclipse.jgit.lib.ObjectId; +import org.eclipse.jgit.lib.ObjectReader; +import org.eclipse.jgit.revwalk.RevCommit; +import org.eclipse.jgit.treewalk.CanonicalTreeParser; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Charsets; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; +import com.google.common.io.Files; +import com.google.common.util.concurrent.AbstractIdleService; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import 
org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; +import org.apache.gobblin.util.ExecutorsUtils; +import org.apache.gobblin.util.PullFileLoader; + + +@Slf4j +public abstract class GitMonitoringService extends AbstractIdleService { + private static final String REMOTE_NAME = "origin"; + private static final int TERMINATION_TIMEOUT = 30; + + public static final String JAVA_PROPS_EXTENSIONS = "javaPropsExtensions"; + public static final String HOCON_FILE_EXTENSIONS = "hoconFileExtensions"; + + private Integer pollingInterval; + protected final ScheduledExecutorService scheduledExecutor; + protected GitMonitoringService.GitRepository gitRepo; + protected String repositoryDir; + protected String folderName; + protected Path folderPath; + protected final PullFileLoader pullFileLoader; + protected final Set<String> javaPropsExtensions; + protected final Set<String> hoconFileExtensions; + protected volatile boolean isActive = false; + + public GitMonitoringService(Config config) { + Preconditions.checkArgument(config.hasPath(ConfigurationKeys.GIT_MONITOR_REPO_URI), + ConfigurationKeys.GIT_MONITOR_REPO_URI + " needs to be specified."); + + String repositoryUri = config.getString(ConfigurationKeys.GIT_MONITOR_REPO_URI); + this.repositoryDir = config.getString(ConfigurationKeys.GIT_MONITOR_REPO_DIR); + String branchName = config.getString(ConfigurationKeys.GIT_MONITOR_BRANCH_NAME); + this.pollingInterval = config.getInt(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL); + this.folderName = config.getString(ConfigurationKeys.GIT_MONITOR_CONFIG_BASE_DIR); + + try { + this.gitRepo = new GitMonitoringService.GitRepository(repositoryUri, repositoryDir, branchName); + } catch (GitAPIException | IOException e) { + throw new RuntimeException("Could not open git repository", e); + } + + this.folderPath = new Path(this.repositoryDir, this.folderName); + this.javaPropsExtensions = 
Sets.newHashSet(config.getString(JAVA_PROPS_EXTENSIONS).split(",")); + this.hoconFileExtensions = Sets.newHashSet(config.getString(HOCON_FILE_EXTENSIONS).split(",")); + try { + this.pullFileLoader = new PullFileLoader(this.folderPath, + FileSystem.get(URI.create(ConfigurationKeys.LOCAL_FS_URI), new Configuration()), + this.javaPropsExtensions, this.hoconFileExtensions); + } catch (IOException e) { + throw new RuntimeException("Could not create pull file loader", e); + } + + this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor( + ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("FetchGitConfExecutor"))); + } + + synchronized void setActive(boolean isActive) { + if (this.isActive == isActive) { + // No-op if already in correct state + return; + } + + this.isActive = isActive; + } + + + /** Start the service. */ + @Override + protected void startUp() throws Exception { + log.info("Starting the " + GitConfigMonitor.class.getSimpleName()); + log.info("Polling git with inteval {} ", this.pollingInterval); + + // Schedule the job config fetch task + this.scheduledExecutor.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + if (shouldPollGit()) { + processGitConfigChanges(); + } + } catch (GitAPIException | IOException e) { + log.error("Failed to process git config changes", e); + // next run will try again since errors could be intermittent + } + } + }, 0, this.pollingInterval, TimeUnit.SECONDS); + } + + /** + * Fetch the list of changes since the last refresh of the repository and apply the changes to the {@link FlowCatalog} + * @throws GitAPIException + * @throws IOException + */ + @VisibleForTesting + public void processGitConfigChanges() throws GitAPIException, IOException { + List<DiffEntry> changes = this.gitRepo.getChanges(); + + for (DiffEntry change : changes) { + switch (change.getChangeType()) { + case ADD: + case MODIFY: + addChange(change); + break; + case DELETE: + removeChange(change); + break; + case 
RENAME: + removeChange(change); + addChange(change); + break; + default: + throw new RuntimeException("Unsupported change type " + change.getChangeType()); + } + } + + // Done processing changes, so checkpoint + this.gitRepo.moveCheckpointAndHashesForward(); + } + + /** Stop the service. */ + @Override + protected void shutDown() throws Exception { + this.scheduledExecutor.shutdown(); + this.scheduledExecutor.awaitTermination(TERMINATION_TIMEOUT, TimeUnit.SECONDS); + } + + /** + * Class for managing a git repository + */ + static class GitRepository { + private final static String CHECKPOINT_FILE = "checkpoint.txt"; + private final static String CHECKPOINT_FILE_TMP = "checkpoint.tmp"; + private final String repoUri; + private final String repoDir; + private final String branchName; + private Git git; + private String lastProcessedGitHash; + private String latestGitHash; + + /** + * Create an object to manage the git repository stored locally at repoDir with a repository URI of repoUri + * @param repoUri URI of repository + * @param repoDir Directory to hold the local copy of the repository + * @param branchName Branch name + * @throws GitAPIException + * @throws IOException + */ + GitRepository(String repoUri, String repoDir, String branchName) throws GitAPIException, IOException { + this.repoUri = repoUri; + this.repoDir = repoDir; + this.branchName = branchName; + + initRepository(); + } + + /** + * Open the repository if it exists locally, otherwise clone it + * @throws GitAPIException + * @throws IOException + */ + private void initRepository() throws GitAPIException, IOException { + File repoDirFile = new File(this.repoDir); + + try { + this.git = Git.open(repoDirFile); + + String uri = this.git.getRepository().getConfig().getString("remote", REMOTE_NAME, "url"); + + if (!uri.equals(this.repoUri)) { + throw new RuntimeException("Repo at " + this.repoDir + " has uri " + uri + " instead of " + this.repoUri); + } + } catch (RepositoryNotFoundException e) { + // 
if the repository was not found then clone a new one + this.git = Git.cloneRepository() + .setDirectory(repoDirFile) + .setURI(this.repoUri) + .setBranch(this.branchName) + .call(); + } + + try { + this.lastProcessedGitHash = readCheckpoint(); + } catch (FileNotFoundException e) { + // if no checkpoint is available then start with the first commit + Iterable<RevCommit> logs = git.log().call(); + RevCommit lastLog = null; + + for (RevCommit log : logs) { + lastLog = log; + } + + if (lastLog != null) { + this.lastProcessedGitHash = lastLog.getName(); + } + } + + this.latestGitHash = this.lastProcessedGitHash; + } + + /** + * Read the last processed commit githash from the checkpoint file + * @return + * @throws IOException + */ + private String readCheckpoint() throws IOException { + File checkpointFile = new File(this.repoDir, CHECKPOINT_FILE); + return Files.toString(checkpointFile, Charsets.UTF_8); + } + + /** + * Write the last processed commit githash to the checkpoint file + * @param gitHash + * @throws IOException + */ + private void writeCheckpoint(String gitHash) throws IOException { + // write to a temporary name then rename to make the operation atomic when the file system allows a file to be + // replaced + File tmpCheckpointFile = new File(this.repoDir, CHECKPOINT_FILE_TMP); + File checkpointFile = new File(this.repoDir, CHECKPOINT_FILE); + + Files.write(gitHash, tmpCheckpointFile, Charsets.UTF_8); + + Files.move(tmpCheckpointFile, checkpointFile); + } + + void moveCheckpointAndHashesForward() throws IOException { + this.lastProcessedGitHash = this.latestGitHash; + + writeCheckpoint(this.latestGitHash); + } + + /** + * + * @throws GitAPIException + * @throws IOException + */ + List<DiffEntry> getChanges() throws GitAPIException, IOException { + // get tree for last processed commit + ObjectId oldHeadTree = git.getRepository().resolve(this.lastProcessedGitHash + "^{tree}"); + + // refresh to latest and reset hard to handle forced pushes + 
this.git.fetch().setRemote(REMOTE_NAME).call(); + // reset hard to get a clean working set since pull --rebase may leave files around + this.git.reset().setMode(ResetCommand.ResetType.HARD).setRef(REMOTE_NAME + "/" + this.branchName).call(); + + ObjectId head = this.git.getRepository().resolve("HEAD"); + ObjectId headTree = this.git.getRepository().resolve("HEAD^{tree}"); + + // remember the hash for the current HEAD. This will be checkpointed after the diff is processed. + latestGitHash = head.getName(); + + // diff old and new heads to find changes + ObjectReader reader = this.git.getRepository().newObjectReader(); + CanonicalTreeParser oldTreeIter = new CanonicalTreeParser(); + oldTreeIter.reset(reader, oldHeadTree); + CanonicalTreeParser newTreeIter = new CanonicalTreeParser(); + newTreeIter.reset(reader, headTree); + + return this.git.diff() + .setNewTree(newTreeIter) + .setOldTree(oldTreeIter) + .setShowNameAndStatusOnly(true) + .call(); + } + } + + public abstract boolean shouldPollGit(); + + public abstract void addChange(DiffEntry change); + + public abstract void removeChange(DiffEntry change); + +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java index 12b6222..fc82cc1 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowEdge.java @@ -17,19 +17,19 @@ package org.apache.gobblin.service.modules.flowgraph; -import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.List; import 
org.apache.hadoop.security.UserGroupInformation; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.typesafe.config.Config; +import joptsimple.internal.Strings; +import lombok.Getter; + import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.runtime.api.SpecExecutor; import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog; @@ -37,17 +37,12 @@ import org.apache.gobblin.service.modules.template.FlowTemplate; import org.apache.gobblin.util.reflection.GobblinConstructorUtils; import org.apache.gobblin.util.ConfigUtils; -import joptsimple.internal.Strings; -import lombok.Getter; - /** * An implementation of {@link FlowEdge}. */ @Alpha public class BaseFlowEdge implements FlowEdge { - public static final String FLOW_EDGE_LABEL_JOINER_CHAR = ":"; - @Getter protected List<String> endPoints; @@ -67,13 +62,13 @@ public class BaseFlowEdge implements FlowEdge { private boolean active; //Constructor - public BaseFlowEdge(List<String> endPoints, String edgeName, FlowTemplate flowTemplate, List<SpecExecutor> executors, Config properties, boolean active) { + public BaseFlowEdge(List<String> endPoints, String edgeId, FlowTemplate flowTemplate, List<SpecExecutor> executors, Config properties, boolean active) { this.endPoints = endPoints; this.flowTemplate = flowTemplate; this.executors = executors; this.active = active; this.props = properties; - this.id = generateEdgeId(endPoints, edgeName); + this.id = edgeId; } @Override @@ -81,10 +76,6 @@ public class BaseFlowEdge implements FlowEdge { return true; } - @VisibleForTesting - protected static String generateEdgeId(List<String> endPoints, String edgeName) { - return Joiner.on(FLOW_EDGE_LABEL_JOINER_CHAR).join(endPoints.get(0), endPoints.get(1), edgeName); - } /** * The {@link FlowEdge}s are the same if they have the same endpoints and both refer to the same 
{@FlowTemplate} i.e. * the {@link FlowTemplate} uris are the same @@ -100,11 +91,11 @@ public class BaseFlowEdge implements FlowEdge { FlowEdge that = (FlowEdge) o; - if(!(this.getEndPoints().get(0).equals(that.getEndPoints().get(0))) && ((this.getEndPoints().get(1)).equals(that.getEndPoints().get(1)))) { + if (!(this.getEndPoints().get(0).equals(that.getEndPoints().get(0))) && ((this.getEndPoints().get(1)).equals(that.getEndPoints().get(1)))) { return false; } - if(!this.getFlowTemplate().getUri().equals(that.getFlowTemplate().getUri())) { + if (!this.getFlowTemplate().getUri().equals(that.getFlowTemplate().getUri())) { return false; } return true; @@ -135,17 +126,17 @@ public class BaseFlowEdge implements FlowEdge { @Override public FlowEdge createFlowEdge(Config edgeProps, FSFlowCatalog flowCatalog) throws FlowEdgeCreationException { try { - String source = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY,""); + String source = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY, ""); Preconditions.checkArgument(!Strings.isNullOrEmpty(source), "A FlowEdge must have a non-null or empty source"); - String destination = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY,""); + String destination = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY, ""); Preconditions.checkArgument(!Strings.isNullOrEmpty(destination), "A FlowEdge must have a non-null or empty destination"); List<String> endPoints = Lists.newArrayList(source, destination); - String edgeName = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY,""); - Preconditions.checkArgument(!Strings.isNullOrEmpty(edgeName), "A FlowEdge must have a non-null or empty name"); + String edgeId = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, ""); + Preconditions.checkArgument(!Strings.isNullOrEmpty(edgeId), "A FlowEdge must have a 
non-null or empty Id"); List<Config> specExecutorConfigList = new ArrayList<>(); boolean flag; - for(int i = 0; (flag = edgeProps.hasPath(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + "." + i)) != false; i++) { + for (int i = 0; (flag = edgeProps.hasPath(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + "." + i)); i++) { specExecutorConfigList.add(edgeProps.getConfig(FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY + "." + i)); } @@ -168,26 +159,12 @@ public class BaseFlowEdge implements FlowEdge { specExecutors.add(executor); } FlowTemplate flowTemplate = flowCatalog.getFlowTemplate(new URI(flowTemplateUri)); - return new BaseFlowEdge(endPoints, edgeName, flowTemplate, specExecutors, edgeProps, isActive); - } catch(RuntimeException e) { + return new BaseFlowEdge(endPoints, edgeId, flowTemplate, specExecutors, edgeProps, isActive); + } catch (RuntimeException e) { throw e; } catch (Exception e) { throw new FlowEdgeCreationException(e); } } - - @Override - public String getEdgeId(Config edgeProps) throws IOException { - String source = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY,""); - Preconditions.checkArgument(!Strings.isNullOrEmpty(source), "A FlowEdge must have a non-null or empty source"); - String destination = ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY,""); - Preconditions.checkArgument(!Strings.isNullOrEmpty(source), "A FlowEdge must have a non-null or empty destination"); - String edgeName = - ConfigUtils.getString(edgeProps, FlowGraphConfigurationKeys.FLOW_EDGE_NAME_KEY, ""); - Preconditions.checkArgument(!Strings.isNullOrEmpty(edgeName), "A FlowEdge must have a non-null or empty name"); - List<String> endPoints = Lists.newArrayList(source, destination); - - return generateEdgeId(endPoints, edgeName); - } } } 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java index 851e887..2977231 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowEdgeFactory.java @@ -17,13 +17,9 @@ package org.apache.gobblin.service.modules.flowgraph; -import java.io.IOException; -import java.util.Properties; - -import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog; - import com.typesafe.config.Config; +import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog; public interface FlowEdgeFactory { /** @@ -36,13 +32,6 @@ public interface FlowEdgeFactory { */ public FlowEdge createFlowEdge(Config edgeProps, FSFlowCatalog catalog) throws FlowEdgeCreationException; - /** - * Get an edge label from the edge properties - * @param edgeProps properties of the edge - * @return a string label identifying the edge - */ - public String getEdgeId(Config edgeProps) throws IOException; - public class FlowEdgeCreationException extends Exception { private static final String MESSAGE_FORMAT = "Failed to create FlowEdge because of: %s"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java index 0d94e3f..cd4876a 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/FlowGraphConfigurationKeys.java @@ -24,17 +24,22 @@ public class FlowGraphConfigurationKeys { /** * {@link DataNode} configuration keys. */ + public static final String DATA_NODE_CLASS = DATA_NODE_PREFIX + "class"; + public static final String DEFAULT_DATA_NODE_CLASS = "org.apache.gobblin.service.modules.flowgraph.BaseDataNode"; public static final String DATA_NODE_ID_KEY = DATA_NODE_PREFIX + "id"; public static final String DATA_NODE_IS_ACTIVE_KEY = DATA_NODE_PREFIX + "isActive"; /** * {@link FlowEdge} configuration keys. */ + public static final String FLOW_EDGE_FACTORY_CLASS = FLOW_EDGE_PREFIX + "factory.class"; + public static final String DEFAULT_FLOW_EDGE_FACTORY_CLASS = "org.apache.gobblin.service.modules.flowgraph.BaseFlowEdge$Factory"; public static final String FLOW_EDGE_SOURCE_KEY = FLOW_EDGE_PREFIX + "source"; public static final String FLOW_EDGE_DESTINATION_KEY = FLOW_EDGE_PREFIX + "destination"; + public static final String FLOW_EDGE_ID_KEY = FLOW_EDGE_PREFIX + "id"; public static final String FLOW_EDGE_NAME_KEY = FLOW_EDGE_PREFIX + "name"; public static final String FLOW_EDGE_IS_ACTIVE_KEY = FLOW_EDGE_PREFIX + "isActive"; public static final String FLOW_EDGE_TEMPLATE_URI_KEY = FLOW_EDGE_PREFIX + "flowTemplateUri"; - public static final String FLOW_EDGE_SPEC_EXECUTORS_KEY = FLOW_EDGE_PREFIX +"specExecutors"; + public static final String FLOW_EDGE_SPEC_EXECUTORS_KEY = FLOW_EDGE_PREFIX + "specExecutors"; public static final String FLOW_EDGE_SPEC_EXECUTOR_CLASS_KEY = "specExecutorClass"; } 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java index 6cd8bf2..71ad56d 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java @@ -93,10 +93,11 @@ public class GitConfigMonitorTest { this.gitForPush.push().setRemote("origin").setRefSpecs(this.masterRefSpec).call(); this.config = ConfigBuilder.create() - .addPrimitive(ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_URI, this.remoteRepo.getDirectory().getAbsolutePath()) - .addPrimitive(ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_DIR, TEST_DIR + "/jobConfig") + .addPrimitive(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_URI, + this.remoteRepo.getDirectory().getAbsolutePath()) + .addPrimitive(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." 
+ ConfigurationKeys.GIT_MONITOR_REPO_DIR, TEST_DIR + "/jobConfig") .addPrimitive(ConfigurationKeys.FLOWSPEC_STORE_DIR_KEY, TEST_DIR + "flowCatalog") - .addPrimitive(ConfigurationKeys.GIT_CONFIG_MONITOR_POLLING_INTERVAL, 5) + .addPrimitive(ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5) .build(); this.flowCatalog = new FlowCatalog(config); @@ -153,7 +154,7 @@ public class GitConfigMonitorTest { Collection<Spec> specs = this.flowCatalog.getSpecs(); Assert.assertTrue(specs.size() == 1); - FlowSpec spec = (FlowSpec)(specs.iterator().next()); + FlowSpec spec = (FlowSpec) (specs.iterator().next()); Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow")); Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow"); Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup"); @@ -176,7 +177,7 @@ public class GitConfigMonitorTest { Collection<Spec> specs = this.flowCatalog.getSpecs(); Assert.assertTrue(specs.size() == 1); - FlowSpec spec = (FlowSpec)(specs.iterator().next()); + FlowSpec spec = (FlowSpec) (specs.iterator().next()); Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow")); Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow"); Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup"); @@ -234,13 +235,13 @@ public class GitConfigMonitorTest { } }); - FlowSpec spec = (FlowSpec)specList.get(0); + FlowSpec spec = (FlowSpec) specList.get(0); Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow")); Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow"); Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup"); Assert.assertEquals(spec.getConfig().getString("param1"), "value1"); - spec = (FlowSpec)specList.get(1); + spec = (FlowSpec) specList.get(1); 
Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow2")); Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow2"); Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup"); @@ -279,13 +280,13 @@ public class GitConfigMonitorTest { } }); - spec = (FlowSpec)specList.get(0); + spec = (FlowSpec) specList.get(0); Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow2")); Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow2"); Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup"); Assert.assertEquals(spec.getConfig().getString("param1"), "value4"); - spec = (FlowSpec)specList.get(1); + spec = (FlowSpec) specList.get(1); Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow3")); Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow3"); Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup"); @@ -325,7 +326,7 @@ public class GitConfigMonitorTest { specs = this.flowCatalog.getSpecs(); Assert.assertTrue(specs.size() == 1); - FlowSpec spec = (FlowSpec)(specs.iterator().next()); + FlowSpec spec = (FlowSpec) (specs.iterator().next()); Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow")); Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow"); Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup"); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java index 926dd10..68b030b 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java @@ -95,7 +95,7 @@ public class GobblinServiceManagerTest { Properties serviceCoreProperties = new Properties(); serviceCoreProperties.put(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY, TOPOLOGY_SPEC_STORE_DIR); serviceCoreProperties.put(ConfigurationKeys.FLOWSPEC_STORE_DIR_KEY, FLOW_SPEC_STORE_DIR); - serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_TOPOLOGY_NAMES_KEY , TEST_GOBBLIN_EXECUTOR_NAME); + serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_TOPOLOGY_NAMES_KEY, TEST_GOBBLIN_EXECUTOR_NAME); serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + TEST_GOBBLIN_EXECUTOR_NAME + ".description", "StandaloneTestExecutor"); serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + TEST_GOBBLIN_EXECUTOR_NAME + ".version", @@ -108,9 +108,9 @@ public class GobblinServiceManagerTest { TEST_SOURCE_NAME + ":" + TEST_SINK_NAME); serviceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY, true); - serviceCoreProperties.put(ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_URI, GIT_REMOTE_REPO_DIR); - serviceCoreProperties.put(ConfigurationKeys.GIT_CONFIG_MONITOR_REPO_DIR, GIT_LOCAL_REPO_DIR); - serviceCoreProperties.put(ConfigurationKeys.GIT_CONFIG_MONITOR_POLLING_INTERVAL, 5); + serviceCoreProperties.put(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_URI, GIT_REMOTE_REPO_DIR); + serviceCoreProperties.put(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." + ConfigurationKeys.GIT_MONITOR_REPO_DIR, GIT_LOCAL_REPO_DIR); + serviceCoreProperties.put(GitConfigMonitor.GIT_CONFIG_MONITOR_PREFIX + "." 
+ ConfigurationKeys.GIT_MONITOR_POLLING_INTERVAL, 5); // Create a bare repository RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(new File(GIT_REMOTE_REPO_DIR), FS.DETECTED); @@ -200,7 +200,7 @@ public class GobblinServiceManagerTest { Assert.assertEquals(flowConfig.getId().getFlowGroup(), TEST_GROUP_NAME); Assert.assertEquals(flowConfig.getId().getFlowName(), TEST_FLOW_NAME); - Assert.assertEquals(flowConfig.getSchedule().getCronSchedule(), TEST_SCHEDULE ); + Assert.assertEquals(flowConfig.getSchedule().getCronSchedule(), TEST_SCHEDULE); Assert.assertEquals(flowConfig.getTemplateUris(), TEST_TEMPLATE_URI); Assert.assertFalse(flowConfig.getSchedule().isRunImmediately()); // Add this assert back when getFlowSpec() is changed to return the raw flow spec @@ -228,7 +228,7 @@ public class GobblinServiceManagerTest { Assert.assertEquals(retrievedFlowConfig.getId().getFlowGroup(), TEST_GROUP_NAME); Assert.assertEquals(retrievedFlowConfig.getId().getFlowName(), TEST_FLOW_NAME); - Assert.assertEquals(retrievedFlowConfig.getSchedule().getCronSchedule(), TEST_SCHEDULE ); + Assert.assertEquals(retrievedFlowConfig.getSchedule().getCronSchedule(), TEST_SCHEDULE); Assert.assertEquals(retrievedFlowConfig.getTemplateUris(), TEST_TEMPLATE_URI); // Add this asssert when getFlowSpec() is changed to return the raw flow spec //Assert.assertEquals(flowConfig.getProperties().size(), 2); @@ -279,7 +279,7 @@ public class GobblinServiceManagerTest { specs = this.gobblinServiceManager.flowCatalog.getSpecs(); Assert.assertTrue(specs.size() == 1); - FlowSpec spec = (FlowSpec)(specs.iterator().next()); + FlowSpec spec = (FlowSpec) (specs.iterator().next()); Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow")); Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow"); Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup"); 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6b120185/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java index 9738344..04f2270 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphTest.java @@ -51,6 +51,10 @@ public class BaseFlowGraphTest { private FlowEdge edge3; private FlowEdge edge3c; + private String edgeId1; + private String edgeId2; + private String edgeId3; + BaseFlowGraph graph; @BeforeClass public void setUp() @@ -58,33 +62,40 @@ public class BaseFlowGraphTest { IOException, DataNode.DataNodeCreationException { Properties properties = new Properties(); properties.put("key1", "val1"); - Config node1Config = ConfigUtils.propertiesToConfig(properties).withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY, ConfigValueFactory.fromAnyRef("node1")); + Config node1Config = ConfigUtils.propertiesToConfig(properties).withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY, + ConfigValueFactory.fromAnyRef("node1")); node1 = new BaseDataNode(node1Config); properties = new Properties(); properties.put("key2", "val2"); - Config node2Config = ConfigUtils.propertiesToConfig(properties).withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY, ConfigValueFactory.fromAnyRef("node2")); + Config node2Config = ConfigUtils.propertiesToConfig(properties).withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY, + ConfigValueFactory.fromAnyRef("node2")); node2 = new BaseDataNode(node2Config); properties = new Properties(); properties.put("key3", "val3"); - Config node3Config = 
ConfigUtils.propertiesToConfig(properties).withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY, ConfigValueFactory.fromAnyRef("node3")); + Config node3Config = ConfigUtils.propertiesToConfig(properties).withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY, + ConfigValueFactory.fromAnyRef("node3")); node3 = new BaseDataNode(node3Config); //Create a clone of node3 node3c = new BaseDataNode(node3Config); - FlowTemplate flowTemplate1 = new StaticFlowTemplate(new URI("FS:///uri1"),"","", ConfigFactory.empty(),null, null, null); - FlowTemplate flowTemplate2 = new StaticFlowTemplate(new URI("FS:///uri2"),"","", ConfigFactory.empty(),null, null, null); - FlowTemplate flowTemplate3 = new StaticFlowTemplate(new URI("FS:///uri3"),"","", ConfigFactory.empty(),null, null, null); + FlowTemplate flowTemplate1 = new StaticFlowTemplate(new URI("FS:///uri1"), "", "", ConfigFactory.empty(), null, null, null); + FlowTemplate flowTemplate2 = new StaticFlowTemplate(new URI("FS:///uri2"), "", "", ConfigFactory.empty(), null, null, null); + FlowTemplate flowTemplate3 = new StaticFlowTemplate(new URI("FS:///uri3"), "", "", ConfigFactory.empty(), null, null, null); //Create edge instances - edge1 = new BaseFlowEdge(Lists.newArrayList("node1", "node2"), "edge1", flowTemplate1, null, ConfigFactory.empty(), true); - edge2 = new BaseFlowEdge(Lists.newArrayList("node2", "node3"), "edge2", flowTemplate2, null, ConfigFactory.empty(), true); - edge3 = new BaseFlowEdge(Lists.newArrayList("node3", "node1"), "edge3", flowTemplate3, null, ConfigFactory.empty(), true); + edgeId1 = "node1:node2:edge1"; + edgeId2 = "node2:node3:edge2"; + edgeId3 = "node3:node1:edge3"; + + edge1 = new BaseFlowEdge(Lists.newArrayList("node1", "node2"), edgeId1, flowTemplate1, null, ConfigFactory.empty(), true); + edge2 = new BaseFlowEdge(Lists.newArrayList("node2", "node3"), edgeId2, flowTemplate2, null, ConfigFactory.empty(), true); + edge3 = new BaseFlowEdge(Lists.newArrayList("node3", "node1"), edgeId3, flowTemplate3, 
null, ConfigFactory.empty(), true); //Create a clone of edge3 - edge3c = new BaseFlowEdge(Lists.newArrayList("node3", "node1"), "edge3", flowTemplate3, null, ConfigFactory.empty(), true); + edge3c = new BaseFlowEdge(Lists.newArrayList("node3", "node1"), edgeId3, flowTemplate3, null, ConfigFactory.empty(), true); //Create a FlowGraph graph = new BaseFlowGraph(); @@ -193,8 +204,7 @@ public class BaseFlowGraphTest { @Test (dependsOnMethods = "testDeleteDataNode") public void testDeleteFlowEdgeById() throws Exception { - String edgeLabel1 = BaseFlowEdge.generateEdgeId(Lists.newArrayList("node1", "node2"), "edge1"); - Assert.assertTrue(graph.deleteFlowEdge(edgeLabel1)); + Assert.assertTrue(graph.deleteFlowEdge(edgeId1)); Assert.assertEquals(graph.getEdges("node1").size(), 0); Assert.assertEquals(graph.getEdges("node2").size(), 1); Assert.assertEquals(graph.getEdges("node3").size(), 1); @@ -203,8 +213,7 @@ public class BaseFlowGraphTest { Assert.assertTrue(graph.getEdges("node2").contains(edge2)); Assert.assertTrue(graph.getEdges("node3").contains(edge3)); - String edgeLabel2 = BaseFlowEdge.generateEdgeId(Lists.newArrayList("node2", "node3"), "edge2"); - Assert.assertTrue(graph.deleteFlowEdge(edgeLabel2)); + Assert.assertTrue(graph.deleteFlowEdge(edgeId2)); Assert.assertEquals(graph.getEdges("node1").size(), 0); Assert.assertEquals(graph.getEdges("node2").size(), 0); Assert.assertEquals(graph.getEdges("node3").size(), 1); @@ -213,8 +222,7 @@ public class BaseFlowGraphTest { Assert.assertTrue(!graph.getEdges("node2").contains(edge2)); Assert.assertTrue(graph.getEdges("node3").contains(edge3)); - String edgeLabel3 = BaseFlowEdge.generateEdgeId(Lists.newArrayList("node3", "node1"), "edge3"); - Assert.assertTrue(graph.deleteFlowEdge(edgeLabel3)); + Assert.assertTrue(graph.deleteFlowEdge(edgeId3)); Assert.assertEquals(graph.getEdges("node1").size(), 0); Assert.assertEquals(graph.getEdges("node2").size(), 0); Assert.assertEquals(graph.getEdges("node3").size(), 0); @@ -223,8 
+231,8 @@ public class BaseFlowGraphTest { Assert.assertTrue(!graph.getEdges("node2").contains(edge2)); Assert.assertTrue(!graph.getEdges("node3").contains(edge3)); - Assert.assertTrue(!graph.deleteFlowEdge(edgeLabel1)); - Assert.assertTrue(!graph.deleteFlowEdge(edgeLabel2)); - Assert.assertTrue(!graph.deleteFlowEdge(edgeLabel3)); + Assert.assertTrue(!graph.deleteFlowEdge(edgeId1)); + Assert.assertTrue(!graph.deleteFlowEdge(edgeId2)); + Assert.assertTrue(!graph.deleteFlowEdge(edgeId3)); } } \ No newline at end of file
