[ 
https://issues.apache.org/jira/browse/HBASE-23062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ZhanxiongWang updated HBASE-23062:
----------------------------------
    Fix Version/s: 1.2.5

> Use TableInputFormat to read data from HBase; when the Scan.setCaching(size) 
> value is too big, some rowkeys are lost without exceptions.
> ---------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: HBASE-23062
>                 URL: https://issues.apache.org/jira/browse/HBASE-23062
>             Project: HBase
>          Issue Type: Bug
>    Affects Versions: 0.98.6.1
>            Reporter: ZhanxiongWang
>            Priority: Major
>             Fix For: 1.2.5
>
>         Attachments: pom.xml
>
>
> I ran the experiment three ways. First, I used Spark to read from HBase; 
> second, I used MapReduce to read from HBase. In both cases, increasing the 
> Scan caching size loses data. To be more precise: with scan.setCaching(500) 
> I receive 7622 rows of data, but with scan.setCaching(50000) I receive only 
> 4226 rows of data. Third, I used a Scan to read from HBase directly; there 
> the caching size does not affect the result, and I always receive 7622 rows 
> of data.
> The seriousness of the problem is that the data is lost without any 
> exception being thrown, which makes the cause difficult to find.
> My Spark code is like this:
> {code:java}
> Configuration hbaseConfiguration = HBaseConfiguration.create();
> hbaseConfiguration.set("hbase.zookeeper.property.clientPort", zkPort);
> hbaseConfiguration.set("hbase.zookeeper.quorum", zkMaster);
> hbaseConfiguration.set("zookeeper.znode.parent", zkPath);
> hbaseConfiguration.set(TableInputFormat.INPUT_TABLE, hbaseTableName);
> hbaseConfiguration.setLong("hbase.client.scanner.timeout.period", 6000000);
> hbaseConfiguration.setLong("hbase.rpc.timeout", 6000000);
> final Scan hbaseScan = new Scan();
> hbaseScan.addFamily(familyName); // familyName is a byte[] in this snippet
> hbaseScan.setCaching(50000); // if caching is too big, some rowkeys are lost!
> for (String[] cell : cellNames) {
>   String column = cell[0];
>   hbaseScan.addColumn(familyName, Bytes.toBytes(column));
> }
> hbaseScan.setStartRow(Bytes.toBytes(startRowkeyStr));
> hbaseScan.setStopRow(Bytes.toBytes(endRowkeyStr));
> try {
>   ClientProtos.Scan scanProto = ProtobufUtil.toScan(hbaseScan);
>   hbaseConfiguration.set(TableInputFormat.SCAN,
>       Base64.encodeBytes(scanProto.toByteArray()));
>   JavaPairRDD<ImmutableBytesWritable, Result> pairRDD =
>       jsc.<ImmutableBytesWritable, Result, TableInputFormat>newAPIHadoopRDD(
>           hbaseConfiguration, TableInputFormat.class,
>           ImmutableBytesWritable.class, Result.class);
>   System.out.println("pairRDD.count(): " + pairRDD.count());
> } catch (IOException e) {
>   System.out.println("Scan Exception!!!!!! " + e.getMessage());
> }
> {code}
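> For what it's worth, here is a minimal sketch of how the two counts could be 
> compared in a single run (assuming the same jsc, zkPort/zkMaster/zkPath, 
> hbaseTableName, familyName, and row-range variables as in the snippet above; 
> countWithCaching is a hypothetical helper, not part of any API):
> {code:java}
> // Hypothetical helper: build the same TableInputFormat scan as above,
> // varying only the caching size, and count the rows Spark sees.
> static long countWithCaching(JavaSparkContext jsc, int caching) throws IOException {
>   Configuration conf = HBaseConfiguration.create();
>   conf.set("hbase.zookeeper.property.clientPort", zkPort);
>   conf.set("hbase.zookeeper.quorum", zkMaster);
>   conf.set("zookeeper.znode.parent", zkPath);
>   conf.set(TableInputFormat.INPUT_TABLE, hbaseTableName);
>   Scan scan = new Scan();
>   scan.addFamily(familyName);
>   scan.setCaching(caching); // the only thing that differs between the runs
>   scan.setStartRow(Bytes.toBytes(startRowkeyStr));
>   scan.setStopRow(Bytes.toBytes(endRowkeyStr));
>   conf.set(TableInputFormat.SCAN,
>       Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray()));
>   return jsc.<ImmutableBytesWritable, Result, TableInputFormat>newAPIHadoopRDD(
>       conf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class)
>       .count();
> }
> 
> // Both calls should print the same number; in my runs the second one comes
> // back smaller, with no exception raised anywhere.
> System.out.println("caching=500:   " + countWithCaching(jsc, 500));   // 7622
> System.out.println("caching=50000: " + countWithCaching(jsc, 50000)); // 4226
> {code}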
> My MapReduce code is like this:
> {code:java}
> static class HbaseMapper extends TableMapper<ImmutableBytesWritable, Text> {
>   @Override
>   protected void map(ImmutableBytesWritable key, Result value, Context context)
>       throws IOException, InterruptedException {
>     for (Cell cell : value.rawCells()) {
>       context.write(new ImmutableBytesWritable("A".getBytes()), new Text("max"));
>     }
>   }
> }
> 
> public static void main(String[] args) throws Exception {
>   org.apache.hadoop.conf.Configuration hbaseConfiguration =
>       HBaseConfiguration.create();
>   hbaseConfiguration.set("hbase.zookeeper.property.clientPort", zkPort);
>   hbaseConfiguration.set("hbase.zookeeper.quorum", zkMaster);
>   hbaseConfiguration.set("zookeeper.znode.parent", zkPath);
>   hbaseConfiguration.setLong("hbase.client.scanner.timeout.period", 6000000);
>   hbaseConfiguration.setLong("hbase.rpc.timeout", 6000000);
>   Job job = Job.getInstance(hbaseConfiguration);
>   job.setJarByClass(App.class);
>   List<Scan> list = new ArrayList<Scan>();
>   Scan scan = new Scan();
>   scan.addFamily(Bytes.toBytes(familyName)); // familyName is a String here
>   scan.setCaching(50000); // if caching is too big, some rowkeys are lost!
>   for (String[] cell : cellNames) {
>     String column = cell[0];
>     scan.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(column));
>   }
>   scan.setStartRow(Bytes.toBytes(startRowkeyStr));
>   scan.setStopRow(Bytes.toBytes(endRowkeyStr));
>   scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME,
>       Bytes.toBytes(hbaseTableName));
>   list.add(scan);
>   System.out.println("size: " + list.size());
>   TableMapReduceUtil.initTableMapperJob(list, HbaseMapper.class,
>       ImmutableBytesWritable.class, Text.class, job);
>   job.setMapOutputKeyClass(ImmutableBytesWritable.class);
>   job.setMapOutputValueClass(Text.class);
>   job.setOutputKeyClass(ImmutableBytesWritable.class);
>   job.setOutputValueClass(Text.class);
>   FileOutputFormat.setOutputPath(job, new Path("maxTestOutput"));
>   System.exit(job.waitForCompletion(true) ? 0 : 1);
> }
> {code}
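> A note on counting: since the mapper above writes one record per cell, the 
> job's output count mixes cells and rows. Here is a hedged sketch (Hadoop 2 
> new API assumed, same job object as above) that reads the framework's 
> MAP_INPUT_RECORDS counter instead, which increments once per row handed to 
> map():
> {code:java}
> import org.apache.hadoop.mapreduce.TaskCounter;
> 
> boolean ok = job.waitForCompletion(true);
> // MAP_INPUT_RECORDS counts rows delivered to the mapper, independent of
> // how many cells each row carries.
> long rowsSeen = job.getCounters()
>     .findCounter(TaskCounter.MAP_INPUT_RECORDS)
>     .getValue();
> System.out.println("rows delivered to mappers: " + rowsSeen); // expect 7622
> System.exit(ok ? 0 : 1);
> {code}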
> The pom.xml for the MapReduce code is attached:
> [^pom.xml]
> The third way's code is like this:
> {code:java}
> public static void main(String[] args) throws Exception {
>   org.apache.hadoop.conf.Configuration hbaseConfiguration =
>       HBaseConfiguration.create();
>   hbaseConfiguration.set("hbase.zookeeper.property.clientPort", zkPort);
>   hbaseConfiguration.set("hbase.zookeeper.quorum", zkMaster);
>   hbaseConfiguration.set("zookeeper.znode.parent", zkPath);
>   hbaseConfiguration.setLong("hbase.client.scanner.timeout.period", 6000000);
>   hbaseConfiguration.setLong("hbase.rpc.timeout", 6000000);
>   Connection conn = ConnectionFactory.createConnection(hbaseConfiguration);
>   HTable table = (HTable) conn.getTable(TableName.valueOf(hbaseTableName));
>   long res = 0L;
>   final Scan hbaseScan = new Scan();
>   hbaseScan.addFamily(Bytes.toBytes(familyName));
>   hbaseScan.setCaching(50000); // caching size does not affect this direct scan
>   for (String[] cell : cellNames) {
>     String column = cell[0];
>     hbaseScan.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(column));
>   }
>   hbaseScan.setStartRow(Bytes.toBytes(startRowkeyStr));
>   hbaseScan.setStopRow(Bytes.toBytes(endRowkeyStr));
>   try {
>     ResultScanner scanner = table.getScanner(hbaseScan);
>     Iterator<Result> it = scanner.iterator();
>     while (it.hasNext()) {
>       res++;
>       Result r = it.next();
>     }
>     scanner.close();
>   } catch (IOException e) {
>     System.out.println("Scan Exception!!!!!! " + e.getMessage());
>   }
>   System.out.println("Successful! rows: " + res); // always 7622 here
> }
> {code}
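> If it helps to narrow this down, one could diff the row keys rather than the 
> counts: the direct scan is the reliable baseline (7622 rows regardless of 
> caching), so subtracting the keys that come back through TableInputFormat 
> from the baseline set shows exactly which rows go missing. A rough sketch 
> under those assumptions (table and hbaseScan from the snippet above, pairRDD 
> from the Spark snippet, Java 8 lambdas):
> {code:java}
> import java.util.HashSet;
> import java.util.List;
> import java.util.Set;
> 
> // Baseline: row keys from the direct scan, which returns the full result.
> Set<String> expected = new HashSet<String>();
> for (Result r : table.getScanner(hbaseScan)) {
>   expected.add(Bytes.toString(r.getRow()));
> }
> 
> // Row keys that actually came back through TableInputFormat (caching=50000).
> // Converting to String inside map() copies the bytes out of the reused
> // ImmutableBytesWritable before collect().
> List<String> viaInputFormat = pairRDD
>     .map(pair -> Bytes.toString(pair._1().copyBytes()))
>     .collect();
> 
> expected.removeAll(viaInputFormat);
> System.out.println("rowkeys dropped by TableInputFormat: " + expected.size());
> for (String key : expected) {
>   System.out.println("missing: " + key);
> }
> {code}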



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
