This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch jira5725 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c002ff4c5042a1a711d2a9394ac9661b972131ad Author: OneSizeFitQuorum <[email protected]> AuthorDate: Fri Mar 24 12:35:05 2023 +0800 finish Signed-off-by: OneSizeFitQuorum <[email protected]> --- .../metrics/IoTDBInternalLocalReporter.java | 66 ++++++++++++---------- 1 file changed, 35 insertions(+), 31 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java index a275bbad6a..a634b678a0 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java +++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/IoTDBInternalLocalReporter.java @@ -150,39 +150,43 @@ public class IoTDBInternalLocalReporter extends IoTDBInternalReporter { @Override protected void writeMetricToIoTDB(Map<String, Object> valueMap, String prefix, long time) { - try { - TSInsertRecordReq request = new TSInsertRecordReq(); - List<String> measurements = new ArrayList<>(); - List<TSDataType> types = new ArrayList<>(); - List<Object> values = new ArrayList<>(); - for (Map.Entry<String, Object> entry : valueMap.entrySet()) { - String measurement = entry.getKey(); - Object value = entry.getValue(); - measurements.add(measurement); - types.add(inferType(value)); - values.add(value); - } - ByteBuffer buffer = SessionUtils.getValueBuffer(types, values); + service.execute( + () -> { + try { + TSInsertRecordReq request = new TSInsertRecordReq(); + List<String> measurements = new ArrayList<>(); + List<TSDataType> types = new ArrayList<>(); + List<Object> values = new ArrayList<>(); + for (Map.Entry<String, Object> entry : valueMap.entrySet()) { + String measurement = entry.getKey(); + Object value = entry.getValue(); + measurements.add(measurement); + types.add(inferType(value)); + values.add(value); + } + ByteBuffer buffer = SessionUtils.getValueBuffer(types, values); - request.setPrefixPath(prefix); - request.setTimestamp(time); - request.setMeasurements(measurements); - request.setValues(buffer); - request.setIsAligned(false); + request.setPrefixPath(prefix); + request.setTimestamp(time); + request.setMeasurements(measurements); + request.setValues(buffer); + request.setIsAligned(false); - InsertRowStatement s = StatementGenerator.createStatement(request); - final long queryId = SESSION_MANAGER.requestQueryId(); - ExecutionResult result = - COORDINATOR.execute(s, queryId, sessionInfo, "", partitionFetcher, schemaFetcher); - if (result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - LOGGER.error("Failed to update the value of metric with status {}", result.status); - } - } catch (IoTDBConnectionException e1) { - LOGGER.error( - "Failed to update the value of metric because of connection failure, because ", e1); - } catch (IllegalPathException | QueryProcessException e2) { - LOGGER.error("Failed to update the value of metric because of internal error, because ", e2); - } + InsertRowStatement s = StatementGenerator.createStatement(request); + final long queryId = SESSION_MANAGER.requestQueryId(); + ExecutionResult result = + COORDINATOR.execute(s, queryId, sessionInfo, "", partitionFetcher, schemaFetcher); + if (result.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOGGER.error("Failed to update the value of metric with status {}", result.status); + } + } catch (IoTDBConnectionException e1) { + LOGGER.error( + "Failed to update the value of metric because of connection failure, because ", e1); + } catch (IllegalPathException | QueryProcessException e2) { + LOGGER.error( + "Failed to update the value of metric because of internal error, because ", e2); + } + }); } @Override
