This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch NIFI-15258
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/NIFI-15258 by this push:
new 9822e3e491 NIFI-15488: Added significant number of debug log messages
as well as some info/error for connector-related events (#10803)
9822e3e491 is described below
commit 9822e3e491ed5c2395a337bccbf17da6b911caf5
Author: Mark Payne <[email protected]>
AuthorDate: Thu Feb 5 12:47:14 2026 -0500
NIFI-15488: Added significant number of debug log messages as well as some
info/error for connector-related events (#10803)
---
.../connector/StandardConnectorNode.java | 43 +++++++++++++++++++++-
.../connector/StandardConnectorRepository.java | 16 ++++++--
2 files changed, 54 insertions(+), 5 deletions(-)
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
index 75525d16c3..77a577d86a 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
@@ -164,9 +164,11 @@ public class StandardConnectorNode implements
ConnectorNode {
+ "; it must be UPDATING.");
}
+ logger.debug("Preparing {} for update", this);
try (final NarCloseable ignored =
NarCloseable.withComponentNarLoader(extensionManager,
getConnector().getClass(), getIdentifier())) {
getConnector().prepareForUpdate(workingFlowContext,
activeFlowContext);
stateTransition.setCurrentState(ConnectorState.UPDATING);
+ logger.debug("Successfully prepared {} for update", this);
} catch (final Throwable t) {
logger.error("Failed to prepare update for {}", this, t);
@@ -184,6 +186,7 @@ public class StandardConnectorNode implements ConnectorNode
{
public void inheritConfiguration(final List<VersionedConfigurationStep>
activeConfig, final List<VersionedConfigurationStep> workingConfig,
final Bundle flowContextBundle) throws FlowUpdateException {
+ logger.debug("Inheriting configuration for {}", this);
final MutableConnectorConfigurationContext configurationContext =
createConfigurationContext(activeConfig);
final FrameworkFlowContext inheritContext =
flowContextFactory.createWorkingFlowContext(identifier,
connectorDetails.getComponentLog(), configurationContext,
flowContextBundle);
@@ -196,6 +199,8 @@ public class StandardConnectorNode implements ConnectorNode
{
final StepConfiguration stepConfig = createStepConfiguration(step);
setConfiguration(step.getName(), stepConfig, true);
}
+
+ logger.debug("Successfully inherited configuration for {}", this);
}
private StepConfiguration createStepConfiguration(final
VersionedConfigurationStep step) {
@@ -280,6 +285,7 @@ public class StandardConnectorNode implements ConnectorNode
{
stateTransition.setCurrentState(ConnectorState.UPDATED);
stateTransition.setDesiredState(ConnectorState.UPDATED);
+ logger.info("Successfully applied update for {}", this);
}
private void destroyWorkingContext() {
@@ -306,6 +312,7 @@ public class StandardConnectorNode implements ConnectorNode
{
try (final NarCloseable ignored =
NarCloseable.withComponentNarLoader(extensionManager,
getConnector().getClass(), getIdentifier())) {
getConnector().abortUpdate(workingFlowContext, cause);
}
+ logger.debug("Aborted update for {}", this);
}
@Override
@@ -325,6 +332,7 @@ public class StandardConnectorNode implements ConnectorNode
{
try (final NarCloseable ignored =
NarCloseable.withComponentNarLoader(extensionManager, connector.getClass(),
getIdentifier())) {
logger.debug("Notifying {} of configuration change for
configuration step {}", this, stepName);
connector.onConfigurationStepConfigured(stepName,
workingFlowContext);
+ logger.debug("Successfully set configuration for step {} on {}",
stepName, this);
} catch (final FlowUpdateException e) {
throw e;
} catch (final Exception e) {
@@ -442,6 +450,7 @@ public class StandardConnectorNode implements ConnectorNode
{
@Override
public Future<Void> drainFlowFiles() {
+ logger.debug("Draining FlowFiles for {}", this);
requireStopped("drain FlowFiles", ConnectorState.DRAINING);
try (final NarCloseable ignored =
NarCloseable.withComponentNarLoader(extensionManager,
connectorDetails.getConnector().getClass(), getIdentifier())) {
@@ -451,7 +460,12 @@ public class StandardConnectorNode implements
ConnectorNode {
final CompletableFuture<Void> stateUpdateFuture =
drainFuture.whenComplete((result, failureCause) -> {
drainFutureRef.set(null);
- logger.info("Successfully drained FlowFiles from {}; ensuring
all components are stopped.", this);
+
+ if (failureCause == null) {
+ logger.info("Successfully drained FlowFiles for {}", this);
+ } else {
+ logger.error("Failed to drain FlowFiles for {}", this,
failureCause);
+ }
try {
connectorDetails.getConnector().stop(activeFlowContext);
@@ -465,6 +479,7 @@ public class StandardConnectorNode implements ConnectorNode
{
return stateUpdateFuture;
} catch (final Throwable t) {
+ logger.error("Failed to drain FlowFiles for {}", this, t);
stateTransition.setCurrentState(ConnectorState.STOPPED);
throw t;
}
@@ -509,15 +524,25 @@ public class StandardConnectorNode implements
ConnectorNode {
@Override
public Future<Void> purgeFlowFiles(final String requestor) {
+ logger.debug("Purging FlowFiles for {}", this);
requireStopped("purge FlowFiles", ConnectorState.PURGING);
try {
final String dropRequestId = UUID.randomUUID().toString();
final DropFlowFileStatus status =
activeFlowContext.getManagedProcessGroup().dropAllFlowFiles(dropRequestId,
requestor);
final CompletableFuture<Void> future =
status.getCompletionFuture();
- final CompletableFuture<Void> stateUpdateFuture =
future.whenComplete((result, failureCause) ->
stateTransition.setCurrentState(ConnectorState.STOPPED));
+ final CompletableFuture<Void> stateUpdateFuture =
future.whenComplete((result, failureCause) -> {
+ stateTransition.setCurrentState(ConnectorState.STOPPED);
+
+ if (failureCause == null) {
+ logger.info("Successfully purged FlowFiles for {}", this);
+ } else {
+ logger.error("Failed to purge FlowFiles for {}", this,
failureCause);
+ }
+ });
return stateUpdateFuture;
} catch (final Throwable t) {
+ logger.error("Failed to purge FlowFiles for {}", this, t);
stateTransition.setCurrentState(ConnectorState.STOPPED);
throw t;
}
@@ -541,6 +566,7 @@ public class StandardConnectorNode implements ConnectorNode
{
}
private void stopComponent(final FlowEngine scheduler, final
CompletableFuture<Void> stopCompleteFuture) {
+ logger.debug("Stopping component for {}", this);
try (final NarCloseable ignored =
NarCloseable.withComponentNarLoader(extensionManager,
connectorDetails.getConnector().getClass(), getIdentifier())) {
connectorDetails.getConnector().stop(activeFlowContext);
} catch (final Exception e) {
@@ -551,6 +577,7 @@ public class StandardConnectorNode implements ConnectorNode
{
stateTransition.setCurrentState(ConnectorState.STOPPED);
stopCompleteFuture.complete(null);
+ logger.info("Successfully stopped {}", this);
final ConnectorState desiredState = getDesiredState();
if (desiredState == ConnectorState.RUNNING) {
@@ -560,6 +587,7 @@ public class StandardConnectorNode implements ConnectorNode
{
}
private void startComponent(final ScheduledExecutorService scheduler,
final CompletableFuture<Void> startCompleteFuture) {
+ logger.debug("Starting component for {}", this);
final ConnectorState desiredState = getDesiredState();
if (desiredState != ConnectorState.RUNNING) {
logger.info("Will not start {} because the desired state is no
longer RUNNING but is now {}", this, desiredState);
@@ -576,6 +604,7 @@ public class StandardConnectorNode implements ConnectorNode
{
stateTransition.setCurrentState(ConnectorState.RUNNING);
startCompleteFuture.complete(null);
+ logger.info("Successfully started {}", this);
}
@@ -654,6 +683,7 @@ public class StandardConnectorNode implements ConnectorNode
{
@Override
public void initializeConnector(final
FrameworkConnectorInitializationContext initializationContext) {
+ logger.debug("Initializing {}", this);
this.initializationContext = initializationContext;
try (NarCloseable ignored =
NarCloseable.withComponentNarLoader(extensionManager,
getConnector().getClass(), getIdentifier())) {
@@ -661,10 +691,12 @@ public class StandardConnectorNode implements
ConnectorNode {
}
recreateWorkingFlowContext();
+ logger.info("Successfully initialized {}", this);
}
@Override
public void loadInitialFlow() throws FlowUpdateException {
+ logger.debug("Loading initial flow for {}", this);
if (initializationContext == null) {
throw new IllegalStateException("Cannot load initial flow because
" + this + " has not been initialized yet.");
}
@@ -733,6 +765,7 @@ public class StandardConnectorNode implements ConnectorNode
{
@Override
public List<ConfigVerificationResult> verifyConfigurationStep(final String
stepName, final StepConfiguration configurationOverrides) {
+ logger.debug("Verifying configuration step {} for {}", stepName, this);
final List<SecretReference> invalidSecretRefs = new ArrayList<>();
final List<AssetReference> invalidAssetRefs = new ArrayList<>();
final Map<String, String> resolvedPropertyOverrides =
resolvePropertyReferences(configurationOverrides, invalidSecretRefs,
invalidAssetRefs);
@@ -790,6 +823,7 @@ public class StandardConnectorNode implements ConnectorNode
{
results.addAll(invalidConfigResults);
}
+ logger.debug("Completed verification of configuration step {} for
{}", stepName, this);
return results;
}
}
@@ -895,6 +929,7 @@ public class StandardConnectorNode implements ConnectorNode
{
@Override
public List<ConfigVerificationResult> verify() {
+ logger.debug("Verifying {}", this);
final List<ConfigVerificationResult> results = new ArrayList<>();
final ValidationState state = performValidation();
@@ -909,6 +944,7 @@ public class StandardConnectorNode implements ConnectorNode
{
.explanation("There are " +
validationFailureExplanations.size() + " validation failures: " +
validationFailureExplanations)
.build());
+ logger.debug("Completed verification for {} with validation
failures", this);
return results;
}
@@ -916,6 +952,7 @@ public class StandardConnectorNode implements ConnectorNode
{
results.addAll(getConnector().verify(workingFlowContext));
}
+ logger.debug("Completed verification for {}", this);
return results;
}
@@ -955,6 +992,7 @@ public class StandardConnectorNode implements ConnectorNode
{
@Override
public void discardWorkingConfiguration() {
recreateWorkingFlowContext();
+ logger.debug("Discarded working configuration for {}", this);
}
@Override
@@ -1192,6 +1230,7 @@ public class StandardConnectorNode implements
ConnectorNode {
@Override
public ValidationState performValidation() {
+ logger.debug("Performing validation for {}", this);
try (final NarCloseable ignored =
NarCloseable.withComponentNarLoader(extensionManager,
getConnector().getClass(), getIdentifier())) {
final ConnectorValidationContext validationContext =
createValidationContext(activeFlowContext);
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
index 5ae380cdfa..702bb5777d 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
@@ -53,10 +53,12 @@ public class StandardConnectorRepository implements
ConnectorRepository {
@Override
public void initialize(final ConnectorRepositoryInitializationContext
context) {
+ logger.debug("Initializing ConnectorRepository");
this.extensionManager = context.getExtensionManager();
this.requestReplicator = context.getRequestReplicator();
this.secretsManager = context.getSecretsManager();
this.assetRepository = new
StandardConnectorAssetRepository(context.getAssetManager());
+ logger.debug("Successfully initialized ConnectorRepository");
}
@Override
@@ -67,10 +69,12 @@ public class StandardConnectorRepository implements
ConnectorRepository {
@Override
public void restoreConnector(final ConnectorNode connector) {
addConnector(connector);
+ logger.debug("Successfully restored {}", connector);
}
@Override
public void removeConnector(final String connectorId) {
+ logger.debug("Removing {}", connectorId);
final ConnectorNode connectorNode = connectors.get(connectorId);
if (connectorNode == null) {
throw new IllegalStateException("No connector found with ID " +
connectorId);
@@ -136,6 +140,7 @@ public class StandardConnectorRepository implements
ConnectorRepository {
@Override
public void applyUpdate(final ConnectorNode connector, final
ConnectorUpdateContext context) throws FlowUpdateException {
+ logger.debug("Applying update to {}", connector);
final ConnectorState initialDesiredState = connector.getDesiredState();
logger.info("Applying update to Connector {}", connector);
@@ -155,6 +160,7 @@ public class StandardConnectorRepository implements
ConnectorRepository {
}
private void updateConnector(final ConnectorNode connector, final
ConnectorState initialDesiredState, final ConnectorUpdateContext context) {
+ logger.debug("Updating {}", connector);
try {
// Perform whatever preparation is necessary for the update.
Default implementation is to stop the connector.
logger.debug("Preparing {} for update", connector);
@@ -179,17 +185,17 @@ public class StandardConnectorRepository implements
ConnectorRepository {
// If the initial desired state was RUNNING, start the connector
again. Otherwise, stop it.
// We don't simply leave it be as the prepareForUpdate / update
may have changed the state of some components.
if (initialDesiredState == ConnectorState.RUNNING) {
- logger.info("Connector {} has been successfully updated;
starting Connector to resume initial state", connector);
+ logger.info("{} has been successfully updated; starting to
resume initial state", connector);
connector.start(lifecycleExecutor);
} else {
- logger.info("Connector {} has been successfully updated;
stopping Connector to resume initial state", connector);
+ logger.info("{} has been successfully updated; stopping to
resume initial state", connector);
connector.stop(lifecycleExecutor);
}
// We've updated the state of the connector so save flow again
context.saveFlow();
} catch (final Exception e) {
- logger.error("Failed to apply connector update for {}", connector,
e);
+ logger.error("Failed to apply update for {}", connector, e);
connector.abortUpdate(e);
}
}
@@ -268,18 +274,22 @@ public class StandardConnectorRepository implements
ConnectorRepository {
@Override
public void configureConnector(final ConnectorNode connector, final String
stepName, final StepConfiguration configuration) throws FlowUpdateException {
connector.setConfiguration(stepName, configuration);
+ logger.info("Successfully configured {} for step {}", connector,
stepName);
}
@Override
public void inheritConfiguration(final ConnectorNode connector, final
List<VersionedConfigurationStep> activeFlowConfiguration,
final List<VersionedConfigurationStep>
workingFlowConfiguration, final Bundle flowContextBundle) throws
FlowUpdateException {
+ logger.debug("Inheriting configuration for {}", connector);
connector.transitionStateForUpdating();
connector.prepareForUpdate();
try {
connector.inheritConfiguration(activeFlowConfiguration,
workingFlowConfiguration, flowContextBundle);
+ logger.debug("Successfully inherited configuration for {}",
connector);
} catch (final Exception e) {
+ logger.error("Failed to inherit configuration for {}", connector,
e);
connector.abortUpdate(e);
throw e;
}