[ https://issues.apache.org/jira/browse/HBASE-8024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13615013#comment-13615013 ]

Maryann Xue commented on HBASE-8024:
------------------------------------

Modification of HStore#internalFlushCache() for our LOB use case:
{code}
  private Path internalFlushCacheToBlobStore(final SortedSet<KeyValue> set,
      final long logCacheFlushId,
      TimeRangeTracker snapshotTimeRangeTracker,
      AtomicLong flushedSize,
      MonitoredTask status)
      throws IOException {
    StoreFile.Writer writer;
    
    // Find the smallest read point across all the Scanners.
    long smallestReadPoint = region.getSmallestReadPoint();
    long flushed = 0;
    Path referenceFilePath = null;
    Path blobFilePath = null;
    // Don't flush if there are no entries.
    if (set.size() == 0) {
      return null;
    }
    Scan scan = new Scan();
    scan.setMaxVersions(scanInfo.getMaxVersions());
    // Use a store scanner to find which rows to flush.
    // Note that we need to retain deletes, hence
    // treat this as a minor compaction.
    InternalScanner scanner = new StoreScanner(this, scan, Collections
        .singletonList(new CollectionBackedScanner(set, this.comparator)),
        ScanType.MINOR_COMPACT, smallestReadPoint,
        HConstants.OLDEST_TIMESTAMP);
    
    BlobStore blobStore = BlobStoreManager.getInstance().getBlobStore(
        getTableName(), family.getNameAsString());
    if (null == blobStore) {
      blobStore = BlobStoreManager.getInstance().createBlobStore(
          getTableName(), family);
    }
    
    StoreFile.Writer blobWriter = null;
    try {
      // TODO: We can fail in the below block before we complete adding this
      // flush to the list of store files. Add cleanup of anything put on the
      // filesystem if we fail.
      synchronized (flushLock) {
        status.setStatus("Flushing " + this + ": creating writer");
        
        int referenceKeyValueCount = set.size();
        int blobKeyValueCount = 0;
        
        // A. Write the map out to the disk
        writer = createWriterInTmp(referenceKeyValueCount);
        writer.setTimeRangeTracker(snapshotTimeRangeTracker);
        referenceFilePath = writer.getPath();
        
        // Count the Put KeyValues: only these go into the blob file, so this
        // count sizes the blob writer.
        Iterator<KeyValue> iter = set.iterator();
        while (iter.hasNext()) {
          if (iter.next().getType() == KeyValue.Type.Put.getCode()) {
            blobKeyValueCount++;
          }
        }
        
        blobWriter = blobStore.createWriterInTmp(blobKeyValueCount,
            this.compression, region.getRegionInfo());
        blobFilePath = blobWriter.getPath();
        String targetPathName = dateFormatter.format(new Date());
        Path targetPath = new Path(blobStore.getHomePath(), targetPathName);
        
        String relativePath = targetPathName + Path.SEPARATOR
            + blobFilePath.getName();
        
        // Append the BLOB_STORE_VERSION before the relative path name
        byte[] referenceValue = Bytes.add(
            new byte[] { BlobStoreConstants.BLOB_STORE_VERSION },
            Bytes.toBytes(relativePath));
        
        try {
          List<KeyValue> kvs = new ArrayList<KeyValue>();
          boolean hasMore;
          do {
            hasMore = scanner.next(kvs);
            if (!kvs.isEmpty()) {
              for (KeyValue kv : kvs) {
                // If we know that this KV is going to be included always,
                // then let us set its memstoreTS to 0. This will help us
                // save space when writing to disk.
                if (kv.getMemstoreTS() <= smallestReadPoint) {
                  // Let us not change the original KV. It could be in the
                  // memstore; changing its memstoreTS could affect other
                  // threads/scanners.
                  kv = kv.shallowCopy();
                  kv.setMemstoreTS(0);
                }
                
                if (kv.getType() == KeyValue.Type.Reference.getCode()) {
                  writer.append(kv);
                } else {
                  // Append the original KeyValue to the blob file.
                  blobWriter.append(kv);

                  // Append a reference KeyValue to the store file. The key is
                  // the same; the value is the blob file's relative path,
                  // prefixed with the blob store version byte.
                  KeyValue reference = new KeyValue(kv.getBuffer(),
                      kv.getRowOffset(), 
                      kv.getRowLength(),
                      kv.getBuffer(),
                      kv.getFamilyOffset(),
                      kv.getFamilyLength(),
                      kv.getBuffer(),
                      kv.getQualifierOffset(),
                      kv.getQualifierLength(),
                      kv.getTimestamp(),
                      KeyValue.Type.Reference,
                      referenceValue, 
                      0, 
                      referenceValue.length);
                  writer.append(reference);
                }
                
                flushed += this.memstore.heapSizeChange(kv, true);
              }
              kvs.clear();
            }
          } while (hasMore);
        } finally {
          // Write out the log sequence number that corresponds to this output
          // hfile.  The hfile is current up to and including logCacheFlushId.
          status.setStatus("Flushing " + this + ": appending metadata");
          writer.appendMetadata(logCacheFlushId, false);
          writer.appendFileInfo(StoreFile.KEYVALUE_COUNT,
              Bytes.toBytes(referenceKeyValueCount));
          blobWriter.appendMetadata(logCacheFlushId, false);
          blobWriter.appendFileInfo(StoreFile.KEYVALUE_COUNT,
              Bytes.toBytes(blobKeyValueCount));
          status.setStatus("Flushing " + this + ": closing flushed file");
          writer.close();
          blobWriter.close();
          
          // Commit the blob file from the tmp folder to its real folder.
          blobStore.commitFile(blobFilePath, targetPath);
        }
      }
    } finally {
      flushedSize.set(flushed);
      scanner.close();
    }
    if (LOG.isInfoEnabled()) {
      LOG.info("Flushed " +
               ", sequenceid=" + logCacheFlushId +
               ", memsize=" + StringUtils.humanReadableInt(flushed) +
               ", into tmp file " + referenceFilePath);
    }
    return referenceFilePath;
  }
{code}
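
For clarity, the value written into each reference KeyValue above is the BLOB_STORE_VERSION byte followed by the blob file's path relative to the blob store home directory. A minimal sketch of how the read path could decode it (resolveBlobFilePath is a hypothetical helper, not part of the attached patch):
{code}
  // Hypothetical helper: decode the reference value written by the flush
  // above back into the blob file path under the blob store's home dir.
  private Path resolveBlobFilePath(Path blobStoreHome, KeyValue reference) {
    byte[] value = reference.getValue();
    // Byte 0 is BLOB_STORE_VERSION; the remainder is the relative path
    // ("<date>/<filename>") built at flush time.
    String relativePath = Bytes.toString(value, 1, value.length - 1);
    return new Path(blobStoreHome, relativePath);
  }
{code}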
                
> Make Store flush algorithm pluggable
> ------------------------------------
>
>                 Key: HBASE-8024
>                 URL: https://issues.apache.org/jira/browse/HBASE-8024
>             Project: HBase
>          Issue Type: Sub-task
>          Components: regionserver
>    Affects Versions: 0.95.0, 0.96.0, 0.94.5
>            Reporter: Maryann Xue
>            Assignee: Maryann Xue
>         Attachments: HBASE-8024-trunk.patch, HBASE-8024.v2.patch
>
>
> The idea is to make "StoreFlusher" an interface instead of an implementation
> class, and have the original StoreFlusher as the default store flush
> implementation.
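
For context, a minimal sketch of what such a pluggable interface might look like (hypothetical method names; the attached patches define the actual contract):
{code}
interface StoreFlusher {
  /** Snapshot the memstore contents to be flushed. */
  void prepare();
  /** Write the snapshot out. The default flusher writes a plain store file;
   *  a blob-store-aware flusher (as in the comment above) can also write
   *  blob files plus reference KeyValues. */
  void flushCache(MonitoredTask status) throws IOException;
  /** Move the flushed file(s) into place and update the store's file list. */
  boolean commit(MonitoredTask status) throws IOException;
}
{code}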

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
