Repository: phoenix Updated Branches: refs/heads/5.x-HBase-2.0 d36558f4a -> 645fc39e8
PHOENIX-4340 Implements Observer interfaces instead of extending base observers classes Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/645fc39e Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/645fc39e Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/645fc39e Branch: refs/heads/5.x-HBase-2.0 Commit: 645fc39e823f503e5503ffd4b26f126f84449c2d Parents: d36558f Author: Ankit Singhal <ankitsingha...@gmail.com> Authored: Wed Nov 8 15:13:12 2017 +0530 Committer: Ankit Singhal <ankitsingha...@gmail.com> Committed: Wed Nov 8 15:13:12 2017 +0530 ---------------------------------------------------------------------- .../coprocessor/BaseScannerRegionObserver.java | 10 +- .../coprocessor/DelegateRegionObserver.java | 421 ++----------------- .../coprocessor/DelegateRegionScanner.java | 18 +- .../coprocessor/MetaDataRegionObserver.java | 12 +- .../coprocessor/SequenceRegionObserver.java | 6 +- .../UngroupedAggregateRegionObserver.java | 42 +- .../org/apache/phoenix/hbase/index/Indexer.java | 47 +-- .../hbase/index/master/IndexMasterObserver.java | 4 +- .../index/PhoenixTransactionalIndexer.java | 14 +- .../transaction/OmidTransactionContext.java | 5 +- .../transaction/PhoenixTransactionContext.java | 12 +- .../transaction/TephraTransactionContext.java | 20 +- 12 files changed, 123 insertions(+), 488 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/645fc39e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index d3b257b..95379a6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -21,13 +21,12 @@ import java.io.IOException; import java.util.List; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.regionserver.Region; @@ -50,7 +49,7 @@ import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; -abstract public class BaseScannerRegionObserver extends BaseRegionObserver { +abstract public class BaseScannerRegionObserver implements RegionObserver { public static final String AGGREGATORS = "_Aggs"; public static final String UNORDERED_GROUP_BY_EXPRESSIONS = "_UnorderedGroupByExpressions"; @@ -135,11 +134,6 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { protected QualifierEncodingScheme encodingScheme; protected boolean useNewValueColumnQualifier; - @Override - public void start(CoprocessorEnvironment e) throws IOException { - super.start(e); - } - /** * Used by logger to identify coprocessor */ http://git-wip-us.apache.org/repos/asf/phoenix/blob/645fc39e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java index 59b2271..34eee78 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java @@ -18,15 +18,11 @@ package org.apache.phoenix.coprocessor; import java.io.IOException; -import java.security.PrivilegedExceptionAction; import java.util.List; -import java.util.NavigableSet; +import java.util.Map; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CoprocessorEnvironment; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; @@ -34,36 +30,23 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionObserver; -import org.apache.hadoop.hbase.filter.ByteArrayComparable; -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; -import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; -import org.apache.hadoop.hbase.io.Reference; -import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.hbase.regionserver.DeleteTracker; import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; -import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Region.Operation; import org.apache.hadoop.hbase.regionserver.RegionScanner; -import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; -import org.apache.hadoop.hbase.regionserver.StoreFile.Reader; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; -import org.apache.hadoop.hbase.regionserver.wal.HLogKey; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKey; -import com.google.common.collect.ImmutableList; - public class DelegateRegionObserver implements RegionObserver { protected final RegionObserver delegate; @@ -72,15 +55,7 @@ public class DelegateRegionObserver implements RegionObserver { this.delegate = delegate; } - @Override - public void start(CoprocessorEnvironment env) throws IOException { - delegate.start(env); - } - - @Override - public void stop(CoprocessorEnvironment env) throws IOException { - delegate.stop(env); - } + @Override public void preOpen(ObserverContext<RegionCoprocessorEnvironment> c) throws IOException { @@ -97,11 +72,6 @@ public class DelegateRegionObserver implements RegionObserver { delegate.postLogReplay(c); } - @Override - public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, - Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException { - return delegate.preFlushScannerOpen(c, store, memstoreScanner, s); - } @Override public void preFlush(ObserverContext<RegionCoprocessorEnvironment> c) throws IOException { @@ -125,236 +95,7 @@ public class DelegateRegionObserver implements RegionObserver { delegate.postFlush(c, store, resultFile); } - // Compaction and split upcalls run with the effective user context of the requesting user. - // This will lead to failure of cross cluster RPC if the effective user is not - // the login user. Switch to the login user context to ensure we have the expected - // security context. - - @Override - public void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store, - final List<StoreFile> candidates, final CompactionRequest request) throws IOException { - User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - delegate.preCompactSelection(c, store, candidates, request); - return null; - } - }); - } - - @Override - public void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store, - final List<StoreFile> candidates) throws IOException { - User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - delegate.preCompactSelection(c, store, candidates); - return null; - } - }); - } - - @Override - public void postCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store, - final ImmutableList<StoreFile> selected, final CompactionRequest request) { - try { - User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - delegate.postCompactSelection(c, store, selected, request); - return null; - } - }); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void postCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store, - final ImmutableList<StoreFile> selected) { - try { - User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - delegate.postCompactSelection(c, store, selected); - return null; - } - }); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store, - final InternalScanner scanner, final ScanType scanType, final CompactionRequest request) - throws IOException { - return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() { - @Override - public InternalScanner run() throws Exception { - return delegate.preCompact(c, store, scanner, scanType, request); - } - }); - } - - @Override - public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store, - final InternalScanner scanner, final ScanType scanType) throws IOException { - return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() { - @Override - public InternalScanner run() throws Exception { - return delegate.preCompact(c, store, scanner, scanType); - } - }); - } - - @Override - public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, - final Store store, final List<? extends KeyValueScanner> scanners, final ScanType scanType, - final long earliestPutTs, final InternalScanner s, final CompactionRequest request) throws IOException { - return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() { - @Override - public InternalScanner run() throws Exception { - return delegate.preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s, - request); - } - }); - } - - @Override - public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, - final Store store, final List<? extends KeyValueScanner> scanners, final ScanType scanType, - final long earliestPutTs, final InternalScanner s) throws IOException { - return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() { - @Override - public InternalScanner run() throws Exception { - return delegate.preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s); - } - }); - } - - @Override - public void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store, - final StoreFile resultFile, final CompactionRequest request) throws IOException { - User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - delegate.postCompact(c, store, resultFile, request); - return null; - } - }); - } - - @Override - public void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store, - final StoreFile resultFile) throws IOException { - User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - delegate.postCompact(c, store, resultFile); - return null; - } - }); - } - - @Override - public void preSplit(final ObserverContext<RegionCoprocessorEnvironment> c) throws IOException { - User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - delegate.preSplit(c); - return null; - } - }); - } - - @Override - public void preSplit(final ObserverContext<RegionCoprocessorEnvironment> c, final byte[] splitRow) - throws IOException { - User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - delegate.preSplit(c, splitRow); - return null; - } - }); - } - - @Override - public void postSplit(final ObserverContext<RegionCoprocessorEnvironment> c, final Region l, final Region r) - throws IOException { - User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - delegate.postSplit(c, l, r); - return null; - } - }); - } - - @Override - public void preSplitBeforePONR(final ObserverContext<RegionCoprocessorEnvironment> ctx, - final byte[] splitKey, final List<Mutation> metaEntries) throws IOException { - User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - delegate.preSplitBeforePONR(ctx, splitKey, metaEntries); - return null; - } - }); - } - - @Override - public void preSplitAfterPONR(final ObserverContext<RegionCoprocessorEnvironment> ctx) - throws IOException { - User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - delegate.preSplitAfterPONR(ctx); - return null; - } - }); - } - - @Override - public void preRollBackSplit(final ObserverContext<RegionCoprocessorEnvironment> ctx) - throws IOException { - User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - delegate.preRollBackSplit(ctx); - return null; - } - }); - } - - @Override - public void postRollBackSplit(final ObserverContext<RegionCoprocessorEnvironment> ctx) - throws IOException { - User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - delegate.postRollBackSplit(ctx); - return null; - } - }); - } - - @Override - public void postCompleteSplit(final ObserverContext<RegionCoprocessorEnvironment> ctx) - throws IOException { - // NOTE: This one is an exception and doesn't need a context change. Should - // be infrequent and overhead is low, so let's ensure we have the right context - // anyway to avoid potential surprise. - User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - delegate.postCompleteSplit(ctx); - return null; - } - }); - } + @Override public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested) @@ -366,19 +107,7 @@ public class DelegateRegionObserver implements RegionObserver { public void postClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested) { delegate.postClose(c, abortRequested); } - - @Override - public void preGetClosestRowBefore(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row, - byte[] family, Result result) throws IOException { - delegate.preGetClosestRowBefore(c, row, family, result); - } - - @Override - public void postGetClosestRowBefore(ObserverContext<RegionCoprocessorEnvironment> c, - byte[] row, byte[] family, Result result) throws IOException { - delegate.postGetClosestRowBefore(c, row, family, result); - } - + @Override public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get, List<Cell> result) @@ -466,69 +195,6 @@ public class DelegateRegionObserver implements RegionObserver { } @Override - public boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row, - byte[] family, byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator, - Put put, boolean result) throws IOException { - return delegate.preCheckAndPut(c, row, family, qualifier, compareOp, comparator, put, - result); - } - - @Override - public boolean preCheckAndPutAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c, - byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp, - ByteArrayComparable comparator, Put put, boolean result) throws IOException { - return delegate.preCheckAndPutAfterRowLock(c, row, family, qualifier, compareOp, - comparator, put, result); - } - - @Override - public boolean postCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row, - byte[] family, byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator, - Put put, boolean result) throws IOException { - return delegate.postCheckAndPut(c, row, family, qualifier, compareOp, comparator, put, - result); - } - - @Override - public boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row, - byte[] family, byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator, - Delete delete, boolean result) throws IOException { - return delegate.preCheckAndDelete(c, row, family, qualifier, compareOp, comparator, delete, - result); - } - - @Override - public boolean preCheckAndDeleteAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c, - byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp, - ByteArrayComparable comparator, Delete delete, boolean result) throws IOException { - return delegate.preCheckAndDeleteAfterRowLock(c, row, family, qualifier, compareOp, - comparator, delete, result); - } - - @Override - public boolean postCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row, - byte[] family, byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator, - Delete delete, boolean result) throws IOException { - return delegate.postCheckAndDelete(c, row, family, qualifier, compareOp, comparator, - delete, result); - } - - @Override - public long preIncrementColumnValue(ObserverContext<RegionCoprocessorEnvironment> c, - byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL) - throws IOException { - return delegate.preIncrementColumnValue(c, row, family, qualifier, amount, writeToWAL); - } - - @Override - public long postIncrementColumnValue(ObserverContext<RegionCoprocessorEnvironment> c, - byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL, - long result) throws IOException { - return delegate.postIncrementColumnValue(c, row, family, qualifier, amount, writeToWAL, - result); - } - - @Override public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append) throws IOException { return delegate.preAppend(c, append); @@ -572,13 +238,6 @@ public class DelegateRegionObserver implements RegionObserver { } @Override - public KeyValueScanner preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, - Store store, Scan scan, NavigableSet<byte[]> targetCols, KeyValueScanner s) - throws IOException { - return delegate.preStoreScannerOpen(c, store, scan, targetCols, s); - } - - @Override public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan, RegionScanner s) throws IOException { return delegate.postScannerOpen(c, scan, s); @@ -596,12 +255,7 @@ public class DelegateRegionObserver implements RegionObserver { return delegate.postScannerNext(c, s, result, limit, hasNext); } - @Override - public boolean postScannerFilterRow(ObserverContext<RegionCoprocessorEnvironment> c, - InternalScanner s, byte[] currentRow, int offset, short length, boolean hasMore) - throws IOException { - return delegate.postScannerFilterRow(c, s, currentRow, offset, length, hasMore); - } + @Override public void preScannerClose(ObserverContext<RegionCoprocessorEnvironment> c, InternalScanner s) @@ -617,28 +271,19 @@ public class DelegateRegionObserver implements RegionObserver { } @Override - public void preWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> ctx, - HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { - delegate.preWALRestore(ctx, info, logKey, logEdit); - } - - @Override - public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> ctx, HRegionInfo info, - HLogKey logKey, WALEdit logEdit) throws IOException { + public void preWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> ctx, RegionInfo info, + WALKey logKey, WALEdit logEdit) throws IOException { delegate.preWALRestore(ctx, info, logKey, logEdit); } - + + @Override - public void postWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> ctx, - HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException { + public void postWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> ctx, RegionInfo info, + WALKey logKey, WALEdit logEdit) throws IOException { delegate.postWALRestore(ctx, info, logKey, logEdit); } - @Override - public void postWALRestore(ObserverContext<RegionCoprocessorEnvironment> ctx, HRegionInfo info, - HLogKey logKey, WALEdit logEdit) throws IOException { - delegate.postWALRestore(ctx, info, logKey, logEdit); - } + @Override public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx, @@ -646,37 +291,39 @@ public class DelegateRegionObserver implements RegionObserver { delegate.preBulkLoadHFile(ctx, familyPaths); } + @Override - public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx, - List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException { - return delegate.postBulkLoadHFile(ctx, familyPaths, hasLoaded); + public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx, + MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException { + return delegate.postMutationBeforeWAL(ctx, opType, mutation, oldCell, newCell); } @Override - public Reader preStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx, - FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf, - Reference r, Reader reader) throws IOException { - return delegate.preStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, reader); + public DeleteTracker postInstantiateDeleteTracker( + ObserverContext<RegionCoprocessorEnvironment> ctx, DeleteTracker delTracker) + throws IOException { + return delegate.postInstantiateDeleteTracker(ctx, delTracker); } @Override - public Reader postStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx, - FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf, - Reference r, Reader reader) throws IOException { - return delegate.postStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, reader); + public void preCommitStoreFile(ObserverContext<RegionCoprocessorEnvironment> ctx, byte[] family, + List<Pair<Path, Path>> pairs) throws IOException { + delegate.preCommitStoreFile(ctx, family, pairs); + } @Override - public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx, - MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException { - return delegate.postMutationBeforeWAL(ctx, opType, mutation, oldCell, newCell); + public void postCommitStoreFile(ObserverContext<RegionCoprocessorEnvironment> ctx, byte[] family, Path srcPath, + Path dstPath) throws IOException { + delegate.postCommitStoreFile(ctx, family, srcPath, dstPath); + } @Override - public DeleteTracker postInstantiateDeleteTracker( - ObserverContext<RegionCoprocessorEnvironment> ctx, DeleteTracker delTracker) + public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx, + List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths, boolean hasLoaded) throws IOException { - return delegate.postInstantiateDeleteTracker(ctx, delTracker); + return delegate.postBulkLoadHFile(ctx, stagingFamilyPaths, finalPaths, hasLoaded); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/645fc39e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java index 95d449a..21a8eef 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java @@ -20,7 +20,7 @@ import java.io.IOException; import java.util.List; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScannerContext; @@ -33,11 +33,6 @@ public class DelegateRegionScanner implements RegionScanner { } @Override - public HRegionInfo getRegionInfo() { - return delegate.getRegionInfo(); - } - - @Override public boolean isFilterDone() throws IOException { return delegate.isFilterDone(); } @@ -86,4 +81,15 @@ public class DelegateRegionScanner implements RegionScanner { public int getBatch() { return delegate.getBatch(); } + + @Override + public void shipped() throws IOException { + delegate.shipped(); + + } + + @Override + public RegionInfo getRegionInfo() { + return delegate.getRegionInfo(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/645fc39e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java index c816549..e11ff14 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java @@ -26,6 +26,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.TimerTask; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -40,12 +41,12 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -95,7 +96,7 @@ import com.google.common.collect.Maps; * to SYSTEM.TABLE. */ @SuppressWarnings("deprecation") -public class MetaDataRegionObserver extends BaseRegionObserver { +public class MetaDataRegionObserver implements RegionObserver,RegionCoprocessor { public static final Log LOG = LogFactory.getLog(MetaDataRegionObserver.class); public static final String REBUILD_INDEX_APPEND_TO_URL_STRING = "REBUILDINDEX"; private static final byte[] SYSTEM_CATALOG_KEY = SchemaUtil.getTableKey( @@ -117,6 +118,11 @@ public class MetaDataRegionObserver extends BaseRegionObserver { executor.shutdownNow(); GlobalCache.getInstance(c.getEnvironment()).getMetaDataCache().invalidateAll(); } + + @Override + public Optional<RegionObserver> getRegionObserver() { + return Optional.of(this); + } @Override public void start(CoprocessorEnvironment env) throws IOException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/645fc39e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java index 6773f36..8ef5e80 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/SequenceRegionObserver.java @@ -34,9 +34,9 @@ import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Region.RowLock; @@ -73,7 +73,7 @@ import com.google.common.collect.Lists; * * @since 3.0.0 */ -public class SequenceRegionObserver extends BaseRegionObserver { +public class SequenceRegionObserver implements RegionObserver { public static final String OPERATION_ATTRIB = "SEQUENCE_OPERATION"; public static final String MAX_TIMERANGE_ATTRIB = "MAX_TIMERANGE"; public static final String CURRENT_VALUE_ATTRIB = "CURRENT_VALUE"; @@ -399,7 +399,7 @@ public class SequenceRegionObserver extends BaseRegionObserver { Mutation m = null; switch (op) { case RETURN_SEQUENCE: - KeyValue currentValueKV = result.raw()[0]; + KeyValue currentValueKV = org.apache.hadoop.hbase.KeyValueUtil.ensureKeyValue(result.rawCells()[0]); long expectedValue = PLong.INSTANCE.getCodec().decodeLong(append.getAttribute(CURRENT_VALUE_ATTRIB), 0, SortOrder.getDefault()); long value = PLong.INSTANCE.getCodec().decodeLong(currentValueKV.getValueArray(), currentValueKV.getValueOffset(), SortOrder.getDefault()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/645fc39e/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index c2f789a..a770aa0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -36,6 +36,7 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; @@ -53,22 +54,19 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.CoprocessorHConnection; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory; -import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; @@ -81,7 +79,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.cache.ServerCacheClient; -import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode; import org.apache.phoenix.coprocessor.generated.PTableProtos; import org.apache.phoenix.exception.DataExceedsCapacityException; import org.apache.phoenix.execute.TupleProjector; @@ -98,7 +95,6 @@ import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; @@ -136,7 +132,6 @@ import org.apache.phoenix.util.ExpressionUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.KeyValueUtil; import org.apache.phoenix.util.LogUtil; -import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; @@ -148,9 +143,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Predicate; import com.google.common.base.Throwables; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; @@ -162,7 +155,7 @@ import com.google.common.primitives.Ints; * * @since 0.1 */ -public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver { +public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver implements RegionCoprocessor { // TODO: move all constants into a single class public static final String UNGROUPED_AGG = "UngroupedAgg"; public static final String DELETE_AGG = "DeleteAgg"; @@ -206,8 +199,13 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver private Configuration compactionConfig; @Override + public Optional<RegionObserver> getRegionObserver() { + return Optional.of(this); + } + + + @Override public void start(CoprocessorEnvironment e) throws IOException { - super.start(e); // Can't use ClientKeyValueBuilder on server-side because the memstore expects to // be able to get a single backing buffer for a KeyValue. this.kvBuilder = GenericKeyValueBuilder.INSTANCE; @@ -243,7 +241,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver Mutation[] mutationArray = new Mutation[mutations.size()]; // When memstore size reaches blockingMemstoreSize we are waiting 3 seconds for the // flush happen which decrease the memstore size and then writes allowed on the region. - for (int i = 0; region.getMemstoreSize() > blockingMemstoreSize && i < 30; i++) { + for (int i = 0; region.getMemStoreSize() > blockingMemstoreSize && i < 30; i++) { try { checkForRegionClosing(); Thread.sleep(100); @@ -712,7 +710,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver Delete delete = new Delete(results.get(0).getRowArray(), results.get(0).getRowOffset(), results.get(0).getRowLength()); - delete.deleteColumns(deleteCF, deleteCQ, ts); + delete.addColumn(deleteCF, deleteCQ, ts); // force tephra to ignore this deletes delete.setAttribute(PhoenixTransactionContext.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); mutations.add(delete); @@ -902,8 +900,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } @Override - public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store, - final InternalScanner scanner, final ScanType scanType) throws IOException { + public InternalScanner preCompact( + org.apache.hadoop.hbase.coprocessor.ObserverContext<RegionCoprocessorEnvironment> c, Store store, + InternalScanner scanner, ScanType scanType, + org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker tracker, + CompactionRequest request) throws IOException { // Compaction and split upcalls run with the effective user context of the requesting user. // This will lead to failure of cross cluster RPC if the effective user is not // the login user. Switch to the login user context to ensure we have the expected @@ -918,7 +919,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver long clientTimeStamp = EnvironmentEdgeManager.currentTimeMillis(); StatisticsCollector stats = StatisticsCollectorFactory.createStatisticsCollector( c.getEnvironment(), table.getNameAsString(), clientTimeStamp, - store.getFamily().getName()); + store.getColumnFamilyDescriptor().getName()); internalScanner = stats.createCompactionScanner(c.getEnvironment(), store, scanner); } catch (IOException e) { // If we can't reach the stats table, don't interrupt the normal @@ -934,8 +935,11 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } @Override - public void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store, - final StoreFile resultFile, CompactionRequest request) throws IOException { + public void postCompact(org.apache.hadoop.hbase.coprocessor.ObserverContext<RegionCoprocessorEnvironment> c, + Store store, StoreFile resultFile, + org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker tracker, + CompactionRequest request) throws IOException { + // If we're compacting all files, then delete markers are removed // and we must permanently disable an index that needs to be // partially rebuild because we're potentially losing the information http://git-wip-us.apache.org/repos/asf/phoenix/blob/645fc39e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java index 24eeab5..1c78fff 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java @@ -24,7 +24,6 @@ import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER import static org.apache.phoenix.hbase.index.write.IndexWriterUtils.INDEX_WRITER_RPC_RETRIES_NUMBER; import java.io.IOException; -import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -32,6 +31,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -50,23 +50,18 @@ import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory; -import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.regionserver.OperationStatus; import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.regionserver.ScanType; -import org.apache.hadoop.hbase.regionserver.Store; -import org.apache.hadoop.hbase.regionserver.wal.HLogKey; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.htrace.Span; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; @@ -119,7 +114,7 @@ import com.google.common.collect.Multimap; * Phoenix always does batch mutations. * <p> */ -public class Indexer extends BaseRegionObserver { +public class Indexer implements RegionObserver, RegionCoprocessor { private static final Log LOG = LogFactory.getLog(Indexer.class); private static final OperationStatus IGNORE = new OperationStatus(OperationStatusCode.SUCCESS); @@ -199,6 +194,11 @@ public class Indexer extends BaseRegionObserver { private static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000; @Override + public Optional<RegionObserver> getRegionObserver() { + return Optional.of(this); + } + + @Override public void start(CoprocessorEnvironment e) throws IOException { try { final RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e; @@ -266,7 +266,6 @@ public class Indexer extends BaseRegionObserver { } } catch (NoSuchMethodError ex) { disabled = true; - super.start(e); LOG.error("Must be too early a version of HBase. Disabled coprocessor ", ex); } } @@ -301,7 +300,6 @@ public class Indexer extends BaseRegionObserver { return; } if (this.disabled) { - super.stop(e); return; } this.stopped = true; @@ -362,7 +360,6 @@ public class Indexer extends BaseRegionObserver { public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { if (this.disabled) { - super.preBatchMutate(c, miniBatchOp); return; } long start = EnvironmentEdgeManager.currentTimeMillis(); @@ -578,7 +575,6 @@ public class Indexer extends BaseRegionObserver { public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) throws IOException { if (this.disabled) { - super.postBatchMutateIndispensably(c, miniBatchOp, success); return; } long start = EnvironmentEdgeManager.currentTimeMillis(); @@ -755,29 +751,6 @@ public class Indexer extends BaseRegionObserver { } } - /** - * Create a custom {@link InternalScanner} for a compaction that tracks the versions of rows that - * are removed so we can clean then up from the the index table(s). - * <p> - * This is not yet implemented - its not clear if we should even mess around with the Index table - * for these rows as those points still existed. TODO: v2 of indexing - */ - @Override - public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, - final Store store, final List<? extends KeyValueScanner> scanners, final ScanType scanType, - final long earliestPutTs, final InternalScanner s) throws IOException { - // Compaction and split upcalls run with the effective user context of the requesting user. - // This will lead to failure of cross cluster RPC if the effective user is not - // the login user. Switch to the login user context to ensure we have the expected - // security context. - // NOTE: Not necessary here at this time but leave in place to document this critical detail. - return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() { - @Override - public InternalScanner run() throws Exception { - return Indexer.super.preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s); - } - }); - } /** * Exposed for testing! http://git-wip-us.apache.org/repos/asf/phoenix/blob/645fc39e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java index 2f83f8d..c2b177a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/master/IndexMasterObserver.java @@ -17,12 +17,12 @@ */ package org.apache.phoenix.hbase.index.master; -import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver; +import org.apache.hadoop.hbase.coprocessor.MasterObserver; /** * Defines of coprocessor hooks(to support secondary indexing) of operations on * {@link org.apache.hadoop.hbase.master.HMaster} process. */ -public class IndexMasterObserver extends BaseMasterObserver { +public class IndexMasterObserver implements MasterObserver { } http://git-wip-us.apache.org/repos/asf/phoenix/blob/645fc39e/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java index 3495267..f3c1dbd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java @@ -33,6 +33,7 @@ import java.util.LinkedList; import java.util.List; import java.util.ListIterator; import java.util.Map; +import java.util.Optional; import java.util.Set; import org.apache.commons.logging.Log; @@ -45,14 +46,14 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory; @@ -101,7 +102,7 @@ import com.google.common.primitives.Longs; * bit simpler than the non transactional case. For example, there's no need to muck with the WAL, as failure scenarios * are handled by aborting the transaction. */ -public class PhoenixTransactionalIndexer extends BaseRegionObserver { +public class PhoenixTransactionalIndexer implements RegionObserver, RegionCoprocessor { private static final Log LOG = LogFactory.getLog(PhoenixTransactionalIndexer.class); @@ -117,7 +118,12 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver { private PhoenixIndexCodec codec; private IndexWriter writer; private boolean stopped; - + + @Override + public Optional<RegionObserver> getRegionObserver() { + return Optional.of(this); + } + @Override public void start(CoprocessorEnvironment e) throws IOException { final RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment)e; http://git-wip-us.apache.org/repos/asf/phoenix/blob/645fc39e/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java index d4553ec..6505cff 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java @@ -19,10 +19,9 @@ package org.apache.phoenix.transaction; import java.io.IOException; import java.sql.SQLException; -import java.util.concurrent.TimeoutException; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.util.ReadOnlyProps; @@ -128,7 +127,7 @@ public class OmidTransactionContext implements PhoenixTransactionContext { } @Override - public BaseRegionObserver getCoProcessor() { + public RegionObserver getCoProcessor() { // TODO Auto-generated method stub return null; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/645fc39e/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java index d335692..0e46ae9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionContext.java @@ -17,18 +17,18 @@ */ package org.apache.phoenix.transaction; +import java.io.IOException; +import java.sql.SQLException; +import java.util.concurrent.TimeoutException; + import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.twill.zookeeper.ZKClientService; import org.slf4j.Logger; -import java.io.IOException; -import java.sql.SQLException; -import java.util.concurrent.TimeoutException; - public interface PhoenixTransactionContext { /** @@ -166,7 +166,7 @@ public interface PhoenixTransactionContext { * * @return the coprocessor */ - public BaseRegionObserver getCoProcessor(); + public RegionObserver getCoProcessor(); /** * http://git-wip-us.apache.org/repos/asf/phoenix/blob/645fc39e/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java index 7515a9c..ebd7d2d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionContext.java @@ -24,17 +24,19 @@ import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.conf.Configuration; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.transaction.PhoenixTransactionContext.PhoenixVisibilityLevel; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.tephra.Transaction; +import org.apache.tephra.Transaction.VisibilityLevel; import org.apache.tephra.TransactionAware; import org.apache.tephra.TransactionCodec; import org.apache.tephra.TransactionConflictException; @@ -42,20 +44,19 @@ import org.apache.tephra.TransactionContext; import org.apache.tephra.TransactionFailureException; import org.apache.tephra.TransactionManager; import org.apache.tephra.TransactionSystemClient; -import org.apache.tephra.Transaction.VisibilityLevel; import org.apache.tephra.TxConstants; import org.apache.tephra.distributed.PooledClientProvider; +import org.apache.tephra.distributed.TransactionService; import org.apache.tephra.distributed.TransactionServiceClient; import org.apache.tephra.hbase.coprocessor.TransactionProcessor; import org.apache.tephra.inmemory.InMemoryTxSystemClient; +import org.apache.tephra.metrics.TxMetricsCollector; +import org.apache.tephra.persist.HDFSTransactionStateStorage; +import org.apache.tephra.snapshot.SnapshotCodecProvider; import org.apache.tephra.util.TxUtils; import org.apache.tephra.visibility.FenceWait; import org.apache.tephra.visibility.VisibilityFence; import org.apache.tephra.zookeeper.TephraZKClientService; -import org.apache.tephra.distributed.TransactionService; -import org.apache.tephra.metrics.TxMetricsCollector; -import org.apache.tephra.persist.HDFSTransactionStateStorage; -import org.apache.tephra.snapshot.SnapshotCodecProvider; import org.apache.twill.discovery.DiscoveryService; import org.apache.twill.discovery.ZKDiscoveryService; import org.apache.twill.internal.utils.Networks; @@ -63,13 +64,12 @@ import org.apache.twill.zookeeper.RetryStrategies; import org.apache.twill.zookeeper.ZKClientService; import org.apache.twill.zookeeper.ZKClientServices; import org.apache.twill.zookeeper.ZKClients; +import org.slf4j.Logger; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Lists; import com.google.inject.util.Providers; -import org.slf4j.Logger; - public class TephraTransactionContext implements PhoenixTransactionContext { private static final TransactionCodec CODEC = new TransactionCodec(); @@ -423,7 +423,7 @@ public class TephraTransactionContext implements PhoenixTransactionContext { } @Override - public BaseRegionObserver getCoProcessor() { + public RegionObserver getCoProcessor() { return new TransactionProcessor(); }