This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 485ece3300 NIFI-11896 Corrected QuestDB Status Repository Shutdown handling
485ece3300 is described below

commit 485ece33002d22f9ef0d1f25f1212ecbb7881cce
Author: exceptionfactory <[email protected]>
AuthorDate: Wed Aug 2 16:25:43 2023 -0500

    NIFI-11896 Corrected QuestDB Status Repository Shutdown handling
    
    - Updated QuestDB Status Repository shutdown method to cancel scheduled tasks before immediate shutdown of Scheduled Executor Service
    - Updated QuestDB Scheduled Tasks to include initial delay to avoid unnecessary execution when starting
    - Updated QuestDB test class to minimize logging for QuestDB 7
    - Improved logging and exception messages
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #7564.
---
 .../history/EmbeddedQuestDbRolloverHandler.java    |  8 ++---
 .../EmbeddedQuestDbStatusHistoryRepository.java    | 40 +++++++++++++++++-----
 .../history/questdb/QuestDbDatabaseManager.java    | 17 ++++-----
 .../history/questdb/QuestDbWritingTemplate.java    | 24 ++++++++-----
 .../history/storage/BufferedWriterFlushWorker.java |  4 +--
 ...EmbeddedQuestDbStatusHistoryRepositoryTest.java | 11 ++++++
 ...dQuestDbStatusHistoryRepositoryForNodeTest.java | 20 ++++++++++-
 7 files changed, 92 insertions(+), 32 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbRolloverHandler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbRolloverHandler.java
index b0437346cc..8c524917db 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbRolloverHandler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbRolloverHandler.java
@@ -71,9 +71,9 @@ public class EmbeddedQuestDbRolloverHandler implements 
Runnable {
 
     @Override
     public void run() {
-        LOGGER.debug("Starting rollover");
+        LOGGER.debug("Rollover started for Tables {}", tables);
         tables.forEach(tableName -> rolloverTable(tableName));
-        LOGGER.debug("Finishing rollover");
+        LOGGER.debug("Rollover completed for Tables {}", tables);
     }
 
     private void rolloverTable(final CharSequence tableName) {
@@ -89,7 +89,7 @@ public class EmbeddedQuestDbRolloverHandler implements 
Runnable {
                 }
             }
         } catch (final Exception e) {
-            LOGGER.error("Could not rollover table " + tableName, e);
+            LOGGER.error("Rollover failed for table [{}]", tableName, e);
         }
     }
 
@@ -98,7 +98,7 @@ public class EmbeddedQuestDbRolloverHandler implements 
Runnable {
             final CompiledQuery compile = 
compiler.compile(String.format(DELETION_QUERY, tableName, partition), 
dbContext.getSqlExecutionContext());
             compile.execute(new SCSequence(new TimeoutBlockingWaitStrategy(5, 
TimeUnit.SECONDS)));
         } catch (final Exception e) {
-            LOGGER.error("Dropping partition " + partition + " of table " + 
tableName + " failed", e);
+            LOGGER.error("Dropping partition [{}] of table [{}] failed", 
partition, tableName, e);
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbStatusHistoryRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbStatusHistoryRepository.java
index 409b19f279..c478191924 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbStatusHistoryRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbStatusHistoryRepository.java
@@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory;
 import java.nio.file.Path;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
@@ -54,6 +55,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 public class EmbeddedQuestDbStatusHistoryRepository implements 
StatusHistoryRepository {
@@ -86,6 +88,8 @@ public class EmbeddedQuestDbStatusHistoryRepository 
implements StatusHistoryRepo
     private final BufferedWriterForStatusStorage<NodeStatus> nodeStatusWriter;
     private final BufferedWriterForStatusStorage<GarbageCollectionStatus> 
garbageCollectionStatusWriter;
 
+    private final List<ScheduledFuture<?>> scheduledFutures = new 
ArrayList<>();
+
     /**
      * Default no args constructor for service loading only
      */
@@ -141,7 +145,7 @@ public class EmbeddedQuestDbStatusHistoryRepository 
implements StatusHistoryRepo
 
     @Override
     public void start() {
-        LOGGER.debug("Starting status history repository");
+        LOGGER.debug("Repository start initiated");
 
         final EmbeddedQuestDbRolloverHandler nodeRolloverHandler = new 
EmbeddedQuestDbRolloverHandler(QuestDbDatabaseManager.getNodeTableNames(), 
daysToKeepNodeData, dbContext);
         final EmbeddedQuestDbRolloverHandler componentRolloverHandler = new 
EmbeddedQuestDbRolloverHandler(QuestDbDatabaseManager.getComponentTableNames(), 
daysToKeepComponentData, dbContext);
@@ -154,19 +158,39 @@ public class EmbeddedQuestDbStatusHistoryRepository 
implements StatusHistoryRepo
             remoteProcessGroupStatusWriter
         ));
 
-        scheduledExecutorService.scheduleWithFixedDelay(nodeRolloverHandler, 
0, ROLL_FREQUENCY, TimeUnit.MILLISECONDS);
-        
scheduledExecutorService.scheduleWithFixedDelay(componentRolloverHandler, 0, 
ROLL_FREQUENCY, TimeUnit.MILLISECONDS);
-        scheduledExecutorService.scheduleWithFixedDelay(writer, 0, 
persistFrequency, TimeUnit.MILLISECONDS);
+        final ScheduledFuture<?> nodeRollerFuture = 
scheduledExecutorService.scheduleWithFixedDelay(nodeRolloverHandler, 
ROLL_FREQUENCY, ROLL_FREQUENCY, TimeUnit.MILLISECONDS);
+        scheduledFutures.add(nodeRollerFuture);
+
+        final ScheduledFuture<?> componentRolloverFuture = 
scheduledExecutorService.scheduleWithFixedDelay(componentRolloverHandler, 
ROLL_FREQUENCY, ROLL_FREQUENCY, TimeUnit.MILLISECONDS);
+        scheduledFutures.add(componentRolloverFuture);
+
+        final ScheduledFuture<?> writerFuture = 
scheduledExecutorService.scheduleWithFixedDelay(writer, persistFrequency, 
persistFrequency, TimeUnit.MILLISECONDS);
+        scheduledFutures.add(writerFuture);
 
-        LOGGER.debug("Status history repository is started");
+        LOGGER.debug("Repository start completed");
     }
 
     @Override
     public void shutdown() {
-        LOGGER.debug("Status history repository started to shut down");
-        scheduledExecutorService.shutdown();
+        LOGGER.debug("Repository shutdown started");
+
+        int cancelCompleted = 0;
+        int cancelFailed = 0;
+        for (ScheduledFuture<?> scheduledFuture : scheduledFutures) {
+            final boolean cancelled = scheduledFuture.cancel(true);
+            if (cancelled) {
+                cancelCompleted++;
+            } else {
+                cancelFailed++;
+            }
+        }
+        LOGGER.debug("Repository shutdown task cancellation status: completed 
[{}] failed [{}]", cancelCompleted, cancelFailed);
+
+        final List<Runnable> tasks = scheduledExecutorService.shutdownNow();
+        LOGGER.debug("Repository Scheduled Task Service shutdown remaining 
tasks [{}]", tasks.size());
+
         dbContext.close();
-        LOGGER.debug("Status history repository has been shut down");
+        LOGGER.debug("Repository shutdown completed");
     }
 
     @Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/questdb/QuestDbDatabaseManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/questdb/QuestDbDatabaseManager.java
index 420904328f..bdf75c83bb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/questdb/QuestDbDatabaseManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/questdb/QuestDbDatabaseManager.java
@@ -64,7 +64,7 @@ public final class QuestDbDatabaseManager {
 
     public static void checkDatabaseStatus(final Path persistLocation) {
         final QuestDbDatabaseManager.DatabaseStatus databaseStatus = 
getDatabaseStatus(persistLocation);
-        LOGGER.debug("Starting status repository. It's estimated status is 
{}", databaseStatus);
+        LOGGER.debug("Starting Status Repository: Database Status [{}]", 
databaseStatus);
 
         if (databaseStatus == 
QuestDbDatabaseManager.DatabaseStatus.NON_EXISTING) {
             createDatabase(persistLocation);
@@ -120,27 +120,28 @@ public final class QuestDbDatabaseManager {
     }
 
     private static boolean checkConnection(final Path persistLocation) {
-        final CairoConfiguration configuration = new 
DefaultCairoConfiguration(persistLocation.toFile().getAbsolutePath());
+        final String absolutePath = persistLocation.toFile().getAbsolutePath();
+        final CairoConfiguration configuration = new 
DefaultCairoConfiguration(absolutePath);
 
         try (
             final CairoEngine engine = new CairoEngine(configuration)
         ) {
-            LOGGER.info("Connection to database was successful");
+            LOGGER.info("Database connection successful [{}]", absolutePath);
             return true;
         } catch (Exception e) {
-            LOGGER.error("Error during connection to database", e);
+            LOGGER.error("Database connection failed [{}]", absolutePath, e);
             return false;
         }
     }
 
     private static void createDatabase(final Path persistLocation) {
-        LOGGER.info("Creating database");
+        LOGGER.debug("Database creation started [{}]", persistLocation);
         final CairoConfiguration configuration;
 
         try {
             
FileUtils.ensureDirectoryExistAndCanReadAndWrite(persistLocation.toFile());
         } catch (final Exception e) {
-            throw new RuntimeException("Could not create database folder " + 
persistLocation.toAbsolutePath().toString(), e);
+            throw new RuntimeException(String.format("Database directory 
creation failed [%s]", persistLocation), e);
         }
 
         configuration = new 
DefaultCairoConfiguration(persistLocation.toFile().getAbsolutePath());
@@ -163,9 +164,9 @@ public final class QuestDbDatabaseManager {
             compiler.compile(QuestDbQueries.CREATE_PROCESSOR_STATUS, context);
             compiler.compile(QuestDbQueries.CREATE_COMPONENT_COUNTER, context);
 
-            LOGGER.info("Database is created");
+            LOGGER.info("Database creation completed [{}]", persistLocation);
         } catch (final Exception e) {
-            throw new RuntimeException("Could not create database!", e);
+            throw new RuntimeException(String.format("Database creation failed 
[%s]", persistLocation), e);
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/questdb/QuestDbWritingTemplate.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/questdb/QuestDbWritingTemplate.java
index db01d7f38a..36a9fc954f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/questdb/QuestDbWritingTemplate.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/questdb/QuestDbWritingTemplate.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.controller.status.history.questdb;
 
 import io.questdb.cairo.CairoEngine;
+import io.questdb.cairo.TableToken;
 import io.questdb.cairo.TableWriter;
 import io.questdb.griffin.SqlExecutionContext;
 import org.slf4j.Logger;
@@ -53,15 +54,20 @@ public abstract class QuestDbWritingTemplate<T> {
             return;
         }
 
-        try (
-            final TableWriter tableWriter = 
engine.getWriter(context.getCairoSecurityContext(), 
engine.getTableToken(tableName), "adding rows")
-        ) {
-            addRows(tableWriter, entries);
-            tableWriter.commit();
-        } catch (final Exception e) {
-            LOGGER.error("Error happened during writing into table " + 
tableName, e);
-        } finally {
-            engine.releaseInactive();
+        final TableToken tableToken = engine.getTableTokenIfExists(tableName);
+        if (tableToken == null) {
+            LOGGER.error("Table Token for table [{}] not found", tableName);
+        } else {
+            try (
+                final TableWriter tableWriter = engine.getWriter(tableToken, 
"adding rows")
+            ) {
+                addRows(tableWriter, entries);
+                tableWriter.commit();
+            } catch (final Exception e) {
+                LOGGER.error("Add rows [{}] to table [{}] failed", 
entries.size(), tableName, e);
+            } finally {
+                engine.releaseInactive();
+            }
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/storage/BufferedWriterFlushWorker.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/storage/BufferedWriterFlushWorker.java
index 18ae7678ef..4ae0c4a556 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/storage/BufferedWriterFlushWorker.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/storage/BufferedWriterFlushWorker.java
@@ -34,9 +34,9 @@ public class BufferedWriterFlushWorker implements Runnable {
     @Override
     public void run() {
         try {
-            bufferedWriterList.forEach(bufferedWriter -> 
bufferedWriter.flush());
+            bufferedWriterList.forEach(BufferedEntryWriter::flush);
         } catch (final Exception e) {
-            LOGGER.error("Error happened during calling flush.", e);
+            LOGGER.error("Flush Buffered Writer failed", e);
         }
     }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/AbstractEmbeddedQuestDbStatusHistoryRepositoryTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/AbstractEmbeddedQuestDbStatusHistoryRepositoryTest.java
index c4da66d282..efb1154d1a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/AbstractEmbeddedQuestDbStatusHistoryRepositoryTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/AbstractEmbeddedQuestDbStatusHistoryRepositoryTest.java
@@ -18,10 +18,12 @@ package org.apache.nifi.controller.status.history;
 
 import org.apache.nifi.util.NiFiProperties;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.io.TempDir;
 import org.mockito.Mockito;
 
+import java.net.URL;
 import java.nio.file.Path;
 import java.util.Date;
 import java.util.concurrent.TimeUnit;
@@ -42,6 +44,15 @@ public abstract class 
AbstractEmbeddedQuestDbStatusHistoryRepositoryTest extends
     @TempDir
     private Path temporaryDirectory;
 
+    @BeforeAll
+    public static void setLogging() {
+        final URL logConfUrl = 
AbstractEmbeddedQuestDbStatusHistoryRepositoryTest.class.getResource("/log-stdout.conf");
+        if (logConfUrl == null) {
+            throw new IllegalStateException("QuestDB log configuration not 
found");
+        }
+        System.setProperty("out", logConfUrl.getPath());
+    }
+
     @BeforeEach
     public void setUp() throws Exception {
         repository = startRepository();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbStatusHistoryRepositoryForNodeTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbStatusHistoryRepositoryForNodeTest.java
index 509e26f880..c7da8b2811 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbStatusHistoryRepositoryForNodeTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbStatusHistoryRepositoryForNodeTest.java
@@ -22,6 +22,9 @@ import org.junit.jupiter.api.Test;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class EmbeddedQuestDbStatusHistoryRepositoryForNodeTest extends 
AbstractEmbeddedQuestDbStatusHistoryRepositoryTest {
+    private static final long ZERO_BYTES = 0L;
+
+    private static final int ZERO_COUNT = 0;
 
     @Test
     public void testReadingEmptyRepository() {
@@ -35,7 +38,9 @@ public class 
EmbeddedQuestDbStatusHistoryRepositoryForNodeTest extends AbstractE
 
     @Test
     public void testWritingThenReadingComponents() throws Exception {
-        repository.capture(givenNodeStatus(), new ProcessGroupStatus(), 
givenGarbageCollectionStatuses(INSERTED_AT), INSERTED_AT);
+        final ProcessGroupStatus processGroupStatus = getProcessGroupStatus();
+
+        repository.capture(givenNodeStatus(), processGroupStatus, 
givenGarbageCollectionStatuses(INSERTED_AT), INSERTED_AT);
         waitUntilPersisted();
 
         final StatusHistory nodeStatusHistory = 
repository.getNodeStatusHistory(START, END);
@@ -45,4 +50,17 @@ public class 
EmbeddedQuestDbStatusHistoryRepositoryForNodeTest extends AbstractE
         
assertGc1Status(garbageCollectionHistory.getGarbageCollectionStatuses("gc1"));
         
assertGc2Status(garbageCollectionHistory.getGarbageCollectionStatuses("gc2"));
     }
+
+    private static ProcessGroupStatus getProcessGroupStatus() {
+        final ProcessGroupStatus processGroupStatus = new ProcessGroupStatus();
+        processGroupStatus.setBytesRead(ZERO_BYTES);
+        processGroupStatus.setBytesWritten(ZERO_BYTES);
+        processGroupStatus.setInputCount(ZERO_COUNT);
+        processGroupStatus.setOutputCount(ZERO_COUNT);
+        processGroupStatus.setQueuedCount(ZERO_COUNT);
+        processGroupStatus.setInputContentSize(ZERO_BYTES);
+        processGroupStatus.setOutputContentSize(ZERO_BYTES);
+        processGroupStatus.setQueuedContentSize(ZERO_BYTES);
+        return processGroupStatus;
+    }
 }

Reply via email to