[
https://issues.apache.org/jira/browse/HBASE-8024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13615013#comment-13615013
]
Maryann Xue commented on HBASE-8024:
------------------------------------
Modification of HStore#internalFlushCache() for our LOB use case:
{code}
private Path internalFlushCacheToBlobStore(final SortedSet<KeyValue> set,
final long logCacheFlushId,
TimeRangeTracker snapshotTimeRangeTracker,
AtomicLong flushedSize,
MonitoredTask status)
throws IOException {
StoreFile.Writer writer;
// Find the smallest read point across all the Scanners.
long smallestReadPoint = region.getSmallestReadPoint();
long flushed = 0;
Path referenceFilePath = null;
Path blobFilePath = null;
// Don't flush if there are no entries.
if (set.size() == 0) {
return null;
}
Scan scan = new Scan();
scan.setMaxVersions(scanInfo.getMaxVersions());
// Use a store scanner to find which rows to flush.
// Note that we need to retain deletes, hence
// treat this as a minor compaction.
InternalScanner scanner = new StoreScanner(this, scan, Collections
.singletonList(new CollectionBackedScanner(set, this.comparator)),
ScanType.MINOR_COMPACT, this.region.getSmallestReadPoint(),
HConstants.OLDEST_TIMESTAMP);
BlobStore blobStore =
BlobStoreManager.getInstance().getBlobStore(getTableName(),
family.getNameAsString());
if (null == blobStore) {
blobStore =
BlobStoreManager.getInstance().createBlobStore(getTableName(), family);
}
StoreFile.Writer blobWriter = null;
try {
// TODO: We can fail in the below block before we complete adding this
// flush to list of store files. Add cleanup of anything put on
filesystem
// if we fail.
synchronized (flushLock) {
status.setStatus("Flushing " + this + ": creating writer");
int referenceKeyValueCount = set.size();
int blobKeyValueCount = 0;
// A. Write the map out to the disk
writer = createWriterInTmp(referenceKeyValueCount);
writer.setTimeRangeTracker(snapshotTimeRangeTracker);
referenceFilePath = writer.getPath();
Iterator<KeyValue> iter = set.iterator();
while(null != iter && iter.hasNext()) {
if (iter.next().getType() == KeyValue.Type.Put.getCode()) {
blobKeyValueCount++;
}
}
blobWriter = blobStore.createWriterInTmp(blobKeyValueCount,
this.compression,
region.getRegionInfo());
blobFilePath = blobWriter.getPath();
String targetPathName = dateFormatter.format(new Date());
Path targetPath = new Path(blobStore.getHomePath(), targetPathName);
String relativePath = targetPathName + Path.SEPARATOR +
blobFilePath.getName();
// Append the BLOB_STORE_VERSION before the relative path name
byte[] referenceValue = Bytes.add(
new byte[] { BlobStoreConstants.BLOB_STORE_VERSION },
Bytes.toBytes(relativePath));
try {
List<KeyValue> kvs = new ArrayList<KeyValue>();
boolean hasMore;
do {
hasMore = scanner.next(kvs);
if (!kvs.isEmpty()) {
for (KeyValue kv : kvs) {
// If we know that this KV is going to be included always, then
let us
// set its memstoreTS to 0. This will help us save space when
writing to disk.
if (kv.getMemstoreTS() <= smallestReadPoint) {
// let us not change the original KV. It could be in the
memstore
// changing its memstoreTS could affect other
threads/scanners.
kv = kv.shallowCopy();
kv.setMemstoreTS(0);
}
if (kv.getType() == KeyValue.Type.Reference.getCode()) {
writer.append(kv);
}
else {
// append the original keyValue in the blob file.
blobWriter.append(kv);
// append reference KeyValue.
// The key is same, the value is the blobfile's filename
KeyValue reference = new KeyValue(kv.getBuffer(),
kv.getRowOffset(),
kv.getRowLength(),
kv.getBuffer(),
kv.getFamilyOffset(),
kv.getFamilyLength(),
kv.getBuffer(),
kv.getQualifierOffset(),
kv.getQualifierLength(),
kv.getTimestamp(),
KeyValue.Type.Reference,
referenceValue,
0,
referenceValue.length);
writer.append(reference);
}
flushed += this.memstore.heapSizeChange(kv, true);
}
kvs.clear();
}
} while (hasMore);
} finally {
// Write out the log sequence number that corresponds to this output
// hfile. The hfile is current up to and including logCacheFlushId.
status.setStatus("Flushing " + this + ": appending metadata");
writer.appendMetadata(logCacheFlushId, false);
writer.appendFileInfo(StoreFile.KEYVALUE_COUNT,
Bytes.toBytes(referenceKeyValueCount));
blobWriter.appendMetadata(logCacheFlushId, false);
blobWriter.appendFileInfo(StoreFile.KEYVALUE_COUNT,
Bytes.toBytes(blobKeyValueCount));
status.setStatus("Flushing " + this + ": closing flushed file");
writer.close();
blobWriter.close();
/*
* commit the blob file from tmp folder to real folder.
*/
blobStore.commitFile(blobFilePath, targetPath);
}
}
} finally {
flushedSize.set(flushed);
scanner.close();
}
if (LOG.isInfoEnabled()) {
LOG.info("Flushed " +
", sequenceid=" + logCacheFlushId +
", memsize=" + StringUtils.humanReadableInt(flushed) +
", into tmp file " + referenceFilePath);
}
return referenceFilePath;
}
{code}
> Make Store flush algorithm pluggable
> ------------------------------------
>
> Key: HBASE-8024
> URL: https://issues.apache.org/jira/browse/HBASE-8024
> Project: HBase
> Issue Type: Sub-task
> Components: regionserver
> Affects Versions: 0.95.0, 0.96.0, 0.94.5
> Reporter: Maryann Xue
> Assignee: Maryann Xue
> Attachments: HBASE-8024-trunk.patch, HBASE-8024.v2.patch
>
>
> The idea is to make "StoreFlusher" an interface instead of an implementation
> class, and have the original StoreFlusher as the default store flush impl.
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira