This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch optimize_flushing_memtable_check in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9e44dd1b6cb7a749e0b593c8bfaa2434dce62815 Author: HTHou <[email protected]> AuthorDate: Mon Feb 26 10:28:51 2024 +0800 Optimize the closing memtable check logic --- .../main/java/org/apache/iotdb/SessionExample.java | 87 +++++++++++----------- .../resources/conf/iotdb-datanode.properties | 4 +- .../db/storageengine/dataregion/DataRegion.java | 13 ++-- 3 files changed, 53 insertions(+), 51 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index d5f40452ea8..2f642171341 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -26,7 +26,6 @@ import org.apache.iotdb.isession.template.Template; import org.apache.iotdb.isession.util.Version; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; -import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.session.Session; import org.apache.iotdb.session.template.MeasurementNode; import org.apache.iotdb.tsfile.common.conf.TSFileConfig; @@ -78,49 +77,49 @@ public class SessionExample { // set session fetchSize session.setFetchSize(10000); - - try { - session.createDatabase("root.sg1"); - } catch (StatementExecutionException e) { - if (e.getStatusCode() != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) { - throw e; - } - } - - // createTemplate(); - createTimeseries(); - createMultiTimeseries(); - insertRecord(); + // + // try { + // session.createDatabase("root.sg1"); + // } catch (StatementExecutionException e) { + // if (e.getStatusCode() != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) { + // throw e; + // } + // } + // + // // createTemplate(); + // createTimeseries(); + // createMultiTimeseries(); + // insertRecord(); insertTablet(); - // insertTabletWithNullValues(); - // insertTablets(); - // insertRecords(); - // insertText(); 
- // selectInto(); - // createAndDropContinuousQueries(); - // nonQuery(); - query(); - // queryWithTimeout(); - rawDataQuery(); - lastDataQuery(); - aggregationQuery(); - groupByQuery(); - // queryByIterator(); - // deleteData(); - // deleteTimeseries(); - // setTimeout(); - - sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root"); - sessionEnableRedirect.setEnableQueryRedirection(true); - sessionEnableRedirect.open(false); - - // set session fetchSize - sessionEnableRedirect.setFetchSize(10000); - - fastLastDataQueryForOneDevice(); - insertRecord4Redirect(); - query4Redirect(); - sessionEnableRedirect.close(); + // // insertTabletWithNullValues(); + // // insertTablets(); + // // insertRecords(); + // // insertText(); + // // selectInto(); + // // createAndDropContinuousQueries(); + // // nonQuery(); + // query(); + // // queryWithTimeout(); + // rawDataQuery(); + // lastDataQuery(); + // aggregationQuery(); + // groupByQuery(); + // // queryByIterator(); + // // deleteData(); + // // deleteTimeseries(); + // // setTimeout(); + // + // sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root"); + // sessionEnableRedirect.setEnableQueryRedirection(true); + // sessionEnableRedirect.open(false); + // + // // set session fetchSize + // sessionEnableRedirect.setFetchSize(10000); + // + // fastLastDataQueryForOneDevice(); + // insertRecord4Redirect(); + // query4Redirect(); + // sessionEnableRedirect.close(); session.close(); } @@ -393,7 +392,7 @@ public class SessionExample { // The schema of measurements of one device // only measurementId and data type in MeasurementSchema take effects in Tablet List<MeasurementSchema> schemaList = new ArrayList<>(); - schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); + schemaList.add(new MeasurementSchema("`s1.aaa`", TSDataType.INT64)); schemaList.add(new MeasurementSchema("s2", TSDataType.INT64)); schemaList.add(new MeasurementSchema("s3", TSDataType.INT64)); diff --git 
a/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties b/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties index e6ee742796e..2d67f46b700 100644 --- a/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties +++ b/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties @@ -199,7 +199,7 @@ dn_seed_config_node=127.0.0.1:10710 # Note: If wal_dirs is assigned an empty string(i.e.,zero-size), it will be handled as a relative path. # For windows platform # If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is absolute. Otherwise, it is relative. -# dn_wal_dirs=data\\datanode\\wal +dn_wal_dirs=data\\datanode\\wal # For Linux platform # If its prefix is "/", then the path is absolute. Otherwise, it is relative. # dn_wal_dirs=data/datanode/wal @@ -313,4 +313,4 @@ dn_metric_prometheus_reporter_port=9092 # trust_store_pwd="" # SSL timeout (in seconds) -# idle_timeout_in_seconds=50000 \ No newline at end of file +# idle_timeout_in_seconds=50000 diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index d5cf0821ba4..e0b853755e3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -105,7 +105,6 @@ import org.apache.iotdb.db.storageengine.rescon.memory.TimePartitionManager; import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager; import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager; import org.apache.iotdb.db.tools.settle.TsFileAndModSettleTool; -import org.apache.iotdb.db.utils.CopyOnReadLinkedList; import org.apache.iotdb.db.utils.DateTimeUtils; import org.apache.iotdb.rpc.RpcUtils; import 
org.apache.iotdb.rpc.TSStatusCode; @@ -218,12 +217,11 @@ public class DataRegion implements IDataRegionForQuery { private final TreeMap<Long, TsFileProcessor> workUnsequenceTsFileProcessors = new TreeMap<>(); /** sequence tsfile processors which are closing. */ - private final CopyOnReadLinkedList<TsFileProcessor> closingSequenceTsFileProcessor = - new CopyOnReadLinkedList<>(); + private final Set<TsFileProcessor> closingSequenceTsFileProcessor = ConcurrentHashMap.newKeySet(); /** unsequence tsfile processors which are closing. */ - private final CopyOnReadLinkedList<TsFileProcessor> closingUnSequenceTsFileProcessor = - new CopyOnReadLinkedList<>(); + private final Set<TsFileProcessor> closingUnSequenceTsFileProcessor = + ConcurrentHashMap.newKeySet(); /** data region id. */ private final String dataRegionId; @@ -1300,6 +1298,11 @@ public class DataRegion implements IDataRegionForQuery { * @param tsFileProcessor tsfile processor in which memTable to be flushed */ public void submitAFlushTaskWhenShouldFlush(TsFileProcessor tsFileProcessor) { + if (closingSequenceTsFileProcessor.contains(tsFileProcessor) + || closingUnSequenceTsFileProcessor.contains(tsFileProcessor) + || tsFileProcessor.alreadyMarkedClosing()) { + return; + } writeLock("submitAFlushTaskWhenShouldFlush"); try { // check memtable size and may asyncTryToFlush the work memtable
