[
https://issues.apache.org/jira/browse/HBASE-8874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13709882#comment-13709882
]
rajeshbabu commented on HBASE-8874:
-----------------------------------
@Ted,
bq. How was the default value determined ?
The default I chose is the same value the reducer uses, because the reducer
also combines all KVs of a row.
{code}
// although reduce() is called per-row, handle pathological case
long threshold = context.getConfiguration().getLong(
    "putsortreducer.row.threshold", 2L * (1<<30));
{code}
But when I tested with more than 2GB of values for the same row key, I got an
OOME (the YARN child heap is 1GB by default).
We really need to derive these threshold values from the YARN child JVM heap
size and the mapreduce.task.io.sort.mb value. I am trying to work that out. If
you have an idea of how to calculate it, please share.
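
One possible starting point for that calculation, as a rough sketch only: take the task JVM's maximum heap, subtract the map-side sort buffer, and allow the combiner a fraction of what is left. The class name, method names, and the usable fraction are hypothetical and are not part of HBase or Hadoop; the right fraction would have to be found by testing.

```java
// Hypothetical helper, not an HBase or Hadoop API.
class ThresholdSketch {

  /**
   * Estimates how many bytes of KeyValues may be buffered for one row.
   *
   * @param maxHeapBytes   maximum heap of the task JVM, in bytes
   * @param sortBufferMb   value of mapreduce.task.io.sort.mb, in megabytes
   * @param usableFraction assumed share of the remaining heap the combiner may use
   */
  static long computeThreshold(long maxHeapBytes, long sortBufferMb, double usableFraction) {
    long sortBufferBytes = sortBufferMb * 1024L * 1024L;
    // Clamp at zero in case the sort buffer is configured larger than the heap.
    long remaining = Math.max(0L, maxHeapBytes - sortBufferBytes);
    return (long) (remaining * usableFraction);
  }

  /** Same estimate, using the heap limit of the JVM this code runs in. */
  static long fromCurrentJvm(long sortBufferMb, double usableFraction) {
    return computeThreshold(Runtime.getRuntime().maxMemory(), sortBufferMb, usableFraction);
  }
}
```

With a 1GB heap, a 100MB sort buffer, and an assumed fraction of 0.5, this gives a threshold of roughly 462MB, far below the current 2GB default. Inside a task, the sort buffer size could be read with context.getConfiguration().getInt("mapreduce.task.io.sort.mb", 100).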
> PutCombiner is skipping KeyValues while combining puts of same row during
> bulkload
> ----------------------------------------------------------------------------------
>
> Key: HBASE-8874
> URL: https://issues.apache.org/jira/browse/HBASE-8874
> Project: HBase
> Issue Type: Bug
> Components: mapreduce
> Affects Versions: 0.95.0, 0.95.1
> Reporter: rajeshbabu
> Assignee: rajeshbabu
> Priority: Critical
> Fix For: 0.98.0, 0.95.2
>
> Attachments: HBASE-8874_trunk.patch
>
>
> While combining the puts of the same row in the map phase, we use the logic
> below in PutCombiner#reduce. On the first iteration of the for loop, we add
> one Put object to the puts map. On every later iteration, putAll simply
> overwrites the KeyValues of a family with the KeyValues of the same family
> from the other Put. So we mostly write a single Put object to the map output,
> and the remaining ones are skipped (data loss).
> {code}
> Map<byte[], Put> puts = new TreeMap<byte[], Put>(Bytes.BYTES_COMPARATOR);
> for (Put p : vals) {
>   cnt++;
>   if (!puts.containsKey(p.getRow())) {
>     puts.put(p.getRow(), p);
>   } else {
>     puts.get(p.getRow()).getFamilyMap().putAll(p.getFamilyMap());
>   }
> }
> {code}
> We need to change the logic to something like the code below, since we are
> sure the row key of all the puts will be the same.
> {code}
> Put finalPut = null;
> Map<byte[], List<? extends Cell>> familyMap = null;
> for (Put p : vals) {
>   cnt++;
>   if (finalPut == null) {
>     finalPut = p;
>     familyMap = finalPut.getFamilyMap();
>   } else {
>     for (Entry<byte[], List<? extends Cell>> entry :
>         p.getFamilyMap().entrySet()) {
>       List<? extends Cell> list = familyMap.get(entry.getKey());
>       if (list == null) {
>         familyMap.put(entry.getKey(), entry.getValue());
>       } else {
>         ((List<KeyValue>) list).addAll((List<KeyValue>) entry.getValue());
>       }
>     }
>   }
> }
> context.write(row, finalPut);
> {code}
> We also need to implement the TODOs mentioned by Nick:
> {code}
> // TODO: would be better if we knew <code>K row</code> and Put rowkey were
> // identical. Then this whole Put buffering business goes away.
> // TODO: Could use HeapSize to create an upper bound on the memory size of
> // the puts map and flush some portion of the content while looping. This
> // flush could result in multiple Puts for a single rowkey. That is
> // acceptable because Combiner is run as an optimization and it's not
> // critical that all Puts are grouped perfectly.
> {code}
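
The merge-and-flush idea from the issue description can be sketched in plain Java as follows. This is only an illustration under stated assumptions: a "put" is modelled as a map from family name to a list of cell values, the summed string length stands in for HeapSize, and CombineSketch and combine are hypothetical names, not HBase APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java stand-in for the combiner logic; none of these names are HBase APIs.
class CombineSketch {

  /**
   * Merges the puts of one row. Cells of the same family are appended rather
   * than replaced, and a partial result is emitted whenever the estimated
   * buffered size exceeds thresholdBytes.
   */
  static List<Map<String, List<String>>> combine(
      Iterable<Map<String, List<String>>> puts, long thresholdBytes) {
    List<Map<String, List<String>>> emitted = new ArrayList<>();
    Map<String, List<String>> merged = new TreeMap<>();
    long bufferedBytes = 0;
    for (Map<String, List<String>> put : puts) {
      for (Map.Entry<String, List<String>> e : put.entrySet()) {
        // Append to the family's list; Map.putAll would replace it and lose cells.
        List<String> cells = merged.computeIfAbsent(e.getKey(), k -> new ArrayList<>());
        for (String cell : e.getValue()) {
          cells.add(cell);
          bufferedBytes += cell.length(); // crude size estimate, stands in for HeapSize
        }
      }
      if (bufferedBytes > thresholdBytes) {
        // Flush: several outputs for one row are acceptable for a combiner.
        emitted.add(merged);
        merged = new TreeMap<>();
        bufferedBytes = 0;
      }
    }
    if (!merged.isEmpty()) {
      emitted.add(merged);
    }
    return emitted;
  }
}
```

With a large threshold, all cells of a family end up in one merged output. With a small threshold, the same input is split across several outputs, but no cell is dropped.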