bbende commented on code in PR #7092:
URL: https://github.com/apache/nifi/pull/7092#discussion_r1169121670


##########
nifi-registry/nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketFlowResource.java:
##########
@@ -295,6 +295,48 @@ public Response createFlowVersion(
         return 
Response.status(Response.Status.OK).entity(createdSnapshot).build();
     }
 
+    @POST
+    @Path("{flowId}/versions/preserveSourceProperties")

Review Comment:
   I think David's suggestion was that we should add an optional query param to 
the existing create method, so we wouldn't need this whole new method and 
instead a client could specify `?preserveSourceProperties=true` 
   
   @exceptionfactory is that what you were thinking?



##########
nifi-registry/nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketResource.java:
##########
@@ -92,6 +92,32 @@ public Response createBucket(
         return 
Response.status(Response.Status.OK).entity(createdBucket).build();
     }
 
+    @POST
+    @Path("migrate")

Review Comment:
   Same as previous comment about using a query param on the existing create 
bucket and being consistent with calling it `preserveSourceProperties`



##########
nifi-registry/nifi-registry-core/nifi-registry-client/src/main/java/org/apache/nifi/registry/client/BucketClient.java:
##########
@@ -36,6 +36,16 @@ public interface BucketClient {
      */
     Bucket create(Bucket bucket) throws NiFiRegistryException, IOException;
 
+    /**
+     * Creates the given bucket. If migration parameter is true the bucket 
will be created with the provided identifier.
+     * False will set a generated identifier.
+     *
+     * @param bucket the bucket to create
+     * @param migration whether the operation is migration related
+     * @return the created bucket with the populated identifier
+     */
+    Bucket create(Bucket bucket, boolean migration) throws 
NiFiRegistryException, IOException;

Review Comment:
   Should call this `preserveSourceProperties` to be consistent



##########
nifi-registry/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/EventType.java:
##########
@@ -41,6 +41,14 @@ public enum EventType {
             EventFieldName.VERSION,
             EventFieldName.USER,
             EventFieldName.COMMENT),
+
+    MIGRATE_FLOW_VERSION(

Review Comment:
   See later comments about query param approach... If we use the query param 
approach on the existing end-point then I think this probably goes away



##########
nifi-registry/nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/service/ServiceFacade.java:
##########
@@ -70,6 +70,8 @@ public interface ServiceFacade {
 
     Bucket deleteBucket(String bucketIdentifier, RevisionInfo revisionInfo);
 
+    Bucket migrateBucket(Bucket bucket);

Review Comment:
   This could go away, or could become `createBucket(Bucket bucket, boolean 
preserveSourceProperties)`



##########
nifi-registry/nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/service/StandardServiceFacade.java:
##########
@@ -202,6 +202,22 @@ public Bucket deleteBucket(final String bucketIdentifier, 
final RevisionInfo rev
                 () -> registryService.deleteBucket(bucketIdentifier));
     }
 
+    @Override
+    public Bucket migrateBucket(final Bucket bucket) {

Review Comment:
   I think this method can go away right? It would just use the regular create



##########
nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/ImportAllFlows.java:
##########
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.cli.impl.command.registry.flow;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.cli.MissingOptionException;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.curator.shaded.com.google.common.collect.ComparisonChain;
+import org.apache.nifi.flow.VersionedFlowCoordinates;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.client.BucketClient;
+import org.apache.nifi.registry.client.FlowClient;
+import org.apache.nifi.registry.client.FlowSnapshotClient;
+import org.apache.nifi.registry.client.NiFiRegistryClient;
+import org.apache.nifi.registry.client.NiFiRegistryException;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.toolkit.cli.api.CommandException;
+import org.apache.nifi.toolkit.cli.api.Context;
+import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
+import 
org.apache.nifi.toolkit.cli.impl.command.registry.AbstractNiFiRegistryCommand;
+import org.apache.nifi.toolkit.cli.impl.command.registry.bucket.ListBuckets;
+import org.apache.nifi.toolkit.cli.impl.result.StringResult;
+import org.apache.nifi.toolkit.cli.impl.util.JacksonUtils;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class ImportAllFlows extends AbstractNiFiRegistryCommand<StringResult> {
+    private static final String FILE_NAME_PREFIX = 
"toolkit_registry_export_all_";
+    private static final String SKIPPING_BUCKET_CREATION = " already exists, 
skipping bucket creation...";
+    private static final String SKIPPING_IMPORT = " already exists, skipping 
import...";
+    private static final String SKIPPING_FLOW_CREATION = " already exists, 
skipping flow creation...";
+    private static final String IMPORT_COMPLETED = "Import completed...";
+    private static final String ALL_BUCKETS_COLLECTED = "All buckets 
collected...";
+    private static final String ALL_FLOWS_COLLECTED = "All flows collected...";
+    private static final String ALL_FLOW_VERSIONS_COLLECTED = "All flow 
versions collected...";
+    private static final String FILE_NAME_SEPARATOR = "_";
+    private static final String STORAGE_LOCATION_URL = 
"%s/nifi-registry-api/buckets/%s/flows/%s/versions/%s";
+    private static final ObjectMapper MAPPER = JacksonUtils.getObjectMapper();
+    private final ListBuckets listBuckets;
+    private final ListFlows listFlows;
+    private final ListFlowVersions listFlowVersions;
+
+    public ImportAllFlows() {
+        super("import-all-flows", StringResult.class);
+        this.listBuckets = new ListBuckets();
+        this.listFlows = new ListFlows();
+        this.listFlowVersions = new ListFlowVersions();
+    }
+
+    @Override
+    protected void doInitialize(Context context) {
+        addOption(CommandOption.INPUT_SOURCE.createOption());
+        addOption(CommandOption.SKIP_EXISTING.createOption());
+
+        listBuckets.initialize(context);
+        listFlows.initialize(context);
+        listFlowVersions.initialize(context);
+    }
+
+    @Override
+    public String getDescription() {
+        return "From a provided directory as input, the directory content must 
be generated by the export-all-flows command, " +
+                "based on the file contents, the corresponding buckets, flows 
and flow versions will be created." +
+                "If not configured otherwise, already existing objects will be 
skipped.";
+    }
+
+    @Override
+    public StringResult doExecute(final NiFiRegistryClient client, final 
Properties properties) throws IOException, NiFiRegistryException, 
ParseException, CommandException {
+        final boolean skip = Boolean.parseBoolean(getRequiredArg(properties, 
CommandOption.SKIP_EXISTING));
+        final boolean isInteractive = getContext().isInteractive();
+
+        //Gather all buckets and create a map for easier search by bucket name
+        final Map<String, String> bucketMap = getBucketMap(client, 
isInteractive);
+
+        // Gather all flows and create a map for easier search by flow name.
+        // As flow name is only unique within the same bucket we need to use 
the bucket id in the key as well
+        final Map<Pair<String, String>, String> flowMap = getFlowMap(client, 
bucketMap, isInteractive);
+        final Map<Pair<String, String>, String> flowCreated = new HashMap<>();
+
+        // Gather all flow versions and create a map for easier search by flow 
id
+        final Map<String, List<Integer>> versionMap = getVersionMap(client, 
flowMap, isInteractive);
+
+        // Create file path list
+        final List<VersionFileMetaData> files = getFilePathList(properties);
+
+        // As we need to keep the version order the list needs to be sorted
+        files.sort((o1, o2) -> ComparisonChain.start()
+                .compare(o1.getBucketName(), o2.getBucketName())
+                .compare(o1.getFlowName(), o2.getFlowName())
+                .compare(o1.getVersion(), o2.getVersion())
+                .result());
+
+        for (VersionFileMetaData file : files) {
+            final String inputSource = file.getInputSource();
+            final String fileContent = getInputSourceContent(inputSource);
+            final VersionedFlowSnapshot snapshot = 
MAPPER.readValue(fileContent, VersionedFlowSnapshot.class);
+
+            final String bucketName = snapshot.getBucket().getName();
+            final String bucketDescription = 
snapshot.getBucket().getDescription();
+            final String flowName = snapshot.getFlow().getName();
+            final String flowDescription = snapshot.getFlow().getDescription();
+            final int flowVersion = 
snapshot.getSnapshotMetadata().getVersion();
+            // The original bucket and flow ids must be kept otherwise NiFi 
won't be able to synchronize with the NiFi Registry
+            final String flowId = snapshot.getFlow().getIdentifier();
+            final String bucketId = snapshot.getBucket().getIdentifier();
+
+            // Create bucket if missing
+            if (bucketMap.containsKey(bucketName)) {
+                printMessage(isInteractive, bucketName + 
SKIPPING_BUCKET_CREATION);
+            } else {
+                //The original bucket id must be kept otherwise NiFi won't be 
able to synchronize with the NiFi Registry
+                createBucket(client, bucketMap, bucketName, bucketDescription, 
bucketId);
+            }
+
+            // Create flow if missing
+            if (flowMap.containsKey(new ImmutablePair<>(bucketId, flowName))) {
+                if (skip) {
+                    printMessage(isInteractive, flowName + SKIPPING_IMPORT);
+                    continue;
+                } else {
+                    //flowId
+                    printMessage(isInteractive, flowName + 
SKIPPING_FLOW_CREATION);
+                }
+            } else if (!flowCreated.containsKey(new ImmutablePair<>(bucketId, 
flowName))) {
+                createFlow(client, flowCreated, flowId, flowName, 
flowDescription, bucketId);
+            }
+
+            // Create missing flow versions
+            if (!versionMap.getOrDefault(flowId, 
Collections.emptyList()).contains(flowVersion)) {
+                //update storage location
+                final String registryUrl = getRequiredArg(properties, 
CommandOption.URL);
+
+                
updateStorageLocation(snapshot.getFlowContents().getVersionedFlowCoordinates(), 
registryUrl);
+                for (VersionedProcessGroup processGroup : 
snapshot.getFlowContents().getProcessGroups()) {
+                    
updateStorageLocation(processGroup.getVersionedFlowCoordinates(), registryUrl);
+                }

Review Comment:
   Right now I think this will only handle the root group and first level 
groups, but there could be a child group under version control at any arbitrary 
level. 
   ```
   Main PG
     - PG Level 1 (not under VC)
       - PG Level 2 (not under VC) 
         - PG Level 3 (Version Controlled) 
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to