Whoops, the serialization was missing the new field in
ChukwaRecordKey. The fixed patch now works fine in my environment.
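
For reference, the failure mode was that clusterName never survived a
round trip, because serialize()/deserialize() didn't write/read it. A
quick round-trip check along these lines passes with the fixed patch
(the class name and values are just illustrative):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.record.BinaryRecordInput;
import org.apache.hadoop.record.BinaryRecordOutput;

public class ClusterNameRoundTrip {
  public static void main(String[] args) throws Exception {
    ChukwaRecordKey in = new ChukwaRecordKey("SystemMetrics",
        "1234567890/host1/1234567890", "demo-cluster");

    // Serialize with the fixed serialize(), which now writes clusterName.
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    in.serialize(new BinaryRecordOutput(buf), "k");

    // Deserialize into a fresh key and check the field survives.
    ChukwaRecordKey out = new ChukwaRecordKey();
    out.deserialize(
        new BinaryRecordInput(new ByteArrayInputStream(buf.toByteArray())),
        "k");
    System.out.println(out.getClusterName()); // expect "demo-cluster"
  }
}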

-- 
Guille -ℬḭṩḩø- <bi...@tuenti.com>
:wq
Index: src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java
===================================================================
--- src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java	(revision 918900)
+++ src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java	(working copy)
@@ -20,29 +20,31 @@
 
 
 import java.util.Calendar;
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.chukwa.extraction.engine.Record;
+import org.apache.hadoop.chukwa.extraction.engine.RecordUtil;
 import org.apache.hadoop.chukwa.util.RecordConstants;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
 
 public abstract class AbstractProcessor implements MapProcessor {
   static Logger log = Logger.getLogger(AbstractProcessor.class);
 
   Calendar calendar = Calendar.getInstance();
   byte[] bytes;
   int[] recordOffsets;
   int currentPos = 0;
   int startOffset = 0;
+  protected String chunkClusterName;
 
   protected ChukwaArchiveKey archiveKey = null;
   protected ChukwaRecordKey key = new ChukwaRecordKey();
   protected Chunk chunk = null;
 
   boolean chunkInErrorSaved = false;
   OutputCollector<ChukwaRecordKey, ChukwaRecord> output = null;
   Reporter reporter = null;
@@ -90,32 +92,35 @@
     calendar.setTimeInMillis(timestamp);
     calendar.set(Calendar.MINUTE, 0);
     calendar.set(Calendar.SECOND, 0);
     calendar.set(Calendar.MILLISECOND, 0);
 
     key.setKey("" + calendar.getTimeInMillis() + "/" + chunk.getSource() + "/"
         + timestamp);
     key.setReduceType(dataSource);
+    key.setClusterName(chunkClusterName);
 
     if (body != null) {
       record.add(Record.bodyField, body);
     }
     record.setTime(timestamp);
 
-    record.add(Record.tagsField, chunk.getTags());
+    // Should tagsField be removed completely, or is it still needed by other applications?
+    //record.add(Record.tagsField, chunk.getTags());
     record.add(Record.sourceField, chunk.getSource());
     record.add(Record.applicationField, chunk.getStreamName());
 
   }
 
   protected void reset(Chunk chunk) {
     this.chunk = chunk;
     this.bytes = chunk.getData();
     this.recordOffsets = chunk.getRecordOffsets();
+    this.chunkClusterName = RecordUtil.getClusterName(chunk);
     currentPos = 0;
     startOffset = 0;
   }
 
   protected boolean hasNext() {
     return (currentPos < recordOffsets.length);
   }
 
Index: src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java
===================================================================
--- src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java	(revision 918900)
+++ src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java	(working copy)
@@ -28,17 +28,17 @@
 
 public class ChukwaRecordOutputFormat extends
     MultipleSequenceFileOutputFormat<ChukwaRecordKey, ChukwaRecord> {
   static Logger log = Logger.getLogger(ChukwaRecordOutputFormat.class);
 
   @Override
   protected String generateFileNameForKeyValue(ChukwaRecordKey key,
       ChukwaRecord record, String name) {
-    String output = RecordUtil.getClusterName(record) + "/"
+    String output = key.getClusterName() + "/"
         + key.getReduceType() + "/" + key.getReduceType()
         + Util.generateTimeOutput(record.getTime());
 
     // {log.info("ChukwaOutputFormat.fileName: [" + output +"]");}
 
     return output;
   }
 }
Index: src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java
===================================================================
--- src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java	(revision 918900)
+++ src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java	(working copy)
@@ -12,25 +12,34 @@
     _rio_recTypeInfo.addField("reduceType",
         org.apache.hadoop.record.meta.TypeID.StringTypeID);
     _rio_recTypeInfo.addField("key",
         org.apache.hadoop.record.meta.TypeID.StringTypeID);
+    _rio_recTypeInfo.addField("clusterName",
+        org.apache.hadoop.record.meta.TypeID.StringTypeID);
   }
 
   private String reduceType;
   private String key;
+  private String clusterName = "undefined";
 
   public ChukwaRecordKey() {
   }
 
   public ChukwaRecordKey(final String reduceType, final String key) {
     this.reduceType = reduceType;
     this.key = key;
   }
 
+  public ChukwaRecordKey(final String reduceType, final String key, final String clusterName) {
+    this.reduceType = reduceType;
+    this.key = key;
+    this.clusterName = clusterName;
+  }
+
   public static org.apache.hadoop.record.meta.RecordTypeInfo getTypeInfo() {
     return _rio_recTypeInfo;
   }
 
   public static void setTypeFilter(
       org.apache.hadoop.record.meta.RecordTypeInfo rti) {
     if (null == rti)
       return;
@@ -73,38 +82,48 @@
   public String getReduceType() {
     return reduceType;
   }
 
   public void setReduceType(final String reduceType) {
     this.reduceType = reduceType;
   }
 
+  public String getClusterName() {
+    return clusterName;
+  }
+
+  public void setClusterName(final String clusterName) {
+    this.clusterName = clusterName;
+  }
+
   public String getKey() {
     return key;
   }
 
   public void setKey(final String key) {
     this.key = key;
   }
 
   public void serialize(final org.apache.hadoop.record.RecordOutput _rio_a,
       final String _rio_tag) throws java.io.IOException {
     _rio_a.startRecord(this, _rio_tag);
     _rio_a.writeString(reduceType, "reduceType");
     _rio_a.writeString(key, "key");
+    _rio_a.writeString(clusterName, "clusterName");
     _rio_a.endRecord(this, _rio_tag);
   }
 
   private void deserializeWithoutFilter(
       final org.apache.hadoop.record.RecordInput _rio_a, final String _rio_tag)
       throws java.io.IOException {
     _rio_a.startRecord(_rio_tag);
     reduceType = _rio_a.readString("reduceType");
     key = _rio_a.readString("key");
+    clusterName = _rio_a.readString("clusterName");
     _rio_a.endRecord(_rio_tag);
   }
 
   public void deserialize(final org.apache.hadoop.record.RecordInput _rio_a,
       final String _rio_tag) throws java.io.IOException {
     if (null == _rio_rtiFilter) {
       deserializeWithoutFilter(_rio_a, _rio_tag);
       return;
@@ -112,16 +131,18 @@
     // if we're here, we need to read based on version info
     _rio_a.startRecord(_rio_tag);
     setupRtiFields();
     for (int _rio_i = 0; _rio_i < _rio_rtiFilter.getFieldTypeInfos().size(); _rio_i++) {
       if (1 == _rio_rtiFilterFields[_rio_i]) {
         reduceType = _rio_a.readString("reduceType");
       } else if (2 == _rio_rtiFilterFields[_rio_i]) {
         key = _rio_a.readString("key");
+      } else if (3 == _rio_rtiFilterFields[_rio_i]) {
+        clusterName = _rio_a.readString("clusterName");
       } else {
         java.util.ArrayList<org.apache.hadoop.record.meta.FieldTypeInfo> typeInfos = (java.util.ArrayList<org.apache.hadoop.record.meta.FieldTypeInfo>) (_rio_rtiFilter
             .getFieldTypeInfos());
         org.apache.hadoop.record.meta.Utils.skip(_rio_a, typeInfos.get(_rio_i)
             .getFieldID(), typeInfos.get(_rio_i).getTypeID());
       }
     }
     _rio_a.endRecord(_rio_tag);
Index: src/java/org/apache/hadoop/chukwa/extraction/engine/RecordUtil.java
===================================================================
--- src/java/org/apache/hadoop/chukwa/extraction/engine/RecordUtil.java	(revision 918900)
+++ src/java/org/apache/hadoop/chukwa/extraction/engine/RecordUtil.java	(working copy)
@@ -26,27 +26,16 @@
 /**
  * Various utility methods.
  * 
  */
 public class RecordUtil {
   static Pattern clusterPattern = Pattern
       .compile("(.*)?cluster=\"(.*?)\"(.*)?");
 
-  public static String getClusterName(Record record) {
-    String tags = record.getValue(Record.tagsField);
-    if (tags != null) {
-      Matcher matcher = clusterPattern.matcher(tags);
-      if (matcher.matches()) {
-        return matcher.group(2);
-      }
-    }
-
-    return "undefined";
-  }
   /**
    * Uses a precompiled pattern, so theoretically faster than
    * Chunk.getTag().
    * 
    */
   public static String getClusterName(Chunk chunk) {
     String tags = chunk.getTags();
     if (tags != null) {
Index: src/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoader.java
===================================================================
--- src/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoader.java	(revision 918900)
+++ src/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoader.java	(working copy)
@@ -193,17 +193,17 @@
     int numOfRecords = 0;
     try {
       Pattern p = Pattern.compile("(.*)\\-(\\d+)$");
       int batch = 0;
       while (reader.next(key, record)) {
     	numOfRecords++;
         if(first) { 
           try {
-            cluster = RecordUtil.getClusterName(record);
+            cluster = key.getClusterName();
             initEnv(cluster);
             first=false;
           } catch(Exception ex) {
             log.error("Initialization failed for: "+cluster+".  Please check jdbc configuration.");
             return false;
           }
         }
         String sqlTime = DatabaseWriter.formatTimeStamp(record.getTime());

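One note for reviewers: ChukwaRecordOutputFormat now derives the
output directory purely from the key, so processors must go through
reset(chunk) (which fills in chunkClusterName) before building keys,
or records land in the default "undefined" cluster. A sketch of the
resulting layout (values illustrative; the time suffix comes from
Util.generateTimeOutput and is elided):

ChukwaRecordKey key = new ChukwaRecordKey("SystemMetrics",
    "1234567890/host1/1234567890", "demo-cluster");
// generateFileNameForKeyValue(key, record, name) now returns
//   "demo-cluster/SystemMetrics/SystemMetrics" + <time suffix>
// instead of parsing cluster="..." out of the record's tags field.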