This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 60b8fcac1ba4dff64c28139ddf282d915f4c221b
Author: Mark Payne <[email protected]>
AuthorDate: Tue May 14 11:37:50 2019 -0400

    NIFI-5922: Bug fixes; initialize, setup, and enable controller services; code cleanup
---
 .../org/apache/nifi/stateless/NiFiStateless.java   |  3 +-
 .../stateless/core/AbstractStatelessComponent.java |  2 +-
 .../nifi/stateless/core/ComponentFactory.java      | 22 ++++--
 .../StatelessControllerServiceConfiguration.java   |  9 ++-
 ...lessControllerServiceInitializationContext.java | 80 ++++++++++++++++++++++
 .../core/StatelessControllerServiceLookup.java     | 41 +++++++++--
 .../apache/nifi/stateless/core/StatelessFlow.java  | 43 +++++++++---
 .../stateless/core/StatelessProcessContext.java    | 18 ++---
 .../stateless/core/StatelessProcessSession.java    |  3 +-
 .../stateless/core/StatelessProcessorWrapper.java  |  6 +-
 10 files changed, 188 insertions(+), 39 deletions(-)

diff --git a/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/NiFiStateless.java b/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/NiFiStateless.java
index 778881b..74b697c 100644
--- a/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/NiFiStateless.java
+++ b/nifi-stateless/nifi-stateless-bootstrap/src/main/java/org/apache/nifi/stateless/NiFiStateless.java
@@ -43,8 +43,9 @@ public class NiFiStateless {
     public static void main(final String[] args) throws IOException, 
ClassNotFoundException, NoSuchMethodException, InvocationTargetException, 
IllegalAccessException {
 
         String nifi_home = System.getenv("NIFI_HOME");
-        if(nifi_home == null || nifi_home.equals(""))
+        if(nifi_home == null || nifi_home.equals("")) {
             nifi_home = ".";
+        }
 
         final File libDir = new File(nifi_home+"/lib");
         final File statelesslibDir = new File(nifi_home+"/stateless-lib");
diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/AbstractStatelessComponent.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/AbstractStatelessComponent.java
index 67151df..3028e9f 100644
--- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/AbstractStatelessComponent.java
+++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/AbstractStatelessComponent.java
@@ -87,7 +87,7 @@ public abstract class AbstractStatelessComponent implements 
StatelessComponent {
             boolean hasSuccessOutputPort = 
this.successOutputPorts.contains(relationship);
 
             if (!(hasChildren || hasAutoterminate || hasFailureOutputPort || 
hasSuccessOutputPort)) {
-                getLogger().error("Component: {}, Relationship: {}, needs 
either auto terminate, child processors, or an output port", new Object[] 
{toString(), relationship.getName()});
+                getLogger().error("Component: {}, Relationship: {}, either 
needs to be auto-terminated or connected to another component", new Object[] 
{toString(), relationship.getName()});
                 return false;
             }
         }
diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/ComponentFactory.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/ComponentFactory.java
index 2fc3459..d0acdc7 100644
--- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/ComponentFactory.java
+++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/ComponentFactory.java
@@ -21,7 +21,10 @@ import 
org.apache.nifi.attribute.expression.language.StandardPropertyValue;
 import org.apache.nifi.bundle.Bundle;
 import org.apache.nifi.bundle.BundleCoordinate;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.controller.ControllerServiceLookup;
 import 
org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
 import org.apache.nifi.controller.exception.ProcessorInstantiationException;
 import org.apache.nifi.logging.ComponentLog;
@@ -33,6 +36,8 @@ import org.apache.nifi.registry.VariableRegistry;
 import org.apache.nifi.registry.flow.VersionedControllerService;
 import org.apache.nifi.registry.flow.VersionedProcessor;
 import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -44,6 +49,7 @@ import java.util.Map;
 import java.util.Set;
 
 public class ComponentFactory {
+    private static final Logger logger = 
LoggerFactory.getLogger(ComponentFactory.class);
     private final ExtensionManager extensionManager;
 
     public ComponentFactory(final ExtensionManager extensionManager) {
@@ -69,7 +75,7 @@ public class ComponentFactory {
             final ClassLoader detectedClassLoader = 
extensionManager.createInstanceClassLoader(type, identifier, bundle,
                 classpathUrls == null ? Collections.emptySet() : 
classpathUrls);
 
-            System.out.println("Setting context class loader to " + 
detectedClassLoader + " (parent = " + detectedClassLoader.getParent() + ") to 
create " + type);
+            logger.debug("Setting context class loader to {} (parent = {}) to 
create {}", detectedClassLoader, detectedClassLoader.getParent(), type);
             final Class<?> rawClass = Class.forName(type, true, 
detectedClassLoader);
             Thread.currentThread().setContextClassLoader(detectedClassLoader);
 
@@ -138,12 +144,14 @@ public class ComponentFactory {
     }
 
 
-    public ControllerService createControllerService(final 
VersionedControllerService versionedControllerService, final VariableRegistry 
variableRegistry) {
-        return createControllerService(versionedControllerService, 
variableRegistry, null);
+    public ControllerService createControllerService(final 
VersionedControllerService versionedControllerService, final VariableRegistry 
variableRegistry,
+                                                     final 
ControllerServiceLookup serviceLookup, final StateManager stateManager) {
+        return createControllerService(versionedControllerService, 
variableRegistry, null, serviceLookup, stateManager);
     }
 
 
-    private ControllerService createControllerService(final 
VersionedControllerService versionedControllerService, final VariableRegistry 
variableRegistry, final Set<URL> classpathUrls) {
+    private ControllerService createControllerService(final 
VersionedControllerService versionedControllerService, final VariableRegistry 
variableRegistry, final Set<URL> classpathUrls,
+                                                      final 
ControllerServiceLookup serviceLookup, final StateManager stateManager) {
 
         final String type = versionedControllerService.getType();
         final String identifier = versionedControllerService.getIdentifier();
@@ -161,7 +169,7 @@ public class ComponentFactory {
             final ClassLoader detectedClassLoader = 
extensionManager.createInstanceClassLoader(type, identifier, bundle,
                 classpathUrls == null ? Collections.emptySet() : 
classpathUrls);
 
-            System.out.println("Setting context class loader to " + 
detectedClassLoader + " (parent = " + detectedClassLoader.getParent() + ") to 
create " + type);
+            logger.debug("Setting context class loader to {} (parent = {}) to 
create {}", detectedClassLoader, detectedClassLoader.getParent(), type);
             final Class<?> rawClass = Class.forName(type, true, 
detectedClassLoader);
             Thread.currentThread().setContextClassLoader(detectedClassLoader);
 
@@ -169,6 +177,8 @@ public class ComponentFactory {
             final ComponentLog componentLog = new 
SLF4JComponentLog(extensionInstance);
 
             final ControllerService service = (ControllerService) 
extensionInstance;
+            final ControllerServiceInitializationContext initializationContext 
= new StatelessControllerServiceInitializationContext(identifier, service, 
serviceLookup, stateManager);
+            service.initialize(initializationContext);
 
             // If no classpath urls were provided, check if we need to add 
additional classpath URL's based on configured properties.
             if (classpathUrls == null) {
@@ -176,7 +186,7 @@ public class ComponentFactory {
                     variableRegistry, componentLog);
 
                 if (!additionalClasspathUrls.isEmpty()) {
-                    return createControllerService(versionedControllerService, 
variableRegistry, additionalClasspathUrls);
+                    return createControllerService(versionedControllerService, 
variableRegistry, additionalClasspathUrls, serviceLookup, stateManager);
                 }
             }
 
diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessControllerServiceConfiguration.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessControllerServiceConfiguration.java
index e2d2a74..5cd32cf 100644
--- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessControllerServiceConfiguration.java
+++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessControllerServiceConfiguration.java
@@ -27,12 +27,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public class StatelessControllerServiceConfiguration {
 
     private final ControllerService service;
+    private final String name;
+
     private final AtomicBoolean enabled = new AtomicBoolean(false);
     private String annotationData;
     private Map<PropertyDescriptor, String> properties = new HashMap<>();
 
-    public StatelessControllerServiceConfiguration(final ControllerService 
service) {
+    public StatelessControllerServiceConfiguration(final ControllerService 
service, final String name) {
         this.service = service;
+        this.name = name;
     }
 
     public ControllerService getService() {
@@ -75,4 +78,8 @@ public class StatelessControllerServiceConfiguration {
     public Map<PropertyDescriptor, String> getProperties() {
         return Collections.unmodifiableMap(properties);
     }
+
+    public String getName() {
+        return name;
+    }
 }
diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessControllerServiceInitializationContext.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessControllerServiceInitializationContext.java
new file mode 100644
index 0000000..b0801f0
--- /dev/null
+++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessControllerServiceInitializationContext.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stateless.core;
+
+import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.controller.NodeTypeProvider;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.io.File;
+
+public class StatelessControllerServiceInitializationContext implements 
ControllerServiceInitializationContext {
+    private final ComponentLog logger;
+    private final String processorId;
+    private final ControllerServiceLookup controllerServiceLookup;
+    private final StateManager stateManager;
+
+    public StatelessControllerServiceInitializationContext(final String id, 
final ControllerService controllerService, final ControllerServiceLookup 
serviceLookup, final StateManager stateManager) {
+        processorId = id;
+        logger = new SLF4JComponentLog(controllerService);
+        controllerServiceLookup = serviceLookup;
+        this.stateManager = stateManager;
+    }
+
+    public String getIdentifier() {
+        return processorId;
+    }
+
+    public ComponentLog getLogger() {
+        return logger;
+    }
+
+    @Override
+    public StateManager getStateManager() {
+        return stateManager;
+    }
+
+    public ControllerServiceLookup getControllerServiceLookup() {
+        return controllerServiceLookup;
+    }
+
+    public NodeTypeProvider getNodeTypeProvider() {
+        return new NodeTypeProvider() {
+            public boolean isClustered() {
+                return false;
+            }
+
+            public boolean isPrimary() {
+                return false;
+            }
+        };
+    }
+
+    public String getKerberosServicePrincipal() {
+        return null; //this needs to be wired in.
+    }
+
+    public File getKerberosServiceKeytab() {
+        return null; //this needs to be wired in.
+    }
+
+    public File getKerberosConfigurationFile() {
+        return null; //this needs to be wired in.
+    }}
diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessControllerServiceLookup.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessControllerServiceLookup.java
index 00b1c48..f220ed8 100644
--- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessControllerServiceLookup.java
+++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessControllerServiceLookup.java
@@ -21,6 +21,7 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ControllerServiceLookup;
@@ -28,6 +29,7 @@ import org.apache.nifi.registry.VariableRegistry;
 import org.apache.nifi.reporting.InitializationException;
 
 import java.lang.reflect.InvocationTargetException;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -47,7 +49,7 @@ public class StatelessControllerServiceLookup implements 
ControllerServiceLookup
     }
 
 
-    public void addControllerService(final ControllerService service) throws 
InitializationException {
+    public void addControllerService(final ControllerService service, final 
String serviceName) throws InitializationException {
         final String identifier = service.getIdentifier();
         final SLF4JComponentLog logger = new SLF4JComponentLog(service);
         controllerServiceLoggers.put(identifier, logger);
@@ -55,7 +57,7 @@ public class StatelessControllerServiceLookup implements 
ControllerServiceLookup
         StatelessStateManager serviceStateManager = new 
StatelessStateManager();
         controllerServiceStateManagers.put(identifier, serviceStateManager);
 
-        final StatelessProcessContext initContext = new 
StatelessProcessContext(requireNonNull(service), this, 
requireNonNull(identifier), logger, serviceStateManager);
+        final StatelessProcessContext initContext = new 
StatelessProcessContext(requireNonNull(service), this, serviceName, logger, 
serviceStateManager);
         service.initialize(initContext);
 
         try {
@@ -64,7 +66,7 @@ public class StatelessControllerServiceLookup implements 
ControllerServiceLookup
             throw new InitializationException(e);
         }
 
-        final StatelessControllerServiceConfiguration config = new 
StatelessControllerServiceConfiguration(service);
+        final StatelessControllerServiceConfiguration config = new 
StatelessControllerServiceConfiguration(service, serviceName);
         controllerServiceMap.put(identifier, config);
     }
 
@@ -116,7 +118,33 @@ public class StatelessControllerServiceLookup implements 
ControllerServiceLookup
         return status == null ? null : serviceIdentifier;
     }
 
-    public void enableControllerService(final ControllerService service, 
VariableRegistry registry) throws InvocationTargetException, 
IllegalAccessException {
+
+    public void enableControllerServices(final VariableRegistry 
variableRegistry) {
+        for (final StatelessControllerServiceConfiguration config : 
controllerServiceMap.values()) {
+            final ControllerService service = config.getService();
+            final Collection<ValidationResult> validationResults = 
validate(service, config.getName(), variableRegistry);
+            if (!validationResults.isEmpty()) {
+                throw new RuntimeException("Failed to enable Controller 
Service {id=" + service.getIdentifier() + ", name=" + config.getName() + ", 
type=" + service.getClass() + "} because " +
+                    "validation failed: " + validationResults);
+            }
+
+            try {
+                enableControllerService(service, variableRegistry);
+            } catch (IllegalAccessException| InvocationTargetException e) {
+                throw new RuntimeException("Failed to enable Controller 
Service {id=" + service.getIdentifier() + ", name=" + config.getName() + ", 
type=" + service.getClass() + "}", e);
+            }
+        }
+    }
+
+    public Collection<ValidationResult> validate(final ControllerService 
service, final String serviceName, final VariableRegistry variableRegistry) {
+        final StateManager stateManager = 
controllerServiceStateManagers.get(service.getIdentifier());
+        final SLF4JComponentLog logger = 
controllerServiceLoggers.get(service.getIdentifier());
+        final StatelessProcessContext processContext = new 
StatelessProcessContext(service, this, serviceName, logger, stateManager, 
variableRegistry);
+        final StatelessValidationContext validationContext = new 
StatelessValidationContext(processContext, this, stateManager, 
variableRegistry);
+        return service.validate(validationContext);
+    }
+
+    private void enableControllerService(final ControllerService service, 
final VariableRegistry registry) throws InvocationTargetException, 
IllegalAccessException {
         final StatelessControllerServiceConfiguration configuration = 
getConfiguration(service.getIdentifier());
         if (configuration == null) {
             throw new IllegalArgumentException("Controller Service " + service 
+ " is not known");
@@ -125,6 +153,7 @@ public class StatelessControllerServiceLookup implements 
ControllerServiceLookup
         if (configuration.isEnabled()) {
             throw new IllegalStateException("Cannot enable Controller Service 
" + service + " because it is not disabled");
         }
+
         final ConfigurationContext configContext = new 
StatelessConfigurationContext(service, configuration.getProperties(), this, 
registry);
         ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, service, 
configContext);
 
@@ -150,8 +179,8 @@ public class StatelessControllerServiceLookup implements 
ControllerServiceLookup
         return configuration;
     }
 
-    public ValidationResult setControllerServiceProperty(final 
ControllerService service, final PropertyDescriptor property, final 
StatelessProcessContext context, final VariableRegistry registry, final
-    String value) {
+    public ValidationResult setControllerServiceProperty(final 
ControllerService service, final PropertyDescriptor property, final 
StatelessProcessContext context,
+                                                         final 
VariableRegistry registry, final String value) {
         final StatelessStateManager serviceStateManager = 
controllerServiceStateManagers.get(service.getIdentifier());
         if (serviceStateManager == null) {
             throw new IllegalStateException("Controller service " + service + 
" has not been added to this TestRunner via the #addControllerService method");
diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessFlow.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessFlow.java
index a2a8f04..2b17376 100644
--- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessFlow.java
+++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessFlow.java
@@ -19,11 +19,10 @@ package org.apache.nifi.stateless.core;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.exception.ProcessorInstantiationException;
-import org.apache.nifi.stateless.bootstrap.ExtensionDiscovery;
-import org.apache.nifi.stateless.bootstrap.InMemoryFlowFile;
-import org.apache.nifi.stateless.bootstrap.RunnableFlow;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.registry.VariableDescriptor;
@@ -40,6 +39,9 @@ import org.apache.nifi.registry.flow.VersionedRemoteGroupPort;
 import org.apache.nifi.registry.flow.VersionedRemoteProcessGroup;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.security.util.SslContextFactory;
+import org.apache.nifi.stateless.bootstrap.ExtensionDiscovery;
+import org.apache.nifi.stateless.bootstrap.InMemoryFlowFile;
+import org.apache.nifi.stateless.bootstrap.RunnableFlow;
 
 import javax.net.ssl.SSLContext;
 import java.io.File;
@@ -117,12 +119,36 @@ public class StatelessFlow implements RunnableFlow {
 
         final Set<VersionedControllerService> controllerServices = 
flow.getControllerServices();
         for (final VersionedControllerService versionedControllerService : 
controllerServices) {
-            final ControllerService service = 
componentFactory.createControllerService(versionedControllerService, 
variableRegistry);
-            serviceLookup.addControllerService(service);
+            final StateManager stateManager = new StatelessStateManager();
+
+            final ControllerService service = 
componentFactory.createControllerService(versionedControllerService, 
variableRegistry, serviceLookup, stateManager);
+            serviceLookup.addControllerService(service, 
versionedControllerService.getName());
+            serviceLookup.setControllerServiceAnnotationData(service, 
versionedControllerService.getAnnotationData());
+
+            final SLF4JComponentLog logger = new SLF4JComponentLog(service);
+            final StatelessProcessContext processContext = new 
StatelessProcessContext(service, serviceLookup, 
versionedControllerService.getName(), logger, stateManager, variableRegistry);
+
+            final Map<String, String> versionedPropertyValues = 
versionedControllerService.getProperties();
+            for (final Map.Entry<String, String> entry : 
versionedPropertyValues.entrySet()) {
+                final String propertyName = entry.getKey();
+                final String propertyValue = entry.getValue();
+                final PropertyDescriptor descriptor = 
service.getPropertyDescriptor(propertyName);
+
+                serviceLookup.setControllerServiceProperty(service, 
descriptor, processContext, variableRegistry, propertyValue);
+            }
+
+            for (final PropertyDescriptor descriptor : 
service.getPropertyDescriptors()) {
+                final String versionedPropertyValue = 
versionedPropertyValues.get(descriptor.getName());
+                if (versionedPropertyValue == null && 
descriptor.getDefaultValue() != null) {
+                    serviceLookup.setControllerServiceProperty(service, 
descriptor, processContext, variableRegistry, descriptor.getDefaultValue());
+                }
+            }
         }
 
-        final Map<String, StatelessComponent> componentMap = new HashMap<>();
+        serviceLookup.enableControllerServices(variableRegistry);
 
+
+        final Map<String, StatelessComponent> componentMap = new HashMap<>();
         for (final VersionedConnection connection : connections) {
             boolean isInputPortConnection = false;
 
@@ -249,10 +275,9 @@ public class StatelessFlow implements RunnableFlow {
             }
         }
 
-        roots = componentMap.entrySet()
+        roots = componentMap.values()
             .stream()
-            .filter(e -> e.getValue().getParents().isEmpty())
-            .map(Map.Entry::getValue)
+            .filter(statelessComponent -> 
statelessComponent.getParents().isEmpty())
             .collect(Collectors.toList());
     }
 
diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessProcessContext.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessProcessContext.java
index 7566b5c..3766b07 100644
--- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessProcessContext.java
+++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessProcessContext.java
@@ -70,29 +70,25 @@ public class StatelessProcessContext implements 
SchedulingContext, ControllerSer
 
     private final StatelessControllerServiceLookup lookup;
 
-    public StatelessProcessContext(final ConfigurableComponent component, 
final StatelessControllerServiceLookup lookup, final String componentName, 
final StateManager stateManager, final VariableRegistry
-        variableRegistry) {
+    public StatelessProcessContext(final ConfigurableComponent component, 
final StatelessControllerServiceLookup lookup, final String componentName, 
final StateManager stateManager,
+                                   final VariableRegistry variableRegistry) {
         this(component, lookup, componentName, new 
SLF4JComponentLog(component), stateManager, variableRegistry);
     }
 
-    public StatelessProcessContext(final ConfigurableComponent component, 
final StatelessControllerServiceLookup lookup, final String componentName, 
final SLF4JComponentLog logger, final StatelessStateManager
-        statemanager) {
+    public StatelessProcessContext(final ConfigurableComponent component, 
final StatelessControllerServiceLookup lookup, final String componentName, 
final SLF4JComponentLog logger,
+                                   final StatelessStateManager statemanager) {
         this(component, lookup, componentName, logger, statemanager, 
VariableRegistry.EMPTY_REGISTRY);
     }
 
-    public StatelessProcessContext(final ConfigurableComponent component,
-                                   final StatelessControllerServiceLookup 
lookup,
-                                   final String componentName,
-                                   final SLF4JComponentLog logger,
-                                   final StateManager stateManager,
-                                   final VariableRegistry variableRegistry) {
+    public StatelessProcessContext(final ConfigurableComponent component, 
final StatelessControllerServiceLookup lookup, final String componentName,
+                                   final SLF4JComponentLog logger, final 
StateManager stateManager, final VariableRegistry variableRegistry) {
         this.component = Objects.requireNonNull(component);
         this.componentName = componentName == null ? "" : componentName;
         this.inputRequirement = 
component.getClass().getAnnotation(InputRequirement.class);
         this.lookup = lookup;
         this.stateManager = stateManager;
         this.variableRegistry = variableRegistry;
-        this.identifier = "ProcessContext-" + this.hashCode();
+        this.identifier = component.getIdentifier();
         this.logger = logger;
     }
 
diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessProcessSession.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessProcessSession.java
index 431b5cf..a6c5dec 100644
--- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessProcessSession.java
+++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessProcessSession.java
@@ -777,12 +777,13 @@ public class StatelessProcessSession implements 
ProcessSession {
             throw new IllegalArgumentException("Cannot export a flow file that 
I did not create");
         }
 
-        final StatelessFlowFile StatelessFlowFile = validateState(flowFile);
+        validateState(flowFile);
         final ByteArrayOutputStream baos = new ByteArrayOutputStream() {
             @Override
             public void close() throws IOException {
                 super.close();
                 final StatelessFlowFile newFlowFile = new 
StatelessFlowFile((StatelessFlowFile) flowFile, materializeContent);
+                newFlowFile.setData(toByteArray());
                 currentVersions.put(newFlowFile.getId(), newFlowFile);
             }
         };
diff --git a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessProcessorWrapper.java b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessProcessorWrapper.java
index dd4f4a8..108b2e2 100644
--- a/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessProcessorWrapper.java
+++ b/nifi-stateless/nifi-stateless-core/src/main/java/org/apache/nifi/stateless/core/StatelessProcessorWrapper.java
@@ -107,8 +107,8 @@ public class StatelessProcessorWrapper extends 
AbstractStatelessComponent implem
         //Validate context
         final Collection<ValidationResult> validationResult = 
context.validate();
         if (validationResult.stream().anyMatch(a -> !a.isValid()) || 
!this.validate()) {
-            throw new IllegalArgumentException("Processor is not valid: "
-                + String.join("\n", 
validationResult.stream().map(ValidationResult::toString).collect(Collectors.toList())));
+            throw new IllegalArgumentException(processor + " is not valid: "
+                + 
validationResult.stream().map(ValidationResult::toString).collect(Collectors.joining("\n")));
         }
 
         try (final CloseableNarLoader c = withNarClassLoader()) {
@@ -147,7 +147,7 @@ public class StatelessProcessorWrapper extends 
AbstractStatelessComponent implem
             final AtomicBoolean nextStepCalled = new AtomicBoolean(false);
 
             try {
-                logger.info("Running " + 
this.processor.getClass().getSimpleName() + ".onTrigger with " + 
inputQueue.size() + " FlowFiles");
+                logger.debug("Running {}.onTrigger with {} FlowFiles", new 
Object[] {this.processor.getClass().getSimpleName(), inputQueue.size()});
 
                 try (final CloseableNarLoader c = withNarClassLoader()) { // 
Trigger processor with the appropriate class loader
                     processor.onTrigger(context, () -> {

Reply via email to