exceptionfactory commented on code in PR #8123:
URL: https://github.com/apache/nifi/pull/8123#discussion_r1417672045


##########
nifi-api/src/main/java/org/apache/nifi/controller/status/NodeStatus.java:
##########
@@ -35,7 +35,6 @@ public class NodeStatus implements Cloneable {
     private double processorLoadAverage;
 
     private long totalThreads;
-    private long eventDrivenThreads;

Review Comment:
   This change is a helpful cleanup, which would be useful to handle 
separately. As this particular pull request may take some time to review, would 
you consider separating out this removal to its own pull request?



##########
nifi-commons/nifi-questdb/pom.xml:
##########
@@ -0,0 +1,71 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
https://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-commons</artifactId>
+        <version>2.0.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-questdb</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.13.0</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>2.13.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>2.0.9</version>
+        </dependency>
+        <dependency>
+            <groupId>org.questdb</groupId>
+            <artifactId>questdb</artifactId>
+            <version>7.2</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <version>5.9.2</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>5.6.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+            <version>5.6.0</version>
+        </dependency>

Review Comment:
   With the exception of `questdb`, all of these dependency versions are 
managed at the root Maven configuration, so the specific version numbers should 
be removed.



##########
nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java:
##########
@@ -22,6 +22,7 @@
 /**
  */
 public class ConnectionStatus implements Cloneable {
+    private long createdAtInMs;

Review Comment:
   Although it might require some additional replication in the QuestDB 
implementation itself, it is important to avoid introducing specific concepts 
to the NiFi API that do not have general applicability. I would have to take a 
closer look at how this is used, but in general, I believe any changes to the 
nifi-api should be avoided for the purpose of this implementation.



##########
nifi-commons/nifi-questdb/src/test/java/org/apache/nifi/questdb/QuestDbTestUtil.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.questdb;
+
+import java.io.File;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+public class QuestDbTestUtil {
+    static final File TEST_DB_PATH = new File("src/test/resources/testdb");

Review Comment:
   The JUnit 5 `@TempDir` annotation is preferable to hard-coded paths that 
reference source directories, since that annotation provides lifecycle 
management for temporary directories.



##########
nifi-commons/nifi-questdb/src/test/java/org/apache/nifi/questdb/CompositeClientTest.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.questdb;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiConsumer;
+
+import static org.apache.nifi.questdb.QuestDbTestUtil.SELECT_QUERY;
+
+// This test uses the {@code compile} method to cover different scenarios. It 
is expected that other methods behave the same
+// but the coverage of the individual behaviour of the individual methods is 
provided by different tests.
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+public class CompositeClientTest extends EmbeddedQuestDbTest {
+    private static final int NUMBER_OF_RETRIES = 3;
+    private static final int NUMBER_OF_ATTEMPTS = NUMBER_OF_RETRIES + 1;
+    private static final int LOCK_ATTEMPT_TIME = 20;
+    private static final TimeUnit LOCK_ATTEMPT_TIME_UNIT = 
TimeUnit.MILLISECONDS;
+
+    @Mock
+    Client client;
+
+    @Mock
+    Client fallback;
+
+    @Mock
+    BiConsumer<Integer, Exception> errorAction;
+
+    @Mock
+    ConditionAwareClient.Condition condition;
+
+    private ReadWriteLock databaseStructureLock;
+
+    private LockedClient lockedClientSpy;
+
+    private Client testSubject;
+
+    @BeforeEach
+    public void setUp() {
+        databaseStructureLock = new ReentrantReadWriteLock();
+
+        testSubject = getTestSubject();
+
+        Mockito.when(condition.check()).thenReturn(true);

Review Comment:
   I recommend using static imports for Mockito and Assertion methods for more 
concise test code.



##########
nifi-commons/nifi-questdb/src/main/java/org/apache/nifi/questdb/EmbeddedDatabaseManager.java:
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.questdb;
+
+import io.questdb.cairo.CairoConfiguration;
+import io.questdb.cairo.CairoEngine;
+import io.questdb.cairo.DefaultCairoConfiguration;
+import io.questdb.cairo.TableToken;
+import io.questdb.cairo.sql.TableRecordMetadata;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.nifi.util.file.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+final class EmbeddedDatabaseManager implements DatabaseManager {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(EmbeddedDatabaseManager.class);
+
+    private final String id = UUID.randomUUID().toString();
+    private final AtomicReference<EmbeddedDatabaseManagerStatus> state = new 
AtomicReference(EmbeddedDatabaseManagerStatus.UNINITIALIZED);
+    private final ReadWriteLock databaseStructureLock = new 
ReentrantReadWriteLock();
+    private final EmbeddedDatabaseManagerContext context;
+    private final AtomicReference<CairoEngine> engine = new 
AtomicReference<>();
+    private final List<ScheduledFuture<?>> scheduledFutures = new 
ArrayList<>();
+    private final ScheduledExecutorService scheduledExecutorService = Executors
+            .newScheduledThreadPool(2, new 
BasicThreadFactory.Builder().namingPattern("EmbeddedQuestDbManagerWorker-" + id 
+ "-%d").build());
+
+    EmbeddedDatabaseManager(final EmbeddedDatabaseManagerContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public void init() {
+        if (state.get() != EmbeddedDatabaseManagerStatus.UNINITIALIZED) {
+            throw new IllegalStateException("Manager is already initialized!");
+        }
+
+        ensureDatabaseIsReady();
+        startRollover();
+    }
+
+    private void ensureDatabaseIsReady() {
+        boolean successful = false;
+
+        try {
+            databaseStructureLock.writeLock().lock();
+            state.set(EmbeddedDatabaseManagerStatus.REPAIRING);
+
+            try {
+                ensurePersistLocationIsAccessible();
+                ensureConnectionEstablished();
+                ensureTablesAreInPlaceAndHealthy();
+                successful = true;
+            } catch (final CorruptedDatabaseException e) {
+                boolean couldMoveOldToBackup = false;
+
+                try {
+                    LOGGER.error("Database is corrupted. Recreation is 
triggered.", e);
+                    final File backupFolder = new 
File(context.getBackupLocationAsFile(), "backup_" + System.currentTimeMillis());
+                    
FileUtils.ensureDirectoryExistAndCanAccess(context.getBackupLocationAsFile());
+                    Files.move(context.getPersistLocationAsPath(), 
backupFolder.toPath());
+                    couldMoveOldToBackup = true;
+                } catch (IOException ex) {
+                    LOGGER.error("Could not create backup", ex);
+                }
+
+                if (!couldMoveOldToBackup) {
+                    try {
+                        
FileUtils.deleteFile(context.getPersistLocationAsFile(), true);
+                        couldMoveOldToBackup = true;
+                    } catch (IOException ex) {
+                        LOGGER.error("Could not clean up corrupted database", 
ex);
+                    }
+                }
+
+                if (couldMoveOldToBackup) {
+                    try {
+                        ensurePersistLocationIsAccessible();
+                        ensureConnectionEstablished();
+                        ensureTablesAreInPlaceAndHealthy();
+                        successful = true;
+                    } catch (CorruptedDatabaseException ex) {
+                        LOGGER.error("Could not create backup", ex);
+                    }
+                }
+            }
+        } finally {
+            state.set(successful? EmbeddedDatabaseManagerStatus.HEALTHY : 
EmbeddedDatabaseManagerStatus.CORRUPTED);
+
+            if (!successful) {
+                engine.set(null);
+            }
+
+            databaseStructureLock.writeLock().unlock();
+        }
+
+    }
+
+    private void ensurePersistLocationIsAccessible() throws 
CorruptedDatabaseException {
+        try {
+            
FileUtils.ensureDirectoryExistAndCanAccess(context.getPersistLocationAsPath().toFile());
+        } catch (final Exception e) {
+            final String errorMessage = String.format("Database directory 
creation failed [%s]", context.getPersistLocationAsPath());
+            LOGGER.error(errorMessage, e.toString());
+            throw new CorruptedDatabaseException(errorMessage);

Review Comment:
   As a general implementation note, the exception cause should be passed to 
the new exception for tracking. As it stands, the source of the stack is 
neither logged nor passed, which limits potential debugging.



##########
nifi-commons/nifi-questdb/pom.xml:
##########
@@ -0,0 +1,71 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
https://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-commons</artifactId>
+        <version>2.0.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-questdb</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>2.0.0-SNAPSHOT</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.13.0</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>2.13.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>2.0.9</version>
+        </dependency>
+        <dependency>
+            <groupId>org.questdb</groupId>
+            <artifactId>questdb</artifactId>
+            <version>7.2</version>

Review Comment:
   The latest version of QuestDB appears to be 7.3.5, so this is a good 
opportunity to move to the latest version.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to