This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 9fd624cf43 NIFI-14134 Added support for JSON file upload to CLI 
pg-import (#9611)
9fd624cf43 is described below

commit 9fd624cf43b7c26a5ce51c880e62424f37c97ccf
Author: Pierre Villard <[email protected]>
AuthorDate: Thu Jan 23 21:55:01 2025 +0100

    NIFI-14134 Added support for JSON file upload to CLI pg-import (#9611)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../toolkit/cli/impl/command/nifi/pg/PGImport.java | 132 ++++++++++++++-------
 nifi-toolkit/nifi-toolkit-client/pom.xml           |   4 +
 .../nifi/toolkit/client/ProcessGroupClient.java    |   2 +
 .../client/impl/JerseyProcessGroupClient.java      |  34 ++++++
 4 files changed, 130 insertions(+), 42 deletions(-)

diff --git 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGImport.java
 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGImport.java
index 54d202db25..76b80972b9 100644
--- 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGImport.java
+++ 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGImport.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.toolkit.cli.impl.command.nifi.pg;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.cli.MissingOptionException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.toolkit.cli.api.Context;
@@ -34,6 +36,7 @@ import 
org.apache.nifi.web.api.entity.FlowRegistryClientEntity;
 import org.apache.nifi.web.api.entity.FlowRegistryClientsEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupEntity;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Properties;
 import java.util.Set;
@@ -43,6 +46,8 @@ import java.util.Set;
  */
 public class PGImport extends AbstractNiFiCommand<StringResult> {
 
+    private final static ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
     public PGImport() {
         super("pg-import", StringResult.class);
     }
@@ -59,6 +64,7 @@ public class PGImport extends 
AbstractNiFiCommand<StringResult> {
     @Override
     protected void doInitialize(Context context) {
         addOption(CommandOption.PG_ID.createOption());
+        addOption(CommandOption.INPUT_SOURCE.createOption());
         addOption(CommandOption.REGISTRY_CLIENT_ID.createOption());
         addOption(CommandOption.BUCKET_ID.createOption());
         addOption(CommandOption.FLOW_ID.createOption());
@@ -73,9 +79,11 @@ public class PGImport extends 
AbstractNiFiCommand<StringResult> {
     public StringResult doExecute(final NiFiClient client, final Properties 
properties)
             throws NiFiClientException, IOException, MissingOptionException {
 
-        final String bucketId = getRequiredArg(properties, 
CommandOption.BUCKET_ID);
-        final String flowId = getRequiredArg(properties, 
CommandOption.FLOW_ID);
-        final String flowVersion = getRequiredArg(properties, 
CommandOption.FLOW_VERSION);
+        final String inputSource = getArg(properties, 
CommandOption.INPUT_SOURCE);
+
+        final String bucketId = getArg(properties, CommandOption.BUCKET_ID);
+        final String flowId = getArg(properties, CommandOption.FLOW_ID);
+        final String flowVersion = getArg(properties, 
CommandOption.FLOW_VERSION);
         final String flowBranch = getArg(properties, 
CommandOption.FLOW_BRANCH);
 
         final String posXStr = getArg(properties, CommandOption.POS_X);
@@ -85,6 +93,35 @@ public class PGImport extends 
AbstractNiFiCommand<StringResult> {
 
         final boolean posXExists = StringUtils.isNotBlank(posXStr);
         final boolean posYExists = StringUtils.isNotBlank(posYStr);
+        final File input = new File(inputSource);
+
+        if (StringUtils.isBlank(inputSource)) {
+            if (StringUtils.isBlank(bucketId)) {
+                throw new IllegalArgumentException("Input path is not 
specified so Bucket ID must be specified");
+            }
+            if (StringUtils.isBlank(flowId)) {
+                throw new IllegalArgumentException("Input path is not 
specified so Flow ID must be specified");
+            }
+            if (StringUtils.isBlank(flowVersion)) {
+                throw new IllegalArgumentException("Input path is not 
specified so Flow Version must be specified");
+            }
+        } else {
+            if (!input.exists() || !input.isFile() || !input.canRead()) {
+                throw new IllegalArgumentException("Specified input is not a 
local readable file: " + inputSource);
+            }
+            if (StringUtils.isNotBlank(bucketId)) {
+                throw new IllegalArgumentException("Input path is specified so 
Bucket ID should not be specified");
+            }
+            if (StringUtils.isNotBlank(flowId)) {
+                throw new IllegalArgumentException("Input path is specified so 
Flow ID should not be specified");
+            }
+            if (StringUtils.isNotBlank(flowVersion)) {
+                throw new IllegalArgumentException("Input path is specified so 
Flow Version should not be specified");
+            }
+            if (StringUtils.isNotBlank(flowBranch)) {
+                throw new IllegalArgumentException("Input path is specified so 
Flow Branch should not be specified");
+            }
+        }
 
         if ((posXExists && !posYExists)) {
             throw new IllegalArgumentException("Missing Y position - Please 
specify both X and Y, or specify neither");
@@ -100,27 +137,6 @@ public class PGImport extends 
AbstractNiFiCommand<StringResult> {
             keepExistingPC = "true";
         }
 
-        // if a registry client is specified use it, otherwise see if there is 
only one
-        // available and use that,
-        // if more than one is available then throw an exception because we 
don't know
-        // which one to use
-        String registryId = getArg(properties, 
CommandOption.REGISTRY_CLIENT_ID);
-        if (StringUtils.isBlank(registryId)) {
-            final FlowRegistryClientsEntity registries = 
client.getControllerClient().getRegistryClients();
-
-            final Set<FlowRegistryClientEntity> entities = 
registries.getRegistries();
-            if (entities == null || entities.isEmpty()) {
-                throw new NiFiClientException("No registry clients available");
-            }
-
-            if (entities.size() == 1) {
-                registryId = entities.stream().findFirst().get().getId();
-            } else {
-                throw new 
MissingOptionException(CommandOption.REGISTRY_CLIENT_ID.getLongName()
-                        + " must be provided when there is more than one 
available");
-            }
-        }
-
         // get the optional id of the parent PG, otherwise fallback to the 
root group
         String parentPgId = getArg(properties, CommandOption.PG_ID);
         if (StringUtils.isBlank(parentPgId)) {
@@ -128,16 +144,6 @@ public class PGImport extends 
AbstractNiFiCommand<StringResult> {
             parentPgId = flowClient.getRootGroupId();
         }
 
-        final VersionControlInformationDTO versionControlInfo = new 
VersionControlInformationDTO();
-        versionControlInfo.setRegistryId(registryId);
-        versionControlInfo.setBucketId(bucketId);
-        versionControlInfo.setFlowId(flowId);
-        versionControlInfo.setVersion(flowVersion);
-
-        if (StringUtils.isNotBlank(flowBranch)) {
-            versionControlInfo.setBranch(flowBranch);
-        }
-
         final PositionDTO posDto = new PositionDTO();
         if (posXExists && posYExists) {
             posDto.setX(Double.parseDouble(posXStr));
@@ -148,16 +154,58 @@ public class PGImport extends 
AbstractNiFiCommand<StringResult> {
             posDto.setY(Integer.valueOf(pgBox.getY()).doubleValue());
         }
 
-        final ProcessGroupDTO pgDto = new ProcessGroupDTO();
-        pgDto.setVersionControlInformation(versionControlInfo);
-        pgDto.setPosition(posDto);
+        final ProcessGroupClient pgClient = client.getProcessGroupClient();
+        ProcessGroupEntity createdEntity = null;
+
+        if (StringUtils.isBlank(inputSource)) {
+
+            // if a registry client is specified use it, otherwise see if 
there is only one
+            // available and use that, if more than one is available then 
throw an exception
+            // because we don't know which one to use
+            String registryId = getArg(properties, 
CommandOption.REGISTRY_CLIENT_ID);
+            if (StringUtils.isBlank(registryId)) {
+                final FlowRegistryClientsEntity registries = 
client.getControllerClient().getRegistryClients();
+
+                final Set<FlowRegistryClientEntity> entities = 
registries.getRegistries();
+                if (entities == null || entities.isEmpty()) {
+                    throw new NiFiClientException("No registry clients 
available");
+                }
+
+                if (entities.size() == 1) {
+                    registryId = entities.stream().findFirst().get().getId();
+                } else {
+                    throw new 
MissingOptionException(CommandOption.REGISTRY_CLIENT_ID.getLongName()
+                            + " must be provided when there is more than one 
available");
+                }
+            }
 
-        final ProcessGroupEntity pgEntity = new ProcessGroupEntity();
-        pgEntity.setComponent(pgDto);
-        pgEntity.setRevision(getInitialRevisionDTO());
+            final VersionControlInformationDTO versionControlInfo = new 
VersionControlInformationDTO();
+            versionControlInfo.setRegistryId(registryId);
+            versionControlInfo.setBucketId(bucketId);
+            versionControlInfo.setFlowId(flowId);
+            versionControlInfo.setVersion(flowVersion);
+
+            if (StringUtils.isNotBlank(flowBranch)) {
+                versionControlInfo.setBranch(flowBranch);
+            }
+
+            final ProcessGroupDTO pgDto = new ProcessGroupDTO();
+            pgDto.setVersionControlInformation(versionControlInfo);
+            pgDto.setPosition(posDto);
+
+            final ProcessGroupEntity pgEntity = new ProcessGroupEntity();
+            pgEntity.setComponent(pgDto);
+            pgEntity.setRevision(getInitialRevisionDTO());
+
+            createdEntity = pgClient.createProcessGroup(parentPgId, pgEntity, 
Boolean.parseBoolean(keepExistingPC));
+
+        } else {
+            JsonNode rootNode = OBJECT_MAPPER.readTree(input);
+            JsonNode flowContentsNode = rootNode.path("flowContents");
+            String pgName = flowContentsNode.path("name").asText();
+            createdEntity = pgClient.upload(parentPgId, input, pgName, 
posDto.getX(), posDto.getY());
+        }
 
-        final ProcessGroupClient pgClient = client.getProcessGroupClient();
-        final ProcessGroupEntity createdEntity = 
pgClient.createProcessGroup(parentPgId, pgEntity, 
Boolean.parseBoolean(keepExistingPC));
         return new StringResult(createdEntity.getId(), 
getContext().isInteractive());
     }
 
diff --git a/nifi-toolkit/nifi-toolkit-client/pom.xml 
b/nifi-toolkit/nifi-toolkit-client/pom.xml
index 53cebb8f39..44b2d7ce85 100644
--- a/nifi-toolkit/nifi-toolkit-client/pom.xml
+++ b/nifi-toolkit/nifi-toolkit-client/pom.xml
@@ -59,6 +59,10 @@ language governing permissions and limitations under the 
License. -->
             <groupId>org.glassfish.jersey.core</groupId>
             <artifactId>jersey-client</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.glassfish.jersey.media</groupId>
+            <artifactId>jersey-media-multipart</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.glassfish.jersey.media</groupId>
             <artifactId>jersey-media-json-jackson</artifactId>
diff --git 
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ProcessGroupClient.java
 
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ProcessGroupClient.java
index db8b88208b..2d530eff05 100644
--- 
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ProcessGroupClient.java
+++ 
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ProcessGroupClient.java
@@ -75,4 +75,6 @@ public interface ProcessGroupClient {
     CopyResponseEntity copy(String processGroupId, CopyRequestEntity 
copyRequestEntity) throws NiFiClientException, IOException;
 
     PasteResponseEntity paste(String processGroupId, PasteRequestEntity 
pasteRequestEntity) throws NiFiClientException, IOException;
+
+    ProcessGroupEntity upload(String parentPgId, File file, String pgName, 
Double posX, Double posY) throws NiFiClientException, IOException;
 }
diff --git 
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyProcessGroupClient.java
 
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyProcessGroupClient.java
index ce0e1bf871..de853db42f 100644
--- 
a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyProcessGroupClient.java
+++ 
b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyProcessGroupClient.java
@@ -36,12 +36,15 @@ import org.apache.nifi.web.api.entity.PasteResponseEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupImportEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupReplaceRequestEntity;
+import org.glassfish.jersey.media.multipart.FormDataMultiPart;
+import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Files;
 import java.nio.file.StandardCopyOption;
+import java.util.UUID;
 
 /**
  * Jersey implementation of ProcessGroupClient.
@@ -378,4 +381,35 @@ public class JerseyProcessGroupClient extends 
AbstractJerseyClient implements Pr
                     PasteResponseEntity.class);
         });
     }
+
+    @Override
+    public ProcessGroupEntity upload(String parentPgId, File file, String 
pgName, Double posX, Double posY) throws NiFiClientException, IOException {
+        if (StringUtils.isBlank(parentPgId)) {
+            throw new IllegalArgumentException("Parent process group id cannot 
be null or blank");
+        }
+        if (file == null) {
+            throw new IllegalArgumentException("File cannot be null");
+        }
+        if (!file.exists() || !file.canRead()) {
+            throw new IllegalArgumentException("Specified file is not a local 
readable file: " + file.getAbsolutePath());
+        }
+
+        FormDataMultiPart form = new FormDataMultiPart();
+
+        form.field("id", parentPgId);
+        form.field("groupName", pgName);
+        form.field("positionX", Double.toString(posX));
+        form.field("positionY", Double.toString(posY));
+        form.field("clientId", UUID.randomUUID().toString());
+        form.bodyPart(new FileDataBodyPart("file", file, 
MediaType.APPLICATION_JSON_TYPE));
+
+        return executeAction("Error uploading process group", () -> {
+            final WebTarget target = processGroupsTarget
+                    .path("{id}/process-groups/upload")
+                    .resolveTemplate("id", parentPgId);
+            return getRequestBuilder(target).post(
+                    Entity.entity(form, MediaType.MULTIPART_FORM_DATA),
+                    ProcessGroupEntity.class);
+        });
+    }
 }
\ No newline at end of file

Reply via email to