http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricMetaParser.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricMetaParser.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricMetaParser.java new file mode 100644 index 0000000..c0a220f --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricMetaParser.java @@ -0,0 +1,58 @@ +package com.alibaba.jstorm.common.metric; + +import com.alibaba.jstorm.daemon.nimbus.TopologyMetricsRunnable; +import com.alibaba.jstorm.metric.MetaType; +import com.alibaba.jstorm.metric.MetricType; +import com.alibaba.jstorm.metric.MetricUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author wange + * @since 15/7/14 + */ +public class MetricMetaParser { + private static final Logger logger = LoggerFactory.getLogger(TopologyMetricsRunnable.class); + + public static MetricMeta fromMetricName(String name) { + try { + String[] parts = name.split(MetricUtils.DELIM); + char ch = parts[0].charAt(0); + if (ch == 'W' || ch == 'N' || ch == 'P') { + return parseWorkerMetricMeta(parts); + } else { + return parseTaskMetricMeta(parts); + } + } catch (Exception ex) { + logger.error("Error parsing metric meta, name:{}", name, ex); + } + return null; + } + + private static MetricMeta parseTaskMetricMeta(String[] parts) { + MetricMeta meta = new MetricMeta(); + meta.setMetaType(MetaType.parse(parts[0].charAt(0)).getT()); + meta.setMetricType(MetricType.parse(parts[0].charAt(1)).getT()); + meta.setTopologyId(parts[1]); + meta.setComponent(parts[2]); + meta.setTaskId(Integer.valueOf(parts[3])); + meta.setStreamId(parts[4]); + meta.setMetricGroup(parts[5]); + meta.setMetricName(parts[6]); + + return meta; + } + + private static MetricMeta parseWorkerMetricMeta(String[] parts) { + MetricMeta meta = new 
MetricMeta(); + meta.setMetaType(MetaType.parse(parts[0].charAt(0)).getT()); + meta.setMetricType(MetricType.parse(parts[0].charAt(1)).getT()); + meta.setTopologyId(parts[1]); + meta.setHost(parts[2]); + meta.setPort(Integer.valueOf(parts[3])); + meta.setMetricGroup(parts[4]); + meta.setMetricName(parts[5]); + + return meta; + } +}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricRegistry.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricRegistry.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricRegistry.java deleted file mode 100755 index 982c5f6..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricRegistry.java +++ /dev/null @@ -1,316 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.alibaba.jstorm.common.metric; - -import java.util.Collections; -import java.util.Map; -import java.util.SortedMap; -import java.util.SortedSet; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.alibaba.jstorm.common.metric.window.Metric; - -public class MetricRegistry implements MetricSet { - private static final Logger LOG = LoggerFactory.getLogger(MetricRegistry.class); - - private static final long serialVersionUID = 8184106900230111064L; - public static final String NAME_SEPERATOR = "."; - - /** - * Concatenates elements to form a dotted name, eliding any null values or - * empty strings. - * - * @param name the first element of the name - * @param names the remaining elements of the name - * @return {@code name} and {@code names} concatenated by periods - */ - public static String name(String name, String... names) { - final StringBuilder builder = new StringBuilder(); - append(builder, name); - if (names != null) { - for (String s : names) { - append(builder, s); - } - } - return builder.toString(); - } - - /** - * Concatenates a class name and elements to form a dotted name, eliding any - * null values or empty strings. - * - * @param klass the first element of the name - * @param names the remaining elements of the name - * @return {@code klass} and {@code names} concatenated by periods - */ - public static String name(Class<?> klass, String... names) { - return name(klass.getName(), names); - } - - private static void append(StringBuilder builder, String part) { - if (part != null && !part.isEmpty()) { - if (builder.length() > 0) { - builder.append(NAME_SEPERATOR); - } - builder.append(part); - } - } - - protected final ConcurrentMap<String, Metric> metrics; - - /** - * Creates a new {@link MetricRegistry}. 
- */ - public MetricRegistry() { - this.metrics = buildMap(); - } - - /** - * Creates a new {@link ConcurrentMap} implementation for use inside the - * registry. Override this to create a {@link MetricRegistry} with space- or - * time-bounded metric lifecycles, for example. - * - * @return a new {@link ConcurrentMap} - */ - protected ConcurrentMap<String, Metric> buildMap() { - return new ConcurrentHashMap<String, Metric>(); - } - - /** - * Given a {@link Metric}, registers it under the given name. - * - * @param name the name of the metric - * @param metric the metric - * @param <T> the type of the metric - * @return {@code metric} - * @throws IllegalArgumentException if the name is already registered - */ - @SuppressWarnings("unchecked") - public <T extends Metric> T register(String name, T metric) - throws IllegalArgumentException { - if (metric instanceof MetricSet) { - registerAll(name, (MetricSet) metric); - } else { - final Metric existing = metrics.putIfAbsent(name, metric); - if (existing == null) { - // add one listener to notify - LOG.info("Successfully register metric of {}", name); - } else { - throw new IllegalArgumentException("A metric named " + name - + " already exists"); - } - } - return metric; - } - - /** - * Given a metric set, registers them. - * - * @param metrics a set of metrics - * @throws IllegalArgumentException if any of the names are already - * registered - */ - public void registerAll(MetricSet metrics) throws IllegalArgumentException { - registerAll(null, metrics); - } - - /** - * Removes the metric with the given name. - * - * @param name the name of the metric - * @return whether or not the metric was removed - */ - public boolean remove(String name) { - final Metric metric = metrics.remove(name); - if (metric != null) { - // call listener to notify remove - LOG.info("Successfully unregister metric of {}", name); - return true; - } - return false; - } - - /** - * Removes all metrics which match the given filter. 
- * - * @param filter a filter - */ - public void removeMatching(MetricFilter filter) { - for (Map.Entry<String, Metric> entry : metrics.entrySet()) { - if (filter.matches(entry.getKey(), entry.getValue())) { - remove(entry.getKey()); - } - } - } - - /** - * Returns a set of the names of all the metrics in the registry. - * - * @return the names of all the metrics - */ - public SortedSet<String> getNames() { - return Collections.unmodifiableSortedSet(new TreeSet<String>(metrics - .keySet())); - } - - /** - * Returns a map of all the gauges in the registry and their names. - * - * @return all the gauges in the registry - */ - public SortedMap<String, Gauge> getGauges() { - return getGauges(MetricFilter.ALL); - } - - /** - * Returns a map of all the gauges in the registry and their names which - * match the given filter. - * - * @param filter the metric filter to match - * @return all the gauges in the registry - */ - public SortedMap<String, Gauge> getGauges(MetricFilter filter) { - return getMetrics(Gauge.class, filter); - } - - /** - * Returns a map of all the counters in the registry and their names. - * - * @return all the counters in the registry - */ - public SortedMap<String, Counter> getCounters() { - return getCounters(MetricFilter.ALL); - } - - /** - * Returns a map of all the counters in the registry and their names which - * match the given filter. - * - * @param filter the metric filter to match - * @return all the counters in the registry - */ - public SortedMap<String, Counter> getCounters(MetricFilter filter) { - return getMetrics(Counter.class, filter); - } - - /** - * Returns a map of all the histograms in the registry and their names. - * - * @return all the histograms in the registry - */ - public SortedMap<String, Histogram> getHistograms() { - return getHistograms(MetricFilter.ALL); - } - - /** - * Returns a map of all the histograms in the registry and their names which - * match the given filter. 
- * - * @param filter the metric filter to match - * @return all the histograms in the registry - */ - public SortedMap<String, Histogram> getHistograms(MetricFilter filter) { - return getMetrics(Histogram.class, filter); - } - - /** - * Returns a map of all the meters in the registry and their names. - * - * @return all the meters in the registry - */ - public SortedMap<String, Meter> getMeters() { - return getMeters(MetricFilter.ALL); - } - - /** - * Returns a map of all the meters in the registry and their names which - * match the given filter. - * - * @param filter the metric filter to match - * @return all the meters in the registry - */ - public SortedMap<String, Meter> getMeters(MetricFilter filter) { - return getMetrics(Meter.class, filter); - } - - /** - * Returns a map of all the timers in the registry and their names. - * - * @return all the timers in the registry - */ - public SortedMap<String, Timer> getTimers() { - return getTimers(MetricFilter.ALL); - } - - /** - * Returns a map of all the timers in the registry and their names which - * match the given filter. 
- * - * @param filter the metric filter to match - * @return all the timers in the registry - */ - public SortedMap<String, Timer> getTimers(MetricFilter filter) { - return getMetrics(Timer.class, filter); - } - - @SuppressWarnings("unchecked") - private <T extends Metric> SortedMap<String, T> getMetrics(Class<T> klass, - MetricFilter filter) { - final TreeMap<String, T> timers = new TreeMap<String, T>(); - for (Map.Entry<String, Metric> entry : metrics.entrySet()) { - if (klass.isInstance(entry.getValue()) - && filter.matches(entry.getKey(), entry.getValue())) { - timers.put(entry.getKey(), (T) entry.getValue()); - } - } - return Collections.unmodifiableSortedMap(timers); - } - - private void registerAll(String prefix, MetricSet metrics) - throws IllegalArgumentException { - for (Map.Entry<String, Metric> entry : metrics.getMetrics().entrySet()) { - if (entry.getValue() instanceof MetricSet) { - registerAll(name(prefix, entry.getKey()), - (MetricSet) entry.getValue()); - } else { - register(name(prefix, entry.getKey()), entry.getValue()); - } - } - } - - @Override - public Map<String, Metric> getMetrics() { - return Collections.unmodifiableMap(metrics); - } - - /** - * Expose metrics is to improve performance - * - * @return - */ - public Metric getMetric(String name) { - return metrics.get(name); - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricSet.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricSet.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricSet.java deleted file mode 100755 index 243f9b8..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/MetricSet.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric; - -import java.io.Serializable; -import java.util.Map; - -import com.alibaba.jstorm.common.metric.window.Metric; - -public interface MetricSet extends Serializable { - Map<String, Metric> getMetrics(); - -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/QueueGauge.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/QueueGauge.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/QueueGauge.java index 0ff964e..114eeb2 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/QueueGauge.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/QueueGauge.java @@ -17,6 +17,7 @@ */ package com.alibaba.jstorm.common.metric; +import com.google.common.base.Joiner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,10 +35,10 @@ public class QueueGauge extends HealthCheck implements Gauge<Double> { String name; Result healthy; - public QueueGauge(String name, DisruptorQueue queue) { + public QueueGauge(DisruptorQueue queue, String... 
names) { this.queue = queue; - this.name = name; - this.healthy = HealthCheck.Result.healthy(); + this.name = Joiner.on("-").join(names); + this.healthy = Result.healthy(); } @Override @@ -52,7 +53,7 @@ public class QueueGauge extends HealthCheck implements Gauge<Double> { // TODO Auto-generated method stub Double ret = (double) queue.pctFull(); if (ret > 0.9) { - return HealthCheck.Result.unhealthy(name + QUEUE_IS_FULL); + return Result.unhealthy(name + QUEUE_IS_FULL); } else { return healthy; } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TaskTrack.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TaskTrack.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TaskTrack.java new file mode 100644 index 0000000..0b49ecf --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TaskTrack.java @@ -0,0 +1,180 @@ +package com.alibaba.jstorm.common.metric; + +import com.alibaba.jstorm.metric.KVSerializable; +import com.alibaba.jstorm.metric.MetricUtils; +import com.alibaba.jstorm.utils.JStormUtils; + +import java.util.Date; + +/** + * @author wange + * @since 15/7/16 + */ +public class TaskTrack implements KVSerializable { + + private long id; + private String clusterName; + private String topologyId; + private String component; + private int taskId; + private String host; + private int port; + private Date start; + private Date end; + + public TaskTrack() { + } + + public TaskTrack(String clusterName, String topologyId) { + this.clusterName = clusterName; + this.topologyId = topologyId; + } + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + + public String getClusterName() { + return clusterName; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + public String getTopologyId() 
{ + return topologyId; + } + + public void setTopologyId(String topologyId) { + this.topologyId = topologyId; + } + + public String getComponent() { + return component; + } + + public void setComponent(String component) { + this.component = component; + } + + public int getTaskId() { + return taskId; + } + + public void setTaskId(int taskId) { + this.taskId = taskId; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public Date getStart() { + return start; + } + + public void setStart(Date start) { + this.start = start; + } + + public Date getEnd() { + return end; + } + + public void setEnd(Date end) { + this.end = end; + } + + /** + * key: clusterName + topologyId + taskId + time + */ + @Override + public byte[] getKey() { + StringBuilder sb = new StringBuilder(128); + sb.append(clusterName).append(MetricUtils.AT).append(topologyId).append(MetricUtils.AT) + .append(taskId).append(MetricUtils.AT); + if (start != null) { + sb.append(start.getTime()); + } else { + sb.append(end.getTime()); + } + return sb.toString().getBytes(); + } + + /** + * value: type + host + port + * type: S/E (start/end) + */ + @Override + public byte[] getValue() { + StringBuilder sb = new StringBuilder(32); + if (start != null) { + sb.append(KVSerializable.START); + } else { + sb.append(KVSerializable.END); + } + sb.append(MetricUtils.AT).append(host).append(MetricUtils.AT).append(port); + return sb.toString().getBytes(); + } + + @Override + public Object fromKV(byte[] key, byte[] value) { + String[] keyParts = new String(key).split(MetricUtils.DELIM); + + String[] valueParts = new String(value).split(MetricUtils.DELIM); + boolean isStart = false; + if (valueParts.length >= 3){ + if (valueParts[0].equals(KVSerializable.START)) isStart = true; + host = valueParts[1]; + port = JStormUtils.parseInt(valueParts[2]); + } + + 
if (keyParts.length >= 4){ + clusterName = keyParts[0]; + topologyId = keyParts[1]; + taskId = JStormUtils.parseInt(keyParts[2]); + long ts = JStormUtils.parseLong(keyParts[3]); + if (isStart) start = new Date(ts); + else end = new Date(ts); + } + + return this; + } + + public Date getTime() { + return start != null ? start : end; + } + + public String getIdentity(){ + StringBuilder sb = new StringBuilder(); + sb.append(clusterName).append(MetricUtils.AT).append(topologyId).append(MetricUtils.AT) + .append(taskId).append(MetricUtils.AT).append(host).append(MetricUtils.AT).append(port); + return sb.toString(); + } + + public void merge(TaskTrack taskTrack){ + if (taskTrack.start != null && this.start == null){ + this.start = taskTrack.start; + } + if (taskTrack.end != null && this.end == null){ + this.end = taskTrack.end; + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Timer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Timer.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Timer.java deleted file mode 100755 index daf5633..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Timer.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric; - -import java.io.Closeable; -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; - -/** - * Use com.codahale.metrics's interface - * - * @author zhongyan.feng - * - */ -public class Timer extends Histogram { - private static final long serialVersionUID = 5915881891513771108L; - - /** - * A timing context. - * - * @see Timer#time() - */ - public static class Context implements Closeable { - private final Timer timer; - private final long startTime; - - private Context(Timer timer) { - this.timer = timer; - this.startTime = System.currentTimeMillis(); - } - - /** - * Stops recording the elapsed time, updates the timer and returns the - * elapsed time in nanoseconds. - */ - public long stop() { - final long elapsed = System.currentTimeMillis() - startTime; - timer.update(elapsed, TimeUnit.MILLISECONDS); - return elapsed; - } - - @Override - public void close() { - stop(); - } - } - - public Timer() { - init(); - } - - /** - * Adds a recorded duration. - * - * @param duration the length of the duration - * @param unit the scale unit of {@code duration} - */ - public void update(long duration, TimeUnit unit) { - update(unit.toMillis(duration)); - } - - /** - * Times and records the duration of event. 
- * - * @param event a {@link Callable} whose {@link Callable#call()} method - * implements a process whose duration should be timed - * @param <T> the type of the value returned by {@code event} - * @return the value returned by {@code event} - * @throws Exception if {@code event} throws an {@link Exception} - */ - public <T> T time(Callable<T> event) throws Exception { - final long startTime = System.currentTimeMillis(); - try { - return event.call(); - } finally { - update(System.currentTimeMillis() - startTime); - } - } - - /** - * Returns a new {@link Context}. - * - * @return a new {@link Context} - * @see Context - */ - public Context time() { - return new Context(this); - } - - public long getCount() { - return allWindow.getSnapshot().getTimes(); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TimerData.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TimerData.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TimerData.java new file mode 100644 index 0000000..3f2be9b --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TimerData.java @@ -0,0 +1,169 @@ +package com.alibaba.jstorm.common.metric; + + +import com.alibaba.jstorm.metric.Bytes; +import com.alibaba.jstorm.metric.KVSerializable; + +/** + * @author wange + * @since 15/6/23 + */ +public class TimerData extends MetricBaseData implements KVSerializable { + private long min; + private long max; + private double mean; + private double p50; + private double p75; + private double p95; + private double p98; + private double p99; + private double p999; + private double stddev; + + private double m1; + private double m5; + private double m15; + + public long getMin() { + return min; + } + + public void setMin(long min) { + this.min = min; + } + + public long getMax() { + return max; + } + + 
public void setMax(long max) { + this.max = max; + } + + public double getMean() { + return mean; + } + + public void setMean(double mean) { + this.mean = mean; + } + + public double getP50() { + return p50; + } + + public void setP50(double p50) { + this.p50 = p50; + } + + public double getP75() { + return p75; + } + + public void setP75(double p75) { + this.p75 = p75; + } + + public double getP95() { + return p95; + } + + public void setP95(double p95) { + this.p95 = p95; + } + + public double getP98() { + return p98; + } + + public void setP98(double p98) { + this.p98 = p98; + } + + public double getP99() { + return p99; + } + + public void setP99(double p99) { + this.p99 = p99; + } + + public double getP999() { + return p999; + } + + public void setP999(double p999) { + this.p999 = p999; + } + + public double getStddev() { + return stddev; + } + + public void setStddev(double stddev) { + this.stddev = stddev; + } + + public double getM1() { + return m1; + } + + public void setM1(double m1) { + this.m1 = m1; + } + + public double getM5() { + return m5; + } + + public void setM5(double m5) { + this.m5 = m5; + } + + public double getM15() { + return m15; + } + + public void setM15(double m15) { + this.m15 = m15; + } + + @Override + public byte[] getValue() { + byte[] ret = new byte[8 * 12]; + Bytes.putLong(ret, 0, min); + Bytes.putLong(ret, 8, max); + Bytes.putDouble(ret, 16, p50); + Bytes.putDouble(ret, 24, p75); + Bytes.putDouble(ret, 32, p95); + Bytes.putDouble(ret, 40, p98); + Bytes.putDouble(ret, 48, p99); + Bytes.putDouble(ret, 56, p999); + Bytes.putDouble(ret, 64, mean); + Bytes.putDouble(ret, 72, m1); + Bytes.putDouble(ret, 80, m5); + Bytes.putDouble(ret, 88, m15); + + return ret; + } + + @Override + public Object fromKV(byte[] key, byte[] value) { + parseKey(key); + + this.min = Bytes.toLong(value, 0, KVSerializable.LONG_SIZE); + this.max = Bytes.toLong(value, 8, KVSerializable.LONG_SIZE); + this.p50 = Bytes.toDouble(value, 16); + this.p75 = 
Bytes.toDouble(value, 24); + this.p95 = Bytes.toDouble(value, 32); + this.p98 = Bytes.toDouble(value, 40); + this.p99 = Bytes.toDouble(value, 48); + this.p999 = Bytes.toDouble(value, 56); + this.mean = Bytes.toDouble(value, 64); + this.m1 = Bytes.toDouble(value, 72); + this.m5 = Bytes.toDouble(value, 80); + this.m15 = Bytes.toDouble(value, 88); + + return this; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TimerRatio.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TimerRatio.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TimerRatio.java index 0a0e7e2..495ec4f 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TimerRatio.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TimerRatio.java @@ -28,11 +28,7 @@ public class TimerRatio implements Gauge<Double> { private long lastUpdateTime = 0; private long sum = 0; - private long lastGaugeTime; - - public void init() { - lastGaugeTime = System.nanoTime(); - } + private long lastGaugeTime = 0; public synchronized void start() { if (lastUpdateTime == 0) { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Top.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Top.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Top.java deleted file mode 100755 index 00ccc98..0000000 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/Top.java +++ /dev/null @@ -1,157 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.alibaba.jstorm.common.metric; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Comparator; -import java.util.List; -import java.util.TreeSet; - -import com.alibaba.jstorm.common.metric.operator.convert.Convertor; -import com.alibaba.jstorm.common.metric.operator.merger.Merger; -import com.alibaba.jstorm.common.metric.operator.updater.Updater; -import com.alibaba.jstorm.common.metric.window.Metric; - -public class Top<T> extends Metric<List<T>, TreeSet<T>> { - private static final long serialVersionUID = 4990212679365713831L; - - final protected Comparator<T> comparator; - final protected int n; - - public Top(Comparator<T> comparator, int n) { - this.comparator = comparator; - this.n = n; - - this.defaultValue = new TreeSet<T>(comparator); - this.updater = new Top.TopUpdator<T>(comparator, n); - this.merger = new Top.TopMerger<T>(comparator, n); - this.convertor = new Top.SetToList<T>(); - - init(); - } - - public static class TopUpdator<T> implements Updater<TreeSet<T>> { - private static final long serialVersionUID = -3940041101182079146L; - - final protected Comparator<T> comparator; - final protected int n; - - public TopUpdator(Comparator<T> comparator, int n) { - this.comparator = comparator; - this.n = n; - } - - @SuppressWarnings("unchecked") - @Override - public TreeSet<T> update(Number object, TreeSet<T> cache, - Object... 
others) { - // TODO Auto-generated method stub - if (cache == null) { - cache = new TreeSet<T>(comparator); - } - - cache.add((T) object); - - if (cache.size() > n) { - cache.remove(cache.last()); - } - - return cache; - } - - @Override - public TreeSet<T> updateBatch(TreeSet<T> object, TreeSet<T> cache, - Object... objects) { - // TODO Auto-generated method stub - if (cache == null) { - cache = new TreeSet<T>(comparator); - } - - cache.addAll(object); - - while (cache.size() > n) { - cache.remove(cache.last()); - } - - return cache; - } - - } - - public static class TopMerger<T> implements Merger<TreeSet<T>> { - - private static final long serialVersionUID = 4478867986986581638L; - final protected Comparator<T> comparator; - final protected int n; - - public TopMerger(Comparator<T> comparator, int n) { - this.comparator = comparator; - this.n = n; - } - - @Override - public TreeSet<T> merge(Collection<TreeSet<T>> objs, - TreeSet<T> unflushed, Object... others) { - // TODO Auto-generated method stub - TreeSet<T> temp = new TreeSet<T>(comparator); - if (unflushed != null) { - temp.addAll(unflushed); - } - - for (TreeSet<T> set : objs) { - temp.addAll(set); - } - - if (temp.size() <= n) { - return temp; - } - - TreeSet<T> ret = new TreeSet<T>(comparator); - int i = 0; - for (T item : temp) { - if (i < n) { - ret.add(item); - i++; - } else { - break; - } - } - return ret; - } - - } - - public static class SetToList<T> implements Convertor<TreeSet<T>, List<T>> { - private static final long serialVersionUID = 4968816655779625255L; - - @Override - public List<T> convert(TreeSet<T> set) { - // TODO Auto-generated method stub - List<T> ret = new ArrayList<T>(); - if (set != null) { - for (T item : set) { - ret.add(item); - } - } - return ret; - } - - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TopologyHistory.java ---------------------------------------------------------------------- diff 
--git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TopologyHistory.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TopologyHistory.java new file mode 100644 index 0000000..186e2be --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/TopologyHistory.java @@ -0,0 +1,153 @@ +package com.alibaba.jstorm.common.metric; + +import com.alibaba.jstorm.metric.KVSerializable; +import com.alibaba.jstorm.metric.MetricUtils; +import com.alibaba.jstorm.utils.JStormUtils; + +import java.util.Date; + +/** + * @author wange + * @since 15/7/16 + */ +public class TopologyHistory implements KVSerializable { + + private long id; + private String clusterName; + private String topologyName; + private String topologyId; + private double sampleRate; + private Date start; + private Date end; + + public TopologyHistory() { + } + + public TopologyHistory(String clusterName, String topologyId) { + this.clusterName = clusterName; + this.topologyId = topologyId; + } + + public long getId() { + return id; + } + + public void setId(long id) { + this.id = id; + } + + public String getClusterName() { + return clusterName; + } + + public void setClusterName(String clusterName) { + this.clusterName = clusterName; + } + + public String getTopologyName() { + return topologyName; + } + + public void setTopologyName(String topologyName) { + this.topologyName = topologyName; + } + + public String getTopologyId() { + return topologyId; + } + + public void setTopologyId(String topologyId) { + this.topologyId = topologyId; + } + + public Date getStart() { + return start; + } + + public void setStart(Date start) { + this.start = start; + } + + public Date getEnd() { + return end; + } + + public void setEnd(Date end) { + this.end = end; + } + + public Date getTime() { + return start != null ? start : end; + } + + public String getTag() { + return start != null ? 
KVSerializable.START : KVSerializable.END; + } + + public double getSampleRate() { + return sampleRate; + } + + public void setSampleRate(Double sampleRate) { + if (sampleRate == null) { + this.sampleRate = 1.0d; + } else { + this.sampleRate = sampleRate; + } + } + + /** + * key: clusterName + topologyName + time + */ + @Override + public byte[] getKey() { + return MetricUtils.concat2(clusterName, topologyName, getTime().getTime()).getBytes(); + + } + + /** + * value: topologyId + type: S/E + */ + @Override + public byte[] getValue() { + return MetricUtils.concat2(topologyId, getTag(), sampleRate).getBytes(); + } + + @Override + public Object fromKV(byte[] key, byte[] value) { + String[] keyParts = new String(key).split(MetricUtils.DELIM); + long time = 0; + if (keyParts.length >= 3) { + this.clusterName = keyParts[0]; + this.topologyName = keyParts[1]; + time = Long.valueOf(keyParts[2]); + } + + String[] valueParts = new String(value).split(MetricUtils.DELIM); + if (valueParts.length >= 3) { + this.topologyId = valueParts[0]; + String tag = valueParts[1]; + if (tag.equals(KVSerializable.START)) { + this.start = new Date(time); + } else { + this.end = new Date(time); + } + this.sampleRate = JStormUtils.parseDouble(valueParts[2], 0.1d); + } + + return this; + } + + public String getIdentity(){ + return MetricUtils.concat2(clusterName, topologyId); + } + + public void merge(TopologyHistory history){ + if (history.start != null && this.start == null){ + this.start = history.start; + } + if (history.end != null && this.end == null){ + this.end = history.end; + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Counter.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Counter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Counter.java new file mode 100644 index 
0000000..6745f14 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Counter.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.common.metric.old; + +import com.alibaba.jstorm.common.metric.old.operator.convert.DefaultConvertor; +import com.alibaba.jstorm.common.metric.old.operator.merger.SumMerger; +import com.alibaba.jstorm.common.metric.old.operator.updater.AddUpdater; +import com.alibaba.jstorm.common.metric.old.window.Metric; + +/** + * The class is similar to com.codahale.metrics.Counter + * + * Sum all window's value + * + * how to use Counter , please refer to Sampling Interface + * + * @author zhongyan.feng + * + * @param <T> + */ +public class Counter<T extends Number> extends Metric<T, T> { + private static final long serialVersionUID = -1362345159511508074L; + + public Counter(T zero) { + updater = new AddUpdater<T>(); + merger = new SumMerger<T>(); + convertor = new DefaultConvertor<T>(); + defaultValue = zero; + + init(); + } + + public static void main(String[] args) { + + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Gauge.java 
---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Gauge.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Gauge.java new file mode 100644 index 0000000..b323df8 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Gauge.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.jstorm.common.metric.old; + +import com.alibaba.jstorm.common.metric.old.window.Metric; +import com.alibaba.jstorm.common.metric.old.window.StatBuckets; + +import java.util.Map; +import java.util.TreeMap; + +public class Gauge<T extends Number> extends Metric<Number, Number> { + private static final long serialVersionUID = 1985614006717750790L; + + protected com.codahale.metrics.Gauge<T> gauge; + + public Gauge(com.codahale.metrics.Gauge<T> gauge) { + this.gauge = gauge; + + init(); + } + + @Override + public void init() { + + } + + @Override + public void update(Number obj) { + // TODO Auto-generated method stub + } + + @Override + public Map<Integer, Number> getSnapshot() { + // TODO Auto-generated method stub + Number value = gauge.getValue(); + + Map<Integer, Number> ret = new TreeMap<Integer, Number>(); + for (Integer timeKey : windowSeconds) { + ret.put(timeKey, value); + } + ret.put(StatBuckets.ALL_TIME_WINDOW, value); + + return ret; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Histogram.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Histogram.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Histogram.java new file mode 100644 index 0000000..478de4e --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Histogram.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.common.metric.old; + +import com.alibaba.jstorm.common.metric.old.operator.convert.Convertor; +import com.alibaba.jstorm.common.metric.old.operator.merger.AvgMerger; +import com.alibaba.jstorm.common.metric.old.operator.updater.AvgUpdater; +import com.alibaba.jstorm.common.metric.old.window.Metric; + +/** + * Meter is used to compute tps + * + * Attention: 1. + * + * @author zhongyan.feng + * + */ +public class Histogram extends Metric<Double, Histogram.HistorgramPair> { + private static final long serialVersionUID = -1362345159511508074L; + + public Histogram() { + defaultValue = new HistorgramPair(); + updater = new AvgUpdater(); + merger = new AvgMerger(); + convertor = new HistogramConvertor(); + + init(); + } + + public static class HistogramConvertor implements Convertor<HistorgramPair, Double> { + private static final long serialVersionUID = -1569170826785657226L; + + @Override + public Double convert(HistorgramPair from) { + // TODO Auto-generated method stub + if (from == null) { + return 0.0d; + } + + if (from.getTimes() == 0) { + return 0.0d; + } else { + return from.getSum() / from.getTimes(); + } + } + + } + + public static class HistorgramPair { + private double sum; + private long times; + + public HistorgramPair() { + + } + + public HistorgramPair(double sum, long times) { + this.sum = sum; + this.times = times; + } + + public double getSum() { + return sum; + } + + public void setSum(double sum) { + this.sum = sum; + } + + public void addValue(double value) { + sum += value; + } + + public long 
getTimes() { + return times; + } + + public void setTimes(long times) { + this.times = times; + } + + public void addTimes(long time) { + times += time; + } + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/LongCounter.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/LongCounter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/LongCounter.java new file mode 100644 index 0000000..cd64e62 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/LongCounter.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.jstorm.common.metric.old; + +import com.alibaba.jstorm.common.metric.old.operator.convert.AtomicLongToLong; +import com.alibaba.jstorm.common.metric.old.operator.merger.LongSumMerger; +import com.alibaba.jstorm.common.metric.old.operator.updater.LongAddUpdater; +import com.alibaba.jstorm.common.metric.old.window.Metric; + +import java.util.concurrent.atomic.AtomicLong; + +public class LongCounter extends Metric<Long, AtomicLong> { + private static final long serialVersionUID = -1362345159511508074L; + + public LongCounter() { + super.defaultValue = new AtomicLong(0); + super.updater = new LongAddUpdater(); + super.merger = new LongSumMerger(); + super.convertor = new AtomicLongToLong(); + + init(); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Meter.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Meter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Meter.java new file mode 100644 index 0000000..cde66fd --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Meter.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.common.metric.old; + +import com.alibaba.jstorm.common.metric.old.operator.convert.DefaultConvertor; +import com.alibaba.jstorm.common.metric.old.operator.merger.TpsMerger; +import com.alibaba.jstorm.common.metric.old.operator.updater.AddUpdater; +import com.alibaba.jstorm.common.metric.old.window.Metric; + +/** + * Meter is used to compute tps + * + * Attention: 1. + * + * @author zhongyan.feng + * + */ +public class Meter extends Metric<Double, Double> { + private static final long serialVersionUID = -1362345159511508074L; + + public Meter() { + defaultValue = 0.0d; + updater = new AddUpdater<Double>(); + merger = new TpsMerger(); + convertor = new DefaultConvertor<Double>(); + + init(); + } + + public void update() { + update(Double.valueOf(1)); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/MetricFilter.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/MetricFilter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/MetricFilter.java new file mode 100644 index 0000000..a91b925 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/MetricFilter.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Serializable predicate used to select metrics by name and/or instance.
 */
public interface MetricFilter extends Serializable {
    /**
     * Matches all metrics, regardless of type or name.
     */
    MetricFilter ALL = new MetricFilter() {
        private static final long serialVersionUID = 7089987006352295530L;

        // Accepts everything: both arguments are ignored.
        @Override
        public boolean matches(String name, Metric metric) {
            return true;
        }
    };

    /**
     * Returns {@code true} if the metric matches the filter; {@code false} otherwise.
     *
     * @param name the metric's name
     * @param metric the metric
     * @return {@code true} if the metric matches the filter
     */
    boolean matches(String name, Metric metric);
}
/**
 * A serializable group of metrics, exposed as a map.
 */
public interface MetricSet extends Serializable {
    /**
     * @return the metrics of this set, keyed by a String (presumably the metric name - confirm with implementors)
     */
    Map<String, Metric> getMetrics();

}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.common.metric.old; + +import backtype.storm.generated.MetricInfo; +import backtype.storm.generated.MetricWindow; +import com.alibaba.jstorm.utils.JStormUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +public class MetricThrift { + private static final Logger LOG = LoggerFactory.getLogger(MetricThrift.class); + + public static MetricInfo mkMetricInfo() { + return new MetricInfo(); + } + + public static void insert(MetricInfo metricInfo, String key, Map<Integer, Double> windowSet) { + } + + public static MetricWindow merge(Map<String, MetricWindow> details) { + Map<Integer, Double> merge = new HashMap<Integer, Double>(); + + for (Entry<String, MetricWindow> entry : details.entrySet()) { + MetricWindow metricWindow = entry.getValue(); + Map<Integer, Double> metric = metricWindow.get_metricWindow(); + + for (Entry<Integer, Double> metricEntry : metric.entrySet()) { + Integer key = metricEntry.getKey(); + try { + Double value = ((Number) JStormUtils.add(metricEntry.getValue(), merge.get(key))).doubleValue(); + merge.put(key, value); + } catch (Exception e) { + LOG.error("Invalid type of " + entry.getKey() + ":" + key, e); + } + } + } + + MetricWindow ret = new MetricWindow(); + + ret.set_metricWindow(merge); + return ret; + } + + public static void merge(MetricInfo metricInfo, Map<String, Map<String, MetricWindow>> extraMap) { + for (Entry<String, Map<String, MetricWindow>> 
entry : extraMap.entrySet()) { + String metricName = entry.getKey(); + // metricInfo.put_to_baseMetric(metricName, merge(entry.getValue())); + } + } + + public static MetricWindow mergeMetricWindow(MetricWindow fromMetric, MetricWindow toMetric) { + if (toMetric == null) { + toMetric = new MetricWindow(new HashMap<Integer, Double>()); + } + + if (fromMetric == null) { + return toMetric; + } + + List<Map<Integer, Double>> list = new ArrayList<Map<Integer, Double>>(); + list.add(fromMetric.get_metricWindow()); + list.add(toMetric.get_metricWindow()); + Map<Integer, Double> merged = JStormUtils.mergeMapList(list); + + toMetric.set_metricWindow(merged); + + return toMetric; + } + + public static MetricInfo mergeMetricInfo(MetricInfo from, MetricInfo to) { + if (to == null) { + to = mkMetricInfo(); + } + + if (from == null) { + return to; + } + // to.get_baseMetric().putAll(from.get_baseMetric()); + + return to; + + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/RegistryType.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/RegistryType.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/RegistryType.java new file mode 100644 index 0000000..6e8a020 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/RegistryType.java @@ -0,0 +1,9 @@ +package com.alibaba.jstorm.common.metric.old; + +/** + * @author wange + * @since 15/6/11 + */ +public enum RegistryType { + STREAM, TASK, COMPONENT, WORKER, SYS +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/StaticsType.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/StaticsType.java 
/**
 * Names of the statistics kinds used by the old metric code.
 *
 * <p>Constant names are lower case and must stay exactly as written: they are what
 * {@code name()} and {@code valueOf(String)} work with.
 */
public enum StaticsType {
    emitted,
    send_tps,
    recv_tps,
    acked,
    failed,
    transferred,
    process_latencies
}
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.common.metric.old; + +import java.io.Closeable; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +/** + * Use com.codahale.metrics's interface + * + * @author zhongyan.feng + * + */ +public class Timer extends Histogram { + private static final long serialVersionUID = 5915881891513771108L; + + /** + * A timing context. + * + * @see Timer#time() + */ + public static class Context implements Closeable { + private final Timer timer; + private final long startTime; + + private Context(Timer timer) { + this.timer = timer; + this.startTime = System.currentTimeMillis(); + } + + /** + * Stops recording the elapsed time, updates the timer and returns the elapsed time in nanoseconds. + */ + public long stop() { + final long elapsed = System.currentTimeMillis() - startTime; + timer.update(elapsed, TimeUnit.MILLISECONDS); + return elapsed; + } + + @Override + public void close() { + stop(); + } + } + + public Timer() { + init(); + } + + /** + * Adds a recorded duration. + * + * @param duration the length of the duration + * @param unit the scale unit of {@code duration} + */ + public void update(long duration, TimeUnit unit) { + update(unit.toMillis(duration)); + } + + /** + * Times and records the duration of event. 
+ * + * @param event a {@link Callable} whose {@link Callable#call()} method implements a process whose duration should be timed + * @param <T> the type of the value returned by {@code event} + * @return the value returned by {@code event} + * @throws Exception if {@code event} throws an {@link Exception} + */ + public <T> T time(Callable<T> event) throws Exception { + final long startTime = System.currentTimeMillis(); + try { + return event.call(); + } finally { + update(System.currentTimeMillis() - startTime); + } + } + + /** + * Returns a new {@link Context}. + * + * @return a new {@link Context} + * @see Context + */ + public Context time() { + return new Context(this); + } + + public long getCount() { + return allWindow.getSnapshot().getTimes(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Top.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Top.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Top.java new file mode 100644 index 0000000..e3fdbdd --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/Top.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.common.metric.old; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.TreeSet; + +import com.alibaba.jstorm.common.metric.old.operator.convert.Convertor; +import com.alibaba.jstorm.common.metric.old.operator.merger.Merger; +import com.alibaba.jstorm.common.metric.old.operator.updater.Updater; +import com.alibaba.jstorm.common.metric.old.window.Metric; + +public class Top<T> extends Metric<List<T>, TreeSet<T>> { + private static final long serialVersionUID = 4990212679365713831L; + + final protected Comparator<T> comparator; + final protected int n; + + public Top(Comparator<T> comparator, int n) { + this.comparator = comparator; + this.n = n; + + this.defaultValue = new TreeSet<T>(comparator); + this.updater = new TopUpdator<T>(comparator, n); + this.merger = new TopMerger<T>(comparator, n); + this.convertor = new SetToList<T>(); + + init(); + } + + public static class TopUpdator<T> implements Updater<TreeSet<T>> { + private static final long serialVersionUID = -3940041101182079146L; + + final protected Comparator<T> comparator; + final protected int n; + + public TopUpdator(Comparator<T> comparator, int n) { + this.comparator = comparator; + this.n = n; + } + + @SuppressWarnings("unchecked") + @Override + public TreeSet<T> update(Number object, TreeSet<T> cache, Object... others) { + // TODO Auto-generated method stub + if (cache == null) { + cache = new TreeSet<T>(comparator); + } + + cache.add((T) object); + + if (cache.size() > n) { + cache.remove(cache.last()); + } + + return cache; + } + + @Override + public TreeSet<T> updateBatch(TreeSet<T> object, TreeSet<T> cache, Object... 
objects) { + // TODO Auto-generated method stub + if (cache == null) { + cache = new TreeSet<T>(comparator); + } + + cache.addAll(object); + + while (cache.size() > n) { + cache.remove(cache.last()); + } + + return cache; + } + + } + + public static class TopMerger<T> implements Merger<TreeSet<T>> { + + private static final long serialVersionUID = 4478867986986581638L; + final protected Comparator<T> comparator; + final protected int n; + + public TopMerger(Comparator<T> comparator, int n) { + this.comparator = comparator; + this.n = n; + } + + @Override + public TreeSet<T> merge(Collection<TreeSet<T>> objs, TreeSet<T> unflushed, Object... others) { + // TODO Auto-generated method stub + TreeSet<T> temp = new TreeSet<T>(comparator); + if (unflushed != null) { + temp.addAll(unflushed); + } + + for (TreeSet<T> set : objs) { + temp.addAll(set); + } + + if (temp.size() <= n) { + return temp; + } + + TreeSet<T> ret = new TreeSet<T>(comparator); + int i = 0; + for (T item : temp) { + if (i < n) { + ret.add(item); + i++; + } else { + break; + } + } + return ret; + } + + } + + public static class SetToList<T> implements Convertor<TreeSet<T>, List<T>> { + private static final long serialVersionUID = 4968816655779625255L; + + @Override + public List<T> convert(TreeSet<T> set) { + // TODO Auto-generated method stub + List<T> ret = new ArrayList<T>(); + if (set != null) { + for (T item : set) { + ret.add(item); + } + } + return ret; + } + + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/Sampling.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/Sampling.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/Sampling.java new file mode 100644 index 0000000..8f3053b --- /dev/null +++ 
/**
 * Minimal contract of a metric: values go in through {@link #update(Number)} and the
 * current state is read through {@link #getSnapshot()}.
 *
 * @param <V> type of the snapshot returned by {@link #getSnapshot()}
 */
public interface Sampling<V> extends Serializable {

    /**
     * Update object into Metric
     *
     * @param obj the value to record
     */
    void update(Number obj);

    /**
     *
     * Get snapshot of Metric
     *
     * @return the metric's current snapshot
     */
    V getSnapshot();
}
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.common.metric.old.operator; +
+/**
+ * Implemented by objects that can report a start time.
+ */
+public interface StartTime {
+    /**
+     * @return the start time as a {@code long}; NOTE(review): the unit is not
+     *         visible here (presumably epoch milliseconds) - confirm with implementers
+     */
+    long getStartTime();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/AtomicLongToLong.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/AtomicLongToLong.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/AtomicLongToLong.java new file mode 100644 index 0000000..c9a8b24 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/AtomicLongToLong.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.common.metric.old.operator.convert; + +import java.util.concurrent.atomic.AtomicLong; + +public class AtomicLongToLong implements Convertor<AtomicLong, Long> { + private static final long serialVersionUID = -2755066621494409063L; + + @Override + public Long convert(AtomicLong obj) { + // TODO Auto-generated method stub + if (obj == null) { + return null; + } else { + return obj.get(); + } + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/Convertor.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/Convertor.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/Convertor.java new file mode 100644 index 0000000..713c1df --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/Convertor.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.common.metric.old.operator.convert; + +import java.io.Serializable; +
+/**
+ * Serializable strategy that turns a value of one type into a value of another.
+ *
+ * @param <From> type accepted by {@link #convert(Object)}
+ * @param <To> type produced by {@link #convert(Object)}
+ */
+public interface Convertor<From, To> extends Serializable {
+
+    /**
+     * @param obj value to convert
+     * @return the converted value
+     */
+    To convert(From obj);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/DefaultConvertor.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/DefaultConvertor.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/DefaultConvertor.java new file mode 100644 index 0000000..2cad206 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/DefaultConvertor.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.common.metric.old.operator.convert; +
+/**
+ * Identity converter: hands back the very object it is given.
+ *
+ * @param <T> type of both the input and the output
+ */
+public class DefaultConvertor<T> implements Convertor<T, T> {
+    private static final long serialVersionUID = -647209923903679727L;
+
+    /**
+     * @param obj value to pass through, may be {@code null}
+     * @return {@code obj} itself, unchanged
+     */
+    @Override
+    public T convert(T obj) {
+        // No conversion is performed; the same reference is returned.
+        return obj;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/SetToList.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/SetToList.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/SetToList.java new file mode 100644 index 0000000..2569387 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/convert/SetToList.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package com.alibaba.jstorm.common.metric.old.operator.convert; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +public class SetToList<T> implements Convertor<Set<T>, List<T>> { + private static final long serialVersionUID = 4968816655779625255L; + + @Override + public List<T> convert(Set<T> set) { + // TODO Auto-generated method stub + List<T> ret = new ArrayList<T>(); + if (set != null) { + for (T item : set) { + ret.add(item); + } + } + return ret; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/AvgMerger.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/AvgMerger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/AvgMerger.java new file mode 100644 index 0000000..815bb33 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/AvgMerger.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.alibaba.jstorm.common.metric.old.operator.merger; + +import com.alibaba.jstorm.common.metric.old.Histogram; + +import java.util.Collection; + +public class AvgMerger implements Merger<Histogram.HistorgramPair> { + private static final long serialVersionUID = -3892281208959055221L; + + @Override + public Histogram.HistorgramPair merge(Collection<Histogram.HistorgramPair> objs, Histogram.HistorgramPair unflushed, Object... others) { + // TODO Auto-generated method stub + double sum = 0.0d; + long times = 0l; + + if (unflushed != null) { + sum = sum + unflushed.getSum(); + times = times + unflushed.getTimes(); + } + + for (Histogram.HistorgramPair item : objs) { + if (item == null) { + continue; + } + sum = sum + item.getSum(); + times = times + item.getTimes(); + } + + return new Histogram.HistorgramPair(sum, times); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/LongSumMerger.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/LongSumMerger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/LongSumMerger.java new file mode 100644 index 0000000..1151718 --- /dev/null +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/common/metric/old/operator/merger/LongSumMerger.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alibaba.jstorm.common.metric.old.operator.merger; + +import java.util.Collection; +import java.util.concurrent.atomic.AtomicLong; + +public class LongSumMerger implements Merger<AtomicLong> { + private static final long serialVersionUID = -3500779273677666691L; + + @Override + public AtomicLong merge(Collection<AtomicLong> objs, AtomicLong unflushed, Object... others) { + AtomicLong ret = new AtomicLong(0); + if (unflushed != null) { + ret.addAndGet(unflushed.get()); + } + + for (AtomicLong item : objs) { + if (item == null) { + continue; + } + ret.addAndGet(item.get()); + } + return ret; + } + +}
