This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch metric in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3effccc2e92172bdf21a06c277990940944a62cb Author: HTHou <[email protected]> AuthorDate: Fri Nov 10 12:19:13 2023 +0800 init --- .../main/java/org/apache/iotdb/SessionExample.java | 68 ++++++++++---------- .../java/org/apache/iotdb/SessionPoolExample.java | 74 ++++++++-------------- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +- .../iotdb/db/service/metrics/WritingMetrics.java | 11 ++++ .../db/storageengine/rescon/memory/SystemInfo.java | 43 +++++++------ .../iotdb/commons/service/metric/enums/Metric.java | 9 +++ 6 files changed, 105 insertions(+), 102 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index f5c0f1da9e0..b83f5e4b9a8 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -79,18 +79,18 @@ public class SessionExample { // set session fetchSize session.setFetchSize(10000); - try { - session.createDatabase("root.sg1"); - } catch (StatementExecutionException e) { - if (e.getStatusCode() != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) { - throw e; - } - } - - // createTemplate(); - createTimeseries(); - createMultiTimeseries(); - insertRecord(); +// try { +// session.createDatabase("root.sg1"); +// } catch (StatementExecutionException e) { +// if (e.getStatusCode() != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) { +// throw e; +// } +// } +// +// // createTemplate(); +// createTimeseries(); +// createMultiTimeseries(); +// insertRecord(); insertTablet(); // insertTabletWithNullValues(); // insertTablets(); @@ -99,28 +99,28 @@ public class SessionExample { // selectInto(); // createAndDropContinuousQueries(); // nonQuery(); - query(); - // queryWithTimeout(); - rawDataQuery(); - lastDataQuery(); - aggregationQuery(); - groupByQuery(); - // queryByIterator(); - // deleteData(); - // deleteTimeseries(); - // 
setTimeout(); - - sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root"); - sessionEnableRedirect.setEnableQueryRedirection(true); - sessionEnableRedirect.open(false); - - // set session fetchSize - sessionEnableRedirect.setFetchSize(10000); - - fastLastDataQueryForOneDevice(); - insertRecord4Redirect(); - query4Redirect(); - sessionEnableRedirect.close(); +// query(); +// // queryWithTimeout(); +// rawDataQuery(); +// lastDataQuery(); +// aggregationQuery(); +// groupByQuery(); +// // queryByIterator(); +// // deleteData(); +// // deleteTimeseries(); +// // setTimeout(); +// +// sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root"); +// sessionEnableRedirect.setEnableQueryRedirection(true); +// sessionEnableRedirect.open(false); +// +// // set session fetchSize +// sessionEnableRedirect.setFetchSize(10000); +// +// fastLastDataQueryForOneDevice(); +// insertRecord4Redirect(); +// query4Redirect(); +// sessionEnableRedirect.close(); session.close(); } diff --git a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java index 1e399b7358f..12c6d969608 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java @@ -26,6 +26,8 @@ import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.session.pool.SessionPool; import org.apache.iotdb.tsfile.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +51,7 @@ public class SessionPoolExample { .port(6667) .user("root") .password("root") - .maxSize(3) + .maxSize(20) .build(); } @@ -70,20 +72,20 @@ public class SessionPoolExample { public static void main(String[] args) throws StatementExecutionException, 
IoTDBConnectionException, InterruptedException { // Choose the SessionPool you going to use - constructRedirectSessionPool(); + constructCustomSessionPool(); - service = Executors.newFixedThreadPool(10); + service = Executors.newFixedThreadPool(20); insertRecord(); + Thread.sleep(2000); queryByRowRecord(); - Thread.sleep(1000); - queryByIterator(); + Thread.sleep(5000); sessionPool.close(); service.shutdown(); } // more insert example, see SessionExample.java private static void insertRecord() throws StatementExecutionException, IoTDBConnectionException { - String deviceId = "root.sg1.d1"; + String deviceId = "root.sg.d1"; List<String> measurements = new ArrayList<>(); List<TSDataType> types = new ArrayList<>(); measurements.add("s1"); @@ -92,61 +94,35 @@ public class SessionPoolExample { types.add(TSDataType.INT64); types.add(TSDataType.INT64); types.add(TSDataType.INT64); - - for (long time = 0; time < 10; time++) { - List<Object> values = new ArrayList<>(); - values.add(1L); - values.add(2L); - values.add(3L); - sessionPool.insertRecord(deviceId, time, measurements, types, values); - } - } - - private static void queryByRowRecord() { - for (int i = 0; i < 1; i++) { + List<Object> values = new ArrayList<>(); + values.add(1L); + values.add(2L); + values.add(3L); + for (int i = 0; i < 10; i++) { + long time = i; service.submit( () -> { - SessionDataSetWrapper wrapper = null; - try { - wrapper = sessionPool.executeQueryStatement("select * from root.sg1.d1"); - System.out.println(wrapper.getColumnNames()); - System.out.println(wrapper.getColumnTypes()); - while (wrapper.hasNext()) { - System.out.println(wrapper.next()); + while (true) { + try { + sessionPool.insertRecord(deviceId, time, measurements, types, values); + break; + } catch (IoTDBConnectionException | StatementExecutionException ignored) { + } - } catch (IoTDBConnectionException | StatementExecutionException e) { - logger.error("Query by row record error", e); - } finally { - // remember to close data set 
finally! - sessionPool.closeResultSet(wrapper); } }); } } - private static void queryByIterator() { - for (int i = 0; i < 1; i++) { + private static void queryByRowRecord() { + for (int i = 1; i < 4; i++) { + int finalI = i; service.submit( () -> { - SessionDataSetWrapper wrapper = null; try { - wrapper = sessionPool.executeQueryStatement("select * from root.sg1.d1"); - // get DataIterator like JDBC - DataIterator dataIterator = wrapper.iterator(); - System.out.println(wrapper.getColumnNames()); - System.out.println(wrapper.getColumnTypes()); - while (dataIterator.next()) { - StringBuilder builder = new StringBuilder(); - for (String columnName : wrapper.getColumnNames()) { - builder.append(dataIterator.getString(columnName) + " "); - } - System.out.println(builder); - } + sessionPool.createTimeseries("root.sg.d1.s" + finalI, TSDataType.INT64, TSEncoding.PLAIN, CompressionType.SNAPPY); } catch (IoTDBConnectionException | StatementExecutionException e) { - logger.error("Query by Iterator error", e); - } finally { - // remember to close data set finally! 
- sessionPool.closeResultSet(wrapper); + logger.error(" ", e); } }); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index f0ec678e894..00f4eaad421 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -613,7 +613,7 @@ public class IoTDBConfig { private boolean chunkBufferPoolEnable = false; /** Switch of creating schema automatically */ - private boolean enableAutoCreateSchema = true; + private boolean enableAutoCreateSchema = false; /** register time series as which type when receiving boolean string "true" or "false" */ private TSDataType booleanStringInferType = TSDataType.BOOLEAN; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java index 8e2b611d172..0b170bc54a1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java @@ -343,6 +343,9 @@ public class WritingMetrics implements IMetricSet { "oldest_mem_table_ram_when_cause_flush"; public static final String FLUSH_TSFILE_SIZE = "flush_tsfile_size"; + private Histogram flushThreholdHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM; + private Histogram rejectThreholdHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM; + public void bindDataRegionMetrics() { List<DataRegion> allDataRegions = StorageEngine.getInstance().getAllDataRegions(); List<DataRegionId> allDataRegionIds = StorageEngine.getInstance().getAllDataRegionIds(); @@ -652,6 +655,14 @@ public class WritingMetrics implements IMetricSet { public void recordWALBufferEntriesCount(long count) { entriesCountHistogram.update(count); } 
+ + public void recordFlushThreshold(double flushThreshold) { + flushThreholdHistogram.update((long) flushThreshold); + } + + public void recordRejectThreshold(double rejectThreshold) { + rejectThreholdHistogram.update((long) rejectThreshold); + } // endregion @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java index 1ff1244ca18..4fd1a11a6f8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.WriteProcessRejectException; +import org.apache.iotdb.db.service.metrics.WritingMetrics; import org.apache.iotdb.db.storageengine.dataregion.DataRegionInfo; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionFileCountExceededException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionMemoryNotEnoughException; @@ -62,8 +63,8 @@ public class SystemInfo { private ExecutorService flushTaskSubmitThreadPool = IoTDBThreadPoolFactory.newSingleThreadExecutor(ThreadName.FLUSH_TASK_SUBMIT.getName()); - private double FLUSH_THERSHOLD = memorySizeForMemtable * config.getFlushProportion(); - private double REJECT_THERSHOLD = memorySizeForMemtable * config.getRejectProportion(); + private double FLUSH_THRESHOLD = memorySizeForMemtable * config.getFlushProportion(); + private double REJECT_THRESHOLD = memorySizeForMemtable * config.getRejectProportion(); private volatile boolean isEncodingFasterThanIo = true; @@ -93,10 +94,10 @@ public class SystemInfo { } 
reportedStorageGroupMemCostMap.put(dataRegionInfo, currentDataRegionMemCost); dataRegionInfo.setLastReportedSize(currentDataRegionMemCost); - if (totalStorageGroupMemCost < FLUSH_THERSHOLD) { + if (totalStorageGroupMemCost < FLUSH_THRESHOLD) { return true; - } else if (totalStorageGroupMemCost >= FLUSH_THERSHOLD - && totalStorageGroupMemCost < REJECT_THERSHOLD) { + } else if (totalStorageGroupMemCost >= FLUSH_THRESHOLD + && totalStorageGroupMemCost < REJECT_THRESHOLD) { logger.debug( "The total database mem costs are too large, call for flushing. " + "Current sg cost is {}", @@ -109,7 +110,7 @@ public class SystemInfo { dataRegionInfo.getDataRegion().getDatabaseName(), delta, totalStorageGroupMemCost, - REJECT_THERSHOLD); + REJECT_THRESHOLD); rejected = true; if (chooseMemTablesToMarkFlush(tsFileProcessor)) { if (totalStorageGroupMemCost < memorySizeForMemtable) { @@ -145,8 +146,8 @@ public class SystemInfo { reportedStorageGroupMemCostMap.put(dataRegionInfo, currentDataRegionMemCost); } - if (totalStorageGroupMemCost >= FLUSH_THERSHOLD - && totalStorageGroupMemCost < REJECT_THERSHOLD) { + if (totalStorageGroupMemCost >= FLUSH_THRESHOLD + && totalStorageGroupMemCost < REJECT_THRESHOLD) { logger.debug( "SG ({}) released memory (delta: {}) but still exceeding flush proportion (totalSgMemCost: {}), call flush.", dataRegionInfo.getDataRegion().getDatabaseName(), @@ -161,7 +162,7 @@ public class SystemInfo { } logCurrentTotalSGMemory(); rejected = false; - } else if (totalStorageGroupMemCost >= REJECT_THERSHOLD) { + } else if (totalStorageGroupMemCost >= REJECT_THRESHOLD) { logger.warn( "SG ({}) released memory (delta: {}), but system is still in reject status (totalSgMemCost: {}).", dataRegionInfo.getDataRegion().getDatabaseName(), @@ -265,8 +266,10 @@ public class SystemInfo { (config.getAllocateMemoryForStorageEngine() * config.getWriteProportionForMemtable()); memorySizeForCompaction = (long) (config.getAllocateMemoryForStorageEngine() * 
config.getCompactionProportion()); - FLUSH_THERSHOLD = memorySizeForMemtable * config.getFlushProportion(); - REJECT_THERSHOLD = memorySizeForMemtable * config.getRejectProportion(); + FLUSH_THRESHOLD = memorySizeForMemtable * config.getFlushProportion(); + REJECT_THRESHOLD = memorySizeForMemtable * config.getRejectProportion(); + WritingMetrics.getInstance().recordFlushThreshold(FLUSH_THRESHOLD); + WritingMetrics.getInstance().recordRejectThreshold(REJECT_THRESHOLD); } @TestOnly @@ -317,7 +320,7 @@ public class SystemInfo { boolean isCurrentTsFileProcessorSelected = false; long memCost = 0; long activeMemSize = totalStorageGroupMemCost - flushingMemTablesCost; - while (activeMemSize - memCost > FLUSH_THERSHOLD) { + while (activeMemSize - memCost > FLUSH_THRESHOLD) { if (allTsFileProcessors.isEmpty() || allTsFileProcessors.peek().getWorkMemTableRamCost() == 0) { return false; @@ -368,14 +371,18 @@ public class SystemInfo { public synchronized void applyTemporaryMemoryForFlushing(long estimatedTemporaryMemSize) { memorySizeForMemtable -= estimatedTemporaryMemSize; - FLUSH_THERSHOLD = memorySizeForMemtable * config.getFlushProportion(); - REJECT_THERSHOLD = memorySizeForMemtable * config.getRejectProportion(); + FLUSH_THRESHOLD = memorySizeForMemtable * config.getFlushProportion(); + REJECT_THRESHOLD = memorySizeForMemtable * config.getRejectProportion(); + WritingMetrics.getInstance().recordFlushThreshold(FLUSH_THRESHOLD); + WritingMetrics.getInstance().recordRejectThreshold(REJECT_THRESHOLD); } public synchronized void releaseTemporaryMemoryForFlushing(long estimatedTemporaryMemSize) { memorySizeForMemtable += estimatedTemporaryMemSize; - FLUSH_THERSHOLD = memorySizeForMemtable * config.getFlushProportion(); - REJECT_THERSHOLD = memorySizeForMemtable * config.getRejectProportion(); + FLUSH_THRESHOLD = memorySizeForMemtable * config.getFlushProportion(); + REJECT_THRESHOLD = memorySizeForMemtable * config.getRejectProportion(); + 
WritingMetrics.getInstance().recordFlushThreshold(FLUSH_THRESHOLD); + WritingMetrics.getInstance().recordRejectThreshold(REJECT_THRESHOLD); } public long getTotalMemTableSize() { @@ -383,11 +390,11 @@ public class SystemInfo { } public double getFlushThershold() { - return FLUSH_THERSHOLD; + return FLUSH_THRESHOLD; } public double getRejectThershold() { - return REJECT_THERSHOLD; + return REJECT_THRESHOLD; } public int flushingMemTableNum() { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java index f8c1a9fe9dc..a0c38e6a0bc 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java @@ -64,6 +64,15 @@ public enum Metric { WAL_COST("wal_cost"), FLUSH_COST("flush_cost"), FLUSH_SUB_TASK_COST("flush_sub_task_cost"), + FLUSH_THRESHOLD("flush_threshold"), + REJECT_THRESHOLD("reject_threshold"), + TIMED_FLUSH_MEMTABLE_COUNT("timed_flush_memtable_count"), + WAL_FLUSH_MEMTABLE_COUNT("wal_flush_memtable_count"), + SERIES_FULL_FLUSH_MEMTABLE("series_full_flush_memtable"), + ACTIVE_MEMTABLE_COUNT("active_memtable_count"), + ACTIVE_TIME_PARTITION_COUNT("active_time_partition_count"), + MEMTABLE_LIVE_DURATION("memtable_live_duration"), + // compaction related DATA_WRITTEN("data_written"), DATA_READ("data_read"),
