markap14 commented on a change in pull request #4821:
URL: https://github.com/apache/nifi/pull/4821#discussion_r578557060
##########
File path:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
##########
@@ -120,11 +120,40 @@
nifi.provenance.repository.concurrent.merge.threads=${nifi.provenance.repository
# Volatile Provenance Respository Properties
nifi.provenance.repository.buffer.size=${nifi.provenance.repository.buffer.size}
-# Component Status Repository
+# Component and Node Status Repository
+
+# Implementation based specification. Must be the classname of org.apache.nifi.controller.status.history.StatusRepository implementation.
+# This property specifies both the Component and the Node Status Repositories.
+# This approach takes precedence over the builder based specification if presents.
nifi.components.status.repository.implementation=${nifi.components.status.repository.implementation}
+
+# Builder based specification. Gives the possibility to store Node and Component Status History information in different storage solutions.
Review comment:
All properties that get added to this file need to be fully documented
in the `administration-guide.adoc` in `nifi-docs`
##########
File path:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
##########
@@ -120,11 +120,40 @@
nifi.provenance.repository.concurrent.merge.threads=${nifi.provenance.repository
# Volatile Provenance Respository Properties
nifi.provenance.repository.buffer.size=${nifi.provenance.repository.buffer.size}
-# Component Status Repository
+# Component and Node Status Repository
+
+# Implementation based specification. Must be the classname of org.apache.nifi.controller.status.history.StatusRepository implementation.
+# This property specifies both the Component and the Node Status Repositories.
+# This approach takes precedence over the builder based specification if presents.
nifi.components.status.repository.implementation=${nifi.components.status.repository.implementation}
+
+# Builder based specification. Gives the possibility to store Node and Component Status History information in different storage solutions.
+# nifi.status.repository.builder.inmemory=org.apache.nifi.controller.status.history.InMemoryStatusRepositoryBuilder
+# nifi.status.repository.builder.persistent=org.apache.nifi.controller.status.history.EmbeddedQuestDbStatusRepositoryBuilder
+# nifi.status.repository.roles.component=persistent
+# nifi.status.repository.roles.node=inmemory
Review comment:
I'm not sure that I see the benefit to adding these properties at all.
If the user wants to persist the data, it should be persisted. If they want to
keep it in memory, it should be kept in memory. These properties become
confusing and add dubious value. We should lean more toward simple
configuration vs. more raw power when we're able to.
Recommend removing all 4 of these properties. Instead, just allow the
QuestDB Status Repository to be configured via the
`nifi.components.status.repository.implementation` property, in which case all
stats are persistent. If Volatile repo is used, store everything in memory.
##########
File path:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
##########
@@ -1124,20 +1129,77 @@ private ProvenanceRepository createProvenanceRepository(final NiFiProperties pro
}
}
- private ComponentStatusRepository createComponentStatusRepository() {
- final String implementationClassName = nifiProperties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION, DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION);
- if (implementationClassName == null) {
- throw new RuntimeException("Cannot create Component Status Repository because the NiFi Properties is missing the following property: "
- + NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
+ private StatusRepository createStatusRepositories() {
+ // Creating status repository based on implementation class takes precedence over creation based on builder
+ final String implementationClassName = nifiProperties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
+
+ if (implementationClassName != null) {
Review comment:
Any time that we fetch a property value from nifi properties, we need to
treat `null` the same as empty strings or strings with only white space. If the
property name exists but with no value, you'll get back an empty string here
instead of null.
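
A minimal sketch of the kind of check described above. `PropertyValues`, `isSet` and `getOrDefault` are hypothetical names used only for illustration; they are not part of NiFi or of this PR, and plain `java.util.Properties` stands in for `NiFiProperties`.

```java
import java.util.Properties;

// Hypothetical helper (not NiFi API): treats null, "" and whitespace-only
// property values identically, as "not configured".
final class PropertyValues {
    private PropertyValues() {
    }

    // True only when the value contains at least one non-whitespace character.
    static boolean isSet(final String value) {
        return value != null && !value.trim().isEmpty();
    }

    // Returns the trimmed value when it is set, otherwise the default.
    static String getOrDefault(final Properties properties, final String key, final String defaultValue) {
        final String value = properties.getProperty(key);
        return isSet(value) ? value.trim() : defaultValue;
    }
}
```

With a helper like this, a property that is present but has no value falls through to the default in the same way as a missing property.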
##########
File path:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
##########
@@ -120,11 +120,40 @@
nifi.provenance.repository.concurrent.merge.threads=${nifi.provenance.repository
# Volatile Provenance Respository Properties
nifi.provenance.repository.buffer.size=${nifi.provenance.repository.buffer.size}
-# Component Status Repository
+# Component and Node Status Repository
+
+# Implementation based specification. Must be the classname of org.apache.nifi.controller.status.history.StatusRepository implementation.
+# This property specifies both the Component and the Node Status Repositories.
+# This approach takes precedence over the builder based specification if presents.
nifi.components.status.repository.implementation=${nifi.components.status.repository.implementation}
+
+# Builder based specification. Gives the possibility to store Node and Component Status History information in different storage solutions.
+# nifi.status.repository.builder.inmemory=org.apache.nifi.controller.status.history.InMemoryStatusRepositoryBuilder
+# nifi.status.repository.builder.persistent=org.apache.nifi.controller.status.history.EmbeddedQuestDbStatusRepositoryBuilder
+# nifi.status.repository.roles.component=persistent
+# nifi.status.repository.roles.node=inmemory
+
+# Volatile Status Repository Properties
nifi.components.status.repository.buffer.size=${nifi.components.status.repository.buffer.size}
nifi.components.status.snapshot.frequency=${nifi.components.status.snapshot.frequency}
+# QuestDB Status Repository Properties
+# nifi.status.repository.questdb.persist.frequency=${nifi.status.repository.questdb.persist.frequency}
+# nifi.status.repository.questdb.persist.roll.frequency=${nifi.status.repository.questdb.persist.roll.frequency}
+# nifi.status.repository.questdb.persist.batch.size=${nifi.status.repository.questdb.persist.batch.size}
+# nifi.status.repository.questdb.persist.node.days=${nifi.status.repository.questdb.persist.node.days}
+# nifi.status.repository.questdb.persist.component.days=${nifi.status.repository.questdb.persist.component.days}
+# nifi.status.repository.questdb.persist.location=${nifi.status.repository.questdb.persist.location}
+
+# The properties below are used for optimize QuesDB performance. For further details please see https://questdb.io/docs/reference/sql/create-table
+# An estimation of the number of components per component type, serves as hint for QuestDB performance optimization
+# nifi.status.repository.questdb.component.id.distinctvalues=${nifi.status.repository.questdb.component.id.distinctvalues}
+# If true, it turns on Java heap based caching for quicker lookup. This increases selection speed but consumes heap memory.
+# nifi.status.repository.questdb.component.id.cached=${nifi.status.repository.questdb.component.id.cached}
+# Turns on indexing of the component id field. For further details please see https://questdb.io/docs/concept/indexes/
Review comment:
While referencing the QuestDB docs may provide some additional insights,
we should not expect users to understand how QuestDB works. That is simply an
implementation detail. We need to ensure that we fully document exactly how
this property will affect the user, given the context of NiFi. We should do
this in the administration guide, though, rather than add too much to the
nifi.properties.
##########
File path:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbRolloverHandler.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import io.questdb.cairo.sql.Record;
+import io.questdb.cairo.sql.RecordCursor;
+import io.questdb.cairo.sql.RecordCursorFactory;
+import io.questdb.griffin.SqlCompiler;
+import io.questdb.griffin.SqlExecutionContext;
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.nifi.controller.status.history.questdb.QuestDbContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * QuestDB does not provide the possibility for deleting individual lines. Instead there is the option to drop older
+ * partitions. In order to clean up older status information, the partitions are outside of the scope of data we intend
+ * to keep will be deleted.
+ */
+public class EmbeddedQuestDbRolloverHandler implements Runnable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedQuestDbRolloverHandler.class);
+
+ // Drop keyword is intentionally not uppercase as the query parser only recognizes it in this way
+ private static final String DELETION_QUERY = "ALTER TABLE %s drop PARTITION '%s'";
+ // Distinct keyword is not recognized if the date mapping is not within an inner query
+ static final String SELECTION_QUERY = "SELECT DISTINCT * FROM (SELECT (to_str(capturedAt, 'yyyy-MM-dd')) AS partitionName FROM %s)";
+
+ static final FastDateFormat DATE_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd");
+
+ private final QuestDbContext dbContext;
+ private final List<String> tables = new ArrayList<>();
+ private final int daysToKeepData;
+
+ public EmbeddedQuestDbRolloverHandler(final Collection<String> tables, final int daysToKeepData, final QuestDbContext dbContext) {
+ this.tables.addAll(tables);
+ this.dbContext = dbContext;
+ this.daysToKeepData = daysToKeepData;
+ }
+
+ @Override
+ public void run() {
+ LOGGER.debug("Starting rollover");
+ tables.forEach(tableName -> rollOverTable(tableName));
+ LOGGER.debug("Finishing rollover");
+ }
+
+ private void rollOverTable(final CharSequence tableName) {
+ try {
+ final Set<String> partitions = getPartitions(tableName);
+ final Set<String> partitionToKeep = getPartitionsToKeep();
+
+ for (final String partition : partitions) {
+ if (!partitionToKeep.contains(partition)) {
+ deletePartition(tableName, partition);
+ }
+ }
+ } catch (final Exception e) {
+ LOGGER.error("Could not rollover table " + tableName, e);
+ }
+ }
+
+ private void deletePartition(final CharSequence tableName, final String partition) {
+ try (final SqlCompiler compiler = dbContext.getCompiler()) {
+ compiler.compile(String.format(DELETION_QUERY, new Object[]{tableName, partition}), dbContext.getSqlExecutionContext());
+ } catch (final Exception e) {
+ LOGGER.error("Dropping partition " + partition + " of table " + tableName + " failed", e);
+ }
+ }
+
+ private Set<String> getPartitions(final CharSequence tableName) throws Exception {
+ final SqlExecutionContext executionContext = dbContext.getSqlExecutionContext();
+ final Set<String> result = new HashSet<>();
+
+ try (
+ final SqlCompiler compiler = dbContext.getCompiler();
+ final RecordCursorFactory recordCursorFactory = compiler.compile(String.format(SELECTION_QUERY, new Object[]{tableName}), executionContext).getRecordCursorFactory();
+ final RecordCursor cursor = recordCursorFactory.getCursor(executionContext);
+ ) {
+ while (cursor.hasNext()) {
+ final Record record = cursor.getRecord();
+ result.add(new StringBuilder(record.getStr(0)).toString());
Review comment:
Why are we creating a StringBuilder here?
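
For illustration only, and assuming `record.getStr(0)` hands back a `CharSequence` whose backing buffer may be reused between rows (an assumption, not something confirmed in this thread): if the goal is a stable copy, `toString()` already produces one without the intermediate `StringBuilder`. In the sketch below a `StringBuilder` merely stands in for such a reused buffer; `CharSequenceSnapshot` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Self-contained sketch, not QuestDB code: shows that CharSequence.toString()
// is enough to snapshot a value held in a buffer that is reused per row.
final class CharSequenceSnapshot {
    private CharSequenceSnapshot() {
    }

    static List<String> collect(final String[] rows) {
        final StringBuilder reusedBuffer = new StringBuilder();
        final List<String> result = new ArrayList<>();
        for (final String row : rows) {
            // Overwrite the shared buffer, as a flyweight record might do.
            reusedBuffer.setLength(0);
            reusedBuffer.append(row);
            final CharSequence value = reusedBuffer;
            // toString() copies the current characters into an immutable String.
            result.add(value.toString());
        }
        return result;
    }
}
```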
##########
File path:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
##########
@@ -120,11 +120,40 @@
nifi.provenance.repository.concurrent.merge.threads=${nifi.provenance.repository
# Volatile Provenance Respository Properties
nifi.provenance.repository.buffer.size=${nifi.provenance.repository.buffer.size}
-# Component Status Repository
+# Component and Node Status Repository
+
+# Implementation based specification. Must be the classname of org.apache.nifi.controller.status.history.StatusRepository implementation.
+# This property specifies both the Component and the Node Status Repositories.
+# This approach takes precedence over the builder based specification if presents.
nifi.components.status.repository.implementation=${nifi.components.status.repository.implementation}
+
+# Builder based specification. Gives the possibility to store Node and Component Status History information in different storage solutions.
+# nifi.status.repository.builder.inmemory=org.apache.nifi.controller.status.history.InMemoryStatusRepositoryBuilder
+# nifi.status.repository.builder.persistent=org.apache.nifi.controller.status.history.EmbeddedQuestDbStatusRepositoryBuilder
+# nifi.status.repository.roles.component=persistent
+# nifi.status.repository.roles.node=inmemory
+
+# Volatile Status Repository Properties
nifi.components.status.repository.buffer.size=${nifi.components.status.repository.buffer.size}
nifi.components.status.snapshot.frequency=${nifi.components.status.snapshot.frequency}
+# QuestDB Status Repository Properties
+# nifi.status.repository.questdb.persist.frequency=${nifi.status.repository.questdb.persist.frequency}
+# nifi.status.repository.questdb.persist.roll.frequency=${nifi.status.repository.questdb.persist.roll.frequency}
+# nifi.status.repository.questdb.persist.batch.size=${nifi.status.repository.questdb.persist.batch.size}
+# nifi.status.repository.questdb.persist.node.days=${nifi.status.repository.questdb.persist.node.days}
+# nifi.status.repository.questdb.persist.component.days=${nifi.status.repository.questdb.persist.component.days}
+# nifi.status.repository.questdb.persist.location=${nifi.status.repository.questdb.persist.location}
+
+# The properties below are used for optimize QuesDB performance. For further details please see https://questdb.io/docs/reference/sql/create-table
Review comment:
Typo in "QuestDB" - missing the T. Should also read "properties are used
to optimize" rather than "for optimize"
##########
File path:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java
##########
@@ -113,16 +113,16 @@ public String getField() {
private static long calculateTaskMillis(final ProcessGroupStatus status) {
- long nanos = 0L;
Review comment:
Why are these nanos being changed to millis? This leads to a lot of
rounding errors, resulting in the data being both less precise and less
accurate. Holding onto nanos and converting once at the end is also more
efficient.
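
A small sketch of the rounding effect described above. `TaskDurations` and its two methods are hypothetical names for illustration, not code from this PR; only `java.util.concurrent.TimeUnit` is real API.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical illustration: compares converting each value to millis before
// summing with summing nanos first and converting once at the end.
final class TaskDurations {
    private TaskDurations() {
    }

    // Each conversion truncates, so sub-millisecond parts are lost per task.
    static long sumAfterConverting(final long[] nanosPerTask) {
        long millis = 0L;
        for (final long nanos : nanosPerTask) {
            millis += TimeUnit.NANOSECONDS.toMillis(nanos);
        }
        return millis;
    }

    // Nanos are accumulated exactly; truncation happens only once.
    static long convertAfterSumming(final long[] nanosPerTask) {
        long nanos = 0L;
        for (final long value : nanosPerTask) {
            nanos += value;
        }
        return TimeUnit.NANOSECONDS.toMillis(nanos);
    }
}
```

For three tasks of 600,000 ns each, converting per task yields 0 ms in total, while summing first yields 1 ms (1,800,000 ns truncated once).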
##########
File path:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
##########
@@ -120,11 +120,40 @@
nifi.provenance.repository.concurrent.merge.threads=${nifi.provenance.repository
# Volatile Provenance Respository Properties
nifi.provenance.repository.buffer.size=${nifi.provenance.repository.buffer.size}
-# Component Status Repository
+# Component and Node Status Repository
+
+# Implementation based specification. Must be the classname of org.apache.nifi.controller.status.history.StatusRepository implementation.
+# This property specifies both the Component and the Node Status Repositories.
+# This approach takes precedence over the builder based specification if presents.
nifi.components.status.repository.implementation=${nifi.components.status.repository.implementation}
+
+# Builder based specification. Gives the possibility to store Node and Component Status History information in different storage solutions.
+# nifi.status.repository.builder.inmemory=org.apache.nifi.controller.status.history.InMemoryStatusRepositoryBuilder
+# nifi.status.repository.builder.persistent=org.apache.nifi.controller.status.history.EmbeddedQuestDbStatusRepositoryBuilder
+# nifi.status.repository.roles.component=persistent
+# nifi.status.repository.roles.node=inmemory
+
+# Volatile Status Repository Properties
nifi.components.status.repository.buffer.size=${nifi.components.status.repository.buffer.size}
nifi.components.status.snapshot.frequency=${nifi.components.status.snapshot.frequency}
+# QuestDB Status Repository Properties
+# nifi.status.repository.questdb.persist.frequency=${nifi.status.repository.questdb.persist.frequency}
+# nifi.status.repository.questdb.persist.roll.frequency=${nifi.status.repository.questdb.persist.roll.frequency}
+# nifi.status.repository.questdb.persist.batch.size=${nifi.status.repository.questdb.persist.batch.size}
Review comment:
These properties also seem too complex to me. Admins shouldn't need to
guess what an appropriate "batch size" is for storing metrics. We should try to
keep this as simple as possible and just configure how frequently we capture a
snapshot. We can always add additional properties later, if necessary, for
tuning. I just don't want to overwhelm users with 15 additional properties when
all the user really cares about is "I want this persisted for longer and across
restarts."