I want to submit this patch. There are some rough edges--like how to best get
the partitioner needed by the getPartition() method.
Suggestions on how to retrieve the job's current partitioner? I am under the ...
Currently I have simply hardcoded the class to make it work. :)
    private String getParition(ChukwaRecordKey key, ChukwaRecord record) {
        return "part" + paritioner.getPartition(key, record,
            conf.getInt("mapred.reduce.tasks", 0));
    }
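
One possible direction for the partitioner lookup, as a minimal sketch rather
than the actual patch: read the partitioner class name from the job
configuration and instantiate it by reflection instead of hardcoding it. In the
old mapred API I believe JobConf.getPartitionerClass() together with
ReflectionUtils.newInstance() would cover this. The sketch avoids the Hadoop
dependency, so SimplePartitioner, HashPartitioner, loadPartitioner,
partitionLabel and the plain Map standing in for the configuration are all
hypothetical stand-ins. It also falls back to one reduce task, since a default
of 0 would make a modulo-based partitioner divide by zero.

```java
import java.util.HashMap;
import java.util.Map;

class PartitionerLookupSketch {

    /** Hypothetical, simplified stand-in for Hadoop's Partitioner interface. */
    public interface SimplePartitioner {
        int getPartition(String key, String value, int numReduceTasks);
    }

    /** Hypothetical hash-based partitioner: non-negative hash modulo reducer count. */
    public static class HashPartitioner implements SimplePartitioner {
        @Override
        public int getPartition(String key, String value, int numReduceTasks) {
            return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }
    }

    /**
     * Instantiates the partitioner named in the configuration.
     * The key "mapred.partitioner.class" is assumed here; a Map stands in
     * for the real job configuration object.
     */
    static SimplePartitioner loadPartitioner(Map<String, String> conf)
            throws ReflectiveOperationException {
        String className = conf.getOrDefault("mapred.partitioner.class",
                HashPartitioner.class.getName());
        return (SimplePartitioner) Class.forName(className)
                .getDeclaredConstructor().newInstance();
    }

    /** Builds the "partN" label; never passes fewer than one reducer. */
    static String partitionLabel(SimplePartitioner partitioner,
            Map<String, String> conf, String key, String value) {
        int reducers = Math.max(1,
                Integer.parseInt(conf.getOrDefault("mapred.reduce.tasks", "1")));
        return "part" + partitioner.getPartition(key, value, reducers);
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> conf = new HashMap<>();
        conf.put("mapred.reduce.tasks", "4");
        SimplePartitioner partitioner = loadPartitioner(conf);
        // Prints a label between part0 and part3, depending on the key's hash.
        System.out.println(partitionLabel(partitioner, conf, "MyDataType", "record"));
    }
}
```

The point of going through the configuration is that the output format and the
job would then agree on the partitioner without either one naming the class.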
On Jul 22, 2010, at 1:34 PM, Corbin Hoenes wrote:
> -getmerge seems to work... any other suggestions on formats? I like the
> idea of making the filename more hadoopy looking.
> MyDataType_20100720_0_35_part-00001.R? Might require more code changes to
> tack it onto the extension; I haven't looked at that bit of code yet.
>
> On Jul 21, 2010, at 10:35 AM, Eric Yang wrote:
>
>> I think this is in the right direction. Does this filename convention
>> allow dfs -getmerge to work on the directory? If it does, then I am fine
>> with it. If it doesn’t, it may be good to label the output file name as
>> MyDataType_20100720_0_35.R_part0 to align with the default output name of
>> MapReduce.
>>
>> Regards,
>> Eric
>>
>> On 7/20/10 11:48 PM, "Corbin Hoenes" <[email protected]> wrote:
>>
>>> I was looking at replacing the ChukwaRecordPartitioner with a
>>> HashbasedRecordParitioner. We discussed this earlier here.... there is an
>>> issue in JIRA: https://issues.apache.org/jira/browse/CHUKWA-481
>>>
>>> I patched chukwa to allow for a pluggable partitioner and configured chukwa
>>> to use the hash based partitioner. But it started failing to rename the
>>> _temporary files during the commit phase after the reduce was finished
>>> because now there were multiple reducers trying to move files to
>>> /chukwa/demuxProcessing/mrOutput with the same filename. So I added a bit
>>> more to the filename in ChukwaRecordOutputFormat
>>>
>>> private String getParition(ChukwaRecordKey key, ChukwaRecord record) {
>>>     return "part" + paritioner.getPartition(key, record,
>>>         conf.getInt("mapred.reduce.tasks", 0));
>>> }
>>>
>>> @Override
>>> protected String generateFileNameForKeyValue(ChukwaRecordKey key,
>>>         ChukwaRecord record, String name) {
>>>
>>>     String output = RecordUtil.getClusterName(record) + "/"
>>>         + key.getReduceType() + "/" + key.getReduceType() + "_"
>>>         + getParition(key, record)
>>>         + Util.generateTimeOutput(record.getTime());
>>>
>>>     return output;
>>> }
>>>
>>> So my filenames are now
>>> /chukwa/demuxProcessing/mrOutput/MyCluster/MyDataType/MyDataType_part0_20100720_0_35.R.evt
>>>
>>> I just added the part to the filename, and now when PostProcessorManager
>>> picks up that directory it can mv each file into the correct time bucket in
>>> /chukwa/repos (it increments a count for each file in that directory).
>>>
>>> Is there a better solution? I am not sure how general-purpose my solution
>>> is.
>>>
>
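
To compare the filename layouts discussed in the thread side by side, here is a
small sketch. It is not Chukwa code: the helper names and the split into data
type, time bucket and extension are assumptions for illustration, and the real
names come from generateFileNameForKeyValue and Util.generateTimeOutput. It
shows why a name without the partition collides across reducers, and what the
three proposed layouts look like.

```java
import java.util.Locale;

class OutputNameSketch {

    /** Name without any partition: identical for every reducer, so renames collide. */
    static String noPart(String dataType, String timeBucket, String ext) {
        return dataType + "_" + timeBucket + "." + ext;
    }

    /** Layout from the patch: partition before the time bucket. */
    static String partPrefix(String dataType, int partition, String timeBucket, String ext) {
        return dataType + "_part" + partition + "_" + timeBucket + "." + ext;
    }

    /** Layout with the partition appended after the extension. */
    static String partSuffix(String dataType, int partition, String timeBucket, String ext) {
        return dataType + "_" + timeBucket + "." + ext + "_part" + partition;
    }

    /** Layout with a zero-padded "part-NNNNN" before the extension. */
    static String hadoopStyle(String dataType, int partition, String timeBucket, String ext) {
        return dataType + "_" + timeBucket
                + String.format(Locale.ROOT, "_part-%05d", partition) + "." + ext;
    }

    public static void main(String[] args) {
        String type = "MyDataType";
        String bucket = "20100720_0_35";
        // Two reducers writing the same data type and time bucket:
        System.out.println(noPart(type, bucket, "R"));         // same name for both
        System.out.println(partPrefix(type, 0, bucket, "R"));
        System.out.println(partPrefix(type, 1, bucket, "R"));
        System.out.println(partSuffix(type, 0, bucket, "R"));
        System.out.println(hadoopStyle(type, 1, bucket, "R"));
    }
}
```

Any of the three partition-aware layouts keeps reducers from overwriting each
other; which one to pick mostly depends on what downstream tools expect to
parse out of the name.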