This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
     new fec4c52  HBASE-23136 PartionedMobFileCompactor bulkloaded files shouldn't get replicated
fec4c52 is described below

commit fec4c5249968e112b5ab6f4154d0e6c1fe428abc
Author: Wellington Ramos Chevreuil <wchevre...@apache.org>
AuthorDate: Thu Oct 10 10:37:42 2019 +0100

    HBASE-23136 PartionedMobFileCompactor bulkloaded files shouldn't get replicated

    Signed-off-by: stack <st...@apache.org>
---
 .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 11 ++--
 .../hbase/shaded/protobuf/RequestConverter.java    |  5 +-
 .../src/main/protobuf/Client.proto                 |  1 +
 hbase-protocol-shaded/src/main/protobuf/WAL.proto  |  1 +
 .../hbase/client/AsyncClusterConnection.java       | 15 ++---
 .../hbase/client/AsyncClusterConnectionImpl.java   |  4 +-
 .../mob/compactions/PartitionedMobCompactor.java   |  4 +-
 .../apache/hadoop/hbase/regionserver/HRegion.java  |  7 ++-
 .../hbase/regionserver/SecureBulkLoadManager.java  |  2 +-
 .../replication/regionserver/ReplicationSink.java  | 23 ++++----
 .../apache/hadoop/hbase/tool/BulkLoadHFiles.java   |  6 ++
 .../hadoop/hbase/tool/BulkLoadHFilesTool.java      |  9 ++-
 .../hbase/client/DummyAsyncClusterConnection.java  |  2 +-
 .../regionserver/TestBulkLoadReplication.java      | 65 +++++++++++++++++++++-
 .../tool/TestBulkLoadHFilesSplitRecovery.java      |  3 +-
 15 files changed, 123 insertions(+), 35 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 52e3bf1..8108217 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -2570,16 +2570,19 @@ public final class ProtobufUtil {
       ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
       Map<String, Long> storeFilesSize, long bulkloadSeqId) {
     return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles,
-      storeFilesSize, bulkloadSeqId, null);
+      storeFilesSize, bulkloadSeqId, null, true);
   }

   public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
       ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
-      Map<String, Long> storeFilesSize, long bulkloadSeqId, List<String> clusterIds) {
+      Map<String, Long> storeFilesSize, long bulkloadSeqId,
+      List<String> clusterIds, boolean replicate) {
     BulkLoadDescriptor.Builder desc =
         BulkLoadDescriptor.newBuilder()
-        .setTableName(ProtobufUtil.toProtoTableName(tableName))
-        .setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId);
+        .setTableName(ProtobufUtil.toProtoTableName(tableName))
+        .setEncodedRegionName(encodedRegionName)
+        .setBulkloadSeqNum(bulkloadSeqId)
+        .setReplicate(replicate);
     if(clusterIds != null) {
       desc.addAllClusterIds(clusterIds);
     }
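For readers tracing the plumbing: the overload above is what stamps the new replicate flag onto the WAL's BulkLoadDescriptor. A minimal sketch of a caller, assuming a hypothetical helper class and placeholder arguments (nothing below is part of the patch):

    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
    import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
    import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

    // Hypothetical helper, not patch code: builds a descriptor that marks the
    // bulk load as local-only, via the new overload shown above.
    public final class NonReplicableBulkLoadMarker {
      public static BulkLoadDescriptor descriptorFor(TableName tableName,
          ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
          Map<String, Long> storeFilesSize, long seqId) {
        return ProtobufUtil.toBulkLoadDescriptor(tableName, encodedRegionName,
          storeFiles, storeFilesSize, seqId,
          null,   // clusterIds: no source clusters recorded for this load
          false); // replicate=false: ReplicationSink will skip this event
      }
    }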
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index ae3cd3f..d45423c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -568,7 +568,7 @@ public final class RequestConverter {
       final byte[] regionName, boolean assignSeqNum, final Token<?> userToken,
       final String bulkToken) {
     return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, userToken, bulkToken,
-      false, null);
+      false, null, true);
   }

   /**
@@ -585,7 +585,7 @@ public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
       final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum,
       final Token<?> userToken, final String bulkToken, boolean copyFiles,
-      List<String> clusterIds) {
+      List<String> clusterIds, boolean replicate) {
     RegionSpecifier region = RequestConverter.buildRegionSpecifier(
       RegionSpecifierType.REGION_NAME, regionName);
@@ -626,6 +626,7 @@
     if (clusterIds != null) {
       request.addAllClusterIds(clusterIds);
     }
+    request.setReplicate(replicate);
     return request.build();
   }

diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto
index 07d8d71..a22c623 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Client.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto
@@ -379,6 +379,7 @@ message BulkLoadHFileRequest {
   optional string bulk_token = 5;
   optional bool copy_file = 6 [default = false];
   repeated string cluster_ids = 7;
+  optional bool replicate = 8 [default = true];

   message FamilyPath {
     required bytes family = 1;

diff --git a/hbase-protocol-shaded/src/main/protobuf/WAL.proto b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
index c103075..fd622cf 100644
--- a/hbase-protocol-shaded/src/main/protobuf/WAL.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
@@ -151,6 +151,7 @@ message BulkLoadDescriptor {
   repeated StoreDescriptor stores = 3;
   required int64 bulkload_seq_num = 4;
   repeated string cluster_ids = 5;
+  optional bool replicate = 6 [default = true];
 }

 /**

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index 5c57817..92118ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -81,17 +81,18 @@ public interface AsyncClusterConnection extends AsyncConnection {
    * Defined as default here to avoid breaking callers who rely on the bulkLoad version that does
    * not expect additional clusterIds param.
    * @param tableName the target table
-   * @param familyPaths hdfs path for the the table family dirs containg files to be loaded
-   * @param row row key
-   * @param assignSeqNum seq num for the event on WAL
-   * @param userToken user token
-   * @param bulkToken bulk load token
-   * @param copyFiles flag for copying the loaded hfiles
+   * @param familyPaths hdfs path for the the table family dirs containg files to be loaded.
+   * @param row row key.
+   * @param assignSeqNum seq num for the event on WAL.
+   * @param userToken user token.
+   * @param bulkToken bulk load token.
+   * @param copyFiles flag for copying the loaded hfiles.
    * @param clusterIds list of cluster ids where the given bulk load has already been processed.
+   * @param replicate flags if the bulkload is targeted for replication.
    */
   CompletableFuture<Boolean> bulkLoad(TableName tableName, List<Pair<byte[], String>> familyPaths,
       byte[] row, boolean assignSeqNum, Token<?> userToken, String bulkToken, boolean copyFiles,
-      List<String> clusterIds);
+      List<String> clusterIds, boolean replicate);

   /**
    * Clean up after finishing bulk load, no matter success or not.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
index 746c3b8..046ef41 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
@@ -109,13 +109,13 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu
   @Override
   public CompletableFuture<Boolean> bulkLoad(TableName tableName,
       List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
-      String bulkToken, boolean copyFiles, List<String> clusterIds) {
+      String bulkToken, boolean copyFiles, List<String> clusterIds, boolean replicate) {
     return callerFactory.<Boolean> single().table(tableName).row(row)
       .action((controller, loc, stub) -> ConnectionUtils
         .<Void, BulkLoadHFileRequest, BulkLoadHFileResponse, Boolean> call(controller, loc, stub,
           null, (rn, nil) -> RequestConverter.buildBulkLoadHFileRequest(familyPaths, rn, assignSeqNum,
-            userToken, bulkToken, copyFiles, clusterIds),
+            userToken, bulkToken, copyFiles, clusterIds, replicate),
           (s, c, req, done) -> s.bulkLoadHFile(c, req, done), (c, resp) -> resp.getLoaded()))
       .call();
   }

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
index a5823ec..dba591d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
@@ -827,7 +827,9 @@ public class PartitionedMobCompactor extends MobCompactor {
       throws IOException {
     // bulkload the ref file
     try {
-      BulkLoadHFiles.create(conf).bulkLoad(tableName, bulkloadDirectory);
+      BulkLoadHFiles bulkLoader = BulkLoadHFiles.create(conf);
+      bulkLoader.disableReplication();
+      bulkLoader.bulkLoad(tableName, bulkloadDirectory);
     } catch (Exception e) {
       throw new IOException(e);
     }
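The three-line change above is the heart of the fix: each cluster runs its own MOB compactions, so replicating the compactor's bulkloaded ref files could re-apply data a peer already has. The same opt-out is open to any caller of the public bulk load API; a minimal, self-contained sketch (the table name and directory are illustrative placeholders, not from this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.tool.BulkLoadHFiles;

    // Hedged sketch mirroring the compactor change above.
    public class LocalOnlyBulkLoadExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        BulkLoadHFiles loader = BulkLoadHFiles.create(conf);
        // Mark the resulting WAL bulk-load event replicate=false before loading.
        loader.disableReplication();
        loader.bulkLoad(TableName.valueOf("exampleTable"), new Path("/tmp/hfiles-out"));
      }
    }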
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index a9ff440..571be00 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -6146,7 +6146,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
       boolean assignSeqId, BulkLoadListener bulkLoadListener) throws IOException {
-    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null);
+    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false,
+      null, true);
   }

   /**
@@ -6197,7 +6198,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
       boolean assignSeqId, BulkLoadListener bulkLoadListener,
-      boolean copyFile, List<String> clusterIds) throws IOException {
+      boolean copyFile, List<String> clusterIds, boolean replicate) throws IOException {
     long seqId = -1;
     Map<byte[], List<Path>> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
     Map<String, Long> storeFilesSizes = new HashMap<>();
@@ -6372,7 +6373,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         WALProtos.BulkLoadDescriptor loadDescriptor =
           ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
             UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
-            storeFiles, storeFilesSizes, seqId, clusterIds);
+            storeFiles, storeFilesSizes, seqId, clusterIds, replicate);
         WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
           loadDescriptor, mvcc);
       } catch (IOException ioe) {

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index ad19473..bccc8fe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -296,7 +296,7 @@ public class SecureBulkLoadManager {
           //To enable access prior to staging
           return region.bulkLoadHFiles(familyPaths, true,
               new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(),
-              clusterIds);
+              clusterIds, request.getReplicate());
         } catch (Exception e) {
           LOG.error("Failed to complete bulk load", e);
         }

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 6c3f70c..51cbea8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -203,18 +203,19 @@ public class ReplicationSink {
           // Handle bulk load hfiles replication
           if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
             BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
-            if(bulkLoadsPerClusters == null) {
-              bulkLoadsPerClusters = new HashMap<>();
-            }
-            // Map of table name Vs list of pair of family and list of
-            // hfile paths from its namespace
-            Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
-              bulkLoadsPerClusters.get(bld.getClusterIdsList());
-            if (bulkLoadHFileMap == null) {
-              bulkLoadHFileMap = new HashMap<>();
-              bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
+            if(bld.getReplicate()) {
+              if (bulkLoadsPerClusters == null) {
+                bulkLoadsPerClusters = new HashMap<>();
+              }
+              // Map of table name Vs list of pair of family and list of
+              // hfile paths from its namespace
+              Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = bulkLoadsPerClusters.get(bld.getClusterIdsList());
+              if (bulkLoadHFileMap == null) {
+                bulkLoadHFileMap = new HashMap<>();
+                bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
+              }
+              buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
             }
-            buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
           } else {
             // Handle wal replication
             if (isNewRowOrType(previousCell, cell)) {
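Worth noting on the sink side: because both new proto fields are declared [default = true], descriptors written by servers that predate this change deserialize with getReplicate() == true and keep replicating; only loads that explicitly set the flag to false are filtered. A tiny hedged sketch of that contract in isolation (class and method names are invented for illustration):

    import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;

    // Invented names, for illustration only; 'bld' stands in for a descriptor
    // parsed from a replicated WAL entry, as in the sink code above.
    public final class BulkLoadReplicationGate {
      static boolean shouldShipHFiles(BulkLoadDescriptor bld) {
        // Old writers never set the field; [default = true] keeps their bulk
        // loads replicating unchanged. Only loads explicitly marked
        // replicate=false (e.g. the MOB compactor's ref files) are skipped.
        return bld.getReplicate();
      }
    }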
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
index f3d627a..1cffe05 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
@@ -85,6 +85,10 @@ public interface BulkLoadHFiles {
     throws TableNotFoundException, IOException;

   /**
+   * Disable replication for this bulkload, if bulkload replication is configured.
+   */
+  void disableReplication();
+  /**
    * Perform a bulk load of the given directory into the given pre-existing table.
    * @param tableName the table to load into
    * @param dir the directory that was provided as the output path of a job using
@@ -97,4 +101,6 @@ public interface BulkLoadHFiles {
   static BulkLoadHFiles create(Configuration conf) {
     return new BulkLoadHFilesTool(conf);
   }
+
+
 }

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
index 0e2e029..294d94b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
@@ -132,6 +132,7 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
   private String bulkToken;

   private List<String> clusterIds = new ArrayList<>();
+  private boolean replicate = true;

   public BulkLoadHFilesTool(Configuration conf) {
     // make a copy, just to be sure we're not overriding someone else's config
@@ -379,7 +380,8 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
       .collect(Collectors.toList());
     CompletableFuture<Collection<LoadQueueItem>> future = new CompletableFuture<>();
     FutureUtils.addListener(conn.bulkLoad(tableName, familyPaths, first, assignSeqIds,
-      fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds), (loaded, error) -> {
+      fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds, replicate),
+      (loaded, error) -> {
       if (error != null) {
         LOG.error("Encountered unrecoverable error from region server", error);
         if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false) &&
@@ -1052,4 +1054,9 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
     int ret = ToolRunner.run(conf, new BulkLoadHFilesTool(conf), args);
     System.exit(ret);
   }
+
+  @Override
+  public void disableReplication(){
+    this.replicate = false;
+  }
 }

diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
index 5a34457..8755749 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
@@ -144,7 +144,7 @@ public class DummyAsyncClusterConnection implements AsyncClusterConnection {
   @Override
   public CompletableFuture<Boolean> bulkLoad(TableName tableName,
       List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
-      String bulkToken, boolean copyFiles, List<String> clusterIds) {
+      String bulkToken, boolean copyFiles, List<String> clusterIds, boolean replicate) {
     return null;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
index 7a49989..b227403 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
@@ -24,10 +24,15 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
@@ -42,6 +47,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -54,14 +60,22 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
+import org.apache.hadoop.hbase.mob.compactions.TestPartitionedMobCompactor;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.TestReplicationBase;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.After;
@@ -137,7 +151,9 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     UTIL3.startMiniCluster(NUM_SLAVES1);

     TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
-      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
+        .setMobEnabled(true)
+        .setMobThreshold(4000)
         .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
       .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();

@@ -232,6 +248,23 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     assertEquals(9, BULK_LOADS_COUNT.get());
   }

+  @Test
+  public void testPartionedMOBCompactionBulkLoadDoesntReplicate() throws Exception {
+    Path path = createMobFiles(UTIL3);
+    ColumnFamilyDescriptor descriptor =
+      new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(famName);
+    PartitionedMobCompactor compactor = new PartitionedMobCompactor(UTIL3.getConfiguration(),
+      UTIL3.getTestFileSystem(), tableName, descriptor, Executors.newFixedThreadPool(1));
+    BULK_LOAD_LATCH = new CountDownLatch(1);
+    BULK_LOADS_COUNT.set(0);
+    compactor.compact(Arrays.asList(UTIL3.getTestFileSystem().listStatus(path)), true);
+    assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.SECONDS));
+    Thread.sleep(400);
+    assertEquals(1, BULK_LOADS_COUNT.get());
+
+  }
+
+
   private void assertBulkLoadConditions(byte[] row, byte[] value, HBaseTestingUtility utility,
       Table...tables) throws Exception {
     BULK_LOAD_LATCH = new CountDownLatch(3);
@@ -292,6 +325,36 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     return hFileLocation.getAbsoluteFile().getAbsolutePath();
   }

+  private Path createMobFiles(HBaseTestingUtility util) throws IOException {
+    Path testDir = FSUtils.getRootDir(util.getConfiguration());
+    Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
+    Path basePath = new Path(new Path(mobTestDir, tableName.getNameAsString()), "f");
+    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
+    MobFileName mobFileName = null;
+    byte[] mobFileStartRow = new byte[32];
+    for (byte rowKey : Bytes.toBytes("01234")) {
+      mobFileName = MobFileName.create(mobFileStartRow, MobUtils.formatDate(new Date()),
+        UUID.randomUUID().toString().replaceAll("-", ""));
+      StoreFileWriter mobFileWriter =
+        new StoreFileWriter.Builder(util.getConfiguration(),
+          new CacheConfig(util.getConfiguration()), util.getTestFileSystem()).withFileContext(meta)
+          .withFilePath(new Path(basePath, mobFileName.getFileName())).build();
+      long now = System.currentTimeMillis();
+      try {
+        for (int i = 0; i < 10; i++) {
+          byte[] key = Bytes.add(Bytes.toBytes(rowKey), Bytes.toBytes(i));
+          byte[] dummyData = new byte[5000];
+          new Random().nextBytes(dummyData);
+          mobFileWriter.append(
+            new KeyValue(key, famName, Bytes.toBytes("1"), now, KeyValue.Type.Put, dummyData));
+        }
+      } finally {
+        mobFileWriter.close();
+      }
+    }
+    return basePath;
+  }
+
   public static class BulkReplicationTestObserver implements RegionCoprocessor {

     String clusterName;

diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesSplitRecovery.java
index 1326564..b626fe8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesSplitRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesSplitRecovery.java
@@ -267,7 +267,8 @@ public class TestBulkLoadHFilesSplitRecovery {
   private static AsyncClusterConnection mockAndInjectError(AsyncClusterConnection conn) {
     AsyncClusterConnection errConn = spy(conn);
     doReturn(failedFuture(new IOException("injecting bulk load error"))).when(errConn)
-      .bulkLoad(any(), anyList(), any(), anyBoolean(), any(), any(), anyBoolean(), anyList());
+      .bulkLoad(any(), anyList(), any(), anyBoolean(), any(), any(), anyBoolean(), anyList(),
+        anyBoolean());
     return errConn;
   }