http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoader.java b/src/main/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoader.java
index ae9233c..5538a40 100644
--- a/src/main/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoader.java
+++ b/src/main/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoader.java
@@ -27,6 +27,7 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map.Entry;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.concurrent.Callable;
@@ -62,10 +63,15 @@ public class MetricDataLoader implements Callable {
   private Connection conn = null;
   private Path source = null;
-  private static ChukwaConfiguration conf = null;
-  private static FileSystem fs = null;
+  private ChukwaConfiguration conf = null;
+  private FileSystem fs = null;
   private String jdbc_url = "";
 
+  public MetricDataLoader(String fileName) throws IOException {
+    conf = new ChukwaConfiguration();
+    fs = FileSystem.get(conf);
+  }
+
   /** Creates a new instance of DBWriter */
   public MetricDataLoader(ChukwaConfiguration conf, FileSystem fs, String fileName) {
     source = new Path(fileName);
@@ -171,6 +177,9 @@ public class MetricDataLoader implements Callable {
     return( sb.toString());
   }
 
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value =
+      "SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE",
+      justification = "Dynamic based upon tables in the database")
   public boolean run() throws IOException {
     boolean first=true;
     log.info("StreamName: " + source.getName());
@@ -195,7 +204,7 @@ public class MetricDataLoader implements Callable {
     try {
       Pattern p = Pattern.compile("(.*)\\-(\\d+)$");
       int batch = 0;
-      while (reader.next(key, record)) {
+      while (reader !=null && reader.next(key, record)) {
         numOfRecords++;
         if(first) {
           try {
@@ -336,12 +345,9 @@ public class MetricDataLoader implements Callable {
         }
       }
 
-      Iterator<String> i = hashReport.keySet().iterator();
-      while (i.hasNext()) {
-        Object iteratorNode = i.next();
-        HashMap<String, String> recordSet = hashReport.get(iteratorNode);
-        Iterator<String> fi = recordSet.keySet().iterator();
-        // Map any primary key that was not included in the report keyName
+      for(Entry<String, HashMap<String, String>> entry : hashReport.entrySet()) {
+        HashMap<String, String> recordSet = entry.getValue();
+        // Map any primary key that was not included in the report keyName
         StringBuilder sqlPriKeys = new StringBuilder();
         try {
           for (String priKey : priKeys) {
@@ -363,8 +369,9 @@ public class MetricDataLoader implements Callable {
         // Map the hash objects to database table columns
         StringBuilder sqlValues = new StringBuilder();
         boolean firstValue = true;
-        while (fi.hasNext()) {
-          String fieldKey = fi.next();
+        for(Entry<String, String> fi : recordSet.entrySet()) {
+          String fieldKey = fi.getKey();
+          String fieldValue = fi.getValue();
           if (transformer.containsKey(fieldKey) && transformer.get(fieldKey).intern()!="_delete".intern()) {
             if (!firstValue) {
               sqlValues.append(", ");
             }
@@ -378,12 +385,12 @@
             if (conversion.containsKey(conversionKey)) {
               sqlValues.append(transformer.get(fieldKey));
               sqlValues.append("=");
-              sqlValues.append(recordSet.get(fieldKey));
+              sqlValues.append(fieldValue);
               sqlValues.append(conversion.get(conversionKey).toString());
             } else {
               sqlValues.append(transformer.get(fieldKey));
               sqlValues.append("=\'");
-              sqlValues.append(escapeQuotes(recordSet.get(fieldKey)));
+              sqlValues.append(escapeQuotes(fieldValue));
               sqlValues.append("\'");
             }
           } else if (dbSchema.get(dbTables.get(dbKey)).get(
@@ -391,8 +398,7 @@
             SimpleDateFormat formatter = new SimpleDateFormat(
                 "yyyy-MM-dd HH:mm:ss");
             Date recordDate = new Date();
-            recordDate.setTime(Long.parseLong(recordSet
-                .get(fieldKey)));
+            recordDate.setTime(Long.parseLong(fieldValue));
             sqlValues.append(transformer.get(fieldKey));
             sqlValues.append("=\"");
             sqlValues.append(formatter.format(recordDate));
@@ -405,7 +411,7 @@
                 transformer.get(fieldKey)) == java.sql.Types.INTEGER) {
               long tmp = 0;
               try {
-                tmp = Long.parseLong(recordSet.get(fieldKey).toString());
+                tmp = Long.parseLong(fieldValue);
                 String conversionKey = "conversion." + fieldKey;
                 if (conversion.containsKey(conversionKey)) {
                   tmp = tmp
@@ -420,7 +426,7 @@
               sqlValues.append(tmp);
             } else {
               double tmp = 0;
-              tmp = Double.parseDouble(recordSet.get(fieldKey).toString());
+              tmp = Double.parseDouble(fieldValue);
               String conversionKey = "conversion." + fieldKey;
               if (conversion.containsKey(conversionKey)) {
                 tmp = tmp
@@ -455,7 +461,6 @@
           }
         }
       }
-
       StringBuilder sql = new StringBuilder();
       if (sqlPriKeys.length() > 0) {
         sql.append("INSERT INTO ");
@@ -587,9 +592,7 @@
 
   public static void main(String[] args) {
     try {
-      conf = new ChukwaConfiguration();
-      fs = FileSystem.get(conf);
-      MetricDataLoader mdl = new MetricDataLoader(conf, fs, args[0]);
+      MetricDataLoader mdl = new MetricDataLoader(args[0]);
      mdl.run();
     } catch (Exception e) {
       e.printStackTrace();
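[Editor's note: the recurring rewrite in this commit replaces keySet() iteration with entrySet() iteration. A minimal standalone sketch of why this matters; the map contents are illustrative, not from Chukwa:]

    import java.util.HashMap;
    import java.util.Map;

    public class EntrySetDemo {
      public static void main(String[] args) {
        Map<String, String> recordSet = new HashMap<String, String>();
        recordSet.put("cpu_user", "42");

        // keySet() iteration pays a second hash lookup for every key:
        for (String fieldKey : recordSet.keySet()) {
          String fieldValue = recordSet.get(fieldKey); // extra get()
          System.out.println(fieldKey + "=" + fieldValue);
        }

        // entrySet() hands back the key and value together:
        for (Map.Entry<String, String> e : recordSet.entrySet()) {
          System.out.println(e.getKey() + "=" + e.getValue());
        }
      }
    }

[Making conf and fs instance fields (instead of static) has a similar flavor: each MetricDataLoader Callable now carries its own state instead of sharing mutable statics across the thread pool.]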
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoaderPool.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoaderPool.java b/src/main/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoaderPool.java
index b763087..5ad5258 100644
--- a/src/main/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoaderPool.java
+++ b/src/main/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoaderPool.java
@@ -37,8 +37,8 @@ public class MetricDataLoaderPool extends DataLoaderFactory {
   protected MetricDataLoader threads[] = null;
   private static String DATA_LOADER_THREAD_LIMIT = "chukwa.data.loader.threads.limit";
   private int size = 1;
-  private static CompletionService completion = null;
-  private static ExecutorService executor = null;
+  private CompletionService completion = null;
+  private ExecutorService executor = null;
 
   public MetricDataLoaderPool() {
   }

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/dataloader/SocketDataLoader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/dataloader/SocketDataLoader.java b/src/main/java/org/apache/hadoop/chukwa/dataloader/SocketDataLoader.java
index 1cf801c..c47bdff 100644
--- a/src/main/java/org/apache/hadoop/chukwa/dataloader/SocketDataLoader.java
+++ b/src/main/java/org/apache/hadoop/chukwa/dataloader/SocketDataLoader.java
@@ -22,6 +22,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.Socket;
 import java.net.SocketException;
+import java.nio.charset.Charset;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
@@ -100,7 +101,7 @@ public class SocketDataLoader implements Runnable {
         output.append(" all");
       }
       output.append("\n");
-      dos.write((output.toString()).getBytes());
+      dos.write((output.toString()).getBytes(Charset.forName("UTF-8")));
     } catch (SocketException e) {
       log.warn("Error while settin soTimeout to 120000");
     }
@@ -135,7 +136,7 @@ public class SocketDataLoader implements Runnable {
   /*
    * Unsubscribe from Chukwa collector and stop streaming.
    */
-  public void stop() {
+  public synchronized void stop() {
     if(s!=null) {
       try {
         dis.close();
@@ -169,7 +170,7 @@
    * into SDL queue.
    */
   @Override
-  public void run() {
+  public synchronized void run() {
     try {
       Chunk c;
       while ((c = ChunkImpl.read(dis)) != null) {

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datastore/UserStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datastore/UserStore.java b/src/main/java/org/apache/hadoop/chukwa/datastore/UserStore.java
index 196a38a..8802fcf 100755
--- a/src/main/java/org/apache/hadoop/chukwa/datastore/UserStore.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datastore/UserStore.java
@@ -20,13 +20,13 @@ package org.apache.hadoop.chukwa.datastore;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.Charset;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
-
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.chukwa.hicc.HiccWebServer;
 import org.apache.hadoop.chukwa.rest.bean.UserBean;
@@ -44,10 +44,14 @@ public class UserStore {
   private static Log log = LogFactory.getLog(UserStore.class);
   private static Configuration config = new Configuration();
   private static ChukwaConfiguration chukwaConf = new ChukwaConfiguration();
-  private static String hiccPath = config.get("fs.defaultFS")+File.separator+chukwaConf.get("chukwa.data.dir")+File.separator+"hicc"+File.separator+"users";
+  private static String hiccPath = null;
+
+  static {
+    config = HiccWebServer.getConfig();
+    hiccPath = config.get("fs.defaultFS")+File.separator+chukwaConf.get("chukwa.data.dir")+File.separator+"hicc"+File.separator+"users";
+}
 
   public UserStore() throws IllegalAccessException {
-    UserStore.config = HiccWebServer.getConfig();
   }
 
   public UserStore(String uid) throws IllegalAccessException {
@@ -73,7 +77,7 @@
       viewStream.readFully(buffer);
       viewStream.close();
       try {
-        JSONObject json = (JSONObject) JSONValue.parse(new String(buffer));
+        JSONObject json = (JSONObject) JSONValue.parse(new String(buffer, Charset.forName("UTF-8")));
         profile = new UserBean(json);
       } catch (Exception e) {
         log.error(ExceptionUtil.getStackTrace(e));
@@ -110,7 +114,7 @@
     try {
       fs = FileSystem.get(config);
       FSDataOutputStream out = fs.create(profileFile,true);
-      out.write(profile.deserialize().toString().getBytes());
+      out.write(profile.deserialize().toString().getBytes(Charset.forName("UTF-8")));
       out.close();
     } catch (IOException ex) {
       log.error(ExceptionUtil.getStackTrace(ex));
@@ -138,7 +142,7 @@
       profileStream.readFully(buffer);
       profileStream.close();
       try {
-        UserBean user = new UserBean((JSONObject) JSONValue.parse(new String(buffer)));
+        UserBean user = new UserBean((JSONObject) JSONValue.parse(new String(buffer, Charset.forName("UTF-8"))));
         list.add(user.getId());
       } catch (Exception e) {
         log.error(ExceptionUtil.getStackTrace(e));
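[Editor's note: several files in this commit pin byte/String conversions to UTF-8 instead of the platform default charset. A small sketch of the underlying pitfall; on Java 7+ StandardCharsets.UTF_8 would be the idiomatic equivalent of Charset.forName("UTF-8"):]

    import java.nio.charset.Charset;

    public class CharsetDemo {
      public static void main(String[] args) {
        String s = "caf\u00e9";
        byte[] implicit = s.getBytes(); // platform default; length varies by JVM/locale
        byte[] explicit = s.getBytes(Charset.forName("UTF-8")); // 5 bytes everywhere
        System.out.println(implicit.length + " vs " + explicit.length);
      }
    }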
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datastore/ViewStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datastore/ViewStore.java b/src/main/java/org/apache/hadoop/chukwa/datastore/ViewStore.java
index 9db639f..258300c 100755
--- a/src/main/java/org/apache/hadoop/chukwa/datastore/ViewStore.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datastore/ViewStore.java
@@ -20,13 +20,12 @@ package org.apache.hadoop.chukwa.datastore;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
-
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.chukwa.hicc.HiccWebServer;
 import org.apache.hadoop.chukwa.rest.bean.ViewBean;
@@ -44,15 +43,18 @@ public class ViewStore {
   private String uid = null;
   private ViewBean view = null;
   private static Log log = LogFactory.getLog(ViewStore.class);
-  private static Configuration config = new Configuration();
+  private static Configuration config = null;
   private static ChukwaConfiguration chukwaConf = new ChukwaConfiguration();
-  private static String viewPath = config.get("fs.defaultFS")+File.separator+chukwaConf.get("chukwa.data.dir")+File.separator+"hicc"+File.separator+"views";
+  private static String viewPath = null;
   private static String publicViewPath = viewPath+File.separator+"public";
   private static String usersViewPath = viewPath+File.separator+"users";
   private static String PUBLIC = "public".intern();
+  static {
+    config = HiccWebServer.getConfig();
+    viewPath = config.get("fs.defaultFS")+File.separator+chukwaConf.get("chukwa.data.dir")+File.separator+"hicc"+File.separator+"views";
+  }
 
   public ViewStore() throws IllegalAccessException {
-    ViewStore.config = HiccWebServer.getConfig();
   }
 
   public ViewStore(String uid, String vid) throws IllegalAccessException {
@@ -141,7 +143,7 @@ public class ViewStore {
     try {
       FileSystem fs = FileSystem.get(config);
       FSDataOutputStream out = fs.create(viewFile,true);
-      out.write(view.deserialize().toString().getBytes());
+      out.write(view.deserialize().toString().getBytes(Charset.forName("UTF-8")));
       out.close();
     } catch (IOException ex) {
       log.error(ExceptionUtil.getStackTrace(ex));

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datastore/WidgetStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datastore/WidgetStore.java b/src/main/java/org/apache/hadoop/chukwa/datastore/WidgetStore.java
index 10343d5..9512824 100755
--- a/src/main/java/org/apache/hadoop/chukwa/datastore/WidgetStore.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datastore/WidgetStore.java
@@ -20,13 +20,13 @@ package org.apache.hadoop.chukwa.datastore;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.HashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
-
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.chukwa.hicc.HiccWebServer;
 import org.apache.hadoop.chukwa.rest.bean.CatalogBean;
@@ -43,12 +43,15 @@ public class WidgetStore {
   private static Log log = LogFactory.getLog(WidgetStore.class);
   private static Configuration config = new Configuration();
   private static ChukwaConfiguration chukwaConf = new ChukwaConfiguration();
-  private static String hiccPath = config.get("fs.defaultFS")+File.separator+chukwaConf.get("chukwa.data.dir")+File.separator+"hicc"+File.separator+"widgets";
+  private static String hiccPath = null;
   private static CatalogBean catalog = null;
   private static HashMap<String, WidgetBean> list = new HashMap<String, WidgetBean>();
+  static {
+    config = HiccWebServer.getConfig();
+    hiccPath = config.get("fs.defaultFS")+File.separator+chukwaConf.get("chukwa.data.dir")+File.separator+"hicc"+File.separator+"widgets";
+  }
 
   public WidgetStore() throws IllegalAccessException {
-    WidgetStore.config = HiccWebServer.getConfig();
   }
 
   public void set(WidgetBean widget) throws IllegalAccessException {
@@ -94,7 +97,7 @@ public class WidgetStore {
       widgetStream.readFully(buffer);
       widgetStream.close();
       try {
-        JSONObject widgetBuffer = (JSONObject) JSONValue.parse(new String(buffer));
+        JSONObject widgetBuffer = (JSONObject) JSONValue.parse(new String(buffer, Charset.forName("UTF-8")));
         WidgetBean widget = new WidgetBean(widgetBuffer);
         catalog.addCatalog(widget);
         list.put(widget.getId(),widget);

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datatrigger/HttpTriggerAction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datatrigger/HttpTriggerAction.java b/src/main/java/org/apache/hadoop/chukwa/datatrigger/HttpTriggerAction.java
index 7bf451e..db71668 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datatrigger/HttpTriggerAction.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datatrigger/HttpTriggerAction.java
@@ -30,8 +30,11 @@ import java.io.OutputStreamWriter;
 import java.net.URL;
 import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
+import java.nio.charset.Charset;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.HashMap;
+import java.util.Map.Entry;
 
 /**
  * Trigger action that makes an HTTP request when executed.
@@ -129,14 +132,16 @@ public class HttpTriggerAction implements TriggerAction {
     // set headers
     boolean contentLengthExists = false;
     if (headers != null) {
-      for (String name: headers.keySet()) {
+      for(Entry<String, String> entry : headers.entrySet()) {
+        String name = entry.getKey();
+        String value = entry.getValue();
         if (log.isDebugEnabled()) {
-          log.debug("Setting header " + name + ": " + headers.get(name));
+          log.debug("Setting header " + name + ": " + value);
         }
         if (name.equalsIgnoreCase("content-length")) {
           contentLengthExists = true;
         }
-        conn.setRequestProperty(name, headers.get(name));
+        conn.setRequestProperty(name, value);
       }
     }
 
@@ -149,7 +154,7 @@ public class HttpTriggerAction implements TriggerAction {
     // send body if it exists
     if (body != null) {
       conn.setDoOutput(true);
-      OutputStreamWriter writer = new OutputStreamWriter(conn.getOutputStream());
+      OutputStreamWriter writer = new OutputStreamWriter(conn.getOutputStream(), Charset.forName("UTF-8"));
       writer.write(body);
       writer.flush();
       writer.close();
@@ -169,7 +174,7 @@ public class HttpTriggerAction implements TriggerAction {
     }
     else {
       BufferedReader reader = new BufferedReader(
-          new InputStreamReader(conn.getInputStream()));
+          new InputStreamReader(conn.getInputStream(), Charset.forName("UTF-8")));
       String line;
       StringBuilder sb = new StringBuilder();
       while ((line = reader.readLine()) != null) {
@@ -215,7 +220,7 @@ public class HttpTriggerAction implements TriggerAction {
     for (String header : headersSplit) {
       String[] nvp = header.split(":", 2);
       if (nvp.length < 2) {
-        log.error("Invalid HTTP header found: " + nvp);
+        log.error("Invalid HTTP header found: " + Arrays.toString(nvp));
         continue;
       }
       headerMap.put(nvp[0].trim(), nvp[1].trim());
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java b/src/main/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java
index 1ef6c00..6cb0dc1 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/archive/ChukwaArchiveManager.java
@@ -33,7 +33,7 @@ import org.apache.log4j.Logger;
 
 public class ChukwaArchiveManager implements CHUKWA_CONSTANT {
   static Logger log = Logger.getLogger(ChukwaArchiveManager.class);
-  static SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd");
+  SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd");
 
   static final int ONE_HOUR = 60 * 60 * 1000;
   static final int ONE_DAY = 24*ONE_HOUR;
@@ -113,7 +113,7 @@ public class ChukwaArchiveManager implements CHUKWA_CONSTANT {
       if (maxPermittedErrorCount != -1 && errorCount >= maxPermittedErrorCount) {
         log.warn("==================\nToo many errors (" + errorCount
             + "), Bail out!\n==================");
-        System.exit(-1);
+        break;
       }
       // /chukwa/archives/<YYYYMMDD>/dataSinkDirXXX
       // to

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java
index d1e2b24..bebd1e5 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DailyChukwaRecordRolling.java
@@ -165,7 +165,7 @@ public class DailyChukwaRecordRolling extends Configured implements Tool {
         new DailyChukwaRecordRolling(), mergeArgs, deleteRawdata);
     List<RecordMerger> allMerge = new ArrayList<RecordMerger>();
     if (rollInSequence) {
-      merge.run();
+      merge.mergeRecords();
     } else {
       allMerge.add(merge);
       merge.start();

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
index be63e16..71ac1f7 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/Demux.java
@@ -61,16 +61,28 @@ import org.apache.log4j.Logger;
 
 public class Demux extends Configured implements Tool {
   static Logger log = Logger.getLogger(Demux.class);
-  static SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd_HH_mm");
   public static Configuration jobConf = null;
 
+  protected static void setJobConf(JobConf jobConf) {
+    Demux.jobConf = jobConf;
+  }
+
+  protected Configuration getJobConf() {
+    return Demux.jobConf;
+  }
 
   public static class MapClass extends MapReduceBase implements
       Mapper<ChukwaArchiveKey, ChunkImpl, ChukwaRecordKey, ChukwaRecord> {
 
+    private Configuration jobConf = null;
+
+    @Override
     public void configure(JobConf jobConf) {
       super.configure(jobConf);
-      Demux.jobConf = jobConf;
+      setJobConf(jobConf);
+    }
+
+    private void setJobConf(JobConf jobConf) {
+      this.jobConf = jobConf;
     }
 
     public void map(ChukwaArchiveKey key, ChunkImpl chunk,
@@ -82,15 +94,15 @@ public class Demux extends Configured implements Tool {
       try {
         long duration = System.currentTimeMillis();
         if (log.isDebugEnabled()) {
-          log.debug("Entry: [" + chunk.getData() + "] EventType: ["
+          log.debug("Entry: [" + String.valueOf(chunk.getData()) + "] EventType: ["
              + chunk.getDataType() + "]");
         }
 
-        String defaultProcessor = Demux.jobConf.get(
+        String defaultProcessor = jobConf.get(
            "chukwa.demux.mapper.default.processor",
            "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
 
-        String processorClass_pri = Demux.jobConf.get(chunk.getDataType(),
+        String processorClass_pri = jobConf.get(chunk.getDataType(),
            defaultProcessor);
 
         String processorClass = processorClass_pri.split(",")[0];
@@ -125,9 +137,11 @@ public class Demux extends Configured implements Tool {
   public static class ReduceClass extends MapReduceBase implements
       Reducer<ChukwaRecordKey, ChukwaRecord, ChukwaRecordKey, ChukwaRecord> {
 
+    private Configuration jobConf = null;
+
     public void configure(JobConf jobConf) {
       super.configure(jobConf);
-      Demux.jobConf = jobConf;
+      this.jobConf = jobConf;
     }
 
     public void reduce(ChukwaRecordKey key, Iterator<ChukwaRecord> values,
@@ -143,10 +157,10 @@ public class Demux extends Configured implements Tool {
       String defaultProcessor_classname = "org.apache.hadoop.chukwa.extraction.demux.processor.reducer" + ".IdentityReducer";
-      String defaultProcessor = Demux.jobConf.get("chukwa.demux.reducer.default.processor",
+      String defaultProcessor = jobConf.get("chukwa.demux.reducer.default.processor",
          "," + defaultProcessor_classname);
 
-      String processClass_pri = Demux.jobConf.get(key.getReduceType(), defaultProcessor);
+      String processClass_pri = jobConf.get(key.getReduceType(), defaultProcessor);
       String[] processClass_tmps = processClass_pri.split(",");
       String processClass = null;
       if (processClass_tmps.length != 2)
@@ -199,7 +213,7 @@ public class Demux extends Configured implements Tool {
 
   public int run(String[] args) throws Exception {
     JobConf conf = new JobConf(new ChukwaConfiguration(), Demux.class);
-
+    SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd_HH_mm");
     conf.setJobName("Chukwa-Demux_" + day.format(new Date()));
     conf.setInputFormat(SequenceFileInputFormat.class);
     conf.setMapperClass(Demux.MapClass.class);

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java
index 8fd155e..9fcb65b 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/DemuxManager.java
@@ -38,8 +38,8 @@ import org.apache.log4j.Logger;
 
 public class DemuxManager implements CHUKWA_CONSTANT {
   static Logger log = Logger.getLogger(DemuxManager.class);
-  static int globalErrorcounter = 0;
-  static Date firstErrorTime = null;
+  int globalErrorcounter = 0;
+  Date firstErrorTime = null;
 
   protected int ERROR_SLEEP_TIME = 60;
   protected int NO_DATASINK_SLEEP_TIME = 20;
@@ -144,7 +144,7 @@ public class DemuxManager implements CHUKWA_CONSTANT {
         + nagiosPort + ", reportingHost:" + reportingHost);
 
-    if (nagiosHost == null || nagiosHost.length() == 0 || nagiosPort == 0 || reportingHost.length() == 0 || reportingHost == null) {
+    if (nagiosHost == null || nagiosHost.length() == 0 || nagiosPort == 0 || reportingHost == null || reportingHost.length() == 0) {
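[Editor's note: both managers in this commit swap System.exit(-1) for break once the error budget is exhausted, and DemuxManager also reorders the null check so reportingHost == null is tested before reportingHost.length(). A minimal sketch of the error-budget pattern, with hypothetical names:]

    public class ErrorBudgetDemo {
      private static int errorCount = 0;
      private static final int MAX_ERRORS = 5;

      public static void main(String[] args) {
        while (true) {
          try {
            processBatch(); // may throw
          } catch (RuntimeException e) {
            if (++errorCount >= MAX_ERRORS) {
              // break unwinds the loop normally, so finally blocks, shutdown
              // hooks, and the caller's cleanup still run; System.exit(-1)
              // would tear down the whole JVM, including any embedding test
              // harness or container.
              break;
            }
          }
        }
      }

      private static void processBatch() {
        throw new RuntimeException("simulated failure");
      }
    }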
       sendAlert = false;
       log.warn("Alerting is OFF");
     }
@@ -159,7 +159,7 @@ public class DemuxManager implements CHUKWA_CONSTANT {
       if (maxPermittedErrorCount != -1 && globalErrorcounter >= maxPermittedErrorCount) {
         log.warn("==================\nToo many errors (" + globalErrorcounter
             + "), Bail out!\n==================");
-        System.exit(-1);
+        break;
       }
 
       // Check for anomalies

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java
index c8f2799..b59b229 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/HourlyChukwaRecordRolling.java
@@ -121,7 +121,7 @@ public class HourlyChukwaRecordRolling extends Configured implements Tool {
         new HourlyChukwaRecordRolling(), mergeArgs, deleteRawdata);
     List<RecordMerger> allMerge = new ArrayList<RecordMerger>();
     if (rollInSequence) {
-      merge.run();
+      merge.mergeRecords();
     } else {
       allMerge.add(merge);
       merge.start();

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
index 9685471..df1ff88 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/PostProcessorManager.java
@@ -28,7 +28,17 @@ import java.util.List;
 
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.chukwa.dataloader.DataLoaderFactory;
-import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.HDFS_DEFAULT_NAME_FIELD;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.CHUKWA_ROOT_DIR_FIELD;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.CHUKWA_POST_PROCESS_DIR_FIELD;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.DEFAULT_CHUKWA_POSTPROCESS_DIR_NAME;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.CHUKWA_ROOT_REPOS_DIR_FIELD;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.DEFAULT_REPOS_DIR_NAME;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.CHUKWA_POSTPROCESS_IN_ERROR_DIR_FIELD;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.DEFAULT_POSTPROCESS_IN_ERROR_DIR_NAME;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.CHUKWA_POSTPROCESS_MAX_ERROR_COUNT_FIELD;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.POST_DEMUX_DATA_LOADER;
+import static org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT.POST_DEMUX_SUCCESS_ACTION;
 import org.apache.hadoop.chukwa.util.ExceptionUtil;
 import org.apache.hadoop.chukwa.util.HierarchyDataType;
 import org.apache.hadoop.chukwa.datatrigger.TriggerAction;
@@ -39,11 +49,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.log4j.Logger;
 
-public class PostProcessorManager implements CHUKWA_CONSTANT{
+public class PostProcessorManager {
   static Logger log = Logger.getLogger(PostProcessorManager.class);
 
-  protected static HashMap<String, String> dataSources = new HashMap<String, String>();
-  public static int errorCount = 0;
+  protected HashMap<String, String> dataSources = new HashMap<String, String>();
+  protected int errorCount = 0;
 
   protected int ERROR_SLEEP_TIME = 60;
   protected ChukwaConfiguration conf = null;

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/RecordMerger.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/RecordMerger.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/RecordMerger.java
index 4b26e45..363016b 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/RecordMerger.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/RecordMerger.java
@@ -43,12 +43,16 @@ public class RecordMerger extends Thread {
     this.conf = conf;
     this.fs = fs;
     this.tool = tool;
-    this.mergeArgs = mergeArgs;
+    this.mergeArgs = mergeArgs.clone();
     this.deleteRawData = deleteRawData;
   }
 
   @Override
   public void run() {
+    mergeRecords();
+  }
+
+  void mergeRecords() {
     System.out.println("\t Running Merge! : output [" + mergeArgs[1] + "]");
     int res;
     try {

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java
index f11b727..72574eb 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java
@@ -19,7 +19,9 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
+import java.nio.charset.Charset;
 import java.util.Calendar;
+
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
@@ -121,7 +123,7 @@ public abstract class AbstractProcessor implements MapProcessor {
   protected String nextLine() {
     String log = new String(bytes, startOffset, (recordOffsets[currentPos]
-        - startOffset + 1));
+        - startOffset + 1), Charset.forName("UTF-8"));
     startOffset = recordOffsets[currentPos] + 1;
     currentPos++;
     return RecordConstants.recoverRecordSeparators("\n", log);

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkSaver.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkSaver.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkSaver.java
index 61ba28c..90a561c 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkSaver.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ChunkSaver.java
@@ -19,7 +19,9 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
+import java.nio.charset.Charset;
 import java.util.Calendar;
+
 import org.apache.hadoop.chukwa.Chunk;
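[Editor's note: the rolling jobs above used to call merge.run() directly on a Thread subclass, which executes the body synchronously on the caller's thread while looking asynchronous. Extracting mergeRecords() makes the synchronous path explicit, and cloning mergeArgs keeps the caller's array from being mutated underneath the merger. A condensed sketch of the shape of the fix, with illustrative names:]

    public class MergerSketch extends Thread {
      private final String[] mergeArgs;

      public MergerSketch(String[] mergeArgs) {
        this.mergeArgs = mergeArgs.clone(); // defensive copy, as in the commit
      }

      @Override
      public void run() {
        mergeRecords(); // asynchronous path, entered via start()
      }

      void mergeRecords() {
        System.out.println("merging into " + mergeArgs[1]);
      }

      public static void main(String[] args) {
        MergerSketch m = new MergerSketch(new String[] {"in", "out"});
        m.mergeRecords(); // synchronous: runs on the calling thread, intentionally
        m.start();        // asynchronous: runs on a new thread
      }
    }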
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
@@ -60,7 +62,7 @@ public class ChunkSaver {
       DataOutputBuffer ob = new DataOutputBuffer(chunk
           .getSerializedSizeEstimate());
       chunk.write(ob);
-      record.add(Record.chunkDataField, new String(ob.getData()));
+      record.add(Record.chunkDataField, new String(ob.getData(), Charset.forName("UTF-8")));
       record.add(Record.chunkExceptionField, ExceptionUtil
           .getStackTrace(throwable));
       output.collect(key, record);
@@ -73,7 +75,7 @@ public class ChunkSaver {
           + " - source:" + chunk.getSource() + " - dataType: "
           + chunk.getDataType() + " - Stream: " + chunk.getStreamName()
           + " - SeqId: " + chunk.getSeqID() + " - Data: "
-          + new String(chunk.getData()));
+          + new String(chunk.getData(), Charset.forName("UTF-8")));
     } catch (Throwable e1) {
       e.printStackTrace();
     }

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ClientTraceProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ClientTraceProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ClientTraceProcessor.java
index f249512..afc78ed 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ClientTraceProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ClientTraceProcessor.java
@@ -130,7 +130,7 @@ public class ClientTraceProcessor extends AbstractProcessor {
       rec.add(Record.tagsField, chunk.getTags());
       rec.add(Record.sourceField, chunk.getSource());
       rec.add(Record.applicationField, chunk.getStreamName());
-      rec.add("actual_time",(new Long(ms_fullresolution)).toString());
+      rec.add("actual_time",Long.toString(ms_fullresolution));
       output.collect(key, rec);
     } catch (ParseException e) {

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DatanodeProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DatanodeProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DatanodeProcessor.java
index 4e5765d..85db7fc 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DatanodeProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/DatanodeProcessor.java
@@ -118,11 +118,13 @@ public class DatanodeProcessor extends AbstractProcessor {
     } else {
       timeStamp = Long.parseLong(ttTag);
     }
-    Iterator<String> keys = obj.keySet().iterator();
+    @SuppressWarnings("unchecked")
+    Iterator<Map.Entry<String, ?>> keys = obj.entrySet().iterator();
 
     while (keys.hasNext()) {
-      String key = keys.next();
-      Object value = obj.get(key);
+      Map.Entry<String, ?> entry = keys.next();
+      String key = entry.getKey();
+      Object value = entry.getValue();
       String valueString = value == null ? "" : value.toString();
 
       // Calculate rate for some of the metrics

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseMasterProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseMasterProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseMasterProcessor.java
index c04e752..924f6aa 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseMasterProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseMasterProcessor.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
+import java.nio.charset.Charset;
 import java.util.Calendar;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -67,11 +68,13 @@ public class HBaseMasterProcessor extends AbstractProcessor {
     } else {
       timeStamp = Long.parseLong(ttTag);
     }
-    Iterator<String> keys = obj.keySet().iterator();
+    @SuppressWarnings("unchecked")
+    Iterator<Map.Entry<String, ?>> keys = obj.entrySet().iterator();
 
     while (keys.hasNext()) {
-      String key = keys.next();
-      Object value = obj.get(key);
+      Map.Entry<String, ?> entry = keys.next();
+      String key = entry.getKey();
+      Object value = entry.getValue();
       String valueString = value == null ? "" : value.toString();
 
       // Calculate rate for some of the metrics
@@ -88,7 +91,7 @@ public class HBaseMasterProcessor extends AbstractProcessor {
         valueString = Long.toString(newValue);
       }
 
-      Buffer b = new Buffer(valueString.getBytes());
+      Buffer b = new Buffer(valueString.getBytes(Charset.forName("UTF-8")));
       metricsMap.put(key, b);
     }

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseRegionServerProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseRegionServerProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseRegionServerProcessor.java
index 8fab057..6ea0169 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseRegionServerProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HBaseRegionServerProcessor.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
+import java.nio.charset.Charset;
 import java.util.Calendar;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -60,13 +61,15 @@ public class HBaseRegionServerProcessor extends AbstractProcessor {
     } else {
       timeStamp = Long.parseLong(ttTag);
     }
-    Iterator<String> keys = obj.keySet().iterator();
+    @SuppressWarnings("unchecked")
+    Iterator<Map.Entry<String, ?>> keys = obj.entrySet().iterator();
 
     while (keys.hasNext()) {
-      String key = keys.next();
-      Object value = obj.get(key);
+      Map.Entry<String, ?> entry = keys.next();
+      String key = entry.getKey();
+      Object value = entry.getValue();
"" : value.toString(); - Buffer b = new Buffer(valueString.getBytes()); + Buffer b = new Buffer(valueString.getBytes(Charset.forName("UTF-8"))); metricsMap.put(key, b); } http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java index f671049..8351e84 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/HadoopMetricsProcessor.java @@ -25,6 +25,7 @@ import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; import java.util.Iterator; +import java.util.Map; import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables; import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table; @@ -91,24 +92,26 @@ public class HadoopMetricsProcessor extends AbstractProcessor { String contextName = null; String recordName = null; - Iterator<String> ki = json.keySet().iterator(); + Iterator<Map.Entry<String, ?>> ki = json.entrySet().iterator(); while (ki.hasNext()) { - String keyName = ki.next(); + Map.Entry<String, ?> entry = ki.next(); + String keyName = entry.getKey(); + Object value = entry.getValue(); if (chukwaTimestampField.intern() == keyName.intern()) { - d = new Date((Long) json.get(keyName)); + d = new Date((Long) value); Calendar cal = Calendar.getInstance(); cal.setTimeInMillis(d.getTime()); cal.set(Calendar.SECOND, 0); cal.set(Calendar.MILLISECOND, 0); d.setTime(cal.getTimeInMillis()); } else if (contextNameField.intern() == keyName.intern()) { - contextName = (String) json.get(keyName); + contextName = (String) value; } else if (recordNameField.intern() == keyName.intern()) { - recordName = (String) json.get(keyName); - record.add(keyName, json.get(keyName).toString()); + recordName = (String) value; + record.add(keyName, value.toString()); } else { if(json.get(keyName)!=null) { - record.add(keyName, json.get(keyName).toString()); + record.add(keyName, value.toString()); } } } http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java index 7e2e4e2..b910165 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobConfProcessor.java @@ -19,6 +19,8 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.mapper; import java.io.File; import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.charset.Charset; import java.util.Calendar; import java.util.Random; import java.util.regex.Matcher; @@ -78,13 +80,14 @@ public class JobConfProcessor extends AbstractProcessor { = DocumentBuilderFactory.newInstance(); //ignore all comments inside the xml file docBuilderFactory.setIgnoringComments(true); + 
FileOutputStream out = null; try { DocumentBuilder builder = docBuilderFactory.newDocumentBuilder(); Document doc = null; String fileName = "test_"+randomNumber.nextInt(); File tmp = new File(fileName); - FileOutputStream out = new FileOutputStream(tmp); - out.write(recordEntry.getBytes()); + out = new FileOutputStream(tmp); + out.write(recordEntry.getBytes(Charset.forName("UTF-8"))); out.close(); doc = builder.parse(fileName); Element root = doc.getDocumentElement(); @@ -139,10 +142,15 @@ public class JobConfProcessor extends AbstractProcessor { buildGenericRecord(jobConfRecord, null, time, jobConfData); output.collect(key, jobConfRecord); - tmp.delete(); - } catch(Exception e) { - e.printStackTrace(); - throw e; + if(!tmp.delete()) { + log.warn(tmp.getAbsolutePath() + " cannot be deleted."); + } + } catch(IOException e) { + if(out != null) { + out.close(); + } + e.printStackTrace(); + throw e; } } http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java index 5a2a851..fdd51f2 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobLogHistoryProcessor.java @@ -22,6 +22,7 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.mapper; import java.io.IOException; import java.util.HashMap; import java.util.Iterator; +import java.util.Map.Entry; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -38,13 +39,12 @@ public class JobLogHistoryProcessor extends AbstractProcessor { static Logger log = Logger.getLogger(JobLogHistoryProcessor.class); private static final String recordType = "JobLogHistory"; - private static String internalRegex = null; - private static Pattern ip = null; + private static final String internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?"; + private Pattern ip = null; private Matcher internalMatcher = null; public JobLogHistoryProcessor() { - internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?"; ip = Pattern.compile(internalRegex); internalMatcher = ip.matcher("-"); } @@ -331,10 +331,8 @@ public class JobLogHistoryProcessor extends AbstractProcessor { record.setTime(Long.parseLong(keys.get("FINISH_TIME"))); } - Iterator<String> it = keys.keySet().iterator(); - while (it.hasNext()) { - String field = it.next(); - record.add(field, keys.get(field)); + for(Entry<String, String> entry : keys.entrySet()) { + record.add(entry.getKey(), entry.getValue()); } output.collect(key, record); http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobTrackerProcessor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobTrackerProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobTrackerProcessor.java index c2f7b52..d42c329 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobTrackerProcessor.java +++ 
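[Editor's note: JobConfProcessor now hoists the FileOutputStream out of the try so the catch block can close it on failure. A try/finally (or try-with-resources on Java 7+) covers every exit path; a hedged sketch with placeholder names:]

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.nio.charset.Charset;

    public class CleanupDemo {
      static void writeTemp(File tmp, String recordEntry) throws IOException {
        FileOutputStream out = null;
        try {
          out = new FileOutputStream(tmp);
          out.write(recordEntry.getBytes(Charset.forName("UTF-8")));
        } finally {
          if (out != null) {
            out.close(); // runs whether the write succeeded or threw
          }
        }
      }
    }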
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/JobTrackerProcessor.java
@@ -22,6 +22,8 @@ import java.util.Calendar;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.TimeZone;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -113,12 +115,9 @@ public class JobTrackerProcessor extends AbstractProcessor {
     } else {
       timeStamp = Long.parseLong(ttTag);
     }
-    Iterator<String> keys = obj.keySet().iterator();
-
-    while (keys.hasNext()) {
-      String key = keys.next();
-      Object value = obj.get(key);
-      String valueString = value == null ? "" : value.toString();
+    for(Entry<String, Object> entry : (Set<Entry<String, Object>>) obj.entrySet()) {
+      String key = entry.getKey();
+      String valueString = entry.getValue().toString();
 
       // Calculate rate for some of the metrics
       if (rateMap.containsKey(key)) {

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java
index 79291a1..3962628 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4JMetricsContextProcessor.java
@@ -19,13 +19,15 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
 import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
-
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
 
@@ -67,10 +69,9 @@ public class Log4JMetricsContextProcessor extends AbstractProcessor {
       recordType += "_" + recordName;
     }
 
-    Iterator<String> ki = json.keySet().iterator();
-    while (ki.hasNext()) {
-      String key = ki.next();
-      String value = String.valueOf(json.get(key));
+    for(Entry<String, Object> entry : (Set<Map.Entry>) json.entrySet()) {
+      String key = entry.getKey();
+      String value = String.valueOf(entry.getValue());
       if(value != null) {
         chukwaRecord.add(key, value);
       }

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4jJobHistoryProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4jJobHistoryProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4jJobHistoryProcessor.java
index 272980a..2cb0980 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4jJobHistoryProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/Log4jJobHistoryProcessor.java
@@ -34,13 +34,12 @@ public class Log4jJobHistoryProcessor extends AbstractProcessor {
   static Logger log = Logger.getLogger(Log4jJobHistoryProcessor.class);
   private static final String recordType = "JobLogHistory";
-  private static String internalRegex = null;
-  private static Pattern ip = null;
+  private static String internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?";
+  private Pattern ip = null;
 
   private Matcher internalMatcher = null;
 
   public Log4jJobHistoryProcessor() {
-    internalRegex = "(.*?)=\"(.*?)\"(.*)([\\n])?";
     ip = Pattern.compile(internalRegex);
     internalMatcher = ip.matcher("-");
   }

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/LogEntry.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/LogEntry.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/LogEntry.java
index 400bd78..5b75939 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/LogEntry.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/LogEntry.java
@@ -22,7 +22,7 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 
 public class LogEntry {
-  private final static SimpleDateFormat sdf = new SimpleDateFormat(
+  private SimpleDateFormat sdf = new SimpleDateFormat(
       "yyyy-MM-dd HH:mm");
 
   private Date date;
@@ -43,11 +43,11 @@ public class LogEntry {
   }
 
   public Date getDate() {
-    return date;
+    return (Date) date.clone();
   }
 
   public void setDate(Date date) {
-    this.date = date;
+    this.date = (Date) date.clone();
   }
 
   public String getLogLevel() {

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/NamenodeProcessor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/NamenodeProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/NamenodeProcessor.java
index 1e6e9d7..075ab5c 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/NamenodeProcessor.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/NamenodeProcessor.java
@@ -131,11 +131,13 @@ public class NamenodeProcessor extends AbstractProcessor {
     } else {
       timeStamp = Long.parseLong(ttTag);
     }
-    Iterator<String> keys = obj.keySet().iterator();
+    @SuppressWarnings("unchecked")
+    Iterator<Map.Entry<String, ?>> keys = obj.entrySet().iterator();
 
     while (keys.hasNext()) {
-      String key = keys.next();
-      Object value = obj.get(key);
+      Map.Entry<String, ?> entry = keys.next();
+      String key = entry.getKey();
+      Object value = entry.getValue();
       String valueString = (value == null) ? "" : value.toString();
 
       // These metrics are string types with JSON structure. So we parse them
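[Editor's note: LogEntry drops both the shared static SimpleDateFormat (its parse state is mutable, so concurrent use silently corrupts results) and the practice of handing out its internal Date. A sketch of the resulting shape:]

    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class LogEntrySketch {
      // one formatter per instance; never share a static SimpleDateFormat
      private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
      private Date date = new Date();

      public Date getDate() {
        return (Date) date.clone(); // callers get a copy, not the mutable field
      }

      public void setDate(Date date) {
        this.date = (Date) date.clone();
      }

      public String format() {
        return sdf.format(date);
      }
    }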
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java
index 4c643a2..d38673c 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SysLog.java
@@ -49,13 +49,6 @@ public class SysLog extends AbstractProcessor {
       throws Throwable {
     try {
       String dStr = recordEntry.substring(0, 15);
-      int start = 15;
-      int idx = recordEntry.indexOf(' ', start);
-      start = idx + 1;
-      idx = recordEntry.indexOf(' ', start);
-      String body = recordEntry.substring(idx + 1);
-      body = body.replaceAll("\n", "");
-
       Calendar convertDate = Calendar.getInstance();
       Date d = sdf.parse(dStr);
       int year = convertDate.get(Calendar.YEAR);

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SystemMetrics.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SystemMetrics.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SystemMetrics.java
index e293543..3fa1816 100644
--- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SystemMetrics.java
+++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/SystemMetrics.java
@@ -24,6 +24,7 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.mapper;
 
 import java.util.Calendar;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.TimeZone;
 
 import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
@@ -69,14 +70,17 @@ public class SystemMetrics extends AbstractProcessor {
         continue;
       }
       actualSize++;
-      Iterator<String> keys = cpu.keySet().iterator();
       combined = combined + Double.parseDouble(cpu.get("combined").toString());
       user = user + Double.parseDouble(cpu.get("user").toString());
       sys = sys + Double.parseDouble(cpu.get("sys").toString());
       idle = idle + Double.parseDouble(cpu.get("idle").toString());
+      @SuppressWarnings("unchecked")
+      Iterator<Map.Entry<String, ?>> keys = cpu.entrySet().iterator();
       while(keys.hasNext()) {
-        String key = keys.next();
-        record.add(key + "." + i, cpu.get(key).toString());
+        Map.Entry<String, ?> entry = keys.next();
+        String key = entry.getKey();
+        Object value = entry.getValue();
+        record.add(key + "." + i, value.toString());
       }
     }
     combined = combined / actualSize;
@@ -101,20 +105,26 @@ public class SystemMetrics extends AbstractProcessor {
 
     record = new ChukwaRecord();
     JSONObject memory = (JSONObject) json.get("memory");
-    Iterator<String> memKeys = memory.keySet().iterator();
+    @SuppressWarnings("unchecked")
+    Iterator<Map.Entry<String, ?>> memKeys = memory.entrySet().iterator();
     while(memKeys.hasNext()) {
-      String key = memKeys.next();
-      record.add(key, memory.get(key).toString());
+      Map.Entry<String, ?> entry = memKeys.next();
+      String key = entry.getKey();
+      Object value = entry.getValue();
+      record.add(key, value.toString());
     }
     buildGenericRecord(record, null, cal.getTimeInMillis(), "memory");
     output.collect(key, record);
 
     record = new ChukwaRecord();
     JSONObject swap = (JSONObject) json.get("swap");
-    Iterator<String> swapKeys = swap.keySet().iterator();
+    @SuppressWarnings("unchecked")
+    Iterator<Map.Entry<String, ?>> swapKeys = swap.entrySet().iterator();
     while(swapKeys.hasNext()) {
-      String key = swapKeys.next();
-      record.add(key, swap.get(key).toString());
+      Map.Entry<String, ?> entry = swapKeys.next();
+      String key = entry.getKey();
+      Object value = entry.getValue();
+      record.add(key, value.toString());
     }
     buildGenericRecord(record, null, cal.getTimeInMillis(), "swap");
     output.collect(key, record);
@@ -131,25 +141,28 @@ public class SystemMetrics extends AbstractProcessor {
     JSONArray netList = (JSONArray) json.get("network");
     for(int i = 0;i < netList.size(); i++) {
       JSONObject netIf = (JSONObject) netList.get(i);
-      Iterator<String> keys = netIf.keySet().iterator();
+      @SuppressWarnings("unchecked")
+      Iterator<Map.Entry<String, ?>> keys = netIf.entrySet().iterator();
       while(keys.hasNext()) {
-        String key = keys.next();
-        record.add(key + "." + i, netIf.get(key).toString());
+        Map.Entry<String, ?> entry = keys.next();
+        String key = entry.getKey();
+        Object value = entry.getValue();
+        record.add(key + "." + i, value.toString());
         if(i!=0) {
           if(key.equals("RxBytes")) {
-            rxBytes = rxBytes + (Long) netIf.get(key);
+            rxBytes = rxBytes + (Long) value;
           } else if(key.equals("RxDropped")) {
-            rxDropped = rxDropped + (Long) netIf.get(key);
+            rxDropped = rxDropped + (Long) value;
           } else if(key.equals("RxErrors")) {
-            rxErrors = rxErrors + (Long) netIf.get(key);
+            rxErrors = rxErrors + (Long) value;
           } else if(key.equals("RxPackets")) {
-            rxPackets = rxPackets + (Long) netIf.get(key);
+            rxPackets = rxPackets + (Long) value;
           } else if(key.equals("TxBytes")) {
-            txBytes = txBytes + (Long) netIf.get(key);
+            txBytes = txBytes + (Long) value;
           } else if(key.equals("TxCollisions")) {
-            txCollisions = txCollisions + (Long) netIf.get(key);
+            txCollisions = txCollisions + (Long) value;
           } else if(key.equals("TxErrors")) {
-            txErrors = txErrors + (Long) netIf.get(key);
+            txErrors = txErrors + (Long) value;
           } else if(key.equals("TxPackets")) {
             txPackets = txPackets + (Long) netIf.get(key);
           }
@@ -177,22 +190,25 @@ public class SystemMetrics extends AbstractProcessor {
     JSONArray diskList = (JSONArray) json.get("disk");
     for(int i = 0;i < diskList.size(); i++) {
       JSONObject disk = (JSONObject) diskList.get(i);
-      Iterator<String> keys = disk.keySet().iterator();
+      @SuppressWarnings("unchecked")
+      Iterator<Map.Entry<String, ?>> keys = disk.entrySet().iterator();
       while(keys.hasNext()) {
-        String key = keys.next();
-        record.add(key + "." + i, disk.get(key).toString());
+        Map.Entry<String, ?> entry = keys.next();
+        String key = entry.getKey();
+        Object value = entry.getValue();
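[Editor's note: the network and disk aggregations here cast entry values straight to Long; that works because json-simple parses every integral JSON number as java.lang.Long. An illustrative standalone sketch, assuming json-simple on the classpath:]

    import org.json.simple.JSONArray;
    import org.json.simple.JSONObject;
    import org.json.simple.JSONValue;

    public class JsonLongDemo {
      public static void main(String[] args) {
        JSONArray netList = (JSONArray) JSONValue.parse(
            "[{\"RxBytes\": 100}, {\"RxBytes\": 250}]");
        long rxBytes = 0;
        for (Object o : netList) {
          JSONObject netIf = (JSONObject) o;
          // json-simple represents integral numbers as Long, so the cast is safe
          rxBytes = rxBytes + (Long) netIf.get("RxBytes");
        }
        System.out.println("rxBytes = " + rxBytes); // 350
      }
    }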
+ i, value.toString()); if(key.equals("ReadBytes")) { - readBytes = readBytes + (Long) disk.get("ReadBytes"); + readBytes = readBytes + (Long) value; } else if(key.equals("Reads")) { - reads = reads + (Long) disk.get("Reads"); + reads = reads + (Long) value; } else if(key.equals("WriteBytes")) { - writeBytes = writeBytes + (Long) disk.get("WriteBytes"); + writeBytes = writeBytes + (Long) value; } else if(key.equals("Writes")) { - writes = writes + (Long) disk.get("Writes"); + writes = writes + (Long) value; } else if(key.equals("Total")) { - total = total + (Long) disk.get("Total"); + total = total + (Long) value; } else if(key.equals("Used")) { - used = used + (Long) disk.get("Used"); + used = used + (Long) value; } } } http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ZookeeperProcessor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ZookeeperProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ZookeeperProcessor.java index 417fbb5..fe050ed 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ZookeeperProcessor.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/ZookeeperProcessor.java @@ -77,11 +77,13 @@ public class ZookeeperProcessor extends AbstractProcessor { } else { timeStamp = Long.parseLong(ttTag); } - Iterator<String> keys = ((JSONObject) obj).keySet().iterator(); + @SuppressWarnings("unchecked") + Iterator<Map.Entry<String, ?>> keys = ((JSONObject) obj).entrySet().iterator(); while (keys.hasNext()) { - String key = keys.next(); - Object value = obj.get(key); + Map.Entry<String, ?> entry = keys.next(); + String key = entry.getKey(); + Object value = entry.getValue(); String valueString = value == null ? 
"" : value.toString(); if (metricsMap.containsKey(key)) { http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ClientTrace.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ClientTrace.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ClientTrace.java index 4002c6c..5c658dc 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ClientTrace.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/ClientTrace.java @@ -48,7 +48,7 @@ public class ClientTrace implements ReduceProcessor { while (values.hasNext()) { /* aggregate bytes for current key */ rec = values.next(); - bytes += Long.valueOf(rec.getValue("bytes")); + bytes += Long.parseLong(rec.getValue("bytes")); /* output raw values to different data type for uses which * require detailed per-operation data */ @@ -70,7 +70,7 @@ public class ClientTrace implements ReduceProcessor { String[] k = key.getKey().split("/"); emit.add(k[1] + "_" + k[2], String.valueOf(bytes)); - emit.setTime(Long.valueOf(k[3])); + emit.setTime(Long.parseLong(k[3])); output.collect(key, emit); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java index 5e8814c..c5050f2 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/demux/processor/reducer/MRJobReduceProcessor.java @@ -21,6 +21,8 @@ package org.apache.hadoop.chukwa.extraction.demux.processor.reducer; import java.io.IOException; import java.util.HashMap; import java.util.Iterator; +import java.util.Map; + import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; import org.apache.hadoop.chukwa.extraction.engine.Record; @@ -73,10 +75,12 @@ public class MRJobReduceProcessor implements ReduceProcessor { newRecord.add(Record.tagsField, record.getValue(Record.tagsField)); newRecord.setTime(initTime); newRecord.add(Record.tagsField, record.getValue(Record.tagsField)); - Iterator<String> it = data.keySet().iterator(); + Iterator<Map.Entry<String, String>> it = data.entrySet().iterator(); while (it.hasNext()) { - String field = it.next(); - newRecord.add(field, data.get(field)); + Map.Entry<String, ?> entry = it.next(); + String field = entry.getKey(); + String value = entry.getValue().toString(); + newRecord.add(field, value); } output.collect(newKey, newRecord); http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java index 4fdb365..a884cc1 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java +++ 
b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecord.java @@ -19,10 +19,12 @@ package org.apache.hadoop.chukwa.extraction.engine; +import java.nio.charset.Charset; import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.TreeMap; + import org.apache.hadoop.record.Buffer; public class ChukwaRecord extends ChukwaRecordJT implements Record { @@ -32,10 +34,10 @@ public class ChukwaRecord extends ChukwaRecordJT implements Record { public void add(String key, String value) { synchronized (this) { if (this.mapFields == null) { - this.mapFields = new TreeMap<String, org.apache.hadoop.record.Buffer>(); + this.mapFields = new TreeMap<String, Buffer>(); } } - this.mapFields.put(key, new Buffer(value.getBytes())); + this.mapFields.put(key, new Buffer(value.getBytes(Charset.forName("UTF-8")))); } public String[] getFields() { @@ -44,7 +46,7 @@ public class ChukwaRecord extends ChukwaRecordJT implements Record { public String getValue(String field) { if (this.mapFields.containsKey(field)) { - return new String(this.mapFields.get(field).get()); + return new String(this.mapFields.get(field).get(), Charset.forName("UTF-8")); } else { return null; } @@ -77,7 +79,7 @@ public class ChukwaRecord extends ChukwaRecordJT implements Record { while (it.hasNext()) { entry = it.next(); key = entry.getKey().intern(); - val = new String(entry.getValue().get()); + val = new String(entry.getValue().get(), Charset.forName("UTF-8")); if (key.intern() == Record.bodyField.intern()) { hasBody = true; bodyVal = val; http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordJT.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordJT.java b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordJT.java index b2660a2..04cca36 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordJT.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordJT.java @@ -19,8 +19,11 @@ // File generated by hadoop record compiler. Do not edit. 
package org.apache.hadoop.chukwa.extraction.engine; +import java.io.Serializable; -public class ChukwaRecordJT extends org.apache.hadoop.record.Record { + +public class ChukwaRecordJT extends org.apache.hadoop.record.Record implements Serializable { + private static final long serialVersionUID = 15015L; private static final org.apache.hadoop.record.meta.RecordTypeInfo _rio_recTypeInfo; private static org.apache.hadoop.record.meta.RecordTypeInfo _rio_rtiFilter; private static int[] _rio_rtiFilterFields; @@ -236,6 +239,7 @@ public class ChukwaRecordJT extends org.apache.hadoop.record.Record { } public Object clone() throws CloneNotSupportedException { + super.clone(); ChukwaRecordJT _rio_other = new ChukwaRecordJT(); _rio_other.time = this.time; _rio_other.mapFields = (java.util.TreeMap<String, org.apache.hadoop.record.Buffer>) this.mapFields @@ -258,7 +262,7 @@ public class ChukwaRecordJT extends org.apache.hadoop.record.Record { } public static class Comparator extends - org.apache.hadoop.record.RecordComparator { + org.apache.hadoop.record.RecordComparator implements Serializable { public Comparator() { super(ChukwaRecordJT.class); } http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java index 7bc6718..0e602d7 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java @@ -178,6 +178,7 @@ public class ChukwaRecordKey extends org.apache.hadoop.record.Record { } public Object clone() throws CloneNotSupportedException { + super.clone(); ChukwaRecordKey _rio_other = new ChukwaRecordKey(); _rio_other.reduceType = this.reduceType; _rio_other.key = this.key; http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/engine/Token.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/Token.java b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/Token.java index 5b71a61..eb14414 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/Token.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/Token.java @@ -21,4 +21,8 @@ package org.apache.hadoop.chukwa.extraction.engine; public class Token { public String key = null; public boolean hasMore = false; + + public boolean getMore() { + return hasMore; + } } http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DsDirectory.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DsDirectory.java b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DsDirectory.java index 68dbb2c..dc0c576 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DsDirectory.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/DsDirectory.java @@ -37,7 +37,7 @@ public class DsDirectory { private DataConfig dataConfig = null; private static FileSystem fs = null; - private static Configuration conf = null; + private Configuration 
conf = null; private DsDirectory() { dataConfig = new DataConfig(); http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/database/DatabaseDS.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/database/DatabaseDS.java b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/database/DatabaseDS.java index c2602b4..bc0f83d 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/database/DatabaseDS.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/database/DatabaseDS.java @@ -45,6 +45,9 @@ import org.apache.commons.logging.LogFactory; public class DatabaseDS implements DataSource { private static final Log log = LogFactory.getLog(DatabaseDS.class); + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = + "SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE", + justification = "Dynamic based upon tables in the database") public SearchResult search(SearchResult result, String cluster, String dataSource, long t0, long t1, String filter, Token token) throws DataSourceException { @@ -60,8 +63,6 @@ public class DatabaseDS implements DataSource { timeField = "LAUNCH_TIME"; } else if (dataSource.equalsIgnoreCase("HodJob")) { timeField = "StartTime"; - } else if (dataSource.equalsIgnoreCase("QueueInfo")) { - timeField = "timestamp"; } else { timeField = "timestamp"; } @@ -88,13 +89,16 @@ public class DatabaseDS implements DataSource { int col = rmeta.getColumnCount(); while (rs.next()) { ChukwaRecord event = new ChukwaRecord(); - String cell = ""; + StringBuilder cell = new StringBuilder(); long timestamp = 0; for (int i = 1; i < col; i++) { String value = rs.getString(i); if (value != null) { - cell = cell + " " + rmeta.getColumnName(i) + ":" + value; + cell.append(" "); + cell.append(rmeta.getColumnName(i)); + cell.append(":"); + cell.append(value); } if (rmeta.getColumnName(i).equals(timeField)) { timestamp = rs.getLong(i); @@ -111,7 +115,7 @@ public class DatabaseDS implements DataSource { continue; } - event.add(Record.bodyField, cell); + event.add(Record.bodyField, cell.toString()); event.add(Record.sourceField, cluster + "." 
+ dataSource); if (records.containsKey(timestamp)) { records.get(timestamp).add(event); http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaDSInternalResult.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaDSInternalResult.java b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaDSInternalResult.java index bb1797b..f197420 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaDSInternalResult.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaDSInternalResult.java @@ -34,4 +34,12 @@ public class ChukwaDSInternalResult { String fileName = null; ChukwaRecordKey key = null; + + public ChukwaRecordKey getKey() { + return key; + } + + protected void setKey(ChukwaRecordKey key) { + this.key = key; + } } http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaFileParser.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaFileParser.java b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaFileParser.java index dc23ef6..799390b 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaFileParser.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaFileParser.java @@ -131,12 +131,14 @@ public class ChukwaFileParser { } } while (line != null); - } catch (Exception e) { + } catch (IOException e) { e.printStackTrace(); } finally { System.out.println("File: " + fileName + " Line count: " + lineCount); try { - dataIS.close(); + if(dataIS != null) { + dataIS.close(); + } } catch (IOException e) { } } http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaRecordDataSource.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaRecordDataSource.java b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaRecordDataSource.java index 7c9e02c..93fdd2a 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaRecordDataSource.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaRecordDataSource.java @@ -26,7 +26,10 @@ import java.util.Date; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.TreeMap; + import org.apache.hadoop.chukwa.conf.ChukwaConfiguration; import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord; import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey; @@ -150,7 +153,7 @@ public class ChukwaRecordDataSource implements DataSource { { log.debug("check for hours"); for (int hour = 0; hour < 24; hour++) { - if (workingDay == res.day && hour < workingHour) { + if (workingDay.equals(res.day) && hour < workingHour) { continue; } log.debug(" Hour? 
-->" + filePath + dataSource + "/" @@ -349,7 +352,7 @@ public class ChukwaRecordDataSource implements DataSource { } } - } catch (Exception e) { + } catch (IOException e) { e.printStackTrace(); } finally { try { @@ -375,7 +378,10 @@ public class ChukwaRecordDataSource implements DataSource { contains = fs.exists(new Path(rootFolder + dataSource + "/" + workingDay + "/" + workingHour + "/rotateDone")); break; - + default: + contains = fs.exists(new Path(rootFolder + dataSource + "/" + workingDay + + "/rotateDone")); + break; } return contains; } @@ -400,7 +406,10 @@ public class ChukwaRecordDataSource implements DataSource { contains = fs.exists(new Path(rootFolder + dataSource + "/" + workingDay + "/" + workingHour + "/" + raw)); break; - + default: + contains = fs + .exists(new Path(rootFolder + dataSource + "/" + workingDay)); + break; } return contains; } @@ -440,6 +449,10 @@ public class ChukwaRecordDataSource implements DataSource { + raws[rawIndex] + "/" + dataSource + "_" + day + "_" + hour + "_" + raws[rawIndex] + "." + spill + ".evt"; break; + default: + fileName = rootFolder + "/" + dataSource + "/" + day + "/" + dataSource + + "_" + day + "." + spill + ".evt"; + break; } log.debug("buildFileName :" + fileName); return fileName; @@ -473,12 +486,10 @@ public class ChukwaRecordDataSource implements DataSource { ds.search(result, cluster, dataSource, t0, t1, filter, token); TreeMap<Long, List<Record>> records = result.getRecords(); - Iterator<Long> it = records.keySet().iterator(); - - while (it.hasNext()) { - long ts = it.next(); + for(Entry<Long, List<Record>> entry : records.entrySet()) { + long ts = entry.getKey(); System.out.println("\n\nTimestamp: " + new Date(ts)); - List<Record> list = records.get(ts); + List<Record> list = entry.getValue(); for (int i = 0; i < list.size(); i++) { System.out.println(list.get(i)); } http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaSequenceFileParser.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaSequenceFileParser.java b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaSequenceFileParser.java index dbaadc2..59b8dcd 100644 --- a/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaSequenceFileParser.java +++ b/src/main/java/org/apache/hadoop/chukwa/extraction/engine/datasource/record/ChukwaSequenceFileParser.java @@ -118,7 +118,7 @@ public class ChukwaSequenceFileParser { } } - } catch (Exception e) { + } catch (IOException e) { e.printStackTrace(); } finally { System.out.println("File: " + fileName + " Line count: " + lineCount); http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/hicc/JSONLoader.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/JSONLoader.java b/src/main/java/org/apache/hadoop/chukwa/hicc/JSONLoader.java index f721978..48c578a 100644 --- a/src/main/java/org/apache/hadoop/chukwa/hicc/JSONLoader.java +++ b/src/main/java/org/apache/hadoop/chukwa/hicc/JSONLoader.java @@ -20,13 +20,13 @@ package org.apache.hadoop.chukwa.hicc; import java.net.*; +import java.nio.charset.Charset; import java.text.ParseException; import java.io.*; import org.json.simple.JSONArray; import org.json.simple.JSONObject; import org.json.simple.JSONValue; - 
import org.apache.log4j.Logger; import org.apache.hadoop.chukwa.util.ExceptionUtil; @@ -43,7 +43,7 @@ public class JSONLoader { // FileReader always assumes default encoding is OK! URL yahoo = new URL(source); BufferedReader in = new BufferedReader(new InputStreamReader(yahoo - .openStream())); + .openStream(), Charset.forName("UTF-8"))); String inputLine; http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/hicc/OfflineTimeHandler.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/OfflineTimeHandler.java b/src/main/java/org/apache/hadoop/chukwa/hicc/OfflineTimeHandler.java index b3e7bf5..1e98989 100644 --- a/src/main/java/org/apache/hadoop/chukwa/hicc/OfflineTimeHandler.java +++ b/src/main/java/org/apache/hadoop/chukwa/hicc/OfflineTimeHandler.java @@ -52,10 +52,8 @@ public class OfflineTimeHandler { public void init(HashMap<String, String> map) { Calendar now = Calendar.getInstance(); - if (map == null || (map != null - && map.get("time_type") == null - && map.get("time_type") == null - && map.get("period") == null)) { + if (map == null || + (map.get("time_type") == null && map.get("period") == null)) { end = now.getTimeInMillis(); start = end - 60 * 60 * 1000; } else if (map.get("period") != null http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/hicc/Views.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/hicc/Views.java b/src/main/java/org/apache/hadoop/chukwa/hicc/Views.java index dbb6707..19e537d 100644 --- a/src/main/java/org/apache/hadoop/chukwa/hicc/Views.java +++ b/src/main/java/org/apache/hadoop/chukwa/hicc/Views.java @@ -20,12 +20,12 @@ package org.apache.hadoop.chukwa.hicc; import java.io.*; +import java.nio.charset.Charset; import java.util.*; import org.json.simple.JSONArray; import org.json.simple.JSONObject; import org.json.simple.JSONValue; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.chukwa.util.ExceptionUtil; @@ -43,7 +43,7 @@ public class Views { try { // use buffering, reading one line at a time // FileReader always assumes default encoding is OK! - BufferedReader input = new BufferedReader(new FileReader(aFile)); + BufferedReader input = new BufferedReader(new InputStreamReader(new FileInputStream(aFile.getAbsolutePath()), Charset.forName("UTF-8"))); try { String line = null; // not declared within while loop /*

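A recurring cleanup across the SystemMetrics, ZookeeperProcessor, MRJobReduceProcessor, and ChukwaRecordDataSource diffs above replaces keySet() iteration followed by get(key) lookups with a single entrySet() pass, so each mapping is read once instead of twice. A minimal, self-contained sketch of the pattern (the class name and metric values are illustrative, not taken from Chukwa):

    import java.util.HashMap;
    import java.util.Map;

    public class EntrySetExample {
      public static void main(String[] args) {
        Map<String, String> cpu = new HashMap<String, String>();
        cpu.put("user", "12.5");
        cpu.put("sys", "3.1");

        // Old shape: one traversal of the key set, plus one get() per key.
        for (String key : cpu.keySet()) {
          System.out.println(key + "=" + cpu.get(key));
        }

        // New shape: key and value come out of the same entry, one traversal.
        for (Map.Entry<String, String> entry : cpu.entrySet()) {
          System.out.println(entry.getKey() + "=" + entry.getValue());
        }
      }
    }

The @SuppressWarnings("unchecked") on the iterator declarations is needed because json-simple's JSONObject extends HashMap without type parameters, so its entrySet() comes back raw.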
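The ClientTrace change from Long.valueOf to Long.parseLong looks cosmetic but is not: valueOf returns a boxed Long that is immediately auto-unboxed by the += accumulation, while parseLong yields the primitive directly. A small sketch (the literal values are arbitrary):

    public class ParseLongExample {
      public static void main(String[] args) {
        long bytes = 0;
        bytes += Long.valueOf("1024");   // parse -> box to Long -> unbox to long
        bytes += Long.parseLong("1024"); // parse straight to primitive long
        System.out.println(bytes);
      }
    }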
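ChukwaRecord, JSONLoader, and Views all switch from the platform default encoding to an explicit UTF-8 Charset, both for byte/String conversion and for stream readers. A sketch of both directions, nothing Chukwa-specific in it:

    import java.io.BufferedReader;
    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.nio.charset.Charset;

    public class Utf8Example {
      private static final Charset UTF8 = Charset.forName("UTF-8");

      public static void main(String[] args) throws IOException {
        byte[] bytes = "caf\u00e9".getBytes(UTF8);   // encode explicitly
        String decoded = new String(bytes, UTF8);    // decode with the same charset

        BufferedReader in = new BufferedReader(
            new InputStreamReader(new ByteArrayInputStream(bytes), UTF8));
        System.out.println(decoded + " / " + in.readLine());
        in.close();
      }
    }

On Java 7 and later, java.nio.charset.StandardCharsets.UTF_8 avoids the lookup by name; Charset.forName("UTF-8") is the spelling that also works on the older JDKs this codebase targeted.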
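The ChukwaRecordJT and ChukwaRecordKey edits read like FindBugs appeasement: clone() now invokes super.clone() (the CN_IDIOM_NO_SUPER_CALL pattern), the record class gains Serializable plus a serialVersionUID, and the nested RecordComparator is marked Serializable, presumably for the comparator-should-be-serializable check. Note the diff discards super.clone()'s return value and keeps the manual field copy; the conventional idiom uses the returned object, roughly as below (a sketch, not the generated Chukwa class):

    public class CloneExample implements Cloneable, java.io.Serializable {
      private static final long serialVersionUID = 1L;
      private long time;

      @Override
      public Object clone() throws CloneNotSupportedException {
        // super.clone() produces the copy; deep-copy mutable fields here if any.
        return (CloneExample) super.clone();
      }
    }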
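DatabaseDS suppresses FindBugs' SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE with a justification string rather than restructuring the query, since the table and time-field names are chosen at runtime and PreparedStatement cannot parameterize identifiers. The annotation form, on a hypothetical helper, assuming the findbugs annotations jar is on the classpath as it is for this module:

    import java.sql.SQLException;
    import java.sql.Statement;

    public class SqlSuppressExample {
      @edu.umd.cs.findbugs.annotations.SuppressWarnings(
          value = "SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE",
          justification = "table name is configuration-driven, not user input")
      public void dump(Statement stmt, String table) throws SQLException {
        // Identifiers cannot be bound as ? parameters, hence the concatenation.
        stmt.execute("SELECT * FROM " + table);
      }
    }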
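DatabaseDS also stops building the per-row body with repeated String concatenation, which allocates a fresh String on every pass of the column loop, and accumulates into a StringBuilder instead. The shape of the change, with hypothetical column data standing in for the JDBC ResultSet:

    public class StringBuilderExample {
      public static void main(String[] args) {
        String[] names = {"host", "load"};
        String[] values = {"node1", "0.75"};

        StringBuilder cell = new StringBuilder();
        for (int i = 0; i < names.length; i++) {
          cell.append(" ");
          cell.append(names[i]);
          cell.append(":");
          cell.append(values[i]);
        }
        System.out.println(cell.toString());  // " host:node1 load:0.75"
      }
    }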
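ChukwaFileParser and ChukwaSequenceFileParser narrow catch (Exception) to catch (IOException) and null-guard the close in the finally block, so a failed open no longer turns into a NullPointerException during cleanup. A sketch of that shape, with a hypothetical file name:

    import java.io.DataInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;

    public class GuardedCloseExample {
      public static void main(String[] args) {
        DataInputStream dataIS = null;
        try {
          dataIS = new DataInputStream(new FileInputStream("metrics.evt"));
          System.out.println(dataIS.read());
        } catch (IOException e) {     // only the failures we can actually handle
          e.printStackTrace();
        } finally {
          if (dataIS != null) {       // the open may have failed before assignment
            try {
              dataIS.close();
            } catch (IOException ignored) {
            }
          }
        }
      }
    }

On Java 7 and later, try-with-resources expresses the same guarantee with less ceremony; the explicit finally is the Java 6 compatible form.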
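Finally, ChukwaRecordDataSource gains default branches in its switch statements over the spill layout, so contains and fileName are assigned on every path instead of silently keeping a stale value when an unexpected mode arrives. Schematically, with hypothetical mode constants and paths:

    public class SwitchDefaultExample {
      static final int RAW = 0, DAILY = 1, HOURLY = 2;

      static String buildPath(int mode, String base) {
        String path;
        switch (mode) {
          case DAILY:
            path = base + "/day";
            break;
          case HOURLY:
            path = base + "/hour";
            break;
          default:              // previously missing: unknown modes fell through
            path = base;
            break;
        }
        return path;
      }

      public static void main(String[] args) {
        System.out.println(buildPath(RAW, "/chukwa"));
      }
    }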