This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 392625f38bf Ensure the flush total points statistic function works
correctly when enable_auto_create_schema is false #12990
392625f38bf is described below
commit 392625f38bff72fd3c7979fa4f98f0db738e8cde
Author: Potato <[email protected]>
AuthorDate: Tue Jul 23 09:56:14 2024 +0800
Ensure the flush total points statistic function works correctly when
enable_auto_create_schema is false #12990
Signed-off-by: OneSizeFitQuorum <[email protected]>
---
.../metrics/IoTDBInternalLocalReporter.java | 100 +++++++++++++++------
1 file changed, 73 insertions(+), 27 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java
index f33624fb0b2..2b6478f7959 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java
@@ -44,16 +44,19 @@ import
org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateMultiTimeSeriesStatement;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
import org.apache.iotdb.metrics.reporter.iotdb.IoTDBInternalReporter;
import org.apache.iotdb.metrics.utils.InternalReporterType;
import org.apache.iotdb.metrics.utils.ReporterType;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.session.util.SessionUtils;
import org.apache.thrift.TException;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,7 +72,10 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import static
org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
+
public class IoTDBInternalLocalReporter extends IoTDBInternalReporter {
+
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBInternalLocalReporter.class);
private static final SessionManager SESSION_MANAGER =
SessionManager.getInstance();
private static final Coordinator COORDINATOR = Coordinator.getInstance();
@@ -159,43 +165,83 @@ public class IoTDBInternalLocalReporter extends
IoTDBInternalReporter {
service.execute(
() -> {
try {
- TSInsertRecordReq request = new TSInsertRecordReq();
- List<String> measurements = new ArrayList<>();
- List<TSDataType> types = new ArrayList<>();
- List<Object> values = new ArrayList<>();
- for (Map.Entry<String, Object> entry : valueMap.entrySet()) {
- String measurement = entry.getKey();
- Object value = entry.getValue();
- measurements.add(measurement);
- types.add(inferType(value));
- values.add(value);
+ TSStatus result = insertRecord(valueMap, prefix, time);
+ if (result.getCode() ==
TSStatusCode.METADATA_ERROR.getStatusCode()) {
+ createTimeSeries(valueMap, prefix);
+ result = insertRecord(valueMap, prefix, time);
}
- ByteBuffer buffer = SessionUtils.getValueBuffer(types, values);
-
- request.setPrefixPath(prefix);
- request.setTimestamp(time);
- request.setMeasurements(measurements);
- request.setValues(buffer);
- request.setIsAligned(false);
-
- InsertRowStatement s = StatementGenerator.createStatement(request);
- final long queryId = SESSION_MANAGER.requestQueryId();
- ExecutionResult result =
- COORDINATOR.executeForTreeModel(
- s, queryId, sessionInfo, "", partitionFetcher,
schemaFetcher);
- if (result.status.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.error("Failed to update the value of metric with status
{}", result.status);
+ if (result.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOGGER.warn("Failed to update the value of metric with status
{}", result);
}
} catch (IoTDBConnectionException e1) {
- LOGGER.error(
+ LOGGER.warn(
"Failed to update the value of metric because of connection
failure, because ", e1);
} catch (IllegalPathException | QueryProcessException e2) {
- LOGGER.error(
+ LOGGER.warn(
"Failed to update the value of metric because of internal
error, because ", e2);
}
});
}
+ private TSStatus insertRecord(Map<String, Object> valueMap, String prefix,
long time)
+ throws IoTDBConnectionException, QueryProcessException,
IllegalPathException {
+ TSInsertRecordReq request = new TSInsertRecordReq();
+ List<String> measurements = new ArrayList<>();
+ List<TSDataType> types = new ArrayList<>();
+ List<Object> values = new ArrayList<>();
+ for (Map.Entry<String, Object> entry : valueMap.entrySet()) {
+ String measurement = entry.getKey();
+ Object value = entry.getValue();
+ measurements.add(measurement);
+ types.add(inferType(value));
+ values.add(value);
+ }
+ ByteBuffer buffer = SessionUtils.getValueBuffer(types, values);
+
+ request.setPrefixPath(prefix);
+ request.setTimestamp(time);
+ request.setMeasurements(measurements);
+ request.setValues(buffer);
+ request.setIsAligned(false);
+
+ InsertRowStatement s = StatementGenerator.createStatement(request);
+ final long queryId = SESSION_MANAGER.requestQueryId();
+ ExecutionResult result =
+ COORDINATOR.executeForTreeModel(
+ s, queryId, sessionInfo, "", partitionFetcher, schemaFetcher);
+ return result.status;
+ }
+
+ private void createTimeSeries(Map<String, Object> valueMap, String prefix)
+ throws IllegalPathException {
+ TSCreateMultiTimeseriesReq request = new TSCreateMultiTimeseriesReq();
+ List<String> paths = new ArrayList<>();
+ List<Integer> types = new ArrayList<>();
+ List<Integer> encodings = new ArrayList<>();
+ List<Integer> compressors = new ArrayList<>();
+ for (Map.Entry<String, Object> entry : valueMap.entrySet()) {
+ String measurement = entry.getKey();
+ paths.add(prefix + "." + measurement);
+ TSDataType type = inferType(entry.getValue());
+ types.add(type.ordinal());
+ encodings.add((int) getDefaultEncoding(type).serialize());
+ compressors.add((int)
TSFileDescriptor.getInstance().getConfig().getCompressor().serialize());
+ }
+ request.setPaths(paths);
+ request.setDataTypes(types);
+ request.setEncodings(encodings);
+ request.setCompressors(compressors);
+ CreateMultiTimeSeriesStatement s =
StatementGenerator.createStatement(request);
+ final long queryId = SESSION_MANAGER.requestQueryId();
+
+ ExecutionResult result =
+ COORDINATOR.executeForTreeModel(
+ s, queryId, sessionInfo, "", partitionFetcher, schemaFetcher);
+ if (result.status.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOGGER.warn("Failed to auto create timeseries for {} with status {}",
paths, result.status);
+ }
+ }
+
@Override
protected void writeMetricsToIoTDB(Map<String, Map<String, Object>>
valueMap, long time) {
for (Map.Entry<String, Map<String, Object>> value : valueMap.entrySet()) {