Repository: nifi
Updated Branches:
  refs/heads/master 5a8b2cf7f -> 8c488d7e8


NIFI-1587: Always poll state from the State Manager when running ListHDFS,
instead of preferring locally cached state over the cluster-wide state

Signed-off-by: joewitt <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/8c488d7e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/8c488d7e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/8c488d7e

Branch: refs/heads/master
Commit: 8c488d7e8efcc2029079bb33aa3b859ba44b931f
Parents: 5a8b2cf
Author: Mark Payne <[email protected]>
Authored: Fri Mar 4 16:06:42 2016 -0500
Committer: joewitt <[email protected]>
Committed: Sun Mar 13 14:36:36 2016 -0400

----------------------------------------------------------------------
 .../apache/nifi/processors/hadoop/ListHDFS.java | 44 +++++---------------
 .../nifi/processors/hadoop/TestListHDFS.java    |  4 --
 2 files changed, 10 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/8c488d7e/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index d624e6f..b300b88 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -42,8 +42,6 @@ import 
org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
-import org.apache.nifi.annotation.notification.PrimaryNodeState;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateManager;
@@ -123,7 +121,6 @@ public class ListHDFS extends AbstractHadoopProcessor {
 
     private volatile Long lastListingTime = null;
     private volatile Set<Path> latestPathsListed = new HashSet<>();
-    private volatile boolean electedPrimaryNode = false;
 
     @Override
     protected void init(final ProcessorInitializationContext context) {
@@ -158,12 +155,6 @@ public class ListHDFS extends AbstractHadoopProcessor {
         return getIdentifier() + ".lastListingTime." + directory;
     }
 
-    @OnPrimaryNodeStateChange
-    public void onPrimaryNodeChange(final PrimaryNodeState newState) {
-        if ( newState == PrimaryNodeState.ELECTED_PRIMARY_NODE ) {
-            electedPrimaryNode = true;
-        }
-    }
 
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
@@ -214,44 +205,27 @@ public class ListHDFS extends AbstractHadoopProcessor {
     }
 
 
-    private Long getMinTimestamp(final String directory, final HDFSListing 
remoteListing) throws IOException {
-        // No cluster-wide state has been recovered. Just use whatever values 
we already have.
-        if (remoteListing == null) {
-            return lastListingTime;
-        }
-
-        // If our local timestamp is already later than the remote listing's 
timestamp, use our local info.
-        Long minTimestamp = lastListingTime;
-        if (minTimestamp != null && minTimestamp > 
remoteListing.getLatestTimestamp().getTime()) {
-            return minTimestamp;
-        }
-
-        // Use the remote listing's information.
-        if (minTimestamp == null || electedPrimaryNode) {
-            this.latestPathsListed = remoteListing.toPaths();
-            this.lastListingTime = 
remoteListing.getLatestTimestamp().getTime();
-        }
-
-        return minTimestamp;
-    }
-
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        final String directory = context.getProperty(DIRECTORY).getValue();
-
         // Ensure that we are using the latest listing information before we 
try to perform a listing of HDFS files.
-        final Long minTimestamp;
+        Long minTimestamp = null;
         try {
             final HDFSListing stateListing;
             final StateMap stateMap = 
context.getStateManager().getState(Scope.CLUSTER);
             if (stateMap.getVersion() == -1L) {
                 stateListing = null;
+                latestPathsListed = new HashSet<>();
+                lastListingTime = null;
             } else {
                 final Map<String, String> stateValues = stateMap.toMap();
                 stateListing = HDFSListing.fromMap(stateValues);
+
+                if (stateListing != null) {
+                    latestPathsListed = stateListing.toPaths();
+                    lastListingTime = minTimestamp = 
stateListing.getLatestTimestamp().getTime();
+                }
             }
 
-            minTimestamp = getMinTimestamp(directory, stateListing);
         } catch (final IOException ioe) {
             getLogger().error("Failed to retrieve timestamp of last listing 
from Distributed Cache Service. Will not perform listing until this is 
accomplished.");
             context.yield();
@@ -260,6 +234,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
 
         // Pull in any file that is newer than the timestamp that we have.
         final FileSystem hdfs = getFileSystem();
+        final String directory = context.getProperty(DIRECTORY).getValue();
         final boolean recursive = 
context.getProperty(RECURSE_SUBDIRS).asBoolean();
         final Path rootPath = new Path(directory);
 
@@ -339,6 +314,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
     private Set<FileStatus> getStatuses(final Path path, final boolean 
recursive, final FileSystem hdfs) throws IOException {
         final Set<FileStatus> statusSet = new HashSet<>();
 
+        getLogger().debug("Fetching listing for {}", new Object[] {path});
         final FileStatus[] statuses = hdfs.listStatus(path);
 
         for ( final FileStatus status : statuses ) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/8c488d7e/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
index add89e8..6c944c8 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.Progressable;
-import org.apache.nifi.annotation.notification.PrimaryNodeState;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.distributed.cache.client.Deserializer;
@@ -148,9 +147,6 @@ public class TestListHDFS {
         // add new file to pull
         proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, 
false, 1, 1L, 1999L, 0L, create777(), "owner", "group", new 
Path("/test/testFile2.txt")));
 
-        // trigger primary node change
-        proc.onPrimaryNodeChange(PrimaryNodeState.ELECTED_PRIMARY_NODE);
-
         // cause calls to service to fail
         service.failOnCalls = true;
 

Reply via email to