Repository: chukwa
Updated Branches:
  refs/heads/master 6def7b64d -> a6e0cbad7


CHUKWA-744. Implemented new parsers for extract and transform data to HBase format. (Eric Yang)


Project: http://git-wip-us.apache.org/repos/asf/chukwa/repo
Commit: http://git-wip-us.apache.org/repos/asf/chukwa/commit/f9dea324
Tree: http://git-wip-us.apache.org/repos/asf/chukwa/tree/f9dea324
Diff: http://git-wip-us.apache.org/repos/asf/chukwa/diff/f9dea324

Branch: refs/heads/master
Commit: f9dea324bd532478e81ca4a4fece26ec42119b6f
Parents: 6def7b6
Author: Eric Yang <[email protected]>
Authored: Sat Apr 18 11:54:56 2015 -0700
Committer: Eric Yang <[email protected]>
Committed: Sat Apr 18 11:54:56 2015 -0700

----------------------------------------------------------------------
 .../extraction/hbase/AbstractProcessor.java     | 135 +++++++++++++
 .../hbase/ChukwaMetricsProcessor.java           |  59 ++++++
 .../extraction/hbase/DefaultProcessor.java      |  50 +++++
 .../hbase/HadoopMetricsProcessor.java           |  86 ++++++++
 .../chukwa/extraction/hbase/LogEntry.java       |  64 ++++++
 .../extraction/hbase/ProcessorFactory.java      |  55 +++++
 .../chukwa/extraction/hbase/SystemMetrics.java  | 200 +++++++++++++++++++
 .../hbase/UnknownRecordTypeException.java       |  44 ++++
 .../apache/hadoop/chukwa/util/HBaseUtil.java    |  62 ++++++
 9 files changed, 755 insertions(+)
----------------------------------------------------------------------
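How the new processors are driven, as a minimal sketch (the writer-side
wiring is not part of this commit; the parser class name and the Reporter
handed in below are illustrative assumptions):

    import java.util.ArrayList;

    import org.apache.hadoop.chukwa.Chunk;
    import org.apache.hadoop.chukwa.datacollection.writer.hbase.Reporter;
    import org.apache.hadoop.chukwa.extraction.hbase.AbstractProcessor;
    import org.apache.hadoop.chukwa.extraction.hbase.ProcessorFactory;
    import org.apache.hadoop.hbase.client.Put;

    public class ProcessorDriverSketch {
      public static ArrayList<Put> toPuts(Chunk chunk, Reporter reporter)
          throws Throwable {
        // Processors are cached per parser class; getProcessor instantiates
        // the class reflectively on first use.
        AbstractProcessor processor = ProcessorFactory
            .getProcessor("org.apache.hadoop.chukwa.extraction.hbase.SystemMetrics");
        // Each call turns one chunk into a batch of Puts for the metrics table.
        ArrayList<Put> output = new ArrayList<Put>();
        processor.process(chunk, output, reporter);
        return output;
      }
    }

The caller is then responsible for flushing the returned Puts to HBase in a
single batch.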
http://git-wip-us.apache.org/repos/asf/chukwa/blob/f9dea324/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java
new file mode 100644
index 0000000..b39c789
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/AbstractProcessor.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.hbase;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Reporter;
+import org.apache.hadoop.chukwa.util.HBaseUtil;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.log4j.Logger;
+
+public abstract class AbstractProcessor {
+  static Logger LOG = Logger.getLogger(AbstractProcessor.class);
+
+  protected int entryCount = 0;
+  protected String primaryKeyHelper;
+  protected String sourceHelper;
+
+  protected byte[] key = null;
+  byte[] CF = "t".getBytes();
+
+  boolean chunkInErrorSaved = false;
+  ArrayList<Put> output = null;
+  ArrayList<Put> meta = null;
+  Reporter reporter = null;
+  long time = System.currentTimeMillis();
+  Chunk chunk = null;
+  MessageDigest md5 = null;
+
+  public AbstractProcessor() throws NoSuchAlgorithmException {
+    md5 = MessageDigest.getInstance("md5");
+  }
+
+  protected abstract void parse(byte[] recordEntry) throws Throwable;
+
+  /**
+   * Generic metric function to add a metric to HBase with the full primary
+   * key and source computed.
+   *
+   * @param time timestamp of the metric, in milliseconds since the epoch
+   * @param metric metric name; appended to the data type to form the primary key
+   * @param source origin of the metric, e.g. the host name
+   * @param value metric value to store
+   * @param output list of Puts that receives the new cell
+   */
+  public void addRecord(long time, String metric, String source, byte[] value,
+      ArrayList<Put> output) {
+    String primaryKey = new StringBuilder(primaryKeyHelper).append(".")
+        .append(metric).toString();
+    byte[] key = HBaseUtil.buildKey(time, primaryKey, source);
+    Put put = new Put(key);
+    byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
+    put.add(CF, timeInBytes, time, value);
+    output.add(put);
+    reporter.putMetric(chunk.getDataType(), primaryKey);
+    reporter.putSource(chunk.getDataType(), source);
+  }
+
+  public void addRecord(String primaryKey, String value) {
+    addRecord(primaryKey, value.getBytes());
+  }
+
+  /**
+   * Generic function to add a metric to the HBase metric table. This method
+   * assumes "time" and "source" have already been set by
+   * {@link #process(Chunk, ArrayList, Reporter)} and constructs the primary
+   * key only, without recomputing the time or the source MD5 hash.
+   *
+   * @param metric metric name; appended to the data type to form the primary key
+   * @param value metric value to store
+   */
+  public void addRecord(String metric, byte[] value) {
+    String primaryKey = new StringBuilder(primaryKeyHelper).append(".")
+        .append(metric).toString();
+    byte[] key = HBaseUtil.buildKey(time, primaryKey, sourceHelper);
+    Put put = new Put(key);
+    byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
+    put.add(CF, timeInBytes, time, value);
+    output.add(put);
+    reporter.putMetric(chunk.getDataType(), primaryKey);
+  }
+
+  /**
+   * Process a chunk to store in HBase. Stores the chunk's data type and
+   * source in the helper fields, delegates record extraction to the
+   * subclass parse() implementation, then records the chunk's tags via
+   * addMeta().
+   *
+   * @param chunk the chunk to process
+   * @param output list that receives the generated Puts
+   * @param reporter collects metric, source and cluster metadata for indexing
+   * @throws Throwable
+   */
+  public void process(Chunk chunk, ArrayList<Put> output, Reporter reporter)
+      throws Throwable {
+    this.output = output;
+    this.reporter = reporter;
+    this.chunk = chunk;
+    this.primaryKeyHelper = chunk.getDataType();
+    this.sourceHelper = chunk.getSource();
+    reporter.putSource(primaryKeyHelper, sourceHelper);
+    parse(chunk.getData());
+    addMeta();
+  }
+
+  protected void addMeta() {
+    // store the chunk's tags in the "a" (annotation) column family, under
+    // the same row key scheme as the metric cells
+    byte[] key = HBaseUtil.buildKey(time, chunk.getDataType(), sourceHelper);
+    Put put = new Put(key);
+    String family = "a";
+    byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
+    put.add(family.getBytes(), timeInBytes, time, chunk.getTags().getBytes());
+    output.add(put);
+  }
+
+}
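Cell layout produced by the addRecord variants above, for reference (derived
from the code; the concrete row-key bytes come from HBaseUtil.buildKey):

    row key   : 8 bytes = day-of-year + md5(primaryKey) prefix + md5(source) prefix
    family    : "t"
    qualifier : 8-byte big-endian timestamp (ByteBuffer.putLong)
    timestamp : same value as the qualifier
    value     : the raw metric bytes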
http://git-wip-us.apache.org/repos/asf/chukwa/blob/f9dea324/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/ChukwaMetricsProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/ChukwaMetricsProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/ChukwaMetricsProcessor.java
new file mode 100644
index 0000000..156d9d5
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/ChukwaMetricsProcessor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.hbase;
+
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.writer.hbase.Reporter;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.log4j.Logger;
+
+public class ChukwaMetricsProcessor extends HadoopMetricsProcessor {
+  static Logger LOG = Logger.getLogger(ChukwaMetricsProcessor.class);
+
+  public ChukwaMetricsProcessor() throws NoSuchAlgorithmException {
+    super();
+  }
+
+  /**
+   * Process cluster name and store in HBase. Identical to the base
+   * implementation, except that the chunk's "cluster" tag is also
+   * registered with the reporter before parsing.
+   *
+   * @param chunk the chunk to process
+   * @param output list that receives the generated Puts
+   * @param reporter collects metric, source and cluster metadata for indexing
+   * @throws Throwable
+   */
+  @Override
+  public void process(Chunk chunk, ArrayList<Put> output, Reporter reporter)
+      throws Throwable {
+    this.output = output;
+    this.reporter = reporter;
+    this.chunk = chunk;
+    this.primaryKeyHelper = chunk.getDataType();
+    this.sourceHelper = chunk.getSource();
+    String clusterName = chunk.getTag("cluster");
+    reporter.putSource(primaryKeyHelper, sourceHelper);
+    reporter.putClusterName(primaryKeyHelper, clusterName);
+    parse(chunk.getData());
+    addMeta();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/f9dea324/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java
new file mode 100644
index 0000000..2da64a3
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/DefaultProcessor.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.extraction.hbase;
+
+import java.nio.ByteBuffer;
+import java.security.NoSuchAlgorithmException;
+
+import org.apache.hadoop.chukwa.util.HBaseUtil;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.log4j.Logger;
+import org.json.simple.JSONObject;
+
+public class DefaultProcessor extends AbstractProcessor {
+
+  static Logger LOG = Logger.getLogger(DefaultProcessor.class);
+
+  public DefaultProcessor() throws NoSuchAlgorithmException {
+    super();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  protected void parse(byte[] recordEntry) throws Throwable {
+    // store the raw chunk body as a single cell, keyed by data type and source
+    byte[] key = HBaseUtil.buildKey(time, chunk.getDataType(), chunk.getSource());
+    Put put = new Put(key);
+    byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
+    put.add("t".getBytes(), timeInBytes, chunk.getData());
+    output.add(put);
+    JSONObject json = new JSONObject();
+    json.put("sig", key);
+    json.put("type", "unknown");
+    reporter.put(chunk.getDataType(), chunk.getSource(), json.toString());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/f9dea324/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java
new file mode 100644
index 0000000..3afd71a
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/HadoopMetricsProcessor.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.hbase;
+
+import java.nio.ByteBuffer;
+import java.security.NoSuchAlgorithmException;
+import java.util.Iterator;
+
+import org.apache.hadoop.chukwa.util.HBaseUtil;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.log4j.Logger;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
+public class HadoopMetricsProcessor extends AbstractProcessor {
+
+  static Logger LOG = Logger.getLogger(HadoopMetricsProcessor.class);
+  static final String timestampField = "timestamp";
+  static final String contextNameField = "contextName";
+  static final String recordNameField = "recordName";
+  static final byte[] cf = "t".getBytes();
+
+  public HadoopMetricsProcessor() throws NoSuchAlgorithmException {
+  }
+
+  @Override
+  protected void parse(byte[] recordEntry) throws Throwable {
+    try {
+      // the record body may carry a textual prefix; the JSON document
+      // starts at the first '{'
+      String body = new String(recordEntry);
+      int start = body.indexOf('{');
+      JSONObject json = (JSONObject) JSONValue.parse(body.substring(start));
+
+      time = ((Long) json.get(timestampField)).longValue();
+      String contextName = (String) json.get(contextNameField);
+      String recordName = (String) json.get(recordNameField);
+      byte[] timeInBytes = ByteBuffer.allocate(8).putLong(time).array();
+
+      @SuppressWarnings("unchecked")
+      Iterator<String> ki = json.keySet().iterator();
+      while (ki.hasNext()) {
+        String keyName = ki.next();
+        // skip the bookkeeping fields; everything else is a metric
+        if (timestampField.equals(keyName) || contextNameField.equals(keyName)
+            || recordNameField.equals(keyName)) {
+          continue;
+        }
+        if (json.get(keyName) != null) {
+          byte[] v = json.get(keyName).toString().getBytes();
+          String primaryKey = new StringBuilder(contextName).append(".")
+              .append(recordName).append(".").append(keyName).toString();
+          byte[] rowKey = HBaseUtil.buildKey(time, primaryKey, chunk.getSource());
+          Put r = new Put(rowKey);
+          r.add(cf, timeInBytes, time, v);
+          output.add(r);
+        }
+      }
+
+    } catch (Exception e) {
+      LOG.warn("Wrong format in HadoopMetricsProcessor ["
+          + new String(recordEntry) + "]", e);
+      throw e;
+    }
+  }
+
+}
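An illustrative (hypothetical) record for the parser above; contextName and
recordName are combined with each remaining key to form the primary key,
e.g. "dfs.namenode.FilesCreated":

    {"timestamp": 1429383296000, "contextName": "dfs",
     "recordName": "namenode", "FilesCreated": 42, "FilesDeleted": 7}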
http://git-wip-us.apache.org/repos/asf/chukwa/blob/f9dea324/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java
new file mode 100644
index 0000000..dcbe2d4
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/LogEntry.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.extraction.hbase;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+public class LogEntry {
+  // matches the 23-character log4j timestamp prefix,
+  // e.g. "2015-04-18 11:54:56,123"; note SimpleDateFormat is not thread-safe
+  private final static SimpleDateFormat sdf = new SimpleDateFormat(
+      "yyyy-MM-dd HH:mm:ss,SSS");
+
+  private Date date;
+  private String logLevel;
+  private String className;
+  private String body;
+
+  public LogEntry(String recordEntry) throws ParseException {
+    String dStr = recordEntry.substring(0, 23);
+    date = sdf.parse(dStr);
+    int start = 24;
+    int idx = recordEntry.indexOf(' ', start);
+    logLevel = recordEntry.substring(start, idx);
+    start = idx + 1;
+    idx = recordEntry.indexOf(' ', start);
+    // drop the trailing colon after the class name
+    className = recordEntry.substring(start, idx - 1);
+    body = recordEntry.substring(idx + 1);
+  }
+
+  public Date getDate() {
+    return date;
+  }
+
+  public void setDate(Date date) {
+    this.date = date;
+  }
+
+  public String getLogLevel() {
+    return logLevel;
+  }
+
+  public String getClassName() {
+    return className;
+  }
+
+  public String getBody() {
+    return body;
+  }
+}
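A worked example of the fields LogEntry extracts, assuming a standard log4j
line (hypothetical input):

    2015-04-18 11:54:56,123 INFO org.apache.hadoop.chukwa.Foo: started

    date      = 2015-04-18 11:54:56,123
    logLevel  = INFO
    className = org.apache.hadoop.chukwa.Foo   (trailing ':' stripped)
    body      = started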
http://git-wip-us.apache.org/repos/asf/chukwa/blob/f9dea324/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/ProcessorFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/ProcessorFactory.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/ProcessorFactory.java
new file mode 100644
index 0000000..96931d7
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/ProcessorFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.hbase;
+
+import java.util.HashMap;
+
+import org.apache.log4j.Logger;
+
+public class ProcessorFactory {
+  static Logger log = Logger.getLogger(ProcessorFactory.class);
+
+  // registry of parser class name to cached processor instance
+  private static HashMap<String, AbstractProcessor> processors = new HashMap<String, AbstractProcessor>();
+
+  public ProcessorFactory() {
+  }
+
+  public static AbstractProcessor getProcessor(String parserClass)
+      throws UnknownRecordTypeException {
+    if (processors.containsKey(parserClass)) {
+      return processors.get(parserClass);
+    } else {
+      AbstractProcessor processor = null;
+      try {
+        processor = (AbstractProcessor) Class.forName(parserClass)
+            .getConstructor().newInstance();
+      } catch (ClassNotFoundException e) {
+        throw new UnknownRecordTypeException("Unknown parserClass:"
+            + parserClass, e);
+      } catch (Exception e) {
+        throw new UnknownRecordTypeException("error constructing processor", e);
+      }
+
+      // TODO: introduce a thread-safe/reuse flag to decide whether the same
+      // processor instance should be cached and reused across calls
+      processors.put(parserClass, processor);
+      return processor;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/chukwa/blob/f9dea324/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java
new file mode 100644
index 0000000..a72e1bd
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/SystemMetrics.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Parser for system metrics data collected through
+ * org.apache.hadoop.chukwa.datacollection.adaptor.sigar.SystemMetrics.
+ */
+package org.apache.hadoop.chukwa.extraction.hbase;
+
+import java.security.NoSuchAlgorithmException;
+import java.util.Iterator;
+
+import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
+public class SystemMetrics extends AbstractProcessor {
+
+  public SystemMetrics() throws NoSuchAlgorithmException {
+    super();
+  }
+
+  @Override
+  protected void parse(byte[] recordEntry) throws Throwable {
+    String buffer = new String(recordEntry);
+    JSONObject json = (JSONObject) JSONValue.parse(buffer);
+    time = ((Long) json.get("timestamp")).longValue();
+    ChukwaRecord record = new ChukwaRecord();
+    JSONArray cpuList = (JSONArray) json.get("cpu");
+    double combined = 0.0;
+    double user = 0.0;
+    double sys = 0.0;
+    double idle = 0.0;
+    int actualSize = 0;
+    for (int i = 0; i < cpuList.size(); i++) {
+      JSONObject cpu = (JSONObject) cpuList.get(i);
+      // Work around sigar sometimes returning null for cpu metrics on pLinux
+      if (cpu.get("combined") == null) {
+        continue;
+      }
+      actualSize++;
+      combined = combined + Double.parseDouble(cpu.get("combined").toString());
+      user = user + Double.parseDouble(cpu.get("user").toString());
+      sys = sys + Double.parseDouble(cpu.get("sys").toString());
+      idle = idle + Double.parseDouble(cpu.get("idle").toString());
+      for (@SuppressWarnings("unchecked")
+      Iterator<String> iterator = (Iterator<String>) cpu.keySet().iterator();
+          iterator.hasNext();) {
+        String key = iterator.next();
+        addRecord("cpu." + key + "." + i, cpu.get(key).toString());
+      }
+    }
+    // avoid NaN averages when sigar reported no usable cpu entries
+    if (actualSize > 0) {
+      combined = combined / actualSize;
+      user = user / actualSize;
+      sys = sys / actualSize;
+      idle = idle / actualSize;
+    }
+    addRecord("cpu.combined", Double.toString(combined));
+    addRecord("cpu.user", Double.toString(user));
+    addRecord("cpu.idle", Double.toString(idle));
+    addRecord("cpu.sys", Double.toString(sys));
+
+    addRecord("Uptime", json.get("uptime").toString());
+    JSONArray loadavg = (JSONArray) json.get("loadavg");
+    addRecord("LoadAverage.1", loadavg.get(0).toString());
+    addRecord("LoadAverage.5", loadavg.get(1).toString());
+    addRecord("LoadAverage.15", loadavg.get(2).toString());
+
+    JSONObject memory = (JSONObject) json.get("memory");
+    @SuppressWarnings("unchecked")
+    Iterator<String> memKeys = memory.keySet().iterator();
+    while (memKeys.hasNext()) {
+      String key = memKeys.next();
+      addRecord("memory." + key, memory.get(key).toString());
+    }
+
+    JSONObject swap = (JSONObject) json.get("swap");
+    @SuppressWarnings("unchecked")
+    Iterator<String> swapKeys = swap.keySet().iterator();
+    while (swapKeys.hasNext()) {
+      String key = swapKeys.next();
+      addRecord("swap." + key, swap.get(key).toString());
+    }
+
+    double rxBytes = 0;
+    double rxDropped = 0;
+    double rxErrors = 0;
+    double rxPackets = 0;
+    double txBytes = 0;
+    double txCollisions = 0;
+    double txErrors = 0;
+    double txPackets = 0;
+    record = new ChukwaRecord();
+    JSONArray netList = (JSONArray) json.get("network");
+    for (int i = 0; i < netList.size(); i++) {
+      JSONObject netIf = (JSONObject) netList.get(i);
+      @SuppressWarnings("unchecked")
+      Iterator<String> keys = netIf.keySet().iterator();
+      while (keys.hasNext()) {
+        String key = keys.next();
+        record.add(key + "." + i, netIf.get(key).toString());
+        // aggregate totals across all but the first interface, which is
+        // assumed to be the loopback device
+        if (i != 0) {
+          if (key.equals("RxBytes")) {
+            rxBytes = rxBytes + (Long) netIf.get(key);
+          } else if (key.equals("RxDropped")) {
+            rxDropped = rxDropped + (Long) netIf.get(key);
+          } else if (key.equals("RxErrors")) {
+            rxErrors = rxErrors + (Long) netIf.get(key);
+          } else if (key.equals("RxPackets")) {
+            rxPackets = rxPackets + (Long) netIf.get(key);
+          } else if (key.equals("TxBytes")) {
+            txBytes = txBytes + (Long) netIf.get(key);
+          } else if (key.equals("TxCollisions")) {
+            txCollisions = txCollisions + (Long) netIf.get(key);
+          } else if (key.equals("TxErrors")) {
+            txErrors = txErrors + (Long) netIf.get(key);
+          } else if (key.equals("TxPackets")) {
+            txPackets = txPackets + (Long) netIf.get(key);
+          }
+        }
+      }
+    }
+
+    addRecord("network.RxBytes", Double.toString(rxBytes));
+    addRecord("network.RxDropped", Double.toString(rxDropped));
+    addRecord("network.RxErrors", Double.toString(rxErrors));
+    addRecord("network.RxPackets", Double.toString(rxPackets));
+    addRecord("network.TxBytes", Double.toString(txBytes));
+    addRecord("network.TxCollisions", Double.toString(txCollisions));
+    addRecord("network.TxErrors", Double.toString(txErrors));
+    addRecord("network.TxPackets", Double.toString(txPackets));
+
+    double readBytes = 0;
+    double reads = 0;
+    double writeBytes = 0;
+    double writes = 0;
+    double total = 0;
+    double used = 0;
+    record = new ChukwaRecord();
+    JSONArray diskList = (JSONArray) json.get("disk");
+    for (int i = 0; i < diskList.size(); i++) {
+      JSONObject disk = (JSONObject) diskList.get(i);
+      @SuppressWarnings("unchecked")
+      Iterator<String> keys = disk.keySet().iterator();
+      while (keys.hasNext()) {
+        String key = keys.next();
+        record.add(key + "." + i, disk.get(key).toString());
+        if (key.equals("ReadBytes")) {
+          readBytes = readBytes + (Long) disk.get("ReadBytes");
+        } else if (key.equals("Reads")) {
+          reads = reads + (Long) disk.get("Reads");
+        } else if (key.equals("WriteBytes")) {
+          writeBytes = writeBytes + (Long) disk.get("WriteBytes");
+        } else if (key.equals("Writes")) {
+          writes = writes + (Long) disk.get("Writes");
+        } else if (key.equals("Total")) {
+          total = total + (Long) disk.get("Total");
+        } else if (key.equals("Used")) {
+          used = used + (Long) disk.get("Used");
+        }
+      }
+    }
+    double percentUsed = used / total;
+    addRecord("disk.ReadBytes", Double.toString(readBytes));
+    addRecord("disk.Reads", Double.toString(reads));
+    addRecord("disk.WriteBytes", Double.toString(writeBytes));
+    addRecord("disk.Writes", Double.toString(writes));
+    addRecord("disk.Total", Double.toString(total));
+    addRecord("disk.Used", Double.toString(used));
+    addRecord("disk.PercentUsed", Double.toString(percentUsed));
+  }
+
+}
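For reference, an illustrative (hypothetical) shape of the sigar JSON that
the SystemMetrics parser above consumes; only the fields read by parse() are
shown:

    {"timestamp": 1429383296000, "uptime": 123456,
     "loadavg": [0.30, 0.25, 0.20],
     "cpu": [{"combined": 0.50, "user": 0.40, "sys": 0.10, "idle": 0.50}],
     "memory": {"Total": 8589934592, "Used": 4294967296},
     "swap": {"Total": 2147483648, "Used": 0},
     "network": [{"RxBytes": 1024, "RxDropped": 0, "RxErrors": 0,
                  "RxPackets": 8, "TxBytes": 2048, "TxCollisions": 0,
                  "TxErrors": 0, "TxPackets": 16}],
     "disk": [{"ReadBytes": 4096, "Reads": 2, "WriteBytes": 8192,
               "Writes": 4, "Total": 512000000, "Used": 256000000}]}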
http://git-wip-us.apache.org/repos/asf/chukwa/blob/f9dea324/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/UnknownRecordTypeException.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/UnknownRecordTypeException.java b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/UnknownRecordTypeException.java
new file mode 100644
index 0000000..866eb2c
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/hbase/UnknownRecordTypeException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.extraction.hbase;
+
+public class UnknownRecordTypeException extends Exception {
+
+  private static final long serialVersionUID = 8925135975093252279L;
+
+  public UnknownRecordTypeException() {
+  }
+
+  public UnknownRecordTypeException(String message) {
+    super(message);
+  }
+
+  public UnknownRecordTypeException(Throwable cause) {
+    super(cause);
+  }
+
+  public UnknownRecordTypeException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/chukwa/blob/f9dea324/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java b/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java
new file mode 100644
index 0000000..d463dd1
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/chukwa/util/HBaseUtil.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.util;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Calendar;
+import java.util.TimeZone;
+
+import org.apache.log4j.Logger;
+
+public class HBaseUtil {
+  private static Logger LOG = Logger.getLogger(HBaseUtil.class);
+
+  // shared instances are not thread-safe on their own; all access goes
+  // through the synchronized buildKey methods below
+  static Calendar c = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+  static MessageDigest md5 = null;
+  static {
+    try {
+      md5 = MessageDigest.getInstance("md5");
+    } catch (NoSuchAlgorithmException e) {
+      LOG.warn(ExceptionUtil.getStackTrace(e));
+    }
+  }
+
+  public HBaseUtil() throws NoSuchAlgorithmException {
+  }
+
+  public static byte[] buildKey(long time, String metricGroup, String metric,
+      String source) {
+    String fullKey = new StringBuilder(metricGroup).append(".")
+        .append(metric).toString();
+    return buildKey(time, fullKey, source);
+  }
+
+  // Row key layout: the day-of-year digits at offset 0 (a third digit, if
+  // any, is overwritten by the next copy), a 3-byte md5 prefix of the
+  // primary key at offset 2, and a 3-byte md5 prefix of the source at
+  // offset 5, for 8 bytes in total.
+  public static synchronized byte[] buildKey(long time, String primaryKey) {
+    c.setTimeInMillis(time);
+    byte[] day = Integer.toString(c.get(Calendar.DAY_OF_YEAR)).getBytes();
+    byte[] pk = getHash(primaryKey);
+    byte[] key = new byte[8];
+    System.arraycopy(day, 0, key, 0, day.length);
+    System.arraycopy(pk, 0, key, 2, 3);
+    return key;
+  }
+
+  public static synchronized byte[] buildKey(long time, String primaryKey,
+      String source) {
+    c.setTimeInMillis(time);
+    byte[] day = Integer.toString(c.get(Calendar.DAY_OF_YEAR)).getBytes();
+    byte[] pk = getHash(primaryKey);
+    byte[] src = getHash(source);
+    byte[] key = new byte[8];
+    System.arraycopy(day, 0, key, 0, day.length);
+    System.arraycopy(pk, 0, key, 2, 3);
+    System.arraycopy(src, 0, key, 5, 3);
+    return key;
+  }
+
+  private static byte[] getHash(String key) {
+    byte[] hash = new byte[3];
+    System.arraycopy(md5.digest(key.getBytes()), 0, hash, 0, 3);
+    return hash;
+  }
+}
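A worked example of the row key produced by HBaseUtil.buildKey (the md5
prefixes below are placeholders, not real digest values): for a timestamp on
2015-04-18 (day of year 108), primaryKey = "SystemMetrics.cpu.combined" and
source = "host1.example.com":

    key[0..1] = '1', '0'   first two bytes of "108"; the third digit is
                           overwritten by the next copy
    key[2..4] = md5("SystemMetrics.cpu.combined")[0..2]
    key[5..7] = md5("host1.example.com")[0..2]

Rows for the same metric and source therefore sort together, bucketed by day
(with three-digit days truncated to their first two digits).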
