This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch metric
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3effccc2e92172bdf21a06c277990940944a62cb
Author: HTHou <[email protected]>
AuthorDate: Fri Nov 10 12:19:13 2023 +0800

    init
---
 .../main/java/org/apache/iotdb/SessionExample.java | 68 ++++++++++----------
 .../java/org/apache/iotdb/SessionPoolExample.java  | 74 ++++++++--------------
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  2 +-
 .../iotdb/db/service/metrics/WritingMetrics.java   | 11 ++++
 .../db/storageengine/rescon/memory/SystemInfo.java | 43 +++++++------
 .../iotdb/commons/service/metric/enums/Metric.java |  9 +++
 6 files changed, 105 insertions(+), 102 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index f5c0f1da9e0..b83f5e4b9a8 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -79,18 +79,18 @@ public class SessionExample {
     // set session fetchSize
     session.setFetchSize(10000);
 
-    try {
-      session.createDatabase("root.sg1");
-    } catch (StatementExecutionException e) {
-      if (e.getStatusCode() != 
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
-        throw e;
-      }
-    }
-
-    //     createTemplate();
-    createTimeseries();
-    createMultiTimeseries();
-    insertRecord();
+//    try {
+//      session.createDatabase("root.sg1");
+//    } catch (StatementExecutionException e) {
+//      if (e.getStatusCode() != 
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
+//        throw e;
+//      }
+//    }
+//
+//    //     createTemplate();
+//    createTimeseries();
+//    createMultiTimeseries();
+//    insertRecord();
     insertTablet();
     //    insertTabletWithNullValues();
     //    insertTablets();
@@ -99,28 +99,28 @@ public class SessionExample {
     //    selectInto();
     //    createAndDropContinuousQueries();
     //    nonQuery();
-    query();
-    //    queryWithTimeout();
-    rawDataQuery();
-    lastDataQuery();
-    aggregationQuery();
-    groupByQuery();
-    //    queryByIterator();
-    //    deleteData();
-    //    deleteTimeseries();
-    //    setTimeout();
-
-    sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root");
-    sessionEnableRedirect.setEnableQueryRedirection(true);
-    sessionEnableRedirect.open(false);
-
-    // set session fetchSize
-    sessionEnableRedirect.setFetchSize(10000);
-
-    fastLastDataQueryForOneDevice();
-    insertRecord4Redirect();
-    query4Redirect();
-    sessionEnableRedirect.close();
+//    query();
+//    //    queryWithTimeout();
+//    rawDataQuery();
+//    lastDataQuery();
+//    aggregationQuery();
+//    groupByQuery();
+//    //    queryByIterator();
+//    //    deleteData();
+//    //    deleteTimeseries();
+//    //    setTimeout();
+//
+//    sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root");
+//    sessionEnableRedirect.setEnableQueryRedirection(true);
+//    sessionEnableRedirect.open(false);
+//
+//    // set session fetchSize
+//    sessionEnableRedirect.setFetchSize(10000);
+//
+//    fastLastDataQueryForOneDevice();
+//    insertRecord4Redirect();
+//    query4Redirect();
+//    sessionEnableRedirect.close();
     session.close();
   }
 
diff --git a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
index 1e399b7358f..12c6d969608 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionPoolExample.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.session.pool.SessionPool;
 import org.apache.iotdb.tsfile.enums.TSDataType;
 
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,7 +51,7 @@ public class SessionPoolExample {
             .port(6667)
             .user("root")
             .password("root")
-            .maxSize(3)
+            .maxSize(20)
             .build();
   }
 
@@ -70,20 +72,20 @@ public class SessionPoolExample {
   public static void main(String[] args)
       throws StatementExecutionException, IoTDBConnectionException, 
InterruptedException {
     // Choose the SessionPool you going to use
-    constructRedirectSessionPool();
+    constructCustomSessionPool();
 
-    service = Executors.newFixedThreadPool(10);
+    service = Executors.newFixedThreadPool(20);
     insertRecord();
+    Thread.sleep(2000);
     queryByRowRecord();
-    Thread.sleep(1000);
-    queryByIterator();
+    Thread.sleep(5000);
     sessionPool.close();
     service.shutdown();
   }
 
   // more insert example, see SessionExample.java
   private static void insertRecord() throws StatementExecutionException, 
IoTDBConnectionException {
-    String deviceId = "root.sg1.d1";
+    String deviceId = "root.sg.d1";
     List<String> measurements = new ArrayList<>();
     List<TSDataType> types = new ArrayList<>();
     measurements.add("s1");
@@ -92,61 +94,35 @@ public class SessionPoolExample {
     types.add(TSDataType.INT64);
     types.add(TSDataType.INT64);
     types.add(TSDataType.INT64);
-
-    for (long time = 0; time < 10; time++) {
-      List<Object> values = new ArrayList<>();
-      values.add(1L);
-      values.add(2L);
-      values.add(3L);
-      sessionPool.insertRecord(deviceId, time, measurements, types, values);
-    }
-  }
-
-  private static void queryByRowRecord() {
-    for (int i = 0; i < 1; i++) {
+    List<Object> values = new ArrayList<>();
+    values.add(1L);
+    values.add(2L);
+    values.add(3L);
+    for (int i = 0; i < 10; i++) {
+      long time = i;
       service.submit(
           () -> {
-            SessionDataSetWrapper wrapper = null;
-            try {
-              wrapper = sessionPool.executeQueryStatement("select * from 
root.sg1.d1");
-              System.out.println(wrapper.getColumnNames());
-              System.out.println(wrapper.getColumnTypes());
-              while (wrapper.hasNext()) {
-                System.out.println(wrapper.next());
+            while (true) {
+              try {
+                sessionPool.insertRecord(deviceId, time, measurements, types, 
values);
+                break;
+              } catch (IoTDBConnectionException | StatementExecutionException 
ignored) {
+
               }
-            } catch (IoTDBConnectionException | StatementExecutionException e) 
{
-              logger.error("Query by row record error", e);
-            } finally {
-              // remember to close data set finally!
-              sessionPool.closeResultSet(wrapper);
             }
           });
     }
   }
 
-  private static void queryByIterator() {
-    for (int i = 0; i < 1; i++) {
+  private static void queryByRowRecord() {
+    for (int i = 1; i < 4; i++) {
+      int finalI = i;
       service.submit(
           () -> {
-            SessionDataSetWrapper wrapper = null;
             try {
-              wrapper = sessionPool.executeQueryStatement("select * from 
root.sg1.d1");
-              // get DataIterator like JDBC
-              DataIterator dataIterator = wrapper.iterator();
-              System.out.println(wrapper.getColumnNames());
-              System.out.println(wrapper.getColumnTypes());
-              while (dataIterator.next()) {
-                StringBuilder builder = new StringBuilder();
-                for (String columnName : wrapper.getColumnNames()) {
-                  builder.append(dataIterator.getString(columnName) + " ");
-                }
-                System.out.println(builder);
-              }
+              sessionPool.createTimeseries("root.sg.d1.s" + finalI, 
TSDataType.INT64, TSEncoding.PLAIN, CompressionType.SNAPPY);
             } catch (IoTDBConnectionException | StatementExecutionException e) 
{
-              logger.error("Query by Iterator error", e);
-            } finally {
-              // remember to close data set finally!
-              sessionPool.closeResultSet(wrapper);
+              logger.error("   ", e);
             }
           });
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index f0ec678e894..00f4eaad421 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -613,7 +613,7 @@ public class IoTDBConfig {
   private boolean chunkBufferPoolEnable = false;
 
   /** Switch of creating schema automatically */
-  private boolean enableAutoCreateSchema = true;
+  private boolean enableAutoCreateSchema = false;
 
   /** register time series as which type when receiving boolean string "true" 
or "false" */
   private TSDataType booleanStringInferType = TSDataType.BOOLEAN;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
index 8e2b611d172..0b170bc54a1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
@@ -343,6 +343,9 @@ public class WritingMetrics implements IMetricSet {
       "oldest_mem_table_ram_when_cause_flush";
   public static final String FLUSH_TSFILE_SIZE = "flush_tsfile_size";
 
+  private Histogram flushThreholdHistogram = 
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+  private Histogram rejectThreholdHistogram = 
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+
   public void bindDataRegionMetrics() {
     List<DataRegion> allDataRegions = 
StorageEngine.getInstance().getAllDataRegions();
     List<DataRegionId> allDataRegionIds = 
StorageEngine.getInstance().getAllDataRegionIds();
@@ -652,6 +655,14 @@ public class WritingMetrics implements IMetricSet {
   public void recordWALBufferEntriesCount(long count) {
     entriesCountHistogram.update(count);
   }
+
+  public void recordFlushThreshold(double flushThreshold) {
+    flushThreholdHistogram.update((long) flushThreshold);
+  }
+
+  public void recordRejectThreshold(double rejectThreshold) {
+    rejectThreholdHistogram.update((long) rejectThreshold);
+  }
   // endregion
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
index 1ff1244ca18..4fd1a11a6f8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.WriteProcessRejectException;
+import org.apache.iotdb.db.service.metrics.WritingMetrics;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegionInfo;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionFileCountExceededException;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionMemoryNotEnoughException;
@@ -62,8 +63,8 @@ public class SystemInfo {
 
   private ExecutorService flushTaskSubmitThreadPool =
       
IoTDBThreadPoolFactory.newSingleThreadExecutor(ThreadName.FLUSH_TASK_SUBMIT.getName());
-  private double FLUSH_THERSHOLD = memorySizeForMemtable * 
config.getFlushProportion();
-  private double REJECT_THERSHOLD = memorySizeForMemtable * 
config.getRejectProportion();
+  private double FLUSH_THRESHOLD = memorySizeForMemtable * 
config.getFlushProportion();
+  private double REJECT_THRESHOLD = memorySizeForMemtable * 
config.getRejectProportion();
 
   private volatile boolean isEncodingFasterThanIo = true;
 
@@ -93,10 +94,10 @@ public class SystemInfo {
     }
     reportedStorageGroupMemCostMap.put(dataRegionInfo, 
currentDataRegionMemCost);
     dataRegionInfo.setLastReportedSize(currentDataRegionMemCost);
-    if (totalStorageGroupMemCost < FLUSH_THERSHOLD) {
+    if (totalStorageGroupMemCost < FLUSH_THRESHOLD) {
       return true;
-    } else if (totalStorageGroupMemCost >= FLUSH_THERSHOLD
-        && totalStorageGroupMemCost < REJECT_THERSHOLD) {
+    } else if (totalStorageGroupMemCost >= FLUSH_THRESHOLD
+        && totalStorageGroupMemCost < REJECT_THRESHOLD) {
       logger.debug(
           "The total database mem costs are too large, call for flushing. "
               + "Current sg cost is {}",
@@ -109,7 +110,7 @@ public class SystemInfo {
           dataRegionInfo.getDataRegion().getDatabaseName(),
           delta,
           totalStorageGroupMemCost,
-          REJECT_THERSHOLD);
+          REJECT_THRESHOLD);
       rejected = true;
       if (chooseMemTablesToMarkFlush(tsFileProcessor)) {
         if (totalStorageGroupMemCost < memorySizeForMemtable) {
@@ -145,8 +146,8 @@ public class SystemInfo {
       reportedStorageGroupMemCostMap.put(dataRegionInfo, 
currentDataRegionMemCost);
     }
 
-    if (totalStorageGroupMemCost >= FLUSH_THERSHOLD
-        && totalStorageGroupMemCost < REJECT_THERSHOLD) {
+    if (totalStorageGroupMemCost >= FLUSH_THRESHOLD
+        && totalStorageGroupMemCost < REJECT_THRESHOLD) {
       logger.debug(
           "SG ({}) released memory (delta: {}) but still exceeding flush 
proportion (totalSgMemCost: {}), call flush.",
           dataRegionInfo.getDataRegion().getDatabaseName(),
@@ -161,7 +162,7 @@ public class SystemInfo {
       }
       logCurrentTotalSGMemory();
       rejected = false;
-    } else if (totalStorageGroupMemCost >= REJECT_THERSHOLD) {
+    } else if (totalStorageGroupMemCost >= REJECT_THRESHOLD) {
       logger.warn(
           "SG ({}) released memory (delta: {}), but system is still in reject 
status (totalSgMemCost: {}).",
           dataRegionInfo.getDataRegion().getDatabaseName(),
@@ -265,8 +266,10 @@ public class SystemInfo {
             (config.getAllocateMemoryForStorageEngine() * 
config.getWriteProportionForMemtable());
     memorySizeForCompaction =
         (long) (config.getAllocateMemoryForStorageEngine() * 
config.getCompactionProportion());
-    FLUSH_THERSHOLD = memorySizeForMemtable * config.getFlushProportion();
-    REJECT_THERSHOLD = memorySizeForMemtable * config.getRejectProportion();
+    FLUSH_THRESHOLD = memorySizeForMemtable * config.getFlushProportion();
+    REJECT_THRESHOLD = memorySizeForMemtable * config.getRejectProportion();
+    WritingMetrics.getInstance().recordFlushThreshold(FLUSH_THRESHOLD);
+    WritingMetrics.getInstance().recordRejectThreshold(REJECT_THRESHOLD);
   }
 
   @TestOnly
@@ -317,7 +320,7 @@ public class SystemInfo {
     boolean isCurrentTsFileProcessorSelected = false;
     long memCost = 0;
     long activeMemSize = totalStorageGroupMemCost - flushingMemTablesCost;
-    while (activeMemSize - memCost > FLUSH_THERSHOLD) {
+    while (activeMemSize - memCost > FLUSH_THRESHOLD) {
       if (allTsFileProcessors.isEmpty()
           || allTsFileProcessors.peek().getWorkMemTableRamCost() == 0) {
         return false;
@@ -368,14 +371,18 @@ public class SystemInfo {
 
   public synchronized void applyTemporaryMemoryForFlushing(long 
estimatedTemporaryMemSize) {
     memorySizeForMemtable -= estimatedTemporaryMemSize;
-    FLUSH_THERSHOLD = memorySizeForMemtable * config.getFlushProportion();
-    REJECT_THERSHOLD = memorySizeForMemtable * config.getRejectProportion();
+    FLUSH_THRESHOLD = memorySizeForMemtable * config.getFlushProportion();
+    REJECT_THRESHOLD = memorySizeForMemtable * config.getRejectProportion();
+    WritingMetrics.getInstance().recordFlushThreshold(FLUSH_THRESHOLD);
+    WritingMetrics.getInstance().recordRejectThreshold(REJECT_THRESHOLD);
   }
 
   public synchronized void releaseTemporaryMemoryForFlushing(long 
estimatedTemporaryMemSize) {
     memorySizeForMemtable += estimatedTemporaryMemSize;
-    FLUSH_THERSHOLD = memorySizeForMemtable * config.getFlushProportion();
-    REJECT_THERSHOLD = memorySizeForMemtable * config.getRejectProportion();
+    FLUSH_THRESHOLD = memorySizeForMemtable * config.getFlushProportion();
+    REJECT_THRESHOLD = memorySizeForMemtable * config.getRejectProportion();
+    WritingMetrics.getInstance().recordFlushThreshold(FLUSH_THRESHOLD);
+    WritingMetrics.getInstance().recordRejectThreshold(REJECT_THRESHOLD);
   }
 
   public long getTotalMemTableSize() {
@@ -383,11 +390,11 @@ public class SystemInfo {
   }
 
   public double getFlushThershold() {
-    return FLUSH_THERSHOLD;
+    return FLUSH_THRESHOLD;
   }
 
   public double getRejectThershold() {
-    return REJECT_THERSHOLD;
+    return REJECT_THRESHOLD;
   }
 
   public int flushingMemTableNum() {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index f8c1a9fe9dc..a0c38e6a0bc 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -64,6 +64,15 @@ public enum Metric {
   WAL_COST("wal_cost"),
   FLUSH_COST("flush_cost"),
   FLUSH_SUB_TASK_COST("flush_sub_task_cost"),
+  FLUSH_THRESHOLD("flush_threshold"),
+  REJECT_THRESHOLD("reject_threshold"),
+  TIMED_FLUSH_MEMTABLE_COUNT("timed_flush_memtable_count"),
+  WAL_FLUSH_MEMTABLE_COUNT("wal_flush_memtable_count"),
+  SERIES_FULL_FLUSH_MEMTABLE("series_full_flush_memtable"),
+  ACTIVE_MEMTABLE_COUNT("active_memtable_count"),
+  ACTIVE_TIME_PARTITION_COUNT("active_time_partition_count"),
+  MEMTABLE_LIVE_DURATION("memtable_live_duration"),
+
   // compaction related
   DATA_WRITTEN("data_written"),
   DATA_READ("data_read"),

Reply via email to