[ https://issues.apache.org/jira/browse/HBASE-10254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13859046#comment-13859046 ]

James Taylor commented on HBASE-10254:
--------------------------------------

As an experiment, I copy/pasted the HRegion.increment() code, adding an
incrementTimestamp argument and returning null if the KeyValue doesn't exist 
(see below). Here's what I learned:
- you can't get at the HRegion.updatesLock, so this is likely problematic
- you can't call HRegion.requestFlush()
- too much of the guts of the implementation leaks out to coprocessors IMO

I'm thinking of just having my own coprocessor that locks the row and does a Get 
followed by a Put through the HRegion methods. It'll be a lot less code.
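
For illustration, here's a rough sketch of that simpler Get-then-Put endpoint 
method. It's untested and just assumes the same 0.94-era HRegion APIs the 
experiment below uses; error handling is elided:

    public Long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
            long amount, long timestamp) throws IOException {
        HRegion region = getEnvironment().getRegion();
        region.startRegionOperation();
        try {
            // The row lock makes the read-modify-write atomic for this region
            Integer lid = region.getLock(null, row, true);
            try {
                Get get = new Get(row);
                get.addColumn(family, qualifier);
                Result result = region.get(get);
                // Same behavioral change as in the experiment: no cell, no increment
                if (result.isEmpty()) { return null; }
                KeyValue kv = result.raw()[0];
                amount += Bytes.toLong(kv.getBuffer(), kv.getValueOffset(),
                        Bytes.SIZEOF_LONG);
                Put put = new Put(row);
                put.add(family, qualifier, timestamp, Bytes.toBytes(amount));
                // Reuse the row lock we already hold; HRegion.put() takes care
                // of the WAL, memstore accounting, and flush requests for us
                region.put(put, lid);
            } finally {
                region.releaseRowLock(lid);
            }
        } finally {
            region.closeRegionOperation();
        }
        return amount;
    }

And here's the full copy/pasted increment() experiment referenced above: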

package com.salesforce.phoenix.coprocessor;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.WrongRegionException;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SequenceEndpointImpl extends BaseEndpointCoprocessor {
    private static final Logger LOG =
            LoggerFactory.getLogger(SequenceEndpointImpl.class);
    private static final long DEFAULT_BUSY_WAIT_DURATION =
            HConstants.DEFAULT_HBASE_RPC_TIMEOUT;

    // If updating multiple rows in one call, wait longer,
    // i.e. waiting for busyWaitDuration * # of rows. However,
    // we can limit the max multiplier.
    private int maxBusyWaitMultiplier;

    // Max busy wait duration. There is no point in waiting longer than the RPC
    // purge timeout, after which an RPC call will be terminated by the RPC
    // engine.
    private long maxBusyWaitDuration;
    // The internal wait duration to acquire a lock before read/update
    // from the region. It is not per row. The purpose of this wait time
    // is to avoid waiting a long time while the region is busy, so that
    // we can release the IPC handler soon enough to improve the
    // availability of the region server. It can be adjusted by
    // tuning configuration "hbase.busy.wait.duration".
    private long busyWaitDuration;
    private long memstoreFlushSize;
    private boolean deferredLogSyncDisabled;

    // Stop updates lock
    private final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock();

    @Override
    public void start(CoprocessorEnvironment e) {
        Configuration conf = e.getConfiguration();
        this.busyWaitDuration = conf.getLong("hbase.busy.wait.duration",
                DEFAULT_BUSY_WAIT_DURATION);
        this.maxBusyWaitMultiplier = conf.getInt("hbase.busy.wait.multiplier.max", 2);
        if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
            throw new IllegalArgumentException("Invalid hbase.busy.wait.duration ("
                    + busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
                    + maxBusyWaitMultiplier + "). Their product should be positive");
        }
        this.maxBusyWaitDuration = conf.getLong("ipc.client.call.purge.timeout",
                2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
        RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment)e;
        HRegion region = env.getRegion();
        long flushSize = region.getTableDesc().getMemStoreFlushSize();

        if (flushSize <= 0) {
            flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
                    HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
        }
        this.memstoreFlushSize = flushSize;
        // When hbase.regionserver.optionallogflushinterval <= 0, deferred log
        // sync is disabled.
        this.deferredLogSyncDisabled =
                conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000) <= 0;

    }

    public static boolean rowIsInRange(HRegion info, final byte[] row) {
        return ((info.getStartKey().length == 0)
                || (Bytes.compareTo(info.getStartKey(), row) <= 0))
                && ((info.getEndKey().length == 0)
                        || (Bytes.compareTo(info.getEndKey(), row) > 0));
    }

    /** Make sure this is a valid row for the HRegion */
    void checkRow(final byte[] row, String op, HRegion region) throws IOException {
        if (!rowIsInRange(region, row)) {
            throw new WrongRegionException("Requested row out of range for " + op
                    + " on HRegion " + region + ", startKey='"
                    + Bytes.toStringBinary(region.getStartKey()) + "', endKey='"
                    + Bytes.toStringBinary(region.getEndKey()) + "', row='"
                    + Bytes.toStringBinary(row) + "'");
        }
    }

    private void lock(final Lock lock)
            throws RegionTooBusyException, InterruptedIOException {
        lock(lock, 1);
    }

    /**
     * Try to acquire a lock. Throws RegionTooBusyException if the lock cannot
     * be acquired in time, and InterruptedIOException if interrupted while
     * waiting for the lock.
     */
    private void lock(final Lock lock, final int multiplier)
            throws RegionTooBusyException, InterruptedIOException {
        try {
            final long waitTime = Math.min(maxBusyWaitDuration,
                    busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
            if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
                throw new RegionTooBusyException("failed to get a lock in "
                        + waitTime + "ms");
            }
        } catch (InterruptedException ie) {
            LOG.info("Interrupted while waiting for a lock");
            InterruptedIOException iie = new InterruptedIOException();
            iie.initCause(ie);
            throw iie;
        }
    }

    @Override
    public RegionCoprocessorEnvironment getEnvironment() {
        return (RegionCoprocessorEnvironment)super.getEnvironment();
    }

    /*
     * @param size
     * @return True if size is over the flush threshold
     */
    private boolean isFlushSize(final long size) {
        return size > memstoreFlushSize;
    }

    /**
     * Check whether deferred log sync is enabled for the given region.
     */
    private boolean isDeferredLogSyncEnabled(HRegion region) {
        return (region.getTableDesc().isDeferredLogFlush()
                && !this.deferredLogSyncDisabled);
    }

    public Long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
            long amount, TimeRange tr, long incrementTimestamp, boolean writeToWAL)
            throws IOException {
        RegionCoprocessorEnvironment env = this.getEnvironment();
        HRegion region = env.getRegion();
        HTableDescriptor htableDescriptor = region.getTableDesc();
        checkRow(row, "increment", region);
        boolean flush = false;
        long txid = 0;
        // Lock row
        region.startRegionOperation();
        try {
            Integer lid = region.getLock(null, row, true);
            lock(this.updatesLock.readLock());
            try {
                Store store = region.getStore(family);

                // Get the old value:
                Get get = new Get(row);
                get.setTimeRange(tr.getMin(), tr.getMax());
                get.addColumn(family, qualifier);

                Result result = region.get(get);
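                // The behavioral change from stock HRegion.increment(): return
                // null instead of initializing a new counter from 0.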
                if (result.isEmpty()) { return null; }

                KeyValue kv = result.raw()[0];
                if (kv.getValueLength() == Bytes.SIZEOF_LONG) {
                    byte[] buffer = kv.getBuffer();
                    int valueOffset = kv.getValueOffset();
                    amount += Bytes.toLong(buffer, valueOffset, Bytes.SIZEOF_LONG);
                } else {
                    throw new DoNotRetryIOException(
                            "Attempted to increment field that isn't 64 bits wide");
                }
                // build the KeyValue now:
                KeyValue newKv = new KeyValue(row, family, qualifier,
                        incrementTimestamp, Bytes.toBytes(amount));

                // now log it:
                if (writeToWAL) {
                    long now = EnvironmentEdgeManager.currentTimeMillis();
                    WALEdit walEdit = new WALEdit();
                    walEdit.add(newKv);
                    // Using default cluster id, as this can only happen in the
                    // originating cluster. A slave cluster receives the final
                    // value (not the delta) as a Put.
                    txid = region.getLog().appendNoSync(region.getRegionInfo(),
                            htableDescriptor.getName(), walEdit,
                            HConstants.DEFAULT_CLUSTER_ID, now, htableDescriptor);
                }

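                // Store.upsert() replaces the cell directly in the memstore and
                // returns the resulting memstore size delta.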
                long size = store.upsert(Collections.singletonList(newKv));
                size = region.addAndGetGlobalMemstoreSize(size);
                flush = isFlushSize(size);
            } finally {
                this.updatesLock.readLock().unlock();
                region.releaseRowLock(lid);
            }
            if (writeToWAL) {
                // sync the transaction log outside the rowlock
                if (!isDeferredLogSyncEnabled(region)) {
                    region.getLog().sync(txid);
                }
            }
        } finally {
            region.closeRegionOperation();
        }

        if (flush) {
            // Request a cache flush. Do it outside update lock.
            // This method isn't exposed on HRegion, so we can't do this
            // requestFlush();
        }
        return amount;
    }

}
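
For completeness, invoking an endpoint like this from a client would look 
roughly as follows. This is a sketch only: BaseEndpointCoprocessor endpoints 
are dispatched through a CoprocessorProtocol interface 
(org.apache.hadoop.hbase.ipc.CoprocessorProtocol), so SequenceEndpointImpl 
would also need to implement one. The SequenceProtocol and table names here 
are made up:

    // Hypothetical protocol interface the endpoint would implement
    public interface SequenceProtocol extends CoprocessorProtocol {
        Long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
                long amount, TimeRange tr, long incrementTimestamp,
                boolean writeToWAL) throws IOException;
    }

    // Client side: proxy the call to the region hosting the row
    HTable table = new HTable(conf, "SEQUENCE_TABLE");
    SequenceProtocol proxy = table.coprocessorProxy(SequenceProtocol.class, row);
    Long next = proxy.incrementColumnValue(row, family, qualifier, 1L,
            new TimeRange(), EnvironmentEdgeManager.currentTimeMillis(), true);
    if (next == null) {
        // the sequence cell doesn't exist yet; the caller decides what to do
    }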


> Optionally return null when attempting to Increment KeyValue that doesn't 
> exist
> -------------------------------------------------------------------------------
>
>                 Key: HBASE-10254
>                 URL: https://issues.apache.org/jira/browse/HBASE-10254
>             Project: HBase
>          Issue Type: Bug
>            Reporter: James Taylor
>
> Instead of creating a new KeyValue starting from 0 when an Increment is done 
> on a row that doesn't exist, we should optionally return null. A Get is 
> already being done, so it's easy to detect this case. This can be done in a 
> backward compatible manner if the behavior is done optionally. In addition, 
> Increment does not allow me to specify the timestamp to use for the KeyValue. 
> This should be added as well.
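
In client terms, that proposal amounts to something like the following. This is 
a purely hypothetical API shape; neither option exists on Increment at the time 
of this comment, and the method names are made up:

    Increment inc = new Increment(row);
    inc.addColumn(family, qualifier, 1L);
    inc.setTimeStamp(ts);                 // proposed: timestamp for the new KeyValue
    inc.setReturnNullIfMissing(true);     // proposed: don't create the cell from 0
    Result result = table.increment(inc); // would be empty if the cell didn't exist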


