http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/NodeDescription.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/NodeDescription.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/NodeDescription.java new file mode 100644 index 0000000..0c284bf --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/NodeDescription.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Describes a single node that is part of a NiFi cluster.
 */
public interface NodeDescription {

    /**
     * @return the identifier that uniquely identifies this node within the cluster
     */
    String getNodeIdentifier();

    /**
     * @return the hostname of this node
     */
    String getHostname();

    /**
     * @return the port that this node's embedded ZooKeeper server is running on, or {@code null} when this
     *         node is not running an embedded ZooKeeper server
     */
    Integer getEmbeddedZooKeeperPort();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java new file mode 100644 index 0000000..e83bbac --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateManager.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.state; + +import java.io.IOException; +import java.util.Map; + +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.components.state.StateProvider; + +public class StandardStateManager implements StateManager { + private final StateProvider localProvider; + private final StateProvider clusterProvider; + private final String componentId; + + public StandardStateManager(final StateProvider localProvider, final StateProvider clusterProvider, final String componentId) { + this.localProvider = localProvider; + this.clusterProvider = clusterProvider; + this.componentId = componentId; + } + + private StateProvider getProvider(final Scope scope) { + if (scope == Scope.LOCAL || clusterProvider == null || !clusterProvider.isEnabled()) { + return localProvider; + } + + return clusterProvider; + } + + + @Override + public StateMap getState(final Scope scope) throws IOException { + return getProvider(scope).getState(componentId); + } + + + @Override + public boolean replace(final StateMap oldValue, final Map<String, String> newValue, final Scope scope) throws IOException { + return getProvider(scope).replace(oldValue, newValue, componentId); + } + + @Override + public void setState(final Map<String, String> state, final Scope scope) throws IOException { + getProvider(scope).setState(state, componentId); + } + + @Override + public void clear(final Scope scope) throws IOException { + getProvider(scope).clear(componentId); + } + + @Override + public String toString() { + return "StandardStateManager[componentId=" + componentId + "]"; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateMap.java ---------------------------------------------------------------------- diff 
--git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateMap.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateMap.java new file mode 100644 index 0000000..b006ac6 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateMap.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.state; + +import java.util.Collections; +import java.util.Map; + +import org.apache.nifi.components.state.StateMap; + +public class StandardStateMap implements StateMap { + private final Map<String, String> stateValues; + private final long version; + + public StandardStateMap(final Map<String, String> stateValues, final long version) { + this.stateValues = Collections.unmodifiableMap(stateValues == null ? 
Collections.<String, String> emptyMap() : stateValues); + this.version = version; + } + + @Override + public long getVersion() { + return version; + } + + @Override + public String get(final String key) { + return stateValues.get(key); + } + + @Override + public Map<String, String> toMap() { + return stateValues; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java new file mode 100644 index 0000000..c9e7b8e --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StandardStateProviderInitializationContext.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.state; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import javax.net.ssl.SSLContext; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.state.StateProviderInitializationContext; + +public class StandardStateProviderInitializationContext implements StateProviderInitializationContext { + private final String id; + private final Map<PropertyDescriptor, PropertyValue> properties; + private final SSLContext sslContext; + + public StandardStateProviderInitializationContext(final String identifier, final Map<PropertyDescriptor, PropertyValue> properties, final SSLContext sslContext) { + this.id = identifier; + this.properties = new HashMap<>(properties); + this.sslContext = sslContext; + } + + @Override + public Map<PropertyDescriptor, PropertyValue> getProperties() { + return Collections.unmodifiableMap(properties); + } + + @Override + public PropertyValue getProperty(final PropertyDescriptor property) { + return properties.get(property); + } + + @Override + public String getIdentifier() { + return id; + } + + @Override + public SSLContext getSSLContext() { + return sslContext; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StateMapSerDe.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StateMapSerDe.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StateMapSerDe.java new file mode 100644 index 0000000..5671f5a --- /dev/null 
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StateMapSerDe.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.state; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.components.state.StateMap; +import org.wali.SerDe; +import org.wali.UpdateType; + +public class StateMapSerDe implements SerDe<StateMapUpdate> { + private static final int VERSION = 0; + + @Override + public void serializeEdit(final StateMapUpdate previousRecordState, final StateMapUpdate newRecordState, final DataOutputStream out) throws IOException { + serializeRecord(newRecordState, out); + } + + @Override + public void serializeRecord(final StateMapUpdate record, final DataOutputStream out) throws IOException { + out.writeUTF(record.getComponentId()); + out.writeUTF(record.getUpdateType().name()); + if (record.getUpdateType() == UpdateType.DELETE) { + return; + } + + final StateMap stateMap = record.getStateMap(); + final long recordVersion = stateMap.getVersion(); + 
out.writeLong(recordVersion); + + final Map<String, String> map = stateMap.toMap(); + out.writeInt(map.size()); + for (final Map.Entry<String, String> entry : map.entrySet()) { + out.writeUTF(entry.getKey()); + out.writeUTF(entry.getValue()); + } + } + + @Override + public StateMapUpdate deserializeEdit(final DataInputStream in, final Map<Object, StateMapUpdate> currentRecordStates, final int version) throws IOException { + return deserializeRecord(in, version); + } + + @Override + public StateMapUpdate deserializeRecord(final DataInputStream in, final int version) throws IOException { + final String componentId = in.readUTF(); + final String updateTypeName = in.readUTF(); + final UpdateType updateType = UpdateType.valueOf(updateTypeName); + if (updateType == UpdateType.DELETE) { + return new StateMapUpdate(null, componentId, updateType); + } + + final long recordVersion = in.readLong(); + final int numEntries = in.readInt(); + final Map<String, String> stateValues = new HashMap<>(numEntries); + for (int i = 0; i < numEntries; i++) { + final String key = in.readUTF(); + final String value = in.readUTF(); + stateValues.put(key, value); + } + + return new StateMapUpdate(new StandardStateMap(stateValues, recordVersion), componentId, updateType); + } + + @Override + public Object getRecordIdentifier(final StateMapUpdate record) { + return record.getComponentId(); + } + + @Override + public UpdateType getUpdateType(final StateMapUpdate record) { + return record.getUpdateType(); + } + + @Override + public String getLocation(final StateMapUpdate record) { + return null; + } + + @Override + public int getVersion() { + return VERSION; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StateMapUpdate.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StateMapUpdate.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StateMapUpdate.java new file mode 100644 index 0000000..9eb478b --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StateMapUpdate.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.state; + +import org.apache.nifi.components.state.StateMap; +import org.wali.UpdateType; + +public class StateMapUpdate { + private final StateMap stateMap; + private final String componentId; + private final UpdateType updateType; + + public StateMapUpdate(final StateMap stateMap, final String componentId, final UpdateType updateType) { + this.stateMap = stateMap; + this.componentId = componentId; + this.updateType = updateType; + } + + public StateMap getStateMap() { + return stateMap; + } + + public String getComponentId() { + return componentId; + } + + public UpdateType getUpdateType() { + return updateType; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StateProviderException.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StateProviderException.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StateProviderException.java new file mode 100644 index 0000000..f51d229 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/StateProviderException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * Unchecked exception for failures related to State Providers.
 */
public class StateProviderException extends RuntimeException {
    private static final long serialVersionUID = -4701298038474540654L;

    /**
     * @param message description of the failure
     * @param cause the underlying problem that led to this exception
     */
    public StateProviderException(final String message, final Throwable cause) {
        super(message, cause);
    }

    /**
     * @param message description of the failure
     */
    public StateProviderException(final String message) {
        super(message);
    }
}
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.state.config; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; + +import org.apache.nifi.controller.state.ConfigParseException; +import org.apache.nifi.util.DomUtils; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.xml.sax.SAXException; + +public class StateManagerConfiguration { + private final Map<String, StateProviderConfiguration> providers; + + private StateManagerConfiguration(final Map<String, StateProviderConfiguration> providerConfigs) { + this.providers = providerConfigs; + } + + public Map<String, StateProviderConfiguration> getStateProviderConfigurations() { + return Collections.unmodifiableMap(providers); + } + + public StateProviderConfiguration getStateProviderConfiguration(final String providerId) { + return providers.get(providerId); + } + + public List<StateProviderConfiguration> getStateProviderConfigurations(final StateProviderScope scope) { + final List<StateProviderConfiguration> configs = new ArrayList<>(); + for (final StateProviderConfiguration config : providers.values()) { + if (config.getScope() == scope) { + 
configs.add(config); + } + } + + return configs; + } + + public static StateManagerConfiguration parse(final File configFile) throws IOException, ConfigParseException { + final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); + factory.setNamespaceAware(false); + + final Document document; + DocumentBuilder builder; + try { + builder = factory.newDocumentBuilder(); + document = builder.parse(configFile); + } catch (ParserConfigurationException | SAXException e) { + throw new ConfigParseException("Unable to parse file " + configFile + ", as it does not appear to be a valid XML File", e); + } + + final Element rootElement = document.getDocumentElement(); + final List<Element> localProviderElements = DomUtils.getChildElementsByTagName(rootElement, "local-provider"); + if (localProviderElements.isEmpty()) { + throw new ConfigParseException("State Management config file " + configFile + " is not a valid configuration file, " + + "as it does not contain a 'local-provider' element, or the local-provider element is not the child of the root element"); + } + + final Map<String, StateProviderConfiguration> configs = new HashMap<>(); + for (final Element localProviderElement : localProviderElements) { + final StateProviderConfiguration providerConfig = parseProviderConfiguration(localProviderElement, StateProviderScope.LOCAL, configFile); + if (configs.containsKey(providerConfig.getId())) { + throw new ConfigParseException("State Management config file " + configFile + " is not a valid configuration file, " + + "as it contains multiple providers with the \"id\" of \"" + providerConfig.getId() + "\""); + } + + configs.put(providerConfig.getId(), providerConfig); + } + + final List<Element> clusterProviderElements = DomUtils.getChildElementsByTagName(rootElement, "cluster-provider"); + for (final Element clusterProviderElement : clusterProviderElements) { + final StateProviderConfiguration providerConfig = parseProviderConfiguration(clusterProviderElement, 
StateProviderScope.CLUSTER, configFile); + if (configs.containsKey(providerConfig.getId())) { + throw new ConfigParseException("State Management config file " + configFile + " is not a valid configuration file, " + + "as it contains multiple providers with the \"id\" of \"" + providerConfig.getId() + "\""); + } + + configs.put(providerConfig.getId(), providerConfig); + } + + return new StateManagerConfiguration(configs); + } + + private static StateProviderConfiguration parseProviderConfiguration(final Element providerElement, final StateProviderScope scope, final File configFile) throws ConfigParseException { + final String elementName = providerElement.getNodeName(); + + final String id = DomUtils.getChildText(providerElement, "id"); + if (id == null) { + throw new ConfigParseException("State Management config file " + configFile + " is not a valid configuration file, " + + "as a " + elementName + " element does not contain an \"id\" element"); + } + if (id.trim().isEmpty()) { + throw new ConfigParseException("State Management config file " + configFile + " is not a valid configuration file, " + + "as a " + elementName + "'s \"id\" element is empty"); + } + + final String className = DomUtils.getChildText(providerElement, "class"); + if (className == null) { + throw new ConfigParseException("State Management config file " + configFile + " is not a valid configuration file, " + + "as a " + elementName + " element does not contain an \"class\" element"); + } + if (className.trim().isEmpty()) { + throw new ConfigParseException("State Management config file " + configFile + " is not a valid configuration file, " + + "as a " + elementName + "'s \"class\" element is empty"); + } + + final List<Element> propertyElements = DomUtils.getChildElementsByTagName(providerElement, "property"); + final Map<String, String> propertyMap = new HashMap<>(); + for (final Element propertyElement : propertyElements) { + final String propertyName = propertyElement.getAttribute("name"); + 
final String propertyValue = propertyElement.getTextContent(); + propertyMap.put(propertyName, propertyValue); + } + + return new StateProviderConfiguration(id, className, scope, propertyMap); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/config/StateProviderConfiguration.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/config/StateProviderConfiguration.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/config/StateProviderConfiguration.java new file mode 100644 index 0000000..290a750 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/config/StateProviderConfiguration.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.state.config; + +import java.util.HashMap; +import java.util.Map; + +public class StateProviderConfiguration { + private final String id; + private final StateProviderScope scope; + private final String className; + private final Map<String, String> properties; + + public StateProviderConfiguration(final String id, final String className, final StateProviderScope scope, final Map<String, String> properties) { + this.id = id; + this.className = className; + this.scope = scope; + this.properties = new HashMap<>(properties); + } + + public String getId() { + return id; + } + + public String getClassName() { + return className; + } + + public Map<String, String> getProperties() { + return properties; + } + + public StateProviderScope getScope() { + return scope; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/config/StateProviderScope.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/config/StateProviderScope.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/config/StateProviderScope.java new file mode 100644 index 0000000..40e1865 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/config/StateProviderScope.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
/**
 * The kinds of State Provider that can be configured.
 */
public enum StateProviderScope {

    /** The provider is a Local State Provider. */
    LOCAL,

    /** The provider is a Cluster State Provider. */
    CLUSTER
}
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.state.manager; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import javax.net.ssl.SSLContext; + +import org.apache.nifi.attribute.expression.language.StandardPropertyValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateManagerProvider; +import org.apache.nifi.components.state.StateProvider; +import org.apache.nifi.components.state.StateProviderInitializationContext; +import org.apache.nifi.controller.state.ConfigParseException; +import org.apache.nifi.controller.state.StandardStateManager; +import org.apache.nifi.controller.state.StandardStateProviderInitializationContext; +import org.apache.nifi.controller.state.config.StateManagerConfiguration; +import org.apache.nifi.controller.state.config.StateProviderConfiguration; +import org.apache.nifi.controller.state.config.StateProviderScope; +import org.apache.nifi.framework.security.util.SslContextFactory; 
+import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.processor.StandardValidationContext; +import org.apache.nifi.util.NiFiProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +
+/**
+ * Creates the Local State Provider (and, when this instance is a cluster node, the Cluster State Provider)
+ * from the State Management Configuration File, and hands out one {@link StateManager} per component ID
+ * that is backed by those providers.
+ */
+public class StandardStateManagerProvider implements StateManagerProvider {
+    private static final Logger logger = LoggerFactory.getLogger(StandardStateManagerProvider.class);
+
+    private final ConcurrentMap<String, StateManager> stateManagers = new ConcurrentHashMap<>();
+    private final StateProvider localStateProvider;
+    // null when this instance was created with properties.isNode() == false
+    private final StateProvider clusterStateProvider;
+
+    private StandardStateManagerProvider(final StateProvider localStateProvider, final StateProvider clusterStateProvider) {
+        this.localStateProvider = localStateProvider;
+        this.clusterStateProvider = clusterStateProvider;
+    }
+
+    /**
+     * Builds a provider from the given NiFi properties. The Local State Provider is always created; the
+     * Cluster State Provider is created only if <code>properties.isNode()</code> returns <code>true</code>.
+     *
+     * @param properties the NiFi properties that reference the State Management Configuration File and the provider ids
+     * @return the newly created StateManagerProvider
+     * @throws ConfigParseException declared by the configuration parsing step
+     * @throws IOException declared by configuration parsing and provider initialization
+     * @throws IllegalStateException if the configuration file or a provider id is missing, mismatched or invalid
+     */
+    public static StateManagerProvider create(final NiFiProperties properties) throws ConfigParseException, IOException {
+        final StateProvider localProvider = createLocalStateProvider(properties);
+
+        final StateProvider clusterProvider;
+        if (properties.isNode()) {
+            clusterProvider = createClusteredStateProvider(properties);
+        } else {
+            clusterProvider = null;
+        }
+
+        return new StandardStateManagerProvider(localProvider, clusterProvider);
+    }
+
+    private static StateProvider createLocalStateProvider(final NiFiProperties properties) throws IOException, ConfigParseException {
+        final File configFile = properties.getStateManagementConfigFile();
+        return createStateProvider(configFile, StateProviderScope.LOCAL, properties);
+    }
+
+
+    private static StateProvider createClusteredStateProvider(final NiFiProperties properties) throws IOException, ConfigParseException {
+        final File configFile = properties.getStateManagementConfigFile();
+        return createStateProvider(configFile, StateProviderScope.CLUSTER, properties);
+    }
+
+
+    /**
+     * Looks up the provider entry for the given scope in the configuration file, instantiates and initializes
+     * the provider, and validates its configuration.
+     *
+     * @param configFile the State Management Configuration File
+     * @param scope which of the two providers to create
+     * @param properties the NiFi properties holding the provider id and the SSL settings
+     * @return the initialized, valid provider
+     * @throws IllegalStateException if the file is missing/unreadable, the id property is missing or empty, the id
+     *             is unknown or belongs to the other scope, or the provider reports validation errors
+     */
+    private static StateProvider createStateProvider(final File configFile, final StateProviderScope scope, final NiFiProperties properties)
+        throws ConfigParseException, IOException {
+        final String providerId;
+        final String providerIdPropertyName;
+        final String providerDescription;
+        final String providerXmlElementName;
+        final String oppositeScopeXmlElementName;
+
+        switch (scope) {
+            case CLUSTER:
+                providerId = properties.getClusterStateProviderId();
+                providerIdPropertyName = NiFiProperties.STATE_MANAGEMENT_CLUSTER_PROVIDER_ID;
+                providerDescription = "Cluster State Provider";
+                providerXmlElementName = "cluster-provider";
+                oppositeScopeXmlElementName = "local-provider";
+                break;
+            case LOCAL:
+                providerId = properties.getLocalStateProviderId();
+                providerIdPropertyName = NiFiProperties.STATE_MANAGEMENT_LOCAL_PROVIDER_ID;
+                providerDescription = "Local State Provider";
+                providerXmlElementName = "local-provider";
+                oppositeScopeXmlElementName = "cluster-provider";
+                break;
+            default:
+                throw new AssertionError("Attempted to create State Provider for unknown Scope: " + scope);
+        }
+
+        if (!configFile.exists()) {
+            throw new IllegalStateException("Cannot create " + providerDescription + " because the State Management Configuration File " + configFile + " does not exist");
+        }
+        if (!configFile.canRead()) {
+            throw new IllegalStateException("Cannot create " + providerDescription + " because the State Management Configuration File " + configFile + " cannot be read");
+        }
+
+        if (providerId == null) {
+            if (scope == StateProviderScope.CLUSTER) {
+                throw new IllegalStateException("Cannot create Cluster State Provider because the '" + providerIdPropertyName
+                    + "' property is missing from the NiFi Properties file. In order to run NiFi in a cluster, the " + providerIdPropertyName
+                    + " property must be configured in nifi.properties");
+            }
+
+            throw new IllegalStateException("Cannot create " + providerDescription + " because the '" + providerIdPropertyName
+                + "' property is missing from the NiFi Properties file");
+        }
+
+        if (providerId.trim().isEmpty()) {
+            throw new IllegalStateException("Cannot create " + providerDescription + " because the '" + providerIdPropertyName
+                + "' property in the NiFi Properties file has no value set. This is a required property and must reference the identifier of one of the "
+                + providerXmlElementName + " elements in the State Management Configuration File (" + configFile + ")");
+        }
+
+        final StateManagerConfiguration config = StateManagerConfiguration.parse(configFile);
+        final StateProviderConfiguration providerConfig = config.getStateProviderConfiguration(providerId);
+        if (providerConfig == null) {
+            throw new IllegalStateException("Cannot create " + providerDescription + " because the '" + providerIdPropertyName
+                + "' property in the NiFi Properties file is set to '" + providerId + "', but there is no " + providerXmlElementName
+                + " entry in the State Management Configuration File (" + configFile + ") with this id");
+        }
+
+        if (providerConfig.getScope() != scope) {
+            throw new IllegalStateException("Cannot create " + providerDescription + " because the '" + providerIdPropertyName
+                + "' property in the NiFi Properties file is set to '" + providerId + "', but this id is assigned to a " + oppositeScopeXmlElementName
+                + " entry in the State Management Configuration File (" + configFile + "), rather than a " + providerXmlElementName + " entry");
+        }
+
+        final String providerClassName = providerConfig.getClassName();
+
+        final StateProvider provider;
+        try {
+            provider = instantiateStateProvider(providerClassName);
+        } catch (final Exception e) {
+            throw new RuntimeException("Cannot create " + providerDescription + " of type " + providerClassName, e);
+        }
+
+        // Start from every descriptor's default value, then overlay the values from the configuration file.
+        final Map<PropertyDescriptor, PropertyValue> propertyMap = new HashMap<>();
+        final Map<PropertyDescriptor, String> propertyStringMap = new HashMap<>();
+        for (final PropertyDescriptor descriptor : provider.getPropertyDescriptors()) {
+            propertyMap.put(descriptor, new StandardPropertyValue(descriptor.getDefaultValue(), null));
+            propertyStringMap.put(descriptor, descriptor.getDefaultValue());
+        }
+
+        for (final Map.Entry<String, String> entry : providerConfig.getProperties().entrySet()) {
+            final PropertyDescriptor descriptor = provider.getPropertyDescriptor(entry.getKey());
+            propertyStringMap.put(descriptor, entry.getValue());
+            propertyMap.put(descriptor, new StandardPropertyValue(entry.getValue(), null));
+        }
+
+        final SSLContext sslContext = SslContextFactory.createSslContext(properties, false);
+        final StateProviderInitializationContext initContext = new StandardStateProviderInitializationContext(providerId, propertyMap, sslContext);
+
+        synchronized (provider) {
+            provider.initialize(initContext);
+        }
+
+        final ValidationContext validationContext = new StandardValidationContext(null, propertyStringMap, null);
+        final Collection<ValidationResult> results = provider.validate(validationContext);
+        final StringBuilder validationFailures = new StringBuilder();
+
+        int invalidCount = 0;
+        for (final ValidationResult result : results) {
+            if (!result.isValid()) {
+                validationFailures.append(result.toString()).append("\n");
+                invalidCount++;
+            }
+        }
+
+        if (invalidCount > 0) {
+            throw new IllegalStateException("Could not initialize State Providers because the " + providerDescription + " is not valid. The following "
+                + invalidCount + " Validation Errors occurred:\n" + validationFailures.toString() + "\nPlease check the configuration of the " + providerDescription + " with ID ["
+                + providerId.trim() + "] in the file " + configFile.getAbsolutePath());
+        }
+
+        return provider;
+    }
+
+    /**
+     * Loads the class with the given name and creates an instance through its no-arg constructor. If the
+     * ExtensionManager knows a class loader for the type, that loader is used to load the class and is installed
+     * as the thread's context class loader while the instance is created. The previous context class loader is
+     * always put back afterwards.
+     *
+     * @param type fully qualified class name of the StateProvider implementation
+     * @return a new, uninitialized provider instance
+     */
+    private static StateProvider instantiateStateProvider(final String type) throws ClassNotFoundException, InstantiationException, IllegalAccessException {
+        final Thread currentThread = Thread.currentThread();
+        final ClassLoader ctxClassLoader = currentThread.getContextClassLoader();
+        try {
+            final ClassLoader detectedClassLoaderForType = ExtensionManager.getClassLoader(type);
+            final Class<?> rawClass;
+            if (detectedClassLoaderForType == null) {
+                // try to find from the current class loader; the context class loader is left as it is
+                // rather than being replaced with null
+                rawClass = Class.forName(type);
+            } else {
+                // try to find from the registered classloader for that type
+                rawClass = Class.forName(type, true, detectedClassLoaderForType);
+                currentThread.setContextClassLoader(detectedClassLoaderForType);
+            }
+
+            final Class<? extends StateProvider> mgrClass = rawClass.asSubclass(StateProvider.class);
+            return mgrClass.newInstance();
+        } finally {
+            // Restore unconditionally: a null previous loader is a legal value and must be put back as well,
+            // otherwise the provider's class loader would stay installed on this thread.
+            currentThread.setContextClassLoader(ctxClassLoader);
+        }
+    }
+
+    /**
+     * Returns the State Manager for the given component ID, creating and caching a new one if none has been
+     * created for that ID yet.
+     *
+     * @param componentId the ID of the component whose State Manager is wanted
+     * @return the StateManager that can be used by the component with the given ID; never <code>null</code>
+     */
+    @Override
+    public synchronized StateManager getStateManager(final String componentId) {
+        StateManager stateManager = stateManagers.get(componentId);
+        if (stateManager != null) {
+            return stateManager;
+        }
+
+        stateManager = new StandardStateManager(localStateProvider, clusterStateProvider, componentId);
+        stateManagers.put(componentId, stateManager);
+        return stateManager;
+    }
+
+    @Override
+    public synchronized void shutdown() {
+        localStateProvider.shutdown();
+        if (clusterStateProvider != null) {
+            clusterStateProvider.shutdown();
+        }
+    }
+
+    /**
+     * Enables the Cluster State Provider. Does nothing if no Cluster State Provider was created.
+     */
+    @Override
+    public void enableClusterProvider() {
+        if (clusterStateProvider != null) {
+            clusterStateProvider.enable();
+        }
+    }
+
+    /**
+     * Disables the Cluster State Provider. Does nothing if no Cluster State Provider was created.
+     */
+    @Override
+    public void disableClusterProvider() {
+        if (clusterStateProvider != null) {
+            clusterStateProvider.disable();
+        }
+    }
+
+    /**
+     * Forgets the State Manager of the removed component, clears its cluster and local state, and lets both
+     * providers release whatever they hold for it. Each step is best-effort: a failure is logged and the
+     * remaining steps still run.
+     *
+     * @param componentId the ID of the component that was removed
+     */
+    @Override
+    public void onComponentRemoved(final String componentId) {
+        final StateManager mgr = stateManagers.remove(componentId);
+        if (mgr == null) {
+            return;
+        }
+
+        // In each warn() call the component ID fills the {} placeholder and the trailing exception is
+        // logged with its stack trace.
+        try {
+            mgr.clear(Scope.CLUSTER);
+        } catch (final Exception e) {
+            logger.warn("Component with ID {} was removed from NiFi instance but failed to clear clustered state for the component", componentId, e);
+        }
+
+        try {
+            mgr.clear(Scope.LOCAL);
+        } catch (final Exception e) {
+            logger.warn("Component with ID {} was removed from NiFi instance but failed to clear local state for the component", componentId, e);
+        }
+
+        try {
+            localStateProvider.onComponentRemoved(componentId);
+        } catch (final Exception e) {
+            logger.warn("Component with ID {} was removed from NiFi instance but failed to cleanup resources used to maintain its local state", componentId, e);
+        }
+
+        if (clusterStateProvider != null) {
+            try {
+                clusterStateProvider.onComponentRemoved(componentId);
+            } catch (final Exception e) {
+                logger.warn("Component with ID {} was removed from NiFi instance but failed to cleanup resources used to maintain its clustered state", componentId, e);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/AbstractStateProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/AbstractStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/AbstractStateProvider.java new file mode 100644 index 0000000..078dce3 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/AbstractStateProvider.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.state.providers; + +import java.io.IOException; + +import org.apache.nifi.components.AbstractConfigurableComponent; +import org.apache.nifi.components.state.StateProvider; +import org.apache.nifi.components.state.StateProviderInitializationContext; +
+/**
+ * Base class for State Providers. It remembers the identifier handed over at initialization time and
+ * keeps the enabled/disabled flag; subclasses supply the actual initialization in {@link #init}.
+ */
+public abstract class AbstractStateProvider extends AbstractConfigurableComponent implements StateProvider {
+    // toggled by enable() / disable(); a provider starts out disabled
+    private volatile boolean enabled;
+
+    // taken from the initialization context in initialize()
+    private String identifier;
+
+    /**
+     * Performs the provider-specific initialization. Invoked by {@link #initialize} after the identifier
+     * has been recorded.
+     *
+     * @param context the initialization context passed to {@link #initialize}
+     * @throws IOException if the provider cannot be initialized
+     */
+    public abstract void init(final StateProviderInitializationContext context) throws IOException;
+
+    /**
+     * Records the identifier supplied by the context and then delegates to {@link #init}.
+     */
+    @Override
+    public final void initialize(final StateProviderInitializationContext context) throws IOException {
+        identifier = context.getIdentifier();
+        init(context);
+    }
+
+    @Override
+    public String getIdentifier() {
+        return this.identifier;
+    }
+
+    @Override
+    public boolean isEnabled() {
+        return this.enabled;
+    }
+
+    @Override
+    public void enable() {
+        this.enabled = true;
+    }
+
+    @Override
+    public void disable() {
+        this.enabled = false;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java new file mode 100644 index 0000000..c23e517 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/local/WriteAheadLocalStateProvider.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or 
more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.state.providers.local; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.components.state.StateProviderInitializationContext; +import org.apache.nifi.controller.state.StandardStateMap; +import org.apache.nifi.controller.state.StateMapSerDe; +import org.apache.nifi.controller.state.StateMapUpdate; +import org.apache.nifi.controller.state.providers.AbstractStateProvider; +import org.apache.nifi.processor.util.StandardValidators; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.wali.MinimalLockingWriteAheadLog; +import org.wali.UpdateType; +import org.wali.WriteAheadRepository; +
+/**
+ * Provides state management for local (node-only) state, backed by a write-ahead log.
+ * <p>
+ * One {@link ComponentProvider} is kept per component ID; it holds the component's current
+ * {@link StateMap} in memory and records every change in the write-ahead log.
+ */
+public class WriteAheadLocalStateProvider extends AbstractStateProvider {
+    private static final Logger logger = LoggerFactory.getLogger(WriteAheadLocalStateProvider.class);
+
+    // TODO: CREATE BACKGROUND THREAD OR USE EXECUTOR (in StateProviderInitializationContext?) to schedule checkpointing.
+    private static final long CHECKPOINT_NANOS = TimeUnit.MINUTES.toNanos(2);
+
+    private final StateMapSerDe serde;
+    private final ConcurrentMap<String, ComponentProvider> componentProviders = new ConcurrentHashMap<>();
+
+    static final PropertyDescriptor PATH = new PropertyDescriptor.Builder()
+        .name("Directory")
+        .description("The directory where the Provider should store its data")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .defaultValue("./state")
+        .required(true)
+        .build();
+
+    // both are assigned in init(); they are null until then
+    private WriteAheadRepository<StateMapUpdate> writeAheadLog;
+    private AtomicLong versionGenerator;
+
+    public WriteAheadLocalStateProvider() {
+        serde = new StateMapSerDe();
+    }
+
+    /**
+     * Verifies that the configured directory exists (creating it if necessary) and is a readable, writable
+     * directory, opens the write-ahead log in it, and rebuilds the in-memory state of every component from
+     * the recovered records.
+     *
+     * @param context supplies the value of the 'Directory' property
+     * @throws IOException declared by the write-ahead log operations
+     * @throws RuntimeException if the directory cannot be created or is not a readable, writable directory
+     */
+    @Override
+    public synchronized void init(final StateProviderInitializationContext context) throws IOException {
+        final File basePath = new File(context.getProperty(PATH).getValue());
+
+        if (!basePath.exists() && !basePath.mkdirs()) {
+            throw new RuntimeException("Cannot Initialize Local State Provider because the 'Directory' property is set to \"" + basePath + "\", but that directory could not be created");
+        }
+
+        if (!basePath.isDirectory()) {
+            throw new RuntimeException("Cannot Initialize Local State Provider because the 'Directory' property is set to \"" + basePath + "\", but that is a file, rather than a directory");
+        }
+
+        if (!basePath.canWrite()) {
+            throw new RuntimeException("Cannot Initialize Local State Provider because the 'Directory' property is set to \"" + basePath + "\", but that directory cannot be written to");
+        }
+
+        if (!basePath.canRead()) {
+            throw new RuntimeException("Cannot Initialize Local State Provider because the 'Directory' property is set to \"" + basePath + "\", but that directory cannot be read");
+        }
+
+        versionGenerator = new AtomicLong(-1L);
+        writeAheadLog = new MinimalLockingWriteAheadLog<>(basePath.toPath(), 16, serde, null);
+
+        final Collection<StateMapUpdate> updates = writeAheadLog.recoverRecords();
+        long maxRecordVersion = -1L;
+
+        for (final StateMapUpdate update : updates) {
+            if (update.getUpdateType() == UpdateType.DELETE) {
+                continue;
+            }
+
+            final long recordVersion = update.getStateMap().getVersion();
+            if (recordVersion > maxRecordVersion) {
+                maxRecordVersion = recordVersion;
+            }
+
+            final String componentId = update.getComponentId();
+            componentProviders.put(componentId, new ComponentProvider(writeAheadLog, versionGenerator, componentId, update.getStateMap()));
+        }
+
+        // keep a separate maxRecordVersion and set it at the end so that we don't have to continually update an AtomicLong, which is more
+        // expensive than just keeping track of a local 'long' variable. Since we won't actually increment this at any point until after
+        // the init() method completes, this is okay to do.
+        versionGenerator.set(maxRecordVersion);
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(PATH);
+        return properties;
+    }
+
+    /**
+     * Shuts down the write-ahead log. A failure is logged, not propagated. Does nothing if the log was
+     * never opened.
+     */
+    @Override
+    public synchronized void shutdown() {
+        if (writeAheadLog == null) {
+            // init() never got as far as opening the log, so there is nothing to close
+            return;
+        }
+
+        try {
+            writeAheadLog.shutdown();
+        } catch (final IOException ioe) {
+            // the trailing exception argument is logged with its stack trace as part of the same record
+            logger.warn("Failed to shut down {} successfully due to {}", this, ioe.toString(), ioe);
+        }
+    }
+
+    /**
+     * Returns the ComponentProvider for the given component, registering a new one with empty state
+     * (version -1) if the component has none yet.
+     */
+    private ComponentProvider getProvider(final String componentId) {
+        ComponentProvider componentProvider = componentProviders.get(componentId);
+        if (componentProvider == null) {
+            final StateMap stateMap = new StandardStateMap(Collections.<String, String> emptyMap(), -1L);
+            componentProvider = new ComponentProvider(writeAheadLog, versionGenerator, componentId, stateMap);
+
+            // another thread may have registered a provider in the meantime; if so, use that one
+            final ComponentProvider existingComponentProvider = componentProviders.putIfAbsent(componentId, componentProvider);
+            if (existingComponentProvider != null) {
+                componentProvider = existingComponentProvider;
+            }
+        }
+
+        return componentProvider;
+    }
+
+    @Override
+    public StateMap getState(final String componentId) throws IOException {
+        return getProvider(componentId).getState();
+    }
+
+    @Override
+    public void setState(final Map<String, String> state, final String componentId) throws IOException {
+        getProvider(componentId).setState(state);
+    }
+
+    @Override
+    public boolean replace(final StateMap oldValue, final Map<String, String> newValue, final String componentId) throws IOException {
+        return getProvider(componentId).replace(oldValue, newValue);
+    }
+
+    @Override
+    public void clear(final String componentId) throws IOException {
+        getProvider(componentId).clear();
+    }
+
+    @Override
+    public void onComponentRemoved(final String componentId) throws IOException {
+        clear(componentId);
+        componentProviders.remove(componentId);
+    }
+
+    /**
+     * Holds the current state of one component and records each change in the write-ahead log.
+     */
+    private static class ComponentProvider {
+        private final AtomicLong versionGenerator;
+        private final WriteAheadRepository<StateMapUpdate> wal;
+        private final String componentId;
+
+        private StateMap stateMap;
+
+        public ComponentProvider(final WriteAheadRepository<StateMapUpdate> wal, final AtomicLong versionGenerator, final String componentId, final StateMap stateMap) {
+            this.wal = wal;
+            this.versionGenerator = versionGenerator;
+            this.componentId = componentId;
+            this.stateMap = stateMap;
+        }
+
+        public synchronized StateMap getState() throws IOException {
+            return stateMap;
+        }
+
+        // synchronized because we need to ensure that update of state in WAL and updating of local stateMap variable is atomic.
+        // Additionally, the implementation of WriteAheadRepository that we are using requires that only a single thread update the
+        // repository at a time for a record with the same key. I.e., many threads can update the repository at once, as long as they
+        // are not updating the repository with records that have the same identifier.
+        public synchronized void setState(final Map<String, String> state) throws IOException {
+            // Copy the caller's map, as replace() does, so that later changes to it cannot alter the stored state.
+            // A null map is passed through unchanged, exactly as before.
+            final Map<String, String> copy = (state == null) ? null : new HashMap<>(state);
+            persist(new StandardStateMap(copy, versionGenerator.incrementAndGet()));
+        }
+
+        // see above explanation as to why this method is synchronized.
+        public synchronized boolean replace(final StateMap oldValue, final Map<String, String> newValue) throws IOException {
+            // identity comparison: succeeds only if the caller holds the very StateMap instance that is current
+            if (stateMap != oldValue) {
+                return false;
+            }
+
+            persist(new StandardStateMap(new HashMap<>(newValue), versionGenerator.incrementAndGet()));
+            return true;
+        }
+
+        public synchronized void clear() throws IOException {
+            persist(new StandardStateMap(null, versionGenerator.incrementAndGet()));
+        }
+
+        /**
+         * Writes the given state to the write-ahead log and, only once that has succeeded, makes it the
+         * current in-memory state. If the log update throws, the in-memory state is left untouched so that
+         * memory never runs ahead of what has been recorded. Callers must hold this object's monitor.
+         */
+        private void persist(final StateMap updated) throws IOException {
+            final StateMapUpdate updateRecord = new StateMapUpdate(updated, componentId, UpdateType.UPDATE);
+            wal.update(Collections.singleton(updateRecord), false);
+            stateMap = updated;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java new file mode 100644 index 0000000..f865c8b --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.state.providers.zookeeper; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.components.state.StateProviderInitializationContext; +import org.apache.nifi.controller.state.StandardStateMap; +import org.apache.nifi.controller.state.providers.AbstractStateProvider; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.Code; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZKUtil; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; + +public class ZooKeeperStateProvider extends AbstractStateProvider { + static final PropertyDescriptor CONNECTION_STRING = new 
PropertyDescriptor.Builder() + .name("Connect String") + .description("The ZooKeeper Connect String to use. This is a comma-separated list of hostnames/IP addresses, such as \"host1, host2, 127.0.0.1, host4, host5\"") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .required(false) + .build(); + static final PropertyDescriptor SESSION_TIMEOUT = new PropertyDescriptor.Builder() + .name("Session Timeout") + .description("Specifies how long this instance of NiFi is allowed to be disconnected from ZooKeeper before creating a new ZooKeeper Session") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("30 sec") + .required(true) + .build(); + static final PropertyDescriptor ROOT_NODE = new PropertyDescriptor.Builder() + .name("Root Node") + .description("The Root Node to use in ZooKeeper to store state in") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("/nifi") + .required(true) + .build(); + + + private final List<ACL> acl; + + private ZooKeeper zooKeeper; + private int timeoutMillis; + private String rootNode; + private String connectionString; + + private static final int ENCODING_VERSION = 1; + + public ZooKeeperStateProvider() throws Exception { + // TODO: Provide SSL Context + // TODO: Use more appropriate acl + acl = Ids.OPEN_ACL_UNSAFE; + } + + + @Override + public synchronized void init(final StateProviderInitializationContext context) { + connectionString = context.getProperty(CONNECTION_STRING).getValue(); + + rootNode = context.getProperty(ROOT_NODE).getValue(); + timeoutMillis = context.getProperty(SESSION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); + } + + @Override + public synchronized void shutdown() { + if (zooKeeper != null) { + try { + zooKeeper.close(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + zooKeeper = null; + } + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> 
properties = new ArrayList<>(); + properties.add(CONNECTION_STRING); + properties.add(SESSION_TIMEOUT); + properties.add(ROOT_NODE); + return properties; + } + + private synchronized ZooKeeper getZooKeeper() throws IOException { + if (zooKeeper != null && !zooKeeper.getState().isAlive()) { + invalidateClient(); + } + + if (zooKeeper == null) { + zooKeeper = new ZooKeeper(connectionString, timeoutMillis, new Watcher() { + @Override + public void process(WatchedEvent event) { + } + }); + } + + return zooKeeper; + } + + private synchronized void invalidateClient() { + shutdown(); + } + + private String getComponentPath(final String componentId) { + return rootNode + "/components/" + componentId; + } + + private void verifyEnabled() throws IOException { + if (!isEnabled()) { + throw new IOException("Cannot update or retrieve cluster state becuase node is no longer connected to a cluster"); + } + } + + @Override + public void onComponentRemoved(final String componentId) throws IOException { + try { + ZKUtil.deleteRecursive(getZooKeeper(), getComponentPath(componentId)); + } catch (final KeeperException ke) { + // Node doesn't exist so just ignore + if (Code.NONODE == ke.code()) { + return; + } + if (Code.SESSIONEXPIRED == ke.code()) { + invalidateClient(); + onComponentRemoved(componentId); + } + + throw new IOException("Unable to remove state for component with ID '" + componentId + "' from ZooKeeper", ke); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Failed to remove state for component with ID '" + componentId + "' from ZooKeeper due to being interrupted", e); + } + } + + @Override + public void setState(final Map<String, String> state, final String componentId) throws IOException { + setState(state, -1, componentId); + } + + + private byte[] serialize(final Map<String, String> stateValues) throws IOException { + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final DataOutputStream 
dos = new DataOutputStream(baos)) { + dos.writeInt(ENCODING_VERSION); + dos.writeInt(stateValues.size()); + for (final Map.Entry<String, String> entry : stateValues.entrySet()) { + dos.writeUTF(entry.getKey()); + dos.writeUTF(entry.getValue()); + } + return baos.toByteArray(); + } + } + + private StateMap deserialize(final byte[] data, final int recordVersion, final String componentId) throws IOException { + try (final ByteArrayInputStream bais = new ByteArrayInputStream(data); + final DataInputStream dis = new DataInputStream(bais)) { + + final int encodingVersion = dis.readInt(); + if (encodingVersion > ENCODING_VERSION) { + throw new IOException("Retrieved a response from ZooKeeper when retrieving state for component with ID " + componentId + + ", but the response was encoded using the ZooKeeperStateProvider Encoding Version of " + encodingVersion + + " but this instance can only decode versions up to " + ENCODING_VERSION + + "; it appears that the state was encoded using a newer version of NiFi than is currently running. 
This information cannot be decoded."); + } + + final int numEntries = dis.readInt(); + final Map<String, String> stateValues = new HashMap<>(numEntries); + for (int i = 0; i < numEntries; i++) { + final String key = dis.readUTF(); + final String value = dis.readUTF(); + stateValues.put(key, value); + } + + return new StandardStateMap(stateValues, recordVersion); + } + } +
+    /**
+     * Writes the serialized state to the component's node. If the node does not exist yet it is
+     * created. If the ZooKeeper session has expired, the client is replaced and the write is retried.
+     *
+     * @param stateValues the state to store
+     * @param version the node version handed to ZooKeeper's setData; -1 skips the version comparison
+     * @param componentId the identifier of the component that owns the state
+     * @throws IOException if this provider is not enabled, the write fails, or the thread is interrupted
+     */
+    private void setState(final Map<String, String> stateValues, final int version, final String componentId) throws IOException {
+        verifyEnabled();
+
+        try {
+            final String path = getComponentPath(componentId);
+            final byte[] data = serialize(stateValues);
+
+            final ZooKeeper keeper = getZooKeeper();
+            try {
+                keeper.setData(path, data, version);
+            } catch (final KeeperException ke) {
+                if (ke.code() == Code.NONODE) {
+                    createNode(path, data);
+                } else {
+                    throw ke;
+                }
+            }
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId + " due to interruption", e);
+        } catch (final KeeperException ke) {
+            if (Code.SESSIONEXPIRED == ke.code()) {
+                invalidateClient();
+                setState(stateValues, version, componentId);
+                // the retry either succeeded or threw its own exception; do not report the stale failure
+                return;
+            }
+
+            throw new IOException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId, ke);
+        } catch (final IOException ioe) {
+            throw new IOException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId, ioe);
+        }
+    }
+
+
+    /**
+     * Creates a persistent node holding the given data, creating missing parent nodes (with no
+     * data) first. If the node already exists its data is overwritten instead. If the ZooKeeper
+     * session has expired, the client is replaced and the creation is retried.
+     *
+     * @param path the full path of the node to create
+     * @param data the data to store in the node; may be null
+     * @throws IOException if the thread is interrupted or a new client cannot be created
+     * @throws KeeperException if ZooKeeper rejects the operation for a reason that is not handled here
+     */
+    private void createNode(final String path, final byte[] data) throws IOException, KeeperException {
+        try {
+            getZooKeeper().create(path, data, acl, CreateMode.PERSISTENT);
+        } catch (final InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Failed to update cluster-wide state due to interruption", ie);
+        } catch (final KeeperException ke) {
+            // Parent is missing: create it first, then try this node again.
+            if (ke.code() == Code.NONODE) {
+                final String parentPath = StringUtils.substringBeforeLast(path, "/");
+                createNode(parentPath, null);
+                createNode(path, data);
+                return;
+            }
+            if (Code.SESSIONEXPIRED == ke.code()) {
+                invalidateClient();
+                createNode(path, data);
+                return;
+            }
+
+            // Node already exists. Node must have been created by "someone else". Just set the data.
+            if (ke.code() == Code.NODEEXISTS) {
+                try {
+                    getZooKeeper().setData(path, data, -1);
+                    return;
+                } catch (final KeeperException ke1) {
+                    // Node no longer exists -- it was removed by someone else. Go recreate the node.
+                    if (ke1.code() == Code.NONODE) {
+                        createNode(path, data);
+                        return;
+                    }
+
+                    // any other failure means the data was not written; let the caller see it
+                    throw ke1;
+                } catch (final InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    throw new IOException("Failed to update cluster-wide state due to interruption", ie);
+                }
+            }
+
+            throw ke;
+        }
+    }
+
+    /**
+     * Reads and decodes the state stored for the given component. If the ZooKeeper session has
+     * expired, the client is replaced and the read is retried.
+     *
+     * @param componentId the identifier of the component whose state is wanted
+     * @return the stored state with the node's version, or a state map built from null values and
+     *         version -1 if no node exists for the component
+     * @throws IOException if this provider is not enabled, the read or decoding fails, or the thread is interrupted
+     */
+    @Override
+    public StateMap getState(final String componentId) throws IOException {
+        verifyEnabled();
+
+        try {
+            final Stat stat = new Stat();
+            final String path = getComponentPath(componentId);
+            final byte[] data = getZooKeeper().getData(path, false, stat);
+
+            return deserialize(data, stat.getVersion(), componentId);
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Failed to obtain value from ZooKeeper for component with ID " + componentId + ", due to interruption", e);
+        } catch (final KeeperException ke) {
+            if (ke.code() == Code.NONODE) {
+                return new StandardStateMap(null, -1L);
+            }
+            if (Code.SESSIONEXPIRED == ke.code()) {
+                invalidateClient();
+                return getState(componentId);
+            }
+
+            throw new IOException("Failed to obtain value from ZooKeeper for component with ID " + componentId, ke);
+        } catch (final IOException ioe) {
+            // provide more context in the error message
+            throw new IOException("Failed to obtain value from ZooKeeper for component with ID " + componentId, ioe);
+        }
+    }
+
+ @Override + public boolean replace(final StateMap oldValue, final Map<String, String> newValue, final String componentId) throws IOException { + verifyEnabled(); + + try { + setState(newValue, (int) oldValue.getVersion(), 
componentId); + return true; + } catch (final IOException ioe) { + return false; + } + } +
+
+    /**
+     * Replaces whatever state is stored for the component with an empty map.
+     *
+     * @param componentId the identifier of the component whose state is cleared
+     * @throws IOException if this provider is not enabled or the empty state cannot be written
+     */
+    @Override
+    public void clear(final String componentId) throws IOException {
+        verifyEnabled();
+
+        final Map<String, String> noState = Collections.<String, String> emptyMap();
+        setState(noState, componentId);
+    }
+} http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java new file mode 100644 index 0000000..8af9e5a --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/server/ZooKeeperStateServer.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.state.server; + +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; + +import org.apache.nifi.util.NiFiProperties; +import org.apache.zookeeper.server.ServerConfig; +import org.apache.zookeeper.server.ZooKeeperServerMain; +import org.apache.zookeeper.server.quorum.QuorumPeerConfig; +import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +
+/**
+ * An embedded ZooKeeper server that is configured from the ZooKeeper properties file
+ * referenced by the NiFi properties. Instances are obtained through {@link #create(NiFiProperties)}.
+ */
+public class ZooKeeperStateServer extends ZooKeeperServerMain {
+    private static final Logger logger = LoggerFactory.getLogger(ZooKeeperStateServer.class);
+
+    // Configuration parsed from the ZooKeeper properties at construction time
+    private QuorumPeerConfig quorumPeerConfig;
+
+    /**
+     * @param zkProperties the ZooKeeper configuration properties to parse
+     * @throws IOException if the properties cannot be processed
+     * @throws ConfigException if the properties do not form a valid configuration
+     */
+    private ZooKeeperStateServer(final Properties zkProperties) throws IOException, ConfigException {
+        final QuorumPeerConfig parsedConfig = new QuorumPeerConfig();
+        parsedConfig.parseProperties(zkProperties);
+        this.quorumPeerConfig = parsedConfig;
+    }
+
+    /**
+     * Builds a server configuration from the parsed properties and runs the server with it.
+     *
+     * NOTE(review): if runFromConfig blocks for as long as the server runs, this method holds the
+     * instance monitor for that whole time and the synchronized shutdown() could not be entered
+     * from another thread -- confirm against the ZooKeeperServerMain version in use.
+     *
+     * @throws IOException if the server cannot be run
+     */
+    public synchronized void start() throws IOException {
+        logger.info("Starting Embedded ZooKeeper Server");
+
+        final ServerConfig config = new ServerConfig();
+        config.readFrom(quorumPeerConfig);
+        runFromConfig(config);
+    }
+
+    /**
+     * Shuts the embedded server down by delegating to the superclass.
+     */
+    @Override
+    public synchronized void shutdown() {
+        super.shutdown();
+    }
+
+    /**
+     * Creates a server from the embedded ZooKeeper properties file named in the NiFi properties.
+     *
+     * @param properties the NiFi properties that reference the ZooKeeper properties file
+     * @return the new server, or <code>null</code> if no embedded ZooKeeper properties file is configured
+     * @throws IOException if the referenced file does not exist, cannot be read, or fails to load
+     * @throws ConfigException if the loaded properties do not form a valid configuration
+     */
+    public static ZooKeeperStateServer create(final NiFiProperties properties) throws IOException, ConfigException {
+        final File propsFile = properties.getEmbeddedZooKeeperPropertiesFile();
+        if (propsFile == null) {
+            return null;
+        }
+
+        final boolean readable = propsFile.exists() && propsFile.canRead();
+        if (!readable) {
+            throw new IOException("Cannot create Embedded ZooKeeper Server because the Properties File " + propsFile.getAbsolutePath()
+                + " referenced in nifi.properties does not exist or cannot be read");
+        }
+
+        final Properties loaded = new Properties();
+        try (final InputStream rawIn = new FileInputStream(propsFile);
+             final InputStream bufferedIn = new BufferedInputStream(rawIn)) {
+            loaded.load(bufferedIn);
+        }
+
+        return new ZooKeeperStateServer(loaded);
+    }
+} http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 88fa43f..17f9149 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -37,6 +37,7 @@ import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.nifi.annotation.lifecycle.OnRemoved; import org.apache.nifi.annotation.lifecycle.OnShutdown; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.StateManager; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.Connection; @@ -44,6 +45,7 @@ import org.apache.nifi.connectable.Funnel; import org.apache.nifi.connectable.LocalPort; import org.apache.nifi.connectable.Port; import org.apache.nifi.connectable.Position; +import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.ScheduledState; @@ -73,6 +75,7 @@ public final class StandardProcessGroup implements ProcessGroup { private final ProcessScheduler scheduler; private final ControllerServiceProvider controllerServiceProvider; + private 
final FlowController flowController; private final Map<String, Port> inputPorts = new HashMap<>(); private final Map<String, Port> outputPorts = new HashMap<>(); @@ -90,13 +93,16 @@ public final class StandardProcessGroup implements ProcessGroup { private static final Logger LOG = LoggerFactory.getLogger(StandardProcessGroup.class); - public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final ProcessScheduler scheduler, final NiFiProperties nifiProps, final StringEncryptor encryptor) { + public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final ProcessScheduler scheduler, final NiFiProperties nifiProps, final StringEncryptor encryptor, + final FlowController flowController) { this.id = id; this.controllerServiceProvider = serviceProvider; this.parent = new AtomicReference<>(); this.scheduler = scheduler; this.comments = new AtomicReference<>(""); this.encryptor = encryptor; + this.flowController = flowController; + name = new AtomicReference<>(); position = new AtomicReference<>(new Position(0D, 0D)); } @@ -327,11 +333,15 @@ public final class StandardProcessGroup implements ProcessGroup { } } + private StateManager getStateManager(final String componentId) { + return flowController.getStateManagerProvider().getStateManager(componentId); + } + @SuppressWarnings("deprecation") private void shutdown(final ProcessGroup procGroup) { for (final ProcessorNode node : procGroup.getProcessors()) { try (final NarCloseable x = NarCloseable.withNarLoader()) { - final StandardProcessContext processContext = new StandardProcessContext(node, controllerServiceProvider, encryptor); + final StandardProcessContext processContext = new StandardProcessContext(node, controllerServiceProvider, encryptor, getStateManager(node.getIdentifier())); ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnShutdown.class, org.apache.nifi.processor.annotation.OnShutdown.class, node.getProcessor(), processContext); } 
} @@ -681,8 +691,8 @@ public final class StandardProcessGroup implements ProcessGroup { } } - @SuppressWarnings("deprecation") @Override + @SuppressWarnings("deprecation") public void removeProcessor(final ProcessorNode processor) { final String id = requireNonNull(processor).getIdentifier(); writeLock.lock(); @@ -697,7 +707,7 @@ public final class StandardProcessGroup implements ProcessGroup { } try (final NarCloseable x = NarCloseable.withNarLoader()) { - final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, encryptor); + final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, encryptor, getStateManager(processor.getIdentifier())); ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnRemoved.class, org.apache.nifi.processor.annotation.OnRemoved.class, processor.getProcessor(), processContext); } catch (final Exception e) { throw new ComponentLifeCycleException("Failed to invoke 'OnRemoved' methods of " + processor, e); @@ -719,6 +729,8 @@ public final class StandardProcessGroup implements ProcessGroup { processors.remove(id); LogRepositoryFactory.getRepository(processor.getIdentifier()).removeAllObservers(); + flowController.getStateManagerProvider().onComponentRemoved(processor.getIdentifier()); + // must copy to avoid a concurrent modification final Set<Connection> copy = new HashSet<>(processor.getConnections()); for (final Connection conn : copy) { http://git-wip-us.apache.org/repos/asf/nifi/blob/57dadb72/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java index 8e52a46..5bb1a86 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java @@ -29,6 +29,7 @@ import org.apache.nifi.attribute.expression.language.StandardPropertyValue; import org.apache.nifi.attribute.expression.language.Query.Range; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.state.StateManager; import org.apache.nifi.connectable.Connection; import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerServiceLookup; @@ -43,11 +44,13 @@ public class StandardProcessContext implements ProcessContext, ControllerService private final ControllerServiceProvider controllerServiceProvider; private final Map<PropertyDescriptor, PreparedQuery> preparedQueries; private final StringEncryptor encryptor; + private final StateManager stateManager; - public StandardProcessContext(final ProcessorNode processorNode, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor) { + public StandardProcessContext(final ProcessorNode processorNode, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor, final StateManager stateManager) { this.procNode = processorNode; this.controllerServiceProvider = controllerServiceProvider; this.encryptor = encryptor; + this.stateManager = stateManager; preparedQueries = new HashMap<>(); for (final Map.Entry<PropertyDescriptor, String> entry : procNode.getProperties().entrySet()) { @@ -208,4 +211,9 @@ public class StandardProcessContext implements ProcessContext, 
ControllerService final List<Range> elRanges = Query.extractExpressionRanges(getProperty(property).getValue()); return (elRanges != null && !elRanges.isEmpty()); } + + @Override + public StateManager getStateManager() { + return stateManager; + } }
