This is an automated email from the ASF dual-hosted git repository.

nsabonyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new dcf42d01a4 NIFI-11852 - CLI - connect two process groups
dcf42d01a4 is described below

commit dcf42d01a4d31f3dc8a7fad8618659bb8c7248c5
Author: Pierre Villard <[email protected]>
AuthorDate: Mon Jul 24 22:18:44 2023 +0200

    NIFI-11852 - CLI - connect two process groups
    
    This closes #7527
    
    Signed-off-by: Nandor Soma Abonyi <[email protected]>
---
 .../toolkit/cli/impl/command/CommandOption.java    |   5 +
 .../cli/impl/command/nifi/NiFiCommandGroup.java    |   2 +
 .../cli/impl/command/nifi/pg/PGConnect.java        | 143 +++++++++++++++++++++
 3 files changed, 150 insertions(+)

diff --git 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/CommandOption.java
 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/CommandOption.java
index ed88dc11e8..de5d6cfc95 100644
--- 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/CommandOption.java
+++ 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/CommandOption.java
@@ -91,6 +91,11 @@ public enum CommandOption {
     POS_X("px", "posX", "The x coordinate of a position", true),
     POS_Y("py", "posY", "The y coordinate of a position", true),
 
+    SOURCE_PG("sourcePg", "source-pg", "The ID of the source process group", 
true),
+    DESTINATION_PG("destPg", "destination-pg", "The ID of the destination 
process group", true),
+    SOURCE_OUTPUT_PORT("sourceOutput", "source-output-port", "The name of the 
output port in the source process group", true),
+    DESTINATION_INPUT_PORT("destInput", "destination-input-port", "The name of 
the input port in the destination process group", true),
+
     // NiFi - Controller Services
     CS_ID("cs", "controllerServiceId", "The id of a controller service", true),
 
diff --git 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java
 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java
index 91c063ec0e..af24fa011a 100644
--- 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java
+++ 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java
@@ -60,6 +60,7 @@ import 
org.apache.nifi.toolkit.cli.impl.command.nifi.params.SetInheritedParamCon
 import org.apache.nifi.toolkit.cli.impl.command.nifi.params.SetParam;
 import 
org.apache.nifi.toolkit.cli.impl.command.nifi.params.SetParamProviderProperty;
 import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGChangeVersion;
+import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGConnect;
 import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGCreate;
 import 
org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGCreateControllerService;
 import 
org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGDisableControllerServices;
@@ -123,6 +124,7 @@ public class NiFiCommandGroup extends AbstractCommandGroup {
         commands.add(new UpdateRegistryClient());
         commands.add(new GetRegistryClientId());
         commands.add(new PGImport());
+        commands.add(new PGConnect());
         commands.add(new PGStart());
         commands.add(new PGStop());
         commands.add(new PGCreate());
diff --git 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGConnect.java
 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGConnect.java
new file mode 100644
index 0000000000..c7d34227ca
--- /dev/null
+++ 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/pg/PGConnect.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.cli.impl.command.nifi.pg;
+
+import org.apache.commons.cli.MissingOptionException;
+import org.apache.nifi.toolkit.cli.api.Context;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.ConnectionClient;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.FlowClient;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
+import org.apache.nifi.toolkit.cli.impl.command.nifi.AbstractNiFiCommand;
+import org.apache.nifi.toolkit.cli.impl.result.StringResult;
+import org.apache.nifi.web.api.dto.ConnectableDTO;
+import org.apache.nifi.web.api.dto.ConnectionDTO;
+import org.apache.nifi.web.api.dto.RevisionDTO;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.PortEntity;
+import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
+
+import java.io.IOException;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Command to connect an output port of a source process group to an input port of a destination process group.
+ */
+public class PGConnect extends AbstractNiFiCommand<StringResult> {
+
+    public PGConnect() {
+        super("pg-connect", StringResult.class);
+    }
+
+    @Override
+    public String getDescription() {
+        return "Connects the output port of the source process group to the 
input port of a destination process group.";
+    }
+
+    @Override
+    protected void doInitialize(final Context context) {
+        addOption(CommandOption.SOURCE_PG.createOption());
+        addOption(CommandOption.SOURCE_OUTPUT_PORT.createOption());
+        addOption(CommandOption.DESTINATION_PG.createOption());
+        addOption(CommandOption.DESTINATION_INPUT_PORT.createOption());
+    }
+
+    @Override
+    public StringResult doExecute(final NiFiClient client, final Properties 
properties)
+            throws NiFiClientException, IOException, MissingOptionException {
+
+        final String sourcePgId = getRequiredArg(properties, 
CommandOption.SOURCE_PG);
+        final String sourceOutputPort = getRequiredArg(properties, 
CommandOption.SOURCE_OUTPUT_PORT);
+        final String destinationPgId = getRequiredArg(properties, 
CommandOption.DESTINATION_PG);
+        final String destinationInputPort = getRequiredArg(properties, 
CommandOption.DESTINATION_INPUT_PORT);
+
+        PortEntity source = null;
+        PortEntity destination = null;
+
+        final FlowClient pgClient = client.getFlowClient();
+
+        final ProcessGroupFlowEntity sourcePgEntity = 
pgClient.getProcessGroup(sourcePgId);
+        final ProcessGroupFlowEntity destinationPgEntity = 
pgClient.getProcessGroup(destinationPgId);
+
+        final String parentPgId = 
sourcePgEntity.getProcessGroupFlow().getParentGroupId();
+        
if(!parentPgId.equals(destinationPgEntity.getProcessGroupFlow().getParentGroupId()))
 {
+            throw new IOException("The source process group and the 
destination process group are not at the same level");
+        }
+
+        // retrieving the ID of the output port based on its name in source 
process group
+        Set<PortEntity> outputPorts = 
sourcePgEntity.getProcessGroupFlow().getFlow().getOutputPorts();
+        for(PortEntity outputPort : outputPorts) {
+            if(outputPort.getComponent().getName().equals(sourceOutputPort)) {
+                source = outputPort;
+                break;
+            }
+        }
+        if(source == null) {
+            throw new IOException("Unable to find an output port with the name 
'" + sourceOutputPort + "' in the source process group");
+        }
+
+        // retrieving the ID of the input port based on its name in 
destination process group
+        Set<PortEntity> inputPorts = 
destinationPgEntity.getProcessGroupFlow().getFlow().getInputPorts();
+        for(PortEntity inputPort : inputPorts) {
+            
if(inputPort.getComponent().getName().equals(destinationInputPort)) {
+                destination = inputPort;
+                break;
+            }
+        }
+        if(destination == null) {
+            throw new IOException("Unable to find an input port with the name 
'" + destinationInputPort + "' in the destination process group");
+        }
+
+        final ConnectionEntity connectionEntity = new ConnectionEntity();
+
+        connectionEntity.setDestinationGroupId(destinationPgId);
+        connectionEntity.setDestinationId(destination.getId());
+        connectionEntity.setDestinationType(destination.getPortType());
+
+        connectionEntity.setSourceGroupId(sourcePgId);
+        connectionEntity.setSourceId(source.getId());
+        connectionEntity.setSourceType(source.getPortType());
+
+        final RevisionDTO revisionDto = new RevisionDTO();
+        revisionDto.setClientId(getClass().getName());
+        revisionDto.setVersion(0L);
+        connectionEntity.setRevision(revisionDto);
+
+        final ConnectionDTO connectionDto = new ConnectionDTO();
+        connectionDto.setDestination(createConnectableDTO(destination));
+        connectionDto.setSource(createConnectableDTO(source));
+        connectionDto.setParentGroupId(parentPgId);
+        connectionEntity.setComponent(connectionDto);
+
+        final ConnectionClient connectionClient = client.getConnectionClient();
+        final ConnectionEntity createdEntity = 
connectionClient.createConnection(parentPgId, connectionEntity);
+        return new StringResult(createdEntity.getId(), 
getContext().isInteractive());
+    }
+
+    private ConnectableDTO createConnectableDTO(final PortEntity port) {
+        final ConnectableDTO dto = new ConnectableDTO();
+        dto.setGroupId(port.getComponent().getParentGroupId());
+        dto.setId(port.getId());
+        dto.setName(port.getComponent().getName());
+        
dto.setRunning("RUNNING".equalsIgnoreCase(port.getComponent().getState()));
+        dto.setType(port.getPortType());
+        return dto;
+    }
+
+}

Reply via email to