NIFI-259: Bug fixes

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0cd6f80f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0cd6f80f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0cd6f80f

Branch: refs/heads/master
Commit: 0cd6f80f3624f4d46af63f444c3a0175021d2891
Parents: 06f525b
Author: Mark Payne <marka...@hotmail.com>
Authored: Wed Jan 13 15:11:53 2016 -0500
Committer: Mark Payne <marka...@hotmail.com>
Committed: Wed Jan 13 15:11:53 2016 -0500

----------------------------------------------------------------------
 .../src/main/asciidoc/administration-guide.adoc  |  2 ++
 .../zookeeper/ZooKeeperStateProvider.java        |  3 +++
 .../standard/AbstractListProcessor.java          | 19 ++++++++++---------
 .../nifi/processors/standard/TailFile.java       |  1 +
 4 files changed, 16 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/0cd6f80f/nifi-docs/src/main/asciidoc/administration-guide.adoc
----------------------------------------------------------------------
diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc 
b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index 23327e2..8945600 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -553,6 +553,8 @@ Note, the above `kinit` command requires that Kerberos 
client libraries be insta
 [source]
 yum install krb5-workstation krb5-libs krb5-auth-dialog
 
+Once this is complete, the /etc/krb5.conf will need to be configured 
appropriately for your organization's Kerberos envrionment.
+
 Now, when we start NiFi, it will use Kerberos to authentication as the `nifi` 
user when communicating with ZooKeeper.
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/0cd6f80f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
index faa0364..984a229 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java
@@ -310,6 +310,9 @@ public class ZooKeeperStateProvider extends 
AbstractStateProvider {
                 invalidateClient();
                 setState(stateValues, version, componentId);
             }
+            if (Code.NODEEXISTS == ke.code()) {
+                setState(stateValues, version, componentId);
+            }
 
             throw new IOException("Failed to set cluster-wide state in 
ZooKeeper for component with ID " + componentId, ke);
         } catch (final IOException ioe) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/0cd6f80f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
index 494f227..fc19ad7 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java
@@ -209,12 +209,12 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
         final DistributedMapCacheClient client = 
context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
 
         // Check if state already exists for this path. If so, we have already 
migrated the state.
-        final StateMap stateMap = 
context.getStateManager().getState(Scope.CLUSTER);
+        final StateMap stateMap = 
context.getStateManager().getState(getStateScope(context));
         if (stateMap.getVersion() == -1L) {
             try {
                 // Migrate state from the old way of managing state 
(distributed cache service and local file)
                 // to the new mechanism (State Manager).
-                migrateState(path, client, context.getStateManager());
+                migrateState(path, client, context.getStateManager(), 
getStateScope(context));
             } catch (final IOException ioe) {
                 throw new IOException("Failed to properly migrate state to 
State Manager", ioe);
             }
@@ -237,7 +237,7 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
         }
 
         if (resetListing) {
-            context.getStateManager().clear(Scope.CLUSTER);
+            context.getStateManager().clear(getStateScope(context));
             resetListing = false;
         }
     }
@@ -250,9 +250,10 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
      * @param path the path to migrate state for
      * @param client the DistributedMapCacheClient that is capable of 
obtaining the current state
      * @param stateManager the StateManager to use in order to store the new 
state
+     * @param scope the scope to use
      * @throws IOException if unable to retrieve or store the state
      */
-    private void migrateState(final String path, final 
DistributedMapCacheClient client, final StateManager stateManager) throws 
IOException {
+    private void migrateState(final String path, final 
DistributedMapCacheClient client, final StateManager stateManager, final Scope 
scope) throws IOException {
         Long minTimestamp = null;
         final Set<String> latestIdentifiersListed = new HashSet<>();
 
@@ -289,11 +290,11 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
         }
 
         if (minTimestamp != null) {
-            persist(minTimestamp, latestIdentifiersListed, stateManager);
+            persist(minTimestamp, latestIdentifiersListed, stateManager, 
scope);
         }
     }
 
-    private void persist(final long timestamp, final Collection<String> 
identifiers, final StateManager stateManager) throws IOException {
+    private void persist(final long timestamp, final Collection<String> 
identifiers, final StateManager stateManager, final Scope scope) throws 
IOException {
         final Map<String, String> updatedState = new 
HashMap<>(identifiers.size() + 1);
         updatedState.put(TIMESTAMP, String.valueOf(timestamp));
         int counter = 0;
@@ -301,7 +302,7 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
             final String index = String.valueOf(++counter);
             updatedState.put(IDENTIFIER_PREFIX + "." + index, identifier);
         }
-        stateManager.setState(updatedState, Scope.CLUSTER);
+        stateManager.setState(updatedState, scope);
     }
 
     protected String getKey(final String directory) {
@@ -322,7 +323,7 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
             // We need to fetch the state from the cluster if we don't yet 
know the last listing time,
             // or if we were just elected the primary node
             if (this.lastListingTime == null || electedPrimaryNode) {
-                final StateMap stateMap = 
context.getStateManager().getState(Scope.CLUSTER);
+                final StateMap stateMap = 
context.getStateManager().getState(getStateScope(context));
                 final Map<String, String> stateValues = stateMap.toMap();
                 final String timestamp = stateValues.get(TIMESTAMP);
 
@@ -409,7 +410,7 @@ public abstract class AbstractListProcessor<T extends 
ListableEntity> extends Ab
                 for (final T entity : newEntries) {
                     identifiers.add(entity.getIdentifier());
                 }
-                persist(latestListingTimestamp, identifiers, 
context.getStateManager());
+                persist(latestListingTimestamp, identifiers, 
context.getStateManager(), getStateScope(context));
             } catch (final IOException ioe) {
                 getLogger().warn("Unable to save state due to {}. If NiFi is 
restarted before state is saved, or "
                     + "if another node begins executing this Processor, data 
duplication may occur.", ioe);

http://git-wip-us.apache.org/repos/asf/nifi/blob/0cd6f80f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
index f992978..b128366 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -150,6 +150,7 @@ public class TailFile extends AbstractProcessor {
         properties.add(ROLLING_FILENAME_PATTERN);
         properties.add(new 
PropertyDescriptor.Builder().fromPropertyDescriptor(STATE_FILE).defaultValue("./conf/state/"
 + getIdentifier()).build());
         properties.add(START_POSITION);
+        properties.add(FILE_LOCATION);
         return properties;
     }
 

Reply via email to