CHUKWA-667. Optimize HBase metrics schema. (Eric Yang)
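In short, this patch collapses the many per-component HBase tables from conf/hbase.schema (Hadoop, HadoopLog, SystemMetrics, ClusterSummary, HBase, Namenode, Zookeeper, JobTracker, Datanode, ...) into two compact tables: "chukwa", with column families "t" (time-series cells) and "a" (annotations), and "chukwa_meta", with column family "k" (key-name metadata). The old Demux MapProcessor/OutputCollector write path is replaced by processors under org.apache.hadoop.chukwa.extraction.hbase that emit HBase Puts directly, and ChukwaHBaseStore now exposes metric-oriented lookups (getSeries, getMetricNames, getMetricGroups, getSourceNames, getClusterNames) instead of table/column-family scans.

A minimal usage sketch of the new read API, based only on the getSeries signature in this patch; the metric group, metric name, source host, and time window are illustrative values, not part of the patch:

    import org.apache.hadoop.chukwa.datastore.ChukwaHBaseStore;
    import org.apache.hadoop.chukwa.hicc.bean.Series;

    public class SeriesReadSketch {
      public static void main(String[] args) {
        long endTime = System.currentTimeMillis();
        long startTime = endTime - 60L * 60L * 1000L; // last hour

        // Scans the "chukwa" table (family "t") for "SystemMetrics.cpu.combined"
        // as reported by the given source within the time window.
        Series series = ChukwaHBaseStore.getSeries("SystemMetrics", "cpu.combined",
            "host1.example.com", startTime, endTime);
        System.out.println(series);
      }
    }
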
Project: http://git-wip-us.apache.org/repos/asf/chukwa/repo
Commit: http://git-wip-us.apache.org/repos/asf/chukwa/commit/7ae68398
Tree: http://git-wip-us.apache.org/repos/asf/chukwa/tree/7ae68398
Diff: http://git-wip-us.apache.org/repos/asf/chukwa/diff/7ae68398

Branch: refs/heads/master
Commit: 7ae68398626b33ac5fc06caf83573e992b766af0
Parents: fb022bf
Author: Eric Yang <[email protected]>
Authored: Sun Apr 12 00:16:58 2015 -0700
Committer: Eric Yang <[email protected]>
Committed: Sun Apr 12 00:16:58 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 conf/chukwa-demux-conf.xml                      |  11 +-
 conf/hbase.schema                               |  64 +--
 .../writer/hbase/HBaseWriter.java               | 141 +----
 .../writer/hbase/OutputCollector.java           |  73 ---
 .../datacollection/writer/hbase/Reporter.java   | 105 ++--
 .../chukwa/datastore/ChukwaHBaseStore.java      | 510 +++++++++----------
 .../demux/processor/mapper/DFInvalidRecord.java |  44 --
 .../extraction/demux/processor/mapper/Df.java   | 118 -----
 .../mapper/HadoopMetricsProcessor.java          |  17 +-
 .../demux/processor/mapper/Iostat.java          | 146 ------
 .../demux/processor/mapper/JobSummary.java      |   2 +-
 .../mapper/Log4JMetricsContextProcessor.java    |   2 +-
 .../demux/processor/mapper/PbsInvalidEntry.java |  44 --
 .../demux/processor/mapper/PbsNodes.java        | 198 -------
 .../extraction/demux/processor/mapper/Ps.java   | 144 ------
 .../extraction/demux/processor/mapper/Sar.java  | 172 -------
 .../extraction/demux/processor/mapper/Top.java  | 171 -------
 .../demux/processor/mapper/Torque.java          |  93 ----
 .../demux/processor/mapper/YWatch.java          | 121 -----
 .../processor/mapper/YwatchInvalidEntry.java    |  43 --
 .../chukwa/hicc/rest/HeatmapController.java     |  61 ++-
 .../chukwa/hicc/rest/MetricsController.java     | 112 +---
 .../inputtools/log4j/Log4JMetricsContext.java   |   2 +-
 src/main/web/hicc/jsp/graph_explorer.jsp        |  28 +-
 .../web/hicc/jsp/host_selector_dropdown.jsp     |   2 +-
 .../TestLog4JMetricsContextChukwaRecord.java    |  16 +-
 .../demux/processor/mapper/TestPsOutput.java    |  53 --
 28 files changed, 420 insertions(+), 2075 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 21b9f54..7acc0f8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,8 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
 
+    CHUKWA-667. Optimize HBase metrics schema. (Eric Yang)
+
     CHUKWA-741. Updated to Hadoop 2.6.0 and HBase 1.0.0. (Eric Yang)
 
     CHUKWA-740. Updated README file to be more current.
(Lewis John McGibbney via Eric Yang) http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/conf/chukwa-demux-conf.xml ---------------------------------------------------------------------- diff --git a/conf/chukwa-demux-conf.xml b/conf/chukwa-demux-conf.xml index 62aa04f..151bfa5 100644 --- a/conf/chukwa-demux-conf.xml +++ b/conf/chukwa-demux-conf.xml @@ -211,13 +211,13 @@ <property> <name>ChukwaMetrics</name> - <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.ChukwaMetricsProcessor</value> + <value>org.apache.hadoop.chukwa.extraction.hbase.ChukwaMetricsProcessor</value> <description></description> </property> <property> <name>SystemMetrics</name> - <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.SystemMetrics</value> + <value>org.apache.hadoop.chukwa.extraction.hbase.SystemMetrics</value> <description></description> </property> @@ -269,13 +269,6 @@ <description> Reducer class for Reduce Type MRJobReduceProcessor </description> </property> - <!-- Demux configs for both mapper and reducer --> - <property> - <name>SystemMetrics</name> - <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.SystemMetrics,org.apache.hadoop.chukwa.extraction.demux.processor.reducer.SystemMetrics</value> - <description> Reducer class for Reduce Type SystemMetrics </description> - </property> - <property> <name>ClientTrace</name> <value>org.apache.hadoop.chukwa.extraction.demux.processor.mapper.ClientTraceProcessor,org.apache.hadoop.chukwa.extraction.demux.processor.reducer.ClientTrace</value> http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/conf/hbase.schema ---------------------------------------------------------------------- diff --git a/conf/hbase.schema b/conf/hbase.schema index 7015725..5e32f90 100644 --- a/conf/hbase.schema +++ b/conf/hbase.schema @@ -1,61 +1,5 @@ -create "Hadoop", -{NAME => "ClientTrace", VERSIONS => 65535}, -{NAME => "dfs_namenode", VERSIONS => 65535}, -{NAME => "dfs_FSNamesystem", VERSIONS => 65535}, -{NAME => "dfs_datanode", VERSIONS => 65535}, -{NAME => "mapred_jobtracker", VERSIONS => 65535}, -{NAME => "mapred_shuffleOutput", VERSIONS => 65535}, -{NAME => "mapred_tasktracker", VERSIONS => 65535}, -{NAME => "jvm_metrics", VERSIONS => 65535}, -{NAME => "mapred_Queue", VERSIONS => 65535}, -{NAME => "metricssystem_MetricsSystem", VERSIONS => 65535}, -{NAME => "rpc_rpc", VERSIONS => 65535}, -{NAME => "rpcdetailed_rpcdetailed", VERSIONS => 65535}, -{NAME => "ugi_ugi", VERSIONS => 65535} -create "HadoopLog", -{NAME => "NameNode", VERSIONS => 65535}, -{NAME => "Audit", VERSIONS => 65535} -create "Jobs", -{NAME => "summary" } -create "SystemMetrics", -{NAME => "cpu", VERSIONS => 65535}, -{NAME => "system", VERSIONS => 65535}, -{NAME => "disk", VERSIONS => 65535}, -{NAME => "memory", VERSIONS => 65535}, -{NAME => "swap", VERSIONS => 65535}, -{NAME => "network", VERSIONS => 65535}, -{NAME => "tags", VERSIONS => 65535} -create "ClusterSummary", -{NAME=> "cpu", VERSIONS => 65535}, -{NAME => "system", VERSIONS => 65535}, -{NAME => "disk", VERSIONS => 65535}, -{NAME => "memory", VERSIONS => 65535}, -{NAME => "network", VERSIONS => 65535}, -{NAME => "swap", VERSIONS => 65535}, -{NAME => "hdfs", VERSIONS => 65535}, -{NAME => "mapreduce", VERSIONS => 65535} +create "chukwa_meta", +{NAME=>"k"} create "chukwa", -{NAME=>"chukwaAgent_chunkQueue", VERSIONS => 65535}, -{NAME => "chukwaAgent_metrics", VERSIONS => 65535}, -{NAME => "chukwaAgent_httpSender", VERSIONS => 65535} -create "HBase", -{NAME => "master", VERSIONS => 65535}, -{NAME => 
"regionserver", VERSIONS => 65535} -create "Namenode", -{NAME => "summary", VERSIONS => 65535}, -{NAME => "hdfs", VERSIONS => 65535}, -{NAME => "rpc", VERSIONS => 65535}, -{NAME => "jvm", VERSIONS => 65535} -create "Zookeeper", -{NAME => "zk", VERSIONS => 65535} -create "JobTracker", -{NAME => "jt", VERSIONS => 65535}, -{NAME => "jvm", VERSIONS => 65535}, -{NAME => "rpc", VERSIONS => 65535} -create "Datanode", -{NAME => "dn", VERSIONS => 65535}, -{NAME => "jvm", VERSIONS => 65535}, -{NAME => "rpc", VERSIONS => 65535} - - - +{NAME=>"t"}, +{NAME=>"a"} http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java index 8344318..34c82e1 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/HBaseWriter.java @@ -19,49 +19,42 @@ package org.apache.hadoop.chukwa.datacollection.writer.hbase; import java.io.IOException; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; import java.util.List; import java.util.Timer; import java.util.TimerTask; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.chukwa.ChukwaArchiveKey; import org.apache.hadoop.chukwa.Chunk; import org.apache.hadoop.chukwa.conf.ChukwaConfiguration; import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter; import org.apache.hadoop.chukwa.datacollection.writer.PipelineableWriter; import org.apache.hadoop.chukwa.datacollection.writer.WriterException; -import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor; -import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessorFactory; -import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.UnknownRecordTypeException; -import org.apache.hadoop.chukwa.extraction.demux.Demux; -import org.apache.hadoop.chukwa.util.ClassUtils; -import org.apache.hadoop.chukwa.util.DaemonWatcher; +import org.apache.hadoop.chukwa.extraction.hbase.AbstractProcessor; +import org.apache.hadoop.chukwa.extraction.hbase.ProcessorFactory; +import org.apache.hadoop.chukwa.extraction.hbase.UnknownRecordTypeException; import org.apache.hadoop.chukwa.util.ExceptionUtil; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; -import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.HTablePool; -import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table; -import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables; +import org.apache.hadoop.hbase.client.Put; import org.apache.log4j.Logger; public class HBaseWriter extends PipelineableWriter { static Logger log = Logger.getLogger(HBaseWriter.class); + private static final String CHUKWA_TABLE = "chukwa"; + private static final String CHUKWA_META_TABLE = "chukwa_meta"; boolean reportStats; volatile long 
dataSize = 0; final Timer statTimer; - private OutputCollector output; + private ArrayList<Put> output; private Reporter reporter; private ChukwaConfiguration conf; String defaultProcessor; private HConnection connection; - private Configuration hconf; private class StatReportingTask extends TimerTask { private long lastTs = System.currentTimeMillis(); @@ -98,20 +91,19 @@ public class HBaseWriter extends PipelineableWriter { private HBaseWriter(boolean reportStats, ChukwaConfiguration conf, Configuration hconf) throws IOException { this.reportStats = reportStats; this.conf = conf; - this.hconf = hconf; this.statTimer = new Timer(); this.defaultProcessor = conf.get( "chukwa.demux.mapper.default.processor", "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor"); - Demux.jobConf = conf; log.info("hbase.zookeeper.quorum: " + hconf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + hconf.get(HConstants.ZOOKEEPER_CLIENT_PORT)); if (reportStats) { statTimer.schedule(new StatReportingTask(), 1000, 10 * 1000); } - output = new OutputCollector(); - reporter = new Reporter(); - if(conf.getBoolean("hbase.writer.verify.schema", false)) { - verifyHbaseSchema(); + output = new ArrayList<Put>(); + try { + reporter = new Reporter(); + } catch (NoSuchAlgorithmException e) { + throw new IOException("Can not register hashing algorithm."); } connection = HConnectionManager.createConnection(hconf); } @@ -125,83 +117,21 @@ public class HBaseWriter extends PipelineableWriter { public void init(Configuration conf) throws WriterException { } - private boolean verifyHbaseTable(HBaseAdmin admin, Table table) { - boolean status = false; - try { - if(admin.tableExists(table.name())) { - HTableDescriptor descriptor = admin.getTableDescriptor(table.name().getBytes()); - HColumnDescriptor[] columnDescriptors = descriptor.getColumnFamilies(); - for(HColumnDescriptor cd : columnDescriptors) { - if(cd.getNameAsString().equals(table.columnFamily())) { - log.info("Verified schema - table: "+table.name()+" column family: "+table.columnFamily()); - status = true; - } - } - } else { - throw new Exception("HBase table: "+table.name()+ " does not exist."); - } - } catch(Exception e) { - log.error(ExceptionUtil.getStackTrace(e)); - status = false; - } - return status; - } - - private void verifyHbaseSchema() { - log.debug("Verify Demux parser with HBase schema"); - boolean schemaVerified = true; - try { - HBaseAdmin admin = new HBaseAdmin(hconf); - List<Class> demuxParsers = ClassUtils.getClassesForPackage(conf.get("hbase.demux.package")); - for(Class<?> x : demuxParsers) { - if(x.isAnnotationPresent(Tables.class)) { - Tables list = x.getAnnotation(Tables.class); - for(Table table : list.annotations()) { - if(!verifyHbaseTable(admin, table)) { - schemaVerified = false; - log.warn("Validation failed - table: "+table.name()+" column family: "+table.columnFamily()+" does not exist."); - } - } - } else if(x.isAnnotationPresent(Table.class)) { - Table table = x.getAnnotation(Table.class); - if(!verifyHbaseTable(admin, table)) { - schemaVerified = false; - log.warn("Validation failed - table: "+table.name()+" column family: "+table.columnFamily()+" does not exist."); - } - } - } - } catch (Exception e) { - schemaVerified = false; - log.error(ExceptionUtil.getStackTrace(e)); - } - if(!schemaVerified) { - log.error("Hbase schema mismatch with demux parser."); - if(conf.getBoolean("hbase.writer.halt.on.schema.mismatch", true)) { - log.error("Exiting..."); - DaemonWatcher.bailout(-1); - } - } - } - @Override public CommitStatus 
add(List<Chunk> chunks) throws WriterException { CommitStatus rv = ChukwaWriter.COMMIT_OK; try { + HTableInterface hbase = connection.getTable(CHUKWA_TABLE); + HTableInterface meta = connection.getTable(CHUKWA_META_TABLE); for(Chunk chunk : chunks) { synchronized (this) { try { - Table table = findHBaseTable(chunk.getDataType()); - - if(table!=null) { - HTableInterface hbase = connection.getTable(table.name()); - MapProcessor processor = getProcessor(chunk.getDataType()); - processor.process(new ChukwaArchiveKey(), chunk, output, reporter); - hbase.put(output.getKeyValues()); - } else { - log.warn("Error finding HBase table for data type:"+chunk.getDataType()); - } - } catch (Exception e) { - log.warn(output.getKeyValues()); + AbstractProcessor processor = getProcessor(chunk.getDataType()); + processor.process(chunk, output, reporter); + hbase.put(output); + meta.put(reporter.getInfo()); + } catch (Throwable e) { + log.warn(output); log.warn(ExceptionUtil.getStackTrace(e)); } dataSize += chunk.getData().length; @@ -209,6 +139,7 @@ public class HBaseWriter extends PipelineableWriter { reporter.clear(); } } + hbase.close(); } catch (Exception e) { log.error(ExceptionUtil.getStackTrace(e)); throw new WriterException("Failed to store data to HBase."); @@ -219,31 +150,9 @@ public class HBaseWriter extends PipelineableWriter { return rv; } - public Table findHBaseTable(String dataType) throws UnknownRecordTypeException { - MapProcessor processor = getProcessor(dataType); - - Table table = null; - if(processor.getClass().isAnnotationPresent(Table.class)) { - return processor.getClass().getAnnotation(Table.class); - } else if(processor.getClass().isAnnotationPresent(Tables.class)) { - Tables tables = processor.getClass().getAnnotation(Tables.class); - for(Table t : tables.annotations()) { - table = t; - } - } - - return table; - } - - public String findHBaseColumnFamilyName(String dataType) - throws UnknownRecordTypeException { - Table table = findHBaseTable(dataType); - return table.columnFamily(); - } - - private MapProcessor getProcessor(String dataType) throws UnknownRecordTypeException { + private AbstractProcessor getProcessor(String dataType) throws UnknownRecordTypeException { String processorClass = findProcessor(conf.get(dataType, defaultProcessor), defaultProcessor); - return MapProcessorFactory.getProcessor(processorClass); + return ProcessorFactory.getProcessor(processorClass); } /** http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/OutputCollector.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/OutputCollector.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/OutputCollector.java deleted file mode 100644 index 352ca14..0000000 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/OutputCollector.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.chukwa.datacollection.writer.hbase; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; -import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; -import org.apache.hadoop.hbase.client.Put; - -public class OutputCollector implements - org.apache.hadoop.mapred.OutputCollector<ChukwaRecordKey, ChukwaRecord> { - - private List<Put> buffers; - private StringBuffer s = new StringBuffer(); - private byte[] rowKey = null; - private byte[] cf = null; - private long now = 0L; - - public OutputCollector() { - buffers = new ArrayList<Put>(); - } - - @Override - public void collect(ChukwaRecordKey key, ChukwaRecord value) throws IOException { - String[] keyParts = key.getKey().split("/"); - s.setLength(0); - s.append(keyParts[2]); - s.append("-"); - s.append(keyParts[1]); - - rowKey = s.toString().getBytes(); - - cf = key.getReduceType().getBytes(); - now = value.getTime(); - - Put kv = new Put(rowKey); - for(String field : value.getFields()) { - kv.add(cf, field.getBytes(), now , value.getValue(field).getBytes()); - } - buffers.add(kv); - } - - public List<Put> getKeyValues() { - return buffers; - } - - public void clear() { - s.setLength(0); - rowKey = null; - cf = null; - buffers.clear(); - } - -} http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java index ab1ce82..8b1674d 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/hbase/Reporter.java @@ -18,62 +18,91 @@ package org.apache.hadoop.chukwa.datacollection.writer.hbase; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.Counters.Counter; +import java.io.UnsupportedEncodingException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.List; -public class Reporter implements org.apache.hadoop.mapred.Reporter { +import org.apache.hadoop.hbase.client.Put; +import org.json.simple.JSONObject; +import org.mortbay.log.Log; - @Override - public Counter getCounter(Enum<?> arg0) { - // TODO Auto-generated method stub - return null; +public class Reporter { + private ArrayList<Put> meta = new ArrayList<Put>(); + private MessageDigest md5 = null; + + public Reporter() throws NoSuchAlgorithmException { + md5 = MessageDigest.getInstance("md5"); } - @Override - public Counter getCounter(String arg0, String arg1) { - // TODO Auto-generated method stub - return null; + public void putSource(String type, String source) { + byte[] value = getHash(source); + JSONObject json = new JSONObject(); + + try { + json.put("sig", new String(value, "UTF-8")); + json.put("type", 
"source"); + } catch (UnsupportedEncodingException e) { + Log.warn("Error encoding metadata."); + Log.warn(e); + } + put(type.getBytes(), source.getBytes(), json.toString().getBytes()); + } - @Override - public InputSplit getInputSplit() throws UnsupportedOperationException { - // TODO Auto-generated method stub - return null; + public void putMetric(String type, String metric) { + String buf = new StringBuilder(type).append(".").append(metric).toString(); + byte[] pk = getHash(buf); + + JSONObject json = new JSONObject(); + try { + json.put("sig", new String(pk, "UTF-8")); + json.put("type", "metric"); + } catch (UnsupportedEncodingException e) { + Log.warn("Error encoding metadata."); + Log.warn(e); + } + put(type.getBytes(), metric.getBytes(), json.toString().getBytes()); + } - @Override - public void incrCounter(Enum<?> arg0, long arg1) { - // TODO Auto-generated method stub - + public void put(String key, String source, String info) { + put(key.getBytes(), source.getBytes(), info.getBytes()); } - @Override - public void incrCounter(String arg0, String arg1, long arg2) { - // TODO Auto-generated method stub - + public void put(byte[] key, byte[] source, byte[] info) { + Put put = new Put(key); + put.add("k".getBytes(), source, info); + meta.add(put); } - @Override - public void setStatus(String arg0) { - // TODO Auto-generated method stub - + public void clear() { + meta.clear(); } - @Override - public void progress() { - // TODO Auto-generated method stub - + public List<Put> getInfo() { + return meta; } - @Override - public float getProgress() { - // TODO Auto-generated method stub - return 0.0f; + private byte[] getHash(String key) { + byte[] hash = new byte[3]; + System.arraycopy(md5.digest(key.getBytes()), 0, hash, 0, 3); + return hash; } - public void clear() { - // TODO Auto-generated method stub - + public void putClusterName(String type, String clusterName) { + byte[] value = getHash(clusterName); + JSONObject json = new JSONObject(); + + try { + json.put("sig", new String(value, "UTF-8")); + json.put("type", "cluster"); + } catch (UnsupportedEncodingException e) { + Log.warn("Error encoding metadata."); + Log.warn(e); + } + put(type.getBytes(), clusterName.getBytes(), json.toString().getBytes()); } } http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java b/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java index 82a5e57..d9c32d6 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java +++ b/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java @@ -18,15 +18,18 @@ package org.apache.hadoop.chukwa.datastore; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Calendar; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map.Entry; +import java.util.NavigableMap; import java.util.Set; +import java.util.TimeZone; import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -35,330 +38,293 @@ import org.apache.hadoop.chukwa.hicc.bean.HeatMapPoint; import org.apache.hadoop.chukwa.hicc.bean.Heatmap; 
import org.apache.hadoop.chukwa.hicc.bean.Series; import org.apache.hadoop.chukwa.util.ExceptionUtil; +import org.apache.hadoop.chukwa.util.HBaseUtil; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.ColumnPrefixFilter; -import org.apache.hadoop.hbase.filter.RowFilter; -import org.apache.hadoop.hbase.filter.RegexStringComparator; -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.client.Table; import org.apache.log4j.Logger; +import org.json.simple.JSONObject; +import org.json.simple.JSONValue; public class ChukwaHBaseStore { private static Configuration hconf = HBaseConfiguration.create(); - private static HConnection connection = null; - private static final int POOL_SIZE = 60; - static Logger log = Logger.getLogger(ChukwaHBaseStore.class); - - public static Series getSeries(String tableName, String rkey, String family, String column, - long startTime, long endTime, boolean filterByRowKey) { - StringBuilder seriesName = new StringBuilder(); - seriesName.append(rkey); - seriesName.append(":"); - seriesName.append(family); - seriesName.append(":"); - seriesName.append(column); + static Logger LOG = Logger.getLogger(ChukwaHBaseStore.class); + static byte[] COLUMN_FAMILY = "t".getBytes(); + static byte[] ANNOTATION_FAMILY = "a".getBytes(); + static byte[] KEY_NAMES = "k".getBytes(); + private static final String CHUKWA = "chukwa"; + private static final String CHUKWA_META = "chukwa_meta"; + private static long MILLISECONDS_IN_DAY = 86400000L; - Series series = new Series(seriesName.toString()); + /** + * Scan chukwa table for a particular metric group and metric name based on + * time ranges. + * + * @param metricGroup + * @param metric + * @param source + * @param startTime + * @param endTime + * @return + */ + public static Series getSeries(String metricGroup, String metric, + String source, long startTime, long endTime) { + String fullMetricName = new StringBuilder(metricGroup).append(".") + .append(metric).toString(); + return getSeries(fullMetricName, source, startTime, endTime); + } + + /** + * Scan chukwa table for a full metric name based on time ranges. 
+ * + * @param metric + * @param source + * @param startTime + * @param endTime + * @return + */ + public static Series getSeries(String metric, String source, long startTime, + long endTime) { + String seriesName = new StringBuilder(metric).append(":").append(source).toString(); + Series series = new Series(seriesName); try { - HTableInterface table = getHTableConnection().getTable(tableName); - Calendar c = Calendar.getInstance(); - c.setTimeInMillis(startTime); - c.set(Calendar.MINUTE, 0); - c.set(Calendar.SECOND, 0); - c.set(Calendar.MILLISECOND, 0); - String startRow = c.getTimeInMillis()+rkey; - Scan scan = new Scan(); - scan.addColumn(family.getBytes(), column.getBytes()); - scan.setStartRow(startRow.getBytes()); - scan.setTimeRange(startTime, endTime); - scan.setMaxVersions(); - if(filterByRowKey) { - RowFilter rf = new RowFilter(CompareOp.EQUAL, new - RegexStringComparator("[0-9]+-"+rkey+"$")); - scan.setFilter(rf); + // Swap start and end if the values are inverted. + if (startTime > endTime) { + long temp = endTime; + startTime = endTime; + endTime = temp; } - ResultScanner results = table.getScanner(scan); - Iterator<Result> it = results.iterator(); - // TODO: Apply discrete wavelet transformation to limit the output - // size to 1000 data points for graphing optimization. (i.e jwave) - while(it.hasNext()) { - Result result = it.next(); - String temp = new String(result.getValue(family.getBytes(), column.getBytes())); - double value = Double.parseDouble(temp); - // TODO: Pig Store function does not honor HBase timestamp, hence need to parse rowKey for timestamp. - String buf = new String(result.getRow()); - Long timestamp = Long.parseLong(buf.split("-")[0]); - // If Pig Store function can honor HBase timestamp, use the following line is better. - // series.add(result.getCellValue().getTimestamp(), value); - series.add(timestamp, value); + Connection connection = ConnectionFactory.createConnection(hconf); + Table table = connection.getTable(TableName.valueOf(CHUKWA)); + Scan scan = new Scan(); + Calendar c = Calendar.getInstance(TimeZone.getTimeZone("UTC")); + c.setTimeInMillis(startTime); + int startDay = c.get(Calendar.DAY_OF_YEAR); + c.setTimeInMillis(endTime); + int endDay = c.get(Calendar.DAY_OF_YEAR); + long currentDay = startTime; + for (int i = startDay; i <= endDay; i++) { + byte[] rowKey = HBaseUtil.buildKey(currentDay, metric, source); + // ColumnRangeFilter crf = new + // ColumnRangeFilter(Long.valueOf(startTime).toString().getBytes(), + // true, Long.valueOf(endTime).toString().getBytes(), true); + // scan.setFilter(crf); + scan.addFamily(COLUMN_FAMILY); + scan.setStartRow(rowKey); + scan.setStopRow(rowKey); + scan.setTimeRange(startTime, endTime); + scan.setBatch(10000); + + ResultScanner results = table.getScanner(scan); + Iterator<Result> it = results.iterator(); + // TODO: Apply discrete wavelet transformation to limit the output + // size to 1000 data points for graphing optimization. 
(i.e jwave) + while (it.hasNext()) { + Result result = it.next(); + for (KeyValue kv : result.raw()) { + byte[] key = kv.getQualifier(); + long timestamp = ByteBuffer.wrap(key).getLong(); + double value = Double + .parseDouble(new String(kv.getValue(), "UTF-8")); + series.add(timestamp, value); + } + } + results.close(); + currentDay = currentDay + (i * MILLISECONDS_IN_DAY); } - results.close(); table.close(); - } catch(Exception e) { - log.error(ExceptionUtil.getStackTrace(e)); + } catch (Exception e) { + LOG.error(ExceptionUtil.getStackTrace(e)); } return series; } - public static Set<String> getFamilyNames(String tableName) { + public static Set<String> getMetricNames(String metricGroup) { Set<String> familyNames = new CopyOnWriteArraySet<String>(); try { - HTableInterface table = getHTableConnection().getTable(tableName); - Set<byte[]> families = table.getTableDescriptor().getFamiliesKeys(); - for(byte[] name : families) { - familyNames.add(new String(name)); + Connection connection = ConnectionFactory.createConnection(hconf); + Table table = connection.getTable(TableName.valueOf(CHUKWA_META)); + Get get = new Get(metricGroup.getBytes()); + Result result = table.get(get); + for (KeyValue kv : result.raw()) { + JSONObject json = (JSONObject) JSONValue.parse(new String(kv.getValue(), "UTF-8")); + if (json.get("type").equals("metric")) { + familyNames.add(new String(kv.getQualifier(), "UTF-8")); + } } table.close(); - } catch(Exception e) { - log.error(ExceptionUtil.getStackTrace(e)); + connection.close(); + } catch (Exception e) { + LOG.error(ExceptionUtil.getStackTrace(e)); } return familyNames; - + } - - public static Set<String> getTableNames() { - Set<String> tableNames = new CopyOnWriteArraySet<String>(); + + public static Set<String> getMetricGroups() { + Set<String> metricGroups = new CopyOnWriteArraySet<String>(); try { - HBaseAdmin admin = new HBaseAdmin(hconf); - HTableDescriptor[] td = admin.listTables(); - for(HTableDescriptor table : td) { - tableNames.add(new String(table.getName())); + Connection connection = ConnectionFactory.createConnection(hconf); + Table table = connection.getTable(TableName.valueOf(CHUKWA_META)); + Scan scan = new Scan(); + scan.addFamily(KEY_NAMES); + ResultScanner rs = table.getScanner(scan); + Iterator<Result> it = rs.iterator(); + while (it.hasNext()) { + Result result = it.next(); + metricGroups.add(new String(result.getRow(), "UTF-8")); } - } catch(Exception e) { - log.error(ExceptionUtil.getStackTrace(e)); + table.close(); + connection.close(); + } catch (Exception e) { + LOG.error(ExceptionUtil.getStackTrace(e)); } - return tableNames; + return metricGroups; } - public static void getColumnNamesHelper(Set<String>columnNames, Iterator<Result> it) { - Result result = it.next(); - if(result!=null) { - List<Cell> cList = result.listCells(); - for(Cell cell : cList) { - columnNames.add(new String(CellUtil.cloneQualifier(cell))); - } - } - } - - public static Set<String> getColumnNames(String tableName, String family, long startTime, long endTime, boolean fullScan) { - Set<String> columnNames = new CopyOnWriteArraySet<String>(); + public static Set<String> getSourceNames(String dataType) { + Set<String> pk = new HashSet<String>(); try { - HTableInterface table = getHTableConnection().getTable(tableName); + Connection connection = ConnectionFactory.createConnection(hconf); + Table table = connection.getTable(TableName.valueOf(CHUKWA_META)); Scan scan = new Scan(); - if(!fullScan) { - // Take sample columns of the recent time. 
- StringBuilder temp = new StringBuilder(); - temp.append(endTime-300000L); - scan.setStartRow(temp.toString().getBytes()); - temp.setLength(0); - temp.append(endTime); - scan.setStopRow(temp.toString().getBytes()); - } else { - StringBuilder temp = new StringBuilder(); - temp.append(startTime); - scan.setStartRow(temp.toString().getBytes()); - temp.setLength(0); - temp.append(endTime); - scan.setStopRow(temp.toString().getBytes()); - } - scan.addFamily(family.getBytes()); - ResultScanner results = table.getScanner(scan); - Iterator<Result> it = results.iterator(); - if(fullScan) { - while(it.hasNext()) { - getColumnNamesHelper(columnNames, it); - } - } else { - getColumnNamesHelper(columnNames, it); + scan.addFamily(KEY_NAMES); + ResultScanner rs = table.getScanner(scan); + Iterator<Result> it = rs.iterator(); + while (it.hasNext()) { + Result result = it.next(); + for (Cell cell : result.rawCells()) { + JSONObject json = (JSONObject) JSONValue.parse(new String(cell.getValue(), "UTF-8")); + if (json.get("type").equals("source")) { + pk.add(new String(cell.getQualifier(), "UTF-8")); + } + } } - results.close(); table.close(); - } catch(Exception e) { - log.error(ExceptionUtil.getStackTrace(e)); + connection.close(); + } catch (Exception e) { + LOG.error(ExceptionUtil.getStackTrace(e)); } - return columnNames; + return pk; } - - public static Set<String> getRowNames(String tableName, String family, String qualifier, long startTime, long endTime, boolean fullScan) { - Set<String> rows = new HashSet<String>(); + + public static Heatmap getHeatmap(String metricGroup, String metric, + long startTime, long endTime, double max, double scale, int height) { + final long MINUTE = TimeUnit.MINUTES.toMillis(1); + Heatmap heatmap = new Heatmap(); + Set<String> sources = getSourceNames(metricGroup); + Set<String> metrics = getMetricNames(metricGroup); + List<Get> series = new ArrayList<Get>(); + String fullName = new StringBuilder(metricGroup).append(".").append(metric).toString(); try { - HTableInterface table = getHTableConnection().getTable(tableName); - Scan scan = new Scan(); - scan.addColumn(family.getBytes(), qualifier.getBytes()); - if(!fullScan) { - // Take sample columns of the recent time. 
- StringBuilder temp = new StringBuilder(); - temp.append(endTime-300000L); - scan.setStartRow(temp.toString().getBytes()); - temp.setLength(0); - temp.append(endTime); - scan.setStopRow(temp.toString().getBytes()); - } else { - StringBuilder temp = new StringBuilder(); - temp.append(startTime); - scan.setStartRow(temp.toString().getBytes()); - temp.setLength(0); - temp.append(endTime); - scan.setStopRow(temp.toString().getBytes()); + Connection connection = ConnectionFactory.createConnection(hconf); + Table table = connection.getTable(TableName.valueOf(CHUKWA)); + Calendar c = Calendar.getInstance(TimeZone.getTimeZone("UTC")); + c.setTimeInMillis(startTime); + int startDay = c.get(Calendar.DAY_OF_YEAR); + c.setTimeInMillis(endTime); + int endDay = c.get(Calendar.DAY_OF_YEAR); + long currentDay = startTime; + for (int i = startDay; i <= endDay; i++) { + for (String m : metrics) { + if (m.startsWith(fullName)) { + for (String source : sources) { + byte[] rowKey = HBaseUtil.buildKey(currentDay, m, source); + Get serie = new Get(rowKey); + serie.addFamily(COLUMN_FAMILY); + serie.setTimeRange(startTime, endTime); + series.add(serie); + } + } + } + currentDay = currentDay + (i * MILLISECONDS_IN_DAY); } - ResultScanner results = table.getScanner(scan); - Iterator<Result> it = results.iterator(); - while(it.hasNext()) { - Result result = it.next(); - String buffer = new String(result.getRow()); - String[] parts = buffer.split("-", 2); - if(!rows.contains(parts[1])) { - rows.add(parts[1]); - } + Result[] rs = table.get(series); + int index = 0; + // Series display in y axis + int y = 0; + HashMap<String, Integer> keyMap = new HashMap<String, Integer>(); + for (Result result : rs) { + for(Cell cell : result.rawCells()) { + byte[] dest = new byte[5]; + System.arraycopy(cell.getRow(), 3, dest, 0, 5); + String source = new String(dest); + long time = cell.getTimestamp(); + // Time display in x axis + int x = (int) ((time - startTime) / MINUTE); + if (keyMap.containsKey(source)) { + y = keyMap.get(source); + } else { + keyMap.put(source, new Integer(index)); + y = index; + index++; + } + double v = Double.parseDouble(new String(CellUtil.cloneValue(cell))); + heatmap.put(x, y, v); + if (v > max) { + max = v; + } + } } - results.close(); table.close(); - } catch(Exception e) { - log.error(ExceptionUtil.getStackTrace(e)); + int radius = height / index; + // Usually scale max from 0 to 100 for visualization + heatmap.putMax(scale); + for (HeatMapPoint point : heatmap.getHeatmap()) { + double round = point.count / max * scale; + round = Math.round(round * 100.0) / 100.0; + point.put(point.x, point.y * radius, round); + } + heatmap.putRadius(radius); + heatmap.putSeries(index); + } catch (IOException e) { + LOG.error(ExceptionUtil.getStackTrace(e)); } - return rows; + return heatmap; } - - public static Set<String> getHostnames(String cluster, long startTime, long endTime, boolean fullScan) { - return getRowNames("SystemMetrics","system", "csource", startTime, endTime, fullScan); - } - + + /** + * Scan chukwa table and find cluster tag from annotation column family from a + * range of entries. 
+ * + * @param startTime + * @param endTime + * @return + */ public static Set<String> getClusterNames(long startTime, long endTime) { - String tableName = "SystemMetrics"; - String family = "system"; - String column = "ctags"; Set<String> clusters = new HashSet<String>(); - Pattern p = Pattern.compile("\\s*cluster=\"(.*?)\""); try { - HTableInterface table = getHTableConnection().getTable(tableName); + Connection connection = ConnectionFactory.createConnection(hconf); + Table table = connection.getTable(TableName.valueOf(CHUKWA_META)); Scan scan = new Scan(); - scan.addColumn(family.getBytes(), column.getBytes()); - scan.setTimeRange(startTime, endTime); - ResultScanner results = table.getScanner(scan); - Iterator<Result> it = results.iterator(); - while(it.hasNext()) { + scan.addFamily(KEY_NAMES); + ResultScanner rs = table.getScanner(scan); + Iterator<Result> it = rs.iterator(); + while (it.hasNext()) { Result result = it.next(); - String buffer = new String(result.getValue(family.getBytes(), column.getBytes())); - Matcher m = p.matcher(buffer); - if(m.matches()) { - clusters.add(m.group(1)); + for (Cell cell : result.rawCells()) { + JSONObject json = (JSONObject) JSONValue.parse(new String(cell.getValue(), "UTF-8")); + if (json.get("type").equals("cluster")) { + clusters.add(new String(cell.getQualifier(), "UTF-8")); + } } } - results.close(); table.close(); - } catch(Exception e) { - log.error(ExceptionUtil.getStackTrace(e)); + connection.close(); + } catch (Exception e) { + LOG.error(ExceptionUtil.getStackTrace(e)); } return clusters; } - - public static Heatmap getHeatmap(String tableName, String family, String column, - long startTime, long endTime, double max, double scale, int height) { - final long MINUTE = TimeUnit.MINUTES.toMillis(1); - Heatmap heatmap = new Heatmap(); - - try { - HTableInterface table = getHTableConnection().getTable(tableName); - Scan scan = new Scan(); - ColumnPrefixFilter cpf = new ColumnPrefixFilter(column.getBytes()); - scan.addFamily(family.getBytes()); - scan.setFilter(cpf); - scan.setTimeRange(startTime, endTime); - scan.setBatch(10000); - ResultScanner results = table.getScanner(scan); - Iterator<Result> it = results.iterator(); - int index = 0; - // Series display in y axis - int y = 0; - HashMap<String, Integer> keyMap = new HashMap<String, Integer>(); - while(it.hasNext()) { - Result result = it.next(); - List<Cell> cList = result.listCells(); - for(Cell cell : cList) { - String key = parseRowKey(result.getRow()); - StringBuilder tmp = new StringBuilder(); - tmp.append(key); - tmp.append(":"); - tmp.append(new String(CellUtil.cloneQualifier(cell))); - String seriesName = tmp.toString(); - long time = parseTime(result.getRow()); - // Time display in x axis - int x = (int) ((time - startTime) / MINUTE); - if(keyMap.containsKey(seriesName)) { - y = keyMap.get(seriesName); - } else { - keyMap.put(seriesName, new Integer(index)); - y = index; - index++; - } - double v = Double.parseDouble(new String(CellUtil.cloneValue(cell))); - heatmap.put(x, y, v); - if(v > max) { - max = v; - } - } - } - results.close(); - table.close(); - int radius = height / index; - // Usually scale max from 0 to 100 for visualization - heatmap.putMax(scale); - for(HeatMapPoint point : heatmap.getHeatmap()) { - double round = point.count / max * scale; - round = Math.round(round * 100.0) / 100.0; - point.put(point.x, point.y * radius, round); - } - heatmap.putRadius(radius); - heatmap.putSeries(index); - } catch (IOException e) { - log.error(ExceptionUtil.getStackTrace(e)); - } 
- return heatmap; - } - - private static String parseRowKey(byte[] row) { - String key = new String(row); - String[] parts = key.split("-", 2); - return parts[1]; - } - - private static long parseTime(byte[] row) { - String key = new String(row); - String[] parts = key.split("-", 2); - long time = Long.parseLong(parts[0]); - return time; - } - - private static HConnection getHTableConnection() { - if(connection == null) { - synchronized(ChukwaHBaseStore.class) { - try { - ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE); - /* Set the hbase client properties to unblock immediately in case - * hbase goes down. This will ensure we timeout on socket connection to - * hbase early. - */ - hconf.setInt("hbase.client.operation.timeout", 60000); - hconf.setLong("hbase.client.pause", 1000); - hconf.setInt("hbase.client.retries.number", 1); - connection = HConnectionManager.createConnection(hconf, pool); - }catch(IOException e) { - log.error("Unable to obtain connection to HBase " + e.getMessage()); - e.printStackTrace(); - } - } - } - return connection; - } } http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java deleted file mode 100644 index 6b60b3a..0000000 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DFInvalidRecord.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.chukwa.extraction.demux.processor.mapper; - - -public class DFInvalidRecord extends Exception { - - /** - * - */ - private static final long serialVersionUID = 1254238125122522523L; - - public DFInvalidRecord() { - } - - public DFInvalidRecord(String arg0) { - super(arg0); - } - - public DFInvalidRecord(Throwable arg0) { - super(arg0); - } - - public DFInvalidRecord(String arg0, Throwable arg1) { - super(arg0, arg1); - } - -} http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java deleted file mode 100644 index ef2bddd..0000000 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Df.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.chukwa.extraction.demux.processor.mapper; - - -import java.io.IOException; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Date; - -import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table; -import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; -import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; -import org.apache.log4j.Logger; - -@Table(name="SystemMetrics",columnFamily="Disk") -public class Df extends AbstractProcessor { - static Logger log = Logger.getLogger(Df.class); - - private static final String[] headerSplitCols = { "Filesystem", "1K-blocks", - "Used", "Available", "Use%", "Mounted", "on" }; - private static final String[] headerCols = { "Filesystem", "1K-blocks", - "Used", "Available", "Use%", "Mounted on" }; - private SimpleDateFormat sdf = null; - - public Df() { - sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm"); - } - - @Override - protected void parse(String recordEntry, - OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) - throws Throwable { - - try { - String dStr = recordEntry.substring(0, 23); - int start = 24; - int idx = recordEntry.indexOf(' ', start); - // String level = recordEntry.substring(start, idx); - start = idx + 1; - idx = recordEntry.indexOf(' ', start); - // String className = recordEntry.substring(start, idx-1); - String body = recordEntry.substring(idx + 1); - - Date d = sdf.parse(dStr); - String[] lines = body.split("\n"); - - String[] outputCols = lines[0].substring(lines[0].indexOf("Filesystem")).split("[\\s]++"); - - if (outputCols.length != headerSplitCols.length - || outputCols[0].intern() != headerSplitCols[0].intern() - || outputCols[1].intern() != headerSplitCols[1].intern() - || outputCols[2].intern() != headerSplitCols[2].intern() - || outputCols[3].intern() != headerSplitCols[3].intern() - || outputCols[4].intern() != headerSplitCols[4].intern() - || outputCols[5].intern() != headerSplitCols[5].intern() - || outputCols[6].intern() != headerSplitCols[6].intern()) { - throw new DFInvalidRecord("Wrong output format (header) [" - + recordEntry + "]"); - } - - String[] values = null; - - // Data - ChukwaRecord record = null; - - for (int i = 1; i < lines.length; i++) { - values = lines[i].split("[\\s]++"); - key = new ChukwaRecordKey(); - record = new ChukwaRecord(); - this.buildGenericRecord(record, null, d.getTime(), "Df"); - - record.add(headerCols[0], values[0]); - record.add(headerCols[1], values[1]); - record.add(headerCols[2], values[2]); - record.add(headerCols[3], values[3]); - record.add(headerCols[4], values[4] - .substring(0, values[4].length() - 1)); // Remove % - record.add(headerCols[5], values[5]); - - output.collect(key, record); - } - - // log.info("DFProcessor output 1 DF record"); - } catch (ParseException e) { - e.printStackTrace(); - log.warn("Wrong format in DFProcessor [" + recordEntry + "]", e); - throw e; - } catch (IOException e) { - e.printStackTrace(); - log.warn("Unable to collect output in DFProcessor [" + recordEntry + "]", - e); - throw e; - } catch (DFInvalidRecord e) { - e.printStackTrace(); - log.warn("Wrong format in DFProcessor [" + recordEntry + "]", e); - throw e; - } - } -} http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java 
---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java index 769c054..f671049 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java @@ -50,22 +50,9 @@ import org.json.simple.JSONValue; @Table(name="Hadoop",columnFamily="mapred_tasktracker"), @Table(name="Hadoop",columnFamily="rpc_metrics") }) -public class HadoopMetricsProcessor extends AbstractProcessor { -// public static final String jvm = "jvm_metrics"; -// public static final String mapred = "mapred_metrics"; -// public static final String dfs = "dfs_metrics"; -// public static final String namenode = "dfs_namenode"; -// public static final String fsdir = "dfs_FSDirectory"; -// public static final String fsname = "dfs_FSNamesystem"; -// public static final String datanode = "dfs_datanode"; -// public static final String jobtracker = "mapred_jobtracker"; -// public static final String shuffleIn = "mapred_shuffleInput"; -// public static final String shuffleOut = "mapred_shuffleOutput"; -// public static final String tasktracker = "mapred_tasktracker"; -// public static final String mr = "mapred_job"; - +public class HadoopMetricsProcessor extends AbstractProcessor { static Logger log = Logger.getLogger(HadoopMetricsProcessor.class); - static final String chukwaTimestampField = "chukwa_timestamp"; + static final String chukwaTimestampField = "timestamp"; static final String contextNameField = "contextName"; static final String recordNameField = "recordName"; http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java deleted file mode 100644 index 452ddb8..0000000 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Iostat.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.chukwa.extraction.demux.processor.mapper; - - -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table; -import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; -import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; -import org.apache.log4j.Logger; - -@Table(name="SystemMetrics",columnFamily="SystemMetrics") -public class Iostat extends AbstractProcessor { - static Logger log = Logger.getLogger(Iostat.class); - public final String recordType = this.getClass().getName(); - - private static String regex = "([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (.*?) (.*?): (.*?) \\((.*?)\\)"; - private static Pattern p = null; - - private Matcher matcher = null; - private SimpleDateFormat sdf = null; - - public Iostat() { - // TODO move that to config - sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm"); - p = Pattern.compile(regex); - } - - @Override - protected void parse(String recordEntry, - OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) - throws Throwable { - - log.debug("Iostat record: [" + recordEntry + "] type[" - + chunk.getDataType() + "]"); - int i = 0; - - matcher = p.matcher(recordEntry); - while (matcher.find()) { - log.debug("Iostat Processor Matches"); - - try { - Date d = sdf.parse(matcher.group(1).trim()); - - String[] lines = recordEntry.split("\n"); - String[] headers = null; - for (int skip = 0; skip < 2; skip++) { - i++; - while (i < lines.length && lines[i].indexOf("avg-cpu") < 0) { - // Skip the first output because the numbers are averaged from - // system boot up - log.debug("skip line:" + lines[i]); - i++; - } - } - while (i < lines.length) { - ChukwaRecord record = null; - - if (lines[i].indexOf("avg-cpu") >= 0 - || lines[i].indexOf("Device") >= 0) { - headers = parseHeader(lines[i]); - i++; - } - String data[] = parseData(lines[i]); - if (headers[0].equals("avg-cpu:")) { - log.debug("Matched CPU-Utilization"); - record = new ChukwaRecord(); - key = new ChukwaRecordKey(); - buildGenericRecord(record, null, d.getTime(), "SystemMetrics"); - } else if (headers[0].equals("Device:")) { - log.debug("Matched Iostat"); - record = new ChukwaRecord(); - key = new ChukwaRecordKey(); - buildGenericRecord(record, null, d.getTime(), "SystemMetrics"); - } else { - log.debug("No match:" + headers[0]); - } - if (record != null) { - int j = 0; - log.debug("Data Length: " + data.length); - while (j < data.length) { - log.debug("header:" + headers[j] + " data:" + data[j]); - if (!headers[j].equals("avg-cpu:")) { - try { - // Filter out overflow values for older linux systems - long x=Long.parseLong(data[j]); - if(x<100000000000L) { - record.add(headers[j],data[j]); - } - } catch(NumberFormatException ex) { - record.add(headers[j],data[j]); - } - } - j++; - } - record.setTime(d.getTime()); - if (data.length > 3) { - output.collect(key, record); - } - } - i++; - } - // End of parsing - } catch (Exception e) { - e.printStackTrace(); - throw e; - } - } - } - - public String[] parseHeader(String header) { - String[] headers = header.split("\\s+"); - return headers; - } - - public String[] parseData(String dataLine) { - String[] data = dataLine.split("\\s+"); - return data; - } - - public String getDataType() { - return recordType; - } -} 
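A note on how metric names stay discoverable under the new schema: the rewritten Reporter (earlier in this patch) registers every metric group, metric, source, and cluster name in the "chukwa_meta" table under family "k", storing a small JSON value whose "sig" field holds the first three bytes of an MD5 digest of the name. The sketch below only illustrates that signature computation; the metric name is an illustrative value and the class is not part of the patch:

    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;

    public class MetricSignatureSketch {
      public static void main(String[] args) throws NoSuchAlgorithmException {
        // Same hashing as Reporter.getHash(): first 3 bytes of an MD5 digest.
        MessageDigest md5 = MessageDigest.getInstance("md5");
        String fullMetricName = "SystemMetrics.cpu.combined"; // illustrative only
        byte[] sig = new byte[3];
        System.arraycopy(md5.digest(fullMetricName.getBytes()), 0, sig, 0, 3);
        StringBuilder hex = new StringBuilder();
        for (byte b : sig) {
          hex.append(String.format("%02x", b));
        }
        System.out.println(fullMetricName + " -> signature 0x" + hex);
      }
    }
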
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobSummary.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobSummary.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobSummary.java index 0be8615..dbf4baa 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobSummary.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobSummary.java @@ -39,7 +39,7 @@ import org.apache.log4j.Logger; }) public class JobSummary extends AbstractProcessor { static Logger log = Logger.getLogger(JobSummary.class); - static final String chukwaTimestampField = "chukwa_timestamp"; + static final String chukwaTimestampField = "timestamp"; static final String contextNameField = "contextName"; static final String recordNameField = "recordName"; http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java index 9c8a3ae..16b12f3 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java @@ -53,7 +53,7 @@ public class Log4JMetricsContextProcessor extends AbstractProcessor { JSONObject json = new JSONObject(log.getBody()); // round timestamp - timestamp = json.getLong("chukwa_timestamp"); + timestamp = json.getLong("timestamp"); timestamp = (timestamp / 60000) * 60000; // get record type http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java deleted file mode 100644 index 93252d4..0000000 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsInvalidEntry.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.chukwa.extraction.demux.processor.mapper; - - -public class PbsInvalidEntry extends Exception { - - /** - * - */ - private static final long serialVersionUID = 9154096600390233023L; - - public PbsInvalidEntry() { - } - - public PbsInvalidEntry(String message) { - super(message); - } - - public PbsInvalidEntry(Throwable cause) { - super(cause); - } - - public PbsInvalidEntry(String message, Throwable cause) { - super(message, cause); - } - -} http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodes.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodes.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodes.java deleted file mode 100644 index 23dc74c..0000000 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/PbsNodes.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.chukwa.extraction.demux.processor.mapper; - - -import java.io.IOException; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Date; -import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; -import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; -import org.apache.log4j.Logger; - -public class PbsNodes extends AbstractProcessor { - static Logger log = Logger.getLogger(PbsNodes.class); - - private static final String rawPBSRecordType = "PbsNodes"; - private static final String machinePBSRecordType = "MachinePbsNodes"; - private SimpleDateFormat sdf = null; - - public PbsNodes() { - // TODO move that to config - sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm"); - } - - @Override - protected void parse(String recordEntry, - OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) - throws Throwable { - - // log.info("PbsNodeProcessor record: [" + recordEntry + "] type[" + - // chunk.getDataType() + "]"); - - StringBuilder sb = new StringBuilder(); - int i = 0; - String nodeActivityStatus = null; - StringBuilder sbFreeMachines = new StringBuilder(); - StringBuilder sbUsedMachines = new StringBuilder(); - StringBuilder sbDownMachines = new StringBuilder(); - - int totalFreeNode = 0; - int totalUsedNode = 0; - int totalDownNode = 0; - - String body = null; - ChukwaRecord record = null; - - try { - - String dStr = recordEntry.substring(0, 23); - int start = 24; - int idx = recordEntry.indexOf(' ', start); - // String level = recordEntry.substring(start, idx); - start = idx + 1; - idx = recordEntry.indexOf(' ', start); - // String className = recordEntry.substring(start, idx-1); - body = recordEntry.substring(idx + 1); - - Date d = sdf.parse(dStr); - - String[] lines = body.split("\n"); - while (i < lines.length) { - while ((i < lines.length) && (lines[i].trim().length() > 0)) { - sb.append(lines[i].trim()).append("\n"); - i++; - } - - if ((i < lines.length) && (lines[i].trim().length() > 0)) { - throw new PbsInvalidEntry(recordEntry); - } - - // Empty line - i++; - - if (sb.length() > 0) { - body = sb.toString(); - // Process all entries for a machine - // System.out.println("=========>>> Record [" + body+ "]"); - - record = new ChukwaRecord(); - key = new ChukwaRecordKey(); - - buildGenericRecord(record, null, d.getTime(), machinePBSRecordType); - parsePbsRecord(body, record); - - // Output PbsNode record for 1 machine - output.collect(key, record); - // log.info("PbsNodeProcessor output 1 sub-record"); - - // compute Node Activity information - nodeActivityStatus = record.getValue("state"); - if (nodeActivityStatus != null) { - if (nodeActivityStatus.equals("free")) { - totalFreeNode++; - sbFreeMachines.append(record.getValue("Machine")).append(","); - } else if (nodeActivityStatus.equals("job-exclusive")) { - totalUsedNode++; - sbUsedMachines.append(record.getValue("Machine")).append(","); - } else { - totalDownNode++; - sbDownMachines.append(record.getValue("Machine")).append(","); - } - } - sb = new StringBuilder(); - } - } - - // End of parsing - - record = new ChukwaRecord(); - key = new ChukwaRecordKey(); - buildGenericRecord(record, null, d.getTime(), "NodeActivity"); - - record.setTime(d.getTime()); - record.add("used", "" + totalUsedNode); - record.add("free", "" + totalFreeNode); - record.add("down", "" + totalDownNode); - record.add("usedMachines", sbUsedMachines.toString()); - 
record.add("freeMachines", sbFreeMachines.toString()); - record.add("downMachines", sbDownMachines.toString()); - - output.collect(key, record); - // log.info("PbsNodeProcessor output 1 NodeActivity"); - } catch (ParseException e) { - e.printStackTrace(); - log.warn("Wrong format in PbsNodesProcessor [" + recordEntry + "]", e); - throw e; - } catch (IOException e) { - log.warn("Unable to collect output in PbsNodesProcessor [" + recordEntry - + "]", e); - e.printStackTrace(); - throw e; - } catch (PbsInvalidEntry e) { - log.warn("Wrong format in PbsNodesProcessor [" + recordEntry + "]", e); - e.printStackTrace(); - throw e; - } - - } - - protected static void parsePbsRecord(String recordLine, ChukwaRecord record) { - int i = 0; - String[] lines = recordLine.split("\n"); - record.add("Machine", lines[0]); - - i++; - String[] data = null; - while (i < lines.length) { - data = extractFields(lines[i]); - record.add(data[0].trim(), data[1].trim()); - if (data[0].trim().equalsIgnoreCase("status")) { - parseStatusField(data[1].trim(), record); - } - i++; - } - } - - protected static void parseStatusField(String statusField, ChukwaRecord record) { - String[] data = null; - String[] subFields = statusField.trim().split(","); - for (String subflied : subFields) { - data = extractFields(subflied); - record.add("status-" + data[0].trim(), data[1].trim()); - } - } - - static String[] extractFields(String line) { - String[] args = new String[2]; - int index = line.indexOf("="); - args[0] = line.substring(0, index); - args[1] = line.substring(index + 1); - - return args; - } - - public String getDataType() { - return PbsNodes.rawPBSRecordType; - } - -} http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Ps.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Ps.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Ps.java deleted file mode 100644 index 7b10edd..0000000 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Ps.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.chukwa.extraction.demux.processor.mapper; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map.Entry; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table; -import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; -import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; -import org.apache.log4j.Logger; - -@Table(name="SystemMetrics",columnFamily="Ps") -public class Ps extends AbstractProcessor { - static Logger log = Logger.getLogger(Ps.class); - public static final String reduceType = "Ps"; - - @Override - protected void parse(String recordEntry, - OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) - throws Throwable { - LogEntry log = new LogEntry(recordEntry); - PsOutput ps = new PsOutput(log.getBody()); - for (HashMap<String, String> processInfo : ps.getProcessList()) { - key = new ChukwaRecordKey(); - ChukwaRecord record = new ChukwaRecord(); - this.buildGenericRecord(record, null, log.getDate().getTime(), reduceType); - for (Entry<String, String> entry : processInfo.entrySet()) { - record.add(entry.getKey(), entry.getValue()); - } - output.collect(key, record); - } - } - - public static class PsOutput { - - // processes info - private ArrayList<HashMap<String, String>> recordList = new ArrayList<HashMap<String, String>>(); - - public PsOutput(String psCmdOutput) throws InvalidPsRecord { - if (psCmdOutput == null || psCmdOutput.length() == 0) - return; - - String[] lines = psCmdOutput.split("[\n\r]+"); - - // at least two lines - if (lines.length < 2) - return; - - // header - ArrayList<String> header = new ArrayList<String>(); - Matcher matcher = Pattern.compile("[^ ^\t]+").matcher(lines[0]); - while (matcher.find()) { - header.add(matcher.group(0)); - } - if (!header.get(header.size() - 1).equals("CMD")) { - throw new InvalidPsRecord("CMD must be the last column"); - } - - // records - boolean foundInitCmd = false; - for (int line = 1; line < lines.length; line++) { - HashMap<String, String> record = new HashMap<String, String>(); - recordList.add(record); - - matcher = Pattern.compile("[^ ^\t]+").matcher(lines[line]); - for (int index = 0; index < header.size(); index++) { - String key = header.get(index); - matcher.find(); - if (!key.equals("CMD")) { - String value = matcher.group(0); - /** - * For STARTED column, it could be in two formats: "MMM dd" or - * "hh:mm:ss". If we use ' ' as the delimiter, we must read twice to - * the date if it's with "MMM dd" format. - */ - if (key.equals("STARTED")) { - char c = value.charAt(0); - if (c < '0' || c > '9') { - matcher.find(); - value += matcher.group(0); - } - } - record.put(key, value); - } else { - // reached the cmd part. 
all remains should be put - // together as the command - String value = lines[line].substring(matcher.start()); - record.put(key, value); - if (!foundInitCmd) - foundInitCmd = value.startsWith("init"); - break; - } - } - } - if (!foundInitCmd) - throw new InvalidPsRecord("Did not find 'init' cmd"); - } - - public ArrayList<HashMap<String, String>> getProcessList() { - return recordList; - } - } - - public static class InvalidPsRecord extends Exception { - private static final long serialVersionUID = 1L; - - public InvalidPsRecord() { - } - - public InvalidPsRecord(String arg0) { - super(arg0); - } - - public InvalidPsRecord(Throwable arg0) { - super(arg0); - } - - public InvalidPsRecord(String arg0, Throwable arg1) { - super(arg0, arg1); - } - } -} http://git-wip-us.apache.org/repos/asf/chukwa/blob/7ae68398/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java deleted file mode 100644 index 3573b4c..0000000 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Sar.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.chukwa.extraction.demux.processor.mapper; - - -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table; -import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; -import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; -import org.apache.log4j.Logger; - -@Table(name="SystemMetrics",columnFamily="SystemMetrics") -public class Sar extends AbstractProcessor { - static Logger log = Logger.getLogger(Sar.class); - public static final String reduceType = "SystemMetrics"; - public final String recordType = this.getClass().getName(); - - private static String regex = "([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}) (.*?) (.*?): (.*?) 
\\((.*?)\\)"; - private static Pattern p = null; - - private Matcher matcher = null; - private SimpleDateFormat sdf = null; - - public Sar() { - // TODO move that to config - sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm"); - p = Pattern.compile(regex); - } - - @Override - protected void parse(String recordEntry, - OutputCollector<ChukwaRecordKey, ChukwaRecord> output, Reporter reporter) - throws Throwable { - - log.debug("Sar record: [" + recordEntry + "] type[" + chunk.getDataType() - + "]"); - int i = 0; - - // String logLevel = null; - // String className = null; - - matcher = p.matcher(recordEntry); - while (matcher.find()) { - log.debug("Sar Processor Matches"); - - try { - Date d = sdf.parse(matcher.group(1).trim()); - - // logLevel = matcher.group(2); - // className = matcher.group(3); - - // TODO create a more specific key structure - // part of ChukwaArchiveKey + record index if needed - key.setKey("" + d.getTime()); - - String[] lines = recordEntry.split("\n"); - - String[] headers = null; - while (i < (lines.length - 1) && lines[i + 1].indexOf("Average:") < 0) { - // Skip to the average lines - log.debug("skip:" + lines[i]); - i++; - } - while (i < lines.length) { - ChukwaRecord record = null; - if (lines[i].equals("")) { - i++; - headers = parseHeader(lines[i]); - i++; - } - String data[] = parseData(lines[i]); - - // FIXME please validate this - if (headers[1].equals("IFACE") && headers[2].equals("rxpck/s")) { - log.debug("Matched Sar-Network"); - - record = new ChukwaRecord(); - key = new ChukwaRecordKey(); - this.buildGenericRecord(record, null, d.getTime(), reduceType); - } else if (headers[1].equals("IFACE") && headers[2].equals("rxerr/s")) { - log.debug("Matched Sar-Network"); - - record = new ChukwaRecord(); - key = new ChukwaRecordKey(); - this.buildGenericRecord(record, null, d.getTime(), reduceType); - } else if (headers[1].equals("kbmemfree")) { - log.debug("Matched Sar-Memory"); - - record = new ChukwaRecord(); - key = new ChukwaRecordKey(); - this.buildGenericRecord(record, null, d.getTime(), reduceType); - } else if (headers[1].equals("totsck")) { - log.debug("Matched Sar-NetworkSockets"); - - record = new ChukwaRecord(); - key = new ChukwaRecordKey(); - this.buildGenericRecord(record, null, d.getTime(), reduceType); - } else if (headers[1].equals("runq-sz")) { - log.debug("Matched Sar-LoadAverage"); - - record = new ChukwaRecord(); - key = new ChukwaRecordKey(); - this.buildGenericRecord(record, null, d.getTime(), reduceType); - } else { - log.debug("No match:" + headers[1] + " " + headers[2]); - } - if (record != null) { - int j = 0; - - log.debug("Data Length: " + data.length); - while (j < data.length) { - log.debug("header:" + headers[j] + " data:" + data[j]); - - //special case code to work around peculiar versions of Sar - if(headers[j].equals("rxkB/s")) { - record.add("rxbyt/s", Double.toString(Double.parseDouble(data[j]) * 1000)); - } else if(headers[j].equals("txkB/s")){ - record.add("txbyt/s", Double.toString(Double.parseDouble(data[j]) * 1000)); - } else if (!headers[j].equals("Average:")) { //common case - record.add(headers[j], data[j]); - } - j++; - } - - output.collect(key, record); - } - i++; - } - // End of parsing - } catch (Exception e) { - e.printStackTrace(); - throw e; - } - } - } - - public String[] parseHeader(String header) { - String[] headers = header.split("\\s+"); - return headers; - } - - public String[] parseData(String dataLine) { - String[] data = dataLine.split("\\s+"); - return data; - } - - public String getDataType() { - 
return recordType; - } -} \ No newline at end of file
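The removed Sar processor above works around newer sysstat output by renaming throughput columns reported in kB/s back to the byte-based names the rest of the pipeline expects (rxkB/s becomes rxbyt/s, txkB/s becomes txbyt/s, scaled by 1000) and by skipping the leading "Average:" label column. A minimal sketch of that column handling follows; the class and method names are hypothetical.

import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: mirrors the special-case column handling in the
// removed Sar processor.
public class SarColumnSketch {

  // Renames kB/s columns to their byte/s equivalents (scaled by 1000) and
  // skips the "Average:" label column; everything else is copied as-is.
  static void addColumn(Map<String, String> record, String header, String value) {
    if (header.equals("rxkB/s")) {
      record.put("rxbyt/s", Double.toString(Double.parseDouble(value) * 1000));
    } else if (header.equals("txkB/s")) {
      record.put("txbyt/s", Double.toString(Double.parseDouble(value) * 1000));
    } else if (!header.equals("Average:")) {
      record.put(header, value);
    }
  }

  public static void main(String[] args) {
    Map<String, String> record = new LinkedHashMap<String, String>();
    addColumn(record, "Average:", "Average:"); // label column is skipped
    addColumn(record, "IFACE", "eth0");
    addColumn(record, "rxkB/s", "12.5");       // stored as rxbyt/s = 12500.0
    System.out.println(record);
  }
}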

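Earlier in this patch, the JobSummary and Log4JMetricsContextProcessor hunks rename the JSON field they read from chukwa_timestamp to timestamp; the Log4J processor then continues to round that value down to the start of the minute before using it as the record time. A minimal sketch of that rounding, with a hypothetical class name:

// Illustrative only: the minute-granularity rounding applied to the
// renamed "timestamp" field in Log4JMetricsContextProcessor.
public class TimestampRoundingSketch {

  // Rounds an epoch-millisecond timestamp down to the start of its minute.
  static long roundToMinute(long epochMillis) {
    return (epochMillis / 60000L) * 60000L;
  }

  public static void main(String[] args) {
    long t = 1428822958123L;               // epoch millis, mid-minute
    System.out.println(roundToMinute(t));  // prints 1428822900000
  }
}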