Github user bbende commented on a diff in the pull request:
https://github.com/apache/nifi-registry/pull/112#discussion_r184458388
--- Diff:
nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java
---
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider.flow.git;
+
+import org.eclipse.jgit.api.Git;
+import org.eclipse.jgit.api.PushCommand;
+import org.eclipse.jgit.api.Status;
+import org.eclipse.jgit.api.errors.GitAPIException;
+import org.eclipse.jgit.api.errors.NoHeadException;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.lib.UserConfig;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.revwalk.RevTree;
+import org.eclipse.jgit.storage.file.FileRepositoryBuilder;
+import org.eclipse.jgit.transport.CredentialsProvider;
+import org.eclipse.jgit.transport.PushResult;
+import org.eclipse.jgit.transport.RemoteConfig;
+import org.eclipse.jgit.transport.UsernamePasswordCredentialsProvider;
+import org.eclipse.jgit.treewalk.TreeWalk;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+
+class GitFlowMetaData {
+
+    private static final Logger logger = LoggerFactory.getLogger(GitFlowMetaData.class);
+
+    // Highest bucket.yml layout version this class understands; bucket files declaring a newer
+    // layout version are skipped while loading.
+    static final int CURRENT_LAYOUT_VERSION = 1;
+
+    // Keys inside bucket.yml. LAYOUT_VERSION, BUCKET_ID and FLOWS are required top-level keys;
+    // NOTE(review): VER and FILE presumably describe individual flow entries - confirm.
+    static final String LAYOUT_VERSION = "layoutVer";
+    static final String BUCKET_ID = "bucketId";
+    static final String FLOWS = "flows";
+    static final String VER = "ver";
+    static final String FILE = "file";
+
+    // Per-bucket metadata file; the name of its parent directory is taken as the bucket name.
+    static final String BUCKET_FILENAME = "bucket.yml";
+
+    // Repository opened by loadGitRepository().
+    private Repository gitRepo;
+    // Remote that must exist in the repository when non-empty; see setRemoteToPush().
+    private String remoteToPush;
+    // Built from the user name and password passed to setRemoteCredential().
+    private CredentialsProvider credentialsProvider;
+
+    // Loaded buckets, keyed by bucket ID.
+    private Map<String, Bucket> buckets = new HashMap<>();
+
+    /**
+     * Sets the name of the Git remote to push to.
+     * When the value is null or empty, loadGitRepository() skips the remote existence check.
+     *
+     * @param remoteToPush name of a remote configured in the repository, or null/empty for none
+     */
+    public void setRemoteToPush(String remoteToPush) {
+        this.remoteToPush = remoteToPush;
+    }
+
+    /**
+     * Sets the user name / password pair used as credentials for the Git remote.
+     *
+     * @param userName user name for the remote
+     * @param password password for the remote
+     */
+    public void setRemoteCredential(String userName, String password) {
+        final CredentialsProvider provider = new UsernamePasswordCredentialsProvider(userName, password);
+        this.credentialsProvider = provider;
+    }
+
+    /**
+     * Open a Git repository using the specified directory.
+     * <p>
+     * The .git directory is looked up in the specified directory only; parent directories are not
+     * searched, so an unrelated Git project that happens to enclose the directory is never opened.
+     * NOTE(review): readEnvironment() lets standard Git environment variables (e.g. GIT_DIR)
+     * influence the lookup if they are set in the Registry's process environment.
+     *
+     * @param gitProjectRootDir a root directory of a Git project
+     * @return created Repository
+     * @throws IOException thrown when the specified directory does not exist,
+     * does not have read/write privilege or does not contain a .git directory
+     */
+    private Repository openRepository(final File gitProjectRootDir) throws IOException {
+
+        // Instead of using FileUtils.ensureDirectoryExistAndCanReadAndWrite, check availability manually here.
+        // Because the util will try to create a dir if not exist.
+        // The git dir should be initialized and configured by users.
+        if (!gitProjectRootDir.isDirectory()) {
+            throw new IOException(format("'%s' is not a directory or does not exist.", gitProjectRootDir));
+        }
+
+        if (!(gitProjectRootDir.canRead() && gitProjectRootDir.canWrite())) {
+            throw new IOException(format("Directory '%s' does not have read/write privilege.", gitProjectRootDir));
+        }
+
+        // Search .git dir but avoid searching parent directories.
+        // findGitDir() inspects the start directory, then moves to its parent and stops only when that
+        // parent is a registered ceiling directory. The ceiling must therefore be the PARENT of the
+        // project root: registering the root itself never matches, and the search would keep climbing.
+        // The absolute form is used so that a relative root still has a parent to register.
+        final File rootDir = gitProjectRootDir.getAbsoluteFile();
+        final FileRepositoryBuilder builder = new FileRepositoryBuilder()
+                .readEnvironment()
+                .setMustExist(true);
+        final File parentDir = rootDir.getParentFile();
+        if (parentDir != null) {
+            builder.addCeilingDirectory(parentDir);
+        }
+        builder.findGitDir(rootDir);
+
+        if (builder.getGitDir() == null) {
+            throw new IOException(format("Directory '%s' does not contain a .git directory." +
+                    " Please init and configure the directory with 'git init' command before using it from NiFi Registry.",
+                    gitProjectRootDir));
+        }
+
+        return builder.build();
+    }
+
+ @SuppressWarnings("unchecked")
+ public void loadGitRepository(File gitProjectRootDir) throws
IOException, GitAPIException {
+ gitRepo = openRepository(gitProjectRootDir);
+
+ try (final Git git = new Git(gitRepo)) {
+
+ // Check if remote exists.
+ if (!isEmpty(remoteToPush)) {
+ final List<RemoteConfig> remotes = git.remoteList().call();
+ final boolean isRemoteExist =
remotes.stream().anyMatch(remote -> remote.getName().equals(remoteToPush));
+ if (!isRemoteExist) {
+ final List<String> remoteNames =
remotes.stream().map(RemoteConfig::getName).collect(Collectors.toList());
+ throw new IllegalArgumentException(
+ format("The configured remote '%s' to push
does not exist. Available remotes are %s", remoteToPush, remoteNames));
+ }
+ }
+
+ boolean isLatestCommit = true;
+ try {
+ for (RevCommit commit : git.log().call()) {
+ final String shortCommitId =
commit.getId().abbreviate(7).name();
+ logger.debug("Processing a commit: {}", shortCommitId);
+ final RevTree tree = commit.getTree();
+
+ try (final TreeWalk treeWalk = new TreeWalk(gitRepo)) {
+ treeWalk.addTree(tree);
+
+ // Path -> ObjectId
+ final Map<String, ObjectId> bucketObjectIds = new
HashMap<>();
+ final Map<String, ObjectId> flowSnapshotObjectIds
= new HashMap<>();
+ while (treeWalk.next()) {
+ if (treeWalk.isSubtree()) {
+ treeWalk.enterSubtree();
+ } else {
+ final String pathString =
treeWalk.getPathString();
+ // TODO: what is this nth?? When does it
get grater than 0? Tree count seems to be always 1..
+ if (pathString.endsWith("/" +
BUCKET_FILENAME)) {
+ bucketObjectIds.put(pathString,
treeWalk.getObjectId(0));
+ } else if
(pathString.endsWith(GitFlowPersistenceProvider.SNAPSHOT_EXTENSION)) {
+ flowSnapshotObjectIds.put(pathString,
treeWalk.getObjectId(0));
+ }
+ }
+ }
+
+ if (bucketObjectIds.isEmpty()) {
+ // No bucket.yml means at this point, all
flows are deleted. No need to scan older commits because those are already
deleted.
+ logger.debug("Tree at commit {} does not
contain any " + BUCKET_FILENAME + ". Stop loading commits here.",
shortCommitId);
+ return;
+ }
+
+ loadBuckets(gitRepo, commit, isLatestCommit,
bucketObjectIds, flowSnapshotObjectIds);
+ isLatestCommit = false;
+ }
+ }
+ } catch (NoHeadException e) {
+ logger.debug("'{}' does not have any commit yet. Starting
with empty buckets.", gitProjectRootDir);
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void loadBuckets(Repository gitRepo, RevCommit commit, boolean
isLatestCommit, Map<String, ObjectId> bucketObjectIds, Map<String, ObjectId>
flowSnapshotObjectIds) throws IOException {
+ final Yaml yaml = new Yaml();
+ for (String bucketFilePath : bucketObjectIds.keySet()) {
+ final ObjectId bucketObjectId =
bucketObjectIds.get(bucketFilePath);
+ final Map<String, Object> bucketMeta;
+ try (InputStream bucketIn =
gitRepo.newObjectReader().open(bucketObjectId).openStream()) {
+ bucketMeta = yaml.load(bucketIn);
+ }
+
+ if (!validateRequiredValue(bucketMeta, bucketFilePath,
LAYOUT_VERSION, BUCKET_ID, FLOWS)) {
+ continue;
+ }
+
+ int layoutVersion = (int) bucketMeta.get(LAYOUT_VERSION);
+ if (layoutVersion > CURRENT_LAYOUT_VERSION) {
+ logger.warn("{} has unsupported {} {}. This Registry can
only support {} or lower. Skipping it.",
+ bucketFilePath, LAYOUT_VERSION, layoutVersion,
CURRENT_LAYOUT_VERSION);
+ continue;
+ }
+
+ final String bucketId = (String) bucketMeta.get(BUCKET_ID);
+
+ final Bucket bucket;
+ if (isLatestCommit) {
+ // If this is the latest commit, then create one.
+ bucket = getBucketOrCreate(bucketId);
+ } else {
+ // Otherwise non-existing bucket means it's already
deleted.
+ final Optional<Bucket> bucketOpt = getBucket(bucketId);
+ if (bucketOpt.isPresent()) {
+ bucket = bucketOpt.get();
+ } else {
+ logger.debug("Bucket {} does not exist any longer. It
may have been deleted.", bucketId);
+ continue;
+ }
+ }
+
+ // E.g. DirA/DirB/DirC/bucket.yml -> DirC will be the bucket
name.
+ final String[] pathNames = bucketFilePath.split("/");
--- End diff --
This made me wonder... should we do some sanitizing/converting of bucket
and flow names before using them to create directories and files?
The Registry UI lets you enter anything, so I created a bucket named
"This / is / a test", which produced three nested directories in the repo
because the name was split on forward slashes.
Technically it works fine and did not produce any errors when
saving/retrieving flows, but maybe we could convert slashes and spaces to
underscores (collapsing consecutive ones into a single underscore)?
Something like:
`This_is_a_test/This_is_my_flow.snapshot`
---