This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5f8197e04aa100f3c59a4c177ab93f9496c702c6
Author: Potato <[email protected]>
AuthorDate: Tue Jul 23 09:56:14 2024 +0800

    Ensure the flush total points statistic function works correctly when enable_auto_create_schema is false #12990
    
    Signed-off-by: OneSizeFitQuorum <[email protected]>
    (cherry picked from commit 392625f38bff72fd3c7979fa4f98f0db738e8cde)
---
 .../metrics/IoTDBInternalLocalReporter.java        | 100 +++++++++++++++------
 1 file changed, 73 insertions(+), 27 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java
index f33624fb0b2..2b6478f7959 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java
@@ -44,16 +44,19 @@ import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateMultiTimeSeriesStatement;
 import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
 import org.apache.iotdb.metrics.reporter.iotdb.IoTDBInternalReporter;
 import org.apache.iotdb.metrics.utils.InternalReporterType;
 import org.apache.iotdb.metrics.utils.ReporterType;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
 import org.apache.iotdb.session.util.SessionUtils;
 
 import org.apache.thrift.TException;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
 import org.apache.tsfile.enums.TSDataType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,7 +72,10 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import static 
org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
+
 public class IoTDBInternalLocalReporter extends IoTDBInternalReporter {
+
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBInternalLocalReporter.class);
   private static final SessionManager SESSION_MANAGER = 
SessionManager.getInstance();
   private static final Coordinator COORDINATOR = Coordinator.getInstance();
@@ -159,43 +165,83 @@ public class IoTDBInternalLocalReporter extends IoTDBInternalReporter {
     service.execute(
         () -> {
           try {
-            TSInsertRecordReq request = new TSInsertRecordReq();
-            List<String> measurements = new ArrayList<>();
-            List<TSDataType> types = new ArrayList<>();
-            List<Object> values = new ArrayList<>();
-            for (Map.Entry<String, Object> entry : valueMap.entrySet()) {
-              String measurement = entry.getKey();
-              Object value = entry.getValue();
-              measurements.add(measurement);
-              types.add(inferType(value));
-              values.add(value);
+            TSStatus result = insertRecord(valueMap, prefix, time);
+            if (result.getCode() == 
TSStatusCode.METADATA_ERROR.getStatusCode()) {
+              createTimeSeries(valueMap, prefix);
+              result = insertRecord(valueMap, prefix, time);
             }
-            ByteBuffer buffer = SessionUtils.getValueBuffer(types, values);
-
-            request.setPrefixPath(prefix);
-            request.setTimestamp(time);
-            request.setMeasurements(measurements);
-            request.setValues(buffer);
-            request.setIsAligned(false);
-
-            InsertRowStatement s = StatementGenerator.createStatement(request);
-            final long queryId = SESSION_MANAGER.requestQueryId();
-            ExecutionResult result =
-                COORDINATOR.executeForTreeModel(
-                    s, queryId, sessionInfo, "", partitionFetcher, 
schemaFetcher);
-            if (result.status.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-              LOGGER.error("Failed to update the value of metric with status 
{}", result.status);
+            if (result.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+              LOGGER.warn("Failed to update the value of metric with status 
{}", result);
             }
           } catch (IoTDBConnectionException e1) {
-            LOGGER.error(
+            LOGGER.warn(
                 "Failed to update the value of metric because of connection 
failure, because ", e1);
           } catch (IllegalPathException | QueryProcessException e2) {
-            LOGGER.error(
+            LOGGER.warn(
                 "Failed to update the value of metric because of internal 
error, because ", e2);
           }
         });
   }
 
+  private TSStatus insertRecord(Map<String, Object> valueMap, String prefix, 
long time)
+      throws IoTDBConnectionException, QueryProcessException, 
IllegalPathException {
+    TSInsertRecordReq request = new TSInsertRecordReq();
+    List<String> measurements = new ArrayList<>();
+    List<TSDataType> types = new ArrayList<>();
+    List<Object> values = new ArrayList<>();
+    for (Map.Entry<String, Object> entry : valueMap.entrySet()) {
+      String measurement = entry.getKey();
+      Object value = entry.getValue();
+      measurements.add(measurement);
+      types.add(inferType(value));
+      values.add(value);
+    }
+    ByteBuffer buffer = SessionUtils.getValueBuffer(types, values);
+
+    request.setPrefixPath(prefix);
+    request.setTimestamp(time);
+    request.setMeasurements(measurements);
+    request.setValues(buffer);
+    request.setIsAligned(false);
+
+    InsertRowStatement s = StatementGenerator.createStatement(request);
+    final long queryId = SESSION_MANAGER.requestQueryId();
+    ExecutionResult result =
+        COORDINATOR.executeForTreeModel(
+            s, queryId, sessionInfo, "", partitionFetcher, schemaFetcher);
+    return result.status;
+  }
+
+  private void createTimeSeries(Map<String, Object> valueMap, String prefix)
+      throws IllegalPathException {
+    TSCreateMultiTimeseriesReq request = new TSCreateMultiTimeseriesReq();
+    List<String> paths = new ArrayList<>();
+    List<Integer> types = new ArrayList<>();
+    List<Integer> encodings = new ArrayList<>();
+    List<Integer> compressors = new ArrayList<>();
+    for (Map.Entry<String, Object> entry : valueMap.entrySet()) {
+      String measurement = entry.getKey();
+      paths.add(prefix + "." + measurement);
+      TSDataType type = inferType(entry.getValue());
+      types.add(type.ordinal());
+      encodings.add((int) getDefaultEncoding(type).serialize());
+      compressors.add((int) 
TSFileDescriptor.getInstance().getConfig().getCompressor().serialize());
+    }
+    request.setPaths(paths);
+    request.setDataTypes(types);
+    request.setEncodings(encodings);
+    request.setCompressors(compressors);
+    CreateMultiTimeSeriesStatement s = 
StatementGenerator.createStatement(request);
+    final long queryId = SESSION_MANAGER.requestQueryId();
+
+    ExecutionResult result =
+        COORDINATOR.executeForTreeModel(
+            s, queryId, sessionInfo, "", partitionFetcher, schemaFetcher);
+    if (result.status.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      LOGGER.warn("Failed to auto create timeseries for {} with status {}", 
paths, result.status);
+    }
+  }
+
   @Override
   protected void writeMetricsToIoTDB(Map<String, Map<String, Object>> 
valueMap, long time) {
     for (Map.Entry<String, Map<String, Object>> value : valueMap.entrySet()) {

Reply via email to