NIFI-259: Bug fixes
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0cd6f80f Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0cd6f80f Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0cd6f80f Branch: refs/heads/master Commit: 0cd6f80f3624f4d46af63f444c3a0175021d2891 Parents: 06f525b Author: Mark Payne <marka...@hotmail.com> Authored: Wed Jan 13 15:11:53 2016 -0500 Committer: Mark Payne <marka...@hotmail.com> Committed: Wed Jan 13 15:11:53 2016 -0500 ---------------------------------------------------------------------- .../src/main/asciidoc/administration-guide.adoc | 2 ++ .../zookeeper/ZooKeeperStateProvider.java | 3 +++ .../standard/AbstractListProcessor.java | 19 ++++++++++--------- .../nifi/processors/standard/TailFile.java | 1 + 4 files changed, 16 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/0cd6f80f/nifi-docs/src/main/asciidoc/administration-guide.adoc ---------------------------------------------------------------------- diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc index 23327e2..8945600 100644 --- a/nifi-docs/src/main/asciidoc/administration-guide.adoc +++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc @@ -553,6 +553,8 @@ Note, the above `kinit` command requires that Kerberos client libraries be insta [source] yum install krb5-workstation krb5-libs krb5-auth-dialog +Once this is complete, the /etc/krb5.conf will need to be configured appropriately for your organization's Kerberos envrionment. + Now, when we start NiFi, it will use Kerberos to authentication as the `nifi` user when communicating with ZooKeeper. http://git-wip-us.apache.org/repos/asf/nifi/blob/0cd6f80f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java index faa0364..984a229 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/state/providers/zookeeper/ZooKeeperStateProvider.java @@ -310,6 +310,9 @@ public class ZooKeeperStateProvider extends AbstractStateProvider { invalidateClient(); setState(stateValues, version, componentId); } + if (Code.NODEEXISTS == ke.code()) { + setState(stateValues, version, componentId); + } throw new IOException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId, ke); } catch (final IOException ioe) { http://git-wip-us.apache.org/repos/asf/nifi/blob/0cd6f80f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java index 494f227..fc19ad7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractListProcessor.java @@ -209,12 +209,12 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab final DistributedMapCacheClient client = context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class); // Check if state already exists for this path. If so, we have already migrated the state. - final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER); + final StateMap stateMap = context.getStateManager().getState(getStateScope(context)); if (stateMap.getVersion() == -1L) { try { // Migrate state from the old way of managing state (distributed cache service and local file) // to the new mechanism (State Manager). - migrateState(path, client, context.getStateManager()); + migrateState(path, client, context.getStateManager(), getStateScope(context)); } catch (final IOException ioe) { throw new IOException("Failed to properly migrate state to State Manager", ioe); } @@ -237,7 +237,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab } if (resetListing) { - context.getStateManager().clear(Scope.CLUSTER); + context.getStateManager().clear(getStateScope(context)); resetListing = false; } } @@ -250,9 +250,10 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab * @param path the path to migrate state for * @param client the DistributedMapCacheClient that is capable of obtaining the current state * @param stateManager the StateManager to use in order to store the new state + * @param scope the scope to use * @throws IOException if unable to retrieve or store the state */ - private void migrateState(final String path, final DistributedMapCacheClient client, final StateManager stateManager) throws IOException { + private void migrateState(final String path, final DistributedMapCacheClient client, final StateManager stateManager, final Scope scope) throws IOException { Long minTimestamp = null; final Set<String> latestIdentifiersListed = new HashSet<>(); @@ -289,11 +290,11 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab } if (minTimestamp != null) { - persist(minTimestamp, latestIdentifiersListed, stateManager); + persist(minTimestamp, latestIdentifiersListed, stateManager, scope); } } - private void persist(final long timestamp, final Collection<String> identifiers, final StateManager stateManager) throws IOException { + private void persist(final long timestamp, final Collection<String> identifiers, final StateManager stateManager, final Scope scope) throws IOException { final Map<String, String> updatedState = new HashMap<>(identifiers.size() + 1); updatedState.put(TIMESTAMP, String.valueOf(timestamp)); int counter = 0; @@ -301,7 +302,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab final String index = String.valueOf(++counter); updatedState.put(IDENTIFIER_PREFIX + "." + index, identifier); } - stateManager.setState(updatedState, Scope.CLUSTER); + stateManager.setState(updatedState, scope); } protected String getKey(final String directory) { @@ -322,7 +323,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab // We need to fetch the state from the cluster if we don't yet know the last listing time, // or if we were just elected the primary node if (this.lastListingTime == null || electedPrimaryNode) { - final StateMap stateMap = context.getStateManager().getState(Scope.CLUSTER); + final StateMap stateMap = context.getStateManager().getState(getStateScope(context)); final Map<String, String> stateValues = stateMap.toMap(); final String timestamp = stateValues.get(TIMESTAMP); @@ -409,7 +410,7 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab for (final T entity : newEntries) { identifiers.add(entity.getIdentifier()); } - persist(latestListingTimestamp, identifiers, context.getStateManager()); + persist(latestListingTimestamp, identifiers, context.getStateManager(), getStateScope(context)); } catch (final IOException ioe) { getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or " + "if another node begins executing this Processor, data duplication may occur.", ioe); http://git-wip-us.apache.org/repos/asf/nifi/blob/0cd6f80f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java index f992978..b128366 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java @@ -150,6 +150,7 @@ public class TailFile extends AbstractProcessor { properties.add(ROLLING_FILENAME_PATTERN); properties.add(new PropertyDescriptor.Builder().fromPropertyDescriptor(STATE_FILE).defaultValue("./conf/state/" + getIdentifier()).build()); properties.add(START_POSITION); + properties.add(FILE_LOCATION); return properties; }