[
https://issues.apache.org/jira/browse/HBASE-23062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ZhanxiongWang updated HBASE-23062:
----------------------------------
Fix Version/s: 1.2.5
> Use TableInputFormat to read data from HBase: when the Scan.setCaching(size)
> value is too large, some rowkeys are silently lost, without any exception.
> ---------------------------------------------------------------------------------------------------------------------------------------
>
> Key: HBASE-23062
> URL: https://issues.apache.org/jira/browse/HBASE-23062
> Project: HBase
> Issue Type: Bug
> Affects Versions: 0.98.6.1
> Reporter: ZhanxiongWang
> Priority: Major
> Fix For: 1.2.5
>
> Attachments: pom.xml
>
>
> I ran the experiment in three ways: first reading HBase from Spark, second
> reading it from MapReduce, and third scanning it directly with the client
> API. In the first two cases, increasing the scan caching size loses data:
> with scan.setCaching(500) I receive 7622 rows, but with
> scan.setCaching(50000) I receive only 4226 rows. In the third case the
> caching size does not affect the result; I always receive 7622 rows.
> What makes the problem serious is that the data is lost without any
> exception, so the cause is hard to find.
> My Spark code is like this:
> {code:java}
> Configuration hbaseConfiguration = HBaseConfiguration.create();
> hbaseConfiguration.set("hbase.zookeeper.property.clientPort", zkPort);
> hbaseConfiguration.set("hbase.zookeeper.quorum", zkMaster);
> hbaseConfiguration.set("zookeeper.znode.parent", zkPath);
> hbaseConfiguration.set(TableInputFormat.INPUT_TABLE, hbaseTableName);
> hbaseConfiguration.setLong("hbase.client.scanner.timeout.period", 6000000);
> hbaseConfiguration.setLong("hbase.rpc.timeout", 6000000);
> final Scan hbaseScan = new Scan();
> hbaseScan.addFamily(Bytes.toBytes(familyName));
> hbaseScan.setCaching(50000); // if caching is too big, some rowkeys are lost!
> for (String[] cell : cellNames) {
>     String column = cell[0];
>     hbaseScan.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(column));
> }
> hbaseScan.setStartRow(Bytes.toBytes(startRowkeyStr));
> hbaseScan.setStopRow(Bytes.toBytes(endRowkeyStr));
> try {
>     ClientProtos.Scan scanProto = ProtobufUtil.toScan(hbaseScan);
>     hbaseConfiguration.set(TableInputFormat.SCAN, Base64.encodeBytes(scanProto.toByteArray()));
>     JavaPairRDD<ImmutableBytesWritable, Result> pairRDD =
>         jsc.<ImmutableBytesWritable, Result, TableInputFormat>newAPIHadoopRDD(
>             hbaseConfiguration, TableInputFormat.class,
>             ImmutableBytesWritable.class, Result.class);
>     System.out.println("pairRDD.count(): " + pairRDD.count());
> } catch (IOException e) {
>     System.out.println("Scan Exception!!!!!! " + e.getMessage());
> }
> {code}
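> To pin down exactly which rowkeys go missing, one option (a diagnostic
> sketch, not part of my original runs; it assumes the rowkeys are valid
> strings and the range is small enough to collect to the driver) is to
> collect the keys the RDD actually sees and diff them against the
> direct-scan output:
> {code:java}
> // Diagnostic sketch: gather the rowkeys returned through TableInputFormat.
> // Needs java.util.*, scala.Tuple2 and org.apache.spark.api.java.function.Function.
> List<String> rddKeys = pairRDD
>     .map(new Function<Tuple2<ImmutableBytesWritable, Result>, String>() {
>         @Override
>         public String call(Tuple2<ImmutableBytesWritable, Result> t) {
>             return Bytes.toString(t._1().copyBytes());
>         }
>     })
>     .collect();
> System.out.println("distinct rowkeys via TableInputFormat: "
>     + new HashSet<String>(rddKeys).size());
> {code}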
> My MapReduce code is like this:
> {code:java}
> static class HbaseMapper extends TableMapper<ImmutableBytesWritable, Text> {
>     @Override
>     protected void map(ImmutableBytesWritable key, Result value, Context context)
>             throws IOException, InterruptedException {
>         for (Cell cell : value.rawCells()) {
>             context.write(new ImmutableBytesWritable("A".getBytes()), new Text("max"));
>         }
>     }
> }
>
> public static void main(String[] args) throws Exception {
>     org.apache.hadoop.conf.Configuration hbaseConfiguration = HBaseConfiguration.create();
>     hbaseConfiguration.set("hbase.zookeeper.property.clientPort", zkPort);
>     hbaseConfiguration.set("hbase.zookeeper.quorum", zkMaster);
>     hbaseConfiguration.set("zookeeper.znode.parent", zkPath);
>     hbaseConfiguration.setLong("hbase.client.scanner.timeout.period", 6000000);
>     hbaseConfiguration.setLong("hbase.rpc.timeout", 6000000);
>     Job job = Job.getInstance(hbaseConfiguration);
>     job.setJarByClass(App.class);
>     List<Scan> list = new ArrayList<Scan>();
>     Scan scan = new Scan();
>     scan.addFamily(Bytes.toBytes(familyName));
>     scan.setCaching(50000); // if caching is too big, some rowkeys are lost!
>     for (String[] cell : cellNames) {
>         String column = cell[0];
>         scan.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(column));
>     }
>     scan.setStartRow(Bytes.toBytes(startRowkeyStr));
>     scan.setStopRow(Bytes.toBytes(endRowkeyStr));
>     scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(hbaseTableName));
>     list.add(scan);
>     System.out.println("size: " + list.size());
>     TableMapReduceUtil.initTableMapperJob(list, HbaseMapper.class,
>         ImmutableBytesWritable.class, Text.class, job);
>     job.setMapOutputKeyClass(ImmutableBytesWritable.class);
>     job.setMapOutputValueClass(Text.class);
>     job.setOutputKeyClass(ImmutableBytesWritable.class);
>     job.setOutputValueClass(Text.class);
>     FileOutputFormat.setOutputPath(job, new Path("maxTestOutput"));
>     System.exit(job.waitForCompletion(true) ? 0 : 1);
> }
> {code}
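> Note that the mapper above emits one record per Cell, so its output counts
> cells rather than rows. A counter-based mapper (a sketch along the same
> lines, not what I originally ran) reports the row count directly in the
> job counters:
> {code:java}
> // Sketch: count rows with a Hadoop counter instead of emitting per-cell records.
> static class RowCountMapper extends TableMapper<NullWritable, NullWritable> {
>     enum Counters { ROWS }
>
>     @Override
>     protected void map(ImmutableBytesWritable key, Result value, Context context) {
>         // One increment per row handed to the mapper.
>         context.getCounter(Counters.ROWS).increment(1);
>     }
> }
> {code}
> With job.setNumReduceTasks(0) and NullOutputFormat, this needs no output path.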
> The pom.xml for the MapReduce code is attached:
> [^pom.xml]
> The third way (a direct client-side scan) is like this:
> {code:java}
> public static void main(String[] args) throws Exception {
>     org.apache.hadoop.conf.Configuration hbaseConfiguration = HBaseConfiguration.create();
>     hbaseConfiguration.set("hbase.zookeeper.property.clientPort", zkPort);
>     hbaseConfiguration.set("hbase.zookeeper.quorum", zkMaster);
>     hbaseConfiguration.set("zookeeper.znode.parent", zkPath);
>     hbaseConfiguration.setLong("hbase.client.scanner.timeout.period", 6000000);
>     hbaseConfiguration.setLong("hbase.rpc.timeout", 6000000);
>     Connection conn = ConnectionFactory.createConnection(hbaseConfiguration);
>     Table table = conn.getTable(TableName.valueOf(hbaseTableName));
>     long res = 0L;
>     final Scan hbaseScan = new Scan();
>     hbaseScan.addFamily(Bytes.toBytes(familyName));
>     hbaseScan.setCaching(50000); // caching size does not affect the result here
>     for (String[] cell : cellNames) {
>         String column = cell[0];
>         hbaseScan.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(column));
>     }
>     hbaseScan.setStartRow(Bytes.toBytes(startRowkeyStr));
>     hbaseScan.setStopRow(Bytes.toBytes(endRowkeyStr));
>     try {
>         ResultScanner scanner = table.getScanner(hbaseScan);
>         for (Result r : scanner) {
>             res++;
>         }
>         scanner.close();
>     } catch (IOException e) {
>         System.out.println("Scan Exception!!!!!! " + e.getMessage());
>     }
>     System.out.println("rows: " + res); // always 7622, regardless of caching
>     table.close();
>     conn.close();
> }
> {code}
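> To summarize: only the TableInputFormat-based paths (Spark and MapReduce)
> lose rows when the caching value is large; the direct ResultScanner path
> always returns the full 7622 rows, and no exception is thrown in any case.
> The bundled org.apache.hadoop.hbase.mapreduce.RowCounter job may also serve
> as an independent cross-check of the expected count.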
--
This message was sent by Atlassian Jira
(v8.3.4#803005)