http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c28d9f57/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java index 0000000,7169730..86bc07c mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/spring/WebClusterManagerFactoryBean.java @@@ -1,0 -1,139 +1,141 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.nifi.cluster.spring; + -import java.nio.file.Paths; + import org.apache.nifi.admin.service.AuditService; + import org.apache.nifi.cluster.event.EventManager; + import org.apache.nifi.cluster.firewall.ClusterNodeFirewall; + import org.apache.nifi.cluster.flow.DataFlowManagementService; + import org.apache.nifi.cluster.manager.HttpRequestReplicator; + import org.apache.nifi.cluster.manager.HttpResponseMapper; + import org.apache.nifi.cluster.manager.impl.WebClusterManager; + import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderListener; + import org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster; -import org.apache.nifi.controller.service.ControllerServiceLoader; + import org.apache.nifi.encrypt.StringEncryptor; + import org.apache.nifi.io.socket.multicast.DiscoverableService; + import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl; + import org.apache.nifi.util.NiFiProperties; ++import org.apache.nifi.web.OptimisticLockingManager; + import org.springframework.beans.BeansException; + import org.springframework.beans.factory.FactoryBean; + import org.springframework.context.ApplicationContext; + import org.springframework.context.ApplicationContextAware; + + /** + * Factory bean for creating a singleton WebClusterManager instance. If the + * application is not configured to act as the cluster manager, then null is + * always returned as the created instance. + */ + public class WebClusterManagerFactoryBean implements FactoryBean, ApplicationContextAware { + + private ApplicationContext applicationContext; + + private WebClusterManager clusterManager; + + private NiFiProperties properties; + + private StringEncryptor encryptor; ++ ++ private OptimisticLockingManager optimisticLockingManager; + + @Override + public Object getObject() throws Exception { + if (properties.isClusterManager() && properties.isNode()) { + throw new IllegalStateException("Application may be configured as a cluster manager or a node, but not both."); + } else if (!properties.isClusterManager()) { + /* + * If not configured for the cluster manager, then the cluster manager is never used. + * null is returned so that we don't instantiate a thread pool or other resources. + */ + return null; + } else if (clusterManager == null) { + + // get the service configuration path (fail early) + final String serviceConfigurationFile = properties.getProperty(NiFiProperties.SERVICE_CONFIGURATION_FILE); + if (serviceConfigurationFile == null) { + throw new NullPointerException("The service configuration file has not been specified."); + } + + final HttpRequestReplicator requestReplicator = applicationContext.getBean("httpRequestReplicator", HttpRequestReplicator.class); + final HttpResponseMapper responseMapper = applicationContext.getBean("httpResponseMapper", HttpResponseMapper.class); + final DataFlowManagementService dataFlowService = applicationContext.getBean("dataFlowManagementService", DataFlowManagementService.class); + final ClusterManagerProtocolSenderListener senderListener = applicationContext.getBean("clusterManagerProtocolSenderListener", ClusterManagerProtocolSenderListener.class); + + // create the manager + clusterManager = new WebClusterManager( + requestReplicator, + responseMapper, + dataFlowService, + senderListener, + properties, - encryptor ++ encryptor, ++ optimisticLockingManager + ); + + // set the service broadcaster + if (properties.getClusterProtocolUseMulticast()) { + + // create broadcaster + final ClusterServicesBroadcaster broadcaster = applicationContext.getBean("clusterServicesBroadcaster", ClusterServicesBroadcaster.class); + + // register the cluster manager protocol service + final String clusterManagerProtocolServiceName = applicationContext.getBean("clusterManagerProtocolServiceName", String.class); + final DiscoverableService clusterManagerProtocolService = new DiscoverableServiceImpl(clusterManagerProtocolServiceName, properties.getClusterManagerProtocolAddress()); + broadcaster.addService(clusterManagerProtocolService); + + clusterManager.setServicesBroadcaster(broadcaster); + } + + // set the event manager + clusterManager.setEventManager(applicationContext.getBean("nodeEventHistoryManager", EventManager.class)); + + // set the cluster firewall + clusterManager.setClusterFirewall(applicationContext.getBean("clusterFirewall", ClusterNodeFirewall.class)); + + // set the audit service + clusterManager.setAuditService(applicationContext.getBean("auditService", AuditService.class)); - - // load the controller services - final ControllerServiceLoader serviceLoader = new ControllerServiceLoader(Paths.get(serviceConfigurationFile)); - serviceLoader.loadControllerServices(clusterManager); + } + return clusterManager; + } + + @Override + public Class getObjectType() { + return WebClusterManager.class; + } + + @Override + public boolean isSingleton() { + return true; + } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + } + + public void setProperties(NiFiProperties properties) { + this.properties = properties; + } + + public void setEncryptor(final StringEncryptor encryptor) { + this.encryptor = encryptor; + } ++ ++ public void setOptimisticLockingManager(OptimisticLockingManager optimisticLockingManager) { ++ this.optimisticLockingManager = optimisticLockingManager; ++ } + }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c28d9f57/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml ---------------------------------------------------------------------- diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml index 0000000,68c29bc..72c7bff mode 000000,100644..100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/resources/nifi-cluster-manager-context.xml @@@ -1,0 -1,124 +1,128 @@@ + <?xml version="1.0" encoding="UTF-8"?> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <!-- marked as lazy so that clustering beans are not created when applications runs in non-clustered mode --> + <beans default-lazy-init="true" + xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns:util="http://www.springframework.org/schema/util" + xmlns:context="http://www.springframework.org/schema/context" + xmlns:aop="http://www.springframework.org/schema/aop" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd + http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.1.xsd + http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd + http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsd"> + + <!-- jersey client --> + <bean id="jersey-client" class="org.apache.nifi.web.util.WebUtils" factory-method="createClient"> + <constructor-arg> + <bean class="com.sun.jersey.api.client.config.DefaultClientConfig"/> + </constructor-arg> + <constructor-arg> + <bean class="org.apache.nifi.framework.security.util.SslContextFactory" factory-method="createSslContext"> + <constructor-arg ref="nifiProperties"/> + </bean> + </constructor-arg> + </bean> + + <!-- http request replicator --> + <bean id="httpRequestReplicator" class="org.apache.nifi.cluster.manager.impl.HttpRequestReplicatorImpl"> + <constructor-arg index="0"> + <bean factory-bean="nifiProperties" factory-method="getClusterManagerNodeApiRequestThreads"/> + </constructor-arg> + <constructor-arg ref="jersey-client" index="1"/> + <constructor-arg index="2"> + <bean factory-bean="nifiProperties" factory-method="getClusterManagerNodeApiConnectionTimeout"/> + </constructor-arg> + <constructor-arg index="3"> + <bean factory-bean="nifiProperties" factory-method="getClusterManagerNodeApiReadTimeout"/> + </constructor-arg> + <property name="nodeProtocolScheme"> + <bean factory-bean="nifiProperties" factory-method="getClusterProtocolManagerToNodeApiScheme"/> + </property> + </bean> + + <!-- http response mapper --> + <bean id="httpResponseMapper" class="org.apache.nifi.cluster.manager.impl.HttpResponseMapperImpl"/> + + <!-- cluster flow DAO --> + <bean id="dataFlowDao" class="org.apache.nifi.cluster.flow.impl.DataFlowDaoImpl"> + <constructor-arg index="0"> + <bean factory-bean="nifiProperties" factory-method="getFlowConfigurationFileDir"/> + </constructor-arg> + <constructor-arg index="1"> + <bean factory-bean="nifiProperties" factory-method="getRestoreDirectory"/> + </constructor-arg> + <constructor-arg index="2"> + <bean factory-bean="nifiProperties" factory-method="getAutoResumeState"/> + </constructor-arg> + </bean> + + <!-- dataflow management service --> + <bean id="dataFlowManagementService" class="org.apache.nifi.cluster.flow.impl.DataFlowManagementServiceImpl"> + <constructor-arg ref="dataFlowDao"/> + <constructor-arg ref="clusterManagerProtocolSender"/> + <property name="retrievalDelay"> + <bean factory-bean="nifiProperties" factory-method="getClusterManagerFlowRetrievalDelay"/> + </property> + </bean> + + <!-- node event history manager --> + <bean id="nodeEventHistoryManager" class="org.apache.nifi.cluster.event.impl.EventManagerImpl"> + <constructor-arg> + <bean factory-bean="nifiProperties" factory-method="getClusterManagerNodeEventHistorySize"/> + </constructor-arg> + </bean> + + <!-- cluster firewall --> + <bean id="clusterFirewall" class="org.apache.nifi.cluster.spring.FileBasedClusterNodeFirewallFactoryBean"> + <property name="properties" ref="nifiProperties"/> + </bean> + ++ <!-- cluster manager optimistic locking manager --> ++ <bean id="clusterManagerOptimisticLockingManager" class="org.apache.nifi.web.StandardOptimisticLockingManager"/> ++ + <!-- cluster manager --> + <bean id="clusterManager" class="org.apache.nifi.cluster.spring.WebClusterManagerFactoryBean"> + <property name="properties" ref="nifiProperties"/> + <property name="encryptor" ref="stringEncryptor"/> ++ <property name="optimisticLockingManager" ref="clusterManagerOptimisticLockingManager"/> + </bean> + + <!-- discoverable services --> + + <!-- cluster manager protocol discoverable service --> + + <!-- service name for communicating with the cluster manager using sockets --> + <bean id="clusterManagerProtocolServiceName" class="java.lang.String"> + <constructor-arg value="cluster-manager-protocol" /> + </bean> + + <!-- cluster manager protocol service discovery --> + <bean id="clusterManagerProtocolServiceDiscovery" class="org.apache.nifi.cluster.protocol.impl.ClusterServiceDiscovery"> + <constructor-arg ref="clusterManagerProtocolServiceName" index="0"/> + <constructor-arg index="1"> + <bean factory-bean="nifiProperties" factory-method="getClusterProtocolMulticastAddress"/> + </constructor-arg> + <constructor-arg ref="protocolMulticastConfiguration" index="2"/> + <constructor-arg ref="protocolContext" index="3"/> + </bean> + + <!-- cluster manager protocol service locator --> + <bean id="clusterManagerProtocolServiceLocator" class="org.apache.nifi.cluster.spring.ClusterManagerProtocolServiceLocatorFactoryBean"> + <property name="properties" ref="nifiProperties"/> + </bean> + + </beans> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c28d9f57/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/pom.xml ----------------------------------------------------------------------
