CHUKWA-747. Update Widget API to store data in HBase. (Eric Yang)
Project: http://git-wip-us.apache.org/repos/asf/chukwa/repo Commit: http://git-wip-us.apache.org/repos/asf/chukwa/commit/4134fc32 Tree: http://git-wip-us.apache.org/repos/asf/chukwa/tree/4134fc32 Diff: http://git-wip-us.apache.org/repos/asf/chukwa/diff/4134fc32 Branch: refs/heads/master Commit: 4134fc32c095e83570bd36ec571b2f1904348149 Parents: 1c4ed7b Author: Eric Yang <[email protected]> Authored: Sun May 10 16:30:22 2015 -0700 Committer: Eric Yang <[email protected]> Committed: Sun May 10 16:30:22 2015 -0700 ---------------------------------------------------------------------- .../chukwa/datastore/ChukwaHBaseStore.java | 330 +++++++++++++++++-- 1 file changed, 295 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/chukwa/blob/4134fc32/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java b/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java index 7494aa8..476bd79 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java +++ b/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java @@ -18,44 +18,46 @@ package org.apache.hadoop.chukwa.datastore; import java.io.IOException; +import java.net.InetAddress; +import java.net.URI; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Calendar; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.Map.Entry; -import java.util.NavigableMap; import java.util.Set; import java.util.TimeZone; import java.util.UUID; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import org.apache.hadoop.chukwa.hicc.Chart; +import org.apache.hadoop.chukwa.hicc.bean.Chart; import org.apache.hadoop.chukwa.hicc.bean.HeatMapPoint; import org.apache.hadoop.chukwa.hicc.bean.Heatmap; +import org.apache.hadoop.chukwa.hicc.bean.LineOptions; import org.apache.hadoop.chukwa.hicc.bean.Series; +import org.apache.hadoop.chukwa.hicc.bean.SeriesMetaData; +import org.apache.hadoop.chukwa.hicc.bean.Widget; import org.apache.hadoop.chukwa.util.ExceptionUtil; import org.apache.hadoop.chukwa.util.HBaseUtil; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.ColumnPrefixFilter; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.log4j.Logger; import org.json.simple.JSONObject; import org.json.simple.JSONValue; @@ -63,25 +65,30 @@ import org.json.simple.JSONValue; import com.google.gson.Gson; public class ChukwaHBaseStore { - private static Configuration hconf = HBaseConfiguration.create(); static Logger LOG = Logger.getLogger(ChukwaHBaseStore.class); static byte[] COLUMN_FAMILY = "t".getBytes(); static byte[] ANNOTATION_FAMILY = "a".getBytes(); static byte[] KEY_NAMES = "k".getBytes(); static byte[] CHART_TYPE = "chart_meta".getBytes(); static byte[] CHART_FAMILY = "c".getBytes(); + static byte[] COMMON_FAMILY = "c".getBytes(); + static byte[] WIDGET_TYPE = "widget_meta".getBytes(); private static final String CHUKWA = "chukwa"; private static final String CHUKWA_META = "chukwa_meta"; private static long MILLISECONDS_IN_DAY = 86400000L; - private static Connection connection = null; + protected static Connection connection = null; - public static void getHBaseConnection() throws IOException { + public ChukwaHBaseStore() { + super(); + } + + public static synchronized void getHBaseConnection() throws IOException { if (connection == null || connection.isClosed()) { - connection = ConnectionFactory.createConnection(hconf); + connection = ConnectionFactory.createConnection(); } } - public static void closeHBase() { + public static synchronized void closeHBase() { try { if(connection != null) { connection.close(); @@ -236,6 +243,7 @@ public class ChukwaHBaseStore { } } } + rs.close(); table.close(); } catch (Exception e) { closeHBase(); @@ -354,6 +362,12 @@ public class ChukwaHBaseStore { return clusters; } + /** + * Get a chart from HBase by ID. + * + * @param id + * @return + */ public static Chart getChart(String id) { Chart chart = null; try { @@ -374,6 +388,12 @@ public class ChukwaHBaseStore { return chart; } + /** + * Update a chart in HBase by ID. + * + * @param id + * @param chart + */ public static void putChart(String id, Chart chart) { try { getHBaseConnection(); @@ -391,33 +411,54 @@ public class ChukwaHBaseStore { } - public static String createChart(Chart chart) throws IOException { - getHBaseConnection(); + /** + * Create a chart in HBase. + * + * @param chart + * @return id of newly created chart + * @throws IOException + */ + public static synchronized String createChart(Chart chart) { String id = chart.getId(); - if(id!=null) { - // Check if there is existing chart with same id. - Chart test = getChart(id); - if(test!=null) { - // If id already exists, randomly generate an id. + try { + getHBaseConnection(); + if (id != null) { + // Check if there is existing chart with same id. + Chart test = getChart(id); + if (test != null) { + // If id already exists, randomly generate an id. + id = String.valueOf(UUID.randomUUID()); + } + } else { + // If id is not provided, randomly generate an id. id = String.valueOf(UUID.randomUUID()); } - } else { - // If id is not provided, randomly generate an id. - id = String.valueOf(UUID.randomUUID()); + chart.setId(id); + Table table = connection.getTable(TableName.valueOf(CHUKWA_META)); + Put put = new Put(CHART_TYPE); + Gson gson = new Gson(); + String buffer = gson.toJson(chart); + put.add(CHART_FAMILY, id.getBytes(), buffer.getBytes()); + table.put(put); + table.close(); + } catch (Exception e) { + closeHBase(); + LOG.error(ExceptionUtil.getStackTrace(e)); + id = null; } - chart.setId(id); - Table table = connection.getTable(TableName.valueOf(CHUKWA_META)); - Put put = new Put(CHART_TYPE); - Gson gson = new Gson(); - String buffer = gson.toJson(chart); - put.add(CHART_FAMILY, id.getBytes(), buffer.getBytes()); - table.put(put); - table.close(); return id; } - public static synchronized ArrayList<org.apache.hadoop.chukwa.hicc.Series> getChartSeries(ArrayList<org.apache.hadoop.chukwa.hicc.Series> series, long startTime, long endTime) { - ArrayList<org.apache.hadoop.chukwa.hicc.Series> list = new ArrayList<org.apache.hadoop.chukwa.hicc.Series>(); + /** + * Return data for multiple series of metrics stored in HBase. + * + * @param series + * @param startTime + * @param endTime + * @return + */ + public static synchronized ArrayList<org.apache.hadoop.chukwa.hicc.bean.SeriesMetaData> getChartSeries(ArrayList<org.apache.hadoop.chukwa.hicc.bean.SeriesMetaData> series, long startTime, long endTime) { + ArrayList<org.apache.hadoop.chukwa.hicc.bean.SeriesMetaData> list = new ArrayList<org.apache.hadoop.chukwa.hicc.bean.SeriesMetaData>(); try { // Swap start and end if the values are inverted. if (startTime > endTime) { @@ -433,8 +474,8 @@ public class ChukwaHBaseStore { int startDay = c.get(Calendar.DAY_OF_YEAR); c.setTimeInMillis(endTime); int endDay = c.get(Calendar.DAY_OF_YEAR); - for (org.apache.hadoop.chukwa.hicc.Series s : series) { - org.apache.hadoop.chukwa.hicc.Series clone = (org.apache.hadoop.chukwa.hicc.Series) s.clone(); + for (org.apache.hadoop.chukwa.hicc.bean.SeriesMetaData s : series) { + org.apache.hadoop.chukwa.hicc.bean.SeriesMetaData clone = (org.apache.hadoop.chukwa.hicc.bean.SeriesMetaData) s.clone(); long currentDay = startTime; String[] parts = s.getUrl().toString().split("/"); String metric = parts[5]; @@ -479,4 +520,223 @@ public class ChukwaHBaseStore { return list; } + /** + * List widgets stored in HBase. + * + * @param limit + * @param offset + * @return + */ + public static synchronized List<Widget> listWidget(int limit, int offset) { + ArrayList<Widget> list = new ArrayList<Widget>(); + try { + getHBaseConnection(); + Table table = connection.getTable(TableName.valueOf(CHUKWA_META)); + Scan scan = new Scan(); + scan.setStartRow(WIDGET_TYPE); + scan.setStopRow(WIDGET_TYPE); + ResultScanner rs = table.getScanner(scan); + Iterator<Result> it = rs.iterator(); + int c = 0; + while(it.hasNext()) { + Result result = it.next(); + for(KeyValue kv : result.raw()) { + if(c > limit) { + break; + } + if(c < offset) { + continue; + } + Gson gson = new Gson(); + Widget widget = gson.fromJson(new String(kv.getValue(), "UTF-8"), Widget.class); + list.add(widget); + c++; + } + } + rs.close(); + table.close(); + } catch (Exception e) { + closeHBase(); + LOG.error(ExceptionUtil.getStackTrace(e)); + } + return list; + } + + /** + * Find widget by title prefix in HBase. + * + * @param query - Prefix query of widget title. + * @return + */ + public static synchronized List<Widget> searchWidget(String query) { + ArrayList<Widget> list = new ArrayList<Widget>(); + try { + getHBaseConnection(); + Table table = connection.getTable(TableName.valueOf(CHUKWA_META)); + Filter filter = new ColumnPrefixFilter(Bytes.toBytes(query)); + Scan scan = new Scan(); + scan.setStartRow(WIDGET_TYPE); + scan.setStopRow(WIDGET_TYPE); + scan.setFilter(filter); + ResultScanner rs = table.getScanner(scan); + Iterator<Result> it = rs.iterator(); + while(it.hasNext()) { + Result result = it.next(); + for(KeyValue kv : result.raw()) { + Gson gson = new Gson(); + Widget widget = gson.fromJson(new String(kv.getValue(), "UTF-8"), Widget.class); + list.add(widget); + } + } + rs.close(); + table.close(); + } catch (Exception e) { + closeHBase(); + LOG.error(ExceptionUtil.getStackTrace(e)); + } + return list; + } + + /** + * View a widget information in HBase. + * + * @param title - Title of the widget. + * @return + */ + public static synchronized Widget viewWidget(String title) { + Widget w = null; + try { + getHBaseConnection(); + Table table = connection.getTable(TableName.valueOf(CHUKWA_META)); + Get widget = new Get(WIDGET_TYPE); + widget.addColumn(COMMON_FAMILY, title.getBytes()); + Result rs = table.get(widget); + byte[] buffer = rs.getValue(COMMON_FAMILY, title.getBytes()); + Gson gson = new Gson(); + w = gson.fromJson(new String(buffer), Widget.class); + table.close(); + } catch (Exception e) { + closeHBase(); + LOG.error(ExceptionUtil.getStackTrace(e)); + } + return w; + } + + /** + * Create a widget in HBase. + * + * @param widget + */ + public static synchronized boolean createWidget(Widget widget) { + boolean created = false; + try { + widget.tokenize(); + getHBaseConnection(); + Table table = connection.getTable(TableName.valueOf(CHUKWA_META)); + Get widgetTest = new Get(WIDGET_TYPE); + widgetTest.addColumn(COMMON_FAMILY, widget.getTitle().getBytes()); + if (table.exists(widgetTest)) { + LOG.warn("Widget: " + widget.getTitle() + " already exists."); + created = false; + } else { + Put put = new Put(WIDGET_TYPE); + Gson gson = new Gson(); + String buffer = gson.toJson(widget); + put.add(COMMON_FAMILY, widget.getTitle().getBytes(), buffer.getBytes()); + table.put(put); + created = true; + } + table.close(); + } catch (Exception e) { + closeHBase(); + LOG.error(ExceptionUtil.getStackTrace(e)); + } + return created; + } + + /** + * Update a widget in HBase. + * + * @param title + * @param widget + * @throws IOException + */ + public static synchronized boolean updateWidget(String title, Widget widget) { + boolean result = false; + try { + getHBaseConnection(); + Table table = connection.getTable(TableName.valueOf(CHUKWA_META)); + Delete oldWidget = new Delete(WIDGET_TYPE); + oldWidget.addColumn(COMMON_FAMILY, title.getBytes()); + table.delete(oldWidget); + Put put = new Put(WIDGET_TYPE); + Gson gson = new Gson(); + String buffer = gson.toJson(widget); + put.add(COMMON_FAMILY, title.getBytes(), buffer.getBytes()); + table.put(put); + table.close(); + result = true; + } catch (Exception e) { + closeHBase(); + LOG.error(ExceptionUtil.getStackTrace(e)); + LOG.error("Error in updating widget, original title: " + + title + " new title:" + widget.getTitle()); + } + return result; + } + + /** + * Delete a widget in HBase. + * + * @param title + * @param widget + * @throws IOException + */ + public static synchronized boolean deleteWidget(String title) { + boolean result = false; + try { + getHBaseConnection(); + Table table = connection.getTable(TableName.valueOf(CHUKWA_META)); + Delete oldWidget = new Delete(WIDGET_TYPE); + oldWidget.addColumn(COMMON_FAMILY, title.getBytes()); + table.delete(oldWidget); + table.close(); + result = true; + } catch (Exception e) { + closeHBase(); + LOG.error(ExceptionUtil.getStackTrace(e)); + LOG.error("Error in deleting widget: "+ title); + } + return result; + } + + public static void populateDefaults() { + try { + String hostname = InetAddress.getLocalHost().getHostName(); + // Populate default widgets + Widget widget = new Widget(); + widget.setTitle("System Load Average"); + widget.setUrl(new URI("/hicc/v1/chart/draw/1")); + createWidget(widget); + + // Populate example chart widgets + Chart chart = new Chart("1"); + chart.setYUnitType(""); + chart.setTitle("Load Average"); + ArrayList<SeriesMetaData> series = new ArrayList<SeriesMetaData>(); + + SeriesMetaData s = new SeriesMetaData(); + s.setLabel("SystemMetrics.LoadAverage.1/" + hostname); + s.setUrl(new URI("/hicc/v1/metrics/series/SystemMetrics.LoadAverage.1/" + + hostname)); + LineOptions l = new LineOptions(); + s.setLineOptions(l); + series.add(s); + + chart.SetSeries(series); + createChart(chart); + } catch (Throwable ex) { + LOG.error(ExceptionUtil.getStackTrace(ex)); + } + } }
