ZhanxiongWang created HBASE-23062:
-------------------------------------
Summary: When using TableInputFormat to read data from HBase, if the
Scan.setCaching(size) value is too large, some row keys are lost without
any exceptions.
Key: HBASE-23062
URL: https://issues.apache.org/jira/browse/HBASE-23062
Project: HBase
Issue Type: Bug
Affects Versions: 0.98.6.1
Reporter: ZhanxiongWang
Attachments: pom.xml
I ran the experiment in two ways: once using Spark to read HBase, and once
using MapReduce. In both cases, increasing the Scan caching size causes some
data to be lost. More precisely, with scan.setCaching(500) I receive 7622 rows
of data, but with scan.setCaching(50000) I receive only 4226 rows. What makes
the problem serious is that the data is lost without any exception, so the
cause is difficult to find.
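One way to narrow this down is to identify exactly which row keys go missing
between the two runs. The following is only a minimal sketch, assuming the row
keys from each run have already been collected as strings (for example via
Bytes.toStringBinary); RowKeyDiff and missingKeys are hypothetical names used
for illustration and are not part of HBase.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper (not an HBase class): reports which row keys were
// returned by a baseline scan but are absent from a second scan.
final class RowKeyDiff {
    private RowKeyDiff() {}

    // baseline:  row keys from the run believed to be complete (e.g. caching = 500)
    // candidate: row keys from the run under suspicion (e.g. caching = 50000)
    // Returns the keys present in baseline but not in candidate, sorted and de-duplicated.
    static List<String> missingKeys(Collection<String> baseline, Collection<String> candidate) {
        Set<String> seen = new HashSet<>(candidate);
        Set<String> missing = new TreeSet<>();
        for (String key : baseline) {
            if (!seen.contains(key)) {
                missing.add(key);
            }
        }
        return new ArrayList<>(missing);
    }
}
```

If the missing keys turn out to be contiguous, or to cluster around particular
key ranges, that may help show where in the scan the rows are being dropped.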
My Spark code is like this:
{code:java}
// zkPort, zkMaster, zkPath, hbaseTableName, familyName, cellNames,
// startRowkeyStr, endRowkeyStr and jsc are defined elsewhere.
Configuration hbaseConfiguration = HBaseConfiguration.create();
hbaseConfiguration.set("hbase.zookeeper.property.clientPort", zkPort);
hbaseConfiguration.set("hbase.zookeeper.quorum", zkMaster);
hbaseConfiguration.set("zookeeper.znode.parent", zkPath);
hbaseConfiguration.set(TableInputFormat.INPUT_TABLE, hbaseTableName);
hbaseConfiguration.setLong("hbase.client.scanner.timeout.period", 6000000);
hbaseConfiguration.setLong("hbase.rpc.timeout", 6000000);

final Scan hbaseScan = new Scan();
hbaseScan.addFamily(familyName);
hbaseScan.setCaching(50000); // if the caching value is too big, some row keys are lost!
for (String[] cell : cellNames) {
    String column = cell[0];
    hbaseScan.addColumn(familyName, Bytes.toBytes(column));
}
hbaseScan.setStartRow(Bytes.toBytes(startRowkeyStr));
hbaseScan.setStopRow(Bytes.toBytes(endRowkeyStr));

try {
    // Serialize the Scan so that TableInputFormat can pick it up from the configuration.
    ClientProtos.Scan scanProto = ProtobufUtil.toScan(hbaseScan);
    hbaseConfiguration.set(TableInputFormat.SCAN,
            Base64.encodeBytes(scanProto.toByteArray()));
    JavaPairRDD<ImmutableBytesWritable, Result> pairRDD =
            jsc.<ImmutableBytesWritable, Result, TableInputFormat>newAPIHadoopRDD(
                    hbaseConfiguration, TableInputFormat.class,
                    ImmutableBytesWritable.class, Result.class);
    System.out.println("pairRDD.count(): " + pairRDD.count());
} catch (IOException e) {
    System.out.println("Scan Exception!!!!!! " + e.getMessage());
}
{code}
My MapReduce code is like this:
{code:java}
static class HbaseMapper extends TableMapper<ImmutableBytesWritable, Text> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // Emits one ("A", "max") record per cell of each row returned by the scan.
        for (Cell cell : value.rawCells()) {
            context.write(new ImmutableBytesWritable("A".getBytes()), new Text("max"));
        }
    }
}

public static void main(String[] args) throws Exception {
    org.apache.hadoop.conf.Configuration hbaseConfiguration =
            HBaseConfiguration.create();
    hbaseConfiguration.set("hbase.zookeeper.property.clientPort", zkPort);
    hbaseConfiguration.set("hbase.zookeeper.quorum", zkMaster);
    hbaseConfiguration.set("zookeeper.znode.parent", zkPath);
    hbaseConfiguration.setLong("hbase.client.scanner.timeout.period", 6000000);
    hbaseConfiguration.setLong("hbase.rpc.timeout", 6000000);

    Job job = Job.getInstance(hbaseConfiguration);
    job.setJarByClass(App.class);

    List<Scan> list = new ArrayList<Scan>();
    Scan scan = new Scan();
    scan.addFamily(Bytes.toBytes(familyName));
    scan.setCaching(50000); // if the caching value is too big, some row keys are lost!
    for (String[] cell : cellNames) {
        String column = cell[0];
        scan.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(column));
    }
    scan.setStartRow(Bytes.toBytes(startRowkeyStr));
    scan.setStopRow(Bytes.toBytes(endRowkeyStr));
    scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME,
            Bytes.toBytes(hbaseTableName));
    list.add(scan);
    System.out.println("size: " + list.size());

    TableMapReduceUtil.initTableMapperJob(list, HbaseMapper.class,
            ImmutableBytesWritable.class, Text.class, job);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Text.class);
    FileOutputFormat.setOutputPath(job, new Path("maxTestOutput"));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
{code}
The pom.xml for the MapReduce code is attached:
[^pom.xml]