[ https://issues.apache.org/jira/browse/HBASE-12757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14258616#comment-14258616 ]

pangxiaoxi commented on HBASE-12757:
------------------------------------

I have a table A that I want to transform into a table B. A and B have
different rowkeys, but most of the values are the same.
I wrote a custom MR job modeled on ImportTSV, but my InputFormat reads table A.
When I execute the MR job, some map() calls receive a rowkey that is out of
range for the current Region (taken from the InputSplit).
This may lose data or pick up unwanted data.

Code:
=======================================
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// HbaseUtil and GenUrlCrc64 are the reporter's own helper classes (not shown).
public class RebuildTable {

    public final static class RebuildMapper extends TableMapper<ImmutableBytesWritable, Put> {
        // (assumed) numeric codes for "job.hylanda.data_type"; their real
        // values are not shown in the original snippet
        private static final int weibo_type = 1;
        private static final int ebusiness_type = 2;

        public boolean isOutputRel = true;
        public boolean isOutputData = true;
        private static byte[] DOC_FAMILY = Bytes.toBytes("doc");
        private static byte[] URL_QUALIFIER = Bytes.toBytes("url");
        private static byte[] FWDURL_QUALIFIER = Bytes.toBytes("forward_url");
        private static byte[] PKEY_QUALIFIER = Bytes.toBytes("rel_pkey");
        private static byte[] DATAKEY_QUALIFIER = Bytes.toBytes("data_key");
        private static byte[] TN_QUALIFIER = Bytes.toBytes("table_name");
        private static byte[] CURL_QUALIFIER = Bytes.toBytes("c_url");
        private Logger logger = LoggerFactory.getLogger(RebuildMapper.class);
        protected int type = -1;
        protected long count = 0;
        private HTable relTable = null;
        private String table_name = null;
        private String table_ng_name = null;
        private String location = null;
        private byte[] start_row = null;
        private byte[] end_row = null;

        @Override
        protected void setup(Context context) {
            type = Integer.valueOf(context.getConfiguration().get("job.hylanda.data_type"));
            // set up a handle to the xxxx_rel_ng table
            try {
                TableSplit split = (TableSplit) context.getInputSplit();
                if (split != null) {
                    start_row = split.getStartRow();
                    end_row = split.getEndRow();
                    System.out.println(split.toString());
                    location = split.getRegionLocation();
                    System.out.println(String.format("location=%1$s,start_row=%2$s , end_row=%3$s",
                            location, HbaseUtil.printRowkey(start_row), HbaseUtil.printRowkey(end_row)));
                }
                isOutputRel = context.getConfiguration().getBoolean("conf.output_rel", true);
                isOutputData = context.getConfiguration().getBoolean("conf.output_data", true);
                table_name = context.getConfiguration().get("conf.table_name");
                table_ng_name = context.getConfiguration().get("conf.table_ng_name");
                System.out.println(table_name + "=>" + table_ng_name);
                if (isOutputRel) {
                    Configuration conf = new Configuration(context.getConfiguration());
                    conf.setLong("hbase.htable.threads.keepalivetime", 180);
                    relTable = new HTable(conf, context.getConfiguration().get("conf.reltable_name"));
                    relTable.setAutoFlush(false);
                }
            } catch (Exception e) {
                logger.error("setup ex:" + e);
                e.printStackTrace();
            }
        }

        @Override
        protected void cleanup(Context context) {
            if (relTable != null) {
                try {
                    relTable.flushCommits();
                    relTable.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        @Override
        public void map(ImmutableBytesWritable row, Result columns, Context context) {
            try {
                // copy the key: row.get() exposes the backing array, which the
                // Bytes.putLong() below would otherwise overwrite in place
                byte[] rowkey = Arrays.copyOfRange(row.get(), row.getOffset(),
                        row.getOffset() + row.getLength());
                // test code: flag rowkeys outside this split's region; an empty
                // end row marks the last region, which is unbounded above
                if (Bytes.compareTo(start_row, rowkey) > 0
                        || (end_row.length > 0 && Bytes.compareTo(end_row, rowkey) < 0)) {
                    TableSplit split = (TableSplit) context.getInputSplit();
                    if (split != null) {
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        System.err.println(String.format("%5$s\tlocation=%1$s,start_row=%2$s ,rowkey=%3$s ,end_row=%4$s",
                                split.getRegionLocation(), HbaseUtil.printRowkey(split.getStartRow()),
                                HbaseUtil.printRowkey(rowkey), HbaseUtil.printRowkey(split.getEndRow()),
                                sdf.format(new Date())));
                    }
                    return;
                }

                if (count++ % 10000 == 0) {
                    logger.info("Scan=" + count + " ;rowkey=" + HbaseUtil.printRowkey(rowkey));
                }
                String url = Bytes.toString(columns.getValue(DOC_FAMILY, URL_QUALIFIER));
                long rcrc = GenUrlCrc64.GenReverseCrc64Long(url); // gen 64-bit crc
                // the new rowkey is the old one with its first 8 bytes replaced
                Bytes.putLong(rowkey, 0, rcrc);
                Put put = new Put(rowkey);             // Put for the ng table
                List<Put> puts = new ArrayList<Put>(); // Puts for the rel table
                if (type == weibo_type) {
                    for (KeyValue kv : columns.list()) {
                        if (Bytes.toString(kv.getQualifier()).equals("rel_pkey")) {
                            byte[] pkey = columns.getValue(DOC_FAMILY, PKEY_QUALIFIER);
                            // fetch the original url string
                            String pkurl = Bytes.toString(columns.getValue(DOC_FAMILY, FWDURL_QUALIFIER));
                            Bytes.putLong(pkey, 0, GenUrlCrc64.GenReverseCrc64Long(pkurl));
                            put.add(kv.getFamily(), kv.getQualifier(), kv.getTimestamp(), pkey);
                            Put put_rel = new Put(Bytes.toBytes(GenUrlCrc64.GenCrc64Long(pkurl)));
                            put_rel.add(DOC_FAMILY, Bytes.add(Bytes.toBytes("rel_"), rowkey),
                                    Bytes.toBytes(table_ng_name));
                            puts.add(put_rel);
                        } else {
                            put.add(kv.getFamily(), kv.getQualifier(), kv.getTimestamp(), kv.getValue());
                        }
                    }
                } else if (type == ebusiness_type) {
                    for (KeyValue kv : columns.list()) {
                        if (Bytes.toString(kv.getQualifier()).equals("rel_pkey")) {
                            // fetch the original url string
                            String pkurl = Bytes.toString(columns.getValue(DOC_FAMILY, CURL_QUALIFIER));
                            byte[] pkey = columns.getValue(DOC_FAMILY, PKEY_QUALIFIER);
                            Bytes.putLong(pkey, 0, GenUrlCrc64.GenReverseCrc64Long(pkurl));
                            put.add(kv.getFamily(), kv.getQualifier(), kv.getTimestamp(), pkey);
                            Put put_rel = new Put(Bytes.toBytes(GenUrlCrc64.GenCrc64Long(pkurl)));
                            put_rel.add(DOC_FAMILY, Bytes.add(Bytes.toBytes("rel_"), rowkey),
                                    Bytes.toBytes(table_ng_name));
                            puts.add(put_rel);
                        } else {
                            put.add(kv.getFamily(), kv.getQualifier(), kv.getTimestamp(), kv.getValue());
                        }
                    }
                } else {
                    for (KeyValue kv : columns.list()) {
                        put.add(kv.getFamily(), kv.getQualifier(), kv.getTimestamp(), kv.getValue());
                    }
                }
                // retry the write until it succeeds
                while (isOutputData) {
                    try {
                        context.write(new ImmutableBytesWritable(rowkey), put);
                        break;
                    } catch (Exception ex) {
                        logger.error("context write ex:" + ex);
                    }
                }
                // write the rel table's basic info
                byte[] urlcrc = Bytes.tail(rowkey, 8);
                Put putRel = new Put(urlcrc);
                putRel.add(DOC_FAMILY, DATAKEY_QUALIFIER, rowkey);
                putRel.add(DOC_FAMILY, TN_QUALIFIER, Bytes.toBytes(table_ng_name));
                puts.add(putRel);
                while (isOutputRel && relTable != null) {
                    try {
                        relTable.put(puts);
                        break;
                    } catch (Exception ex) {
                        logger.error("put ex:" + ex.toString());
                    }
                }
                context.getCounter("Rebuild", "success").increment(1);
            } catch (Exception ex) {
                System.err.println("Err:" + ex + ",row:" + Bytes.toStringBinary(row.get()));
                context.getCounter("Rebuild", "failed").increment(1);
            }
        }
    }

    public static void main(String[] argv) throws Exception {

        String hdfsip = "10.0.5.34";
        String zkIps = "10.0.5.34";
        Configuration conf = new Configuration();
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        Configuration hbaseConfiguration = HBaseConfiguration.create(conf);
        hbaseConfiguration.set("mapred.job.priority", JobPriority.HIGH.name());
        hbaseConfiguration.set("fs.default.name", "hdfs://" + hdfsip + ":9000");
        hbaseConfiguration.set("mapred.job.tracker", hdfsip + ":9001");
        hbaseConfiguration.set("hbase.zookeeper.quorum", zkIps);
        hbaseConfiguration.set("hbase.zookeeper.property.clientPort", "2181");
        hbaseConfiguration.set("mapred.reduce.tasks.speculative.execution", "false");
        hbaseConfiguration.set("mapred.map.tasks.speculative.execution", "false");
        hbaseConfiguration.set("mapred.job.queue.name", "default");
        hbaseConfiguration.set("mapred.child.java.opts", "-Xmx1024m");
        hbaseConfiguration.setLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, 180000);
        hbaseConfiguration.setLong("dfs.socket.timeout", 180000);
        hbaseConfiguration.setLong("hbase.htable.threads.keepalivetime", 180);

        String tablename = "news_201411";
        String tablename_ng = "news_201411_ng";
        String tablename_rel_ng = "news_rel_ng";
        // create the rel table
        HbaseUtil.createRelTable(hbaseConfiguration, tablename_rel_ng);

        hbaseConfiguration.set("conf.table_name", tablename);
        hbaseConfiguration.set("conf.table_ng_name", tablename_ng);
        hbaseConfiguration.set("conf.reltable_name", tablename_rel_ng);

        Job job = new Job(hbaseConfiguration, "job_rebuilder_" + tablename);
        job.setJarByClass(RebuildMapper.class);

        List<Scan> scans = new ArrayList<Scan>();
        Scan scan = new Scan();
        scan.setCacheBlocks(false);
        scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tablename));
        scan.setCaching(100);
        scans.add(scan);
        TableMapReduceUtil.initTableMapperJob(scans, RebuildMapper.class,
                ImmutableBytesWritable.class, Put.class, job);
        job.setReducerClass(PutSortReducer.class);
        String hfileOutPath = "/user/hadoop/" + tablename_ng;
        Path outputDir = new Path(hfileOutPath);
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        // use the hbase configuration (not the bare one) to reach the cluster
        HTable table = new HTable(hbaseConfiguration, tablename);
        HFileOutputFormat.configureIncrementalLoad(job, table);

        System.out.println("set job begin,'" + tablename + "' => '" + tablename_ng + "'");
        boolean bCompleted = job.waitForCompletion(true);
    }
}
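
For anyone who wants to reproduce only the range mismatch without the rebuild
logic, here is a stripped-down, check-only mapper distilled from the code
above. It is a minimal sketch: the class and counter names are my own, and it
treats the split's end row as exclusive and an empty end row as unbounded,
which the full job above does not.
=======================================
import java.util.Arrays;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;

public class RangeCheck {

    public static class RangeCheckMapper extends TableMapper<ImmutableBytesWritable, Put> {
        private byte[] startRow;
        private byte[] endRow;

        @Override
        protected void setup(Context context) {
            // remember the boundaries this task was assigned
            TableSplit split = (TableSplit) context.getInputSplit();
            startRow = split.getStartRow();
            endRow = split.getEndRow();
        }

        @Override
        public void map(ImmutableBytesWritable row, Result columns, Context context) {
            // copy out exactly the key bytes (the backing array may be larger)
            byte[] rowkey = Arrays.copyOfRange(row.get(), row.getOffset(),
                    row.getOffset() + row.getLength());
            boolean beforeStart = Bytes.compareTo(rowkey, startRow) < 0;
            // an empty end row means the split covers the tail of the table
            boolean afterEnd = endRow.length > 0 && Bytes.compareTo(rowkey, endRow) >= 0;
            if (beforeStart || afterEnd) {
                context.getCounter("RangeCheck", "outOfRange").increment(1);
            } else {
                context.getCounter("RangeCheck", "inRange").increment(1);
            }
        }
    }
}
=======================================
After a full-table scan, the "RangeCheck" counters in the job output show how
many rows arrived outside their split's boundaries.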

> MR map's input rowkey out of range of current Region 
> -----------------------------------------------------
>
>                 Key: HBASE-12757
>                 URL: https://issues.apache.org/jira/browse/HBASE-12757
>             Project: HBase
>          Issue Type: Bug
>          Components: Client, hbase
>    Affects Versions: 0.94.7
>         Environment: hadoop 1.1.2, r1440782
> hbase 0.94.7
> linux  2.6.32-279.el6.x86_64
>            Reporter: pangxiaoxi
>            Priority: Critical
>
> I execute a MapReduce job that scans the whole table; sometimes the rowkey passed to map() is out of
> the range of the current Region (taken from the InputSplit).
> This may lose data or pick up unwanted data.
> P.S. I want to use an ImportTSV-style job to transform the table.
> e.g.
> location=datanode11,start_row=D9CB114FD09A82A3_0000000000000000_m_43DAAA689D4AFC86
>  ,rowkey=D323E1D0A51E5185_0000000000000000_m_75686B8924108044 
> ,end_row=DB0C4FC44E6D80C1_0000000000000000_m_E956CC65322BA3E5


