[ https://issues.apache.org/jira/browse/HADOOP-3484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12613049#action_12613049 ]

Wei-Ming Chen commented on HADOOP-3484:
---------------------------------------

Hi,

I also found the missing "+1" bug a few days ago. However, duplicates still occur 
even after the proposed patch (pos_ += m + 1) is applied.

The duplicates happen when the assigned split boundary (end_) falls between an end 
tag and the next begin tag (for example, when there are several newlines between 
two records). In that case, after reading the last record in the split, pos_ is 
still smaller than end_, so the reader keeps trying to find another begin tag, 
which may actually reside in the next split. Both splits then return that record, 
causing duplicates.
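
To make this concrete, here is a small self-contained sketch (not Hadoop code; the 
record contents, offsets, and split ranges are invented purely for illustration) 
that models the scanning behaviour and shows the same record being returned by two 
adjacent splits:

{code:title=SplitBoundaryDemo.java (hypothetical illustration)|borderStyle=solid}
import java.util.ArrayList;
import java.util.List;

public class SplitBoundaryDemo {
  public static void main(String[] args) {
    // Record A ends at byte 12, record B begins at byte 15; the split
    // boundary (byte 14) falls in the newline gap between the two records.
    String data = "<row>A</row>\n\n\n<row>B</row>";
    System.out.println(readSplit(data, 0, 14));              // [A, B]  <-- B duplicated
    System.out.println(readSplit(data, 14, data.length()));  // [B]
  }

  // Simplified model of the reader: it keeps scanning for the next <row> as
  // long as the position reached by the previous record is before end, and
  // the bytes skipped before a begin tag never advance the position.
  static List<String> readSplit(String data, int start, int end) {
    List<String> records = new ArrayList<String>();
    int pos = start;
    while (pos < end) {
      int b = data.indexOf("<row>", pos);
      if (b < 0) break;
      int e = data.indexOf("</row>", b);
      if (e < 0) break;
      records.add(data.substring(b + "<row>".length(), e));
      pos = e + "</row>".length();  // only advanced to the end of the record
    }
    return records;
  }
}
{code}

With the extra boundary check proposed below (stop scanning for a begin tag once 
the position passes end_), only the second split would emit record B.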

In addition, even with the proposed patch applied, pos_ does not reflect the real 
position when there are extra bytes between records. This is because pos_ is only 
updated while searching for the end tag; any bytes that are skipped while searching 
for the begin tag are never counted, so pos_ gradually falls behind the true stream 
position.
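
A tiny back-of-the-envelope sketch of the drift (the byte counts are invented):

{code:title=PosDriftDemo.java (hypothetical illustration)|borderStyle=solid}
public class PosDriftDemo {
  public static void main(String[] args) {
    // Suppose every record is followed by a 3-byte gap before the next begin tag:
    //   <row>A</row>\n\n\n<row>...
    int recordBytes = "<row>A</row>".length();  // 12 bytes, counted into pos_
    int gapBytes    = "\n\n\n".length();        //  3 bytes, skipped and never counted
    int records     = 1000;

    int realPosition = records * (recordBytes + gapBytes);
    int trackedPos   = records * recordBytes;   // what pos_ would report

    System.out.println("real stream position: " + realPosition);  // 15000
    System.out.println("tracked pos_:         " + trackedPos);    // 12000, 3000 bytes behind
  }
}
{code}

Since the pos_ >= end_ comparison is what decides when a split is finished, this 
drift makes that decision increasingly unreliable.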

My solution should solve both problems. If you agree and think it works, feel free 
to use my code. I will be glad if the problem gets fixed in the next release.

{code:title=StreamXmlRecordReader.java|borderStyle=solid}

  boolean fastReadUntilMatch(String textPat, boolean includePat,
                             DataOutputBuffer outBufOrNull) throws IOException {
    byte[] cpat = textPat.getBytes("UTF-8");
    int m = 0;
    boolean match = false;
    int msup = cpat.length;
    int LL = 120000 * 10;

    bin_.mark(LL); // large number to invalidate mark
    while (true) {
      int b = bin_.read();
      if (b == -1) break;

      byte c = (byte) b; // this assumes eight-bit matching. OK with UTF-8
      if (c == cpat[m]) {
        m++;
        if (m == msup) {
          match = true;
          break;
        }
      } else {
        bin_.mark(LL); // reset mark so we can jump back if we find a match
        if (outBufOrNull != null) {
          outBufOrNull.write(cpat, 0, m);
          outBufOrNull.write(c);
-         pos_ += m;
+         pos_ += m + 1;
+       } else {
+         pos_ += m + 1;
+         if (pos_ >= end_)
+           break;
        }
        m = 0;
      }
    }
    if (!includePat && match) {
      bin_.reset();
    } else if (outBufOrNull != null) {
      outBufOrNull.write(cpat);
      pos_ += msup;
    }
    return match;
  }
{code} 
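
For clarity: the added else branch only runs when outBufOrNull is null, which (as I 
understand the caller) is the case while skipping bytes in search of the begin tag. 
It now counts those skipped bytes toward pos_ and stops the scan as soon as pos_ 
reaches end_, which addresses both problems described above.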


> Duplicate Mapper input when using StreamXmlRecordReader
> -------------------------------------------------------
>
>                 Key: HADOOP-3484
>                 URL: https://issues.apache.org/jira/browse/HADOOP-3484
>             Project: Hadoop Core
>          Issue Type: Bug
>          Components: contrib/streaming
>    Affects Versions: 0.17.0
>         Environment: HADOOP 0.17.0, Java 6.0
>            Reporter: David Campbell
>            Assignee: Bo Adler
>         Attachments: 0001-test-to-demonstrate-HADOOP-3484.patch, 
> 0002-patch-for-HADOOP-3484.patch, HADOOP-3484.combined.patch
>
>
> I have an XML file with 93626 rows.  A row is marked by <row>...</row>.
> I've confirmed this with grep and the Grep example program included with 
> HADOOP.
> Here is the grep example output.  93626       <row>
> I've setup my job configuration as follows:   
> conf.set("stream.recordreader.class", 
> "org.apache.hadoop.streaming.StreamXmlRecordReader");
> conf.set("stream.recordreader.begin", "<row>");
> conf.set("stream.recordreader.end", "</row>");
> conf.setInputFormat(StreamInputFormat.class);
> I have a fairly simple test Mapper.
> Here's the map method.
>   public void map(Text key, Text value, OutputCollector<Text, IntWritable> 
> output, Reporter reporter) throws IOException {
>         try {
>             output.collect(totalWord, one);
>             if (key != null && key.toString().indexOf("01852") != -1) {
>                 output.collect(new Text("01852"), one);
>             }
>         } catch (Exception ex) {
>             Logger.getLogger(TestMapper.class.getName()).log(Level.SEVERE, 
> null, ex);
>             System.out.println(value);
>         }
>     }
> For totalWord ("TOTAL"), I get:
> TOTAL 140850
> and for 01852 I get:
> 01852 86
> There are 43 instances of 01852 in the file.
> I have the following setting in my config.  
>    conf.setNumMapTasks(1);
> I have a total of six machines in my cluster.
> If I run without this, the result is 12x the actual value, not 2x.
> Here's some info from the cluster web page.
> Maps: 0, Reduces: 0, Total Submissions: 1, Nodes: 6, Map Task Capacity: 12, 
> Reduce Task Capacity: 12, Avg. Tasks/Node: 4.00
> I've also noticed something really strange in the job's output.  It looks 
> like it's starting over or redoing things.
> This was run using all six nodes and no limitations on map or reduce tasks.  
> I haven't seen this behavior in any other case.
> 08/06/03 10:50:35 INFO mapred.FileInputFormat: Total input paths to process : 
> 1
> 08/06/03 10:50:36 INFO mapred.JobClient: Running job: job_200806030916_0018
> 08/06/03 10:50:37 INFO mapred.JobClient:  map 0% reduce 0%
> 08/06/03 10:50:42 INFO mapred.JobClient:  map 2% reduce 0%
> 08/06/03 10:50:45 INFO mapred.JobClient:  map 12% reduce 0%
> 08/06/03 10:50:47 INFO mapred.JobClient:  map 31% reduce 0%
> 08/06/03 10:50:48 INFO mapred.JobClient:  map 49% reduce 0%
> 08/06/03 10:50:49 INFO mapred.JobClient:  map 68% reduce 0%
> 08/06/03 10:50:50 INFO mapred.JobClient:  map 100% reduce 0%
> 08/06/03 10:50:54 INFO mapred.JobClient:  map 87% reduce 0%
> 08/06/03 10:50:55 INFO mapred.JobClient:  map 100% reduce 0%
> 08/06/03 10:50:56 INFO mapred.JobClient:  map 0% reduce 0%
> 08/06/03 10:51:00 INFO mapred.JobClient:  map 0% reduce 1%
> 08/06/03 10:51:05 INFO mapred.JobClient:  map 28% reduce 2%
> 08/06/03 10:51:07 INFO mapred.JobClient:  map 80% reduce 4%
> 08/06/03 10:51:08 INFO mapred.JobClient:  map 100% reduce 4%
> 08/06/03 10:51:09 INFO mapred.JobClient:  map 100% reduce 7%
> 08/06/03 10:51:10 INFO mapred.JobClient:  map 90% reduce 9%
> 08/06/03 10:51:11 INFO mapred.JobClient:  map 100% reduce 9%
> 08/06/03 10:51:12 INFO mapred.JobClient:  map 100% reduce 11%
> 08/06/03 10:51:13 INFO mapred.JobClient:  map 90% reduce 11%
> 08/06/03 10:51:14 INFO mapred.JobClient:  map 97% reduce 11%
> 08/06/03 10:51:15 INFO mapred.JobClient:  map 63% reduce 11%
> 08/06/03 10:51:16 INFO mapred.JobClient:  map 48% reduce 11%
> 08/06/03 10:51:17 INFO mapred.JobClient:  map 21% reduce 11%
> 08/06/03 10:51:19 INFO mapred.JobClient:  map 0% reduce 11%
> 08/06/03 10:51:20 INFO mapred.JobClient:  map 15% reduce 12%
> 08/06/03 10:51:21 INFO mapred.JobClient:  map 27% reduce 13%
> 08/06/03 10:51:22 INFO mapred.JobClient:  map 67% reduce 13%
> 08/06/03 10:51:24 INFO mapred.JobClient:  map 22% reduce 16%
> 08/06/03 10:51:25 INFO mapred.JobClient:  map 46% reduce 16%
> 08/06/03 10:51:26 INFO mapred.JobClient:  map 70% reduce 16%
> 08/06/03 10:51:27 INFO mapred.JobClient:  map 73% reduce 18%
> 08/06/03 10:51:28 INFO mapred.JobClient:  map 85% reduce 19%
> 08/06/03 10:51:29 INFO mapred.JobClient:  map 7% reduce 19%
> 08/06/03 10:51:32 INFO mapred.JobClient:  map 100% reduce 20%
> 08/06/03 10:51:35 INFO mapred.JobClient:  map 100% reduce 22%
> 08/06/03 10:51:37 INFO mapred.JobClient:  map 100% reduce 23%
> 08/06/03 10:51:38 INFO mapred.JobClient:  map 100% reduce 46%
> 08/06/03 10:51:39 INFO mapred.JobClient:  map 100% reduce 58%
> 08/06/03 10:51:40 INFO mapred.JobClient:  map 100% reduce 80%
> 08/06/03 10:51:42 INFO mapred.JobClient:  map 100% reduce 90%
> 08/06/03 10:51:43 INFO mapred.JobClient:  map 100% reduce 100%
> 08/06/03 10:51:44 INFO mapred.JobClient: Job complete: job_200806030916_0018
> 08/06/03 10:51:44 INFO mapred.JobClient: Counters: 17
> 08/06/03 10:51:44 INFO mapred.JobClient:   File Systems
> 08/06/03 10:51:44 INFO mapred.JobClient:     Local bytes read=1705
> 08/06/03 10:51:44 INFO mapred.JobClient:     Local bytes written=29782
> 08/06/03 10:51:44 INFO mapred.JobClient:     HDFS bytes read=1366064660
> 08/06/03 10:51:44 INFO mapred.JobClient:     HDFS bytes written=23
> 08/06/03 10:51:44 INFO mapred.JobClient:   Job Counters 
> 08/06/03 10:51:44 INFO mapred.JobClient:     Launched map tasks=37
> 08/06/03 10:51:44 INFO mapred.JobClient:     Launched reduce tasks=10
> 08/06/03 10:51:44 INFO mapred.JobClient:     Data-local map tasks=22
> 08/06/03 10:51:44 INFO mapred.JobClient:     Rack-local map tasks=15
> 08/06/03 10:51:44 INFO mapred.JobClient:   Map-Reduce Framework
> 08/06/03 10:51:44 INFO mapred.JobClient:     Map input records=942105
> 08/06/03 10:51:44 INFO mapred.JobClient:     Map output records=942621
> 08/06/03 10:51:44 INFO mapred.JobClient:     Map input bytes=1365761556
> 08/06/03 10:51:44 INFO mapred.JobClient:     Map output bytes=9426210
> 08/06/03 10:51:44 INFO mapred.JobClient:     Combine input records=942621
> 08/06/03 10:51:44 INFO mapred.JobClient:     Combine output records=49
> 08/06/03 10:51:44 INFO mapred.JobClient:     Reduce input groups=2
> 08/06/03 10:51:44 INFO mapred.JobClient:     Reduce input records=49
> 08/06/03 10:51:44 INFO mapred.JobClient:     Reduce output records=2

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
