markap14 commented on a change in pull request #4669: URL: https://github.com/apache/nifi/pull/4669#discussion_r526296217
########## File path: nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java ########## @@ -0,0 +1,550 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.stateless.engine; + +import org.apache.nifi.bundle.Bundle; +import org.apache.nifi.bundle.BundleCoordinate; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.StateManagerProvider; +import org.apache.nifi.components.validation.StandardValidationTrigger; +import org.apache.nifi.components.validation.ValidationTrigger; +import org.apache.nifi.controller.ProcessScheduler; +import org.apache.nifi.controller.ReloadComponent; +import org.apache.nifi.controller.ReportingTaskNode; +import org.apache.nifi.controller.flow.FlowManager; +import org.apache.nifi.controller.kerberos.KerberosConfig; +import org.apache.nifi.controller.repository.FlowFileEventRepository; +import org.apache.nifi.controller.service.ControllerServiceProvider; +import org.apache.nifi.encrypt.StringEncryptor; +import org.apache.nifi.engine.FlowEngine; +import org.apache.nifi.extensions.ExtensionRepository; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.parameter.Parameter; +import org.apache.nifi.parameter.ParameterContext; +import org.apache.nifi.provenance.ProvenanceRepository; +import org.apache.nifi.registry.VariableRegistry; +import org.apache.nifi.registry.flow.FlowRegistryClient; +import org.apache.nifi.registry.flow.VersionedFlow; +import org.apache.nifi.registry.flow.VersionedFlowSnapshot; +import org.apache.nifi.registry.flow.VersionedProcessGroup; +import org.apache.nifi.reporting.BulletinRepository; +import org.apache.nifi.reporting.ReportingTask; +import org.apache.nifi.scheduling.SchedulingStrategy; +import org.apache.nifi.stateless.config.ParameterContextDefinition; +import org.apache.nifi.stateless.config.ParameterDefinition; +import org.apache.nifi.stateless.config.ParameterOverride; +import org.apache.nifi.stateless.config.ReportingTaskDefinition; +import 
org.apache.nifi.stateless.flow.DataflowDefinition; +import org.apache.nifi.stateless.flow.StandardStatelessFlow; +import org.apache.nifi.stateless.flow.StatelessDataflow; +import org.apache.nifi.stateless.repository.RepositoryContextFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; + +public class StandardStatelessEngine implements StatelessEngine<VersionedFlowSnapshot> { + private static final Logger logger = LoggerFactory.getLogger(StandardStatelessEngine.class); + + // Member Variables injected via Builder + private final ExtensionManager extensionManager; + private final BulletinRepository bulletinRepository; + private final StateManagerProvider stateManagerProvider; + private final StringEncryptor encryptor; + private final FlowRegistryClient flowRegistryClient; + private final VariableRegistry rootVariableRegistry; + private final ProcessScheduler processScheduler; + private final KerberosConfig kerberosConfig; + private final FlowFileEventRepository flowFileEventRepository; + private final ProvenanceRepository provenanceRepository; + private final ExtensionRepository extensionRepository; + + // Member Variables created/managed internally + private final ReloadComponent reloadComponent; + private final ValidationTrigger validationTrigger; + + // Member Variables injected via initialization. Effectively final. 
+ private FlowManager flowManager; + private ControllerServiceProvider controllerServiceProvider; + private ProcessContextFactory processContextFactory; + private RepositoryContextFactory repositoryContextFactory; + private boolean initialized = false; + + + private StandardStatelessEngine(final Builder builder) { + this.extensionManager = requireNonNull(builder.extensionManager, "Extension Manager must be provided"); + this.bulletinRepository = requireNonNull(builder.bulletinRepository, "Bulletin Repository must be provided"); + this.stateManagerProvider = requireNonNull(builder.stateManagerProvider, "State Manager Provider must be provided"); + this.encryptor = requireNonNull(builder.encryptor, "Encryptor must be provided"); + this.flowRegistryClient = requireNonNull(builder.flowRegistryClient, "Flow Registry Client must be provided"); + this.rootVariableRegistry = requireNonNull(builder.variableRegistry, "Variable Registry must be provided"); + this.processScheduler = requireNonNull(builder.processScheduler, "Process Scheduler must be provided"); + this.kerberosConfig = requireNonNull(builder.kerberosConfig, "Kerberos Configuration must be provided"); + this.flowFileEventRepository = requireNonNull(builder.flowFileEventRepository, "FlowFile Event Repository must be provided"); + this.provenanceRepository = requireNonNull(builder.provenanceRepository, "Provenance Repository must be provided"); + this.extensionRepository = requireNonNull(builder.extensionRepository, "Extension Repository must be provided"); + + this.reloadComponent = new StatelessReloadComponent(); + this.validationTrigger = new StandardValidationTrigger(new FlowEngine(1, "Component Validation", true), () -> true); + } + + @Override + public void initialize(final StatelessEngineInitializationContext initContext) { + this.flowManager = initContext.getFlowManager(); + this.controllerServiceProvider = initContext.getControllerServiceProvider(); + this.processContextFactory = 
initContext.getProcessContextFactory(); + this.repositoryContextFactory = initContext.getRepositoryContextFactory(); + this.initialized = true; + } + + @Override + public StatelessDataflow createFlow(final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition, final List<ParameterOverride> parameterOverrides) { + if (!this.initialized) { + throw new IllegalStateException("Cannot create Flow without first initializing Stateless Engine"); + } + + final VersionedFlow versionedFlow = dataflowDefinition.getFlowSnapshot().getFlow(); + logger.info("Building Dataflow {}", versionedFlow == null ? "" : versionedFlow.getName()); + + loadNecessaryExtensions(dataflowDefinition); + + extensionManager.logClassLoaderDetails(); + + // Create a child group and add it to the root group. We do this, rather than interacting with the root group directly + // because the flow may well have Local Input/Output ports, and those are not allowed on the Root Group. + final ProcessGroup rootGroup = flowManager.getRootGroup(); + final ProcessGroup childGroup = flowManager.createProcessGroup("stateless-flow"); + childGroup.setName("Stateless Flow"); + rootGroup.addProcessGroup(childGroup); + + childGroup.updateFlow(dataflowDefinition.getFlowSnapshot(), "stateless-component-id-seed", false, true, true); + + // Map existing parameter contexts by name + final Set<ParameterContext> parameterContexts = flowManager.getParameterContextManager().getParameterContexts(); + final Map<String, ParameterContext> parameterContextMap = parameterContexts.stream() + .collect(Collectors.toMap(ParameterContext::getName, context -> context)); + + // Update Parameters to match those that are provided in the flow configuration, plus those overrides provided + final List<ParameterContextDefinition> parameterContextDefinitions = dataflowDefinition.getParameterContexts(); + if (parameterContextDefinitions != null) { + parameterContextDefinitions.forEach(contextDefinition -> 
registerParameterContext(contextDefinition, parameterContextMap)); + } + + overrideParameters(parameterContextMap, parameterOverrides); + + final List<ReportingTaskNode> reportingTaskNodes = createReportingTasks(dataflowDefinition); + final StandardStatelessFlow dataflow = new StandardStatelessFlow(childGroup, reportingTaskNodes, controllerServiceProvider, processContextFactory, + repositoryContextFactory, dataflowDefinition); + dataflow.initialize(processScheduler); + return dataflow; + } + + private void loadNecessaryExtensions(final DataflowDefinition<VersionedFlowSnapshot> dataflowDefinition) { + final VersionedProcessGroup group = dataflowDefinition.getFlowSnapshot().getFlowContents(); + final Set<BundleCoordinate> requiredBundles = gatherRequiredBundles(group); + + for (final ReportingTaskDefinition reportingTaskDefinition : dataflowDefinition.getReportingTaskDefinitions()) { + final BundleCoordinate coordinate = parseBundleCoordinate(reportingTaskDefinition); + if (coordinate == null) { + continue; + } + + requiredBundles.add(coordinate); + } + + final int concurrentDownloads = 4; Review comment: Fair enough. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
