exceptionfactory commented on code in PR #8152: URL: https://github.com/apache/nifi/pull/8152#discussion_r1424815114
########## nifi-commons/nifi-utils/src/main/java/org/apache/nifi/retry/NoReturnCallable.java: ########## @@ -14,25 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.controller.status.history.storage; - -import org.apache.nifi.controller.status.history.GarbageCollectionHistory; -import org.apache.nifi.controller.status.history.GarbageCollectionStatus; - -import java.time.Instant; +package org.apache.nifi.retry; /** - * Readable status storage for garbage collection status entries. + * Represents a portion of callable code without expected return value. */ -public interface GarbageCollectionStatusStorage extends StatusStorage<GarbageCollectionStatus> { +@FunctionalInterface +public interface NoReturnCallable { Review Comment: There seems like a very generic interface. Is it necessary as opposed to the standard `Runnable`, or `Callable` with `Void` return? ########## nifi-docs/src/main/asciidoc/administration-guide.adoc: ########## @@ -3538,15 +3538,19 @@ of 576. ==== Persistent repository -If the value of the property `nifi.components.status.repository.implementation` is `EmbeddedQuestDbStatusHistoryRepository`, the -status history data will be stored to the disk in a persistent manner. Data will be kept between restarts. +If the value of the property `nifi.components.status.repository.implementation` is `org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepository`, the +status history data will be stored to the disk in a persistent manner. Data will be kept between restarts. In order to use persistent repository, the QuestDB NAR must be re-built with the `include-questdb` profiles enabled. |==== |*Property*|*Description* |`nifi.status.repository.questdb.persist.node.days`|The number of days the node status data (such as Repository disk space free, garbage collection information, etc.) will be kept. The default values is `14`. |`nifi.status.repository.questdb.persist.component.days`|The number of days the component status data (i.e., stats for each Processor, Connection, etc.) will be kept. The default value is `3`. |`nifi.status.repository.questdb.persist.location`|The location of the persistent Status History Repository. The default value is `./status_repository`. +|`nifi.status.repository.questdb.persist.location.backup`|The location of the database backup in case the database is being corrupted and recreated. The default value is `./status_repository_backup`. +|`nifi.status.repository.questdb.persist.batchsize`|The QuestDb based status history repository persists the collected status information in batches. The batch size determines the maximum number of persisted status records at a given time. The default value is `1000`. Review Comment: How likely is it that this will need to be changed? Is it a case where having an internal default is sufficient? I suppose it depends on the volume of data, but if it does not need to be changed, it may be better to keep it as an internal setting versus a public property. ########## nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb-status-history/src/main/java/org/apache/nifi/controller/status/history/questdb/EmbeddedQuestDbStatusHistoryRepositoryDefinitions.java: ########## @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.status.history.questdb; + +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.controller.status.NodeStatus; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.controller.status.RemoteProcessGroupStatus; +import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor; +import org.apache.nifi.controller.status.history.GarbageCollectionStatus; +import org.apache.nifi.controller.status.history.MetricDescriptor; +import org.apache.nifi.controller.status.history.NodeStatusDescriptor; +import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor; +import org.apache.nifi.controller.status.history.ProcessorStatusDescriptor; +import org.apache.nifi.controller.status.history.RemoteProcessGroupStatusDescriptor; +import org.apache.nifi.controller.status.history.StandardMetricDescriptor; +import org.apache.nifi.controller.status.history.StandardStatusSnapshot; +import org.apache.nifi.questdb.InsertRowDataSource; +import org.apache.nifi.questdb.QueryResultProcessor; +import org.apache.nifi.questdb.mapping.RequestMapping; +import org.apache.nifi.questdb.mapping.RequestMappingBuilder; + +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +final class EmbeddedQuestDbStatusHistoryRepositoryDefinitions { + /** + * Date format expected by the storage. + */ + static final String CAPTURE_DATE_FORMAT = "yyyy-MM-dd:HH:mm:ss Z"; + + /** + * Date formatter for the database fields. + */ + static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern(CAPTURE_DATE_FORMAT).withZone(ZoneId.systemDefault()); + + // General component + + static final String COMPONENT_STATUS_QUERY = + "SELECT * FROM %s " + + "WHERE componentId = '%s' " + + "AND capturedAt > to_timestamp('%s', '" + CAPTURE_DATE_FORMAT + "') " + + "AND capturedAt < to_timestamp('%s', '" + CAPTURE_DATE_FORMAT + "') " + + "ORDER BY capturedAt ASC"; + + // Connection + + static final String TABLE_NAME_CONNECTION_STATUS = "connectionStatus"; + + static final String CREATE_CONNECTION_STATUS = + "CREATE TABLE " + TABLE_NAME_CONNECTION_STATUS + " (" + + "capturedAt TIMESTAMP," + + "componentId SYMBOL capacity 2000 nocache index capacity 1500," + + "inputBytes LONG," + + "inputCount LONG," + + "outputBytes LONG," + + "outputCount LONG," + + "queuedBytes LONG," + + "queuedCount LONG," + + "totalQueuedDuration LONG," + + "maxQueuedDuration LONG," + + "averageQueuedDuration LONG" + + ") TIMESTAMP(capturedAt) PARTITION BY DAY"; + + private static final Map<Integer, MetricDescriptor<ConnectionStatus>> CONNECTION_METRICS = new HashMap<>() {{ + put(2, ConnectionStatusDescriptor.INPUT_BYTES.getDescriptor()); + put(3, ConnectionStatusDescriptor.INPUT_COUNT.getDescriptor()); + put(4, ConnectionStatusDescriptor.OUTPUT_BYTES.getDescriptor()); + put(5, ConnectionStatusDescriptor.OUTPUT_COUNT.getDescriptor()); + put(6, ConnectionStatusDescriptor.QUEUED_BYTES.getDescriptor()); + put(7, ConnectionStatusDescriptor.QUEUED_COUNT.getDescriptor()); + put(8, ConnectionStatusDescriptor.TOTAL_QUEUED_DURATION.getDescriptor()); + put(9, ConnectionStatusDescriptor.MAX_QUEUED_DURATION.getDescriptor()); + put(10, ConnectionStatusDescriptor.AVERAGE_QUEUED_DURATION.getDescriptor()); + }}; Review Comment: The anonymous HashMap creation with nested `put` should not be used. Recommend either `Map.of()` or using a static initializer. If the order is important, this should also be a `LinkedHashMap`. ########## nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb-status-history/src/main/java/org/apache/nifi/controller/status/history/questdb/BufferedStatusHistoryStorage.java: ########## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.status.history.questdb; + +import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.controller.status.NodeStatus; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.controller.status.RemoteProcessGroupStatus; +import org.apache.nifi.controller.status.history.GarbageCollectionStatus; +import org.apache.nifi.controller.status.history.StatusSnapshot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +final class BufferedStatusHistoryStorage implements StatusHistoryStorage { + private static final Logger LOGGER = LoggerFactory.getLogger(BufferedStatusHistoryStorage.class); + + private final String id = UUID.randomUUID().toString(); + private final List<ScheduledFuture<?>> scheduledFutures = new ArrayList<>(); + private final ScheduledExecutorService scheduledExecutorService = Executors + .newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("BufferedStatusHistoryStorage-" + id + "-%d").build()); + + private final StatusHistoryStorage storage; + private final long persistFrequencyInMs; + private final int persistBatchSize; + + private final BlockingQueue<CapturedStatus<NodeStatus>> nodeStatusQueue = new LinkedBlockingQueue<>(); + private final BlockingQueue<CapturedStatus<GarbageCollectionStatus>> garbageCollectionStatusQueue = new LinkedBlockingQueue<>(); + private final BlockingQueue<CapturedStatus<ProcessGroupStatus>> processGroupStatusQueue = new LinkedBlockingQueue<>(); + private final BlockingQueue<CapturedStatus<ConnectionStatus>> connectionStatusQueue = new LinkedBlockingQueue<>(); + private final BlockingQueue<CapturedStatus<RemoteProcessGroupStatus>> remoteProcessGroupStatusQueue = new LinkedBlockingQueue<>(); + private final BlockingQueue<CapturedStatus<ProcessorStatus>> processorStatusQueue = new LinkedBlockingQueue<>(); + + public BufferedStatusHistoryStorage(final StatusHistoryStorage storage, final long persistFrequencyInMs, final int persistBatchSize) { + this.storage = storage; + this.persistFrequencyInMs = persistFrequencyInMs; + this.persistBatchSize = persistBatchSize; + } + + @Override + public void init() { + storage.init(); + final ScheduledFuture<?> future = scheduledExecutorService.scheduleWithFixedDelay( + new BufferedStatusHistoryStorageWorker(), persistFrequencyInMs, persistFrequencyInMs, TimeUnit.MILLISECONDS); + scheduledFutures.add(future); + LOGGER.info("Flushing is initiated"); + } + + @Override + public void close() { + storage.close(); + LOGGER.debug("Flushing shutdown started"); + int cancelCompleted = 0; + int cancelFailed = 0; + + for (final ScheduledFuture<?> scheduledFuture : scheduledFutures) { + final boolean cancelled = scheduledFuture.cancel(true); + if (cancelled) { + cancelCompleted++; + } else { + cancelFailed++; + } + } + + LOGGER.debug("Flushing shutdown task cancellation status: completed [{}] failed [{}]", cancelCompleted, cancelFailed); + final List<Runnable> tasks = scheduledExecutorService.shutdownNow(); + LOGGER.debug(" Scheduled Task Service shutdown remaining tasks [{}]", tasks.size()); + + } + + @Override + public List<StatusSnapshot> getConnectionSnapshots(final String componentId, final Date start, final Date end) { + return storage.getConnectionSnapshots(componentId, start, end); + } + + @Override + public List<StatusSnapshot> getProcessGroupSnapshots(final String componentId, final Date start, final Date end) { + return storage.getProcessGroupSnapshots(componentId, start, end); + } + + @Override + public List<StatusSnapshot> getRemoteProcessGroupSnapshots(final String componentId, final Date start, final Date end) { + return storage.getRemoteProcessGroupSnapshots(componentId, start, end); + } + + @Override + public List<StatusSnapshot> getProcessorSnapshots(final String componentId, final Date start, final Date end) { + return storage.getProcessorSnapshots(componentId, start, end); + } + + @Override + public List<StatusSnapshot> getProcessorSnapshotsWithCounters(final String componentId, final Date start, final Date end) { + return storage.getProcessorSnapshotsWithCounters(componentId, start, end); + } + + @Override + public List<GarbageCollectionStatus> getGarbageCollectionSnapshots(final Date start, final Date end) { + return storage.getGarbageCollectionSnapshots(start, end); + } + + @Override + public List<StatusSnapshot> getNodeStatusSnapshots(final Date start, final Date end) { + return storage.getNodeStatusSnapshots(start, end); + } + + @Override + public void storeNodeStatuses(final Collection<CapturedStatus<NodeStatus>> statuses) { + nodeStatusQueue.addAll(statuses); + } + + @Override + public void storeGarbageCollectionStatuses(final Collection<CapturedStatus<GarbageCollectionStatus>> statuses) { + garbageCollectionStatusQueue.addAll(statuses); + } + + @Override + public void storeProcessGroupStatuses(final Collection<CapturedStatus<ProcessGroupStatus>> statuses) { + processGroupStatusQueue.addAll(statuses); + } + + @Override + public void storeConnectionStatuses(final Collection<CapturedStatus<ConnectionStatus>> statuses) { + connectionStatusQueue.addAll(statuses); + } + + @Override + public void storeRemoteProcessorGroupStatuses(final Collection<CapturedStatus<RemoteProcessGroupStatus>> statuses) { + remoteProcessGroupStatusQueue.addAll(statuses); + } + + @Override + public void storeProcessorStatuses(final Collection<CapturedStatus<ProcessorStatus>> statuses) { + processorStatusQueue.addAll(statuses); + } + + private class BufferedStatusHistoryStorageWorker implements Runnable { + @Override + public void run() { + LOGGER.debug("Start flushing"); + flush(nodeStatusQueue, storage::storeNodeStatuses); + flush(garbageCollectionStatusQueue, storage::storeGarbageCollectionStatuses); + flush(processGroupStatusQueue, storage::storeProcessGroupStatuses); + flush(connectionStatusQueue, storage::storeConnectionStatuses); + flush(remoteProcessGroupStatusQueue, storage::storeRemoteProcessorGroupStatuses); + flush(processorStatusQueue, storage::storeProcessorStatuses); + LOGGER.debug("Finish flushing"); + } + + private <T> void flush(final BlockingQueue<T> source, final Consumer<Collection<T>> target) { + final ArrayList<T> statusEntries = new ArrayList<>(persistBatchSize); + source.drainTo(statusEntries, persistBatchSize); + + if (!statusEntries.isEmpty()) { + try { + target.accept(statusEntries); + } catch (final Exception e) { + LOGGER.error("Error during flushing buffered status history information: " + e.getMessage(), e); Review Comment: Including `e.getMessage()` is unnecessary since it will be included in the stack trace. ```suggestion LOGGER.error("Error during flushing buffered status history information", e); ``` ########## nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-status-history-shared/pom.xml: ########## @@ -30,5 +30,9 @@ <groupId>org.apache.nifi</groupId> <artifactId>nifi-framework-api</artifactId> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> Review Comment: Is this dependency necessary? It would be better to keep the shared module limited in terms of dependencies if possible. ########## nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb-status-history/src/main/java/org/apache/nifi/controller/status/history/questdb/CapturedStatus.java: ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.status.history.questdb; + +import java.time.Instant; +import java.util.Objects; + +final class CapturedStatus<T> { + private final T status; + private final Instant capturedAt; Review Comment: Recommend avoiding the `At` suffix and just using `captured` as the property name. ########## nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb-status-history/src/main/java/org/apache/nifi/controller/status/history/questdb/EmbeddedQuestDbStatusHistoryRepositoryDefinitions.java: ########## @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.status.history.questdb; + +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.controller.status.NodeStatus; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.controller.status.RemoteProcessGroupStatus; +import org.apache.nifi.controller.status.history.ConnectionStatusDescriptor; +import org.apache.nifi.controller.status.history.GarbageCollectionStatus; +import org.apache.nifi.controller.status.history.MetricDescriptor; +import org.apache.nifi.controller.status.history.NodeStatusDescriptor; +import org.apache.nifi.controller.status.history.ProcessGroupStatusDescriptor; +import org.apache.nifi.controller.status.history.ProcessorStatusDescriptor; +import org.apache.nifi.controller.status.history.RemoteProcessGroupStatusDescriptor; +import org.apache.nifi.controller.status.history.StandardMetricDescriptor; +import org.apache.nifi.controller.status.history.StandardStatusSnapshot; +import org.apache.nifi.questdb.InsertRowDataSource; +import org.apache.nifi.questdb.QueryResultProcessor; +import org.apache.nifi.questdb.mapping.RequestMapping; +import org.apache.nifi.questdb.mapping.RequestMappingBuilder; + +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +final class EmbeddedQuestDbStatusHistoryRepositoryDefinitions { + /** + * Date format expected by the storage. + */ + static final String CAPTURE_DATE_FORMAT = "yyyy-MM-dd:HH:mm:ss Z"; Review Comment: Is the standard timestamp format for QuestDB queries, as opposed to the ISO 8601, using `T` to separate the year-month-day from `hour-minute-second`, and without the space before the timezone? ########## nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/pom.xml: ########## @@ -0,0 +1,54 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-questdb-bundle</artifactId> + <version>2.0.0-SNAPSHOT</version> + </parent> + <artifactId>nifi-questdb</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + <version>2.0.0-SNAPSHOT</version> + </dependency> + + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.questdb</groupId> + <artifactId>questdb</artifactId> + <version>7.3.5</version> Review Comment: The latest version in the main branch is 7.3.7, so this should match that version. ```suggestion <version>7.3.7</version> ``` ########## nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/main/java/org/apache/nifi/questdb/embedded/EmbeddedClient.java: ########## @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.questdb.embedded; + +import io.questdb.cairo.CairoEngine; +import io.questdb.cairo.CairoError; +import io.questdb.cairo.TableToken; +import io.questdb.cairo.TableWriter; +import io.questdb.cairo.sql.RecordCursor; +import io.questdb.cairo.sql.RecordCursorFactory; +import io.questdb.griffin.CompiledQuery; +import io.questdb.griffin.SqlCompiler; +import io.questdb.griffin.SqlCompilerFactoryImpl; +import io.questdb.griffin.SqlException; +import io.questdb.griffin.SqlExecutionContext; +import io.questdb.mp.SCSequence; +import io.questdb.mp.TimeoutBlockingWaitStrategy; +import org.apache.nifi.questdb.Client; +import org.apache.nifi.questdb.DatabaseException; +import org.apache.nifi.questdb.InsertRowDataSource; +import org.apache.nifi.questdb.QueryResultProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; + +final class EmbeddedClient implements Client { + private final static Logger LOGGER = LoggerFactory.getLogger(EmbeddedClient.class); + + private final Supplier<CairoEngine> engine; + private final AtomicBoolean disconnected = new AtomicBoolean(false); + + EmbeddedClient(final Supplier<CairoEngine> engine) { + this.engine = engine; + } + + @Override + public void execute(final String query) throws DatabaseException { + checkConnectionState(); + + try (final SqlCompiler compiler = getCompiler()) { + final CompiledQuery compile = compiler.compile(query, getSqlExecutionContext()); + compile.execute(new SCSequence(new TimeoutBlockingWaitStrategy(5, TimeUnit.SECONDS))); + } catch (final SqlException | CairoError e) { + throw new DatabaseException(e); + } + } + + @Override + public void insert( + final String tableName, + final InsertRowDataSource rowDataSource + ) throws DatabaseException { + checkConnectionState(); + + if (!rowDataSource.hasNextToInsert()) { + LOGGER.debug("No rows to insert into {}", tableName); + return; + } + + final TableToken tableToken = engine.get().getTableTokenIfExists(tableName); + + if (tableToken == null) { + throw new DatabaseException(String.format("Table Token for table [%s] not found", tableName)); + } + + try ( + final TableWriter tableWriter = engine.get().getWriter(tableToken, "adding rows") + ) { + final TableWriterBasedInsertRowContext context = new TableWriterBasedInsertRowContext(tableWriter); + + while (rowDataSource.hasNextToInsert()) { + context.addRow(rowDataSource); + } + + LOGGER.debug("Committing {} rows", tableWriter.getRowCount()); + tableWriter.commit(); + } catch (final Exception | CairoError e) { + // CairoError might be thrown in extreme cases, for example when no space left on the disk + LOGGER.error("Add rows to table [{}] failed", tableName, e); + throw new DatabaseException(e); Review Comment: Is there a reason for logging the error and throwing the exception? The caller should also catch and log the exception. ########## nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/main/java/org/apache/nifi/questdb/embedded/ClientIsDisconnectedException.java: ########## @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.questdb.embedded; + +import org.apache.nifi.questdb.DatabaseException; + +final class ClientIsDisconnectedException extends DatabaseException { Review Comment: Recommend adjusting the name to `ClientDisconnectedException` ########## nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb-status-history/src/main/java/org/apache/nifi/controller/status/history/questdb/StatusHistoryStorage.java: ########## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.status.history.questdb; + +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.controller.status.NodeStatus; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.controller.status.RemoteProcessGroupStatus; +import org.apache.nifi.controller.status.history.GarbageCollectionStatus; +import org.apache.nifi.controller.status.history.StatusSnapshot; + +import java.util.Collection; +import java.util.Date; +import java.util.List; + +interface StatusHistoryStorage { + + default void init() {}; + default void close() {}; + + List<StatusSnapshot> getConnectionSnapshots(final String componentId, final Date start, final Date end); Review Comment: Is there a reason for using `java.util.Date` as opposed to `java.time.Instant` in this new interface definition? ########## nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb-status-history/src/main/java/org/apache/nifi/controller/status/history/questdb/QuestDbStatusHistoryStorage.java: ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.status.history.questdb; + +import org.apache.nifi.controller.status.ConnectionStatus; +import org.apache.nifi.controller.status.NodeStatus; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.controller.status.RemoteProcessGroupStatus; +import org.apache.nifi.controller.status.history.GarbageCollectionStatus; +import org.apache.nifi.controller.status.history.StandardMetricDescriptor; +import org.apache.nifi.controller.status.history.StandardStatusSnapshot; +import org.apache.nifi.controller.status.history.StatusSnapshot; +import org.apache.nifi.questdb.Client; +import org.apache.nifi.questdb.DatabaseException; +import org.apache.nifi.questdb.InsertRowDataSource; +import org.apache.nifi.questdb.QueryResultProcessor; +import org.apache.nifi.questdb.mapping.RequestMapping; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.COMPONENT_STATUS_QUERY; +import static org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.CONNECTION_STATUS_REQUEST_MAPPING; +import static org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.NODE_STATUS_QUERY; +import static org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.PROCESSOR_STATUS_REQUEST_MAPPING; +import static org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.PROCESS_GROUP_STATUS_REQUEST_MAPPING; +import static org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.REMOTE_PROCESS_GROUP_STATUS_REQUEST_MAPPING; +import static org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.STATUS_QUERY_GARBAGE_COLLECTION; +import static org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.STORAGE_STATUS_QUERY; +import static org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.TABLE_NAME_COMPONENT_COUNTER; +import static org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.TABLE_NAME_CONNECTION_STATUS; +import static org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.TABLE_NAME_GARBAGE_COLLECTION_STATUS; +import static org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.TABLE_NAME_NODE_STATUS; +import static org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.TABLE_NAME_PROCESSOR_STATUS; +import static org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.TABLE_NAME_PROCESS_GROUP_STATUS; +import static org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.TABLE_NAME_REMOTE_PROCESS_GROUP_STATUS; +import static org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.TABLE_NAME_STORAGE_STATUS; + +final class QuestDbStatusHistoryStorage implements StatusHistoryStorage { + private static final Logger LOGGER = LoggerFactory.getLogger(QuestDbStatusHistoryStorage.class); + + private final Client client; + + QuestDbStatusHistoryStorage(final Client client) { + this.client = client; + } + + @Override + public List<StatusSnapshot> getConnectionSnapshots(final String componentId, final Date start, final Date end) { + return getComponentSnapshots(TABLE_NAME_CONNECTION_STATUS, componentId, CONNECTION_STATUS_REQUEST_MAPPING, start, end); + } + + @Override + public List<StatusSnapshot> getProcessGroupSnapshots(final String componentId, final Date start, final Date end) { + return getComponentSnapshots(TABLE_NAME_PROCESS_GROUP_STATUS, componentId, PROCESS_GROUP_STATUS_REQUEST_MAPPING, start, end); + } + + @Override + public List<StatusSnapshot> getRemoteProcessGroupSnapshots(final String componentId, final Date start, final Date end) { + return getComponentSnapshots(TABLE_NAME_REMOTE_PROCESS_GROUP_STATUS, componentId, REMOTE_PROCESS_GROUP_STATUS_REQUEST_MAPPING, start, end); + } + + @Override + public List<StatusSnapshot> getProcessorSnapshots(final String componentId, final Date start, final Date end) { + return getComponentSnapshots(TABLE_NAME_PROCESSOR_STATUS, componentId, PROCESSOR_STATUS_REQUEST_MAPPING, start, end); + } + + @Override + public List<StatusSnapshot> getProcessorSnapshotsWithCounters(final String componentId, final Date start, final Date end) { + final List<StatusSnapshot> componentSnapshots = getComponentSnapshots(TABLE_NAME_PROCESSOR_STATUS, componentId, PROCESSOR_STATUS_REQUEST_MAPPING, start, end); + final String query = String.format(COMPONENT_STATUS_QUERY, TABLE_NAME_COMPONENT_COUNTER, componentId, getStartTime(start), getEndTime(end)); + return getResult(query, new CounterStatisticsResultProcessor(componentSnapshots), Collections.emptyList()); + } + + @Override + public List<GarbageCollectionStatus> getGarbageCollectionSnapshots(final Date start, final Date end) { + final String query = String.format(STATUS_QUERY_GARBAGE_COLLECTION, getStartTime(start), getEndTime(end)); + return getResult(query, new GarbageCollectionResultProcessor(), Collections.emptyList()); + } + + @Override + public List<StatusSnapshot> getNodeStatusSnapshots(final Date start, final Date end) { + final String storageStatusQuery = String.format(STORAGE_STATUS_QUERY, getStartTime(start), getEndTime(end)); + final Map<Long, Map<StandardMetricDescriptor<NodeStatus>, Long>> statusMetricsByTime + = getResult(storageStatusQuery, EmbeddedQuestDbStatusHistoryRepositoryDefinitions.getStorageStatusResultProcessor(), new HashMap<>()); + final String nodeStatusQuery = String.format(NODE_STATUS_QUERY, getStartTime(start), getEndTime(end)); + return getSnapshot(nodeStatusQuery, EmbeddedQuestDbStatusHistoryRepositoryDefinitions.getNodeStatusResultProcessor(statusMetricsByTime)); + } + + @Override + public void storeNodeStatuses(final Collection<CapturedStatus<NodeStatus>> statuses) { + store(TABLE_NAME_NODE_STATUS, EmbeddedQuestDbStatusHistoryRepositoryDefinitions.getNodeStatusDataSource(statuses)); + store(TABLE_NAME_STORAGE_STATUS, StorateStatusDataSource.getInstance(statuses)); + } + + @Override + public void storeGarbageCollectionStatuses(final Collection<CapturedStatus<GarbageCollectionStatus>> statuses) { + store(TABLE_NAME_GARBAGE_COLLECTION_STATUS, EmbeddedQuestDbStatusHistoryRepositoryDefinitions.getGarbageCollectionStatusDataSource(statuses)); + } + + @Override + public void storeProcessGroupStatuses(final Collection<CapturedStatus<ProcessGroupStatus>> statuses) { + store(TABLE_NAME_PROCESS_GROUP_STATUS, EmbeddedQuestDbStatusHistoryRepositoryDefinitions.getProcessGroupStatusDataSource(statuses)); + } + + @Override + public void storeConnectionStatuses(final Collection<CapturedStatus<ConnectionStatus>> statuses) { + store(TABLE_NAME_CONNECTION_STATUS, EmbeddedQuestDbStatusHistoryRepositoryDefinitions.getConnectionStatusDataSource(statuses)); + } + + @Override + public void storeRemoteProcessorGroupStatuses(final Collection<CapturedStatus<RemoteProcessGroupStatus>> statuses) { + store(TABLE_NAME_REMOTE_PROCESS_GROUP_STATUS, EmbeddedQuestDbStatusHistoryRepositoryDefinitions.getRemoteProcessGroupStatusDataSource(statuses)); + } + + @Override + public void storeProcessorStatuses(final Collection<CapturedStatus<ProcessorStatus>> statuses) { + store(TABLE_NAME_PROCESSOR_STATUS, EmbeddedQuestDbStatusHistoryRepositoryDefinitions.getProcessorStatusDataSource(statuses)); + store(TABLE_NAME_COMPONENT_COUNTER, EmbeddedQuestDbStatusHistoryRepositoryDefinitions.getCounterStatisticsDataSource(statuses)); + } + + private <T> void store(final String tableName, final InsertRowDataSource source) { + try { + client.insert(tableName, source); + } catch (final DatabaseException e) { + LOGGER.error("Error during storing snapshots to " + tableName, e); + } + } + + private List<StatusSnapshot> getComponentSnapshots(final String tableName, final String componentId, final RequestMapping<StandardStatusSnapshot> mapping, final Date start, final Date end) { + final String query = String.format(COMPONENT_STATUS_QUERY, tableName, componentId, getStartTime(start), getEndTime(end)); + return getSnapshot(query, RequestMapping.getResultProcessor(mapping)); + } + + private List<StatusSnapshot> getSnapshot(final String query, final QueryResultProcessor<List<StandardStatusSnapshot>> rowProcessor) { + return new ArrayList<>(getResult(query, rowProcessor, Collections.emptyList())); + } + + private <T> T getResult(final String query, final QueryResultProcessor<T> rowProcessor, final T errorResult) { + try { + return client.query(query, rowProcessor); + } catch (final DatabaseException e) { + LOGGER.error("Error during returning results for query " + query, e); Review Comment: Log statements should always use placeholder values instead of string concatenation. ```suggestion LOGGER.error("Error during returning results for query {}", query, e); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org