This is an automated email from the ASF dual-hosted git repository. markap14 pushed a commit to branch support/nifi-1.x in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 7f1dd9c9600f41f94b0aabf9f789bd610d93f553 Author: David Young <[email protected]> AuthorDate: Fri May 12 14:38:35 2023 +0000 NIFI-11471: Define new stateless configuration points Add two new properties: nifi.stateless.component.enableTimeout nifi.stateless.processor.startTimeout to allow configuring the StatelessEngine and ProcessScheduler. This allows an operator to configure what kind of startup time the flow can tolerate. Previously these values were hard coded. --- .../config/PropertiesFileEngineConfigurationParser.java | 16 ++++++++++++++++ .../stateless/engine/StatelessEngineConfiguration.java | 16 ++++++++++++++++ .../controller/scheduling/StatelessProcessScheduler.java | 9 ++++++--- .../nifi/stateless/engine/StandardStatelessEngine.java | 10 +++++++++- .../stateless/flow/StandardStatelessDataflowFactory.java | 7 ++++++- .../nifi/stateless/flow/StandardStatelessFlow.java | 13 ++++++++----- 6 files changed, 61 insertions(+), 10 deletions(-) diff --git a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java index 499e6d8a53..65ccd00384 100644 --- a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java +++ b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/config/PropertiesFileEngineConfigurationParser.java @@ -53,6 +53,9 @@ public class PropertiesFileEngineConfigurationParser { private static final String CONTENT_REPO_DIRECTORY = PREFIX + "content.repository.directory"; private static final String STATUS_TASK_INTERVAL = PREFIX + "status.task.interval"; + private static final String COMPONENT_ENABLE_TIMEOUT = PREFIX + "component.enableTimeout"; + private static final String PROCESSOR_START_TIMEOUT = PREFIX + "processor.startTimeout"; + private static final String TRUSTSTORE_FILE = PREFIX + "security.truststore"; private static final String TRUSTSTORE_TYPE = PREFIX + "security.truststoreType"; private static final String TRUSTSTORE_PASSWORD = PREFIX + "security.truststorePasswd"; @@ -111,6 +114,9 @@ public class PropertiesFileEngineConfigurationParser { final String statusTaskInterval = properties.getProperty(STATUS_TASK_INTERVAL, "1 min"); + final String processorStartTimeout = properties.getProperty(PROCESSOR_START_TIMEOUT, "10 secs"); + final String componentEnableTimeout = properties.getProperty(COMPONENT_ENABLE_TIMEOUT, "10 secs"); + return new StatelessEngineConfiguration() { @Override public File getWorkingDirectory() { @@ -161,6 +167,16 @@ public class PropertiesFileEngineConfigurationParser { public String getStatusTaskInterval() { return statusTaskInterval; } + + @Override + public String getProcessorStartTimeout() { + return processorStartTimeout; + } + + @Override + public String getComponentEnableTimeout() { + return componentEnableTimeout; + } }; } diff --git a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java index 1a0bd4dd71..e15cfc18ef 100644 --- a/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java +++ b/nifi-stateless/nifi-stateless-api/src/main/java/org/apache/nifi/stateless/engine/StatelessEngineConfiguration.java @@ -89,4 +89,20 @@ public interface StatelessEngineConfiguration { * A <code>null</code> value indicates that no status tasks are scheduled. */ String getStatusTaskInterval(); + + /** + * @return a String representing the length of time that the process scheduler should wait for a process to start + * Defaults to "10 secs" + */ + default String getProcessorStartTimeout() { + return "10 secs"; + } + + /** + * @return a String representing the length of time that the StatelessEngine should wait for a component to enable + * Defaults to "10 secs" + */ + default String getComponentEnableTimeout() { + return "10 sec"; + } } diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java index faf6e4ee9a..813afff3df 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java @@ -48,6 +48,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.reflect.InvocationTargetException; +import java.time.Duration; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; @@ -63,7 +64,6 @@ import java.util.function.Supplier; public class StatelessProcessScheduler implements ProcessScheduler { private static final Logger logger = LoggerFactory.getLogger(StatelessProcessScheduler.class); private static final int ADMINISTRATIVE_YIELD_MILLIS = 1000; - private static final int PROCESSOR_START_TIMEOUT_MILLIS = 10_000; private final SchedulingAgent schedulingAgent; private final ExtensionManager extensionManager; @@ -72,8 +72,11 @@ public class StatelessProcessScheduler implements ProcessScheduler { private ScheduledExecutorService componentMonitoringThreadPool; private ProcessContextFactory processContextFactory; - public StatelessProcessScheduler(final ExtensionManager extensionManager) { + private final long processorStartTimeoutMillis; + + public StatelessProcessScheduler(final ExtensionManager extensionManager, final Duration processorStartTimeout) { this.extensionManager = extensionManager; + this.processorStartTimeoutMillis = processorStartTimeout.toMillis(); schedulingAgent = new StatelessSchedulingAgent(extensionManager); } @@ -136,7 +139,7 @@ public class StatelessProcessScheduler implements ProcessScheduler { logger.info("Starting {}", procNode); final Supplier<ProcessContext> processContextSupplier = () -> processContextFactory.createProcessContext(procNode); - procNode.start(componentMonitoringThreadPool, ADMINISTRATIVE_YIELD_MILLIS, PROCESSOR_START_TIMEOUT_MILLIS, processContextSupplier, callback, failIfStopping); + procNode.start(componentMonitoringThreadPool, ADMINISTRATIVE_YIELD_MILLIS, this.processorStartTimeoutMillis, processContextSupplier, callback, failIfStopping); return future; } diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java index 02ce452dd6..f2bfe631ab 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardStatelessEngine.java @@ -114,6 +114,7 @@ public class StandardStatelessEngine implements StatelessEngine { private final ExtensionRepository extensionRepository; private final CounterRepository counterRepository; private final Duration statusTaskInterval; + private final Duration componentEnableTimeout; // Member Variables created/managed internally private final ReloadComponent reloadComponent; @@ -139,6 +140,7 @@ public class StandardStatelessEngine implements StatelessEngine { this.extensionRepository = requireNonNull(builder.extensionRepository, "Extension Repository must be provided"); this.counterRepository = requireNonNull(builder.counterRepository, "Counter Repository must be provided"); this.statusTaskInterval = parseDuration(builder.statusTaskInterval); + this.componentEnableTimeout = parseDuration(builder.componentEnableTimeout); this.reloadComponent = new StatelessReloadComponent(this); this.validationTrigger = new StandardValidationTrigger(new FlowEngine(1, "Component Validation", true), () -> true); @@ -192,7 +194,7 @@ public class StandardStatelessEngine implements StatelessEngine { final List<ReportingTaskNode> reportingTaskNodes = createReportingTasks(dataflowDefinition); final StandardStatelessFlow dataflow = new StandardStatelessFlow(childGroup, reportingTaskNodes, controllerServiceProvider, processContextFactory, - repositoryContextFactory, dataflowDefinition, stateManagerProvider, processScheduler, bulletinRepository); + repositoryContextFactory, dataflowDefinition, stateManagerProvider, processScheduler, bulletinRepository, componentEnableTimeout); if (statusTaskInterval != null) { final LogComponentStatuses logComponentStatuses = new LogComponentStatuses(flowFileEventRepository, counterRepository, flowManager); @@ -672,6 +674,7 @@ public class StandardStatelessEngine implements StatelessEngine { private ExtensionRepository extensionRepository = null; private CounterRepository counterRepository = null; private String statusTaskInterval = null; + private String componentEnableTimeout = null; public Builder extensionManager(final ExtensionManager extensionManager) { this.extensionManager = extensionManager; @@ -733,6 +736,11 @@ public class StandardStatelessEngine implements StatelessEngine { return this; } + public Builder componentEnableTimeout(final String componentEnableTimeout) { + this.componentEnableTimeout = componentEnableTimeout; + return this; + } + public StandardStatelessEngine build() { return new StandardStatelessEngine(this); } diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java index 697c033168..ecf4c7a857 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessDataflowFactory.java @@ -73,6 +73,7 @@ import org.apache.nifi.stateless.repository.StatelessFileSystemContentRepository import org.apache.nifi.stateless.repository.StatelessFlowFileRepository; import org.apache.nifi.stateless.repository.StatelessProvenanceRepository; import org.apache.nifi.stateless.repository.StatelessRepositoryContextFactory; +import org.apache.nifi.util.FormatUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,10 +82,12 @@ import java.io.File; import java.io.IOException; import java.net.URL; import java.net.URLClassLoader; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.concurrent.TimeUnit; public class StandardStatelessDataflowFactory implements StatelessDataflowFactory { private static final Logger logger = LoggerFactory.getLogger(StandardStatelessDataflowFactory.class); @@ -119,7 +122,8 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor final StatelessStateManagerProvider stateManagerProvider = new StatelessStateManagerProvider(); final ParameterContextManager parameterContextManager = new StandardParameterContextManager(); - processScheduler = new StatelessProcessScheduler(extensionManager); + final Duration processorStartTimeoutDuration = Duration.ofSeconds((long) FormatUtils.getPreciseTimeDuration(engineConfiguration.getProcessorStartTimeout(), TimeUnit.SECONDS)); + processScheduler = new StatelessProcessScheduler(extensionManager, processorStartTimeoutDuration); provenanceRepo = new StatelessProvenanceRepository(1_000); provenanceRepo.initialize(EventReporter.NO_OP, new StatelessAuthorizer(), new StatelessProvenanceAuthorizableFactory(), IdentifierLookup.EMPTY); @@ -188,6 +192,7 @@ public class StandardStatelessDataflowFactory implements StatelessDataflowFactor .extensionRepository(extensionRepository) .counterRepository(counterRepo) .statusTaskInterval(engineConfiguration.getStatusTaskInterval()) + .componentEnableTimeout(engineConfiguration.getComponentEnableTimeout()) .build(); final StatelessFlowManager flowManager = new StatelessFlowManager(flowFileEventRepo, parameterContextManager, statelessEngine, () -> true, sslContext, bulletinRepository); diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java index d18ef4407f..1cbff1f88a 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlow.java @@ -75,6 +75,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.text.NumberFormat; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -97,7 +98,6 @@ import java.util.stream.Collectors; public class StandardStatelessFlow implements StatelessDataflow { private static final Logger logger = LoggerFactory.getLogger(StandardStatelessFlow.class); - private static final long COMPONENT_ENABLE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(10); private static final long TEN_MILLIS_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(10); private static final String PARENT_FLOW_GROUP_ID = "stateless-flow"; @@ -117,6 +117,7 @@ public class StandardStatelessFlow implements StatelessDataflow { private final TransactionThresholdMeter transactionThresholdMeter; private final List<BackgroundTask> backgroundTasks = new ArrayList<>(); private final BulletinRepository bulletinRepository; + private final long componentEnableTimeoutMillis; private volatile ExecutorService runDataflowExecutor; private volatile ScheduledExecutorService backgroundTaskExecutor; @@ -125,7 +126,8 @@ public class StandardStatelessFlow implements StatelessDataflow { public StandardStatelessFlow(final ProcessGroup rootGroup, final List<ReportingTaskNode> reportingTasks, final ControllerServiceProvider controllerServiceProvider, final ProcessContextFactory processContextFactory, final RepositoryContextFactory repositoryContextFactory, final DataflowDefinition dataflowDefinition, - final StatelessStateManagerProvider stateManagerProvider, final ProcessScheduler processScheduler, final BulletinRepository bulletinRepository) { + final StatelessStateManagerProvider stateManagerProvider, final ProcessScheduler processScheduler, final BulletinRepository bulletinRepository, + final Duration componentEnableTimeout) { this.rootGroup = rootGroup; this.allConnections = rootGroup.findAllConnections(); this.reportingTasks = reportingTasks; @@ -137,6 +139,7 @@ public class StandardStatelessFlow implements StatelessDataflow { this.processScheduler = processScheduler; this.transactionThresholdMeter = new TransactionThresholdMeter(dataflowDefinition.getTransactionThresholds()); this.bulletinRepository = bulletinRepository; + this.componentEnableTimeoutMillis = componentEnableTimeout.toMillis(); rootConnectables = new HashSet<>(); @@ -286,7 +289,7 @@ public class StandardStatelessFlow implements StatelessDataflow { private void waitForServicesEnabled(final ProcessGroup group) { final long startTime = System.currentTimeMillis(); - final long cutoff = startTime + COMPONENT_ENABLE_TIMEOUT_MILLIS; + final long cutoff = startTime + this.componentEnableTimeoutMillis; final Set<ControllerServiceNode> serviceNodes = group.findAllControllerServices(); for (final ControllerServiceNode serviceNode : serviceNodes) { @@ -386,7 +389,7 @@ public class StandardStatelessFlow implements StatelessDataflow { final Future<?> future = controllerServiceProvider.enableControllerServiceAndDependencies(serviceNode); try { - future.get(COMPONENT_ENABLE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + future.get(this.componentEnableTimeoutMillis, TimeUnit.MILLISECONDS); } catch (final Exception e) { throw new IllegalStateException("Controller Service " + serviceNode + " has not fully enabled. Current Validation Status is " + serviceNode.getValidationStatus() + " with validation Errors: " + serviceNode.getValidationErrors(), e); @@ -411,7 +414,7 @@ public class StandardStatelessFlow implements StatelessDataflow { final long start = System.currentTimeMillis(); try { - future.get(COMPONENT_ENABLE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + future.get(this.componentEnableTimeoutMillis, TimeUnit.MILLISECONDS); } catch (final Exception e) { final String validationErrors = performValidation().toString(); throw new IllegalStateException("Processor " + processor + " has not fully enabled. Current Validation Status is "
