Author: cdouglas
Date: Tue Feb 10 00:15:34 2009
New Revision: 742797
URL: http://svn.apache.org/viewvc?rev=742797&view=rev
Log:
HADOOP-5018. Add pipelined writers to Chukwa. Contributed by Ari Rabkin
Added:
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineableWriter.java
hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/collector/
hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/collector/CaptureWriter.java
hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/collector/CollectorTest.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ChukwaWriter.java
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=742797&r1=742796&r2=742797&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Feb 10 00:15:34 2009
@@ -38,6 +38,8 @@
HADOOP-3741. Add a web ui to the SecondaryNameNode for showing its status.
(szetszwo)
+ HADOOP-5018. Add pipelined writers to Chukwa. (Ari Rabkin via cdouglas)
+
IMPROVEMENTS
HADOOP-4565. Added CombineFileInputFormat to use data locality information
Modified:
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java?rev=742797&r1=742796&r2=742797&view=diff
==============================================================================
---
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java
(original)
+++
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/CollectorStub.java
Tue Feb 10 00:15:34 2009
@@ -48,27 +48,26 @@
portNum = Integer.parseInt(args[0]);
//pick a writer.
+ ChukwaWriter w = null;
if(args.length > 1) {
if(args[1].equals("pretend"))
- ServletCollector.setWriter(new ConsoleWriter(true));
+ w= new ConsoleWriter(true);
else if(args[1].equals("pretend-quietly"))
- ServletCollector.setWriter(new ConsoleWriter(false));
+ w = new ConsoleWriter(false);
else if(args[1].equals("-classname")) {
if(args.length < 3)
System.err.println("need to specify a writer class");
else {
- Class<?> writerClass = Class.forName(args[2]);
- if(writerClass != null &&
- ChukwaWriter.class.isAssignableFrom(writerClass))
- ServletCollector.setWriter(
- (ChukwaWriter) writerClass.newInstance());
- else
- System.err.println(args[2]+ "is not a ChukwaWriter");
+ conf.set("chukwaCollector.writerClass", args[2]);
}
}
else
System.out.println("WARNING: unknown command line arg "+ args[1]);
}
+ if(w != null) {
+ w.init(conf);
+ ServletCollector.setWriter(w);
+ }
//set up jetty connector
SelectChannelConnector jettyConnector = new SelectChannelConnector();
@@ -85,7 +84,7 @@
jettyServer.setThreadPool(pool);
//and add the servlet to it
Context root = new Context(jettyServer,"/",Context.SESSIONS);
- root.addServlet(new ServletHolder(new ServletCollector()), "/*");
+ root.addServlet(new ServletHolder(new ServletCollector(conf)), "/*");
jettyServer.start();
jettyServer.setStopAtShutdown(false);
Modified:
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java?rev=742797&r1=742796&r2=742797&view=diff
==============================================================================
---
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
(original)
+++
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/collector/servlet/ServletCollector.java
Tue Feb 10 00:15:34 2009
@@ -21,10 +21,7 @@
import java.io.DataInputStream;
import java.io.IOException;
import java.io.PrintStream;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.*;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
@@ -33,10 +30,11 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.ChunkImpl;
-import org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter;
-import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
+import org.apache.hadoop.chukwa.datacollection.writer.*;
import org.apache.log4j.Logger;
public class ServletCollector extends HttpServlet
@@ -51,12 +49,18 @@
public static void
setWriter(org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter w) throws
WriterException
{
writer = w;
- w.init();
}
static long statTime = 0L;
static int numberHTTPConnection = 0;
static int numberchunks = 0;
+ Configuration conf;
+
+ public ServletCollector(Configuration c) {
+ conf =c;
+ }
+
+
public void init(ServletConfig servletConf) throws ServletException
{
@@ -66,7 +70,6 @@
return;
}
-
Timer statTimer = new Timer();
statTimer.schedule(new TimerTask()
{
@@ -80,31 +83,26 @@
}
}, (1000), (60*1000));
- try
- {
- // read the application->pipeline settings from a
config file in the format:
- // appliation_name: PipelinedWriter1, PipelinedWriter2,
Writer
- // use reflection to set up the pipeline after reading
in the list of writers from the config file
-
- /*
- String strPipelines =
"HadoopLogs:HdfsWriter\nApplication2:SameerWriter:HdfsWriter";
- String[] pipelines = strPipelines.split("\n");
- // split into pipes
- for (String pipe : pipelines){
- String[] tmp = pipe.split(":");
- String app = tmp[0];
- String[] stages = tmp[1].split(",");
-
- //loop through pipes, creating linked list of
stages per pipe, one at a time
- for (String stage : stages){
- Class curr =
ClassLoader.loadClass(stage);
- }
- }
- */
- //FIXME: seems weird to initialize a static object here
- if (writer == null)
- writer = new SeqFileWriter();
-
+ if(writer != null) {
+ log.info("writer set up statically, no need for
Collector.init() to do it");
+ return;
+ }
+
+ try {
+ String writerClassName = conf.get("chukwaCollector.writerClass",
+ SeqFileWriter.class.getCanonicalName());
+ Class<?> writerClass = Class.forName(writerClassName);
+ if(writerClass != null
&&ChukwaWriter.class.isAssignableFrom(writerClass))
+ writer = (ChukwaWriter) writerClass.newInstance();
+ } catch(Exception e) {
+ log.warn("failed to use user-chosen writer class, defaulting
to SeqFileWriter", e);
+ }
+
+ //We default to here if the pipeline construction failed or didn't happen.
+ try{
+ if(writer == null)
+ writer = new SeqFileWriter();//default to SeqFileWriter
+ writer.init(conf);
} catch (WriterException e) {
throw new ServletException("Problem init-ing servlet",
e);
}
Modified:
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java?rev=742797&r1=742796&r2=742797&view=diff
==============================================================================
---
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java
(original)
+++
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/test/FileTailerStressTest.java
Tue Feb 10 00:15:34 2009
@@ -73,7 +73,7 @@
Context root = new Context(server,"/",Context.SESSIONS);
ServletCollector.setWriter(new ConsoleWriter(true));
- root.addServlet(new ServletHolder(new ServletCollector()), "/*");
+ root.addServlet(new ServletHolder(new ServletCollector(new
ChukwaConfiguration(true))), "/*");
server.start();
server.setStopAtShutdown(false);
Modified:
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ChukwaWriter.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ChukwaWriter.java?rev=742797&r1=742796&r2=742797&view=diff
==============================================================================
---
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ChukwaWriter.java
(original)
+++
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ChukwaWriter.java
Tue Feb 10 00:15:34 2009
@@ -21,10 +21,11 @@
import java.util.List;
import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.conf.Configuration;
public interface ChukwaWriter
{
- public void init() throws WriterException;
+ public void init(Configuration c) throws WriterException;
public void add(Chunk data) throws WriterException;
public void add(List<Chunk> chunks) throws WriterException;
public void close() throws WriterException;;
Modified:
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java?rev=742797&r1=742796&r2=742797&view=diff
==============================================================================
---
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java
(original)
+++
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/ConsoleWriter.java
Tue Feb 10 00:15:34 2009
@@ -23,6 +23,7 @@
import java.util.TimerTask;
import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.conf.Configuration;
public class ConsoleWriter implements ChukwaWriter {
@@ -48,6 +49,10 @@
}
};
+
+ public ConsoleWriter() {
+ this(true);
+ }
public ConsoleWriter(boolean printData) {
this.printData = printData;
@@ -59,7 +64,7 @@
statTimer.cancel();
}
- public void init() throws WriterException
+ public void init(Configuration conf) throws WriterException
{
System.out.println("---- DUMMY HDFS WRITER IN USE ---");
@@ -73,7 +78,7 @@
dataSize += data.getData().length;
if(printData) {
System.out.println(data.getData().length + " bytes of data in chunk");
-
+
for(int offset: data.getRecordOffsets()) {
System.out.print(data.getStreamName());
System.out.print(" ");
Added:
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java?rev=742797&view=auto
==============================================================================
---
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java
(added)
+++
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/Dedup.java
Tue Feb 10 00:15:34 2009
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.writer;
+
+import java.util.ArrayDeque;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Pipeline stage that drops chunks whose (stream name, sequence ID) pair is
+ * still held in a bounded cache of recently seen pairs, and forwards every
+ * other chunk to the next stage. The cache capacity is read from
+ * {@code chukwaCollector.chunkSuppressBufferSize} (default 0, which disables
+ * suppression so every chunk is forwarded).
+ */
+public class Dedup implements PipelineableWriter {
+
+  /** Duplicate-detection key: a chunk's stream name plus its sequence ID. */
+  static final class DedupKey {
+    final String name;
+    final long val; // sequence number
+
+    public DedupKey(String n, long p) {
+      name = n;
+      val = p;
+    }
+
+    @Override
+    public int hashCode() {
+      return (int) (name.hashCode() ^ val ^ (val >> 32));
+    }
+
+    @Override
+    public boolean equals(Object dk) {
+      if (!(dk instanceof DedupKey))
+        return false;
+      DedupKey other = (DedupKey) dk;
+      return name.equals(other.name) && val == other.val;
+    }
+  }
+
+  /**
+   * Set holding at most {@code maxSize} entries; once full, the oldest
+   * insertion is evicted first. A size of 0 (or a negative size) disables
+   * the cache. Mutating methods are synchronized on the cache instance.
+   */
+  static class FixedSizeCache<EntryType> {
+    final HashSet<EntryType> hs;
+    final Queue<EntryType> toDrop; // insertion order of the entries in hs, oldest first
+    final int maxSize;
+    volatile long dupchunks = 0;
+
+    public FixedSizeCache(int size) {
+      maxSize = Math.max(0, size); // HashSet rejects a negative capacity
+      hs = new HashSet<EntryType>(maxSize);
+      toDrop = new ArrayDeque<EntryType>(maxSize);
+    }
+
+    /** Inserts t unless already present, evicting the oldest entries to stay within maxSize. */
+    public synchronized void add(EntryType t) {
+      if (maxSize == 0 || hs.contains(t))
+        return; // re-enqueueing a present entry would desync toDrop from hs
+      insert(t);
+    }
+
+    /**
+     * Returns true (and counts a duplicate) if t is already cached; otherwise
+     * caches t, evicting the oldest entries to stay within maxSize, and
+     * returns false. Always false when the cache is disabled.
+     */
+    private synchronized boolean addAndCheck(EntryType t) {
+      if (maxSize == 0)
+        return false;
+      if (hs.contains(t)) {
+        dupchunks++;
+        return true;
+      }
+      insert(t); // must evict here too, or the cache grows without bound
+      return false;
+    }
+
+    /** Adds a new entry after evicting the oldest ones; caller must hold the lock. */
+    private void insert(EntryType t) {
+      while (hs.size() >= maxSize && !toDrop.isEmpty())
+        hs.remove(toDrop.remove());
+      hs.add(t);
+      toDrop.add(t);
+    }
+
+    private long dupCount() {
+      return dupchunks;
+    }
+  }
+
+
+  FixedSizeCache<DedupKey> cache;
+  ChukwaWriter next;
+
+  /** Sets the writer that receives non-duplicate chunks. */
+  @Override
+  public void setNextStage(ChukwaWriter next) {
+    this.next = next;
+  }
+
+  /** Forwards data to the next stage unless its (stream, seqID) pair is already cached. */
+  @Override
+  public void add(Chunk data) throws WriterException {
+    DedupKey key = new DedupKey(data.getStreamName(), data.getSeqID());
+    if (!cache.addAndCheck(key))
+      next.add(data);
+  }
+
+  /** Applies the single-chunk duplicate check to each chunk, in order. */
+  @Override
+  public void add(List<Chunk> chunks) throws WriterException {
+    for (Chunk c : chunks)
+      add(c);
+  }
+
+  /** Closes the next stage; Dedup itself only holds the cache and the next-stage reference. */
+  @Override
+  public void close() throws WriterException {
+    next.close();
+  }
+
+  /** Sizes the duplicate cache from chukwaCollector.chunkSuppressBufferSize (default 0). */
+  @Override
+  public void init(Configuration c) throws WriterException {
+    int csize = c.getInt("chukwaCollector.chunkSuppressBufferSize", 0);
+    cache = new FixedSizeCache<DedupKey>(csize);
+  }
+
+}
Modified:
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java?rev=742797&r1=742796&r2=742797&view=diff
==============================================================================
---
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java
(original)
+++
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/InMemoryWriter.java
Tue Feb 10 00:15:34 2009
@@ -22,6 +22,7 @@
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.ChunkImpl;
+import org.apache.hadoop.conf.Configuration;
public class InMemoryWriter implements ChukwaWriter {
@@ -31,7 +32,7 @@
buf.reset();
}
- public void init() throws WriterException {
+ public void init(Configuration conf) throws WriterException {
buf = new ByteArrayOutputStream();
}
Modified:
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java?rev=742797&r1=742796&r2=742797&view=diff
==============================================================================
---
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
(original)
+++
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineStageWriter.java
Tue Feb 10 00:15:34 2009
@@ -18,6 +18,84 @@
package org.apache.hadoop.chukwa.datacollection.writer;
-public interface PipelineStageWriter extends ChukwaWriter{
- public void setNextStage(ChukwaWriter next);
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+
+public class PipelineStageWriter implements ChukwaWriter {
+ Logger log = Logger.getLogger(PipelineStageWriter.class);
+
+ ChukwaWriter writer; //head of pipeline
+
+ @Override
+ public void add(Chunk data) throws WriterException {
+ writer.add(data);
+
+ }
+
+ @Override
+ public void add(List<Chunk> chunks) throws WriterException {
+ writer.add(chunks);
+ }
+
+ @Override
+ public void close() throws WriterException {
+ writer.close();
+ }
+
+ @Override
+ public void init(Configuration conf) throws WriterException {
+ if (conf.get("chukwaCollector.pipeline") != null) {
+ String pipeline = conf.get("chukwaCollector.pipeline");
+ try {
+ String[] classes = pipeline.split(",");
+ ArrayList<PipelineableWriter> stages = new
ArrayList<PipelineableWriter>();
+
+ PipelineableWriter lastWriter= null;
+ if(classes.length > 1) {
+ lastWriter = (PipelineableWriter)
conf.getClassByName(classes[0]).newInstance();
+ lastWriter.init(conf);
+ writer = lastWriter;
+ }
+
+ for(int i = 1; i < classes.length -1; ++i) {
+ Class stageClass = conf.getClassByName(classes[i]);
+ Object st = stageClass.newInstance();
+ if(!(st instanceof PipelineableWriter))
+ log.error("class "+ classes[i]+ " in processing pipeline isn't a
pipeline stage");
+
+ PipelineableWriter stage = (PipelineableWriter)
stageClass.newInstance();
+ stage.init(conf);
+ //throws exception if types don't match or class not found; this is
OK.
+
+ lastWriter.setNextStage(stage);
+ lastWriter = stage;
+ }
+ Class stageClass = conf.getClassByName(classes[classes.length-1]);
+ Object st = stageClass.newInstance();
+
+ if(!(st instanceof ChukwaWriter)) {
+ log.error("class "+ classes[classes.length-1]+ " at end of
processing pipeline isn't a ChukwaWriter");
+ throw new WriterException("bad pipeline");
+ } else {
+ if(lastWriter != null)
+ lastWriter.setNextStage((ChukwaWriter) st);
+ else
+ writer = (ChukwaWriter) st; //one stage pipeline
+ }
+ return;
+ } catch(Exception e) {
+ //if anything went wrong (missing class, etc) we wind up here.
+ log.error("failed to set up pipeline, defaulting to
SeqFileWriter",e);
+ //fall through to default case
+ throw new WriterException("bad pipeline");
+ }
+ } else {
+ throw new WriterException("must set chukwaCollector.pipeline");
+ }
+ }
+
}
Added:
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineableWriter.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineableWriter.java?rev=742797&view=auto
==============================================================================
---
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineableWriter.java
(added)
+++
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/PipelineableWriter.java
Tue Feb 10 00:15:34 2009
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.writer;
+
+/**
+ * A {@link ChukwaWriter} that can be linked to a downstream writer, as
+ * PipelineStageWriter does when it chains the classes listed in
+ * {@code chukwaCollector.pipeline}.
+ */
+public interface PipelineableWriter extends ChukwaWriter {
+  /** Sets the downstream writer for this stage. */
+  void setNextStage(ChukwaWriter next);
+}
Modified:
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java?rev=742797&r1=742796&r2=742797&view=diff
==============================================================================
---
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
(original)
+++
hadoop/core/trunk/src/contrib/chukwa/src/java/org/apache/hadoop/chukwa/datacollection/writer/SeqFileWriter.java
Tue Feb 10 00:15:34 2009
@@ -29,7 +29,7 @@
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.ChunkImpl;
-import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -51,7 +51,7 @@
static Logger log = Logger.getLogger(SeqFileWriter.class);
private FileSystem fs = null;
- private ChukwaConfiguration conf = null;
+ private Configuration conf = null;
private String outputDir = null;
private Calendar calendar = Calendar.getInstance();
@@ -78,11 +78,9 @@
public SeqFileWriter() throws WriterException
{
- conf = new ChukwaConfiguration(true);
- init();
}
- public void init() throws WriterException
+ public void init(Configuration conf) throws WriterException
{
outputDir = conf.get("chukwaCollector.outputDir", "/chukwa");
Added:
hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/collector/CaptureWriter.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/collector/CaptureWriter.java?rev=742797&view=auto
==============================================================================
---
hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/collector/CaptureWriter.java
(added)
+++
hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/collector/CaptureWriter.java
Tue Feb 10 00:15:34 2009
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.collector;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
+import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Test writer that records every chunk it receives in the static
+ * {@link #outputs} list, so a test can inspect what reached the end of a
+ * pipeline.
+ */
+public class CaptureWriter implements ChukwaWriter {
+  /** Every chunk added so far, shared by all instances; add() synchronizes on this list. */
+  static ArrayList<Chunk> outputs = new ArrayList<Chunk>();
+
+  /** Nothing to configure. */
+  @Override
+  public void init(Configuration c) throws WriterException { }
+
+  @Override
+  public void add(Chunk data) throws WriterException {
+    synchronized (outputs) {
+      outputs.add(data);
+    }
+  }
+
+  /** Records each chunk individually, in list order. */
+  @Override
+  public void add(List<Chunk> chunks) throws WriterException {
+    for (Chunk chunk : chunks) {
+      add(chunk);
+    }
+  }
+
+  /** Nothing to release. */
+  @Override
+  public void close() throws WriterException { }
+}
+
Added:
hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/collector/CollectorTest.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/collector/CollectorTest.java?rev=742797&view=auto
==============================================================================
---
hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/collector/CollectorTest.java
(added)
+++
hadoop/core/trunk/src/contrib/chukwa/src/test/org/apache/hadoop/chukwa/datacollection/collector/CollectorTest.java
Tue Feb 10 00:15:34 2009
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.chukwa.datacollection.collector;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
+import org.apache.hadoop.chukwa.*;
+import
org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
+import org.apache.hadoop.chukwa.datacollection.sender.*;
+import org.apache.hadoop.chukwa.datacollection.writer.*;
+
+import java.util.*;
+
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+
+public class CollectorTest extends TestCase {
+
+
+ public void testCollector() {
+ try {
+ Configuration conf = new Configuration();
+ conf.set("chukwaCollector.chunkSuppressBufferSize", "10");
+ conf.set("chukwaCollector.pipeline",
+ "org.apache.hadoop.chukwa.datacollection.writer.Dedup,"//note comma
+ +
"org.apache.hadoop.chukwa.datacollection.collector.CaptureWriter");
+ conf.set("chukwaCollector.writerClass",
PipelineStageWriter.class.getCanonicalName());
+ ChukwaHttpSender sender = new ChukwaHttpSender(conf);
+ ArrayList<String> collectorList = new ArrayList<String>();
+ collectorList.add("http://localhost:9990/chukwa");
+ sender.setCollectors(new RetryListOfCollectors(collectorList, 50));
+ Server server = new Server(9990);
+ Context root = new Context(server,"/",Context.SESSIONS);
+
+ root.addServlet(new ServletHolder(new ServletCollector(conf)), "/*");
+ server.start();
+ server.setStopAtShutdown(false);
+ Thread.sleep(1000);
+
+ Chunk c = new ChunkImpl("data", "stream", 0, "testing -- this should
appear once".getBytes(), null);
+ ArrayList<Chunk> toSend = new ArrayList<Chunk>();
+ toSend.add(c);
+ toSend.add(c);
+ sender.send(toSend);
+ Thread.sleep(1000);
+ assertEquals(1, CaptureWriter.outputs.size());
+ } catch(Exception e) {
+ fail(e.toString());
+ }
+
+ }
+
+}