This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 5cf728da5a0 HBASE-26969:Eliminate MOB renames when SFT is enabled
(#4418)
5cf728da5a0 is described below
commit 5cf728da5a0fea5fd94bb31f06c9a43158ace6b4
Author: BukrosSzabolcs <[email protected]>
AuthorDate: Tue Jun 21 11:18:55 2022 +0200
HBASE-26969:Eliminate MOB renames when SFT is enabled (#4418)
Signed-off-by: Wellington Chevreuil <[email protected]>
---
.../hadoop/hbase/mob/DefaultMobStoreCompactor.java | 29 ++-
.../hadoop/hbase/mob/DefaultMobStoreFlusher.java | 16 +-
.../hadoop/hbase/mob/MobFileCleanerChore.java | 44 ++--
.../java/org/apache/hadoop/hbase/mob/MobUtils.java | 31 ++-
.../hadoop/hbase/mob/RSMobFileCleanerChore.java | 272 +++++++++++++++++++++
.../hadoop/hbase/regionserver/HMobStore.java | 39 ++-
.../hadoop/hbase/regionserver/HRegionServer.java | 15 ++
.../apache/hadoop/hbase/regionserver/HStore.java | 2 +-
.../hbase/regionserver/compactions/Compactor.java | 23 +-
.../hadoop/hbase/mob/FaultyMobStoreCompactor.java | 6 +-
.../org/apache/hadoop/hbase/mob/MobTestUtil.java | 22 ++
.../hbase/mob/TestDefaultMobStoreFlusher.java | 38 ++-
.../hadoop/hbase/mob/TestMobCompactionOptMode.java | 11 +-
.../mob/TestMobCompactionOptRegionBatchMode.java | 14 +-
.../TestMobCompactionRegularRegionBatchMode.java | 13 +-
.../hbase/mob/TestMobCompactionWithDefaults.java | 57 +++--
.../hadoop/hbase/mob/TestMobFileCleanerChore.java | 29 ++-
.../hadoop/hbase/mob/TestMobStoreCompaction.java | 29 ++-
.../org/apache/hadoop/hbase/mob/TestMobUtils.java | 5 +
...erChore.java => TestRSMobFileCleanerChore.java} | 85 +++++--
20 files changed, 664 insertions(+), 116 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
index 57a991e45f3..f568af0b19a 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
@@ -293,20 +293,20 @@ public class DefaultMobStoreCompactor extends
DefaultCompactor {
* @param cleanSeqId When true, remove seqId(used to be mvcc)
value which is <=
* smallestReadPoint
* @param throughputController The compaction throughput controller.
- * @param major Is a major compaction.
- * @param numofFilesToCompact the number of files to compact
+ * @param request compaction request.
* @param progress Progress reporter.
* @return Whether compaction ended; false if it was interrupted for any
reason.
*/
@Override
protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
CellSink writer,
long smallestReadPoint, boolean cleanSeqId, ThroughputController
throughputController,
- boolean major, int numofFilesToCompact, CompactionProgress progress)
throws IOException {
+ CompactionRequestImpl request, CompactionProgress progress) throws
IOException {
long bytesWrittenProgressForLog = 0;
long bytesWrittenProgressForShippedCall = 0;
// Clear old mob references
mobRefSet.get().clear();
boolean isUserRequest = userRequest.get();
+ boolean major = request.isAllFiles();
boolean compactMOBs = major && isUserRequest;
boolean discardMobMiss =
conf.getBoolean(MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY,
MobConstants.DEFAULT_MOB_DISCARD_MISS);
@@ -350,12 +350,12 @@ public class DefaultMobStoreCompactor extends
DefaultCompactor {
throughputController.start(compactionName);
KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ?
(KeyValueScanner) scanner : null;
long shippedCallSizeLimit =
- (long) numofFilesToCompact *
this.store.getColumnFamilyDescriptor().getBlocksize();
+ (long) request.getFiles().size() *
this.store.getColumnFamilyDescriptor().getBlocksize();
Cell mobCell = null;
try {
- mobFileWriter = newMobWriter(fd, major);
+ mobFileWriter = newMobWriter(fd, major,
request.getWriterCreationTracker());
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
do {
@@ -435,7 +435,7 @@ public class DefaultMobStoreCompactor extends
DefaultCompactor {
LOG.debug("Closing output MOB File, length={} file={},
store={}", len,
mobFileWriter.getPath().getName(), getStoreInfo());
commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId,
mobCells, major);
- mobFileWriter = newMobWriter(fd, major);
+ mobFileWriter = newMobWriter(fd, major,
request.getWriterCreationTracker());
fileName =
Bytes.toBytes(mobFileWriter.getPath().getName());
mobCells = 0;
}
@@ -479,7 +479,7 @@ public class DefaultMobStoreCompactor extends
DefaultCompactor {
long len = mobFileWriter.getPos();
if (len > maxMobFileSize) {
commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId,
mobCells, major);
- mobFileWriter = newMobWriter(fd, major);
+ mobFileWriter = newMobWriter(fd, major,
request.getWriterCreationTracker());
fileName =
Bytes.toBytes(mobFileWriter.getPath().getName());
mobCells = 0;
}
@@ -531,7 +531,7 @@ public class DefaultMobStoreCompactor extends
DefaultCompactor {
long len = mobFileWriter.getPos();
if (len > maxMobFileSize) {
commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells,
major);
- mobFileWriter = newMobWriter(fd, major);
+ mobFileWriter = newMobWriter(fd, major,
request.getWriterCreationTracker());
fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
mobCells = 0;
}
@@ -617,11 +617,16 @@ public class DefaultMobStoreCompactor extends
DefaultCompactor {
}
}
- private StoreFileWriter newMobWriter(FileDetails fd, boolean major) throws
IOException {
+ private StoreFileWriter newMobWriter(FileDetails fd, boolean major,
+ Consumer<Path> writerCreationTracker) throws IOException {
try {
- StoreFileWriter mobFileWriter = mobStore.createWriterInTmp(new
Date(fd.latestPutTs),
- fd.maxKeyCount, major ? majorCompactionCompression :
minorCompactionCompression,
- store.getRegionInfo().getStartKey(), true);
+ StoreFileWriter mobFileWriter =
mobStore.getStoreEngine().requireWritingToTmpDirFirst()
+ ? mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount,
+ major ? majorCompactionCompression : minorCompactionCompression,
+ store.getRegionInfo().getStartKey(), true)
+ : mobStore.createWriter(new Date(fd.latestPutTs), fd.maxKeyCount,
+ major ? majorCompactionCompression : minorCompactionCompression,
+ store.getRegionInfo().getStartKey(), true, writerCreationTracker);
LOG.debug("New MOB writer created={} store={}",
mobFileWriter.getPath().getName(),
getStoreInfo());
// Add reference we get for compact MOB
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
index a7f2ecdf242..6df17e58e22 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
@@ -127,7 +127,8 @@ public class DefaultMobStoreFlusher extends
DefaultStoreFlusher {
try {
// It's a mob store, flush the cells in a mob way. This is the
difference of flushing
// between a normal and a mob store.
- performMobFlush(snapshot, cacheFlushId, scanner, writer, status,
throughputController);
+ performMobFlush(snapshot, cacheFlushId, scanner, writer, status,
throughputController,
+ writerCreationTracker);
} catch (IOException ioe) {
e = ioe;
// throw the exception out
@@ -171,16 +172,21 @@ public class DefaultMobStoreFlusher extends
DefaultStoreFlusher {
*/
protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId,
InternalScanner scanner, StoreFileWriter writer, MonitoredTask status,
- ThroughputController throughputController) throws IOException {
+ ThroughputController throughputController, Consumer<Path>
writerCreationTracker)
+ throws IOException {
StoreFileWriter mobFileWriter = null;
int compactionKVMax =
conf.getInt(HConstants.COMPACTION_KV_MAX,
HConstants.COMPACTION_KV_MAX_DEFAULT);
long mobCount = 0;
long mobSize = 0;
long time = snapshot.getTimeRangeTracker().getMax();
- mobFileWriter = mobStore.createWriterInTmp(new Date(time),
snapshot.getCellsCount(),
- store.getColumnFamilyDescriptor().getCompressionType(),
store.getRegionInfo().getStartKey(),
- false);
+ mobFileWriter = mobStore.getStoreEngine().requireWritingToTmpDirFirst()
+ ? mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(),
+ store.getColumnFamilyDescriptor().getCompressionType(),
store.getRegionInfo().getStartKey(),
+ false)
+ : mobStore.createWriter(new Date(time), snapshot.getCellsCount(),
+ store.getColumnFamilyDescriptor().getCompressionType(),
store.getRegionInfo().getStartKey(),
+ false, writerCreationTracker);
// the target path is {tableName}/.mob/{cfName}/mobFiles
// the relative path is mobFiles
byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
index 1258a17a8eb..2c78c6f5ac7 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCleanerChore.java
@@ -160,14 +160,15 @@ public class MobFileCleanerChore extends ScheduledChore {
maxCreationTimeToArchive, table);
}
+ FileSystem fs = FileSystem.get(conf);
+ Set<String> regionNames = new HashSet<>();
Path rootDir = CommonFSUtils.getRootDir(conf);
Path tableDir = CommonFSUtils.getTableDir(rootDir, table);
- // How safe is this call?
- List<Path> regionDirs = FSUtils.getRegionDirs(FileSystem.get(conf),
tableDir);
+ List<Path> regionDirs = FSUtils.getRegionDirs(fs, tableDir);
Set<String> allActiveMobFileName = new HashSet<String>();
- FileSystem fs = FileSystem.get(conf);
for (Path regionPath : regionDirs) {
+ regionNames.add(regionPath.getName());
for (ColumnFamilyDescriptor hcd : list) {
String family = hcd.getNameAsString();
Path storePath = new Path(regionPath, family);
@@ -195,13 +196,26 @@ public class MobFileCleanerChore extends ScheduledChore {
for (Path pp : storeFiles) {
currentPath = pp;
LOG.trace("Store file: {}", pp);
- HStoreFile sf =
- new HStoreFile(fs, pp, conf, CacheConfig.DISABLED,
BloomType.NONE, true);
- sf.initReader();
- byte[] mobRefData =
sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
- byte[] bulkloadMarkerData =
sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
- // close store file to avoid memory leaks
- sf.closeStoreFile(true);
+ HStoreFile sf = null;
+ byte[] mobRefData = null;
+ byte[] bulkloadMarkerData = null;
+ try {
+ sf = new HStoreFile(fs, pp, conf, CacheConfig.DISABLED,
BloomType.NONE, true);
+ sf.initReader();
+ mobRefData = sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
+ bulkloadMarkerData =
sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
+ // close store file to avoid memory leaks
+ sf.closeStoreFile(true);
+ } catch (IOException ex) {
+ // When FileBased SFT is active the store dir can contain
corrupted or incomplete
+ // files. So read errors are expected. We just skip these
files.
+ if (ex instanceof FileNotFoundException) {
+ throw ex;
+ }
+ LOG.debug("Failed to get mob data from file: {} due to
error.", pp.toString(),
+ ex);
+ continue;
+ }
if (mobRefData == null) {
if (bulkloadMarkerData == null) {
LOG.warn("Found old store file with no MOB_FILE_REFS: {} -
"
@@ -256,9 +270,11 @@ public class MobFileCleanerChore extends ScheduledChore {
while (rit.hasNext()) {
LocatedFileStatus lfs = rit.next();
Path p = lfs.getPath();
- if (!allActiveMobFileName.contains(p.getName())) {
- // MOB is not in a list of active references, but it can be too
- // fresh, skip it in this case
+ String[] mobParts = p.getName().split("_");
+ String regionName = mobParts[mobParts.length - 1];
+
+ if (!regionNames.contains(regionName)) {
+ // MOB belonged to a region no longer hosted
long creationTime = fs.getFileStatus(p).getModificationTime();
if (creationTime < maxCreationTimeToArchive) {
LOG.trace("Archiving MOB file {} creation time={}", p,
@@ -269,7 +285,7 @@ public class MobFileCleanerChore extends ScheduledChore {
fs.getFileStatus(p).getModificationTime());
}
} else {
- LOG.trace("Keeping active MOB file: {}", p);
+ LOG.trace("Keeping MOB file with existing region: {}", p);
}
}
LOG.info(" MOB Cleaner found {} files to archive for table={}
family={}", toArchive.size(),
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
index 5e2ee9eb411..43cf4255235 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java
@@ -33,6 +33,7 @@ import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
+import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -584,6 +585,33 @@ public final class MobUtils {
CacheConfig cacheConfig, Encryption.Context cryptoContext, ChecksumType
checksumType,
int bytesPerChecksum, int blocksize, BloomType bloomType, boolean
isCompaction)
throws IOException {
+ return createWriter(conf, fs, family, path, maxKeyCount, compression,
cacheConfig,
+ cryptoContext, checksumType, bytesPerChecksum, blocksize, bloomType,
isCompaction, null);
+ }
+
+ /**
+ * Creates a writer for the mob file in temp directory.
+ * @param conf The current configuration.
+ * @param fs The current file system.
+ * @param family The descriptor of the current column family.
+ * @param path The path for a temp directory.
+ * @param maxKeyCount The key count.
+ * @param compression The compression algorithm.
+ * @param cacheConfig The current cache config.
+ * @param cryptoContext The encryption context.
+ * @param checksumType The checksum type.
+ * @param bytesPerChecksum The bytes per checksum.
+ * @param blocksize The HFile block size.
+ * @param bloomType The bloom filter type.
+ * @param isCompaction If the writer is used in compaction.
+ * @param writerCreationTracker to track the current writer in the store
+ * @return The writer for the mob file.
+ */
+ public static StoreFileWriter createWriter(Configuration conf, FileSystem fs,
+ ColumnFamilyDescriptor family, Path path, long maxKeyCount,
Compression.Algorithm compression,
+ CacheConfig cacheConfig, Encryption.Context cryptoContext, ChecksumType
checksumType,
+ int bytesPerChecksum, int blocksize, BloomType bloomType, boolean
isCompaction,
+ Consumer<Path> writerCreationTracker) throws IOException {
if (compression == null) {
compression = HFile.DEFAULT_COMPRESSION_ALGORITHM;
}
@@ -602,7 +630,8 @@ public final class MobUtils {
.withCreateTime(EnvironmentEdgeManager.currentTime()).build();
StoreFileWriter w = new StoreFileWriter.Builder(conf, writerCacheConf,
fs).withFilePath(path)
-
.withBloomType(bloomType).withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build();
+
.withBloomType(bloomType).withMaxKeyCount(maxKeyCount).withFileContext(hFileContext)
+ .withWriterCreationTracker(writerCreationTracker).build();
return w;
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/RSMobFileCleanerChore.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/RSMobFileCleanerChore.java
new file mode 100644
index 00000000000..06e34988733
--- /dev/null
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/RSMobFileCleanerChore.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mob;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.HFileArchiver;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.SetMultimap;
+
+/**
+ * The class RSMobFileCleanerChore for running cleaner regularly to remove the
obsolete (files which
+ * have no active references to) mob files that were referenced from the
current RS.
+ */
[email protected]
+public class RSMobFileCleanerChore extends ScheduledChore {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(RSMobFileCleanerChore.class);
+ private final HRegionServer rs;
+
+ public RSMobFileCleanerChore(HRegionServer rs) {
+ super(rs.getServerName() + "-MobFileCleanerChore", rs,
+ rs.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD,
+ MobConstants.DEFAULT_MOB_CLEANER_PERIOD),
+ Math.round(rs.getConfiguration().getInt(MobConstants.MOB_CLEANER_PERIOD,
+ MobConstants.DEFAULT_MOB_CLEANER_PERIOD)
+ * ((ThreadLocalRandom.current().nextDouble() + 0.5D))),
+ TimeUnit.SECONDS);
+ // to prevent a load spike on the fs the initial delay is modified by +/-
50%
+ this.rs = rs;
+ }
+
+ public RSMobFileCleanerChore() {
+ this.rs = null;
+ }
+
+ @Override
+ protected void chore() {
+
+ long minAgeToArchive =
rs.getConfiguration().getLong(MobConstants.MIN_AGE_TO_ARCHIVE_KEY,
+ MobConstants.DEFAULT_MIN_AGE_TO_ARCHIVE);
+ // We check only those MOB files, which creation time is less
+ // than maxCreationTimeToArchive. This is a current time - 1h. 1 hour gap
+ // gives us full confidence that all corresponding store files will
+ // exist at the time cleaning procedure begins and will be examined.
+ // So, if MOB file creation time is greater than this maxTimeToArchive,
+ // this will be skipped and won't be archived.
+ long maxCreationTimeToArchive = EnvironmentEdgeManager.currentTime() -
minAgeToArchive;
+
+ TableDescriptors htds = rs.getTableDescriptors();
+ try {
+ FileSystem fs = FileSystem.get(rs.getConfiguration());
+
+ Map<String, TableDescriptor> map = null;
+ try {
+ map = htds.getAll();
+ } catch (IOException e) {
+ LOG.error("MobFileCleanerChore failed", e);
+ return;
+ }
+ Map<String, Map<String, List<String>>> referencedMOBs = new HashMap<>();
+ for (TableDescriptor htd : map.values()) {
+ // Now clean obsolete files for a table
+ LOG.info("Cleaning obsolete MOB files from table={}",
htd.getTableName());
+ List<ColumnFamilyDescriptor> list = MobUtils.getMobColumnFamilies(htd);
+ List<HRegion> regions = rs.getRegions(htd.getTableName());
+ for (HRegion region : regions) {
+ for (ColumnFamilyDescriptor hcd : list) {
+ HStore store = region.getStore(hcd.getName());
+ Collection<HStoreFile> sfs = store.getStorefiles();
+ Set<String> regionMobs = new HashSet<String>();
+ Path currentPath = null;
+ try {
+ // collectinng referenced MOBs
+ for (HStoreFile sf : sfs) {
+ currentPath = sf.getPath();
+ sf.initReader();
+ byte[] mobRefData =
sf.getMetadataValue(HStoreFile.MOB_FILE_REFS);
+ byte[] bulkloadMarkerData =
sf.getMetadataValue(HStoreFile.BULKLOAD_TASK_KEY);
+ // close store file to avoid memory leaks
+ sf.closeStoreFile(true);
+ if (mobRefData == null) {
+ if (bulkloadMarkerData == null) {
+ LOG.warn(
+ "Found old store file with no MOB_FILE_REFS: {} - "
+ + "can not proceed until all old files will be
MOB-compacted.",
+ currentPath);
+ return;
+ } else {
+ LOG.debug("Skipping file without MOB references
(bulkloaded file):{}",
+ currentPath);
+ continue;
+ }
+ }
+ // file may or may not have MOB references, but was created by
the distributed
+ // mob compaction code.
+ try {
+ SetMultimap<TableName, String> mobs =
+ MobUtils.deserializeMobFileRefs(mobRefData).build();
+ LOG.debug("Found {} mob references for store={}",
mobs.size(), sf);
+ LOG.trace("Specific mob references found for store={} : {}",
sf, mobs);
+ regionMobs.addAll(mobs.values());
+ } catch (RuntimeException exception) {
+ throw new IOException("failure getting mob references for
hfile " + sf,
+ exception);
+ }
+ }
+ // collecting files, MOB included currently being written
+ regionMobs.addAll(store.getStoreFilesBeingWritten().stream()
+ .map(path -> path.getName()).collect(Collectors.toList()));
+
+ referencedMOBs
+ .computeIfAbsent(hcd.getNameAsString(), cf -> new
HashMap<String, List<String>>())
+ .computeIfAbsent(region.getRegionInfo().getEncodedName(), name
-> new ArrayList<>())
+ .addAll(regionMobs);
+
+ } catch (FileNotFoundException e) {
+ LOG.warn(
+ "Missing file:{} Starting MOB cleaning cycle from the
beginning" + " due to error",
+ currentPath, e);
+ regionMobs.clear();
+ continue;
+ } catch (IOException e) {
+ LOG.error("Failed to clean the obsolete mob files for table={}",
+ htd.getTableName().getNameAsString(), e);
+ }
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Found: {} active mob refs for table={}",
+ referencedMOBs.values().stream().map(inner -> inner.values())
+ .flatMap(lists -> lists.stream()).mapToInt(lists ->
lists.size()).sum(),
+ htd.getTableName().getNameAsString());
+ }
+ if (LOG.isTraceEnabled()) {
+ referencedMOBs.values().stream().forEach(innerMap ->
innerMap.values().stream()
+ .forEach(mobFileList -> mobFileList.stream().forEach(LOG::trace)));
+ }
+
+ // collect regions referencing MOB files belonging to the current rs
+ Set<String> regionsCovered = new HashSet<>();
+ referencedMOBs.values().stream()
+ .forEach(regionMap -> regionsCovered.addAll(regionMap.keySet()));
+
+ for (ColumnFamilyDescriptor hcd : list) {
+ List<Path> toArchive = new ArrayList<Path>();
+ String family = hcd.getNameAsString();
+ Path dir = MobUtils.getMobFamilyPath(rs.getConfiguration(),
htd.getTableName(), family);
+ RemoteIterator<LocatedFileStatus> rit = fs.listLocatedStatus(dir);
+ while (rit.hasNext()) {
+ LocatedFileStatus lfs = rit.next();
+ Path p = lfs.getPath();
+ String[] mobParts = p.getName().split("_");
+ String regionName = mobParts[mobParts.length - 1];
+
+ // skip MOB files not belonging to a region assigned to the
current rs
+ if (!regionsCovered.contains(regionName)) {
+ LOG.trace("MOB file does not belong to current rs: {}", p);
+ continue;
+ }
+
+ // check active or actively written mob files
+ Map<String, List<String>> cfMobs =
referencedMOBs.get(hcd.getNameAsString());
+ if (
+ cfMobs != null && cfMobs.get(regionName) != null
+ && cfMobs.get(regionName).contains(p.getName())
+ ) {
+ LOG.trace("Keeping active MOB file: {}", p);
+ continue;
+ }
+
+ // MOB is not in a list of active references, but it can be too
+ // fresh, skip it in this case
+ long creationTime = fs.getFileStatus(p).getModificationTime();
+ if (creationTime < maxCreationTimeToArchive) {
+ LOG.trace("Archiving MOB file {} creation time={}", p,
+ (fs.getFileStatus(p).getModificationTime()));
+ toArchive.add(p);
+ } else {
+ LOG.trace("Skipping fresh file: {}. Creation time={}", p,
+ fs.getFileStatus(p).getModificationTime());
+ }
+
+ }
+ LOG.info(" MOB Cleaner found {} files to archive for table={}
family={}",
+ toArchive.size(), htd.getTableName().getNameAsString(), family);
+ archiveMobFiles(rs.getConfiguration(), htd.getTableName(),
family.getBytes(), toArchive);
+ LOG.info(" MOB Cleaner archived {} files, table={} family={}",
toArchive.size(),
+ htd.getTableName().getNameAsString(), family);
+ }
+
+ LOG.info("Cleaning obsolete MOB files finished for table={}",
htd.getTableName());
+
+ }
+ } catch (IOException e) {
+ LOG.error("MOB Cleaner failed when trying to access the file system", e);
+ }
+ }
+
+ /**
+ * Archives the mob files.
+ * @param conf The current configuration.
+ * @param tableName The table name.
+ * @param family The name of the column family.
+ * @param storeFiles The files to be archived.
+ * @throws IOException exception
+ */
+ public void archiveMobFiles(Configuration conf, TableName tableName, byte[]
family,
+ List<Path> storeFiles) throws IOException {
+
+ if (storeFiles.size() == 0) {
+ // nothing to remove
+ LOG.debug("Skipping archiving old MOB files - no files found for
table={} cf={}", tableName,
+ Bytes.toString(family));
+ return;
+ }
+ Path mobTableDir = CommonFSUtils.getTableDir(MobUtils.getMobHome(conf),
tableName);
+ FileSystem fs = storeFiles.get(0).getFileSystem(conf);
+
+ for (Path p : storeFiles) {
+ LOG.debug("MOB Cleaner is archiving: {}", p);
+ HFileArchiver.archiveStoreFile(conf, fs,
MobUtils.getMobRegionInfo(tableName), mobTableDir,
+ family, p);
+ }
+ }
+}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
index 931357a9fc5..13b7cc022bb 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@ -28,6 +28,7 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -184,7 +185,27 @@ public class HMobStore extends HStore {
}
Path path = getTempDir();
return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount,
compression, startKey,
- isCompaction);
+ isCompaction, null);
+ }
+
+ /**
+ * Creates the writer for the mob file in the mob family directory.
+ * @param date The latest date of written cells.
+ * @param maxKeyCount The key count.
+ * @param compression The compression algorithm.
+ * @param startKey The start key.
+ * @param isCompaction If the writer is used in compaction.
+ * @return The writer for the mob file. n
+ */
+ public StoreFileWriter createWriter(Date date, long maxKeyCount,
+ Compression.Algorithm compression, byte[] startKey, boolean isCompaction,
+ Consumer<Path> writerCreationTracker) throws IOException {
+ if (startKey == null) {
+ startKey = HConstants.EMPTY_START_ROW;
+ }
+ Path path = getPath();
+ return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount,
compression, startKey,
+ isCompaction, writerCreationTracker);
}
/**
@@ -198,11 +219,13 @@ public class HMobStore extends HStore {
* @return The writer for the mob file. n
*/
public StoreFileWriter createWriterInTmp(String date, Path basePath, long
maxKeyCount,
- Compression.Algorithm compression, byte[] startKey, boolean isCompaction)
throws IOException {
+ Compression.Algorithm compression, byte[] startKey, boolean isCompaction,
+ Consumer<Path> writerCreationTracker) throws IOException {
MobFileName mobFileName =
MobFileName.create(startKey, date,
UUID.randomUUID().toString().replaceAll("-", ""),
getHRegion().getRegionInfo().getEncodedName());
- return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression,
isCompaction);
+ return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression,
isCompaction,
+ writerCreationTracker);
}
/**
@@ -214,13 +237,15 @@ public class HMobStore extends HStore {
* @param isCompaction If the writer is used in compaction.
* @return The writer for the mob file. n
*/
+
public StoreFileWriter createWriterInTmp(MobFileName mobFileName, Path
basePath, long maxKeyCount,
- Compression.Algorithm compression, boolean isCompaction) throws
IOException {
+ Compression.Algorithm compression, boolean isCompaction, Consumer<Path>
writerCreationTracker)
+ throws IOException {
return MobUtils.createWriter(conf, getFileSystem(),
getColumnFamilyDescriptor(),
new Path(basePath, mobFileName.getFileName()), maxKeyCount, compression,
getCacheConfig(),
getStoreContext().getEncryptionContext(),
StoreUtils.getChecksumType(conf),
StoreUtils.getBytesPerChecksum(conf), getStoreContext().getBlockSize(),
BloomType.NONE,
- isCompaction);
+ isCompaction, writerCreationTracker);
}
/**
@@ -234,6 +259,10 @@ public class HMobStore extends HStore {
}
Path dstPath = new Path(targetPath, sourceFile.getName());
validateMobFile(sourceFile);
+ if (sourceFile.equals(targetPath)) {
+ LOG.info("File is already in the destination dir: {}", sourceFile);
+ return;
+ }
LOG.info(" FLUSH Renaming flushed file from {} to {}", sourceFile,
dstPath);
Path parent = dstPath.getParent();
if (!getFileSystem().exists(parent)) {
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 1865929dc5e..e79f4bec612 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -108,6 +108,7 @@ import
org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.mob.MobFileCache;
+import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
@@ -438,6 +439,8 @@ public class HRegionServer extends
HBaseServerBase<RSRpcServices>
private BrokenStoreFileCleaner brokenStoreFileCleaner;
+ private RSMobFileCleanerChore rsMobFileCleanerChore;
+
@InterfaceAudience.Private
CompactedHFilesDischarger compactedFileDischarger;
@@ -1898,6 +1901,10 @@ public class HRegionServer extends
HBaseServerBase<RSRpcServices>
choreService.scheduleChore(brokenStoreFileCleaner);
}
+ if (this.rsMobFileCleanerChore != null) {
+ choreService.scheduleChore(rsMobFileCleanerChore);
+ }
+
// Leases is not a Thread. Internally it runs a daemon thread. If it gets
// an unhandled exception, it will just exit.
Threads.setDaemonThreadRunning(this.leaseManager, getName() +
".leaseChecker",
@@ -1994,6 +2001,8 @@ public class HRegionServer extends
HBaseServerBase<RSRpcServices>
new BrokenStoreFileCleaner((int) (brokenStoreFileCleanerDelay +
jitterValue),
brokenStoreFileCleanerPeriod, this, conf, this);
+ this.rsMobFileCleanerChore = new RSMobFileCleanerChore(this);
+
registerConfigurationObservers();
}
@@ -3550,6 +3559,11 @@ public class HRegionServer extends
HBaseServerBase<RSRpcServices>
return brokenStoreFileCleaner;
}
+ @InterfaceAudience.Private
+ public RSMobFileCleanerChore getRSMobFileCleanerChore() {
+ return rsMobFileCleanerChore;
+ }
+
RSSnapshotVerifier getRsSnapshotVerifier() {
return rsSnapshotVerifier;
}
@@ -3566,6 +3580,7 @@ public class HRegionServer extends
HBaseServerBase<RSRpcServices>
shutdownChore(fsUtilizationChore);
shutdownChore(slowLogTableOpsChore);
shutdownChore(brokenStoreFileCleaner);
+ shutdownChore(rsMobFileCleanerChore);
}
@Override
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 020009c7f5c..33c2910fcea 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -2432,7 +2432,7 @@ public class HStore
* {@link BrokenStoreFileCleaner} to prevent deleting the these files as
they are not present in
* SFT yet.
*/
- Set<Path> getStoreFilesBeingWritten() {
+ public Set<Path> getStoreFilesBeingWritten() {
return storeFileWriterCreationTrackers.stream().flatMap(t ->
t.get().stream())
.collect(Collectors.toSet());
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index e66a3e05a42..bd9ce6035ad 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -362,7 +362,7 @@ public abstract class Compactor<T extends CellSink> {
writer = sinkFactory.createWriter(scanner, fd, dropCache,
request.isMajor(),
request.getWriterCreationTracker());
finished = performCompaction(fd, scanner, writer, smallestReadPoint,
cleanSeqId,
- throughputController, request.isAllFiles(), request.getFiles().size(),
progress);
+ throughputController, request, progress);
if (!finished) {
throw new InterruptedIOException("Aborting compaction of store " +
store + " in region "
+ store.getRegionInfo().getRegionNameAsString() + " because it was
interrupted.");
@@ -401,20 +401,19 @@ public abstract class Compactor<T extends CellSink> {
/**
* Performs the compaction.
- * @param fd FileDetails of cell sink writer
- * @param scanner Where to read from.
- * @param writer Where to write to.
- * @param smallestReadPoint Smallest read point.
- * @param cleanSeqId When true, remove seqId(used to be mvcc) value
which is <=
- * smallestReadPoint
- * @param major Is a major compaction.
- * @param numofFilesToCompact the number of files to compact
- * @param progress Progress reporter.
+ * @param fd FileDetails of cell sink writer
+ * @param scanner Where to read from.
+ * @param writer Where to write to.
+ * @param smallestReadPoint Smallest read point.
+ * @param cleanSeqId When true, remove seqId(used to be mvcc) value
which is <=
+ * smallestReadPoint
+ * @param request compaction request.
+ * @param progress Progress reporter.
* @return Whether compaction ended; false if it was interrupted for some
reason.
*/
protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
CellSink writer,
long smallestReadPoint, boolean cleanSeqId, ThroughputController
throughputController,
- boolean major, int numofFilesToCompact, CompactionProgress progress)
throws IOException {
+ CompactionRequestImpl request, CompactionProgress progress) throws
IOException {
assert writer instanceof ShipperListener;
long bytesWrittenProgressForLog = 0;
long bytesWrittenProgressForShippedCall = 0;
@@ -436,7 +435,7 @@ public abstract class Compactor<T extends CellSink> {
throughputController.start(compactionName);
KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ?
(KeyValueScanner) scanner : null;
long shippedCallSizeLimit =
- (long) numofFilesToCompact *
this.store.getColumnFamilyDescriptor().getBlocksize();
+ (long) request.getFiles().size() *
this.store.getColumnFamilyDescriptor().getBlocksize();
try {
do {
hasMore = scanner.next(cells, scannerContext);
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
index af45263a17c..96675fb69e5 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.regionserver.ShipperListener;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.util.Bytes;
@@ -92,8 +93,9 @@ public class FaultyMobStoreCompactor extends
DefaultMobStoreCompactor {
@Override
protected boolean performCompaction(FileDetails fd, InternalScanner scanner,
CellSink writer,
long smallestReadPoint, boolean cleanSeqId, ThroughputController
throughputController,
- boolean major, int numofFilesToCompact, CompactionProgress progress)
throws IOException {
+ CompactionRequestImpl request, CompactionProgress progress) throws
IOException {
+ boolean major = request.isAllFiles();
totalCompactions.incrementAndGet();
if (major) {
totalMajorCompactions.incrementAndGet();
@@ -145,7 +147,7 @@ public class FaultyMobStoreCompactor extends
DefaultMobStoreCompactor {
throughputController.start(compactionName);
KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ?
(KeyValueScanner) scanner : null;
long shippedCallSizeLimit =
- (long) numofFilesToCompact *
this.store.getColumnFamilyDescriptor().getBlocksize();
+ (long) request.getFiles().size() *
this.store.getColumnFamilyDescriptor().getBlocksize();
Cell mobCell = null;
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java
index 2753ba33af4..9b51c22db19 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java
@@ -18,17 +18,26 @@
package org.apache.hadoop.hbase.mob;
import java.io.IOException;
+import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.crypto.Encryption;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -112,4 +121,17 @@ public class MobTestUtil {
scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE));
return HBaseTestingUtil.countRows(table, scan);
}
+
+ public static Path generateMOBFileForRegion(Configuration conf, TableName
tableName,
+ ColumnFamilyDescriptor familyDescriptor, String regionName) throws
IOException {
+ Date date = new Date();
+ String dateStr = MobUtils.formatDate(date);
+ FileSystem fs = FileSystem.get(conf);
+ Path cfMOBDir = MobUtils.getMobFamilyPath(conf, tableName,
familyDescriptor.getNameAsString());
+ StoreFileWriter writer = MobUtils.createWriter(conf, fs, familyDescriptor,
dateStr, cfMOBDir,
+ 1000L, Compression.Algorithm.NONE, "startKey", CacheConfig.DISABLED,
Encryption.Context.NONE,
+ false, "");
+ writer.close();
+ return writer.getPath();
+ }
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java
index 70e7bbf4cb4..6b7ec952d05 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestDefaultMobStoreFlusher.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.mob;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
@@ -31,17 +33,21 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import
org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.AfterClass;
+import org.junit.After;
import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+@RunWith(Parameterized.class)
@Category(LargeTests.class)
public class TestDefaultMobStoreFlusher {
@@ -61,19 +67,35 @@ public class TestDefaultMobStoreFlusher {
@Rule
public TestName name = new TestName();
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
+ protected Boolean useFileBasedSFT;
+
+ public TestDefaultMobStoreFlusher(Boolean useFileBasedSFT) {
+ this.useFileBasedSFT = useFileBasedSFT;
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Boolean> data() {
+ Boolean[] data = { false, true };
+ return Arrays.asList(data);
+ }
+
+ @Before
+ public void setUpBefore() throws Exception {
+ if (useFileBasedSFT) {
+ TEST_UTIL.getConfiguration().set(StoreFileTrackerFactory.TRACKER_IMPL,
+
"org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
+ }
TEST_UTIL.startMiniCluster(1);
}
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
+ @After
+ public void tearDownAfter() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testFlushNonMobFile() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName =
TableName.valueOf(TestMobUtils.getTableName(name));
TableDescriptor tableDescriptor =
TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family).setMaxVersions(4).build())
.build();
@@ -82,7 +104,7 @@ public class TestDefaultMobStoreFlusher {
@Test
public void testFlushMobFile() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
+ final TableName tableName =
TableName.valueOf(TestMobUtils.getTableName(name));
TableDescriptor tableDescriptor =
TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family).setMobEnabled(true)
.setMobThreshold(3L).setMaxVersions(4).build())
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptMode.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptMode.java
index 5f3cc629959..1c586bbd10c 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptMode.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptMode.java
@@ -17,10 +17,8 @@
*/
package org.apache.hadoop.hbase.mob;
-import java.io.IOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
@@ -41,12 +39,13 @@ public class TestMobCompactionOptMode extends
TestMobCompactionWithDefaults {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMobCompactionOptMode.class);
- @BeforeClass
- public static void configureOptimizedCompaction() throws
InterruptedException, IOException {
- HTU.shutdownMiniHBaseCluster();
+ public TestMobCompactionOptMode(Boolean useFileBasedSFT) {
+ super(useFileBasedSFT);
+ }
+
+ protected void additonalConfigSetup() {
conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY,
MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000);
- HTU.startMiniHBaseCluster();
}
@Override
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptRegionBatchMode.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptRegionBatchMode.java
index 7b6b44d0e31..dfd6435d364 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptRegionBatchMode.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionOptRegionBatchMode.java
@@ -23,9 +23,10 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +40,7 @@ import org.slf4j.LoggerFactory;
* time larger than minimum age to archive 10. Runs Mob cleaner chore 11
Verifies that number of MOB
* files in a mob directory is 20. 12 Runs scanner and checks all 3 * 1000
rows.
*/
+@RunWith(Parameterized.class)
@Category(LargeTests.class)
public class TestMobCompactionOptRegionBatchMode extends
TestMobCompactionWithDefaults {
private static final Logger LOG =
@@ -50,20 +52,20 @@ public class TestMobCompactionOptRegionBatchMode extends
TestMobCompactionWithDe
private static final int batchSize = 7;
private MobFileCompactionChore compactionChore;
+ public TestMobCompactionOptRegionBatchMode(Boolean useFileBasedSFT) {
+ super(useFileBasedSFT);
+ }
+
@Before
public void setUp() throws Exception {
super.setUp();
compactionChore = new MobFileCompactionChore(conf, batchSize);
}
- @BeforeClass
- public static void configureOptimizedCompactionAndBatches()
- throws InterruptedException, IOException {
- HTU.shutdownMiniHBaseCluster();
+ protected void additonalConfigSetup() {
conf.setInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE,
batchSize);
conf.set(MobConstants.MOB_COMPACTION_TYPE_KEY,
MobConstants.OPTIMIZED_MOB_COMPACTION_TYPE);
conf.setLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY, 1000000);
- HTU.startMiniHBaseCluster();
}
@Override
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularRegionBatchMode.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularRegionBatchMode.java
index 3d6eaa0a25a..5e2806bdee2 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularRegionBatchMode.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionRegularRegionBatchMode.java
@@ -23,9 +23,10 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +40,7 @@ import org.slf4j.LoggerFactory;
* to archive 10. Runs Mob cleaner chore 11 Verifies that number of MOB files
in a mob directory is
* 20. 12 Runs scanner and checks all 3 * 1000 rows.
*/
+@RunWith(Parameterized.class)
@Category(LargeTests.class)
public class TestMobCompactionRegularRegionBatchMode extends
TestMobCompactionWithDefaults {
private static final Logger LOG =
@@ -50,17 +52,18 @@ public class TestMobCompactionRegularRegionBatchMode
extends TestMobCompactionWi
private static final int batchSize = 7;
private MobFileCompactionChore compactionChore;
+ public TestMobCompactionRegularRegionBatchMode(Boolean useFileBasedSFT) {
+ super(useFileBasedSFT);
+ }
+
@Before
public void setUp() throws Exception {
super.setUp();
compactionChore = new MobFileCompactionChore(conf, batchSize);
}
- @BeforeClass
- public static void configureCompactionBatches() throws InterruptedException,
IOException {
- HTU.shutdownMiniHBaseCluster();
+ protected void additonalConfigSetup() {
conf.setInt(MobConstants.MOB_MAJOR_COMPACTION_REGION_BATCH_SIZE,
batchSize);
- HTU.startMiniHBaseCluster();
}
@Override
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java
index 4a1b3f1b8b3..69ba4ea24b2 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobCompactionWithDefaults.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
@@ -31,6 +32,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
@@ -41,18 +43,19 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
+import
org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.junit.After;
-import org.junit.AfterClass;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,6 +69,7 @@ import org.slf4j.LoggerFactory;
* Runs Mob cleaner chore 11 Verifies that number of MOB files in a mob
directory is 20. 12 Runs
* scanner and checks all 3 * 1000 rows.
*/
+@RunWith(Parameterized.class)
@Category(LargeTests.class)
public class TestMobCompactionWithDefaults {
private static final Logger LOG =
LoggerFactory.getLogger(TestMobCompactionWithDefaults.class);
@@ -73,7 +77,7 @@ public class TestMobCompactionWithDefaults {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMobCompactionWithDefaults.class);
- protected static HBaseTestingUtil HTU;
+ protected HBaseTestingUtil HTU;
protected static Configuration conf;
protected static long minAgeToArchive = 10000;
@@ -95,8 +99,19 @@ public class TestMobCompactionWithDefaults {
protected MobFileCleanerChore cleanerChore;
- @BeforeClass
- public static void htuStart() throws Exception {
+ protected Boolean useFileBasedSFT;
+
+ public TestMobCompactionWithDefaults(Boolean useFileBasedSFT) {
+ this.useFileBasedSFT = useFileBasedSFT;
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Boolean> data() {
+ Boolean[] data = { false, true };
+ return Arrays.asList(data);
+ }
+
+ protected void htuStart() throws Exception {
HTU = new HBaseTestingUtil();
conf = HTU.getConfiguration();
conf.setInt("hfile.format.version", 3);
@@ -109,21 +124,25 @@ public class TestMobCompactionWithDefaults {
// Set compacted file discharger interval to a half minAgeToArchive
conf.setLong("hbase.hfile.compaction.discharger.interval", minAgeToArchive
/ 2);
conf.setBoolean("hbase.regionserver.compaction.enabled", false);
+ if (useFileBasedSFT) {
+ conf.set(StoreFileTrackerFactory.TRACKER_IMPL,
+
"org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
+ }
+ additonalConfigSetup();
HTU.startMiniCluster();
}
- @AfterClass
- public static void htuStop() throws Exception {
- HTU.shutdownMiniCluster();
+ protected void additonalConfigSetup() {
}
@Before
public void setUp() throws Exception {
+ htuStart();
admin = HTU.getAdmin();
cleanerChore = new MobFileCleanerChore();
familyDescriptor =
ColumnFamilyDescriptorBuilder.newBuilder(fam).setMobEnabled(true)
.setMobThreshold(mobLen).setMaxVersions(1).build();
- tableDescriptor = HTU.createModifyableTableDescriptor(test.getMethodName())
+ tableDescriptor =
HTU.createModifyableTableDescriptor(TestMobUtils.getTableName(test))
.setColumnFamily(familyDescriptor).build();
RegionSplitter.UniformSplit splitAlgo = new RegionSplitter.UniformSplit();
byte[][] splitKeys = splitAlgo.split(numRegions);
@@ -152,6 +171,7 @@ public class TestMobCompactionWithDefaults {
public void tearDown() throws Exception {
admin.disableTable(tableDescriptor.getTableName());
admin.deleteTable(tableDescriptor.getTableName());
+ HTU.shutdownMiniCluster();
}
@Test
@@ -167,12 +187,12 @@ public class TestMobCompactionWithDefaults {
@Test
public void testMobFileCompactionAfterSnapshotClone() throws
InterruptedException, IOException {
- final TableName clone = TableName.valueOf(test.getMethodName() + "-clone");
+ final TableName clone = TableName.valueOf(TestMobUtils.getTableName(test)
+ "-clone");
LOG.info("MOB compaction of cloned snapshot, " + description() + "
started");
loadAndFlushThreeTimes(rows, table, famStr);
LOG.debug("Taking snapshot and cloning table {}", table);
- admin.snapshot(test.getMethodName(), table);
- admin.cloneSnapshot(test.getMethodName(), clone);
+ admin.snapshot(TestMobUtils.getTableName(test), table);
+ admin.cloneSnapshot(TestMobUtils.getTableName(test), clone);
assertEquals("Should have 3 hlinks per region in MOB area from snapshot
clone", 3 * numRegions,
getNumberOfMobFiles(clone, famStr));
mobCompact(admin.getDescriptor(clone), familyDescriptor);
@@ -185,12 +205,12 @@ public class TestMobCompactionWithDefaults {
@Test
public void testMobFileCompactionAfterSnapshotCloneAndFlush()
throws InterruptedException, IOException {
- final TableName clone = TableName.valueOf(test.getMethodName() + "-clone");
+ final TableName clone = TableName.valueOf(TestMobUtils.getTableName(test)
+ "-clone");
LOG.info("MOB compaction of cloned snapshot after flush, " + description()
+ " started");
loadAndFlushThreeTimes(rows, table, famStr);
LOG.debug("Taking snapshot and cloning table {}", table);
- admin.snapshot(test.getMethodName(), table);
- admin.cloneSnapshot(test.getMethodName(), clone);
+ admin.snapshot(TestMobUtils.getTableName(test), table);
+ admin.cloneSnapshot(TestMobUtils.getTableName(test), clone);
assertEquals("Should have 3 hlinks per region in MOB area from snapshot
clone", 3 * numRegions,
getNumberOfMobFiles(clone, famStr));
loadAndFlushThreeTimes(rows, clone, famStr);
@@ -269,8 +289,11 @@ public class TestMobCompactionWithDefaults {
Thread.sleep(minAgeToArchive + 1000);
LOG.info("Cleaning up MOB files");
- // Cleanup again
- cleanerChore.cleanupObsoleteMobFiles(conf, table);
+
+ // run cleaner chore on each RS
+ for (ServerName sn : admin.getRegionServers()) {
+
HTU.getMiniHBaseCluster().getRegionServer(sn).getRSMobFileCleanerChore().chore();
+ }
assertEquals("After cleaning, we should have 1 MOB file per region based
on size.", numRegions,
getNumberOfMobFiles(table, family));
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java
index f950c18dd18..bdc3cce13e4 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.mob;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
@@ -166,14 +167,38 @@ public class TestMobFileCleanerChore {
Thread.sleep(minAgeToArchive + 1000);
LOG.info("Cleaning up MOB files");
- // Cleanup again
+ // Cleanup
chore.cleanupObsoleteMobFiles(conf, table.getName());
+ // verify that nothing have happened
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
- assertEquals(1, num);
+ assertEquals(4, num);
long scanned = scanTable();
assertEquals(30, scanned);
+
+ // add a MOB file to with a name refering to a non-existing region
+ Path extraMOBFile = MobTestUtil.generateMOBFileForRegion(conf,
table.getName(),
+ familyDescriptor, "nonExistentRegion");
+ num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
+ assertEquals(5, num);
+
+ LOG.info("Waiting for {}ms", minAgeToArchive + 1000);
+
+ Thread.sleep(minAgeToArchive + 1000);
+ LOG.info("Cleaning up MOB files");
+ chore.cleanupObsoleteMobFiles(conf, table.getName());
+
+ // check that the extra file got deleted
+ num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
+ assertEquals(4, num);
+
+ FileSystem fs = FileSystem.get(conf);
+ assertFalse(fs.exists(extraMOBFile));
+
+ scanned = scanTable();
+ assertEquals(30, scanned);
+
}
private long getNumberOfMobFiles(Configuration conf, TableName tableName,
String family)
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobStoreCompaction.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobStoreCompaction.java
index 9951bef4747..c2f2b3fd426 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobStoreCompaction.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobStoreCompaction.java
@@ -26,11 +26,14 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -63,6 +66,7 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionAsTable;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import
org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import
org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import
org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -76,12 +80,15 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test mob store compaction
*/
+@RunWith(Parameterized.class)
@Category(MediumTests.class)
public class TestMobStoreCompaction {
@@ -106,15 +113,33 @@ public class TestMobStoreCompaction {
private final byte[] STARTROW = Bytes.toBytes(START_KEY);
private int compactionThreshold;
+ private Boolean useFileBasedSFT;
+
+ public TestMobStoreCompaction(Boolean useFileBasedSFT) {
+ this.useFileBasedSFT = useFileBasedSFT;
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Boolean> data() {
+ Boolean[] data = { false, true };
+ return Arrays.asList(data);
+ }
+
private void init(Configuration conf, long mobThreshold) throws Exception {
+ if (useFileBasedSFT) {
+ conf.set(StoreFileTrackerFactory.TRACKER_IMPL,
+
"org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
+ }
+
this.conf = conf;
this.mobCellThreshold = mobThreshold;
+
HBaseTestingUtil UTIL = new HBaseTestingUtil(conf);
compactionThreshold = conf.getInt("hbase.hstore.compactionThreshold", 3);
familyDescriptor =
ColumnFamilyDescriptorBuilder.newBuilder(COLUMN_FAMILY).setMobEnabled(true)
.setMobThreshold(mobThreshold).setMaxVersions(1).build();
- tableDescriptor =
UTIL.createModifyableTableDescriptor(name.getMethodName())
+ tableDescriptor =
UTIL.createModifyableTableDescriptor(TestMobUtils.getTableName(name))
.modifyColumnFamily(familyDescriptor).build();
RegionInfo regionInfo =
RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build();
@@ -223,7 +248,7 @@ public class TestMobStoreCompaction {
Path basedir = new Path(hbaseRootDir,
tableDescriptor.getTableName().getNameAsString());
List<Pair<byte[], String>> hfiles = new ArrayList<>(1);
for (int i = 0; i < compactionThreshold; i++) {
- Path hpath = new Path(basedir, "hfile" + i);
+ Path hpath = new Path(basedir, UUID.randomUUID().toString().replace("-",
""));
hfiles.add(Pair.newPair(COLUMN_FAMILY, hpath.toString()));
createHFile(hpath, i, dummyData);
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobUtils.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobUtils.java
index 247dfff77d1..32170e98b35 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobUtils.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
import
org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSetMultimap;
@@ -89,4 +90,8 @@ public class TestMobUtils {
assertTrue(testTable3Refs.contains("file3a"));
assertTrue(testTable3Refs.contains("file3b"));
}
+
+ public static String getTableName(TestName test) {
+ return test.getMethodName().replace("[", "-").replace("]", "");
+ }
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestRSMobFileCleanerChore.java
similarity index 74%
copy from
hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java
copy to
hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestRSMobFileCleanerChore.java
index f950c18dd18..86cb3558f67 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCleanerChore.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestRSMobFileCleanerChore.java
@@ -22,18 +22,22 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.CompactionState;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Table;
@@ -51,16 +55,16 @@ import org.slf4j.LoggerFactory;
/**
* Mob file cleaner chore test. 1. Creates MOB table 2. Load MOB data and
flushes it N times 3. Runs
- * major MOB compaction (N MOB files go to archive) 4. Verifies that number of
MOB files in a mob
- * directory is N+1 5. Waits for a period of time larger than minimum age to
archive 6. Runs Mob
- * cleaner chore 7 Verifies that number of MOB files in a mob directory is 1.
+ * major MOB compaction 4. Verifies that number of MOB files in a mob
directory is N+1 5. Waits for
+ * a period of time larger than minimum age to archive 6. Runs Mob cleaner
chore 7 Verifies that
+ * every old MOB file referenced from current RS was archived
*/
@Category(MediumTests.class)
-public class TestMobFileCleanerChore {
- private static final Logger LOG =
LoggerFactory.getLogger(TestMobFileCleanerChore.class);
+public class TestRSMobFileCleanerChore {
+ private static final Logger LOG =
LoggerFactory.getLogger(TestRSMobFileCleanerChore.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestMobFileCleanerChore.class);
+ HBaseClassTestRule.forClass(TestRSMobFileCleanerChore.class);
private HBaseTestingUtil HTU;
@@ -76,10 +80,10 @@ public class TestMobFileCleanerChore {
private ColumnFamilyDescriptor familyDescriptor;
private Admin admin;
private Table table = null;
- private MobFileCleanerChore chore;
+ private RSMobFileCleanerChore chore;
private long minAgeToArchive = 10000;
- public TestMobFileCleanerChore() {
+ public TestRSMobFileCleanerChore() {
}
@Before
@@ -91,12 +95,11 @@ public class TestMobFileCleanerChore {
HTU.startMiniCluster();
admin = HTU.getAdmin();
- chore = new MobFileCleanerChore();
familyDescriptor =
ColumnFamilyDescriptorBuilder.newBuilder(fam).setMobEnabled(true)
.setMobThreshold(mobLen).setMaxVersions(1).build();
tableDescriptor =
HTU.createModifyableTableDescriptor("testMobCompactTable")
.setColumnFamily(familyDescriptor).build();
- table = HTU.createTable(tableDescriptor, null);
+ table = HTU.createTable(tableDescriptor, Bytes.toByteArrays("1"));
}
private void initConf() {
@@ -145,12 +148,11 @@ public class TestMobFileCleanerChore {
@Test
public void testMobFileCleanerChore() throws InterruptedException,
IOException {
-
loadData(0, 10);
loadData(10, 10);
- loadData(20, 10);
+ // loadData(20, 10);
long num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
- assertEquals(3, num);
+ assertEquals(2, num);
// Major compact
admin.majorCompact(tableDescriptor.getTableName(), fam);
// wait until compaction is complete
@@ -159,21 +161,68 @@ public class TestMobFileCleanerChore {
}
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
- assertEquals(4, num);
+ assertEquals(3, num);
// We have guarantee, that compcated file discharger will run during this
pause
// because it has interval less than this wait time
LOG.info("Waiting for {}ms", minAgeToArchive + 1000);
Thread.sleep(minAgeToArchive + 1000);
LOG.info("Cleaning up MOB files");
- // Cleanup again
- chore.cleanupObsoleteMobFiles(conf, table.getName());
+
+ ServerName serverUsed = null;
+ List<RegionInfo> serverRegions = null;
+ for (ServerName sn : admin.getRegionServers()) {
+ serverRegions = admin.getRegions(sn);
+ if (serverRegions != null && serverRegions.size() > 0) {
+ // filtering out non test table regions
+ serverRegions = serverRegions.stream().filter(r -> r.getTable() ==
table.getName())
+ .collect(Collectors.toList());
+ // if such one is found use this rs
+ if (serverRegions.size() > 0) {
+ serverUsed = sn;
+ }
+ break;
+ }
+ }
+
+ chore =
HTU.getMiniHBaseCluster().getRegionServer(serverUsed).getRSMobFileCleanerChore();
+
+ chore.chore();
num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
- assertEquals(1, num);
+ assertEquals(3 - serverRegions.size(), num);
long scanned = scanTable();
- assertEquals(30, scanned);
+ assertEquals(20, scanned);
+
+ // creating a MOB file not referenced from the current RS
+ Path extraMOBFile = MobTestUtil.generateMOBFileForRegion(conf,
table.getName(),
+ familyDescriptor, "nonExistentRegion");
+
+ // verifying the new MOBfile is added
+ num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
+ assertEquals(4 - serverRegions.size(), num);
+
+ FileSystem fs = FileSystem.get(conf);
+ assertTrue(fs.exists(extraMOBFile));
+
+ LOG.info("Waiting for {}ms", minAgeToArchive + 1000);
+
+ Thread.sleep(minAgeToArchive + 1000);
+ LOG.info("Cleaning up MOB files");
+
+ // running chore again
+ chore.chore();
+
+ // the chore should only archive old MOB files that were referenced from
the current RS
+ // the unrelated MOB file is still there
+ num = getNumberOfMobFiles(conf, table.getName(), new String(fam));
+ assertEquals(4 - serverRegions.size(), num);
+
+ assertTrue(fs.exists(extraMOBFile));
+
+ scanned = scanTable();
+ assertEquals(20, scanned);
}
private long getNumberOfMobFiles(Configuration conf, TableName tableName,
String family)