Repository: hbase
Updated Branches:
  refs/heads/master 5bc518b38 -> 317136e27


HBASE-16754 All WALSplitter OutputSinks should process compaction events


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/317136e2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/317136e2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/317136e2

Branch: refs/heads/master
Commit: 317136e2721f0722104283cba326bde4c59d0742
Parents: 5bc518b
Author: Gary Helmling <ga...@apache.org>
Authored: Thu Oct 13 15:31:42 2016 -0700
Committer: Gary Helmling <ga...@apache.org>
Committed: Tue Oct 18 09:37:37 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/wal/WALEdit.java  | 11 +++-
 .../RegionReplicaReplicationEndpoint.java       |  2 +-
 .../apache/hadoop/hbase/wal/WALSplitter.java    | 19 ++++---
 .../apache/hadoop/hbase/wal/TestWALSplit.java   | 54 ++++++++++++++++++++
 4 files changed, 77 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/317136e2/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
index f92db13..75c1c3e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
@@ -363,13 +363,22 @@ public class WALEdit implements Writable, HeapSize {
    * @return deserialized CompactionDescriptor or null.
    */
   public static CompactionDescriptor getCompaction(Cell kv) throws IOException 
{
-    if (CellUtil.matchingColumn(kv, METAFAMILY, COMPACTION)) {
+    if (isCompactionMarker(kv)) {
       return CompactionDescriptor.parseFrom(CellUtil.cloneValue(kv));
     }
     return null;
   }
 
   /**
+   * Returns true if the given cell is a serialized {@link 
CompactionDescriptor}
+   *
+   * @see #getCompaction(Cell)
+   */
+  public static boolean isCompactionMarker(Cell cell) {
+    return CellUtil.matchingColumn(cell, METAFAMILY, COMPACTION);
+  }
+
+  /**
    * Create a bulk loader WALEdit
    *
    * @param hri                The HRegionInfo for the region in which we are 
bulk loading

http://git-wip-us.apache.org/repos/asf/hbase/blob/317136e2/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index 0e33e55..dc4fad0 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -361,7 +361,7 @@ public class RegionReplicaReplicationEndpoint extends 
HBaseReplicationEndpoint {
     }
 
     @Override
-    public boolean keepRegionEvents() {
+    public boolean keepRegionEvent(Entry entry) {
       return true;
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/317136e2/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
index 0483651..2fe9f38 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java
@@ -358,7 +358,7 @@ public class WALSplitter {
           continue;
         }
         // Don't send Compaction/Close/Open region events to recovered edit 
type sinks.
-        if (entry.getEdit().isMetaEdit() && !outputSink.keepRegionEvents()) {
+        if (entry.getEdit().isMetaEdit() && 
!outputSink.keepRegionEvent(entry)) {
           editsSkipped++;
           continue;
         }
@@ -1277,12 +1277,11 @@ public class WALSplitter {
 
     /**
      * Some WALEdit's contain only KV's for account on what happened to a 
region.
-     * Not all sinks will want to get those edits.
+     * Not all sinks will want to get all of those edits.
      *
-     * @return Return true if this sink wants to get all WALEdit's regardless 
of if it's a region
-     * event.
+     * @return Return true if this sink wants to accept this region-level 
WALEdit.
      */
-    public abstract boolean keepRegionEvents();
+    public abstract boolean keepRegionEvent(Entry entry);
   }
 
   /**
@@ -1627,7 +1626,13 @@ public class WALSplitter {
     }
 
     @Override
-    public boolean keepRegionEvents() {
+    public boolean keepRegionEvent(Entry entry) {
+      ArrayList<Cell> cells = entry.getEdit().getCells();
+      for (int i = 0; i < cells.size(); i++) {
+        if (WALEdit.isCompactionMarker(cells.get(i))) {
+          return true;
+        }
+      }
       return false;
     }
 
@@ -2082,7 +2087,7 @@ public class WALSplitter {
     }
 
     @Override
-    public boolean keepRegionEvents() {
+    public boolean keepRegionEvent(Entry entry) {
       return true;
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/317136e2/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
index bc6ec52..de8d9e1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java
@@ -32,6 +32,7 @@ import java.io.IOException;
 import java.lang.reflect.Method;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -61,6 +62,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
@@ -449,6 +451,39 @@ public class TestWALSplit {
     assertEquals(1, splitLog.length);
 
     assertFalse("edits differ after split", logsAreEqual(originalLog, 
splitLog[0]));
+    // split log should only have the test edits
+    assertEquals(10, countWAL(splitLog[0]));
+  }
+
+
+  @Test (timeout=300000)
+  public void testSplitLeavesCompactionEventsEdits() throws IOException{
+    HRegionInfo hri = new HRegionInfo(TABLE_NAME);
+    REGIONS.clear();
+    REGIONS.add(hri.getEncodedName());
+    Path regionDir = new Path(FSUtils.getTableDir(HBASEDIR, TABLE_NAME), 
hri.getEncodedName());
+    LOG.info("Creating region directory: " + regionDir);
+    assertTrue(fs.mkdirs(regionDir));
+
+    Writer writer = generateWALs(1, 10, 0, 10);
+    String[] compactInputs = new String[]{"file1", "file2", "file3"};
+    String compactOutput = "file4";
+    appendCompactionEvent(writer, hri, compactInputs, compactOutput);
+    writer.close();
+
+    useDifferentDFSClient();
+    WALSplitter.split(HBASEDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
+
+    Path originalLog = (fs.listStatus(OLDLOGDIR))[0].getPath();
+    // original log should have 10 test edits, 10 region markers, 1 compaction 
marker
+    assertEquals(21, countWAL(originalLog));
+
+    Path[] splitLog = getLogForRegion(HBASEDIR, TABLE_NAME, 
hri.getEncodedName());
+    assertEquals(1, splitLog.length);
+
+    assertFalse("edits differ after split", logsAreEqual(originalLog, 
splitLog[0]));
+    // split log should have 10 test edits plus 1 compaction marker
+    assertEquals(11, countWAL(splitLog[0]));
   }
 
   /**
@@ -1300,6 +1335,24 @@ public class TestWALSplit {
     return count;
   }
 
+  private static void appendCompactionEvent(Writer w, HRegionInfo hri, 
String[] inputs,
+      String output) throws IOException {
+    WALProtos.CompactionDescriptor.Builder desc = 
WALProtos.CompactionDescriptor.newBuilder();
+    desc.setTableName(ByteString.copyFrom(hri.getTable().toBytes()))
+        .setEncodedRegionName(ByteString.copyFrom(hri.getEncodedNameAsBytes()))
+        .setRegionName(ByteString.copyFrom(hri.getRegionName()))
+        .setFamilyName(ByteString.copyFrom(FAMILY))
+        .setStoreHomeDir(hri.getEncodedName() + "/" + Bytes.toString(FAMILY))
+        .addAllCompactionInput(Arrays.asList(inputs))
+        .addCompactionOutput(output);
+
+    WALEdit edit = WALEdit.createCompaction(hri, desc.build());
+    WALKey key = new WALKey(hri.getEncodedNameAsBytes(), TABLE_NAME, 1,
+        EnvironmentEdgeManager.currentTime(), HConstants.DEFAULT_CLUSTER_ID);
+    w.append(new Entry(key, edit));
+    w.sync();
+  }
+
   private static void appendRegionEvent(Writer w, String region) throws 
IOException {
     WALProtos.RegionEventDescriptor regionOpenDesc = 
ProtobufUtil.toRegionEventDescriptor(
         WALProtos.RegionEventDescriptor.EventType.REGION_OPEN,
@@ -1315,6 +1368,7 @@ public class TestWALSplit {
         HConstants.DEFAULT_CLUSTER_ID);
     w.append(
         new Entry(walKey, new WALEdit().add(kv)));
+    w.sync();
   }
 
   public static long appendEntry(Writer writer, TableName table, byte[] region,

Reply via email to