I want to submit this patch.  There are some rough edges, like how best to get 
the partitioner needed by the getPartition() method.

Suggestions on how to retrieve the job's current partitioner?  Currently I 
have just hardcoded the class to make it work. :)

private String getParition(ChukwaRecordKey key, ChukwaRecord record) {
    return "part" + paritioner.getPartition(key, record,
        conf.getInt("mapred.reduce.tasks", 0));
}
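
One way to avoid the hardcoded class is to let the configuration name the partitioner and instantiate it by reflection. Below is a minimal, Hadoop-free sketch of that pattern. The RecordPartitioner interface, the HashRecordPartitioner class, and the "demo.partitioner.class" key are all hypothetical stand-ins, not Chukwa or Hadoop names. In Hadoop's old mapred API the equivalent would presumably be JobConf.getPartitionerClass() together with ReflectionUtils.newInstance(), but I have not tried that inside ChukwaRecordOutputFormat.

```java
import java.util.Properties;

final class PartitionerLookup {
    /** Hypothetical stand-in for the job's partitioner interface. */
    public interface RecordPartitioner {
        int getPartition(String key, String record, int numReduceTasks);
    }

    /** Hypothetical hash-based implementation used as the default. */
    public static class HashRecordPartitioner implements RecordPartitioner {
        @Override
        public int getPartition(String key, String record, int numReduceTasks) {
            // Mask the sign bit so the result is never negative.
            return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }
    }

    /** Instantiates whichever partitioner class the configuration names. */
    static RecordPartitioner load(Properties conf) throws ReflectiveOperationException {
        // "demo.partitioner.class" is an illustrative key, not a real Hadoop property.
        String className = conf.getProperty("demo.partitioner.class",
                HashRecordPartitioner.class.getName());
        Class<? extends RecordPartitioner> cls =
                Class.forName(className).asSubclass(RecordPartitioner.class);
        return cls.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws Exception {
        Properties conf = new Properties();
        conf.setProperty("mapred.reduce.tasks", "4");
        RecordPartitioner partitioner = load(conf);
        // Default to 1 here: a default of 0 would make the modulo above divide by zero.
        int reducers = Integer.parseInt(conf.getProperty("mapred.reduce.tasks", "1"));
        System.out.println("part" + partitioner.getPartition("MyDataType/host1", "payload", reducers));
    }
}
```

The point of the sketch is that the output format never names a concrete partitioner; it only asks the configuration, so swapping partitioners needs no code change.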


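For the filename format discussed in the quoted messages below, here is a rough sketch of how a zero-padded, Hadoop-style part suffix could be built from a hash partition. The class and method names are made up for illustration, the partition formula mirrors what Hadoop's HashPartitioner does, and the ".R" extension is left off because I have not looked at where it gets appended.

```java
import java.util.Locale;

final class PartFileNames {
    /** Non-negative hash modulo the reducer count, as in a hash partitioner. */
    static int partitionFor(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    /** Builds cluster/type/type_timeBucket_part-NNNNN (extension not included). */
    static String fileNameFor(String cluster, String dataType,
                              String timeBucket, int partition) {
        // Locale.ROOT keeps the digits ASCII regardless of the JVM's default locale.
        String suffix = String.format(Locale.ROOT, "_part-%05d", partition);
        return cluster + "/" + dataType + "/" + dataType + "_" + timeBucket + suffix;
    }

    public static void main(String[] args) {
        int reducers = 4; // illustrative reducer count
        for (String key : new String[] {"host1", "host2", "host3"}) {
            int partition = partitionFor(key, reducers);
            System.out.println(fileNameFor("MyCluster", "MyDataType", "20100720_0_35", partition));
        }
    }
}
```

Because the suffix depends only on the partition number, two reducers writing the same data type and time bucket end up with different filenames, which is what avoids the rename collision in mrOutput.
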
On Jul 22, 2010, at 1:34 PM, Corbin Hoenes wrote:

> -getmerge seems to work...  any other suggestions on formats?  I like the 
> idea of making the filename more hadoopy looking.
> MyDataType_20100720_0_35_part-00001.R?  That might require more code changes 
> to tack it onto the extension; I haven't looked at that bit of code yet.
> 
> On Jul 21, 2010, at 10:35 AM, Eric Yang wrote:
> 
>> I think this is in the right direction.  Does this filename convention 
>> allow dfs -getmerge to work on the directory?  If it does, then I am fine 
>> with it.  If it doesn’t, it may be good to label the output file name as 
>> MyDataType_20100720_0_35.R_part0 to align with the default output name of 
>> mapreduce.
>> 
>> Regards,
>> Eric
>> 
>> On 7/20/10 11:48 PM, "Corbin Hoenes" wrote:
>> 
>>> I was looking at replacing the ChukwaRecordPartitioner with a 
>>> HashbasedRecordParitioner. We discussed this earlier here.... there is an 
>>> issue in JIRA: https://issues.apache.org/jira/browse/CHUKWA-481
>>> 
>>> I patched chukwa to allow for a pluggable partitioner and configured chukwa 
>>> to use the hash based partitioner.  But it started failing to rename the 
>>> _temporary files during the commit phase after the reduce was finished 
>>> because now there were multiple reducers trying to move files to 
>>> /chukwa/demuxProcessing/mrOutput with the same filename.   So I added a bit 
>>> more to the filename in ChukwaRecordOutputFormat
>>> 
>>> private String getParition(ChukwaRecordKey key, ChukwaRecord record) {
>>>     return "part" + paritioner.getPartition(key, record,
>>>         conf.getInt("mapred.reduce.tasks", 0));
>>> }
>>> 
>>> @Override
>>> protected String generateFileNameForKeyValue(ChukwaRecordKey key,
>>>         ChukwaRecord record, String name) {
>>>     String output = RecordUtil.getClusterName(record) + "/"
>>>         + key.getReduceType() + "/" + key.getReduceType() + "_"
>>>         + getParition(key, record)
>>>         + Util.generateTimeOutput(record.getTime());
>>> 
>>>     return output;
>>> }
>>> 
>>> So my filenames are now 
>>> /chukwa/demuxProcessing/mrOutput/MyCluster/MyDataType/MyDataType_part0_20100720_0_35.R.evt
>>> 
>>> I just added the part to the filename, and now when PostProcessorManager 
>>> picks up that directory it can mv each file into the correct time bucket in 
>>> /chukwa/repos (it increments a count for each file in that directory).
>>> 
>>> Is there a better solution--I am not sure how general purpose my solution 
>>> is.
>>> 
> 
