This is an automated email from the ASF dual-hosted git repository.
junegunn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new adbccfe3b0f HBASE-30159 Make hash algorithm configurable for HashTable and SyncTable (#8321)
adbccfe3b0f is described below
commit adbccfe3b0f8584e9f7c7803b5656ab363e55099
Author: Jinhyuk Kim <[email protected]>
AuthorDate: Sun Jun 7 18:09:25 2026 +0900
HBASE-30159 Make hash algorithm configurable for HashTable and SyncTable (#8321)
Signed-off-by: Junegunn Choi <[email protected]>
---
.../apache/hadoop/hbase/mapreduce/HashTable.java | 46 ++++--
.../apache/hadoop/hbase/mapreduce/SyncTable.java | 2 +-
.../hadoop/hbase/mapreduce/TestHashTable.java | 171 +++++++++++++++++++++
.../hadoop/hbase/mapreduce/TestSyncTable.java | 31 ++++
4 files changed, 240 insertions(+), 10 deletions(-)
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
index ccaf55e5025..34696cd5066 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java
@@ -58,7 +58,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Charsets;
-import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.Ordering;
@InterfaceAudience.Private
@@ -68,7 +67,14 @@ public class HashTable extends Configured implements Tool {
private static final int DEFAULT_BATCH_SIZE = 8000;
+ /**
+ * Default hash algorithm. Kept as MD5 so that manifests produced by older versions, which did not
+ * record an algorithm, remain readable.
+ */
+ static final String DEFAULT_HASH_ALGORITHM = "MD5";
+
private final static String HASH_BATCH_SIZE_CONF_KEY = "hash.batch.size";
+ final static String HASH_ALGORITHM_CONF_KEY = "hash.algorithm";
final static String PARTITIONS_FILE_NAME = "partitions";
final static String MANIFEST_FILE_NAME = "manifest";
final static String HASH_DATA_DIR = "hashes";
@@ -99,6 +105,7 @@ public class HashTable extends Configured implements Tool {
long endTime = 0;
boolean ignoreTimestamps;
boolean rawScan;
+ String hashAlgorithm = DEFAULT_HASH_ALGORITHM;
List<ImmutableBytesWritable> partitions;
@@ -138,6 +145,7 @@ public class HashTable extends Configured implements Tool {
p.setProperty("endTimestamp", Long.toString(endTime));
}
p.setProperty("rawScan", Boolean.toString(rawScan));
+ p.setProperty("hashAlgorithm", hashAlgorithm);
try (OutputStreamWriter osw = new OutputStreamWriter(fs.create(path),
Charsets.UTF_8)) {
p.store(osw, null);
@@ -189,6 +197,8 @@ public class HashTable extends Configured implements Tool {
if (endTimeString != null) {
endTime = Long.parseLong(endTimeString);
}
+
+ hashAlgorithm = p.getProperty("hashAlgorithm", DEFAULT_HASH_ALGORITHM);
}
Scan initScan() throws IOException {
@@ -316,6 +326,7 @@ public class HashTable extends Configured implements Tool {
sb.append(", versions=").append(versions);
}
sb.append(", rawScan=").append(rawScan);
+ sb.append(", hashAlgorithm=").append(hashAlgorithm);
if (startTime != 0) {
sb.append("startTime=").append(startTime);
}
@@ -448,6 +459,7 @@ public class HashTable extends Configured implements Tool {
Configuration jobConf = job.getConfiguration();
jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize);
jobConf.setBoolean(IGNORE_TIMESTAMPS, tableHash.ignoreTimestamps);
+ jobConf.set(HASH_ALGORITHM_CONF_KEY, tableHash.hashAlgorithm);
job.setJarByClass(HashTable.class);
TableMapReduceUtil.initTableMapperJob(tableHash.tableName,
tableHash.initScan(),
@@ -487,11 +499,11 @@ public class HashTable extends Configured implements Tool {
private long batchSize = 0;
boolean ignoreTimestamps;
- public ResultHasher() {
+ public ResultHasher(String algorithm) {
try {
- digest = MessageDigest.getInstance("MD5");
+ digest = MessageDigest.getInstance(algorithm);
} catch (NoSuchAlgorithmException e) {
- Throwables.propagate(e);
+ throw new IllegalArgumentException("Unsupported hash algorithm: " +
algorithm, e);
}
}
@@ -566,10 +578,10 @@ public class HashTable extends Configured implements Tool {
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
- targetBatchSize =
- context.getConfiguration().getLong(HASH_BATCH_SIZE_CONF_KEY,
DEFAULT_BATCH_SIZE);
- hasher = new ResultHasher();
- hasher.ignoreTimestamps =
context.getConfiguration().getBoolean(IGNORE_TIMESTAMPS, false);
+ Configuration conf = context.getConfiguration();
+ targetBatchSize = conf.getLong(HASH_BATCH_SIZE_CONF_KEY,
DEFAULT_BATCH_SIZE);
+ hasher = new ResultHasher(conf.get(HASH_ALGORITHM_CONF_KEY,
DEFAULT_HASH_ALGORITHM));
+ hasher.ignoreTimestamps = conf.getBoolean(IGNORE_TIMESTAMPS, false);
TableSplit split = (TableSplit) context.getInputSplit();
hasher.startBatch(new ImmutableBytesWritable(split.getStartRow()));
}
@@ -640,6 +652,9 @@ public class HashTable extends Configured implements Tool {
System.err.println(" families comma-separated list of families to
include");
System.err.println(" ignoreTimestamps if true, ignores cell timestamps");
System.err.println(" when calculating hashes");
+ System.err.println(" hashAlgorithm MessageDigest algorithm to use for
batch hashes");
+ System.err.println(" examples: MD5, SHA-256, SHA-384,
SHA-512");
+ System.err.println(" (defaults to " +
DEFAULT_HASH_ALGORITHM + ")");
System.err.println();
System.err.println("Args:");
System.err.println(" tablename Name of the table to hash");
@@ -665,7 +680,7 @@ public class HashTable extends Configured implements Tool {
for (int i = 0; i < args.length - NUM_ARGS; i++) {
String cmd = args[i];
- if (cmd.equals("-h") || cmd.startsWith("--h")) {
+ if (cmd.equals("-h") || cmd.equals("--help")) {
printUsage(null);
return false;
}
@@ -737,6 +752,12 @@ public class HashTable extends Configured implements Tool {
continue;
}
+ final String hashAlgorithmKey = "--hashAlgorithm=";
+ if (cmd.startsWith(hashAlgorithmKey)) {
+ tableHash.hashAlgorithm = cmd.substring(hashAlgorithmKey.length());
+ continue;
+ }
+
printUsage("Invalid argument '" + cmd + "'");
return false;
}
@@ -749,6 +770,13 @@ public class HashTable extends Configured implements Tool {
return false;
}
+ try {
+ MessageDigest.getInstance(tableHash.hashAlgorithm);
+ } catch (NoSuchAlgorithmException e) {
+ printUsage("Unsupported hash algorithm: " + tableHash.hashAlgorithm);
+ return false;
+ }
+
} catch (Exception e) {
LOG.error("Failed to parse commandLine arguments", e);
printUsage("Can't start because " + e.getMessage());
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
index 3b083b33dbd..5598f422d87 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
@@ -270,7 +270,7 @@ public class SyncTable extends Configured implements Tool {
// create a hasher, but don't start it right away
// instead, find the first hash batch at or after the start row
// and skip any rows that come before. they will be caught by the
previous task
- targetHasher = new HashTable.ResultHasher();
+ targetHasher = new HashTable.ResultHasher(sourceTableHash.hashAlgorithm);
targetHasher.ignoreTimestamps = ignoreTimestamp;
}
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java
index ec4d3ce3f02..f793a8f72a0 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java
@@ -20,8 +20,10 @@ package org.apache.hadoop.hbase.mapreduce;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
+import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -181,4 +183,173 @@ public class TestHashTable {
TEST_UTIL.deleteTable(tableName);
TEST_UTIL.cleanupDataTestDirOnTestFS();
}
+
+ @Test
+ public void testHashTableWithSha256(TestInfo testInfo) throws Exception {
+ final TableName tableName =
TableName.valueOf(testInfo.getTestMethod().get().getName());
+ final byte[] family = Bytes.toBytes("family");
+ final byte[] column1 = Bytes.toBytes("c1");
+ final byte[] column2 = Bytes.toBytes("c2");
+ final byte[] column3 = Bytes.toBytes("c3");
+
+ int numRows = 100;
+ int numRegions = 10;
+ int numHashFiles = 3;
+
+ byte[][] splitRows = new byte[numRegions - 1][];
+ for (int i = 1; i < numRegions; i++) {
+ splitRows[i - 1] = Bytes.toBytes(numRows * i / numRegions);
+ }
+
+ long timestamp = 1430764183454L;
+ Table t1 = TEST_UTIL.createTable(tableName, family, splitRows);
+ for (int i = 0; i < numRows; i++) {
+ Put p = new Put(Bytes.toBytes(i), timestamp);
+ p.addColumn(family, column1, column1);
+ p.addColumn(family, column2, column2);
+ p.addColumn(family, column3, column3);
+ t1.put(p);
+ }
+ t1.close();
+
+ HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration());
+ Path testDir =
TEST_UTIL.getDataTestDirOnTestFS(tableName.getNameAsString());
+
+ long batchSize = 300;
+ int code = hashTable.run(
+ new String[] { "--batchsize=" + batchSize, "--numhashfiles=" +
numHashFiles, "--scanbatch=2",
+ "--hashAlgorithm=SHA-256", tableName.getNameAsString(),
testDir.toString() });
+ assertEquals(0, code, "test job failed");
+
+ FileSystem fs = TEST_UTIL.getTestFileSystem();
+ HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(),
testDir);
+ assertEquals("SHA-256", tableHash.hashAlgorithm,
+ "manifest must record the algorithm used to produce the digests");
+
+ ImmutableMap<Integer, ImmutableBytesWritable> expectedHashes =
+ ImmutableMap.<Integer, ImmutableBytesWritable> builder()
+ .put(-1,
+ new ImmutableBytesWritable(
+
Bytes.fromHex("e1452041ec73beb9b5677c0b74ed73a9118ca502d9b2b9abe62ba18d92fc51be")))
+ .put(5,
+ new ImmutableBytesWritable(
+
Bytes.fromHex("6c55999354be6571a8912b7781d37359e88153173cb5fcf143211682d7ac06c5")))
+ .put(10,
+ new ImmutableBytesWritable(
+
Bytes.fromHex("44e9c6aedd838e9a9c78d1983ce5139fda3ef3b0b8a3812893512e7c6f05c51f")))
+ .put(15,
+ new ImmutableBytesWritable(
+
Bytes.fromHex("57f18d831a22057155fb9cfb9bf4706a1c2cf134fc1e92262c8748ca545b58a1")))
+ .put(20,
+ new ImmutableBytesWritable(
+
Bytes.fromHex("83119dfb3deec8901f69cccac31c1039c624870de0cff4b85946147359966d42")))
+ .put(25,
+ new ImmutableBytesWritable(
+
Bytes.fromHex("933eabcc837b7e7b24be2b553b1ec50fb00e95cbf3d8f898f59f5f8bbace0a60")))
+ .put(30,
+ new ImmutableBytesWritable(
+
Bytes.fromHex("b6a3752581d74f362f64b59d56a96ad52763b3245dd5bfc85f6fe9261f2d03f1")))
+ .put(35,
+ new ImmutableBytesWritable(
+
Bytes.fromHex("d3784bac940584dbc0754eff73bc39cce4f9c4aec87939747fff4b0ecc6a0617")))
+ .put(40,
+ new ImmutableBytesWritable(
+
Bytes.fromHex("87f4b810b751abd64e9c22cb7b40b5ce600965e4b8eda2c0eae075d5623088c2")))
+ .put(45,
+ new ImmutableBytesWritable(
+
Bytes.fromHex("ce1f422fcdbe0f926e10b68cb3ead497066560235a1341d29151a9e1847deaab")))
+ .put(50,
+ new ImmutableBytesWritable(
+
Bytes.fromHex("118c771b1eeabe8523f1ad96fb5bf16537d76e0b3855d84c3dbac864de726229")))
+ .put(55,
+ new ImmutableBytesWritable(
+
Bytes.fromHex("00dfe840a275aca3de9268ea61699881a441d47fea93071bca69c39bf7845dac")))
+ .put(60,
+ new ImmutableBytesWritable(
+
Bytes.fromHex("062239ede0306fd9046eb5a3a2f66d997b37c8c1a4defc35789644e66930fff1")))
+ .put(65,
+ new ImmutableBytesWritable(
+
Bytes.fromHex("09a63a94681e75edf975f9b46fe94f1e592840a627cac728a77728b7f9f695aa")))
+ .put(70,
+ new ImmutableBytesWritable(
+
Bytes.fromHex("e634097804d269cbaeef49ce7a009a1388e6f636700badcab05fe20759f6043f")))
+ .put(75,
+ new ImmutableBytesWritable(
+
Bytes.fromHex("69f614ccc16a9c651538681525be1b2e40859c9833a55d9009d77ef39abaffcd")))
+ .put(80,
+ new ImmutableBytesWritable(
+
Bytes.fromHex("6530b957c8064fc043620bee89647960de0d27a0f986b40f183f5347093a12d2")))
+ .put(85,
+ new ImmutableBytesWritable(
+
Bytes.fromHex("403ed0417cd8ab955cbd4c8fe84218cd152b95da9237300050e9b7c90c809faf")))
+ .put(90,
+ new ImmutableBytesWritable(
+
Bytes.fromHex("e27fb9193ae3363fec70a148e62df7c57d514dd7de74a6a332fbda002af67efb")))
+ .put(95,
+ new ImmutableBytesWritable(
+
Bytes.fromHex("a31cb9d55e37f17c773a6eee757f15d6d7fe52d77cd1037fcb7ee00ed2bef6c9")))
+ .build();
+
+ Map<Integer, ImmutableBytesWritable> actualHashes = new HashMap<>();
+ Path dataDir = new Path(testDir, HashTable.HASH_DATA_DIR);
+ for (int i = 0; i < numHashFiles; i++) {
+ Path hashPath = new Path(dataDir,
HashTable.TableHash.getDataFileName(i));
+ try (MapFile.Reader reader = new MapFile.Reader(hashPath, fs.getConf()))
{
+ ImmutableBytesWritable key = new ImmutableBytesWritable();
+ ImmutableBytesWritable hash = new ImmutableBytesWritable();
+ while (reader.next(key, hash)) {
+ int intKey = -1;
+ if (key.getLength() > 0) {
+ intKey = Bytes.toInt(key.get(), key.getOffset(), key.getLength());
+ }
+ if (actualHashes.containsKey(intKey)) {
+ fail("duplicate key in data files: " + intKey);
+ }
+ actualHashes.put(intKey, new
ImmutableBytesWritable(hash.copyBytes()));
+ }
+ }
+ }
+
+ if (!expectedHashes.equals(actualHashes)) {
+ LOG.error("Diff: " + Maps.difference(expectedHashes, actualHashes));
+ }
+ assertEquals(expectedHashes, actualHashes);
+
+ TEST_UTIL.deleteTable(tableName);
+ TEST_UTIL.cleanupDataTestDirOnTestFS();
+ }
+
+ /**
+ * A manifest written by an older HashTable does not carry the hashAlgorithm property. Reading
+ * such a manifest must default to MD5 so existing on-disk hash data stays usable.
+ */
+ @Test
+ public void testManifestWithoutAlgorithmDefaultsToMd5(TestInfo testInfo)
throws Exception {
+ Path testDir =
+
TEST_UTIL.getDataTestDirOnTestFS(testInfo.getTestMethod().get().getName() +
"_legacy");
+ FileSystem fs = TEST_UTIL.getTestFileSystem();
+ fs.mkdirs(testDir);
+
+ // hand-craft a legacy manifest with no hashAlgorithm property
+ Properties p = new Properties();
+ p.setProperty("table", "legacy");
+ p.setProperty("targetBatchSize", "8000");
+ p.setProperty("numHashFiles", "1");
+ p.setProperty("rawScan", "false");
+ Path manifest = new Path(testDir, HashTable.MANIFEST_FILE_NAME);
+ try (OutputStream out = fs.create(manifest)) {
+ p.store(out, null);
+ }
+
+ // write an empty partitions file so TableHash.read() succeeds
+ HashTable.TableHash empty = new HashTable.TableHash();
+ empty.partitions = new java.util.ArrayList<>();
+ empty.writePartitionFile(fs.getConf(), new Path(testDir,
HashTable.PARTITIONS_FILE_NAME));
+
+ HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(),
testDir);
+ assertEquals(HashTable.DEFAULT_HASH_ALGORITHM, tableHash.hashAlgorithm,
+ "Manifests without an algorithm property must default to MD5 for
back-compat");
+
+ TEST_UTIL.cleanupDataTestDirOnTestFS();
+ }
}
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java
index 7bd65da54c1..a85da23e842 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java
@@ -181,6 +181,37 @@ public class TestSyncTable {
UTIL2.deleteTable(targetTableName);
}
+ @Test
+ public void testSyncTableWithSha256(TestInfo testInfo) throws Exception {
+ final TableName sourceTableName =
+ TableName.valueOf(testInfo.getTestMethod().get().getName() + "_source");
+ final TableName targetTableName =
+ TableName.valueOf(testInfo.getTestMethod().get().getName() + "_target");
+ Path testDir =
UTIL1.getDataTestDirOnTestFS(testInfo.getTestMethod().get().getName());
+
+ writeTestData(UTIL1, sourceTableName, UTIL1, targetTableName);
+ hashSourceTable(UTIL1, sourceTableName, testDir,
"--hashAlgorithm=SHA-256");
+
+ HashTable.TableHash tableHash =
+ HashTable.TableHash.read(UTIL1.getTestFileSystem().getConf(), testDir);
+ assertEquals("SHA-256", tableHash.hashAlgorithm,
+ "manifest must carry the algorithm so SyncTable can match the
source-side digest");
+
+ Counters syncCounters =
+ syncTables(UTIL1.getConfiguration(), sourceTableName, targetTableName,
testDir);
+ assertEqualTables(90, UTIL1, sourceTableName, UTIL1, targetTableName,
false);
+
+ assertEquals(60,
syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
+ assertEquals(10,
syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
+ assertEquals(10,
syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
+ assertEquals(50,
syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
+ assertEquals(50,
syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
+ assertEquals(20,
syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());
+
+ UTIL1.deleteTable(sourceTableName);
+ UTIL1.deleteTable(targetTableName);
+ }
+
@Test
public void testSyncTableIgnoreTimestampsTrue(TestInfo testInfo) throws
Exception {
final TableName sourceTableName =