Github user bbende commented on a diff in the pull request: https://github.com/apache/nifi-registry/pull/112#discussion_r184458388 --- Diff: nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java --- @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.registry.provider.flow.git; + +import org.eclipse.jgit.api.Git; +import org.eclipse.jgit.api.PushCommand; +import org.eclipse.jgit.api.Status; +import org.eclipse.jgit.api.errors.GitAPIException; +import org.eclipse.jgit.api.errors.NoHeadException; +import org.eclipse.jgit.lib.ObjectId; +import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.lib.UserConfig; +import org.eclipse.jgit.revwalk.RevCommit; +import org.eclipse.jgit.revwalk.RevTree; +import org.eclipse.jgit.storage.file.FileRepositoryBuilder; +import org.eclipse.jgit.transport.CredentialsProvider; +import org.eclipse.jgit.transport.PushResult; +import org.eclipse.jgit.transport.RemoteConfig; +import org.eclipse.jgit.transport.UsernamePasswordCredentialsProvider; +import org.eclipse.jgit.treewalk.TreeWalk; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import static java.lang.String.format; +import static org.apache.commons.lang3.StringUtils.isEmpty; + +class GitFlowMetaData { + + static final int CURRENT_LAYOUT_VERSION = 1; + + static final String LAYOUT_VERSION = "layoutVer"; + static final String BUCKET_ID = "bucketId"; + static final String FLOWS = "flows"; + static final String VER = "ver"; + static final String FILE = "file"; + static final String BUCKET_FILENAME = "bucket.yml"; + + private static final Logger logger = LoggerFactory.getLogger(GitFlowMetaData.class); + + private Repository gitRepo; + private String remoteToPush; + private CredentialsProvider credentialsProvider; + + /** + * Bucket ID to Bucket. 
+ */ + private Map<String, Bucket> buckets = new HashMap<>(); + + public void setRemoteToPush(String remoteToPush) { + this.remoteToPush = remoteToPush; + } + + public void setRemoteCredential(String userName, String password) { + this.credentialsProvider = new UsernamePasswordCredentialsProvider(userName, password); + } + + /** + * Open a Git repository using the specified directory. + * @param gitProjectRootDir a root directory of a Git project + * @return created Repository + * @throws IOException thrown when the specified directory does not exist, + * does not have read/write privilege or not containing .git directory + */ + private Repository openRepository(final File gitProjectRootDir) throws IOException { + + // Instead of using FileUtils.ensureDirectoryExistAndCanReadAndWrite, check availability manually here. + // Because the util will try to create a dir if not exist. + // The git dir should be initialized and configured by users. + if (!gitProjectRootDir.isDirectory()) { + throw new IOException(format("'%s' is not a directory or does not exist.", gitProjectRootDir)); + } + + if (!(gitProjectRootDir.canRead() && gitProjectRootDir.canWrite())) { + throw new IOException(format("Directory '%s' does not have read/write privilege.", gitProjectRootDir)); + } + + // Search .git dir but avoid searching parent directories. + final FileRepositoryBuilder builder = new FileRepositoryBuilder() + .readEnvironment() + .setMustExist(true) + .addCeilingDirectory(gitProjectRootDir) + .findGitDir(gitProjectRootDir); + + if (builder.getGitDir() == null) { + throw new IOException(format("Directory '%s' does not contain a .git directory." 
+ + " Please init and configure the directory with 'git init' command before using it from NiFi Registry.", + gitProjectRootDir)); + } + + return builder.build(); + } + + @SuppressWarnings("unchecked") + public void loadGitRepository(File gitProjectRootDir) throws IOException, GitAPIException { + gitRepo = openRepository(gitProjectRootDir); + + try (final Git git = new Git(gitRepo)) { + + // Check if remote exists. + if (!isEmpty(remoteToPush)) { + final List<RemoteConfig> remotes = git.remoteList().call(); + final boolean isRemoteExist = remotes.stream().anyMatch(remote -> remote.getName().equals(remoteToPush)); + if (!isRemoteExist) { + final List<String> remoteNames = remotes.stream().map(RemoteConfig::getName).collect(Collectors.toList()); + throw new IllegalArgumentException( + format("The configured remote '%s' to push does not exist. Available remotes are %s", remoteToPush, remoteNames)); + } + } + + boolean isLatestCommit = true; + try { + for (RevCommit commit : git.log().call()) { + final String shortCommitId = commit.getId().abbreviate(7).name(); + logger.debug("Processing a commit: {}", shortCommitId); + final RevTree tree = commit.getTree(); + + try (final TreeWalk treeWalk = new TreeWalk(gitRepo)) { + treeWalk.addTree(tree); + + // Path -> ObjectId + final Map<String, ObjectId> bucketObjectIds = new HashMap<>(); + final Map<String, ObjectId> flowSnapshotObjectIds = new HashMap<>(); + while (treeWalk.next()) { + if (treeWalk.isSubtree()) { + treeWalk.enterSubtree(); + } else { + final String pathString = treeWalk.getPathString(); + // TODO: what is this nth?? When does it get greater than 0? Tree count seems to be always 1.. 
+ if (pathString.endsWith("/" + BUCKET_FILENAME)) { + bucketObjectIds.put(pathString, treeWalk.getObjectId(0)); + } else if (pathString.endsWith(GitFlowPersistenceProvider.SNAPSHOT_EXTENSION)) { + flowSnapshotObjectIds.put(pathString, treeWalk.getObjectId(0)); + } + } + } + + if (bucketObjectIds.isEmpty()) { + // No bucket.yml means at this point, all flows are deleted. No need to scan older commits because those are already deleted. + logger.debug("Tree at commit {} does not contain any " + BUCKET_FILENAME + ". Stop loading commits here.", shortCommitId); + return; + } + + loadBuckets(gitRepo, commit, isLatestCommit, bucketObjectIds, flowSnapshotObjectIds); + isLatestCommit = false; + } + } + } catch (NoHeadException e) { + logger.debug("'{}' does not have any commit yet. Starting with empty buckets.", gitProjectRootDir); + } + } + } + + @SuppressWarnings("unchecked") + private void loadBuckets(Repository gitRepo, RevCommit commit, boolean isLatestCommit, Map<String, ObjectId> bucketObjectIds, Map<String, ObjectId> flowSnapshotObjectIds) throws IOException { + final Yaml yaml = new Yaml(); + for (String bucketFilePath : bucketObjectIds.keySet()) { + final ObjectId bucketObjectId = bucketObjectIds.get(bucketFilePath); + final Map<String, Object> bucketMeta; + try (InputStream bucketIn = gitRepo.newObjectReader().open(bucketObjectId).openStream()) { + bucketMeta = yaml.load(bucketIn); + } + + if (!validateRequiredValue(bucketMeta, bucketFilePath, LAYOUT_VERSION, BUCKET_ID, FLOWS)) { + continue; + } + + int layoutVersion = (int) bucketMeta.get(LAYOUT_VERSION); + if (layoutVersion > CURRENT_LAYOUT_VERSION) { + logger.warn("{} has unsupported {} {}. This Registry can only support {} or lower. Skipping it.", + bucketFilePath, LAYOUT_VERSION, layoutVersion, CURRENT_LAYOUT_VERSION); + continue; + } + + final String bucketId = (String) bucketMeta.get(BUCKET_ID); + + final Bucket bucket; + if (isLatestCommit) { + // If this is the latest commit, then create one. 
+ bucket = getBucketOrCreate(bucketId); + } else { + // Otherwise non-existing bucket means it's already deleted. + final Optional<Bucket> bucketOpt = getBucket(bucketId); + if (bucketOpt.isPresent()) { + bucket = bucketOpt.get(); + } else { + logger.debug("Bucket {} does not exist any longer. It may have been deleted.", bucketId); + continue; + } + } + + // E.g. DirA/DirB/DirC/bucket.yml -> DirC will be the bucket name. + final String[] pathNames = bucketFilePath.split("/"); --- End diff -- This made me wonder... should we do some sanitizing/converting of bucket and flow names before using them to create directories and files? The Registry UI lets you enter anything so I created a bucket with the name "This / is / a test" which created three directories for the repo since it split on forward slashes. Technically it works fine and did not produce any errors when saving/retrieving flows, but we could possibly convert slashes and spaces to underscores? Something like: `This_is_a_test/This_is_my_flow.snapshot`
---