bbende commented on a change in pull request #319:
URL: https://github.com/apache/nifi-registry/pull/319#discussion_r616802214



##########
File path: 
nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketFlowResource.java
##########
@@ -291,6 +303,129 @@ public Response createFlowVersion(
         return 
Response.status(Response.Status.OK).entity(createdSnapshot).build();
     }
 
+    @POST
+    @Path("{flowId}/versions/import")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    @Produces(MediaType.APPLICATION_JSON)
+    @ApiOperation(
+            value = "Upload flow version",
+            notes = "Uploads the next version of a flow. The version number of 
the object being created must be the " +
+                    "next available version integer. Flow versions are 
immutable after they are created.",
+            response = VersionedFlowSnapshot.class,
+            extensions = {
+                    @Extension(name = "access-policy", properties = {
+                            @ExtensionProperty(name = "action", value = 
"write"),
+                            @ExtensionProperty(name = "resource", value = 
"/buckets/{bucketId}") })
+            }
+    )
+    @ApiResponses({
+            @ApiResponse(code = 400, message = HttpStatusMessages.MESSAGE_400),
+            @ApiResponse(code = 401, message = HttpStatusMessages.MESSAGE_401),
+            @ApiResponse(code = 403, message = HttpStatusMessages.MESSAGE_403),
+            @ApiResponse(code = 404, message = HttpStatusMessages.MESSAGE_404),
+            @ApiResponse(code = 409, message = HttpStatusMessages.MESSAGE_409) 
})
+    public Response importVersionedFlow(
+            @PathParam("bucketId")
+            @ApiParam("The bucket identifier")
+            final String bucketId,
+            @PathParam("flowId")
+            @ApiParam(value = "The flow identifier")
+            final String flowId,
+            @FormDataParam("file") final InputStream in,
+            @FormDataParam("comments") final String comments) {
+
+        if (StringUtils.isBlank(bucketId)) {
+            throw new IllegalArgumentException("The bucket identifier is 
required.");
+        }
+
+        if (StringUtils.isBlank(flowId)) {
+            throw new IllegalArgumentException("The flow identifier is 
required.");
+        }
+
+        // deserialize InputStream to a VersionedFlowSnapshot
+        VersionedFlowSnapshot versionedFlowSnapshot;
+
+        versionedFlowSnapshot = deserializeVersionedFlowSnapshot(in);
+
+        // clear or set the necessary snapShotMetadata
+        if (versionedFlowSnapshot.getSnapshotMetadata() != null) {
+            
versionedFlowSnapshot.getSnapshotMetadata().setBucketIdentifier(null);
+            
versionedFlowSnapshot.getSnapshotMetadata().setFlowIdentifier(null);
+            versionedFlowSnapshot.getSnapshotMetadata().setLink(null);
+            versionedFlowSnapshot.getSnapshotMetadata().setVersion(-1);
+            versionedFlowSnapshot.getSnapshotMetadata().setVersion(-1);
+            // if there are new comments, then set it
+            // otherwise, keep the original comments
+            if (!StringUtils.isBlank(comments)) {
+                
versionedFlowSnapshot.getSnapshotMetadata().setComments(comments);
+            }
+        }
+
+        return createFlowVersion(bucketId, flowId, versionedFlowSnapshot);
+    }
+
+    @POST
+    @Path("import")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    @Produces(MediaType.APPLICATION_JSON)
+    @ApiOperation(
+            value = "Create flow",
+            notes = "Creates a flow in the given bucket. The flow id is 
created by the server and populated in the returned entity.",
+            response = VersionedFlow.class,
+            extensions = {
+                    @Extension(name = "access-policy", properties = {
+                            @ExtensionProperty(name = "action", value = 
"write"),
+                            @ExtensionProperty(name = "resource", value = 
"/buckets/{bucketId}")})
+            }
+    )
+    @ApiResponses({
+            @ApiResponse(code = 400, message = HttpStatusMessages.MESSAGE_400),
+            @ApiResponse(code = 401, message = HttpStatusMessages.MESSAGE_401),
+            @ApiResponse(code = 403, message = HttpStatusMessages.MESSAGE_403),
+            @ApiResponse(code = 404, message = HttpStatusMessages.MESSAGE_404),
+            @ApiResponse(code = 409, message = 
HttpStatusMessages.MESSAGE_409)})
+    public Response importFlow(

Review comment:
       I was wondering if we really need this end-point. Since there is already 
an existing POST to `/buckets/{bucketId}/flows` for creating a flow, could the 
front-end call that first to create the flow and then call the import version 
end-point? This is would make it similar to how it works when NiFi first starts 
version control on a PG, it calls create flow then adds the first snapshot.

##########
File path: 
nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketFlowResource.java
##########
@@ -291,6 +303,129 @@ public Response createFlowVersion(
         return 
Response.status(Response.Status.OK).entity(createdSnapshot).build();
     }
 
+    @POST
+    @Path("{flowId}/versions/import")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    @Produces(MediaType.APPLICATION_JSON)
+    @ApiOperation(
+            value = "Upload flow version",
+            notes = "Uploads the next version of a flow. The version number of 
the object being created must be the " +
+                    "next available version integer. Flow versions are 
immutable after they are created.",
+            response = VersionedFlowSnapshot.class,
+            extensions = {
+                    @Extension(name = "access-policy", properties = {
+                            @ExtensionProperty(name = "action", value = 
"write"),
+                            @ExtensionProperty(name = "resource", value = 
"/buckets/{bucketId}") })
+            }
+    )
+    @ApiResponses({
+            @ApiResponse(code = 400, message = HttpStatusMessages.MESSAGE_400),
+            @ApiResponse(code = 401, message = HttpStatusMessages.MESSAGE_401),
+            @ApiResponse(code = 403, message = HttpStatusMessages.MESSAGE_403),
+            @ApiResponse(code = 404, message = HttpStatusMessages.MESSAGE_404),
+            @ApiResponse(code = 409, message = HttpStatusMessages.MESSAGE_409) 
})
+    public Response importVersionedFlow(
+            @PathParam("bucketId")
+            @ApiParam("The bucket identifier")
+            final String bucketId,
+            @PathParam("flowId")
+            @ApiParam(value = "The flow identifier")
+            final String flowId,
+            @FormDataParam("file") final InputStream in,
+            @FormDataParam("comments") final String comments) {
+
+        if (StringUtils.isBlank(bucketId)) {
+            throw new IllegalArgumentException("The bucket identifier is 
required.");
+        }
+
+        if (StringUtils.isBlank(flowId)) {
+            throw new IllegalArgumentException("The flow identifier is 
required.");
+        }
+
+        // deserialize InputStream to a VersionedFlowSnapshot
+        VersionedFlowSnapshot versionedFlowSnapshot;
+
+        versionedFlowSnapshot = deserializeVersionedFlowSnapshot(in);
+
+        // clear or set the necessary snapShotMetadata
+        if (versionedFlowSnapshot.getSnapshotMetadata() != null) {
+            
versionedFlowSnapshot.getSnapshotMetadata().setBucketIdentifier(null);
+            
versionedFlowSnapshot.getSnapshotMetadata().setFlowIdentifier(null);
+            versionedFlowSnapshot.getSnapshotMetadata().setLink(null);
+            versionedFlowSnapshot.getSnapshotMetadata().setVersion(-1);
+            versionedFlowSnapshot.getSnapshotMetadata().setVersion(-1);
+            // if there are new comments, then set it
+            // otherwise, keep the original comments
+            if (!StringUtils.isBlank(comments)) {
+                
versionedFlowSnapshot.getSnapshotMetadata().setComments(comments);
+            }
+        }
+
+        return createFlowVersion(bucketId, flowId, versionedFlowSnapshot);
+    }
+
+    @POST
+    @Path("import")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    @Produces(MediaType.APPLICATION_JSON)
+    @ApiOperation(
+            value = "Create flow",
+            notes = "Creates a flow in the given bucket. The flow id is 
created by the server and populated in the returned entity.",
+            response = VersionedFlow.class,

Review comment:
       If we keep this end-point, then this should be VersionedFlowSnapshot 
based on what the method is returning below.

##########
File path: nifi-registry-core/nifi-registry-web-api/pom.xml
##########
@@ -423,6 +423,16 @@
             <artifactId>jjwt</artifactId>
             <version>0.7.0</version>
         </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>

Review comment:
       I haven't made it through all the code yet, but what did we need 
protocol buffers for? 

##########
File path: 
nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketFlowResource.java
##########
@@ -291,6 +303,129 @@ public Response createFlowVersion(
         return 
Response.status(Response.Status.OK).entity(createdSnapshot).build();
     }
 
+    @POST
+    @Path("{flowId}/versions/import")
+    @Consumes(MediaType.MULTIPART_FORM_DATA)
+    @Produces(MediaType.APPLICATION_JSON)
+    @ApiOperation(
+            value = "Upload flow version",
+            notes = "Uploads the next version of a flow. The version number of 
the object being created must be the " +
+                    "next available version integer. Flow versions are 
immutable after they are created.",
+            response = VersionedFlowSnapshot.class,
+            extensions = {
+                    @Extension(name = "access-policy", properties = {
+                            @ExtensionProperty(name = "action", value = 
"write"),
+                            @ExtensionProperty(name = "resource", value = 
"/buckets/{bucketId}") })
+            }
+    )
+    @ApiResponses({
+            @ApiResponse(code = 400, message = HttpStatusMessages.MESSAGE_400),
+            @ApiResponse(code = 401, message = HttpStatusMessages.MESSAGE_401),
+            @ApiResponse(code = 403, message = HttpStatusMessages.MESSAGE_403),
+            @ApiResponse(code = 404, message = HttpStatusMessages.MESSAGE_404),
+            @ApiResponse(code = 409, message = HttpStatusMessages.MESSAGE_409) 
})
+    public Response importVersionedFlow(
+            @PathParam("bucketId")
+            @ApiParam("The bucket identifier")
+            final String bucketId,
+            @PathParam("flowId")
+            @ApiParam(value = "The flow identifier")
+            final String flowId,
+            @FormDataParam("file") final InputStream in,
+            @FormDataParam("comments") final String comments) {
+
+        if (StringUtils.isBlank(bucketId)) {
+            throw new IllegalArgumentException("The bucket identifier is 
required.");
+        }
+
+        if (StringUtils.isBlank(flowId)) {
+            throw new IllegalArgumentException("The flow identifier is 
required.");
+        }
+
+        // deserialize InputStream to a VersionedFlowSnapshot
+        VersionedFlowSnapshot versionedFlowSnapshot;
+
+        versionedFlowSnapshot = deserializeVersionedFlowSnapshot(in);
+
+        // clear or set the necessary snapShotMetadata
+        if (versionedFlowSnapshot.getSnapshotMetadata() != null) {

Review comment:
       I think we always want to clear these metadata values for the case where 
someone has json from another registry that was obtained before your new export 
existed. We probably also need to protect against the metadata being null 
because I think it is null coming from "Download Flow" in NiFi, so maybe always 
set to a new metadata and then apply anything on top of that?
   

##########
File path: 
nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketFlowResource.java
##########
@@ -385,6 +520,64 @@ public Response getLatestFlowVersionMetadata(
         return Response.status(Response.Status.OK).entity(latest).build();
     }
 
+    @GET
+    @Path("{flowId}/versions/{versionNumber: \\d+}/export")
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @ApiOperation(
+            value = "Exports specified bucket flow version content",
+            notes = "Exports the specified version of a flow, including the 
metadata and content of the flow.",
+            response = VersionedFlowSnapshot.class,
+            extensions = {
+                    @Extension(name = "access-policy", properties = {
+                            @ExtensionProperty(name = "action", value = 
"read"),
+                            @ExtensionProperty(name = "resource", value = 
"/buckets/{bucketId}")})
+            }
+    )
+    @ApiResponses({
+            @ApiResponse(code = 401, message = HttpStatusMessages.MESSAGE_401),
+            @ApiResponse(code = 403, message = HttpStatusMessages.MESSAGE_403),
+            @ApiResponse(code = 404, message = HttpStatusMessages.MESSAGE_404),
+            @ApiResponse(code = 409, message = 
HttpStatusMessages.MESSAGE_409)})
+    public Response exportVersionedFlow(
+            @PathParam("bucketId")
+            @ApiParam("The bucket identifier") final String bucketId,
+            @PathParam("flowId")
+            @ApiParam("The flow identifier") final String flowId,
+            @PathParam("versionNumber")
+            @ApiParam("The version number") final Integer versionNumber) {
+
+        if (StringUtils.isBlank(bucketId)) {
+            throw new IllegalArgumentException("The bucket identifier is 
required.");
+        }
+
+        if (StringUtils.isBlank(flowId)) {
+            throw new IllegalArgumentException("The flow identifier is 
required.");
+        }
+
+        if (versionNumber == null) {
+            throw new IllegalArgumentException("The version number is 
required.");
+        }
+
+        final VersionedFlowSnapshot versionedFlowSnapshot = 
serviceFacade.getFlowSnapshot(bucketId, flowId, versionNumber);

Review comment:
       This is kind of minor, but it might be nice to move most of this logic 
behind the service facade to a new method like exportFlowSnapshot(bucketId, 
flowId, versionNumber). We tried to make it so most of the logic happens behind 
the service facade, and the REST end-points don't do too much.
   
   There is also an example of an existing file-download for extension bundles, 
which uses the StreamingContent/StreamingOutput, it is not as big a deal for 
this case since the JSON probably won't ever be too large, but in case you want 
to try and use this approach...
   
   
https://github.com/apache/nifi-registry/blob/main/nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/ExtensionRepoResource.java#L437-L445




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to