Repository: nifi
Updated Branches:
  refs/heads/master 2094786ec -> 92b4a3208


NIFI-5136 Ensure processor references are removed from LogRepository and from 
ProcessScheduler
- Forcing FileSystem statistics thread to be interrupted when HDFS processors 
are stopped
- Stop creating temp components during import from registry, use bundle info 
instead

This closes #2668.

Signed-off-by: Mark Payne <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/92b4a320
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/92b4a320
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/92b4a320

Branch: refs/heads/master
Commit: 92b4a3208fddd44f0d7e3d618761b9db238b758c
Parents: 2094786
Author: Bryan Bende <[email protected]>
Authored: Fri Apr 27 14:52:58 2018 -0400
Committer: Mark Payne <[email protected]>
Committed: Tue May 8 15:29:37 2018 -0400

----------------------------------------------------------------------
 .../hadoop/AbstractHadoopProcessor.java         | 57 ++++++++++++++++++++
 .../nifi/controller/ProcessScheduler.java       |  8 +++
 .../nifi/logging/LogRepositoryFactory.java      |  4 ++
 .../apache/nifi/controller/FlowController.java  | 15 ++++--
 .../scheduling/StandardProcessScheduler.java    |  5 ++
 .../StandardControllerServiceProvider.java      |  2 +
 .../nifi/groups/StandardProcessGroup.java       | 29 ++++++----
 7 files changed, 106 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/92b4a320/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
index 1eca5af..4104e30 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SaslPlainServer;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -44,10 +45,12 @@ import javax.net.SocketFactory;
 import java.io.File;
 import java.io.IOException;
 import java.lang.ref.WeakReference;
+import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
+import java.security.Security;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -257,9 +260,63 @@ public abstract class AbstractHadoopProcessor extends 
AbstractProcessor {
 
     @OnStopped
     public final void abstractOnStopped() {
+        final HdfsResources resources = hdfsResources.get();
+        if (resources != null) {
+            // Attempt to close the FileSystem
+            final FileSystem fileSystem = resources.getFileSystem();
+            try {
+                interruptStatisticsThread(fileSystem);
+            } catch (Exception e) {
+                getLogger().warn("Error stopping FileSystem statistics thread: 
" + e.getMessage(), e);
+            } finally {
+                if (fileSystem != null) {
+                    try {
+                        fileSystem.close();
+                    } catch (IOException e) {
+                        getLogger().warn("Error close FileSystem: " + 
e.getMessage(), e);
+                    }
+                }
+            }
+
+            // Clean-up the static reference to the Configuration instance
+            UserGroupInformation.setConfiguration(new Configuration());
+
+            // Clean-up the reference to the InstanceClassLoader that was put 
into Configuration
+            final Configuration configuration = resources.getConfiguration();
+            configuration.setClassLoader(null);
+
+            // Need to remove the Provider instance from the JVM's Providers 
class so that InstanceClassLoader can be GC'd eventually
+            final SaslPlainServer.SecurityProvider saslProvider = new 
SaslPlainServer.SecurityProvider();
+            Security.removeProvider(saslProvider.getName());
+        }
+
+        // Clear out the reference to the resources
         hdfsResources.set(new HdfsResources(null, null, null));
     }
 
+    private void interruptStatisticsThread(final FileSystem fileSystem) throws 
NoSuchFieldException, IllegalAccessException {
+        final Field statsField = 
FileSystem.class.getDeclaredField("statistics");
+        statsField.setAccessible(true);
+
+        final Object statsObj = statsField.get(fileSystem);
+        if (statsObj != null && statsObj instanceof FileSystem.Statistics) {
+            final FileSystem.Statistics statistics = (FileSystem.Statistics) 
statsObj;
+
+            final Field statsThreadField = 
statistics.getClass().getDeclaredField("STATS_DATA_CLEANER");
+            statsThreadField.setAccessible(true);
+
+            final Object statsThreadObj = statsThreadField.get(statistics);
+            if (statsThreadObj != null && statsThreadObj instanceof Thread) {
+                final Thread statsThread = (Thread) statsThreadObj;
+                try {
+                    statsThread.interrupt();
+                } catch (Exception e) {
+                    getLogger().warn("Error interrupting thread: " + 
e.getMessage(), e);
+                }
+            }
+        }
+    }
+
     private static Configuration getConfigurationFromResources(final 
Configuration config, String configResources) throws IOException {
         boolean foundResources = false;
         if (null != configResources) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/92b4a320/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
index 9231382..de005e4 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
@@ -83,6 +83,14 @@ public interface ProcessScheduler {
      */
     void terminateProcessor(ProcessorNode procNode);
 
+    /*
+     * Notifies the schedule that the given processor is being removed so the 
scheduler may clean up any resources
+     * related to the given processor.
+     *
+     * @param procNode the processor node being removed
+     */
+    void onProcessorRemoved(ProcessorNode procNode);
+
     /**
      * Starts scheduling the given Port to run. If the Port is already 
scheduled
      * to run, does nothing.

http://git-wip-us.apache.org/repos/asf/nifi/blob/92b4a320/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
index d7fa3fc..530b6ef 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
@@ -58,4 +58,8 @@ public class LogRepositoryFactory {
 
         return repository;
     }
+
+    public static LogRepository removeRepository(final String componentId) {
+        return repositoryMap.remove(requireNonNull(componentId));
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/92b4a320/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index ba4075e..92ab7b3 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -1129,6 +1129,10 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
         boolean creationSuccessful;
         LoggableComponent<Processor> processor;
+
+        // make sure the first reference to LogRepository happens outside of a 
NarCloseable so that we use the framework's ClassLoader
+        final LogRepository logRepository = 
LogRepositoryFactory.getRepository(id);
+
         try {
             processor = instantiateProcessor(type, id, coordinate, 
additionalUrls);
             creationSuccessful = true;
@@ -1154,7 +1158,6 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 componentType, type, nifiProperties, componentVarRegistry, 
this, true);
         }
 
-        final LogRepository logRepository = 
LogRepositoryFactory.getRepository(id);
         if (registerLogObserver) {
             
logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, 
LogLevel.WARN, new ProcessorLogObserver(getBulletinRepository(), procNode));
         }
@@ -3371,6 +3374,10 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
         LoggableComponent<ReportingTask> task = null;
         boolean creationSuccessful = true;
+
+        // make sure the first reference to LogRepository happens outside of a 
NarCloseable so that we use the framework's ClassLoader
+        final LogRepository logRepository = 
LogRepositoryFactory.getRepository(id);
+
         try {
             task = instantiateReportingTask(type, id, bundleCoordinate, 
additionalUrls);
         } catch (final Exception e) {
@@ -3418,7 +3425,6 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             reportingTasks.put(id, taskNode);
 
             // Register log observer to provide bulletins when reporting task 
logs anything at WARN level or above
-            final LogRepository logRepository = 
LogRepositoryFactory.getRepository(id);
             
logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, 
LogLevel.WARN,
                     new ReportingTaskLogObserver(getBulletinRepository(), 
taskNode));
         }
@@ -3551,6 +3557,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         }
 
         reportingTasks.remove(reportingTaskNode.getIdentifier());
+        
LogRepositoryFactory.removeRepository(reportingTaskNode.getIdentifier());
         
ExtensionManager.removeInstanceClassLoader(reportingTaskNode.getIdentifier());
     }
 
@@ -3565,10 +3572,12 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
 
     @Override
     public ControllerServiceNode createControllerService(final String type, 
final String id, final BundleCoordinate bundleCoordinate, final Set<URL> 
additionalUrls, final boolean firstTimeAdded) {
+        // make sure the first reference to LogRepository happens outside of a 
NarCloseable so that we use the framework's ClassLoader
+        final LogRepository logRepository = 
LogRepositoryFactory.getRepository(id);
+
         final ControllerServiceNode serviceNode = 
controllerServiceProvider.createControllerService(type, id, bundleCoordinate, 
additionalUrls, firstTimeAdded);
 
         // Register log observer to provide bulletins when reporting task logs 
anything at WARN level or above
-        final LogRepository logRepository = 
LogRepositoryFactory.getRepository(id);
         logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, 
LogLevel.WARN,
                 new ControllerServiceLogObserver(getBulletinRepository(), 
serviceNode));
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/92b4a320/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 7315ef6..491abf0 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -378,6 +378,11 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
     }
 
     @Override
+    public void onProcessorRemoved(final ProcessorNode procNode) {
+        this.lifecycleStates.remove(procNode);
+    }
+
+    @Override
     public void yield(final ProcessorNode procNode) {
         // This exists in the ProcessScheduler so that the scheduler can take
         // advantage of the fact that

http://git-wip-us.apache.org/repos/asf/nifi/blob/92b4a320/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index cba1978..640fcf7 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -62,6 +62,7 @@ import 
org.apache.nifi.controller.scheduling.StandardProcessScheduler;
 import org.apache.nifi.events.BulletinFactory;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.logging.LogRepositoryFactory;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.processor.SimpleProcessLogger;
@@ -703,6 +704,7 @@ public class StandardControllerServiceProvider implements 
ControllerServiceProvi
         }
 
         group.removeControllerService(serviceNode);
+        LogRepositoryFactory.removeRepository(serviceNode.getIdentifier());
         
ExtensionManager.removeInstanceClassLoader(serviceNode.getIdentifier());
         serviceCache.remove(serviceNode.getIdentifier());
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/92b4a320/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 551d39e..7357756 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -914,8 +914,8 @@ public final class StandardProcessGroup implements 
ProcessGroup {
             processors.remove(id);
             onComponentModified();
 
+            scheduler.onProcessorRemoved(processor);
             flowController.onProcessorRemoved(processor);
-            
LogRepositoryFactory.getRepository(processor.getIdentifier()).removeAllObservers();
 
             final StateManagerProvider stateManagerProvider = 
flowController.getStateManagerProvider();
             scheduler.submitFrameworkTask(new Runnable() {
@@ -936,6 +936,7 @@ public final class StandardProcessGroup implements 
ProcessGroup {
         } finally {
             if (removed) {
                 try {
+                    
LogRepositoryFactory.removeRepository(processor.getIdentifier());
                     ExtensionManager.removeInstanceClassLoader(id);
                 } catch (Throwable t) {
                 }
@@ -4462,11 +4463,14 @@ public final class StandardProcessGroup implements 
ProcessGroup {
                 .forEach(proc -> 
proposedProcessors.remove(proc.getVersionedComponentId().get()));
 
             for (final VersionedProcessor processorToAdd : 
proposedProcessors.values()) {
-                final BundleCoordinate coordinate = 
toCoordinate(processorToAdd.getBundle());
-                try {
-                    flowController.createProcessor(processorToAdd.getType(), 
UUID.randomUUID().toString(), coordinate, false);
-                } catch (Exception e) {
-                    throw new IllegalArgumentException("Unable to create 
Processor of type " + processorToAdd.getType(), e);
+                final String processorToAddClass = processorToAdd.getType();
+                final BundleCoordinate processorToAddCoordinate = 
toCoordinate(processorToAdd.getBundle());
+
+                final boolean bundleExists = 
ExtensionManager.getBundles(processorToAddClass).stream()
+                        .anyMatch(b -> 
processorToAddCoordinate.equals(b.getBundleDetails().getCoordinate()));
+
+                if (!bundleExists) {
+                    throw new IllegalArgumentException("Unknown bundle " + 
processorToAddCoordinate.toString() + " for processor type " + 
processorToAddClass);
                 }
             }
 
@@ -4479,11 +4483,14 @@ public final class StandardProcessGroup implements 
ProcessGroup {
                 .forEach(service -> 
proposedServices.remove(service.getVersionedComponentId().get()));
 
             for (final VersionedControllerService serviceToAdd : 
proposedServices.values()) {
-                final BundleCoordinate coordinate = 
toCoordinate(serviceToAdd.getBundle());
-                try {
-                    
flowController.createControllerService(serviceToAdd.getType(), 
UUID.randomUUID().toString(), coordinate, Collections.emptySet(), false);
-                } catch (Exception e) {
-                    throw new IllegalArgumentException("Unable to create 
Controller Service of type " + serviceToAdd.getType(), e);
+                final String serviceToAddClass = serviceToAdd.getType();
+                final BundleCoordinate serviceToAddCoordinate = 
toCoordinate(serviceToAdd.getBundle());
+
+                final boolean bundleExists = 
ExtensionManager.getBundles(serviceToAddClass).stream()
+                        .anyMatch(b -> 
serviceToAddCoordinate.equals(b.getBundleDetails().getCoordinate()));
+
+                if (!bundleExists) {
+                    throw new IllegalArgumentException("Unknown bundle " + 
serviceToAddCoordinate.toString() + " for service type " + serviceToAddClass);
                 }
             }
 

Reply via email to