exceptionfactory commented on code in PR #8152:
URL: https://github.com/apache/nifi/pull/8152#discussion_r1424815114


##########
nifi-commons/nifi-utils/src/main/java/org/apache/nifi/retry/NoReturnCallable.java:
##########
@@ -14,25 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.controller.status.history.storage;
-
-import org.apache.nifi.controller.status.history.GarbageCollectionHistory;
-import org.apache.nifi.controller.status.history.GarbageCollectionStatus;
-
-import java.time.Instant;
+package org.apache.nifi.retry;
 
 /**
- * Readable status storage for garbage collection status entries.
+ * Represents a portion of callable code without expected return value.
  */
-public interface GarbageCollectionStatusStorage extends 
StatusStorage<GarbageCollectionStatus> {
+@FunctionalInterface
+public interface NoReturnCallable {

Review Comment:
   This seems like a very generic interface. Is it necessary as opposed to the 
standard `Runnable`, or `Callable` with `Void` return?
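
   For illustration, a minimal sketch of how retry logic could accept the 
   standard types directly. `RetryDemo`, `runWithRetry`, and `callWithRetry` 
   are hypothetical names, not part of the PR; the actual retry utilities in 
   `org.apache.nifi.retry` may be shaped differently.

   ```java
   import java.util.concurrent.Callable;

   // Hypothetical helper, not taken from the PR.
   final class RetryDemo {

       // Runnable covers code without a return value and without checked exceptions.
       static int runWithRetry(final Runnable action, final int maxAttempts) {
           if (maxAttempts < 1) {
               throw new IllegalArgumentException("maxAttempts must be at least 1");
           }
           RuntimeException last = null;
           for (int attempt = 1; attempt <= maxAttempts; attempt++) {
               try {
                   action.run();
                   return attempt; // number of attempts that were needed
               } catch (final RuntimeException e) {
                   last = e;
               }
           }
           throw last;
       }

       // Callable<Void> covers code that may throw checked exceptions.
       static int callWithRetry(final Callable<Void> action, final int maxAttempts) throws Exception {
           if (maxAttempts < 1) {
               throw new IllegalArgumentException("maxAttempts must be at least 1");
           }
           Exception last = null;
           for (int attempt = 1; attempt <= maxAttempts; attempt++) {
               try {
                   action.call();
                   return attempt;
               } catch (final Exception e) {
                   last = e;
               }
           }
           throw last;
       }
   }
   ```

   A `Callable<Void>` lambda has to end with `return null;`, which is the main 
   ergonomic cost compared with a dedicated no-return interface.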



##########
nifi-docs/src/main/asciidoc/administration-guide.adoc:
##########
@@ -3538,15 +3538,19 @@ of 576.
 
 ==== Persistent repository
 
-If the value of the property 
`nifi.components.status.repository.implementation` is 
`EmbeddedQuestDbStatusHistoryRepository`, the
-status history data will be stored to the disk in a persistent manner. Data 
will be kept between restarts.
+If the value of the property 
`nifi.components.status.repository.implementation` is 
`org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepository`,
 the
+status history data will be stored to the disk in a persistent manner. Data 
will be kept between restarts. In order to use persistent repository, the 
QuestDB NAR must be re-built with the `include-questdb` profiles enabled.
 
 |====
 |*Property*|*Description*
 |`nifi.status.repository.questdb.persist.node.days`|The number of days the 
node status data (such as Repository disk space free, garbage collection 
information, etc.) will be kept. The default values
 is `14`.
 |`nifi.status.repository.questdb.persist.component.days`|The number of days 
the component status data (i.e., stats for each Processor, Connection, etc.) 
will be kept. The default value is `3`.
 |`nifi.status.repository.questdb.persist.location`|The location of the 
persistent Status History Repository. The default value is 
`./status_repository`.
+|`nifi.status.repository.questdb.persist.location.backup`|The location of the 
database backup in case the database is being corrupted and recreated. The 
default value is `./status_repository_backup`.
+|`nifi.status.repository.questdb.persist.batchsize`|The QuestDb based status 
history repository persists the collected status information in batches. The 
batch size determines the maximum number of persisted status records at a given 
time. The default value is `1000`.

Review Comment:
   How likely is it that this will need to be changed? Is it a case where 
having an internal default is sufficient? I suppose it depends on the volume of 
data, but if it does not need to be changed, it may be better to keep it as an 
internal setting versus a public property.



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb-status-history/src/main/java/org/apache/nifi/controller/status/history/questdb/EmbeddedQuestDbStatusHistoryRepositoryDefinitions.java:
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history.questdb;
+
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.NodeStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
+import org.apache.nifi.controller.status.history.GarbageCollectionStatus;
+import org.apache.nifi.controller.status.history.MetricDescriptor;
+import org.apache.nifi.controller.status.history.NodeStatusDescriptor;
+import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor;
+import org.apache.nifi.controller.status.history.ProcessorStatusDescriptor;
+import 
org.apache.nifi.controller.status.history.RemoteProcessGroupStatusDescriptor;
+import org.apache.nifi.controller.status.history.StandardMetricDescriptor;
+import org.apache.nifi.controller.status.history.StandardStatusSnapshot;
+import org.apache.nifi.questdb.InsertRowDataSource;
+import org.apache.nifi.questdb.QueryResultProcessor;
+import org.apache.nifi.questdb.mapping.RequestMapping;
+import org.apache.nifi.questdb.mapping.RequestMappingBuilder;
+
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+final class EmbeddedQuestDbStatusHistoryRepositoryDefinitions {
+    /**
+     * Date format expected by the storage.
+     */
+    static final String CAPTURE_DATE_FORMAT = "yyyy-MM-dd:HH:mm:ss Z";
+
+    /**
+     * Date formatter for the database fields.
+     */
+    static final DateTimeFormatter DATE_FORMATTER = 
DateTimeFormatter.ofPattern(CAPTURE_DATE_FORMAT).withZone(ZoneId.systemDefault());
+
+    // General component
+
+    static final String COMPONENT_STATUS_QUERY =
+        "SELECT * FROM %s " +
+        "WHERE componentId = '%s' " +
+        "AND capturedAt > to_timestamp('%s', '" + CAPTURE_DATE_FORMAT + "') " +
+        "AND capturedAt < to_timestamp('%s', '" + CAPTURE_DATE_FORMAT + "') " +
+        "ORDER BY capturedAt ASC";
+
+    // Connection
+
+    static final String TABLE_NAME_CONNECTION_STATUS = "connectionStatus";
+
+    static final String CREATE_CONNECTION_STATUS =
+        "CREATE TABLE " + TABLE_NAME_CONNECTION_STATUS + " (" +
+        "capturedAt TIMESTAMP," +
+        "componentId SYMBOL capacity 2000 nocache index capacity 1500," +
+        "inputBytes LONG," +
+        "inputCount LONG," +
+        "outputBytes LONG," +
+        "outputCount LONG," +
+        "queuedBytes LONG," +
+        "queuedCount LONG," +
+        "totalQueuedDuration LONG," +
+        "maxQueuedDuration LONG," +
+        "averageQueuedDuration LONG" +
+        ") TIMESTAMP(capturedAt) PARTITION BY DAY";
+
+    private static final Map<Integer, MetricDescriptor<ConnectionStatus>> 
CONNECTION_METRICS = new HashMap<>() {{
+        put(2, ConnectionStatusDescriptor.INPUT_BYTES.getDescriptor());
+        put(3, ConnectionStatusDescriptor.INPUT_COUNT.getDescriptor());
+        put(4, ConnectionStatusDescriptor.OUTPUT_BYTES.getDescriptor());
+        put(5, ConnectionStatusDescriptor.OUTPUT_COUNT.getDescriptor());
+        put(6, ConnectionStatusDescriptor.QUEUED_BYTES.getDescriptor());
+        put(7, ConnectionStatusDescriptor.QUEUED_COUNT.getDescriptor());
+        put(8, 
ConnectionStatusDescriptor.TOTAL_QUEUED_DURATION.getDescriptor());
+        put(9, ConnectionStatusDescriptor.MAX_QUEUED_DURATION.getDescriptor());
+        put(10, 
ConnectionStatusDescriptor.AVERAGE_QUEUED_DURATION.getDescriptor());
+    }};

Review Comment:
   The anonymous `HashMap` subclass with nested `put` calls (double-brace 
initialization) should not be used. Recommend either `Map.of()` or a static 
initializer. If the order is important, this should also be a `LinkedHashMap`.
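
   A minimal sketch of the two alternatives. `MetricIndexDemo` is a 
   hypothetical class, and plain strings stand in for the `MetricDescriptor` 
   values so the example is self-contained.

   ```java
   import java.util.Collections;
   import java.util.LinkedHashMap;
   import java.util.Map;

   // Hypothetical stand-in for the definitions class; values are placeholders.
   final class MetricIndexDemo {

       // Map.of is immutable and accepts up to 10 key-value pairs,
       // but its iteration order is unspecified.
       static final Map<Integer, String> UNORDERED = Map.of(
               2, "inputBytes",
               3, "inputCount",
               4, "outputBytes");

       // A static initializer with LinkedHashMap keeps insertion order.
       static final Map<Integer, String> ORDERED;

       static {
           final Map<Integer, String> metrics = new LinkedHashMap<>();
           metrics.put(2, "inputBytes");
           metrics.put(3, "inputCount");
           metrics.put(4, "outputBytes");
           ORDERED = Collections.unmodifiableMap(metrics);
       }
   }
   ```

   Both variants avoid creating an anonymous subclass per constant, and both 
   reject later modification.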



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb-status-history/src/main/java/org/apache/nifi/controller/status/history/questdb/BufferedStatusHistoryStorage.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history.questdb;
+
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.NodeStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.history.GarbageCollectionStatus;
+import org.apache.nifi.controller.status.history.StatusSnapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+final class BufferedStatusHistoryStorage implements StatusHistoryStorage {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(BufferedStatusHistoryStorage.class);
+
+    private final String id = UUID.randomUUID().toString();
+    private final List<ScheduledFuture<?>> scheduledFutures = new 
ArrayList<>();
+    private final ScheduledExecutorService scheduledExecutorService = Executors
+            .newScheduledThreadPool(1, new 
BasicThreadFactory.Builder().namingPattern("BufferedStatusHistoryStorage-" + id 
+ "-%d").build());
+
+    private final StatusHistoryStorage storage;
+    private final long persistFrequencyInMs;
+    private final int persistBatchSize;
+
+    private final BlockingQueue<CapturedStatus<NodeStatus>> nodeStatusQueue = 
new LinkedBlockingQueue<>();
+    private final BlockingQueue<CapturedStatus<GarbageCollectionStatus>> 
garbageCollectionStatusQueue = new LinkedBlockingQueue<>();
+    private final BlockingQueue<CapturedStatus<ProcessGroupStatus>> 
processGroupStatusQueue = new LinkedBlockingQueue<>();
+    private final BlockingQueue<CapturedStatus<ConnectionStatus>> 
connectionStatusQueue = new LinkedBlockingQueue<>();
+    private final BlockingQueue<CapturedStatus<RemoteProcessGroupStatus>> 
remoteProcessGroupStatusQueue = new LinkedBlockingQueue<>();
+    private final BlockingQueue<CapturedStatus<ProcessorStatus>> 
processorStatusQueue = new LinkedBlockingQueue<>();
+
+    public BufferedStatusHistoryStorage(final StatusHistoryStorage storage, 
final long persistFrequencyInMs, final int persistBatchSize) {
+        this.storage = storage;
+        this.persistFrequencyInMs = persistFrequencyInMs;
+        this.persistBatchSize = persistBatchSize;
+    }
+
+    @Override
+    public void init() {
+        storage.init();
+        final ScheduledFuture<?> future = 
scheduledExecutorService.scheduleWithFixedDelay(
+                new BufferedStatusHistoryStorageWorker(), 
persistFrequencyInMs, persistFrequencyInMs, TimeUnit.MILLISECONDS);
+        scheduledFutures.add(future);
+        LOGGER.info("Flushing is initiated");
+    }
+
+    @Override
+    public void close() {
+        storage.close();
+        LOGGER.debug("Flushing shutdown started");
+        int cancelCompleted = 0;
+        int cancelFailed = 0;
+
+        for (final ScheduledFuture<?> scheduledFuture : scheduledFutures) {
+            final boolean cancelled = scheduledFuture.cancel(true);
+            if (cancelled) {
+                cancelCompleted++;
+            } else {
+                cancelFailed++;
+            }
+        }
+
+        LOGGER.debug("Flushing shutdown task cancellation status: completed 
[{}] failed [{}]", cancelCompleted, cancelFailed);
+        final List<Runnable> tasks = scheduledExecutorService.shutdownNow();
+        LOGGER.debug(" Scheduled Task Service shutdown remaining tasks [{}]", 
tasks.size());
+
+    }
+
+    @Override
+    public List<StatusSnapshot> getConnectionSnapshots(final String 
componentId, final Date start, final Date end) {
+        return storage.getConnectionSnapshots(componentId, start, end);
+    }
+
+    @Override
+    public List<StatusSnapshot> getProcessGroupSnapshots(final String 
componentId, final Date start, final Date end) {
+        return storage.getProcessGroupSnapshots(componentId, start, end);
+    }
+
+    @Override
+    public List<StatusSnapshot> getRemoteProcessGroupSnapshots(final String 
componentId, final Date start, final Date end) {
+        return storage.getRemoteProcessGroupSnapshots(componentId, start, end);
+    }
+
+    @Override
+    public List<StatusSnapshot> getProcessorSnapshots(final String 
componentId, final Date start, final Date end) {
+        return storage.getProcessorSnapshots(componentId, start, end);
+    }
+
+    @Override
+    public List<StatusSnapshot> getProcessorSnapshotsWithCounters(final String 
componentId, final Date start, final Date end) {
+        return storage.getProcessorSnapshotsWithCounters(componentId, start, 
end);
+    }
+
+    @Override
+    public List<GarbageCollectionStatus> getGarbageCollectionSnapshots(final 
Date start, final Date end) {
+        return storage.getGarbageCollectionSnapshots(start, end);
+    }
+
+    @Override
+    public List<StatusSnapshot> getNodeStatusSnapshots(final Date start, final 
Date end) {
+        return storage.getNodeStatusSnapshots(start, end);
+    }
+
+    @Override
+    public void storeNodeStatuses(final Collection<CapturedStatus<NodeStatus>> 
statuses) {
+        nodeStatusQueue.addAll(statuses);
+    }
+
+    @Override
+    public void storeGarbageCollectionStatuses(final 
Collection<CapturedStatus<GarbageCollectionStatus>> statuses) {
+        garbageCollectionStatusQueue.addAll(statuses);
+    }
+
+    @Override
+    public void storeProcessGroupStatuses(final 
Collection<CapturedStatus<ProcessGroupStatus>> statuses) {
+        processGroupStatusQueue.addAll(statuses);
+    }
+
+    @Override
+    public void storeConnectionStatuses(final 
Collection<CapturedStatus<ConnectionStatus>> statuses) {
+        connectionStatusQueue.addAll(statuses);
+    }
+
+    @Override
+    public void storeRemoteProcessorGroupStatuses(final 
Collection<CapturedStatus<RemoteProcessGroupStatus>> statuses) {
+        remoteProcessGroupStatusQueue.addAll(statuses);
+    }
+
+    @Override
+    public void storeProcessorStatuses(final 
Collection<CapturedStatus<ProcessorStatus>> statuses) {
+        processorStatusQueue.addAll(statuses);
+    }
+
+    private class BufferedStatusHistoryStorageWorker implements Runnable {
+        @Override
+        public void run() {
+            LOGGER.debug("Start flushing");
+            flush(nodeStatusQueue, storage::storeNodeStatuses);
+            flush(garbageCollectionStatusQueue, 
storage::storeGarbageCollectionStatuses);
+            flush(processGroupStatusQueue, storage::storeProcessGroupStatuses);
+            flush(connectionStatusQueue, storage::storeConnectionStatuses);
+            flush(remoteProcessGroupStatusQueue, 
storage::storeRemoteProcessorGroupStatuses);
+            flush(processorStatusQueue, storage::storeProcessorStatuses);
+            LOGGER.debug("Finish flushing");
+        }
+
+        private <T> void flush(final BlockingQueue<T> source, final 
Consumer<Collection<T>> target) {
+            final ArrayList<T> statusEntries = new 
ArrayList<>(persistBatchSize);
+            source.drainTo(statusEntries, persistBatchSize);
+
+            if (!statusEntries.isEmpty()) {
+                try {
+                    target.accept(statusEntries);
+                } catch (final Exception e) {
+                    LOGGER.error("Error during flushing buffered status 
history information: " + e.getMessage(), e);

Review Comment:
   Including `e.getMessage()` is unnecessary since it will be included in the 
stack trace.
   ```suggestion
                       LOGGER.error("Error during flushing buffered status 
history information", e);
   ```



##########
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-status-history-shared/pom.xml:
##########
@@ -30,5 +30,9 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-framework-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>

Review Comment:
   Is this dependency necessary? It would be better to keep the shared module 
limited in terms of dependencies if possible.



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb-status-history/src/main/java/org/apache/nifi/controller/status/history/questdb/CapturedStatus.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history.questdb;
+
+import java.time.Instant;
+import java.util.Objects;
+
+final class CapturedStatus<T> {
+    private final T status;
+    private final Instant capturedAt;

Review Comment:
   Recommend avoiding the `At` suffix and just using `captured` as the property 
name.



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb-status-history/src/main/java/org/apache/nifi/controller/status/history/questdb/EmbeddedQuestDbStatusHistoryRepositoryDefinitions.java:
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history.questdb;
+
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.NodeStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor;
+import org.apache.nifi.controller.status.history.GarbageCollectionStatus;
+import org.apache.nifi.controller.status.history.MetricDescriptor;
+import org.apache.nifi.controller.status.history.NodeStatusDescriptor;
+import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor;
+import org.apache.nifi.controller.status.history.ProcessorStatusDescriptor;
+import 
org.apache.nifi.controller.status.history.RemoteProcessGroupStatusDescriptor;
+import org.apache.nifi.controller.status.history.StandardMetricDescriptor;
+import org.apache.nifi.controller.status.history.StandardStatusSnapshot;
+import org.apache.nifi.questdb.InsertRowDataSource;
+import org.apache.nifi.questdb.QueryResultProcessor;
+import org.apache.nifi.questdb.mapping.RequestMapping;
+import org.apache.nifi.questdb.mapping.RequestMappingBuilder;
+
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+final class EmbeddedQuestDbStatusHistoryRepositoryDefinitions {
+    /**
+     * Date format expected by the storage.
+     */
+    static final String CAPTURE_DATE_FORMAT = "yyyy-MM-dd:HH:mm:ss Z";

Review Comment:
   Is this the standard timestamp format for QuestDB queries, as opposed to ISO 
8601, which uses `T` to separate the year-month-day from `hour-minute-second` 
and has no space before the timezone?
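
   To make the difference concrete, a small sketch comparing the two formats 
   with `java.time`. `TimestampFormatDemo` is a hypothetical class, and it 
   uses UTC instead of `ZoneId.systemDefault()` so the output is 
   deterministic. It only shows what each formatter produces; it does not 
   establish which format QuestDB `to_timestamp` expects.

   ```java
   import java.time.Instant;
   import java.time.ZoneOffset;
   import java.time.format.DateTimeFormatter;
   import java.util.Locale;

   // Hypothetical comparison class, not part of the PR.
   final class TimestampFormatDemo {

       // Pattern from the PR: colon between date and time, space before the offset.
       static final DateTimeFormatter PR_FORMAT = DateTimeFormatter
               .ofPattern("yyyy-MM-dd:HH:mm:ss Z", Locale.ROOT)
               .withZone(ZoneOffset.UTC);

       // ISO 8601: 'T' between date and time, offset directly after the seconds.
       static final DateTimeFormatter ISO_FORMAT = DateTimeFormatter
               .ISO_OFFSET_DATE_TIME
               .withZone(ZoneOffset.UTC);

       static String formatWithPrPattern(final Instant instant) {
           return PR_FORMAT.format(instant);
       }

       static String formatAsIso(final Instant instant) {
           return ISO_FORMAT.format(instant);
       }
   }
   ```

   For `Instant.parse("2023-12-13T10:15:30Z")`, the first method returns 
   `2023-12-13:10:15:30 +0000` and the second returns `2023-12-13T10:15:30Z`.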



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/pom.xml:
##########
@@ -0,0 +1,54 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-questdb-bundle</artifactId>
+        <version>2.0.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-questdb</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.questdb</groupId>
+            <artifactId>questdb</artifactId>
+            <version>7.3.5</version>

Review Comment:
   The latest version in the main branch is 7.3.7, so this should match that 
version.
   ```suggestion
               <version>7.3.7</version>
   ```



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/main/java/org/apache/nifi/questdb/embedded/EmbeddedClient.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.questdb.embedded;
+
+import io.questdb.cairo.CairoEngine;
+import io.questdb.cairo.CairoError;
+import io.questdb.cairo.TableToken;
+import io.questdb.cairo.TableWriter;
+import io.questdb.cairo.sql.RecordCursor;
+import io.questdb.cairo.sql.RecordCursorFactory;
+import io.questdb.griffin.CompiledQuery;
+import io.questdb.griffin.SqlCompiler;
+import io.questdb.griffin.SqlCompilerFactoryImpl;
+import io.questdb.griffin.SqlException;
+import io.questdb.griffin.SqlExecutionContext;
+import io.questdb.mp.SCSequence;
+import io.questdb.mp.TimeoutBlockingWaitStrategy;
+import org.apache.nifi.questdb.Client;
+import org.apache.nifi.questdb.DatabaseException;
+import org.apache.nifi.questdb.InsertRowDataSource;
+import org.apache.nifi.questdb.QueryResultProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+
+final class EmbeddedClient implements Client {
+    private final static Logger LOGGER = 
LoggerFactory.getLogger(EmbeddedClient.class);
+
+    private final Supplier<CairoEngine> engine;
+    private final AtomicBoolean disconnected = new AtomicBoolean(false);
+
+    EmbeddedClient(final Supplier<CairoEngine> engine) {
+        this.engine = engine;
+    }
+
+    @Override
+    public void execute(final String query) throws DatabaseException {
+        checkConnectionState();
+
+        try (final SqlCompiler compiler = getCompiler()) {
+            final CompiledQuery compile = compiler.compile(query, 
getSqlExecutionContext());
+            compile.execute(new SCSequence(new TimeoutBlockingWaitStrategy(5, 
TimeUnit.SECONDS)));
+        } catch (final SqlException | CairoError e) {
+            throw new DatabaseException(e);
+        }
+    }
+
+    @Override
+    public void insert(
+        final String tableName,
+        final InsertRowDataSource rowDataSource
+    ) throws DatabaseException {
+        checkConnectionState();
+
+        if (!rowDataSource.hasNextToInsert()) {
+            LOGGER.debug("No rows to insert into {}", tableName);
+            return;
+        }
+
+        final TableToken tableToken = 
engine.get().getTableTokenIfExists(tableName);
+
+        if (tableToken == null) {
+            throw new DatabaseException(String.format("Table Token for table 
[%s] not found", tableName));
+        }
+
+        try (
+            final TableWriter tableWriter = engine.get().getWriter(tableToken, 
"adding rows")
+        ) {
+            final TableWriterBasedInsertRowContext context = new 
TableWriterBasedInsertRowContext(tableWriter);
+
+            while (rowDataSource.hasNextToInsert()) {
+                context.addRow(rowDataSource);
+            }
+
+            LOGGER.debug("Committing {} rows", tableWriter.getRowCount());
+            tableWriter.commit();
+        } catch (final Exception | CairoError e) {
+            // CairoError might be thrown in extreme cases, for example when 
no space left on the disk
+            LOGGER.error("Add rows to table [{}] failed", tableName, e);
+            throw new DatabaseException(e);

Review Comment:
   Is there a reason for both logging the error and throwing the exception? 
The caller should catch and log the exception, so logging here as well 
duplicates the entry.
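
   A minimal sketch of the wrap-and-rethrow approach, where only the caller 
   logs. Everything here is a stand-in: `LogOnceDemo`, the nested 
   `DatabaseException` with a `(String, Throwable)` constructor, and the `LOG` 
   list that replaces a real logger are assumptions for illustration.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical illustration, not the PR's EmbeddedClient.
   final class LogOnceDemo {

       // Stand-in for org.apache.nifi.questdb.DatabaseException.
       static final class DatabaseException extends Exception {
           DatabaseException(final String message, final Throwable cause) {
               super(message, cause);
           }
       }

       // Stand-in for a logger so the example has no external dependencies.
       static final List<String> LOG = new ArrayList<>();

       // Lower layer: adds context to the exception and rethrows without logging.
       static void insert(final String tableName, final boolean fail) throws DatabaseException {
           try {
               if (fail) {
                   throw new IllegalStateException("disk full");
               }
           } catch (final RuntimeException e) {
               throw new DatabaseException(String.format("Add rows to table [%s] failed", tableName), e);
           }
       }

       // Caller: the single place where the failure is logged.
       static void flush(final String tableName, final boolean fail) {
           try {
               insert(tableName, fail);
           } catch (final DatabaseException e) {
               LOG.add(e.getMessage() + ": " + e.getCause().getMessage());
           }
       }
   }
   ```

   Moving the table name into the exception message keeps the context that 
   the removed log statement carried.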



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/main/java/org/apache/nifi/questdb/embedded/ClientIsDisconnectedException.java:
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.questdb.embedded;
+
+import org.apache.nifi.questdb.DatabaseException;
+
+final class ClientIsDisconnectedException extends DatabaseException {

Review Comment:
   Recommend adjusting the name to `ClientDisconnectedException`



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb-status-history/src/main/java/org/apache/nifi/controller/status/history/questdb/StatusHistoryStorage.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history.questdb;
+
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.NodeStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.history.GarbageCollectionStatus;
+import org.apache.nifi.controller.status.history.StatusSnapshot;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+
+interface StatusHistoryStorage {
+
+    default void init() {};
+    default void close() {};
+
+    List<StatusSnapshot> getConnectionSnapshots(final String componentId, 
final Date start, final Date end);

Review Comment:
   Is there a reason for using `java.util.Date` as opposed to 
`java.time.Instant` in this new interface definition?
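
   For illustration, a minimal sketch of what working with `java.time.Instant` 
   could look like at this boundary. The class `TimeRangeSketch` and its methods 
   are hypothetical and not part of this PR; only `Date.toInstant()`, 
   `Instant.minus()`, `Instant.isBefore()`, and `Instant.isAfter()` are standard 
   JDK calls.

   ```java
   import java.time.Instant;
   import java.time.temporal.ChronoUnit;
   import java.util.Date;

   // Hypothetical helper, for illustration only; none of these names come from the PR.
   final class TimeRangeSketch {

       private TimeRangeSketch() {
       }

       // Callers that still hold a java.util.Date can convert once at the boundary.
       static Instant toInstant(final Date date) {
           return date.toInstant();
       }

       // Instant arithmetic names its unit explicitly instead of relying on raw milliseconds.
       static Instant oneDayBefore(final Instant end) {
           return end.minus(1, ChronoUnit.DAYS);
       }

       // Inclusive range check: true when start <= timestamp <= end.
       static boolean isWithin(final Instant timestamp, final Instant start, final Instant end) {
           return !timestamp.isBefore(start) && !timestamp.isAfter(end);
       }
   }
   ```

   `Instant` is immutable, so values passed through the interface cannot be 
   changed by the callee, which is not guaranteed with `java.util.Date`.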



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb-status-history/src/main/java/org/apache/nifi/controller/status/history/questdb/QuestDbStatusHistoryStorage.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history.questdb;
+
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.NodeStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.history.GarbageCollectionStatus;
+import org.apache.nifi.controller.status.history.StandardMetricDescriptor;
+import org.apache.nifi.controller.status.history.StandardStatusSnapshot;
+import org.apache.nifi.controller.status.history.StatusSnapshot;
+import org.apache.nifi.questdb.Client;
+import org.apache.nifi.questdb.DatabaseException;
+import org.apache.nifi.questdb.InsertRowDataSource;
+import org.apache.nifi.questdb.QueryResultProcessor;
+import org.apache.nifi.questdb.mapping.RequestMapping;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.COMPONENT_STATUS_QUERY;
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.CONNECTION_STATUS_REQUEST_MAPPING;
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.NODE_STATUS_QUERY;
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.PROCESSOR_STATUS_REQUEST_MAPPING;
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.PROCESS_GROUP_STATUS_REQUEST_MAPPING;
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.REMOTE_PROCESS_GROUP_STATUS_REQUEST_MAPPING;
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.STATUS_QUERY_GARBAGE_COLLECTION;
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.STORAGE_STATUS_QUERY;
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.TABLE_NAME_COMPONENT_COUNTER;
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.TABLE_NAME_CONNECTION_STATUS;
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.TABLE_NAME_GARBAGE_COLLECTION_STATUS;
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.TABLE_NAME_NODE_STATUS;
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.TABLE_NAME_PROCESSOR_STATUS;
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.TABLE_NAME_PROCESS_GROUP_STATUS;
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.TABLE_NAME_REMOTE_PROCESS_GROUP_STATUS;
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.TABLE_NAME_STORAGE_STATUS;
+
+final class QuestDbStatusHistoryStorage implements StatusHistoryStorage {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(QuestDbStatusHistoryStorage.class);
+
+    private final Client client;
+
+    QuestDbStatusHistoryStorage(final Client client) {
+        this.client = client;
+    }
+
+    @Override
+    public List<StatusSnapshot> getConnectionSnapshots(final String 
componentId, final Date start, final Date end) {
+        return getComponentSnapshots(TABLE_NAME_CONNECTION_STATUS, 
componentId, CONNECTION_STATUS_REQUEST_MAPPING, start, end);
+    }
+
+    @Override
+    public List<StatusSnapshot> getProcessGroupSnapshots(final String 
componentId, final Date start, final Date end) {
+        return getComponentSnapshots(TABLE_NAME_PROCESS_GROUP_STATUS, 
componentId, PROCESS_GROUP_STATUS_REQUEST_MAPPING, start, end);
+    }
+
+    @Override
+    public List<StatusSnapshot> getRemoteProcessGroupSnapshots(final String 
componentId, final Date start, final Date end) {
+        return getComponentSnapshots(TABLE_NAME_REMOTE_PROCESS_GROUP_STATUS, 
componentId, REMOTE_PROCESS_GROUP_STATUS_REQUEST_MAPPING, start, end);
+    }
+
+    @Override
+    public List<StatusSnapshot> getProcessorSnapshots(final String 
componentId, final Date start, final Date end) {
+        return getComponentSnapshots(TABLE_NAME_PROCESSOR_STATUS, componentId, 
PROCESSOR_STATUS_REQUEST_MAPPING, start, end);
+    }
+
+    @Override
+    public List<StatusSnapshot> getProcessorSnapshotsWithCounters(final String 
componentId, final Date start, final Date end) {
+        final List<StatusSnapshot> componentSnapshots = 
getComponentSnapshots(TABLE_NAME_PROCESSOR_STATUS, componentId, 
PROCESSOR_STATUS_REQUEST_MAPPING, start, end);
+        final String query = String.format(COMPONENT_STATUS_QUERY, 
TABLE_NAME_COMPONENT_COUNTER, componentId, getStartTime(start), 
getEndTime(end));
+        return getResult(query, new 
CounterStatisticsResultProcessor(componentSnapshots), Collections.emptyList());
+    }
+
+    @Override
+    public List<GarbageCollectionStatus> getGarbageCollectionSnapshots(final 
Date start, final Date end) {
+        final String query = String.format(STATUS_QUERY_GARBAGE_COLLECTION, 
getStartTime(start), getEndTime(end));
+        return getResult(query, new GarbageCollectionResultProcessor(), 
Collections.emptyList());
+    }
+
+    @Override
+    public List<StatusSnapshot> getNodeStatusSnapshots(final Date start, final 
Date end) {
+        final String storageStatusQuery = String.format(STORAGE_STATUS_QUERY, 
getStartTime(start), getEndTime(end));
+        final Map<Long, Map<StandardMetricDescriptor<NodeStatus>, Long>> 
statusMetricsByTime
+            = getResult(storageStatusQuery, 
EmbeddedQuestDbStatusHistoryRepositoryDefinitions.getStorageStatusResultProcessor(),
 new HashMap<>());
+        final String nodeStatusQuery = String.format(NODE_STATUS_QUERY, 
getStartTime(start), getEndTime(end));
+        return getSnapshot(nodeStatusQuery, 
EmbeddedQuestDbStatusHistoryRepositoryDefinitions.getNodeStatusResultProcessor(statusMetricsByTime));
+    }
+
+    @Override
+    public void storeNodeStatuses(final Collection<CapturedStatus<NodeStatus>> 
statuses) {
+        store(TABLE_NAME_NODE_STATUS, 
EmbeddedQuestDbStatusHistoryRepositoryDefinitions.getNodeStatusDataSource(statuses));
+        store(TABLE_NAME_STORAGE_STATUS, 
StorateStatusDataSource.getInstance(statuses));
+    }
+
+    @Override
+    public void storeGarbageCollectionStatuses(final 
Collection<CapturedStatus<GarbageCollectionStatus>> statuses) {
+        store(TABLE_NAME_GARBAGE_COLLECTION_STATUS, 
EmbeddedQuestDbStatusHistoryRepositoryDefinitions.getGarbageCollectionStatusDataSource(statuses));
+    }
+
+    @Override
+    public void storeProcessGroupStatuses(final 
Collection<CapturedStatus<ProcessGroupStatus>> statuses) {
+        store(TABLE_NAME_PROCESS_GROUP_STATUS, 
EmbeddedQuestDbStatusHistoryRepositoryDefinitions.getProcessGroupStatusDataSource(statuses));
+    }
+
+    @Override
+    public void storeConnectionStatuses(final 
Collection<CapturedStatus<ConnectionStatus>> statuses) {
+        store(TABLE_NAME_CONNECTION_STATUS, 
EmbeddedQuestDbStatusHistoryRepositoryDefinitions.getConnectionStatusDataSource(statuses));
+    }
+
+    @Override
+    public void storeRemoteProcessorGroupStatuses(final 
Collection<CapturedStatus<RemoteProcessGroupStatus>> statuses) {
+        store(TABLE_NAME_REMOTE_PROCESS_GROUP_STATUS, 
EmbeddedQuestDbStatusHistoryRepositoryDefinitions.getRemoteProcessGroupStatusDataSource(statuses));
+    }
+
+    @Override
+    public void storeProcessorStatuses(final 
Collection<CapturedStatus<ProcessorStatus>> statuses) {
+        store(TABLE_NAME_PROCESSOR_STATUS, 
EmbeddedQuestDbStatusHistoryRepositoryDefinitions.getProcessorStatusDataSource(statuses));
+        store(TABLE_NAME_COMPONENT_COUNTER, 
EmbeddedQuestDbStatusHistoryRepositoryDefinitions.getCounterStatisticsDataSource(statuses));
+    }
+
+    private <T> void store(final String tableName, final InsertRowDataSource 
source) {
+        try {
+            client.insert(tableName, source);
+        } catch (final DatabaseException e) {
+            LOGGER.error("Error during storing snapshots to " + tableName, e);
+        }
+    }
+
+    private List<StatusSnapshot> getComponentSnapshots(final String tableName, 
final String componentId, final RequestMapping<StandardStatusSnapshot> mapping, 
final Date start, final Date end) {
+        final String query = String.format(COMPONENT_STATUS_QUERY, tableName, 
componentId, getStartTime(start), getEndTime(end));
+        return getSnapshot(query, RequestMapping.getResultProcessor(mapping));
+    }
+
+    private List<StatusSnapshot> getSnapshot(final String query, final 
QueryResultProcessor<List<StandardStatusSnapshot>> rowProcessor) {
+        return new ArrayList<>(getResult(query, rowProcessor,  
Collections.emptyList()));
+    }
+
+    private <T> T getResult(final String query, final QueryResultProcessor<T> 
rowProcessor, final T errorResult) {
+        try {
+            return client.query(query, rowProcessor);
+        } catch (final DatabaseException e) {
+            LOGGER.error("Error during returning results for query " + query, 
e);

Review Comment:
   Log statements should always use placeholder values instead of string 
concatenation.
   ```suggestion
               LOGGER.error("Error during returning results for query {}", 
query, e);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
