This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 485ece3300 NIFI-11896 Corrected QuestDB Status Repository Shutdown
handling
485ece3300 is described below
commit 485ece33002d22f9ef0d1f25f1212ecbb7881cce
Author: exceptionfactory <[email protected]>
AuthorDate: Wed Aug 2 16:25:43 2023 -0500
NIFI-11896 Corrected QuestDB Status Repository Shutdown handling
- Updated QuestDB Status Repository shutdown method to cancel scheduled
tasks before immediate shutdown of Scheduled Executor Service
- Updated QuestDB Scheduled Tasks to include initial delay to avoid
unnecessary execution when starting
- Updated QuestDB test class to minimize logging for QuestDB 7
- Improved logging and exception messages
Signed-off-by: Pierre Villard <[email protected]>
This closes #7564.
---
.../history/EmbeddedQuestDbRolloverHandler.java | 8 ++---
.../EmbeddedQuestDbStatusHistoryRepository.java | 40 +++++++++++++++++-----
.../history/questdb/QuestDbDatabaseManager.java | 17 ++++-----
.../history/questdb/QuestDbWritingTemplate.java | 24 ++++++++-----
.../history/storage/BufferedWriterFlushWorker.java | 4 +--
...EmbeddedQuestDbStatusHistoryRepositoryTest.java | 11 ++++++
...dQuestDbStatusHistoryRepositoryForNodeTest.java | 20 ++++++++++-
7 files changed, 92 insertions(+), 32 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbRolloverHandler.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbRolloverHandler.java
index b0437346cc..8c524917db 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbRolloverHandler.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbRolloverHandler.java
@@ -71,9 +71,9 @@ public class EmbeddedQuestDbRolloverHandler implements
Runnable {
@Override
public void run() {
- LOGGER.debug("Starting rollover");
+ LOGGER.debug("Rollover started for Tables {}", tables);
tables.forEach(tableName -> rolloverTable(tableName));
- LOGGER.debug("Finishing rollover");
+ LOGGER.debug("Rollover completed for Tables {}", tables);
}
private void rolloverTable(final CharSequence tableName) {
@@ -89,7 +89,7 @@ public class EmbeddedQuestDbRolloverHandler implements
Runnable {
}
}
} catch (final Exception e) {
- LOGGER.error("Could not rollover table " + tableName, e);
+ LOGGER.error("Rollover failed for table [{}]", tableName, e);
}
}
@@ -98,7 +98,7 @@ public class EmbeddedQuestDbRolloverHandler implements
Runnable {
final CompiledQuery compile =
compiler.compile(String.format(DELETION_QUERY, tableName, partition),
dbContext.getSqlExecutionContext());
compile.execute(new SCSequence(new TimeoutBlockingWaitStrategy(5,
TimeUnit.SECONDS)));
} catch (final Exception e) {
- LOGGER.error("Dropping partition " + partition + " of table " +
tableName + " failed", e);
+ LOGGER.error("Dropping partition [{}] of table [{}] failed",
partition, tableName, e);
}
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbStatusHistoryRepository.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbStatusHistoryRepository.java
index 409b19f279..c478191924 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbStatusHistoryRepository.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbStatusHistoryRepository.java
@@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory;
import java.nio.file.Path;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
@@ -54,6 +55,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
public class EmbeddedQuestDbStatusHistoryRepository implements
StatusHistoryRepository {
@@ -86,6 +88,8 @@ public class EmbeddedQuestDbStatusHistoryRepository
implements StatusHistoryRepo
private final BufferedWriterForStatusStorage<NodeStatus> nodeStatusWriter;
private final BufferedWriterForStatusStorage<GarbageCollectionStatus>
garbageCollectionStatusWriter;
+ private final List<ScheduledFuture<?>> scheduledFutures = new
ArrayList<>();
+
/**
* Default no args constructor for service loading only
*/
@@ -141,7 +145,7 @@ public class EmbeddedQuestDbStatusHistoryRepository
implements StatusHistoryRepo
@Override
public void start() {
- LOGGER.debug("Starting status history repository");
+ LOGGER.debug("Repository start initiated");
final EmbeddedQuestDbRolloverHandler nodeRolloverHandler = new
EmbeddedQuestDbRolloverHandler(QuestDbDatabaseManager.getNodeTableNames(),
daysToKeepNodeData, dbContext);
final EmbeddedQuestDbRolloverHandler componentRolloverHandler = new
EmbeddedQuestDbRolloverHandler(QuestDbDatabaseManager.getComponentTableNames(),
daysToKeepComponentData, dbContext);
@@ -154,19 +158,39 @@ public class EmbeddedQuestDbStatusHistoryRepository
implements StatusHistoryRepo
remoteProcessGroupStatusWriter
));
- scheduledExecutorService.scheduleWithFixedDelay(nodeRolloverHandler,
0, ROLL_FREQUENCY, TimeUnit.MILLISECONDS);
-
scheduledExecutorService.scheduleWithFixedDelay(componentRolloverHandler, 0,
ROLL_FREQUENCY, TimeUnit.MILLISECONDS);
- scheduledExecutorService.scheduleWithFixedDelay(writer, 0,
persistFrequency, TimeUnit.MILLISECONDS);
+ final ScheduledFuture<?> nodeRollerFuture =
scheduledExecutorService.scheduleWithFixedDelay(nodeRolloverHandler,
ROLL_FREQUENCY, ROLL_FREQUENCY, TimeUnit.MILLISECONDS);
+ scheduledFutures.add(nodeRollerFuture);
+
+ final ScheduledFuture<?> componentRolloverFuture =
scheduledExecutorService.scheduleWithFixedDelay(componentRolloverHandler,
ROLL_FREQUENCY, ROLL_FREQUENCY, TimeUnit.MILLISECONDS);
+ scheduledFutures.add(componentRolloverFuture);
+
+ final ScheduledFuture<?> writerFuture =
scheduledExecutorService.scheduleWithFixedDelay(writer, persistFrequency,
persistFrequency, TimeUnit.MILLISECONDS);
+ scheduledFutures.add(writerFuture);
- LOGGER.debug("Status history repository is started");
+ LOGGER.debug("Repository start completed");
}
@Override
public void shutdown() {
- LOGGER.debug("Status history repository started to shut down");
- scheduledExecutorService.shutdown();
+ LOGGER.debug("Repository shutdown started");
+
+ int cancelCompleted = 0;
+ int cancelFailed = 0;
+ for (ScheduledFuture<?> scheduledFuture : scheduledFutures) {
+ final boolean cancelled = scheduledFuture.cancel(true);
+ if (cancelled) {
+ cancelCompleted++;
+ } else {
+ cancelFailed++;
+ }
+ }
+ LOGGER.debug("Repository shutdown task cancellation status: completed
[{}] failed [{}]", cancelCompleted, cancelFailed);
+
+ final List<Runnable> tasks = scheduledExecutorService.shutdownNow();
+ LOGGER.debug("Repository Scheduled Task Service shutdown remaining
tasks [{}]", tasks.size());
+
dbContext.close();
- LOGGER.debug("Status history repository has been shut down");
+ LOGGER.debug("Repository shutdown completed");
}
@Override
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/questdb/QuestDbDatabaseManager.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/questdb/QuestDbDatabaseManager.java
index 420904328f..bdf75c83bb 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/questdb/QuestDbDatabaseManager.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/questdb/QuestDbDatabaseManager.java
@@ -64,7 +64,7 @@ public final class QuestDbDatabaseManager {
public static void checkDatabaseStatus(final Path persistLocation) {
final QuestDbDatabaseManager.DatabaseStatus databaseStatus =
getDatabaseStatus(persistLocation);
- LOGGER.debug("Starting status repository. It's estimated status is
{}", databaseStatus);
+ LOGGER.debug("Starting Status Repository: Database Status [{}]",
databaseStatus);
if (databaseStatus ==
QuestDbDatabaseManager.DatabaseStatus.NON_EXISTING) {
createDatabase(persistLocation);
@@ -120,27 +120,28 @@ public final class QuestDbDatabaseManager {
}
private static boolean checkConnection(final Path persistLocation) {
- final CairoConfiguration configuration = new
DefaultCairoConfiguration(persistLocation.toFile().getAbsolutePath());
+ final String absolutePath = persistLocation.toFile().getAbsolutePath();
+ final CairoConfiguration configuration = new
DefaultCairoConfiguration(absolutePath);
try (
final CairoEngine engine = new CairoEngine(configuration)
) {
- LOGGER.info("Connection to database was successful");
+ LOGGER.info("Database connection successful [{}]", absolutePath);
return true;
} catch (Exception e) {
- LOGGER.error("Error during connection to database", e);
+ LOGGER.error("Database connection failed [{}]", absolutePath, e);
return false;
}
}
private static void createDatabase(final Path persistLocation) {
- LOGGER.info("Creating database");
+ LOGGER.debug("Database creation started [{}]", persistLocation);
final CairoConfiguration configuration;
try {
FileUtils.ensureDirectoryExistAndCanReadAndWrite(persistLocation.toFile());
} catch (final Exception e) {
- throw new RuntimeException("Could not create database folder " +
persistLocation.toAbsolutePath().toString(), e);
+ throw new RuntimeException(String.format("Database directory
creation failed [%s]", persistLocation), e);
}
configuration = new
DefaultCairoConfiguration(persistLocation.toFile().getAbsolutePath());
@@ -163,9 +164,9 @@ public final class QuestDbDatabaseManager {
compiler.compile(QuestDbQueries.CREATE_PROCESSOR_STATUS, context);
compiler.compile(QuestDbQueries.CREATE_COMPONENT_COUNTER, context);
- LOGGER.info("Database is created");
+ LOGGER.info("Database creation completed [{}]", persistLocation);
} catch (final Exception e) {
- throw new RuntimeException("Could not create database!", e);
+ throw new RuntimeException(String.format("Database creation failed
[%s]", persistLocation), e);
}
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/questdb/QuestDbWritingTemplate.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/questdb/QuestDbWritingTemplate.java
index db01d7f38a..36a9fc954f 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/questdb/QuestDbWritingTemplate.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/questdb/QuestDbWritingTemplate.java
@@ -17,6 +17,7 @@
package org.apache.nifi.controller.status.history.questdb;
import io.questdb.cairo.CairoEngine;
+import io.questdb.cairo.TableToken;
import io.questdb.cairo.TableWriter;
import io.questdb.griffin.SqlExecutionContext;
import org.slf4j.Logger;
@@ -53,15 +54,20 @@ public abstract class QuestDbWritingTemplate<T> {
return;
}
- try (
- final TableWriter tableWriter =
engine.getWriter(context.getCairoSecurityContext(),
engine.getTableToken(tableName), "adding rows")
- ) {
- addRows(tableWriter, entries);
- tableWriter.commit();
- } catch (final Exception e) {
- LOGGER.error("Error happened during writing into table " +
tableName, e);
- } finally {
- engine.releaseInactive();
+ final TableToken tableToken = engine.getTableTokenIfExists(tableName);
+ if (tableToken == null) {
+ LOGGER.error("Table Token for table [{}] not found", tableName);
+ } else {
+ try (
+ final TableWriter tableWriter = engine.getWriter(tableToken,
"adding rows")
+ ) {
+ addRows(tableWriter, entries);
+ tableWriter.commit();
+ } catch (final Exception e) {
+ LOGGER.error("Add rows [{}] to table [{}] failed",
entries.size(), tableName, e);
+ } finally {
+ engine.releaseInactive();
+ }
}
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/storage/BufferedWriterFlushWorker.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/storage/BufferedWriterFlushWorker.java
index 18ae7678ef..4ae0c4a556 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/storage/BufferedWriterFlushWorker.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/storage/BufferedWriterFlushWorker.java
@@ -34,9 +34,9 @@ public class BufferedWriterFlushWorker implements Runnable {
@Override
public void run() {
try {
- bufferedWriterList.forEach(bufferedWriter ->
bufferedWriter.flush());
+ bufferedWriterList.forEach(BufferedEntryWriter::flush);
} catch (final Exception e) {
- LOGGER.error("Error happened during calling flush.", e);
+ LOGGER.error("Flush Buffered Writer failed", e);
}
}
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/AbstractEmbeddedQuestDbStatusHistoryRepositoryTest.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/AbstractEmbeddedQuestDbStatusHistoryRepositoryTest.java
index c4da66d282..efb1154d1a 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/AbstractEmbeddedQuestDbStatusHistoryRepositoryTest.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/AbstractEmbeddedQuestDbStatusHistoryRepositoryTest.java
@@ -18,10 +18,12 @@ package org.apache.nifi.controller.status.history;
import org.apache.nifi.util.NiFiProperties;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;
+import java.net.URL;
import java.nio.file.Path;
import java.util.Date;
import java.util.concurrent.TimeUnit;
@@ -42,6 +44,15 @@ public abstract class
AbstractEmbeddedQuestDbStatusHistoryRepositoryTest extends
@TempDir
private Path temporaryDirectory;
+ @BeforeAll
+ public static void setLogging() {
+ final URL logConfUrl =
AbstractEmbeddedQuestDbStatusHistoryRepositoryTest.class.getResource("/log-stdout.conf");
+ if (logConfUrl == null) {
+ throw new IllegalStateException("QuestDB log configuration not
found");
+ }
+ System.setProperty("out", logConfUrl.getPath());
+ }
+
@BeforeEach
public void setUp() throws Exception {
repository = startRepository();
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbStatusHistoryRepositoryForNodeTest.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbStatusHistoryRepositoryForNodeTest.java
index 509e26f880..c7da8b2811 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbStatusHistoryRepositoryForNodeTest.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbStatusHistoryRepositoryForNodeTest.java
@@ -22,6 +22,9 @@ import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class EmbeddedQuestDbStatusHistoryRepositoryForNodeTest extends
AbstractEmbeddedQuestDbStatusHistoryRepositoryTest {
+ private static final long ZERO_BYTES = 0L;
+
+ private static final int ZERO_COUNT = 0;
@Test
public void testReadingEmptyRepository() {
@@ -35,7 +38,9 @@ public class
EmbeddedQuestDbStatusHistoryRepositoryForNodeTest extends AbstractE
@Test
public void testWritingThenReadingComponents() throws Exception {
- repository.capture(givenNodeStatus(), new ProcessGroupStatus(),
givenGarbageCollectionStatuses(INSERTED_AT), INSERTED_AT);
+ final ProcessGroupStatus processGroupStatus = getProcessGroupStatus();
+
+ repository.capture(givenNodeStatus(), processGroupStatus,
givenGarbageCollectionStatuses(INSERTED_AT), INSERTED_AT);
waitUntilPersisted();
final StatusHistory nodeStatusHistory =
repository.getNodeStatusHistory(START, END);
@@ -45,4 +50,17 @@ public class
EmbeddedQuestDbStatusHistoryRepositoryForNodeTest extends AbstractE
assertGc1Status(garbageCollectionHistory.getGarbageCollectionStatuses("gc1"));
assertGc2Status(garbageCollectionHistory.getGarbageCollectionStatuses("gc2"));
}
+
+ private static ProcessGroupStatus getProcessGroupStatus() {
+ final ProcessGroupStatus processGroupStatus = new ProcessGroupStatus();
+ processGroupStatus.setBytesRead(ZERO_BYTES);
+ processGroupStatus.setBytesWritten(ZERO_BYTES);
+ processGroupStatus.setInputCount(ZERO_COUNT);
+ processGroupStatus.setOutputCount(ZERO_COUNT);
+ processGroupStatus.setQueuedCount(ZERO_COUNT);
+ processGroupStatus.setInputContentSize(ZERO_BYTES);
+ processGroupStatus.setOutputContentSize(ZERO_BYTES);
+ processGroupStatus.setQueuedContentSize(ZERO_BYTES);
+ return processGroupStatus;
+ }
}