exceptionfactory commented on code in PR #70: URL: https://github.com/apache/nifi-api/pull/70#discussion_r2867814589
########## src/main/java/org/apache/nifi/components/connector/components/ProcessGroupLifecycle.java: ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector.components; + +import java.util.Collection; +import java.util.concurrent.CompletableFuture; + +public interface ProcessGroupLifecycle { + + CompletableFuture<Void> enableControllerServices(ControllerServiceReferenceScope scope, ControllerServiceReferenceHierarchy hierarchy); + + CompletableFuture<Void> enableControllerServices(Collection<String> serviceIdentifiers); + + CompletableFuture<Void> disableControllerServices(ControllerServiceReferenceHierarchy hierarchy); + + CompletableFuture<Void> disableControllerServices(Collection<String> serviceIdentifiers); + + CompletableFuture<Void> startProcessors(boolean recursive); + + CompletableFuture<Void> start(ControllerServiceReferenceScope serviceReferenceScope); + + CompletableFuture<Void> stop(); + + CompletableFuture<Void> stopProcessors(boolean recursive); + + CompletableFuture<Void> startPorts(boolean recursive); + + CompletableFuture<Void> stopPorts(boolean recursive); + + CompletableFuture<Void> startRemoteProcessGroups(boolean recursive); + + CompletableFuture<Void> stopRemoteProcessGroups(boolean recursive); + + CompletableFuture<Void> startStatelessGroups(boolean recursive); + + CompletableFuture<Void> stopStatelessGroups(boolean recursive); Review Comment: Is it worth considering an `enum` instead of a `boolean` for these `recursive` values? ########## src/main/java/org/apache/nifi/components/connector/AbstractConnector.java: ########## @@ -0,0 +1,744 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.DescribedValue; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.connector.components.ConnectionFacade; +import org.apache.nifi.components.connector.components.ControllerServiceFacade; +import org.apache.nifi.components.connector.components.ControllerServiceReferenceHierarchy; +import org.apache.nifi.components.connector.components.ControllerServiceReferenceScope; +import org.apache.nifi.components.connector.components.FlowContext; +import org.apache.nifi.components.connector.components.ProcessGroupFacade; +import org.apache.nifi.components.connector.components.ProcessGroupLifecycle; +import org.apache.nifi.components.connector.components.ProcessorFacade; +import org.apache.nifi.components.connector.components.ProcessorState; +import org.apache.nifi.controller.queue.QueueSize; +import org.apache.nifi.flow.VersionedConnection; +import org.apache.nifi.logging.ComponentLog; + +import java.text.NumberFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +public abstract class AbstractConnector implements Connector { + private volatile ConnectorInitializationContext initializationContext; + private volatile ComponentLog logger; + private volatile CompletableFuture<Void> prepareUpdateFuture; + private String description; // effectively final + + protected abstract void onStepConfigured(final String stepName, final FlowContext workingContext) throws FlowUpdateException; + + + @Override + public final void initialize(final ConnectorInitializationContext context) { + this.initializationContext = context; + this.logger = context.getLogger(); + this.description = getClass().getSimpleName() + "[id=" + context.getIdentifier() + "]"; + + init(); + } + + /** + * No-op method for subclasses to override to perform any initialization logic + */ + protected void init() { + } + + protected final ComponentLog getLogger() { + return logger; + } + + protected final ConnectorInitializationContext getInitializationContext() { + if (initializationContext == null) { + throw new IllegalStateException("Connector has not been initialized"); + } + + return initializationContext; + } + + @Override + public void start(final FlowContext context) throws FlowUpdateException { + final ProcessGroupLifecycle lifecycle = context.getRootGroup().getLifecycle(); + final CompletableFuture<Void> enableServicesFuture = lifecycle.enableControllerServices( + ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY, + ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS); + + try { + enableServicesFuture.get(); + } catch (final Exception e) { + lifecycle.disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS); + throw new FlowUpdateException("Failed to enable Controller Services while starting Connector", e); + } + + try { + lifecycle.startProcessors(true).get(); + lifecycle.startPorts(true).get(); + lifecycle.startStatelessGroups(true).get(); + lifecycle.startRemoteProcessGroups(true).get(); + } catch (final Exception e) { + logger.error("Failed to start components for {}", this, e); + try { + stop(context); + } catch (final Exception stopException) { + e.addSuppressed(new FlowUpdateException("Failed to stop Connector cleanly", stopException)); + } + } + } + + @Override + public void stop(final FlowContext context) throws FlowUpdateException { + try { + stopAsync(context).get(); + } catch (final Exception e) { + throw new FlowUpdateException("Failed to stop Connector", e); + } + } + + private CompletableFuture<Void> stopAsync(final FlowContext context) { + final CompletableFuture<Void> result = new CompletableFuture<>(); + + Thread.startVirtualThread(() -> { + try { + final ProcessGroupFacade rootGroup = context.getRootGroup(); + final ProcessGroupLifecycle lifecycle = rootGroup.getLifecycle(); + + try { + lifecycle.stopProcessors(true).get(1, TimeUnit.MINUTES); + } catch (final TimeoutException e) { + final List<ProcessorFacade> running = findProcessors(rootGroup, processor -> + processor.getLifecycle().getState() != ProcessorState.STOPPED && processor.getLifecycle().getState() != ProcessorState.DISABLED); + + if (!running.isEmpty()) { + getLogger().warn("After waiting 60 seconds for all Processors to stop, {} are still running. Terminating now.", running.size()); + running.forEach(processor -> processor.getLifecycle().terminate()); + } + } catch (final ExecutionException e) { + throw new RuntimeException("Failed to stop all Processors", e.getCause()); + } + + lifecycle.stopPorts(true).get(1, TimeUnit.MINUTES); + lifecycle.stopRemoteProcessGroups(true).get(1, TimeUnit.MINUTES); + lifecycle.stopStatelessGroups(true).get(2, TimeUnit.MINUTES); + lifecycle.disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get(2, TimeUnit.MINUTES); + + result.complete(null); + } catch (final Exception e) { + result.completeExceptionally(e); + } + }); + + return result; + } + + @Override + public void prepareForUpdate(final FlowContext workingContext, final FlowContext activeContext) throws FlowUpdateException { + final CompletableFuture<Void> future = stopAsync(activeContext); + prepareUpdateFuture = future; + + try { + future.get(); + } catch (final Exception e) { + throw new FlowUpdateException("Failed to prepare Connector for update", e); + } + } + + /** + * Drains all FlowFiles from the Connector instance. + * + * @param flowContext the FlowContext to use for drainage + * @return a CompletableFuture that will be completed when drainage is complete + */ + @Override + public CompletableFuture<Void> drainFlowFiles(final FlowContext flowContext) { + final CompletableFuture<Void> result = new CompletableFuture<>(); + final QueueSize initialQueueSize = flowContext.getRootGroup().getQueueSize(); + if (initialQueueSize.getObjectCount() == 0) { + getLogger().debug("No FlowFiles to drain from Connector"); + result.complete(null); + return result; + } + + getLogger().info("Draining {} FlowFiles ({} bytes) from Connector", + initialQueueSize.getObjectCount(), NumberFormat.getNumberInstance().format(initialQueueSize.getByteCount())); + + Thread.startVirtualThread(() -> { + try { + stopSourceComponents(flowContext).get(); + startNonSourceComponents(result, flowContext); + + if (!result.isDone()) { + completeDrain(result, flowContext, initialQueueSize); + } + } catch (final Exception e) { + if (!result.isDone()) { + result.completeExceptionally(new RuntimeException("Failed while draining FlowFiles", e)); + } + } + }); + + return result; + } + + private void completeDrain(final CompletableFuture<Void> result, final FlowContext flowContext, final QueueSize initialQueueSize) { + try { + ensureDrainageUnblocked(); + } catch (final Exception e) { + getLogger().warn("Failed to ensure drainage is unblocked when draining FlowFiles", e); + } + + Exception failureReason = null; + int iterations = 0; + while (!isGroupDrained(flowContext.getRootGroup())) { + if (result.isDone()) { + getLogger().info("Drainage has been cancelled; will no longer wait for FlowFiles to drain"); Review Comment: Recommend shortening the log message to make it more concise. ```suggestion getLogger().info("Drain cancelled: no longer waiting for FlowFiles to drain"); ``` ########## src/main/java/org/apache/nifi/components/connector/components/FlowContextType.java: ########## @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector.components; + +public enum FlowContextType { + + ACTIVE, + + WORKING; Review Comment: It might be helpful to add a comment on these elements, since `ACTIVE` and `WORKING` sound somewhat similar. Would `RUNNING` or `PROCESSING` be better for `WORKING`? ########## src/main/java/org/apache/nifi/components/connector/ConnectorPropertyDescriptor.java: ########## @@ -0,0 +1,492 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.DescribedValue; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +public final class ConnectorPropertyDescriptor { + private static final Pattern INTEGER_PATTERN = Pattern.compile("^-?\\d+$"); + private static final Pattern DOUBLE_PATTERN = Pattern.compile("^-?\\d+(\\.\\d+)?$"); + private static final Pattern BOOLEAN_PATTERN = Pattern.compile("^(?i)(true|false)$"); + + private final String name; + private final String description; + private final String defaultValue; + private final boolean required; + private final PropertyType type; + private final List<DescribedValue> allowableValues; + private final boolean allowableValuesFetchable; + private final List<Validator> validators; + private final Set<ConnectorPropertyDependency> dependencies; + + private ConnectorPropertyDescriptor(final Builder builder) { + this.name = builder.name; + this.description = builder.description; + this.defaultValue = builder.defaultValue; + this.required = builder.required; + this.type = builder.type; + this.allowableValues = builder.allowableValues == null ? null : Collections.unmodifiableList(builder.allowableValues); + this.allowableValuesFetchable = builder.allowableValuesFetchable; + this.validators = List.copyOf(builder.validators); + this.dependencies = Collections.unmodifiableSet(builder.dependencies); + } + + public String getName() { + return name; + } + + public String getDescription() { + return description; + } + + public String getDefaultValue() { + return defaultValue; + } + + public boolean isRequired() { + return required; + } + + public PropertyType getType() { + return type; + } + + public List<DescribedValue> getAllowableValues() { + return allowableValues; + } + + public boolean isAllowableValuesFetchable() { + return allowableValuesFetchable; + } + + public Set<ConnectorPropertyDependency> getDependencies() { + return dependencies; + } + + public List<Validator> getValidators() { + return validators; + } + + public ValidationResult validate(final String stepName, final String groupName, final String value, final ConnectorValidationContext validationContext) { + final List<DescribedValue> fetchedAllowableValues; + if (isAllowableValuesFetchable()) { + try { + fetchedAllowableValues = validationContext.fetchAllowableValues(stepName, getName()); + } catch (final Exception e) { + return new ValidationResult.Builder() + .subject(name) + .input(value) + .valid(false) + .explanation("Failed to fetch allowable values: " + e.getMessage()) Review Comment: Is it worth logging the full exception stack trace, or at least including the exception class in the explanation? ########## src/main/java/org/apache/nifi/components/connector/AbstractConnector.java: ########## @@ -0,0 +1,744 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.DescribedValue; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.connector.components.ConnectionFacade; +import org.apache.nifi.components.connector.components.ControllerServiceFacade; +import org.apache.nifi.components.connector.components.ControllerServiceReferenceHierarchy; +import org.apache.nifi.components.connector.components.ControllerServiceReferenceScope; +import org.apache.nifi.components.connector.components.FlowContext; +import org.apache.nifi.components.connector.components.ProcessGroupFacade; +import org.apache.nifi.components.connector.components.ProcessGroupLifecycle; +import org.apache.nifi.components.connector.components.ProcessorFacade; +import org.apache.nifi.components.connector.components.ProcessorState; +import org.apache.nifi.controller.queue.QueueSize; +import org.apache.nifi.flow.VersionedConnection; +import org.apache.nifi.logging.ComponentLog; + +import java.text.NumberFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +public abstract class AbstractConnector implements Connector { + private volatile ConnectorInitializationContext initializationContext; + private volatile ComponentLog logger; + private volatile CompletableFuture<Void> prepareUpdateFuture; + private String description; // effectively final + + protected abstract void onStepConfigured(final String stepName, final FlowContext workingContext) throws FlowUpdateException; + + + @Override + public final void initialize(final ConnectorInitializationContext context) { + this.initializationContext = context; + this.logger = context.getLogger(); + this.description = getClass().getSimpleName() + "[id=" + context.getIdentifier() + "]"; + + init(); + } + + /** + * No-op method for subclasses to override to perform any initialization logic + */ + protected void init() { + } + + protected final ComponentLog getLogger() { + return logger; + } + + protected final ConnectorInitializationContext getInitializationContext() { + if (initializationContext == null) { + throw new IllegalStateException("Connector has not been initialized"); + } + + return initializationContext; + } + + @Override + public void start(final FlowContext context) throws FlowUpdateException { + final ProcessGroupLifecycle lifecycle = context.getRootGroup().getLifecycle(); + final CompletableFuture<Void> enableServicesFuture = lifecycle.enableControllerServices( + ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY, + ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS); + + try { + enableServicesFuture.get(); + } catch (final Exception e) { + lifecycle.disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS); + throw new FlowUpdateException("Failed to enable Controller Services while starting Connector", e); + } + + try { + lifecycle.startProcessors(true).get(); + lifecycle.startPorts(true).get(); + lifecycle.startStatelessGroups(true).get(); + lifecycle.startRemoteProcessGroups(true).get(); + } catch (final Exception e) { + logger.error("Failed to start components for {}", this, e); + try { + stop(context); + } catch (final Exception stopException) { + e.addSuppressed(new FlowUpdateException("Failed to stop Connector cleanly", stopException)); + } + } + } + + @Override + public void stop(final FlowContext context) throws FlowUpdateException { + try { + stopAsync(context).get(); + } catch (final Exception e) { + throw new FlowUpdateException("Failed to stop Connector", e); + } + } + + private CompletableFuture<Void> stopAsync(final FlowContext context) { + final CompletableFuture<Void> result = new CompletableFuture<>(); + + Thread.startVirtualThread(() -> { + try { + final ProcessGroupFacade rootGroup = context.getRootGroup(); + final ProcessGroupLifecycle lifecycle = rootGroup.getLifecycle(); + + try { + lifecycle.stopProcessors(true).get(1, TimeUnit.MINUTES); + } catch (final TimeoutException e) { + final List<ProcessorFacade> running = findProcessors(rootGroup, processor -> + processor.getLifecycle().getState() != ProcessorState.STOPPED && processor.getLifecycle().getState() != ProcessorState.DISABLED); + + if (!running.isEmpty()) { + getLogger().warn("After waiting 60 seconds for all Processors to stop, {} are still running. Terminating now.", running.size()); + running.forEach(processor -> processor.getLifecycle().terminate()); + } + } catch (final ExecutionException e) { + throw new RuntimeException("Failed to stop all Processors", e.getCause()); + } + + lifecycle.stopPorts(true).get(1, TimeUnit.MINUTES); + lifecycle.stopRemoteProcessGroups(true).get(1, TimeUnit.MINUTES); + lifecycle.stopStatelessGroups(true).get(2, TimeUnit.MINUTES); + lifecycle.disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get(2, TimeUnit.MINUTES); + + result.complete(null); + } catch (final Exception e) { + result.completeExceptionally(e); + } + }); + + return result; + } + + @Override + public void prepareForUpdate(final FlowContext workingContext, final FlowContext activeContext) throws FlowUpdateException { + final CompletableFuture<Void> future = stopAsync(activeContext); + prepareUpdateFuture = future; + + try { + future.get(); + } catch (final Exception e) { + throw new FlowUpdateException("Failed to prepare Connector for update", e); + } + } + + /** + * Drains all FlowFiles from the Connector instance. + * + * @param flowContext the FlowContext to use for drainage + * @return a CompletableFuture that will be completed when drainage is complete + */ + @Override + public CompletableFuture<Void> drainFlowFiles(final FlowContext flowContext) { + final CompletableFuture<Void> result = new CompletableFuture<>(); + final QueueSize initialQueueSize = flowContext.getRootGroup().getQueueSize(); + if (initialQueueSize.getObjectCount() == 0) { + getLogger().debug("No FlowFiles to drain from Connector"); + result.complete(null); + return result; + } + + getLogger().info("Draining {} FlowFiles ({} bytes) from Connector", + initialQueueSize.getObjectCount(), NumberFormat.getNumberInstance().format(initialQueueSize.getByteCount())); + + Thread.startVirtualThread(() -> { + try { + stopSourceComponents(flowContext).get(); + startNonSourceComponents(result, flowContext); + + if (!result.isDone()) { + completeDrain(result, flowContext, initialQueueSize); + } + } catch (final Exception e) { + if (!result.isDone()) { + result.completeExceptionally(new RuntimeException("Failed while draining FlowFiles", e)); + } + } + }); + + return result; + } + + private void completeDrain(final CompletableFuture<Void> result, final FlowContext flowContext, final QueueSize initialQueueSize) { + try { + ensureDrainageUnblocked(); + } catch (final Exception e) { + getLogger().warn("Failed to ensure drainage is unblocked when draining FlowFiles", e); + } + + Exception failureReason = null; + int iterations = 0; + while (!isGroupDrained(flowContext.getRootGroup())) { + if (result.isDone()) { + getLogger().info("Drainage has been cancelled; will no longer wait for FlowFiles to drain"); + break; + } + + // Log the current queue size every 10 seconds (20 iterations of 500ms) so that it's clear + // whether or not progress is being made. + if (iterations++ % 20 == 0) { + final QueueSize queueSize = flowContext.getRootGroup().getQueueSize(); + getLogger().info("Waiting for {} FlowFiles ({} bytes) to drain", + queueSize.getObjectCount(), NumberFormat.getNumberInstance().format(queueSize.getByteCount())); + } + + try { + Thread.sleep(500); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + failureReason = e; + break; + } + } + + // Log completion unless the result was completed exceptionally or cancelled. + if (!result.isDone()) { + getLogger().info("All {} FlowFiles have drained from Connector", initialQueueSize.getObjectCount()); + } + + try { + stop(flowContext); + } catch (final Exception e) { + getLogger().warn("Failed to stop source Processors after draining FlowFiles", e); + if (failureReason == null) { + failureReason = e; + } else { + failureReason.addSuppressed(e); + } + } + + if (failureReason != null && !result.isDone()) { + result.completeExceptionally(new RuntimeException("Interrupted while waiting for " + AbstractConnector.this + " to drain", failureReason)); + } + + if (!result.isDone()) { + result.complete(null); + } + } + + private void startNonSourceComponents(final CompletableFuture<Void> result, final FlowContext flowContext) { + if (result.isDone()) { + return; + } + + final CompletableFuture<Void> enableServices = flowContext.getRootGroup().getLifecycle().enableControllerServices( + ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY, + ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS); + + try { + // Wait for all referenced services to be enabled. + enableServices.get(); + + if (!result.isDone()) { + getLogger().info("Starting all non-source components to facilitate drainage of FlowFiles"); + startNonSourceComponents(flowContext).get(); + } + } catch (final Exception e) { + try { + flowContext.getRootGroup().getLifecycle().disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get(); + } catch (final Exception e1) { + e.addSuppressed(e1); + } + + result.completeExceptionally(new RuntimeException("Failed to start non-source components while draining FlowFiles", e.getCause())); + } + } + + /** + * <p> + * A method designed to be overridden by subclasses that need to ensure that any + * blockages to FlowFile drainage are removed. The default implementation is a no-op. + * Typical use cases include notifying Processors that block until a certain amount of data is queued up, + * or until certain conditions are met, that they should immediately allow data to flow through. + * </p> + */ + protected void ensureDrainageUnblocked() throws InvocationFailedException { + } + + @Override + public List<ConfigVerificationResult> verify(final FlowContext flowContext) { + final List<ConfigVerificationResult> results = new ArrayList<>(); + + final List<ConfigurationStep> configSteps = getConfigurationSteps(); + for (final ConfigurationStep configStep : configSteps) { + final List<ConfigVerificationResult> stepResults = verifyConfigurationStep(configStep.getName(), Map.of(), flowContext); + results.addAll(stepResults); + } + + return results; + } + + @Override + public List<ValidationResult> validate(final FlowContext flowContext, final ConnectorValidationContext validationContext) { + final ConnectorConfigurationContext configContext = flowContext.getConfigurationContext(); + final List<ValidationResult> results = new ArrayList<>(); + final List<ConfigurationStep> configurationSteps = getConfigurationSteps(); + + for (final ConfigurationStep configurationStep : configurationSteps) { + if (!isStepDependencySatisfied(configurationStep, configurationSteps, configContext)) { + getLogger().debug("Skipping validation for Configuration Step [{}] because its dependencies are not satisfied", configurationStep.getName()); + continue; + } + + results.addAll(validateConfigurationStep(configurationStep, configContext, validationContext)); + } + + // only run customValidate if regular validation is successful. This allows Processor developers to not have to check + // if values are null or invalid so that they can focus only on the interaction between the properties, etc. + if (results.isEmpty()) { + final Collection<ValidationResult> customResults = customValidate(configContext); + if (customResults != null) { + for (final ValidationResult result : customResults) { + if (!result.isValid()) { + results.add(result); + } + } + } + } + + return results; + } + + Review Comment: Extra newline can be removed. ########## src/main/java/org/apache/nifi/components/connector/AbstractConnector.java: ########## @@ -0,0 +1,744 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.components.connector; + +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.DescribedValue; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.connector.components.ConnectionFacade; +import org.apache.nifi.components.connector.components.ControllerServiceFacade; +import org.apache.nifi.components.connector.components.ControllerServiceReferenceHierarchy; +import org.apache.nifi.components.connector.components.ControllerServiceReferenceScope; +import org.apache.nifi.components.connector.components.FlowContext; +import org.apache.nifi.components.connector.components.ProcessGroupFacade; +import org.apache.nifi.components.connector.components.ProcessGroupLifecycle; +import org.apache.nifi.components.connector.components.ProcessorFacade; +import org.apache.nifi.components.connector.components.ProcessorState; +import org.apache.nifi.controller.queue.QueueSize; +import org.apache.nifi.flow.VersionedConnection; +import org.apache.nifi.logging.ComponentLog; + +import java.text.NumberFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +public abstract class AbstractConnector implements Connector { + private volatile ConnectorInitializationContext initializationContext; + private volatile ComponentLog logger; + private volatile CompletableFuture<Void> prepareUpdateFuture; + private String description; // effectively final + + protected abstract void onStepConfigured(final String stepName, final FlowContext workingContext) throws FlowUpdateException; Review Comment: As an `abstract` method to be implemented, it would be helpful to include a method comment with some basic description -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
