PHOENIX-3037 Setup proper security context in compaction/split coprocessor hooks


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/96234fa3
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/96234fa3
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/96234fa3

Branch: refs/heads/4.x-HBase-1.1
Commit: 96234fa32124a5dc731709e130b81042463ef11d
Parents: 1d29ec0
Author: Andrew Purtell <apurt...@apache.org>
Authored: Wed Jun 29 12:22:04 2016 -0700
Committer: Andrew Purtell <apurt...@apache.org>
Committed: Thu Jun 30 14:24:19 2016 -0700

----------------------------------------------------------------------
 .../coprocessor/DelegateRegionObserver.java     | 248 ++++++++++++++-----
 .../UngroupedAggregateRegionObserver.java       |  47 ++--
 .../org/apache/phoenix/hbase/index/Indexer.java |  20 +-
 3 files changed, 230 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/96234fa3/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java
index 1b321b8..59b2271 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java
@@ -18,6 +18,7 @@
 package org.apache.phoenix.coprocessor;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.List;
 import java.util.NavigableSet;
 
@@ -57,6 +58,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WALKey;
 
@@ -123,115 +125,235 @@ public class DelegateRegionObserver implements 
RegionObserver {
         delegate.postFlush(c, store, resultFile);
     }
 
-    @Override
-    public void 
preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store 
store,
-            List<StoreFile> candidates, CompactionRequest request) throws 
IOException {
-        delegate.preCompactSelection(c, store, candidates, request);
-    }
-
-    @Override
-    public void 
preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store 
store,
-            List<StoreFile> candidates) throws IOException {
-        delegate.preCompactSelection(c, store, candidates);
-    }
-
-    @Override
-    public void 
postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store 
store,
-            ImmutableList<StoreFile> selected, CompactionRequest request) {
-        delegate.postCompactSelection(c, store, selected, request);
-    }
-
-    @Override
-    public void 
postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store 
store,
-            ImmutableList<StoreFile> selected) {
-        delegate.postCompactSelection(c, store, selected);
-    }
-
-    @Override
-    public InternalScanner 
preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-            InternalScanner scanner, ScanType scanType, CompactionRequest 
request)
+    // Compaction and split upcalls run with the effective user context of the requesting user.
+    // This will lead to failure of cross cluster RPC if the effective user is not
+    // the login user. Switch to the login user context to ensure we have the expected
+    // security context.
+
+    @Override    
+    public void preCompactSelection(final 
ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
+            final List<StoreFile> candidates, final CompactionRequest request) 
throws IOException {
+        User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+                delegate.preCompactSelection(c, store, candidates, request);
+                return null;
+            }
+        });
+    }
+
+    @Override
+    public void preCompactSelection(final 
ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
+            final List<StoreFile> candidates) throws IOException {
+        User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+                delegate.preCompactSelection(c, store, candidates);
+                return null;
+            }
+        });
+    }
+
+    @Override
+    public void postCompactSelection(final 
ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
+            final ImmutableList<StoreFile> selected, final CompactionRequest 
request) {
+        try {
+            User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws Exception {
+                    delegate.postCompactSelection(c, store, selected, request);
+                    return null;
+                }
+            });
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void postCompactSelection(final 
ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
+            final ImmutableList<StoreFile> selected) {
+        try {
+            User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws Exception {
+                    delegate.postCompactSelection(c, store, selected);
+                    return null;
+                }
+            });
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public InternalScanner preCompact(final 
ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
+            final InternalScanner scanner, final ScanType scanType, final 
CompactionRequest request)
             throws IOException {
-        return delegate.preCompact(c, store, scanner, scanType, request);
+        return User.runAsLoginUser(new 
PrivilegedExceptionAction<InternalScanner>() {
+            @Override
+            public InternalScanner run() throws Exception {
+                return delegate.preCompact(c, store, scanner, scanType, 
request);
+            }
+        });
     }
 
     @Override
-    public InternalScanner 
preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-            InternalScanner scanner, ScanType scanType) throws IOException {
-        return delegate.preCompact(c, store, scanner, scanType);
+    public InternalScanner preCompact(final 
ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
+            final InternalScanner scanner, final ScanType scanType) throws 
IOException {
+        return User.runAsLoginUser(new 
PrivilegedExceptionAction<InternalScanner>() {
+            @Override
+            public InternalScanner run() throws Exception {
+                return delegate.preCompact(c, store, scanner, scanType);
+            }
+        });
     }
 
     @Override
-    public InternalScanner 
preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
-            Store store, List<? extends KeyValueScanner> scanners, ScanType 
scanType,
-            long earliestPutTs, InternalScanner s, CompactionRequest request) 
throws IOException {
-        return delegate.preCompactScannerOpen(c, store, scanners, scanType, 
earliestPutTs, s,
-            request);
+    public InternalScanner preCompactScannerOpen(final 
ObserverContext<RegionCoprocessorEnvironment> c,
+            final Store store, final List<? extends KeyValueScanner> scanners, 
final ScanType scanType,
+            final long earliestPutTs, final InternalScanner s, final 
CompactionRequest request) throws IOException {
+        return User.runAsLoginUser(new 
PrivilegedExceptionAction<InternalScanner>() {
+            @Override
+            public InternalScanner run() throws Exception {
+                return delegate.preCompactScannerOpen(c, store, scanners, 
scanType, earliestPutTs, s,
+                  request);
+            }
+        });
     }
 
     @Override
-    public InternalScanner 
preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
-            Store store, List<? extends KeyValueScanner> scanners, ScanType 
scanType,
-            long earliestPutTs, InternalScanner s) throws IOException {
-        return delegate.preCompactScannerOpen(c, store, scanners, scanType, 
earliestPutTs, s);
+    public InternalScanner preCompactScannerOpen(final 
ObserverContext<RegionCoprocessorEnvironment> c,
+            final Store store, final List<? extends KeyValueScanner> scanners, 
final ScanType scanType,
+            final long earliestPutTs, final InternalScanner s) throws 
IOException {
+        return User.runAsLoginUser(new 
PrivilegedExceptionAction<InternalScanner>() {
+            @Override
+            public InternalScanner run() throws Exception {
+                return delegate.preCompactScannerOpen(c, store, scanners, 
scanType, earliestPutTs, s);
+            }
+        });
     }
 
     @Override
-    public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, 
Store store,
-            StoreFile resultFile, CompactionRequest request) throws 
IOException {
-        delegate.postCompact(c, store, resultFile, request);
+    public void postCompact(final 
ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
+            final StoreFile resultFile, final CompactionRequest request) 
throws IOException {
+        User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+              delegate.postCompact(c, store, resultFile, request);
+              return null;
+            }
+        });
     }
 
     @Override
-    public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, 
Store store,
-            StoreFile resultFile) throws IOException {
-        delegate.postCompact(c, store, resultFile);
+    public void postCompact(final 
ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
+            final StoreFile resultFile) throws IOException {
+        User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+                delegate.postCompact(c, store, resultFile);
+                return null;
+            }
+        });
     }
 
     @Override
-    public void preSplit(ObserverContext<RegionCoprocessorEnvironment> c) 
throws IOException {
-        delegate.preSplit(c);
+    public void preSplit(final ObserverContext<RegionCoprocessorEnvironment> 
c) throws IOException {
+        User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+                delegate.preSplit(c);
+                return null;
+            }
+        });
     }
 
     @Override
-    public void preSplit(ObserverContext<RegionCoprocessorEnvironment> c, 
byte[] splitRow)
+    public void preSplit(final ObserverContext<RegionCoprocessorEnvironment> 
c, final byte[] splitRow)
             throws IOException {
-        delegate.preSplit(c, splitRow);
+        User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+                delegate.preSplit(c, splitRow);
+                return null;
+            }
+        });
     }
 
     @Override
-    public void postSplit(ObserverContext<RegionCoprocessorEnvironment> c, 
Region l, Region r)
+    public void postSplit(final ObserverContext<RegionCoprocessorEnvironment> 
c, final Region l, final Region r)
             throws IOException {
-        delegate.postSplit(c, l, r);
+        User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+                delegate.postSplit(c, l, r);
+                return null;
+            }
+        });
     }
 
     @Override
-    public void 
preSplitBeforePONR(ObserverContext<RegionCoprocessorEnvironment> ctx,
-            byte[] splitKey, List<Mutation> metaEntries) throws IOException {
-        delegate.preSplitBeforePONR(ctx, splitKey, metaEntries);
+    public void preSplitBeforePONR(final 
ObserverContext<RegionCoprocessorEnvironment> ctx,
+            final byte[] splitKey, final List<Mutation> metaEntries) throws 
IOException {
+        User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+                delegate.preSplitBeforePONR(ctx, splitKey, metaEntries);
+                return null;
+            }
+        });
     }
 
     @Override
-    public void 
preSplitAfterPONR(ObserverContext<RegionCoprocessorEnvironment> ctx)
+    public void preSplitAfterPONR(final 
ObserverContext<RegionCoprocessorEnvironment> ctx)
             throws IOException {
-        delegate.preSplitAfterPONR(ctx);
+        User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+                delegate.preSplitAfterPONR(ctx);
+                return null;
+            }
+        });
     }
 
     @Override
-    public void preRollBackSplit(ObserverContext<RegionCoprocessorEnvironment> 
ctx)
+    public void preRollBackSplit(final 
ObserverContext<RegionCoprocessorEnvironment> ctx)
             throws IOException {
-        delegate.preRollBackSplit(ctx);
+        User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+                delegate.preRollBackSplit(ctx);
+                return null;
+            }
+        });
     }
 
     @Override
-    public void 
postRollBackSplit(ObserverContext<RegionCoprocessorEnvironment> ctx)
+    public void postRollBackSplit(final 
ObserverContext<RegionCoprocessorEnvironment> ctx)
             throws IOException {
-        delegate.postRollBackSplit(ctx);
+        User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+                delegate.postRollBackSplit(ctx);
+                return null;
+            }
+        });
     }
 
     @Override
-    public void 
postCompleteSplit(ObserverContext<RegionCoprocessorEnvironment> ctx)
+    public void postCompleteSplit(final 
ObserverContext<RegionCoprocessorEnvironment> ctx)
             throws IOException {
-        delegate.postCompleteSplit(ctx);
+        // NOTE: This one is an exception and doesn't need a context change. Should
+        // be infrequent and overhead is low, so let's ensure we have the right context
+        // anyway to avoid potential surprise.
+        User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+                delegate.postCompleteSplit(ctx);
+                return null;
+            }
+        });
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/96234fa3/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index d474665..d783670 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -30,6 +30,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -58,6 +59,7 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.WritableUtils;
@@ -584,26 +586,35 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
     }
 
     @Override
-    public InternalScanner 
preCompact(ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
-            InternalScanner scanner, final ScanType scanType) throws 
IOException {
-        TableName table = 
c.getEnvironment().getRegion().getRegionInfo().getTable();
-        InternalScanner internalScanner = scanner;
-        if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
-            try {
-                long clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime();
-                StatisticsCollector stats = 
StatisticsCollectorFactory.createStatisticsCollector(
-                        c.getEnvironment(), table.getNameAsString(), 
clientTimeStamp,
-                        store.getFamily().getName());
-                internalScanner = 
stats.createCompactionScanner(c.getEnvironment(), store, scanner);
-            } catch (IOException e) {
-                // If we can't reach the stats table, don't interrupt the 
normal
-                // compaction operation, just log a warning.
-                if (logger.isWarnEnabled()) {
-                    logger.warn("Unable to collect stats for " + table, e);
+    public InternalScanner preCompact(final 
ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
+            final InternalScanner scanner, final ScanType scanType) throws 
IOException {
+        // Compaction and split upcalls run with the effective user context of the requesting user.
+        // This will lead to failure of cross cluster RPC if the effective user is not
+        // the login user. Switch to the login user context to ensure we have the expected
+        // security context.
+        return User.runAsLoginUser(new 
PrivilegedExceptionAction<InternalScanner>() {
+            @Override
+            public InternalScanner run() throws Exception {
+                TableName table = 
c.getEnvironment().getRegion().getRegionInfo().getTable();
+                InternalScanner internalScanner = scanner;
+                if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
+                    try {
+                        long clientTimeStamp = 
TimeKeeper.SYSTEM.getCurrentTime();
+                        StatisticsCollector stats = 
StatisticsCollectorFactory.createStatisticsCollector(
+                            c.getEnvironment(), table.getNameAsString(), 
clientTimeStamp,
+                            store.getFamily().getName());
+                        internalScanner = 
stats.createCompactionScanner(c.getEnvironment(), store, scanner);
+                    } catch (IOException e) {
+                        // If we can't reach the stats table, don't interrupt 
the normal
+                      // compaction operation, just log a warning.
+                      if (logger.isWarnEnabled()) {
+                          logger.warn("Unable to collect stats for " + table, 
e);
+                      }
+                    }
                 }
+                return internalScanner;
             }
-        }
-        return internalScanner;
+        });
     }
 
     private static PTable deserializeTable(byte[] b) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/96234fa3/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 2811c43..6e5edee 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.hbase.index;
 import static 
org.apache.phoenix.hbase.index.util.IndexManagementUtil.rethrowIndexingException;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -52,6 +53,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.hbase.index.builder.IndexBuildManager;
@@ -521,10 +523,20 @@ public class Indexer extends BaseRegionObserver {
    * for these rows as those points still existed. TODO: v2 of indexing
    */
   @Override
-  public InternalScanner 
preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
-      Store store, List<? extends KeyValueScanner> scanners, ScanType 
scanType, long earliestPutTs,
-      InternalScanner s) throws IOException {
-    return super.preCompactScannerOpen(c, store, scanners, scanType, 
earliestPutTs, s);
+  public InternalScanner preCompactScannerOpen(final 
ObserverContext<RegionCoprocessorEnvironment> c,
+          final Store store, final List<? extends KeyValueScanner> scanners, 
final ScanType scanType,
+          final long earliestPutTs, final InternalScanner s) throws 
IOException {
+      // Compaction and split upcalls run with the effective user context of the requesting user.
+      // This will lead to failure of cross cluster RPC if the effective user is not
+      // the login user. Switch to the login user context to ensure we have the expected
+      // security context.
+      // NOTE: Not necessary here at this time but leave in place to document this critical detail.
+      return User.runAsLoginUser(new 
PrivilegedExceptionAction<InternalScanner>() {
+          @Override
+          public InternalScanner run() throws Exception {
+              return Indexer.super.preCompactScannerOpen(c, store, scanners, 
scanType, earliestPutTs, s);
+          }
+      });
   }
 
   /**

Reply via email to