markap14 commented on a change in pull request #4669:
URL: https://github.com/apache/nifi/pull/4669#discussion_r526305234



##########
File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stateless.flow;
+
+import org.apache.nifi.components.state.HashMapStateProvider;
+import org.apache.nifi.components.state.StateManagerProvider;
+import org.apache.nifi.components.state.StateProvider;
+import org.apache.nifi.controller.kerberos.KerberosConfig;
+import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.CounterRepository;
+import org.apache.nifi.controller.repository.FlowFileEventRepository;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.StandardCounterRepository;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import 
org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
+import org.apache.nifi.controller.repository.metrics.RingBufferEventRepository;
+import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.StandardControllerServiceProvider;
+import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
+import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.events.VolatileBulletinRepository;
+import org.apache.nifi.extensions.ExtensionClient;
+import org.apache.nifi.extensions.ExtensionRepository;
+import org.apache.nifi.extensions.FileSystemExtensionRepository;
+import org.apache.nifi.extensions.NexusExtensionClient;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.nar.ExtensionDiscoveringManager;
+import org.apache.nifi.nar.NarClassLoaders;
+import org.apache.nifi.nar.NarClassLoadersHolder;
+import org.apache.nifi.parameter.ParameterContextManager;
+import org.apache.nifi.parameter.StandardParameterContextManager;
+import org.apache.nifi.provenance.IdentifierLookup;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.apache.nifi.provenance.VolatileProvenanceRepository;
+import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.flow.InMemoryFlowRegistry;
+import org.apache.nifi.registry.flow.StandardFlowRegistryClient;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.stateless.bootstrap.ExtensionDiscovery;
+import org.apache.nifi.stateless.config.ExtensionClientDefinition;
+import org.apache.nifi.stateless.config.ParameterOverride;
+import org.apache.nifi.stateless.config.SslConfigurationUtil;
+import org.apache.nifi.stateless.config.SslContextDefinition;
+import org.apache.nifi.stateless.config.StatelessConfigurationException;
+import org.apache.nifi.stateless.engine.CachingProcessContextFactory;
+import org.apache.nifi.stateless.engine.ProcessContextFactory;
+import org.apache.nifi.stateless.engine.StandardStatelessEngine;
+import org.apache.nifi.stateless.engine.StatelessAuthorizer;
+import org.apache.nifi.stateless.engine.StatelessEngine;
+import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
+import org.apache.nifi.stateless.engine.StatelessEngineInitializationContext;
+import org.apache.nifi.stateless.engine.StatelessFlowManager;
+import org.apache.nifi.stateless.engine.StatelessProcessContextFactory;
+import org.apache.nifi.stateless.engine.StatelessProcessScheduler;
+import org.apache.nifi.stateless.engine.StatelessProvenanceAuthorizableFactory;
+import org.apache.nifi.stateless.repository.ByteArrayContentRepository;
+import org.apache.nifi.stateless.repository.RepositoryContextFactory;
+import org.apache.nifi.stateless.repository.StatelessFlowFileRepository;
+import org.apache.nifi.stateless.repository.StatelessRepositoryContextFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.List;
+
+public class StandardStatelessDataflowFactory implements 
StatelessDataflowFactory<VersionedFlowSnapshot> {
+    private static final Logger logger = 
LoggerFactory.getLogger(StandardStatelessDataflowFactory.class);
+
+    /**
+     * Builds a Stateless engine from the given configuration and uses it to create the dataflow
+     * described by the given definition.
+     *
+     * <p>The method creates the working directory if necessary, registers the flow snapshot with an
+     * {@link InMemoryFlowRegistry}, discovers extensions, creates the repositories and the engine,
+     * initializes them, creates the "root" Process Group and finally asks the engine to create the flow.
+     * As a side effect, the {@code java.security.krb5.conf} system property is set when the engine
+     * configuration supplies a krb5 file.</p>
+     *
+     * <p>If any step fails, the repositories and the process scheduler that were already created are
+     * closed or shut down before the failure is rethrown. Failures encountered during that cleanup are
+     * attached to the rethrown exception as suppressed exceptions.</p>
+     *
+     * @param engineConfiguration the configuration of the engine (working directory, NAR directory, SSL context,
+     *                            extension clients, sensitive properties key, krb5 file)
+     * @param dataflowDefinition the definition that provides the flow snapshot to instantiate
+     * @param parameterOverrides the parameter overrides that are handed to the engine when the flow is created
+     * @return the dataflow created by the engine
+     * @throws IOException if the working directory does not exist and could not be created, or if a component fails with an I/O error
+     * @throws StatelessConfigurationException if the SSLContext could not be created or the configuration is otherwise rejected
+     */
+    @Override
+    public StatelessDataflow createDataflow(final StatelessEngineConfiguration engineConfiguration, final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition,
+                                            final List<ParameterOverride> parameterOverrides) throws IOException, StatelessConfigurationException {
+        final long start = System.currentTimeMillis();
+
+        final VersionedFlowSnapshot flowSnapshot = dataflowDefinition.getFlowSnapshot();
+
+        // Declared outside of the try block so that whatever has been created can be cleaned up on failure.
+        ProvenanceRepository provenanceRepo = null;
+        ContentRepository contentRepo = null;
+        StatelessProcessScheduler processScheduler = null;
+        FlowFileRepository flowFileRepo = null;
+        FlowFileEventRepository flowFileEventRepo = null;
+
+        try {
+            final BulletinRepository bulletinRepository = new VolatileBulletinRepository();
+            final File workingDir = engineConfiguration.getWorkingDirectory();
+            if (!workingDir.exists() && !workingDir.mkdirs()) {
+                throw new IOException("Working Directory " + workingDir + " does not exist and could not be created");
+            }
+
+            final InMemoryFlowRegistry flowRegistry = new InMemoryFlowRegistry();
+            flowRegistry.addFlowSnapshot(flowSnapshot);
+            final FlowRegistryClient flowRegistryClient = new StandardFlowRegistryClient();
+            flowRegistryClient.addFlowRegistry(flowRegistry);
+
+            final File extensionsWorkingDir = new File(workingDir, "extensions");
+            final ClassLoader systemClassLoader = createSystemClassLoader(engineConfiguration.getNarDirectory());
+            final ExtensionDiscoveringManager extensionManager = ExtensionDiscovery.discover(extensionsWorkingDir, systemClassLoader);
+
+            flowFileEventRepo = new RingBufferEventRepository(5);
+
+            // The same provider instance is supplied for both provider arguments.
+            final StateProvider stateProvider = new HashMapStateProvider();
+            final StateManagerProvider stateManagerProvider = new StandardStateManagerProvider(stateProvider, stateProvider);
+
+            final ParameterContextManager parameterContextManager = new StandardParameterContextManager();
+            processScheduler = new StatelessProcessScheduler(extensionManager);
+            provenanceRepo = new VolatileProvenanceRepository(1_000, "", "");
+            provenanceRepo.initialize(EventReporter.NO_OP, new StatelessAuthorizer(), new StatelessProvenanceAuthorizableFactory(), IdentifierLookup.EMPTY);
+
+            final SSLContext sslContext;
+            try {
+                sslContext = SslConfigurationUtil.createSslContext(engineConfiguration.getSslContext());
+            } catch (StatelessConfigurationException e) {
+                throw new StatelessConfigurationException("Could not create SSLContext", e);
+            }
+
+            // Build Extension Repository
+            final NarClassLoaders narClassLoaders = NarClassLoadersHolder.getInstance();
+            final List<ExtensionClient> extensionClients = new ArrayList<>();
+            for (final ExtensionClientDefinition extensionClientDefinition : engineConfiguration.getExtensionClients()) {
+                final ExtensionClient extensionClient = createExtensionClient(extensionClientDefinition, engineConfiguration.getSslContext());
+                extensionClients.add(extensionClient);
+            }
+
+            final ExtensionRepository extensionRepository = new FileSystemExtensionRepository(extensionManager, engineConfiguration.getNarDirectory(), engineConfiguration.getWorkingDirectory(),
+                narClassLoaders, extensionClients);
+
+            final VariableRegistry variableRegistry = VariableRegistry.EMPTY_REGISTRY;
+            final StringEncryptor encryptor = StringEncryptor.createEncryptor("PBEWITHMD5AND256BITAES-CBC-OPENSSL", "BC", engineConfiguration.getSensitivePropsKey());
+
+            final File krb5File = engineConfiguration.getKrb5File();
+            final KerberosConfig kerberosConfig = new KerberosConfig(null, null, krb5File);
+            // Only touch the JVM-wide property when a krb5 file is actually configured; dereferencing a missing file would fail with a NullPointerException.
+            if (krb5File != null) {
+                logger.info("Setting java.security.krb5.conf to {}", krb5File.getAbsolutePath());
+                System.setProperty("java.security.krb5.conf", krb5File.getAbsolutePath());
+            }
+
+            final StatelessEngine<VersionedFlowSnapshot> statelessEngine = new StandardStatelessEngine.Builder()
+                .bulletinRepository(bulletinRepository)
+                .encryptor(encryptor)
+                .extensionManager(extensionManager)
+                .flowRegistryClient(flowRegistryClient)
+                .stateManagerProvider(stateManagerProvider)
+                .variableRegistry(variableRegistry)
+                .processScheduler(processScheduler)
+                .kerberosConfiguration(kerberosConfig)
+                .flowFileEventRepository(flowFileEventRepo)
+                .provenanceRepository(provenanceRepo)
+                .extensionRepository(extensionRepository)
+                .build();
+
+            final StatelessFlowManager flowManager = new StatelessFlowManager(flowFileEventRepo, parameterContextManager, statelessEngine, () -> true, sslContext);
+            final ControllerServiceProvider controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, flowManager, extensionManager);
+
+            final ProcessContextFactory rawProcessContextFactory = new StatelessProcessContextFactory(controllerServiceProvider, encryptor, stateManagerProvider);
+            final ProcessContextFactory processContextFactory = new CachingProcessContextFactory(rawProcessContextFactory);
+            contentRepo = new ByteArrayContentRepository();
+            flowFileRepo = new StatelessFlowFileRepository();
+            final CounterRepository counterRepo = new StandardCounterRepository();
+
+            final RepositoryContextFactory repositoryContextFactory = new StatelessRepositoryContextFactory(contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo);
+            final StatelessEngineInitializationContext statelessEngineInitializationContext = new StatelessEngineInitializationContext(controllerServiceProvider, flowManager, processContextFactory,
+                repositoryContextFactory);
+
+            processScheduler.initialize(processContextFactory);
+            statelessEngine.initialize(statelessEngineInitializationContext);
+
+            // Initialize components. This is generally needed because of the interdependencies between the components.
+            // There are some circular dependencies that are resolved by passing objects via initialization rather than by providing to the constructors.
+            final ResourceClaimManager resourceClaimManager = new StandardResourceClaimManager();
+            contentRepo.initialize(resourceClaimManager);
+            flowFileRepo.initialize(resourceClaimManager);
+            flowManager.initialize(controllerServiceProvider);
+
+            // Create flow
+            final ProcessGroup rootGroup = flowManager.createProcessGroup("root");
+            rootGroup.setName("root");
+            flowManager.setRootGroup(rootGroup);
+
+            final StatelessDataflow dataflow = statelessEngine.createFlow(dataflowDefinition, parameterOverrides);
+            final long millis = System.currentTimeMillis() - start;
+            logger.info("NiFi Stateless Engine and Dataflow created and initialized in {} millis", millis);
+
+            return dataflow;
+        } catch (final Exception e) {
+            // Each cleanup step is isolated so that a failure in one neither skips the remaining steps
+            // nor replaces the original exception; cleanup failures are recorded as suppressed exceptions.
+            if (provenanceRepo != null) {
+                try {
+                    provenanceRepo.close();
+                } catch (final IOException | RuntimeException cleanupFailure) {
+                    e.addSuppressed(cleanupFailure);
+                }
+            }
+
+            if (contentRepo != null) {
+                try {
+                    contentRepo.shutdown();
+                } catch (final RuntimeException cleanupFailure) {
+                    e.addSuppressed(cleanupFailure);
+                }
+            }
+
+            if (processScheduler != null) {
+                try {
+                    processScheduler.shutdown();
+                } catch (final RuntimeException cleanupFailure) {
+                    e.addSuppressed(cleanupFailure);
+                }
+            }
+
+            if (flowFileRepo != null) {
+                try {
+                    flowFileRepo.close();
+                } catch (final IOException | RuntimeException cleanupFailure) {
+                    e.addSuppressed(cleanupFailure);
+                }
+            }
+
+            if (flowFileEventRepo != null) {
+                try {
+                    flowFileEventRepo.close();
+                } catch (final IOException | RuntimeException cleanupFailure) {
+                    e.addSuppressed(cleanupFailure);
+                }
+            }
+
+            throw e;
+        }
+    }
+
+    /**
+     * Creates the extension client described by the given definition.
+     *
+     * @param definition the definition of the extension client to create
+     * @param sslContextDefinition the engine's SSL context definition; it is handed to the client only when the
+     *                             definition asks for the SSL context to be used. May be null.
+     * @return a {@link NexusExtensionClient} created from the definition's base URL and comms timeout
+     * @throws IllegalArgumentException if the definition's client type is null or is not "nexus"
+     *                                  (compared ignoring case and surrounding whitespace)
+     */
+    private ExtensionClient createExtensionClient(final ExtensionClientDefinition definition, final SslContextDefinition sslContextDefinition) {
+        final String type = definition.getExtensionClientType();
+        // A missing type is reported the same way as an unsupported one instead of surfacing as a NullPointerException.
+        if (type == null || !"nexus".equalsIgnoreCase(type.trim())) {
+            throw new IllegalArgumentException("Invalid Extension Client type: <" + type + ">. Currently, the only supported type is <nexus>");
+        }
+
+        // When the SSL context is not requested, or none is configured, the client receives null.
+        final SslContextDefinition sslContext = definition.isUseSslContext() ? sslContextDefinition : null;
+        return new NexusExtensionClient(definition.getBaseUrl(), sslContext, definition.getCommsTimeout());
+    }
+
+    /**
+     * Determines the class loader that is handed to extension discovery as the system class loader.
+     *
+     * @param narDirectory the NAR directory, passed on when a Java 11+ class loader has to be created
+     * @return the class loader of {@link StatelessDataflowFactory} when running on a Java version below 11;
+     *         otherwise the class loader produced by {@code createJava11OrLaterSystemClassLoader}
+     * @throws StatelessConfigurationException if the Java 11+ class loader cannot be created
+     */
+    private ClassLoader createSystemClassLoader(final File narDirectory) throws StatelessConfigurationException {
+        final ClassLoader baseClassLoader = StatelessDataflowFactory.class.getClassLoader();
+        final int majorVersion = getJavaMajorVersion();
+        if (majorVersion < 11) {
+            return baseClassLoader;
+        }
+
+        // If running on Java 11 or greater, add the JAXB/activation/annotation libs to the classpath.
+        // TODO: Once the minimum Java version requirement of NiFi is 11, this processing should be removed.
+        // JAXB/activation/annotation will be added as an actual dependency via pom.xml.
+        return createJava11OrLaterSystemClassLoader(majorVersion, narDirectory, baseClassLoader);
+    }
+
+    private ClassLoader createJava11OrLaterSystemClassLoader(final int 
javaMajorVersion, final File narDirectory, final ClassLoader parentClassLoader) 
throws StatelessConfigurationException {
+        final List<URL> java11JarFileUrls = new ArrayList<>();
+
+        final File java11Dir = new File(narDirectory, "java11");

Review comment:
       This follows the same pattern as traditional NiFi (which is important 
since it can be run from `bin/nifi.sh stateless`). It's a fair enough argument, 
and perhaps something that should be done in a follow-on PR, but I think it's 
out of scope of this task.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to