This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 1d9ccb3857 NIFI-12161: This closes #7829. Ensure framework tasks run on
       lightweight (virtual) threads instead of a capped thread pool. This prevents
       framework tasks from livelocking when enough of them are holding pool threads
       that the tasks needing to run cannot obtain one.
1d9ccb3857 is described below

commit 1d9ccb3857901faaf531645ae1b09547a0944050
Author: Mark Payne <marka...@hotmail.com>
AuthorDate: Mon Oct 2 11:12:10 2023 -0400

    NIFI-12161: This closes #7829. Ensure framework tasks run on lightweight
    (virtual) threads instead of a capped thread pool. This prevents framework
    tasks from livelocking when enough of them are holding pool threads that
    the tasks needing to run cannot obtain one.
    
    Signed-off-by: Joseph Witt <joew...@apache.org>
---
 .../scheduling/StandardProcessScheduler.java       | 75 ++++++++++++++++------
 .../apache/nifi/tests/system/NiFiClientUtil.java   | 56 +++++++++++++---
 .../org/apache/nifi/tests/system/NiFiSystemIT.java | 22 +++++--
 .../SpawnedStandaloneNiFiInstanceFactory.java      |  7 ++
 .../system/pg/SingleFlowFileConcurrencyIT.java     | 16 ++++-
 5 files changed, 142 insertions(+), 34 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index ef68c3d67b..f0175d1b7c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -73,6 +73,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 
 import static java.util.Objects.requireNonNull;
@@ -90,11 +91,12 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
     private final StateManagerProvider stateManagerProvider;
     private final long processorStartTimeoutMillis;
     private final LifecycleStateManager lifecycleStateManager;
+    private final AtomicLong frameworkTaskThreadIndex = new AtomicLong(1L);
 
-    private final ScheduledExecutorService frameworkTaskExecutor;
     private final ConcurrentMap<SchedulingStrategy, SchedulingAgent> 
strategyAgentMap = new ConcurrentHashMap<>();
 
     // thread pool for starting/stopping components
+    private volatile boolean shutdown = false;
     private final ScheduledExecutorService componentLifeCycleThreadPool;
     private final ScheduledExecutorService componentMonitoringThreadPool = new 
FlowEngine(2, "Monitor Processor Lifecycle", true);
 
@@ -111,8 +113,6 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
 
         final String timeoutString = 
nifiProperties.getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT);
         processorStartTimeoutMillis = timeoutString == null ? 60000 : 
FormatUtils.getTimeDuration(timeoutString.trim(), TimeUnit.MILLISECONDS);
-
-        frameworkTaskExecutor = new FlowEngine(4, "Framework Task Thread");
     }
 
     public ControllerServiceProvider getControllerServiceProvider() {
@@ -123,20 +123,36 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
         return stateManagerProvider.getStateManager(componentId);
     }
 
-    public void scheduleFrameworkTask(final Runnable command, final String 
taskName, final long initialDelay, final long delay, final TimeUnit timeUnit) {
-        frameworkTaskExecutor.scheduleWithFixedDelay(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    command.run();
-                } catch (final Throwable t) {
-                    LOG.error("Failed to run Framework Task {} due to {}", 
taskName, t.toString());
-                    if (LOG.isDebugEnabled()) {
-                        LOG.error("", t);
-                    }
-                }
+    public void scheduleFrameworkTask(final Runnable task, final String 
taskName, final long initialDelay, final long delay, final TimeUnit timeUnit) {
+        Thread.ofVirtual()
+            .name(taskName)
+            .start(() -> invokeRepeatedly(task, taskName, initialDelay, delay, 
timeUnit));
+    }
+
+    private void invokeRepeatedly(final Runnable task, final String taskName, 
final long initialDelay, final long delayBetweenInvocations, final TimeUnit 
timeUnit) {
+        if (initialDelay > 0) {
+            try {
+                timeUnit.sleep(initialDelay);
+            } catch (final InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+
+        while (!this.shutdown) {
+            try {
+                task.run();
+            } catch (final Exception e) {
+                LOG.error("Failed to run Framework Task {}", taskName, e);
+            }
+
+            try {
+                timeUnit.sleep(delayBetweenInvocations);
+            } catch (final InterruptedException e) {
+                Thread.currentThread().interrupt();
+                return;
             }
-        }, initialDelay, delay, timeUnit);
+        }
     }
 
     /**
@@ -145,7 +161,29 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
      * @param task the task to perform
      */
     public Future<?> submitFrameworkTask(final Runnable task) {
-        return frameworkTaskExecutor.submit(task);
+        final CompletableFuture<?> future = new CompletableFuture<>();
+
+        Thread.ofVirtual()
+            .name("Framework Task Thread-" + 
frameworkTaskThreadIndex.getAndIncrement())
+            .start(wrapTask(task, future));
+
+        return future;
+    }
+
+    private Runnable wrapTask(final Runnable task, final CompletableFuture<?> 
future) {
+        return () -> {
+            try {
+                task.run();
+                future.complete(null);
+            } catch (final Exception e) {
+                LOG.error("Encountered unexpected Exception when performing 
background Framework Task", e);
+                future.completeExceptionally(e);
+            } catch (final Throwable t) {
+                LOG.error("Encountered unexpected Exception when performing 
background Framework Task", t);
+                future.completeExceptionally(t);
+                throw t;
+            }
+        };
     }
 
     @Override
@@ -172,6 +210,8 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
 
     @Override
     public void shutdown() {
+        shutdown = true;
+
         for (final SchedulingAgent schedulingAgent : 
strategyAgentMap.values()) {
             try {
                 schedulingAgent.shutdown();
@@ -181,7 +221,6 @@ public final class StandardProcessScheduler implements 
ProcessScheduler {
             }
         }
 
-        frameworkTaskExecutor.shutdown();
         componentLifeCycleThreadPool.shutdown();
     }
 
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index 397f98c595..b55bd80967 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -126,6 +126,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 public class NiFiClientUtil {
@@ -696,7 +697,10 @@ public class NiFiClientUtil {
     }
 
     public void waitForProcessorState(final String processorId, final String 
expectedState) throws NiFiClientException, IOException, InterruptedException {
-        while (true) {
+        final long maxTimestamp = System.currentTimeMillis() + 
TimeUnit.MINUTES.toMillis(2);
+        logger.info("Waiting for Processor {} to reach state {}", processorId, 
expectedState);
+
+        while (System.currentTimeMillis() < maxTimestamp) {
             final ProcessorEntity entity = 
getProcessorClient().getProcessor(processorId);
             final String state = entity.getComponent().getState();
 
@@ -714,6 +718,7 @@ public class NiFiClientUtil {
 
             final ProcessorStatusSnapshotDTO snapshotDto = 
entity.getStatus().getAggregateSnapshot();
             if (snapshotDto.getActiveThreadCount() == 0 && 
snapshotDto.getTerminatedThreadCount() == 0) {
+                logger.info("Processor {} has reached desired state of {}", 
processorId, expectedState);
                 return;
             }
 
@@ -722,7 +727,10 @@ public class NiFiClientUtil {
     }
 
     public ReportingTaskEntity waitForReportingTaskState(final String 
reportingTaskId, final String expectedState) throws NiFiClientException, 
IOException, InterruptedException {
-        while (true) {
+        final long maxTimestamp = System.currentTimeMillis() + 
TimeUnit.MINUTES.toMillis(2L);
+        logger.info("Waiting for Reporting Task {} to reach desired state of 
{}", reportingTaskId, expectedState);
+
+        while (System.currentTimeMillis() < maxTimestamp) {
             final ReportingTaskEntity entity = 
nifiClient.getReportingTasksClient().getReportingTask(reportingTaskId);
             final String state = entity.getComponent().getState();
 
@@ -735,15 +743,19 @@ public class NiFiClientUtil {
             }
 
             if ("RUNNING".equals(expectedState)) {
+                logger.info("Reporting task {} is now running", 
reportingTaskId);
                 return entity;
             }
 
             if (entity.getStatus().getActiveThreadCount() == 0) {
+                logger.info("Reporting task {} is now stopped", 
reportingTaskId);
                 return entity;
             }
 
             Thread.sleep(10L);
         }
+
+        throw new IOException("Timed out waiting for Reporting Task " + 
reportingTaskId + " to reach state of " + expectedState);
     }
 
     public void waitForReportingTaskValid(final String reportingTaskId) throws 
NiFiClientException, IOException {
@@ -868,9 +880,13 @@ public class NiFiClientUtil {
     }
 
     private void waitForNoRunningComponents(final String groupId) throws 
NiFiClientException, IOException {
-        while (true) {
+        final long maxTimestamp = System.currentTimeMillis() + 
TimeUnit.MINUTES.toMillis(2);
+        logger.info("Waiting for no more running components for group {}", 
groupId);
+
+        while (System.currentTimeMillis() < maxTimestamp) {
             final boolean anyRunning = isAnyComponentRunning(groupId);
             if (!anyRunning) {
+                logger.info("All Process Groups have finished");
                 return;
             }
 
@@ -906,6 +922,8 @@ public class NiFiClientUtil {
     }
 
     private void waitForProcessorsStopped(final String groupId) throws 
IOException, NiFiClientException {
+        logger.info("Waiting for processors in group {} to stop", groupId);
+
         final ProcessGroupFlowEntity rootGroup = 
nifiClient.getFlowClient().getProcessGroup(groupId);
         final FlowDTO rootFlowDTO = rootGroup.getProcessGroupFlow().getFlow();
         for (final ProcessorEntity processor : rootFlowDTO.getProcessors()) {
@@ -920,6 +938,8 @@ public class NiFiClientUtil {
         for (final ProcessGroupEntity group : rootFlowDTO.getProcessGroups()) {
             waitForProcessorsStopped(group.getComponent());
         }
+
+        logger.info("All processors in group {} have stopped", groupId);
     }
 
     private void waitForProcessorsStopped(final ProcessGroupDTO group) throws 
IOException, NiFiClientException {
@@ -956,6 +976,8 @@ public class NiFiClientUtil {
     }
 
     public ActivateControllerServicesEntity disableControllerServices(final 
String groupId, final boolean recurse) throws NiFiClientException, IOException {
+        logger.info("Starting disableControllerServices for group {}, 
recurse={}", groupId, recurse);
+
         final ActivateControllerServicesEntity 
activateControllerServicesEntity = new ActivateControllerServicesEntity();
         activateControllerServicesEntity.setId(groupId);
         
activateControllerServicesEntity.setState(ActivateControllerServicesEntity.STATE_DISABLED);
@@ -973,6 +995,7 @@ public class NiFiClientUtil {
             }
         }
 
+        logger.info("Finished disableControllerServices for group {}", 
groupId);
         return activateControllerServices;
     }
 
@@ -998,7 +1021,10 @@ public class NiFiClientUtil {
     }
 
     public void waitForControllerServiceRunStatus(final String id, final 
String requestedRunStatus) throws NiFiClientException, IOException {
-        while (true) {
+        final long maxTimestamp = System.currentTimeMillis() + 
TimeUnit.MINUTES.toMillis(2L);
+        logger.info("Waiting for Controller Service {} to have a Run Status of 
{}", id, requestedRunStatus);
+
+        while (System.currentTimeMillis() < maxTimestamp) {
             final ControllerServiceEntity serviceEntity = 
nifiClient.getControllerServicesClient().getControllerService(id);
             final String serviceState = 
serviceEntity.getComponent().getState();
             if (requestedRunStatus.equals(serviceState)) {
@@ -1029,7 +1055,9 @@ public class NiFiClientUtil {
     }
 
     public void waitForControllerServiceState(final String groupId, final 
String desiredState, final Collection<String> serviceIdsOfInterest) throws 
NiFiClientException, IOException {
-        while (true) {
+        final long maxTimestamp = System.currentTimeMillis() + 
TimeUnit.MINUTES.toMillis(2L);
+
+        while (System.currentTimeMillis() < maxTimestamp) {
             final List<ControllerServiceEntity> nonDisabledServices = 
getControllerServicesNotInState(groupId, desiredState, serviceIdsOfInterest);
             if (nonDisabledServices.isEmpty()) {
                 logger.info("Process Group [{}] Controller Services have 
desired state [{}]", groupId, desiredState);
@@ -1049,7 +1077,9 @@ public class NiFiClientUtil {
     }
 
     public void waitForControllerServiceValidationStatus(final String 
controllerServiceId, final String validationStatus) throws NiFiClientException, 
IOException {
-        while (true) {
+        final long maxTimestamp = System.currentTimeMillis() + 
TimeUnit.MINUTES.toMillis(2L);
+
+        while (System.currentTimeMillis() < maxTimestamp) {
             final ControllerServiceEntity controllerServiceEntity = 
nifiClient.getControllerServicesClient().getControllerService(controllerServiceId);
             final String currentValidationStatus = 
controllerServiceEntity.getComponent().getValidationStatus();
             if (validationStatus.equals(currentValidationStatus)) {
@@ -1070,7 +1100,9 @@ public class NiFiClientUtil {
     }
 
     public void waitForReportingTaskValidationStatus(final String 
reportingTaskId, final String validationStatus) throws NiFiClientException, 
IOException {
-        while (true) {
+        final long maxTimestamp = System.currentTimeMillis() + 
TimeUnit.MINUTES.toMillis(2L);
+
+        while (System.currentTimeMillis() < maxTimestamp) {
             final ReportingTaskEntity reportingTaskEntity = 
nifiClient.getReportingTasksClient().getReportingTask(reportingTaskId);
             final String currentValidationStatus = 
reportingTaskEntity.getStatus().getValidationStatus();
             if (validationStatus.equalsIgnoreCase(currentValidationStatus)) {
@@ -1306,11 +1338,17 @@ public class NiFiClientUtil {
     public DropRequestEntity emptyQueue(final String connectionId) throws 
NiFiClientException, IOException {
         final ConnectionClient connectionClient = getConnectionClient();
 
-        DropRequestEntity requestEntity;
-        while (true) {
+        final long maxTimestamp = System.currentTimeMillis() + 
TimeUnit.MINUTES.toMillis(2L);
+
+        DropRequestEntity requestEntity = null;
+        while (System.currentTimeMillis() < maxTimestamp) {
             requestEntity = connectionClient.emptyQueue(connectionId);
             try {
                 while (requestEntity.getDropRequest().getPercentCompleted() < 
100) {
+                    if (System.currentTimeMillis() > maxTimestamp) {
+                        throw new IOException("Timed out waiting for queue " + 
connectionId + " to empty");
+                    }
+
                     try {
                         Thread.sleep(10L);
                     } catch (final InterruptedException ie) {
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
index b927fae922..5168a152c5 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
@@ -99,7 +99,7 @@ public abstract class NiFiSystemIT implements 
NiFiInstanceProvider {
     private static final NiFiInstanceCache instanceCache = new 
NiFiInstanceCache();
 
     static {
-        Runtime.getRuntime().addShutdownHook(new Thread(() -> 
instanceCache.shutdown()));
+        Runtime.getRuntime().addShutdownHook(new 
Thread(instanceCache::shutdown));
     }
 
     private TestInfo testInfo;
@@ -107,6 +107,7 @@ public abstract class NiFiSystemIT implements 
NiFiInstanceProvider {
     @BeforeEach
     public void setup(final TestInfo testInfo) throws IOException {
         this.testInfo = testInfo;
+
         final String testClassName = 
testInfo.getTestClass().map(Class::getSimpleName).orElse("<Unknown Test 
Class>");
         final String friendlyTestName = testClassName + ":" + 
testInfo.getDisplayName();
         logger.info("Beginning Test {}", friendlyTestName);
@@ -133,21 +134,24 @@ public abstract class NiFiSystemIT implements 
NiFiInstanceProvider {
         return true;
     }
 
-    protected TestInfo getTestInfo() {
-        return testInfo;
-    }
 
     @AfterAll
     public static void cleanup() {
+        logger.info("Beginning cleanup");
+
         final NiFiInstance nifi = nifiRef.get();
         nifiRef.set(null);
         if (nifi != null) {
             instanceCache.stopOrRecycle(nifi);
         }
+
+        logger.info("Finished cleanup");
     }
 
     @AfterEach
     public void teardown() throws Exception {
+        logger.info("Beginning teardown");
+
         try {
             Exception destroyFlowFailure = null;
 
@@ -182,6 +186,8 @@ public abstract class NiFiSystemIT implements 
NiFiInstanceProvider {
             if (nifiClient != null) {
                 nifiClient.close();
             }
+
+            logger.info("Finished teardown");
         }
     }
 
@@ -230,6 +236,8 @@ public abstract class NiFiSystemIT implements 
NiFiInstanceProvider {
     }
 
     protected void destroyFlow() throws NiFiClientException, IOException, 
InterruptedException {
+        logger.info("Starting destroyFlow");
+
         getClientUtil().stopProcessGroupComponents("root");
         getClientUtil().disableControllerServices("root", true);
         getClientUtil().stopReportingTasks();
@@ -238,6 +246,8 @@ public abstract class NiFiSystemIT implements 
NiFiInstanceProvider {
         getClientUtil().deleteAll("root");
         getClientUtil().deleteControllerLevelServices();
         getClientUtil().deleteReportingTasks();
+
+        logger.info("Finished destroyFlow");
     }
 
     protected void waitForAllNodesConnected() {
@@ -273,7 +283,7 @@ public abstract class NiFiSystemIT implements 
NiFiInstanceProvider {
             }
 
             if (System.currentTimeMillis() > maxTime) {
-                throw new RuntimeException("Waited up to 60 seconds for both 
nodes to connect but only " + connectedNodeCount + " nodes connected");
+                throw new RuntimeException("Waited up to 60 seconds for all 
nodes to connect but only " + connectedNodeCount + " nodes connected");
             }
 
             try {
@@ -569,7 +579,7 @@ public abstract class NiFiSystemIT implements 
NiFiInstanceProvider {
     }
 
     protected void waitForCoordinatorElected() throws InterruptedException {
-        waitFor(() -> isCoordinatorElected());
+        waitFor(this::isCoordinatorElected);
     }
 
     protected boolean isCoordinatorElected() throws NiFiClientException, 
IOException {
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java
index 11b1172771..bc1047f927 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java
@@ -37,6 +37,7 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -234,6 +235,8 @@ public class SpawnedStandaloneNiFiInstanceFactory 
implements NiFiInstanceFactory
         }
 
         private void waitForStartup() throws IOException {
+            final long timeoutMillis = System.currentTimeMillis() + 
TimeUnit.MINUTES.toMillis(5L);
+
             try (final NiFiClient client = createClient()) {
                 while (true) {
                     try {
@@ -241,6 +244,10 @@ public class SpawnedStandaloneNiFiInstanceFactory 
implements NiFiInstanceFactory
                         logger.info("NiFi Startup Completed [{}]", 
instanceDirectory.getName());
                         return;
                     } catch (final Exception e) {
+                        if (System.currentTimeMillis() > timeoutMillis) {
+                            throw new IOException("After waiting 5 minutes, 
NiFi instance still has not started");
+                        }
+
                         try {
                             Thread.sleep(1000L);
                         } catch (InterruptedException ex) {
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java
index 7c96e82edf..dc02c2cc7b 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java
@@ -27,6 +27,8 @@ import org.apache.nifi.web.api.entity.PortEntity;
 import org.apache.nifi.web.api.entity.ProcessGroupEntity;
 import org.apache.nifi.web.api.entity.ProcessorEntity;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -37,10 +39,12 @@ import static 
org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class SingleFlowFileConcurrencyIT extends NiFiSystemIT {
-
+    private static final Logger logger = 
LoggerFactory.getLogger(SingleFlowFileConcurrencyIT.class);
 
     @Test
     public void testSingleConcurrency() throws NiFiClientException, 
IOException, InterruptedException {
+        logger.info("Beginning test testSingleConcurrency");
+
         final ProcessGroupEntity processGroupEntity = 
getClientUtil().createProcessGroup("My Group", "root");
         final PortEntity inputPort = getClientUtil().createInputPort("In", 
processGroupEntity.getId());
         final PortEntity outputPort = getClientUtil().createOutputPort("Out", 
processGroupEntity.getId());
@@ -94,11 +98,15 @@ public class SingleFlowFileConcurrencyIT extends 
NiFiSystemIT {
 
         // Ensure that 3 FlowFiles are queued up for Terminate
         waitForQueueCount(outputToTerminate.getId(), 3);
+
+        logger.info("Finished test testSingleConcurrency");
     }
 
 
     @Test
     public void testSingleConcurrencyAndBatchOutput() throws 
NiFiClientException, IOException, InterruptedException {
+        logger.info("Beginning test testSingleConcurrencyAndBatchOutput");
+
         final ProcessGroupEntity processGroupEntity = 
getClientUtil().createProcessGroup("My Group", "root");
         final PortEntity inputPort = getClientUtil().createInputPort("In", 
processGroupEntity.getId());
         final PortEntity outputPort = getClientUtil().createOutputPort("Out", 
processGroupEntity.getId());
@@ -163,11 +171,15 @@ public class SingleFlowFileConcurrencyIT extends 
NiFiSystemIT {
         final Map<String, String> secondOutAttributes = 
secondOutFlowFile.getFlowFile().getAttributes();
         assertEquals("1", secondOutAttributes.get("batch.output.Out"));
         assertEquals("1", secondOutAttributes.get("batch.output.Out2"));
+
+        logger.info("Finished test testSingleConcurrencyAndBatchOutput");
     }
 
 
     @Test
     public void testBatchOutputHasCorrectNumbersOnRestart() throws 
NiFiClientException, IOException, InterruptedException {
+        logger.info("Beginning test 
testBatchOutputHasCorrectNumbersOnRestart");
+
         final ProcessGroupEntity processGroupEntity = 
getClientUtil().createProcessGroup("My Group", "root");
         final PortEntity inputPort = getClientUtil().createInputPort("In", 
processGroupEntity.getId());
         final PortEntity outputPort = getClientUtil().createOutputPort("Out", 
processGroupEntity.getId());
@@ -238,6 +250,8 @@ public class SingleFlowFileConcurrencyIT extends 
NiFiSystemIT {
         final Map<String, String> secondOutAttributes = 
secondOutFlowFile.getFlowFile().getAttributes();
         assertEquals("1", secondOutAttributes.get("batch.output.Out"));
         assertEquals("1", secondOutAttributes.get("batch.output.Out2"));
+
+        logger.info("Finished test testBatchOutputHasCorrectNumbersOnRestart");
     }
 
 }

Reply via email to