This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 9fd624cf43 NIFI-14134 Added support for JSON file upload to CLI pg-import (#9611)
9fd624cf43 is described below
commit 9fd624cf43b7c26a5ce51c880e62424f37c97ccf
Author: Pierre Villard <[email protected]>
AuthorDate: Thu Jan 23 21:55:01 2025 +0100
NIFI-14134 Added support for JSON file upload to CLI pg-import (#9611)
Signed-off-by: David Handermann <[email protected]>
---
.../toolkit/cli/impl/command/nifi/pg/PGImport.java | 132 ++++++++++++++-------
nifi-toolkit/nifi-toolkit-client/pom.xml | 4 +
.../nifi/toolkit/client/ProcessGroupClient.java | 2 +
.../client/impl/JerseyProcessGroupClient.java | 34 ++++++
4 files changed, 130 insertions(+), 42 deletions(-)
diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGImport.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGImport.java
index 54d202db25..76b80972b9 100644
--- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGImport.java
+++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGImport.java
@@ -16,6 +16,8 @@
*/
package org.apache.nifi.toolkit.cli.impl.command.nifi.pg;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.cli.MissingOptionException;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.toolkit.cli.api.Context;
@@ -34,6 +36,7 @@ import org.apache.nifi.web.api.entity.FlowRegistryClientEntity;
import org.apache.nifi.web.api.entity.FlowRegistryClientsEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
+import java.io.File;
import java.io.IOException;
import java.util.Properties;
import java.util.Set;
@@ -43,6 +46,8 @@ import java.util.Set;
*/
public class PGImport extends AbstractNiFiCommand<StringResult> {
+ private final static ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
public PGImport() {
super("pg-import", StringResult.class);
}
@@ -59,6 +64,7 @@ public class PGImport extends AbstractNiFiCommand<StringResult> {
@Override
protected void doInitialize(Context context) {
addOption(CommandOption.PG_ID.createOption());
+ addOption(CommandOption.INPUT_SOURCE.createOption());
addOption(CommandOption.REGISTRY_CLIENT_ID.createOption());
addOption(CommandOption.BUCKET_ID.createOption());
addOption(CommandOption.FLOW_ID.createOption());
@@ -73,9 +79,11 @@ public class PGImport extends AbstractNiFiCommand<StringResult> {
public StringResult doExecute(final NiFiClient client, final Properties
properties)
throws NiFiClientException, IOException, MissingOptionException {
- final String bucketId = getRequiredArg(properties,
CommandOption.BUCKET_ID);
- final String flowId = getRequiredArg(properties,
CommandOption.FLOW_ID);
- final String flowVersion = getRequiredArg(properties,
CommandOption.FLOW_VERSION);
+ final String inputSource = getArg(properties,
CommandOption.INPUT_SOURCE);
+
+ final String bucketId = getArg(properties, CommandOption.BUCKET_ID);
+ final String flowId = getArg(properties, CommandOption.FLOW_ID);
+ final String flowVersion = getArg(properties,
CommandOption.FLOW_VERSION);
final String flowBranch = getArg(properties,
CommandOption.FLOW_BRANCH);
final String posXStr = getArg(properties, CommandOption.POS_X);
@@ -85,6 +93,35 @@ public class PGImport extends AbstractNiFiCommand<StringResult> {
final boolean posXExists = StringUtils.isNotBlank(posXStr);
final boolean posYExists = StringUtils.isNotBlank(posYStr);
+ final File input = new File(inputSource);
+
+ if (StringUtils.isBlank(inputSource)) {
+ if (StringUtils.isBlank(bucketId)) {
+ throw new IllegalArgumentException("Input path is not
specified so Bucket ID must be specified");
+ }
+ if (StringUtils.isBlank(flowId)) {
+ throw new IllegalArgumentException("Input path is not
specified so Flow ID must be specified");
+ }
+ if (StringUtils.isBlank(flowVersion)) {
+ throw new IllegalArgumentException("Input path is not
specified so Flow Version must be specified");
+ }
+ } else {
+ if (!input.exists() || !input.isFile() || !input.canRead()) {
+ throw new IllegalArgumentException("Specified input is not a
local readable file: " + inputSource);
+ }
+ if (StringUtils.isNotBlank(bucketId)) {
+ throw new IllegalArgumentException("Input path is specified so
Bucket ID should not be specified");
+ }
+ if (StringUtils.isNotBlank(flowId)) {
+ throw new IllegalArgumentException("Input path is specified so
Flow ID should not be specified");
+ }
+ if (StringUtils.isNotBlank(flowVersion)) {
+ throw new IllegalArgumentException("Input path is specified so
Flow Version should not be specified");
+ }
+ if (StringUtils.isNotBlank(flowBranch)) {
+ throw new IllegalArgumentException("Input path is specified so
Flow Branch should not be specified");
+ }
+ }
if ((posXExists && !posYExists)) {
throw new IllegalArgumentException("Missing Y position - Please
specify both X and Y, or specify neither");
@@ -100,27 +137,6 @@ public class PGImport extends AbstractNiFiCommand<StringResult> {
keepExistingPC = "true";
}
- // if a registry client is specified use it, otherwise see if there is
only one
- // available and use that,
- // if more than one is available then throw an exception because we
don't know
- // which one to use
- String registryId = getArg(properties,
CommandOption.REGISTRY_CLIENT_ID);
- if (StringUtils.isBlank(registryId)) {
- final FlowRegistryClientsEntity registries =
client.getControllerClient().getRegistryClients();
-
- final Set<FlowRegistryClientEntity> entities =
registries.getRegistries();
- if (entities == null || entities.isEmpty()) {
- throw new NiFiClientException("No registry clients available");
- }
-
- if (entities.size() == 1) {
- registryId = entities.stream().findFirst().get().getId();
- } else {
- throw new
MissingOptionException(CommandOption.REGISTRY_CLIENT_ID.getLongName()
- + " must be provided when there is more than one
available");
- }
- }
-
// get the optional id of the parent PG, otherwise fallback to the
root group
String parentPgId = getArg(properties, CommandOption.PG_ID);
if (StringUtils.isBlank(parentPgId)) {
@@ -128,16 +144,6 @@ public class PGImport extends AbstractNiFiCommand<StringResult> {
parentPgId = flowClient.getRootGroupId();
}
- final VersionControlInformationDTO versionControlInfo = new
VersionControlInformationDTO();
- versionControlInfo.setRegistryId(registryId);
- versionControlInfo.setBucketId(bucketId);
- versionControlInfo.setFlowId(flowId);
- versionControlInfo.setVersion(flowVersion);
-
- if (StringUtils.isNotBlank(flowBranch)) {
- versionControlInfo.setBranch(flowBranch);
- }
-
final PositionDTO posDto = new PositionDTO();
if (posXExists && posYExists) {
posDto.setX(Double.parseDouble(posXStr));
@@ -148,16 +154,58 @@ public class PGImport extends AbstractNiFiCommand<StringResult> {
posDto.setY(Integer.valueOf(pgBox.getY()).doubleValue());
}
- final ProcessGroupDTO pgDto = new ProcessGroupDTO();
- pgDto.setVersionControlInformation(versionControlInfo);
- pgDto.setPosition(posDto);
+ final ProcessGroupClient pgClient = client.getProcessGroupClient();
+ ProcessGroupEntity createdEntity = null;
+
+ if (StringUtils.isBlank(inputSource)) {
+
+ // if a registry client is specified use it, otherwise see if
there is only one
+ // available and use that, if more than one is available then
throw an exception
+ // because we don't know which one to use
+ String registryId = getArg(properties,
CommandOption.REGISTRY_CLIENT_ID);
+ if (StringUtils.isBlank(registryId)) {
+ final FlowRegistryClientsEntity registries =
client.getControllerClient().getRegistryClients();
+
+ final Set<FlowRegistryClientEntity> entities =
registries.getRegistries();
+ if (entities == null || entities.isEmpty()) {
+ throw new NiFiClientException("No registry clients
available");
+ }
+
+ if (entities.size() == 1) {
+ registryId = entities.stream().findFirst().get().getId();
+ } else {
+ throw new
MissingOptionException(CommandOption.REGISTRY_CLIENT_ID.getLongName()
+ + " must be provided when there is more than one
available");
+ }
+ }
- final ProcessGroupEntity pgEntity = new ProcessGroupEntity();
- pgEntity.setComponent(pgDto);
- pgEntity.setRevision(getInitialRevisionDTO());
+ final VersionControlInformationDTO versionControlInfo = new
VersionControlInformationDTO();
+ versionControlInfo.setRegistryId(registryId);
+ versionControlInfo.setBucketId(bucketId);
+ versionControlInfo.setFlowId(flowId);
+ versionControlInfo.setVersion(flowVersion);
+
+ if (StringUtils.isNotBlank(flowBranch)) {
+ versionControlInfo.setBranch(flowBranch);
+ }
+
+ final ProcessGroupDTO pgDto = new ProcessGroupDTO();
+ pgDto.setVersionControlInformation(versionControlInfo);
+ pgDto.setPosition(posDto);
+
+ final ProcessGroupEntity pgEntity = new ProcessGroupEntity();
+ pgEntity.setComponent(pgDto);
+ pgEntity.setRevision(getInitialRevisionDTO());
+
+ createdEntity = pgClient.createProcessGroup(parentPgId, pgEntity,
Boolean.parseBoolean(keepExistingPC));
+
+ } else {
+ JsonNode rootNode = OBJECT_MAPPER.readTree(input);
+ JsonNode flowContentsNode = rootNode.path("flowContents");
+ String pgName = flowContentsNode.path("name").asText();
+ createdEntity = pgClient.upload(parentPgId, input, pgName,
posDto.getX(), posDto.getY());
+ }
- final ProcessGroupClient pgClient = client.getProcessGroupClient();
- final ProcessGroupEntity createdEntity =
pgClient.createProcessGroup(parentPgId, pgEntity,
Boolean.parseBoolean(keepExistingPC));
return new StringResult(createdEntity.getId(),
getContext().isInteractive());
}
diff --git a/nifi-toolkit/nifi-toolkit-client/pom.xml b/nifi-toolkit/nifi-toolkit-client/pom.xml
index 53cebb8f39..44b2d7ce85 100644
--- a/nifi-toolkit/nifi-toolkit-client/pom.xml
+++ b/nifi-toolkit/nifi-toolkit-client/pom.xml
@@ -59,6 +59,10 @@ language governing permissions and limitations under the
License. -->
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-client</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.media</groupId>
+ <artifactId>jersey-media-multipart</artifactId>
+ </dependency>
<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-json-jackson</artifactId>
diff --git a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ProcessGroupClient.java b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ProcessGroupClient.java
index db8b88208b..2d530eff05 100644
--- a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ProcessGroupClient.java
+++ b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/ProcessGroupClient.java
@@ -75,4 +75,6 @@ public interface ProcessGroupClient {
CopyResponseEntity copy(String processGroupId, CopyRequestEntity
copyRequestEntity) throws NiFiClientException, IOException;
PasteResponseEntity paste(String processGroupId, PasteRequestEntity
pasteRequestEntity) throws NiFiClientException, IOException;
+
+ ProcessGroupEntity upload(String parentPgId, File file, String pgName,
Double posX, Double posY) throws NiFiClientException, IOException;
}
diff --git a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyProcessGroupClient.java b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyProcessGroupClient.java
index ce0e1bf871..de853db42f 100644
--- a/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyProcessGroupClient.java
+++ b/nifi-toolkit/nifi-toolkit-client/src/main/java/org/apache/nifi/toolkit/client/impl/JerseyProcessGroupClient.java
@@ -36,12 +36,15 @@ import org.apache.nifi.web.api.entity.PasteResponseEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessGroupImportEntity;
import org.apache.nifi.web.api.entity.ProcessGroupReplaceRequestEntity;
+import org.glassfish.jersey.media.multipart.FormDataMultiPart;
+import org.glassfish.jersey.media.multipart.file.FileDataBodyPart;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
+import java.util.UUID;
/**
* Jersey implementation of ProcessGroupClient.
@@ -378,4 +381,35 @@ public class JerseyProcessGroupClient extends AbstractJerseyClient implements Pr
PasteResponseEntity.class);
});
}
+
+ @Override
+ public ProcessGroupEntity upload(String parentPgId, File file, String
pgName, Double posX, Double posY) throws NiFiClientException, IOException {
+ if (StringUtils.isBlank(parentPgId)) {
+ throw new IllegalArgumentException("Parent process group id cannot
be null or blank");
+ }
+ if (file == null) {
+ throw new IllegalArgumentException("File cannot be null");
+ }
+ if (!file.exists() || !file.canRead()) {
+ throw new IllegalArgumentException("Specified file is not a local
readable file: " + file.getAbsolutePath());
+ }
+
+ FormDataMultiPart form = new FormDataMultiPart();
+
+ form.field("id", parentPgId);
+ form.field("groupName", pgName);
+ form.field("positionX", Double.toString(posX));
+ form.field("positionY", Double.toString(posY));
+ form.field("clientId", UUID.randomUUID().toString());
+ form.bodyPart(new FileDataBodyPart("file", file,
MediaType.APPLICATION_JSON_TYPE));
+
+ return executeAction("Error uploading process group", () -> {
+ final WebTarget target = processGroupsTarget
+ .path("{id}/process-groups/upload")
+ .resolveTemplate("id", parentPgId);
+ return getRequestBuilder(target).post(
+ Entity.entity(form, MediaType.MULTIPART_FORM_DATA),
+ ProcessGroupEntity.class);
+ });
+ }
}
\ No newline at end of file