CHUKWA-767. Implemented low pass filter for Charting REST API. (Eric Yang)
Project: http://git-wip-us.apache.org/repos/asf/chukwa/repo Commit: http://git-wip-us.apache.org/repos/asf/chukwa/commit/f3dfc942 Tree: http://git-wip-us.apache.org/repos/asf/chukwa/tree/f3dfc942 Diff: http://git-wip-us.apache.org/repos/asf/chukwa/diff/f3dfc942 Branch: refs/heads/master Commit: f3dfc942025d4f476815df85091d35559e13f72a Parents: 068b454 Author: Eric Yang <[email protected]> Authored: Wed Jun 24 14:46:11 2015 -0700 Committer: Eric Yang <[email protected]> Committed: Wed Jun 24 14:46:11 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../chukwa/datastore/ChukwaHBaseStore.java | 76 ++++++++++++++++---- 2 files changed, 63 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/chukwa/blob/f3dfc942/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 8ea4d46..633a7c9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -18,6 +18,8 @@ Trunk (unreleased changes) IMPROVEMENTS + CHUKWA-767. Implemented low pass filter for Charting REST API. (Eric Yang) + CHUKWA-765. Minor stylesheets clean up. (Eric Yang) CHUKWA-759. Configuration for Chukwa to monitor HBase. (Eric Yang) http://git-wip-us.apache.org/repos/asf/chukwa/blob/f3dfc942/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java b/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java index cade66a..2bef452 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java +++ b/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java @@ -63,11 +63,17 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.log4j.Logger; import org.json.simple.JSONObject; import org.json.simple.JSONValue; +import org.mortbay.log.Log; import com.google.gson.Gson; public class ChukwaHBaseStore { static Logger LOG = Logger.getLogger(ChukwaHBaseStore.class); + static int MINUTES_IN_HOUR = 60; + static double RESOLUTION = 360; + static int MINUTE = 60000; //60 milliseconds + static int SECOND = 1000; + static byte[] COLUMN_FAMILY = "t".getBytes(); static byte[] ANNOTATION_FAMILY = "a".getBytes(); static byte[] KEY_NAMES = "k".getBytes(); @@ -139,6 +145,7 @@ public class ChukwaHBaseStore { startTime = endTime; endTime = temp; } + getHBaseConnection(); Table table = connection.getTable(TableName.valueOf(CHUKWA)); Scan scan = new Scan(); @@ -150,10 +157,7 @@ public class ChukwaHBaseStore { long currentDay = startTime; for (int i = startDay; i <= endDay; i++) { byte[] rowKey = HBaseUtil.buildKey(currentDay, metric, source); - // ColumnRangeFilter crf = new - // ColumnRangeFilter(Long.valueOf(startTime).toString().getBytes(), - // true, Long.valueOf(endTime).toString().getBytes(), true); - // scan.setFilter(crf); + scan.addFamily(COLUMN_FAMILY); scan.setStartRow(rowKey); scan.setStopRow(rowKey); @@ -162,8 +166,7 @@ public class ChukwaHBaseStore { ResultScanner results = table.getScanner(scan); Iterator<Result> it = results.iterator(); - // TODO: Apply discrete wavelet transformation to limit the output - // size to 1000 data points for graphing optimization. (i.e jwave) + while (it.hasNext()) { Result result = it.next(); for (KeyValue kv : result.raw()) { @@ -497,6 +500,23 @@ public class ChukwaHBaseStore { startTime = endTime; endTime = temp; } + // Figure out the time range and determine the best resolution + // to fetch the data + long range = Math.round((endTime - startTime) + / (MINUTES_IN_HOUR * MINUTE)); + long sampleRate = 1; + if (range <= 1) { + sampleRate = 5; + } else if(range <= 24) { + sampleRate = 240; + } else if (range <= 720) { + sampleRate = 7200; + } else if(range >= 720) { + sampleRate = 87600; + } + double smoothing = (endTime - startTime) + / (sampleRate * SECOND ) / RESOLUTION; + getHBaseConnection(); Table table = connection.getTable(TableName.valueOf(CHUKWA)); Scan scan = new Scan(); @@ -522,8 +542,11 @@ public class ChukwaHBaseStore { ResultScanner results = table.getScanner(scan); Iterator<Result> it = results.iterator(); - // TODO: Apply discrete wavelet transformation to limit the output - // size to 1000 data points for graphing optimization. (i.e jwave) + double filteredValue = 0.0d; + long lastTime = startTime; + long totalElapsedTime = 0; + int initial = 0; + while (it.hasNext()) { Result result = it.next(); for (KeyValue kv : result.raw()) { @@ -531,11 +554,34 @@ public class ChukwaHBaseStore { long timestamp = ByteBuffer.wrap(key).getLong(); double value = Double.parseDouble(new String(kv.getValue(), "UTF-8")); - ArrayList<Number> points = new ArrayList<Number>(); - points.add(timestamp); - points.add(value); - data.add(points); + if(initial==0) { + filteredValue = value; + } + long elapsedTime = (timestamp - lastTime) / SECOND; + lastTime = timestamp; + // Determine if there is any gap, if there is gap in data, reset + // calculation. + if (elapsedTime > sampleRate) { + filteredValue = 0.0d; + } else { + if (smoothing != 0.0d) { + // Apply low pass filter to calculate + filteredValue = filteredValue + (double) ((double) elapsedTime * (double) ((double) (value - filteredValue) / smoothing)); + } else { + // Use original value + filteredValue = value; + } + } + totalElapsedTime = totalElapsedTime + elapsedTime; + if (totalElapsedTime >= sampleRate) { + ArrayList<Number> points = new ArrayList<Number>(); + points.add(timestamp); + points.add(filteredValue); + data.add(points); + totalElapsedTime = 0; + } } + initial++; } results.close(); currentDay = currentDay + (i * MILLISECONDS_IN_DAY); @@ -748,15 +794,15 @@ public class ChukwaHBaseStore { String[] metrics = { "SystemMetrics.LoadAverage.1" }; createChart("1", "", "System Load Average", metrics, hostname); String[] cpuMetrics = { "SystemMetrics.cpu.combined", "SystemMetrics.cpu.sys", "SystemMetrics.cpu.user" }; - createChart("2", "%", "CPU Utilization", cpuMetrics, hostname); + createChart("2", "percent", "CPU Utilization", cpuMetrics, hostname); String[] memMetrics = { "SystemMetrics.memory.FreePercent", "SystemMetrics.memory.UsedPercent"}; - createChart("3", "%", "Memory Utilization", memMetrics, hostname); + createChart("3", "percent", "Memory Utilization", memMetrics, hostname); String[] diskMetrics = { "SystemMetrics.disk.ReadBytes", "SystemMetrics.disk.WriteBytes" }; createChart("4", "bytes-decimal", "Disk Utilization", diskMetrics, hostname); String[] netMetrics = { "SystemMetrics.network.TxBytes", "SystemMetrics.network.RxBytes" }; createChart("5", "bytes", "Network Utilization", netMetrics, hostname); String[] swapMetrics = { "SystemMetrics.swap.Total", "SystemMetrics.swap.Used", "SystemMetrics.swap.Free" }; - createChart("6", "", "Swap Utilization", swapMetrics, hostname); + createChart("6", "bytes-decimal", "Swap Utilization", swapMetrics, hostname); // Populate default widgets Widget widget = new Widget();
