http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/state/StateManagerProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/state/StateManagerProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/state/StateManagerProvider.java new file mode 100644 index 0000000..0b1ab8d --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/state/StateManagerProvider.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.state; + +/** + * <p> + * Interface that provides a mechanism for obtaining the {@link StateManager} for a particular component + * </p> + */ +public interface StateManagerProvider { + /** + * Returns the StateManager for the component with the given ID, or <code>null</code> if no State Manager + * exists for the component with the given ID + * + * @param componentId the id of the component for which the StateManager should be returned + * + * @return the StateManager for the component with the given ID, or <code>null</code> if no State Manager + * exists for the component with the given ID + */ + StateManager getStateManager(String componentId); + + /** + * Notifies the State Manager Provider that the component with the given ID has been removed from the NiFi instance + * and will no longer be needed, so the appropriate resource cleanup can take place. + * + * @param componentId the ID of the component that has been removed + */ + void onComponentRemoved(String componentId); + + /** + * Shuts down the state managers, cleaning up any resources that they occupy + */ + void shutdown(); + + /** + * Initializes the Cluster State Provider and enables it for use + */ + void enableClusterProvider(); + + /** + * Disables the Cluster State Provider and begins using the Local State Provider to persist and retrieve + * state, even when components request a clustered provider + */ + void disableClusterProvider(); +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java index 978c612..214467d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java @@ -85,7 +85,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone } @Override - public void setProperty(final String name, final String value) { + public void setProperty(final String name, final String value, final boolean triggerOnPropertyModified) { if (null == name || null == value) { throw new IllegalArgumentException(); } @@ -114,10 +114,12 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone } } - try { - component.onPropertyModified(descriptor, oldValue, value); - } catch (final Throwable t) { - // nothing really to do here... + if (triggerOnPropertyModified) { + try { + component.onPropertyModified(descriptor, oldValue, value); + } catch (final Exception e) { + // nothing really to do here... + } } } } @@ -133,11 +135,12 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone * if was a dynamic property. * * @param name the property to remove + * @param triggerOnPropertyModified specifies whether or not the onPropertyModified method should be called * @return true if removed; false otherwise * @throws java.lang.IllegalArgumentException if the name is null */ @Override - public boolean removeProperty(final String name) { + public boolean removeProperty(final String name, final boolean triggerOnPropertyModified) { if (null == name) { throw new IllegalArgumentException(); } @@ -160,7 +163,10 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone } } - component.onPropertyModified(descriptor, value, null); + if (triggerOnPropertyModified) { + component.onPropertyModified(descriptor, value, null); + } + return true; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java index 8b2794d..7e49700 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java @@ -34,7 +34,15 @@ public interface ConfiguredComponent { public void setAnnotationData(String data); - public void setProperty(String name, String value); + /** + * Sets the property with the given name to the given value + * + * @param name the name of the property to update + * @param value the value to update the property to + * @param triggerOnPropertyModified if <code>true</code>, will trigger the #onPropertyModified method of the component + * to be called, otherwise will not + */ + public void setProperty(String name, String value, boolean triggerOnPropertyModified); /** * Removes the property and value for the given property name if a @@ -43,10 +51,12 @@ public interface ConfiguredComponent { * if was a dynamic property. * * @param name the property to remove + * @param triggerOnPropertyModified if <code>true</code>, will trigger the #onPropertyModified method of the component + * to be called, otherwise will not * @return true if removed; false otherwise * @throws java.lang.IllegalArgumentException if the name is null */ - public boolean removeProperty(String name); + public boolean removeProperty(String name, boolean triggerOnPropertyModified); public Map<PropertyDescriptor, String> getProperties(); http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml index 783235a..882695e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml @@ -1,19 +1,16 @@ <?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.nifi</groupId> @@ -120,6 +117,27 @@ <artifactId>nifi-write-ahead-log</artifactId> </dependency> <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-processor-utils</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-mock</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java index 5414259..ff3ad4e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java @@ -56,8 +56,6 @@ public class HeartbeatPayload { private long totalFlowFileCount; private long totalFlowFileBytes; private SystemDiagnostics systemDiagnostics; - private Integer siteToSitePort; - private boolean siteToSiteSecure; private long systemStartTime; @XmlJavaTypeAdapter(CounterAdapter.class) @@ -109,22 +107,6 @@ public class HeartbeatPayload { this.systemDiagnostics = systemDiagnostics; } - public boolean isSiteToSiteSecure() { - return siteToSiteSecure; - } - - public void setSiteToSiteSecure(final boolean secure) { - this.siteToSiteSecure = secure; - } - - public Integer getSiteToSitePort() { - return siteToSitePort; - } - - public void setSiteToSitePort(final Integer port) { - this.siteToSitePort = port; - } - public long getSystemStartTime() { return systemStartTime; } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index dd3b687..95c93ba 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -16,7 +16,39 @@ */ package org.apache.nifi.controller; -import com.sun.jersey.api.client.ClientHandlerException; +import static java.util.Objects.requireNonNull; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import javax.net.ssl.SSLContext; + import org.apache.commons.lang3.StringUtils; import org.apache.nifi.action.Action; import org.apache.nifi.admin.service.AuditService; @@ -37,6 +69,7 @@ import org.apache.nifi.cluster.protocol.UnknownServiceAddressException; import org.apache.nifi.cluster.protocol.message.HeartbeatMessage; import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.Connection; @@ -88,6 +121,8 @@ import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.controller.service.StandardConfigurationContext; import org.apache.nifi.controller.service.StandardControllerServiceProvider; +import org.apache.nifi.controller.state.manager.StandardStateManagerProvider; +import org.apache.nifi.controller.state.server.ZooKeeperStateServer; import org.apache.nifi.controller.status.ConnectionStatus; import org.apache.nifi.controller.status.PortStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; @@ -173,41 +208,11 @@ import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO; import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO; import org.apache.nifi.web.api.dto.TemplateDTO; import org.apache.nifi.web.api.dto.status.StatusHistoryDTO; +import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.SSLContext; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import static java.util.Objects.requireNonNull; +import com.sun.jersey.api.client.ClientHandlerException; public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, Heartbeater, QueueProvider { @@ -251,9 +256,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private final AuditService auditService; private final EventDrivenWorkerQueue eventDrivenWorkerQueue; private final ComponentStatusRepository componentStatusRepository; + private final StateManagerProvider stateManagerProvider; private final long systemStartTime = System.currentTimeMillis(); // time at which the node was started private final ConcurrentMap<String, ReportingTaskNode> reportingTasks = new ConcurrentHashMap<>(); + private volatile ZooKeeperStateServer zooKeeperStateServer; + // The Heartbeat Bean is used to provide an Atomic Reference to data that is used in heartbeats that may // change while the instance is running. We do this because we want to generate heartbeats even if we // are unable to obtain a read lock on the entire FlowController. @@ -419,13 +427,19 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R throw new RuntimeException("Unable to create Provenance Repository", e); } - processScheduler = new StandardProcessScheduler(this, this, encryptor); + try { + this.stateManagerProvider = StandardStateManagerProvider.create(properties); + } catch (final IOException e) { + throw new RuntimeException(e); + } + + processScheduler = new StandardProcessScheduler(this, this, encryptor, stateManagerProvider); eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler); - controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository); + controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, stateManagerProvider); final ProcessContextFactory contextFactory = new ProcessContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceEventRepository); processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, new EventDrivenSchedulingAgent( - eventDrivenEngineRef.get(), this, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor)); + eventDrivenEngineRef.get(), this, stateManagerProvider, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor)); final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor); final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor); @@ -469,7 +483,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R this.snippetManager = new SnippetManager(); - rootGroup = new StandardProcessGroup(UUID.randomUUID().toString(), this, processScheduler, properties, encryptor); + rootGroup = new StandardProcessGroup(UUID.randomUUID().toString(), this, processScheduler, properties, encryptor, this); rootGroup.setName(DEFAULT_ROOT_GROUP_NAME); instanceId = UUID.randomUUID().toString(); @@ -496,6 +510,17 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R snapshotMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY, TimeUnit.MILLISECONDS); } + // Initialize the Embedded ZooKeeper server, if applicable + if (properties.isStartEmbeddedZooKeeper()) { + try { + zooKeeperStateServer = ZooKeeperStateServer.create(properties); + } catch (final IOException | ConfigException e) { + throw new IllegalStateException("Unable to initailize Flow because NiFi was configured to start an Embedded Zookeeper server but failed to do so", e); + } + } else { + zooKeeperStateServer = null; + } + componentStatusRepository = createComponentStatusRepository(); timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() { @Override @@ -668,6 +693,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } + private ContentRepository createContentRepository(final NiFiProperties properties) throws InstantiationException, IllegalAccessException, ClassNotFoundException { final String implementationClassName = properties.getProperty(NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION, DEFAULT_CONTENT_REPO_IMPLEMENTATION); if (implementationClassName == null) { @@ -714,6 +740,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } + /** * Creates a connection between two Connectable objects. * @@ -835,7 +862,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * @throws NullPointerException if the argument is null */ public ProcessGroup createProcessGroup(final String id) { - return new StandardProcessGroup(requireNonNull(id).intern(), this, processScheduler, properties, encryptor); + return new StandardProcessGroup(requireNonNull(id).intern(), this, processScheduler, properties, encryptor, this); } /** @@ -945,6 +972,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return snippetManager; } + public StateManagerProvider getStateManagerProvider() { + return stateManagerProvider; + } + /** * Creates a Port to use as an Input Port for the root Process Group, which is used for Site-to-Site communications * @@ -1021,6 +1052,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } + /** * Sets the name for the Root Group, which also changes the name for the controller. * @@ -1106,6 +1138,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // Trigger any processors' methods marked with @OnShutdown to be called rootGroup.shutdown(); + stateManagerProvider.shutdown(); + // invoke any methods annotated with @OnShutdown on Controller Services for (final ControllerServiceNode serviceNode : getAllControllerServices()) { try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { @@ -1443,7 +1477,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R for (final Map.Entry<String, String> entry : controllerServiceDTO.getProperties().entrySet()) { if (entry.getValue() != null) { - serviceNode.setProperty(entry.getKey(), entry.getValue()); + serviceNode.setProperty(entry.getKey(), entry.getValue(), true); } } } @@ -1561,7 +1595,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R if (config.getProperties() != null) { for (final Map.Entry<String, String> entry : config.getProperties().entrySet()) { if (entry.getValue() != null) { - procNode.setProperty(entry.getKey(), entry.getValue()); + procNode.setProperty(entry.getKey(), entry.getValue(), true); } } } @@ -3019,7 +3053,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R * * @param clustered true if clustered * @param clusterInstanceId if clustered is true, indicates the InstanceID of the Cluster Manager - * @param clusterManagerDn the DN of the NCM */ public void setClustered(final boolean clustered, final String clusterInstanceId, final String clusterManagerDn) { writeLock.lock(); @@ -3046,8 +3079,26 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R if (clustered) { nodeBulletinSubscriber.set(new NodeBulletinProcessingStrategy()); bulletinRepository.overrideDefaultBulletinProcessing(nodeBulletinSubscriber.get()); + stateManagerProvider.enableClusterProvider(); + + if (zooKeeperStateServer != null) { + processScheduler.submitFrameworkTask(new Runnable() { + @Override + public void run() { + try { + zooKeeperStateServer.start(); + } catch (final Exception e) { + LOG.error("NiFi was connected to the cluster but failed to start embedded ZooKeeper Server", e); + } + } + }); + } } else { bulletinRepository.restoreDefaultBulletinProcessing(); + if (zooKeeperStateServer != null) { + zooKeeperStateServer.shutdown(); + } + stateManagerProvider.disableClusterProvider(); } final List<RemoteProcessGroup> remoteGroups = getGroup(getRootGroupId()).findAllRemoteProcessGroups(); @@ -3063,6 +3114,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } } + /** * @return true if this instance is the primary node in the cluster; false otherwise */ @@ -3687,8 +3739,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R hbPayload.setCounters(getCounters()); hbPayload.setSystemDiagnostics(getSystemDiagnostics()); hbPayload.setProcessGroupStatus(procGroupStatus); - hbPayload.setSiteToSitePort(remoteInputSocketPort); - hbPayload.setSiteToSiteSecure(isSiteToSiteSecure); // create heartbeat message final Heartbeat heartbeat = new Heartbeat(getNodeId(), bean.isPrimary(), bean.isConnected(), hbPayload.marshal()); http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index 1511293..6250c5a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -26,7 +26,9 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.util.Calendar; +import java.util.Collections; import java.util.Date; +import java.util.Map; import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -37,6 +39,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.cluster.ConnectionException; import org.apache.nifi.cluster.protocol.ConnectionRequest; import org.apache.nifi.cluster.protocol.ConnectionResponse; @@ -57,9 +60,11 @@ import org.apache.nifi.cluster.protocol.message.ProtocolMessage; import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage; import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage; import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.events.BulletinFactory; -import org.apache.nifi.util.file.FileUtils; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.lifecycle.LifeCycleStartException; import org.apache.nifi.logging.LogLevel; @@ -69,15 +74,17 @@ import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.services.FlowService; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; - -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.encrypt.StringEncryptor; +import org.apache.nifi.util.file.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class StandardFlowService implements FlowService, ProtocolHandler { private static final String EVENT_CATEGORY = "Controller"; + private static final String CLUSTER_NODE_CONFIG = "Cluster Node Configuration"; + + // state keys + private static final String NODE_UUID = "Node UUID"; private final FlowController controller; private final Path flowXml; @@ -169,8 +176,21 @@ public class StandardFlowService implements FlowService, ProtocolHandler { final InetSocketAddress nodeApiAddress = properties.getNodeApiAddress(); final InetSocketAddress nodeSocketAddress = properties.getClusterNodeProtocolAddress(); + String nodeUuid = null; + final StateManager stateManager = controller.getStateManagerProvider().getStateManager(CLUSTER_NODE_CONFIG); + if (stateManager != null) { + nodeUuid = stateManager.getState(Scope.LOCAL).get(NODE_UUID); + } + + if (nodeUuid == null) { + nodeUuid = UUID.randomUUID().toString(); + } + // use a random UUID as the proposed node identifier - this.nodeId = new NodeIdentifier(UUID.randomUUID().toString(), nodeApiAddress.getHostName(), nodeApiAddress.getPort(), nodeSocketAddress.getHostName(), nodeSocketAddress.getPort()); + this.nodeId = new NodeIdentifier(nodeUuid, + nodeApiAddress.getHostName(), nodeApiAddress.getPort(), + nodeSocketAddress.getHostName(), nodeSocketAddress.getPort(), + properties.getRemoteInputHost(), properties.getRemoteInputPort(), properties.isSiteToSiteSecure()); } else { this.configuredForClustering = false; @@ -179,6 +199,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { } + @Override public void saveFlowChanges() throws IOException { writeLock.lock(); @@ -507,7 +528,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler { // reconnect final ConnectionResponse connectionResponse = new ConnectionResponse(nodeId, request.getDataFlow(), request.isPrimary(), - request.getManagerRemoteSiteListeningPort(), request.isManagerRemoteSiteCommsSecure(), request.getInstanceId()); + request.getManagerRemoteSiteListeningPort(), request.isManagerRemoteSiteCommsSecure(), request.getInstanceId()); connectionResponse.setClusterManagerDN(request.getRequestorDN()); loadFromConnectionResponse(connectionResponse); @@ -616,8 +637,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler { if (firstControllerInitialization) { logger.debug("First controller initialization. Loading reporting tasks and initializing controller."); - // load the controller tasks -// dao.loadReportingTasks(controller); // initialize the flow controller.initializeFlow(); @@ -650,8 +669,8 @@ public class StandardFlowService implements FlowService, ProtocolHandler { for (int i = 0; i < maxAttempts || retryIndefinitely; i++) { try { response = senderListener.requestConnection(requestMsg).getConnectionResponse(); - if (response.isBlockedByFirewall()) { - logger.warn("Connection request was blocked by cluster manager's firewall."); + if (response.getRejectionReason() != null) { + logger.warn("Connection request was blocked by cluster manager with the explanation: " + response.getRejectionReason()); // set response to null and treat a firewall blockage the same as getting no response from manager response = null; break; @@ -667,7 +686,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler { // we received a successful connection response from manager break; } - } catch (final Exception pe) { // could not create a socket and communicate with manager logger.warn("Failed to connect to cluster due to: " + pe, pe); @@ -691,6 +709,16 @@ public class StandardFlowService implements FlowService, ProtocolHandler { return null; } else { // cluster manager provided a successful response with a current dataflow + // persist node uuid and index returned by NCM and return the response to the caller + try { + // Ensure that we have registered our 'cluster node configuration' state key + final Map<String, String> map = Collections.singletonMap(NODE_UUID, response.getNodeIdentifier().getId()); + controller.getStateManagerProvider().getStateManager(CLUSTER_NODE_CONFIG).setState(map, Scope.LOCAL); + } catch (final IOException ioe) { + logger.warn("Received successful response from Cluster Manager but failed to persist state about the Node's Unique Identifier and the Node's Index. " + + "This node may be assigned a different UUID when the node is restarted.", ioe); + } + return response; } } finally { http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index 0dd3b64..07dd58b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -171,7 +171,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { controller.setMaxEventDrivenThreadCount(maxThreadCount / 3); } - final Element reportingTasksElement = (Element) DomUtils.getChild(rootElement, "reportingTasks"); + final Element reportingTasksElement = DomUtils.getChild(rootElement, "reportingTasks"); final List<Element> taskElements; if (reportingTasksElement == null) { taskElements = Collections.emptyList(); @@ -179,7 +179,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { taskElements = DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask"); } - final Element controllerServicesElement = (Element) DomUtils.getChild(rootElement, "controllerServices"); + final Element controllerServicesElement = DomUtils.getChild(rootElement, "controllerServices"); final List<Element> controllerServiceElements; if (controllerServicesElement == null) { controllerServiceElements = Collections.emptyList(); @@ -252,7 +252,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { // get the root group XML element final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0); - final Element controllerServicesElement = (Element) DomUtils.getChild(rootElement, "controllerServices"); + final Element controllerServicesElement = DomUtils.getChild(rootElement, "controllerServices"); if (controllerServicesElement != null) { final List<Element> serviceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService"); @@ -274,7 +274,7 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { updateProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor); } - final Element reportingTasksElement = (Element) DomUtils.getChild(rootElement, "reportingTasks"); + final Element reportingTasksElement = DomUtils.getChild(rootElement, "reportingTasks"); if (reportingTasksElement != null) { final List<Element> taskElements = DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask"); for (final Element taskElement : taskElements) { @@ -403,9 +403,9 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { for (final Map.Entry<String, String> entry : dto.getProperties().entrySet()) { if (entry.getValue() == null) { - reportingTask.removeProperty(entry.getKey()); + reportingTask.removeProperty(entry.getKey(), false); } else { - reportingTask.setProperty(entry.getKey(), entry.getValue()); + reportingTask.setProperty(entry.getKey(), entry.getValue(), false); } } @@ -735,9 +735,9 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { for (final Map.Entry<String, String> entry : config.getProperties().entrySet()) { if (entry.getValue() == null) { - procNode.removeProperty(entry.getKey()); + procNode.removeProperty(entry.getKey(), false); } else { - procNode.setProperty(entry.getKey(), entry.getValue()); + procNode.setProperty(entry.getKey(), entry.getValue(), false); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java index b537c30..09d1b40 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java @@ -125,15 +125,15 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon } @Override - public void setProperty(final String name, final String value) { - super.setProperty(name, value); + public void setProperty(final String name, final String value, final boolean triggerOnPropertyModified) { + super.setProperty(name, value, triggerOnPropertyModified); onConfigured(); } @Override - public boolean removeProperty(String name) { - final boolean removed = super.removeProperty(name); + public boolean removeProperty(String name, final boolean triggerOnPropertyModified) { + final boolean removed = super.removeProperty(name, triggerOnPropertyModified); if (removed) { onConfigured(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java index a4d337f..11d1b51 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java @@ -26,6 +26,7 @@ import org.apache.nifi.attribute.expression.language.Query; import org.apache.nifi.attribute.expression.language.StandardPropertyValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.state.StateManager; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceLookup; @@ -37,24 +38,27 @@ import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.EventAccess; import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.reporting.Severity; public class StandardReportingContext implements ReportingContext, ControllerServiceLookup { private final FlowController flowController; private final EventAccess eventAccess; + private final ReportingTask reportingTask; private final BulletinRepository bulletinRepository; private final ControllerServiceProvider serviceProvider; private final Map<PropertyDescriptor, String> properties; private final Map<PropertyDescriptor, PreparedQuery> preparedQueries; public StandardReportingContext(final FlowController flowController, final BulletinRepository bulletinRepository, - final Map<PropertyDescriptor, String> properties, final ControllerServiceProvider serviceProvider) { + final Map<PropertyDescriptor, String> properties, final ControllerServiceProvider serviceProvider, final ReportingTask reportingTask) { this.flowController = flowController; this.eventAccess = flowController; this.bulletinRepository = bulletinRepository; this.properties = Collections.unmodifiableMap(properties); this.serviceProvider = serviceProvider; + this.reportingTask = reportingTask; preparedQueries = new HashMap<>(); for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) { @@ -140,4 +144,8 @@ public class StandardReportingContext implements ReportingContext, ControllerSer return serviceProvider.getControllerServiceName(serviceIdentifier); } + @Override + public StateManager getStateManager() { + return flowController.getStateManagerProvider().getStateManager(reportingTask.getIdentifier()); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java index fe3af92..1a40e8a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingTaskNode.java @@ -35,6 +35,6 @@ public class StandardReportingTaskNode extends AbstractReportingTaskNode impleme @Override public ReportingContext getReportingContext() { - return new StandardReportingContext(flowController, flowController.getBulletinRepository(), getProperties(), flowController); + return new StandardReportingContext(flowController, flowController.getBulletinRepository(), getProperties(), flowController, getReportingTask()); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java index fd3f1cc..bac45af 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/BatchingSessionFactory.java @@ -49,8 +49,8 @@ public class BatchingSessionFactory implements ProcessSessionFactory { return highThroughputSession; } - private class HighThroughputSession implements ProcessSession { + private class HighThroughputSession implements ProcessSession { private final StandardProcessSession session; public HighThroughputSession(final StandardProcessSession session) { @@ -241,7 +241,5 @@ public class BatchingSessionFactory implements ProcessSessionFactory { public ProvenanceReporter getProvenanceReporter() { return session.getProvenanceReporter(); } - } - } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java index c68c78d..5e26c09 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.state.StateManager; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.ControllerService; @@ -46,10 +47,12 @@ public class ConnectableProcessContext implements ProcessContext { private final Connectable connectable; private final StringEncryptor encryptor; + private final StateManager stateManager; - public ConnectableProcessContext(final Connectable connectable, final StringEncryptor encryptor) { + public ConnectableProcessContext(final Connectable connectable, final StringEncryptor encryptor, final StateManager stateManager) { this.connectable = connectable; this.encryptor = encryptor; + this.stateManager = stateManager; } @Override @@ -235,4 +238,9 @@ public class ConnectableProcessContext implements ProcessContext { public boolean isExpressionLanguagePresent(PropertyDescriptor property) { return false; } + + @Override + public StateManager getStateManager() { + return stateManager; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java index e5582ec..76d7c08 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java @@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.controller.EventBasedWorker; import org.apache.nifi.controller.EventDrivenWorkerQueue; @@ -54,7 +56,8 @@ public class EventDrivenSchedulingAgent implements SchedulingAgent { private static final Logger logger = LoggerFactory.getLogger(EventDrivenSchedulingAgent.class); private final FlowEngine flowEngine; - private final ControllerServiceProvider controllerServiceProvider; + private final ControllerServiceProvider serviceProvider; + private final StateManagerProvider stateManagerProvider; private final EventDrivenWorkerQueue workerQueue; private final ProcessContextFactory contextFactory; private final AtomicInteger maxThreadCount; @@ -65,10 +68,11 @@ public class EventDrivenSchedulingAgent implements SchedulingAgent { private final ConcurrentMap<Connectable, AtomicLong> connectionIndexMap = new ConcurrentHashMap<>(); private final ConcurrentMap<Connectable, ScheduleState> scheduleStates = new ConcurrentHashMap<>(); - public EventDrivenSchedulingAgent(final FlowEngine flowEngine, final ControllerServiceProvider flowController, - final EventDrivenWorkerQueue workerQueue, final ProcessContextFactory contextFactory, final int maxThreadCount, final StringEncryptor encryptor) { + public EventDrivenSchedulingAgent(final FlowEngine flowEngine, final ControllerServiceProvider serviceProvider, final StateManagerProvider stateManagerProvider, + final EventDrivenWorkerQueue workerQueue, final ProcessContextFactory contextFactory, final int maxThreadCount, final StringEncryptor encryptor) { this.flowEngine = flowEngine; - this.controllerServiceProvider = flowController; + this.serviceProvider = serviceProvider; + this.stateManagerProvider = stateManagerProvider; this.workerQueue = workerQueue; this.contextFactory = contextFactory; this.maxThreadCount = new AtomicInteger(maxThreadCount); @@ -80,6 +84,10 @@ public class EventDrivenSchedulingAgent implements SchedulingAgent { } } + private StateManager getStateManager(final String componentId) { + return stateManagerProvider.getStateManager(componentId); + } + @Override public void shutdown() { flowEngine.shutdown(); @@ -177,7 +185,8 @@ public class EventDrivenSchedulingAgent implements SchedulingAgent { if (connectable instanceof ProcessorNode) { final ProcessorNode procNode = (ProcessorNode) connectable; - final StandardProcessContext standardProcessContext = new StandardProcessContext(procNode, controllerServiceProvider, encryptor); + final StandardProcessContext standardProcessContext = new StandardProcessContext(procNode, serviceProvider, + encryptor, getStateManager(connectable.getIdentifier())); final long runNanos = procNode.getRunDuration(TimeUnit.NANOSECONDS); final ProcessSessionFactory sessionFactory; @@ -251,7 +260,7 @@ public class EventDrivenSchedulingAgent implements SchedulingAgent { } } else { final ProcessSessionFactory sessionFactory = new StandardProcessSessionFactory(context); - final ConnectableProcessContext connectableProcessContext = new ConnectableProcessContext(connectable, encryptor); + final ConnectableProcessContext connectableProcessContext = new ConnectableProcessContext(connectable, encryptor, getStateManager(connectable.getIdentifier())); trigger(connectable, scheduleState, connectableProcessContext, sessionFactory); // See explanation above for the ProcessorNode as to why we do this. http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java index 4278cee..3fa6b52 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.java @@ -25,6 +25,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.nifi.components.state.StateManager; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.controller.FlowController; @@ -61,6 +62,10 @@ public class QuartzSchedulingAgent implements SchedulingAgent { this.encryptor = enryptor; } + private StateManager getStateManager(final String componentId) { + return flowController.getStateManagerProvider().getStateManager(componentId); + } + @Override public void shutdown() { } @@ -133,14 +138,15 @@ public class QuartzSchedulingAgent implements SchedulingAgent { final List<AtomicBoolean> triggers = new ArrayList<>(); for (int i = 0; i < connectable.getMaxConcurrentTasks(); i++) { final Callable<Boolean> continuallyRunTask; + if (connectable.getConnectableType() == ConnectableType.PROCESSOR) { final ProcessorNode procNode = (ProcessorNode) connectable; - final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor); + final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor, getStateManager(connectable.getIdentifier())); ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController, contextFactory, scheduleState, standardProcContext); continuallyRunTask = runnableTask; } else { - final ConnectableProcessContext connProcContext = new ConnectableProcessContext(connectable, encryptor); + final ConnectableProcessContext connProcContext = new ConnectableProcessContext(connectable, encryptor, getStateManager(connectable.getIdentifier())); continuallyRunTask = new ContinuallyRunConnectableTask(contextFactory, connectable, scheduleState, connProcContext); } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java index ea0b456..e03cc05 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java @@ -24,6 +24,8 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.nifi.annotation.lifecycle.OnStopped; + public class ScheduleState { private final AtomicInteger activeThreadCount = new AtomicInteger(0); @@ -62,12 +64,12 @@ public class ScheduleState { } /** - * Maintains an AtomicBoolean so that the first thread to call this method after a Processor is no longer scheduled to run will receive a <code>true</code> and MUST call the methods annotated with - * - * @OnStopped + * Maintains an AtomicBoolean so that the first thread to call this method after a Processor is no longer + * scheduled to run will receive a <code>true</code> and MUST call the methods annotated with + * {@link OnStopped @OnStopped} * * @return <code>true</code> if the caller is required to call Processor methods annotated with - * @OnStopped, <code>false</code> otherwise + * @OnStopped, <code>false</code> otherwise */ public boolean mustCallOnStoppedMethods() { return mustCallOnStoppedMethods.getAndSet(false); http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 82fc812..ee764e6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -32,6 +32,8 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Funnel; import org.apache.nifi.connectable.Port; @@ -74,6 +76,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { private final Heartbeater heartbeater; private final long administrativeYieldMillis; private final String administrativeYieldDuration; + private final StateManagerProvider stateManagerProvider; private final ConcurrentMap<Object, ScheduleState> scheduleStates = new ConcurrentHashMap<>(); private final ScheduledExecutorService frameworkTaskExecutor; @@ -84,10 +87,12 @@ public final class StandardProcessScheduler implements ProcessScheduler { private final StringEncryptor encryptor; - public StandardProcessScheduler(final Heartbeater heartbeater, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor) { + public StandardProcessScheduler(final Heartbeater heartbeater, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor, + final StateManagerProvider stateManagerProvider) { this.heartbeater = heartbeater; this.controllerServiceProvider = controllerServiceProvider; this.encryptor = encryptor; + this.stateManagerProvider = stateManagerProvider; administrativeYieldDuration = NiFiProperties.getInstance().getAdministrativeYieldDuration(); administrativeYieldMillis = FormatUtils.getTimeDuration(administrativeYieldDuration, TimeUnit.MILLISECONDS); @@ -95,6 +100,10 @@ public final class StandardProcessScheduler implements ProcessScheduler { frameworkTaskExecutor = new FlowEngine(4, "Framework Task Thread"); } + private StateManager getStateManager(final String componentId) { + return stateManagerProvider.getStateManager(componentId); + } + public void scheduleFrameworkTask(final Runnable command, final String taskName, final long initialDelay, final long delay, final TimeUnit timeUnit) { frameworkTaskExecutor.scheduleWithFixedDelay(new Runnable() { @Override @@ -102,7 +111,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { try { command.run(); } catch (final Throwable t) { - LOG.error("Failed to run Framework Task {} due to {}", command, t.toString()); + LOG.error("Failed to run Framework Task {} due to {}", taskName, t.toString()); if (LOG.isDebugEnabled()) { LOG.error("", t); } @@ -111,6 +120,15 @@ public final class StandardProcessScheduler implements ProcessScheduler { }, initialDelay, delay, timeUnit); } + /** + * Submits the given task to be executed exactly once in a background thread + * + * @param task the task to perform + */ + public void submitFrameworkTask(final Runnable task) { + frameworkTaskExecutor.submit(task); + } + @Override public void setMaxThreadCount(final SchedulingStrategy schedulingStrategy, final int maxThreadCount) { final SchedulingAgent agent = getSchedulingAgent(schedulingStrategy); @@ -299,7 +317,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { public void run() { try (final NarCloseable x = NarCloseable.withNarLoader()) { final long lastStopTime = scheduleState.getLastStopTime(); - final StandardProcessContext processContext = new StandardProcessContext(procNode, controllerServiceProvider, encryptor); + final StandardProcessContext processContext = new StandardProcessContext(procNode, controllerServiceProvider, encryptor, getStateManager(procNode.getIdentifier())); final Set<String> serviceIds = new HashSet<>(); for (final PropertyDescriptor descriptor : processContext.getProperties().keySet()) { @@ -343,7 +361,8 @@ public final class StandardProcessScheduler implements ProcessScheduler { return; } - final SchedulingContext schedulingContext = new StandardSchedulingContext(processContext, controllerServiceProvider, procNode); + final SchedulingContext schedulingContext = new StandardSchedulingContext(processContext, controllerServiceProvider, + procNode, getStateManager(procNode.getIdentifier())); ReflectionUtils.invokeMethodsWithAnnotations(OnScheduled.class, org.apache.nifi.processor.annotation.OnScheduled.class, procNode.getProcessor(), schedulingContext); getSchedulingAgent(procNode).schedule(procNode, scheduleState); @@ -420,7 +439,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { @Override public void run() { try (final NarCloseable x = NarCloseable.withNarLoader()) { - final StandardProcessContext processContext = new StandardProcessContext(procNode, controllerServiceProvider, encryptor); + final StandardProcessContext processContext = new StandardProcessContext(procNode, controllerServiceProvider, encryptor, getStateManager(procNode.getIdentifier())); ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, procNode.getProcessor(), processContext); @@ -503,7 +522,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { getSchedulingAgent(connectable).unschedule(connectable, state); if (!state.isScheduled() && state.getActiveThreadCount() == 0 && state.mustCallOnStoppedMethods()) { - final ConnectableProcessContext processContext = new ConnectableProcessContext(connectable, encryptor); + final ConnectableProcessContext processContext = new ConnectableProcessContext(connectable, encryptor, getStateManager(connectable.getIdentifier())); try (final NarCloseable x = NarCloseable.withNarLoader()) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, connectable, processContext); heartbeater.heartbeat(); http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java index 96cee20..04db549 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java @@ -23,6 +23,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.apache.nifi.components.state.StateManager; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.controller.FlowController; @@ -67,6 +68,10 @@ public class TimerDrivenSchedulingAgent implements SchedulingAgent { } } + private StateManager getStateManager(final String componentId) { + return flowController.getStateManagerProvider().getStateManager(componentId); + } + @Override public void shutdown() { flowEngine.shutdown(); @@ -96,14 +101,14 @@ public class TimerDrivenSchedulingAgent implements SchedulingAgent { // Determine the task to run and create it. if (connectable.getConnectableType() == ConnectableType.PROCESSOR) { final ProcessorNode procNode = (ProcessorNode) connectable; - final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor); + final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor, getStateManager(connectable.getIdentifier())); final ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController, contextFactory, scheduleState, standardProcContext); continuallyRunTask = runnableTask; processContext = standardProcContext; } else { - processContext = new ConnectableProcessContext(connectable, encryptor); + processContext = new ConnectableProcessContext(connectable, encryptor, getStateManager(connectable.getIdentifier())); continuallyRunTask = new ContinuallyRunConnectableTask(contextFactory, connectable, scheduleState, processContext); } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java index b5c3855..f08d45b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java @@ -155,9 +155,9 @@ public class ControllerServiceLoader { for (final Map.Entry<String, String> entry : dto.getProperties().entrySet()) { if (entry.getValue() == null) { - node.removeProperty(entry.getKey()); + node.removeProperty(entry.getKey(), false); } else { - node.setProperty(entry.getKey(), entry.getValue()); + node.setProperty(entry.getKey(), entry.getValue(), false); } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java index 02d6263..482dabf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java @@ -18,6 +18,7 @@ package org.apache.nifi.controller.service; import java.util.Set; +import org.apache.nifi.components.state.StateManager; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceInitializationContext; import org.apache.nifi.controller.ControllerServiceLookup; @@ -28,11 +29,13 @@ public class StandardControllerServiceInitializationContext implements Controlle private final String id; private final ControllerServiceProvider serviceProvider; private final ComponentLog logger; + private final StateManager stateManager; - public StandardControllerServiceInitializationContext(final String identifier, final ComponentLog logger, final ControllerServiceProvider serviceProvider) { + public StandardControllerServiceInitializationContext(final String identifier, final ComponentLog logger, final ControllerServiceProvider serviceProvider, final StateManager stateManager) { this.id = identifier; this.logger = logger; this.serviceProvider = serviceProvider; + this.stateManager = stateManager; } @Override @@ -79,4 +82,9 @@ public class StandardControllerServiceInitializationContext implements Controlle public ComponentLog getLogger() { return logger; } + + @Override + public StateManager getStateManager() { + return stateManager; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java index ba03ee3..9a4c1e6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java @@ -123,14 +123,14 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i } @Override - public void setProperty(final String name, final String value) { - super.setProperty(name, value); + public void setProperty(final String name, final String value, final boolean triggerOnPropertyModified) { + super.setProperty(name, value, triggerOnPropertyModified); onConfigured(); } @Override - public boolean removeProperty(String name) { - final boolean removed = super.removeProperty(name); + public boolean removeProperty(String name, final boolean triggerOnPropertyModified) { + final boolean removed = super.removeProperty(name, triggerOnPropertyModified); if (removed) { onConfigured(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java index 6561eb8..660f596 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java @@ -38,6 +38,8 @@ import java.util.concurrent.Executors; import org.apache.nifi.annotation.lifecycle.OnAdded; import org.apache.nifi.annotation.lifecycle.OnRemoved; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateManagerProvider; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ConfiguredComponent; import org.apache.nifi.controller.ControllerService; @@ -46,8 +48,8 @@ import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.ValidationContextFactory; -import org.apache.nifi.controller.exception.ControllerServiceInstantiationException; import org.apache.nifi.controller.exception.ComponentLifeCycleException; +import org.apache.nifi.controller.exception.ControllerServiceInstantiationException; import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.nar.ExtensionManager; @@ -69,6 +71,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi private final ConcurrentMap<String, ControllerServiceNode> controllerServices; private static final Set<Method> validDisabledMethods; private final BulletinRepository bulletinRepo; + private final StateManagerProvider stateManagerProvider; static { // methods that are okay to be called when the service is disabled. @@ -82,12 +85,13 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi validDisabledMethods = Collections.unmodifiableSet(validMethods); } - public StandardControllerServiceProvider(final ProcessScheduler scheduler, final BulletinRepository bulletinRepo) { + public StandardControllerServiceProvider(final ProcessScheduler scheduler, final BulletinRepository bulletinRepo, final StateManagerProvider stateManagerProvider) { // the following 2 maps must be updated atomically, but we do not lock around them because they are modified // only in the createControllerService method, and both are modified before the method returns this.controllerServices = new ConcurrentHashMap<>(); this.processScheduler = scheduler; this.bulletinRepo = bulletinRepo; + this.stateManagerProvider = stateManagerProvider; } private Class<?>[] getInterfaces(final Class<?> cls) { @@ -110,6 +114,10 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi } } + private StateManager getStateManager(final String componentId) { + return stateManagerProvider.getStateManager(componentId); + } + @Override public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) { if (type == null || id == null) { @@ -171,7 +179,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi logger.info("Created Controller Service of type {} with identifier {}", type, id); final ComponentLog serviceLogger = new SimpleProcessLogger(id, originalService); - originalService.initialize(new StandardControllerServiceInitializationContext(id, serviceLogger, this)); + originalService.initialize(new StandardControllerServiceInitializationContext(id, serviceLogger, this, getStateManager(id))); final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(this); @@ -489,6 +497,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi } controllerServices.remove(serviceNode.getIdentifier()); + + stateManagerProvider.onComponentRemoved(serviceNode.getIdentifier()); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/ClusterState.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/ClusterState.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/ClusterState.java new file mode 100644 index 0000000..8227bbb --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/ClusterState.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.state; + +import java.util.Set; + +public interface ClusterState { + /** + * @return <code>true</code> if this instance of NiFi is connected to a cluster, <code>false</code> if the node is disconnected + */ + boolean isConnected(); + + /** + * @return the identifier that is used to identify this node in the cluster + */ + String getNodeIdentifier(); + + /** + * @return a Set of {@link NodeDescription} objects that can be used to determine which other nodes are in the same cluster. This + * Set will not be <code>null</code> but will be empty if the node is not connected to a cluster + */ + Set<NodeDescription> getNodes(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/ConfigParseException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/ConfigParseException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/ConfigParseException.java new file mode 100644 index 0000000..ae91fcf --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/ConfigParseException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.state; + +public class ConfigParseException extends RuntimeException { + private static final long serialVersionUID = 4956533590170361572L; + + public ConfigParseException(final String message) { + super(message); + } + + public ConfigParseException(final String message, final Throwable cause) { + super(message, cause); + } +}
