This is an automated email from the ASF dual-hosted git repository. jfeinauer pushed a commit to branch feature/improve-monitoring-metrics in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit af08f6fc1864ebc13445c3a710a944ffc3ad82af Author: Julian Feinauer <[email protected]> AuthorDate: Mon Jun 1 21:35:34 2020 +0200 Add Metrics Service based on Micrometer. * Prometheus Endpoint is exposed based on Micrometer Prometheus. * Add IoTDBRegistry which stores all Metrics as IoTDB timeseries under root._metrics * Added Monitoring at several points in the program * Added IService Implementation for new Service --- server/pom.xml | 11 + .../apache/iotdb/db/metrics2/IoTDBRegistry.java | 229 +++++++++++++++++++++ .../iotdb/db/metrics2/IoTDBRegistryConfig.java | 33 +++ .../iotdb/db/metrics2/MicrometerServerService.java | 112 ++++++++++ .../java/org/apache/iotdb/db/service/IoTDB.java | 2 + .../org/apache/iotdb/db/service/ServiceType.java | 1 + .../org/apache/iotdb/db/service/TSServiceImpl.java | 7 + .../db/writelog/node/ExclusiveWriteLogNode.java | 5 + 8 files changed, 400 insertions(+) diff --git a/server/pom.xml b/server/pom.xml index 351f6a6..c622007 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -170,6 +170,17 @@ <artifactId>oauth2-oidc-sdk</artifactId> <version>8.3</version> </dependency> + <!-- Micrometer Metrics --> + <dependency> + <groupId>io.micrometer</groupId> + <artifactId>micrometer-registry-prometheus</artifactId> + <version>1.1.11</version> + </dependency> + <dependency> + <groupId>io.github.mweirauch</groupId> + <artifactId>micrometer-jvm-extras</artifactId> + <version>0.2.0</version> + </dependency> </dependencies> <build> <plugins> diff --git a/server/src/main/java/org/apache/iotdb/db/metrics2/IoTDBRegistry.java b/server/src/main/java/org/apache/iotdb/db/metrics2/IoTDBRegistry.java new file mode 100644 index 0000000..5ca656c --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/metrics2/IoTDBRegistry.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.metrics2; + +import io.micrometer.core.instrument.Clock; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.FunctionCounter; +import io.micrometer.core.instrument.FunctionTimer; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.LongTaskTimer; +import io.micrometer.core.instrument.Measurement; +import io.micrometer.core.instrument.Meter; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.Timer; +import io.micrometer.core.instrument.config.NamingConvention; +import io.micrometer.core.instrument.cumulative.CumulativeCounter; +import io.micrometer.core.instrument.cumulative.CumulativeDistributionSummary; +import io.micrometer.core.instrument.cumulative.CumulativeFunctionCounter; +import io.micrometer.core.instrument.cumulative.CumulativeFunctionTimer; +import io.micrometer.core.instrument.cumulative.CumulativeTimer; +import io.micrometer.core.instrument.distribution.DistributionStatisticConfig; +import io.micrometer.core.instrument.distribution.pause.PauseDetector; +import io.micrometer.core.instrument.internal.DefaultGauge; +import 
io.micrometer.core.instrument.internal.DefaultLongTaskTimer; +import io.micrometer.core.instrument.internal.DefaultMeter; +import io.micrometer.core.instrument.push.PushMeterRegistry; +import io.micrometer.core.instrument.util.NamedThreadFactory; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.qp.Planner; +import org.apache.iotdb.db.qp.executor.PlanExecutor; +import org.apache.iotdb.db.qp.physical.PhysicalPlan; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Locale; +import java.util.concurrent.TimeUnit; +import java.util.function.ToDoubleFunction; +import java.util.function.ToLongFunction; +import java.util.stream.Collectors; +
+/**
+ * Micrometer registry which stores meter values as IoTDB time series below
+ * {@code root._metrics}.
+ * All time-related values are reported in milliseconds.
+ *
+ * <p>The publisher thread is started from the constructor; how often values are written is
+ * decided by {@link PushMeterRegistry} from the supplied {@link IoTDBRegistryConfig}.
+ *
+ * <p>Gauges, counters, timers, time gauges and function counters are written. Other meter
+ * types are currently skipped.
+ */
+public class IoTDBRegistry extends PushMeterRegistry {
+
+  private static final Logger logger = LoggerFactory.getLogger(IoTDBRegistry.class);
+
+  private final PlanExecutor executor;
+  private final Planner planner;
+
+  /**
+   * Creates the registry and immediately starts the background publisher thread.
+   *
+   * @param config push configuration handed to {@link PushMeterRegistry}
+   * @param clock clock used by the meters created by this registry
+   * @throws IllegalStateException if the IoTDB {@link PlanExecutor} cannot be created
+   */
+  public IoTDBRegistry(IoTDBRegistryConfig config, Clock clock) {
+    super(config, clock);
+
+    planner = new Planner();
+    try {
+      executor = new PlanExecutor();
+    } catch (QueryProcessException e) {
+      throw new IllegalStateException("Unable to instantiate IoTDB Metric Backend", e);
+    }
+
+    // Everything publish() needs is initialized above, so the publisher may start now.
+    start(new NamedThreadFactory("iotdb-metrics-publisher"));
+  }
+
+  @Override
+  protected <T> Gauge newGauge(Meter.Id id, T obj, ToDoubleFunction<T> valueFunction) {
+    return new DefaultGauge<>(id, obj, valueFunction);
+  }
+
+  @Override
+  protected Counter newCounter(Meter.Id id) {
+    return new CumulativeCounter(id);
+  }
+
+  @Override
+  protected LongTaskTimer newLongTaskTimer(Meter.Id id) {
+    return new DefaultLongTaskTimer(id, clock);
+  }
+
+  @Override
+  protected Timer newTimer(Meter.Id id, DistributionStatisticConfig distributionStatisticConfig,
+      PauseDetector pauseDetector) {
+    return new CumulativeTimer(id, clock, distributionStatisticConfig, pauseDetector,
+        TimeUnit.MILLISECONDS);
+  }
+
+  @Override
+  protected DistributionSummary newDistributionSummary(Meter.Id id,
+      DistributionStatisticConfig distributionStatisticConfig, double scale) {
+    return new CumulativeDistributionSummary(id, clock, distributionStatisticConfig, scale, true);
+  }
+
+  @Override
+  protected Meter newMeter(Meter.Id id, Meter.Type type, Iterable<Measurement> measurements) {
+    return new DefaultMeter(id, type, measurements);
+  }
+
+  @Override
+  protected <T> FunctionTimer newFunctionTimer(Meter.Id id, T obj,
+      ToLongFunction<T> countFunction, ToDoubleFunction<T> totalTimeFunction,
+      TimeUnit totalTimeFunctionUnit) {
+    return new CumulativeFunctionTimer<T>(id, obj, countFunction, totalTimeFunction,
+        totalTimeFunctionUnit, TimeUnit.MILLISECONDS);
+  }
+
+  @Override
+  protected <T> FunctionCounter newFunctionCounter(Meter.Id id, T obj,
+      ToDoubleFunction<T> countFunction) {
+    return new CumulativeFunctionCounter<T>(id, obj, countFunction);
+  }
+
+  @Override
+  protected TimeUnit getBaseTimeUnit() {
+    return TimeUnit.MILLISECONDS;
+  }
+
+  @Override
+  protected DistributionStatisticConfig defaultHistogramConfig() {
+    return DistributionStatisticConfig.DEFAULT;
+  }
+
+  /** Writes all meters to IoTDB and records the duration in the "metrics.write.timer" timer. */
+  @Override
+  protected void publish() {
+    Metrics.timer("metrics.write.timer").record(this::writeMetrics);
+  }
+
+  /**
+   * Builds one INSERT statement per registered meter and executes it.
+   * Meters of a type this registry cannot store are skipped, so a single unsupported meter
+   * does not abort the publishing of all the others.
+   */
+  private void writeMetrics() {
+    for (Meter meter : getMeters()) {
+      final String conventionName = meter.getId().getConventionName(NamingConvention.dot);
+      final List<Tag> conventionTags = meter.getId().getConventionTags(NamingConvention.dot);
+
+      // A null query marks a meter type which is not supported (yet).
+      final String query = meter.match(
+          gauge -> createQuery(conventionName, conventionTags, gauge.value()),
+          counter -> createQuery(conventionName, conventionTags, counter.count()),
+          timer -> createQueryForTimer(conventionName, conventionTags, timer),
+          distributionSummary -> null,
+          longTaskTimer -> null,
+          timeGauge -> createQuery(conventionName, conventionTags,
+              timeGauge.value(TimeUnit.MILLISECONDS)),
+          functionCounter -> createQuery(conventionName, conventionTags,
+              functionCounter.count()),
+          functionTimer -> null,
+          otherMeter -> null);
+
+      if (query == null) {
+        logger.debug("Skipping meter '{}', its type is not supported by the IoTDB registry",
+            conventionName);
+        continue;
+      }
+      executeInsert(query);
+    }
+  }
+
+  /** Parses and executes a single INSERT statement; failures are logged, never propagated. */
+  private void executeInsert(String query) {
+    try {
+      final PhysicalPlan physicalPlan = planner.parseSQLToPhysicalPlan(query);
+      if (!executor.processNonQuery(physicalPlan)) {
+        logger.warn("Unable to process metrics query '{}'!", query);
+      }
+    } catch (QueryProcessException | StorageEngineException | StorageGroupNotSetException e) {
+      logger.error("Unable to store metrics with query '{}'", query, e);
+    }
+  }
+
+  /**
+   * Creates the INSERT statement for a meter that reports a single value.
+   *
+   * @param conventionName dot-separated meter name
+   * @param conventionTags tags of the meter, written as additional measurements
+   * @param value value to store in the {@code value} measurement
+   * @return the INSERT statement
+   */
+  private String createQuery(String conventionName, List<Tag> conventionTags, double value) {
+    final String escapedPath = escapePath(conventionName);
+    final String tagKeys = joinTagKeys(conventionTags);
+
+    if (tagKeys.isEmpty()) {
+      // The whole escaped name is the device path. No splitting at the last dot is needed,
+      // so names without any dot are handled as well.
+      return String.format(Locale.ENGLISH,
+          "INSERT INTO root._metrics.%s (timestamp, value) VALUES (NOW(), %f)",
+          escapedPath, value);
+    }
+    return String.format(Locale.ENGLISH,
+        "INSERT INTO root._metrics.%s (timestamp, %s, value) VALUES (NOW(), %s, %f)",
+        escapedPath, tagKeys, joinTagValues(conventionTags), value);
+  }
+
+  /**
+   * Creates the INSERT statement for a timer, storing its count, mean, max and total time
+   * (all durations in milliseconds).
+   *
+   * @param conventionName dot-separated meter name
+   * @param conventionTags tags of the meter, written as additional measurements
+   * @param timer timer to read the values from
+   * @return the INSERT statement
+   */
+  private String createQueryForTimer(String conventionName, List<Tag> conventionTags,
+      Timer timer) {
+    // The escaped path is used with and without tags, so that a timer ends up under the
+    // same naming scheme as every other meter.
+    final String escapedPath = escapePath(conventionName);
+    final String tagKeys = joinTagKeys(conventionTags);
+
+    if (tagKeys.isEmpty()) {
+      return String.format(Locale.ENGLISH,
+          "INSERT INTO root._metrics.%s (timestamp, _count, _mean, _max, _total) VALUES (NOW(), %d, %f, %f, %f)",
+          escapedPath, timer.count(), timer.mean(TimeUnit.MILLISECONDS),
+          timer.max(TimeUnit.MILLISECONDS), timer.totalTime(TimeUnit.MILLISECONDS));
+    }
+    return String.format(Locale.ENGLISH,
+        "INSERT INTO root._metrics.%s (timestamp, %s, _count, _mean, _max, _total) VALUES (NOW(), %s, %d, %f, %f, %f)",
+        escapedPath, tagKeys, joinTagValues(conventionTags), timer.count(),
+        timer.mean(TimeUnit.MILLISECONDS), timer.max(TimeUnit.MILLISECONDS),
+        timer.totalTime(TimeUnit.MILLISECONDS));
+  }
+
+  /**
+   * Prefixes every occurrence of "load", "count" and "time" in the name with an underscore.
+   * NOTE(review): presumably these words clash with IoTDB SQL keywords - confirm.
+   */
+  private static String escapePath(String conventionName) {
+    return conventionName
+        .replace("load", "_load")
+        .replace("count", "_count")
+        .replace("time", "_time");
+  }
+
+  /** Joins the tag keys with commas; the result is empty if there are no tags. */
+  private static String joinTagKeys(List<Tag> conventionTags) {
+    return conventionTags.stream()
+        .map(Tag::getKey)
+        .collect(Collectors.joining(","));
+  }
+
+  /**
+   * Joins the tag values, each wrapped in double quotes, with commas.
+   * NOTE(review): values are embedded as-is; a value containing a double quote would break
+   * the generated statement - confirm the escaping rules of the IoTDB SQL dialect.
+   */
+  private static String joinTagValues(List<Tag> conventionTags) {
+    return conventionTags.stream()
+        .map(Tag::getValue)
+        .map(s -> "\"" + s + "\"")
+        .collect(Collectors.joining(","));
+  }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metrics2/IoTDBRegistryConfig.java b/server/src/main/java/org/apache/iotdb/db/metrics2/IoTDBRegistryConfig.java new file mode 100644 index 0000000..aa46657 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/metrics2/IoTDBRegistryConfig.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.metrics2; + +import io.micrometer.core.instrument.push.PushRegistryConfig; +
+/**
+ * Configuration of the {@link IoTDBRegistry}.
+ * Its configuration keys use the prefix "iotdb".
+ */
+public interface IoTDBRegistryConfig extends PushRegistryConfig {
+
+  /** Configuration which answers {@code null} for every requested key. */
+  IoTDBRegistryConfig DEFAULT = key -> null;
+
+  /**
+   * @return the constant prefix "iotdb"
+   */
+  @Override
+  default String prefix() {
+    return "iotdb";
+  }
+
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metrics2/MicrometerServerService.java b/server/src/main/java/org/apache/iotdb/db/metrics2/MicrometerServerService.java new file mode 100644 index 0000000..af90e86 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/metrics2/MicrometerServerService.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.metrics2; + +import com.sun.net.httpserver.HttpServer; +import io.github.mweirauch.micrometer.jvm.extras.ProcessMemoryMetrics; +import io.github.mweirauch.micrometer.jvm.extras.ProcessThreadMetrics; +import io.micrometer.core.instrument.Clock; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.binder.jvm.ClassLoaderMetrics; +import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics; +import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics; +import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics; +import io.micrometer.core.instrument.binder.system.FileDescriptorMetrics; +import io.micrometer.core.instrument.binder.system.ProcessorMetrics; +import io.micrometer.core.instrument.binder.system.UptimeMetrics; +import io.micrometer.core.instrument.composite.CompositeMeterRegistry; +import io.micrometer.prometheus.PrometheusConfig; +import io.micrometer.prometheus.PrometheusMeterRegistry; +import org.apache.iotdb.db.exception.StartupException; +import org.apache.iotdb.db.service.IService; +import org.apache.iotdb.db.service.ServiceType; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.util.Arrays; + +/** + * Metrics Service that is based on micrometer. + * It does two things. + * First, exposes a Prometheus Endpoint on :8080/metrics. + * Second, it logs all collected metrics into IoTDB in the storage group root._metrics. 
+ */
+public class MicrometerServerService implements IService {
+
+  private static final MicrometerServerService INSTANCE = new MicrometerServerService();
+
+  /** Port on which the Prometheus scrape endpoint is served. */
+  private static final int PROMETHEUS_PORT = 8080;
+
+  private HttpServer server;
+  /** Composite registered with the global {@link Metrics} registry while the service runs. */
+  private MeterRegistry registry;
+  /** Registry which owns the background publisher thread writing into IoTDB. */
+  private IoTDBRegistry ioTDBRegistry;
+  private final PrometheusMeterRegistry prometheusMeterRegistry;
+
+  public MicrometerServerService() {
+    prometheusMeterRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
+  }
+
+  public static IService getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Binds the Prometheus endpoint, registers the composite registry as a global registry,
+   * binds the default JVM and system metrics, and starts serving.
+   *
+   * @throws StartupException if the HTTP endpoint cannot be bound
+   */
+  @Override
+  public void start() throws StartupException {
+    // Bind the endpoint first: this is the step which can fail, and failing here leaves
+    // no registry or publisher thread behind.
+    try {
+      server = HttpServer.create(new InetSocketAddress(PROMETHEUS_PORT), 0);
+    } catch (IOException e) {
+      throw new StartupException(getID().getName(), e.getMessage());
+    }
+    server.createContext("/metrics", httpExchange -> {
+      // Encode once with a fixed charset, so that the announced length always matches the
+      // bytes written, independent of the platform default charset.
+      byte[] body = prometheusMeterRegistry.scrape()
+          .getBytes(java.nio.charset.StandardCharsets.UTF_8);
+      httpExchange.sendResponseHeaders(200, body.length);
+      try (OutputStream os = httpExchange.getResponseBody()) {
+        os.write(body);
+      }
+    });
+
+    // Define Meter Registry
+    ioTDBRegistry = new IoTDBRegistry(IoTDBRegistryConfig.DEFAULT, Clock.SYSTEM);
+    registry = new CompositeMeterRegistry(Clock.SYSTEM,
+        Arrays.asList(ioTDBRegistry, prometheusMeterRegistry));
+    // Set this as default, then users can simply write Metrics.xxx
+    Metrics.addRegistry(registry);
+    // Wire up JVM and Other Default Bindings
+    new ClassLoaderMetrics().bindTo(registry);
+    new JvmMemoryMetrics().bindTo(registry);
+    new JvmGcMetrics().bindTo(registry);
+    new ProcessorMetrics().bindTo(registry);
+    new JvmThreadMetrics().bindTo(registry);
+    new ProcessMemoryMetrics().bindTo(registry);
+    new ProcessThreadMetrics().bindTo(registry);
+    new UptimeMetrics().bindTo(registry);
+    new FileDescriptorMetrics().bindTo(registry);
+
+    // HttpServer#start() already runs the server in its own background thread, so no
+    // extra thread is needed here.
+    server.start();
+  }
+
+  /**
+   * Stops the HTTP endpoint, removes the composite registry from the global registry and
+   * closes the IoTDB registry. Calling it without a successful {@link #start()} is a no-op.
+   */
+  @Override
+  public void stop() {
+    if (server != null) {
+      server.stop(0);
+      server = null;
+    }
+    if (registry != null) {
+      Metrics.removeRegistry(registry);
+      registry = null;
+    }
+    if (ioTDBRegistry != null) {
+      // NOTE(review): close() is expected to stop the publisher thread started in the
+      // IoTDBRegistry constructor - confirm for the Micrometer version in use.
+      ioTDBRegistry.close();
+      ioTDBRegistry = null;
+    }
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.METRICS2_SERVICE;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java 
b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java index 0988c17..a86e18a 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java +++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java @@ -32,6 +32,7 @@ import org.apache.iotdb.db.engine.flush.FlushManager; import org.apache.iotdb.db.engine.merge.manage.MergeManager; import org.apache.iotdb.db.exception.StartupException; import org.apache.iotdb.db.metadata.MManager; +import org.apache.iotdb.db.metrics2.MicrometerServerService; import org.apache.iotdb.db.monitor.StatMonitor; import org.apache.iotdb.db.rescon.TVListAllocator; import org.apache.iotdb.db.sync.receiver.SyncServerManager; @@ -105,6 +106,7 @@ public class IoTDB implements IoTDBMBean { registerManager.register(UpgradeSevice.getINSTANCE()); registerManager.register(MergeManager.getINSTANCE()); registerManager.register(StorageEngine.getInstance()); + registerManager.register(MicrometerServerService.getInstance()); // When registering statMonitor, we should start recovering some statistics // with latest values stored diff --git a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java index e1498bc..c0ab389 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java +++ b/server/src/main/java/org/apache/iotdb/db/service/ServiceType.java @@ -25,6 +25,7 @@ public enum ServiceType { STORAGE_ENGINE_SERVICE("Storage Engine ServerService", ""), JMX_SERVICE("JMX ServerService", "JMX ServerService"), METRICS_SERVICE("Metrics ServerService","MetricsService"), + METRICS2_SERVICE("Micrometer based Metrics ServerService","Metrics2Service"), RPC_SERVICE("RPC ServerService", "RPCService"), MQTT_SERVICE("MQTTService", ""), MONITOR_SERVICE("Monitor ServerService", "Monitor"), diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index 
d0cff5d..d4dc0af 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -36,6 +36,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; +import io.micrometer.core.instrument.Metrics; import org.antlr.v4.runtime.misc.ParseCancellationException; import org.apache.iotdb.db.auth.AuthException; import org.apache.iotdb.db.auth.AuthorityChecker; @@ -178,11 +179,13 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { IoTDBConstant.GLOBAL_DB_NAME, req.getUsername()); + boolean status; IAuthorizer authorizer; try { authorizer = BasicAuthorizer.getInstance(); } catch (AuthException e) { + Metrics.counter("open.session.request", "status", "INTERNAL_EXCEPTION").increment(); throw new TException(e); } String loginMessage = null; @@ -190,6 +193,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { status = authorizer.login(req.getUsername(), req.getPassword()); } catch (AuthException e) { logger.info("meet error while logging in.", e); + Metrics.counter("open.session.request", "status", "INTERNAL_EXCEPTION").increment(); status = false; loginMessage = e.getMessage(); } @@ -205,16 +209,19 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V2); resp.setSessionId(sessionId); + Metrics.counter("open.session.request", "status", "VERSION_INCOMPATIBLE").increment(); return resp; } tsStatus = RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Login successfully"); + Metrics.counter("open.session.request", "status", "SUCCESS").increment(); sessionId = sessionIdGenerator.incrementAndGet(); sessionIdUsernameMap.put(sessionId, req.getUsername()); sessionIdZoneIdMap.put(sessionId, config.getZoneID()); 
currSessionId.set(sessionId); } else { tsStatus = RpcUtils.getStatus(TSStatusCode.WRONG_LOGIN_PASSWORD_ERROR); + Metrics.counter("open.session.request", "status", "LOGIN_FAILED").increment(); tsStatus.setMessage(loginMessage); } TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java index dda1a7e..397987f 100644 --- a/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java +++ b/server/src/main/java/org/apache/iotdb/db/writelog/node/ExclusiveWriteLogNode.java @@ -26,6 +26,8 @@ import java.util.Arrays; import java.util.Comparator; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Metrics; import org.apache.commons.io.FileUtils; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -64,6 +66,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive private long lastFlushedId = 0; private int bufferedLogNum = 0; + private final Counter syncCounter; /** * constructor of ExclusiveWriteLogNode. @@ -77,6 +80,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive if (SystemFileFactory.INSTANCE.getFile(logDirectory).mkdirs()) { logger.info("create the WAL folder {}." + logDirectory); } + syncCounter = Metrics.counter("wal.sync.count", "_group", identifier.substring(0, identifier.indexOf("-"))); } @Override @@ -215,6 +219,7 @@ public class ExclusiveWriteLogNode implements WriteLogNode, Comparable<Exclusive } private void sync() { + syncCounter.increment(); lock.writeLock().lock(); try { if (bufferedLogNum == 0) {
