This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new 0c188fe5d36 HBASE-28440 Add support for using mapreduce sort in HFileOutputFormat2 (#7294)
0c188fe5d36 is described below

commit 0c188fe5d3683772f7ebffca63dcfd327691a4d7
Author: Hernan Romer <[email protected]>
AuthorDate: Wed Sep 24 14:51:32 2025 -0400

    HBASE-28440 Add support for using mapreduce sort in HFileOutputFormat2 (#7294)
    
    Co-authored-by: Hernan Gelaf-Romer <[email protected]>
    Signed-off-by: Ray Mattingly <[email protected]>
    (cherry picked from commit 608c1b98048523fbfd4d870a49b26e14bad89ba7)
---
 .../backup/impl/IncrementalTableBackupClient.java  |  15 ++-
 .../mapreduce/MapReduceHFileSplitterJob.java       |  36 +++++-
 .../hadoop/hbase/mapreduce/HFileOutputFormat2.java |  32 +++++-
 .../hbase/mapreduce/KeyOnlyCellComparable.java     |  94 +++++++++++++++
 .../hbase/mapreduce/PreSortedCellsReducer.java     |  47 ++++++++
 .../apache/hadoop/hbase/mapreduce/WALPlayer.java   |  38 ++++++-
 .../hadoop/hbase/mapreduce/TestWALPlayer.java      | 126 ++++++++++++++-------
 7 files changed, 327 insertions(+), 61 deletions(-)
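
For quick reference while reading the patch: the new behavior is opt-in via the
hbase.mapreduce.hfileoutputformat.disk.based.sorting.enabled key introduced below (default: false).
Below is a minimal, hypothetical driver sketch of how a caller might enable it before running
WALPlayer in bulk-output mode; the class name, paths and table name are placeholders, and it assumes
WALPlayer's existing public BULK_OUTPUT_CONF_KEY constant, not anything added by this patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.hbase.mapreduce.WALPlayer;
    import org.apache.hadoop.util.ToolRunner;

    public class DiskSortedWalReplaySketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Opt in to sorting cells in the MapReduce shuffle instead of inside the reducer.
        conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
        // The new sort path applies when WALPlayer writes HFiles (bulk output); placeholder path.
        conf.set(WALPlayer.BULK_OUTPUT_CONF_KEY, "/tmp/bulk-output");
        // Same CLI contract as before: <WAL input dir> <tables...>; placeholder values.
        int exitCode = ToolRunner.run(conf, new WALPlayer(conf),
          new String[] { "/tmp/wals", "my_table" });
        System.exit(exitCode);
      }
    }

With the flag set, the hunks below make HFileOutputFormat2.configureIncrementalLoad (and the
WALPlayer/MapReduceHFileSplitterJob drivers) use KeyOnlyCellComparable as the map output key with
its KeyOnlyCellComparator, and PreSortedCellsReducer in place of CellSortReducer.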

diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index cfc5149f369..4fac0ca3c93 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
 import org.apache.hadoop.hbase.mapreduce.WALPlayer;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
 import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
@@ -351,6 +352,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
   }
 
   protected void incrementalCopyHFiles(String[] files, String backupDest) throws IOException {
+    boolean diskBasedSortingOriginalValue = HFileOutputFormat2.diskBasedSortingEnabled(conf);
     try {
       LOG.debug("Incremental copy HFiles is starting. dest=" + backupDest);
       // set overall backup phase: incremental_copy
@@ -365,6 +367,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
         LOG.debug("Setting incremental copy HFiles job name to : " + jobname);
       }
       conf.set(JOB_NAME_CONF_KEY, jobname);
+      conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
 
       BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
       int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
@@ -377,6 +380,8 @@ public class IncrementalTableBackupClient extends TableBackupClient {
         + " finished.");
     } finally {
       deleteBulkLoadDirectory();
+      conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY,
+        diskBasedSortingOriginalValue);
     }
   }
 
@@ -430,6 +435,9 @@ public class IncrementalTableBackupClient extends TableBackupClient {
     conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
     conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
     conf.set(JOB_NAME_CONF_KEY, jobname);
+
+    boolean diskBasedSortingEnabledOriginalValue = HFileOutputFormat2.diskBasedSortingEnabled(conf);
+    conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
     String[] playerArgs = { dirs, StringUtils.join(tableList, ",") };
 
     try {
@@ -438,13 +446,16 @@ public class IncrementalTableBackupClient extends TableBackupClient {
       if (result != 0) {
         throw new IOException("WAL Player failed");
       }
-      conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
-      conf.unset(JOB_NAME_CONF_KEY);
     } catch (IOException e) {
       throw e;
     } catch (Exception ee) {
       throw new IOException("Can not convert from directory " + dirs
         + " (check Hadoop, HBase and WALPlayer M/R job logs) ", ee);
+    } finally {
+      conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY,
+        diskBasedSortingEnabledOriginalValue);
+      conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
+      conf.unset(JOB_NAME_CONF_KEY);
     }
   }
 
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
index 7d9430914cb..85df58e0946 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ExtendedCell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.TableName;
@@ -34,11 +35,14 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
 import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.mapreduce.KeyOnlyCellComparable;
+import org.apache.hadoop.hbase.mapreduce.PreSortedCellsReducer;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.snapshot.SnapshotRegionLocator;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -72,18 +76,28 @@ public class MapReduceHFileSplitterJob extends Configured implements Tool {
   /**
    * A mapper that just writes out cells. This one can be used together with {@link CellSortReducer}
    */
-  static class HFileCellMapper extends Mapper<NullWritable, Cell, ImmutableBytesWritable, Cell> {
+  static class HFileCellMapper extends Mapper<NullWritable, Cell, WritableComparable<?>, Cell> {
+
+    private boolean diskBasedSortingEnabled = false;
 
     @Override
     public void map(NullWritable key, Cell value, Context context)
       throws IOException, InterruptedException {
-      context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)),
-        new MapReduceExtendedCell(PrivateCellUtil.ensureExtendedCell(value)));
+      ExtendedCell extendedCell = PrivateCellUtil.ensureExtendedCell(value);
+      context.write(wrap(extendedCell), new MapReduceExtendedCell(extendedCell));
     }
 
     @Override
     public void setup(Context context) throws IOException {
-      // do nothing
+      diskBasedSortingEnabled =
+        HFileOutputFormat2.diskBasedSortingEnabled(context.getConfiguration());
+    }
+
+    private WritableComparable<?> wrap(ExtendedCell cell) {
+      if (diskBasedSortingEnabled) {
+        return new KeyOnlyCellComparable(cell);
+      }
+      return new ImmutableBytesWritable(CellUtil.cloneRow(cell));
     }
   }
 
@@ -107,13 +121,23 @@ public class MapReduceHFileSplitterJob extends Configured implements Tool {
       true);
     job.setJarByClass(MapReduceHFileSplitterJob.class);
     job.setInputFormatClass(HFileInputFormat.class);
-    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+    boolean diskBasedSortingEnabled = HFileOutputFormat2.diskBasedSortingEnabled(conf);
+    if (diskBasedSortingEnabled) {
+      job.setMapOutputKeyClass(KeyOnlyCellComparable.class);
+      job.setSortComparatorClass(KeyOnlyCellComparable.KeyOnlyCellComparator.class);
+    } else {
+      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    }
     if (hfileOutPath != null) {
       LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
       TableName tableName = TableName.valueOf(tabName);
       job.setMapperClass(HFileCellMapper.class);
-      job.setReducerClass(CellSortReducer.class);
+      if (diskBasedSortingEnabled) {
+        job.setReducerClass(PreSortedCellsReducer.class);
+      } else {
+        job.setReducerClass(CellSortReducer.class);
+      }
       Path outputDir = new Path(hfileOutPath);
       FileOutputFormat.setOutputPath(job, outputDir);
       job.setMapOutputValueClass(MapReduceExtendedCell.class);
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index 50b24818bdc..0e81c95677c 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.ExtendedCell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
@@ -83,6 +84,7 @@ import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -171,6 +173,11 @@ public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable,
     "hbase.mapreduce.hfileoutputformat.extendedcell.enabled";
   static final boolean EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT = false;
 
+  @InterfaceAudience.Private
+  public static final String DISK_BASED_SORTING_ENABLED_KEY =
+    "hbase.mapreduce.hfileoutputformat.disk.based.sorting.enabled";
+  private static final boolean DISK_BASED_SORTING_ENABLED_DEFAULT = false;
+
   public static final String REMOTE_CLUSTER_CONF_PREFIX = "hbase.hfileoutputformat.remote.cluster.";
   public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY =
     REMOTE_CLUSTER_CONF_PREFIX + "zookeeper.quorum";
@@ -548,12 +555,19 @@ public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable,
 
     // Write the actual file
     FileSystem fs = partitionsPath.getFileSystem(conf);
-    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, partitionsPath,
-      ImmutableBytesWritable.class, NullWritable.class);
+    boolean diskBasedSortingEnabled = diskBasedSortingEnabled(conf);
+    Class<? extends Writable> keyClass =
+      diskBasedSortingEnabled ? KeyOnlyCellComparable.class : ImmutableBytesWritable.class;
+    SequenceFile.Writer writer =
+      SequenceFile.createWriter(fs, conf, partitionsPath, keyClass, NullWritable.class);
 
     try {
       for (ImmutableBytesWritable startKey : sorted) {
-        writer.append(startKey, NullWritable.get());
+        Writable writable = diskBasedSortingEnabled
+          ? new KeyOnlyCellComparable(KeyValueUtil.createFirstOnRow(startKey.get()))
+          : startKey;
+
+        writer.append(writable, NullWritable.get());
       }
     } finally {
       writer.close();
@@ -600,6 +614,10 @@ public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable,
     configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
   }
 
+  public static boolean diskBasedSortingEnabled(Configuration conf) {
+    return conf.getBoolean(DISK_BASED_SORTING_ENABLED_KEY, DISK_BASED_SORTING_ENABLED_DEFAULT);
+  }
+
   static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
     Class<? extends OutputFormat<?, ?>> cls) throws IOException {
     Configuration conf = job.getConfiguration();
@@ -618,7 +636,13 @@ public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable,
     // Based on the configured map output class, set the correct reducer to properly
     // sort the incoming values.
     // TODO it would be nice to pick one or the other of these formats.
-    if (
+    boolean diskBasedSorting = diskBasedSortingEnabled(conf);
+
+    if (diskBasedSorting) {
+      job.setMapOutputKeyClass(KeyOnlyCellComparable.class);
+      job.setSortComparatorClass(KeyOnlyCellComparable.KeyOnlyCellComparator.class);
+      job.setReducerClass(PreSortedCellsReducer.class);
+    } else if (
       KeyValue.class.equals(job.getMapOutputValueClass())
         || MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())
     ) {
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyOnlyCellComparable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyOnlyCellComparable.java
new file mode 100644
index 00000000000..d9b28f8a689
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyOnlyCellComparable.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.ExtendedCell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.yetus.audience.InterfaceAudience;
+
[email protected]
+public class KeyOnlyCellComparable implements 
WritableComparable<KeyOnlyCellComparable> {
+
+  static {
+    WritableComparator.define(KeyOnlyCellComparable.class, new KeyOnlyCellComparator());
+  }
+
+  private ExtendedCell cell = null;
+
+  public KeyOnlyCellComparable() {
+  }
+
+  public KeyOnlyCellComparable(ExtendedCell cell) {
+    this.cell = cell;
+  }
+
+  public ExtendedCell getCell() {
+    return cell;
+  }
+
+  @Override
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "EQ_COMPARETO_USE_OBJECT_EQUALS",
+      justification = "This is wrong, yes, but we should be purging Writables, not fixing them")
+  public int compareTo(KeyOnlyCellComparable o) {
+    return CellComparator.getInstance().compare(cell, o.cell);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    int keyLen = PrivateCellUtil.estimatedSerializedSizeOfKey(cell);
+    int valueLen = 0; // We avoid writing value here. So just serialize as if an empty value.
+    out.writeInt(keyLen + valueLen + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE);
+    out.writeInt(keyLen);
+    out.writeInt(valueLen);
+    PrivateCellUtil.writeFlatKey(cell, out);
+    out.writeLong(cell.getSequenceId());
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    cell = KeyValue.create(in);
+    long seqId = in.readLong();
+    cell.setSequenceId(seqId);
+  }
+
+  public static class KeyOnlyCellComparator extends WritableComparator {
+
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+      try (DataInputStream d1 = new DataInputStream(new ByteArrayInputStream(b1, s1, l1));
+        DataInputStream d2 = new DataInputStream(new ByteArrayInputStream(b2, s2, l2))) {
+        KeyOnlyCellComparable kv1 = new KeyOnlyCellComparable();
+        kv1.readFields(d1);
+        KeyOnlyCellComparable kv2 = new KeyOnlyCellComparable();
+        kv2.readFields(d2);
+        return compare(kv1, kv2);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}
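
A small, hypothetical snippet (not part of the patch, class name is a placeholder) illustrating the
ordering contract above: two wrapped cells compare by their full cell key via CellComparator, which
is what lets the MapReduce shuffle deliver cells to the reducer already sorted.

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.mapreduce.KeyOnlyCellComparable;
    import org.apache.hadoop.hbase.util.Bytes;

    public class KeyOrderingSketch {
      public static void main(String[] args) {
        // Two cells that differ only in row; values are irrelevant since only the key is serialized.
        KeyOnlyCellComparable a = new KeyOnlyCellComparable(
          new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"), Bytes.toBytes("q"), 1L, new byte[0]));
        KeyOnlyCellComparable b = new KeyOnlyCellComparable(
          new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("f"), Bytes.toBytes("q"), 1L, new byte[0]));
        // Prints a negative number: "row1" sorts before "row2" under CellComparator.
        System.out.println(a.compareTo(b));
      }
    }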
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PreSortedCellsReducer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PreSortedCellsReducer.java
new file mode 100644
index 00000000000..8f4b2953ec0
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PreSortedCellsReducer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.yetus.audience.InterfaceAudience;
+
[email protected]
+public class PreSortedCellsReducer
+  extends Reducer<KeyOnlyCellComparable, Cell, ImmutableBytesWritable, Cell> {
+
+  @Override
+  protected void reduce(KeyOnlyCellComparable keyComparable, Iterable<Cell> values, Context context)
+    throws IOException, InterruptedException {
+
+    int index = 0;
+    ImmutableBytesWritable key =
+      new ImmutableBytesWritable(CellUtil.cloneRow(keyComparable.getCell()));
+    for (Cell cell : values) {
+      context.write(key, new MapReduceExtendedCell(PrivateCellUtil.ensureExtendedCell(cell)));
+      if (++index % 100 == 0) {
+        context.setStatus("Wrote " + index + " cells");
+      }
+    }
+  }
+}
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index 5e2dc0902e0..9813118e250 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.ExtendedCell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
@@ -53,6 +54,7 @@ import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
 import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -96,9 +98,10 @@ public class WALPlayer extends Configured implements Tool {
    * A mapper that just writes out KeyValues. This one can be used together with
    * {@link CellSortReducer}
    */
-  static class WALKeyValueMapper extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Cell> {
+  static class WALKeyValueMapper extends Mapper<WALKey, WALEdit, WritableComparable<?>, Cell> {
     private Set<String> tableSet = new HashSet<String>();
     private boolean multiTableSupport = false;
+    private boolean diskBasedSortingEnabled = false;
 
     @Override
     public void map(WALKey key, WALEdit value, Context context) throws IOException {
@@ -120,8 +123,8 @@ public class WALPlayer extends Configured implements Tool {
             byte[] outKey = multiTableSupport
               ? Bytes.add(table.getName(), Bytes.toBytes(tableSeparator), CellUtil.cloneRow(cell))
               : CellUtil.cloneRow(cell);
-            context.write(new ImmutableBytesWritable(outKey),
-              new MapReduceExtendedCell(PrivateCellUtil.ensureExtendedCell(cell)));
+            ExtendedCell extendedCell = PrivateCellUtil.ensureExtendedCell(cell);
+            context.write(wrapKey(outKey, extendedCell), new MapReduceExtendedCell(extendedCell));
           }
         }
       } catch (InterruptedException e) {
@@ -135,8 +138,23 @@ public class WALPlayer extends Configured implements Tool {
       Configuration conf = context.getConfiguration();
       String[] tables = conf.getStrings(TABLES_KEY);
       this.multiTableSupport = conf.getBoolean(MULTI_TABLES_SUPPORT, false);
+      this.diskBasedSortingEnabled = HFileOutputFormat2.diskBasedSortingEnabled(conf);
       Collections.addAll(tableSet, tables);
     }
+
+    private WritableComparable<?> wrapKey(byte[] key, ExtendedCell cell) {
+      if (this.diskBasedSortingEnabled) {
+        // Important to build a new cell with the updated key to maintain multi-table support
+        KeyValue kv = new KeyValue(key, 0, key.length, cell.getFamilyArray(),
+          cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(),
+          cell.getQualifierOffset(), cell.getQualifierLength(), cell.getTimestamp(),
+          KeyValue.Type.codeToType(PrivateCellUtil.getTypeByte(cell)), null, 0, 0);
+        kv.setSequenceId(cell.getSequenceId());
+        return new KeyOnlyCellComparable(kv);
+      } else {
+        return new ImmutableBytesWritable(key);
+      }
+    }
   }
 
   /**
@@ -313,7 +331,13 @@ public class WALPlayer extends Configured implements Tool {
     job.setJarByClass(WALPlayer.class);
 
     job.setInputFormatClass(WALInputFormat.class);
-    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    boolean diskBasedSortingEnabled = HFileOutputFormat2.diskBasedSortingEnabled(conf);
+    if (diskBasedSortingEnabled) {
+      job.setMapOutputKeyClass(KeyOnlyCellComparable.class);
+      job.setSortComparatorClass(KeyOnlyCellComparable.KeyOnlyCellComparator.class);
+    } else {
+      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+    }
 
     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
     if (hfileOutPath != null) {
@@ -328,7 +352,11 @@ public class WALPlayer extends Configured implements Tool {
       List<TableName> tableNames = getTableNameList(tables);
 
       job.setMapperClass(WALKeyValueMapper.class);
-      job.setReducerClass(CellSortReducer.class);
+      if (diskBasedSortingEnabled) {
+        job.setReducerClass(PreSortedCellsReducer.class);
+      } else {
+        job.setReducerClass(CellSortReducer.class);
+      }
       Path outputDir = new Path(hfileOutPath);
       FileOutputFormat.setOutputPath(job, outputDir);
       job.setMapOutputValueClass(MapReduceExtendedCell.class);
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
index b39d04802c9..220e9a3793c 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Mapper.Context;
 import org.apache.hadoop.util.ToolRunner;
@@ -123,19 +124,22 @@ public class TestWALPlayer {
     TEST_UTIL.createTable(tn, TestRecoveredEdits.RECOVEREDEDITS_COLUMNFAMILY);
     // Copy testing recovered.edits file that is over under hbase-server test resources
     // up into a dir in our little hdfs cluster here.
-    String hbaseServerTestResourcesEdits =
-      System.getProperty("test.build.classes") + "/../../../hbase-server/src/test/resources/"
-        + TestRecoveredEdits.RECOVEREDEDITS_PATH.getName();
-    assertTrue(new File(hbaseServerTestResourcesEdits).exists());
-    FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem();
-    // Target dir.
-    Path targetDir = new Path("edits").makeQualified(dfs.getUri(), dfs.getHomeDirectory());
-    assertTrue(dfs.mkdirs(targetDir));
-    dfs.copyFromLocalFile(new Path(hbaseServerTestResourcesEdits), targetDir);
-    assertEquals(0,
-      ToolRunner.run(new WALPlayer(this.conf), new String[] { targetDir.toString() }));
-    // I don't know how many edits are in this file for this table... so just check more than 1.
-    assertTrue(TEST_UTIL.countRows(tn) > 0);
+    runWithDiskBasedSortingDisabledAndEnabled(() -> {
+      String hbaseServerTestResourcesEdits =
+        System.getProperty("test.build.classes") + "/../../../hbase-server/src/test/resources/"
+          + TestRecoveredEdits.RECOVEREDEDITS_PATH.getName();
+      assertTrue(new File(hbaseServerTestResourcesEdits).exists());
+      FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem();
+      // Target dir.
+      Path targetDir = new Path("edits").makeQualified(dfs.getUri(), dfs.getHomeDirectory());
+      assertTrue(dfs.mkdirs(targetDir));
+      dfs.copyFromLocalFile(new Path(hbaseServerTestResourcesEdits), targetDir);
+      assertEquals(0,
+        ToolRunner.run(new WALPlayer(this.conf), new String[] { targetDir.toString() }));
+      // I don't know how many edits are in this file for this table... so just check more than 1.
+      assertTrue(TEST_UTIL.countRows(tn) > 0);
+      dfs.delete(targetDir, true);
+    });
   }
 
   /**
@@ -150,7 +154,7 @@ public class TestWALPlayer {
     final byte[] column1 = Bytes.toBytes("c1");
     final byte[] column2 = Bytes.toBytes("c2");
     final byte[] row = Bytes.toBytes("row");
-    Table table = TEST_UTIL.createTable(tableName, family);
+    final Table table = TEST_UTIL.createTable(tableName, family);
 
     long now = EnvironmentEdgeManager.currentTime();
     // put a row into the first table
@@ -188,28 +192,37 @@ public class TestWALPlayer {
     configuration.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
 
     WALPlayer player = new WALPlayer(configuration);
-    assertEquals(0, ToolRunner.run(configuration, player,
-      new String[] { walInputDir, tableName.getNameAsString() }));
+    final byte[] finalLastVal = lastVal;
 
-    Get g = new Get(row);
-    Result result = table.get(g);
-    byte[] value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1));
-    assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(lastVal)));
+    runWithDiskBasedSortingDisabledAndEnabled(() -> {
+      assertEquals(0, ToolRunner.run(configuration, player,
+        new String[] { walInputDir, tableName.getNameAsString() }));
 
-    table = TEST_UTIL.truncateTable(tableName);
-    g = new Get(row);
-    result = table.get(g);
-    assertThat(result.listCells(), nullValue());
+      Get g = new Get(row);
+      Result result = table.get(g);
+      byte[] value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1));
+      assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(finalLastVal)));
 
-    BulkLoadHFiles.create(configuration).bulkLoad(tableName,
-      new Path(outPath, tableName.getNamespaceAsString() + "/" + tableName.getNameAsString()));
+      TEST_UTIL.truncateTable(tableName);
+      g = new Get(row);
+      result = table.get(g);
+      assertThat(result.listCells(), nullValue());
 
-    g = new Get(row);
-    result = table.get(g);
-    value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1));
+      BulkLoadHFiles.create(configuration).bulkLoad(tableName,
+        new Path(outPath, tableName.getNamespaceAsString() + "/" + tableName.getNameAsString()));
 
-    assertThat(result.listCells(), notNullValue());
-    assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(lastVal)));
+      g = new Get(row);
+      result = table.get(g);
+      value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1));
+
+      assertThat(result.listCells(), notNullValue());
+      assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(finalLastVal)));
+
+      // cleanup
+      Path out = new Path(outPath);
+      FileSystem fs = out.getFileSystem(configuration);
+      assertTrue(fs.delete(out, true));
+    });
   }
 
   /**
@@ -244,18 +257,21 @@ public class TestWALPlayer {
 
     Configuration configuration = TEST_UTIL.getConfiguration();
     WALPlayer player = new WALPlayer(configuration);
-    String optionName = "_test_.name";
-    configuration.set(optionName, "1000");
-    player.setupTime(configuration, optionName);
-    assertEquals(1000, configuration.getLong(optionName, 0));
-    assertEquals(0, ToolRunner.run(configuration, player,
-      new String[] { walInputDir, tableName1.getNameAsString(), tableName2.getNameAsString() }));
-
-    // verify the WAL was player into table 2
-    Get g = new Get(ROW);
-    Result r = t2.get(g);
-    assertEquals(1, r.size());
-    assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN2));
+
+    runWithDiskBasedSortingDisabledAndEnabled(() -> {
+      String optionName = "_test_.name";
+      configuration.set(optionName, "1000");
+      player.setupTime(configuration, optionName);
+      assertEquals(1000, configuration.getLong(optionName, 0));
+      assertEquals(0, ToolRunner.run(configuration, player,
+        new String[] { walInputDir, tableName1.getNameAsString(), tableName2.getNameAsString() }));
+
+      // verify the WAL was player into table 2
+      Get g = new Get(ROW);
+      Result r = t2.get(g);
+      assertEquals(1, r.size());
+      assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN2));
+    });
   }
 
   /**
@@ -278,7 +294,7 @@ public class TestWALPlayer {
     WALKey key = mock(WALKey.class);
     when(key.getTableName()).thenReturn(TableName.valueOf("table"));
     @SuppressWarnings("unchecked")
-    Mapper<WALKey, WALEdit, ImmutableBytesWritable, Cell>.Context context = mock(Context.class);
+    Mapper<WALKey, WALEdit, WritableComparable<?>, Cell>.Context context = mock(Context.class);
     when(context.getConfiguration()).thenReturn(configuration);
 
     WALEdit value = mock(WALEdit.class);
@@ -335,7 +351,29 @@ public class TestWALPlayer {
       System.setErr(oldPrintStream);
       System.setSecurityManager(SECURITY_MANAGER);
     }
+  }
+
+  private static void runWithDiskBasedSortingDisabledAndEnabled(TestMethod method)
+    throws Exception {
+    TEST_UTIL.getConfiguration().setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY,
+      false);
+    try {
+      method.run();
+    } finally {
+      TEST_UTIL.getConfiguration().unset(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY);
+    }
+
+    TEST_UTIL.getConfiguration().setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY,
+      true);
+    try {
+      method.run();
+    } finally {
+      TEST_UTIL.getConfiguration().unset(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY);
+    }
+  }
 
+  private interface TestMethod {
+    void run() throws Exception;
   }
 
 }
