Repository: nifi Updated Branches: refs/heads/master 98395de74 -> a7b97419e
NIFI-1626: Throw an Exception proactively if too much state is attempting to be stored via ZooKeeperStateProvider NIFI-1626: Updated State Management section of Developer Guide Signed-off-by: jpercivall <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a7b97419 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a7b97419 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a7b97419 Branch: refs/heads/master Commit: a7b97419e5e0addba6a4758fc735e6259cc06e44 Parents: 98395de Author: Mark Payne <[email protected]> Authored: Wed Mar 16 13:19:33 2016 -0400 Committer: jpercivall <[email protected]> Committed: Wed Mar 16 16:12:00 2016 -0400 ---------------------------------------------------------------------- .../nifi/components/state/StateManager.java | 3 ++ .../state/exception/StateTooLargeException.java | 32 ++++++++++++++ .../src/main/asciidoc/developer-guide.adoc | 45 +++++++++++--------- .../zookeeper/ZooKeeperStateProvider.java | 29 ++++++++++--- .../zookeeper/TestZooKeeperStateProvider.java | 36 ++++++++++++++++ 5 files changed, 118 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/a7b97419/nifi-api/src/main/java/org/apache/nifi/components/state/StateManager.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/components/state/StateManager.java b/nifi-api/src/main/java/org/apache/nifi/components/state/StateManager.java index 7ff8cec..768f773 100644 --- a/nifi-api/src/main/java/org/apache/nifi/components/state/StateManager.java +++ b/nifi-api/src/main/java/org/apache/nifi/components/state/StateManager.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.Map; import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.components.state.exception.StateTooLargeException; /** 
* <p> @@ -60,6 +61,7 @@ public interface StateManager { * @param state the value to change the state to * @param scope the scope to use when storing the state * + * @throws StateTooLargeException if attempting to store more state than is allowed by the backing storage mechanism * @throws IOException if unable to communicate with the underlying storage mechanism */ void setState(Map<String, String> state, Scope scope) throws IOException; @@ -84,6 +86,7 @@ public interface StateManager { * @return <code>true</code> if the state was updated to the new value, <code>false</code> if the state's value was not * equal to oldValue * + * @throws StateTooLargeException if attempting to store more state than is allowed by the backing storage mechanism * @throws IOException if unable to communicate with the underlying storage mechanism */ boolean replace(StateMap oldValue, Map<String, String> newValue, Scope scope) throws IOException; http://git-wip-us.apache.org/repos/asf/nifi/blob/a7b97419/nifi-api/src/main/java/org/apache/nifi/components/state/exception/StateTooLargeException.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/components/state/exception/StateTooLargeException.java b/nifi-api/src/main/java/org/apache/nifi/components/state/exception/StateTooLargeException.java new file mode 100644 index 0000000..99546ae --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/components/state/exception/StateTooLargeException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.state.exception; + +import java.io.IOException; + +/** + * Thrown when attempting to store state via the {@link StateManager} but the state being + * stored is larger than is allowed by the backing storage mechanism. + */ +public class StateTooLargeException extends IOException { + private static final long serialVersionUID = 1L; + + public StateTooLargeException(final String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/a7b97419/nifi-docs/src/main/asciidoc/developer-guide.adoc ---------------------------------------------------------------------- diff --git a/nifi-docs/src/main/asciidoc/developer-guide.adoc b/nifi-docs/src/main/asciidoc/developer-guide.adoc index 1b13142..bcdd6cd 100644 --- a/nifi-docs/src/main/asciidoc/developer-guide.adoc +++ b/nifi-docs/src/main/asciidoc/developer-guide.adoc @@ -570,20 +570,28 @@ relied upon for critical business logic. From the ProcessContext, ReportingContext, and ControllerServiceInitializationContext, components are able to call the `getStateManager()` method. This State Manager is responsible for providing a simple API -for storing and retrieving state. As such, the API is designed to be quite similar to the ConcurrentMap -API, which most Java developers are already familiar with. +for storing and retrieving state. This mechanism is intended to provide developers with the ability to +very easily store a set of key/value pairs, retrieve those values, and update them atomically. 
The state +can be stored local to the node or across all nodes in a cluster. It is important to note, however, that +this mechanism is intended only to provide a mechanism for storing very 'simple' state. As such, the API +simply allows a `Map<String, String>` to be stored and retrieved and for the entire Map to be atomically +replaced. Moreover, the only implementation that is currently supported for storing cluster-wide state is +backed by ZooKeeper. As such, the entire State Map must be less than 1 MB in size, after being serialized. +Attempting to store more than this will result in an Exception being thrown. If the interactions required +by the Processor for managing state are more complex than this (e.g., large amounts of data must be stored +and retrieved, or individual keys must be stored and fetched individually) then a different mechanism should +be used (e.g., communicating with an external database). [[state_scope]] ==== Scope -One very notable difference between the StateManager API and the ConcurrentMap API, however, is the presence -of a Scope object on each method call of the StateManager. This Scope will either be `Scope.NODE` or `Scope.CLUSTER`. -If NiFi is run in a cluster, this Scope provides important information to the framework about how the operation should -occur. +When communicating with the State Manager, all method calls require that a Scope be provided. This Scope will +either be `Scope.NODE` or `Scope.CLUSTER`. If NiFi is run in a cluster, this Scope provides important information +to the framework about how the operation should occur. If state is stored using `Scope.CLUSTER`, then all nodes in the cluster will be communicating with the same -state storage mechanism, as if all nodes were to share a single ConcurrentMap. If state is stored and retrieved using -`Scope.NODE`, then each node will see a different representation of the state. +state storage mechanism. 
If state is stored and retrieved using `Scope.NODE`, then each node will see a different +representation of the state. It is also worth noting that if NiFi is configured to run as a standalone instance, rather than running in a cluster, a scope of `Scope.NODE` is always used. This is done in order to allow the developer of a NiFi component to write the code @@ -593,13 +601,12 @@ that the instance is clustered and write the code accordingly. ==== Storing and Retrieving State -State is stored using the StateManager's `set`, `replace`, `putIfAbsent`, `remove`, and `clear` methods. All of these methods, -with the exception of `clear` take as the first argument the key to be set. The key that is used is unique only to the same -instance of the component and for the same Scope. That is, if two Processors store a value using the key _My Key_, those Processors -will not conflict with each other, even if both Processors are of the same type (e.g., both are of type ListFile) and scope. Furthermore, -if a Processor stores a value with the key of _My Key_ using the `Scope.CLUSTER` scope, and then attempts to retrieve the value -using the `Scope.NODE` scope, the value retrieved will be `null`. Each Processor's state, then, is stored in isolation from other -Processors' state. A unique key can be thought of as a triple of <Processor Instance, Key, Scope>. +State is stored and retrieved using the StateManager's `getState`, `setState`, `replace`, and `clear` methods. All of these methods +require that a Scope be provided. It should be noted that the state that is stored with the Local scope is entirely different +than state stored with a Cluster scope. If a Processor stores a value with the key of _My Key_ using the `Scope.CLUSTER` scope, +and then attempts to retrieve the value using the `Scope.NODE` scope, the value retrieved will be `null` (unless a value was +also stored with the same key using the `Scope.NODE` scope). 
Each Processor's state is stored in isolation from other +Processors' state. It follows, then, that two Processors cannot share the same state. There are, however, some circumstances in which it is very necessary to share state between two Processors of different types, or two Processors of the same type. This can be accomplished @@ -615,13 +622,9 @@ defined by the `StateManager` interface, that help developers to more easily dev First, the `MockStateManager` implements the `StateManager` interface, so all of the state can be examined from within a unit test. Additionally, the `MockStateManager` exposes a handful of `assert*` methods to perform assertions that the State is set as expected. -There are times, however, that state could be updated multiple times during the run of a single invocation of a Processor's `onTrigger` -method. In this case, inspecting the values after running the Processor may not be sufficient. Additionally, we must always remember at each -step to check the value of the stored state, which can become error-prone and burdensome for the developer. +The `MockStateManager` also provides the ability to indicate that the unit test should immediately fail if state is updated for a particular +`Scope`. -For these reasons, the `MockStateManager` provides an additional method, named `failIfStateSet`. This method instructs the State Manager that -the unit test should immediately fail if the state for a given key is ever set, or if it is set to a specific value. The `doNotFailIfStateSet` -method can then be used to instruct the Mock Framework to clear this state and allow state to be set to any value. 
http://git-wip-us.apache.org/repos/asf/nifi/blob/a7b97419/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java index acc0bc3..3a8a8e4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java @@ -40,6 +40,7 @@ import org.apache.nifi.components.Validator; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateMap; import org.apache.nifi.components.state.StateProviderInitializationContext; +import org.apache.nifi.components.state.exception.StateTooLargeException; import org.apache.nifi.controller.state.StandardStateMap; import org.apache.nifi.controller.state.providers.AbstractStateProvider; import org.apache.nifi.processor.util.StandardValidators; @@ -62,6 +63,8 @@ import org.apache.zookeeper.data.Stat; * consistency across configuration interactions. 
*/ public class ZooKeeperStateProvider extends AbstractStateProvider { + private static final int ONE_MB = 1024 * 1024; + static final AllowableValue OPEN_TO_WORLD = new AllowableValue("Open", "Open", "ZNodes will be open to any ZooKeeper client."); static final AllowableValue CREATOR_ONLY = new AllowableValue("CreatorOnly", "CreatorOnly", "ZNodes will be accessible only by the creator. The creator will have full access to create, read, write, delete, and administer the ZNodes."); @@ -343,6 +346,7 @@ public class ZooKeeperStateProvider extends AbstractStateProvider { * * @throws IOException if unable to communicate with ZooKeeper * @throws NoNodeException if the corresponding ZNode does not exist in ZooKeeper and allowNodeCreation is set to <code>false</code> + * @throws StateTooLargeException if the state to be stored exceeds the maximum size allowed by ZooKeeper (1 MB, after serialization) */ private void setState(final Map<String, String> stateValues, final int version, final String componentId, final boolean allowNodeCreation) throws IOException, NoNodeException { verifyEnabled(); @@ -350,13 +354,18 @@ public class ZooKeeperStateProvider extends AbstractStateProvider { try { final String path = getComponentPath(componentId); final byte[] data = serialize(stateValues); + if (data.length > ONE_MB) { + throw new StateTooLargeException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId + + " because the state had " + stateValues.size() + " values, which serialized to " + data.length + + " bytes, and the maximum allowed by ZooKeeper is 1 MB (" + ONE_MB + " bytes)"); + } final ZooKeeper keeper = getZooKeeper(); try { keeper.setData(path, data, version); } catch (final NoNodeException nne) { if (allowNodeCreation) { - createNode(path, data); + createNode(path, data, componentId, stateValues); return; } else { throw nne; @@ -379,14 +388,22 @@ public class ZooKeeperStateProvider extends AbstractStateProvider { } throw new 
IOException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId, ke); + } catch (final StateTooLargeException stle) { + throw stle; } catch (final IOException ioe) { throw new IOException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId, ioe); } } - private void createNode(final String path, final byte[] data) throws IOException, KeeperException { + private void createNode(final String path, final byte[] data, final String componentId, final Map<String, String> stateValues) throws IOException, KeeperException { try { + if (data != null && data.length > ONE_MB) { + throw new StateTooLargeException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId + + " because the state had " + stateValues.size() + " values, which serialized to " + data.length + + " bytes, and the maximum allowed by ZooKeeper is 1 MB (" + ONE_MB + " bytes)"); + } + getZooKeeper().create(path, data, acl, CreateMode.PERSISTENT); } catch (final InterruptedException ie) { throw new IOException("Failed to update cluster-wide state due to interruption", ie); @@ -394,13 +411,13 @@ public class ZooKeeperStateProvider extends AbstractStateProvider { final Code exceptionCode = ke.code(); if (Code.NONODE == exceptionCode) { final String parentPath = StringUtils.substringBeforeLast(path, "/"); - createNode(parentPath, null); - createNode(path, data); + createNode(parentPath, null, componentId, stateValues); + createNode(path, data, componentId, stateValues); return; } if (Code.SESSIONEXPIRED == exceptionCode) { invalidateClient(); - createNode(path, data); + createNode(path, data, componentId, stateValues); return; } @@ -412,7 +429,7 @@ public class ZooKeeperStateProvider extends AbstractStateProvider { } catch (final KeeperException ke1) { // Node no longer exists -- it was removed by someone else. Go recreate the node. 
if (ke1.code() == Code.NONODE) { - createNode(path, data); + createNode(path, data, componentId, stateValues); return; } } catch (final InterruptedException ie) { http://git-wip-us.apache.org/repos/asf/nifi/blob/a7b97419/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java index 7e03a9c..ef72d6c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java @@ -34,6 +34,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.state.StateProvider; import org.apache.nifi.components.state.StateProviderInitializationContext; +import org.apache.nifi.components.state.exception.StateTooLargeException; import org.apache.nifi.controller.state.providers.AbstractTestStateProvider; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; @@ -202,4 +203,39 @@ public class TestZooKeeperStateProvider extends AbstractTestStateProvider { authorizedProvider.shutdown(); } } + + @Test + public void testStateTooLargeExceptionThrown() { + final Map<String, String> state = new HashMap<>(); + final StringBuilder sb = new 
StringBuilder(); + + // Build a string that is a little less than 64 KB, because that's + // the largest value available for DataOutputStream.writeUTF + for (int i = 0; i < 6500; i++) { + sb.append("0123456789"); + } + + for (int i = 0; i < 20; i++) { + state.put("numbers." + i, sb.toString()); + } + + try { + getProvider().setState(state, componentId); + Assert.fail("Expected StateTooLargeException"); + } catch (final StateTooLargeException stle) { + // expected behavior. + } catch (final Exception e) { + Assert.fail("Expected StateTooLargeException but " + e.getClass() + " was thrown", e); + } + + try { + getProvider().replace(getProvider().getState(componentId), state, componentId); + Assert.fail("Expected StateTooLargeException"); + } catch (final StateTooLargeException stle) { + // expected behavior. + } catch (final Exception e) { + Assert.fail("Expected StateTooLargeException"); + } + + } }
