gresockj commented on a change in pull request #5514:
URL: https://github.com/apache/nifi/pull/5514#discussion_r751614428
##########
File path:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/flow/AbstractFlowManager.java
##########
@@ -451,8 +458,11 @@ public ParameterContext createParameterContext(final
String id, final String nam
@Override
public void withParameterContextResolution(final Runnable
parameterContextAction) {
withParameterContextResolution.set(true);
- parameterContextAction.run();
- withParameterContextResolution.set(false);
+ try {
+ parameterContextAction.run();
Review comment:
Nice catch!
##########
File path:
nifi-api/src/main/java/org/apache/nifi/flow/VersionedReportingTask.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.flow;
+
+import io.swagger.annotations.ApiModelProperty;
+
+import java.util.Map;
+
+public class VersionedReportingTask extends VersionedComponent implements
VersionedConfigurableComponent, VersionedExtensionComponent {
+ private String type;
+ private Bundle bundle;
+ private Map<String, String> properties;
+ private Map<String, VersionedPropertyDescriptor> propertyDescriptors;
+ private String annotationData;
+ private ScheduledState scheduledState;
+ private String schedulingPeriod;
+ private String schedulingStrategy;
+
+
+ @Override
+ @ApiModelProperty(value = "The type of the reporting task.")
+ public String getType() {
+ return type;
+ }
+
+ @Override
+ public void setType(final String type) {
+ this.type = type;
+ }
+
+ @Override
+ @ApiModelProperty(value = "The details of the artifact that bundled this
reporting task type.")
+ public Bundle getBundle() {
+ return bundle;
+ }
+
+ @Override
+ public void setBundle(Bundle bundle) {
+ this.bundle = bundle;
+ }
+
+ @Override
+ @ApiModelProperty(value = "The properties of the controller service.")
Review comment:
reporting task*
##########
File path:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroupSynchronizer.java
##########
@@ -0,0 +1,2050 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.groups;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.bundle.BundleCoordinate;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Funnel;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.connectable.Position;
+import org.apache.nifi.connectable.Size;
+import org.apache.nifi.controller.ComponentNode;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.PropertyConfiguration;
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.Template;
+import org.apache.nifi.controller.exception.ProcessorInstantiationException;
+import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.LoadBalanceCompression;
+import org.apache.nifi.controller.queue.LoadBalanceStrategy;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.encrypt.EncryptionException;
+import org.apache.nifi.flow.BatchSize;
+import org.apache.nifi.flow.Bundle;
+import org.apache.nifi.flow.ComponentType;
+import org.apache.nifi.flow.ConnectableComponent;
+import org.apache.nifi.flow.VersionedComponent;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.flow.VersionedControllerService;
+import org.apache.nifi.flow.VersionedFlowCoordinates;
+import org.apache.nifi.flow.VersionedFunnel;
+import org.apache.nifi.flow.VersionedLabel;
+import org.apache.nifi.flow.VersionedPort;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.flow.VersionedProcessor;
+import org.apache.nifi.flow.VersionedPropertyDescriptor;
+import org.apache.nifi.flow.VersionedRemoteGroupPort;
+import org.apache.nifi.flow.VersionedRemoteProcessGroup;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.parameter.Parameter;
+import org.apache.nifi.parameter.ParameterContext;
+import org.apache.nifi.parameter.ParameterDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.registry.VariableDescriptor;
+import org.apache.nifi.registry.client.NiFiRegistryException;
+import org.apache.nifi.registry.flow.FlowRegistry;
+import org.apache.nifi.registry.flow.StandardVersionControlInformation;
+import org.apache.nifi.registry.flow.VersionControlInformation;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowState;
+import org.apache.nifi.registry.flow.VersionedParameter;
+import org.apache.nifi.registry.flow.VersionedParameterContext;
+import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
+import org.apache.nifi.registry.flow.diff.DifferenceType;
+import org.apache.nifi.registry.flow.diff.FlowComparator;
+import org.apache.nifi.registry.flow.diff.FlowComparison;
+import org.apache.nifi.registry.flow.diff.FlowDifference;
+import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow;
+import org.apache.nifi.registry.flow.diff.StandardFlowComparator;
+import org.apache.nifi.registry.flow.diff.StaticDifferenceDescriptor;
+import org.apache.nifi.registry.flow.mapping.NiFiRegistryFlowMapper;
+import org.apache.nifi.remote.PublicPort;
+import org.apache.nifi.remote.RemoteGroupPort;
+import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor;
+import org.apache.nifi.remote.TransferDirection;
+import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
+import org.apache.nifi.scheduling.ExecutionNode;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.util.FlowDifferenceFilters;
+import org.apache.nifi.web.ResourceNotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class StandardProcessGroupSynchronizer implements
ProcessGroupSynchronizer {
+ private static final Logger LOG =
LoggerFactory.getLogger(StandardProcessGroupSynchronizer.class);
+ private static final String TEMP_FUNNEL_ID_SUFFIX = "-temp-funnel";
+ public static final String ENC_PREFIX = "enc{";
+ public static final String ENC_SUFFIX = "}";
+
+ private final ProcessGroupSynchronizationContext context;
+ private final Set<String> updatedVersionedComponentIds = new HashSet<>();
+
+ private Set<String> preExistingVariables = new HashSet<>();
+ private GroupSynchronizationOptions syncOptions;
+
+ public StandardProcessGroupSynchronizer(final
ProcessGroupSynchronizationContext context) {
+ this.context = context;
+ }
+
+ private void setPreExistingVariables(final Set<String>
preExistingVariables) {
+ this.preExistingVariables = preExistingVariables;
+ }
+
+ private void setUpdatedVersionedComponentIds(final Set<String>
updatedVersionedComponentIds) {
+ this.updatedVersionedComponentIds.clear();
+ this.updatedVersionedComponentIds.addAll(updatedVersionedComponentIds);
+ }
+
+ public void setSynchronizationOptions(final GroupSynchronizationOptions
syncOptions) {
+ this.syncOptions = syncOptions;
+ }
+
+ @Override
+ public void synchronize(final ProcessGroup group, final
VersionedFlowSnapshot proposedSnapshot, final GroupSynchronizationOptions
options) throws ProcessorInstantiationException {
+
+ final NiFiRegistryFlowMapper mapper = new
NiFiRegistryFlowMapper(context.getExtensionManager(),
context.getFlowMappingOptions());
+ final VersionedProcessGroup versionedGroup =
mapper.mapProcessGroup(group, context.getControllerServiceProvider(),
context.getFlowRegistryClient(), true);
+
+ final ComparableDataFlow localFlow = new
StandardComparableDataFlow("Currently Loaded Flow", versionedGroup);
+ final ComparableDataFlow proposedFlow = new
StandardComparableDataFlow("Proposed Flow", proposedSnapshot.getFlowContents());
+
+ final PropertyDecryptor decryptor = options.getPropertyDecryptor();
+ final FlowComparator flowComparator = new
StandardFlowComparator(proposedFlow, localFlow, group.getAncestorServiceIds(),
new StaticDifferenceDescriptor(), decryptor::decrypt);
+ final FlowComparison flowComparison = flowComparator.compare();
+
+ updatedVersionedComponentIds.clear();
+ setSynchronizationOptions(options);
+
+ for (final FlowDifference diff : flowComparison.getDifferences()) {
+ if
(FlowDifferenceFilters.isPropertyMissingFromGhostComponent(diff,
context.getFlowManager())) {
+ continue;
+ }
+ if (FlowDifferenceFilters.isScheduledStateNew(diff)) {
+ continue;
+ }
+
+ // If this update adds a new Controller Service, then we need to
check if the service already exists at a higher level
+ // and if so compare our VersionedControllerService to the
existing service.
+ if (diff.getDifferenceType() == DifferenceType.COMPONENT_ADDED) {
+ final VersionedComponent component = diff.getComponentA() ==
null ? diff.getComponentB() : diff.getComponentA();
+ if (ComponentType.CONTROLLER_SERVICE ==
component.getComponentType()) {
+ final ControllerServiceNode serviceNode =
getVersionedControllerService(group, component.getIdentifier());
+ if (serviceNode != null) {
+ final VersionedControllerService versionedService =
mapper.mapControllerService(serviceNode, context.getControllerServiceProvider(),
+
Collections.singleton(serviceNode.getProcessGroupIdentifier()), new
HashMap<>());
+ final Set<FlowDifference> differences =
flowComparator.compareControllerServices(versionedService,
(VersionedControllerService) component);
+
+ if (!differences.isEmpty()) {
+
updatedVersionedComponentIds.add(component.getIdentifier());
+ }
+
+ continue;
+ }
+ }
+ }
+
+ final VersionedComponent component = diff.getComponentA() == null
? diff.getComponentB() : diff.getComponentA();
+ updatedVersionedComponentIds.add(component.getIdentifier());
+
+ if (component.getComponentType() ==
ComponentType.REMOTE_INPUT_PORT || component.getComponentType() ==
ComponentType.REMOTE_OUTPUT_PORT) {
+ final String remoteGroupId = ((VersionedRemoteGroupPort)
component).getRemoteGroupId();
+ updatedVersionedComponentIds.add(remoteGroupId);
+ }
+ }
+
+ if (LOG.isInfoEnabled()) {
+ final String differencesByLine =
flowComparison.getDifferences().stream()
+ .map(FlowDifference::toString)
+ .collect(Collectors.joining("\n"));
+
+ LOG.info("Updating {} to {}; there are {} differences to take into
account:\n{}", group, proposedSnapshot,
+ flowComparison.getDifferences().size(), differencesByLine);
+ }
+
+ final Set<String> knownVariables = getKnownVariableNames(group);
+
+ preExistingVariables.clear();
+
+ // If we don't want to update existing variables, we need to populate
the pre-existing variables so that we know which variables already existed.
+ // We can't do this when updating the Variable Registry for a Process
Group because variables are inherited, and the variables of the parent group
+ // may already have been updated when we get to the point of updating
a child's Variable Registry. As a result, we build up a Set of all known
+ // Variables before we update the Variable Registries.
+ if (!options.isUpdateExistingVariables()) {
+ preExistingVariables.addAll(knownVariables);
+ }
+
+ synchronize(group, proposedSnapshot.getFlowContents(),
proposedSnapshot.getParameterContexts());
Review comment:
Looks like this needs to be wrapped in:
```
context.getFlowManager().withParameterContextResolution(...)
```
Creating a process group whose param context has an inherited param context,
version controlling it, deleting the original param context, and then importing
the process group from registry results in `A ParameterContext with inherited
ParameterContexts may only be created from within a call to
AbstractFlowManager#withParameterContextResolution`
##########
File path:
nifi-api/src/main/java/org/apache/nifi/flow/VersionedReportingTask.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.flow;
+
+import io.swagger.annotations.ApiModelProperty;
+
+import java.util.Map;
+
+public class VersionedReportingTask extends VersionedComponent implements
VersionedConfigurableComponent, VersionedExtensionComponent {
+ private String type;
+ private Bundle bundle;
+ private Map<String, String> properties;
+ private Map<String, VersionedPropertyDescriptor> propertyDescriptors;
+ private String annotationData;
+ private ScheduledState scheduledState;
+ private String schedulingPeriod;
+ private String schedulingStrategy;
+
+
+ @Override
+ @ApiModelProperty(value = "The type of the reporting task.")
+ public String getType() {
+ return type;
+ }
+
+ @Override
+ public void setType(final String type) {
+ this.type = type;
+ }
+
+ @Override
+ @ApiModelProperty(value = "The details of the artifact that bundled this
reporting task type.")
+ public Bundle getBundle() {
+ return bundle;
+ }
+
+ @Override
+ public void setBundle(Bundle bundle) {
+ this.bundle = bundle;
+ }
+
+ @Override
+ @ApiModelProperty(value = "The properties of the controller service.")
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ @Override
+ public void setProperties(Map<String, String> properties) {
+ this.properties = properties;
+ }
+
+ @Override
+ @ApiModelProperty("The property descriptors for the controller service.")
+ public Map<String, VersionedPropertyDescriptor> getPropertyDescriptors() {
+ return propertyDescriptors;
+ }
+
+ @Override
+ public void setPropertyDescriptors(Map<String,
VersionedPropertyDescriptor> propertyDescriptors) {
+ this.propertyDescriptors = propertyDescriptors;
+ }
+
+ @ApiModelProperty(value = "The annotation for the controller service. This
is how the custom UI relays configuration to the controller service.")
Review comment:
reporting task*
##########
File path:
nifi-api/src/main/java/org/apache/nifi/flow/VersionedReportingTask.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.flow;
+
+import io.swagger.annotations.ApiModelProperty;
+
+import java.util.Map;
+
+public class VersionedReportingTask extends VersionedComponent implements
VersionedConfigurableComponent, VersionedExtensionComponent {
+ private String type;
+ private Bundle bundle;
+ private Map<String, String> properties;
+ private Map<String, VersionedPropertyDescriptor> propertyDescriptors;
+ private String annotationData;
+ private ScheduledState scheduledState;
+ private String schedulingPeriod;
+ private String schedulingStrategy;
+
+
+ @Override
+ @ApiModelProperty(value = "The type of the reporting task.")
+ public String getType() {
+ return type;
+ }
+
+ @Override
+ public void setType(final String type) {
+ this.type = type;
+ }
+
+ @Override
+ @ApiModelProperty(value = "The details of the artifact that bundled this
reporting task type.")
+ public Bundle getBundle() {
+ return bundle;
+ }
+
+ @Override
+ public void setBundle(Bundle bundle) {
+ this.bundle = bundle;
+ }
+
+ @Override
+ @ApiModelProperty(value = "The properties of the controller service.")
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ @Override
+ public void setProperties(Map<String, String> properties) {
+ this.properties = properties;
+ }
+
+ @Override
+ @ApiModelProperty("The property descriptors for the controller service.")
Review comment:
reporting task*
##########
File path:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
##########
@@ -174,7 +174,7 @@ public ScheduledState getScheduledState() {
* states such as STOPPING and STARTING.
*
* @return the physical state of this processor [DISABLED, STOPPED,
RUNNING,
- * STARTIING, STOPIING]
+ * STARTING, STOPIING]
Review comment:
STOPPING*
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]