Right now we set the destination cluster by storing the chunk's tags in
the tags field of each record, and then RecordUtil extracts the cluster
name from that field with a regular expression. That may be slow,
especially if we process tons of records.

I would suggest including the destination cluster in the key object
instead. I think it makes more sense, it's an easy change, and we don't
fill Chukwa records with unneeded data. I attach a tentative patch that
does this. With it, it is much easier to control the destination cluster
from the mapper, and the record object is much cleaner. Also, the
regular expression is applied only once per chunk instead of once per
record, potentially increasing speed.

What do you think? Should I open a ticket, or does this not make sense?

-- 
Guille -ℬḭṩḩø- <bi...@tuenti.com>
:wq
Index: src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java
===================================================================
--- src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java	(revision 918900)
+++ src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java	(working copy)
@@ -20,29 +20,31 @@
 
 
 import java.util.Calendar;
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.chukwa.extraction.engine.Record;
+import org.apache.hadoop.chukwa.extraction.engine.RecordUtil;
 import org.apache.hadoop.chukwa.util.RecordConstants;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
 
 public abstract class AbstractProcessor implements MapProcessor {
   static Logger log = Logger.getLogger(AbstractProcessor.class);
 
   Calendar calendar = Calendar.getInstance();
   byte[] bytes;
   int[] recordOffsets;
   int currentPos = 0;
   int startOffset = 0;
+  protected String chunkClusterName;
 
   protected ChukwaArchiveKey archiveKey = null;
   protected ChukwaRecordKey key = new ChukwaRecordKey();
   protected Chunk chunk = null;
 
   boolean chunkInErrorSaved = false;
   OutputCollector<ChukwaRecordKey, ChukwaRecord> output = null;
   Reporter reporter = null;
@@ -90,32 +92,35 @@
     calendar.setTimeInMillis(timestamp);
     calendar.set(Calendar.MINUTE, 0);
     calendar.set(Calendar.SECOND, 0);
     calendar.set(Calendar.MILLISECOND, 0);
 
     key.setKey("" + calendar.getTimeInMillis() + "/" + chunk.getSource() + "/"
         + timestamp);
     key.setReduceType(dataSource);
+    key.setClusterName(chunkClusterName);
 
     if (body != null) {
       record.add(Record.bodyField, body);
     }
     record.setTime(timestamp);
 
-    record.add(Record.tagsField, chunk.getTags());
+    // Completely remove tagsField, or is needed for other applications?
+    //record.add(Record.tagsField, chunk.getTags());
     record.add(Record.sourceField, chunk.getSource());
     record.add(Record.applicationField, chunk.getStreamName());
 
   }
 
   protected void reset(Chunk chunk) {
     this.chunk = chunk;
     this.bytes = chunk.getData();
     this.recordOffsets = chunk.getRecordOffsets();
+    this.chunkClusterName = RecordUtil.getClusterName(chunk);
     currentPos = 0;
     startOffset = 0;
   }
 
   protected boolean hasNext() {
     return (currentPos < recordOffsets.length);
   }
 
Index: src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java
===================================================================
--- src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java	(revision 918900)
+++ src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java	(working copy)
@@ -28,17 +28,17 @@
 
 public class ChukwaRecordOutputFormat extends
     MultipleSequenceFileOutputFormat<ChukwaRecordKey, ChukwaRecord> {
   static Logger log = Logger.getLogger(ChukwaRecordOutputFormat.class);
 
   @Override
   protected String generateFileNameForKeyValue(ChukwaRecordKey key,
       ChukwaRecord record, String name) {
-    String output = RecordUtil.getClusterName(record) + "/"
+    String output = key.getClusterName() + "/"
         + key.getReduceType() + "/" + key.getReduceType()
         + Util.generateTimeOutput(record.getTime());
 
     // {log.info("ChukwaOutputFormat.fileName: [" + output +"]");}
 
     return output;
   }
 }
Index: src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java
===================================================================
--- src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java	(revision 918900)
+++ src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java	(working copy)
@@ -12,16 +12,17 @@
     _rio_recTypeInfo.addField("reduceType",
         org.apache.hadoop.record.meta.TypeID.StringTypeID);
     _rio_recTypeInfo.addField("key",
         org.apache.hadoop.record.meta.TypeID.StringTypeID);
   }
 
   private String reduceType;
   private String key;
+  private String clusterName="undefined";
 
   public ChukwaRecordKey() {
   }
 
   public ChukwaRecordKey(final String reduceType, final String key) {
     this.reduceType = reduceType;
     this.key = key;
   }
@@ -73,16 +74,24 @@
   public String getReduceType() {
     return reduceType;
   }
 
   public void setReduceType(final String reduceType) {
     this.reduceType = reduceType;
   }
 
+  public String getClusterName() {
+    return clusterName;
+  }
+
+  public void setClusterName(final String clusterName) {
+    this.clusterName = clusterName;
+  }
+
   public String getKey() {
     return key;
   }
 
   public void setKey(final String key) {
     this.key = key;
   }
 
Index: src/java/org/apache/hadoop/chukwa/extraction/engine/RecordUtil.java
===================================================================
--- src/java/org/apache/hadoop/chukwa/extraction/engine/RecordUtil.java	(revision 918900)
+++ src/java/org/apache/hadoop/chukwa/extraction/engine/RecordUtil.java	(working copy)
@@ -26,27 +26,16 @@
 /**
  * Various utility methods.
  * 
  */
 public class RecordUtil {
   static Pattern clusterPattern = Pattern
       .compile("(.*)?cluster=\"(.*?)\"(.*)?");
 
-  public static String getClusterName(Record record) {
-    String tags = record.getValue(Record.tagsField);
-    if (tags != null) {
-      Matcher matcher = clusterPattern.matcher(tags);
-      if (matcher.matches()) {
-        return matcher.group(2);
-      }
-    }
-
-    return "undefined";
-  }
   /**
    * Uses a precompiled pattern, so theoretically faster than
    * Chunk.getTag().
    * 
    */
   public static String getClusterName(Chunk chunk) {
     String tags = chunk.getTags();
     if (tags != null) {
Index: src/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoader.java
===================================================================
--- src/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoader.java	(revision 918900)
+++ src/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoader.java	(working copy)
@@ -193,17 +193,17 @@
     int numOfRecords = 0;
     try {
       Pattern p = Pattern.compile("(.*)\\-(\\d+)$");
       int batch = 0;
       while (reader.next(key, record)) {
     	numOfRecords++;
         if(first) { 
           try {
-            cluster = RecordUtil.getClusterName(record);
+            cluster = key.getClusterName();
             initEnv(cluster);
             first=false;
           } catch(Exception ex) {
             log.error("Initialization failed for: "+cluster+".  Please check jdbc configuration.");
             return false;
           }
         }
         String sqlTime = DatabaseWriter.formatTimeStamp(record.getTime());

Reply via email to