[ https://issues.apache.org/jira/browse/HBASE-25857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17340180#comment-17340180 ]

ZFabrik edited comment on HBASE-25857 at 5/6/21, 12:19 PM:
-----------------------------------------------------------

I think one problem is that `CellSortImporter` emits the whole cell 
(key+value) both as the key and as the value.

Please have a look at the loop inside 
`org.apache.hadoop.hbase.mapreduce.Import.CellSortImporter#map`:
{code:java}
for (Cell kv : value.rawCells()) {
  kv = filterKv(filter, kv);
  // skip if we filtered it out
  if (kv == null) continue;
  Cell ret = convertKv(kv, cfRenameMap);
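  // the same cell (ret) ends up wrapped both in the key and in the value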
  context.write(new CellWritableComparable(ret), new MapReduceExtendedCell(ret));
}
{code}
AFAICS: each cell is wrapped inside a `CellWritableComparable` (although only 
the key part is needed there) and(!) inside a `MapReduceExtendedCell` (which is 
what the reducer consumes). The mapper therefore emits every cell twice and 
doubles the amount of data. The mapper's ring buffer cannot hold all of it, so 
the data is spilled to local disk.

Shouldn't it be enough if the mapper just emitted the key and left the value 
empty? The reducer would then simply use the key (which already contains the 
whole cell, including its value) and ignore the incoming values entirely - 
something like this:
{code:java}
public static class CellReducer
    extends Reducer<CellWritableComparable, Cell, ImmutableBytesWritable, Cell> {

  @Override
  protected void reduce(
      CellWritableComparable row,
      Iterable<Cell> kvs,
      Reducer<CellWritableComparable,
        Cell, ImmutableBytesWritable, Cell>.Context context)
      throws java.io.IOException, InterruptedException {

    // row already contains the whole cell data (key+value),
    // so the incoming values (kvs) can be ignored
    context.write(new ImmutableBytesWritable(CellUtil.cloneRow(row.kv)),
      new MapReduceExtendedCell(row.kv));
  }
}
{code}
This should reduce the amount of data to be processed to 50%.
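
For completeness, here is a rough sketch of what the mapper side could then 
look like. This is only an illustration of the idea, not an actual patch: the 
class name `CellKeyOnlyImporter` is made up, the map output value type is 
assumed to be switched to `NullWritable`, and the `filter`/`cfRenameMap` setup 
is the same as in the existing `CellSortImporter` (omitted here). The job setup 
would also have to declare `NullWritable` as the map output value class, and 
the reducer's `Iterable<Cell>` parameter would become `Iterable<NullWritable>`.
{code:java}
// hypothetical key-only variant of CellSortImporter
public static class CellKeyOnlyImporter
    extends TableMapper<CellWritableComparable, NullWritable> {

  private Filter filter;                    // initialized in setup(), as in CellSortImporter
  private Map<byte[], byte[]> cfRenameMap;  // initialized in setup(), as in CellSortImporter

  @Override
  public void map(ImmutableBytesWritable row, Result value, Context context)
      throws IOException, InterruptedException {
    for (Cell kv : value.rawCells()) {
      kv = filterKv(filter, kv);
      // skip if we filtered it out
      if (kv == null) continue;
      Cell ret = convertKv(kv, cfRenameMap);
      // the key carries the whole cell; the value stays empty
      context.write(new CellWritableComparable(ret), NullWritable.get());
    }
  }
}
{code}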

 


> HBase bulk import fails with exitCode -100
> ------------------------------------------
>
>                 Key: HBASE-25857
>                 URL: https://issues.apache.org/jira/browse/HBASE-25857
>             Project: HBase
>          Issue Type: Bug
>    Affects Versions: 2.2.5
>            Reporter: ZFabrik
>            Priority: Major
>
> I want to import the data from an HBase 1.0 cluster to our new HBase 2.2.5 
> cluster.
> Our setup is as follows:
>  * 6 data nodes with 250 GB disk space each
>  * total DFS capacity (as reported by Hadoop): 1.46 TB
>  * one additional node running namenode, hmaster, resource manager
> The data I'm trying to import was created by HBase export on the HBase 1.0 
> cluster and takes 14.9 GB in HDFS (according to `hdfs dfs -du -h`).
> The import uses bulk output with `import.bulk.hasLargeResult=true`:
> {noformat}
> > hdfs dfs -du -h /
> 2.6 G /TMP_IMPORT
> ...
> > yarn jar $HBASE_HOME/lib/hbase-mapreduce-2.2.5.jar import \
>          -Dmapreduce.map.speculative=false \
>          -Dmapreduce.reduce.speculative=false \
>          -Dimport.bulk.output=/HFILES \
>          -Dimport.bulk.hasLargeResult=true \
>          my_table /TMP_IMPORT
> {noformat}
>  
>  Approximately 3 hours later the import fails with this error message:
> {noformat}
> Error: org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in shuffle in OnDiskMerger - Thread to merge on-disk map-outputs
>         at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:134)
>         at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:376)
>         at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:175)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:422)
>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
>         at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:169)
> Caused by: org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for opt/seritrack/tt/nosql/data/yarn/usercache/seritrack/appcache/application_1620201940366_0003/output/attempt_1620201940366_0003_r_000000_1/map_40.out
>         at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:446)
>         at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:151)
>         at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:132)
>         at org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl$OnDiskMerger.merge(MergeManagerImpl.java:549)
>         at org.apache.hadoop.mapreduce.task.reduce.MergeThread.run(MergeThread.java:94)
> {noformat}
>  
>  The Yarn web UI reports this:
> {noformat}
> AM Container for appattempt_1620201940366_0003_000001 exited with exitCode: -100
> Failing this attempt.Diagnostics: Container released on a *lost* node
> For more detailed output, check the application tracking page: http://master:8088/cluster/app/application_1620201940366_0003 Then click on links to logs of each attempt.
> {noformat}
> Hadoop name node reports:
> {noformat}
> Configured Capacity: 1.46 TB
> DFS Used:            34.26 GB (2.28%)
> Non DFS Used:        1.33 TB <<<<< !!!
> {noformat}
> We can see that Yarn occupies more than 200 GB on each data node (inside 
> .../data/yarn/usercache/xyz/appcache), so it uses 1.33 TB in total, which is 
> almost the whole capacity of roughly 1.5 TB. There are more than 100 files on 
> each data node with names like 
> `attempt_1620201940366_0003_m_000035_1_spill_37.out`, each 77 MB in size.
>  
> So my question is: how can I use bulk import if it needs roughly 100x the 
> input size in local disk space as cache?


