bbende commented on code in PR #7092:
URL: https://github.com/apache/nifi/pull/7092#discussion_r1169121670
##########
nifi-registry/nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketFlowResource.java:
##########
@@ -295,6 +295,48 @@ public Response createFlowVersion(
return
Response.status(Response.Status.OK).entity(createdSnapshot).build();
}
+ @POST
+ @Path("{flowId}/versions/preserveSourceProperties")
Review Comment:
I think David's suggestion was that we should add an optional query param to
the existing create method, so we wouldn't need this whole new method and
instead a client could specify `?preserveSourceProperties=true`
@exceptionfactory is that what you were thinking?
##########
nifi-registry/nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketResource.java:
##########
@@ -92,6 +92,32 @@ public Response createBucket(
return
Response.status(Response.Status.OK).entity(createdBucket).build();
}
+ @POST
+ @Path("migrate")
Review Comment:
Same as previous comment about using a query param on the existing create
bucket and being consistent with calling it `preserveSourceProperties`
##########
nifi-registry/nifi-registry-core/nifi-registry-client/src/main/java/org/apache/nifi/registry/client/BucketClient.java:
##########
@@ -36,6 +36,16 @@ public interface BucketClient {
*/
Bucket create(Bucket bucket) throws NiFiRegistryException, IOException;
+ /**
+ * Creates the given bucket. If migration parameter is true the bucket
will be created with the provided identifier.
+ * False will set a generated identifier.
+ *
+ * @param bucket the bucket to create
+ * @param migration whether the operation is migration related
+ * @return the created bucket with the populated identifier
+ */
+ Bucket create(Bucket bucket, boolean migration) throws
NiFiRegistryException, IOException;
Review Comment:
Should call this `preserveSourceProperties` to be consistent
##########
nifi-registry/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/EventType.java:
##########
@@ -41,6 +41,14 @@ public enum EventType {
EventFieldName.VERSION,
EventFieldName.USER,
EventFieldName.COMMENT),
+
+ MIGRATE_FLOW_VERSION(
Review Comment:
See later comments about query param approach... If we use the query param
approach on the existing end-point then I think this probably goes away
##########
nifi-registry/nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/service/ServiceFacade.java:
##########
@@ -70,6 +70,8 @@ public interface ServiceFacade {
Bucket deleteBucket(String bucketIdentifier, RevisionInfo revisionInfo);
+ Bucket migrateBucket(Bucket bucket);
Review Comment:
This could go away, or could become `createBucket(Bucket bucket, boolean
preserveSourceProperties)`
##########
nifi-registry/nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/service/StandardServiceFacade.java:
##########
@@ -202,6 +202,22 @@ public Bucket deleteBucket(final String bucketIdentifier,
final RevisionInfo rev
() -> registryService.deleteBucket(bucketIdentifier));
}
+ @Override
+ public Bucket migrateBucket(final Bucket bucket) {
Review Comment:
I think this method can go away right? It would just use the regular create
##########
nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/ImportAllFlows.java:
##########
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.cli.impl.command.registry.flow;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.cli.MissingOptionException;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.curator.shaded.com.google.common.collect.ComparisonChain;
+import org.apache.nifi.flow.VersionedFlowCoordinates;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.client.BucketClient;
+import org.apache.nifi.registry.client.FlowClient;
+import org.apache.nifi.registry.client.FlowSnapshotClient;
+import org.apache.nifi.registry.client.NiFiRegistryClient;
+import org.apache.nifi.registry.client.NiFiRegistryException;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.toolkit.cli.api.CommandException;
+import org.apache.nifi.toolkit.cli.api.Context;
+import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
+import
org.apache.nifi.toolkit.cli.impl.command.registry.AbstractNiFiRegistryCommand;
+import org.apache.nifi.toolkit.cli.impl.command.registry.bucket.ListBuckets;
+import org.apache.nifi.toolkit.cli.impl.result.StringResult;
+import org.apache.nifi.toolkit.cli.impl.util.JacksonUtils;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class ImportAllFlows extends AbstractNiFiRegistryCommand<StringResult> {
+ private static final String FILE_NAME_PREFIX =
"toolkit_registry_export_all_";
+ private static final String SKIPPING_BUCKET_CREATION = " already exists,
skipping bucket creation...";
+ private static final String SKIPPING_IMPORT = " already exists, skipping
import...";
+ private static final String SKIPPING_FLOW_CREATION = " already exists,
skipping flow creation...";
+ private static final String IMPORT_COMPLETED = "Import completed...";
+ private static final String ALL_BUCKETS_COLLECTED = "All buckets
collected...";
+ private static final String ALL_FLOWS_COLLECTED = "All flows collected...";
+ private static final String ALL_FLOW_VERSIONS_COLLECTED = "All flow
versions collected...";
+ private static final String FILE_NAME_SEPARATOR = "_";
+ private static final String STORAGE_LOCATION_URL =
"%s/nifi-registry-api/buckets/%s/flows/%s/versions/%s";
+ private static final ObjectMapper MAPPER = JacksonUtils.getObjectMapper();
+ private final ListBuckets listBuckets;
+ private final ListFlows listFlows;
+ private final ListFlowVersions listFlowVersions;
+
+ public ImportAllFlows() {
+ super("import-all-flows", StringResult.class);
+ this.listBuckets = new ListBuckets();
+ this.listFlows = new ListFlows();
+ this.listFlowVersions = new ListFlowVersions();
+ }
+
+ @Override
+ protected void doInitialize(Context context) {
+ addOption(CommandOption.INPUT_SOURCE.createOption());
+ addOption(CommandOption.SKIP_EXISTING.createOption());
+
+ listBuckets.initialize(context);
+ listFlows.initialize(context);
+ listFlowVersions.initialize(context);
+ }
+
+ @Override
+ public String getDescription() {
+ return "From a provided directory as input, the directory content must
be generated by the export-all-flows command, " +
+ "based on the file contents, the corresponding buckets, flows
and flow versions will be created." +
+ "If not configured otherwise, already existing objects will be
skipped.";
+ }
+
+ @Override
+ public StringResult doExecute(final NiFiRegistryClient client, final
Properties properties) throws IOException, NiFiRegistryException,
ParseException, CommandException {
+ final boolean skip = Boolean.parseBoolean(getRequiredArg(properties,
CommandOption.SKIP_EXISTING));
+ final boolean isInteractive = getContext().isInteractive();
+
+ //Gather all buckets and create a map for easier search by bucket name
+ final Map<String, String> bucketMap = getBucketMap(client,
isInteractive);
+
+ // Gather all flows and create a map for easier search by flow name.
+ // As flow name is only unique within the same bucket we need to use
the bucket id in the key as well
+ final Map<Pair<String, String>, String> flowMap = getFlowMap(client,
bucketMap, isInteractive);
+ final Map<Pair<String, String>, String> flowCreated = new HashMap<>();
+
+ // Gather all flow versions and create a map for easier search by flow
id
+ final Map<String, List<Integer>> versionMap = getVersionMap(client,
flowMap, isInteractive);
+
+ // Create file path list
+ final List<VersionFileMetaData> files = getFilePathList(properties);
+
+ // As we need to keep the version order the list needs to be sorted
+ files.sort((o1, o2) -> ComparisonChain.start()
+ .compare(o1.getBucketName(), o2.getBucketName())
+ .compare(o1.getFlowName(), o2.getFlowName())
+ .compare(o1.getVersion(), o2.getVersion())
+ .result());
+
+ for (VersionFileMetaData file : files) {
+ final String inputSource = file.getInputSource();
+ final String fileContent = getInputSourceContent(inputSource);
+ final VersionedFlowSnapshot snapshot =
MAPPER.readValue(fileContent, VersionedFlowSnapshot.class);
+
+ final String bucketName = snapshot.getBucket().getName();
+ final String bucketDescription =
snapshot.getBucket().getDescription();
+ final String flowName = snapshot.getFlow().getName();
+ final String flowDescription = snapshot.getFlow().getDescription();
+ final int flowVersion =
snapshot.getSnapshotMetadata().getVersion();
+ // The original bucket and flow ids must be kept otherwise NiFi
won't be able to synchronize with the NiFi Registry
+ final String flowId = snapshot.getFlow().getIdentifier();
+ final String bucketId = snapshot.getBucket().getIdentifier();
+
+ // Create bucket if missing
+ if (bucketMap.containsKey(bucketName)) {
+ printMessage(isInteractive, bucketName +
SKIPPING_BUCKET_CREATION);
+ } else {
+ //The original bucket id must be kept otherwise NiFi won't be
able to synchronize with the NiFi Registry
+ createBucket(client, bucketMap, bucketName, bucketDescription,
bucketId);
+ }
+
+ // Create flow if missing
+ if (flowMap.containsKey(new ImmutablePair<>(bucketId, flowName))) {
+ if (skip) {
+ printMessage(isInteractive, flowName + SKIPPING_IMPORT);
+ continue;
+ } else {
+ //flowId
+ printMessage(isInteractive, flowName +
SKIPPING_FLOW_CREATION);
+ }
+ } else if (!flowCreated.containsKey(new ImmutablePair<>(bucketId,
flowName))) {
+ createFlow(client, flowCreated, flowId, flowName,
flowDescription, bucketId);
+ }
+
+ // Create missing flow versions
+ if (!versionMap.getOrDefault(flowId,
Collections.emptyList()).contains(flowVersion)) {
+ //update storage location
+ final String registryUrl = getRequiredArg(properties,
CommandOption.URL);
+
+
updateStorageLocation(snapshot.getFlowContents().getVersionedFlowCoordinates(),
registryUrl);
+ for (VersionedProcessGroup processGroup :
snapshot.getFlowContents().getProcessGroups()) {
+
updateStorageLocation(processGroup.getVersionedFlowCoordinates(), registryUrl);
+ }
Review Comment:
Right now I think this will only handle the root group and first level
groups, but there could be a child group under version control at any arbitrary
level.
```
Main PG
- PG Level 1 (not under VC)
- PG Level 2 (not under VC)
- PG Level 3 (Version Controlled)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]