NIFI-259 Corrected some logic in AbstractListProcessor regarding being elected 
primary node and improved readability in GetHBase a smidge


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/29fb9c93
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/29fb9c93
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/29fb9c93

Branch: refs/heads/master
Commit: 29fb9c939368054bd199edca2f3653796e586927
Parents: 6f4c3db
Author: jpercivall <joeperciv...@yahoo.com>
Authored: Thu Feb 4 16:07:18 2016 -0500
Committer: jpercivall <joeperciv...@yahoo.com>
Committed: Thu Feb 4 16:07:18 2016 -0500

----------------------------------------------------------------------
 .../src/main/java/org/apache/nifi/hbase/GetHBase.java | 14 ++++++--------
 .../processors/standard/AbstractListProcessor.java    | 12 ++++++------
 2 files changed, 12 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/29fb9c93/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
index 65b261a..fa4d80a 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
@@ -107,7 +107,7 @@ public class GetHBase extends AbstractProcessor {
             .name("Distributed Cache Service")
             .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HBase" +
                     " so that if a new node begins pulling data, it won't duplicate all of the work that has been done.")
-        .required(false)
+            .required(false)
             .identifiesControllerService(DistributedMapCacheClient.class)
             .build();
     static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
@@ -156,7 +156,7 @@ public class GetHBase extends AbstractProcessor {
 
     private volatile ScanResult lastResult = null;
     private volatile List<Column> columns = new ArrayList<>();
-    private volatile boolean electedPrimaryNode = false;
+    private volatile boolean justElectedPrimaryNode = false;
     private volatile String previousTable = null;
 
     @Override
@@ -236,9 +236,7 @@ public class GetHBase extends AbstractProcessor {
 
     @OnPrimaryNodeStateChange
     public void onPrimaryNodeChange(final PrimaryNodeState newState) {
-        if (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE) {
-            electedPrimaryNode = true;
-        }
+        justElectedPrimaryNode = (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE);
     }
 
     @OnRemoved
@@ -411,7 +409,7 @@ public class GetHBase extends AbstractProcessor {
                 lastResult = scanResult;
             }
 
-            // save state to local storage and to distributed cache
+            // save state using the framework's state manager
             storeState(lastResult, context.getStateManager());
         } catch (final IOException e) {
             getLogger().error("Failed to receive data from HBase due to {}", e);
@@ -478,7 +476,7 @@ public class GetHBase extends AbstractProcessor {
 
         ScanResult scanResult = lastResult;
         // if we have no previous result, or we just became primary, pull from distributed cache
-        if (scanResult == null || electedPrimaryNode) {
+        if (scanResult == null || justElectedPrimaryNode) {
             if (client != null) {
                 final Object obj = client.get(getKey(), stringSerDe, objectSerDe);
                 if (obj == null || !(obj instanceof ScanResult)) {
@@ -490,7 +488,7 @@ public class GetHBase extends AbstractProcessor {
             }
 
             // no requirement to pull an update from the distributed cache anymore.
-            electedPrimaryNode = false;
+            justElectedPrimaryNode = false;
         }
 
         // Check the persistence file. We want to use the latest timestamp that we have so that

http://git-wip-us.apache.org/repos/asf/nifi/blob/29fb9c93/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
index b1c683c..53d0604 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
@@ -163,7 +163,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
 
     private volatile Set<String> latestIdentifiersListed = new HashSet<>();
     private volatile Long lastListingTime = null;
-    private volatile boolean electedPrimaryNode = false;
+    private volatile boolean justElectedPrimaryNode = false;
     private volatile boolean resetListing = false;
 
     static final String TIMESTAMP = "timestamp";
@@ -198,9 +198,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
 
     @OnPrimaryNodeStateChange
     public void onPrimaryNodeChange(final PrimaryNodeState newState) {
-        if (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE) {
-            electedPrimaryNode = true;
-        }
+        justElectedPrimaryNode = (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE);
     }
 
     @OnScheduled
@@ -222,7 +220,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
 
         // delete the local file, since it is no longer needed
         final File localFile = new File(path);
-        if (localFile.exists() && !!localFile.delete()) {
+        if (localFile.exists() && !localFile.delete()) {
             getLogger().warn("Migrated state but failed to delete local persistence file");
         }
 
@@ -322,7 +320,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
         try {
             // We need to fetch the state from the cluster if we don't yet know the last listing time,
             // or if we were just elected the primary node
-            if (this.lastListingTime == null || electedPrimaryNode) {
+            if (this.lastListingTime == null || justElectedPrimaryNode) {
                 final StateMap stateMap = context.getStateManager().getState(getStateScope(context));
                 final Map<String, String> stateValues = stateMap.toMap();
                 final String timestamp = stateValues.get(TIMESTAMP);
@@ -343,6 +341,8 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
                         latestIdentifiersListed.add(value);
                     }
                 }
+
+                justElectedPrimaryNode = false;
             }
         } catch (final IOException ioe) {
             getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished.");

Reply via email to