PHOENIX-3037 Set up proper security context in compaction/split coprocessor hooks
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/96234fa3 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/96234fa3 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/96234fa3 Branch: refs/heads/4.x-HBase-1.1 Commit: 96234fa32124a5dc731709e130b81042463ef11d Parents: 1d29ec0 Author: Andrew Purtell <apurt...@apache.org> Authored: Wed Jun 29 12:22:04 2016 -0700 Committer: Andrew Purtell <apurt...@apache.org> Committed: Thu Jun 30 14:24:19 2016 -0700 ---------------------------------------------------------------------- .../coprocessor/DelegateRegionObserver.java | 248 ++++++++++++++----- .../UngroupedAggregateRegionObserver.java | 47 ++-- .../org/apache/phoenix/hbase/index/Indexer.java | 20 +- 3 files changed, 230 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/96234fa3/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java index 1b321b8..59b2271 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java @@ -18,6 +18,7 @@ package org.apache.phoenix.coprocessor; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.List; import java.util.NavigableSet; @@ -57,6 +58,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile.Reader; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import 
org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WALKey; @@ -123,115 +125,235 @@ public class DelegateRegionObserver implements RegionObserver { delegate.postFlush(c, store, resultFile); } - @Override - public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store, - List<StoreFile> candidates, CompactionRequest request) throws IOException { - delegate.preCompactSelection(c, store, candidates, request); - } - - @Override - public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store, - List<StoreFile> candidates) throws IOException { - delegate.preCompactSelection(c, store, candidates); - } - - @Override - public void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store, - ImmutableList<StoreFile> selected, CompactionRequest request) { - delegate.postCompactSelection(c, store, selected, request); - } - - @Override - public void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store, - ImmutableList<StoreFile> selected) { - delegate.postCompactSelection(c, store, selected); - } - - @Override - public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store, - InternalScanner scanner, ScanType scanType, CompactionRequest request) + // Compaction and split upcalls run with the effective user context of the requesting user. + // This will lead to failure of cross cluster RPC if the effective user is not + // the login user. Switch to the login user context to ensure we have the expected + // security context. 
+ + @Override + public void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store, + final List<StoreFile> candidates, final CompactionRequest request) throws IOException { + User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + delegate.preCompactSelection(c, store, candidates, request); + return null; + } + }); + } + + @Override + public void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store, + final List<StoreFile> candidates) throws IOException { + User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + delegate.preCompactSelection(c, store, candidates); + return null; + } + }); + } + + @Override + public void postCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store, + final ImmutableList<StoreFile> selected, final CompactionRequest request) { + try { + User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + delegate.postCompactSelection(c, store, selected, request); + return null; + } + }); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void postCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store, + final ImmutableList<StoreFile> selected) { + try { + User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + delegate.postCompactSelection(c, store, selected); + return null; + } + }); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store, + final InternalScanner scanner, final ScanType scanType, final CompactionRequest request) throws IOException { - return delegate.preCompact(c, store, scanner, scanType, 
request); + return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() { + @Override + public InternalScanner run() throws Exception { + return delegate.preCompact(c, store, scanner, scanType, request); + } + }); } @Override - public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store, - InternalScanner scanner, ScanType scanType) throws IOException { - return delegate.preCompact(c, store, scanner, scanType); + public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store, + final InternalScanner scanner, final ScanType scanType) throws IOException { + return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() { + @Override + public InternalScanner run() throws Exception { + return delegate.preCompact(c, store, scanner, scanType); + } + }); } @Override - public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, - Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, - long earliestPutTs, InternalScanner s, CompactionRequest request) throws IOException { - return delegate.preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s, - request); + public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, + final Store store, final List<? extends KeyValueScanner> scanners, final ScanType scanType, + final long earliestPutTs, final InternalScanner s, final CompactionRequest request) throws IOException { + return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() { + @Override + public InternalScanner run() throws Exception { + return delegate.preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s, + request); + } + }); } @Override - public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, - Store store, List<? 
extends KeyValueScanner> scanners, ScanType scanType, - long earliestPutTs, InternalScanner s) throws IOException { - return delegate.preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s); + public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, + final Store store, final List<? extends KeyValueScanner> scanners, final ScanType scanType, + final long earliestPutTs, final InternalScanner s) throws IOException { + return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() { + @Override + public InternalScanner run() throws Exception { + return delegate.preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s); + } + }); } @Override - public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store, - StoreFile resultFile, CompactionRequest request) throws IOException { - delegate.postCompact(c, store, resultFile, request); + public void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store, + final StoreFile resultFile, final CompactionRequest request) throws IOException { + User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + delegate.postCompact(c, store, resultFile, request); + return null; + } + }); } @Override - public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store, - StoreFile resultFile) throws IOException { - delegate.postCompact(c, store, resultFile); + public void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store, + final StoreFile resultFile) throws IOException { + User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + delegate.postCompact(c, store, resultFile); + return null; + } + }); } @Override - public void preSplit(ObserverContext<RegionCoprocessorEnvironment> c) throws IOException { - delegate.preSplit(c); + public void 
preSplit(final ObserverContext<RegionCoprocessorEnvironment> c) throws IOException { + User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + delegate.preSplit(c); + return null; + } + }); } @Override - public void preSplit(ObserverContext<RegionCoprocessorEnvironment> c, byte[] splitRow) + public void preSplit(final ObserverContext<RegionCoprocessorEnvironment> c, final byte[] splitRow) throws IOException { - delegate.preSplit(c, splitRow); + User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + delegate.preSplit(c, splitRow); + return null; + } + }); } @Override - public void postSplit(ObserverContext<RegionCoprocessorEnvironment> c, Region l, Region r) + public void postSplit(final ObserverContext<RegionCoprocessorEnvironment> c, final Region l, final Region r) throws IOException { - delegate.postSplit(c, l, r); + User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + delegate.postSplit(c, l, r); + return null; + } + }); } @Override - public void preSplitBeforePONR(ObserverContext<RegionCoprocessorEnvironment> ctx, - byte[] splitKey, List<Mutation> metaEntries) throws IOException { - delegate.preSplitBeforePONR(ctx, splitKey, metaEntries); + public void preSplitBeforePONR(final ObserverContext<RegionCoprocessorEnvironment> ctx, + final byte[] splitKey, final List<Mutation> metaEntries) throws IOException { + User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + delegate.preSplitBeforePONR(ctx, splitKey, metaEntries); + return null; + } + }); } @Override - public void preSplitAfterPONR(ObserverContext<RegionCoprocessorEnvironment> ctx) + public void preSplitAfterPONR(final ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException { - delegate.preSplitAfterPONR(ctx); + User.runAsLoginUser(new 
PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + delegate.preSplitAfterPONR(ctx); + return null; + } + }); } @Override - public void preRollBackSplit(ObserverContext<RegionCoprocessorEnvironment> ctx) + public void preRollBackSplit(final ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException { - delegate.preRollBackSplit(ctx); + User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + delegate.preRollBackSplit(ctx); + return null; + } + }); } @Override - public void postRollBackSplit(ObserverContext<RegionCoprocessorEnvironment> ctx) + public void postRollBackSplit(final ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException { - delegate.postRollBackSplit(ctx); + User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + delegate.postRollBackSplit(ctx); + return null; + } + }); } @Override - public void postCompleteSplit(ObserverContext<RegionCoprocessorEnvironment> ctx) + public void postCompleteSplit(final ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException { - delegate.postCompleteSplit(ctx); + // NOTE: This one is an exception and doesn't need a context change. Should + // be infrequent and overhead is low, so let's ensure we have the right context + // anyway to avoid potential surprise. 
+ User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + delegate.postCompleteSplit(ctx); + return null; + } + }); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/96234fa3/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index d474665..d783670 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -30,6 +30,7 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; @@ -58,6 +59,7 @@ import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.WritableUtils; @@ -584,26 +586,35 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } @Override - public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, final Store store, - InternalScanner scanner, final ScanType scanType) throws IOException { - TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable(); - InternalScanner internalScanner = scanner; - if 
(scanType.equals(ScanType.COMPACT_DROP_DELETES)) { - try { - long clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime(); - StatisticsCollector stats = StatisticsCollectorFactory.createStatisticsCollector( - c.getEnvironment(), table.getNameAsString(), clientTimeStamp, - store.getFamily().getName()); - internalScanner = stats.createCompactionScanner(c.getEnvironment(), store, scanner); - } catch (IOException e) { - // If we can't reach the stats table, don't interrupt the normal - // compaction operation, just log a warning. - if (logger.isWarnEnabled()) { - logger.warn("Unable to collect stats for " + table, e); + public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store, + final InternalScanner scanner, final ScanType scanType) throws IOException { + // Compaction and split upcalls run with the effective user context of the requesting user. + // This will lead to failure of cross cluster RPC if the effective user is not + // the login user. Switch to the login user context to ensure we have the expected + // security context. + return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() { + @Override + public InternalScanner run() throws Exception { + TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable(); + InternalScanner internalScanner = scanner; + if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) { + try { + long clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime(); + StatisticsCollector stats = StatisticsCollectorFactory.createStatisticsCollector( + c.getEnvironment(), table.getNameAsString(), clientTimeStamp, + store.getFamily().getName()); + internalScanner = stats.createCompactionScanner(c.getEnvironment(), store, scanner); + } catch (IOException e) { + // If we can't reach the stats table, don't interrupt the normal + // compaction operation, just log a warning. 
+ if (logger.isWarnEnabled()) { + logger.warn("Unable to collect stats for " + table, e); + } + } } + return internalScanner; } - } - return internalScanner; + }); } private static PTable deserializeTable(byte[] b) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/96234fa3/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java index 2811c43..6e5edee 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java @@ -20,6 +20,7 @@ package org.apache.phoenix.hbase.index; import static org.apache.phoenix.hbase.index.util.IndexManagementUtil.rethrowIndexingException; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -52,6 +53,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.hbase.index.builder.IndexBuildManager; @@ -521,10 +523,20 @@ public class Indexer extends BaseRegionObserver { * for these rows as those points still existed. TODO: v2 of indexing */ @Override - public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, - Store store, List<? 
extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, - InternalScanner s) throws IOException { - return super.preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s); + public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, + final Store store, final List<? extends KeyValueScanner> scanners, final ScanType scanType, + final long earliestPutTs, final InternalScanner s) throws IOException { + // Compaction and split upcalls run with the effective user context of the requesting user. + // This will lead to failure of cross cluster RPC if the effective user is not + // the login user. Switch to the login user context to ensure we have the expected + // security context. + // NOTE: Not necessary here at this time but leave in place to document this critical detail. + return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() { + @Override + public InternalScanner run() throws Exception { + return Indexer.super.preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s); + } + }); } /**