This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/TableModelGrammar in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5f8197e04aa100f3c59a4c177ab93f9496c702c6 Author: Potato <[email protected]> AuthorDate: Tue Jul 23 09:56:14 2024 +0800 Ensure the flush total points statistic function works correctly when enable_auto_create_schema is false #12990 Signed-off-by: OneSizeFitQuorum <[email protected]> (cherry picked from commit 392625f38bff72fd3c7979fa4f98f0db738e8cde) --- .../metrics/IoTDBInternalLocalReporter.java | 100 +++++++++++++++------ 1 file changed, 73 insertions(+), 27 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java index f33624fb0b2..2b6478f7959 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java @@ -44,16 +44,19 @@ import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher; import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult; import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.CreateMultiTimeSeriesStatement; import org.apache.iotdb.metrics.config.MetricConfigDescriptor; import org.apache.iotdb.metrics.reporter.iotdb.IoTDBInternalReporter; import org.apache.iotdb.metrics.utils.InternalReporterType; import org.apache.iotdb.metrics.utils.ReporterType; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq; import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq; import org.apache.iotdb.session.util.SessionUtils; import org.apache.thrift.TException; +import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.enums.TSDataType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,7 +72,10 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding; + public class IoTDBInternalLocalReporter extends IoTDBInternalReporter { + private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBInternalLocalReporter.class); private static final SessionManager SESSION_MANAGER = SessionManager.getInstance(); private static final Coordinator COORDINATOR = Coordinator.getInstance(); @@ -159,43 +165,83 @@ public class IoTDBInternalLocalReporter extends IoTDBInternalReporter { service.execute( () -> { try { - TSInsertRecordReq request = new TSInsertRecordReq(); - List<String> measurements = new ArrayList<>(); - List<TSDataType> types = new ArrayList<>(); - List<Object> values = new ArrayList<>(); - for (Map.Entry<String, Object> entry : valueMap.entrySet()) { - String measurement = entry.getKey(); - Object value = entry.getValue(); - measurements.add(measurement); - types.add(inferType(value)); - values.add(value); + TSStatus result = insertRecord(valueMap, prefix, time); + if (result.getCode() == TSStatusCode.METADATA_ERROR.getStatusCode()) { + createTimeSeries(valueMap, prefix); + result = insertRecord(valueMap, prefix, time); } - ByteBuffer buffer = SessionUtils.getValueBuffer(types, values); - - request.setPrefixPath(prefix); - request.setTimestamp(time); - request.setMeasurements(measurements); - request.setValues(buffer); - request.setIsAligned(false); - - InsertRowStatement s = StatementGenerator.createStatement(request); - final long queryId = SESSION_MANAGER.requestQueryId(); - ExecutionResult result = - COORDINATOR.executeForTreeModel( - s, queryId, sessionInfo, "", partitionFetcher, schemaFetcher); - if (result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - LOGGER.error("Failed to update the value of metric with status {}", result.status); + if (result.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOGGER.warn("Failed to update the value of metric with status {}", result); } } catch (IoTDBConnectionException e1) { - LOGGER.error( + LOGGER.warn( "Failed to update the value of metric because of connection failure, because ", e1); } catch (IllegalPathException | QueryProcessException e2) { - LOGGER.error( + LOGGER.warn( "Failed to update the value of metric because of internal error, because ", e2); } }); } + private TSStatus insertRecord(Map<String, Object> valueMap, String prefix, long time) + throws IoTDBConnectionException, QueryProcessException, IllegalPathException { + TSInsertRecordReq request = new TSInsertRecordReq(); + List<String> measurements = new ArrayList<>(); + List<TSDataType> types = new ArrayList<>(); + List<Object> values = new ArrayList<>(); + for (Map.Entry<String, Object> entry : valueMap.entrySet()) { + String measurement = entry.getKey(); + Object value = entry.getValue(); + measurements.add(measurement); + types.add(inferType(value)); + values.add(value); + } + ByteBuffer buffer = SessionUtils.getValueBuffer(types, values); + + request.setPrefixPath(prefix); + request.setTimestamp(time); + request.setMeasurements(measurements); + request.setValues(buffer); + request.setIsAligned(false); + + InsertRowStatement s = StatementGenerator.createStatement(request); + final long queryId = SESSION_MANAGER.requestQueryId(); + ExecutionResult result = + COORDINATOR.executeForTreeModel( + s, queryId, sessionInfo, "", partitionFetcher, schemaFetcher); + return result.status; + } + + private void createTimeSeries(Map<String, Object> valueMap, String prefix) + throws IllegalPathException { + TSCreateMultiTimeseriesReq request = new TSCreateMultiTimeseriesReq(); + List<String> paths = new ArrayList<>(); + List<Integer> types = new ArrayList<>(); + List<Integer> encodings = new ArrayList<>(); + List<Integer> compressors = new ArrayList<>(); + for (Map.Entry<String, Object> entry : valueMap.entrySet()) { + String measurement = entry.getKey(); + paths.add(prefix + "." + measurement); + TSDataType type = inferType(entry.getValue()); + types.add(type.ordinal()); + encodings.add((int) getDefaultEncoding(type).serialize()); + compressors.add((int) TSFileDescriptor.getInstance().getConfig().getCompressor().serialize()); + } + request.setPaths(paths); + request.setDataTypes(types); + request.setEncodings(encodings); + request.setCompressors(compressors); + CreateMultiTimeSeriesStatement s = StatementGenerator.createStatement(request); + final long queryId = SESSION_MANAGER.requestQueryId(); + + ExecutionResult result = + COORDINATOR.executeForTreeModel( + s, queryId, sessionInfo, "", partitionFetcher, schemaFetcher); + if (result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOGGER.warn("Failed to auto create timeseries for {} with status {}", paths, result.status); + } + } + @Override protected void writeMetricsToIoTDB(Map<String, Map<String, Object>> valueMap, long time) { for (Map.Entry<String, Map<String, Object>> value : valueMap.entrySet()) {
