Repository: nifi
Updated Branches:
  refs/heads/NIFI-259 774c29a4d -> d39067ede


NIFI-259: Updated GetHBase to use new State Management; bug fixes; updated docs


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/bbce596d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/bbce596d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/bbce596d

Branch: refs/heads/NIFI-259
Commit: bbce596d741d8c660291bc8c788c44173c0bb39c
Parents: 774c29a
Author: Mark Payne <[email protected]>
Authored: Wed Jan 13 12:47:08 2016 -0500
Committer: Mark Payne <[email protected]>
Committed: Wed Jan 13 12:47:08 2016 -0500

----------------------------------------------------------------------
 .../src/main/asciidoc/administration-guide.adoc |  22 +-
 .../zookeeper/ZooKeeperStateProvider.java       |   6 +-
 .../apache/nifi/processors/hadoop/ListHDFS.java |   6 +-
 .../java/org/apache/nifi/hbase/GetHBase.java    | 221 +++++++++++++------
 .../org/apache/nifi/hbase/TestGetHBase.java     |  93 +++++---
 .../standard/AbstractListProcessor.java         |   2 +-
 6 files changed, 248 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/bbce596d/nifi-docs/src/main/asciidoc/administration-guide.adoc
----------------------------------------------------------------------
diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc 
b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index b24ec66..4f30686 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -443,7 +443,27 @@ in _nifi.properties_ also becomes relevant. This specifies 
the ZooKeeper propert
 with the list of ZooKeeper servers. Each of these servers is configured as 
<hostname>:<client port>[:<leader election port>]. For example, 
`myhost:2888:3888`.
 This list of nodes should be the same nodes in the NiFi cluster that have the 
`nifi.state.management.embedded.zookeeper.start`
 property set to `true`. Also note that because ZooKeeper will be listening on 
these ports, the firewall may need to be configured to open these ports
-for incoming traffic, at least between nodes in the cluster.
+for incoming traffic, at least between nodes in the cluster. Additionally, the port to listen on for client connections must be opened in the firewall.
+The default value for this port is _2181_, but it can be configured via the _clientPort_ property in the _zookeeper.properties_ file.
+
+When using an embedded ZooKeeper, the _conf/zookeeper.properties_ file has a 
property named `dataDir`. By default, this value is set to `./state/zookeeper`.
+If more than one NiFi node is running an embedded ZooKeeper, it is important 
to tell the server which one it is. This is accomplished by creating a file 
named
+_myid_ and placing it in ZooKeeper's data directory. The contents of this file should be the index of the server. So for one of the ZooKeeper servers, we will
+accomplish this by performing the following commands:
+
+[source]
+cd $NIFI_HOME
+mkdir state
+mkdir state/zookeeper
+echo 1 > state/zookeeper/myid
+
+For the next NiFi Node that will run ZooKeeper, we can accomplish this by performing the following commands:
+
+[source]
+cd $NIFI_HOME
+mkdir state
+mkdir state/zookeeper
+echo 2 > state/zookeeper/myid
+
+And so on.
 
 For more information on the properties used to administer ZooKeeper, see the
 link:https://zookeeper.apache.org/doc/current/zookeeperAdmin.html[ZooKeeper 
Admin Guide].

http://git-wip-us.apache.org/repos/asf/nifi/blob/bbce596d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
index 0b66367..faa0364 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
@@ -102,7 +102,7 @@ public class ZooKeeperStateProvider extends 
AbstractStateProvider {
         .sensitive(true)
         .build();
 
-    private static final int ENCODING_VERSION = 1;
+    private static final byte ENCODING_VERSION = 1;
 
     private ZooKeeper zooKeeper;
 
@@ -251,7 +251,7 @@ public class ZooKeeperStateProvider extends 
AbstractStateProvider {
     private byte[] serialize(final Map<String, String> stateValues) throws 
IOException {
         try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
             final DataOutputStream dos = new DataOutputStream(baos)) {
-            dos.writeInt(ENCODING_VERSION);
+            dos.writeByte(ENCODING_VERSION);
             dos.writeInt(stateValues.size());
             for (final Map.Entry<String, String> entry : 
stateValues.entrySet()) {
                 dos.writeUTF(entry.getKey());
@@ -265,7 +265,7 @@ public class ZooKeeperStateProvider extends 
AbstractStateProvider {
         try (final ByteArrayInputStream bais = new ByteArrayInputStream(data);
             final DataInputStream dis = new DataInputStream(bais)) {
 
-            final int encodingVersion = dis.readInt();
+            final byte encodingVersion = dis.readByte();
             if (encodingVersion > ENCODING_VERSION) {
                 throw new IOException("Retrieved a response from ZooKeeper 
when retrieving state for component with ID " + componentId
                     + ", but the response was encoded using the 
ZooKeeperStateProvider Encoding Version of " + encodingVersion

http://git-wip-us.apache.org/repos/asf/nifi/blob/bbce596d/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 3645eec..d6c0c4e 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -188,7 +188,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
         if (stateMap.getVersion() == -1L) {
             final HDFSListing serviceListing = getListingFromService(context);
             if (serviceListing != null) {
-                persistState(serviceListing, context.getStateManager());
+                context.getStateManager().setState(serviceListing.toMap(), 
Scope.CLUSTER);
             }
         }
     }
@@ -213,10 +213,6 @@ public class ListHDFS extends AbstractHadoopProcessor {
         }
     }
 
-    private void persistState(final HDFSListing listing, final StateManager 
stateManager) throws IOException {
-        final Map<String, String> stateValues = listing.toMap();
-        stateManager.setState(stateValues, Scope.CLUSTER);
-    }
 
     private Long getMinTimestamp(final String directory, final HDFSListing 
remoteListing) throws IOException {
         // No cluster-wide state has been recovered. Just use whatever values 
we already have.

http://git-wip-us.apache.org/repos/asf/nifi/blob/bbce596d/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
index 98a612c..65b261a 100644
--- 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
+++ 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
@@ -16,8 +16,30 @@
  */
 package org.apache.nifi.hbase;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
@@ -32,6 +54,9 @@ import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.hbase.io.JsonRowSerializer;
@@ -50,28 +75,6 @@ import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.ObjectHolder;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Pattern;
-
 @TriggerWhenEmpty
 @TriggerSerially
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@@ -84,6 +87,9 @@ import java.util.regex.Pattern;
     @WritesAttribute(attribute = "hbase.table", description = "The name of the 
HBase table that the data was pulled from"),
     @WritesAttribute(attribute = "mime.type", description = "Set to 
application/json to indicate that output is JSON")
 })
+@Stateful(scopes = Scope.CLUSTER, description = "After performing a fetching 
from HBase, stores a timestamp of the last-modified cell that was found. In 
addition, it stores the ID of the row(s) "
+    + "and the value of each cell that has that timestamp as its modification 
date. This is stored across the cluster and allows the next fetch to avoid 
duplicating data, even if this Processor is "
+    + "run on Primary Node only and the Primary Node changes.")
 public class GetHBase extends AbstractProcessor {
 
     static final Pattern COLUMNS_PATTERN = 
Pattern.compile("\\w+(:\\w+)?(?:,\\w+(:\\w+)?)*");
@@ -101,7 +107,7 @@ public class GetHBase extends AbstractProcessor {
             .name("Distributed Cache Service")
             .description("Specifies the Controller Service that should be used 
to maintain state about what has been pulled from HBase" +
                     " so that if a new node begins pulling data, it won't 
duplicate all of the work that has been done.")
-            .required(true)
+        .required(false)
             .identifiesControllerService(DistributedMapCacheClient.class)
             .build();
     static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
@@ -197,7 +203,20 @@ public class GetHBase extends AbstractProcessor {
     }
 
     @OnScheduled
-    public void parseColumns(final ProcessContext context) {
+    public void parseColumns(final ProcessContext context) throws IOException {
+        final StateMap stateMap = 
context.getStateManager().getState(Scope.CLUSTER);
+        if (stateMap.getVersion() < 0) {
+            // no state has been stored in the State Manager - check if we 
have state stored in the
+            // DistributedMapCacheClient service and migrate it if so
+            final DistributedMapCacheClient client = 
context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
+            final ScanResult scanResult = getState(client);
+            if (scanResult != null) {
+                storeState(scanResult, context.getStateManager());
+            }
+
+            clearState(client);
+        }
+
         final String columnsValue = context.getProperty(COLUMNS).getValue();
         final String[] columns = (columnsValue == null || 
columnsValue.isEmpty() ? new String[0] : columnsValue.split(","));
 
@@ -225,7 +244,9 @@ public class GetHBase extends AbstractProcessor {
     @OnRemoved
     public void onRemoved(final ProcessContext context) {
+        // On removal, discard any legacy (pre-State-Manager) state via clearState(...).
         final DistributedMapCacheClient client = 
context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
-        clearState(client);
+        // The Distributed Cache Service is no longer a required property, so no client may be
+        // configured; only attempt the legacy cleanup when one is present.
+        if (client != null) {
+            clearState(client);
+        }
     }
 
     @Override
@@ -234,11 +255,14 @@ public class GetHBase extends AbstractProcessor {
         final String initialTimeRange = 
context.getProperty(INITIAL_TIMERANGE).getValue();
         final String filterExpression = 
context.getProperty(FILTER_EXPRESSION).getValue();
         final HBaseClientService hBaseClientService = 
context.getProperty(HBASE_CLIENT_SERVICE).asControllerService(HBaseClientService.class);
-        final DistributedMapCacheClient client = 
context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
 
         // if the table was changed then remove any previous state
         if (previousTable != null && !tableName.equals(previousTable)) {
-            clearState(client);
+            try {
+                context.getStateManager().clear(Scope.CLUSTER);
+            } catch (final IOException ioe) {
+                getLogger().warn("Failed to clear Cluster State", ioe);
+            }
             previousTable = tableName;
         }
 
@@ -246,7 +270,7 @@ public class GetHBase extends AbstractProcessor {
             final Charset charset = 
Charset.forName(context.getProperty(CHARSET).getValue());
             final RowSerializer serializer = new JsonRowSerializer(charset);
 
-            this.lastResult = getState(client);
+            this.lastResult = getState(context.getStateManager());
             final long defaultMinTime = 
(initialTimeRange.equals(NONE.getValue()) ? 0L : System.currentTimeMillis());
             final long minTime = (lastResult == null ? defaultMinTime : 
lastResult.getTimestamp());
 
@@ -388,8 +412,7 @@ public class GetHBase extends AbstractProcessor {
             }
 
             // save state to local storage and to distributed cache
-            persistState(client, lastResult);
-
+            storeState(lastResult, context.getStateManager());
         } catch (final IOException e) {
             getLogger().error("Failed to receive data from HBase due to {}", 
e);
             session.rollback();
@@ -421,27 +444,11 @@ public class GetHBase extends AbstractProcessor {
         return columns;
     }
 
-    private void persistState(final DistributedMapCacheClient client, final 
ScanResult scanResult) {
-        final File stateDir = getStateDir();
-        if (!stateDir.exists()) {
-            stateDir.mkdirs();
-        }
-
-        final File file = getStateFile();
-        try (final OutputStream fos = new FileOutputStream(file);
-                final ObjectOutputStream oos = new ObjectOutputStream(fos)) {
-            oos.writeObject(scanResult);
-        } catch (final IOException ioe) {
-            getLogger().warn("Unable to save state locally. If the node is 
restarted now, data may be duplicated. Failure is due to {}", ioe);
-        }
-
-        try {
-            client.put(getKey(), scanResult, new StringSerDe(), new 
ObjectSerDe());
-        } catch (final IOException ioe) {
-            getLogger().warn("Unable to communicate with distributed cache 
server due to {}. Persisting state locally instead.", ioe);
-        }
+    /**
+     * Persists the given scan result as cluster-scoped state, using the flattened
+     * key/value form produced by ScanResult#toFlatMap().
+     *
+     * @param scanResult the result to persist
+     * @param stateManager the State Manager to write to
+     * @throws IOException propagated from StateManager#setState
+     */
+    private void storeState(final ScanResult scanResult, final StateManager stateManager) throws IOException {
+        final Map<String, String> flattened = scanResult.toFlatMap();
+        stateManager.setState(flattened, Scope.CLUSTER);
     }
 
+
     private void clearState(final DistributedMapCacheClient client) {
         final File localState = getStateFile();
         if (localState.exists()) {
@@ -455,6 +462,16 @@ public class GetHBase extends AbstractProcessor {
         }
     }
 
+
+    /**
+     * Loads the last scan result from cluster-scoped state.
+     *
+     * @param stateManager the State Manager to read from
+     * @return the stored result, or null if no state has been stored yet (version below zero)
+     *         or if the stored map carries no timestamp entry
+     * @throws IOException propagated from StateManager#getState
+     */
+    private ScanResult getState(final StateManager stateManager) throws IOException {
+        final StateMap clusterState = stateManager.getState(Scope.CLUSTER);
+        final boolean neverStored = clusterState.getVersion() < 0;
+        return neverStored ? null : ScanResult.fromFlatMap(clusterState.toMap());
+    }
+
     private ScanResult getState(final DistributedMapCacheClient client) throws 
IOException {
         final StringSerDe stringSerDe = new StringSerDe();
         final ObjectSerDe objectSerDe = new ObjectSerDe();
@@ -462,12 +479,14 @@ public class GetHBase extends AbstractProcessor {
         ScanResult scanResult = lastResult;
         // if we have no previous result, or we just became primary, pull from 
distributed cache
         if (scanResult == null || electedPrimaryNode) {
-            final Object obj = client.get(getKey(), stringSerDe, objectSerDe);
-            if (obj == null || !(obj instanceof ScanResult)) {
-                scanResult = null;
-            } else {
-                scanResult = (ScanResult) obj;
-                getLogger().debug("Retrieved state from the distributed cache, 
previous timestamp was {}" , new Object[] {scanResult.getTimestamp()});
+            if (client != null) {
+                final Object obj = client.get(getKey(), stringSerDe, 
objectSerDe);
+                if (obj == null || !(obj instanceof ScanResult)) {
+                    scanResult = null;
+                } else {
+                    scanResult = (ScanResult) obj;
+                    getLogger().debug("Retrieved state from the distributed 
cache, previous timestamp was {}", new Object[] {scanResult.getTimestamp()});
+                }
             }
 
             // no requirement to pull an update from the distributed cache 
anymore.
@@ -487,16 +506,6 @@ public class GetHBase extends AbstractProcessor {
                     if (scanResult == null || localScanResult.getTimestamp() > 
scanResult.getTimestamp()) {
                         scanResult = localScanResult;
                         getLogger().debug("Using last timestamp from local 
state because it was newer than the distributed cache, or no value existed in 
the cache");
-
-                        // Our local persistence file shows a later time than 
the Distributed service.
-                        // Update the distributed service to match our local 
state.
-                        try {
-                            client.put(getKey(), localScanResult, stringSerDe, 
objectSerDe);
-                        } catch (final IOException ioe) {
-                            getLogger().warn("Local timestamp is {}, which is 
later than Distributed state but failed to update Distributed "
-                                            + "state due to {}. If a new node 
performs GetHBase Listing, data duplication may occur",
-                                    new Object[] 
{localScanResult.getTimestamp(), ioe});
-                        }
                     }
                 }
             } catch (final IOException | ClassNotFoundException ioe) {
@@ -514,6 +523,13 @@ public class GetHBase extends AbstractProcessor {
         private final long latestTimestamp;
         private final Map<String, Set<String>> matchingCellHashes;
 
+        // Matches the per-row state keys written by toFlatMap(): "row.<N>" (value is a row ID) and
+        // "row.<N>.<M>" (value is a cell hash). Group 1 is the row index N; group 3 is the cell
+        // index M, or null for a bare "row.<N>" key.
+        private static final Pattern CELL_ID_PATTERN =
+            Pattern.compile(Pattern.quote(StateKeys.ROW_ID_PREFIX) + "(\\d+)(\\.(\\d+))?");
+
+        /**
+         * Keys used when flattening a ScanResult into a State Manager map.
+         */
+        public static final class StateKeys {
+            /** Key whose value is the latest cell timestamp, as a decimal string. */
+            public static final String TIMESTAMP = "timestamp";
+            /** Prefix of the per-row keys: "row.N" holds a row ID, "row.N.M" holds a cell hash. */
+            public static final String ROW_ID_PREFIX = "row.";
+
+            private StateKeys() {
+                // constants holder; not meant to be instantiated
+            }
+        }
+
         public ScanResult(final long timestamp, final Map<String, Set<String>> 
cellHashes) {
             latestTimestamp = timestamp;
             matchingCellHashes = cellHashes;
@@ -543,6 +559,81 @@ public class GetHBase extends AbstractProcessor {
             final String cellHash = new String(cellValue, 
StandardCharsets.UTF_8);
             return cellHashes.contains(cellHash);
         }
+
+        /**
+         * Flattens this result into a String-to-String map suitable for the State Manager.
+         * <p>
+         * The map holds the latest timestamp under {@code StateKeys.TIMESTAMP}. For the N-th row
+         * (in the iteration order of the cell-hash map), {@code "row.N"} holds the row ID and
+         * {@code "row.N.M"} holds the M-th cell hash of that row.
+         * </p>
+         *
+         * @return a new, mutable map representing this result
+         */
+        public Map<String, String> toFlatMap() {
+            final Map<String, String> flat = new HashMap<>();
+            flat.put(StateKeys.TIMESTAMP, String.valueOf(latestTimestamp));
+
+            int rowIndex = 0;
+            for (final Map.Entry<String, Set<String>> row : matchingCellHashes.entrySet()) {
+                final String rowKey = StateKeys.ROW_ID_PREFIX + rowIndex++;
+                flat.put(rowKey, row.getKey());
+
+                int cellIndex = 0;
+                for (final String cellHash : row.getValue()) {
+                    flat.put(rowKey + "." + cellIndex++, cellHash);
+                }
+            }
+
+            return flat;
+        }
+
+        /**
+         * Rebuilds a ScanResult from a map in the layout produced by toFlatMap().
+         * Keys that are neither the timestamp nor a "row.N" / "row.N.M" key are ignored.
+         *
+         * @param map the flattened state; may be null
+         * @return the reconstructed result, or null if the map is null or has no timestamp entry
+         * @throws NumberFormatException if the timestamp entry is not a valid long
+         */
+        public static ScanResult fromFlatMap(final Map<String, String> map) {
+            if (map == null) {
+                return null;
+            }
+
+            final String timestampValue = map.get(StateKeys.TIMESTAMP);
+            if (timestampValue == null) {
+                return null;
+            }
+
+            final long timestamp = Long.parseLong(timestampValue);
+            final Map<String, Set<String>> rowIndexToMatchingCellHashes = new HashMap<>();
+            final Map<String, String> rowIndexToId = new HashMap<>();
+
+            for (final Map.Entry<String, String> entry : map.entrySet()) {
+                final Matcher matcher = CELL_ID_PATTERN.matcher(entry.getKey());
+                if (!matcher.matches()) {
+                    // not a row/cell key (e.g. the timestamp entry); move on
+                    continue;
+                }
+
+                final String rowIndex = matcher.group(1);
+                final String cellIndex = matcher.group(3);
+
+                Set<String> cellHashes = rowIndexToMatchingCellHashes.get(rowIndex);
+                if (cellHashes == null) {
+                    cellHashes = new HashSet<>();
+                    rowIndexToMatchingCellHashes.put(rowIndex, cellHashes);
+                }
+
+                if (cellIndex == null) {
+                    // "row.N" provides the Row ID for row index N
+                    rowIndexToId.put(rowIndex, entry.getValue());
+                } else {
+                    // "row.N.M" provides one cell hash for row index N
+                    cellHashes.add(entry.getValue());
+                }
+            }
+
+            final Map<String, Set<String>> matchingCellHashes = new HashMap<>(rowIndexToMatchingCellHashes.size());
+            for (final Map.Entry<String, Set<String>> entry : rowIndexToMatchingCellHashes.entrySet()) {
+                final String rowId = rowIndexToId.get(entry.getKey());
+                if (rowId == null) {
+                    // Cell hashes whose "row.N" entry is missing cannot be attributed to any row.
+                    // Skip them rather than storing them under a null key, which toFlatMap() would
+                    // later write back out as a null state value.
+                    continue;
+                }
+                matchingCellHashes.put(rowId, entry.getValue());
+            }
+
+            return new ScanResult(timestamp, matchingCellHashes);
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/bbce596d/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java
 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java
index 92f42f2..8f6d890 100644
--- 
a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java
+++ 
b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java
@@ -16,11 +16,31 @@
  */
 package org.apache.nifi.hbase;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
 import org.apache.nifi.annotation.notification.PrimaryNodeState;
+import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.distributed.cache.client.Deserializer;
 import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
 import org.apache.nifi.distributed.cache.client.Serializer;
+import org.apache.nifi.hbase.GetHBase.ScanResult;
 import org.apache.nifi.hbase.scan.Column;
 import org.apache.nifi.hbase.util.StringSerDe;
 import org.apache.nifi.reporting.InitializationException;
@@ -31,17 +51,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
 public class TestGetHBase {
 
     private TestRunner runner;
@@ -148,27 +157,17 @@ public class TestGetHBase {
         hBaseClient.addResult("row4", cells, now + 1);
         runner.run();
         runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 5);
+        runner.clearTransferState();
 
         proc = new MockGetHBase(stateFile);
-        final TestRunner newRunner = TestRunners.newTestRunner(proc);
-
-        newRunner.addControllerService("cacheClient", cacheClient);
-        newRunner.enableControllerService(cacheClient);
-
-        newRunner.addControllerService("hbaseClient", hBaseClient);
-        newRunner.enableControllerService(hBaseClient);
-
-        newRunner.setProperty(GetHBase.TABLE_NAME, "nifi");
-        newRunner.setProperty(GetHBase.DISTRIBUTED_CACHE_SERVICE, 
"cacheClient");
-        newRunner.setProperty(GetHBase.HBASE_CLIENT_SERVICE, "hbaseClient");
 
         hBaseClient.addResult("row0", cells, now - 2);
         hBaseClient.addResult("row1", cells, now - 1);
         hBaseClient.addResult("row2", cells, now - 1);
         hBaseClient.addResult("row3", cells, now);
 
-        newRunner.run(100);
-        newRunner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 0);
+        runner.run(100);
+        runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 0);
     }
 
     @Test
@@ -271,8 +270,7 @@ public class TestGetHBase {
         runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 4);
 
         // should have a local state file and a cache entry before removing
-        Assert.assertTrue(proc.getStateFile().exists());
-        Assert.assertTrue(cacheClient.containsKey(proc.getKey(), new 
StringSerDe()));
+        runner.getStateManager().assertStateSet(Scope.CLUSTER);
 
         proc.onRemoved(runner.getProcessContext());
 
@@ -331,7 +329,7 @@ public class TestGetHBase {
     }
 
     @Test
-    public void testParseColumns() {
+    public void testParseColumns() throws IOException {
         runner.setProperty(GetHBase.COLUMNS, "cf1,cf2:cq1,cf3");
         proc.parseColumns(runner.getProcessContext());
 
@@ -365,6 +363,47 @@ public class TestGetHBase {
         runner.assertNotValid();
     }
 
+
+    @Test
+    public void testScanResultConvert() {
+        final long timestamp = 14L;
+        final Map<String, Set<String>> cellHashes = new LinkedHashMap<>();
+
+        final Set<String> row1Cells = new HashSet<>();
+        row1Cells.add("hello");
+        row1Cells.add("there");
+        cellHashes.put("abc", row1Cells);
+
+        final Set<String> row2Cells = new HashSet<>();
+        row2Cells.add("good-bye");
+        row2Cells.add("there");
+        cellHashes.put("xyz", row2Cells);
+
+        final ScanResult scanResult = new GetHBase.ScanResult(timestamp, cellHashes);
+
+        final Map<String, String> flatMap = scanResult.toFlatMap();
+        // 1 timestamp entry + 2 row-ID entries + 2 cell-hash entries per row
+        assertEquals(7, flatMap.size());
+        assertEquals(String.valueOf(timestamp), flatMap.get(ScanResult.StateKeys.TIMESTAMP));
+        // LinkedHashMap insertion order makes "abc" row 0 and "xyz" row 1
+        assertEquals("abc", flatMap.get("row.0"));
+
+        // cell order within a row follows HashSet iteration order, so accept either value
+        final String row0Cell0 = flatMap.get("row.0.0");
+        final String row0Cell1 = flatMap.get("row.0.1");
+        assertTrue(row0Cell0.equals("hello") || row0Cell0.equals("there"));
+        assertTrue(row0Cell1.equals("hello") || row0Cell1.equals("there"));
+        // assertNotSame only compares references; also require the two cells to differ by value
+        assertNotSame(row0Cell0, row0Cell1);
+        Assert.assertFalse(row0Cell0.equals(row0Cell1));
+
+        assertEquals("xyz", flatMap.get("row.1"));
+        final String row1Cell0 = flatMap.get("row.1.0");
+        final String row1Cell1 = flatMap.get("row.1.1");
+        assertTrue(row1Cell0.equals("good-bye") || row1Cell0.equals("there"));
+        assertTrue(row1Cell1.equals("good-bye") || row1Cell1.equals("there"));
+        assertNotSame(row1Cell0, row1Cell1);
+        Assert.assertFalse(row1Cell0.equals(row1Cell1));
+
+        final ScanResult reverted = ScanResult.fromFlatMap(flatMap);
+        assertEquals(timestamp, reverted.getTimestamp());
+        assertEquals(cellHashes, reverted.getMatchingCells());
+    }
+
+
     // Mock processor to override the location of the state file
     private static class MockGetHBase extends GetHBase {
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/bbce596d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
index c9bf369..efe551f 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
@@ -402,7 +402,7 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
                 }
                 persist(latestListingTimestamp, identifiers, 
context.getStateManager());
             } catch (final IOException ioe) {
-                getLogger().warn("Unable to save state due to {}. If NiFi 
restarted before state is saved, or "
+                getLogger().warn("Unable to save state due to {}. If NiFi is 
restarted before state is saved, or "
                     + "if another node begins executing this Processor, data 
duplication may occur.", ioe);
             }
 

Reply via email to