[
https://issues.apache.org/jira/browse/HBASE-8202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13618389#comment-13618389
]
David Koch commented on HBASE-8202:
-----------------------------------
Hello,
I asked the original question on the mailing list. Here is a minimal example
that illustrates the behavior. Run with $quorum != $output_quorum for maximum
effect ;-).
HBase version was 0.92.1-cdh4.1.1.
{code:title=Example.java}
package org.hbase.example;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Test to show how the hbase.mapred.output.quorum setting is ignored with
 * {@link MultiTableOutputFormat}.
 *
 * @author davidkoch
 *
 * See: https://issues.apache.org/jira/browse/HBASE-8202
 *
 * Hadoop/HBase configurations are read from the command line. Replace the
 * environment variables below.
 *
 * 1. Test with {@link TableOutputFormat} (Ok):
 *
 * hadoop jar $jar_name org.hbase.example.Example \
 *   -D hbase.zookeeper.quorum=$quorum \
 *   -D hbase.zookeeper.property.clientPort=2181 \
 *   -D hbase.mapreduce.inputtable=$input_table \
 *   -D hbase.mapreduce.scan.column.family=$colfam \
 *   -D hbase.mapred.outputtable=$output_table \
 *   -D mapreduce.outputformat.class=org.apache.hadoop.hbase.mapreduce.TableOutputFormat \
 *   -D hbase.mapred.output.quorum=$output_quorum:2181:/hbase
 *
 * 2. Test with {@link MultiTableOutputFormat} (Fails):
 *
 * hadoop jar $jar_name org.hbase.example.Example \
 *   -D hbase.zookeeper.quorum=$quorum \
 *   -D hbase.zookeeper.property.clientPort=2181 \
 *   -D hbase.mapreduce.inputtable=$input_table \
 *   -D hbase.mapreduce.scan.column.family=$colfam \
 *   -D hbase.mapred.outputtable=$output_table \
 *   -D mapreduce.outputformat.class=org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat \
 *   -D hbase.mapred.output.quorum=$output_quorum:2181:/hbase
 *
 * In the second case the job itself will not fail as long as $output_table
 * exists on $quorum, but $output_quorum is ignored.
 */
public class Example extends Configured implements Tool {

    public static class ExampleMapper extends
            TableMapper<ImmutableBytesWritable, Put> {

        ImmutableBytesWritable tableName;

        @Override
        public void setup(Context context) {
            // The target table name is read from the job configuration.
            tableName = new ImmutableBytesWritable(
                    context.getConfiguration().get("hbase.mapred.outputtable").getBytes());
        }

        @Override
        public void map(ImmutableBytesWritable row, Result value, Context context)
                throws IOException, InterruptedException {
            // Copy every cell of the input row into a Put for the output table.
            Put put = new Put(row.get());
            for (KeyValue kv : value.raw()) {
                put.add(kv);
            }
            context.write(tableName, put);
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Scan scan = new Scan();
        scan.addFamily(conf.get("hbase.mapreduce.scan.column.family").getBytes());
        String inTable = conf.get("hbase.mapreduce.inputtable");
        Job job = new Job(conf);
        job.setJobName("Example-HBASE-8202");
        // The output format class comes from -D mapreduce.outputformat.class.
        TableMapReduceUtil.initTableMapperJob(inTable, scan,
                ExampleMapper.class, null, null, job);
        job.setJarByClass(Example.class);
        job.setNumReduceTasks(0);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Example(), args);
        System.exit(res);
    }
}
{code}
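The value passed to hbase.mapred.output.quorum above is an HBase "cluster key" of the form hosts:port:znodeParent. As a hedged illustration of roughly what ZKUtil does with that string when it is applied to a configuration, here is a minimal sketch in plain Java with no HBase dependency; ClusterKey is a hypothetical helper name, not part of the HBase API.

```java
// Minimal sketch: splitting an HBase "cluster key" such as
// "zk1.example.com,zk2.example.com:2181:/hbase" into its three parts.
// ClusterKey is a hypothetical helper; the real parsing lives in
// org.apache.hadoop.hbase.zookeeper.ZKUtil.
public class ClusterKey {
    public final String quorum;      // comma-separated ZooKeeper hosts
    public final int clientPort;     // hbase.zookeeper.property.clientPort
    public final String znodeParent; // zookeeper.znode.parent, e.g. /hbase

    public ClusterKey(String quorum, int clientPort, String znodeParent) {
        this.quorum = quorum;
        this.clientPort = clientPort;
        this.znodeParent = znodeParent;
    }

    /** Parses "hosts:port:znodeParent"; the host list may contain commas. */
    public static ClusterKey parse(String key) {
        String[] parts = key.split(":");
        if (parts.length != 3) {
            throw new IllegalArgumentException(
                    "Expected hosts:port:znodeParent, got: " + key);
        }
        return new ClusterKey(parts[0], Integer.parseInt(parts[1]), parts[2]);
    }
}
```

For example, parse("$output_quorum:2181:/hbase") would yield the host list, port 2181, and /hbase as the znode parent.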
> MultiTableOutputFormat should support writing to another HBase cluster
> ----------------------------------------------------------------------
>
> Key: HBASE-8202
> URL: https://issues.apache.org/jira/browse/HBASE-8202
> Project: HBase
> Issue Type: Improvement
> Components: mapreduce
> Reporter: Ted Yu
>
> This was brought up by David Koch in thread 'hbase.mapred.output.quorum
> ignored in Mapper job with HDFS source and HBase sink' where he wanted to
> import a file on HDFS from one cluster A (source) into HBase
> tables on a different cluster B (destination) using a Mapper job with an
> HBase sink.
> Here is my analysis:
> MultiTableOutputFormat doesn't extend TableOutputFormat:
> {code}
> public class MultiTableOutputFormat
>     extends OutputFormat<ImmutableBytesWritable, Mutation> {
> {code}
> Relevant configuration w.r.t. the output quorum is set up in
> TableOutputFormat#setConf():
> {code}
>   public void setConf(Configuration otherConf) {
>     this.conf = HBaseConfiguration.create(otherConf);
>     String tableName = this.conf.get(OUTPUT_TABLE);
>     if (tableName == null || tableName.length() <= 0) {
>       throw new IllegalArgumentException("Must specify table name");
>     }
>     String address = this.conf.get(QUORUM_ADDRESS);
>     int zkClientPort = conf.getInt(QUORUM_PORT, 0);
>     String serverClass = this.conf.get(REGION_SERVER_CLASS);
>     String serverImpl = this.conf.get(REGION_SERVER_IMPL);
>     try {
>       if (address != null) {
>         ZKUtil.applyClusterKeyToConf(this.conf, address);
>       }
> {code}
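The quoted snippet is the crux: TableOutputFormat implements Configurable, so the framework calls setConf() and the QUORUM_ADDRESS override is applied before any writer is created, while MultiTableOutputFormat has no such hook and its writers see the job's default quorum. A hedged toy model of that difference, in plain Java with hypothetical names and no Hadoop dependency:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (hypothetical names, no Hadoop dependency) of why the quorum
// override is lost: the framework only hands the configuration to output
// formats that opt in via a setConf() hook.
public class QuorumOverrideDemo {
    interface Configurable { void setConf(Map<String, String> conf); }

    // Mirrors TableOutputFormat: applies hbase.mapred.output.quorum in setConf.
    static class SingleTableOutput implements Configurable {
        Map<String, String> conf;
        public void setConf(Map<String, String> conf) {
            this.conf = new HashMap<>(conf);
            String address = this.conf.get("hbase.mapred.output.quorum");
            if (address != null) {
                // Stands in for ZKUtil.applyClusterKeyToConf(conf, address).
                this.conf.put("hbase.zookeeper.quorum", address.split(":")[0]);
            }
        }
        String effectiveQuorum() { return conf.get("hbase.zookeeper.quorum"); }
    }

    // Mirrors MultiTableOutputFormat: no setConf hook, so writers are built
    // from the unmodified job configuration.
    static class MultiTableOutput {
        final Map<String, String> conf;
        MultiTableOutput(Map<String, String> jobConf) { this.conf = jobConf; }
        String effectiveQuorum() { return conf.get("hbase.zookeeper.quorum"); }
    }

    static Map<String, String> jobConf() {
        Map<String, String> conf = new HashMap<>();
        conf.put("hbase.zookeeper.quorum", "source-zk");
        conf.put("hbase.mapred.output.quorum", "dest-zk:2181:/hbase");
        return conf;
    }

    public static void main(String[] args) {
        SingleTableOutput single = new SingleTableOutput();
        single.setConf(jobConf()); // the framework would call this
        MultiTableOutput multi = new MultiTableOutput(jobConf());
        System.out.println(single.effectiveQuorum()); // dest-zk
        System.out.println(multi.effectiveQuorum());  // source-zk
    }
}
```

Under this model, making MultiTableOutputFormat honor the override would amount to giving it the same kind of setConf hook; the sketch is only an analogy, not the actual HBase code path.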