NIFI-259 Corrected the handling of primary-node election in AbstractListProcessor and slightly improved readability in GetHBase
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/29fb9c93 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/29fb9c93 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/29fb9c93 Branch: refs/heads/master Commit: 29fb9c939368054bd199edca2f3653796e586927 Parents: 6f4c3db Author: jpercivall <joeperciv...@yahoo.com> Authored: Thu Feb 4 16:07:18 2016 -0500 Committer: jpercivall <joeperciv...@yahoo.com> Committed: Thu Feb 4 16:07:18 2016 -0500 ---------------------------------------------------------------------- .../src/main/java/org/apache/nifi/hbase/GetHBase.java | 14 ++++++-------- .../processors/standard/AbstractListProcessor.java | 12 ++++++------ 2 files changed, 12 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/29fb9c93/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java index 65b261a..fa4d80a 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java @@ -107,7 +107,7 @@ public class GetHBase extends AbstractProcessor { .name("Distributed Cache Service") .description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HBase" + " so that if a new node begins pulling data, it won't duplicate all of the work that has been done.") - .required(false) + .required(false) 
.identifiesControllerService(DistributedMapCacheClient.class) .build(); static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() @@ -156,7 +156,7 @@ public class GetHBase extends AbstractProcessor { private volatile ScanResult lastResult = null; private volatile List<Column> columns = new ArrayList<>(); - private volatile boolean electedPrimaryNode = false; + private volatile boolean justElectedPrimaryNode = false; private volatile String previousTable = null; @Override @@ -236,9 +236,7 @@ public class GetHBase extends AbstractProcessor { @OnPrimaryNodeStateChange public void onPrimaryNodeChange(final PrimaryNodeState newState) { - if (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE) { - electedPrimaryNode = true; - } + justElectedPrimaryNode = (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE); } @OnRemoved @@ -411,7 +409,7 @@ public class GetHBase extends AbstractProcessor { lastResult = scanResult; } - // save state to local storage and to distributed cache + // save state using the framework's state manager storeState(lastResult, context.getStateManager()); } catch (final IOException e) { getLogger().error("Failed to receive data from HBase due to {}", e); @@ -478,7 +476,7 @@ public class GetHBase extends AbstractProcessor { ScanResult scanResult = lastResult; // if we have no previous result, or we just became primary, pull from distributed cache - if (scanResult == null || electedPrimaryNode) { + if (scanResult == null || justElectedPrimaryNode) { if (client != null) { final Object obj = client.get(getKey(), stringSerDe, objectSerDe); if (obj == null || !(obj instanceof ScanResult)) { @@ -490,7 +488,7 @@ public class GetHBase extends AbstractProcessor { } // no requirement to pull an update from the distributed cache anymore. - electedPrimaryNode = false; + justElectedPrimaryNode = false; } // Check the persistence file. 
We want to use the latest timestamp that we have so that http://git-wip-us.apache.org/repos/asf/nifi/blob/29fb9c93/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java index b1c683c..53d0604 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java @@ -163,7 +163,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab private volatile Set<String> latestIdentifiersListed = new HashSet<>(); private volatile Long lastListingTime = null; - private volatile boolean electedPrimaryNode = false; + private volatile boolean justElectedPrimaryNode = false; private volatile boolean resetListing = false; static final String TIMESTAMP = "timestamp"; @@ -198,9 +198,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab @OnPrimaryNodeStateChange public void onPrimaryNodeChange(final PrimaryNodeState newState) { - if (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE) { - electedPrimaryNode = true; - } + justElectedPrimaryNode = (newState == PrimaryNodeState.ELECTED_PRIMARY_NODE); } @OnScheduled @@ -222,7 +220,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab // delete the local file, since it is no longer needed final File localFile = new File(path); - if (localFile.exists() && !!localFile.delete()) { + 
if (localFile.exists() && !localFile.delete()) { getLogger().warn("Migrated state but failed to delete local persistence file"); } @@ -322,7 +320,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab try { // We need to fetch the state from the cluster if we don't yet know the last listing time, // or if we were just elected the primary node - if (this.lastListingTime == null || electedPrimaryNode) { + if (this.lastListingTime == null || justElectedPrimaryNode) { final StateMap stateMap = context.getStateManager().getState(getStateScope(context)); final Map<String, String> stateValues = stateMap.toMap(); final String timestamp = stateValues.get(TIMESTAMP); @@ -343,6 +341,8 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab latestIdentifiersListed.add(value); } } + + justElectedPrimaryNode = false; } } catch (final IOException ioe) { getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished.");