exceptionfactory commented on code in PR #8152:
URL: https://github.com/apache/nifi/pull/8152#discussion_r1435330854


##########
nifi-commons/nifi-utils/src/test/java/org/apache/nifi/retry/ExceptionExcludingRetryConditionTest.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.retry;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import javax.naming.OperationNotSupportedException;
+import java.io.IOException;
+import java.util.Arrays;
+
+class ExceptionExcludingRetryConditionTest {
+    final ExceptionExcludingRetryCondition testSubject = getTestSubject();
+
+    @Test
+    public void testAllowIfNoException() {
+        final MutableRetryExecutionContext context = new 
MutableRetryExecutionContext();
+        Assertions.assertTrue(testSubject.allowNextAttempt(context));

Review Comment:
   As a general rule, it is best to use static imports for assert and mock 
methods to helpful with readability of tests.
   



##########
nifi-commons/nifi-utils/src/test/java/org/apache/nifi/retry/SynchronousRetryTemplateTest.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.retry;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+
+public class SynchronousRetryTemplateTest {
+    public static final int NUMBER_OF_RETRIES = 2;
+    public static final Callable<String> FALLBACK_ACTION = () -> "fallback";
+    public static final Callable<String> FALLBACK_ACTION_FAILING = () -> {

Review Comment:
   Are these values used in other test classes? I may have missed the 
references, otherwise it would be helpful to lower the visibility.



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb-status-history/src/main/java/org/apache/nifi/controller/status/history/questdb/QuestDbStatusHistoryStorage.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history.questdb;
+
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.NodeStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.history.GarbageCollectionStatus;
+import org.apache.nifi.controller.status.history.StandardMetricDescriptor;
+import org.apache.nifi.controller.status.history.StandardStatusSnapshot;
+import org.apache.nifi.controller.status.history.StatusSnapshot;
+import org.apache.nifi.questdb.Client;
+import org.apache.nifi.questdb.DatabaseException;
+import org.apache.nifi.questdb.InsertRowDataSource;
+import org.apache.nifi.questdb.QueryResultProcessor;
+import org.apache.nifi.questdb.mapping.RequestMapping;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.COMPONENT_STATUS_QUERY;
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.CONNECTION_STATUS_REQUEST_MAPPING;
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.NODE_STATUS_QUERY;
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.PROCESSOR_STATUS_REQUEST_MAPPING;
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.PROCESS_GROUP_STATUS_REQUEST_MAPPING;
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.REMOTE_PROCESS_GROUP_STATUS_REQUEST_MAPPING;
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.STATUS_QUERY_GARBAGE_COLLECTION;
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.STORAGE_STATUS_QUERY;
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.TABLE_NAME_COMPONENT_COUNTER;
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.TABLE_NAME_CONNECTION_STATUS;
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.TABLE_NAME_GARBAGE_COLLECTION_STATUS;
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.TABLE_NAME_NODE_STATUS;
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.TABLE_NAME_PROCESSOR_STATUS;
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.TABLE_NAME_PROCESS_GROUP_STATUS;
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.TABLE_NAME_REMOTE_PROCESS_GROUP_STATUS;
+import static 
org.apache.nifi.controller.status.history.questdb.EmbeddedQuestDbStatusHistoryRepositoryDefinitions.TABLE_NAME_STORAGE_STATUS;
+
+final class QuestDbStatusHistoryStorage implements StatusHistoryStorage {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(QuestDbStatusHistoryStorage.class);
+
+    private final Client client;
+
+    QuestDbStatusHistoryStorage(final Client client) {
+        this.client = client;
+    }
+
+    @Override
+    public List<StatusSnapshot> getConnectionSnapshots(final String 
componentId, final Date start, final Date end) {
+        return getComponentSnapshots(TABLE_NAME_CONNECTION_STATUS, 
componentId, CONNECTION_STATUS_REQUEST_MAPPING, start, end);
+    }
+
+    @Override
+    public List<StatusSnapshot> getProcessGroupSnapshots(final String 
componentId, final Date start, final Date end) {
+        return getComponentSnapshots(TABLE_NAME_PROCESS_GROUP_STATUS, 
componentId, PROCESS_GROUP_STATUS_REQUEST_MAPPING, start, end);
+    }
+
+    @Override
+    public List<StatusSnapshot> getRemoteProcessGroupSnapshots(final String 
componentId, final Date start, final Date end) {
+        return getComponentSnapshots(TABLE_NAME_REMOTE_PROCESS_GROUP_STATUS, 
componentId, REMOTE_PROCESS_GROUP_STATUS_REQUEST_MAPPING, start, end);
+    }
+
+    @Override
+    public List<StatusSnapshot> getProcessorSnapshots(final String 
componentId, final Date start, final Date end) {
+        return getComponentSnapshots(TABLE_NAME_PROCESSOR_STATUS, componentId, 
PROCESSOR_STATUS_REQUEST_MAPPING, start, end);
+    }
+
+    @Override
+    public List<StatusSnapshot> getProcessorSnapshotsWithCounters(final String 
componentId, final Date start, final Date end) {
+        final List<StatusSnapshot> componentSnapshots = 
getComponentSnapshots(TABLE_NAME_PROCESSOR_STATUS, componentId, 
PROCESSOR_STATUS_REQUEST_MAPPING, start, end);
+        final String query = String.format(COMPONENT_STATUS_QUERY, 
TABLE_NAME_COMPONENT_COUNTER, componentId, getStartTime(start), 
getEndTime(end));
+        return getResult(query, new 
CounterStatisticsResultProcessor(componentSnapshots), Collections.emptyList());
+    }
+
+    @Override
+    public List<GarbageCollectionStatus> getGarbageCollectionSnapshots(final 
Date start, final Date end) {
+        final String query = String.format(STATUS_QUERY_GARBAGE_COLLECTION, 
getStartTime(start), getEndTime(end));
+        return getResult(query, new GarbageCollectionResultProcessor(), 
Collections.emptyList());
+    }
+
+    @Override
+    public List<StatusSnapshot> getNodeStatusSnapshots(final Date start, final 
Date end) {
+        final String storageStatusQuery = String.format(STORAGE_STATUS_QUERY, 
getStartTime(start), getEndTime(end));
+        final Map<Long, Map<StandardMetricDescriptor<NodeStatus>, Long>> 
statusMetricsByTime
+            = getResult(storageStatusQuery, 
EmbeddedQuestDbStatusHistoryRepositoryDefinitions.getStorageStatusResultProcessor(),
 new HashMap<>());
+        final String nodeStatusQuery = String.format(NODE_STATUS_QUERY, 
getStartTime(start), getEndTime(end));
+        return getSnapshot(nodeStatusQuery, 
EmbeddedQuestDbStatusHistoryRepositoryDefinitions.getNodeStatusResultProcessor(statusMetricsByTime));
+    }
+
+    @Override
+    public void storeNodeStatuses(final Collection<CapturedStatus<NodeStatus>> 
statuses) {
+        store(TABLE_NAME_NODE_STATUS, 
EmbeddedQuestDbStatusHistoryRepositoryDefinitions.getNodeStatusDataSource(statuses));
+        store(TABLE_NAME_STORAGE_STATUS, 
StorateStatusDataSource.getInstance(statuses));
+    }
+
+    @Override
+    public void storeGarbageCollectionStatuses(final 
Collection<CapturedStatus<GarbageCollectionStatus>> statuses) {
+        store(TABLE_NAME_GARBAGE_COLLECTION_STATUS, 
EmbeddedQuestDbStatusHistoryRepositoryDefinitions.getGarbageCollectionStatusDataSource(statuses));
+    }
+
+    @Override
+    public void storeProcessGroupStatuses(final 
Collection<CapturedStatus<ProcessGroupStatus>> statuses) {
+        store(TABLE_NAME_PROCESS_GROUP_STATUS, 
EmbeddedQuestDbStatusHistoryRepositoryDefinitions.getProcessGroupStatusDataSource(statuses));
+    }
+
+    @Override
+    public void storeConnectionStatuses(final 
Collection<CapturedStatus<ConnectionStatus>> statuses) {
+        store(TABLE_NAME_CONNECTION_STATUS, 
EmbeddedQuestDbStatusHistoryRepositoryDefinitions.getConnectionStatusDataSource(statuses));
+    }
+
+    @Override
+    public void storeRemoteProcessorGroupStatuses(final 
Collection<CapturedStatus<RemoteProcessGroupStatus>> statuses) {
+        store(TABLE_NAME_REMOTE_PROCESS_GROUP_STATUS, 
EmbeddedQuestDbStatusHistoryRepositoryDefinitions.getRemoteProcessGroupStatusDataSource(statuses));
+    }
+
+    @Override
+    public void storeProcessorStatuses(final 
Collection<CapturedStatus<ProcessorStatus>> statuses) {
+        store(TABLE_NAME_PROCESSOR_STATUS, 
EmbeddedQuestDbStatusHistoryRepositoryDefinitions.getProcessorStatusDataSource(statuses));
+        store(TABLE_NAME_COMPONENT_COUNTER, 
EmbeddedQuestDbStatusHistoryRepositoryDefinitions.getCounterStatisticsDataSource(statuses));
+    }
+
+    private <T> void store(final String tableName, final InsertRowDataSource 
source) {
+        try {
+            client.insert(tableName, source);
+        } catch (final DatabaseException e) {
+            LOGGER.error("Error during storing snapshots to " + tableName, e);

Review Comment:
   Placeholders should be used instead of string concatenation for logs.
   ```suggestion
               LOGGER.error("Error during storing snapshots to table [{}]", 
tableName, e);
   ```



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/main/java/org/apache/nifi/questdb/embedded/EmbeddedDatabaseManager.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.questdb.embedded;
+
+import io.questdb.cairo.CairoConfiguration;
+import io.questdb.cairo.CairoEngine;
+import io.questdb.cairo.DefaultCairoConfiguration;
+import io.questdb.cairo.TableToken;
+import io.questdb.cairo.sql.TableRecordMetadata;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.nifi.questdb.Client;
+import org.apache.nifi.questdb.DatabaseException;
+import org.apache.nifi.questdb.DatabaseManager;
+import org.apache.nifi.util.file.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+final class EmbeddedDatabaseManager implements DatabaseManager {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(EmbeddedDatabaseManager.class);
+
+    private final String id = UUID.randomUUID().toString();
+    private final AtomicReference<EmbeddedDatabaseManagerStatus> state = new 
AtomicReference(EmbeddedDatabaseManagerStatus.UNINITIALIZED);
+    private final ReadWriteLock databaseStructureLock = new 
ReentrantReadWriteLock();
+    private final EmbeddedDatabaseManagerContext context;
+    private final AtomicReference<CairoEngine> engine = new 
AtomicReference<>();
+    private final List<ScheduledFuture<?>> scheduledFutures = new 
ArrayList<>();
+    private final ScheduledExecutorService scheduledExecutorService = Executors
+            .newScheduledThreadPool(2, new 
BasicThreadFactory.Builder().namingPattern("EmbeddedQuestDbManagerWorker-" + id 
+ "-%d").build());
+
+    EmbeddedDatabaseManager(final EmbeddedDatabaseManagerContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public void init() {
+        if (state.get() != EmbeddedDatabaseManagerStatus.UNINITIALIZED) {
+            throw new IllegalStateException("Manager is already initialized!");
+        }
+
+        ensureDatabaseIsReady();
+        startRollover();
+    }
+
+    private void ensureDatabaseIsReady() {
+        boolean successful = false;
+
+        try {
+            databaseStructureLock.writeLock().lock();
+            state.set(EmbeddedDatabaseManagerStatus.REPAIRING);
+
+            try {
+                ensurePersistLocationIsAccessible();
+                ensureConnectionEstablished();
+                ensureTablesAreInPlaceAndHealthy();
+                successful = true;
+            } catch (final CorruptedDatabaseException e) {
+                boolean couldMoveOldToBackup = false;
+
+                try {
+                    LOGGER.error("Database is corrupted. Recreation is 
triggered.", e);

Review Comment:
   It would be helpful to include the `backupFolder` location in this message.



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/main/java/org/apache/nifi/questdb/embedded/EmbeddedDatabaseManagerContext.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.questdb.embedded;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+interface EmbeddedDatabaseManagerContext {
+    String getPersistLocation();
+    Path getPersistLocationAsPath();
+    File getPersistLocationAsFile();
+
+    String getBackupLocation();
+    Path getBackupLocationAsPath();
+    File getBackupLocationAsFile();
+
+    int getNumberOfAttemptedRetries();
+    int getLockAttemptTime();
+    TimeUnit getLockAttemptTimeUnit();

Review Comment:
   This could be replaced with a single method that returns a 
`java.time.Duration`.



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/main/java/org/apache/nifi/questdb/embedded/SimpleEmbeddedDatabaseManagerContext.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.questdb.embedded;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+final class SimpleEmbeddedDatabaseManagerContext implements 
EmbeddedDatabaseManagerContext {
+    final private Set<ManagedTableDefinition> tableDefinitions = new 
HashSet<>();
+
+    private String persistLocation;
+    private String backupLocation;
+    private int numberOfAttemptedRetries;
+    private int lockAttemptTime;
+    private TimeUnit lockAttemptTimeUnit;
+    private int rolloverFrequency;
+    private TimeUnit rolloverFrequencyTimeUnit;

Review Comment:
   See notes on using Duration.



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/main/java/org/apache/nifi/questdb/embedded/EmbeddedClient.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.questdb.embedded;
+
+import io.questdb.cairo.CairoEngine;
+import io.questdb.cairo.CairoError;
+import io.questdb.cairo.TableToken;
+import io.questdb.cairo.TableWriter;
+import io.questdb.cairo.sql.RecordCursor;
+import io.questdb.cairo.sql.RecordCursorFactory;
+import io.questdb.griffin.CompiledQuery;
+import io.questdb.griffin.SqlCompiler;
+import io.questdb.griffin.SqlCompilerFactoryImpl;
+import io.questdb.griffin.SqlException;
+import io.questdb.griffin.SqlExecutionContext;
+import io.questdb.mp.SCSequence;
+import io.questdb.mp.TimeoutBlockingWaitStrategy;
+import org.apache.nifi.questdb.Client;
+import org.apache.nifi.questdb.DatabaseException;
+import org.apache.nifi.questdb.InsertRowDataSource;
+import org.apache.nifi.questdb.QueryResultProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+
+final class EmbeddedClient implements Client {
+    private final static Logger LOGGER = 
LoggerFactory.getLogger(EmbeddedClient.class);
+
+    private final Supplier<CairoEngine> engine;
+    private final AtomicBoolean disconnected = new AtomicBoolean(false);
+
+    EmbeddedClient(final Supplier<CairoEngine> engine) {
+        this.engine = engine;
+    }
+
+    @Override
+    public void execute(final String query) throws DatabaseException {
+        checkConnectionState();
+
+        try (final SqlCompiler compiler = getCompiler()) {
+            final CompiledQuery compile = compiler.compile(query, 
getSqlExecutionContext());
+            compile.execute(new SCSequence(new TimeoutBlockingWaitStrategy(5, 
TimeUnit.SECONDS)));
+        } catch (final SqlException | CairoError e) {
+            throw new DatabaseException(e);
+        }
+    }
+
+    @Override
+    public void insert(
+        final String tableName,
+        final InsertRowDataSource rowDataSource
+    ) throws DatabaseException {
+        checkConnectionState();
+
+        if (!rowDataSource.hasNextToInsert()) {
+            LOGGER.debug("No rows to insert into {}", tableName);
+            return;
+        }
+
+        final TableToken tableToken = 
engine.get().getTableTokenIfExists(tableName);
+
+        if (tableToken == null) {
+            throw new DatabaseException(String.format("Table Token for table 
[%s] not found", tableName));
+        }
+
+        try (
+            final TableWriter tableWriter = engine.get().getWriter(tableToken, 
"adding rows")
+        ) {
+            final TableWriterBasedInsertRowContext context = new 
TableWriterBasedInsertRowContext(tableWriter);
+
+            while (rowDataSource.hasNextToInsert()) {
+                context.addRow(rowDataSource);
+            }
+
+            LOGGER.debug("Committing {} rows", tableWriter.getRowCount());
+            tableWriter.commit();
+        } catch (final Exception | CairoError e) {
+            // CairoError might be thrown in extreme cases, for example when 
no space left on the disk
+            throw new DatabaseException(e);
+        } finally {
+            engine.get().releaseInactive();
+        }
+    }
+
+    @Override
+    public <T> T query(final String query, final QueryResultProcessor<T> 
rowProcessor) throws DatabaseException {
+        checkConnectionState();
+
+        final CompiledQuery compiledQuery;
+
+        try (final SqlCompiler compiler = getCompiler()) {
+            compiledQuery = compiler.compile(query, getSqlExecutionContext());
+        } catch (final SqlException | CairoError e) {
+            throw new DatabaseException(e);
+        }
+
+        try (
+            final RecordCursorFactory factory = 
compiledQuery.getRecordCursorFactory();
+            final RecordCursor cursor = 
factory.getCursor(getSqlExecutionContext());
+        ) {
+            final CursorBasedQueryRowContext rowContext = new 
CursorBasedQueryRowContext(cursor);
+
+            while ((rowContext.hasNext())) {
+                rowContext.moveToNext();
+                rowProcessor.processRow(rowContext);
+            }
+
+            return rowProcessor.getResult();
+        } catch (final Exception e) {
+            throw new DatabaseException(e);
+        }
+    }
+
+    @Override
+    public void disconnect() throws DatabaseException {
+        checkConnectionState();
+        disconnected.set(true);
+        LOGGER.info("Client disconnects");

Review Comment:
   This looks like it should be at the debug level.



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/main/java/org/apache/nifi/questdb/embedded/EmbeddedDatabaseManager.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.questdb.embedded;
+
+import io.questdb.cairo.CairoConfiguration;
+import io.questdb.cairo.CairoEngine;
+import io.questdb.cairo.DefaultCairoConfiguration;
+import io.questdb.cairo.TableToken;
+import io.questdb.cairo.sql.TableRecordMetadata;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.nifi.questdb.Client;
+import org.apache.nifi.questdb.DatabaseException;
+import org.apache.nifi.questdb.DatabaseManager;
+import org.apache.nifi.util.file.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+final class EmbeddedDatabaseManager implements DatabaseManager {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(EmbeddedDatabaseManager.class);
+
+    private final String id = UUID.randomUUID().toString();
+    private final AtomicReference<EmbeddedDatabaseManagerStatus> state = new 
AtomicReference(EmbeddedDatabaseManagerStatus.UNINITIALIZED);
+    private final ReadWriteLock databaseStructureLock = new 
ReentrantReadWriteLock();
+    private final EmbeddedDatabaseManagerContext context;
+    private final AtomicReference<CairoEngine> engine = new 
AtomicReference<>();
+    private final List<ScheduledFuture<?>> scheduledFutures = new 
ArrayList<>();
+    private final ScheduledExecutorService scheduledExecutorService = Executors
+            .newScheduledThreadPool(2, new 
BasicThreadFactory.Builder().namingPattern("EmbeddedQuestDbManagerWorker-" + id 
+ "-%d").build());
+
+    EmbeddedDatabaseManager(final EmbeddedDatabaseManagerContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public void init() {
+        if (state.get() != EmbeddedDatabaseManagerStatus.UNINITIALIZED) {
+            throw new IllegalStateException("Manager is already initialized!");
+        }
+
+        ensureDatabaseIsReady();
+        startRollover();
+    }
+
+    private void ensureDatabaseIsReady() {
+        boolean successful = false;
+
+        try {
+            databaseStructureLock.writeLock().lock();
+            state.set(EmbeddedDatabaseManagerStatus.REPAIRING);
+
+            try {
+                ensurePersistLocationIsAccessible();
+                ensureConnectionEstablished();
+                ensureTablesAreInPlaceAndHealthy();
+                successful = true;
+            } catch (final CorruptedDatabaseException e) {
+                boolean couldMoveOldToBackup = false;
+
+                try {
+                    LOGGER.error("Database is corrupted. Recreation is 
triggered.", e);
+                    final File backupFolder = new 
File(context.getBackupLocationAsFile(), "backup_" + System.currentTimeMillis());
+                    
FileUtils.ensureDirectoryExistAndCanAccess(context.getBackupLocationAsFile());
+                    Files.move(context.getPersistLocationAsPath(), 
backupFolder.toPath());
+                    couldMoveOldToBackup = true;
+                } catch (IOException ex) {
+                    LOGGER.error("Could not create backup", ex);
+                }
+
+                if (!couldMoveOldToBackup) {
+                    try {
+                        
FileUtils.deleteFile(context.getPersistLocationAsFile(), true);
+                        couldMoveOldToBackup = true;
+                    } catch (IOException ex) {
+                        LOGGER.error("Could not clean up corrupted database", 
ex);
+                    }
+                }
+
+                if (couldMoveOldToBackup) {
+                    try {
+                        ensurePersistLocationIsAccessible();
+                        ensureConnectionEstablished();
+                        ensureTablesAreInPlaceAndHealthy();
+                        successful = true;
+                    } catch (CorruptedDatabaseException ex) {
+                        LOGGER.error("Could not create backup", ex);
+                    }
+                }
+            }
+        } finally {
+            state.set(successful? EmbeddedDatabaseManagerStatus.HEALTHY : 
EmbeddedDatabaseManagerStatus.CORRUPTED);
+
+            if (!successful) {
+                engine.set(null);
+            }
+
+            databaseStructureLock.writeLock().unlock();
+        }
+
+    }
+
+    private void ensurePersistLocationIsAccessible() throws 
CorruptedDatabaseException {
+        try {
+            
FileUtils.ensureDirectoryExistAndCanAccess(context.getPersistLocationAsPath().toFile());
+        } catch (final Exception e) {
+            final String errorMessage = String.format("Database directory 
creation failed [%s]", context.getPersistLocationAsPath());
+            LOGGER.error(errorMessage, e.toString());
+            throw new CorruptedDatabaseException(errorMessage, e);
+        }
+    }
+
+    private void ensureConnectionEstablished() throws 
CorruptedDatabaseException {
+        if (engine.get() != null) {
+            engine.getAndSet(null).close();
+        }
+
+        final String absolutePath = 
context.getPersistLocationAsFile().getAbsolutePath();
+        final CairoConfiguration configuration = new 
DefaultCairoConfiguration(absolutePath);
+
+        try {
+            final CairoEngine engine = new CairoEngine(configuration);
+            LOGGER.info("Database connection successful [{}]", absolutePath);
+            this.engine.set(engine);
+        } catch (final Exception e) {
+            LOGGER.error("Database connection failed [{}]", absolutePath, e);
+            throw new CorruptedDatabaseException(e);
+        }
+    }
+
+    private void ensureTablesAreInPlaceAndHealthy() throws 
CorruptedDatabaseException {
+        final Map<String, File> databaseFiles = 
Arrays.stream(context.getPersistLocationAsFile().listFiles())
+                .collect(Collectors.toMap(f -> 
f.getAbsolutePath().substring(context.getPersistLocationAsFile().getAbsolutePath().length()
 + 1), f -> f));
+        final Client client = getUnmanagedClient();
+
+        try {
+            for (final ManagedTableDefinition tableDefinition : 
context.getTableDefinitions()) {
+                if (!databaseFiles.containsKey(tableDefinition.getName())) {
+                    try {
+                        LOGGER.info("Creating table {}", 
tableDefinition.getName());
+                        client.execute(tableDefinition.getDefinition());
+                        LOGGER.info("Table {} is created", 
tableDefinition.getName());
+                    } catch (DatabaseException e) {
+                        throw new CorruptedDatabaseException(e);

Review Comment:
   It would be helpful to add a message indicating the table creation failed 
with the table name.



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/main/java/org/apache/nifi/questdb/DatabaseManager.java:
##########
@@ -14,25 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.nifi.controller.status.history.storage;
-
-import org.apache.nifi.controller.status.NodeStatus;
-import org.apache.nifi.controller.status.history.StatusHistory;
-
-import java.time.Instant;
+package org.apache.nifi.questdb;
 
 /**
- * Readable status storage for the node status entries.
+ * Provides access to database via distributing clients. Also responsible to 
ensure the health of the database connection
+ * and database if possible.
  */
-public interface NodeStatusStorage extends StatusStorage<NodeStatus> {
+public interface DatabaseManager {
+    /**
+     * @return A client to execute queries against the managed database 
instance.
+     */
+    Client acquireClient();
+
+    /**
+     * Starts maintenance of the database. Necessary initialization step for 
proper use.
+     */
+    void init();
 
     /**
-     * Returns with the status history of the node for the specified time 
range.
-     *
-     * @param start Start date of the history, inclusive.
-     * @param end End date of the history, inclusive.
-     *
-     * @return Status history.
+     * Finishes maintenance of the database. After calling, manager does not 
guarantee any connection with the database.
      */
-    StatusHistory read(Instant start, Instant end);
+    void close();

Review Comment:
   It looks this interface could extend `Closeable` given the void `close()` 
method.



##########
nifi-commons/nifi-utils/src/test/java/org/apache/nifi/retry/LimitedAttemptRetryConditionTest.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.retry;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class LimitedAttemptRetryConditionTest {
+
+    @Mock
+    public RetryExecutionContext context;
+
+    @Test
+    public void testCreationWithZeroAttempt() {
+        Assertions.assertThrows(IllegalArgumentException.class, () -> new 
LimitedAttemptRetryCondition(0));
+    }
+
+    @Test
+    public void testCreationWithNegativeAttempt() {
+        Assertions.assertThrows(IllegalArgumentException.class, () -> new 
LimitedAttemptRetryCondition(-1));
+    }
+
+    @Test
+    public void testAttempts() {
+        final LimitedAttemptRetryCondition testSubject = new 
LimitedAttemptRetryCondition(2);
+        Mockito.when(context.getNumberOfAttempts()).thenReturn(0, 1, 2, 3);
+        Assertions.assertTrue(testSubject.allowNextAttempt(context));
+        Assertions.assertTrue(testSubject.allowNextAttempt(context));
+        Assertions.assertTrue(testSubject.allowNextAttempt(context));
+        Assertions.assertFalse(testSubject.allowNextAttempt(context));

Review Comment:
   Just highlight this instance as an opportunity to improving the concision of 
expression:
   ```suggestion
           when(context.getNumberOfAttempts()).thenReturn(0, 1, 2, 3);
           assertTrue(testSubject.allowNextAttempt(context));
           assertTrue(testSubject.allowNextAttempt(context));
           assertTrue(testSubject.allowNextAttempt(context));
           assertFalse(testSubject.allowNextAttempt(context));
   ```



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/main/java/org/apache/nifi/questdb/embedded/LockedClient.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.questdb.embedded;
+
+import org.apache.nifi.questdb.Client;
+import org.apache.nifi.questdb.DatabaseException;
+import org.apache.nifi.questdb.InsertRowDataSource;
+import org.apache.nifi.questdb.QueryResultProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+final class LockedClient implements Client {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(LockedClient.class);
+    private final Lock lock;
+    private final long lockAttemptTime;
+    private final TimeUnit lockAttemptTimeUnit;

Review Comment:
   This could be replaced with a single Duration.



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/main/java/org/apache/nifi/questdb/embedded/EmbeddedDatabaseManager.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.questdb.embedded;
+
+import io.questdb.cairo.CairoConfiguration;
+import io.questdb.cairo.CairoEngine;
+import io.questdb.cairo.DefaultCairoConfiguration;
+import io.questdb.cairo.TableToken;
+import io.questdb.cairo.sql.TableRecordMetadata;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.nifi.questdb.Client;
+import org.apache.nifi.questdb.DatabaseException;
+import org.apache.nifi.questdb.DatabaseManager;
+import org.apache.nifi.util.file.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+final class EmbeddedDatabaseManager implements DatabaseManager {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(EmbeddedDatabaseManager.class);
+
+    private final String id = UUID.randomUUID().toString();
+    private final AtomicReference<EmbeddedDatabaseManagerStatus> state = new 
AtomicReference(EmbeddedDatabaseManagerStatus.UNINITIALIZED);
+    private final ReadWriteLock databaseStructureLock = new 
ReentrantReadWriteLock();
+    private final EmbeddedDatabaseManagerContext context;
+    private final AtomicReference<CairoEngine> engine = new 
AtomicReference<>();
+    private final List<ScheduledFuture<?>> scheduledFutures = new 
ArrayList<>();
+    private final ScheduledExecutorService scheduledExecutorService = Executors
+            .newScheduledThreadPool(2, new 
BasicThreadFactory.Builder().namingPattern("EmbeddedQuestDbManagerWorker-" + id 
+ "-%d").build());
+
+    EmbeddedDatabaseManager(final EmbeddedDatabaseManagerContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public void init() {
+        if (state.get() != EmbeddedDatabaseManagerStatus.UNINITIALIZED) {
+            throw new IllegalStateException("Manager is already initialized!");

Review Comment:
   It is best to avoid including exclamations in messages.
   ```suggestion
               throw new IllegalStateException("Manager is already 
initialized");
   ```



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/main/java/org/apache/nifi/questdb/embedded/EmbeddedDatabaseManager.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.questdb.embedded;
+
+import io.questdb.cairo.CairoConfiguration;
+import io.questdb.cairo.CairoEngine;
+import io.questdb.cairo.DefaultCairoConfiguration;
+import io.questdb.cairo.TableToken;
+import io.questdb.cairo.sql.TableRecordMetadata;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.nifi.questdb.Client;
+import org.apache.nifi.questdb.DatabaseException;
+import org.apache.nifi.questdb.DatabaseManager;
+import org.apache.nifi.util.file.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+final class EmbeddedDatabaseManager implements DatabaseManager {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(EmbeddedDatabaseManager.class);
+
+    private final String id = UUID.randomUUID().toString();
+    private final AtomicReference<EmbeddedDatabaseManagerStatus> state = new 
AtomicReference(EmbeddedDatabaseManagerStatus.UNINITIALIZED);
+    private final ReadWriteLock databaseStructureLock = new 
ReentrantReadWriteLock();
+    private final EmbeddedDatabaseManagerContext context;
+    private final AtomicReference<CairoEngine> engine = new 
AtomicReference<>();
+    private final List<ScheduledFuture<?>> scheduledFutures = new 
ArrayList<>();
+    private final ScheduledExecutorService scheduledExecutorService = Executors
+            .newScheduledThreadPool(2, new 
BasicThreadFactory.Builder().namingPattern("EmbeddedQuestDbManagerWorker-" + id 
+ "-%d").build());
+
+    EmbeddedDatabaseManager(final EmbeddedDatabaseManagerContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public void init() {
+        if (state.get() != EmbeddedDatabaseManagerStatus.UNINITIALIZED) {
+            throw new IllegalStateException("Manager is already initialized!");
+        }
+
+        ensureDatabaseIsReady();
+        startRollover();
+    }
+
+    private void ensureDatabaseIsReady() {
+        boolean successful = false;
+
+        try {
+            databaseStructureLock.writeLock().lock();
+            state.set(EmbeddedDatabaseManagerStatus.REPAIRING);
+
+            try {
+                ensurePersistLocationIsAccessible();
+                ensureConnectionEstablished();
+                ensureTablesAreInPlaceAndHealthy();
+                successful = true;
+            } catch (final CorruptedDatabaseException e) {
+                boolean couldMoveOldToBackup = false;
+
+                try {
+                    LOGGER.error("Database is corrupted. Recreation is 
triggered.", e);
+                    final File backupFolder = new 
File(context.getBackupLocationAsFile(), "backup_" + System.currentTimeMillis());
+                    
FileUtils.ensureDirectoryExistAndCanAccess(context.getBackupLocationAsFile());
+                    Files.move(context.getPersistLocationAsPath(), 
backupFolder.toPath());
+                    couldMoveOldToBackup = true;
+                } catch (IOException ex) {
+                    LOGGER.error("Could not create backup", ex);
+                }
+
+                if (!couldMoveOldToBackup) {
+                    try {
+                        
FileUtils.deleteFile(context.getPersistLocationAsFile(), true);
+                        couldMoveOldToBackup = true;
+                    } catch (IOException ex) {
+                        LOGGER.error("Could not clean up corrupted database", 
ex);
+                    }
+                }
+
+                if (couldMoveOldToBackup) {
+                    try {
+                        ensurePersistLocationIsAccessible();
+                        ensureConnectionEstablished();
+                        ensureTablesAreInPlaceAndHealthy();
+                        successful = true;
+                    } catch (CorruptedDatabaseException ex) {
+                        LOGGER.error("Could not create backup", ex);
+                    }
+                }
+            }
+        } finally {
+            state.set(successful? EmbeddedDatabaseManagerStatus.HEALTHY : 
EmbeddedDatabaseManagerStatus.CORRUPTED);
+
+            if (!successful) {
+                engine.set(null);
+            }
+
+            databaseStructureLock.writeLock().unlock();
+        }
+
+    }
+
+    private void ensurePersistLocationIsAccessible() throws 
CorruptedDatabaseException {
+        try {
+            
FileUtils.ensureDirectoryExistAndCanAccess(context.getPersistLocationAsPath().toFile());
+        } catch (final Exception e) {
+            final String errorMessage = String.format("Database directory 
creation failed [%s]", context.getPersistLocationAsPath());
+            LOGGER.error(errorMessage, e.toString());

Review Comment:
   Is this log necessary, throwing the exception result in a log from calling 
code.



##########
nifi-commons/nifi-utils/src/test/java/org/apache/nifi/retry/ExceptionExcludingRetryConditionTest.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.retry;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import javax.naming.OperationNotSupportedException;
+import java.io.IOException;
+import java.util.Arrays;
+
+class ExceptionExcludingRetryConditionTest {
+    final ExceptionExcludingRetryCondition testSubject = getTestSubject();

Review Comment:
   Although this convention is used in a few places, I do not recommend using 
`testSubject` as the name of the component being tested. Instead, using a 
simple short name that reflects the class name, such as `condition` would be 
preferable. This is a stylistic recommendation, but it helps keep the overall 
source code repository coherent.



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/main/java/org/apache/nifi/questdb/embedded/EmbeddedDatabaseManager.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.questdb.embedded;
+
+import io.questdb.cairo.CairoConfiguration;
+import io.questdb.cairo.CairoEngine;
+import io.questdb.cairo.DefaultCairoConfiguration;
+import io.questdb.cairo.TableToken;
+import io.questdb.cairo.sql.TableRecordMetadata;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.nifi.questdb.Client;
+import org.apache.nifi.questdb.DatabaseException;
+import org.apache.nifi.questdb.DatabaseManager;
+import org.apache.nifi.util.file.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+final class EmbeddedDatabaseManager implements DatabaseManager {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(EmbeddedDatabaseManager.class);
+
+    private final String id = UUID.randomUUID().toString();
+    private final AtomicReference<EmbeddedDatabaseManagerStatus> state = new 
AtomicReference(EmbeddedDatabaseManagerStatus.UNINITIALIZED);
+    private final ReadWriteLock databaseStructureLock = new 
ReentrantReadWriteLock();
+    private final EmbeddedDatabaseManagerContext context;
+    private final AtomicReference<CairoEngine> engine = new 
AtomicReference<>();
+    private final List<ScheduledFuture<?>> scheduledFutures = new 
ArrayList<>();
+    private final ScheduledExecutorService scheduledExecutorService = Executors
+            .newScheduledThreadPool(2, new 
BasicThreadFactory.Builder().namingPattern("EmbeddedQuestDbManagerWorker-" + id 
+ "-%d").build());
+
+    EmbeddedDatabaseManager(final EmbeddedDatabaseManagerContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public void init() {
+        if (state.get() != EmbeddedDatabaseManagerStatus.UNINITIALIZED) {
+            throw new IllegalStateException("Manager is already initialized!");
+        }
+
+        ensureDatabaseIsReady();
+        startRollover();
+    }
+
+    private void ensureDatabaseIsReady() {
+        boolean successful = false;
+
+        try {
+            databaseStructureLock.writeLock().lock();
+            state.set(EmbeddedDatabaseManagerStatus.REPAIRING);
+
+            try {
+                ensurePersistLocationIsAccessible();
+                ensureConnectionEstablished();
+                ensureTablesAreInPlaceAndHealthy();
+                successful = true;
+            } catch (final CorruptedDatabaseException e) {
+                boolean couldMoveOldToBackup = false;
+
+                try {
+                    LOGGER.error("Database is corrupted. Recreation is 
triggered.", e);
+                    final File backupFolder = new 
File(context.getBackupLocationAsFile(), "backup_" + System.currentTimeMillis());
+                    
FileUtils.ensureDirectoryExistAndCanAccess(context.getBackupLocationAsFile());
+                    Files.move(context.getPersistLocationAsPath(), 
backupFolder.toPath());
+                    couldMoveOldToBackup = true;
+                } catch (IOException ex) {
+                    LOGGER.error("Could not create backup", ex);
+                }
+
+                if (!couldMoveOldToBackup) {
+                    try {
+                        
FileUtils.deleteFile(context.getPersistLocationAsFile(), true);
+                        couldMoveOldToBackup = true;
+                    } catch (IOException ex) {
+                        LOGGER.error("Could not clean up corrupted database", 
ex);
+                    }
+                }
+
+                if (couldMoveOldToBackup) {
+                    try {
+                        ensurePersistLocationIsAccessible();
+                        ensureConnectionEstablished();
+                        ensureTablesAreInPlaceAndHealthy();
+                        successful = true;
+                    } catch (CorruptedDatabaseException ex) {
+                        LOGGER.error("Could not create backup", ex);
+                    }
+                }
+            }
+        } finally {
+            state.set(successful? EmbeddedDatabaseManagerStatus.HEALTHY : 
EmbeddedDatabaseManagerStatus.CORRUPTED);
+
+            if (!successful) {
+                engine.set(null);
+            }
+
+            databaseStructureLock.writeLock().unlock();
+        }
+
+    }
+
+    private void ensurePersistLocationIsAccessible() throws 
CorruptedDatabaseException {
+        try {
+            
FileUtils.ensureDirectoryExistAndCanAccess(context.getPersistLocationAsPath().toFile());
+        } catch (final Exception e) {
+            final String errorMessage = String.format("Database directory 
creation failed [%s]", context.getPersistLocationAsPath());
+            LOGGER.error(errorMessage, e.toString());
+            throw new CorruptedDatabaseException(errorMessage, e);
+        }
+    }
+
+    private void ensureConnectionEstablished() throws 
CorruptedDatabaseException {
+        if (engine.get() != null) {
+            engine.getAndSet(null).close();
+        }
+
+        final String absolutePath = 
context.getPersistLocationAsFile().getAbsolutePath();
+        final CairoConfiguration configuration = new 
DefaultCairoConfiguration(absolutePath);
+
+        try {
+            final CairoEngine engine = new CairoEngine(configuration);
+            LOGGER.info("Database connection successful [{}]", absolutePath);
+            this.engine.set(engine);
+        } catch (final Exception e) {
+            LOGGER.error("Database connection failed [{}]", absolutePath, e);
+            throw new CorruptedDatabaseException(e);
+        }
+    }
+
+    private void ensureTablesAreInPlaceAndHealthy() throws 
CorruptedDatabaseException {
+        final Map<String, File> databaseFiles = 
Arrays.stream(context.getPersistLocationAsFile().listFiles())
+                .collect(Collectors.toMap(f -> 
f.getAbsolutePath().substring(context.getPersistLocationAsFile().getAbsolutePath().length()
 + 1), f -> f));
+        final Client client = getUnmanagedClient();
+
+        try {
+            for (final ManagedTableDefinition tableDefinition : 
context.getTableDefinitions()) {
+                if (!databaseFiles.containsKey(tableDefinition.getName())) {
+                    try {
+                        LOGGER.info("Creating table {}", 
tableDefinition.getName());
+                        client.execute(tableDefinition.getDefinition());
+                        LOGGER.info("Table {} is created", 
tableDefinition.getName());
+                    } catch (DatabaseException e) {
+                        throw new CorruptedDatabaseException(e);
+                    }
+                } else if 
(!databaseFiles.get(tableDefinition.getName()).isDirectory()) {
+                    throw new CorruptedDatabaseException(String.format("Table 
%s cannot be created because there is already a file exists with the given 
name", tableDefinition.getName()));
+                }
+            }
+
+            // Checking if tables are healthy.
+            for (final ManagedTableDefinition tableDefinition : 
context.getTableDefinitions()) {
+                try {
+                    final TableToken tableToken = 
this.engine.get().getTableTokenIfExists(tableDefinition.getName());
+                    final TableRecordMetadata metadata = 
this.engine.get().getSequencerMetadata(tableToken);
+                    metadata.close();
+
+                    client.execute(String.format("SELECT * FROM %S LIMIT 1", 
tableDefinition.getName()));
+                } catch (final Exception e) {
+                    throw new CorruptedDatabaseException(e);
+                }
+            }
+        } finally {
+            try {
+                client.disconnect();
+            } catch (DatabaseException e) {
+                throw new CorruptedDatabaseException(e);
+            }
+        }
+    }
+
+    private void startRollover() {
+        final RolloverWorker rolloverWorker = new 
RolloverWorker(acquireClient(), context.getTableDefinitions());
+        final ScheduledFuture<?> rolloverFuture = 
scheduledExecutorService.scheduleWithFixedDelay(
+                rolloverWorker, context.getRolloverFrequency(), 
context.getRolloverFrequency(), context.getRolloverFrequencyTimeUnit());
+        scheduledFutures.add(rolloverFuture);
+        LOGGER.debug("Rollover is started");

Review Comment:
   ```suggestion
           LOGGER.debug("Rollover started");
   ```



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/main/java/org/apache/nifi/questdb/embedded/EmbeddedDatabaseManager.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.questdb.embedded;
+
+import io.questdb.cairo.CairoConfiguration;
+import io.questdb.cairo.CairoEngine;
+import io.questdb.cairo.DefaultCairoConfiguration;
+import io.questdb.cairo.TableToken;
+import io.questdb.cairo.sql.TableRecordMetadata;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.nifi.questdb.Client;
+import org.apache.nifi.questdb.DatabaseException;
+import org.apache.nifi.questdb.DatabaseManager;
+import org.apache.nifi.util.file.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+final class EmbeddedDatabaseManager implements DatabaseManager {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(EmbeddedDatabaseManager.class);
+
+    private final String id = UUID.randomUUID().toString();
+    private final AtomicReference<EmbeddedDatabaseManagerStatus> state = new 
AtomicReference(EmbeddedDatabaseManagerStatus.UNINITIALIZED);
+    private final ReadWriteLock databaseStructureLock = new 
ReentrantReadWriteLock();
+    private final EmbeddedDatabaseManagerContext context;
+    private final AtomicReference<CairoEngine> engine = new 
AtomicReference<>();
+    private final List<ScheduledFuture<?>> scheduledFutures = new 
ArrayList<>();
+    private final ScheduledExecutorService scheduledExecutorService = Executors
+            .newScheduledThreadPool(2, new 
BasicThreadFactory.Builder().namingPattern("EmbeddedQuestDbManagerWorker-" + id 
+ "-%d").build());
+
+    EmbeddedDatabaseManager(final EmbeddedDatabaseManagerContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public void init() {
+        if (state.get() != EmbeddedDatabaseManagerStatus.UNINITIALIZED) {
+            throw new IllegalStateException("Manager is already initialized!");
+        }
+
+        ensureDatabaseIsReady();
+        startRollover();
+    }
+
+    private void ensureDatabaseIsReady() {
+        boolean successful = false;
+
+        try {
+            databaseStructureLock.writeLock().lock();
+            state.set(EmbeddedDatabaseManagerStatus.REPAIRING);
+
+            try {
+                ensurePersistLocationIsAccessible();
+                ensureConnectionEstablished();
+                ensureTablesAreInPlaceAndHealthy();
+                successful = true;
+            } catch (final CorruptedDatabaseException e) {
+                boolean couldMoveOldToBackup = false;
+
+                try {
+                    LOGGER.error("Database is corrupted. Recreation is 
triggered.", e);
+                    final File backupFolder = new 
File(context.getBackupLocationAsFile(), "backup_" + System.currentTimeMillis());
+                    
FileUtils.ensureDirectoryExistAndCanAccess(context.getBackupLocationAsFile());
+                    Files.move(context.getPersistLocationAsPath(), 
backupFolder.toPath());
+                    couldMoveOldToBackup = true;
+                } catch (IOException ex) {
+                    LOGGER.error("Could not create backup", ex);
+                }
+
+                if (!couldMoveOldToBackup) {
+                    try {
+                        
FileUtils.deleteFile(context.getPersistLocationAsFile(), true);
+                        couldMoveOldToBackup = true;
+                    } catch (IOException ex) {
+                        LOGGER.error("Could not clean up corrupted database", 
ex);
+                    }
+                }
+
+                if (couldMoveOldToBackup) {
+                    try {
+                        ensurePersistLocationIsAccessible();
+                        ensureConnectionEstablished();
+                        ensureTablesAreInPlaceAndHealthy();
+                        successful = true;
+                    } catch (CorruptedDatabaseException ex) {
+                        LOGGER.error("Could not create backup", ex);
+                    }
+                }
+            }
+        } finally {
+            state.set(successful? EmbeddedDatabaseManagerStatus.HEALTHY : 
EmbeddedDatabaseManagerStatus.CORRUPTED);
+
+            if (!successful) {
+                engine.set(null);
+            }
+
+            databaseStructureLock.writeLock().unlock();
+        }
+
+    }
+
+    private void ensurePersistLocationIsAccessible() throws 
CorruptedDatabaseException {
+        try {
+            
FileUtils.ensureDirectoryExistAndCanAccess(context.getPersistLocationAsPath().toFile());
+        } catch (final Exception e) {
+            final String errorMessage = String.format("Database directory 
creation failed [%s]", context.getPersistLocationAsPath());
+            LOGGER.error(errorMessage, e.toString());
+            throw new CorruptedDatabaseException(errorMessage, e);
+        }
+    }
+
+    private void ensureConnectionEstablished() throws 
CorruptedDatabaseException {
+        if (engine.get() != null) {
+            engine.getAndSet(null).close();
+        }
+
+        final String absolutePath = 
context.getPersistLocationAsFile().getAbsolutePath();
+        final CairoConfiguration configuration = new 
DefaultCairoConfiguration(absolutePath);
+
+        try {
+            final CairoEngine engine = new CairoEngine(configuration);
+            LOGGER.info("Database connection successful [{}]", absolutePath);
+            this.engine.set(engine);
+        } catch (final Exception e) {
+            LOGGER.error("Database connection failed [{}]", absolutePath, e);
+            throw new CorruptedDatabaseException(e);
+        }
+    }
+
+    private void ensureTablesAreInPlaceAndHealthy() throws 
CorruptedDatabaseException {
+        final Map<String, File> databaseFiles = 
Arrays.stream(context.getPersistLocationAsFile().listFiles())
+                .collect(Collectors.toMap(f -> 
f.getAbsolutePath().substring(context.getPersistLocationAsFile().getAbsolutePath().length()
 + 1), f -> f));
+        final Client client = getUnmanagedClient();
+
+        try {
+            for (final ManagedTableDefinition tableDefinition : 
context.getTableDefinitions()) {
+                if (!databaseFiles.containsKey(tableDefinition.getName())) {
+                    try {
+                        LOGGER.info("Creating table {}", 
tableDefinition.getName());
+                        client.execute(tableDefinition.getDefinition());
+                        LOGGER.info("Table {} is created", 
tableDefinition.getName());

Review Comment:
   This look like they should be debug messages.
   ```suggestion
                           LOGGER.debug("Creating table {}", 
tableDefinition.getName());
                           client.execute(tableDefinition.getDefinition());
                           LOGGER.debug("Table {} created", 
tableDefinition.getName());
   ```



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/main/java/org/apache/nifi/questdb/embedded/EmbeddedDatabaseManagerContext.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.questdb.embedded;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+interface EmbeddedDatabaseManagerContext {
+    String getPersistLocation();
+    Path getPersistLocationAsPath();
+    File getPersistLocationAsFile();
+
+    String getBackupLocation();
+    Path getBackupLocationAsPath();
+    File getBackupLocationAsFile();

Review Comment:
   These `asFile()` methods seem unnecessary since Path has a `toFile()` method.



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/main/java/org/apache/nifi/questdb/mapping/SimpleRequestMapping.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.questdb.mapping;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+
+final class SimpleRequestMapping<T> implements RequestMapping<T> {
+    private final Supplier<T> factory;
+    private final List<Pair<Class<?>, BiConsumer<T, Object>>> fieldMappings;
+
+    SimpleRequestMapping(
+        final Supplier<T> factory,
+        final List<Pair<Class<?>, BiConsumer<T, Object>>> fieldMappings
+    ) {
+        this.factory = factory;
+        this.fieldMappings = new ArrayList<>(fieldMappings);
+    }
+
+    @Override
+    public T getNewInstance() {
+        return factory.get();
+    }
+
+    @Override
+    public int getNumberOfFields() {
+        return fieldMappings.size();
+    }
+
+    @Override
+    public Class<?> getFieldTypeAt(final int position) {

Review Comment:
   Recommend removing the `At` suffix from these methods.



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/main/java/org/apache/nifi/questdb/embedded/EmbeddedDatabaseManagerContext.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.questdb.embedded;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+interface EmbeddedDatabaseManagerContext {
+    String getPersistLocation();
+    Path getPersistLocationAsPath();
+    File getPersistLocationAsFile();
+
+    String getBackupLocation();
+    Path getBackupLocationAsPath();
+    File getBackupLocationAsFile();
+
+    int getNumberOfAttemptedRetries();
+    int getLockAttemptTime();
+    TimeUnit getLockAttemptTimeUnit();
+    int getRolloverFrequency();
+    TimeUnit getRolloverFrequencyTimeUnit();

Review Comment:
   As above, this could be replaced with Duration.



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/main/java/org/apache/nifi/questdb/embedded/LockedClient.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.questdb.embedded;
+
+import org.apache.nifi.questdb.Client;
+import org.apache.nifi.questdb.DatabaseException;
+import org.apache.nifi.questdb.InsertRowDataSource;
+import org.apache.nifi.questdb.QueryResultProcessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+final class LockedClient implements Client {

Review Comment:
   Is this class necessary because QuestDB does not have locking support itself?



##########
nifi-nar-bundles/nifi-questdb-bundle/nifi-questdb/src/main/java/org/apache/nifi/questdb/embedded/EmbeddedDatabaseManager.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.questdb.embedded;
+
+import io.questdb.cairo.CairoConfiguration;
+import io.questdb.cairo.CairoEngine;
+import io.questdb.cairo.DefaultCairoConfiguration;
+import io.questdb.cairo.TableToken;
+import io.questdb.cairo.sql.TableRecordMetadata;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.nifi.questdb.Client;
+import org.apache.nifi.questdb.DatabaseException;
+import org.apache.nifi.questdb.DatabaseManager;
+import org.apache.nifi.util.file.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+final class EmbeddedDatabaseManager implements DatabaseManager {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(EmbeddedDatabaseManager.class);
+
+    private final String id = UUID.randomUUID().toString();
+    private final AtomicReference<EmbeddedDatabaseManagerStatus> state = new 
AtomicReference(EmbeddedDatabaseManagerStatus.UNINITIALIZED);
+    private final ReadWriteLock databaseStructureLock = new 
ReentrantReadWriteLock();
+    private final EmbeddedDatabaseManagerContext context;
+    private final AtomicReference<CairoEngine> engine = new 
AtomicReference<>();
+    private final List<ScheduledFuture<?>> scheduledFutures = new 
ArrayList<>();
+    private final ScheduledExecutorService scheduledExecutorService = Executors
+            .newScheduledThreadPool(2, new 
BasicThreadFactory.Builder().namingPattern("EmbeddedQuestDbManagerWorker-" + id 
+ "-%d").build());
+
+    EmbeddedDatabaseManager(final EmbeddedDatabaseManagerContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public void init() {
+        if (state.get() != EmbeddedDatabaseManagerStatus.UNINITIALIZED) {
+            throw new IllegalStateException("Manager is already initialized!");
+        }
+
+        ensureDatabaseIsReady();
+        startRollover();
+    }
+
+    private void ensureDatabaseIsReady() {
+        boolean successful = false;
+
+        try {
+            databaseStructureLock.writeLock().lock();
+            state.set(EmbeddedDatabaseManagerStatus.REPAIRING);
+
+            try {
+                ensurePersistLocationIsAccessible();
+                ensureConnectionEstablished();
+                ensureTablesAreInPlaceAndHealthy();
+                successful = true;
+            } catch (final CorruptedDatabaseException e) {
+                boolean couldMoveOldToBackup = false;
+
+                try {
+                    LOGGER.error("Database is corrupted. Recreation is 
triggered.", e);
+                    final File backupFolder = new 
File(context.getBackupLocationAsFile(), "backup_" + System.currentTimeMillis());
+                    
FileUtils.ensureDirectoryExistAndCanAccess(context.getBackupLocationAsFile());
+                    Files.move(context.getPersistLocationAsPath(), 
backupFolder.toPath());
+                    couldMoveOldToBackup = true;
+                } catch (IOException ex) {
+                    LOGGER.error("Could not create backup", ex);
+                }
+
+                if (!couldMoveOldToBackup) {
+                    try {
+                        
FileUtils.deleteFile(context.getPersistLocationAsFile(), true);
+                        couldMoveOldToBackup = true;
+                    } catch (IOException ex) {
+                        LOGGER.error("Could not clean up corrupted database", 
ex);
+                    }
+                }
+
+                if (couldMoveOldToBackup) {
+                    try {
+                        ensurePersistLocationIsAccessible();
+                        ensureConnectionEstablished();
+                        ensureTablesAreInPlaceAndHealthy();
+                        successful = true;
+                    } catch (CorruptedDatabaseException ex) {
+                        LOGGER.error("Could not create backup", ex);
+                    }
+                }
+            }
+        } finally {
+            state.set(successful? EmbeddedDatabaseManagerStatus.HEALTHY : 
EmbeddedDatabaseManagerStatus.CORRUPTED);
+
+            if (!successful) {
+                engine.set(null);
+            }
+
+            databaseStructureLock.writeLock().unlock();
+        }
+
+    }
+
+    private void ensurePersistLocationIsAccessible() throws 
CorruptedDatabaseException {
+        try {
+            
FileUtils.ensureDirectoryExistAndCanAccess(context.getPersistLocationAsPath().toFile());
+        } catch (final Exception e) {
+            final String errorMessage = String.format("Database directory 
creation failed [%s]", context.getPersistLocationAsPath());
+            LOGGER.error(errorMessage, e.toString());
+            throw new CorruptedDatabaseException(errorMessage, e);
+        }
+    }
+
+    private void ensureConnectionEstablished() throws 
CorruptedDatabaseException {
+        if (engine.get() != null) {
+            engine.getAndSet(null).close();
+        }
+
+        final String absolutePath = 
context.getPersistLocationAsFile().getAbsolutePath();
+        final CairoConfiguration configuration = new 
DefaultCairoConfiguration(absolutePath);
+
+        try {
+            final CairoEngine engine = new CairoEngine(configuration);
+            LOGGER.info("Database connection successful [{}]", absolutePath);
+            this.engine.set(engine);
+        } catch (final Exception e) {
+            LOGGER.error("Database connection failed [{}]", absolutePath, e);
+            throw new CorruptedDatabaseException(e);
+        }
+    }
+
+    private void ensureTablesAreInPlaceAndHealthy() throws 
CorruptedDatabaseException {
+        final Map<String, File> databaseFiles = 
Arrays.stream(context.getPersistLocationAsFile().listFiles())
+                .collect(Collectors.toMap(f -> 
f.getAbsolutePath().substring(context.getPersistLocationAsFile().getAbsolutePath().length()
 + 1), f -> f));
+        final Client client = getUnmanagedClient();
+
+        try {
+            for (final ManagedTableDefinition tableDefinition : 
context.getTableDefinitions()) {
+                if (!databaseFiles.containsKey(tableDefinition.getName())) {
+                    try {
+                        LOGGER.info("Creating table {}", 
tableDefinition.getName());
+                        client.execute(tableDefinition.getDefinition());
+                        LOGGER.info("Table {} is created", 
tableDefinition.getName());
+                    } catch (DatabaseException e) {
+                        throw new CorruptedDatabaseException(e);
+                    }
+                } else if 
(!databaseFiles.get(tableDefinition.getName()).isDirectory()) {
+                    throw new CorruptedDatabaseException(String.format("Table 
%s cannot be created because there is already a file exists with the given 
name", tableDefinition.getName()));
+                }
+            }
+
+            // Checking if tables are healthy.
+            for (final ManagedTableDefinition tableDefinition : 
context.getTableDefinitions()) {
+                try {
+                    final TableToken tableToken = 
this.engine.get().getTableTokenIfExists(tableDefinition.getName());
+                    final TableRecordMetadata metadata = 
this.engine.get().getSequencerMetadata(tableToken);
+                    metadata.close();
+
+                    client.execute(String.format("SELECT * FROM %S LIMIT 1", 
tableDefinition.getName()));
+                } catch (final Exception e) {
+                    throw new CorruptedDatabaseException(e);
+                }
+            }
+        } finally {
+            try {
+                client.disconnect();
+            } catch (DatabaseException e) {
+                throw new CorruptedDatabaseException(e);
+            }
+        }
+    }
+
+    private void startRollover() {
+        final RolloverWorker rolloverWorker = new 
RolloverWorker(acquireClient(), context.getTableDefinitions());
+        final ScheduledFuture<?> rolloverFuture = 
scheduledExecutorService.scheduleWithFixedDelay(
+                rolloverWorker, context.getRolloverFrequency(), 
context.getRolloverFrequency(), context.getRolloverFrequencyTimeUnit());
+        scheduledFutures.add(rolloverFuture);
+        LOGGER.debug("Rollover is started");
+    }
+
+    private void stopRollover() {
+        LOGGER.debug("Rollover shutdown started");
+
+        int cancelCompleted = 0;
+        int cancelFailed = 0;
+        for (final ScheduledFuture<?> scheduledFuture : scheduledFutures) {
+            final boolean cancelled = scheduledFuture.cancel(true);
+            if (cancelled) {
+                cancelCompleted++;
+            } else {
+                cancelFailed++;
+            }
+        }
+        LOGGER.debug("Rollover shutdown task cancellation status: completed 
[{}] failed [{}]", cancelCompleted, cancelFailed);
+
+        final List<Runnable> tasks = scheduledExecutorService.shutdownNow();
+        LOGGER.debug("Rollover Scheduled Task Service shutdown remaining tasks 
[{}]", tasks.size());
+    }
+
+    private Client getUnmanagedClient() {
+        return new EmbeddedClient(() -> engine.get());
+    }
+
+    public Client acquireClient() {
+        checkIfManagerIsInitialised();
+        final Client fallback = new DummyClient();
+
+        if (state.get() == EmbeddedDatabaseManagerStatus.CORRUPTED) {
+            LOGGER.warn("The database is corrupted. Dummy client is returned");

Review Comment:
   What is the behavior of the DummyClient? Perhaps NoOpClient would be clearer?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to