[
https://issues.apache.org/jira/browse/HBASE-25608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Shinya Yoshida updated HBASE-25608:
-----------------------------------
Release Note:
Added configurations to specify the ZK cluster key for remote cluster in
HFileOutputFormat2.
Default, input and output are to the cluster specified in Job configuration.
Use HFileOutputformat2#configureRemoteCluster to have output go to a remote
cluster.
HFileOutputFormat2#configureIncrementalLoad(Job, Table, RegionLocator)
configure them using Table's configuration.
You can also configure them by calling
HFileOutputFormat2#configureRemoteCluster explicitly.
> Support HFileOutputFormat locality sensitive even destination cluster is
> different from source cluster
> ------------------------------------------------------------------------------------------------------
>
> Key: HBASE-25608
> URL: https://issues.apache.org/jira/browse/HBASE-25608
> Project: HBase
> Issue Type: Improvement
> Affects Versions: 3.0.0-alpha-1, 1.7.0, 2.4.1, 1.8.0
> Reporter: Shinya Yoshida
> Assignee: Shinya Yoshida
> Priority: Major
> Fix For: 3.0.0-alpha-1
>
>
> Sometimes, we want to perform MR job which is source cluster and destination
> cluster is different like following for data migration, batch job and so on.
>
> {code:java}
> Configuration conf =
> HBaseConfiguration.createClusterConf(HBaseConfiguration.create(),
> sourceClusterKey);
> final Job job = Job.getInstance(conf, jobName);
> // ...
> FileOutputFormat.setOutputPath(job, new Path(outputPath));
> Scan scan = createScanner();
> TableMapReduceUtil.initTableMapperJob(
> sourceTableName, scan,
> Mapper.class,
> ImmutableBytesWritable.class, Put.class, job);
> try (Connection con =
> ConnectionFactory.createConnection(destinationClusterKey);
> Table table = con.getTable(destinationTableName);
> RegionLocator regionLocator =
> con.getRegionLocator(destinationTableName)) {
> HFileOutputFormat2.configureIncrementalLoad(job, table,
> regionLocator);
> }
> return job.waitForCompletion(true) ? 0 : 1;
> {code}
> HFileOutputFormat2 doesn't create locality-sensitive hfiles.
> We got following exception
> {code:java}
> 2021-02-24 19:55:48,298 WARN [main]
> org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2: there's something wrong
> when locating rowkey: xxxxxxxxxxxx
> org.apache.hadoop.hbase.TableNotFoundException: Table 'table' was not found,
> got: XXXX.
> at
> org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegionInMeta(ConnectionManager.java:1302)
> at
> org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1181)
> at
> org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1165)
> at
> org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1122)
> at
> org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.getRegionLocation(ConnectionManager.java:957)
> at
> org.apache.hadoop.hbase.client.HRegionLocator.getRegionLocation(HRegionLocator.java:74)
> at
> org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:216)
> at
> org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:167)
> at
> org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:558)
> at
> org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
> at
> org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
> at
> org.apache.hadoop.hbase.mapreduce.PutSortReducer.reduce(PutSortReducer.java:78)
> at
> org.apache.hadoop.hbase.mapreduce.PutSortReducer.reduce(PutSortReducer.java:43)
> at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
> at
> org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
> at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
> at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
> at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
> {code}
> Because it creates connection using task configuration which is configured
> for source cluster.
> Thus, it tried to connect to the source cluster and get locations for the
> table that should exist in the destination.
> {code:java}
> InetSocketAddress[] favoredNodes = null;
> if (conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY,
> DEFAULT_LOCALITY_SENSITIVE)) {
> HRegionLocation loc = null;
> String tableName = Bytes.toString(tableNameBytes);
> if (tableName != null) {
> try (Connection connection =
> ConnectionFactory.createConnection(conf);
> RegionLocator locator =
>
> connection.getRegionLocator(TableName.valueOf(tableName))) {
> loc = locator.getRegionLocation(rowKey);
> } catch (Throwable e) {
> LOG.warn("Something wrong locating rowkey {} in {}",
> Bytes.toString(rowKey),
> tableName, e);
> loc = null;
> }
> }
> if (null == loc) {
> LOG.trace("Failed get of location, use default writer {}",
> Bytes.toString(rowKey));
> } else {
> LOG.debug("First rowkey: [{}]", Bytes.toString(rowKey));
> InetSocketAddress initialIsa =
> new InetSocketAddress(loc.getHostname(), loc.getPort());
> if (initialIsa.isUnresolved()) {
> LOG.trace("Failed resolve address {}, use default writer",
> loc.getHostnamePort());
> } else {
> LOG.debug("Use favored nodes writer: {}",
> initialIsa.getHostString());
> favoredNodes = new InetSocketAddress[] { initialIsa };
> }
> }
> }
> wl = getNewWriter(tableNameBytes, family, conf, favoredNodes);
> {code}
> HFileOutputFormat2 should be aware of destination cluster correctly when
> source and destination is different for proper location-sensitive HFile
> generation
--
This message was sent by Atlassian Jira
(v8.3.4#803005)