HBASE-14761 Deletes with and without visibility expression do not delete
the matching mutation (Ram)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c92737c0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c92737c0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c92737c0

Branch: refs/heads/hbase-12439
Commit: c92737c0e912563aeba2112ab8df74af976e720a
Parents: cf81b45
Author: ramkrishna <[email protected]>
Authored: Thu Nov 19 11:20:45 2015 +0530
Committer: ramkrishna <[email protected]>
Committed: Thu Nov 19 11:21:50 2015 +0530

----------------------------------------------------------------------
 .../visibility/VisibilityScanDeleteTracker.java | 166 ++++++-----
 .../TestVisibilityLabelsWithDeletes.java        | 287 +++++++++++++++++++
 2 files changed, 377 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c92737c0/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityScanDeleteTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityScanDeleteTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityScanDeleteTracker.java
index 4fd3568..51971e7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityScanDeleteTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityScanDeleteTracker.java
@@ -116,21 +116,26 @@ public class VisibilityScanDeleteTracker extends ScanDeleteTracker {
     // If tag is present in the delete
     boolean hasVisTag = false;
     if (delCell.getTagsLength() > 0) {
+      Byte deleteCellVisTagsFormat = null;
       switch (type) {
       case DeleteFamily:
         List<Tag> delTags = new ArrayList<Tag>();
-        if (visibilityTagsDeleteFamily != null) {
-          Byte deleteCellVisTagsFormat = 
VisibilityUtils.extractVisibilityTags(delCell, delTags);
-          if (!delTags.isEmpty()) {
-            visibilityTagsDeleteFamily.add(new Triple<List<Tag>, Byte, Long>(
-                delTags, deleteCellVisTagsFormat, delCell.getTimestamp()));
-            hasVisTag = true;
-          }
+        if (visibilityTagsDeleteFamily == null) {
+          visibilityTagsDeleteFamily = new ArrayList<Triple<List<Tag>, Byte, 
Long>>();
+        }
+        deleteCellVisTagsFormat = 
VisibilityUtils.extractVisibilityTags(delCell, delTags);
+        if (!delTags.isEmpty()) {
+          visibilityTagsDeleteFamily.add(new Triple<List<Tag>, Byte, 
Long>(delTags,
+              deleteCellVisTagsFormat, delCell.getTimestamp()));
+          hasVisTag = true;
         }
         break;
       case DeleteFamilyVersion:
+        if(visibilityTagsDeleteFamilyVersion == null) {
+          visibilityTagsDeleteFamilyVersion = new ArrayList<Triple<List<Tag>, 
Byte, Long>>();
+        }
         delTags = new ArrayList<Tag>();
-        Byte deleteCellVisTagsFormat = 
VisibilityUtils.extractVisibilityTags(delCell, delTags);
+        deleteCellVisTagsFormat = 
VisibilityUtils.extractVisibilityTags(delCell, delTags);
         if (!delTags.isEmpty()) {
           visibilityTagsDeleteFamilyVersion.add(new Triple<List<Tag>, Byte, 
Long>(delTags,
               deleteCellVisTagsFormat, delCell.getTimestamp()));
@@ -164,23 +169,6 @@ public class VisibilityScanDeleteTracker extends ScanDeleteTracker {
       default:
         throw new IllegalArgumentException("Invalid delete type");
       }
-    } else {
-      switch (type) {
-      case DeleteFamily:
-        visibilityTagsDeleteFamily = null;
-        break;
-      case DeleteFamilyVersion:
-        visibilityTagsDeleteFamilyVersion = null;
-        break;
-      case DeleteColumn:
-        visibilityTagsDeleteColumns = null;
-        break;
-      case Delete:
-        visiblityTagsDeleteColumnVersion = null;
-        break;
-      default:
-        throw new IllegalArgumentException("Invalid delete type");
-      }
     }
     return hasVisTag;
   }
@@ -191,28 +179,35 @@ public class VisibilityScanDeleteTracker extends ScanDeleteTracker {
     try {
       if (hasFamilyStamp) {
         if (visibilityTagsDeleteFamily != null) {
-          for (int i = 0; i < visibilityTagsDeleteFamily.size(); i++) {
-            // visibilityTagsDeleteFamily is ArrayList
-            Triple<List<Tag>, Byte, Long> triple = 
visibilityTagsDeleteFamily.get(i);
-            if (timestamp <= triple.getThird()) {
-              List<Tag> putVisTags = new ArrayList<Tag>();
-              Byte putCellVisTagsFormat = 
VisibilityUtils.extractVisibilityTags(cell, putVisTags);
-              boolean matchFound = VisibilityLabelServiceManager
-                  .getInstance()
-                  .getVisibilityLabelService()
-                  .matchVisibility(putVisTags, putCellVisTagsFormat, 
triple.getFirst(),
-                      triple.getSecond());
-              if (matchFound) {
-                // A return type of FAMILY_DELETED will cause skip for all remaining cells from this
-                // family. We would like to match visibility expression on every put cells after
-                // this and only remove those matching with the family delete visibility. So we are
-                // returning FAMILY_VERSION_DELETED from here.
-                return DeleteResult.FAMILY_VERSION_DELETED;
+          if (!visibilityTagsDeleteFamily.isEmpty()) {
+            for (int i = 0; i < visibilityTagsDeleteFamily.size(); i++) {
+              // visibilityTagsDeleteFamily is ArrayList
+              Triple<List<Tag>, Byte, Long> triple = 
visibilityTagsDeleteFamily.get(i);
+              if (timestamp <= triple.getThird()) {
+                List<Tag> putVisTags = new ArrayList<Tag>();
+                Byte putCellVisTagsFormat = 
VisibilityUtils.extractVisibilityTags(cell, putVisTags);
+                boolean matchFound = 
VisibilityLabelServiceManager.getInstance()
+                    .getVisibilityLabelService().matchVisibility(putVisTags, 
putCellVisTagsFormat,
+                      triple.getFirst(), triple.getSecond());
+                if (matchFound) {
+                  // A return type of FAMILY_DELETED will cause skip for all remaining cells from
+                  // this
+                  // family. We would like to match visibility expression on every put cells after
+                  // this and only remove those matching with the family delete visibility. So we
+                  // are
+                  // returning FAMILY_VERSION_DELETED from here.
+                  return DeleteResult.FAMILY_VERSION_DELETED;
+                }
               }
             }
+          } else {
+            if (!VisibilityUtils.isVisibilityTagsPresent(cell) && timestamp <= 
familyStamp) {
+              // No tags
+              return DeleteResult.FAMILY_VERSION_DELETED;
+            }
           }
         } else {
-          if (!VisibilityUtils.isVisibilityTagsPresent(cell) && 
timestamp<=familyStamp) {
+          if (!VisibilityUtils.isVisibilityTagsPresent(cell) && timestamp <= 
familyStamp) {
             // No tags
             return DeleteResult.FAMILY_VERSION_DELETED;
           }
@@ -220,21 +215,26 @@ public class VisibilityScanDeleteTracker extends ScanDeleteTracker {
       }
       if (familyVersionStamps.contains(Long.valueOf(timestamp))) {
         if (visibilityTagsDeleteFamilyVersion != null) {
-          for (int i = 0; i < visibilityTagsDeleteFamilyVersion.size(); i++) {
-            // visibilityTagsDeleteFamilyVersion is ArrayList
-            Triple<List<Tag>, Byte, Long> triple = 
visibilityTagsDeleteFamilyVersion.get(i);
-            if (timestamp == triple.getThird()) {
-              List<Tag> putVisTags = new ArrayList<Tag>();
-              Byte putCellVisTagsFormat = 
VisibilityUtils.extractVisibilityTags(cell, putVisTags);
-              boolean matchFound = VisibilityLabelServiceManager
-                  .getInstance()
-                  .getVisibilityLabelService()
-                  .matchVisibility(putVisTags, putCellVisTagsFormat, 
triple.getFirst(),
-                      triple.getSecond());
-              if (matchFound) {
-                return DeleteResult.FAMILY_VERSION_DELETED;
+          if (!visibilityTagsDeleteFamilyVersion.isEmpty()) {
+            for (int i = 0; i < visibilityTagsDeleteFamilyVersion.size(); i++) 
{
+              // visibilityTagsDeleteFamilyVersion is ArrayList
+              Triple<List<Tag>, Byte, Long> triple = 
visibilityTagsDeleteFamilyVersion.get(i);
+              if (timestamp == triple.getThird()) {
+                List<Tag> putVisTags = new ArrayList<Tag>();
+                Byte putCellVisTagsFormat = 
VisibilityUtils.extractVisibilityTags(cell, putVisTags);
+                boolean matchFound = 
VisibilityLabelServiceManager.getInstance()
+                    .getVisibilityLabelService().matchVisibility(putVisTags, 
putCellVisTagsFormat,
+                      triple.getFirst(), triple.getSecond());
+                if (matchFound) {
+                  return DeleteResult.FAMILY_VERSION_DELETED;
+                }
               }
             }
+          } else {
+            if (!VisibilityUtils.isVisibilityTagsPresent(cell)) {
+              // No tags
+              return DeleteResult.FAMILY_VERSION_DELETED;
+            }
           }
         } else {
           if (!VisibilityUtils.isVisibilityTagsPresent(cell)) {
@@ -248,15 +248,21 @@ public class VisibilityScanDeleteTracker extends ScanDeleteTracker {
         if (ret == 0) {
           if (deleteType == KeyValue.Type.DeleteColumn.getCode()) {
             if (visibilityTagsDeleteColumns != null) {
-              for (Pair<List<Tag>, Byte> tags : visibilityTagsDeleteColumns) {
-                List<Tag> putVisTags = new ArrayList<Tag>();
-                Byte putCellVisTagsFormat = 
VisibilityUtils.extractVisibilityTags(cell, putVisTags);
-                boolean matchFound = VisibilityLabelServiceManager
-                    .getInstance()
-                    .getVisibilityLabelService()
-                    .matchVisibility(putVisTags, putCellVisTagsFormat, 
tags.getFirst(),
-                        tags.getSecond());
-                if (matchFound) {
+              if (!visibilityTagsDeleteColumns.isEmpty()) {
+                for (Pair<List<Tag>, Byte> tags : visibilityTagsDeleteColumns) 
{
+                  List<Tag> putVisTags = new ArrayList<Tag>();
+                  Byte putCellVisTagsFormat =
+                      VisibilityUtils.extractVisibilityTags(cell, putVisTags);
+                  boolean matchFound = 
VisibilityLabelServiceManager.getInstance()
+                      .getVisibilityLabelService().matchVisibility(putVisTags, 
putCellVisTagsFormat,
+                        tags.getFirst(), tags.getSecond());
+                  if (matchFound) {
+                    return DeleteResult.VERSION_DELETED;
+                  }
+                }
+              } else {
+                if (!VisibilityUtils.isVisibilityTagsPresent(cell)) {
+                  // No tags
                   return DeleteResult.VERSION_DELETED;
                 }
               }
@@ -271,15 +277,21 @@ public class VisibilityScanDeleteTracker extends ScanDeleteTracker {
           // If the timestamp is the same, keep this one
           if (timestamp == deleteTimestamp) {
             if (visiblityTagsDeleteColumnVersion != null) {
-              for (Pair<List<Tag>, Byte> tags : 
visiblityTagsDeleteColumnVersion) {
-                List<Tag> putVisTags = new ArrayList<Tag>();
-                Byte putCellVisTagsFormat = 
VisibilityUtils.extractVisibilityTags(cell, putVisTags);
-                boolean matchFound = VisibilityLabelServiceManager
-                    .getInstance()
-                    .getVisibilityLabelService()
-                    .matchVisibility(putVisTags, putCellVisTagsFormat, 
tags.getFirst(),
-                        tags.getSecond());
-                if (matchFound) {
+              if (!visiblityTagsDeleteColumnVersion.isEmpty()) {
+                for (Pair<List<Tag>, Byte> tags : 
visiblityTagsDeleteColumnVersion) {
+                  List<Tag> putVisTags = new ArrayList<Tag>();
+                  Byte putCellVisTagsFormat =
+                      VisibilityUtils.extractVisibilityTags(cell, putVisTags);
+                  boolean matchFound = 
VisibilityLabelServiceManager.getInstance()
+                      .getVisibilityLabelService().matchVisibility(putVisTags, 
putCellVisTagsFormat,
+                        tags.getFirst(), tags.getSecond());
+                  if (matchFound) {
+                    return DeleteResult.VERSION_DELETED;
+                  }
+                }
+              } else {
+                if (!VisibilityUtils.isVisibilityTagsPresent(cell)) {
+                  // No tags
                   return DeleteResult.VERSION_DELETED;
                 }
               }
@@ -293,6 +305,7 @@ public class VisibilityScanDeleteTracker extends ScanDeleteTracker {
         } else if (ret > 0) {
           // Next column case.
           deleteBuffer = null;
+          // Can nullify this because we are moving to the next column
           visibilityTagsDeleteColumns = null;
           visiblityTagsDeleteColumnVersion = null;
         } else {
@@ -312,9 +325,10 @@ public class VisibilityScanDeleteTracker extends ScanDeleteTracker {
   @Override
   public void reset() {
     super.reset();
+    // clear only here
     visibilityTagsDeleteColumns = null;
-    visibilityTagsDeleteFamily = new ArrayList<Triple<List<Tag>, Byte, 
Long>>();
-    visibilityTagsDeleteFamilyVersion = new ArrayList<Triple<List<Tag>, Byte, 
Long>>();
+    visibilityTagsDeleteFamily = null;
+    visibilityTagsDeleteFamilyVersion = null;
     visiblityTagsDeleteColumnVersion = null;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c92737c0/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsWithDeletes.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsWithDeletes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsWithDeletes.java
index 52c43b2..56078d8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsWithDeletes.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsWithDeletes.java
@@ -470,6 +470,293 @@ public class TestVisibilityLabelsWithDeletes {
   }
 
   @Test
+  public void testDeleteColumnsWithoutAndWithVisibilityLabels() throws 
Exception {
+    final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
+    Admin hBaseAdmin = TEST_UTIL.getHBaseAdmin();
+    HColumnDescriptor colDesc = new HColumnDescriptor(fam);
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addFamily(colDesc);
+    hBaseAdmin.createTable(desc);
+    try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
+      Put put = new Put(row1);
+      put.addColumn(fam, qual, value);
+      put.setCellVisibility(new CellVisibility(CONFIDENTIAL));
+      table.put(put);
+      Delete d = new Delete(row1);
+      // without visibility
+      d.addColumns(fam, qual, HConstants.LATEST_TIMESTAMP);
+      table.delete(d);
+      PrivilegedExceptionAction<Void> scanAction = new 
PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          try (Connection connection = 
ConnectionFactory.createConnection(conf);
+              Table table = connection.getTable(tableName)) {
+            Scan s = new Scan();
+            ResultScanner scanner = table.getScanner(s);
+            Result[] next = scanner.next(3);
+            assertEquals(next.length, 1);
+          } catch (Throwable t) {
+            throw new IOException(t);
+          }
+          return null;
+        }
+      };
+      SUPERUSER.runAs(scanAction);
+      d = new Delete(row1);
+      // with visibility
+      d.setCellVisibility(new CellVisibility(CONFIDENTIAL));
+      d.addColumns(fam, qual, HConstants.LATEST_TIMESTAMP);
+      table.delete(d);
+      scanAction = new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          try (Connection connection = 
ConnectionFactory.createConnection(conf);
+              Table table = connection.getTable(tableName)) {
+            Scan s = new Scan();
+            ResultScanner scanner = table.getScanner(s);
+            Result[] next = scanner.next(3);
+            assertEquals(next.length, 0);
+          } catch (Throwable t) {
+            throw new IOException(t);
+          }
+          return null;
+        }
+      };
+      SUPERUSER.runAs(scanAction);
+    }
+  }
+
+  @Test
+  public void testDeleteColumnsWithAndWithoutVisibilityLabels() throws 
Exception {
+    final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
+    Admin hBaseAdmin = TEST_UTIL.getHBaseAdmin();
+    HColumnDescriptor colDesc = new HColumnDescriptor(fam);
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addFamily(colDesc);
+    hBaseAdmin.createTable(desc);
+    try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
+      Put put = new Put(row1);
+      put.addColumn(fam, qual, value);
+      put.setCellVisibility(new CellVisibility(CONFIDENTIAL));
+      table.put(put);
+      Delete d = new Delete(row1);
+      // with visibility
+      d.setCellVisibility(new CellVisibility(CONFIDENTIAL));
+      d.addColumns(fam, qual, HConstants.LATEST_TIMESTAMP);
+      table.delete(d);
+      PrivilegedExceptionAction<Void> scanAction = new 
PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          try (Connection connection = 
ConnectionFactory.createConnection(conf);
+              Table table = connection.getTable(tableName)) {
+            Scan s = new Scan();
+            ResultScanner scanner = table.getScanner(s);
+            Result[] next = scanner.next(3);
+            assertEquals(next.length, 0);
+          } catch (Throwable t) {
+            throw new IOException(t);
+          }
+          return null;
+        }
+      };
+      SUPERUSER.runAs(scanAction);
+      d = new Delete(row1);
+      // without visibility
+      d.addColumns(fam, qual, HConstants.LATEST_TIMESTAMP);
+      table.delete(d);
+      scanAction = new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          try (Connection connection = 
ConnectionFactory.createConnection(conf);
+              Table table = connection.getTable(tableName)) {
+            Scan s = new Scan();
+            ResultScanner scanner = table.getScanner(s);
+            Result[] next = scanner.next(3);
+            assertEquals(next.length, 0);
+          } catch (Throwable t) {
+            throw new IOException(t);
+          }
+          return null;
+        }
+      };
+      SUPERUSER.runAs(scanAction);
+    }
+  }
+
+  @Test
+  public void testDeleteFamiliesWithoutAndWithVisibilityLabels() throws 
Exception {
+    final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
+    Admin hBaseAdmin = TEST_UTIL.getHBaseAdmin();
+    HColumnDescriptor colDesc = new HColumnDescriptor(fam);
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addFamily(colDesc);
+    hBaseAdmin.createTable(desc);
+    try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
+      Put put = new Put(row1);
+      put.addColumn(fam, qual, value);
+      put.setCellVisibility(new CellVisibility(CONFIDENTIAL));
+      table.put(put);
+      Delete d = new Delete(row1);
+      // without visibility
+      d.addFamily(fam);
+      table.delete(d);
+      PrivilegedExceptionAction<Void> scanAction = new 
PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          try (Connection connection = 
ConnectionFactory.createConnection(conf);
+              Table table = connection.getTable(tableName)) {
+            Scan s = new Scan();
+            ResultScanner scanner = table.getScanner(s);
+            Result[] next = scanner.next(3);
+            assertEquals(next.length, 1);
+          } catch (Throwable t) {
+            throw new IOException(t);
+          }
+          return null;
+        }
+      };
+      SUPERUSER.runAs(scanAction);
+      d = new Delete(row1);
+      // with visibility
+      d.setCellVisibility(new CellVisibility(CONFIDENTIAL));
+      d.addFamily(fam);
+      table.delete(d);
+      scanAction = new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          try (Connection connection = 
ConnectionFactory.createConnection(conf);
+              Table table = connection.getTable(tableName)) {
+            Scan s = new Scan();
+            ResultScanner scanner = table.getScanner(s);
+            Result[] next = scanner.next(3);
+            assertEquals(next.length, 0);
+          } catch (Throwable t) {
+            throw new IOException(t);
+          }
+          return null;
+        }
+      };
+      SUPERUSER.runAs(scanAction);
+    }
+  }
+
+  @Test
+  public void testDeleteFamiliesWithAndWithoutVisibilityLabels() throws 
Exception {
+    final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
+    Admin hBaseAdmin = TEST_UTIL.getHBaseAdmin();
+    HColumnDescriptor colDesc = new HColumnDescriptor(fam);
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addFamily(colDesc);
+    hBaseAdmin.createTable(desc);
+    try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
+      Put put = new Put(row1);
+      put.addColumn(fam, qual, value);
+      put.setCellVisibility(new CellVisibility(CONFIDENTIAL));
+      table.put(put);
+      Delete d = new Delete(row1);
+      d.setCellVisibility(new CellVisibility(CONFIDENTIAL));
+      // with visibility
+      d.addFamily(fam);
+      table.delete(d);
+      PrivilegedExceptionAction<Void> scanAction = new 
PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          try (Connection connection = 
ConnectionFactory.createConnection(conf);
+              Table table = connection.getTable(tableName)) {
+            Scan s = new Scan();
+            ResultScanner scanner = table.getScanner(s);
+            Result[] next = scanner.next(3);
+            assertEquals(next.length, 0);
+          } catch (Throwable t) {
+            throw new IOException(t);
+          }
+          return null;
+        }
+      };
+      SUPERUSER.runAs(scanAction);
+      d = new Delete(row1);
+      // without visibility
+      d.addFamily(fam);
+      table.delete(d);
+      scanAction = new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          try (Connection connection = 
ConnectionFactory.createConnection(conf);
+              Table table = connection.getTable(tableName)) {
+            Scan s = new Scan();
+            ResultScanner scanner = table.getScanner(s);
+            Result[] next = scanner.next(3);
+            assertEquals(next.length, 0);
+          } catch (Throwable t) {
+            throw new IOException(t);
+          }
+          return null;
+        }
+      };
+      SUPERUSER.runAs(scanAction);
+    }
+  }
+
+  @Test
+  public void testDeletesWithoutAndWithVisibilityLabels() throws Exception {
+    final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
+    Admin hBaseAdmin = TEST_UTIL.getHBaseAdmin();
+    HColumnDescriptor colDesc = new HColumnDescriptor(fam);
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    desc.addFamily(colDesc);
+    hBaseAdmin.createTable(desc);
+    try (Table table = TEST_UTIL.getConnection().getTable(tableName)) {
+      Put put = new Put(row1);
+      put.addColumn(fam, qual, value);
+      put.setCellVisibility(new CellVisibility(CONFIDENTIAL));
+      table.put(put);
+      Delete d = new Delete(row1);
+      // without visibility
+      d.addColumn(fam, qual);
+      table.delete(d);
+      PrivilegedExceptionAction<Void> scanAction = new 
PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          try (Connection connection = 
ConnectionFactory.createConnection(conf);
+              Table table = connection.getTable(tableName)) {
+            Scan s = new Scan();
+            ResultScanner scanner = table.getScanner(s);
+            // The delete would not be able to apply it because of visibility mismatch
+            Result[] next = scanner.next(3);
+            assertEquals(next.length, 1);
+          } catch (Throwable t) {
+            throw new IOException(t);
+          }
+          return null;
+        }
+      };
+      SUPERUSER.runAs(scanAction);
+      d = new Delete(row1);
+      // with visibility
+      d.setCellVisibility(new CellVisibility(CONFIDENTIAL));
+      d.addColumn(fam, qual);
+      table.delete(d);
+      scanAction = new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          try (Connection connection = 
ConnectionFactory.createConnection(conf);
+              Table table = connection.getTable(tableName)) {
+            Scan s = new Scan();
+            ResultScanner scanner = table.getScanner(s);
+            Result[] next = scanner.next(3);
+            // this will alone match
+            assertEquals(next.length, 0);
+          } catch (Throwable t) {
+            throw new IOException(t);
+          }
+          return null;
+        }
+      };
+      SUPERUSER.runAs(scanAction);
+    }
+  }
+
+  @Test
   public void testVisibilityLabelsWithDeleteFamilyWithPutsReAppearing() throws 
Exception {
     final TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
     Admin hBaseAdmin = TEST_UTIL.getHBaseAdmin();

Reply via email to