Oops, the serialization was missing the new field in ChukwaRecordKey. The fixed patch now works fine in my environment.
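For anyone who wants to sanity-check that part, here is a minimal round-trip sketch (class name, reduce type, key string and cluster name are all made up for illustration). The cluster name only survives the write/read cycle because serialize() and deserialize() now carry the extra field:

import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class ChukwaRecordKeyRoundTrip {
  public static void main(String[] args) throws Exception {
    // Build the key the way AbstractProcessor now does (hypothetical values).
    ChukwaRecordKey key = new ChukwaRecordKey("SysLog",
        "1267488000000/host01/1267488123456", "demo-cluster");

    // write()/readFields() are inherited from org.apache.hadoop.record.Record
    // and delegate to serialize()/deserialize(), so this exercises the patched path.
    DataOutputBuffer out = new DataOutputBuffer();
    key.write(out);

    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    ChukwaRecordKey copy = new ChukwaRecordKey();
    copy.readFields(in);

    // Before the fix the field was silently dropped and came back as "undefined".
    System.out.println(copy.getClusterName()); // expected: demo-cluster
  }
}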
-- Guille -ℬḭṩḩø- <bi...@tuenti.com> :wq
Index: src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java
===================================================================
--- src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java	(revision 918900)
+++ src/java/org/apache/hadoop/chukwa/extraction/demux/processor/mapper/AbstractProcessor.java	(working copy)
@@ -20,29 +20,31 @@
 import java.util.Calendar;
 import org.apache.hadoop.chukwa.ChukwaArchiveKey;
 import org.apache.hadoop.chukwa.Chunk;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
 import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
 import org.apache.hadoop.chukwa.extraction.engine.Record;
+import org.apache.hadoop.chukwa.extraction.engine.RecordUtil;
 import org.apache.hadoop.chukwa.util.RecordConstants;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.log4j.Logger;
 
 public abstract class AbstractProcessor implements MapProcessor {
   static Logger log = Logger.getLogger(AbstractProcessor.class);
 
   Calendar calendar = Calendar.getInstance();
   byte[] bytes;
   int[] recordOffsets;
   int currentPos = 0;
   int startOffset = 0;
 
+  protected String chunkClusterName;
   protected ChukwaArchiveKey archiveKey = null;
   protected ChukwaRecordKey key = new ChukwaRecordKey();
   protected Chunk chunk = null;
 
   boolean chunkInErrorSaved = false;
   OutputCollector<ChukwaRecordKey, ChukwaRecord> output = null;
   Reporter reporter = null;
 
@@ -90,32 +92,35 @@
     calendar.setTimeInMillis(timestamp);
     calendar.set(Calendar.MINUTE, 0);
     calendar.set(Calendar.SECOND, 0);
     calendar.set(Calendar.MILLISECOND, 0);
 
     key.setKey("" + calendar.getTimeInMillis() + "/" + chunk.getSource()
         + "/" + timestamp);
     key.setReduceType(dataSource);
+    key.setClusterName(chunkClusterName);
 
     if (body != null) {
       record.add(Record.bodyField, body);
     }
     record.setTime(timestamp);
 
-    record.add(Record.tagsField, chunk.getTags());
+    // Completely remove tagsField, or is it still needed by other applications?
+    //record.add(Record.tagsField, chunk.getTags());
     record.add(Record.sourceField, chunk.getSource());
     record.add(Record.applicationField, chunk.getStreamName());
 
   }
 
   protected void reset(Chunk chunk) {
     this.chunk = chunk;
     this.bytes = chunk.getData();
     this.recordOffsets = chunk.getRecordOffsets();
+    this.chunkClusterName = RecordUtil.getClusterName(chunk);
     currentPos = 0;
     startOffset = 0;
   }
 
   protected boolean hasNext() {
     return (currentPos < recordOffsets.length);
   }
Index: src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java
===================================================================
--- src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java	(revision 918900)
+++ src/java/org/apache/hadoop/chukwa/extraction/demux/ChukwaRecordOutputFormat.java	(working copy)
@@ -28,17 +28,17 @@
 public class ChukwaRecordOutputFormat extends
     MultipleSequenceFileOutputFormat<ChukwaRecordKey, ChukwaRecord> {
   static Logger log = Logger.getLogger(ChukwaRecordOutputFormat.class);
 
   @Override
   protected String generateFileNameForKeyValue(ChukwaRecordKey key,
       ChukwaRecord record, String name) {
-    String output = RecordUtil.getClusterName(record) + "/"
+    String output = key.getClusterName() + "/"
         + key.getReduceType() + "/" + key.getReduceType()
         + Util.generateTimeOutput(record.getTime());
 
     // {log.info("ChukwaOutputFormat.fileName: [" + output +"]");}
 
     return output;
   }
 
 }
Index: src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java
===================================================================
--- src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java	(revision 918900)
+++ src/java/org/apache/hadoop/chukwa/extraction/engine/ChukwaRecordKey.java	(working copy)
@@ -12,25 +12,32 @@
     _rio_recTypeInfo.addField("reduceType",
         org.apache.hadoop.record.meta.TypeID.StringTypeID);
     _rio_recTypeInfo.addField("key",
         org.apache.hadoop.record.meta.TypeID.StringTypeID);
   }
 
   private String reduceType;
   private String key;
+  private String clusterName="undefined";
 
   public ChukwaRecordKey() {
   }
 
   public ChukwaRecordKey(final String reduceType, final String key) {
     this.reduceType = reduceType;
     this.key = key;
   }
 
+  public ChukwaRecordKey(final String reduceType, final String key, final String clusterName) {
+    this.reduceType = reduceType;
+    this.key = key;
+    this.clusterName = clusterName;
+  }
+
   public static org.apache.hadoop.record.meta.RecordTypeInfo getTypeInfo() {
     return _rio_recTypeInfo;
   }
 
   public static void setTypeFilter(
       org.apache.hadoop.record.meta.RecordTypeInfo rti) {
     if (null == rti)
       return;
@@ -73,38 +80,48 @@
   public String getReduceType() {
     return reduceType;
   }
 
   public void setReduceType(final String reduceType) {
     this.reduceType = reduceType;
   }
 
+  public String getClusterName() {
+    return clusterName;
+  }
+
+  public void setClusterName(final String clusterName) {
+    this.clusterName = clusterName;
+  }
+
   public String getKey() {
     return key;
   }
 
   public void setKey(final String key) {
     this.key = key;
   }
 
   public void serialize(final org.apache.hadoop.record.RecordOutput _rio_a,
       final String _rio_tag) throws java.io.IOException {
     _rio_a.startRecord(this, _rio_tag);
     _rio_a.writeString(reduceType, "reduceType");
     _rio_a.writeString(key, "key");
+    _rio_a.writeString(clusterName, "clusterName");
     _rio_a.endRecord(this, _rio_tag);
   }
 
   private void deserializeWithoutFilter(
       final org.apache.hadoop.record.RecordInput _rio_a, final String _rio_tag)
       throws java.io.IOException {
     _rio_a.startRecord(_rio_tag);
     reduceType = _rio_a.readString("reduceType");
     key = _rio_a.readString("key");
+    clusterName = _rio_a.readString("clusterName");
     _rio_a.endRecord(_rio_tag);
   }
 
   public void deserialize(final org.apache.hadoop.record.RecordInput _rio_a,
       final String _rio_tag) throws java.io.IOException {
     if (null == _rio_rtiFilter) {
       deserializeWithoutFilter(_rio_a, _rio_tag);
       return;
@@ -112,16 +129,18 @@
     // if we're here, we need to read based on version info
     _rio_a.startRecord(_rio_tag);
     setupRtiFields();
     for (int _rio_i = 0; _rio_i < _rio_rtiFilter.getFieldTypeInfos().size(); _rio_i++) {
       if (1 == _rio_rtiFilterFields[_rio_i]) {
         reduceType = _rio_a.readString("reduceType");
       } else if (2 == _rio_rtiFilterFields[_rio_i]) {
         key = _rio_a.readString("key");
+      } else if (3 == _rio_rtiFilterFields[_rio_i]) {
+        clusterName = _rio_a.readString("clusterName");
       } else {
         java.util.ArrayList<org.apache.hadoop.record.meta.FieldTypeInfo> typeInfos = (java.util.ArrayList<org.apache.hadoop.record.meta.FieldTypeInfo>) (_rio_rtiFilter
             .getFieldTypeInfos());
         org.apache.hadoop.record.meta.Utils.skip(_rio_a, typeInfos.get(_rio_i)
             .getFieldID(), typeInfos.get(_rio_i).getTypeID());
       }
     }
     _rio_a.endRecord(_rio_tag);
Index: src/java/org/apache/hadoop/chukwa/extraction/engine/RecordUtil.java
===================================================================
--- src/java/org/apache/hadoop/chukwa/extraction/engine/RecordUtil.java	(revision 918900)
+++ src/java/org/apache/hadoop/chukwa/extraction/engine/RecordUtil.java	(working copy)
@@ -26,27 +26,16 @@
 /**
  * Various utility methods.
  * 
  */
 public class RecordUtil {
   static Pattern clusterPattern = Pattern
       .compile("(.*)?cluster=\"(.*?)\"(.*)?");
-  public static String getClusterName(Record record) {
-    String tags = record.getValue(Record.tagsField);
-    if (tags != null) {
-      Matcher matcher = clusterPattern.matcher(tags);
-      if (matcher.matches()) {
-        return matcher.group(2);
-      }
-    }
-
-    return "undefined";
-  }
 
   /**
    * Uses a precompiled pattern, so theoretically faster than
    * Chunk.getTag().
    * 
    */
   public static String getClusterName(Chunk chunk) {
     String tags = chunk.getTags();
     if (tags != null) {
Index: src/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoader.java
===================================================================
--- src/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoader.java	(revision 918900)
+++ src/java/org/apache/hadoop/chukwa/dataloader/MetricDataLoader.java	(working copy)
@@ -193,17 +193,17 @@
     int numOfRecords = 0;
     try {
       Pattern p = Pattern.compile("(.*)\\-(\\d+)$");
       int batch = 0;
       while (reader.next(key, record)) {
         numOfRecords++;
         if(first) {
           try {
-            cluster = RecordUtil.getClusterName(record);
+            cluster = key.getClusterName();
             initEnv(cluster);
             first=false;
           } catch(Exception ex) {
             log.error("Initialization failed for: "+cluster+". Please check jdbc configuration.");
             return false;
           }
         }
         String sqlTime = DatabaseWriter.formatTimeStamp(record.getTime());
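One more note on the ChukwaRecordOutputFormat change: the demux output path now comes straight from the key instead of being re-parsed out of the record tags. With a key like the hypothetical one above, the generated name would be roughly

  demo-cluster/SysLog/SysLog + Util.generateTimeOutput(record.getTime())

which is the same layout as before; only the source of the cluster name changes (key.getClusterName() instead of RecordUtil.getClusterName(record)).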