This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-3 by this push:
new 0c188fe5d36 HBASE-28440 Add support for using mapreduce sort in HFileOutputFormat2 (#7294)
0c188fe5d36 is described below
commit 0c188fe5d3683772f7ebffca63dcfd327691a4d7
Author: Hernan Romer <[email protected]>
AuthorDate: Wed Sep 24 14:51:32 2025 -0400
HBASE-28440 Add support for using mapreduce sort in HFileOutputFormat2 (#7294)
Co-authored-by: Hernan Gelaf-Romer <[email protected]>
Signed-off-by: Ray Mattingly <[email protected]>
(cherry picked from commit 608c1b98048523fbfd4d870a49b26e14bad89ba7)
---
.../backup/impl/IncrementalTableBackupClient.java | 15 ++-
.../mapreduce/MapReduceHFileSplitterJob.java | 36 +++++-
.../hadoop/hbase/mapreduce/HFileOutputFormat2.java | 32 +++++-
.../hbase/mapreduce/KeyOnlyCellComparable.java | 94 +++++++++++++++
.../hbase/mapreduce/PreSortedCellsReducer.java | 47 ++++++++
.../apache/hadoop/hbase/mapreduce/WALPlayer.java | 38 ++++++-
.../hadoop/hbase/mapreduce/TestWALPlayer.java | 126 ++++++++++++++-------
7 files changed, 327 insertions(+), 61 deletions(-)
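A minimal usage sketch follows (the config key and helper methods are the ones added by this patch; the class name, job name and surrounding wiring are illustrative assumptions only):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
    import org.apache.hadoop.mapreduce.Job;

    // Illustrative sketch, not part of the patch: opting in to the new disk-based sort.
    public class DiskSortedBulkLoadExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // New key added by this patch; it defaults to false, so existing jobs are unaffected.
        conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
        // New helper added by this patch; simply reads the flag back.
        boolean enabled = HFileOutputFormat2.diskBasedSortingEnabled(conf);
        Job job = Job.getInstance(conf, "example-bulkload, diskBasedSort=" + enabled);
        // With the flag set, WALPlayer / MapReduceHFileSplitterJob / configureIncrementalLoad
        // use KeyOnlyCellComparable as the map output key and PreSortedCellsReducer as the
        // reducer, instead of ImmutableBytesWritable + CellSortReducer.
      }
    }

Leaving the key unset keeps the existing in-memory CellSortReducer path.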
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
index cfc5149f369..4fac0ca3c93 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalTableBackupClient.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
@@ -351,6 +352,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
}
protected void incrementalCopyHFiles(String[] files, String backupDest) throws IOException {
+ boolean diskBasedSortingOriginalValue = HFileOutputFormat2.diskBasedSortingEnabled(conf);
try {
LOG.debug("Incremental copy HFiles is starting. dest=" + backupDest);
// set overall backup phase: incremental_copy
@@ -365,6 +367,7 @@ public class IncrementalTableBackupClient extends TableBackupClient {
LOG.debug("Setting incremental copy HFiles job name to : " + jobname);
}
conf.set(JOB_NAME_CONF_KEY, jobname);
+ conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
BackupCopyJob copyService = BackupRestoreFactory.getBackupCopyJob(conf);
int res = copyService.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, strArr);
@@ -377,6 +380,8 @@ public class IncrementalTableBackupClient extends TableBackupClient {
+ " finished.");
} finally {
deleteBulkLoadDirectory();
+ conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY,
+ diskBasedSortingOriginalValue);
}
}
@@ -430,6 +435,9 @@ public class IncrementalTableBackupClient extends TableBackupClient {
conf.set(WALPlayer.INPUT_FILES_SEPARATOR_KEY, ";");
conf.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
conf.set(JOB_NAME_CONF_KEY, jobname);
+
+ boolean diskBasedSortingEnabledOriginalValue = HFileOutputFormat2.diskBasedSortingEnabled(conf);
+ conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY, true);
String[] playerArgs = { dirs, StringUtils.join(tableList, ",") };
try {
@@ -438,13 +446,16 @@ public class IncrementalTableBackupClient extends TableBackupClient {
if (result != 0) {
throw new IOException("WAL Player failed");
}
- conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
- conf.unset(JOB_NAME_CONF_KEY);
} catch (IOException e) {
throw e;
} catch (Exception ee) {
throw new IOException("Can not convert from directory " + dirs
+ " (check Hadoop, HBase and WALPlayer M/R job logs) ", ee);
+ } finally {
+ conf.setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY,
+ diskBasedSortingEnabledOriginalValue);
+ conf.unset(WALPlayer.INPUT_FILES_SEPARATOR_KEY);
+ conf.unset(JOB_NAME_CONF_KEY);
}
}
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
index 7d9430914cb..85df58e0946 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceHFileSplitterJob.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
@@ -34,11 +35,14 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.CellSortReducer;
import org.apache.hadoop.hbase.mapreduce.HFileInputFormat;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.hbase.mapreduce.KeyOnlyCellComparable;
+import org.apache.hadoop.hbase.mapreduce.PreSortedCellsReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.snapshot.SnapshotRegionLocator;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -72,18 +76,28 @@ public class MapReduceHFileSplitterJob extends Configured implements Tool {
/**
* A mapper that just writes out cells. This one can be used together with {@link CellSortReducer}
*/
- static class HFileCellMapper extends Mapper<NullWritable, Cell, ImmutableBytesWritable, Cell> {
+ static class HFileCellMapper extends Mapper<NullWritable, Cell, WritableComparable<?>, Cell> {
+
+ private boolean diskBasedSortingEnabled = false;
@Override
public void map(NullWritable key, Cell value, Context context)
throws IOException, InterruptedException {
- context.write(new ImmutableBytesWritable(CellUtil.cloneRow(value)),
- new MapReduceExtendedCell(PrivateCellUtil.ensureExtendedCell(value)));
+ ExtendedCell extendedCell = PrivateCellUtil.ensureExtendedCell(value);
+ context.write(wrap(extendedCell), new MapReduceExtendedCell(extendedCell));
}
@Override
public void setup(Context context) throws IOException {
- // do nothing
+ diskBasedSortingEnabled =
+ HFileOutputFormat2.diskBasedSortingEnabled(context.getConfiguration());
+ }
+
+ private WritableComparable<?> wrap(ExtendedCell cell) {
+ if (diskBasedSortingEnabled) {
+ return new KeyOnlyCellComparable(cell);
+ }
+ return new ImmutableBytesWritable(CellUtil.cloneRow(cell));
}
}
@@ -107,13 +121,23 @@ public class MapReduceHFileSplitterJob extends Configured implements Tool {
true);
job.setJarByClass(MapReduceHFileSplitterJob.class);
job.setInputFormatClass(HFileInputFormat.class);
- job.setMapOutputKeyClass(ImmutableBytesWritable.class);
String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+ boolean diskBasedSortingEnabled = HFileOutputFormat2.diskBasedSortingEnabled(conf);
+ if (diskBasedSortingEnabled) {
+ job.setMapOutputKeyClass(KeyOnlyCellComparable.class);
+ job.setSortComparatorClass(KeyOnlyCellComparable.KeyOnlyCellComparator.class);
+ } else {
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ }
if (hfileOutPath != null) {
LOG.debug("add incremental job :" + hfileOutPath + " from " + inputDirs);
TableName tableName = TableName.valueOf(tabName);
job.setMapperClass(HFileCellMapper.class);
- job.setReducerClass(CellSortReducer.class);
+ if (diskBasedSortingEnabled) {
+ job.setReducerClass(PreSortedCellsReducer.class);
+ } else {
+ job.setReducerClass(CellSortReducer.class);
+ }
Path outputDir = new Path(hfileOutPath);
FileOutputFormat.setOutputPath(job, outputDir);
job.setMapOutputValueClass(MapReduceExtendedCell.class);
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
index 50b24818bdc..0e81c95677c 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
@@ -83,6 +84,7 @@ import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
@@ -171,6 +173,11 @@ public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable,
"hbase.mapreduce.hfileoutputformat.extendedcell.enabled";
static final boolean EXTENDED_CELL_SERIALIZATION_ENABLED_DEFULT = false;
+ @InterfaceAudience.Private
+ public static final String DISK_BASED_SORTING_ENABLED_KEY =
+ "hbase.mapreduce.hfileoutputformat.disk.based.sorting.enabled";
+ private static final boolean DISK_BASED_SORTING_ENABLED_DEFAULT = false;
+
public static final String REMOTE_CLUSTER_CONF_PREFIX =
"hbase.hfileoutputformat.remote.cluster.";
public static final String REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY =
REMOTE_CLUSTER_CONF_PREFIX + "zookeeper.quorum";
@@ -548,12 +555,19 @@ public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable,
// Write the actual file
FileSystem fs = partitionsPath.getFileSystem(conf);
- SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, partitionsPath,
- ImmutableBytesWritable.class, NullWritable.class);
+ boolean diskBasedSortingEnabled = diskBasedSortingEnabled(conf);
+ Class<? extends Writable> keyClass =
+ diskBasedSortingEnabled ? KeyOnlyCellComparable.class : ImmutableBytesWritable.class;
+ SequenceFile.Writer writer =
+ SequenceFile.createWriter(fs, conf, partitionsPath, keyClass, NullWritable.class);
try {
for (ImmutableBytesWritable startKey : sorted) {
- writer.append(startKey, NullWritable.get());
+ Writable writable = diskBasedSortingEnabled
+ ? new KeyOnlyCellComparable(KeyValueUtil.createFirstOnRow(startKey.get()))
+ : startKey;
+
+ writer.append(writable, NullWritable.get());
}
} finally {
writer.close();
@@ -600,6 +614,10 @@ public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable,
configureIncrementalLoad(job, singleTableInfo, HFileOutputFormat2.class);
}
+ public static boolean diskBasedSortingEnabled(Configuration conf) {
+ return conf.getBoolean(DISK_BASED_SORTING_ENABLED_KEY, DISK_BASED_SORTING_ENABLED_DEFAULT);
+ }
+
static void configureIncrementalLoad(Job job, List<TableInfo> multiTableInfo,
Class<? extends OutputFormat<?, ?>> cls) throws IOException {
Configuration conf = job.getConfiguration();
@@ -618,7 +636,13 @@ public class HFileOutputFormat2 extends FileOutputFormat<ImmutableBytesWritable,
// Based on the configured map output class, set the correct reducer to properly
// sort the incoming values.
// TODO it would be nice to pick one or the other of these formats.
- if (
+ boolean diskBasedSorting = diskBasedSortingEnabled(conf);
+
+ if (diskBasedSorting) {
+ job.setMapOutputKeyClass(KeyOnlyCellComparable.class);
+ job.setSortComparatorClass(KeyOnlyCellComparable.KeyOnlyCellComparator.class);
+ job.setReducerClass(PreSortedCellsReducer.class);
+ } else if (
KeyValue.class.equals(job.getMapOutputValueClass())
|| MapReduceExtendedCell.class.equals(job.getMapOutputValueClass())
) {
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyOnlyCellComparable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyOnlyCellComparable.java
new file mode 100644
index 00000000000..d9b28f8a689
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/KeyOnlyCellComparable.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.ExtendedCell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.yetus.audience.InterfaceAudience;
+
[email protected]
+public class KeyOnlyCellComparable implements WritableComparable<KeyOnlyCellComparable> {
+
+ static {
+ WritableComparator.define(KeyOnlyCellComparable.class, new KeyOnlyCellComparator());
+ }
+
+ private ExtendedCell cell = null;
+
+ public KeyOnlyCellComparable() {
+ }
+
+ public KeyOnlyCellComparable(ExtendedCell cell) {
+ this.cell = cell;
+ }
+
+ public ExtendedCell getCell() {
+ return cell;
+ }
+
+ @Override
+ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "EQ_COMPARETO_USE_OBJECT_EQUALS",
+ justification = "This is wrong, yes, but we should be purging Writables, not fixing them")
+ public int compareTo(KeyOnlyCellComparable o) {
+ return CellComparator.getInstance().compare(cell, o.cell);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ int keyLen = PrivateCellUtil.estimatedSerializedSizeOfKey(cell);
+ int valueLen = 0; // We avoid writing value here. So just serialize as if an empty value.
+ out.writeInt(keyLen + valueLen + KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE);
+ out.writeInt(keyLen);
+ out.writeInt(valueLen);
+ PrivateCellUtil.writeFlatKey(cell, out);
+ out.writeLong(cell.getSequenceId());
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ cell = KeyValue.create(in);
+ long seqId = in.readLong();
+ cell.setSequenceId(seqId);
+ }
+
+ public static class KeyOnlyCellComparator extends WritableComparator {
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+ try (DataInputStream d1 = new DataInputStream(new ByteArrayInputStream(b1, s1, l1));
+ DataInputStream d2 = new DataInputStream(new ByteArrayInputStream(b2, s2, l2))) {
+ KeyOnlyCellComparable kv1 = new KeyOnlyCellComparable();
+ kv1.readFields(d1);
+ KeyOnlyCellComparable kv2 = new KeyOnlyCellComparable();
+ kv2.readFields(d2);
+ return compare(kv1, kv2);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PreSortedCellsReducer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PreSortedCellsReducer.java
new file mode 100644
index 00000000000..8f4b2953ec0
--- /dev/null
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/PreSortedCellsReducer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.yetus.audience.InterfaceAudience;
+
[email protected]
+public class PreSortedCellsReducer
+ extends Reducer<KeyOnlyCellComparable, Cell, ImmutableBytesWritable, Cell> {
+
+ @Override
+ protected void reduce(KeyOnlyCellComparable keyComparable, Iterable<Cell> values, Context context)
+ throws IOException, InterruptedException {
+
+ int index = 0;
+ ImmutableBytesWritable key =
+ new ImmutableBytesWritable(CellUtil.cloneRow(keyComparable.getCell()));
+ for (Cell cell : values) {
+ context.write(key, new MapReduceExtendedCell(PrivateCellUtil.ensureExtendedCell(cell)));
+ if (++index % 100 == 0) {
+ context.setStatus("Wrote " + index + " cells");
+ }
+ }
+ }
+}
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index 5e2dc0902e0..9813118e250 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
@@ -53,6 +54,7 @@ import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALEditInternalHelper;
import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -96,9 +98,10 @@ public class WALPlayer extends Configured implements Tool {
* A mapper that just writes out KeyValues. This one can be used together with
* {@link CellSortReducer}
*/
- static class WALKeyValueMapper extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Cell> {
+ static class WALKeyValueMapper extends Mapper<WALKey, WALEdit, WritableComparable<?>, Cell> {
private Set<String> tableSet = new HashSet<String>();
private boolean multiTableSupport = false;
+ private boolean diskBasedSortingEnabled = false;
@Override
public void map(WALKey key, WALEdit value, Context context) throws IOException {
@@ -120,8 +123,8 @@ public class WALPlayer extends Configured implements Tool {
byte[] outKey = multiTableSupport
? Bytes.add(table.getName(), Bytes.toBytes(tableSeparator), CellUtil.cloneRow(cell))
: CellUtil.cloneRow(cell);
- context.write(new ImmutableBytesWritable(outKey),
- new MapReduceExtendedCell(PrivateCellUtil.ensureExtendedCell(cell)));
+ ExtendedCell extendedCell = PrivateCellUtil.ensureExtendedCell(cell);
+ context.write(wrapKey(outKey, extendedCell), new MapReduceExtendedCell(extendedCell));
}
}
} catch (InterruptedException e) {
@@ -135,8 +138,23 @@ public class WALPlayer extends Configured implements Tool {
Configuration conf = context.getConfiguration();
String[] tables = conf.getStrings(TABLES_KEY);
this.multiTableSupport = conf.getBoolean(MULTI_TABLES_SUPPORT, false);
+ this.diskBasedSortingEnabled = HFileOutputFormat2.diskBasedSortingEnabled(conf);
Collections.addAll(tableSet, tables);
}
+
+ private WritableComparable<?> wrapKey(byte[] key, ExtendedCell cell) {
+ if (this.diskBasedSortingEnabled) {
+ // Important to build a new cell with the updated key to maintain multi-table support
+ KeyValue kv = new KeyValue(key, 0, key.length, cell.getFamilyArray(),
+ cell.getFamilyOffset(), cell.getFamilyLength(), cell.getQualifierArray(),
+ cell.getQualifierOffset(), cell.getQualifierLength(), cell.getTimestamp(),
+ KeyValue.Type.codeToType(PrivateCellUtil.getTypeByte(cell)), null, 0, 0);
+ kv.setSequenceId(cell.getSequenceId());
+ return new KeyOnlyCellComparable(kv);
+ } else {
+ return new ImmutableBytesWritable(key);
+ }
+ }
}
/**
@@ -313,7 +331,13 @@ public class WALPlayer extends Configured implements Tool {
job.setJarByClass(WALPlayer.class);
job.setInputFormatClass(WALInputFormat.class);
- job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ boolean diskBasedSortingEnabled = HFileOutputFormat2.diskBasedSortingEnabled(conf);
+ if (diskBasedSortingEnabled) {
+ job.setMapOutputKeyClass(KeyOnlyCellComparable.class);
+ job.setSortComparatorClass(KeyOnlyCellComparable.KeyOnlyCellComparator.class);
+ } else {
+ job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+ }
String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
if (hfileOutPath != null) {
@@ -328,7 +352,11 @@ public class WALPlayer extends Configured implements Tool {
List<TableName> tableNames = getTableNameList(tables);
job.setMapperClass(WALKeyValueMapper.class);
- job.setReducerClass(CellSortReducer.class);
+ if (diskBasedSortingEnabled) {
+ job.setReducerClass(PreSortedCellsReducer.class);
+ } else {
+ job.setReducerClass(CellSortReducer.class);
+ }
Path outputDir = new Path(hfileOutPath);
FileOutputFormat.setOutputPath(job, outputDir);
job.setMapOutputValueClass(MapReduceExtendedCell.class);
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
index b39d04802c9..220e9a3793c 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALPlayer.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.util.ToolRunner;
@@ -123,19 +124,22 @@ public class TestWALPlayer {
TEST_UTIL.createTable(tn, TestRecoveredEdits.RECOVEREDEDITS_COLUMNFAMILY);
// Copy testing recovered.edits file that is over under hbase-server test resources
// up into a dir in our little hdfs cluster here.
- String hbaseServerTestResourcesEdits =
- System.getProperty("test.build.classes") + "/../../../hbase-server/src/test/resources/"
- + TestRecoveredEdits.RECOVEREDEDITS_PATH.getName();
- assertTrue(new File(hbaseServerTestResourcesEdits).exists());
- FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem();
- // Target dir.
- Path targetDir = new Path("edits").makeQualified(dfs.getUri(), dfs.getHomeDirectory());
- assertTrue(dfs.mkdirs(targetDir));
- dfs.copyFromLocalFile(new Path(hbaseServerTestResourcesEdits), targetDir);
- assertEquals(0,
- ToolRunner.run(new WALPlayer(this.conf), new String[] { targetDir.toString() }));
- // I don't know how many edits are in this file for this table... so just check more than 1.
- assertTrue(TEST_UTIL.countRows(tn) > 0);
+ runWithDiskBasedSortingDisabledAndEnabled(() -> {
+ String hbaseServerTestResourcesEdits =
+ System.getProperty("test.build.classes") + "/../../../hbase-server/src/test/resources/"
+ + TestRecoveredEdits.RECOVEREDEDITS_PATH.getName();
+ assertTrue(new File(hbaseServerTestResourcesEdits).exists());
+ FileSystem dfs = TEST_UTIL.getDFSCluster().getFileSystem();
+ // Target dir.
+ Path targetDir = new Path("edits").makeQualified(dfs.getUri(), dfs.getHomeDirectory());
+ assertTrue(dfs.mkdirs(targetDir));
+ dfs.copyFromLocalFile(new Path(hbaseServerTestResourcesEdits), targetDir);
+ assertEquals(0,
+ ToolRunner.run(new WALPlayer(this.conf), new String[] { targetDir.toString() }));
+ // I don't know how many edits are in this file for this table... so just check more than 1.
+ assertTrue(TEST_UTIL.countRows(tn) > 0);
+ dfs.delete(targetDir, true);
+ });
}
/**
@@ -150,7 +154,7 @@ public class TestWALPlayer {
final byte[] column1 = Bytes.toBytes("c1");
final byte[] column2 = Bytes.toBytes("c2");
final byte[] row = Bytes.toBytes("row");
- Table table = TEST_UTIL.createTable(tableName, family);
+ final Table table = TEST_UTIL.createTable(tableName, family);
long now = EnvironmentEdgeManager.currentTime();
// put a row into the first table
@@ -188,28 +192,37 @@ public class TestWALPlayer {
configuration.setBoolean(WALPlayer.MULTI_TABLES_SUPPORT, true);
WALPlayer player = new WALPlayer(configuration);
- assertEquals(0, ToolRunner.run(configuration, player,
- new String[] { walInputDir, tableName.getNameAsString() }));
+ final byte[] finalLastVal = lastVal;
- Get g = new Get(row);
- Result result = table.get(g);
- byte[] value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1));
- assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(lastVal)));
+ runWithDiskBasedSortingDisabledAndEnabled(() -> {
+ assertEquals(0, ToolRunner.run(configuration, player,
+ new String[] { walInputDir, tableName.getNameAsString() }));
- table = TEST_UTIL.truncateTable(tableName);
- g = new Get(row);
- result = table.get(g);
- assertThat(result.listCells(), nullValue());
+ Get g = new Get(row);
+ Result result = table.get(g);
+ byte[] value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1));
+ assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(finalLastVal)));
- BulkLoadHFiles.create(configuration).bulkLoad(tableName,
- new Path(outPath, tableName.getNamespaceAsString() + "/" + tableName.getNameAsString()));
+ TEST_UTIL.truncateTable(tableName);
+ g = new Get(row);
+ result = table.get(g);
+ assertThat(result.listCells(), nullValue());
- g = new Get(row);
- result = table.get(g);
- value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1));
+ BulkLoadHFiles.create(configuration).bulkLoad(tableName,
+ new Path(outPath, tableName.getNamespaceAsString() + "/" + tableName.getNameAsString()));
- assertThat(result.listCells(), notNullValue());
- assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(lastVal)));
+ g = new Get(row);
+ result = table.get(g);
+ value = CellUtil.cloneValue(result.getColumnLatestCell(family, column1));
+
+ assertThat(result.listCells(), notNullValue());
+ assertThat(Bytes.toStringBinary(value), equalTo(Bytes.toStringBinary(finalLastVal)));
+
+ // cleanup
+ Path out = new Path(outPath);
+ FileSystem fs = out.getFileSystem(configuration);
+ assertTrue(fs.delete(out, true));
+ });
}
/**
@@ -244,18 +257,21 @@ public class TestWALPlayer {
Configuration configuration = TEST_UTIL.getConfiguration();
WALPlayer player = new WALPlayer(configuration);
- String optionName = "_test_.name";
- configuration.set(optionName, "1000");
- player.setupTime(configuration, optionName);
- assertEquals(1000, configuration.getLong(optionName, 0));
- assertEquals(0, ToolRunner.run(configuration, player,
- new String[] { walInputDir, tableName1.getNameAsString(), tableName2.getNameAsString() }));
-
- // verify the WAL was player into table 2
- Get g = new Get(ROW);
- Result r = t2.get(g);
- assertEquals(1, r.size());
- assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN2));
+
+ runWithDiskBasedSortingDisabledAndEnabled(() -> {
+ String optionName = "_test_.name";
+ configuration.set(optionName, "1000");
+ player.setupTime(configuration, optionName);
+ assertEquals(1000, configuration.getLong(optionName, 0));
+ assertEquals(0, ToolRunner.run(configuration, player,
+ new String[] { walInputDir, tableName1.getNameAsString(), tableName2.getNameAsString() }));
+
+ // verify the WAL was player into table 2
+ Get g = new Get(ROW);
+ Result r = t2.get(g);
+ assertEquals(1, r.size());
+ assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN2));
+ });
}
/**
@@ -278,7 +294,7 @@ public class TestWALPlayer {
WALKey key = mock(WALKey.class);
when(key.getTableName()).thenReturn(TableName.valueOf("table"));
@SuppressWarnings("unchecked")
- Mapper<WALKey, WALEdit, ImmutableBytesWritable, Cell>.Context context = mock(Context.class);
+ Mapper<WALKey, WALEdit, WritableComparable<?>, Cell>.Context context = mock(Context.class);
when(context.getConfiguration()).thenReturn(configuration);
WALEdit value = mock(WALEdit.class);
@@ -335,7 +351,29 @@ public class TestWALPlayer {
System.setErr(oldPrintStream);
System.setSecurityManager(SECURITY_MANAGER);
}
+ }
+
+ private static void runWithDiskBasedSortingDisabledAndEnabled(TestMethod method)
+ throws Exception {
+ TEST_UTIL.getConfiguration().setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY,
+ false);
+ try {
+ method.run();
+ } finally {
+ TEST_UTIL.getConfiguration().unset(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY);
+ }
+
+ TEST_UTIL.getConfiguration().setBoolean(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY,
+ true);
+ try {
+ method.run();
+ } finally {
+ TEST_UTIL.getConfiguration().unset(HFileOutputFormat2.DISK_BASED_SORTING_ENABLED_KEY);
+ }
+ }
+ private interface TestMethod {
+ void run() throws Exception;
}
}