This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 38b06c7 Revert "HBASE-23136 PartionedMobFileCompactor bulkloaded files shouldn't get replicated"
38b06c7 is described below
commit 38b06c75170830f3f92eac3f03b3870e22d2c9b0
Author: Wellington Chevreuil <[email protected]>
AuthorDate: Fri Oct 11 14:45:29 2019 +0100
Revert "HBASE-23136 PartionedMobFileCompactor bulkloaded files shouldn't
get replicated"
This reverts commit fec4c5249968e112b5ab6f4154d0e6c1fe428abc.
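In effect, this revert removes the optional replicate flag that HBASE-23136
had threaded through the bulk load path (BulkLoadHFileRequest,
BulkLoadDescriptor, AsyncClusterConnection.bulkLoad, HRegion.bulkLoadHFiles)
and deletes the BulkLoadHFiles.disableReplication() API, so every bulk load is
again eligible for replication. A minimal before/after sketch, assuming a
Configuration conf and the tableName/bulkloadDirectory variables seen in the
PartitionedMobCompactor hunk below:

    // Before this revert: callers could opt a bulk load out of replication.
    BulkLoadHFiles loader = BulkLoadHFiles.create(conf);
    loader.disableReplication();   // API removed by this commit
    loader.bulkLoad(tableName, bulkloadDirectory);

    // After this revert: no opt-out, the load replicates like any other.
    BulkLoadHFiles.create(conf).bulkLoad(tableName, bulkloadDirectory);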
---
.../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 11 ++--
.../hbase/shaded/protobuf/RequestConverter.java | 5 +-
.../src/main/protobuf/Client.proto | 1 -
hbase-protocol-shaded/src/main/protobuf/WAL.proto | 1 -
.../hbase/client/AsyncClusterConnection.java | 15 +++--
.../hbase/client/AsyncClusterConnectionImpl.java | 4 +-
.../mob/compactions/PartitionedMobCompactor.java | 4 +-
.../apache/hadoop/hbase/regionserver/HRegion.java | 7 +--
.../hbase/regionserver/SecureBulkLoadManager.java | 2 +-
.../replication/regionserver/ReplicationSink.java | 23 ++++----
.../apache/hadoop/hbase/tool/BulkLoadHFiles.java | 6 --
.../hadoop/hbase/tool/BulkLoadHFilesTool.java | 9 +--
.../hbase/client/DummyAsyncClusterConnection.java | 2 +-
.../regionserver/TestBulkLoadReplication.java | 65 +---------------------
.../tool/TestBulkLoadHFilesSplitRecovery.java | 3 +-
15 files changed, 35 insertions(+), 123 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 8108217..52e3bf1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -2570,19 +2570,16 @@ public final class ProtobufUtil {
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
Map<String, Long> storeFilesSize, long bulkloadSeqId) {
return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles,
- storeFilesSize, bulkloadSeqId, null, true);
+ storeFilesSize, bulkloadSeqId, null);
}
public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
- Map<String, Long> storeFilesSize, long bulkloadSeqId,
- List<String> clusterIds, boolean replicate) {
+ Map<String, Long> storeFilesSize, long bulkloadSeqId, List<String> clusterIds) {
BulkLoadDescriptor.Builder desc =
BulkLoadDescriptor.newBuilder()
- .setTableName(ProtobufUtil.toProtoTableName(tableName))
- .setEncodedRegionName(encodedRegionName)
- .setBulkloadSeqNum(bulkloadSeqId)
- .setReplicate(replicate);
+ .setTableName(ProtobufUtil.toProtoTableName(tableName))
+ .setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId);
if(clusterIds != null) {
desc.addAllClusterIds(clusterIds);
}
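For context, the restored toBulkLoadDescriptor overload builds the WAL bulk
load marker without a replicate bit. A rough sketch of a call site, mirroring
the HRegion hunk further down (regionInfo is a stand-in for
this.getRegionInfo(); storeFiles, storeFilesSizes, seqId and clusterIds are
assumed to be computed as they are there):

    // Illustrative only; mirrors HRegion's post-revert call.
    WALProtos.BulkLoadDescriptor loadDescriptor = ProtobufUtil.toBulkLoadDescriptor(
        regionInfo.getTable(),
        UnsafeByteOperations.unsafeWrap(regionInfo.getEncodedNameAsBytes()),
        storeFiles, storeFilesSizes, seqId, clusterIds);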
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index d45423c..ae3cd3f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -568,7 +568,7 @@ public final class RequestConverter {
final byte[] regionName, boolean assignSeqNum,
final Token<?> userToken, final String bulkToken) {
return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, userToken, bulkToken,
- false, null, true);
+ false, null);
}
/**
@@ -585,7 +585,7 @@ public final class RequestConverter {
public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum,
final Token<?> userToken, final String bulkToken, boolean copyFiles,
- List<String> clusterIds, boolean replicate) {
+ List<String> clusterIds) {
RegionSpecifier region = RequestConverter.buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
@@ -626,7 +626,6 @@ public final class RequestConverter {
if (clusterIds != null) {
request.addAllClusterIds(clusterIds);
}
- request.setReplicate(replicate);
return request.build();
}
diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto
index a22c623..07d8d71 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Client.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto
@@ -379,7 +379,6 @@ message BulkLoadHFileRequest {
optional string bulk_token = 5;
optional bool copy_file = 6 [default = false];
repeated string cluster_ids = 7;
- optional bool replicate = 8 [default = true];
message FamilyPath {
required bytes family = 1;
diff --git a/hbase-protocol-shaded/src/main/protobuf/WAL.proto b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
index fd622cf..c103075 100644
--- a/hbase-protocol-shaded/src/main/protobuf/WAL.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
@@ -151,7 +151,6 @@ message BulkLoadDescriptor {
repeated StoreDescriptor stores = 3;
required int64 bulkload_seq_num = 4;
repeated string cluster_ids = 5;
- optional bool replicate = 6 [default = true];
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index 92118ac..5c57817 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -81,18 +81,17 @@ public interface AsyncClusterConnection extends AsyncConnection {
* Defined as default here to avoid breaking callers who rely on the bulkLoad version that does
* not expect additional clusterIds param.
* @param tableName the target table
- * @param familyPaths hdfs path for the the table family dirs containg files to be loaded.
- * @param row row key.
- * @param assignSeqNum seq num for the event on WAL.
- * @param userToken user token.
- * @param bulkToken bulk load token.
- * @param copyFiles flag for copying the loaded hfiles.
+ * @param familyPaths hdfs path for the the table family dirs containg files to be loaded
+ * @param row row key
+ * @param assignSeqNum seq num for the event on WAL
+ * @param userToken user token
+ * @param bulkToken bulk load token
+ * @param copyFiles flag for copying the loaded hfiles
* @param clusterIds list of cluster ids where the given bulk load has already been processed.
- * @param replicate flags if the bulkload is targeted for replication.
*/
CompletableFuture<Boolean> bulkLoad(TableName tableName, List<Pair<byte[], String>> familyPaths,
byte[] row, boolean assignSeqNum, Token<?> userToken, String bulkToken, boolean copyFiles,
- List<String> clusterIds, boolean replicate);
+ List<String> clusterIds);
/**
* Clean up after finishing bulk load, no matter success or not.
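The narrowed interface method drops the trailing boolean replicate parameter.
An illustrative call of the post-revert signature (conn and the argument names
are hypothetical):

    // Sketch only: conn is an AsyncClusterConnection held by the region server.
    CompletableFuture<Boolean> loaded = conn.bulkLoad(tableName, familyPaths,
        row, /* assignSeqNum */ true, userToken, bulkToken,
        /* copyFiles */ false, /* clusterIds */ null);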
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
index 046ef41..746c3b8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
@@ -109,13 +109,13 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu
@Override
public CompletableFuture<Boolean> bulkLoad(TableName tableName,
List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
- String bulkToken, boolean copyFiles, List<String> clusterIds, boolean replicate) {
+ String bulkToken, boolean copyFiles, List<String> clusterIds) {
return callerFactory.<Boolean> single().table(tableName).row(row)
.action((controller, loc, stub) -> ConnectionUtils
.<Void, BulkLoadHFileRequest, BulkLoadHFileResponse, Boolean> call(controller, loc, stub,
null,
(rn, nil) -> RequestConverter.buildBulkLoadHFileRequest(familyPaths, rn, assignSeqNum,
- userToken, bulkToken, copyFiles, clusterIds, replicate),
+ userToken, bulkToken, copyFiles, clusterIds),
(s, c, req, done) -> s.bulkLoadHFile(c, req, done), (c, resp) -> resp.getLoaded()))
.call();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
index dba591d..a5823ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
@@ -827,9 +827,7 @@ public class PartitionedMobCompactor extends MobCompactor {
throws IOException {
// bulkload the ref file
try {
- BulkLoadHFiles bulkLoader = BulkLoadHFiles.create(conf);
- bulkLoader.disableReplication();
- bulkLoader.bulkLoad(tableName, bulkloadDirectory);
+ BulkLoadHFiles.create(conf).bulkLoad(tableName, bulkloadDirectory);
} catch (Exception e) {
throw new IOException(e);
}
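With the revert applied, the MOB compactor's ref file bulk load is back to a
plain one-liner, with no way to keep the resulting load out of replication.
The post-revert shape of this step, consolidated from the hunk above:

    // Reconstructed from the hunk; the try/catch wrapping is as shown above.
    try {
      // No disableReplication() call any more: ref files written by the MOB
      // compaction are bulk loaded (and replicated) like any other files.
      BulkLoadHFiles.create(conf).bulkLoad(tableName, bulkloadDirectory);
    } catch (Exception e) {
      throw new IOException(e);
    }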
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index ccfc69d..fd8235d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -6146,8 +6146,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
BulkLoadListener bulkLoadListener) throws IOException {
- return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false,
- null, true);
+ return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null);
}
/**
@@ -6198,7 +6197,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
boolean assignSeqId, BulkLoadListener bulkLoadListener,
- boolean copyFile, List<String> clusterIds, boolean replicate) throws IOException {
+ boolean copyFile, List<String> clusterIds) throws IOException {
long seqId = -1;
Map<byte[], List<Path>> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
Map<String, Long> storeFilesSizes = new HashMap<>();
@@ -6373,7 +6372,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
WALProtos.BulkLoadDescriptor loadDescriptor =
ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
- storeFiles, storeFilesSizes, seqId, clusterIds, replicate);
+ storeFiles, storeFilesSizes, seqId, clusterIds);
WALUtil.writeBulkLoadMarkerAndSync(this.wal,
this.getReplicationScope(), getRegionInfo(),
loadDescriptor, mvcc);
} catch (IOException ioe) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index bccc8fe..ad19473 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -296,7 +296,7 @@ public class SecureBulkLoadManager {
//To enable access prior to staging
return region.bulkLoadHFiles(familyPaths, true,
new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(),
- clusterIds, request.getReplicate());
+ clusterIds);
} catch (Exception e) {
LOG.error("Failed to complete bulk load", e);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 51cbea8..6c3f70c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -203,19 +203,18 @@ public class ReplicationSink {
// Handle bulk load hfiles replication
if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
- if(bld.getReplicate()) {
- if (bulkLoadsPerClusters == null) {
- bulkLoadsPerClusters = new HashMap<>();
- }
- // Map of table name Vs list of pair of family and list of
- // hfile paths from its namespace
- Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = bulkLoadsPerClusters.get(bld.getClusterIdsList());
- if (bulkLoadHFileMap == null) {
- bulkLoadHFileMap = new HashMap<>();
- bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
- }
- buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
+ if(bulkLoadsPerClusters == null) {
+ bulkLoadsPerClusters = new HashMap<>();
+ }
+ // Map of table name Vs list of pair of family and list of
+ // hfile paths from its namespace
+ Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
+ bulkLoadsPerClusters.get(bld.getClusterIdsList());
+ if (bulkLoadHFileMap == null) {
+ bulkLoadHFileMap = new HashMap<>();
+ bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
}
+ buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
} else {
// Handle wal replication
if (isNewRowOrType(previousCell, cell)) {
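Sink side, the change above removes the bld.getReplicate() guard: every
BulkLoadDescriptor found in a replicated WALEdit is now grouped by its source
cluster id list and processed. An equivalent sketch of the restored grouping
using computeIfAbsent (an illustrative rewrite, not the committed code):

    // bld is the BulkLoadDescriptor pulled from the WAL cell above.
    if (bulkLoadsPerClusters == null) {
      bulkLoadsPerClusters = new HashMap<>();
    }
    Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
        bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(),
            ids -> new HashMap<>());
    buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);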
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
index 1cffe05..f3d627a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
@@ -85,10 +85,6 @@ public interface BulkLoadHFiles {
throws TableNotFoundException, IOException;
/**
- * Disable replication for this bulkload, if bulkload replication is configured.
- */
- void disableReplication();
- /**
* Perform a bulk load of the given directory into the given pre-existing table.
* @param tableName the table to load into
* @param dir the directory that was provided as the output path of a job using
@@ -101,6 +97,4 @@ public interface BulkLoadHFiles {
static BulkLoadHFiles create(Configuration conf) {
return new BulkLoadHFilesTool(conf);
}
-
-
}
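After the revert, BulkLoadHFiles is back to a plain load-this-directory
interface. A hedged usage sketch (the table name and path are hypothetical;
per the Javadoc above, dir should be the output directory of an
HFile-producing job):

    BulkLoadHFiles loader = BulkLoadHFiles.create(conf);
    loader.bulkLoad(TableName.valueOf("my_table"),   // pre-existing table
        new Path("/tmp/hfile-job-output"));          // job output directory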
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
index 294d94b..0e2e029 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
@@ -132,7 +132,6 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
private String bulkToken;
private List<String> clusterIds = new ArrayList<>();
- private boolean replicate = true;
public BulkLoadHFilesTool(Configuration conf) {
// make a copy, just to be sure we're not overriding someone else's config
@@ -380,8 +379,7 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
.collect(Collectors.toList());
CompletableFuture<Collection<LoadQueueItem>> future = new CompletableFuture<>();
FutureUtils.addListener(conn.bulkLoad(tableName, familyPaths, first, assignSeqIds,
- fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds, replicate),
- (loaded, error) -> {
+ fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds), (loaded, error) -> {
if (error != null) {
LOG.error("Encountered unrecoverable error from region server",
error);
if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false) &&
@@ -1054,9 +1052,4 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
int ret = ToolRunner.run(conf, new BulkLoadHFilesTool(conf), args);
System.exit(ret);
}
-
- @Override
- public void disableReplication(){
- this.replicate = false;
- }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
index 8755749..5a34457 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
@@ -144,7 +144,7 @@ public class DummyAsyncClusterConnection implements AsyncClusterConnection {
@Override
public CompletableFuture<Boolean> bulkLoad(TableName tableName,
List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
- String bulkToken, boolean copyFiles, List<String> clusterIds, boolean replicate) {
+ String bulkToken, boolean copyFiles, List<String> clusterIds) {
return null;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
index b227403..7a49989 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
@@ -24,15 +24,10 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Random;
-import java.util.UUID;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
@@ -47,7 +42,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -60,22 +54,14 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
-import org.apache.hadoop.hbase.mob.MobConstants;
-import org.apache.hadoop.hbase.mob.MobFileName;
-import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
-import org.apache.hadoop.hbase.mob.compactions.TestPartitionedMobCompactor;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
@@ -151,9 +137,7 @@ public class TestBulkLoadReplication extends TestReplicationBase {
UTIL3.startMiniCluster(NUM_SLAVES1);
TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
- .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
- .setMobEnabled(true)
- .setMobThreshold(4000)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
@@ -248,23 +232,6 @@ public class TestBulkLoadReplication extends TestReplicationBase {
assertEquals(9, BULK_LOADS_COUNT.get());
}
- @Test
- public void testPartionedMOBCompactionBulkLoadDoesntReplicate() throws Exception {
- Path path = createMobFiles(UTIL3);
- ColumnFamilyDescriptor descriptor =
- new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(famName);
- PartitionedMobCompactor compactor = new PartitionedMobCompactor(UTIL3.getConfiguration(),
- UTIL3.getTestFileSystem(), tableName, descriptor, Executors.newFixedThreadPool(1));
- BULK_LOAD_LATCH = new CountDownLatch(1);
- BULK_LOADS_COUNT.set(0);
- compactor.compact(Arrays.asList(UTIL3.getTestFileSystem().listStatus(path)), true);
- assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.SECONDS));
- Thread.sleep(400);
- assertEquals(1, BULK_LOADS_COUNT.get());
-
- }
-
-
private void assertBulkLoadConditions(byte[] row, byte[] value,
HBaseTestingUtility utility, Table...tables) throws Exception {
BULK_LOAD_LATCH = new CountDownLatch(3);
@@ -325,36 +292,6 @@ public class TestBulkLoadReplication extends TestReplicationBase {
return hFileLocation.getAbsoluteFile().getAbsolutePath();
}
- private Path createMobFiles(HBaseTestingUtility util) throws IOException {
- Path testDir = FSUtils.getRootDir(util.getConfiguration());
- Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
- Path basePath = new Path(new Path(mobTestDir, tableName.getNameAsString()), "f");
- HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
- MobFileName mobFileName = null;
- byte[] mobFileStartRow = new byte[32];
- for (byte rowKey : Bytes.toBytes("01234")) {
- mobFileName = MobFileName.create(mobFileStartRow, MobUtils.formatDate(new Date()),
- UUID.randomUUID().toString().replaceAll("-", ""));
- StoreFileWriter mobFileWriter =
- new StoreFileWriter.Builder(util.getConfiguration(),
- new CacheConfig(util.getConfiguration()), util.getTestFileSystem()).withFileContext(meta)
- .withFilePath(new Path(basePath, mobFileName.getFileName())).build();
- long now = System.currentTimeMillis();
- try {
- for (int i = 0; i < 10; i++) {
- byte[] key = Bytes.add(Bytes.toBytes(rowKey), Bytes.toBytes(i));
- byte[] dummyData = new byte[5000];
- new Random().nextBytes(dummyData);
- mobFileWriter.append(
- new KeyValue(key, famName, Bytes.toBytes("1"), now, KeyValue.Type.Put, dummyData));
- }
- } finally {
- mobFileWriter.close();
- }
- }
- return basePath;
- }
-
public static class BulkReplicationTestObserver implements RegionCoprocessor {
String clusterName;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesSplitRecovery.java
index b626fe8..1326564 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesSplitRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesSplitRecovery.java
@@ -267,8 +267,7 @@ public class TestBulkLoadHFilesSplitRecovery {
private static AsyncClusterConnection mockAndInjectError(AsyncClusterConnection conn) {
AsyncClusterConnection errConn = spy(conn);
doReturn(failedFuture(new IOException("injecting bulk load error"))).when(errConn)
- .bulkLoad(any(), anyList(), any(), anyBoolean(), any(), any(), anyBoolean(), anyList(),
- anyBoolean());
+ .bulkLoad(any(), anyList(), any(), anyBoolean(), any(), any(), anyBoolean(), anyList());
return errConn;
}