[
https://issues.apache.org/jira/browse/HBASE-12757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14258616#comment-14258616
]
pangxiaoxi commented on HBASE-12757:
------------------------------------
I have a table A and want to translate it into a table B. A and B have different rowkeys, but most of the values are the same.
I wrote a custom MR job similar to ImportTSV, except that its input is table A.
When I execute the MR job, some of the rowkeys passed to map() are outside the range of the current Region (taken from the InputSplit).
This may lose data or pick up unwanted data.
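To show the symptom in isolation, here is a minimal sketch of the boundary check, distilled from the full job below (the class name SplitRangeCheckMapper is only for illustration, and Bytes.toStringBinary stands in for our HbaseUtil.printRowkey helper):

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;

/** Logs any map() input key that falls outside the boundaries of its own TableSplit. */
public class SplitRangeCheckMapper extends TableMapper<ImmutableBytesWritable, NullWritable> {

    @Override
    protected void map(ImmutableBytesWritable row, Result columns, Context context)
            throws IOException, InterruptedException {
        byte[] rowkey = row.get();
        TableSplit split = (TableSplit) context.getInputSplit();
        byte[] start = split.getStartRow();
        byte[] end = split.getEndRow();
        // An empty start/end row means the first/last region, which has no boundary on that side;
        // the end row of a split is exclusive.
        boolean belowStart = start.length > 0 && Bytes.compareTo(rowkey, start) < 0;
        boolean aboveEnd = end.length > 0 && Bytes.compareTo(rowkey, end) >= 0;
        if (belowStart || aboveEnd) {
            System.err.println(String.format(
                    "out of range: location=%s, start_row=%s, rowkey=%s, end_row=%s",
                    split.getRegionLocation(),
                    Bytes.toStringBinary(start),
                    Bytes.toStringBinary(rowkey),
                    Bytes.toStringBinary(end)));
            context.getCounter("RangeCheck", "outOfRange").increment(1);
        }
    }
}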
code :
=======================================
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Note: HbaseUtil and GenUrlCrc64 are our own helper classes (rowkey pretty-printing
// and 64-bit CRC generation); they are not included here.
public class RebuildTable {

    public final static class RebuildMapper extends
            TableMapper<ImmutableBytesWritable, Writable> {

        // Placeholder values: the real constants come from our project.
        private static final int weibo_type = 1;
        private static final int ebusiness_type = 2;

        public boolean isOutputRel = true;
        public boolean isOutputData = true;

        private static byte[] DOC_FAMILY = Bytes.toBytes("doc");
        private static byte[] URL_QUALIFIER = Bytes.toBytes("url");
        private static byte[] FWDURL_QUALIFIER = Bytes.toBytes("forward_url");
        private static byte[] PKEY_QUALIFIER = Bytes.toBytes("rel_pkey");
        private static byte[] DATAKEY_QUALIFIER = Bytes.toBytes("data_key");
        private static byte[] TN_QUALIFIER = Bytes.toBytes("table_name");
        private static byte[] CURL_QUALIFIER = Bytes.toBytes("c_url");

        private Logger logger = LoggerFactory.getLogger(RebuildMapper.class);

        protected int type = -1;
        protected long count = 0;
        private HTable relTable = null;
        private String table_name = null;
        private String table_ng_name = null;
        private String location = null;
        private byte[] start_row = null;
        private byte[] end_row = null;

        @Override
        protected void setup(Context context) {
            type = Integer.valueOf(context.getConfiguration().get("job.hylanda.data_type"));
            // Initialize an xxxx_rel_ng table object
            try {
                System.out.println(table_name + "=>" + table_ng_name);
                TableSplit split = (TableSplit) context.getInputSplit();
                if (split != null) {
                    start_row = split.getStartRow();
                    end_row = split.getEndRow();
                    System.out.println(split.toString());
                    location = split.getRegionLocation();
                    System.out.println(String.format("location=%1$s,start_row=%2$s , end_row=%3$s",
                            location, HbaseUtil.printRowkey(start_row), HbaseUtil.printRowkey(end_row)));
                }
                isOutputRel = context.getConfiguration().getBoolean("conf.output_rel", true);
                isOutputData = context.getConfiguration().getBoolean("conf.output_data", true);
                table_name = context.getConfiguration().get("conf.table_name");
                table_ng_name = context.getConfiguration().get("conf.table_ng_name");
                if (isOutputRel) {
                    Configuration conf = new Configuration(context.getConfiguration());
                    conf.setLong("hbase.htable.threads.keepalivetime", 180);
                    relTable = new HTable(conf, context.getConfiguration().get("conf.reltable_name"));
                    relTable.setAutoFlush(false);
                }
            } catch (Exception e) {
                // TODO Auto-generated catch block
                logger.error("setup ex:" + e);
                e.printStackTrace();
            }
        }

        @Override
        protected void cleanup(Context context) {
            if (relTable != null) {
                try {
                    relTable.flushCommits();
                    relTable.close();
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }

        @Override
        public void map(ImmutableBytesWritable row, Result columns, Context context) {
            try {
                byte[] rowkey = row.get();
                // test code: skip (and log) keys that fall outside this split's range
                if (Bytes.compareTo(start_row, rowkey) > 0 ||
                        Bytes.compareTo(end_row, rowkey) < 0) {
                    TableSplit split = (TableSplit) context.getInputSplit();
                    if (split != null) {
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        System.err.println(String.format(
                                "%5$s\tlocation=%1$s,start_row=%2$s ,rowkey=%3$s ,end_row=%4$s",
                                split.getRegionLocation(), HbaseUtil.printRowkey(split.getStartRow()),
                                HbaseUtil.printRowkey(rowkey), HbaseUtil.printRowkey(split.getEndRow()),
                                sdf.format(new Date())));
                    }
                    return;
                }
                if (count++ % 10000 == 0) {
                    logger.info("Scan=" + count + " ;rowkey=" + HbaseUtil.printRowkey(rowkey));
                }
                String url = Bytes.toString(columns.getValue(DOC_FAMILY, URL_QUALIFIER));
                long rcrc = GenUrlCrc64.GenReverseCrc64Long(url); // gen 64-bit crc
                Bytes.putLong(rowkey, 0, rcrc);
                Put put = new Put(rowkey);             // put for the ng table
                List<Put> puts = new ArrayList<Put>(); // puts for the rel table
                if (type == weibo_type) {
                    for (KeyValue kv : columns.list()) {
                        if (Bytes.toString(kv.getQualifier()).equals("rel_pkey")) {
                            byte[] pkey = columns.getValue(DOC_FAMILY, PKEY_QUALIFIER);
                            // take the original string
                            String pkurl = Bytes.toString(columns.getValue(DOC_FAMILY, FWDURL_QUALIFIER));
                            Bytes.putLong(pkey, 0, GenUrlCrc64.GenReverseCrc64Long(pkurl));
                            put.add(kv.getFamily(), kv.getQualifier(), kv.getTimestamp(), pkey);
                            Put put_rel = new Put(Bytes.toBytes(GenUrlCrc64.GenCrc64Long(pkurl)));
                            put_rel.add(DOC_FAMILY, Bytes.add(Bytes.toBytes("rel_"), rowkey),
                                    Bytes.toBytes(table_ng_name));
                            puts.add(put_rel);
                        } else {
                            put.add(kv.getFamily(), kv.getQualifier(), kv.getTimestamp(), kv.getValue());
                        }
                    }
                } else if (type == ebusiness_type) {
                    for (KeyValue kv : columns.list()) {
                        if (Bytes.toString(kv.getQualifier()).equals("rel_pkey")) {
                            // take the original string
                            String pkurl = Bytes.toString(columns.getValue(DOC_FAMILY, CURL_QUALIFIER));
                            byte[] pkey = columns.getValue(DOC_FAMILY, PKEY_QUALIFIER);
                            Bytes.putLong(pkey, 0, GenUrlCrc64.GenReverseCrc64Long(pkurl));
                            put.add(kv.getFamily(), kv.getQualifier(), kv.getTimestamp(), pkey);
                            Put put_rel = new Put(Bytes.toBytes(GenUrlCrc64.GenCrc64Long(pkurl)));
                            put_rel.add(DOC_FAMILY, Bytes.add(Bytes.toBytes("rel_"), rowkey),
                                    Bytes.toBytes(table_ng_name));
                            puts.add(put_rel);
                        } else {
                            put.add(kv.getFamily(), kv.getQualifier(), kv.getTimestamp(), kv.getValue());
                        }
                    }
                } else {
                    for (KeyValue kv : columns.list()) {
                        put.add(kv.getFamily(), kv.getQualifier(), kv.getTimestamp(), kv.getValue());
                    }
                }
                // retry until the write succeeds
                while (isOutputData) {
                    try {
                        context.write(new ImmutableBytesWritable(rowkey), put);
                        break;
                    } catch (Exception ex) {
                        logger.error("context write ex:" + ex);
                    }
                }
                // write the basic info for the rel table
                byte[] urlcrc = Bytes.tail(rowkey, 8);
                Put putRel = new Put(urlcrc);
                putRel.add(DOC_FAMILY, DATAKEY_QUALIFIER, rowkey);
                putRel.add(DOC_FAMILY, TN_QUALIFIER, Bytes.toBytes(table_ng_name));
                puts.add(putRel);
                while (isOutputRel && relTable != null) {
                    try {
                        relTable.put(puts);
                        break;
                    } catch (Exception ex) {
                        logger.error("put ex:" + ex.toString());
                    }
                }
                context.getCounter("Rebuild", "success").increment(1);
            } catch (Exception ex) {
                System.err.println("Err:" + ex + ",row:" + Bytes.toStringBinary(row.get()));
                context.getCounter("Rebuild", "failed").increment(1);
            }
        }
    }

    public static void main(String[] argv) throws Exception {
        String hdfsip = "10.0.5.34";
        String zkIps = "10.0.5.34";
        Configuration conf = new Configuration();
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        Configuration hbaseConfiguration = HBaseConfiguration.create(conf);
        hbaseConfiguration.set("mapred.job.priority", JobPriority.HIGH.name());
        hbaseConfiguration.set("fs.default.name", "hdfs://" + hdfsip + ":9000");
        hbaseConfiguration.set("mapred.job.tracker", hdfsip + ":9001");
        hbaseConfiguration.set("hbase.zookeeper.quorum", zkIps);
        hbaseConfiguration.set("hbase.zookeeper.property.clientPort", "2181");
        hbaseConfiguration.set("mapred.reduce.tasks.speculative.execution", "false");
        hbaseConfiguration.set("mapred.map.tasks.speculative.execution", "false");
        hbaseConfiguration.set("mapred.job.queue.name", "default");
        hbaseConfiguration.set("mapred.child.java.opts", "-Xmx1024m");
        hbaseConfiguration.setLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, 180000);
        hbaseConfiguration.setLong("dfs.socket.timeout", 180000);
        hbaseConfiguration.setLong("hbase.htable.threads.keepalivetime", 180);
        String tablename = "news_201411";
        String tablename_ng = "news_201411_ng";
        String tablename_rel_ng = "news_rel_ng";
        HbaseUtil.createRelTable(hbaseConfiguration, tablename_rel_ng); // create another table
        hbaseConfiguration.set("conf.table_name", tablename);
        hbaseConfiguration.set("conf.table_ng_name", tablename_ng);
        hbaseConfiguration.set("conf.reltable_name", tablename_rel_ng);
        Job job = new Job(hbaseConfiguration, "job_rebuilder_" + tablename);
        job.setJarByClass(RebuildMapper.class);
        List<Scan> scans = new ArrayList<Scan>();
        Scan scan = new Scan();
        scan.setCacheBlocks(false);
        scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tablename));
        scan.setCaching(100);
        scans.add(scan);
        TableMapReduceUtil.initTableMapperJob(scans, RebuildMapper.class,
                ImmutableBytesWritable.class, Put.class, job);
        job.setReducerClass(PutSortReducer.class);
        String hfileOutPath = "/user/hadoop/" + tablename_ng;
        Path outputDir = new Path(hfileOutPath);
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        HTable table = new HTable(conf, tablename);
        HFileOutputFormat.configureIncrementalLoad(job, table);
        System.out.println("set job begin,'" + tablename + "' => '" + tablename_ng + "'");
        boolean bCompleted = job.waitForCompletion(true);
    }
}
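After the job finishes, the generated HFiles still have to be bulk-loaded into the ng table. A rough sketch of that step (assuming the standard LoadIncrementalHFiles tool and the hfileOutPath used above; LoadRebuiltTable is just an illustrative name):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

/** Sketch: bulk-load the HFiles produced by the job above into the target ng table. */
public class LoadRebuiltTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable targetTable = new HTable(conf, "news_201411_ng");                 // table_ng from the job above
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        loader.doBulkLoad(new Path("/user/hadoop/news_201411_ng"), targetTable); // hfileOutPath from the job above
        targetTable.close();
    }
}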
> MR map's input rowkey out of range of current Region
> -----------------------------------------------------
>
> Key: HBASE-12757
> URL: https://issues.apache.org/jira/browse/HBASE-12757
> Project: HBase
> Issue Type: Bug
> Components: Client, hbase
> Affects Versions: 0.94.7
> Environment: hadoop 1.1.2, r1440782
> hbase 0.94.7
> linux 2.6.32-279.el6.x86_64
> Reporter: pangxiaoxi
> Priority: Critical
>
> I execute a MapReduce job that scans the whole table; sometimes the rowkey passed to map() is out
> of the range of the current Region (taken from the InputSplit).
> This may lose data or pick up unwanted data.
> P.S. I want to translate the table the way ImportTSV does.....
> eg.
> location=datanode11,start_row=D9CB114FD09A82A3_0000000000000000_m_43DAAA689D4AFC86
> ,rowkey=D323E1D0A51E5185_0000000000000000_m_75686B8924108044
> ,end_row=DB0C4FC44E6D80C1_0000000000000000_m_E956CC65322BA3E5