Repository: nifi
Updated Branches:
  refs/heads/master 98395de74 -> a7b97419e


NIFI-1626: Throw an Exception proactively if too much state is attempting to be 
stored via ZooKeeperStateProvider

NIFI-1626: Updated State Management section of Developer Guide

Signed-off-by: jpercivall <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a7b97419
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a7b97419
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a7b97419

Branch: refs/heads/master
Commit: a7b97419e5e0addba6a4758fc735e6259cc06e44
Parents: 98395de
Author: Mark Payne <[email protected]>
Authored: Wed Mar 16 13:19:33 2016 -0400
Committer: jpercivall <[email protected]>
Committed: Wed Mar 16 16:12:00 2016 -0400

----------------------------------------------------------------------
 .../nifi/components/state/StateManager.java     |  3 ++
 .../state/exception/StateTooLargeException.java | 32 ++++++++++++++
 .../src/main/asciidoc/developer-guide.adoc      | 45 +++++++++++---------
 .../zookeeper/ZooKeeperStateProvider.java       | 29 ++++++++++---
 .../zookeeper/TestZooKeeperStateProvider.java   | 36 ++++++++++++++++
 5 files changed, 118 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a7b97419/nifi-api/src/main/java/org/apache/nifi/components/state/StateManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/components/state/StateManager.java 
b/nifi-api/src/main/java/org/apache/nifi/components/state/StateManager.java
index 7ff8cec..768f773 100644
--- a/nifi-api/src/main/java/org/apache/nifi/components/state/StateManager.java
+++ b/nifi-api/src/main/java/org/apache/nifi/components/state/StateManager.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Map;
 
 import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.components.state.exception.StateTooLargeException;
 
 /**
  * <p>
@@ -60,6 +61,7 @@ public interface StateManager {
      * @param state the value to change the state to
      * @param scope the scope to use when storing the state
      *
+     * @throws StateTooLargeException if attempting to store more state than 
is allowed by the backing storage mechanism
      * @throws IOException if unable to communicate with the underlying 
storage mechanism
      */
     void setState(Map<String, String> state, Scope scope) throws IOException;
@@ -84,6 +86,7 @@ public interface StateManager {
      * @return <code>true</code> if the state was updated to the new value, 
<code>false</code> if the state's value was not
      *         equal to oldValue
      *
+     * @throws StateTooLargeException if attempting to store more state than 
is allowed by the backing storage mechanism
      * @throws IOException if unable to communicate with the underlying 
storage mechanism
      */
     boolean replace(StateMap oldValue, Map<String, String> newValue, Scope 
scope) throws IOException;

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7b97419/nifi-api/src/main/java/org/apache/nifi/components/state/exception/StateTooLargeException.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/components/state/exception/StateTooLargeException.java
 
b/nifi-api/src/main/java/org/apache/nifi/components/state/exception/StateTooLargeException.java
new file mode 100644
index 0000000..99546ae
--- /dev/null
+++ 
b/nifi-api/src/main/java/org/apache/nifi/components/state/exception/StateTooLargeException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.state.exception;
+
+import java.io.IOException;
+
+/**
+ * Thrown when attempting to store state via the {@link org.apache.nifi.components.state.StateManager}
+ * but the state being stored is larger than is allowed by the backing storage mechanism.
+ */
+public class StateTooLargeException extends IOException {
+    private static final long serialVersionUID = 1L;
+
+    public StateTooLargeException(final String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7b97419/nifi-docs/src/main/asciidoc/developer-guide.adoc
----------------------------------------------------------------------
diff --git a/nifi-docs/src/main/asciidoc/developer-guide.adoc 
b/nifi-docs/src/main/asciidoc/developer-guide.adoc
index 1b13142..bcdd6cd 100644
--- a/nifi-docs/src/main/asciidoc/developer-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/developer-guide.adoc
@@ -570,20 +570,28 @@ relied upon for critical business logic.
 
 From the ProcessContext, ReportingContext, and 
ControllerServiceInitializationContext, components are
 able to call the `getStateManager()` method. This State Manager is responsible 
for providing a simple API
-for storing and retrieving state. As such, the API is designed to be quite 
similar to the ConcurrentMap
-API, which most Java developers are already familiar with.
+for storing and retrieving state. This mechanism is intended to provide 
developers with the ability to
+very easily store a set of key/value pairs, retrieve those values, and update 
them atomically. The state
+can be stored local to the node or across all nodes in a cluster. It is 
important to note, however, that
+this mechanism is intended only for storing very 'simple' state. As such, the API
+simply allows a `Map<String, String>` to be stored and retrieved and for the 
entire Map to be atomically
+replaced. Moreover, the only implementation that is currently supported for 
storing cluster-wide state is
+backed by ZooKeeper. As such, the entire State Map must be less than 1 MB in 
size, after being serialized.
+Attempting to store more than this will result in an Exception being thrown. 
If the interactions required
+by the Processor for managing state are more complex than this (e.g., large 
amounts of data must be stored
+and retrieved, or individual keys must be stored and fetched individually) then a different mechanism should
+be used (e.g., communicating with an external database). 
 
 
 [[state_scope]]
 ==== Scope
-One very notable difference between the StateManager API and the ConcurrentMap 
API, however, is the presence
-of a Scope object on each method call of the StateManager. This Scope will 
either be `Scope.NODE` or `Scope.CLUSTER`.
-If NiFi is run in a cluster, this Scope provides important information to the 
framework about how the operation should
-occur.
+When communicating with the State Manager, all method calls require that a 
Scope be provided. This Scope will
+either be `Scope.NODE` or `Scope.CLUSTER`. If NiFi is run in a cluster, this 
Scope provides important information
+to the framework about how the operation should occur.
 
 If state as stored using `Scope.CLUSTER`, then all nodes in the cluster will 
be communicating with the same
-state storage mechanism, as if all nodes were to share a single ConcurrentMap. 
If state is stored and retrieved using
-`Scope.NODE`, then each node will see a different representation of the state.
+state storage mechanism. If state is stored and retrieved using `Scope.NODE`, 
then each node will see a different
+representation of the state.
 
 It is also worth noting that if NiFi is configured to run as a standalone 
instance, rather than running in a cluster,
 a scope of `Scope.NODE` is always used. This is done in order to allow the 
developer of a NiFi component to write the code
@@ -593,13 +601,12 @@ that the instance is clustered and write the code 
accordingly.
 
 ==== Storing and Retrieving State
 
-State is stored using the StateManager's `set`, `replace`, `putIfAbsent`, 
`remove`, and `clear` methods. All of these methods,
-with the exception of `clear` take as the first argument the key to be set. 
The key that is used is unique only to the same
-instance of the component and for the same Scope. That is, if two Processors 
store a value using the key _My Key_, those Processors
-will not conflict with each other, even if both Processors are of the same 
type (e.g., both are of type ListFile) and scope. Furthermore,
-if a Processor stores a value with the key of _My Key_ using the 
`Scope.CLUSTER` scope, and then attempts to retrieve the value
-using the `Scope.NODE` scope, the value retrieved will be `null`. Each 
Processor's state, then, is stored in isolation from other
-Processors' state. A unique key can be thought of as a triple of <Processor 
Instance, Key, Scope>.
+State is stored and retrieved using the StateManager's `getState`, `setState`, `replace`, and `clear` methods. All of these methods
+require that a Scope be provided. It should be noted that state stored with the Node scope is entirely separate from
+state stored with the Cluster scope. If a Processor stores a value with the key of _My Key_ using the `Scope.CLUSTER` scope,
+and then attempts to retrieve the value using the `Scope.NODE` scope, the value retrieved will be `null` (unless a value was
+also stored with the same key using the `Scope.NODE` scope). Each Processor's state is stored in isolation from other
+Processors' state.
 
 It follows, then, that two Processors cannot share the same state. There are, 
however, some circumstances in which it is very
 necessary to share state between two Processors of different types, or two 
Processors of the same type. This can be accomplished
@@ -615,13 +622,9 @@ defined by the `StateManager` interface, that help 
developers to more easily dev
 
 First, the `MockStateManager` implements the `StateManager` interface, so all 
of the state can be examined from within a unit test.
 Additionally, the `MockStateManager` exposes a handful of `assert*` methods to 
perform assertions that the State is set as expected.
-There are times, however, that state could be updated multiple times during 
the run of a single invocation of a Processor's `onTrigger`
-method. In this case, inspecting the values after running the Processor may 
not be sufficient. Additionally, we must always remember at each
-step to check the value of the stored state, which can become error-prone and 
burdensome for the developer.
+The `MockStateManager` also provides the ability to indicate that the unit 
test should immediately fail if state is updated for a particular
+`Scope`.
 
-For these reasons, the `MockStateManager` provides an additional method, named 
`failIfStateSet`. This method instructs the State Manager that
-the unit test should immediately fail if the state for a given key is ever 
set, or if it is set to a specific value. The `doNotFailIfStateSet`
-method can then be used to instruct the Mock Framework to clear this state and 
allow state to be set to any value.
 
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7b97419/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
index acc0bc3..3a8a8e4 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
@@ -40,6 +40,7 @@ import org.apache.nifi.components.Validator;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.components.state.StateProviderInitializationContext;
+import org.apache.nifi.components.state.exception.StateTooLargeException;
 import org.apache.nifi.controller.state.StandardStateMap;
 import org.apache.nifi.controller.state.providers.AbstractStateProvider;
 import org.apache.nifi.processor.util.StandardValidators;
@@ -62,6 +63,8 @@ import org.apache.zookeeper.data.Stat;
  * consistency across configuration interactions.
  */
 public class ZooKeeperStateProvider extends AbstractStateProvider {
+    private static final int ONE_MB = 1024 * 1024;
+
     static final AllowableValue OPEN_TO_WORLD = new AllowableValue("Open", 
"Open", "ZNodes will be open to any ZooKeeper client.");
     static final AllowableValue CREATOR_ONLY = new 
AllowableValue("CreatorOnly", "CreatorOnly",
         "ZNodes will be accessible only by the creator. The creator will have 
full access to create, read, write, delete, and administer the ZNodes.");
@@ -343,6 +346,7 @@ public class ZooKeeperStateProvider extends 
AbstractStateProvider {
      *
      * @throws IOException if unable to communicate with ZooKeeper
      * @throws NoNodeException if the corresponding ZNode does not exist in 
ZooKeeper and allowNodeCreation is set to <code>false</code>
+     * @throws StateTooLargeException if the state to be stored exceeds the 
maximum size allowed by ZooKeeper (1 MB, after serialization)
      */
     private void setState(final Map<String, String> stateValues, final int 
version, final String componentId, final boolean allowNodeCreation) throws 
IOException, NoNodeException {
         verifyEnabled();
@@ -350,13 +354,18 @@ public class ZooKeeperStateProvider extends 
AbstractStateProvider {
         try {
             final String path = getComponentPath(componentId);
             final byte[] data = serialize(stateValues);
+            if (data.length > ONE_MB) {
+                throw new StateTooLargeException("Failed to set cluster-wide 
state in ZooKeeper for component with ID " + componentId
+                    + " because the state had " + stateValues.size() + " 
values, which serialized to " + data.length
+                    + " bytes, and the maximum allowed by ZooKeeper is 1 MB (" 
+ ONE_MB + " bytes)");
+            }
 
             final ZooKeeper keeper = getZooKeeper();
             try {
                 keeper.setData(path, data, version);
             } catch (final NoNodeException nne) {
                 if (allowNodeCreation) {
-                    createNode(path, data);
+                    createNode(path, data, componentId, stateValues);
                     return;
                 } else {
                     throw nne;
@@ -379,14 +388,22 @@ public class ZooKeeperStateProvider extends 
AbstractStateProvider {
             }
 
             throw new IOException("Failed to set cluster-wide state in 
ZooKeeper for component with ID " + componentId, ke);
+        } catch (final StateTooLargeException stle) {
+            throw stle;
         } catch (final IOException ioe) {
             throw new IOException("Failed to set cluster-wide state in 
ZooKeeper for component with ID " + componentId, ioe);
         }
     }
 
 
-    private void createNode(final String path, final byte[] data) throws 
IOException, KeeperException {
+    private void createNode(final String path, final byte[] data, final String 
componentId, final Map<String, String> stateValues) throws IOException, 
KeeperException {
         try {
+            if (data != null && data.length > ONE_MB) {
+                throw new StateTooLargeException("Failed to set cluster-wide 
state in ZooKeeper for component with ID " + componentId
+                    + " because the state had " + stateValues.size() + " 
values, which serialized to " + data.length
+                    + " bytes, and the maximum allowed by ZooKeeper is 1 MB (" 
+ ONE_MB + " bytes)");
+            }
+
             getZooKeeper().create(path, data, acl, CreateMode.PERSISTENT);
         } catch (final InterruptedException ie) {
             throw new IOException("Failed to update cluster-wide state due to 
interruption", ie);
@@ -394,13 +411,13 @@ public class ZooKeeperStateProvider extends 
AbstractStateProvider {
             final Code exceptionCode = ke.code();
             if (Code.NONODE == exceptionCode) {
                 final String parentPath = 
StringUtils.substringBeforeLast(path, "/");
-                createNode(parentPath, null);
-                createNode(path, data);
+                createNode(parentPath, null, componentId, stateValues);
+                createNode(path, data, componentId, stateValues);
                 return;
             }
             if (Code.SESSIONEXPIRED == exceptionCode) {
                 invalidateClient();
-                createNode(path, data);
+                createNode(path, data, componentId, stateValues);
                 return;
             }
 
@@ -412,7 +429,7 @@ public class ZooKeeperStateProvider extends 
AbstractStateProvider {
                 } catch (final KeeperException ke1) {
                     // Node no longer exists -- it was removed by someone 
else. Go recreate the node.
                     if (ke1.code() == Code.NONODE) {
-                        createNode(path, data);
+                        createNode(path, data, componentId, stateValues);
                         return;
                     }
                 } catch (final InterruptedException ie) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7b97419/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java
index 7e03a9c..ef72d6c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/state/providers/zookeeper/TestZooKeeperStateProvider.java
@@ -34,6 +34,7 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.state.StateProvider;
 import org.apache.nifi.components.state.StateProviderInitializationContext;
+import org.apache.nifi.components.state.exception.StateTooLargeException;
 import org.apache.nifi.controller.state.providers.AbstractTestStateProvider;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
@@ -202,4 +203,39 @@ public class TestZooKeeperStateProvider extends 
AbstractTestStateProvider {
             authorizedProvider.shutdown();
         }
     }
+
+    @Test
+    public void testStateTooLargeExceptionThrown() {
+        final Map<String, String> state = new HashMap<>();
+        final StringBuilder sb = new StringBuilder();
+
+        // Build a string that is a little less than 64 KB, because that's
+        // the largest value available for DataOutputStream.writeUTF
+        for (int i = 0; i < 6500; i++) {
+            sb.append("0123456789");
+        }
+
+        // 20 values of ~65,000 chars each (~1.3 MB total) exceed the 1 MB limit enforced by the provider
+        for (int i = 0; i < 20; i++) {
+            state.put("numbers." + i, sb.toString());
+        }
+
+        try {
+            getProvider().setState(state, componentId);
+            Assert.fail("Expected StateTooLargeException");
+        } catch (final StateTooLargeException stle) {
+            // expected behavior.
+        } catch (final Exception e) {
+            Assert.fail("Expected StateTooLargeException but " + e.getClass() + " was thrown", e);
+        }
+
+        try {
+            getProvider().replace(getProvider().getState(componentId), state, componentId);
+            Assert.fail("Expected StateTooLargeException");
+        } catch (final StateTooLargeException stle) {
+            // expected behavior.
+        } catch (final Exception e) {
+            Assert.fail("Expected StateTooLargeException but " + e.getClass() + " was thrown", e);
+        }
+    }
 }

Reply via email to