[
https://issues.apache.org/jira/browse/HBASE-10254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13859046#comment-13859046
]
James Taylor commented on HBASE-10254:
--------------------------------------
As an experiment, I copy/pasted the HRegion.increment() code, adding a
incrementTimestamp argument and returning null if the KeyValue doesn't exist
(see below). Here's what I learned:
- you can't get at the HRegion.updatesLock, so this is likely problematic
- you can't do a HRegion.requestFlush()
- too much of the guts of the implementation leaks out to coprocessors IMO
I'm thinking of just having my own coprocessor that locks the row and does a
Get followed by a Put through the HRegion methods. It'll be a lot less code;
a rough sketch of that approach is below, followed by the full experiment.
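A minimal sketch of that Get-then-Put approach, using only the public HRegion
methods the experiment below also calls (startRegionOperation, getLock, get,
put, releaseRowLock). The helper name and signature are illustrative, not an
existing API:

// Illustrative only: increment via row lock + Get + Put, returning null
// when the KeyValue doesn't exist. Assumes the same imports as the class below.
private Long getAndPutIncrement(HRegion region, byte[] row, byte[] family,
        byte[] qualifier, long amount, long ts) throws IOException {
    region.startRegionOperation();
    try {
        Integer lid = region.getLock(null, row, true); // wait for the row lock
        try {
            Get get = new Get(row);
            get.addColumn(family, qualifier);
            Result result = region.get(get);
            if (result.isEmpty()) {
                return null; // no existing KeyValue, so no increment
            }
            // assumes an 8-byte counter value, as in the experiment below
            amount += Bytes.toLong(result.getValue(family, qualifier));
            Put put = new Put(row);
            put.add(family, qualifier, ts, Bytes.toBytes(amount));
            // pass the held lock id so put() doesn't re-acquire the row lock;
            // WAL append/sync and memstore accounting happen inside HRegion
            region.put(put, lid);
            return amount;
        } finally {
            region.releaseRowLock(lid);
        }
    } finally {
        region.closeRegionOperation();
    }
}

The full copy/pasted experiment: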
package com.salesforce.phoenix.coprocessor;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.WrongRegionException;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SequenceEndpointImpl extends BaseEndpointCoprocessor {
private static final Logger LOG =
    LoggerFactory.getLogger(SequenceEndpointImpl.class);
private static final long DEFAULT_BUSY_WAIT_DURATION =
HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
// If updating multiple rows in one call, wait longer,
// i.e. waiting for busyWaitDuration * # of rows. However,
// we can limit the max multiplier.
private int maxBusyWaitMultiplier;
// Max busy wait duration. There is no point to wait longer than the RPC
// purge timeout, when a RPC call will be terminated by the RPC engine.
private long maxBusyWaitDuration;
// The internal wait duration to acquire a lock before read/update
// from the region. It is not per row. The purpose of this wait time
// is to avoid waiting a long time while the region is busy, so that
// we can release the IPC handler soon enough to improve the
// availability of the region server. It can be adjusted by
// tuning configuration "hbase.busy.wait.duration".
private long busyWaitDuration;
private long memstoreFlushSize;
private boolean deferredLogSyncDisabled;
// Stop updates lock
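// NOTE: this is a private copy; it does not coordinate with HRegion's own
// updatesLock, which isn't reachable from a coprocessor (first bullet above).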
private final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock();
@Override
public void start(CoprocessorEnvironment e) {
Configuration conf = e.getConfiguration();
this.busyWaitDuration = conf.getLong("hbase.busy.wait.duration",
DEFAULT_BUSY_WAIT_DURATION);
this.maxBusyWaitMultiplier =
conf.getInt("hbase.busy.wait.multiplier.max", 2);
if (busyWaitDuration * maxBusyWaitMultiplier <= 0L) {
    throw new IllegalArgumentException("Invalid hbase.busy.wait.duration ("
        + busyWaitDuration + ") or hbase.busy.wait.multiplier.max ("
        + maxBusyWaitMultiplier + "). Their product should be positive");
}
this.maxBusyWaitDuration = conf.getLong("ipc.client.call.purge.timeout",
2 * HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment)e;
HRegion region = env.getRegion();
long flushSize = region.getTableDesc().getMemStoreFlushSize();
if (flushSize <= 0) {
flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
}
this.memstoreFlushSize = flushSize;
// When hbase.regionserver.optionallogflushinterval <= 0, deferred log
// sync is disabled.
this.deferredLogSyncDisabled =
    conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000) <= 0;
}
public static boolean rowIsInRange(HRegion info, final byte[] row) {
return ((info.getStartKey().length == 0) ||
(Bytes.compareTo(info.getStartKey(), row) <= 0))
&& ((info.getEndKey().length == 0) ||
(Bytes.compareTo(info.getEndKey(), row) > 0));
}
/** Make sure this is a valid row for the HRegion */
void checkRow(final byte[] row, String op, HRegion region) throws IOException {
    if (!rowIsInRange(region, row)) {
        throw new WrongRegionException("Requested row out of range for " + op
            + " on HRegion " + region + ", startKey='"
            + Bytes.toStringBinary(region.getStartKey()) + "', endKey='"
            + Bytes.toStringBinary(region.getEndKey()) + "', row='"
            + Bytes.toStringBinary(row) + "'");
    }
}
private void lock(final Lock lock) throws RegionTooBusyException,
InterruptedIOException {
lock(lock, 1);
}
/**
 * Try to acquire a lock. Throw RegionTooBusyException if failed to get the
 * lock in time. Throw InterruptedIOException if interrupted while waiting
 * for the lock.
 */
private void lock(final Lock lock, final int multiplier)
        throws RegionTooBusyException, InterruptedIOException {
    try {
        final long waitTime = Math.min(maxBusyWaitDuration,
            busyWaitDuration * Math.min(multiplier, maxBusyWaitMultiplier));
        if (!lock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
            throw new RegionTooBusyException(
                "failed to get a lock in " + waitTime + "ms");
        }
} catch (InterruptedException ie) {
LOG.info("Interrupted while waiting for a lock");
InterruptedIOException iie = new InterruptedIOException();
iie.initCause(ie);
throw iie;
}
}
@Override
public RegionCoprocessorEnvironment getEnvironment() {
return (RegionCoprocessorEnvironment)super.getEnvironment();
}
/*
* @param size
* @return True if size is over the flush threshold
*/
private boolean isFlushSize(final long size) {
return size > memstoreFlushSize;
}
/**
* check if current region is deferred sync enabled.
*/
private boolean isDeferredLogSyncEnabled(HRegion region) {
return (region.getTableDesc().isDeferredLogFlush() &&
!this.deferredLogSyncDisabled);
}
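/**
 * Copy of the HRegion.increment() code, modified per the experiment described
 * above: takes an explicit incrementTimestamp for the new KeyValue and
 * returns null, rather than incrementing from 0, when the KeyValue doesn't
 * exist.
 */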
public Long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
        long amount, TimeRange tr, long incrementTimestamp,
        boolean writeToWAL) throws IOException {
RegionCoprocessorEnvironment env = this.getEnvironment();
HRegion region = env.getRegion();
HTableDescriptor htableDescriptor = region.getTableDesc();
checkRow(row, "increment", region);
boolean flush = false;
long txid = 0;
// Lock row
region.startRegionOperation();
try {
Integer lid = region.getLock(null, row, true);
lock(this.updatesLock.readLock());
try {
Store store = region.getStore(family);
// Get the old value:
Get get = new Get(row);
get.setTimeRange(tr.getMin(), tr.getMax());
get.addColumn(family, qualifier);
Result result = region.get(get);
if (result.isEmpty()) { return null; }
KeyValue kv = result.raw()[0];
if (kv.getValueLength() == Bytes.SIZEOF_LONG) {
byte[] buffer = kv.getBuffer();
int valueOffset = kv.getValueOffset();
amount += Bytes.toLong(buffer, valueOffset,
Bytes.SIZEOF_LONG);
} else {
    throw new DoNotRetryIOException(
        "Attempted to increment field that isn't 64 bits wide");
}
// build the KeyValue now:
KeyValue newKv = new KeyValue(row, family, qualifier,
incrementTimestamp, Bytes.toBytes(amount));
// now log it:
if (writeToWAL) {
long now = EnvironmentEdgeManager.currentTimeMillis();
WALEdit walEdit = new WALEdit();
walEdit.add(newKv);
// Using default cluster id, as this can only happen in the
// originating cluster. A slave cluster receives the final value
// (not the delta) as a Put.
txid = region.getLog().appendNoSync(region.getRegionInfo(),
htableDescriptor.getName(), walEdit,
HConstants.DEFAULT_CLUSTER_ID, now,
htableDescriptor);
}
long size = store.upsert(Collections.singletonList(newKv));
size = region.addAndGetGlobalMemstoreSize(size);
flush = isFlushSize(size);
} finally {
this.updatesLock.readLock().unlock();
region.releaseRowLock(lid);
}
if (writeToWAL) {
// sync the transaction log outside the rowlock
if (!isDeferredLogSyncEnabled(region)) {
region.getLog().sync(txid);
}
}
} finally {
region.closeRegionOperation();
}
if (flush) {
// Request a cache flush. Do it outside update lock.
// This method isn't exposed on HRegion, so we can't do this
// requestFlush();
}
return amount;
}
}
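For completeness, a hypothetical invocation sketch via the 0.94 endpoint RPC
mechanism (HTable.coprocessorExec). It assumes a SequenceProtocol interface
(extending CoprocessorProtocol and declaring incrementColumnValue) that
SequenceEndpointImpl would also implement; no such interface is defined above,
and TABLE/FAMILY/QUALIFIER/conf are placeholders:

// Hypothetical client-side call; SequenceProtocol is assumed, not shown.
final byte[] row = Bytes.toBytes("my_sequence");
HTable table = new HTable(conf, TABLE);
Map<byte[], Long> results = table.coprocessorExec(
    SequenceProtocol.class, row, row, // single-row key range
    new Batch.Call<SequenceProtocol, Long>() {
        public Long call(SequenceProtocol instance) throws IOException {
            // all-inclusive TimeRange; current time as the new KeyValue timestamp
            return instance.incrementColumnValue(row, FAMILY, QUALIFIER, 1L,
                new TimeRange(), System.currentTimeMillis(), true);
        }
    });
// a null result means the KeyValue didn't exist and nothing was written
Long next = results.isEmpty() ? null : results.values().iterator().next();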
> Optionally return null when attempting to Increment KeyValue that doesn't
> exist
> -------------------------------------------------------------------------------
>
> Key: HBASE-10254
> URL: https://issues.apache.org/jira/browse/HBASE-10254
> Project: HBase
> Issue Type: Bug
> Reporter: James Taylor
>
> Instead of creating a new KeyValue starting from 0 when an Increment is done
> on a row that doesn't exist, we should optionally return null. A Get is
> already being done, so it's easy to detect this case. This can be done in a
> backward compatible manner if the behavior is done optionally. In addition,
> Increment does not allow me to specify the timestamp to use for the KeyValue.
> This should be added as well.