Repository: accumulo Updated Branches: refs/heads/metrics2-backwardscompat [created] 6bc63eb0b
First attempt at getting existing metrics and metrics2 working side-by-side. Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c4aea2d6 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c4aea2d6 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c4aea2d6 Branch: refs/heads/metrics2-backwardscompat Commit: c4aea2d6f4bc350598334912cbe7b1d8bec0fb27 Parents: 4fb52f9 Author: Josh Elser <[email protected]> Authored: Wed Dec 3 23:33:14 2014 -0500 Committer: Josh Elser <[email protected]> Committed: Wed Dec 3 23:33:14 2014 -0500 ---------------------------------------------------------------------- .../org/apache/accumulo/core/conf/Property.java | 2 + .../server/metrics/AbstractMetricsImpl.java | 91 ++++---- .../apache/accumulo/server/metrics/Metrics.java | 28 +++ .../server/metrics/Metrics2ThriftMetrics.java | 63 ++++++ .../accumulo/server/metrics/MetricsFactory.java | 54 +++++ .../accumulo/server/util/TServerUtils.java | 18 +- .../accumulo/gc/SimpleGarbageCollector.java | 2 +- .../java/org/apache/accumulo/master/Master.java | 1 + .../master/metrics/MasterMetricsFactory.java | 58 ++++++ .../metrics/Metrics2ReplicationMetrics.java | 125 +++++++++++ .../master/metrics/ReplicationMetrics.java | 13 +- .../apache/accumulo/tserver/TabletServer.java | 61 +++--- .../metrics/Metrics2TabletServerMetrics.java | 208 +++++++++++++++++++ .../Metrics2TabletServerMinCMetrics.java | 62 ++++++ .../Metrics2TabletServerScanMetrics.java | 62 ++++++ .../Metrics2TabletServerUpdateMetrics.java | 62 ++++++ .../tserver/metrics/TabletServerMBeanImpl.java | 23 +- .../metrics/TabletServerMetricsFactory.java | 79 +++++++ .../apache/accumulo/tserver/tablet/Tablet.java | 3 +- .../accumulo/test/functional/ZombieTServer.java | 2 +- .../test/performance/thrift/NullTserver.java | 10 +- 21 files changed, 927 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4aea2d6/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index c2c3587..cc7d548 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -168,6 +168,8 @@ public enum Property { GENERAL_VOLUME_CHOOSER("general.volume.chooser", "org.apache.accumulo.server.fs.RandomVolumeChooser", PropertyType.CLASSNAME, "The class that will be used to select which volume will be used to create new files."), GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS("general.security.credential.provider.paths", "", PropertyType.STRING, "Comma-separated list of paths to CredentialProviders"), + GENERAL_LEGACY_METRICS("general.legacy.metrics", "false", PropertyType.BOOLEAN, + "Use the old metric infrastructure configured by accumulo-metrics.xml, instead of Hadoop Metrics2"), // properties that are specific to master server behavior MASTER_PREFIX("master.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the master server"), http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4aea2d6/server/base/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java index 54ca8de..657fc31 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/AbstractMetricsImpl.java @@ -36,87 +36,87 @@ import org.apache.commons.lang.builder.ToStringBuilder; 
import org.apache.commons.lang.builder.ToStringStyle; import org.apache.commons.lang.time.DateUtils; -public abstract class AbstractMetricsImpl { - +public abstract class AbstractMetricsImpl implements Metrics { + public class Metric { - + private long count = 0; private long avg = 0; private long min = 0; private long max = 0; - + public long getCount() { return count; } - + public long getAvg() { return avg; } - + public long getMin() { return min; } - + public long getMax() { return max; } - + public void incCount() { count++; } - + public void addAvg(long a) { if (a < 0) return; avg = (long) ((avg * .8) + (a * .2)); } - + public void addMin(long a) { if (a < 0) return; min = Math.min(min, a); } - + public void addMax(long a) { if (a < 0) return; max = Math.max(max, a); } - + @Override public String toString() { return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("count", count).append("average", avg).append("minimum", min).append("maximum", max).toString(); } - + } - + static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(AbstractMetricsImpl.class); - + private static ConcurrentHashMap<String,Metric> registry = new ConcurrentHashMap<String,Metric>(); - + private boolean currentlyLogging = false; - + private File logDir = null; - + private String metricsPrefix = null; - + private Date today = new Date(); - + private File logFile = null; - + private Writer logWriter = null; - + private SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd"); - + private SimpleDateFormat logFormatter = new SimpleDateFormat("yyyyMMddhhmmssz"); - + private MetricsConfiguration config = null; - + public AbstractMetricsImpl() { this.metricsPrefix = getMetricsPrefix(); config = new MetricsConfiguration(metricsPrefix); } - + /** * Registers a StandardMBean with the MBean Server */ @@ -126,13 +126,14 @@ public abstract class AbstractMetricsImpl { if (null == getObjectName()) throw new IllegalArgumentException("MBean object name must be 
set."); mbs.registerMBean(mbean, getObjectName()); - + setupLogging(); } - + /** * Registers this MBean with the MBean Server */ + @Override public void register() throws Exception { // Register this object with the MBeanServer MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); @@ -141,31 +142,31 @@ public abstract class AbstractMetricsImpl { mbs.registerMBean(this, getObjectName()); setupLogging(); } - + public void createMetric(String name) { registry.put(name, new Metric()); } - + public Metric getMetric(String name) { return registry.get(name); } - + public long getMetricCount(String name) { return registry.get(name).getCount(); } - + public long getMetricAvg(String name) { return registry.get(name).getAvg(); } - + public long getMetricMin(String name) { return registry.get(name).getMin(); } - + public long getMetricMax(String name) { return registry.get(name).getMax(); } - + private void setupLogging() throws IOException { if (null == config.getMetricsConfiguration()) return; @@ -176,7 +177,7 @@ public abstract class AbstractMetricsImpl { if (null != mDir) { File dir = new File(mDir); if (!dir.isDirectory()) - if (!dir.mkdir()) + if (!dir.mkdir()) log.warn("Could not create log directory: " + dir); logDir = dir; // Create new log file @@ -185,7 +186,7 @@ public abstract class AbstractMetricsImpl { currentlyLogging = true; } } - + private void startNewLog() throws IOException { if (null != logWriter) { logWriter.flush(); @@ -201,7 +202,7 @@ public abstract class AbstractMetricsImpl { } logWriter = new OutputStreamWriter(new FileOutputStream(logFile, true), UTF_8); } - + private void writeToLog(String name) throws IOException { if (null == logWriter) return; @@ -213,7 +214,8 @@ public abstract class AbstractMetricsImpl { } logWriter.append(logFormatter.format(now)).append(" Metric: ").append(name).append(": ").append(registry.get(name).toString()).append("\n"); } - + + @Override public void add(String name, long time) { if (isEnabled()) { 
registry.get(name).incCount(); @@ -248,15 +250,16 @@ public abstract class AbstractMetricsImpl { } } } - + + @Override public boolean isEnabled() { return config.isEnabled(); } - + protected abstract ObjectName getObjectName(); - + protected abstract String getMetricsPrefix(); - + @Override protected void finalize() { if (null != logWriter) { @@ -270,5 +273,5 @@ public abstract class AbstractMetricsImpl { } logFile = null; } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4aea2d6/server/base/src/main/java/org/apache/accumulo/server/metrics/Metrics.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/Metrics.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/Metrics.java new file mode 100644 index 0000000..6c79b28 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/Metrics.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.server.metrics; + +/** + * + */ +public interface Metrics { + void register() throws Exception; + + void add(String name, long time); + + boolean isEnabled(); +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4aea2d6/server/base/src/main/java/org/apache/accumulo/server/metrics/Metrics2ThriftMetrics.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/Metrics2ThriftMetrics.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/Metrics2ThriftMetrics.java new file mode 100644 index 0000000..93a1b70 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/Metrics2ThriftMetrics.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.server.metrics; + +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.lib.Interns; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; + +/** + * + */ +public class Metrics2ThriftMetrics implements Metrics, MetricsSource { + + private final MetricsSystem system; + private final MetricsRegistry registry; + private final String name, desc; + + public Metrics2ThriftMetrics(MetricsSystem system, String serverName, String threadName) { + this.system = system; + this.name = "ThriftMetrics" + serverName; + this.desc = "Thrift Server Metrics - " + serverName + " " + threadName; + this.registry = new MetricsRegistry(Interns.info(name, desc)); + } + + @Override + public void add(String name, long time) { + registry.add(name, time); + } + + @Override + public void register() { + system.register(name, desc, this); + } + + @Override + public boolean isEnabled() { + return true; + } + + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + MetricsRecordBuilder builder = collector.addRecord("Accumulo").setContext("thrift"); + + registry.snapshot(builder, all); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4aea2d6/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsFactory.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsFactory.java b/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsFactory.java new file mode 100644 index 0000000..6a5a539 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/metrics/MetricsFactory.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server.metrics; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; + +import com.google.common.base.Preconditions; + +/** + * + */ +public class MetricsFactory { + public static final String METRICS_PREFIX = "Accumulo"; + + private final boolean useOldMetrics; + private final MetricsSystem metricsSystem; + + public MetricsFactory(AccumuloConfiguration conf) { + Preconditions.checkNotNull(conf); + useOldMetrics = conf.getBoolean(Property.GENERAL_LEGACY_METRICS); + + if (useOldMetrics) { + metricsSystem = null; + } else { + metricsSystem = DefaultMetricsSystem.initialize(METRICS_PREFIX); + } + } + + public Metrics createThriftMetrics(String serverName, String threadName) { + if (useOldMetrics) { + return new ThriftMetrics(serverName, threadName); + } + + return new Metrics2ThriftMetrics(metricsSystem, serverName, threadName); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4aea2d6/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java ---------------------------------------------------------------------- diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java index d30f101..f1156d4 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java @@ -28,6 +28,7 @@ import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; +import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.util.Daemon; import org.apache.accumulo.core.util.LoggingRunnable; @@ -37,6 +38,8 @@ import org.apache.accumulo.core.util.TBufferedSocket; import org.apache.accumulo.core.util.ThriftUtil; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.server.AccumuloServerContext; +import org.apache.accumulo.server.metrics.Metrics; +import org.apache.accumulo.server.metrics.MetricsFactory; import org.apache.accumulo.server.metrics.ThriftMetrics; import org.apache.accumulo.server.util.time.SimpleTimer; import org.apache.log4j.Logger; @@ -70,7 +73,7 @@ public class TServerUtils { /** * Start a server, at the given port, or higher, if that port is not available. 
- * + * * @param portHintProperty * the port to attempt to open, can be zero, meaning "any available port" * @param processor @@ -100,7 +103,7 @@ public class TServerUtils { if (portSearchProperty != null) portSearch = service.getConfiguration().getBoolean(portSearchProperty); // create the TimedProcessor outside the port search loop so we don't try to register the same metrics mbean more than once - TServerUtils.TimedProcessor timedProcessor = new TServerUtils.TimedProcessor(processor, serverName, threadName); + TServerUtils.TimedProcessor timedProcessor = new TServerUtils.TimedProcessor(service.getConfiguration(), processor, serverName, threadName); Random random = new Random(); for (int j = 0; j < 100; j++) { @@ -143,14 +146,15 @@ public class TServerUtils { public static class TimedProcessor implements TProcessor { final TProcessor other; - ThriftMetrics metrics = null; + Metrics metrics = null; long idleStart = 0; - TimedProcessor(TProcessor next, String serverName, String threadName) { + TimedProcessor(AccumuloConfiguration conf, TProcessor next, String serverName, String threadName) { this.other = next; // Register the metrics MBean + MetricsFactory factory = new MetricsFactory(conf); + metrics = factory.createThriftMetrics(serverName, threadName); try { - metrics = new ThriftMetrics(serverName, threadName); metrics.register(); } catch (Exception e) { log.error("Exception registering MBean with MBean Server", e); @@ -280,9 +284,9 @@ public class TServerUtils { return new ServerAddress(createThreadPoolServer(transport, processor), address); } - public static ServerAddress startTServer(HostAndPort address, TProcessor processor, String serverName, String threadName, int numThreads, int numSTThreads, + public static ServerAddress startTServer(AccumuloConfiguration conf, HostAndPort address, TProcessor processor, String serverName, String threadName, int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams 
sslParams, long sslSocketTimeout) throws TTransportException { - return startTServer(address, new TimedProcessor(processor, serverName, threadName), serverName, threadName, numThreads, numSTThreads, + return startTServer(address, new TimedProcessor(conf, processor, serverName, threadName), serverName, threadName, numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize, sslParams, sslSocketTimeout); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4aea2d6/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index a223eb9..371c94d 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -716,7 +716,7 @@ public class SimpleGarbageCollector extends AccumuloServerContext implements Ifa HostAndPort result = HostAndPort.fromParts(opts.getAddress(), port); log.debug("Starting garbage collector listening on " + result); try { - return TServerUtils.startTServer(result, processor, this.getClass().getSimpleName(), "GC Monitor Service", 2, + return TServerUtils.startTServer(getConfiguration(), result, processor, this.getClass().getSimpleName(), "GC Monitor Service", 2, getConfiguration().getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE), 1000, maxMessageSize, getServerSslParams(), 0).address; } catch (Exception ex) { log.fatal(ex, ex); http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4aea2d6/server/master/src/main/java/org/apache/accumulo/master/Master.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java index 
d5d5145..9d54e98 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java @@ -1126,6 +1126,7 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(getInstance()) + Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR, replAddress.address.toString().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE); + try { ReplicationMetrics beanImpl = new ReplicationMetrics(this); beanImpl.register(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4aea2d6/server/master/src/main/java/org/apache/accumulo/master/metrics/MasterMetricsFactory.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/metrics/MasterMetricsFactory.java b/server/master/src/main/java/org/apache/accumulo/master/metrics/MasterMetricsFactory.java new file mode 100644 index 0000000..606b85c --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/metrics/MasterMetricsFactory.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.master.metrics; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.server.metrics.Metrics; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; + +import com.google.common.base.Preconditions; + +/** + * + */ +public class MasterMetricsFactory { + public static final String METRICS_PREFIX = "Accumulo"; + + private final boolean useOldMetrics; + private final MetricsSystem metricsSystem; + private final Master master; + + public MasterMetricsFactory(AccumuloConfiguration conf, Master master) { + Preconditions.checkNotNull(conf); + useOldMetrics = conf.getBoolean(Property.GENERAL_LEGACY_METRICS); + this.master = master; + + if (useOldMetrics) { + metricsSystem = null; + } else { + metricsSystem = DefaultMetricsSystem.initialize(METRICS_PREFIX); + } + } + + public Metrics createReplicationMetrics() { + if (useOldMetrics) { + return new ReplicationMetrics(master); + } + + return new Metrics2ReplicationMetrics(master, metricsSystem); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4aea2d6/server/master/src/main/java/org/apache/accumulo/master/metrics/Metrics2ReplicationMetrics.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/metrics/Metrics2ReplicationMetrics.java b/server/master/src/main/java/org/apache/accumulo/master/metrics/Metrics2ReplicationMetrics.java new file mode 100644 index 0000000..374d998 --- /dev/null +++ b/server/master/src/main/java/org/apache/accumulo/master/metrics/Metrics2ReplicationMetrics.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.metrics; + +import java.util.Map; +import java.util.Set; + +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.core.master.state.tables.TableState; +import org.apache.accumulo.core.replication.ReplicationTable; +import org.apache.accumulo.core.replication.ReplicationTarget; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.server.metrics.Metrics; +import org.apache.accumulo.server.replication.ReplicationUtil; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.lib.Interns; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; + +/** + * + */ +public class Metrics2ReplicationMetrics implements Metrics, MetricsSource { + public static final String PENDING_FILES = "filesPendingReplication", NUM_PEERS = "numPeers", MAX_REPLICATION_THREADS = "maxReplicationThreads"; + + private final Master master; + private final MetricsSystem system; + private final MetricsRegistry registry; + private final ReplicationUtil replicationUtil; + + public Metrics2ReplicationMetrics(Master master, MetricsSystem system) { + 
this.master = master; + this.system = system; + + registry = new MetricsRegistry(Interns.info("ReplicationMetrics", "Data-Center Replication Metrics")); + replicationUtil = new ReplicationUtil(master); + } + + protected void snapshot() { + registry.add(PENDING_FILES, getNumFilesPendingReplication()); + registry.add(NUM_PEERS, getNumConfiguredPeers()); + registry.add(MAX_REPLICATION_THREADS, getMaxReplicationThreads()); + } + + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + MetricsRecordBuilder builder = collector.addRecord("Accumulo").setContext("replication"); + + snapshot(); + + registry.snapshot(builder, all); + } + + @Override + public void register() throws Exception { + system.register("ReplicationMetrics", "Data-Center Replication Metrics", this); + } + + @Override + public void add(String name, long time) { + throw new UnsupportedOperationException("add() is not implemented"); + } + + @Override + public boolean isEnabled() { + return true; + } + + protected int getNumFilesPendingReplication() { + if (TableState.ONLINE != Tables.getTableState(master.getInstance(), ReplicationTable.ID)) { + return 0; + } + + // Get all of the configured replication peers + Map<String,String> peers = replicationUtil.getPeers(); + + // A quick lookup to see if have any replication peer configured + if (peers.isEmpty()) { + return 0; + } + + // The total set of configured targets + Set<ReplicationTarget> allConfiguredTargets = replicationUtil.getReplicationTargets(); + + // Number of files per target we have to replicate + Map<ReplicationTarget,Long> targetCounts = replicationUtil.getPendingReplications(); + + int filesPending = 0; + + // Sum pending replication over all targets + for (ReplicationTarget configuredTarget : allConfiguredTargets) { + Long numFiles = targetCounts.get(configuredTarget); + + if (null != numFiles) { + filesPending += numFiles; + } + } + + return filesPending; + } + + protected int getNumConfiguredPeers() { + return 
replicationUtil.getPeers().size(); + } + + protected int getMaxReplicationThreads() { + return replicationUtil.getMaxReplicationThreads(master.getMasterMonitorInfo()); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4aea2d6/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetrics.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetrics.java b/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetrics.java index 39112d0..5a3b2aa 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetrics.java +++ b/server/master/src/main/java/org/apache/accumulo/master/metrics/ReplicationMetrics.java @@ -19,11 +19,8 @@ package org.apache.accumulo.master.metrics; import java.util.Map; import java.util.Set; -import javax.management.MalformedObjectNameException; import javax.management.ObjectName; -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.master.state.tables.TableState; import org.apache.accumulo.core.replication.ReplicationTable; @@ -31,21 +28,27 @@ import org.apache.accumulo.core.replication.ReplicationTarget; import org.apache.accumulo.master.Master; import org.apache.accumulo.server.metrics.AbstractMetricsImpl; import org.apache.accumulo.server.replication.ReplicationUtil; +import org.apache.log4j.Logger; /** * JMX bindings to expose 'high-level' metrics about Replication */ public class ReplicationMetrics extends AbstractMetricsImpl implements ReplicationMetricsMBean { + private static final Logger log = Logger.getLogger(ReplicationMetrics.class); private static final String METRICS_PREFIX = "replication"; private Master master; private ObjectName objectName = null; private ReplicationUtil replicationUtil; - 
public ReplicationMetrics(Master master) throws MalformedObjectNameException, AccumuloException, AccumuloSecurityException { + public ReplicationMetrics(Master master) { super(); this.master = master; - objectName = new ObjectName("accumulo.server.metrics:service=Replication Metrics,name=ReplicationMBean,instance=" + Thread.currentThread().getName()); + try { + objectName = new ObjectName("accumulo.server.metrics:service=Replication Metrics,name=ReplicationMBean,instance=" + Thread.currentThread().getName()); + } catch (Exception e) { + log.error("Exception setting MBean object name", e); + } replicationUtil = new ReplicationUtil(master); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4aea2d6/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index d4447ab..8492537 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -16,6 +16,9 @@ */ package org.apache.accumulo.tserver; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD; + import java.io.FileNotFoundException; import java.io.IOException; import java.lang.management.ManagementFactory; @@ -49,9 +52,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; -import javax.management.StandardMBean; - -import com.google.common.net.HostAndPort; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -166,6 +166,7 @@ import 
org.apache.accumulo.server.master.state.TabletLocationState; import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException; import org.apache.accumulo.server.master.state.TabletStateStore; import org.apache.accumulo.server.master.state.ZooTabletStateStore; +import org.apache.accumulo.server.metrics.Metrics; import org.apache.accumulo.server.problems.ProblemReport; import org.apache.accumulo.server.problems.ProblemReports; import org.apache.accumulo.server.replication.ZooKeeperInitialization; @@ -199,8 +200,7 @@ import org.apache.accumulo.tserver.log.TabletServerLogger; import org.apache.accumulo.tserver.mastermessage.MasterMessage; import org.apache.accumulo.tserver.mastermessage.SplitReportMessage; import org.apache.accumulo.tserver.mastermessage.TabletStatusMessage; -import org.apache.accumulo.tserver.metrics.TabletServerMBean; -import org.apache.accumulo.tserver.metrics.TabletServerMBeanImpl; +import org.apache.accumulo.tserver.metrics.TabletServerMetricsFactory; import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics; import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics; import org.apache.accumulo.tserver.metrics.TabletServerUpdateMetrics; @@ -238,8 +238,7 @@ import org.apache.thrift.server.TServer; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD; +import com.google.common.net.HostAndPort; public class TabletServer extends AccumuloServerContext implements Runnable { private static final Logger log = Logger.getLogger(TabletServer.class); @@ -254,9 +253,12 @@ public class TabletServer extends AccumuloServerContext implements Runnable { private final TabletServerLogger logger; - private final TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics(); + private final TabletServerMetricsFactory metricsFactory; + 
private final Metrics updateMetrics; + private final Metrics scanMetrics; + private final Metrics mincMetrics; - public TabletServerMinCMetrics getMinCMetrics() { + public Metrics getMinCMetrics() { return mincMetrics; }
@@ -337,13 +339,15 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
     logger = new TabletServerLogger(this, walogMaxSize, syncCounter, flushCounter);
     this.resourceManager = new TabletServerResourceManager(this, fs);
     this.security = AuditedSecurityOperation.getInstance(this);
+    // Every metrics source is obtained from the factory so that the legacy/metrics2 choice made there applies to update, scan and minc metrics alike.
+    metricsFactory = new TabletServerMetricsFactory(aconf);
+    updateMetrics = metricsFactory.createUpdateMetrics();
+    scanMetrics = metricsFactory.createScanMetrics();
+    mincMetrics = metricsFactory.createMincMetrics();
   }
private final SessionManager sessionManager; - private final TabletServerUpdateMetrics updateMetrics = new TabletServerUpdateMetrics(); - - private final TabletServerScanMetrics scanMetrics = new TabletServerScanMetrics(); private final WriteTracker writeTracker = new WriteTracker(); @@ -351,19 +355,12 @@ public class TabletServer extends AccumuloServerContext implements Runnable { private final AtomicLong totalQueuedMutationSize = new AtomicLong(0); private final ReentrantLock recoveryLock = new ReentrantLock(true); - + private class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface { ThriftClientHandler() { super(TabletServer.this, watcher, fs); log.debug(ThriftClientHandler.class.getName() + " created"); - // Register the metrics MBean - try { - updateMetrics.register(); - scanMetrics.register(); - } catch (Exception e) { - log.error("Exception registering MBean with MBean Server", e); - } } @Override @@ -2207,7 +2204,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable { recoveryLock.unlock(); } } - + public void addLoggersToMetadata(List<DfsLogger> logs, KeyExtent extent, int id) { if (!this.onlineTablets.containsKey(extent)) { log.info("Not adding " + logs.size() + " logs
for extent " + extent + " as alias " + id + " tablet is offline"); @@ -2379,6 +2376,18 @@ public class TabletServer extends AccumuloServerContext implements Runnable { throw new RuntimeException(e); } + Metrics tserverMetrics = metricsFactory.createTabletServerMetrics(this); + + // Register MBeans + try { + tserverMetrics.register(); + mincMetrics.register(); + scanMetrics.register(); + updateMetrics.register(); + } catch (Exception e) { + log.error("Error registering with JMX", e); + } + try { clientAddress = startTabletClientService(); } catch (UnknownHostException e1) { @@ -2428,16 +2437,6 @@ public class TabletServer extends AccumuloServerContext implements Runnable { }; SimpleTimer.getInstance(aconf).schedule(replicationWorkThreadPoolResizer, 10000, 30000); - try { - // Do this because interface not in same package. - TabletServerMBeanImpl beanImpl = new TabletServerMBeanImpl(this); - StandardMBean mbean = new StandardMBean(beanImpl, TabletServerMBean.class, false); - beanImpl.register(mbean); - mincMetrics.register(); - } catch (Exception e) { - log.error("Error registering with JMX", e); - } - String masterHost; while (!serverStopRequested) { // send all of the pending messages http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4aea2d6/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/Metrics2TabletServerMetrics.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/Metrics2TabletServerMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/Metrics2TabletServerMetrics.java new file mode 100644 index 0000000..8fecf44 --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/Metrics2TabletServerMetrics.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tserver.metrics; + +import org.apache.accumulo.server.metrics.Metrics; +import org.apache.accumulo.tserver.TabletServer; +import org.apache.accumulo.tserver.tablet.Tablet; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.lib.Interns; +import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+
+/**
+ * Hadoop Metrics2 source for general tablet server metrics. Each collection recomputes the values from the wrapped {@link TabletServer} and publishes them as
+ * an "Accumulo" record with context "tserver".
+ */
+public class Metrics2TabletServerMetrics implements Metrics, MetricsSource {
+  public static final String ENTRIES = "entries", ENTRIES_IN_MEM = "entriesInMem", HOLD_TIME = "holdTime", FILES_PER_TABLET = "filesPerTablet",
+      ACTIVE_MAJCS = "activeMajCs", QUEUED_MAJCS = "queuedMajCs", ACTIVE_MINCS = "activeMinCs", QUEUED_MINCS = "queuedMinCs", ONLINE_TABLETS = "onlineTablets",
+      OPENING_TABLETS = "openingTablets", UNOPENED_TABLETS = "unopenedTablets", QUERIES = "queries", TOTAL_MINCS = "totalMinCs";
+
+  private final TabletServer tserver;
+  private final MetricsSystem system;
+  private final MetricsRegistry registry;
+
+  // Handles to the gauges registered in the constructor. MetricsRegistry.add(String, long) only accepts stat metrics, so gauges are updated through these.
+  // The type is fully qualified so that no additional import is needed.
+  private final org.apache.hadoop.metrics2.lib.MutableGaugeLong activeMajcs, queuedMajcs, activeMincs, queuedMincs, onlineTablets, openingTablets,
+      unopenedTablets, queries, totalMincs;
+
+  /**
+   * Creates the registry and registers three stats (entries, entries in memory, files per tablet) and nine gauges, all starting at zero.
+   *
+   * @param tserver
+   *          tablet server the values are read from
+   * @param system
+   *          metrics system this source registers itself with in {@link #register()}
+   */
+  public Metrics2TabletServerMetrics(TabletServer tserver, MetricsSystem system) {
+    this.tserver = tserver;
+    this.system = system;
+    this.registry = new MetricsRegistry(Interns.info("TabletServerMetrics", "General TabletServer Metrics"));
+
+    registry.newStat(ENTRIES, "Number of entries", "Ops", "Count");
+    registry.newStat(ENTRIES_IN_MEM, "Number of entries in memory", "Ops", "Count");
+    registry.newStat(FILES_PER_TABLET, "Number of files per tablet", "Ops", "Files", true);
+
+    activeMajcs = registry.newGauge(Interns.info(ACTIVE_MAJCS, "Number of active major compactions"), 0L);
+    queuedMajcs = registry.newGauge(Interns.info(QUEUED_MAJCS, "Number of queued major compactions"), 0L);
+    activeMincs = registry.newGauge(Interns.info(ACTIVE_MINCS, "Number of active minor compactions"), 0L);
+    queuedMincs = registry.newGauge(Interns.info(QUEUED_MINCS, "Number of queued minor compactions"), 0L);
+    onlineTablets = registry.newGauge(Interns.info(ONLINE_TABLETS, "Number of online tablets"), 0L);
+    openingTablets = registry.newGauge(Interns.info(OPENING_TABLETS, "Number of opening tablets"), 0L);
+    unopenedTablets = registry.newGauge(Interns.info(UNOPENED_TABLETS, "Number of unopened tablets"), 0L);
+    queries = registry.newGauge(Interns.info(QUERIES, "Number of queries"), 0L);
+    totalMincs = registry.newGauge(Interns.info(TOTAL_MINCS, "Total number of minor compactions performed"), 0L);
+  }
+
+  /**
+   * Not supported by this source; all values are computed in {@link #snapshot()}.
+   *
+   * @throws UnsupportedOperationException
+   *           always
+   */
+  @Override
+  public void add(String name, long time) {
+    throw new UnsupportedOperationException("add() is not implemented");
+  }
+
+  /**
+   * Registers this source with the metrics system handed to the constructor.
+   */
+  @Override
+  public void register() {
+    system.register("TabletServerMetrics", "General TabletServer Metrics", this);
+  }
+
+  /**
+   * @return always {@code true}
+   */
+  @Override
+  public boolean isEnabled() {
+    return true;
+  }
+
+  /**
+   * Refreshes every registered metric from the tablet server: the three stats receive a new sample, the gauges are set to their current value.
+   */
+  protected void snapshot() {
+    registry.add(ENTRIES, getEntries());
+    registry.add(ENTRIES_IN_MEM, getEntriesInMemory());
+    // The average is truncated to a whole number because the stat takes long samples
+    registry.add(FILES_PER_TABLET, (long) this.getAverageFilesPerTablet());
+
+    // Gauges must be set through their handles; registry.add(name, value) rejects metrics that are not stats
+    activeMajcs.set(getMajorCompactions());
+    queuedMajcs.set(getMajorCompactionsQueued());
+    activeMincs.set(getMinorCompactions());
+    queuedMincs.set(getMinorCompactionsQueued());
+    onlineTablets.set(getOnlineCount());
+    openingTablets.set(getOpeningCount());
+    unopenedTablets.set(getUnopenedCount());
+    queries.set(getQueries());
+    totalMincs.set(getTotalMinorCompactions());
+  }
+
+  /**
+   * Adds an "Accumulo" record with context "tserver" to the collector, refreshes the values via {@link #snapshot()}, adds the hold time as a gauge directly on
+   * the record and then writes the registry into it.
+   */
+  @Override
+  public void getMetrics(MetricsCollector collector, boolean all) {
+    MetricsRecordBuilder builder = collector.addRecord("Accumulo").setContext("tserver");
+
+    snapshot();
+
+    // TODO Some day, MetricsRegistry will also support the MetricsGaugeDouble
+    builder.addGauge(Interns.info(HOLD_TIME, "Time commits held"), getHoldTime());
+
+    registry.snapshot(builder, all);
+  }
+
+  /**
+   * @return sum of {@code getNumEntries()} over all online tablets
+   */
+  public long getEntries() {
+    long result = 0;
+    for (Tablet tablet : tserver.getOnlineTablets()) {
+      result += tablet.getNumEntries();
+    }
+    return result;
+  }
+
+  /**
+   * @return sum of {@code getNumEntriesInMemory()} over all online tablets
+   */
+  public long getEntriesInMemory() {
+    long result = 0;
+    for (Tablet tablet : tserver.getOnlineTablets()) {
+      result += tablet.getNumEntriesInMemory();
+    }
+    return result;
+  }
+
+  /**
+   * @return sum of {@code getNumEntriesInMemory()} over all online tablets
+   */
+  public long getIngest() {
+    // NOTE(review): computes the same value as getEntriesInMemory(); confirm whether a dedicated ingest counter was intended
+    long result = 0;
+    for (Tablet tablet : tserver.getOnlineTablets()) {
+      result += tablet.getNumEntriesInMemory();
+    }
+    return result;
+  }
+
+  /**
+   * @return number of online tablets for which a major compaction is running
+   */
+  public int getMajorCompactions() {
+    int result = 0;
+    for (Tablet tablet : tserver.getOnlineTablets()) {
+      if (tablet.isMajorCompactionRunning())
+        result++;
+    }
+    return result;
+  }
+
+  /**
+   * @return number of online tablets for which a major compaction is queued
+   */
+  public int getMajorCompactionsQueued() {
+    int result = 0;
+    for (Tablet tablet : tserver.getOnlineTablets()) {
+      if (tablet.isMajorCompactionQueued())
+        result++;
+    }
+    return result;
+  }
+
+  /**
+   * @return number of online tablets for which a minor compaction is running
+   */
+  public int getMinorCompactions() {
+    int result = 0;
+    for (Tablet tablet : tserver.getOnlineTablets()) {
+      if (tablet.isMinorCompactionRunning())
+        result++;
+    }
+    return result;
+  }
+
+  /**
+   * @return number of online tablets for which a minor compaction is queued
+   */
+  public int getMinorCompactionsQueued() {
+    int result = 0;
+    for (Tablet tablet : tserver.getOnlineTablets()) {
+      if (tablet.isMinorCompactionQueued())
+        result++;
+    }
+    return result;
+  }
+
+  /**
+   * @return number of online tablets
+   */
+  public int getOnlineCount() {
+    return tserver.getOnlineTablets().size();
+  }
+
+  /**
+   * @return the tablet server's opening count
+   */
+  public int getOpeningCount() {
+    return tserver.getOpeningCount();
+  }
+
+  /**
+   * @return sum of {@code totalQueries()} over all online tablets
+   */
+  public long getQueries() {
+    long result = 0;
+    for (Tablet tablet : tserver.getOnlineTablets()) {
+      result += tablet.totalQueries();
+    }
+    return result;
+  }
+
+  /**
+   * @return the tablet server's unopened count
+   */
+  public int getUnopenedCount() {
+    return tserver.getUnopenedCount();
+  }
+
+  /**
+   * @return the tablet server's client address string
+   */
+  public String getName() {
+    return tserver.getClientAddressString();
+  }
+
+  /**
+   * @return the tablet server's total minor compaction count
+   */
+  public long getTotalMinorCompactions() {
+    return tserver.getTotalMinorCompactions();
+  }
+
+  /**
+   * @return the tablet server's hold time, converted from milliseconds to seconds
+   */
+  public double getHoldTime() {
+    return tserver.getHoldTimeMillis() / 1000.;
+  }
+
+  /**
+   * @return mean number of data files per online tablet, or 0 when no tablet is online
+   */
+  public double getAverageFilesPerTablet() {
+    int count = 0;
+    long result = 0;
+    for (Tablet tablet : tserver.getOnlineTablets()) {
+      result += tablet.getDatafiles().size();
+      count++;
+    }
+    if (count == 0)
+      return 0;
+    return result / (double) count;
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4aea2d6/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/Metrics2TabletServerMinCMetrics.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/Metrics2TabletServerMinCMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/Metrics2TabletServerMinCMetrics.java new file mode 100644 index 0000000..f152d48 --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/Metrics2TabletServerMinCMetrics.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tserver.metrics; + +import org.apache.accumulo.server.metrics.Metrics; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.lib.Interns; +import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+
+/**
+ * Hadoop Metrics2 source for tablet server minor compaction metrics. Samples handed to {@link #add(String,long)} are stored in a registry, which is written
+ * into an "Accumulo" record with context "tserver.minc" on every collection.
+ */
+public class Metrics2TabletServerMinCMetrics implements Metrics, MetricsSource {
+  private static final String SOURCE_NAME = "MinorCompactionMetrics";
+  private static final String SOURCE_DESCRIPTION = "TabletServer Minor Compaction Metrics";
+  private static final String RECORD_NAME = "Accumulo";
+  private static final String RECORD_CONTEXT = "tserver.minc";
+
+  private final MetricsRegistry registry = new MetricsRegistry(Interns.info(SOURCE_NAME, SOURCE_DESCRIPTION));
+  private final MetricsSystem system;
+
+  /**
+   * @param system
+   *          metrics system this source registers itself with in {@link #register()}
+   */
+  public Metrics2TabletServerMinCMetrics(MetricsSystem system) {
+    this.system = system;
+  }
+
+  /**
+   * @return always {@code true}
+   */
+  @Override
+  public boolean isEnabled() {
+    return true;
+  }
+
+  /**
+   * Registers this source with the metrics system.
+   */
+  @Override
+  public void register() {
+    system.register(SOURCE_NAME, SOURCE_DESCRIPTION, this);
+  }
+
+  /**
+   * Forwards the sample to the registry under the given metric name.
+   */
+  @Override
+  public void add(String name, long time) {
+    registry.add(name, time);
+  }
+
+  /**
+   * Writes the registry's current contents into a new record on the collector.
+   */
+  @Override
+  public void getMetrics(MetricsCollector collector, boolean all) {
+    MetricsRecordBuilder record = collector.addRecord(RECORD_NAME);
+    record = record.setContext(RECORD_CONTEXT);
+    registry.snapshot(record, all);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4aea2d6/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/Metrics2TabletServerScanMetrics.java ---------------------------------------------------------------------- diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/Metrics2TabletServerScanMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/Metrics2TabletServerScanMetrics.java new file mode 100644 index 0000000..b57f582 --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/Metrics2TabletServerScanMetrics.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.tserver.metrics; + +import org.apache.accumulo.server.metrics.Metrics; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.lib.Interns; +import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+
+/**
+ * Hadoop Metrics2 source for tablet server scan metrics. Samples handed to {@link #add(String,long)} are stored in a registry, which is written into an
+ * "Accumulo" record with context "tserver.scan" on every collection.
+ */
+public class Metrics2TabletServerScanMetrics implements Metrics, MetricsSource {
+  private static final String SOURCE_NAME = "ScanMetrics";
+  private static final String SOURCE_DESCRIPTION = "TabletServer Scan Metrics";
+  private static final String RECORD_NAME = "Accumulo";
+  private static final String RECORD_CONTEXT = "tserver.scan";
+
+  private final MetricsRegistry registry = new MetricsRegistry(Interns.info(SOURCE_NAME, SOURCE_DESCRIPTION));
+  private final MetricsSystem system;
+
+  /**
+   * @param system
+   *          metrics system this source registers itself with in {@link #register()}
+   */
+  public Metrics2TabletServerScanMetrics(MetricsSystem system) {
+    this.system = system;
+  }
+
+  /**
+   * @return always {@code true}
+   */
+  @Override
+  public boolean isEnabled() {
+    return true;
+  }
+
+  /**
+   * Registers this source with the metrics system.
+   */
+  @Override
+  public void register() {
+    system.register(SOURCE_NAME, SOURCE_DESCRIPTION, this);
+  }
+
+  /**
+   * Forwards the sample to the registry under the given metric name.
+   */
+  @Override
+  public void add(String name, long time) {
+    registry.add(name, time);
+  }
+
+  /**
+   * Writes the registry's current contents into a new record on the collector.
+   */
+  @Override
+  public void getMetrics(MetricsCollector collector, boolean all) {
+    MetricsRecordBuilder record = collector.addRecord(RECORD_NAME);
+    record = record.setContext(RECORD_CONTEXT);
+    registry.snapshot(record, all);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4aea2d6/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/Metrics2TabletServerUpdateMetrics.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/Metrics2TabletServerUpdateMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/Metrics2TabletServerUpdateMetrics.java new file mode 100644 index 0000000..a8e0ad4 --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/Metrics2TabletServerUpdateMetrics.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tserver.metrics; + +import org.apache.accumulo.server.metrics.Metrics; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.lib.Interns; +import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+
+/**
+ * Hadoop Metrics2 source for tablet server update metrics. Samples handed to {@link #add(String,long)} are stored in a registry, which is written into an
+ * "Accumulo" record with context "tserver.update" on every collection.
+ */
+public class Metrics2TabletServerUpdateMetrics implements Metrics, MetricsSource {
+  private static final String SOURCE_NAME = "UpdateMetrics";
+  private static final String SOURCE_DESCRIPTION = "TabletServer Update Metrics";
+  private static final String RECORD_NAME = "Accumulo";
+  private static final String RECORD_CONTEXT = "tserver.update";
+
+  private final MetricsRegistry registry = new MetricsRegistry(Interns.info(SOURCE_NAME, SOURCE_DESCRIPTION));
+  private final MetricsSystem system;
+
+  /**
+   * @param system
+   *          metrics system this source registers itself with in {@link #register()}
+   */
+  public Metrics2TabletServerUpdateMetrics(MetricsSystem system) {
+    this.system = system;
+  }
+
+  /**
+   * @return always {@code true}
+   */
+  @Override
+  public boolean isEnabled() {
+    return true;
+  }
+
+  /**
+   * Registers this source with the metrics system.
+   */
+  @Override
+  public void register() {
+    system.register(SOURCE_NAME, SOURCE_DESCRIPTION, this);
+  }
+
+  /**
+   * Forwards the sample to the registry under the given metric name.
+   */
+  @Override
+  public void add(String name, long time) {
+    registry.add(name, time);
+  }
+
+  /**
+   * Writes the registry's current contents into a new record on the collector.
+   */
+  @Override
+  public void getMetrics(MetricsCollector collector, boolean all) {
+    MetricsRecordBuilder record = collector.addRecord(RECORD_NAME);
+    record = record.setContext(RECORD_CONTEXT);
+    registry.snapshot(record, all);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4aea2d6/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMBeanImpl.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMBeanImpl.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMBeanImpl.java index 3970379..a12f56d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMBeanImpl.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMBeanImpl.java @@ -18,23 +18,36 @@ package org.apache.accumulo.tserver.metrics; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; +import javax.management.StandardMBean; import org.apache.accumulo.server.metrics.AbstractMetricsImpl; import org.apache.accumulo.tserver.TabletServer; import org.apache.accumulo.tserver.tablet.Tablet; +import org.apache.log4j.Logger; public class TabletServerMBeanImpl extends AbstractMetricsImpl implements TabletServerMBean { - + private static final Logger log = Logger.getLogger(TabletServerMBeanImpl.class); private static final String METRICS_PREFIX = "tserver"; private static ObjectName OBJECT_NAME = null; final TabletServer server; - - public TabletServerMBeanImpl(TabletServer server) throws MalformedObjectNameException { + + public TabletServerMBeanImpl(TabletServer server) { this.server = server; - OBJECT_NAME = new ObjectName("accumulo.server.metrics:service=TServerInfo,name=TabletServerMBean,instance=" + Thread.currentThread().getName()); + try { + OBJECT_NAME = new ObjectName("accumulo.server.metrics:service=TServerInfo,name=TabletServerMBean,instance=" + Thread.currentThread().getName()); + } catch (MalformedObjectNameException e) { + log.error("Exception setting MBean object name", e); + } } - + + @Override + public void
register() throws Exception { + // Do this because interface not in same package. + StandardMBean mbean = new StandardMBean(this, TabletServerMBean.class, false); + register(mbean); + } + @Override public long getEntries() { if (isEnabled()) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4aea2d6/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetricsFactory.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetricsFactory.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetricsFactory.java new file mode 100644 index 0000000..c799feb --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerMetricsFactory.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.tserver.metrics; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.server.metrics.Metrics; +import org.apache.accumulo.tserver.TabletServer; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; + +import com.google.common.base.Preconditions;
+
+/**
+ * Creates the tablet server's {@link Metrics} implementations. Depending on {@link Property#GENERAL_LEGACY_METRICS}, every {@code create*} method returns
+ * either the legacy implementation or the Hadoop Metrics2 one.
+ */
+public class TabletServerMetricsFactory {
+  public static final String METRICS_PREFIX = "Accumulo";
+
+  private final boolean useOldMetrics;
+  private final MetricsSystem metricsSystem;
+
+  /**
+   * Reads the legacy-metrics setting and, unless legacy metrics are requested, initializes the default Hadoop metrics system with {@link #METRICS_PREFIX}.
+   *
+   * @param conf
+   *          configuration to read {@link Property#GENERAL_LEGACY_METRICS} from; must not be null
+   */
+  public TabletServerMetricsFactory(AccumuloConfiguration conf) {
+    Preconditions.checkNotNull(conf);
+    useOldMetrics = conf.getBoolean(Property.GENERAL_LEGACY_METRICS);
+
+    // The Metrics2 system is only initialized when it will actually be used
+    metricsSystem = useOldMetrics ? null : DefaultMetricsSystem.initialize(METRICS_PREFIX);
+  }
+
+  /**
+   * @return minor compaction metrics: {@link TabletServerMinCMetrics} for legacy metrics, otherwise {@link Metrics2TabletServerMinCMetrics}
+   */
+  public Metrics createMincMetrics() {
+    if (!useOldMetrics) {
+      return new Metrics2TabletServerMinCMetrics(metricsSystem);
+    }
+    return new TabletServerMinCMetrics();
+  }
+
+  /**
+   * @param tserver
+   *          tablet server the metrics are computed from
+   * @return general tablet server metrics: {@link TabletServerMBeanImpl} for legacy metrics, otherwise {@link Metrics2TabletServerMetrics}
+   */
+  public Metrics createTabletServerMetrics(TabletServer tserver) {
+    if (!useOldMetrics) {
+      return new Metrics2TabletServerMetrics(tserver, metricsSystem);
+    }
+    return new TabletServerMBeanImpl(tserver);
+  }
+
+  /**
+   * @return scan metrics: {@link TabletServerScanMetrics} for legacy metrics, otherwise {@link Metrics2TabletServerScanMetrics}
+   */
+  public Metrics createScanMetrics() {
+    if (!useOldMetrics) {
+      return new Metrics2TabletServerScanMetrics(metricsSystem);
+    }
+    return new TabletServerScanMetrics();
+  }
+
+  /**
+   * @return update metrics: {@link TabletServerUpdateMetrics} for legacy metrics, otherwise {@link Metrics2TabletServerUpdateMetrics}
+   */
+  public Metrics createUpdateMetrics() {
+    if (!useOldMetrics) {
+      return new Metrics2TabletServerUpdateMetrics(metricsSystem);
+    }
+    return new TabletServerUpdateMetrics();
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4aea2d6/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index bc55c4f..63bf1bf 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -97,6 +97,7 @@ import org.apache.accumulo.server.fs.VolumeUtil; import org.apache.accumulo.server.fs.VolumeUtil.TabletFiles; import org.apache.accumulo.server.master.state.TServerInstance; import org.apache.accumulo.server.master.tableOps.CompactionIterators; +import org.apache.accumulo.server.metrics.Metrics; import org.apache.accumulo.server.problems.ProblemReport; import org.apache.accumulo.server.problems.ProblemReports; import org.apache.accumulo.server.problems.ProblemType; @@ -971,7 +972,7 @@ public class Tablet implements TabletCommitter { if (!failed) { lastMinorCompactionFinishTime = System.currentTimeMillis(); } - TabletServerMinCMetrics minCMetrics = getTabletServer().getMinCMetrics(); + Metrics minCMetrics = getTabletServer().getMinCMetrics(); if (minCMetrics.isEnabled()) minCMetrics.add(TabletServerMinCMetrics.minc, (lastMinorCompactionFinishTime - start)); if (hasQueueTime) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4aea2d6/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java index ef18efd..4094c5f 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java @@ -102,7 +102,7 @@ public class ZombieTServer { TransactionWatcher watcher = new TransactionWatcher(); final ThriftClientHandler tch = new ThriftClientHandler(context, watcher); Processor<Iface> processor = new Processor<Iface>(tch); - ServerAddress 
serverPort = TServerUtils.startTServer(HostAndPort.fromParts("0.0.0.0", port), processor, "ZombieTServer", "walking dead", 2, 1, 1000, + ServerAddress serverPort = TServerUtils.startTServer(context.getConfiguration(), HostAndPort.fromParts("0.0.0.0", port), processor, "ZombieTServer", "walking dead", 2, 1, 1000, 10 * 1024 * 1024, null, -1); String addressString = serverPort.address.toString(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/c4aea2d6/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java index ae07925..671ead6 100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java @@ -247,17 +247,17 @@ public class NullTserver { Opts opts = new Opts(); opts.parseArgs(NullTserver.class.getName(), args); + // modify metadata + ZooKeeperInstance zki = new ZooKeeperInstance(new ClientConfiguration().withInstance(opts.iname).withZkHosts(opts.keepers)); + AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(zki)); + TransactionWatcher watcher = new TransactionWatcher(); ThriftClientHandler tch = new ThriftClientHandler(new AccumuloServerContext(new ServerConfigurationFactory(HdfsZooInstance.getInstance())), watcher); Processor<Iface> processor = new Processor<Iface>(tch); - TServerUtils.startTServer(HostAndPort.fromParts("0.0.0.0", opts.port), processor, "NullTServer", "null tserver", 2, 1, 1000, 10 * 1024 * 1024, null, -1); + TServerUtils.startTServer(context.getConfiguration(), HostAndPort.fromParts("0.0.0.0", opts.port), processor, "NullTServer", "null tserver", 2, 1, 1000, 10 * 1024 * 1024, null, -1); HostAndPort addr = 
HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), opts.port); - // modify metadata - ZooKeeperInstance zki = new ZooKeeperInstance(new ClientConfiguration().withInstance(opts.iname).withZkHosts(opts.keepers)); - AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(zki)); - String tableId = Tables.getTableId(zki, opts.tableName); // read the locations for the table
