http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java index 14cf514..4bfae2d 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/FileAdaptor.java @@ -35,8 +35,6 @@ import org.apache.log4j.Logger; class FileAdaptorTailer extends Thread { static Logger log = Logger.getLogger(FileAdaptorTailer.class); private List<FileAdaptor> adaptors = null; - private static Configuration conf = null; - private Object lock = new Object(); /** * How often to call each adaptor. @@ -46,16 +44,13 @@ class FileAdaptorTailer extends Thread { public FileAdaptorTailer() { - - if (conf == null) { - ChukwaAgent agent = ChukwaAgent.getAgent(); - if (agent != null) { - conf = agent.getConfiguration(); - if (conf != null) { - SAMPLE_PERIOD_MS = conf.getInt( - "chukwaAgent.adaptor.context.switch.time", - DEFAULT_SAMPLE_PERIOD_MS); - } + ChukwaAgent agent = ChukwaAgent.getAgent(); + if (agent != null) { + Configuration conf = agent.getConfiguration(); + if (conf != null) { + SAMPLE_PERIOD_MS = conf.getInt( + "chukwaAgent.adaptor.context.switch.time", + DEFAULT_SAMPLE_PERIOD_MS); } } @@ -70,17 +65,6 @@ class FileAdaptorTailer extends Thread { while(true) { try { - while (adaptors.size() == 0) { - synchronized (lock) { - try { - log.info("Waiting queue is empty"); - lock.wait(); - } catch (InterruptedException e) { - // do nothing - } - } - } - long startTime = System.currentTimeMillis(); for (FileAdaptor adaptor: adaptors) { log.info("calling sendFile for " + adaptor.toWatch.getCanonicalPath()); @@ -100,9 +84,6 @@ class FileAdaptorTailer extends Thread { public void addFileAdaptor(FileAdaptor adaptor) { adaptors.add(adaptor); - synchronized (lock) { - lock.notifyAll(); - } } public void removeFileAdaptor(FileAdaptor adaptor) { @@ -119,7 +100,7 @@ public class FileAdaptor extends AbstractAdaptor { static FileAdaptorTailer tailer = null; static final int DEFAULT_TIMEOUT_PERIOD = 5*60*1000; - static int TIMEOUT_PERIOD = DEFAULT_TIMEOUT_PERIOD; + int TIMEOUT_PERIOD = DEFAULT_TIMEOUT_PERIOD; static { tailer = new FileAdaptorTailer();
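The FileAdaptor hunk above replaces a statically cached Configuration with a lookup performed in each constructor, so every tailer instance reads the agent's current settings. A minimal sketch of that per-instance pattern, assuming a Hadoop Configuration; the class name and default value here are illustrative, not Chukwa's actual code:

import org.apache.hadoop.conf.Configuration;

// Sketch only: per-instance config lookup with a fallback default,
// mirroring the constructor in the hunk above.
class PeriodicTailer {
  static final int DEFAULT_SAMPLE_PERIOD_MS = 2000; // assumed default
  private int samplePeriodMs = DEFAULT_SAMPLE_PERIOD_MS;

  PeriodicTailer(Configuration conf) {
    if (conf != null) {
      // getInt falls back to the default when the key is absent
      samplePeriodMs = conf.getInt(
          "chukwaAgent.adaptor.context.switch.time", DEFAULT_SAMPLE_PERIOD_MS);
    }
  }
}

Because the field is no longer static, two tailers constructed under different configurations no longer clobber each other's sample period.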
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java index c07f6fa..5f4928a 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/JMXAdaptor.java @@ -21,7 +21,6 @@ package org.apache.hadoop.chukwa.datacollection.adaptor; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; -import java.io.FileReader; import java.io.IOException; import java.io.InputStreamReader; import java.nio.charset.Charset; @@ -105,9 +104,16 @@ public class JMXAdaptor extends AbstractAdaptor{ while(!shutdown){ try{ BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(jmx_pw_file.getAbsolutePath()), Charset.forName("UTF-8"))); - String[] creds = br.readLine().split(" "); - Map<String, String[]> env = new HashMap<String, String[]>(); - env.put(JMXConnector.CREDENTIALS, creds); + String buffer = br.readLine(); + String[] creds = null; + if(buffer != null ) { + creds = buffer.split(" "); + } + br.close(); + Map<String, String[]> env = new HashMap<String, String[]>(); + if(creds!=null) { + env.put(JMXConnector.CREDENTIALS, creds); + } jmxc = JMXConnectorFactory.connect(url, env); mbsc = jmxc.getMBeanServerConnection(); if(timer == null) { @@ -131,7 +137,7 @@ public class JMXAdaptor extends AbstractAdaptor{ timer.cancel(); timer = null; shutdown = true; - } + } } } @@ -181,7 +187,7 @@ public class JMXAdaptor extends AbstractAdaptor{ Descriptor d = mb.getDescriptor(); val = mbsc.getAttribute(oname, mb.getName()); if(d.getFieldNames().length > 0){ //this is an open mbean - OpenType openType = (OpenType)d.getFieldValue("openType"); + OpenType<?> openType = (OpenType<?>)d.getFieldValue("openType"); if(openType.isArray()){ Object[] valarray = (Object[])val; http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java index 39af580..5011f70 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/OozieAdaptor.java @@ -50,7 +50,7 @@ public class OozieAdaptor extends AbstractAdaptor { private final ScheduledExecutorService scheduler = Executors .newScheduledThreadPool(1); private static final long initialDelay = 60; // seconds - private static long periodicity = 60; // seconds + private long periodicity = 60; // seconds private ScheduledFuture<?> scheduledCollectorThread; @Override http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java index b37be9c..072c151 100644 --- 
a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SocketAdaptor.java @@ -140,16 +140,14 @@ public class SocketAdaptor extends AbstractAdaptor { } } } catch(java.io.EOFException e) { - log.info("Caught java.io.EOFException closing conneciton."); + log.debug("Caught java.io.EOFException:", e); } catch(java.net.SocketException e) { - log.info("Caught java.net.SocketException closing conneciton."); + log.debug("Caught java.net.SocketException:", e); } catch(InterruptedIOException e) { Thread.currentThread().interrupt(); - log.info("Caught java.io.InterruptedIOException: "+e); - log.info("Closing connection."); + log.debug("Caught java.io.InterruptedIOException: ", e); } catch(IOException e) { - log.info("Caught java.io.IOException: "+e); - log.info("Closing connection."); + log.debug("Caught java.io.IOException: ", e); } catch(Exception e) { log.error("Unexpected exception. Closing conneciton.", e); } finally { http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java index 07f6c66..50dec64 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/SyslogAdaptor.java @@ -76,7 +76,7 @@ public class SyslogAdaptor extends UDPAdaptor { facility = facility / 8; dataType = facilityMap.get(facility); } catch (NumberFormatException nfe) { - log.warn("Unsupported format detected by SyslogAdaptor:"+trimmedBuf); + log.warn("Unsupported format detected by SyslogAdaptor:"+Arrays.toString(trimmedBuf)); } } http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/WriteaheadBuffered.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/WriteaheadBuffered.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/WriteaheadBuffered.java index dcf0600..cb04aae 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/WriteaheadBuffered.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/WriteaheadBuffered.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.chukwa.datacollection.adaptor; -import java.util.*; import java.io.*; import org.apache.hadoop.chukwa.Chunk; import org.apache.hadoop.chukwa.ChunkImpl; @@ -37,7 +36,7 @@ public class WriteaheadBuffered extends AbstractWrapper { @Override - public synchronized void add(Chunk event) throws InterruptedException { + public void add(Chunk event) throws InterruptedException { try { event.write(outToDisk); outToDisk.flush(); @@ -85,7 +84,7 @@ public class WriteaheadBuffered extends AbstractWrapper { } @Override - public synchronized void committed(long l) { + public void committed(long l) { try { long bytesOutstanding = highestSentOffset - l; @@ -93,7 +92,10 @@ public class WriteaheadBuffered extends AbstractWrapper { fSize = 0; outToDisk.close(); File outBufTmp = new File(outBuf.getAbsoluteFile(), outBuf.getName() + ".tmp"); - outBuf.renameTo(outBufTmp); + if(!outBuf.renameTo(outBufTmp)) { + log.warn("Cannot rename temp file "+outBuf.getAbsolutePath()+ + " to "+outBufTmp.getAbsolutePath()); + } outToDisk = new DataOutputStream(new FileOutputStream(outBuf, false)); DataInputStream dis = new DataInputStream(new FileInputStream(outBufTmp)); while(dis.available() > 0) { @@ -104,7 +106,9 @@ public class WriteaheadBuffered extends AbstractWrapper { } } dis.close(); - outBufTmp.delete(); + if(!outBufTmp.delete()) { + log.warn("Cannot delete temp file: "+outBufTmp.getAbsolutePath()); + } } } catch(IOException e) { log.error(e); @@ -114,8 +118,11 @@ public class WriteaheadBuffered extends AbstractWrapper { @Override public long shutdown(AdaptorShutdownPolicy p) throws AdaptorException { - if(p != RESTARTING) - outBuf.delete(); + if(p != RESTARTING) { + if(!outBuf.delete()) { + log.warn("Cannot delete output buffer file:"+outBuf.getAbsolutePath()); + } + } return inner.shutdown(p); } http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java index 5fea073..9fc25b9 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/FileTailingAdaptor.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.io.RandomAccessFile; import java.io.File; import org.apache.hadoop.chukwa.datacollection.adaptor.*; -import org.apache.hadoop.chukwa.util.ExceptionUtil; /** * An adaptor that repeatedly tails a specified file, sending the new bytes.
@@ -118,7 +117,7 @@ public class FileTailingAdaptor extends LWFTAdaptor { * @param eq the queue to write Chunks into */ @Override - public synchronized boolean tailFile() + public boolean tailFile() throws InterruptedException { boolean hasMoreData = false; http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java index 9da09d5..dc867d5 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/LWFTAdaptor.java @@ -51,7 +51,7 @@ public class LWFTAdaptor extends AbstractAdaptor { public static final String MAX_READ_SIZE_OPT = "chukwaAgent.fileTailingAdaptor.maxReadSize"; - static int MAX_READ_SIZE = DEFAULT_MAX_READ_SIZE; + int MAX_READ_SIZE = DEFAULT_MAX_READ_SIZE; static Logger log; static FileTailer tailer; @@ -200,7 +200,7 @@ public class LWFTAdaptor extends AbstractAdaptor { return hasMoreData; } - public synchronized boolean tailFile() + public boolean tailFile() throws InterruptedException { boolean hasMoreData = false; try { http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java index 2fa82fe..cd8d53f 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/filetailer/RCheckFTAdaptor.java @@ -26,6 +26,8 @@ import java.util.regex.Pattern; import java.util.Collections; import java.util.LinkedList; +import org.apache.commons.lang3.builder.HashCodeBuilder; + /** * Checkpoint state: * date modified of most-recently tailed file, offset of first byte of that file, @@ -54,6 +56,22 @@ public class RCheckFTAdaptor extends LWFTAdaptor implements FileFilter { //just a heuristic that hasn't been tuned yet else return (o.f.getName().compareTo(f.getName())); } + + @Override + public boolean equals(Object o) { + if(o instanceof FPair) { + return mod == ((FPair) o).mod; + } else { + return false; + } + } + + @Override + public int hashCode() { + return new HashCodeBuilder(643, 1321). + append(this.mod). 
+ toHashCode(); + } } long prevFileLastModDate = 0; @@ -129,7 +147,7 @@ public class RCheckFTAdaptor extends LWFTAdaptor implements FileFilter { } @Override - public synchronized boolean tailFile() + public boolean tailFile() throws InterruptedException { boolean hasMoreData = false; try { http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/HttpStatusChecker.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/HttpStatusChecker.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/HttpStatusChecker.java index 082dd58..e924172 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/HttpStatusChecker.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/heartbeat/HttpStatusChecker.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.chukwa.datacollection.adaptor.heartbeat; +import java.io.IOException; import java.net.HttpURLConnection; import java.net.URL; @@ -54,7 +55,7 @@ public class HttpStatusChecker implements StatusChecker { connection = (HttpURLConnection)url.openConnection(); connection.connect(); status.put("status", "running"); - } catch (Exception e) { + } catch (IOException e) { status.put("status", "stopped"); } finally { if(connection != null){ http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java index 991cdaf..79f8db6 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSAdaptor.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.chukwa.datacollection.adaptor.jms; +import java.nio.charset.Charset; + import org.apache.hadoop.chukwa.datacollection.adaptor.AbstractAdaptor; import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException; import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy; @@ -100,7 +102,7 @@ public class JMSAdaptor extends AbstractAdaptor { bytesReceived += bytes.length; if (log.isDebugEnabled()) { - log.debug("Adding Chunk from JMS message: " + new String(bytes)); + log.debug("Adding Chunk from JMS message: " + new String(bytes, Charset.forName("UTF-8"))); } Chunk c = new ChunkImpl(type, source, bytesReceived, bytes, JMSAdaptor.this); @@ -142,6 +144,7 @@ public class JMSAdaptor extends AbstractAdaptor { String transformerName = null; String transformerConfs = null; + StringBuilder transformerConfsBuffer = new StringBuilder(); for (int i = 1; i < tokens.length; i++) { String value = tokens[i]; if ("-t".equals(value)) { @@ -168,17 +171,19 @@ public class JMSAdaptor extends AbstractAdaptor { transformerName = tokens[++i]; } else if ("-p".equals(value)) { - transformerConfs = tokens[++i]; + transformerConfsBuffer.append(tokens[++i]); + transformerConfs = transformerConfsBuffer.toString(); // transformerConfs can have multiple words - if (transformerConfs.startsWith("\"")) { + if (transformerConfsBuffer.toString().startsWith("\"")) { for(int j = i + 1; j < tokens.length; j++) { - transformerConfs = transformerConfs + " " + 
tokens[++i]; + transformerConfsBuffer.append(" "); + transformerConfsBuffer.append(tokens[++i]); if(tokens[j].endsWith("\"")) { break; } } - transformerConfs = trimQuotes(transformerConfs); + transformerConfs = trimQuotes(transformerConfsBuffer.toString()); } } } @@ -196,7 +201,7 @@ public class JMSAdaptor extends AbstractAdaptor { // create transformer if (transformerName != null) { try { - Class classDefinition = Class.forName(transformerName); + Class<?> classDefinition = Class.forName(transformerName); Object object = classDefinition.newInstance(); transformer = (JMSMessageTransformer)object; } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java index b0ef917..facff2d 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSMessagePropertyTransformer.java @@ -73,14 +73,16 @@ public class JMSMessagePropertyTransformer implements JMSMessageTransformer { String token = tokens[i]; if ("-d".equals(token) && i <= tokens.length - 2) { - String value = tokens[++i]; + StringBuilder value = new StringBuilder(); + value.append(tokens[++i]); // we lost all spaces with the split, so we have to put them back, yuck. while (i <= tokens.length - 2 && !tokens[i + 1].startsWith("-")) { - value = value + " " + tokens[++i]; + value.append(" "); + value.append(tokens[++i]); } - delimiter = trimSingleQuotes(value); + delimiter = trimSingleQuotes(value.toString()); } else if ("-r".equals(token) && i <= tokens.length - 2) { // requiredPropertyNames = null means all are required. 
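Both JMS hunks above rebuild multi-word option values with a StringBuilder after String.split has discarded the spaces. A self-contained sketch of that reassembly; the method name and token layout are illustrative, and the adaptors' real parsers carry extra state not shown here:

// Sketch: stitch a quoted, space-split option value back together.
public class QuotedArgDemo {
  static String collectQuoted(String[] tokens, int start) {
    StringBuilder value = new StringBuilder(tokens[start]);
    int i = start;
    while (!tokens[i].endsWith("\"") && i < tokens.length - 1) {
      value.append(' ').append(tokens[++i]);
    }
    String s = value.toString();
    return s.substring(1, s.length() - 1); // trim surrounding quotes
  }

  public static void main(String[] args) {
    String[] tokens = "-p \"a b c\" -t demo".split(" ");
    System.out.println(collectQuoted(tokens, 1)); // prints: a b c
  }
}

Appending to one StringBuilder also avoids the quadratic copying that repeated string concatenation in a loop incurs, which is what the commit's change is after.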
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSTextMessageTransformer.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSTextMessageTransformer.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSTextMessageTransformer.java index 9f98f4a..52d4cb8 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSTextMessageTransformer.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/adaptor/jms/JMSTextMessageTransformer.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.chukwa.datacollection.adaptor.jms; +import java.nio.charset.Charset; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -44,7 +46,7 @@ public class JMSTextMessageTransformer implements JMSMessageTransformer { String text = ((TextMessage)message).getText(); if (text != null && text.length() > 0) { - return text.getBytes(); + return text.getBytes(Charset.forName("UTF-8")); } return null; http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java index cde2868..88ba9bc 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AdaptorResetThread.java @@ -21,7 +21,6 @@ import java.util.*; import org.apache.hadoop.conf.*; import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor; import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorShutdownPolicy; -import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet; import org.apache.hadoop.chukwa.datacollection.sender.AsyncAckSender; import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter; import org.apache.log4j.Logger; @@ -52,9 +51,8 @@ public class AdaptorResetThread extends Thread { public AdaptorResetThread(Configuration conf, ChukwaAgent a) { // - timeout = conf.getInt(SeqFileWriter.ROTATE_INTERVAL_OPT, timeout/3) - + conf.getInt(AsyncAckSender.POLLPERIOD_OPT, timeout/3) - + conf.getInt(CommitCheckServlet.SCANPERIOD_OPT, timeout/3); + timeout = conf.getInt(SeqFileWriter.ROTATE_INTERVAL_OPT, timeout/2) + + conf.getInt(AsyncAckSender.POLLPERIOD_OPT, timeout/2); timeout = conf.getInt(TIMEOUT_OPT, timeout); //unless overridden http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java index dda7888..d024180 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/AgentControlSocketListener.java @@ -74,7 +74,7 @@ public class AgentControlSocketListener extends Thread { InputStream in = connection.getInputStream(); BufferedReader br = new 
BufferedReader(new InputStreamReader(in, Charset.forName("UTF-8"))); PrintStream out = new PrintStream(new BufferedOutputStream(connection - .getOutputStream())); + .getOutputStream()), true, "UTF-8"); String cmd = null; while ((cmd = br.readLine()) != null) { processCommand(cmd, out); http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorConfig.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorConfig.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorConfig.java index 78c307e..03ed635 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorConfig.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorConfig.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.chukwa.datacollection.agent.rest; -import javax.xml.bind.annotation.XmlAttribute; import javax.xml.bind.annotation.XmlElement; import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlAccessorType; http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java index 70edc2a..dc44975 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/agent/rest/AdaptorController.java @@ -21,9 +21,6 @@ import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent; import org.apache.hadoop.chukwa.datacollection.adaptor.AdaptorException; import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor; import org.apache.hadoop.chukwa.datacollection.OffsetStatsManager; -import org.apache.hadoop.chukwa.util.ExceptionUtil; -import org.apache.commons.lang.StringEscapeUtils; -import org.json.simple.JSONObject; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,15 +28,11 @@ import javax.ws.rs.Path; import javax.ws.rs.GET; import javax.ws.rs.Produces; import javax.ws.rs.PathParam; -import javax.ws.rs.QueryParam; import javax.ws.rs.DELETE; import javax.ws.rs.POST; import javax.ws.rs.Consumes; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Context; import javax.ws.rs.core.Response; import javax.ws.rs.core.MediaType; -import javax.servlet.ServletContext; import javax.servlet.http.HttpServletResponse; import java.text.DecimalFormat; @@ -54,7 +47,7 @@ import java.util.Map; public class AdaptorController { private static final DecimalFormat DECIMAL_FORMAT = new DecimalFormat(); - private static final Log log = LogFactory.getLog(AdaptorController.class); + private static final Log LOG = LogFactory.getLog(AdaptorController.class); static { DECIMAL_FORMAT.setMinimumFractionDigits(2); @@ -95,6 +88,8 @@ public class AdaptorController { String adaptorId = agent.processAddCommandE(addCommand.toString()); return doGetAdaptor(adaptorId); } catch (AdaptorException e) { + LOG.warn("Could not add adaptor for data type: '" + ac.getDataType() + + "', error: " + e.getMessage()); return badRequestResponse("Could not add adaptor for data type: '" + ac.getDataType() + "', 
error: " + e.getMessage()); } @@ -180,7 +175,7 @@ public class AdaptorController { protected AdaptorInfo buildAdaptor(String adaptorId) { ChukwaAgent agent = ChukwaAgent.getAgent(); Adaptor adaptor = agent.getAdaptor(adaptorId); - OffsetStatsManager adaptorStats = agent.getAdaptorStatsManager(); + OffsetStatsManager<Adaptor> adaptorStats = agent.getAdaptorStatsManager(); AdaptorInfo info = new AdaptorInfo(); info.setId(adaptorId); @@ -205,7 +200,7 @@ public class AdaptorController { AdaptorList list = new AdaptorList(); for(String name : adaptorMap.keySet()) { Adaptor adaptor = agent.getAdaptor(name); - OffsetStatsManager adaptorStats = agent.getAdaptorStatsManager(); + OffsetStatsManager<Adaptor> adaptorStats = agent.getAdaptorStatsManager(); AdaptorInfo info = new AdaptorInfo(); info.setId(name); http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java index d4c2df4..df1616b 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/CommitCheckServlet.java @@ -20,13 +20,18 @@ package org.apache.hadoop.chukwa.datacollection.collector.servlet; import java.io.IOException; import java.io.PrintStream; import java.net.URI; + import javax.servlet.ServletConfig; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; + import org.apache.log4j.Logger; + import java.util.*; + +import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter; import org.apache.hadoop.chukwa.extraction.CHUKWA_CONSTANT; import org.apache.hadoop.chukwa.extraction.archive.SinkArchiver; @@ -39,8 +44,8 @@ public class CommitCheckServlet extends HttpServlet { private static final long serialVersionUID = -4627538252371890849L; protected final static Logger log = Logger.getLogger(CommitCheckServlet.class); - CommitCheckThread commitCheck; - Configuration conf; + transient CommitCheckThread commitCheck; + transient Configuration conf; //interval at which to scan the filesystem, ms public static final String SCANPERIOD_OPT = "chukwaCollector.asyncAcks.scanperiod"; @@ -78,7 +83,7 @@ public class CommitCheckServlet extends HttpServlet { protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { - PrintStream out = new PrintStream(resp.getOutputStream()); + PrintStream out = new PrintStream(resp.getOutputStream(), true, "UTF-8"); resp.setStatus(200); out.println("<html><body><h2>Commit status</h2><ul>"); @@ -98,6 +103,7 @@ public class CommitCheckServlet extends HttpServlet { * For now, instead, we'll just do an ls in a bunch of places. 
*/ private static class CommitCheckThread extends Thread implements CHUKWA_CONSTANT { + int checkInterval = 1000 * 30; volatile boolean running = true; final Collection<Path> pathsToSearch; @@ -116,15 +122,30 @@ public class CommitCheckServlet extends HttpServlet { this.purgeTime = time; this.len = len; } - + + @Override + public boolean equals (Object o) { + if(o == null || !(o instanceof PurgeTask)) { + return false; + } + PurgeTask other = (PurgeTask) o; + return this.hashCode() == other.hashCode(); + } + + @Override public int compareTo(PurgeTask p) { if(purgeTime < p.purgeTime) return -1; - else if (purgeTime == p.purgeTime) + else if (this.equals(p)) return 0; else return 1; } + + @Override + public int hashCode() { + return new HashCodeBuilder(3221, 4271).append(purgeTime).toHashCode(); + } } http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java index 613fa3e..dfbe53a 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/LogDisplayServlet.java @@ -22,11 +22,15 @@ import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; + import org.apache.log4j.Logger; + import java.io.*; +import java.nio.charset.Charset; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.*; + import org.apache.hadoop.chukwa.*; import org.apache.hadoop.chukwa.datacollection.writer.ExtractorWriter; import org.apache.hadoop.conf.Configuration; @@ -61,8 +65,8 @@ public class LogDisplayServlet extends HttpServlet { public static final String BUF_SIZE_OPT = "chukwaCollector.showLogs.buffer"; long BUF_SIZE = 1024* 1024; - Configuration conf; - Map<String, Deque<Chunk>> chunksBySID = new HashMap<String, Deque<Chunk>>(); + transient Configuration conf; + transient Map<String, Deque<Chunk>> chunksBySID; Queue<String> receivedSIDs = new LinkedList<String>(); long totalStoredSize = 0; @@ -71,12 +75,20 @@ public class LogDisplayServlet extends HttpServlet { public LogDisplayServlet() { conf = new Configuration(); - ExtractorWriter.recipient = this; + chunksBySID = new HashMap<String, Deque<Chunk>>(); + ExtractorWriter.setRecipient(this); } public LogDisplayServlet(Configuration c) { conf = c; - ExtractorWriter.recipient = this; + chunksBySID = new HashMap<String, Deque<Chunk>>(); + ExtractorWriter.setRecipient(this); + } + + public LogDisplayServlet(Configuration c, Map<String, Deque<Chunk>> chunksBySID) { + conf = c; + this.chunksBySID = chunksBySID; + ExtractorWriter.setRecipient(this); } public void init(ServletConfig servletConf) throws ServletException { @@ -93,9 +105,9 @@ public class LogDisplayServlet extends HttpServlet { MessageDigest md; md = MessageDigest.getInstance("MD5"); - md.update(c.getSource().getBytes()); - md.update(c.getStreamName().getBytes()); - md.update(c.getTags().getBytes()); + md.update(c.getSource().getBytes(Charset.forName("UTF-8"))); + md.update(c.getStreamName().getBytes(Charset.forName("UTF-8"))); + 
md.update(c.getTags().getBytes(Charset.forName("UTF-8"))); StringBuilder sb = new StringBuilder(); byte[] bytes = md.digest(); for(int i=0; i < bytes.length; ++i) { @@ -106,7 +118,6 @@ public class LogDisplayServlet extends HttpServlet { return sb.toString(); } catch(NoSuchAlgorithmException n) { log.fatal(n); - System.exit(0); return null; } } @@ -146,7 +157,7 @@ public class LogDisplayServlet extends HttpServlet { protected synchronized void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { - PrintStream out = new PrintStream(new BufferedOutputStream(resp.getOutputStream())); + PrintStream out = new PrintStream(new BufferedOutputStream(resp.getOutputStream()), true, "UTF-8"); resp.setStatus(200); String path = req.getServletPath(); String streamID = req.getParameter("sid"); http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java index 5c3ea71..0a78f2f 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java @@ -54,14 +54,14 @@ public class ServletCollector extends HttpServlet { * If a chunk is committed; then the ack will start with the following string. */ public static final String ACK_PREFIX = "ok: "; - org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter writer = null; + transient ChukwaWriter writer = null; private static final long serialVersionUID = 6286162898591407111L; - Logger log = Logger.getRootLogger();// .getLogger(ServletCollector.class); + transient Logger log = Logger.getLogger(ServletCollector.class); - static boolean COMPRESS; - static String CODEC_NAME; - static CompressionCodec codec; + boolean COMPRESS; + String CODEC_NAME; + transient CompressionCodec codec; public void setWriter(ChukwaWriter w) { writer = w; @@ -76,7 +76,7 @@ public class ServletCollector extends HttpServlet { int numberchunks = 0; long lifetimechunks = 0; - Configuration conf; + transient Configuration conf; public ServletCollector(Configuration c) { conf = c; @@ -151,7 +151,6 @@ public class ServletCollector extends HttpServlet { protected void accept(HttpServletRequest req, HttpServletResponse resp) throws ServletException { numberHTTPConnection++; - ServletDiagnostics diagnosticPage = new ServletDiagnostics(); final long currentTime = System.currentTimeMillis(); try { @@ -173,10 +172,6 @@ public class ServletCollector extends HttpServlet { final int numEvents = di.readInt(); // log.info("saw " + numEvents+ " in request"); - if (FANCY_DIAGNOSTICS) { - diagnosticPage.sawPost(req.getRemoteHost(), numEvents, currentTime); - } - List<Chunk> events = new LinkedList<Chunk>(); StringBuilder sb = new StringBuilder(); @@ -184,9 +179,6 @@ public class ServletCollector extends HttpServlet { ChunkImpl logEvent = ChunkImpl.read(di); events.add(logEvent); - if (FANCY_DIAGNOSTICS) { - diagnosticPage.sawChunk(logEvent, i); - } } int responseStatus = HttpServletResponse.SC_OK; @@ -226,10 +218,6 @@ public class ServletCollector extends HttpServlet { l_out.println("can't write: no writer"); } - if (FANCY_DIAGNOSTICS) { - 
diagnosticPage.doneWithPost(); - } - resp.setStatus(responseStatus); } catch (Throwable e) { @@ -251,7 +239,7 @@ public class ServletCollector extends HttpServlet { log.info("new GET from " + req.getRemoteHost() + " at " + System.currentTimeMillis()); - PrintStream out = new PrintStream(resp.getOutputStream()); + PrintStream out = new PrintStream(resp.getOutputStream(), true, "UTF-8"); resp.setStatus(200); String pingAtt = req.getParameter("ping"); @@ -264,8 +252,6 @@ public class ServletCollector extends HttpServlet { out.println("lifetimechunks:" + lifetimechunks); } else { out.println("<html><body><h2>Chukwa servlet running</h2>"); - if (FANCY_DIAGNOSTICS) - ServletDiagnostics.printPage(out); out.println("</body></html>"); } http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/ChunkCatcherConnector.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/ChunkCatcherConnector.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/ChunkCatcherConnector.java index a9bd744..29c1fb5 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/ChunkCatcherConnector.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/ChunkCatcherConnector.java @@ -28,7 +28,7 @@ public class ChunkCatcherConnector implements Connector { Timer tm; - class Interruptor extends TimerTask { + static class Interruptor extends TimerTask { Thread targ; volatile boolean deactivate = false; Interruptor(Thread t) { http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java index b998139..929d871 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/PipelineConnector.java @@ -59,7 +59,7 @@ public class PipelineConnector implements Connector, Runnable { ChunkQueue chunkQueue; - private static volatile ChukwaAgent agent = null; + private ChukwaAgent agent = null; private volatile boolean stopMe = false; protected ChukwaWriter writers = null; http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java index 3bb0dd7..e542b2f 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/connector/http/HttpConnector.java @@ -41,6 +41,8 @@ import java.util.Iterator; import java.util.List; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.hadoop.chukwa.Chunk; import org.apache.hadoop.chukwa.datacollection.ChunkQueue; import org.apache.hadoop.chukwa.datacollection.DataFactory; @@ -55,7 +57,7 @@ public class HttpConnector implements Connector, 
Runnable { static Logger log = Logger.getLogger(HttpConnector.class); Timer statTimer = null; - volatile int chunkCount = 0; + AtomicInteger chunkCount = new AtomicInteger(); int MAX_SIZE_PER_POST = 2 * 1024 * 1024; int MIN_POST_INTERVAL = 5 * 1000; @@ -78,8 +80,8 @@ public class HttpConnector implements Connector, Runnable { statTimer = new Timer(); statTimer.schedule(new TimerTask() { public void run() { - int count = chunkCount; - chunkCount = 0; + int count = chunkCount.get(); + chunkCount.set(0); log.info("# http chunks ACK'ed since last report: " + count); } }, 100, 60 * 1000); @@ -170,7 +172,7 @@ public class HttpConnector implements Connector, Runnable { // checkpoint the chunks which were committed for (ChukwaHttpSender.CommitListEntry cle : results) { agent.reportCommit(cle.adaptor, cle.uuid); - chunkCount++; + chunkCount.incrementAndGet(); } long now = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java index 6f818e4..2b6617d 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/AsyncAckSender.java @@ -23,12 +23,10 @@ import org.apache.hadoop.chukwa.datacollection.agent.*; import org.apache.hadoop.chukwa.datacollection.adaptor.Adaptor; import org.apache.hadoop.chukwa.datacollection.collector.servlet.CommitCheckServlet; import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector; -import org.apache.hadoop.chukwa.datacollection.sender.ChukwaHttpSender.CommitListEntry; import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.hadoop.conf.*; -import org.apache.commons.httpclient.*; import org.apache.commons.httpclient.methods.GetMethod; import org.apache.commons.httpclient.methods.PostMethod; //import com.google.common.collect.SortedSetMultimap; @@ -74,7 +72,7 @@ public class AsyncAckSender extends ChukwaHttpSender{ int c = o.aName.compareTo(this.aName); if(c != 0) return c; - c = fname.compareTo(this.fname); + c = o.fname.compareTo(this.fname); if(c != 0) return c; if(o.start < start) return 1; else if(o.start > start) return -1; else return 0; } - + + @Override + public boolean equals(Object o) { + if(!(o instanceof DelayedCommit)) { + return false; + } + DelayedCommit dc = (DelayedCommit) o; + if(this.aName.equals(dc.aName)) { + return true; + } + return false; + } + public String toString() { return adaptor +" commits from" + start + " to " + uuid + " when " + fname + " hits " + fOffset; } http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java index 1c8c3d2..76727fe 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/ChukwaHttpSender.java @@ -88,9 +88,9 @@ public
class ChukwaHttpSender implements ChukwaSender { protected Iterator<String> collectors; - static boolean COMPRESS; - static String CODEC_NAME; - static CompressionCodec codec; + boolean COMPRESS; + String CODEC_NAME; + CompressionCodec codec; static { connectionManager = new MultiThreadedHttpConnectionManager(); @@ -112,9 +112,13 @@ public class ChukwaHttpSender implements ChukwaSender { // FIXME: probably we're better off with an EventListRequestEntity static class BuffersRequestEntity implements RequestEntity { List<DataOutputBuffer> buffers; + boolean compress; + CompressionCodec codec; - public BuffersRequestEntity(List<DataOutputBuffer> buf) { + public BuffersRequestEntity(List<DataOutputBuffer> buf, boolean compress, CompressionCodec codec) { buffers = buf; + this.compress = compress; + this.codec = codec; } private long getUncompressedContentLenght(){ @@ -125,7 +129,7 @@ public class ChukwaHttpSender implements ChukwaSender { } public long getContentLength() { - if( COMPRESS) { + if(compress) { return -1; } else { @@ -148,7 +152,7 @@ public class ChukwaHttpSender implements ChukwaSender { } public void writeRequest(OutputStream out) throws IOException { - if( COMPRESS) { + if(compress) { CompressionOutputStream cos = codec.createOutputStream(out); DataOutputStream dos = new DataOutputStream( cos); doWriteRequest( dos); @@ -239,7 +243,7 @@ public class ChukwaHttpSender implements ChukwaSender { toSend.clear(); // collect all serialized chunks into a single buffer to send - RequestEntity postData = new BuffersRequestEntity(serializedEvents); + RequestEntity postData = new BuffersRequestEntity(serializedEvents, COMPRESS, codec); PostMethod method = new PostMethod(); method.setRequestEntity(postData); http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java index 15cb20a..c636ad2 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/sender/RetryListOfCollectors.java @@ -20,7 +20,9 @@ package org.apache.hadoop.chukwa.datacollection.sender; import java.io.*; +import java.nio.charset.Charset; import java.util.*; + import org.apache.hadoop.conf.Configuration; /*** @@ -40,14 +42,14 @@ public class RetryListOfCollectors implements Iterator<String>, Cloneable { long lastLookAtFirstNode; int nextCollector = 0; private String portNo; - Configuration conf; public static final String RETRY_RATE_OPT = "chukwaAgent.connector.retryRate"; public RetryListOfCollectors(File collectorFile, Configuration conf) throws IOException { this(conf); try { - BufferedReader br = new BufferedReader(new FileReader(collectorFile)); + FileInputStream fis = new FileInputStream(collectorFile); + BufferedReader br = new BufferedReader(new InputStreamReader(fis, Charset.forName("UTF-8"))); String line, parsedline; while ((line = br.readLine()) != null) { parsedline = canonicalizeLine(line); @@ -104,7 +106,6 @@ public class RetryListOfCollectors implements Iterator<String>, Cloneable { public RetryListOfCollectors(Configuration conf) { collectors = new ArrayList<String>(); - this.conf = conf; portNo = conf.get("chukwaCollector.http.port", "8080"); maxRetryRateMs = 
conf.getInt(RETRY_RATE_OPT, 15 * 1000); lastLookAtFirstNode = 0; http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java index 7ba7a29..7c2e755 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/test/ConsoleOutConnector.java @@ -23,6 +23,8 @@ import org.apache.hadoop.chukwa.Chunk; import org.apache.hadoop.chukwa.datacollection.*; import org.apache.hadoop.chukwa.datacollection.agent.*; import org.apache.hadoop.chukwa.datacollection.connector.Connector; + +import java.nio.charset.Charset; import java.util.*; /** @@ -64,7 +66,7 @@ public class ConsoleOutConnector extends Thread implements Connector { System.out.println("data length was " + e.getData().length + ", not printing"); else - System.out.println(new String(e.getData())); + System.out.println(new String(e.getData(), Charset.forName("UTF-8"))); } agent.reportCommit(e.getInitiator(), e.getSeqID()); http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java index b7215c9..d52d58f 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FilePerPostWriter.java @@ -28,8 +28,6 @@ import org.apache.hadoop.chukwa.ChukwaArchiveKey; import org.apache.hadoop.chukwa.Chunk; import org.apache.hadoop.chukwa.ChunkImpl; import org.apache.hadoop.chukwa.datacollection.writer.*; -import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter.CommitStatus; -import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter.StatReportingTask; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -45,17 +43,7 @@ public class FilePerPostWriter extends SeqFileWriter { String baseName; AtomicLong counter = new AtomicLong(0); - - protected FileSystem fs = null; - protected Configuration conf = null; - - protected String outputDir = null; -// private Calendar calendar = Calendar.getInstance(); - protected Path currentPath = null; - protected String currentFileName = null; - - @Override public synchronized CommitStatus add(List<Chunk> chunks) throws WriterException { @@ -83,12 +71,10 @@ public class FilePerPostWriter extends SeqFileWriter { + "/" + chunk.getStreamName()); archiveKey.setSeqId(chunk.getSeqID()); - if (chunk != null) { // compute size for stats dataSize += chunk.getData().length; bytesThisRotate += chunk.getData().length; seqFileWriter.append(archiveKey, chunk); - } } @@ -129,7 +115,13 @@ public class FilePerPostWriter extends SeqFileWriter { } catch(Exception e) { throw new WriterException(e); } - } + protected String getCurrentFileName() { + return currentFileName; + } + + protected Path getCurrentPath() { + return currentPath; + } } 
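The RetryListOfCollectors hunk above switches from FileReader, which decodes with the platform default charset, to an explicit UTF-8 InputStreamReader. A sketch of the same idea with try-with-resources added so the reader is closed on every path (the committed code leaves closing to the surrounding logic); the method and file path are illustrative:

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

public class CollectorListReader {
  // Reads one collector URL per line, decoded as UTF-8 regardless of locale.
  static List<String> readCollectors(String path) throws IOException {
    List<String> collectors = new ArrayList<String>();
    try (BufferedReader br = new BufferedReader(new InputStreamReader(
        new FileInputStream(path), Charset.forName("UTF-8")))) {
      String line;
      while ((line = br.readLine()) != null) {
        collectors.add(line.trim());
      }
    }
    return collectors;
  }
}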
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java index 7aeab22..e00229a 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java @@ -25,10 +25,13 @@ import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector; import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector; import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController; import org.apache.hadoop.chukwa.datacollection.writer.ConsoleWriter; +import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter; import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; import org.mortbay.jetty.Server; import org.mortbay.jetty.servlet.Context; import org.mortbay.jetty.servlet.ServletHolder; + import java.io.*; import java.util.*; @@ -36,6 +39,7 @@ public class FileTailerStressTest { static final int DELAY_MIN = 10 * 1000; static final int DELAY_RANGE = 2 * 1000; + static final Logger log = Logger.getLogger(FileTailerStressTest.class); static class OccasionalWriterThread extends Thread { File file; @@ -45,9 +49,9 @@ } public void run() { + PrintWriter out = null; try { - FileOutputStream fos = new FileOutputStream(file); - PrintWriter out = new PrintWriter(fos); + out = new PrintWriter(file.getAbsolutePath(), "UTF-8"); Random rand = new Random(); while (true) { int delay = rand.nextInt(DELAY_RANGE) + DELAY_MIN; @@ -59,6 +63,9 @@ } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { + if(out != null) { + out.close(); + } } } } @@ -91,7 +98,9 @@ ChukwaAgentController cli = new ChukwaAgentController("localhost", portno); File workdir = new File("/tmp/stresstest/"); - workdir.mkdir(); + if(!workdir.mkdir()) { + log.warn("Error creating working directory:" + workdir.getAbsolutePath()); + } for (int i = 0; i < FILES_TO_USE; ++i) { File newTestF = new File("/tmp/stresstest/" + i); @@ -102,7 +111,9 @@ Thread.sleep(60 * 1000); System.out.println("cleaning up"); - workdir.delete(); + if(!workdir.delete()) { + log.warn("Error cleaning up working directory:" + workdir.getAbsolutePath()); + } } catch (Exception e) { e.printStackTrace(); } http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/test/SinkFileValidator.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/test/SinkFileValidator.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/test/SinkFileValidator.java index f48ba9a..db1be4d 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/test/SinkFileValidator.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/test/SinkFileValidator.java @@ -19,7 +19,11 @@ package org.apache.hadoop.chukwa.datacollection.test; +import java.io.IOException; import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.Charset; + import
org.apache.hadoop.chukwa.ChukwaArchiveKey; import org.apache.hadoop.chukwa.ChunkImpl; import org.apache.hadoop.conf.Configuration; @@ -65,14 +69,16 @@ public class SinkFileValidator { if (evt.getData().length > 1000) { System.out.println("got event; data: " - + new String(evt.getData(), 0, 1000)); + + new String(evt.getData(), 0, 1000, Charset.forName("UTF-8"))); System.out.println("....[truncating]"); } else - System.out.println("got event; data: " + new String(evt.getData())); + System.out.println("got event; data: " + new String(evt.getData(), Charset.forName("UTF-8"))); events++; } System.out.println("file looks OK!"); - } catch (Exception e) { + } catch (IOException e) { + e.printStackTrace(); + } catch (URISyntaxException e) { e.printStackTrace(); } http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java index 177d013..d8f5335 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java @@ -19,9 +19,11 @@ package org.apache.hadoop.chukwa.datacollection.writer; +import java.nio.charset.Charset; import java.util.List; import java.util.Timer; import java.util.TimerTask; + import org.apache.hadoop.chukwa.Chunk; import org.apache.hadoop.conf.Configuration; @@ -84,7 +86,7 @@ public class ConsoleWriter implements ChukwaWriter { System.out.print(data.getDataType()); System.out.print(") "); System.out.print(new String(data.getData(), startOffset, offset - - startOffset + 1)); + - startOffset + 1, Charset.forName("UTF-8"))); startOffset = offset + 1; } } http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java index cefb42b..2cfd216 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java @@ -57,7 +57,7 @@ public class Dedup extends PipelineableWriter { final HashSet<EntryType> hs; final Queue<EntryType> toDrop; final int maxSize; - volatile long dupchunks = 0; + long dupchunks = 0; public FixedSizeCache(int size) { maxSize = size; http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java index 1a681f5..a8a281e 100644 --- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java +++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java @@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration; public class ExtractorWriter extends PipelineableWriter { - public static LogDisplayServlet recipient; + private static LogDisplayServlet recipient; @Override public void 
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java
index 1a681f5..a8a281e 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/ExtractorWriter.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 
 public class ExtractorWriter extends PipelineableWriter {
 
-  public static LogDisplayServlet recipient;
+  private static LogDisplayServlet recipient;
 
   @Override
   public void close() throws WriterException {
@@ -44,4 +44,8 @@ public class ExtractorWriter extends PipelineableWriter {
     return ChukwaWriter.COMMIT_OK;
   }
 
+  public static void setRecipient(LogDisplayServlet logDisplayServlet) {
+    recipient = logDisplayServlet;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java
index f2d4252..4d9e2a0 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java
@@ -44,9 +44,6 @@ public class InMemoryWriter implements ChukwaWriter {
       e.printStackTrace();
       throw new WriterException(e);
     }
-    synchronized (this) {
-      notify();
-    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
index e30362d..141be20 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
@@ -111,7 +111,11 @@ public class PipelineStageWriter implements ChukwaWriter {
           writer = (ChukwaWriter) st; // one stage pipeline
         }
         return;
-      } catch (Exception e) {
+      } catch (IOException |
+          WriterException |
+          ClassNotFoundException |
+          IllegalAccessException |
+          InstantiationException e) {
         // if anything went wrong (missing class, etc) we wind up here.
        log.error("failed to set up pipeline, defaulting to SeqFileWriter", e);
         // fall through to default case
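
The PipelineStageWriter hunk narrows a blanket catch (Exception) to a Java 7 multi-catch of the specific checked exceptions, so unexpected runtime failures are no longer silently swallowed. A minimal sketch of the same idiom, using a hypothetical StageLoader helper rather than the collector's actual pipeline-setup code:

public final class StageLoader {
  public static Object loadStage(String className)
      throws ClassNotFoundException, InstantiationException, IllegalAccessException {
    return Class.forName(className).newInstance();
  }

  public static Object loadOrNull(String className) {
    try {
      return loadStage(className);
    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
      // Only the listed checked exceptions are handled here; a programming
      // error such as a NullPointerException now propagates to the caller
      // instead of being absorbed by a catch-all.
      System.err.println("failed to load " + className + ": " + e);
      return null;
    }
  }
}
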
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
index 3c0d268..3803a2e 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
@@ -47,7 +47,7 @@ import org.apache.log4j.Logger;
  */
 public class SeqFileWriter extends PipelineableWriter implements ChukwaWriter {
   static Logger log = Logger.getLogger(SeqFileWriter.class);
-  public static boolean ENABLE_ROTATION_ON_CLOSE = true;
+  private static boolean ENABLE_ROTATION_ON_CLOSE = true;
 
   protected int STAT_INTERVAL_SECONDS = 30;
   private int rotateInterval = 1000 * 60 * 5;
@@ -60,7 +60,7 @@ public class SeqFileWriter extends PipelineableWriter implements ChukwaWriter {
   public static final String IF_FIXED_INTERVAL_OPT = "chukwaCollector.isFixedTimeRotatorScheme";
   public static final String FIXED_INTERVAL_OFFSET_OPT = "chukwaCollector.fixedTimeIntervalOffset";
   public static final String OUTPUT_DIR_OPT= "chukwaCollector.outputDir";
-  protected static String localHostAddr = null;
+  public String localHostAddr = null;
 
   protected final Semaphore lock = new Semaphore(1, true);
@@ -85,7 +85,7 @@ public class SeqFileWriter extends PipelineableWriter implements ChukwaWriter {
   protected volatile long bytesThisRotate = 0;
   protected volatile boolean isRunning = false;
 
-  static {
+  public SeqFileWriter() {
     try {
       localHostAddr = "_" + InetAddress.getLocalHost().getHostName() + "_";
     } catch (UnknownHostException e) {
@@ -93,8 +93,6 @@ public class SeqFileWriter extends PipelineableWriter implements ChukwaWriter {
     }
   }
 
-  public SeqFileWriter() {}
-
   public long getBytesWritten() {
     return dataSize;
   }
@@ -135,7 +133,7 @@ public class SeqFileWriter extends PipelineableWriter implements ChukwaWriter {
     try {
       fs = FileSystem.get(new URI(fsname), conf);
       if (fs == null) {
-        log.error("can't connect to HDFS at " + fs.getUri() + " bail out!");
+        log.error("can't connect to HDFS.");
       }
     } catch (Throwable e) {
       log.error(
@@ -324,49 +322,45 @@ public class SeqFileWriter extends PipelineableWriter implements ChukwaWriter {
       throw new WriterException("Collector not ready");
     }
 
-    if (chunks != null) {
-      ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
-
-      if (System.currentTimeMillis() >= nextTimePeriodComputation) {
-        computeTimePeriod();
-      }
-      try {
-        lock.acquire();
-        for (Chunk chunk : chunks) {
-          archiveKey.setTimePartition(timePeriod);
-          archiveKey.setDataType(chunk.getDataType());
-          archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource()
-              + "/" + chunk.getStreamName());
-          archiveKey.setSeqId(chunk.getSeqID());
-
-          if (chunk != null) {
-            seqFileWriter.append(archiveKey, chunk);
-
-            // compute size for stats only if append succeeded. Note though that
-            // seqFileWriter.append can continue taking data for quite some time
-            // after HDFS goes down while the client is trying to reconnect. Hence
-            // these stats might not reflect reality during an HDFS outage.
-            dataSize += chunk.getData().length;
-            bytesThisRotate += chunk.getData().length;
-
-            String futureName = currentPath.getName().replace(".chukwa", ".done");
-            result.addPend(futureName, currentOutputStr.getPos());
-          }
+    ChukwaArchiveKey archiveKey = new ChukwaArchiveKey();
+
+    if (System.currentTimeMillis() >= nextTimePeriodComputation) {
+      computeTimePeriod();
+    }
+    try {
+      lock.acquire();
+      for (Chunk chunk : chunks) {
+        archiveKey.setTimePartition(timePeriod);
+        archiveKey.setDataType(chunk.getDataType());
+        archiveKey.setStreamName(chunk.getTags() + "/" + chunk.getSource()
+            + "/" + chunk.getStreamName());
+        archiveKey.setSeqId(chunk.getSeqID());
+
+        seqFileWriter.append(archiveKey, chunk);
+
+        // compute size for stats only if append succeeded. Note though that
+        // seqFileWriter.append can continue taking data for quite some time
+        // after HDFS goes down while the client is trying to reconnect. Hence
+        // these stats might not reflect reality during an HDFS outage.
+        dataSize += chunk.getData().length;
+        bytesThisRotate += chunk.getData().length;
+
+        String futureName = currentPath.getName().replace(".chukwa", ".done");
+        result.addPend(futureName, currentOutputStr.getPos());
-        }
-      }
-      catch (IOException e) {
-        log.error("IOException when trying to write a chunk, Collector will return error and keep running.", e);
-        return COMMIT_FAIL;
-      }
-      catch (Throwable e) {
-        // We don't want to loose anything
-        log.fatal("IOException when trying to write a chunk, Collector is going to exit!", e);
-        isRunning = false;
-      } finally {
-        lock.release();
       }
     }
+    catch (IOException e) {
+      log.error("IOException when trying to write a chunk, Collector will return error and keep running.", e);
+      return COMMIT_FAIL;
+    }
+    catch (Throwable e) {
+      // We don't want to lose anything
+      log.fatal("IOException when trying to write a chunk, Collector is going to exit!", e);
+      isRunning = false;
+    } finally {
+      lock.release();
+    }
     return result;
   }
@@ -405,5 +399,9 @@ public class SeqFileWriter extends PipelineableWriter implements ChukwaWriter {
       lock.release();
     }
   }
+
+  public static void setEnableRotationOnClose(boolean b) {
+    ENABLE_ROTATION_ON_CLOSE = b;
+  }
 }
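
The SeqFileWriter change moves the hostname lookup from a static initializer into the constructor, matching localHostAddr's new status as an instance field: each writer now resolves and owns its own tag. A sketch of the idea with illustrative names (HostTaggedWriter is not part of the patch):

import java.net.InetAddress;
import java.net.UnknownHostException;

public class HostTaggedWriter {
  // Instance field, initialized per object rather than once per class.
  private String localHostAddr = "_unknown_";

  public HostTaggedWriter() {
    try {
      localHostAddr = "_" + InetAddress.getLocalHost().getHostName() + "_";
    } catch (UnknownHostException e) {
      // keep the fallback tag rather than failing construction
    }
  }

  // Each instance tags output files with the address it resolved itself,
  // which also makes the behavior straightforward to exercise in tests.
  public String tag(String fileName) {
    return fileName + localHostAddr;
  }
}
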
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
index 0249b4f..88ec861 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/SocketTeeWriter.java
@@ -20,14 +20,18 @@ package org.apache.hadoop.chukwa.datacollection.writer;
 import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ArrayBlockingQueue;
+
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.util.Filter;
 import org.apache.hadoop.chukwa.util.RegexUtil.CheckedPatternSyntaxException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
+
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.nio.charset.Charset;
 import java.io.*;
+
 import org.apache.hadoop.chukwa.util.ExceptionUtil;
 
 /**
@@ -145,7 +149,7 @@ public class SocketTeeWriter extends PipelineableWriter {
         else {
           byte[] data = c.getData();
           byte[] header = (c.getSource()+ " " + c.getDataType() + " " +
               c.getStreamName()+ " "+
-              c.getSeqID()+"\n").getBytes();
+              c.getSeqID()+"\n").getBytes(Charset.forName("UTF-8"));
           out.writeInt(data.length+ header.length);
           out.write(header);
           out.write(data);
@@ -170,9 +174,12 @@ public class SocketTeeWriter extends PipelineableWriter {
       try {   //inner try catches bad command syntax errors
         sock.setSoTimeout(timeout);
         sock.setKeepAlive(USE_KEEPALIVE);
-        in = new BufferedReader(new InputStreamReader(sock.getInputStream()));
+        in = new BufferedReader(new InputStreamReader(sock.getInputStream(), Charset.forName("UTF-8")));
         out = new DataOutputStream(sock.getOutputStream());
         String cmd = in.readLine();
+        if(cmd==null) {
+          throw new IllegalArgumentException("No input found.");
+        }
         if(!cmd.contains(" ")) {
           throw new IllegalArgumentException(
@@ -198,8 +205,8 @@ public class SocketTeeWriter extends PipelineableWriter {
           try {
             rules = new Filter(cmdAfterSpace);
           } catch (CheckedPatternSyntaxException pse) {
-            out.write("Error parsing command as a regex: ".getBytes());
-            out.write(pse.getMessage().getBytes());
+            out.write("Error parsing command as a regex: ".getBytes(Charset.forName("UTF-8")));
+            out.write(pse.getMessage().getBytes(Charset.forName("UTF-8")));
             out.writeByte('\n');
             out.close();
             in.close();
@@ -212,10 +219,10 @@ public class SocketTeeWriter extends PipelineableWriter {
         synchronized(tees) {
           tees.add(this);
         }
-        out.write("OK\n".getBytes());
+        out.write("OK\n".getBytes(Charset.forName("UTF-8")));
         log.info("tee to " + sock.getInetAddress() + " established");
       } catch(IllegalArgumentException e) {
-        out.write(e.toString().getBytes());
+        out.write(e.toString().getBytes(Charset.forName("UTF-8")));
         out.writeByte('\n');
         out.close();
         in.close();
@@ -239,8 +246,11 @@ public class SocketTeeWriter extends PipelineableWriter {
     public void handle(Chunk c) {
       //don't ever block; just ignore this chunk if we don't have room for it.
-      if(rules.matches(c))
-        sendQ.offer(c);
+      if(rules.matches(c)) {
+        if(!sendQ.offer(c)) {
+          log.debug("Queue is full.");
+        }
+      }
     }
   }
 
@@ -249,7 +259,6 @@ public class SocketTeeWriter extends PipelineableWriter {
   SocketListenThread listenThread;
   List<Tee> tees;
-  ChukwaWriter next;
 
   @Override
   public void setNextStage(ChukwaWriter next) {

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalToRemoteHdfsMover.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalToRemoteHdfsMover.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalToRemoteHdfsMover.java
index 02e7907..e0ffdc4 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalToRemoteHdfsMover.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalToRemoteHdfsMover.java
@@ -22,6 +22,7 @@ import java.io.FileNotFoundException;
 import java.net.URI;
 import java.util.concurrent.BlockingQueue;
 
+import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
 import org.apache.hadoop.chukwa.util.CopySequenceFile;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -85,8 +86,8 @@ public class LocalToRemoteHdfsMover extends Thread {
     remoteFs = FileSystem.get(new URI(fsname), conf);
     if (remoteFs == null && exitIfHDFSNotavailable) {
-      log.error("can't connect to HDFS at " + remoteFs.getUri() + " bail out!");
-      System.exit(-1);
+      log.error("can't connect to HDFS.");
+      throw new WriterException("can't connect to HDFS.");
     }
 
     localFs = FileSystem.getLocal(conf);

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalWriter.java
index bb0fdf6..14d9ab8 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/localfs/LocalWriter.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.ChunkImpl;
 import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
 import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -148,7 +149,7 @@ public class LocalWriter implements ChukwaWriter {
       }
     } catch (Throwable e) {
       log.fatal("Cannot initialize LocalWriter", e);
-      System.exit(-1);
+      throw new WriterException(e);
     }
 
@@ -184,7 +185,11 @@ public class LocalWriter implements ChukwaWriter {
   private class RotateTask extends TimerTask {
     public void run() {
-      rotate();
+      try {
+        rotate();
+      } catch(WriterException e) {
+        log.error(ExceptionUtil.getStackTrace(e));
+      }
     };
   }
 
@@ -245,11 +250,9 @@ public class LocalWriter implements ChukwaWriter {
             + "/" + chunk.getStreamName());
         archiveKey.setSeqId(chunk.getSeqID());
 
-        if (chunk != null) {
-          seqFileWriter.append(archiveKey, chunk);
-          // compute size for stats
-          dataSize += chunk.getData().length;
-        }
+        seqFileWriter.append(archiveKey, chunk);
+        // compute size for stats
+        dataSize += chunk.getData().length;
       }
     }// End synchro
     long end = System.currentTimeMillis();
@@ -264,7 +267,6 @@ public class LocalWriter implements ChukwaWriter {
       if (writeChunkRetries < 0) {
        log
            .fatal("Too many IOException when trying to write a chunk, Collector is going to exit!");
-        System.exit(-1);
      }
      throw new WriterException(e);
    }
@@ -272,7 +274,7 @@ public class LocalWriter implements ChukwaWriter {
     return COMMIT_OK;
   }
 
-  protected void rotate() {
+  protected void rotate() throws WriterException {
     isRunning = true;
     calendar.setTimeInMillis(System.currentTimeMillis());
     log.info("start Date [" + calendar.getTime() + "]");
@@ -316,10 +318,7 @@ public class LocalWriter implements ChukwaWriter {
           SequenceFile.CompressionType.NONE, null);
     } catch (IOException e) {
-      log.fatal("IO Exception in rotate. Exiting!", e);
-      // Shutting down the collector
-      // Watchdog will re-start it automatically
-      System.exit(-1);
+      log.fatal("IO Exception in rotate: ", e);
     }
   }
 
@@ -336,8 +335,8 @@ public class LocalWriter implements ChukwaWriter {
     }
 
     if (freeSpace < minFreeAvailable) {
-      log.fatal("No space left on device, Bail out!");
-      System.exit(-1);
+      log.fatal("No space left on device.");
+      throw new WriterException("No space left on device.");
     }
 
     log.debug("finished rotate()");
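
LocalToRemoteHdfsMover and LocalWriter above swap System.exit(-1) for a thrown WriterException, leaving the shutdown decision to the embedding process instead of killing the JVM from library code. A minimal sketch of the pattern under hypothetical names (StorageException, DiskGuard, and the 100 MB threshold are assumptions for illustration, not values from the patch):

import java.io.File;

class StorageException extends Exception {
  StorageException(String msg) { super(msg); }
}

class DiskGuard {
  private static final long MIN_FREE_BYTES = 100L * 1024 * 1024; // assumed threshold

  static void checkFreeSpace(File dir) throws StorageException {
    long free = dir.getUsableSpace();
    if (free < MIN_FREE_BYTES) {
      // The caller (agent, collector, or a test harness) chooses the policy:
      // retry, rotate elsewhere, or shut down cleanly. A library-level
      // System.exit(-1) would take that choice away and also make the
      // condition impossible to unit-test.
      throw new StorageException("No space left on device: " + dir);
    }
  }
}
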
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java
index bf64b24..40a6ff0 100644
--- a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/solr/SolrWriter.java
@@ -19,12 +19,10 @@ package org.apache.hadoop.chukwa.datacollection.writer.solr;
 
 import java.io.IOException;
 import java.nio.charset.Charset;
-import java.text.DateFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.List;
-import java.util.TimeZone;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -106,6 +104,9 @@ public class SolrWriter extends PipelineableWriter {
         if(data.contains("mapredice")) {
           doc.addField(SERVICE, "mapreduce");
         }
+        if(data.contains("hbase")) {
+          doc.addField(SERVICE, "hbase");
+        }
         try {
           Date d = sdf.parse(data);
           doc.addField(DATE, d, 1.0f);

http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/dataloader/DataLoaderFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/dataloader/DataLoaderFactory.java b/src/main/java/org/apache/hadoop/chukwa/dataloader/DataLoaderFactory.java
index c09d1ee..3b8b946 100644
--- a/src/main/java/org/apache/hadoop/chukwa/dataloader/DataLoaderFactory.java
+++ b/src/main/java/org/apache/hadoop/chukwa/dataloader/DataLoaderFactory.java
@@ -19,14 +19,16 @@ package org.apache.hadoop.chukwa.dataloader;
 
 import java.io.IOException;
+import java.util.Arrays;
+
 import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 
 public abstract class DataLoaderFactory {
 
-  static ChukwaConfiguration conf = null;
-  static FileSystem fs = null;
+  ChukwaConfiguration conf = null;
+  FileSystem fs = null;
   protected FileStatus[] source = null;
 
   public DataLoaderFactory() {
@@ -37,9 +39,20 @@ public abstract class DataLoaderFactory {
    * @throws IOException
    */
   public void load(ChukwaConfiguration conf, FileSystem fs, FileStatus[] src) throws IOException {
-    this.source=src;
+    this.source=src.clone();
     this.conf=conf;
     this.fs=fs;
   }
 
+  public FileStatus[] getSource() {
+    return Arrays.copyOf(source, source.length);
+  }
+
+  protected FileSystem getFileSystem() {
+    return fs;
+  }
+
+  protected ChukwaConfiguration getConf() {
+    return conf;
+  }
 }
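
The DataLoaderFactory hunk applies the standard defensive-copy idiom: clone the mutable array on the way in (load) and copy it on the way out (getSource), so callers cannot alias the factory's internal state. A self-contained illustration with a hypothetical Holder class:

import java.util.Arrays;

public class Holder {
  private final int[] values;

  public Holder(int[] values) {
    this.values = values.clone();        // caller keeps no alias to our state
  }

  public int[] getValues() {
    return Arrays.copyOf(values, values.length); // hand out a copy, not the field
  }

  public static void main(String[] args) {
    int[] data = {1, 2, 3};
    Holder h = new Holder(data);
    data[0] = 99;            // does not affect h
    h.getValues()[1] = 42;   // does not affect h either
    System.out.println(Arrays.toString(h.getValues())); // [1, 2, 3]
  }
}
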
http://git-wip-us.apache.org/repos/asf/chukwa/blob/7f662e8c/src/main/java/org/apache/hadoop/chukwa/dataloader/FSMDataLoader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/dataloader/FSMDataLoader.java b/src/main/java/org/apache/hadoop/chukwa/dataloader/FSMDataLoader.java
index 336a09b..009dd2b 100644
--- a/src/main/java/org/apache/hadoop/chukwa/dataloader/FSMDataLoader.java
+++ b/src/main/java/org/apache/hadoop/chukwa/dataloader/FSMDataLoader.java
@@ -43,8 +43,8 @@ public class FSMDataLoader extends DataLoaderFactory {
   protected MetricDataLoader threads[] = null;
   private static String DATA_LOADER_THREAD_LIMIT = "chukwa.data.loader.threads.limit";
   private int size = 1;
-  private static CompletionService completion = null;
-  private static ExecutorService executor = null;
+  private CompletionService completion = null;
+  private ExecutorService executor = null;
   private static String[] mappers = {
     "org.apache.hadoop.chukwa.analysis.salsa.fsm.DataNodeClientTraceMapper",
     "org.apache.hadoop.chukwa.analysis.salsa.fsm.TaskTrackerClientTraceMapper",
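
This last hunk continues a theme running through the whole commit: FileAdaptor's TIMEOUT_PERIOD, OozieAdaptor's periodicity, and FSMDataLoader's executor and completion service all change from static to instance fields, so separate instances stop sharing mutable state. A minimal sketch of why that matters, with a hypothetical Loader class standing in for any of them:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Loader {
  // Was effectively "private static ExecutorService": one pool shared by
  // every Loader, so one instance's configuration or shutdown clobbered
  // all the others. As an instance field, each Loader owns its own pool.
  private final ExecutorService executor;

  public Loader(int threadLimit) {
    this.executor = Executors.newFixedThreadPool(threadLimit);
  }

  public void shutdown() {
    executor.shutdown(); // affects only this instance's pool
  }

  public static void main(String[] args) {
    Loader a = new Loader(2);
    Loader b = new Loader(8); // configured independently of a
    a.shutdown();             // b keeps running
    b.shutdown();
  }
}
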
