Repository: hbase
Updated Branches:
  refs/heads/0.98 0e32f563a -> cc7eb46ee


HBASE-11384 - [Visibility Controller]Check for users covering
authorizations for every mutation (Ram)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/cc7eb46e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/cc7eb46e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/cc7eb46e

Branch: refs/heads/0.98
Commit: cc7eb46ee453ba0d09650b72ce830a475c25f82d
Parents: 0e32f56
Author: Ramkrishna <[email protected]>
Authored: Fri Aug 1 13:25:36 2014 -0700
Committer: Ramkrishna <[email protected]>
Committed: Fri Aug 1 13:25:36 2014 -0700

----------------------------------------------------------------------
 .../visibility/VisibilityConstants.java         |   3 +
 .../src/main/resources/hbase-default.xml        |   8 +
 .../visibility/VisibilityController.java        |  54 +++--
 .../visibility/VisibilityLabelsManager.java     |  15 ++
 .../TestVisibilityWithCheckAuths.java           | 231 +++++++++++++++++++
 5 files changed, 298 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/cc7eb46e/hbase-client/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityConstants.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityConstants.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityConstants.java
index f98efec..d91f0ef 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityConstants.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityConstants.java
@@ -49,4 +49,7 @@ public final class VisibilityConstants {
   public static final byte[] SORTED_ORDINAL_SERIALIZATION_FORMAT =
       new byte[] { VISIBILITY_SERIALIZATION_VERSION };
 
+  public static final String CHECK_AUTHS_FOR_MUTATION = 
+      "hbase.security.visibility.mutations.checkauths";
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/cc7eb46e/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml 
b/hbase-common/src/main/resources/hbase-default.xml
index 9bc3c59..dac527b 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1210,4 +1210,12 @@ possible configurations would overwhelm and obscure the 
important.
     procedure. After implementing your own MasterProcedureManager, just put it 
in HBase's classpath
     and add the fully qualified class name here.</description>
   </property>
+  <property>
+    <name>hbase.security.visibility.mutations.checkauths</name>
+    <value>false</value>
+    <description>
+      This property if enabled, will check whether the labels in the 
visibility expression are associated
+      with the user issuing the mutation
+    </description>
+  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/hbase/blob/cc7eb46e/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
index 42e9ac6..7d1d742 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
@@ -35,6 +35,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -114,6 +115,7 @@ import 
org.apache.hadoop.hbase.security.visibility.expression.LeafExpressionNode
 import 
org.apache.hadoop.hbase.security.visibility.expression.NonLeafExpressionNode;
 import org.apache.hadoop.hbase.security.visibility.expression.Operator;
 import org.apache.hadoop.hbase.util.ByteRange;
+import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.SimpleByteRange;
@@ -122,7 +124,6 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import com.google.common.collect.Lists;
 import com.google.common.collect.MapMaker;
 import com.google.protobuf.ByteString;
-import org.apache.hadoop.hbase.util.ByteStringer;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.Service;
@@ -155,6 +156,7 @@ public class VisibilityController extends 
BaseRegionObserver implements MasterOb
   private boolean acOn = false;
   private Configuration conf;
   private volatile boolean initialized = false;
+  private boolean checkAuths = false;
   /** Mapping of scanner instances to the user who created them */
   private Map<InternalScanner,String> scannerOwners =
       new MapMaker().weakKeys().makeMap();
@@ -588,6 +590,8 @@ public class VisibilityController extends 
BaseRegionObserver implements MasterOb
         initialize(e);
       }
     } else {
+      checkAuths = e.getEnvironment().getConfiguration()
+          .getBoolean(VisibilityConstants.CHECK_AUTHS_FOR_MUTATION, false);
       this.initialized = true;
     }
   }
@@ -660,6 +664,11 @@ public class VisibilityController extends 
BaseRegionObserver implements MasterOb
     }
     // TODO this can be made as a global LRU cache at HRS level?
     Map<String, List<Tag>> labelCache = new HashMap<String, List<Tag>>();
+    Set<Integer> auths = null;
+    User user = getActiveUser();
+    if (checkAuths && user != null && user.getShortName() != null) {
+      auths = this.visibilityManager.getAuthsAsOrdinals(user.getShortName());
+    }
     for (int i = 0; i < miniBatchOp.size(); i++) {
       Mutation m = miniBatchOp.getOperation(i);
       CellVisibility cellVisibility = null;
@@ -685,7 +694,7 @@ public class VisibilityController extends 
BaseRegionObserver implements MasterOb
             List<Tag> visibilityTags = labelCache.get(labelsExp);
             if (visibilityTags == null) {
               try {
-                visibilityTags = createVisibilityTags(labelsExp, true);
+                visibilityTags = createVisibilityTags(labelsExp, true, auths, 
user.getShortName());
               } catch (ParseException e) {
                 miniBatchOp.setOperationStatus(i,
                     new OperationStatus(SANITY_CHECK_FAILURE, e.getMessage()));
@@ -745,7 +754,7 @@ public class VisibilityController extends 
BaseRegionObserver implements MasterOb
     if (cellVisibility != null) {
       String labelsExp = cellVisibility.getExpression();
       try {
-        visibilityTags = createVisibilityTags(labelsExp, false);
+        visibilityTags = createVisibilityTags(labelsExp, false, null, null);
       } catch (ParseException e) {
         throw new IOException("Invalid cell visibility expression " + 
labelsExp, e);
       } catch (InvalidLabelException e) {
@@ -879,8 +888,9 @@ public class VisibilityController extends 
BaseRegionObserver implements MasterOb
     return true;
   }
 
-  private List<Tag> createVisibilityTags(String visibilityLabelsExp, boolean 
addSerializationTag)
-      throws IOException, ParseException, InvalidLabelException {
+  private List<Tag> createVisibilityTags(String visibilityLabelsExp, boolean 
addSerializationTag,
+      Set<Integer> auths, String userName) throws IOException, ParseException,
+      InvalidLabelException {
     ExpressionNode node = null;
     node = this.expressionParser.parse(visibilityLabelsExp);
     node = this.expressionExpander.expand(node);
@@ -894,7 +904,7 @@ public class VisibilityController extends 
BaseRegionObserver implements MasterOb
       tags.add(VisibilityUtils.VIS_SERIALIZATION_TAG);
     }
     if (node.isSingleNode()) {
-      getLabelOrdinals(node, labelOrdinals);
+      getLabelOrdinals(node, labelOrdinals, auths, userName);
       writeLabelOrdinalsToStream(labelOrdinals, dos);
       tags.add(new Tag(VisibilityUtils.VISIBILITY_TAG_TYPE, 
baos.toByteArray()));
       baos.reset();
@@ -902,14 +912,14 @@ public class VisibilityController extends 
BaseRegionObserver implements MasterOb
       NonLeafExpressionNode nlNode = (NonLeafExpressionNode) node;
       if (nlNode.getOperator() == Operator.OR) {
         for (ExpressionNode child : nlNode.getChildExps()) {
-          getLabelOrdinals(child, labelOrdinals);
+          getLabelOrdinals(child, labelOrdinals, auths, userName);
           writeLabelOrdinalsToStream(labelOrdinals, dos);
           tags.add(new Tag(VisibilityUtils.VISIBILITY_TAG_TYPE, 
baos.toByteArray()));
           baos.reset();
           labelOrdinals.clear();
         }
       } else {
-        getLabelOrdinals(nlNode, labelOrdinals);
+        getLabelOrdinals(nlNode, labelOrdinals, auths, userName);
         writeLabelOrdinalsToStream(labelOrdinals, dos);
         tags.add(new Tag(VisibilityUtils.VISIBILITY_TAG_TYPE, 
baos.toByteArray()));
         baos.reset();
@@ -926,8 +936,8 @@ public class VisibilityController extends 
BaseRegionObserver implements MasterOb
     }
   }
 
-  private void getLabelOrdinals(ExpressionNode node, List<Integer> 
labelOrdinals)
-      throws IOException, InvalidLabelException {
+  private void getLabelOrdinals(ExpressionNode node, List<Integer> 
labelOrdinals,
+      Set<Integer> auths, String userName) throws IOException, 
InvalidLabelException {
     if (node.isSingleNode()) {
       String identifier = null;
       int labelOrdinal = 0;
@@ -938,12 +948,14 @@ public class VisibilityController extends 
BaseRegionObserver implements MasterOb
           LOG.trace("The identifier is "+identifier);
         }
         labelOrdinal = this.visibilityManager.getLabelOrdinal(identifier);
+        checkAuths(auths, userName, labelOrdinal, identifier);
       } else {
         // This is a NOT node.
         LeafExpressionNode lNode = (LeafExpressionNode) 
((NonLeafExpressionNode) node)
             .getChildExps().get(0);
         identifier = lNode.getIdentifier();
         labelOrdinal = this.visibilityManager.getLabelOrdinal(identifier);
+        checkAuths(auths, userName, labelOrdinal, identifier);
         labelOrdinal = -1 * labelOrdinal; // Store NOT node as -ve ordinal.
       }
       if (labelOrdinal == 0) {
@@ -953,11 +965,21 @@ public class VisibilityController extends 
BaseRegionObserver implements MasterOb
     } else {
       List<ExpressionNode> childExps = ((NonLeafExpressionNode) 
node).getChildExps();
       for (ExpressionNode child : childExps) {
-        getLabelOrdinals(child, labelOrdinals);
+        getLabelOrdinals(child, labelOrdinals, auths, userName);
       }
     }
   }
-  
+
+  private void checkAuths(Set<Integer> auths, String userName, int 
labelOrdinal, String identifier)
+      throws IOException {
+    if (checkAuths && !isSystemOrSuperUser()) {
+      if (auths == null || (!auths.contains(labelOrdinal))) {
+        throw new AccessDeniedException("Visibility label " + identifier
+            + " not authorized for the user " + userName);
+      }
+    }
+  }
+
   @Override
   public RegionScanner 
preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
       RegionScanner s) throws IOException {
@@ -1199,6 +1221,11 @@ public class VisibilityController extends 
BaseRegionObserver implements MasterOb
     if (cellVisibility == null) {
       return newCell;
     }
+    Set<Integer> auths = null;
+    User user = getActiveUser();
+    if (checkAuths && user != null && user.getShortName() != null) {
+      auths = this.visibilityManager.getAuthsAsOrdinals(user.getShortName());
+    }
     // Adding all other tags
     Iterator<Tag> tagsItr = CellUtil.tagsIterator(newCell.getTagsArray(), 
newCell.getTagsOffset(),
         newCell.getTagsLengthUnsigned());
@@ -1210,7 +1237,8 @@ public class VisibilityController extends 
BaseRegionObserver implements MasterOb
       }
     }
     try {
-      tags.addAll(createVisibilityTags(cellVisibility.getExpression(), true));
+      tags.addAll(createVisibilityTags(cellVisibility.getExpression(), true, 
auths,
+          user.getShortName()));
     } catch (ParseException e) {
       throw new IOException(e);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/cc7eb46e/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelsManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelsManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelsManager.java
index 7f1f278..e840c64 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelsManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelsManager.java
@@ -173,6 +173,21 @@ public class VisibilityLabelsManager {
   }
 
   /**
+   * Returns the list of ordinals of authentications associated with the user
+   *
+   * @param user
+   * @return the list of ordinals
+   */
+  public Set<Integer> getAuthsAsOrdinals(String user) {
+    this.lock.readLock().lock();
+    try {
+      return userAuths.get(user);
+    } finally {
+      this.lock.readLock().unlock();
+    }
+  }
+
+  /**
    * Writes the labels data to zookeeper node.
    * @param data
    * @param labelsOrUserAuths true for writing labels and false for user auths.

http://git-wip-us.apache.org/repos/asf/hbase/blob/cc7eb46e/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityWithCheckAuths.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityWithCheckAuths.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityWithCheckAuths.java
new file mode 100644
index 0000000..c4f667b
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityWithCheckAuths.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.visibility;
+
+import static 
org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_NAME;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import 
org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsResponse;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category(MediumTests.class)
+/**
+ * Test visibility by setting 'hbase.security.visibility.mutations.checkauths' 
to true
+ */
+public class TestVisibilityWithCheckAuths {
+  private static final String TOPSECRET = "TOPSECRET";
+  private static final String PUBLIC = "PUBLIC";
+  public static final HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
+  private static final byte[] row1 = Bytes.toBytes("row1");
+  private final static byte[] fam = Bytes.toBytes("info");
+  private final static byte[] qual = Bytes.toBytes("qual");
+  private final static byte[] value = Bytes.toBytes("value");
+  public static Configuration conf;
+
+  @Rule
+  public final TestName TEST_NAME = new TestName();
+  public static User SUPERUSER;
+  public static User USER;
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    // setup configuration
+    conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
+    conf.setInt("hfile.format.version", 3);
+    conf.set("hbase.coprocessor.master.classes", 
VisibilityController.class.getName());
+    conf.set("hbase.coprocessor.region.classes", 
VisibilityController.class.getName());
+    conf.setBoolean(VisibilityConstants.CHECK_AUTHS_FOR_MUTATION, true);
+    conf.setClass(VisibilityUtils.VISIBILITY_LABEL_GENERATOR_CLASS, 
SimpleScanLabelGenerator.class,
+        ScanLabelGenerator.class);
+    conf.set("hbase.superuser", "admin");
+    TEST_UTIL.startMiniCluster(2);
+    SUPERUSER = User.createUserForTesting(conf, "admin", new String[] { 
"supergroup" });
+    USER = User.createUserForTesting(conf, "user", new String[]{});
+    // Wait for the labels table to become available
+    TEST_UTIL.waitTableEnabled(LABELS_TABLE_NAME.getName(), 50000);
+    addLabels();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  public static void addLabels() throws Exception {
+    PrivilegedExceptionAction<VisibilityLabelsResponse> action = 
+        new PrivilegedExceptionAction<VisibilityLabelsResponse>() {
+      public VisibilityLabelsResponse run() throws Exception {
+        String[] labels = { TOPSECRET };
+        try {
+          VisibilityClient.addLabels(conf, labels);
+        } catch (Throwable t) {
+          throw new IOException(t);
+        }
+        return null;
+      }
+    };
+    SUPERUSER.runAs(action);
+  }
+
+  @Test
+  public void testVerifyAccessDeniedForInvalidUserAuths() throws Exception {
+    PrivilegedExceptionAction<VisibilityLabelsResponse> action = 
+        new PrivilegedExceptionAction<VisibilityLabelsResponse>() {
+      public VisibilityLabelsResponse run() throws Exception {
+        try {
+          return VisibilityClient.setAuths(conf, new String[] { TOPSECRET },
+              USER.getShortName());
+        } catch (Throwable e) {
+        }
+        return null;
+      }
+    };
+    SUPERUSER.runAs(action);
+    TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
+    HBaseAdmin hBaseAdmin = TEST_UTIL.getHBaseAdmin();
+    HColumnDescriptor colDesc = new HColumnDescriptor(fam);
+    colDesc.setMaxVersions(5);
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addFamily(colDesc);
+    hBaseAdmin.createTable(desc);
+    HTable table = null;
+    try {
+      TEST_UTIL.getHBaseAdmin().flush(tableName.getNameAsString());
+      PrivilegedExceptionAction<Void> actiona = new 
PrivilegedExceptionAction<Void>() {
+        public Void run() throws Exception {
+          HTable table = null;
+          try {
+            table = new HTable(conf, TEST_NAME.getMethodName());
+            Put p = new Put(row1);
+            p.setCellVisibility(new CellVisibility(PUBLIC + "&" + TOPSECRET));
+            p.add(fam, qual, 125l, value);
+            table.put(p);
+            Assert.fail("Testcase should fail with AccesDeniedException");
+          } catch (Throwable t) {
+            assertTrue(t.getMessage().contains("AccessDeniedException"));
+          } finally {
+            table.close();
+          }
+          return null;
+        }
+      };
+      USER.runAs(actiona);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Test
+  public void testLabelsWithAppend() throws Throwable {
+    PrivilegedExceptionAction<VisibilityLabelsResponse> action = 
+        new PrivilegedExceptionAction<VisibilityLabelsResponse>() {
+      public VisibilityLabelsResponse run() throws Exception {
+        try {
+          return VisibilityClient.setAuths(conf, new String[] { TOPSECRET },
+              USER.getShortName());
+        } catch (Throwable e) {
+        }
+        return null;
+      }
+    };
+    SUPERUSER.runAs(action);
+    TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
+    HTable table = null;
+    try {
+      table = TEST_UTIL.createTable(tableName, fam);
+      final byte[] row1 = Bytes.toBytes("row1");
+      final byte[] val = Bytes.toBytes("a");
+      PrivilegedExceptionAction<Void> actiona = new 
PrivilegedExceptionAction<Void>() {
+        public Void run() throws Exception {
+          HTable table = null;
+          try {
+            table = new HTable(conf, TEST_NAME.getMethodName());
+            Put put = new Put(row1);
+            put.add(fam, qual, HConstants.LATEST_TIMESTAMP, val);
+            put.setCellVisibility(new CellVisibility(TOPSECRET));
+            table.put(put);
+          } finally {
+            table.close();
+          }
+          return null;
+        }
+      };
+      USER.runAs(actiona);
+      actiona = new PrivilegedExceptionAction<Void>() {
+        public Void run() throws Exception {
+          HTable table = null;
+          try {
+            table = new HTable(conf, TEST_NAME.getMethodName());
+            Append append = new Append(row1);
+            append.add(fam, qual, Bytes.toBytes("b"));
+            table.append(append);
+          } finally {
+            table.close();
+          }
+          return null;
+        }
+      };
+      USER.runAs(actiona);
+      actiona = new PrivilegedExceptionAction<Void>() {
+        public Void run() throws Exception {
+          HTable table = null;
+          try {
+            table = new HTable(conf, TEST_NAME.getMethodName());
+            Append append = new Append(row1);
+            append.add(fam, qual, Bytes.toBytes("c"));
+            append.setCellVisibility(new CellVisibility(PUBLIC));
+            table.append(append);
+            Assert.fail("Testcase should fail with AccesDeniedException");
+          } catch (Throwable t) {
+            assertTrue(t.getMessage().contains("AccessDeniedException"));
+          } finally {
+            table.close();
+          }
+          return null;
+        }
+      };
+      USER.runAs(actiona);
+    } finally {
+      if (table != null) {
+        table.close();
+      }
+    }
+  }
+}

Reply via email to