[
https://issues.apache.org/jira/browse/AVRO-782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13007801#comment-13007801
]
Scott Carey commented on AVRO-782:
----------------------------------
Something is odd with how the objects passed into reduce() are re-used by the
framework.
So, if you change
{code}
value.put("rowKey", key);
{code}
to
{code}
value.put("rowKey", new
Utf8(Arrays.copyOf(key.getBytes(),key.getByteLength())));
{code}
or
{code}
value.put("rowKey", key.toString();
{code}
it works.
Since Utf8 is mutable, the next reduce() call modifies the same instance rather
than creating a new one. The same thing can happen with any reused object.
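A minimal sketch (not from the issue) of that aliasing, using only Utf8's
public set() method:
{code}
import org.apache.avro.util.Utf8;

// A record that stores a reused, mutable Utf8 without copying sees every
// later mutation of that instance.
public class Utf8ReuseDemo {
  public static void main(String[] args) {
    Utf8 reused = new Utf8("key-1");
    Object stored = reused;      // stored without a defensive copy
    reused.set("key-2");         // the framework reuses the same instance
    System.out.println(stored);  // prints "key-2", not "key-1"
  }
}
{code}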
For example, try this version of the mapper:
{code}
public static class MapImpl extends AvroMapper<GenericRecord,
Pair<Utf8, GenericRecord>> {
static GenericRecord foo = null;
@Override
public void map(GenericRecord input, AvroCollector<Pair<Utf8,
GenericRecord>> collector,
Reporter reporter) throws IOException {
if (null == foo) foo = input;
collector.collect(new
Pair<Utf8,GenericRecord>(input.get("rowKey"),input));
System.out.println("working on " + input.toString() );
System.out.println("the first was " + foo.toString());
}
}
{code}
It will print out:
{noformat}
working on 0000000000000000000000000000000000000
the first was 0000000000000000000000000000000000000
working on 0000000100000000000000000000000000001
the first was 0000000100000000000000000000000000001
working on 0000000200000000000000000000000000002
the first was 0000000200000000000000000000000000002
working on 0000000300000000000000000000000000003
the first was 0000000300000000000000000000000000003
working on 0000000400000000000000000000000000004
the first was 0000000400000000000000000000000000004
{noformat}
The call to collect() works without copying in the mapper because it serializes
the value immediately. The problem on the mapper side is during reading, not
writing: it reads a value twice in a row for some reason. In fact, it reads
both the '1' and '2' values and keys in the '1' loop.
I think what is happening is that 'key', passed into reduce(), is also used by
the internal Hadoop value iterator. Once we read a second value into the
generic record, we overwrite the iterator's state and break it, causing it to
skip an iteration and become offset so that the current loop matches the next
key.
The short-term workaround is to copy the key before setting it on a mutable
object where it can be changed, as sketched below.
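A minimal sketch of that defensive copy (the helper name copyKey is mine, not
from the attached patch):
{code}
import java.util.Arrays;
import org.apache.avro.util.Utf8;

public class Utf8Copies {
  // Duplicate the live bytes of the framework-owned key before storing it
  // on an object that outlives the current reduce() call.
  public static Utf8 copyKey(Utf8 key) {
    // Copy only getByteLength() bytes; the backing array may be larger.
    return new Utf8(Arrays.copyOf(key.getBytes(), key.getByteLength()));
  }
}
{code}
With this, value.put("rowKey", Utf8Copies.copyKey(key)) stores a private copy
that later reads by the iterator cannot touch.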
We probably want a copy-constructor for Utf8 to make copies easier too.
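A possible shape for that constructor (hypothetical; Utf8 has no such
constructor as of 1.5.0), built on the existing Utf8(byte[]) constructor:
{code}
// Hypothetical copy constructor; not part of Avro as of this writing.
public Utf8(Utf8 other) {
  this(Arrays.copyOf(other.getBytes(), other.getByteLength()));
}
{code}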
> issue of cache coherence or reuse for avro map reduce
> -----------------------------------------------------
>
> Key: AVRO-782
> URL: https://issues.apache.org/jira/browse/AVRO-782
> Project: Avro
> Issue Type: Bug
> Components: java
> Affects Versions: 1.5.0
> Environment: Mac with VMWare running Linux training-vm
> 2.6.28-19-server #61-Ubuntu
> Reporter: ey-chih chow
> Attachments: AVRO-782.patch
>
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> Our map reduce jobs are using Avro map/reduce API. For one of the jobs, we
> got the following trace for the reducer:
> ====================================================================================================
> attempt_20110310145147365_0002_r_000000_0/syslog:2011-03-10 14:52:31,226 INFO
> com.ngmoco.ngpipes.sourcing.NgActivityGatheringReducer: working on
> 0000000000000000000000000000000000000 whose rowKey is
> 0000000000000000000000000000000000000
> attempt_20110310145315542_0002_r_000000_0/syslog:2011-03-10 14:53:59,010 INFO
> com.ngmoco.ngpipes.sourcing.NgActivityGatheringReducer: working on
> 0000000000000000000000000000000000000 whose rowKey is
> 0000000000000000000000000000000000000
> attempt_20110310145315542_0002_r_000000_0/syslog:2011-03-10 14:53:59,016 INFO
> com.ngmoco.ngpipes.sourcing.NgActivityGatheringReducer: working on
> 0000000100000000000000000000000000001 whose rowKey is
> 0000000200000000000000000000000000002
> attempt_20110310145315542_0002_r_000000_0/syslog:2011-03-10 14:53:59,017 INFO
> com.ngmoco.ngpipes.sourcing.NgActivityGatheringReducer: working on
> 0000000200000000000000000000000000002 whose rowKey is
> 0000000300000000000000000000000000003
> attempt_20110310145315542_0002_r_000000_0/syslog:2011-03-10 14:53:59,021 INFO
> com.ngmoco.ngpipes.sourcing.NgActivityGatheringReducer: working on
> 0000000300000000000000000000000000003 whose rowKey is
> 0000000400000000000000000000000000004
> attempt_20110310145315542_0002_r_000000_0/syslog:2011-03-10 14:53:59,023 INFO
> com.ngmoco.ngpipes.sourcing.NgActivityGatheringReducer: working on
> 0000000400000000000000000000000000004 whose rowKey is
> 0000000500000000000000000000000000005
> attempt_20110310145315542_0002_r_000000_0/syslog:2011-03-10 14:53:59,024 INFO
> com.ngmoco.ngpipes.sourcing.NgActivityGatheringReducer: working on
> 0000000500000000000000000000000000005 whose rowKey is
> 0000000500000000000000000000000000005
> ====================================================================================================
> If we add the following two lines to the reducer code:
> ====================================================================================================
> boolean workAround =
> getConf().getBoolean(NgActivityGatheringJob.NG_AVRO_BUG_WORKAROUND, true);
> Utf8 dupKey = (workAround) ? new Utf8(key.toString()) : key; // use dupKey
> instead of key passed to reducer
> ====================================================================================================
> We got the following trace, which we consider as the right behavior:
> ====================================================================================================
> 2011-03-10 15:04:33,431 INFO
> com.ngmoco.ngpipes.sourcing.NgActivityGatheringReducer: working on
> 0000000000000000000000000000000000000 whose rowKey is
> 0000000000000000000000000000000000000
> attempt_20110310150517897_0002_r_000000_0/syslog:2011-03-10 15:06:01,374 INFO
> com.ngmoco.ngpipes.sourcing.NgActivityGatheringReducer: working on
> 0000000000000000000000000000000000000 whose rowKey is
> 0000000000000000000000000000000000000
> attempt_20110310150517897_0002_r_000000_0/syslog:2011-03-10 15:06:01,381 INFO
> com.ngmoco.ngpipes.sourcing.NgActivityGatheringReducer: working on
> 0000000100000000000000000000000000001 whose rowKey is
> 0000000100000000000000000000000000001
> attempt_20110310150517897_0002_r_000000_0/syslog:2011-03-10 15:06:01,383 INFO
> com.ngmoco.ngpipes.sourcing.NgActivityGatheringReducer: working on
> 0000000200000000000000000000000000002 whose rowKey is
> 0000000200000000000000000000000000002
> attempt_20110310150517897_0002_r_000000_0/syslog:2011-03-10 15:06:01,389 INFO
> com.ngmoco.ngpipes.sourcing.NgActivityGatheringReducer: working on
> 0000000300000000000000000000000000003 whose rowKey is
> 0000000300000000000000000000000000003
> attempt_20110310150517897_0002_r_000000_0/syslog:2011-03-10 15:06:01,391 INFO
> com.ngmoco.ngpipes.sourcing.NgActivityGatheringReducer: working on
> 0000000400000000000000000000000000004 whose rowKey is
> 0000000400000000000000000000000000004
> attempt_20110310150517897_0002_r_000000_0/syslog:2011-03-10 15:06:01,393 INFO
> com.ngmoco.ngpipes.sourcing.NgActivityGatheringReducer: working on
> 0000000500000000000000000000000000005 whose rowKey is
> 0000000500000000000000000000000000005
> ====================================================================================================
> According to Scott Carey, this might be related to object reuse. We have
> created a unit test case that reproduces the problem; it is attached as a
> patch. Note that we ran this test case under our Ngmoco dev environment, so
> it may need some adjustments to run in other environments.