This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 4b12182752 NIFI-11857 - CLI - recursively change version of Processors
4b12182752 is described below
commit 4b121827525e393ca365ab36023a2f26f905c2b3
Author: Pierre Villard <[email protected]>
AuthorDate: Tue Jul 25 19:12:47 2023 +0200
NIFI-11857 - CLI - recursively change version of Processors
This closes #7528
Signed-off-by: David Handermann <[email protected]>
(cherry picked from commit cb03d6de74c9931e0ac52f77ce98f5b8eb39e3d1)
---
.../toolkit/cli/impl/command/CommandOption.java | 2 +
.../cli/impl/command/nifi/NiFiCommandGroup.java | 2 +
.../nifi/processors/ChangeVersionProcessor.java | 151 +++++++++++++++++++++
.../cli/impl/result/nifi/ProcessorResult.java | 53 ++++++++
.../cli/impl/result/nifi/ProcessorsResult.java | 88 ++++++++++++
5 files changed, 296 insertions(+)
diff --git
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/CommandOption.java
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/CommandOption.java
index 6c45f9f033..6770966aea 100644
---
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/CommandOption.java
+++
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/CommandOption.java
@@ -60,6 +60,8 @@ public enum CommandOption {
EXT_BUNDLE_GROUP("gr", "group", "The group id of a bundle", true),
EXT_BUNDLE_ARTIFACT("ar", "artifact", "The artifact id of a bundle", true),
EXT_BUNDLE_VERSION("ver", "version", "The version of the bundle", true),
+ EXT_BUNDLE_CURRENT_VERSION("cver", "current-version", "The current version
of the bundle", true),
+ EXT_QUALIFIED_NAME("extname", "extension-name", "The qualified name of the
extension", true),
EXT_TYPE("et", "extensionType", "The type of extension, one of
'PROCESSOR', 'CONTROLLER_SERVICE', or 'REPORTING_TASK'.", true),
EXT_BUNDLE_TYPE("ebt", "extensionBundleType", "The type of extension
bundle, either nifi-nar or minifi-cpp", true),
diff --git
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java
index 2d57d365a0..a522433afb 100644
---
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java
+++
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/NiFiCommandGroup.java
@@ -80,6 +80,7 @@ import
org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGStop;
import org.apache.nifi.toolkit.cli.impl.command.nifi.pg.PGStopVersionControl;
import org.apache.nifi.toolkit.cli.impl.command.nifi.policies.GetAccessPolicy;
import
org.apache.nifi.toolkit.cli.impl.command.nifi.policies.UpdateAccessPolicy;
+import
org.apache.nifi.toolkit.cli.impl.command.nifi.processors.ChangeVersionProcessor;
import
org.apache.nifi.toolkit.cli.impl.command.nifi.registry.CreateRegistryClient;
import
org.apache.nifi.toolkit.cli.impl.command.nifi.registry.GetRegistryClientId;
import
org.apache.nifi.toolkit.cli.impl.command.nifi.registry.ListRegistryClients;
@@ -186,6 +187,7 @@ public class NiFiCommandGroup extends AbstractCommandGroup {
commands.add(new LogoutAccessToken());
commands.add(new GetControllerConfiguration());
commands.add(new UpdateControllerConfiguration());
+ commands.add(new ChangeVersionProcessor());
return new ArrayList<>(commands);
}
}
diff --git
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/processors/ChangeVersionProcessor.java
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/processors/ChangeVersionProcessor.java
new file mode 100644
index 0000000000..7a4edae98a
--- /dev/null
+++
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/processors/ChangeVersionProcessor.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.cli.impl.command.nifi.processors;
+
+import org.apache.commons.cli.MissingOptionException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.toolkit.cli.api.CommandException;
+import org.apache.nifi.toolkit.cli.api.Context;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.FlowClient;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.ProcessorClient;
+import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
+import org.apache.nifi.toolkit.cli.impl.command.nifi.AbstractNiFiCommand;
+import org.apache.nifi.toolkit.cli.impl.result.nifi.ProcessorsResult;
+import org.apache.nifi.web.api.dto.BundleDTO;
+import org.apache.nifi.web.api.dto.ProcessorDTO;
+import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO;
+import org.apache.nifi.web.api.entity.ProcessGroupEntity;
+import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.apache.nifi.web.api.entity.ProcessorsEntity;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Command that changes the bundle version of every instance of a given processor type,
+ * starting at a process group and descending into all of its child groups.
+ *
+ * <p>A processor is selected when its bundle group, bundle artifact and type match the
+ * supplied arguments and, if a current version was supplied, its bundle version matches too.
+ * Processors that are running are stopped before the update and started again afterwards.</p>
+ */
+public class ChangeVersionProcessor extends AbstractNiFiCommand<ProcessorsResult> {
+
+    /** Value of the processor state that marks a processor as running. */
+    private static final String RUNNING_STATE = "RUNNING";
+
+    public ChangeVersionProcessor() {
+        super("change-version-processor", ProcessorsResult.class);
+    }
+
+    @Override
+    public String getDescription() {
+        return "Recursively changes the version of the instances of the specified processor. If the process group is specified, the changes "
+                + "will be scoped to that process group and its childs ; if not specified the changes will recursively apply to the root process "
+                + "group. If the source version is specified, only instances with this version will be updated to the new version.";
+    }
+
+    @Override
+    protected void doInitialize(Context context) {
+        addOption(CommandOption.PG_ID.createOption());
+        addOption(CommandOption.EXT_BUNDLE_GROUP.createOption());
+        addOption(CommandOption.EXT_BUNDLE_ARTIFACT.createOption());
+        addOption(CommandOption.EXT_BUNDLE_VERSION.createOption());
+        addOption(CommandOption.EXT_QUALIFIED_NAME.createOption());
+        addOption(CommandOption.EXT_BUNDLE_CURRENT_VERSION.createOption());
+    }
+
+    /**
+     * Reads the command arguments, applies the version change from the requested process group
+     * (or the root group when none is given) downwards, and returns the updated processors.
+     *
+     * @param client     client used to obtain the flow and processor clients
+     * @param properties parsed command-line arguments
+     * @return a result wrapping the set of processors whose version was changed
+     */
+    @Override
+    public ProcessorsResult doExecute(NiFiClient client, Properties properties)
+            throws NiFiClientException, IOException, MissingOptionException, CommandException {
+
+        final String bundleGroup = getRequiredArg(properties, CommandOption.EXT_BUNDLE_GROUP);
+        final String bundleArtifact = getRequiredArg(properties, CommandOption.EXT_BUNDLE_ARTIFACT);
+        final String bundleVersion = getRequiredArg(properties, CommandOption.EXT_BUNDLE_VERSION);
+        final String qualifiedName = getRequiredArg(properties, CommandOption.EXT_QUALIFIED_NAME);
+        final String sourceVersion = getArg(properties, CommandOption.EXT_BUNDLE_CURRENT_VERSION);
+
+        final FlowClient flowClient = client.getFlowClient();
+        final ProcessorClient processorClient = client.getProcessorClient();
+
+        // No process group given: apply the change to the whole flow.
+        String pgId = getArg(properties, CommandOption.PG_ID);
+        if (StringUtils.isBlank(pgId)) {
+            pgId = flowClient.getRootGroupId();
+        }
+
+        final Set<ProcessorEntity> updatedComponents = recursivelyChangeVersionProcessor(flowClient, processorClient, pgId, bundleGroup,
+                bundleArtifact, bundleVersion, sourceVersion, qualifiedName);
+        final ProcessorsEntity processorsEntity = new ProcessorsEntity();
+        processorsEntity.setProcessors(updatedComponents);
+
+        return new ProcessorsResult(getResultType(properties), processorsEntity);
+    }
+
+    /**
+     * Updates the matching processors of one process group, then recurses into its child groups.
+     *
+     * @return every processor that was updated in this group and below, as re-read after the update
+     */
+    private Set<ProcessorEntity> recursivelyChangeVersionProcessor(FlowClient flowClient, ProcessorClient processorClient, String pgId, String bundleGroup,
+            String bundleArtifact, String bundleVersion, String sourceVersion, String qualifiedName) throws NiFiClientException, IOException {
+
+        final Set<ProcessorEntity> updatedComponents = new HashSet<>();
+
+        final ProcessGroupFlowEntity sourcePgEntity = flowClient.getProcessGroup(pgId);
+        final ProcessGroupFlowDTO flow = sourcePgEntity.getProcessGroupFlow();
+
+        final Set<ProcessorEntity> processors = flow.getFlow().getProcessors();
+        if (processors != null) {
+            for (final ProcessorEntity processor : processors) {
+                if (isTargetProcessor(processor, bundleGroup, bundleArtifact, sourceVersion, qualifiedName)) {
+                    updatedComponents.add(changeVersion(processorClient, processor, bundleGroup, bundleArtifact, bundleVersion));
+                }
+            }
+        }
+
+        final Set<ProcessGroupEntity> processGroups = flow.getFlow().getProcessGroups();
+        if (processGroups != null) {
+            for (final ProcessGroupEntity processGroup : processGroups) {
+                updatedComponents.addAll(recursivelyChangeVersionProcessor(flowClient, processorClient, processGroup.getId(), bundleGroup,
+                        bundleArtifact, bundleVersion, sourceVersion, qualifiedName));
+            }
+        }
+
+        return updatedComponents;
+    }
+
+    /**
+     * Tells whether a processor is one of the instances whose version must be changed.
+     *
+     * <p>A processor whose component or bundle is absent is never a target: it cannot be matched,
+     * and skipping it keeps one such processor from aborting the whole recursive run.
+     * NOTE(review): presumably the component is absent when the user may not read the processor - confirm.</p>
+     */
+    private static boolean isTargetProcessor(ProcessorEntity processor, String bundleGroup, String bundleArtifact,
+            String sourceVersion, String qualifiedName) {
+
+        final ProcessorDTO component = processor.getComponent();
+        if (component == null) {
+            return false;
+        }
+        final BundleDTO bundle = component.getBundle();
+        if (bundle == null) {
+            return false;
+        }
+
+        // The required arguments are the receivers so that a null field on the fetched DTO cannot throw.
+        return bundleGroup.equals(bundle.getGroup())
+                && bundleArtifact.equals(bundle.getArtifact())
+                && qualifiedName.equals(component.getType())
+                && (StringUtils.isBlank(sourceVersion) || sourceVersion.equals(bundle.getVersion()));
+    }
+
+    /**
+     * Changes the bundle version of a single processor, stopping it first and restarting it
+     * afterwards when it was running.
+     *
+     * @return the processor as re-read from the server once all requests have completed
+     */
+    private static ProcessorEntity changeVersion(ProcessorClient processorClient, ProcessorEntity processor, String bundleGroup,
+            String bundleArtifact, String bundleVersion) throws NiFiClientException, IOException {
+
+        final String processorId = processor.getId();
+        final boolean wasRunning = RUNNING_STATE.equals(processor.getComponent().getState());
+
+        ProcessorEntity current = processor;
+        if (wasRunning) {
+            // processor needs to be stopped for changing the version
+            processorClient.stopProcessor(current);
+            // get the updated entity to have the correct revision
+            current = processorClient.getProcessor(processorId);
+        }
+
+        // Send only the id and the new bundle; the revision comes from the latest known entity.
+        final ProcessorDTO processorDto = new ProcessorDTO();
+        processorDto.setId(processorId);
+        processorDto.setBundle(new BundleDTO(bundleGroup, bundleArtifact, bundleVersion));
+
+        final ProcessorEntity updatedEntity = new ProcessorEntity();
+        updatedEntity.setRevision(current.getRevision());
+        updatedEntity.setComponent(processorDto);
+        updatedEntity.setId(processorId);
+
+        processorClient.updateProcessor(updatedEntity);
+
+        if (wasRunning) {
+            // restart the component that was previously running, using the entity with the correct revision
+            processorClient.startProcessor(processorClient.getProcessor(processorId));
+        }
+
+        // get latest version of the entity
+        return processorClient.getProcessor(processorId);
+    }
+}
diff --git
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/nifi/ProcessorResult.java
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/nifi/ProcessorResult.java
new file mode 100644
index 0000000000..ff3f83a422
--- /dev/null
+++
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/nifi/ProcessorResult.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.cli.impl.result.nifi;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.nifi.toolkit.cli.api.ResultType;
+import org.apache.nifi.toolkit.cli.impl.result.AbstractWritableResult;
+import org.apache.nifi.web.api.dto.BundleDTO;
+import org.apache.nifi.web.api.dto.ProcessorDTO;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+
+import java.io.IOException;
+import java.io.PrintStream;
+
+/**
+ * Result holding a single processor; the simple output is a short summary of its
+ * name, id, type, bundle coordinates and state.
+ */
+public class ProcessorResult extends AbstractWritableResult<ProcessorEntity> {
+
+    private final ProcessorEntity processorEntity;
+
+    /**
+     * @param resultType      the requested output type
+     * @param processorEntity the processor to report, must not be null
+     */
+    public ProcessorResult(ResultType resultType, ProcessorEntity processorEntity) {
+        super(resultType);
+        Validate.notNull(processorEntity);
+        this.processorEntity = processorEntity;
+    }
+
+    @Override
+    public ProcessorEntity getResult() {
+        return this.processorEntity;
+    }
+
+    /**
+     * Prints the summary of the wrapped processor to the given stream.
+     */
+    @Override
+    protected void writeSimpleResult(PrintStream output) throws IOException {
+        final ProcessorDTO component = processorEntity.getComponent();
+        final BundleDTO componentBundle = component.getBundle();
+
+        final String name = component.getName();
+        final String id = component.getId();
+        final String type = component.getType();
+        final String group = componentBundle.getGroup();
+        final String artifact = componentBundle.getArtifact();
+        final String version = componentBundle.getVersion();
+        final String state = component.getState();
+
+        output.printf("Name : %s\nID : %s\nType : %s\nBundle: %s - %s %s\nState : %s\n",
+                name, id, type, group, artifact, version, state);
+    }
+}
diff --git
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/nifi/ProcessorsResult.java
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/nifi/ProcessorsResult.java
new file mode 100644
index 0000000000..42617a6c66
--- /dev/null
+++
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/nifi/ProcessorsResult.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.cli.impl.result.nifi;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.nifi.toolkit.cli.api.ResultType;
+import org.apache.nifi.toolkit.cli.impl.result.AbstractWritableResult;
+import org.apache.nifi.toolkit.cli.impl.result.writer.DynamicTableWriter;
+import org.apache.nifi.toolkit.cli.impl.result.writer.Table;
+import org.apache.nifi.toolkit.cli.impl.result.writer.TableWriter;
+import org.apache.nifi.web.api.dto.ProcessorDTO;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.apache.nifi.web.api.entity.ProcessorsEntity;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Result holding a set of processors; the simple output is a table with one row per
+ * processor, sorted by processor name.
+ */
+public class ProcessorsResult extends AbstractWritableResult<ProcessorsEntity> {
+
+    private final ProcessorsEntity processorsEntity;
+
+    /**
+     * @param resultType       the requested output type
+     * @param processorsEntity the processors to report, must not be null
+     */
+    public ProcessorsResult(ResultType resultType, ProcessorsEntity processorsEntity) {
+        super(resultType);
+        this.processorsEntity = processorsEntity;
+        Validate.notNull(processorsEntity);
+    }
+
+    @Override
+    public ProcessorsEntity getResult() {
+        return processorsEntity;
+    }
+
+    /**
+     * Writes the processors as a table. Nothing is written when the entity carries no processor set.
+     * Entities without a component are left out, and a missing name, type or version is shown as an
+     * empty cell, so that one incomplete entry does not make the whole listing fail.
+     */
+    @Override
+    protected void writeSimpleResult(PrintStream output) throws IOException {
+        final Set<ProcessorEntity> processorsEntities = processorsEntity.getProcessors();
+        if (processorsEntities == null) {
+            return;
+        }
+
+        // Null names sort first instead of throwing from the comparator.
+        final List<ProcessorDTO> processorDTOS = processorsEntities.stream()
+                .map(ProcessorEntity::getComponent)
+                .filter(dto -> dto != null)
+                .sorted(Comparator.comparing(ProcessorDTO::getName, Comparator.nullsFirst(Comparator.<String>naturalOrder())))
+                .collect(Collectors.toList());
+
+        final Table table = new Table.Builder()
+                .column("#", 3, 3, false)
+                .column("Name", 5, 40, true)
+                .column("ID", 36, 36, false)
+                .column("Type", 5, 40, true)
+                .column("Run Status", 10, 20, false)
+                .column("Version", 10, 20, false)
+                .build();
+
+        for (int i = 0; i < processorDTOS.size(); i++) {
+            final ProcessorDTO processorDTO = processorDTOS.get(i);
+            table.addRow(
+                    String.valueOf(i + 1),
+                    orEmpty(processorDTO.getName()),
+                    processorDTO.getId(),
+                    simpleTypeName(processorDTO.getType()),
+                    processorDTO.getState(),
+                    processorDTO.getBundle() == null ? "" : orEmpty(processorDTO.getBundle().getVersion())
+            );
+        }
+
+        final TableWriter tableWriter = new DynamicTableWriter();
+        tableWriter.write(table, output);
+    }
+
+    /** Returns the part of a qualified type name after its last dot, or an empty string for a null type. */
+    private static String simpleTypeName(String type) {
+        if (type == null) {
+            return "";
+        }
+        return type.substring(type.lastIndexOf('.') + 1);
+    }
+
+    /** Replaces a null value with an empty string. */
+    private static String orEmpty(String value) {
+        return value == null ? "" : value;
+    }
+}
\ No newline at end of file