Repository: hbase
Updated Branches:
  refs/heads/0.98 2e978a165 -> d79861213


http://git-wip-us.apache.org/repos/asf/hbase/blob/d7986121/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelsManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelsManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelsManager.java
deleted file mode 100644
index e840c64..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityLabelsManager.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.security.visibility;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.MultiUserAuthorizations;
-import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.UserAuthorizations;
-import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabel;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * Maintains the cache for visibility labels and also uses the zookeeper to 
update the labels in the
- * system. The cache updation happens based on the data change event that 
happens on the zookeeper
- * znode for labels table
- */
-@InterfaceAudience.Private
-public class VisibilityLabelsManager {
-
-  private static final Log LOG = 
LogFactory.getLog(VisibilityLabelsManager.class);
-  private static final List<String> EMPTY_LIST = new ArrayList<String>(0);
-  private static VisibilityLabelsManager instance;
-
-  private ZKVisibilityLabelWatcher zkVisibilityWatcher;
-  private Map<String, Integer> labels = new HashMap<String, Integer>();
-  private Map<Integer, String> ordinalVsLabels = new HashMap<Integer, 
String>();
-  private Map<String, Set<Integer>> userAuths = new HashMap<String, 
Set<Integer>>();
-  private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-
-  private VisibilityLabelsManager(ZooKeeperWatcher watcher, Configuration 
conf) throws IOException {
-    zkVisibilityWatcher = new ZKVisibilityLabelWatcher(watcher, this, conf);
-    try {
-      zkVisibilityWatcher.start();
-    } catch (KeeperException ke) {
-      LOG.error("ZooKeeper initialization failed", ke);
-      throw new IOException(ke);
-    }
-  }
-
-  public synchronized static VisibilityLabelsManager get(ZooKeeperWatcher 
watcher,
-      Configuration conf) throws IOException {
-    if (instance == null) {
-      instance = new VisibilityLabelsManager(watcher, conf);
-    }
-    return instance;
-  }
-
-  public static VisibilityLabelsManager get() {
-    return instance;
-  }
-
-  public void refreshLabelsCache(byte[] data) throws IOException {
-    List<VisibilityLabel> visibilityLabels = null;
-    try {
-      visibilityLabels = VisibilityUtils.readLabelsFromZKData(data);
-    } catch (DeserializationException dse) {
-      throw new IOException(dse);
-    }
-    this.lock.writeLock().lock();
-    try {
-      for (VisibilityLabel visLabel : visibilityLabels) {
-        String label = Bytes.toString(visLabel.getLabel().toByteArray());
-        labels.put(label, visLabel.getOrdinal());
-        ordinalVsLabels.put(visLabel.getOrdinal(), label);
-      }
-    } finally {
-      this.lock.writeLock().unlock();
-    }
-  }
-
-  public void refreshUserAuthsCache(byte[] data) throws IOException {
-    MultiUserAuthorizations multiUserAuths = null;
-    try {
-      multiUserAuths = VisibilityUtils.readUserAuthsFromZKData(data);
-    } catch (DeserializationException dse) {
-      throw new IOException(dse);
-    }
-    this.lock.writeLock().lock();
-    try {
-      for (UserAuthorizations userAuths : multiUserAuths.getUserAuthsList()) {
-        String user = Bytes.toString(userAuths.getUser().toByteArray());
-        this.userAuths.put(user, new 
HashSet<Integer>(userAuths.getAuthList()));
-      }
-    } finally {
-      this.lock.writeLock().unlock();
-    }
-  }
-
-  /**
-   * @param label
-   * @return The ordinal for the label. The ordinal starts from 1. Returns 0 
when the passed a non
-   *         existing label.
-   */
-  public int getLabelOrdinal(String label) {
-    Integer ordinal = null;
-    this.lock.readLock().lock();
-    try {
-      ordinal = labels.get(label);
-    } finally {
-      this.lock.readLock().unlock();
-    }
-    if (ordinal != null) {
-      return ordinal.intValue();
-    }
-    // 0 denotes not available
-    return 0;
-  }
-
-  public String getLabel(int ordinal) {
-    this.lock.readLock().lock();
-    try {
-      return this.ordinalVsLabels.get(ordinal);
-    } finally {
-      this.lock.readLock().unlock();
-    }
-  }
-
-  /**
-   * @return The total number of visibility labels.
-   */
-  public int getLabelsCount(){
-    return this.labels.size();
-  }
-
-  /**
-   * @param user
-   * @return The labels that the given user is authorized for.
-   */
-  public List<String> getAuths(String user) {
-    List<String> auths = EMPTY_LIST;
-    this.lock.readLock().lock();
-    try {
-      Set<Integer> authOrdinals = userAuths.get(user);
-      if (authOrdinals != null) {
-        auths = new ArrayList<String>(authOrdinals.size());
-        for (Integer authOrdinal : authOrdinals) {
-          auths.add(ordinalVsLabels.get(authOrdinal));
-        }
-      }
-    } finally {
-      this.lock.readLock().unlock();
-    }
-    return auths;
-  }
-
-  /**
-   * Returns the list of ordinals of authentications associated with the user
-   *
-   * @param user
-   * @return the list of ordinals
-   */
-  public Set<Integer> getAuthsAsOrdinals(String user) {
-    this.lock.readLock().lock();
-    try {
-      return userAuths.get(user);
-    } finally {
-      this.lock.readLock().unlock();
-    }
-  }
-
-  /**
-   * Writes the labels data to zookeeper node.
-   * @param data
-   * @param labelsOrUserAuths true for writing labels and false for user auths.
-   */
-  public void writeToZookeeper(byte[] data, boolean labelsOrUserAuths) {
-    this.zkVisibilityWatcher.writeToZookeeper(data, labelsOrUserAuths);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/d7986121/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityScanDeleteTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityScanDeleteTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityScanDeleteTracker.java
index 89b49c0..ab71ac1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityScanDeleteTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityScanDeleteTracker.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.security.visibility;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -26,6 +27,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
@@ -33,6 +36,7 @@ import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.regionserver.ScanDeleteTracker;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * Similar to ScanDeletTracker but tracks the visibility expression also before
@@ -41,6 +45,8 @@ import org.apache.hadoop.hbase.util.Bytes;
 @InterfaceAudience.Private
 public class VisibilityScanDeleteTracker extends ScanDeleteTracker {
 
+  private static final Log LOG = 
LogFactory.getLog(VisibilityScanDeleteTracker.class);
+
   // Its better to track the visibility tags in delete based on each type.  
Create individual
   // data structures for tracking each of them.  This would ensure that there 
is no tracking based
   // on time and also would handle all cases where deletefamily or 
deletecolumns is specified with
@@ -49,14 +55,17 @@ public class VisibilityScanDeleteTracker extends ScanDeleteTracker {
   // type would solve this problem and also ensure that the combination of 
different type
   // of deletes with diff ts would also work fine
   // Track per TS
-  private Map<Long, List<Tag>> visibilityTagsDeleteFamily = new HashMap<Long, 
List<Tag>>();
+  private Map<Long, Pair<List<Tag>, Byte>> visibilityTagsDeleteFamily =
+      new HashMap<Long, Pair<List<Tag>, Byte>>();
   // Delete family version with different ts and different visibility 
expression could come.
   // Need to track it per ts.
-  private Map<Long,List<Tag>> visibilityTagsDeleteFamilyVersion = new 
HashMap<Long, List<Tag>>();
-  private List<List<Tag>> visibilityTagsDeleteColumns;
+  private Map<Long,Pair<List<Tag>, Byte>> visibilityTagsDeleteFamilyVersion =
+      new HashMap<Long,Pair<List<Tag>, Byte>>();
+  private List<Pair<List<Tag>, Byte>> visibilityTagsDeleteColumns;
   // Tracking as List<List> is to handle same ts cell but different visibility 
tag. 
   // TODO : Need to handle puts with same ts but different vis tags.
-  private List<List<Tag>> visiblityTagsDeleteColumnVersion = new 
ArrayList<List<Tag>>();
+  private List<Pair<List<Tag>, Byte>> visiblityTagsDeleteColumnVersion =
+      new ArrayList<Pair<List<Tag>, Byte>>();
 
   public VisibilityScanDeleteTracker() {
     super();
@@ -72,11 +81,11 @@ public class VisibilityScanDeleteTracker extends ScanDeleteTracker {
     if (type == KeyValue.Type.DeleteFamily.getCode()) {
       hasFamilyStamp = true;
       //familyStamps.add(delCell.getTimestamp());
-      extractDeleteTags(delCell, KeyValue.Type.DeleteFamily);
+      extractDeleteCellVisTags(delCell, KeyValue.Type.DeleteFamily);
       return;
     } else if (type == KeyValue.Type.DeleteFamilyVersion.getCode()) {
       familyVersionStamps.add(timestamp);
-      extractDeleteTags(delCell, KeyValue.Type.DeleteFamilyVersion);
+      extractDeleteCellVisTags(delCell, KeyValue.Type.DeleteFamilyVersion);
       return;
     }
     // new column, or more general delete type
@@ -103,47 +112,51 @@ public class VisibilityScanDeleteTracker extends ScanDeleteTracker {
     deleteLength = qualifierLength;
     deleteType = type;
     deleteTimestamp = timestamp;
-    extractDeleteTags(delCell, KeyValue.Type.codeToType(type));
+    extractDeleteCellVisTags(delCell, KeyValue.Type.codeToType(type));
   }
 
-  private void extractDeleteTags(Cell delCell, Type type) {
+  private void extractDeleteCellVisTags(Cell delCell, Type type) {
     // If tag is present in the delete
     if (delCell.getTagsLengthUnsigned() > 0) {
       switch (type) {
         case DeleteFamily:
           List<Tag> delTags = new ArrayList<Tag>();
           if (visibilityTagsDeleteFamily != null) {
-            VisibilityUtils.getVisibilityTags(delCell, delTags);
+            Byte deleteCellVisTagsFormat = 
VisibilityUtils.extractVisibilityTags(delCell, delTags);
             if (!delTags.isEmpty()) {
-              visibilityTagsDeleteFamily.put(delCell.getTimestamp(), delTags);
+              visibilityTagsDeleteFamily.put(delCell.getTimestamp(), new 
Pair<List<Tag>, Byte>(
+                  delTags, deleteCellVisTagsFormat));
             }
           }
           break;
         case DeleteFamilyVersion:
           delTags = new ArrayList<Tag>();
-          VisibilityUtils.getVisibilityTags(delCell, delTags);
+          Byte deleteCellVisTagsFormat = 
VisibilityUtils.extractVisibilityTags(delCell, delTags);
           if (!delTags.isEmpty()) {
-            visibilityTagsDeleteFamilyVersion.put(delCell.getTimestamp(), 
delTags);
+            visibilityTagsDeleteFamilyVersion.put(delCell.getTimestamp(), new 
Pair<List<Tag>, Byte>(
+                delTags, deleteCellVisTagsFormat));
           }
           break;
         case DeleteColumn:
           if (visibilityTagsDeleteColumns == null) {
-            visibilityTagsDeleteColumns = new ArrayList<List<Tag>>();
+            visibilityTagsDeleteColumns = new ArrayList<Pair<List<Tag>, 
Byte>>();
           }
           delTags = new ArrayList<Tag>();
-          VisibilityUtils.getVisibilityTags(delCell, delTags);
+          deleteCellVisTagsFormat = 
VisibilityUtils.extractVisibilityTags(delCell, delTags);
           if (!delTags.isEmpty()) {
-            visibilityTagsDeleteColumns.add(delTags);
+            visibilityTagsDeleteColumns.add(new Pair<List<Tag>, Byte>(delTags,
+                deleteCellVisTagsFormat));
           }
           break;
         case Delete:
           if (visiblityTagsDeleteColumnVersion == null) {
-            visiblityTagsDeleteColumnVersion = new ArrayList<List<Tag>>();
+            visiblityTagsDeleteColumnVersion = new ArrayList<Pair<List<Tag>, 
Byte>>();
           }
           delTags = new ArrayList<Tag>();
-          VisibilityUtils.getVisibilityTags(delCell, delTags);
+          deleteCellVisTagsFormat = 
VisibilityUtils.extractVisibilityTags(delCell, delTags);
           if (!delTags.isEmpty()) {
-            visiblityTagsDeleteColumnVersion.add(delTags);
+            visiblityTagsDeleteColumnVersion.add(new Pair<List<Tag>, 
Byte>(delTags,
+                deleteCellVisTagsFormat));
           }
           break;
         default:
@@ -174,93 +187,119 @@ public class VisibilityScanDeleteTracker extends ScanDeleteTracker {
     long timestamp = cell.getTimestamp();
     int qualifierOffset = cell.getQualifierOffset();
     int qualifierLength = cell.getQualifierLength();
-    if (hasFamilyStamp) {
-      if (visibilityTagsDeleteFamily != null) {
-        Set<Entry<Long, List<Tag>>> deleteFamilies = 
visibilityTagsDeleteFamily.entrySet();
-        Iterator<Entry<Long, List<Tag>>> iterator = deleteFamilies.iterator();
-        while (iterator.hasNext()) {
-          Entry<Long, List<Tag>> entry = iterator.next();
-          if (timestamp <= entry.getKey()) {
-            boolean matchFound = 
VisibilityUtils.checkForMatchingVisibilityTags(cell,
-                entry.getValue());
-            if (matchFound) {
-              return DeleteResult.FAMILY_VERSION_DELETED;
+    try {
+      if (hasFamilyStamp) {
+        if (visibilityTagsDeleteFamily != null) {
+          Set<Entry<Long, Pair<List<Tag>, Byte>>> deleteFamilies = 
visibilityTagsDeleteFamily
+              .entrySet();
+          Iterator<Entry<Long, Pair<List<Tag>, Byte>>> iterator = 
deleteFamilies.iterator();
+          while (iterator.hasNext()) {
+            Entry<Long, Pair<List<Tag>, Byte>> entry = iterator.next();
+            if (timestamp <= entry.getKey()) {
+              List<Tag> putVisTags = new ArrayList<Tag>();
+              Byte putCellVisTagsFormat = 
VisibilityUtils.extractVisibilityTags(cell, putVisTags);
+              boolean matchFound = VisibilityLabelServiceManager
+                  .getInstance().getVisibilityLabelService()
+                  .matchVisibility(putVisTags, putCellVisTagsFormat, 
entry.getValue().getFirst(),
+                      entry.getValue().getSecond());
+              if (matchFound) {
+                return DeleteResult.FAMILY_VERSION_DELETED;
+              }
             }
           }
-        }
-      } else {
-        if (!VisibilityUtils.isVisibilityTagsPresent(cell)) {
-          // No tags
-          return DeleteResult.FAMILY_VERSION_DELETED;
+        } else {
+          if (!VisibilityUtils.isVisibilityTagsPresent(cell)) {
+            // No tags
+            return DeleteResult.FAMILY_VERSION_DELETED;
+          }
         }
       }
-    }
-    if (familyVersionStamps.contains(Long.valueOf(timestamp))) {
-      if (visibilityTagsDeleteFamilyVersion != null) {
-        List<Tag> tags = 
visibilityTagsDeleteFamilyVersion.get(Long.valueOf(timestamp));
-        if (tags != null) {
-          boolean matchFound = 
VisibilityUtils.checkForMatchingVisibilityTags(cell, tags);
-          if (matchFound) {
+      if (familyVersionStamps.contains(Long.valueOf(timestamp))) {
+        if (visibilityTagsDeleteFamilyVersion != null) {
+          Pair<List<Tag>, Byte> tags = 
visibilityTagsDeleteFamilyVersion.get(Long
+              .valueOf(timestamp));
+          if (tags != null) {
+            List<Tag> putVisTags = new ArrayList<Tag>();
+            Byte putCellVisTagsFormat = 
VisibilityUtils.extractVisibilityTags(cell, putVisTags);
+            boolean matchFound = VisibilityLabelServiceManager
+                .getInstance()
+                .getVisibilityLabelService()
+                .matchVisibility(putVisTags, putCellVisTagsFormat, 
tags.getFirst(),
+                    tags.getSecond());
+            if (matchFound) {
+              return DeleteResult.FAMILY_VERSION_DELETED;
+            }
+          }
+        } else {
+          if (!VisibilityUtils.isVisibilityTagsPresent(cell)) {
+            // No tags
             return DeleteResult.FAMILY_VERSION_DELETED;
           }
         }
-      } else {
-        if (!VisibilityUtils.isVisibilityTagsPresent(cell)) {
-          // No tags
-          return DeleteResult.FAMILY_VERSION_DELETED;
-        }
       }
-    }
-    if (deleteBuffer != null) {
-      int ret = Bytes.compareTo(deleteBuffer, deleteOffset, deleteLength, 
cell.getQualifierArray(),
-          qualifierOffset, qualifierLength);
+      if (deleteBuffer != null) {
+        int ret = Bytes.compareTo(deleteBuffer, deleteOffset, deleteLength,
+            cell.getQualifierArray(), qualifierOffset, qualifierLength);
 
-      if (ret == 0) {
-        if (deleteType == KeyValue.Type.DeleteColumn.getCode()) {
-          if (visibilityTagsDeleteColumns != null) {
-            for (List<Tag> tags : visibilityTagsDeleteColumns) {
-              boolean matchFound = 
VisibilityUtils.checkForMatchingVisibilityTags(cell,
-                  tags);
-              if (matchFound) {
+        if (ret == 0) {
+          if (deleteType == KeyValue.Type.DeleteColumn.getCode()) {
+            if (visibilityTagsDeleteColumns != null) {
+              for (Pair<List<Tag>, Byte> tags : visibilityTagsDeleteColumns) {
+                List<Tag> putVisTags = new ArrayList<Tag>();
+                Byte putCellVisTagsFormat = 
VisibilityUtils.extractVisibilityTags(cell, putVisTags);
+                boolean matchFound = VisibilityLabelServiceManager
+                    .getInstance()
+                    .getVisibilityLabelService()
+                    .matchVisibility(putVisTags, putCellVisTagsFormat, 
tags.getFirst(),
+                        tags.getSecond());
+                if (matchFound) {
+                  return DeleteResult.VERSION_DELETED;
+                }
+              }
+            } else {
+              if (!VisibilityUtils.isVisibilityTagsPresent(cell)) {
+                // No tags
                 return DeleteResult.VERSION_DELETED;
               }
             }
-          } else {
-            if (!VisibilityUtils.isVisibilityTagsPresent(cell)) {
-              // No tags
-              return DeleteResult.VERSION_DELETED;
-            }
           }
-        }
-        // Delete (aka DeleteVersion)
-        // If the timestamp is the same, keep this one
-        if (timestamp == deleteTimestamp) {
-          if (visiblityTagsDeleteColumnVersion != null) {
-            for (List<Tag> tags : visiblityTagsDeleteColumnVersion) {
-              boolean matchFound = 
VisibilityUtils.checkForMatchingVisibilityTags(cell,
-                  tags);
-              if (matchFound) {
+          // Delete (aka DeleteVersion)
+          // If the timestamp is the same, keep this one
+          if (timestamp == deleteTimestamp) {
+            if (visiblityTagsDeleteColumnVersion != null) {
+              for (Pair<List<Tag>, Byte> tags : 
visiblityTagsDeleteColumnVersion) {
+                List<Tag> putVisTags = new ArrayList<Tag>();
+                Byte putCellVisTagsFormat = 
VisibilityUtils.extractVisibilityTags(cell, putVisTags);
+                boolean matchFound = VisibilityLabelServiceManager
+                    .getInstance()
+                    .getVisibilityLabelService()
+                    .matchVisibility(putVisTags, putCellVisTagsFormat, 
tags.getFirst(),
+                        tags.getSecond());
+                if (matchFound) {
+                  return DeleteResult.VERSION_DELETED;
+                }
+              }
+            } else {
+              if (!VisibilityUtils.isVisibilityTagsPresent(cell)) {
+                // No tags
                 return DeleteResult.VERSION_DELETED;
               }
             }
-          } else {
-            if (!VisibilityUtils.isVisibilityTagsPresent(cell)) {
-              // No tags
-              return DeleteResult.VERSION_DELETED;
-            }
           }
+        } else if (ret < 0) {
+          // Next column case.
+          deleteBuffer = null;
+          visibilityTagsDeleteColumns = null;
+          visiblityTagsDeleteColumnVersion = null;
+        } else {
+          throw new IllegalStateException("isDeleted failed: deleteBuffer="
+              + Bytes.toStringBinary(deleteBuffer, deleteOffset, deleteLength) + ", qualifier="
+              + Bytes.toStringBinary(cell.getQualifierArray(), qualifierOffset, qualifierLength)
+              + ", timestamp=" + timestamp + ", comparison result: " + ret);
         }
-      } else if (ret < 0) {
-        // Next column case.
-        deleteBuffer = null;
-        visibilityTagsDeleteColumns = null;
-        visiblityTagsDeleteColumnVersion = null;
-      } else {
-        throw new IllegalStateException("isDeleted failed: deleteBuffer="
-            + Bytes.toStringBinary(deleteBuffer, deleteOffset, deleteLength) + 
", qualifier="
-            + Bytes.toStringBinary(cell.getQualifierArray(), qualifierOffset, 
qualifierLength)
-            + ", timestamp=" + timestamp + ", comparison result: " + ret);
       }
+    } catch (IOException e) {
+      LOG.error("Error in isDeleted() check! Will treat cell as not deleted", 
e);
     }
     return DeleteResult.NOT_DELETED;
   }
@@ -269,8 +308,8 @@ public class VisibilityScanDeleteTracker extends ScanDeleteTracker {
   public void reset() {
     super.reset();
     visibilityTagsDeleteColumns = null;
-    visibilityTagsDeleteFamily = new HashMap<Long, List<Tag>>();
-    visibilityTagsDeleteFamilyVersion = new HashMap<Long, List<Tag>>();
+    visibilityTagsDeleteFamily = new HashMap<Long, Pair<List<Tag>, Byte>>();
+    visibilityTagsDeleteFamilyVersion = new HashMap<Long, Pair<List<Tag>, 
Byte>>();
     visiblityTagsDeleteColumnVersion = null;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d7986121/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityUtils.java
index f3c84e1..a676f2b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityUtils.java
@@ -19,29 +19,36 @@ package org.apache.hadoop.hbase.security.visibility;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.io.util.StreamUtils;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.ipc.RequestContext;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.MultiUserAuthorizations;
 import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.UserAuthorizations;
 import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabel;
 import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsRequest;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.ByteRange;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.SimpleByteRange;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.protobuf.InvalidProtocolBufferException;
@@ -52,14 +59,14 @@ import com.google.protobuf.InvalidProtocolBufferException;
 @InterfaceAudience.Private
 public class VisibilityUtils {
 
+  private static final Log LOG = LogFactory.getLog(VisibilityUtils.class);
+
   public static final String VISIBILITY_LABEL_GENERATOR_CLASS =
       "hbase.regionserver.scan.visibility.label.generator.class";
-  public static final byte VISIBILITY_TAG_TYPE = TagType.VISIBILITY_TAG_TYPE;
-  public static final byte VISIBILITY_EXP_SERIALIZATION_TAG_TYPE =
-      TagType.VISIBILITY_EXP_SERIALIZATION_TAG_TYPE;
   public static final String SYSTEM_LABEL = "system";
-  public static final Tag VIS_SERIALIZATION_TAG = new 
Tag(VISIBILITY_EXP_SERIALIZATION_TAG_TYPE,
-      VisibilityConstants.SORTED_ORDINAL_SERIALIZATION_FORMAT);
+  public static final Tag SORTED_ORDINAL_SERIALIZATION_FORMAT_TAG = new Tag(
+      TagType.VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE,
+      VisibilityConstants.SORTED_ORDINAL_SERIALIZATION_FORMAT_TAG_VAL);
   private static final String COMMA = ",";
 
   /**
@@ -140,8 +147,15 @@ public class VisibilityUtils {
     return null;
   }
 
-  public static List<ScanLabelGenerator> getScanLabelGenerators(Configuration 
conf)
-      throws IOException {
+  /**
+   * @param conf The configuration to use
+   * @return Stack of ScanLabelGenerator instances. ScanLabelGenerator classes 
can be specified in
+   *         Configuration as comma separated list using key
+   *         "hbase.regionserver.scan.visibility.label.generator.class"
+   * @throws IllegalArgumentException
+   *           when any of the specified ScanLabelGenerator class can not be 
loaded.
+   */
+  public static List<ScanLabelGenerator> getScanLabelGenerators(Configuration 
conf) {
     // There can be n SLG specified as comma separated in conf
     String slgClassesCommaSeparated = 
conf.get(VISIBILITY_LABEL_GENERATOR_CLASS);
     // We have only System level SLGs now. The order of execution will be same 
as the order in the
@@ -155,7 +169,7 @@ public class VisibilityUtils {
           slgKlass = (Class<? extends ScanLabelGenerator>) 
conf.getClassByName(slgClass.trim());
           slgs.add(ReflectionUtils.newInstance(slgKlass, conf));
         } catch (ClassNotFoundException e) {
-          throw new IOException(e);
+          throw new IllegalArgumentException("Unable to find " + slgClass, e);
         }
       }
     }
@@ -168,40 +182,28 @@ public class VisibilityUtils {
   }
 
   /**
-   * Get the list of visibility tags in the given cell
+   * Extract the visibility tags of the given Cell into the given List
    * @param cell - the cell
-   * @param tags - the tags array that will be populated if
-   * visibility tags are present
-   * @return true if the tags are in sorted order.
+   * @param tags - the array that will be populated if visibility tags are 
present
+   * @return The visibility tags serialization format
    */
-  public static boolean getVisibilityTags(Cell cell, List<Tag> tags) {
-    boolean sortedOrder = false;
-    if (cell.getTagsLengthUnsigned() == 0) {
-      return sortedOrder;
-    }
-    Iterator<Tag> tagsIterator = CellUtil.tagsIterator(cell.getTagsArray(), 
cell.getTagsOffset(),
-        cell.getTagsLengthUnsigned());
-    while (tagsIterator.hasNext()) {
-      Tag tag = tagsIterator.next();
-      if (tag.getType() == 
VisibilityUtils.VISIBILITY_EXP_SERIALIZATION_TAG_TYPE) {
-        int serializationVersion = tag.getBuffer()[tag.getTagOffset()];
-        if (serializationVersion == 
VisibilityConstants.VISIBILITY_SERIALIZATION_VERSION) {
-          sortedOrder = true;
-          continue;
+  public static Byte extractVisibilityTags(Cell cell, List<Tag> tags) {
+    Byte serializationFormat = null;
+    if (cell.getTagsLengthUnsigned() > 0) {
+      Iterator<Tag> tagsIterator = CellUtil.tagsIterator(cell.getTagsArray(), 
cell.getTagsOffset(),
+          cell.getTagsLengthUnsigned());
+      while (tagsIterator.hasNext()) {
+        Tag tag = tagsIterator.next();
+        if (tag.getType() == 
TagType.VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE) {
+          serializationFormat = tag.getBuffer()[tag.getTagOffset()];
+        } else if (tag.getType() == TagType.VISIBILITY_TAG_TYPE) {
+          tags.add(tag);
         }
       }
-      if (tag.getType() == VisibilityUtils.VISIBILITY_TAG_TYPE) {
-        tags.add(tag);
-      }
     }
-    return sortedOrder;
+    return serializationFormat;
   }
 
-  /**
-   * Checks if the cell has a visibility tag
-   * @param cell
-   * @return true if found, false if not found
-   */
   public static boolean isVisibilityTagsPresent(Cell cell) {
     if (cell.getTagsLengthUnsigned() == 0) {
       return false;
@@ -210,127 +212,38 @@ public class VisibilityUtils {
         cell.getTagsLengthUnsigned());
     while (tagsIterator.hasNext()) {
       Tag tag = tagsIterator.next();
-      if (tag.getType() == VisibilityUtils.VISIBILITY_TAG_TYPE) {
+      if (tag.getType() == TagType.VISIBILITY_TAG_TYPE) {
         return true;
       }
     }
     return false;
   }
 
-  /**
-   * Checks for the matching visibility labels in the delete mutation and
-   * the cell in consideration
-   * @param cell - the cell
-   * @param visibilityTagsInDeleteCell - that list of tags in the delete 
mutation
-   * (the specified Cell Visibility)
-   * @return true if matching tags are found
-   */
-  public static boolean checkForMatchingVisibilityTags(Cell cell,
-      List<Tag> visibilityTagsInDeleteCell) {
-    List<Tag> tags = new ArrayList<Tag>();
-    boolean sortedTags = getVisibilityTags(cell, tags);
-    if (tags.size() == 0) {
-      // Early out if there are no tags in the cell
-      return false;
-    }
-    if (sortedTags) {
-      return 
checkForMatchingVisibilityTagsWithSortedOrder(visibilityTagsInDeleteCell, tags);
-    } else {
-      try {
-        return checkForMatchingVisibilityTagsWithOutSortedOrder(cell, 
visibilityTagsInDeleteCell);
-      } catch (IOException e) {
-        // Should not happen
-        throw new RuntimeException("Exception while sorting the tags from the 
cell", e);
-      }
-    }
-  }
-
-  private static boolean checkForMatchingVisibilityTagsWithOutSortedOrder(Cell 
cell,
-      List<Tag> visibilityTagsInDeleteCell) throws IOException {
-    List<List<Integer>> sortedDeleteTags = sortTagsBasedOnOrdinal(
-        visibilityTagsInDeleteCell);
-    List<List<Integer>> sortedTags = sortTagsBasedOnOrdinal(cell);
-    return compareTagsOrdinals(sortedDeleteTags, sortedTags);
-  }
-
-  private static boolean checkForMatchingVisibilityTagsWithSortedOrder(
-      List<Tag> visibilityTagsInDeleteCell, List<Tag> tags) {
-    boolean matchFound = false;
-    if ((visibilityTagsInDeleteCell.size()) != tags.size()) {
-      // If the size does not match. Definitely we are not comparing the
-      // equal tags.
-      // Return false in that case.
-      return matchFound;
-    }
-    for (Tag tag : visibilityTagsInDeleteCell) {
-      matchFound = false;
-      for (Tag givenTag : tags) {
-        if (Bytes.equals(tag.getBuffer(), tag.getTagOffset(), 
tag.getTagLength(),
-            givenTag.getBuffer(), givenTag.getTagOffset(), 
givenTag.getTagLength())) {
-          matchFound = true;
-          break;
-        }
-      }
-    }
-    return matchFound;
-  }
-
-  private static List<List<Integer>> sortTagsBasedOnOrdinal(Cell cell) throws 
IOException {
-    List<List<Integer>> fullTagsList = new ArrayList<List<Integer>>();
-    if (cell.getTagsLengthUnsigned() == 0) {
-      return fullTagsList;
-    }
-    Iterator<Tag> tagsItr = CellUtil.tagsIterator(cell.getTagsArray(), 
cell.getTagsOffset(),
-        cell.getTagsLengthUnsigned());
-    while (tagsItr.hasNext()) {
-      Tag tag = tagsItr.next();
-      if (tag.getType() == VisibilityUtils.VISIBILITY_TAG_TYPE) {
-        getSortedTagOrdinals(fullTagsList, tag);
-      }
-    }
-    return fullTagsList;
-  }
-
-  private static List<List<Integer>> sortTagsBasedOnOrdinal(List<Tag> tags) 
throws IOException {
-    List<List<Integer>> fullTagsList = new ArrayList<List<Integer>>();
-    for (Tag tag : tags) {
-      if (tag.getType() == VisibilityUtils.VISIBILITY_TAG_TYPE) {
-        getSortedTagOrdinals(fullTagsList, tag);
-      }
-    }
-    return fullTagsList;
-  }
-
-  private static void getSortedTagOrdinals(List<List<Integer>> fullTagsList, 
Tag tag)
+  public static Filter createVisibilityLabelFilter(HRegion region, 
Authorizations authorizations)
       throws IOException {
-    List<Integer> tagsOrdinalInSortedOrder = new ArrayList<Integer>();
-    int offset = tag.getTagOffset();
-    int endOffset = offset + tag.getTagLength();
-    while (offset < endOffset) {
-      Pair<Integer, Integer> result = 
StreamUtils.readRawVarint32(tag.getBuffer(), offset);
-      tagsOrdinalInSortedOrder.add(result.getFirst());
-      offset += result.getSecond();
-    }
-    Collections.sort(tagsOrdinalInSortedOrder);
-    fullTagsList.add(tagsOrdinalInSortedOrder);
+    Map<ByteRange, Integer> cfVsMaxVersions = new HashMap<ByteRange, 
Integer>();
+    for (HColumnDescriptor hcd : region.getTableDesc().getFamilies()) {
+      cfVsMaxVersions.put(new SimpleByteRange(hcd.getName()), 
hcd.getMaxVersions());
+    }
+    VisibilityLabelService vls = VisibilityLabelServiceManager.getInstance()
+        .getVisibilityLabelService();
+    Filter visibilityLabelFilter = new VisibilityLabelFilter(
+        vls.getVisibilityExpEvaluator(authorizations), cfVsMaxVersions);
+    return visibilityLabelFilter;
   }
 
-  private static boolean compareTagsOrdinals(List<List<Integer>> tagsInDeletes,
-      List<List<Integer>> tags) {
-    boolean matchFound = false;
-    if (tagsInDeletes.size() != tags.size()) {
-      return matchFound;
-    } else {
-      for (List<Integer> deleteTagOrdinals : tagsInDeletes) {
-        matchFound = false;
-        for (List<Integer> tagOrdinals : tags) {
-          if (deleteTagOrdinals.equals(tagOrdinals)) {
-            matchFound = true;
-            break;
-          }
-        }
-      }
-      return matchFound;
+  /**
+   * @return User who called RPC method. For non-RPC handling, falls back to 
system user
+   * @throws IOException When there is IOE in getting the system user (During 
non-RPC handling).
+   */
+  public static User getActiveUser() throws IOException {
+    User user = RequestContext.getRequestUser();
+    if (!RequestContext.isInRequestContext()) {
+      user = User.getCurrent();
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Current active user name is " + user.getShortName());
     }
+    return user;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d7986121/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/ZKVisibilityLabelWatcher.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/ZKVisibilityLabelWatcher.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/ZKVisibilityLabelWatcher.java
index 4f584e1..263c388 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/ZKVisibilityLabelWatcher.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/ZKVisibilityLabelWatcher.java
@@ -42,14 +42,14 @@ public class ZKVisibilityLabelWatcher extends 
ZooKeeperListener {
       "zookeeper.znode.visibility.user.auths.parent";
   private static final String DEFAULT_VISIBILITY_USER_AUTHS_NODE = 
"visibility/user_auths";
 
-  private VisibilityLabelsManager labelsManager;
+  private VisibilityLabelsCache labelsCache;
   private String labelZnode;
   private String userAuthsZnode;
 
-  public ZKVisibilityLabelWatcher(ZooKeeperWatcher watcher, 
VisibilityLabelsManager labelsManager,
+  public ZKVisibilityLabelWatcher(ZooKeeperWatcher watcher, 
VisibilityLabelsCache labelsCache,
       Configuration conf) {
     super(watcher);
-    this.labelsManager = labelsManager;
+    this.labelsCache = labelsCache;
     String labelZnodeParent = conf.get(VISIBILITY_LABEL_ZK_PATH, 
DEFAULT_VISIBILITY_LABEL_NODE);
     String userAuthsZnodeParent = conf.get(VISIBILITY_USER_AUTHS_ZK_PATH,
         DEFAULT_VISIBILITY_USER_AUTHS_NODE);
@@ -75,7 +75,7 @@ public class ZKVisibilityLabelWatcher extends 
ZooKeeperListener {
 
   private void refreshVisibilityLabelsCache(byte[] data) {
     try {
-      this.labelsManager.refreshLabelsCache(data);
+      this.labelsCache.refreshLabelsCache(data);
     } catch (IOException ioe) {
       LOG.error("Failed parsing data from labels table " + " from zk", ioe);
     }
@@ -83,7 +83,7 @@ public class ZKVisibilityLabelWatcher extends 
ZooKeeperListener {
 
   private void refreshUserAuthsCache(byte[] data) {
     try {
-      this.labelsManager.refreshUserAuthsCache(data);
+      this.labelsCache.refreshUserAuthsCache(data);
     } catch (IOException ioe) {
       LOG.error("Failed parsing data from labels table " + " from zk", ioe);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d7986121/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/ExpAsStringVisibilityLabelServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/ExpAsStringVisibilityLabelServiceImpl.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/ExpAsStringVisibilityLabelServiceImpl.java
new file mode 100644
index 0000000..90fdd73
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/ExpAsStringVisibilityLabelServiceImpl.java
@@ -0,0 +1,395 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.visibility;
+
+import static org.apache.hadoop.hbase.TagType.VISIBILITY_TAG_TYPE;
+import static 
org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_FAMILY;
+import static 
org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_NAME;
+import static 
org.apache.hadoop.hbase.security.visibility.VisibilityUtils.SYSTEM_LABEL;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.TagType;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.OperationStatus;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.access.AccessControlLists;
+import org.apache.hadoop.hbase.security.visibility.expression.ExpressionNode;
+import 
org.apache.hadoop.hbase.security.visibility.expression.LeafExpressionNode;
+import 
org.apache.hadoop.hbase.security.visibility.expression.NonLeafExpressionNode;
+import org.apache.hadoop.hbase.security.visibility.expression.Operator;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.collect.Lists;
+
+/**
+ * This is a VisibilityLabelService where the labels in a Mutation's visibility expression are
+ * persisted as Strings themselves rather than as ordinals in the 'labels' table. Also, there is
+ * no need to add labels to the system prior to using them in Mutations/Authorizations.
+ */
+@InterfaceAudience.Private
+public class ExpAsStringVisibilityLabelServiceImpl implements 
VisibilityLabelService {
+
+  private static final Log LOG = 
LogFactory.getLog(ExpAsStringVisibilityLabelServiceImpl.class);
+
+  private static final byte[] DUMMY_VALUE = new byte[0];
+  private static final byte STRING_SERIALIZATION_FORMAT = 2;
+  private static final Tag STRING_SERIALIZATION_FORMAT_TAG = new Tag(
+      TagType.VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE,
+      new byte[]{STRING_SERIALIZATION_FORMAT});
+  private final ExpressionParser expressionParser = new ExpressionParser();
+  private final ExpressionExpander expressionExpander = new 
ExpressionExpander();
+  private Configuration conf;
+  private HRegion labelsRegion;
+  private List<ScanLabelGenerator> scanLabelGenerators;
+
+  /**
+   * Accepts every requested label without persisting anything. This implementation writes the
+   * label strings of a Mutation's visibility expression along with each cell, so no separate
+   * label registration is performed here.
+   * @param labels the labels requested to be added
+   * @return one SUCCESS status per requested label, in request order
+   */
+  @Override
+  public OperationStatus[] addLabels(List<byte[]> labels) throws IOException {
+    final int labelCount = labels.size();
+    OperationStatus[] results = new OperationStatus[labelCount];
+    int idx = 0;
+    while (idx < labelCount) {
+      results[idx] = new OperationStatus(OperationStatusCode.SUCCESS);
+      idx++;
+    }
+    return results;
+  }
+
+  /**
+   * Grants auth labels to a user by writing, in the user's row of the labels region, one column
+   * per label (qualifier = label bytes, empty value) under the labels table family.
+   * @param user row key identifying the user
+   * @param authLabels labels to grant
+   * @return one SUCCESS status per label, in request order
+   */
+  @Override
+  public OperationStatus[] setAuths(byte[] user, List<byte[]> authLabels) throws IOException {
+    assert labelsRegion != null;
+    final int authCount = authLabels.size();
+    OperationStatus[] statuses = new OperationStatus[authCount];
+    Put userRow = new Put(user);
+    Iterator<byte[]> authItr = authLabels.iterator();
+    while (authItr.hasNext()) {
+      userRow.addImmutable(LABELS_TABLE_FAMILY, authItr.next(), DUMMY_VALUE);
+    }
+    this.labelsRegion.put(userRow);
+    // This is a testing impl and so not doing any caching
+    for (int idx = 0; idx < authCount; idx++) {
+      statuses[idx] = new OperationStatus(OperationStatusCode.SUCCESS);
+    }
+    return statuses;
+  }
+
+  /**
+   * Revokes auth labels from a user by deleting the matching columns from the user's row in the
+   * labels region.
+   * @param user row key identifying the user
+   * @param authLabels labels to revoke
+   * @return one status per requested label, in request order: SUCCESS when the label was held by
+   *         the user, FAILURE (carrying an InvalidLabelException) when it was not
+   */
+  @Override
+  public OperationStatus[] clearAuths(byte[] user, List<byte[]> authLabels) throws IOException {
+    assert labelsRegion != null;
+    OperationStatus[] finalOpStatus = new OperationStatus[authLabels.size()];
+    List<String> currentAuths = this.getAuths(user, true);
+    Delete d = new Delete(user);
+    boolean anyColumnToDelete = false;
+    int i = 0;
+    for (byte[] authLabel : authLabels) {
+      String authLabelStr = Bytes.toString(authLabel);
+      if (currentAuths.contains(authLabelStr)) {
+        d.deleteColumns(LABELS_TABLE_FAMILY, authLabel);
+        anyColumnToDelete = true;
+        finalOpStatus[i] = new OperationStatus(OperationStatusCode.SUCCESS);
+      } else {
+        // This label is not set for the user.
+        finalOpStatus[i] = new OperationStatus(OperationStatusCode.FAILURE,
+            new InvalidLabelException("Label '" + authLabelStr + "' is not set for the user "
+                + Bytes.toString(user)));
+      }
+      i++;
+    }
+    // A Delete that names no columns removes the whole row. Submitting it when none of the
+    // requested labels is held would wipe every auth of the user, so skip it in that case.
+    if (anyColumnToDelete) {
+      this.labelsRegion.delete(d);
+    }
+    // This is a testing impl and so not doing any caching
+    return finalOpStatus;
+  }
+
+  /**
+   * Reads the auth labels granted to a user. Each column qualifier found in the user's row is
+   * returned as one label string.
+   * <p>
+   * When this instance holds the labels region, the row is read from it directly; otherwise the
+   * labels table is read through a short-lived HTable client.
+   * @param user row key identifying the user
+   * @param systemCall asserted to be true whenever the labels region is not held by this instance
+   * @return the user's labels; empty (never null) when the row has no cells
+   */
+  @Override
+  public List<String> getAuths(byte[] user, boolean systemCall) throws IOException {
+    assert (labelsRegion != null || systemCall);
+    List<String> auths = new ArrayList<String>();
+    Get get = new Get(user);
+    List<Cell> userCells;
+    if (labelsRegion != null) {
+      userCells = this.labelsRegion.get(get, false);
+    } else {
+      HTable labelsTable = null;
+      try {
+        labelsTable = new HTable(conf, VisibilityConstants.LABELS_TABLE_NAME);
+        userCells = labelsTable.get(get).listCells();
+      } finally {
+        if (labelsTable != null) {
+          labelsTable.close();
+        }
+      }
+    }
+    if (userCells == null) {
+      return auths;
+    }
+    for (Cell userCell : userCells) {
+      auths.add(Bytes.toString(userCell.getQualifierArray(), userCell.getQualifierOffset(),
+          userCell.getQualifierLength()));
+    }
+    return auths;
+  }
+
+  /**
+   * Converts a visibility expression into tags. The expression is parsed and expanded; if the
+   * expanded root is an OR node, one visibility tag is created per child, otherwise a single tag
+   * is created for the whole expression.
+   * @param visExpression the visibility expression string
+   * @param withSerializationFormat when true, the string serialization format tag is placed
+   *          first in the returned list
+   * @param checkAuths not consulted by this implementation
+   * @return the tags representing the expression
+   * @throws IOException wrapping the ParseException when the expression cannot be parsed
+   */
+  @Override
+  public List<Tag> createVisibilityExpTags(String visExpression, boolean withSerializationFormat,
+      boolean checkAuths) throws IOException {
+    ExpressionNode parsed;
+    try {
+      parsed = this.expressionParser.parse(visExpression);
+    } catch (ParseException e) {
+      throw new IOException(e);
+    }
+    ExpressionNode expanded = this.expressionExpander.expand(parsed);
+    List<Tag> visTags = new ArrayList<Tag>();
+    if (withSerializationFormat) {
+      visTags.add(STRING_SERIALIZATION_FORMAT_TAG);
+    }
+    boolean rootIsOr = expanded instanceof NonLeafExpressionNode
+        && ((NonLeafExpressionNode) expanded).getOperator() == Operator.OR;
+    if (!rootIsOr) {
+      visTags.add(createTag(expanded));
+      return visTags;
+    }
+    for (ExpressionNode orBranch : ((NonLeafExpressionNode) expanded).getChildExps()) {
+      visTags.add(createTag(orBranch));
+    }
+    return visTags;
+  }
+
+  /**
+   * Builds the evaluator that decides whether a cell may be returned to the active user.
+   * <p>
+   * For a super user the evaluator accepts every cell. Otherwise the configured scan label
+   * generators are run in order, each one receiving the Authorizations built from the labels the
+   * previous one returned; the labels of the last generator are the effective auths.
+   * <p>
+   * The evaluator reads each visibility tag of a cell as a sequence of (short length, label
+   * bytes) entries, where a negative length marks a NOT label. A tag is satisfied when none of
+   * its NOT labels and all of its normal labels are in the effective auths. A cell is accepted
+   * when any of its visibility tags is satisfied, or when it carries no visibility tag at all.
+   * @param authorizations the authorizations passed with the read; may be replaced by the
+   *          generators' output
+   * @return the evaluator for the active user
+   * @throws IOException when a scan label generator fails
+   */
+  @Override
+  public VisibilityExpEvaluator getVisibilityExpEvaluator(Authorizations authorizations)
+      throws IOException {
+    // If a super user issues a get/scan, he should be able to scan the cells
+    // irrespective of the Visibility labels
+    if (isReadFromSuperUser()) {
+      return new VisibilityExpEvaluator() {
+        @Override
+        public boolean evaluate(Cell cell) throws IOException {
+          return true;
+        }
+      };
+    }
+    List<String> authLabels = null;
+    for (ScanLabelGenerator scanLabelGenerator : scanLabelGenerators) {
+      try {
+        // null authorizations to be handled inside SLG impl.
+        authLabels = scanLabelGenerator.getLabels(VisibilityUtils.getActiveUser(), authorizations);
+        authLabels = (authLabels == null) ? new ArrayList<String>() : authLabels;
+        authorizations = new Authorizations(authLabels);
+      } catch (Throwable t) {
+        LOG.error(t);
+        throw new IOException(t);
+      }
+    }
+    // authLabels is still null when the generator list is empty. Treat that as "no auths" so the
+    // evaluator below does not fail with a NullPointerException on the first tagged cell.
+    final List<String> authLabelsFinal =
+        (authLabels == null) ? new ArrayList<String>() : authLabels;
+    return new VisibilityExpEvaluator() {
+      @Override
+      public boolean evaluate(Cell cell) throws IOException {
+        boolean visibilityTagPresent = false;
+        // Save an object allocation where we can
+        if (cell.getTagsLengthUnsigned() > 0) {
+          Iterator<Tag> tagsItr = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
+              cell.getTagsLengthUnsigned());
+          while (tagsItr.hasNext()) {
+            boolean includeKV = true;
+            Tag tag = tagsItr.next();
+            if (tag.getType() == VISIBILITY_TAG_TYPE) {
+              visibilityTagPresent = true;
+              int offset = tag.getTagOffset();
+              int endOffset = offset + tag.getTagLength();
+              while (offset < endOffset) {
+                short len = Bytes.toShort(tag.getBuffer(), offset);
+                offset += 2;
+                if (len < 0) {
+                  // This is a NOT label.
+                  len = (short) (-1 * len);
+                  String label = Bytes.toString(tag.getBuffer(), offset, len);
+                  if (authLabelsFinal.contains(label)) {
+                    // The user holds a label this tag forbids.
+                    includeKV = false;
+                    break;
+                  }
+                } else {
+                  String label = Bytes.toString(tag.getBuffer(), offset, len);
+                  if (!authLabelsFinal.contains(label)) {
+                    // The user lacks a label this tag requires.
+                    includeKV = false;
+                    break;
+                  }
+                }
+                offset += len;
+              }
+              if (includeKV) {
+                // We got one visibility expression getting evaluated to true. Good to include this
+                // KV in the result then.
+                return true;
+              }
+            }
+          }
+        }
+        // No tag was satisfied: visible only if the cell had no visibility tag at all.
+        return !(visibilityTagPresent);
+      }
+    };
+  }
+
+  /**
+   * @return true when the active user's auths contain the system label
+   * @throws IOException when the active user or its auths cannot be read
+   */
+  protected boolean isReadFromSuperUser() throws IOException {
+    String activeUserName = VisibilityUtils.getActiveUser().getShortName();
+    return havingSystemAuth(Bytes.toBytes(activeUserName));
+  }
+
+  /**
+   * Serializes one expression (a single label, a NOT of a label, or an AND of those) into a
+   * visibility tag. NOT labels are written first, then normal labels, each group sorted; every
+   * label is written as a short length followed by the label bytes, with the length negated for
+   * a NOT label.
+   * @param node the expression to serialize
+   * @return a tag of type VISIBILITY_TAG_TYPE holding the serialized labels
+   * @throws IOException when a label is too long for the short length prefix
+   */
+  private Tag createTag(ExpressionNode node) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+    List<String> labels = new ArrayList<String>();
+    List<String> notLabels = new ArrayList<String>();
+    extractLabels(node, labels, notLabels);
+    Collections.sort(labels);
+    Collections.sort(notLabels);
+    // We will write the NOT labels 1st followed by normal labels
+    for (String label : notLabels) {
+      writeLabel(dos, label, true);
+    }
+    for (String label : labels) {
+      writeLabel(dos, label, false);
+    }
+    return new Tag(VISIBILITY_TAG_TYPE, baos.toByteArray());
+  }
+
+  /**
+   * Writes one label as (short length, label bytes); the length is negated for a NOT label.
+   */
+  private static void writeLabel(DataOutputStream dos, String label, boolean negated)
+      throws IOException {
+    byte[] bLabel = Bytes.toBytes(label);
+    // The length prefix is a signed short whose sign marks NOT labels. A longer label would wrap
+    // around and be read back with the wrong sign and length, so refuse it instead.
+    if (bLabel.length > Short.MAX_VALUE) {
+      throw new IOException("Visibility label is too long (" + bLabel.length
+          + " bytes); the maximum is " + Short.MAX_VALUE + " bytes");
+    }
+    dos.writeShort(negated ? -bLabel.length : bLabel.length);
+    dos.write(bLabel);
+  }
+
+  /**
+   * Collects the label identifiers of an expression into two lists, recursing through the
+   * children of non-single nodes (asserted to be AND nodes).
+   * @param node the expression to walk
+   * @param labels receives the identifiers of plain labels
+   * @param notLabels receives the identifiers of labels wrapped in a NOT node
+   */
+  private void extractLabels(ExpressionNode node, List<String> labels, List<String> notLabels) {
+    if (!node.isSingleNode()) {
+      // A non leaf expression of labels with & operator.
+      NonLeafExpressionNode andNode = (NonLeafExpressionNode) node;
+      assert andNode.getOperator() == Operator.AND;
+      for (ExpressionNode operand : andNode.getChildExps()) {
+        extractLabels(operand, labels, notLabels);
+      }
+      return;
+    }
+    if (!(node instanceof NonLeafExpressionNode)) {
+      labels.add(((LeafExpressionNode) node).getIdentifier());
+      return;
+    }
+    // This is a NOT node.
+    ExpressionNode negated = ((NonLeafExpressionNode) node).getChildExps().get(0);
+    notLabels.add(((LeafExpressionNode) negated).getIdentifier());
+  }
+
+  /**
+   * @return the configuration last passed to {@link #setConf(Configuration)}, or null if none
+   */
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Stores the configuration used by this service.
+   * @param conf the configuration to keep
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Initializes the service for a region. The scan label generators are always loaded from the
+   * configuration. When the region belongs to the labels table, it is additionally kept as the
+   * labels region, and every system/super user that has no auths yet is granted the system label.
+   * @param e the coprocessor environment of the region being opened
+   */
+  @Override
+  public void init(RegionCoprocessorEnvironment e) throws IOException {
+    this.scanLabelGenerators = VisibilityUtils.getScanLabelGenerators(this.conf);
+    if (!e.getRegion().getRegionInfo().getTable().equals(LABELS_TABLE_NAME)) {
+      return;
+    }
+    this.labelsRegion = e.getRegion();
+    // Set auth for "system" label for all super users.
+    for (String superUser : getSystemAndSuperUsers()) {
+      byte[] userRow = Bytes.toBytes(superUser);
+      List<String> existingAuths = this.getAuths(userRow, true);
+      boolean alreadyHasAuths = existingAuths != null && !existingAuths.isEmpty();
+      if (alreadyHasAuths) {
+        continue;
+      }
+      Put grant = new Put(userRow);
+      grant.addImmutable(LABELS_TABLE_FAMILY, Bytes.toBytes(SYSTEM_LABEL), DUMMY_VALUE);
+      labelsRegion.put(grant);
+    }
+  }
+
+  /**
+   * @return the short name of the current user followed by the users listed under the
+   *         super user configuration key
+   * @throws IOException when the current user cannot be obtained
+   */
+  private List<String> getSystemAndSuperUsers() throws IOException {
+    User current = User.getCurrent();
+    if (current == null) {
+      throw new IOException("Unable to obtain the current user, "
+          + "authorization checks for internal operations will not work correctly!");
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Current user name is " + current.getShortName());
+    }
+    String[] configuredSuperUsers =
+        this.conf.getStrings(AccessControlLists.SUPERUSER_CONF_KEY, new String[0]);
+    return Lists.asList(current.getShortName(), configuredSuperUsers);
+  }
+
+  /**
+   * @param user row key identifying the user
+   * @return true when the user's auths contain the system label
+   */
+  @Override
+  public boolean havingSystemAuth(byte[] user) throws IOException {
+    return this.getAuths(user, true).contains(SYSTEM_LABEL);
+  }
+
+  /**
+   * Tells whether the visibility tags of a delete match those of a put. Both tag lists are
+   * asserted to be in the string serialization format.
+   * @param putTags visibility tags of the put cell
+   * @param putTagsFormat serialization format of putTags
+   * @param deleteTags visibility tags of the delete cell
+   * @param deleteTagsFormat serialization format of deleteTags
+   * @return true when both lists are non-empty, have the same size, and every delete tag has a
+   *         byte-identical put tag
+   */
+  @Override
+  public boolean matchVisibility(List<Tag> putTags, Byte putTagsFormat, List<Tag> deleteTags,
+      Byte deleteTagsFormat) throws IOException {
+    assert putTagsFormat.byteValue() == STRING_SERIALIZATION_FORMAT;
+    assert deleteTagsFormat.byteValue() == STRING_SERIALIZATION_FORMAT;
+    return checkForMatchingVisibilityTagsWithSortedOrder(putTags, deleteTags);
+  }
+
+  /**
+   * Compares two tag lists by tag content.
+   * @param putVisTags visibility tags of the put cell
+   * @param deleteVisTags visibility tags of the delete cell
+   * @return true only when the lists are non-empty, equal in size, and each delete tag is
+   *         byte-identical to some put tag
+   */
+  private static boolean checkForMatchingVisibilityTagsWithSortedOrder(List<Tag> putVisTags,
+      List<Tag> deleteVisTags) {
+    // If the size does not match. Definitely we are not comparing the equal tags.
+    if (deleteVisTags.size() != putVisTags.size()) {
+      return false;
+    }
+    for (Tag deleteTag : deleteVisTags) {
+      boolean foundInPut = false;
+      for (Tag putTag : putVisTags) {
+        foundInPut = Bytes.equals(deleteTag.getBuffer(), deleteTag.getTagOffset(),
+            deleteTag.getTagLength(), putTag.getBuffer(), putTag.getTagOffset(),
+            putTag.getTagLength());
+        if (foundInPut) {
+          break;
+        }
+      }
+      if (!foundInPut) {
+        return false;
+      }
+    }
+    // Every delete tag was matched; two empty lists still count as "no match".
+    return !deleteVisTags.isEmpty();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/d7986121/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabels.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabels.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabels.java
index ab645a5..d84a2ac 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabels.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabels.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Get;
@@ -62,64 +61,42 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
 import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 
 import com.google.protobuf.ByteString;
 
 /**
- * Test class that tests the visibility labels
+ * Base test class for visibility labels basic features
  */
-@Category(MediumTests.class)
-public class TestVisibilityLabels {
-
-  private static final String TOPSECRET = "topsecret";
-  private static final String PUBLIC = "public";
-  private static final String PRIVATE = "private";
-  private static final String CONFIDENTIAL = "confidential";
-  private static final String SECRET = "secret";
-  private static final String COPYRIGHT = "\u00A9ABC";
-  private static final String ACCENT = "\u0941";
-  private static final String UNICODE_VIS_TAG = COPYRIGHT + "\"" + ACCENT + 
"\\" + SECRET + "\""
+public abstract class TestVisibilityLabels {
+
+  public static final String TOPSECRET = "topsecret";
+  public static final String PUBLIC = "public";
+  public static final String PRIVATE = "private";
+  public static final String CONFIDENTIAL = "confidential";
+  public static final String SECRET = "secret";
+  public static final String COPYRIGHT = "\u00A9ABC";
+  public static final String ACCENT = "\u0941";
+  public static final String UNICODE_VIS_TAG = COPYRIGHT + "\"" + ACCENT + 
"\\" + SECRET + "\""
       + "\u0027&\\";
-  private static final String UC1 = "\u0027\"\u002b";
-  private static final String UC2 = "\u002d\u003f";
+  public static final String UC1 = "\u0027\"\u002b";
+  public static final String UC2 = "\u002d\u003f";
   public static final HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
-  private static final byte[] row1 = Bytes.toBytes("row1");
-  private static final byte[] row2 = Bytes.toBytes("row2");
-  private static final byte[] row3 = Bytes.toBytes("row3");
-  private static final byte[] row4 = Bytes.toBytes("row4");
-  private final static byte[] fam = Bytes.toBytes("info");
-  private final static byte[] qual = Bytes.toBytes("qual");
-  private final static byte[] value = Bytes.toBytes("value");
+  public static final byte[] row1 = Bytes.toBytes("row1");
+  public static final byte[] row2 = Bytes.toBytes("row2");
+  public static final byte[] row3 = Bytes.toBytes("row3");
+  public static final byte[] row4 = Bytes.toBytes("row4");
+  public final static byte[] fam = Bytes.toBytes("info");
+  public final static byte[] qual = Bytes.toBytes("qual");
+  public final static byte[] value = Bytes.toBytes("value");
   public static Configuration conf;
 
   private volatile boolean killedRS = false;
   @Rule 
   public final TestName TEST_NAME = new TestName();
-  public static User SUPERUSER;
-
-  @BeforeClass
-  public static void setupBeforeClass() throws Exception {
-    // setup configuration
-    conf = TEST_UTIL.getConfiguration();
-    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
-    conf.setBoolean("hbase.online.schema.update.enable", true);
-    VisibilityTestUtil.enableVisiblityLabels(conf);
-    conf.setClass(VisibilityUtils.VISIBILITY_LABEL_GENERATOR_CLASS, 
SimpleScanLabelGenerator.class,
-        ScanLabelGenerator.class);
-    conf.set("hbase.superuser", "admin");
-    TEST_UTIL.startMiniCluster(2);
-    SUPERUSER = User.createUserForTesting(conf, "admin", new String[] { 
"supergroup" });
-
-    // Wait for the labels table to become available
-    TEST_UTIL.waitTableEnabled(LABELS_TABLE_NAME.getName(), 50000);
-    addLabels();
-  }
+  public static User SUPERUSER, USER1;
 
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
@@ -439,53 +416,7 @@ public class TestVisibilityLabels {
     }
   }
 
-  @Test(timeout = 60 * 1000)
-  public void testAddVisibilityLabelsOnRSRestart() throws Exception {
-    List<RegionServerThread> regionServerThreads = TEST_UTIL.getHBaseCluster()
-        .getRegionServerThreads();
-    for (RegionServerThread rsThread : regionServerThreads) {
-      rsThread.getRegionServer().abort("Aborting ");
-    }
-    // Start one new RS
-    RegionServerThread rs = TEST_UTIL.getHBaseCluster().startRegionServer();
-    waitForLabelsRegionAvailability(rs.getRegionServer());
-    PrivilegedExceptionAction<VisibilityLabelsResponse> action =
-        new PrivilegedExceptionAction<VisibilityLabelsResponse>() {
-      public VisibilityLabelsResponse run() throws Exception {
-        String[] labels = { SECRET, CONFIDENTIAL, PRIVATE, "ABC", "XYZ" };
-        try {
-          VisibilityClient.addLabels(conf, labels);
-        } catch (Throwable t) {
-          throw new IOException(t);
-        }
-        return null;
-      }
-    };
-    SUPERUSER.runAs(action);
-    // Scan the visibility label
-    Scan s = new Scan();
-    s.setAuthorizations(new Authorizations(VisibilityUtils.SYSTEM_LABEL));
-    HTable ht = new HTable(conf, LABELS_TABLE_NAME.getName());
-    int i = 0;
-    try {
-      ResultScanner scanner = ht.getScanner(s);
-      while (true) {
-        Result next = scanner.next();
-        if (next == null) {
-          break;
-        }
-        i++;
-      }
-    } finally {
-      if (ht != null) {
-        ht.close();
-      }
-    }
-    // One label is the "system" label.
-    Assert.assertEquals("The count should be 13", 13, i);
-  }
-
-  private void waitForLabelsRegionAvailability(HRegionServer regionServer) {
+  protected void waitForLabelsRegionAvailability(HRegionServer regionServer) {
     while (!regionServer.isOnline()) {
       try {
         Thread.sleep(10);
@@ -525,32 +456,6 @@ public class TestVisibilityLabels {
   }
 
   @Test
-  public void testAddLabels() throws Throwable {
-    PrivilegedExceptionAction<VisibilityLabelsResponse> action = 
-        new PrivilegedExceptionAction<VisibilityLabelsResponse>() {
-      public VisibilityLabelsResponse run() throws Exception {
-        String[] labels = { "L1", SECRET, "L2", "invalid~", "L3" };
-        VisibilityLabelsResponse response = null;
-        try {
-          response = VisibilityClient.addLabels(conf, labels);
-        } catch (Throwable e) {
-          fail("Should not have thrown exception");
-        }
-        List<RegionActionResult> resultList = response.getResultList();
-        assertEquals(5, resultList.size());
-        assertTrue(resultList.get(0).getException().getValue().isEmpty());
-        assertEquals("org.apache.hadoop.hbase.security.visibility.LabelAlreadyExistsException",
-            resultList.get(1).getException().getName());
-        assertTrue(resultList.get(2).getException().getValue().isEmpty());
-        assertTrue(resultList.get(3).getException().getValue().isEmpty());
-        assertTrue(resultList.get(4).getException().getValue().isEmpty());
-        return null;
-      }
-    };
-    SUPERUSER.runAs(action);
-  }
-
-  @Test
   public void testSetAndGetUserAuths() throws Throwable {
     final String user = "user1";
     PrivilegedExceptionAction<Void> action = new PrivilegedExceptionAction<Void>() {
@@ -571,18 +476,14 @@ public class TestVisibilityLabels {
       scan.setAuthorizations(new Authorizations(VisibilityUtils.SYSTEM_LABEL));
       ResultScanner scanner = ht.getScanner(scan);
       Result result = null;
+      List<Result> results = new ArrayList<Result>();
       while ((result = scanner.next()) != null) {
-        Cell label = result.getColumnLatestCell(LABELS_TABLE_FAMILY, LABEL_QUALIFIER);
-        Cell userAuth = result.getColumnLatestCell(LABELS_TABLE_FAMILY, user.getBytes());
-        if (Bytes.equals(SECRET.getBytes(), 0, SECRET.getBytes().length, label.getValueArray(),
-            label.getValueOffset(), label.getValueLength())
-            || Bytes.equals(CONFIDENTIAL.getBytes(), 0, CONFIDENTIAL.getBytes().length,
-                label.getValueArray(), label.getValueOffset(), label.getValueLength())) {
-          assertNotNull(userAuth);
-        } else {
-          assertNull(userAuth);
-        }
+        results.add(result);
       }
+      List<String> auths = extractAuths(user, results);
+      assertTrue(auths.contains(SECRET));
+      assertTrue(auths.contains(CONFIDENTIAL));
+      assertEquals(2, auths.size());
     } finally {
       if (ht != null) {
         ht.close();
@@ -636,6 +537,19 @@ public class TestVisibilityLabels {
     SUPERUSER.runAs(action);
   }
 
+  protected List<String> extractAuths(String user, List<Result> results) {
+    List<String> auths = new ArrayList<String>();
+    for (Result result : results) {
+      Cell labelCell = result.getColumnLatestCell(LABELS_TABLE_FAMILY, LABEL_QUALIFIER);
+      Cell userAuthCell = result.getColumnLatestCell(LABELS_TABLE_FAMILY, user.getBytes());
+      if (userAuthCell != null) {
+        auths.add(Bytes.toString(labelCell.getValueArray(), labelCell.getValueOffset(),
+            labelCell.getValueLength()));
+      }
+    }
+    return auths;
+  }
+
   @Test
   public void testClearUserAuths() throws Throwable {
     PrivilegedExceptionAction<Void> action = new PrivilegedExceptionAction<Void>() {
@@ -659,24 +573,25 @@ public class TestVisibilityLabels {
         List<RegionActionResult> resultList = response.getResultList();
         assertEquals(3, resultList.size());
         assertTrue(resultList.get(0).getException().getValue().isEmpty());
-        assertEquals("org.apache.hadoop.hbase.security.visibility.InvalidLabelException",
+        assertEquals("org.apache.hadoop.hbase.DoNotRetryIOException",
             resultList.get(1).getException().getName());
+        assertTrue(Bytes.toString(resultList.get(1).getException().getValue().toByteArray())
+            .contains(
+                "org.apache.hadoop.hbase.security.visibility.InvalidLabelException: "
+                    + "Label 'public' is not set for the user testUser"));
         assertTrue(resultList.get(2).getException().getValue().isEmpty());
         HTable ht = null;
         try {
           ht = new HTable(conf, LABELS_TABLE_NAME);
           ResultScanner scanner = ht.getScanner(new Scan());
           Result result = null;
+          List<Result> results = new ArrayList<Result>();
           while ((result = scanner.next()) != null) {
-            Cell label = result.getColumnLatestCell(LABELS_TABLE_FAMILY, LABEL_QUALIFIER);
-            Cell userAuth = result.getColumnLatestCell(LABELS_TABLE_FAMILY, user.getBytes());
-            if (Bytes.equals(PRIVATE.getBytes(), 0, PRIVATE.getBytes().length,
-                label.getValueArray(), label.getValueOffset(), label.getValueLength())) {
-              assertNotNull(userAuth);
-            } else {
-              assertNull(userAuth);
-            }
+            results.add(result);
           }
+          List<String> curAuths = extractAuths(user, results);
+          assertTrue(curAuths.contains(PRIVATE));
+          assertEquals(1, curAuths.size());
         } finally {
           if (ht != null) {
             ht.close();
@@ -977,7 +892,7 @@ public class TestVisibilityLabels {
     }
   }
 
-  private static HTable createTableAndWriteDataWithLabels(TableName tableName, String... labelExps)
+  static HTable createTableAndWriteDataWithLabels(TableName tableName, String... labelExps)
       throws Exception {
     HTable table = null;
     try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/d7986121/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsWithCustomVisLabService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsWithCustomVisLabService.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsWithCustomVisLabService.java
new file mode 100644
index 0000000..0d78da0
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsWithCustomVisLabService.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.visibility;
+
+import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_FAMILY;
+import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_NAME;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableMap;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestVisibilityLabelsWithCustomVisLabService extends TestVisibilityLabels {
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    // setup configuration
+    conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
+    VisibilityTestUtil.enableVisiblityLabels(conf);
+    conf.setClass(VisibilityUtils.VISIBILITY_LABEL_GENERATOR_CLASS, SimpleScanLabelGenerator.class,
+        ScanLabelGenerator.class);
+    conf.setClass(VisibilityLabelServiceManager.VISIBILITY_LABEL_SERVICE_CLASS,
+        ExpAsStringVisibilityLabelServiceImpl.class, VisibilityLabelService.class);
+    conf.set("hbase.superuser", "admin");
+    TEST_UTIL.startMiniCluster(2);
+    SUPERUSER = User.createUserForTesting(conf, "admin", new String[] { "supergroup" });
+
+    // Wait for the labels table to become available
+    TEST_UTIL.waitTableEnabled(LABELS_TABLE_NAME.getName(), 50000);
+    addLabels();
+  }
+
+  // Extending this test from super as we don't verify predefined labels in ExpAsStringVisibilityLabelServiceImpl
+  @Test
+  public void testVisibilityLabelsInPutsThatDoesNotMatchAnyDefinedLabels() throws Exception {
+    TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
+    // This put with label "SAMPLE_LABEL" should not get failed.
+    createTableAndWriteDataWithLabels(tableName, "SAMPLE_LABEL", "TEST");
+  }
+
+  protected List<String> extractAuths(String user, List<Result> results) {
+    List<String> auths = new ArrayList<String>();
+    for (Result result : results) {
+      if (Bytes.equals(result.getRow(), Bytes.toBytes(user))) {
+        NavigableMap<byte[], byte[]> familyMap = result.getFamilyMap(LABELS_TABLE_FAMILY);
+        for (byte[] q : familyMap.keySet()) {
+          auths.add(Bytes.toString(q, 0, q.length));
+        }
+      }
+    }
+    return auths;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/d7986121/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsWithDefaultVisLabelService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsWithDefaultVisLabelService.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsWithDefaultVisLabelService.java
new file mode 100644
index 0000000..9abdd85
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsWithDefaultVisLabelService.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security.visibility;
+
+import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
+import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsResponse;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestVisibilityLabelsWithDefaultVisLabelService extends TestVisibilityLabels {
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    // setup configuration
+    conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
+    conf.setBoolean("hbase.online.schema.update.enable", true);
+    VisibilityTestUtil.enableVisiblityLabels(conf);
+    conf.setClass(VisibilityUtils.VISIBILITY_LABEL_GENERATOR_CLASS, SimpleScanLabelGenerator.class,
+        ScanLabelGenerator.class);
+    conf.set("hbase.superuser", "admin");
+    TEST_UTIL.startMiniCluster(2);
+    SUPERUSER = User.createUserForTesting(conf, "admin", new String[] { "supergroup" });
+    USER1 = User.createUserForTesting(conf, "user1", new String[] {});
+
+    // Wait for the labels table to become available
+    TEST_UTIL.waitTableEnabled(LABELS_TABLE_NAME.getName(), 50000);
+    addLabels();
+  }
+
+  @Test
+  public void testAddLabels() throws Throwable {
+    PrivilegedExceptionAction<VisibilityLabelsResponse> action =
+        new PrivilegedExceptionAction<VisibilityLabelsResponse>() {
+      public VisibilityLabelsResponse run() throws Exception {
+        String[] labels = { "L1", SECRET, "L2", "invalid~", "L3" };
+        VisibilityLabelsResponse response = null;
+        try {
+          response = VisibilityClient.addLabels(conf, labels);
+        } catch (Throwable e) {
+          fail("Should not have thrown exception");
+        }
+        List<RegionActionResult> resultList = response.getResultList();
+        assertEquals(5, resultList.size());
+        assertTrue(resultList.get(0).getException().getValue().isEmpty());
+        assertEquals("org.apache.hadoop.hbase.DoNotRetryIOException", resultList.get(1)
+            .getException().getName());
+        assertTrue(Bytes.toString(resultList.get(1).getException().getValue().toByteArray())
+            .contains(
+                "org.apache.hadoop.hbase.security.visibility.LabelAlreadyExistsException: "
+                    + "Label 'secret' already exists"));
+        assertTrue(resultList.get(2).getException().getValue().isEmpty());
+        assertTrue(resultList.get(3).getException().getValue().isEmpty());
+        assertTrue(resultList.get(4).getException().getValue().isEmpty());
+        return null;
+      }
+    };
+    SUPERUSER.runAs(action);
+  }
+
+  @Test(timeout = 60 * 1000)
+  public void testAddVisibilityLabelsOnRSRestart() throws Exception {
+    List<RegionServerThread> regionServerThreads = TEST_UTIL.getHBaseCluster()
+        .getRegionServerThreads();
+    for (RegionServerThread rsThread : regionServerThreads) {
+      rsThread.getRegionServer().abort("Aborting ");
+    }
+    // Start one new RS
+    RegionServerThread rs = TEST_UTIL.getHBaseCluster().startRegionServer();
+    waitForLabelsRegionAvailability(rs.getRegionServer());
+    PrivilegedExceptionAction<VisibilityLabelsResponse> action =
+        new PrivilegedExceptionAction<VisibilityLabelsResponse>() {
+      public VisibilityLabelsResponse run() throws Exception {
+        String[] labels = { SECRET, CONFIDENTIAL, PRIVATE, "ABC", "XYZ" };
+        try {
+          VisibilityClient.addLabels(conf, labels);
+        } catch (Throwable t) {
+          throw new IOException(t);
+        }
+        return null;
+      }
+    };
+    SUPERUSER.runAs(action);
+    // Scan the visibility label
+    Scan s = new Scan();
+    s.setAuthorizations(new Authorizations(VisibilityUtils.SYSTEM_LABEL));
+    HTable ht = new HTable(conf, LABELS_TABLE_NAME.getName());
+    int i = 0;
+    try {
+      ResultScanner scanner = ht.getScanner(s);
+      while (true) {
+        Result next = scanner.next();
+        if (next == null) {
+          break;
+        }
+        i++;
+      }
+    } finally {
+      if (ht != null) {
+        ht.close();
+      }
+    }
+    // One label is the "system" label.
+    Assert.assertEquals("The count should be 13", 13, i);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/d7986121/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsWithDistributedLogReplay.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsWithDistributedLogReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsWithDistributedLogReplay.java
index 3476f90..11a830c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsWithDistributedLogReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsWithDistributedLogReplay.java
@@ -29,7 +29,8 @@ import org.junit.experimental.categories.Category;
  * Test class that tests the visibility labels with distributed log replay feature ON.
  */
 @Category(MediumTests.class)
-public class TestVisibilityLabelsWithDistributedLogReplay extends TestVisibilityLabels {
+public class TestVisibilityLabelsWithDistributedLogReplay extends
+    TestVisibilityLabelsWithDefaultVisLabelService {
 
   @BeforeClass
   public static void setupBeforeClass() throws Exception {
@@ -43,6 +44,7 @@ public class TestVisibilityLabelsWithDistributedLogReplay extends TestVisibility
     conf.set("hbase.superuser", "admin");
     TEST_UTIL.startMiniCluster(2);
     SUPERUSER = User.createUserForTesting(conf, "admin", new String[] { "supergroup" });
+    USER1 = User.createUserForTesting(conf, "user1", new String[] {});
 
     // Wait for the labels table to become available
     TEST_UTIL.waitTableEnabled(LABELS_TABLE_NAME.getName(), 50000);

http://git-wip-us.apache.org/repos/asf/hbase/blob/d7986121/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityWithCheckAuths.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityWithCheckAuths.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityWithCheckAuths.java
index c4f667b..77b9e7f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityWithCheckAuths.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityWithCheckAuths.java
@@ -68,9 +68,7 @@ public class TestVisibilityWithCheckAuths {
     // setup configuration
     conf = TEST_UTIL.getConfiguration();
     conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false);
-    conf.setInt("hfile.format.version", 3);
-    conf.set("hbase.coprocessor.master.classes", VisibilityController.class.getName());
-    conf.set("hbase.coprocessor.region.classes", VisibilityController.class.getName());
+    VisibilityTestUtil.enableVisiblityLabels(conf);
     conf.setBoolean(VisibilityConstants.CHECK_AUTHS_FOR_MUTATION, true);
     conf.setClass(VisibilityUtils.VISIBILITY_LABEL_GENERATOR_CLASS, SimpleScanLabelGenerator.class,
         ScanLabelGenerator.class);

Reply via email to