Author: larsh
Date: Wed Oct 2 19:06:15 2013
New Revision: 1528592
URL: http://svn.apache.org/r1528592
Log:
HBASE-8521 Cells cannot be overwritten with bulk loaded HFiles (Jean-Marc
Spaggiari)
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/SecureBulkLoadClient.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/SecureBulkLoadClient.java
URL:
http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/SecureBulkLoadClient.java?rev=1528592&r1=1528591&r2=1528592&view=diff
==============================================================================
---
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/SecureBulkLoadClient.java
(original)
+++
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/coprocessor/SecureBulkLoadClient.java
Wed Oct 2 19:06:15 2013
@@ -70,11 +70,17 @@ public class SecureBulkLoadClient {
}
}
- public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
- Token<?> userToken, String bulkToken) throws
IOException {
+ public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
Token<?> userToken,
+ String bulkToken) throws IOException {
+ return bulkLoadHFiles(familyPaths, userToken, bulkToken, false);
+ }
+
+ public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
Token<?> userToken,
+ String bulkToken, boolean assignSeqNum) throws IOException {
try {
- return (Boolean)Methods.call(protocolClazz, proxy, "bulkLoadHFiles",
- new Class[]{List.class, Token.class, String.class},new
Object[]{familyPaths, userToken, bulkToken});
+ return (Boolean) Methods.call(protocolClazz, proxy, "bulkLoadHFiles",
new Class[] {
+ List.class, Token.class, String.class, Boolean.class },
+ new Object[] { familyPaths, userToken, bulkToken, assignSeqNum });
} catch (Exception e) {
throw new IOException("Failed to bulkLoadHFiles", e);
}
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL:
http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1528592&r1=1528591&r2=1528592&view=diff
==============================================================================
---
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
(original)
+++
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
Wed Oct 2 19:06:15 2013
@@ -392,6 +392,21 @@ public interface HRegionInterface extend
public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths, byte[]
regionName)
throws IOException;
+ /**
+ * Atomically bulk load multiple HFiles (say from different column families)
+ * into an open region.
+ *
+ * @param familyPaths List of (family, hfile path) pairs
+ * @param regionName name of region to load hfiles into
+ * @param assignSeqNum should we assign sequence numbers
+ * @return true if successful, false if failed recoverably
+ * @throws IOException if fails unrecoverably
+ */
+ public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
byte[] regionName,
+ boolean assignSeqNum)
+ throws IOException;
+
+
// Master methods
/**
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL:
http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1528592&r1=1528591&r2=1528592&view=diff
==============================================================================
---
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
(original)
+++
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
Wed Oct 2 19:06:15 2013
@@ -93,17 +93,17 @@ import com.google.common.util.concurrent
public class LoadIncrementalHFiles extends Configured implements Tool {
private static Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
- private static final int TABLE_CREATE_MAX_RETRIES = 20;
- private static final long TABLE_CREATE_SLEEP = 60000;
static AtomicLong regionCount = new AtomicLong(0);
private HBaseAdmin hbAdmin;
private Configuration cfg;
public static String NAME = "completebulkload";
+ public static String ASSIGN_SEQ_IDS =
"hbase.mapreduce.bulkload.assign.sequenceNumbers";
private boolean useSecure;
private Token<?> userToken;
private String bulkToken;
+ private final boolean assignSeqIds;
//package private for testing
LoadIncrementalHFiles(Configuration conf, Boolean useSecure) throws
Exception {
@@ -112,6 +112,7 @@ public class LoadIncrementalHFiles exten
this.hbAdmin = new HBaseAdmin(conf);
//added simple for testing
this.useSecure = useSecure != null ? useSecure :
User.isHBaseSecurityEnabled(conf);
+ this.assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, false);
}
public LoadIncrementalHFiles(Configuration conf) throws Exception {
@@ -290,7 +291,7 @@ public class LoadIncrementalHFiles exten
LOG.error(err);
}
}
-
+
if (queue != null && !queue.isEmpty()) {
throw new RuntimeException("Bulk load aborted with some files not yet
loaded."
+ "Please check log for more details.");
@@ -360,7 +361,7 @@ public class LoadIncrementalHFiles exten
Set<Future<List<LoadQueueItem>>> splittingFutures = new
HashSet<Future<List<LoadQueueItem>>>();
while (!queue.isEmpty()) {
final LoadQueueItem item = queue.remove();
-
+
final Callable<List<LoadQueueItem>> call = new
Callable<List<LoadQueueItem>>() {
public List<LoadQueueItem> call() throws Exception {
List<LoadQueueItem> splits = groupOrSplit(regionGroups, item, table,
startEndKeys);
@@ -524,11 +525,11 @@ public class LoadIncrementalHFiles exten
+ Bytes.toStringBinary(row));
byte[] regionName = location.getRegionInfo().getRegionName();
if(!useSecure) {
- success = server.bulkLoadHFiles(famPaths, regionName);
+ success = server.bulkLoadHFiles(famPaths, regionName,
assignSeqIds);
} else {
HTable table = new HTable(conn.getConfiguration(), tableName);
secureClient = new SecureBulkLoadClient(table,
location.getRegionInfo().getStartKey());
- success = secureClient.bulkLoadHFiles(famPaths, userToken,
bulkToken);
+ success = secureClient.bulkLoadHFiles(famPaths, userToken,
bulkToken, assignSeqIds);
}
return success;
} finally {
@@ -653,7 +654,7 @@ public class LoadIncrementalHFiles exten
private boolean doesTableExist(String tableName) throws Exception {
return hbAdmin.tableExists(tableName);
}
-
+
/*
* Infers region boundaries for a new table.
* Parameter:
@@ -671,16 +672,15 @@ public class LoadIncrementalHFiles exten
int runningValue = 0;
byte[] currStartKey = null;
boolean firstBoundary = true;
-
+
for (Map.Entry<byte[], Integer> item: bdryMap.entrySet()) {
if (runningValue == 0) currStartKey = item.getKey();
runningValue += item.getValue();
if (runningValue == 0) {
if (!firstBoundary) keysArray.add(currStartKey);
firstBoundary = false;
- }
+ }
}
-
return keysArray.toArray(new byte[0][0]);
}
@@ -709,7 +709,7 @@ public class LoadIncrementalHFiles exten
// Build a set of keys
byte[][] keys = null;
TreeMap<byte[], Integer> map = new TreeMap<byte[],
Integer>(Bytes.BYTES_COMPARATOR);
-
+
for (FileStatus stat : familyDirStatuses) {
if (!stat.isDir()) {
LOG.warn("Skipping non-directory " + stat.getPath());
@@ -719,10 +719,10 @@ public class LoadIncrementalHFiles exten
// Skip _logs, etc
if (familyDir.getName().startsWith("_")) continue;
byte[] family = familyDir.getName().getBytes();
-
+
hcd = new HColumnDescriptor(family);
htd.addFamily(hcd);
-
+
Path[] hfiles = FileUtil.stat2Paths(fs.listStatus(familyDir));
for (Path hfile : hfiles) {
if (hfile.getName().startsWith("_")) continue;
@@ -742,7 +742,7 @@ public class LoadIncrementalHFiles exten
LOG.info("Trying to figure out region boundaries hfile=" + hfile +
" first=" + Bytes.toStringBinary(first) +
" last=" + Bytes.toStringBinary(last));
-
+
// To eventually infer start key-end key boundaries
Integer value = map.containsKey(first)?(Integer)map.get(first):0;
map.put(first, value+1);
@@ -754,7 +754,7 @@ public class LoadIncrementalHFiles exten
}
}
}
-
+
keys = LoadIncrementalHFiles.inferBoundaries(map);
this.hbAdmin.createTable(htd,keys);
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL:
http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1528592&r1=1528591&r2=1528592&view=diff
==============================================================================
---
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
(original)
+++
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
Wed Oct 2 19:06:15 2013
@@ -494,7 +494,7 @@ public class HRegion implements HeapSize
// When hbase.regionserver.optionallogflushinterval <= 0 , deferred log
sync is disabled.
this.deferredLogSyncDisabled =
conf.getLong("hbase.regionserver.optionallogflushinterval",
1 * 1000) <= 0;
-
+
if (rsServices != null) {
this.rsAccounting = this.rsServices.getRegionServerAccounting();
// don't initialize coprocessors if not running within a regionserver
@@ -608,11 +608,13 @@ public class HRegion implements HeapSize
Store store = future.get();
this.stores.put(store.getColumnFamilyName().getBytes(), store);
- long storeSeqId = store.getMaxSequenceId();
- maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
- storeSeqId);
- if (maxSeqId == -1 || storeSeqId > maxSeqId) {
- maxSeqId = storeSeqId;
+ // Do not include bulk loaded files when determining seqIdForReplay
+ long storeSeqIdForReplay = store.getMaxSequenceId(false);
+ maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
storeSeqIdForReplay);
+ // Include bulk loaded files when determining seqIdForAssignment
+ long storeSeqIdForAssignment = store.getMaxSequenceId(true);
+ if (maxSeqId == -1 || storeSeqIdForAssignment > maxSeqId) {
+ maxSeqId = storeSeqIdForAssignment;
}
long maxStoreMemstoreTS = store.getMaxMemstoreTS();
if (maxStoreMemstoreTS > maxMemstoreTS) {
@@ -2244,7 +2246,7 @@ public class HRegion implements HeapSize
/** Keep track of the locks we hold so we can release them in finally
clause */
List<Integer> acquiredLocks =
Lists.newArrayListWithCapacity(batchOp.operations.length);
Set<HashedBytes> rowsAlreadyLocked = Sets.newHashSet();
-
+
// reference family maps directly so coprocessors can mutate them if
desired
Map<byte[],List<KeyValue>>[] familyMaps = new
Map[batchOp.operations.length];
// We try to set up a batch in the range [firstIndex,lastIndexExclusive)
@@ -3619,7 +3621,20 @@ public class HRegion implements HeapSize
* @throws IOException if failed unrecoverably.
*/
public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths) throws
IOException {
- return bulkLoadHFiles(familyPaths, null);
+ return bulkLoadHFiles(familyPaths, false);
+ }
+
+ /**
+ * Attempts to atomically load a group of hfiles. This is critical for
loading rows with multiple
+ * column families atomically.
+ * @param familyPaths List of Pair<byte[] column family, String hfilePath>
+ * @param assignSeqId should we assign sequence numbers
+ * @return true if successful, false if failed recoverably
+ * @throws IOException if failed unrecoverably.
+ */
+ public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
boolean assignSeqId)
+ throws IOException {
+ return bulkLoadHFiles(familyPaths, null, assignSeqId);
}
/**
@@ -3634,6 +3649,20 @@ public class HRegion implements HeapSize
*/
public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
BulkLoadListener bulkLoadListener) throws IOException {
+ return bulkLoadHFiles(familyPaths, bulkLoadListener, false);
+ }
+
+ /**
+ * Attempts to atomically load a group of hfiles. This is critical for
loading rows with multiple
+ * column families atomically.
+ * @param familyPaths List of Pair<byte[] column family, String hfilePath>
+ * @param bulkLoadListener Internal hooks enabling massaging/preparation of a file about to be
+ * bulk loaded
+ * @param assignSeqId should we assign sequence numbers
+ * @return true if successful, false if failed recoverably
+ * @throws IOException if failed unrecoverably.
+ */
+ public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
+ BulkLoadListener bulkLoadListener, boolean assignSeqId) throws
IOException {
Preconditions.checkNotNull(familyPaths);
// we need writeLock for multi-family bulk load
startBulkRegionOperation(hasMultipleColumnFamilies(familyPaths));
@@ -3642,7 +3671,7 @@ public class HRegion implements HeapSize
this.opMetrics.setWriteRequestCountMetrics(
this.writeRequestsCount.get());
// There possibly was a split that happend between when the split keys
- // were gathered and before the HReiogn's write lock was taken. We need
+ // were gathered and before the HRegion's write lock was taken. We need
// to validate the HFile region before attempting to bulk load all of
them
List<IOException> ioes = new ArrayList<IOException>();
List<Pair<byte[], String>> failures = new ArrayList<Pair<byte[],
String>>();
@@ -3697,7 +3726,7 @@ public class HRegion implements HeapSize
if(bulkLoadListener != null) {
finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
}
- store.bulkLoadHFile(finalPath);
+ store.bulkLoadHFile(finalPath, assignSeqId ? this.log.obtainSeqNum()
: -1);
if(bulkLoadListener != null) {
bulkLoadListener.doneBulkLoad(familyName, path);
}
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL:
http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1528592&r1=1528591&r2=1528592&view=diff
==============================================================================
---
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
(original)
+++
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Wed Oct 2 19:06:15 2013
@@ -2919,6 +2919,17 @@ public class HRegionServer implements HR
@Override
public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
byte[] regionName) throws IOException {
+ return bulkLoadHFiles(familyPaths, regionName, false);
+ }
+
+ /**
+ * Atomically bulk load several HFiles into an open region
+ * @return true if successful, false if failed but recoverably (no action)
+ * @throws IOException if failed unrecoverably
+ */
+ @Override
+ public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
+ byte[] regionName, boolean assignSeqNum) throws IOException {
checkOpen();
HRegion region = getRegion(regionName);
boolean bypass = false;
@@ -2927,7 +2938,7 @@ public class HRegionServer implements HR
}
boolean loaded = false;
if (!bypass) {
- loaded = region.bulkLoadHFiles(familyPaths);
+ loaded = region.bulkLoadHFiles(familyPaths, assignSeqNum);
}
if (region.getCoprocessorHost() != null) {
loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths,
loaded);
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL:
http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1528592&r1=1528591&r2=1528592&view=diff
==============================================================================
---
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
(original)
+++
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
Wed Oct 2 19:06:15 2013
@@ -347,8 +347,8 @@ public class Store extends SchemaConfigu
/**
* @return The maximum sequence id in all store files.
*/
- long getMaxSequenceId() {
- return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
+ long getMaxSequenceId(boolean includeBulkFiles) {
+ return StoreFile.getMaxSequenceIdInList(this.getStorefiles(),
includeBulkFiles);
}
/**
@@ -628,7 +628,7 @@ public class Store extends SchemaConfigu
* ranges of values in the HFile fit within the stores assigned region.
* (assertBulkLoadHFileOk checks this)
*/
- void bulkLoadHFile(String srcPathStr) throws IOException {
+ public void bulkLoadHFile(String srcPathStr, long seqNum) throws IOException
{
Path srcPath = new Path(srcPathStr);
// Move the file if it's on another filesystem
@@ -647,7 +647,8 @@ public class Store extends SchemaConfigu
srcPath = tmpPath;
}
- Path dstPath = StoreFile.getRandomFilename(fs, homedir);
+ Path dstPath =
+ StoreFile.getRandomFilename(fs, homedir, (seqNum == -1) ? null :
"_SeqId_" + seqNum + "_");
LOG.debug("Renaming bulk load file " + srcPath + " to " + dstPath);
StoreFile.rename(fs, srcPath, dstPath);
@@ -1138,7 +1139,7 @@ public class Store extends SchemaConfigu
}
// Max-sequenceID is the last key in the files we're compacting
- long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
+ long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact, true);
// Ready to go. Have list of files to compact.
LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
@@ -1206,10 +1207,10 @@ public class Store extends SchemaConfigu
}
filesToCompact = filesToCompact.subList(count - N, count);
- maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
+ maxId = StoreFile.getMaxSequenceIdInList(filesToCompact, true);
isMajor = (filesToCompact.size() == storefiles.size());
filesCompacting.addAll(filesToCompact);
- Collections.sort(filesCompacting, StoreFile.Comparators.FLUSH_TIME);
+ Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
}
} finally {
this.lock.readLock().unlock();
@@ -1426,7 +1427,7 @@ public class Store extends SchemaConfigu
filesToCompact, filesCompacting);
}
filesCompacting.addAll(filesToCompact.getFilesToCompact());
- Collections.sort(filesCompacting, StoreFile.Comparators.FLUSH_TIME);
+ Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
// major compaction iff all StoreFiles are included
boolean isMajor = (filesToCompact.getFilesToCompact().size() ==
this.storefiles.size());
@@ -1896,7 +1897,7 @@ public class Store extends SchemaConfigu
}
public ImmutableList<StoreFile> sortAndClone(List<StoreFile> storeFiles) {
- Collections.sort(storeFiles, StoreFile.Comparators.FLUSH_TIME);
+ Collections.sort(storeFiles, StoreFile.Comparators.SEQ_ID);
ImmutableList<StoreFile> newList = ImmutableList.copyOf(storeFiles);
return newList;
}
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL:
http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1528592&r1=1528591&r2=1528592&view=diff
==============================================================================
---
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
(original)
+++
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
Wed Oct 2 19:06:15 2013
@@ -439,10 +439,11 @@ public class StoreFile extends SchemaCon
* @return 0 if no non-bulk-load files are provided or, this is Store that
* does not yet have any store files.
*/
- public static long getMaxSequenceIdInList(Collection<StoreFile> sfs) {
+ public static long getMaxSequenceIdInList(Collection<StoreFile> sfs,
+ boolean includeBulkLoadedFiles) {
long max = 0;
for (StoreFile sf : sfs) {
- if (!sf.isBulkLoadResult()) {
+ if (includeBulkLoadedFiles || !sf.isBulkLoadResult()) {
max = Math.max(max, sf.getMaxSequenceId());
}
}
@@ -582,6 +583,24 @@ public class StoreFile extends SchemaCon
}
}
}
+
+ if (isBulkLoadResult()) {
+ // generate the sequenceId from the fileName
+ // fileName is of the form <randomName>_SeqId_<id-when-loaded>_
+ String fileName = this.path.getName();
+ int startPos = fileName.indexOf("SeqId_");
+ if (startPos != -1) {
+ this.sequenceid =
+ Long.parseLong(fileName.substring(startPos + 6,
fileName.indexOf('_', startPos + 6)));
+ // Handle reference files as done above.
+ if (isReference()) {
+ if (Reference.isTopFileRegion(this.reference.getFileRegion())) {
+ this.sequenceid += 1;
+ }
+ }
+ }
+ }
+
this.reader.setSequenceID(this.sequenceid);
b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
@@ -1859,29 +1878,35 @@ public class StoreFile extends SchemaCon
* Comparator that compares based on the flush time of
* the StoreFiles. All bulk loads are placed before all non-
* bulk loads, and then all files are sorted by sequence ID.
- * If there are ties, the path name is used as a tie-breaker.
+ * Comparator that compares based on the Sequence Ids of
+ * the StoreFiles. Bulk loads that did not request a seq ID
+ * are given a seq id of -1; thus, they are placed before all non-
+ * bulk loads, and bulk loads with sequence Id. Among these files,
+ * the bulkLoadTime is used to determine the ordering.
+ * If there are ties, the path name is used as a tie-breaker.
*/
- static final Comparator<StoreFile> FLUSH_TIME =
+ static final Comparator<StoreFile> SEQ_ID =
Ordering.compound(ImmutableList.of(
- Ordering.natural().onResultOf(new GetBulkTime()),
Ordering.natural().onResultOf(new GetSeqId()),
+ Ordering.natural().onResultOf(new GetBulkTime()),
Ordering.natural().onResultOf(new GetPathName())
));
- private static class GetBulkTime implements Function<StoreFile, Long> {
+ private static class GetSeqId implements Function<StoreFile, Long> {
@Override
public Long apply(StoreFile sf) {
- if (!sf.isBulkLoadResult()) return Long.MAX_VALUE;
- return sf.getBulkLoadTimestamp();
+ return sf.getMaxSequenceId();
}
}
- private static class GetSeqId implements Function<StoreFile, Long> {
+
+ private static class GetBulkTime implements Function<StoreFile, Long> {
@Override
public Long apply(StoreFile sf) {
- if (sf.isBulkLoadResult()) return -1L;
- return sf.getMaxSequenceId();
+ if (!sf.isBulkLoadResult()) return Long.MAX_VALUE;
+ return sf.getBulkLoadTimestamp();
}
}
+
private static class GetPathName implements Function<StoreFile, String> {
@Override
public String apply(StoreFile sf) {
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL:
http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1528592&r1=1528591&r2=1528592&view=diff
==============================================================================
---
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
(original)
+++
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
Wed Oct 2 19:06:15 2013
@@ -1514,7 +1514,7 @@ public class HLog implements Syncable {
/**
* Obtain a log sequence number.
*/
- private long obtainSeqNum() {
+ public long obtainSeqNum() {
return this.logSeqNum.incrementAndGet();
}
Modified:
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
URL:
http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java?rev=1528592&r1=1528591&r2=1528592&view=diff
==============================================================================
---
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
(original)
+++
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
Wed Oct 2 19:06:15 2013
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertEqu
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.List;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
@@ -32,10 +33,12 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.*;
@@ -157,6 +160,53 @@ public class TestLoadIncrementalHFiles {
assertEquals(expectedRows, util.countRows(table));
}
+ private void
+ verifyAssignedSequenceNumber(String testName, byte[][][] hfileRanges,
boolean nonZero)
+ throws Exception {
+ Path dir = util.getDataTestDir(testName);
+ FileSystem fs = util.getTestFileSystem();
+ dir = dir.makeQualified(fs);
+ Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+
+ int hfileIdx = 0;
+ for (byte[][] range : hfileRanges) {
+ byte[] from = range[0];
+ byte[] to = range[1];
+ createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_" +
hfileIdx++), FAMILY,
+ QUALIFIER, from, to, 1000);
+ }
+
+ final byte[] TABLE = Bytes.toBytes("mytable_" + testName);
+
+ HBaseAdmin admin = new HBaseAdmin(util.getConfiguration());
+ HTableDescriptor htd = new HTableDescriptor(TABLE);
+ HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
+ htd.addFamily(familyDesc);
+ admin.createTable(htd, SPLIT_KEYS);
+
+ HTable table = new HTable(util.getConfiguration(), TABLE);
+ util.waitTableAvailable(TABLE, 30000);
+ LoadIncrementalHFiles loader = new
LoadIncrementalHFiles(util.getConfiguration());
+
+ // Do a dummy put to increase the hlog sequence number
+ Put put = new Put(Bytes.toBytes("row"));
+ put.add(FAMILY, QUALIFIER, Bytes.toBytes("value"));
+ table.put(put);
+
+ loader.doBulkLoad(dir, table);
+
+ // Get the store files
+ List<StoreFile> files =
+
util.getHBaseCluster().getRegions(TABLE).get(0).getStore(FAMILY).getStorefiles();
+ for (StoreFile file : files) {
+ // the sequenceId gets initialized during createReader
+ file.createReader();
+
+ if (nonZero) assertTrue(file.getMaxSequenceId() > 0);
+ else assertTrue(file.getMaxSequenceId() == -1);
+ }
+ }
+
/**
* Test loading into a column family that does not exist.
*/
Modified:
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL:
http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1528592&r1=1528591&r2=1528592&view=diff
==============================================================================
---
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
(original)
+++
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
Wed Oct 2 19:06:15 2013
@@ -594,7 +594,7 @@ public class TestCompaction extends HBas
Store store = r.getStore(COLUMN_FAMILY);
List<StoreFile> storeFiles = store.getStorefiles();
- long maxId = StoreFile.getMaxSequenceIdInList(storeFiles);
+ long maxId = StoreFile.getMaxSequenceIdInList(storeFiles, true);
Compactor tool = new Compactor(this.conf);
StoreFile.Writer compactedFile = tool.compactForTesting(store, this.conf,
storeFiles, false,
Modified:
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
URL:
http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java?rev=1528592&r1=1528591&r2=1528592&view=diff
==============================================================================
---
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
(original)
+++
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
Wed Oct 2 19:06:15 2013
@@ -145,7 +145,7 @@ public class TestHRegionServerBulkLoad {
LOG.debug("Going to connect to server " + location + " for row "
+ Bytes.toStringBinary(row));
byte[] regionName = location.getRegionInfo().getRegionName();
- server.bulkLoadHFiles(famPaths, regionName);
+ server.bulkLoadHFiles(famPaths, regionName, true);
return null;
}
}.withRetries();
Modified:
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
URL:
http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java?rev=1528592&r1=1528591&r2=1528592&view=diff
==============================================================================
---
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
(original)
+++
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
Wed Oct 2 19:06:15 2013
@@ -778,8 +778,8 @@ public class TestStoreFile extends HBase
fs.delete(f, true);
}
- public void testFlushTimeComparator() {
- assertOrdering(StoreFile.Comparators.FLUSH_TIME,
+ public void testSeqIdComparator() {
+ assertOrdering(StoreFile.Comparators.SEQ_ID,
mockStoreFile(true, 1000, -1, "/foo/123"),
mockStoreFile(true, 1000, -1, "/foo/126"),
mockStoreFile(true, 2000, -1, "/foo/126"),
@@ -810,13 +810,7 @@ public class TestStoreFile extends HBase
StoreFile mock = Mockito.mock(StoreFile.class);
Mockito.doReturn(bulkLoad).when(mock).isBulkLoadResult();
Mockito.doReturn(bulkTimestamp).when(mock).getBulkLoadTimestamp();
- if (bulkLoad) {
- // Bulk load files will throw if you ask for their sequence ID
- Mockito.doThrow(new IllegalAccessError("bulk load"))
- .when(mock).getMaxSequenceId();
- } else {
- Mockito.doReturn(seqId).when(mock).getMaxSequenceId();
- }
+ Mockito.doReturn(seqId).when(mock).getMaxSequenceId();
Mockito.doReturn(new Path(path)).when(mock).getPath();
String name = "mock storefile, bulkLoad=" + bulkLoad +
" bulkTimestamp=" + bulkTimestamp +
Modified:
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL:
http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1528592&r1=1528591&r2=1528592&view=diff
==============================================================================
---
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
(original)
+++
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
Wed Oct 2 19:06:15 2013
@@ -326,7 +326,7 @@ public class TestWALReplay {
writer.close();
List <Pair<byte[],String>> hfs= new ArrayList<Pair<byte[],String>>(1);
hfs.add(Pair.newPair(family, f.toString()));
- region.bulkLoadHFiles(hfs);
+ region.bulkLoadHFiles(hfs, true);
// Add an edit so something in the WAL
region.put((new Put(row)).add(family, family, family));
wal.sync();