exceptionfactory commented on code in PR #70:
URL: https://github.com/apache/nifi-api/pull/70#discussion_r2867814589


##########
src/main/java/org/apache/nifi/components/connector/components/ProcessGroupLifecycle.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.connector.components;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+public interface ProcessGroupLifecycle {
+
+    CompletableFuture<Void> 
enableControllerServices(ControllerServiceReferenceScope scope, 
ControllerServiceReferenceHierarchy hierarchy);
+
+    CompletableFuture<Void> enableControllerServices(Collection<String> 
serviceIdentifiers);
+
+    CompletableFuture<Void> 
disableControllerServices(ControllerServiceReferenceHierarchy hierarchy);
+
+    CompletableFuture<Void> disableControllerServices(Collection<String> 
serviceIdentifiers);
+
+    CompletableFuture<Void> startProcessors(boolean recursive);
+
+    CompletableFuture<Void> start(ControllerServiceReferenceScope 
serviceReferenceScope);
+
+    CompletableFuture<Void> stop();
+
+    CompletableFuture<Void> stopProcessors(boolean recursive);
+
+    CompletableFuture<Void> startPorts(boolean recursive);
+
+    CompletableFuture<Void> stopPorts(boolean recursive);
+
+    CompletableFuture<Void> startRemoteProcessGroups(boolean recursive);
+
+    CompletableFuture<Void> stopRemoteProcessGroups(boolean recursive);
+
+    CompletableFuture<Void> startStatelessGroups(boolean recursive);
+
+    CompletableFuture<Void> stopStatelessGroups(boolean recursive);

Review Comment:
   Is it worth considering an `enum` instead of a `boolean` for these 
`recursive` values?



##########
src/main/java/org/apache/nifi/components/connector/AbstractConnector.java:
##########
@@ -0,0 +1,744 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.connector;
+
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.DescribedValue;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.connector.components.ConnectionFacade;
+import org.apache.nifi.components.connector.components.ControllerServiceFacade;
+import 
org.apache.nifi.components.connector.components.ControllerServiceReferenceHierarchy;
+import 
org.apache.nifi.components.connector.components.ControllerServiceReferenceScope;
+import org.apache.nifi.components.connector.components.FlowContext;
+import org.apache.nifi.components.connector.components.ProcessGroupFacade;
+import org.apache.nifi.components.connector.components.ProcessGroupLifecycle;
+import org.apache.nifi.components.connector.components.ProcessorFacade;
+import org.apache.nifi.components.connector.components.ProcessorState;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+public abstract class AbstractConnector implements Connector {
+    private volatile ConnectorInitializationContext initializationContext;
+    private volatile ComponentLog logger;
+    private volatile CompletableFuture<Void> prepareUpdateFuture;
+    private String description; // effectively final
+
+    protected abstract void onStepConfigured(final String stepName, final 
FlowContext workingContext) throws FlowUpdateException;
+
+
+    @Override
+    public final void initialize(final ConnectorInitializationContext context) 
{
+        this.initializationContext = context;
+        this.logger = context.getLogger();
+        this.description = getClass().getSimpleName() + "[id=" + 
context.getIdentifier() + "]";
+
+        init();
+    }
+
+    /**
+     * No-op method for subclasses to override to perform any initialization 
logic
+     */
+    protected void init() {
+    }
+
+    protected final ComponentLog getLogger() {
+        return logger;
+    }
+
+    protected final ConnectorInitializationContext getInitializationContext() {
+        if (initializationContext == null) {
+            throw new IllegalStateException("Connector has not been 
initialized");
+        }
+
+        return initializationContext;
+    }
+
+    @Override
+    public void start(final FlowContext context) throws FlowUpdateException {
+        final ProcessGroupLifecycle lifecycle = 
context.getRootGroup().getLifecycle();
+        final CompletableFuture<Void> enableServicesFuture = 
lifecycle.enableControllerServices(
+            ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY,
+            ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS);
+
+        try {
+            enableServicesFuture.get();
+        } catch (final Exception e) {
+            
lifecycle.disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS);
+            throw new FlowUpdateException("Failed to enable Controller 
Services while starting Connector", e);
+        }
+
+        try {
+            lifecycle.startProcessors(true).get();
+            lifecycle.startPorts(true).get();
+            lifecycle.startStatelessGroups(true).get();
+            lifecycle.startRemoteProcessGroups(true).get();
+        } catch (final Exception e) {
+            logger.error("Failed to start components for {}", this, e);
+            try {
+                stop(context);
+            } catch (final Exception stopException) {
+                e.addSuppressed(new FlowUpdateException("Failed to stop 
Connector cleanly", stopException));
+            }
+        }
+    }
+
+    @Override
+    public void stop(final FlowContext context) throws FlowUpdateException {
+        try {
+            stopAsync(context).get();
+        } catch (final Exception e) {
+            throw new FlowUpdateException("Failed to stop Connector", e);
+        }
+    }
+
+    private CompletableFuture<Void> stopAsync(final FlowContext context) {
+        final CompletableFuture<Void> result = new CompletableFuture<>();
+
+        Thread.startVirtualThread(() -> {
+            try {
+                final ProcessGroupFacade rootGroup = context.getRootGroup();
+                final ProcessGroupLifecycle lifecycle = 
rootGroup.getLifecycle();
+
+                try {
+                    lifecycle.stopProcessors(true).get(1, TimeUnit.MINUTES);
+                } catch (final TimeoutException e) {
+                    final List<ProcessorFacade> running = 
findProcessors(rootGroup, processor ->
+                        processor.getLifecycle().getState() != 
ProcessorState.STOPPED && processor.getLifecycle().getState() != 
ProcessorState.DISABLED);
+
+                    if (!running.isEmpty()) {
+                        getLogger().warn("After waiting 60 seconds for all 
Processors to stop, {} are still running. Terminating now.", running.size());
+                        running.forEach(processor -> 
processor.getLifecycle().terminate());
+                    }
+                } catch (final ExecutionException e) {
+                    throw new RuntimeException("Failed to stop all 
Processors", e.getCause());
+                }
+
+                lifecycle.stopPorts(true).get(1, TimeUnit.MINUTES);
+                lifecycle.stopRemoteProcessGroups(true).get(1, 
TimeUnit.MINUTES);
+                lifecycle.stopStatelessGroups(true).get(2, TimeUnit.MINUTES);
+                
lifecycle.disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get(2,
 TimeUnit.MINUTES);
+
+                result.complete(null);
+            } catch (final Exception e) {
+                result.completeExceptionally(e);
+            }
+        });
+
+        return result;
+    }
+
+    @Override
+    public void prepareForUpdate(final FlowContext workingContext, final 
FlowContext activeContext) throws FlowUpdateException {
+        final CompletableFuture<Void> future = stopAsync(activeContext);
+        prepareUpdateFuture = future;
+
+        try {
+            future.get();
+        } catch (final Exception e) {
+            throw new FlowUpdateException("Failed to prepare Connector for 
update", e);
+        }
+    }
+
+    /**
+     * Drains all FlowFiles from the Connector instance.
+     *
+     * @param flowContext the FlowContext to use for drainage
+     * @return a CompletableFuture that will be completed when drainage is 
complete
+     */
+    @Override
+    public CompletableFuture<Void> drainFlowFiles(final FlowContext 
flowContext) {
+        final CompletableFuture<Void> result = new CompletableFuture<>();
+        final QueueSize initialQueueSize = 
flowContext.getRootGroup().getQueueSize();
+        if (initialQueueSize.getObjectCount() == 0) {
+            getLogger().debug("No FlowFiles to drain from Connector");
+            result.complete(null);
+            return result;
+        }
+
+        getLogger().info("Draining {} FlowFiles ({} bytes) from Connector",
+            initialQueueSize.getObjectCount(), 
NumberFormat.getNumberInstance().format(initialQueueSize.getByteCount()));
+
+        Thread.startVirtualThread(() -> {
+            try {
+                stopSourceComponents(flowContext).get();
+                startNonSourceComponents(result, flowContext);
+
+                if (!result.isDone()) {
+                    completeDrain(result, flowContext, initialQueueSize);
+                }
+            } catch (final Exception e) {
+                if (!result.isDone()) {
+                    result.completeExceptionally(new RuntimeException("Failed 
while draining FlowFiles", e));
+                }
+            }
+        });
+
+        return result;
+    }
+
+    private void completeDrain(final CompletableFuture<Void> result, final 
FlowContext flowContext, final QueueSize initialQueueSize) {
+        try {
+            ensureDrainageUnblocked();
+        } catch (final Exception e) {
+            getLogger().warn("Failed to ensure drainage is unblocked when 
draining FlowFiles", e);
+        }
+
+        Exception failureReason = null;
+        int iterations = 0;
+        while (!isGroupDrained(flowContext.getRootGroup())) {
+            if (result.isDone()) {
+                getLogger().info("Drainage has been cancelled; will no longer 
wait for FlowFiles to drain");

Review Comment:
   Recommend shortening the log message to make it more concise.
   ```suggestion
                   getLogger().info("Drain cancelled: no longer waiting for 
FlowFiles to drain");
   ```



##########
src/main/java/org/apache/nifi/components/connector/components/FlowContextType.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.connector.components;
+
+public enum FlowContextType {
+
+    ACTIVE,
+
+    WORKING;

Review Comment:
   It might be helpful to add a comment on these elements, since `ACTIVE` and 
`WORKING` sound somewhat similar. Would `RUNNING` or `PROCESSING` be better for 
`WORKING`?



##########
src/main/java/org/apache/nifi/components/connector/ConnectorPropertyDescriptor.java:
##########
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.connector;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.DescribedValue;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public final class ConnectorPropertyDescriptor {
+    private static final Pattern INTEGER_PATTERN = Pattern.compile("^-?\\d+$");
+    private static final Pattern DOUBLE_PATTERN = 
Pattern.compile("^-?\\d+(\\.\\d+)?$");
+    private static final Pattern BOOLEAN_PATTERN = 
Pattern.compile("^(?i)(true|false)$");
+
+    private final String name;
+    private final String description;
+    private final String defaultValue;
+    private final boolean required;
+    private final PropertyType type;
+    private final List<DescribedValue> allowableValues;
+    private final boolean allowableValuesFetchable;
+    private final List<Validator> validators;
+    private final Set<ConnectorPropertyDependency> dependencies;
+
+    private ConnectorPropertyDescriptor(final Builder builder) {
+        this.name = builder.name;
+        this.description = builder.description;
+        this.defaultValue = builder.defaultValue;
+        this.required = builder.required;
+        this.type = builder.type;
+        this.allowableValues = builder.allowableValues == null ? null : 
Collections.unmodifiableList(builder.allowableValues);
+        this.allowableValuesFetchable = builder.allowableValuesFetchable;
+        this.validators = List.copyOf(builder.validators);
+        this.dependencies = Collections.unmodifiableSet(builder.dependencies);
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public String getDefaultValue() {
+        return defaultValue;
+    }
+
+    public boolean isRequired() {
+        return required;
+    }
+
+    public PropertyType getType() {
+        return type;
+    }
+
+    public List<DescribedValue> getAllowableValues() {
+        return allowableValues;
+    }
+
+    public boolean isAllowableValuesFetchable() {
+        return allowableValuesFetchable;
+    }
+
+    public Set<ConnectorPropertyDependency> getDependencies() {
+        return dependencies;
+    }
+
+    public List<Validator> getValidators() {
+        return validators;
+    }
+
+    public ValidationResult validate(final String stepName, final String 
groupName, final String value, final ConnectorValidationContext 
validationContext) {
+        final List<DescribedValue> fetchedAllowableValues;
+        if (isAllowableValuesFetchable()) {
+            try {
+                fetchedAllowableValues = 
validationContext.fetchAllowableValues(stepName, getName());
+            } catch (final Exception e) {
+                return new ValidationResult.Builder()
+                    .subject(name)
+                    .input(value)
+                    .valid(false)
+                    .explanation("Failed to fetch allowable values: " + 
e.getMessage())

Review Comment:
   Is it worth logging the full exception stack trace, or at least including 
the exception class in the explanation?



##########
src/main/java/org/apache/nifi/components/connector/AbstractConnector.java:
##########
@@ -0,0 +1,744 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.connector;
+
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.DescribedValue;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.connector.components.ConnectionFacade;
+import org.apache.nifi.components.connector.components.ControllerServiceFacade;
+import 
org.apache.nifi.components.connector.components.ControllerServiceReferenceHierarchy;
+import 
org.apache.nifi.components.connector.components.ControllerServiceReferenceScope;
+import org.apache.nifi.components.connector.components.FlowContext;
+import org.apache.nifi.components.connector.components.ProcessGroupFacade;
+import org.apache.nifi.components.connector.components.ProcessGroupLifecycle;
+import org.apache.nifi.components.connector.components.ProcessorFacade;
+import org.apache.nifi.components.connector.components.ProcessorState;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+public abstract class AbstractConnector implements Connector {
+    private volatile ConnectorInitializationContext initializationContext;
+    private volatile ComponentLog logger;
+    private volatile CompletableFuture<Void> prepareUpdateFuture;
+    private String description; // effectively final
+
+    protected abstract void onStepConfigured(final String stepName, final 
FlowContext workingContext) throws FlowUpdateException;
+
+
+    @Override
+    public final void initialize(final ConnectorInitializationContext context) 
{
+        this.initializationContext = context;
+        this.logger = context.getLogger();
+        this.description = getClass().getSimpleName() + "[id=" + 
context.getIdentifier() + "]";
+
+        init();
+    }
+
+    /**
+     * No-op method for subclasses to override to perform any initialization 
logic
+     */
+    protected void init() {
+    }
+
+    protected final ComponentLog getLogger() {
+        return logger;
+    }
+
+    protected final ConnectorInitializationContext getInitializationContext() {
+        if (initializationContext == null) {
+            throw new IllegalStateException("Connector has not been 
initialized");
+        }
+
+        return initializationContext;
+    }
+
+    @Override
+    public void start(final FlowContext context) throws FlowUpdateException {
+        final ProcessGroupLifecycle lifecycle = 
context.getRootGroup().getLifecycle();
+        final CompletableFuture<Void> enableServicesFuture = 
lifecycle.enableControllerServices(
+            ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY,
+            ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS);
+
+        try {
+            enableServicesFuture.get();
+        } catch (final Exception e) {
+            
lifecycle.disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS);
+            throw new FlowUpdateException("Failed to enable Controller 
Services while starting Connector", e);
+        }
+
+        try {
+            lifecycle.startProcessors(true).get();
+            lifecycle.startPorts(true).get();
+            lifecycle.startStatelessGroups(true).get();
+            lifecycle.startRemoteProcessGroups(true).get();
+        } catch (final Exception e) {
+            logger.error("Failed to start components for {}", this, e);
+            try {
+                stop(context);
+            } catch (final Exception stopException) {
+                e.addSuppressed(new FlowUpdateException("Failed to stop 
Connector cleanly", stopException));
+            }
+        }
+    }
+
+    @Override
+    public void stop(final FlowContext context) throws FlowUpdateException {
+        try {
+            stopAsync(context).get();
+        } catch (final Exception e) {
+            throw new FlowUpdateException("Failed to stop Connector", e);
+        }
+    }
+
+    private CompletableFuture<Void> stopAsync(final FlowContext context) {
+        final CompletableFuture<Void> result = new CompletableFuture<>();
+
+        Thread.startVirtualThread(() -> {
+            try {
+                final ProcessGroupFacade rootGroup = context.getRootGroup();
+                final ProcessGroupLifecycle lifecycle = 
rootGroup.getLifecycle();
+
+                try {
+                    lifecycle.stopProcessors(true).get(1, TimeUnit.MINUTES);
+                } catch (final TimeoutException e) {
+                    final List<ProcessorFacade> running = 
findProcessors(rootGroup, processor ->
+                        processor.getLifecycle().getState() != 
ProcessorState.STOPPED && processor.getLifecycle().getState() != 
ProcessorState.DISABLED);
+
+                    if (!running.isEmpty()) {
+                        getLogger().warn("After waiting 60 seconds for all 
Processors to stop, {} are still running. Terminating now.", running.size());
+                        running.forEach(processor -> 
processor.getLifecycle().terminate());
+                    }
+                } catch (final ExecutionException e) {
+                    throw new RuntimeException("Failed to stop all 
Processors", e.getCause());
+                }
+
+                lifecycle.stopPorts(true).get(1, TimeUnit.MINUTES);
+                lifecycle.stopRemoteProcessGroups(true).get(1, 
TimeUnit.MINUTES);
+                lifecycle.stopStatelessGroups(true).get(2, TimeUnit.MINUTES);
+                
lifecycle.disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get(2,
 TimeUnit.MINUTES);
+
+                result.complete(null);
+            } catch (final Exception e) {
+                result.completeExceptionally(e);
+            }
+        });
+
+        return result;
+    }
+
+    @Override
+    public void prepareForUpdate(final FlowContext workingContext, final 
FlowContext activeContext) throws FlowUpdateException {
+        final CompletableFuture<Void> future = stopAsync(activeContext);
+        prepareUpdateFuture = future;
+
+        try {
+            future.get();
+        } catch (final Exception e) {
+            throw new FlowUpdateException("Failed to prepare Connector for 
update", e);
+        }
+    }
+
+    /**
+     * Drains all FlowFiles from the Connector instance.
+     *
+     * @param flowContext the FlowContext to use for drainage
+     * @return a CompletableFuture that will be completed when drainage is 
complete
+     */
+    @Override
+    public CompletableFuture<Void> drainFlowFiles(final FlowContext 
flowContext) {
+        final CompletableFuture<Void> result = new CompletableFuture<>();
+        final QueueSize initialQueueSize = 
flowContext.getRootGroup().getQueueSize();
+        if (initialQueueSize.getObjectCount() == 0) {
+            getLogger().debug("No FlowFiles to drain from Connector");
+            result.complete(null);
+            return result;
+        }
+
+        getLogger().info("Draining {} FlowFiles ({} bytes) from Connector",
+            initialQueueSize.getObjectCount(), 
NumberFormat.getNumberInstance().format(initialQueueSize.getByteCount()));
+
+        Thread.startVirtualThread(() -> {
+            try {
+                stopSourceComponents(flowContext).get();
+                startNonSourceComponents(result, flowContext);
+
+                if (!result.isDone()) {
+                    completeDrain(result, flowContext, initialQueueSize);
+                }
+            } catch (final Exception e) {
+                if (!result.isDone()) {
+                    result.completeExceptionally(new RuntimeException("Failed 
while draining FlowFiles", e));
+                }
+            }
+        });
+
+        return result;
+    }
+
+    private void completeDrain(final CompletableFuture<Void> result, final 
FlowContext flowContext, final QueueSize initialQueueSize) {
+        try {
+            ensureDrainageUnblocked();
+        } catch (final Exception e) {
+            getLogger().warn("Failed to ensure drainage is unblocked when 
draining FlowFiles", e);
+        }
+
+        Exception failureReason = null;
+        int iterations = 0;
+        while (!isGroupDrained(flowContext.getRootGroup())) {
+            if (result.isDone()) {
+                getLogger().info("Drainage has been cancelled; will no longer 
wait for FlowFiles to drain");
+                break;
+            }
+
+            // Log the current queue size every 10 seconds (20 iterations of 
500ms) so that it's clear
+            // whether or not progress is being made.
+            if (iterations++ % 20 == 0) {
+                final QueueSize queueSize = 
flowContext.getRootGroup().getQueueSize();
+                getLogger().info("Waiting for {} FlowFiles ({} bytes) to 
drain",
+                    queueSize.getObjectCount(), 
NumberFormat.getNumberInstance().format(queueSize.getByteCount()));
+            }
+
+            try {
+                Thread.sleep(500);
+            } catch (final InterruptedException e) {
+                Thread.currentThread().interrupt();
+                failureReason = e;
+                break;
+            }
+        }
+
+        // Log completion unless the result was completed exceptionally or 
cancelled.
+        if (!result.isDone()) {
+            getLogger().info("All {} FlowFiles have drained from Connector", 
initialQueueSize.getObjectCount());
+        }
+
+        try {
+            stop(flowContext);
+        } catch (final Exception e) {
+            getLogger().warn("Failed to stop source Processors after draining 
FlowFiles", e);
+            if (failureReason == null) {
+                failureReason = e;
+            } else {
+                failureReason.addSuppressed(e);
+            }
+        }
+
+        if (failureReason != null && !result.isDone()) {
+            result.completeExceptionally(new RuntimeException("Interrupted 
while waiting for " + AbstractConnector.this + " to drain", failureReason));
+        }
+
+        if (!result.isDone()) {
+            result.complete(null);
+        }
+    }
+
+    private void startNonSourceComponents(final CompletableFuture<Void> 
result, final FlowContext flowContext) {
+        if (result.isDone()) {
+            return;
+        }
+
+        final CompletableFuture<Void> enableServices = 
flowContext.getRootGroup().getLifecycle().enableControllerServices(
+            ControllerServiceReferenceScope.INCLUDE_REFERENCED_SERVICES_ONLY,
+            ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS);
+
+        try {
+            // Wait for all referenced services to be enabled.
+            enableServices.get();
+
+            if (!result.isDone()) {
+                getLogger().info("Starting all non-source components to 
facilitate drainage of FlowFiles");
+                startNonSourceComponents(flowContext).get();
+            }
+        } catch (final Exception e) {
+            try {
+                
flowContext.getRootGroup().getLifecycle().disableControllerServices(ControllerServiceReferenceHierarchy.INCLUDE_CHILD_GROUPS).get();
+            } catch (final Exception e1) {
+                e.addSuppressed(e1);
+            }
+
+            result.completeExceptionally(new RuntimeException("Failed to start 
non-source components while draining FlowFiles", e.getCause()));
+        }
+    }
+
+    /**
+     * <p>
+     *     A method designed to be overridden by subclasses that need to 
ensure that any
+     *     blockages to FlowFile drainage are removed. The default 
implementation is a no-op.
+     *     Typical use cases include notifying Processors that block until a 
certain amount of data is queued up,
+     *     or until certain conditions are met, that they should immediately 
allow data to flow through.
+     * </p>
+     */
+    protected void ensureDrainageUnblocked() throws InvocationFailedException {
+    }
+
+    @Override
+    public List<ConfigVerificationResult> verify(final FlowContext 
flowContext) {
+        final List<ConfigVerificationResult> results = new ArrayList<>();
+
+        final List<ConfigurationStep> configSteps = getConfigurationSteps();
+        for (final ConfigurationStep configStep : configSteps) {
+            final List<ConfigVerificationResult> stepResults = 
verifyConfigurationStep(configStep.getName(), Map.of(), flowContext);
+            results.addAll(stepResults);
+        }
+
+        return results;
+    }
+
+    @Override
+    public List<ValidationResult> validate(final FlowContext flowContext, 
final ConnectorValidationContext validationContext) {
+        final ConnectorConfigurationContext configContext = 
flowContext.getConfigurationContext();
+        final List<ValidationResult> results = new ArrayList<>();
+        final List<ConfigurationStep> configurationSteps = 
getConfigurationSteps();
+
+        for (final ConfigurationStep configurationStep : configurationSteps) {
+            if (!isStepDependencySatisfied(configurationStep, 
configurationSteps, configContext)) {
+                getLogger().debug("Skipping validation for Configuration Step 
[{}] because its dependencies are not satisfied", configurationStep.getName());
+                continue;
+            }
+
+            results.addAll(validateConfigurationStep(configurationStep, 
configContext, validationContext));
+        }
+
+        // only run customValidate if regular validation is successful. This 
allows Processor developers to not have to check
+        // if values are null or invalid so that they can focus only on the 
interaction between the properties, etc.
+        if (results.isEmpty()) {
+            final Collection<ValidationResult> customResults = 
customValidate(configContext);
+            if (customResults != null) {
+                for (final ValidationResult result : customResults) {
+                    if (!result.isValid()) {
+                        results.add(result);
+                    }
+                }
+            }
+        }
+
+        return results;
+    }
+
+

Review Comment:
   Extra newline can be removed.



##########
src/main/java/org/apache/nifi/components/connector/AbstractConnector.java:
##########
@@ -0,0 +1,744 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.connector;
+
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.DescribedValue;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.connector.components.ConnectionFacade;
+import org.apache.nifi.components.connector.components.ControllerServiceFacade;
+import 
org.apache.nifi.components.connector.components.ControllerServiceReferenceHierarchy;
+import 
org.apache.nifi.components.connector.components.ControllerServiceReferenceScope;
+import org.apache.nifi.components.connector.components.FlowContext;
+import org.apache.nifi.components.connector.components.ProcessGroupFacade;
+import org.apache.nifi.components.connector.components.ProcessGroupLifecycle;
+import org.apache.nifi.components.connector.components.ProcessorFacade;
+import org.apache.nifi.components.connector.components.ProcessorState;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.flow.VersionedConnection;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+public abstract class AbstractConnector implements Connector {
+    private volatile ConnectorInitializationContext initializationContext;
+    private volatile ComponentLog logger;
+    private volatile CompletableFuture<Void> prepareUpdateFuture;
+    private String description; // effectively final
+
+    protected abstract void onStepConfigured(final String stepName, final 
FlowContext workingContext) throws FlowUpdateException;

Review Comment:
   As an `abstract` method to be implemented, it would be helpful to include a 
method comment with some basic description



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to