simonbence commented on a change in pull request #4821: URL: https://github.com/apache/nifi/pull/4821#discussion_r581143770
########## File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbRolloverHandler.java ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.status.history; + +import io.questdb.cairo.sql.Record; +import io.questdb.cairo.sql.RecordCursor; +import io.questdb.cairo.sql.RecordCursorFactory; +import io.questdb.griffin.SqlCompiler; +import io.questdb.griffin.SqlExecutionContext; +import org.apache.commons.lang3.time.FastDateFormat; +import org.apache.nifi.controller.status.history.questdb.QuestDbContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * QuestDB does not provide the possibility for deleting individual lines. Instead there is the option to drop older + * partitions. In order to clean up older status information, the partitions are outside of the scope of data we intend + * to keep will be deleted. + */ +public class EmbeddedQuestDbRolloverHandler implements Runnable { + private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedQuestDbRolloverHandler.class); + + // Drop keyword is intentionally not uppercase as the query parser only recognizes it in this way + private static final String DELETION_QUERY = "ALTER TABLE %s drop PARTITION '%s'"; + // Distinct keyword is not recognized if the date mapping is not within an inner query + static final String SELECTION_QUERY = "SELECT DISTINCT * FROM (SELECT (to_str(capturedAt, 'yyyy-MM-dd')) AS partitionName FROM %s)"; + + static final FastDateFormat DATE_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd"); + + private final QuestDbContext dbContext; + private final List<String> tables = new ArrayList<>(); + private final int daysToKeepData; + + public EmbeddedQuestDbRolloverHandler(final Collection<String> tables, final int daysToKeepData, final QuestDbContext dbContext) { + this.tables.addAll(tables); + this.dbContext = dbContext; + this.daysToKeepData = daysToKeepData; + } + + @Override + public void run() { + LOGGER.debug("Starting rollover"); + tables.forEach(tableName -> rollOverTable(tableName)); + LOGGER.debug("Finishing rollover"); + } + + private void rollOverTable(final CharSequence tableName) { + try { + final Set<String> partitions = getPartitions(tableName); + final Set<String> partitionToKeep = getPartitionsToKeep(); + + for (final String partition : partitions) { + if (!partitionToKeep.contains(partition)) { + deletePartition(tableName, partition); + } + } + } catch (final Exception e) { + LOGGER.error("Could not rollover table " + tableName, e); + } + } + + private void deletePartition(final CharSequence tableName, final String partition) { + try (final SqlCompiler compiler = dbContext.getCompiler()) { + compiler.compile(String.format(DELETION_QUERY, new Object[]{tableName, partition}), dbContext.getSqlExecutionContext()); + } catch (final Exception e) { + LOGGER.error("Dropping partition " + partition + " of table " + tableName + " failed", e); + } + } + + private Set<String> getPartitions(final CharSequence tableName) throws Exception { + final SqlExecutionContext executionContext = dbContext.getSqlExecutionContext(); + final Set<String> result = new HashSet<>(); + + try ( + final SqlCompiler compiler = dbContext.getCompiler(); + final RecordCursorFactory recordCursorFactory = compiler.compile(String.format(SELECTION_QUERY, new Object[]{tableName}), executionContext).getRecordCursorFactory(); + final RecordCursor cursor = recordCursorFactory.getCursor(executionContext); + ) { + while (cursor.hasNext()) { + final Record record = cursor.getRecord(); + result.add(new StringBuilder(record.getStr(0)).toString()); Review comment: The record return with an undetermined implementation of `CharSequence` (actually it's `CharSequenceView`, which is an internal implementation of the QuestDB) where it is not guaranteed that the `toString` will be implemented, or implemented properly. I was striving to keep in the safe side ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
