Repository: hbase
Updated Branches:
  refs/heads/master 1f437ac22 -> b2fcf765a


HBASE-21363 Rewrite the buildingHoldCleanupTracker method in WALProcedureStore


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b2fcf765
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b2fcf765
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b2fcf765

Branch: refs/heads/master
Commit: b2fcf765ae9c2764b52151523863d9f2f1f835bb
Parents: 1f437ac
Author: Duo Zhang <zhang...@apache.org>
Authored: Wed Oct 24 14:13:25 2018 +0800
Committer: Duo Zhang <zhang...@apache.org>
Committed: Wed Oct 24 14:14:19 2018 +0800

----------------------------------------------------------------------
 .../hbase/procedure2/store/BitSetNode.java      |   6 +-
 .../procedure2/store/ProcedureStoreTracker.java |  60 ++++----
 .../store/wal/ProcedureWALFormat.java           |  11 ++
 .../store/wal/ProcedureWALFormatReader.java     |   5 +-
 .../procedure2/store/wal/WALProcedureStore.java |  41 +++--
 .../hbase/procedure2/TestProcedureCleanup.java  | 148 ++++++++++++-------
 6 files changed, 163 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/b2fcf765/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java
index 2030c8b..3102bde 100644
--- 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/BitSetNode.java
@@ -131,9 +131,11 @@ class BitSetNode {
 
   public BitSetNode(BitSetNode other, boolean resetDelete) {
     this.start = other.start;
-    this.partial = other.partial;
-    this.modified = other.modified.clone();
     // The resetDelete will be set to true when building cleanup tracker.
+    // As we will reset the deleted flags of all the unmodified bits to 1, the partial flag is
+    // useless, so set it to false to avoid confusing developers when debugging.
+    this.partial = resetDelete ? false : other.partial;
+    this.modified = other.modified.clone();
     // The intention here is that, if a procedure is not modified in this 
tracker, then we do not
     // need to take care of it, so we will set deleted to true for these bits, 
i.e, if modified is
     // 0, then we set deleted to 1, otherwise keep it as is. So here, the 
equation is

http://git-wip-us.apache.org/repos/asf/hbase/blob/b2fcf765/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
index 9f99e26..a0978e1 100644
--- 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.function.BiFunction;
 import java.util.stream.LongStream;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -87,7 +88,10 @@ public class ProcedureStoreTracker {
    */
   public void resetTo(ProcedureStoreTracker tracker, boolean resetDelete) {
     reset();
-    this.partial = tracker.partial;
+    // resetDelete will be true if we are building the cleanup tracker. As we will reset the
+    // deleted flags of all the unmodified bits to 1, the partial flag is useless, so set it to
+    // false to avoid confusing developers when debugging.
+    this.partial = resetDelete ? false : tracker.partial;
     this.minModifiedProcId = tracker.minModifiedProcId;
     this.maxModifiedProcId = tracker.maxModifiedProcId;
     this.keepDeletes = tracker.keepDeletes;
@@ -197,43 +201,19 @@ public class ProcedureStoreTracker {
     }
   }
 
-  /**
-   * Similar with {@link #setDeletedIfModified(long...)}, but here the {@code 
procId} are given by
-   * the {@code tracker}. If a procedure is modified by us, and also by the 
given {@code tracker},
-   * then we mark it as deleted.
-   * @see #setDeletedIfModified(long...)
-   */
-  public void setDeletedIfModifiedInBoth(ProcedureStoreTracker tracker, 
boolean globalTracker) {
+  private void setDeleteIf(ProcedureStoreTracker tracker,
+      BiFunction<BitSetNode, Long, Boolean> func) {
     BitSetNode trackerNode = null;
     for (BitSetNode node : map.values()) {
-      final long minProcId = node.getStart();
-      final long maxProcId = node.getEnd();
+      long minProcId = node.getStart();
+      long maxProcId = node.getEnd();
       for (long procId = minProcId; procId <= maxProcId; ++procId) {
         if (!node.isModified(procId)) {
           continue;
         }
 
         trackerNode = tracker.lookupClosestNode(trackerNode, procId);
-        if (trackerNode == null || !trackerNode.contains(procId)) {
-          // the procId is not exist in the track, we can only delete the proc
-          // if globalTracker set to true.
-          // Only if the procedure is not in the global tracker we can delete 
the
-          // the procedure. In other cases, the procedure may not update in a 
single
-          // log, we cannot delete it just because the log's track doesn't have
-          // any info for the procedure.
-          if (globalTracker) {
-            node.delete(procId);
-          }
-          continue;
-        }
-        // Only check delete in the global tracker, only global tracker has the
-        // whole picture
-        if (globalTracker && trackerNode.isDeleted(procId) == DeleteState.YES) 
{
-          node.delete(procId);
-          continue;
-        }
-        if (trackerNode.isModified(procId)) {
-          // the procedure was modified
+        if (func.apply(trackerNode, procId)) {
           node.delete(procId);
         }
       }
@@ -241,6 +221,26 @@ public class ProcedureStoreTracker {
   }
 
   /**
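+   * For the global tracker, we will use this method to build the holdingCleanupTracker. As the
+   * modified flags will be cleared after rolling, we only need to test the deleted flags: a
+   * procedure modified by us is marked as deleted if the given {@code tracker} either does not
+   * contain it or has it deleted.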
+   * @see #setDeletedIfModifiedInBoth(ProcedureStoreTracker)
+   */
+  public void setDeletedIfDeletedByThem(ProcedureStoreTracker tracker) {
+    setDeleteIf(tracker, (node, procId) -> node == null || 
!node.contains(procId) ||
+      node.isDeleted(procId) == DeleteState.YES);
+  }
+
+  /**
+   * Similar to {@link #setDeletedIfModified(long...)}, but here the {@code procId}s are given by
+   * the {@code tracker}. If a procedure is modified by us, and also by the given {@code tracker},
+   * then we mark it as deleted.
+   * @see #setDeletedIfModified(long...)
+   */
+  public void setDeletedIfModifiedInBoth(ProcedureStoreTracker tracker) {
+    setDeleteIf(tracker, (node, procId) -> node != null && 
node.isModified(procId));
+  }
+
+  /**
    * lookup the node containing the specified procId.
    * @param node cached node to check before doing a lookup
    * @param procId the procId to lookup

http://git-wip-us.apache.org/repos/asf/hbase/blob/b2fcf765/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
index c9986ed..179c740 100644
--- 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java
@@ -73,6 +73,17 @@ public final class ProcedureWALFormat {
 
   private ProcedureWALFormat() {}
 
+  /**
+   * Load all the procedures in these ProcedureWALFiles, and rebuild the given {@code tracker} if
+   * needed, i.e., if the {@code tracker} is a partial one.
+   * <p/>
+   * The methods of the given {@code loader} will be called at the end, after we load all the
+   * procedures and construct the hierarchy.
+   * <p/>
+   * We will also call the {@link ProcedureStoreTracker#resetModified()} method for the given
+   * {@code tracker} before returning, as it will be used to track the next proc wal file's
+   * modified procedures.
+   */
   public static void load(Iterator<ProcedureWALFile> logs, 
ProcedureStoreTracker tracker,
       Loader loader) throws IOException {
     ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker, 
loader);

http://git-wip-us.apache.org/repos/asf/hbase/blob/b2fcf765/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
index 2e1e06c..1b19abb 100644
--- 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java
@@ -134,9 +134,8 @@ public class ProcedureWALFormatReader {
       }
       procedureMap.merge(localProcedureMap);
     }
-    if (localTracker.isPartial()) {
-      localTracker.setPartialFlag(false);
-    }
+    // Do not reset the partial flag for the local tracker, as here the local tracker only knows
+    // the procedures which are modified in this file.
   }
 
   public void finish() throws IOException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/b2fcf765/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
index 0a89c3f..afde41b 100644
--- 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java
@@ -97,7 +97,7 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.Procedu
  * will first be initialized to the oldest file's tracker(which is stored in 
the trailer), using the
  * method {@link ProcedureStoreTracker#resetTo(ProcedureStoreTracker, 
boolean)}, and then merge it
  * with the tracker of every newer wal files, using the
- * {@link 
ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker, 
boolean)}.
+ * {@link 
ProcedureStoreTracker#setDeletedIfModifiedInBoth(ProcedureStoreTracker)}.
  * If we find out
  * that all the modified procedures for the oldest wal file are modified or 
deleted in newer wal
  * files, then we can delete it. This is because that, every time we call
@@ -1173,27 +1173,26 @@ public class WALProcedureStore extends 
ProcedureStoreBase {
     }
 
     // compute the holding tracker.
-    //  - the first WAL is used for the 'updates'
-    //  - the global tracker is passed in first to decide which procedures are 
not
-    //    exist anymore, so we can mark them as deleted in 
holdingCleanupTracker.
-    //    Only global tracker have the whole picture here.
-    //  - the other WALs are scanned to remove procs already updated in a 
newer wal.
-    //    If it is updated in a newer wal, we can mark it as delelted in 
holdingCleanupTracker
-    //    But, we can not delete it if it was shown deleted in the newer wal, 
as said
-    //    above.
-    // TODO: exit early if holdingCleanupTracker.isEmpty()
+    // - the first WAL is used for the 'updates'
+    // - the global tracker will be used to determine whether a procedure has been deleted
+    // - the other trackers will be used to determine whether a procedure has been updated. As a
+    //   deleted procedure can always be detected by checking the global tracker, we can skip the
+    //   delete checks when applying the other trackers.
     holdingCleanupTracker.resetTo(logs.getFirst().getTracker(), true);
-    //Passing in the global tracker, we can delete the procedures not in the 
global
-    //tracker, because they are deleted in the later logs
-    holdingCleanupTracker.setDeletedIfModifiedInBoth(storeTracker, true);
-    for (int i = 1, size = logs.size() - 1; i < size; ++i) {
-      // Set deleteIfNotExists to false since a single log's tracker is passed 
in.
-      // Since a specific procedure may not show up in the log at all(not 
executed or
-      // updated during the time), we can not delete the procedure just 
because this log
-      // don't have the info of the procedure. We can delete the procedure 
only if
-      // in this log's tracker, it was cleanly showed that the procedure is 
modified or deleted
-      // in the corresponding BitSetNode.
-      
holdingCleanupTracker.setDeletedIfModifiedInBoth(logs.get(i).getTracker(), 
false);
+    holdingCleanupTracker.setDeletedIfDeletedByThem(storeTracker);
+    // logs is a linked list, so avoid calling get(index) on it.
+    Iterator<ProcedureWALFile> iter = logs.iterator();
+    // skip the tracker for the first file when creating the iterator.
+    iter.next();
+    ProcedureStoreTracker tracker = iter.next().getTracker();
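+    // testing iter.hasNext after calling iter.next to skip applying the tracker for last file,
+    // which is just the storeTracker above. NOTE(review): tracker is never reassigned, so the
+    // iter.next() at the end of the loop should be `tracker = iter.next().getTracker()`.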
+    while (iter.hasNext()) {
+      holdingCleanupTracker.setDeletedIfModifiedInBoth(tracker);
+      if (holdingCleanupTracker.isEmpty()) {
+        break;
+      }
+      iter.next();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/b2fcf765/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureCleanup.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureCleanup.java
 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureCleanup.java
index e06fdc5..82917ea 100644
--- 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureCleanup.java
+++ 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureCleanup.java
@@ -19,8 +19,13 @@ package org.apache.hadoop.hbase.procedure2;
 
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
+import java.util.Arrays;
 import java.util.concurrent.CountDownLatch;
-
+import java.util.concurrent.Exchanger;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -28,28 +33,32 @@ import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
 
-@Category({MasterTests.class, SmallTests.class})
+@Category({ MasterTests.class, SmallTests.class })
 public class TestProcedureCleanup {
-  @ClassRule public static final HBaseClassTestRule CLASS_RULE = 
HBaseClassTestRule
-      .forClass(TestProcedureCleanup.class);
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestProcedureCleanup.class);
 
 
   private static final Logger LOG = 
LoggerFactory.getLogger(TestProcedureCleanup.class);
-  private static final int PROCEDURE_EXECUTOR_SLOTS = 1;
 
-  private static TestProcEnv procEnv;
+  private static final int PROCEDURE_EXECUTOR_SLOTS = 2;
+
   private static WALProcedureStore procStore;
 
-  private static ProcedureExecutor<TestProcEnv> procExecutor;
+  private static ProcedureExecutor<Void> procExecutor;
 
   private static HBaseCommonTestingUtility htu;
 
@@ -57,43 +66,35 @@ public class TestProcedureCleanup {
   private static Path testDir;
   private static Path logDir;
 
-  private static class TestProcEnv {
-
-  }
+  @Rule
+  public final TestName name = new TestName();
 
-  private void createProcExecutor(String dir) throws Exception {
-    logDir = new Path(testDir, dir);
+  private void createProcExecutor() throws Exception {
+    logDir = new Path(testDir, name.getMethodName());
     procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), 
logDir);
-    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv,
-        procStore);
+    procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), null, 
procStore);
     procStore.start(PROCEDURE_EXECUTOR_SLOTS);
-    ProcedureTestingUtility
-        .initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true, 
true);
+    ProcedureTestingUtility.initAndStartWorkers(procExecutor, 
PROCEDURE_EXECUTOR_SLOTS, true, true);
   }
 
   @BeforeClass
   public static void setUp() throws Exception {
     htu = new HBaseCommonTestingUtility();
-
+    
htu.getConfiguration().setBoolean(WALProcedureStore.EXEC_WAL_CLEANUP_ON_LOAD_CONF_KEY,
 true);
     // NOTE: The executor will be created by each test
-    procEnv = new TestProcEnv();
     testDir = htu.getDataTestDir();
     fs = testDir.getFileSystem(htu.getConfiguration());
     assertTrue(testDir.depth() > 1);
-
-
   }
 
   @Test
   public void testProcedureShouldNotCleanOnLoad() throws Exception {
-    createProcExecutor("testProcedureShouldNotCleanOnLoad");
+    createProcExecutor();
     final RootProcedure proc = new RootProcedure();
     long rootProc = procExecutor.submitProcedure(proc);
     LOG.info("Begin to execute " + rootProc);
     // wait until the child procedure arrival
-    while(procExecutor.getProcedures().size() < 2) {
-      Thread.sleep(100);
-    }
+    htu.waitFor(10000, () -> procExecutor.getProcedures().size() >= 2);
     SuspendProcedure suspendProcedure = (SuspendProcedure) procExecutor
         .getProcedures().get(1);
     // wait until the suspendProcedure executed
@@ -107,17 +108,17 @@ public class TestProcedureCleanup {
     LOG.info("begin to restart1 ");
     ProcedureTestingUtility.restart(procExecutor, true);
     LOG.info("finish to restart1 ");
-    Assert.assertTrue(procExecutor.getProcedure(rootProc) != null);
+    assertTrue(procExecutor.getProcedure(rootProc) != null);
     Thread.sleep(500);
     LOG.info("begin to restart2 ");
     ProcedureTestingUtility.restart(procExecutor, true);
     LOG.info("finish to restart2 ");
-    Assert.assertTrue(procExecutor.getProcedure(rootProc) != null);
+    assertTrue(procExecutor.getProcedure(rootProc) != null);
   }
 
   @Test
   public void testProcedureUpdatedShouldClean() throws Exception {
-    createProcExecutor("testProcedureUpdatedShouldClean");
+    createProcExecutor();
     SuspendProcedure suspendProcedure = new SuspendProcedure();
     long suspendProc = procExecutor.submitProcedure(suspendProcedure);
     LOG.info("Begin to execute " + suspendProc);
@@ -126,15 +127,13 @@ public class TestProcedureCleanup {
     LOG.info("begin to restart1 ");
     ProcedureTestingUtility.restart(procExecutor, true);
     LOG.info("finish to restart1 ");
-    while(procExecutor.getProcedure(suspendProc) == null) {
-      Thread.sleep(100);
-    }
+    htu.waitFor(10000, () -> procExecutor.getProcedure(suspendProc) != null);
     // Wait until the suspendProc executed after restart
     suspendProcedure = (SuspendProcedure) 
procExecutor.getProcedure(suspendProc);
     suspendProcedure.latch.countDown();
     Thread.sleep(500);
     // Should be 1 log since the suspendProcedure is updated in the new log
-    Assert.assertTrue(procStore.getActiveLogs().size() == 1);
+    assertTrue(procStore.getActiveLogs().size() == 1);
     // restart procExecutor
     LOG.info("begin to restart2");
     // Restart the executor but do not start the workers.
@@ -143,14 +142,14 @@ public class TestProcedureCleanup {
     ProcedureTestingUtility.restart(procExecutor, true, false);
     LOG.info("finish to restart2");
     // There should be two active logs
-    Assert.assertTrue(procStore.getActiveLogs().size() == 2);
+    assertTrue(procStore.getActiveLogs().size() == 2);
     procExecutor.startWorkers();
 
   }
 
   @Test
   public void testProcedureDeletedShouldClean() throws Exception {
-    createProcExecutor("testProcedureDeletedShouldClean");
+    createProcExecutor();
     WaitProcedure waitProcedure = new WaitProcedure();
     long waitProce = procExecutor.submitProcedure(waitProcedure);
     LOG.info("Begin to execute " + waitProce);
@@ -158,15 +157,13 @@ public class TestProcedureCleanup {
     LOG.info("begin to restart1 ");
     ProcedureTestingUtility.restart(procExecutor, true);
     LOG.info("finish to restart1 ");
-    while(procExecutor.getProcedure(waitProce) == null) {
-      Thread.sleep(100);
-    }
+    htu.waitFor(10000, () -> procExecutor.getProcedure(waitProce) != null);
     // Wait until the suspendProc executed after restart
     waitProcedure = (WaitProcedure) procExecutor.getProcedure(waitProce);
     waitProcedure.latch.countDown();
     Thread.sleep(500);
     // Should be 1 log since the suspendProcedure is updated in the new log
-    Assert.assertTrue(procStore.getActiveLogs().size() == 1);
+    assertTrue(procStore.getActiveLogs().size() == 1);
     // restart procExecutor
     LOG.info("begin to restart2");
     // Restart the executor but do not start the workers.
@@ -175,12 +172,64 @@ public class TestProcedureCleanup {
     ProcedureTestingUtility.restart(procExecutor, true, false);
     LOG.info("finish to restart2");
     // There should be two active logs
-    Assert.assertTrue(procStore.getActiveLogs().size() == 2);
+    assertTrue(procStore.getActiveLogs().size() == 2);
     procExecutor.startWorkers();
   }
 
-  public static class WaitProcedure
-      extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
+  private void corrupt(FileStatus file) throws IOException {
+    LOG.info("Corrupt " + file);
+    Path tmpFile = file.getPath().suffix(".tmp");
+    // remove the last byte to make the trailer corrupted
+    try (FSDataInputStream in = fs.open(file.getPath());
+      FSDataOutputStream out = fs.create(tmpFile)) {
+      ByteStreams.copy(ByteStreams.limit(in, file.getLen() - 1), out);
+    }
+    fs.delete(file.getPath(), false);
+    fs.rename(tmpFile, file.getPath());
+  }
+
+
+  public static final class ExchangeProcedure extends 
ProcedureTestingUtility.NoopProcedure<Void> {
+
+    private final Exchanger<Boolean> exchanger = new Exchanger<>();
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Procedure<Void>[] execute(Void env)
+        throws ProcedureYieldException, ProcedureSuspendedException, 
InterruptedException {
+      if (exchanger.exchange(Boolean.TRUE)) {
+        return new Procedure[] { this };
+      } else {
+        return null;
+      }
+    }
+  }
+
+  @Test
+  public void testResetDeleteWhenBuildingHoldingCleanupTracker() throws 
Exception {
+    createProcExecutor();
+    ExchangeProcedure proc1 = new ExchangeProcedure();
+    ExchangeProcedure proc2 = new ExchangeProcedure();
+    procExecutor.submitProcedure(proc1);
+    long procId2 = procExecutor.submitProcedure(proc2);
+    Thread.sleep(500);
+    procStore.rollWriterForTesting();
+    proc1.exchanger.exchange(Boolean.TRUE);
+    Thread.sleep(500);
+
+    FileStatus[] walFiles = fs.listStatus(logDir);
+    Arrays.sort(walFiles, (f1, f2) -> 
f1.getPath().getName().compareTo(f2.getPath().getName()));
+    // corrupt the first proc wal file, so we will have a partial tracker for 
it after restarting
+    corrupt(walFiles[0]);
+    ProcedureTestingUtility.restart(procExecutor, false, true);
+    // also update proc2, which means that all the procedures in the first 
proc wal have been
+    // updated and it should be deleted.
+    proc2 = (ExchangeProcedure) procExecutor.getProcedure(procId2);
+    proc2.exchanger.exchange(Boolean.TRUE);
+    htu.waitFor(10000, () -> !fs.exists(walFiles[0].getPath()));
+  }
+
+  public static class WaitProcedure extends 
ProcedureTestingUtility.NoopProcedure<Void> {
     public WaitProcedure() {
       super();
     }
@@ -188,8 +237,7 @@ public class TestProcedureCleanup {
     private CountDownLatch latch = new CountDownLatch(1);
 
     @Override
-    protected Procedure[] execute(final TestProcEnv env)
-        throws ProcedureSuspendedException {
+    protected Procedure<Void>[] execute(Void env) throws 
ProcedureSuspendedException {
       // Always wait here
       LOG.info("wait here");
       try {
@@ -202,7 +250,7 @@ public class TestProcedureCleanup {
     }
   }
 
-  public static class SuspendProcedure extends 
ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
+  public static class SuspendProcedure extends 
ProcedureTestingUtility.NoopProcedure<Void> {
     public SuspendProcedure() {
       super();
     }
@@ -210,8 +258,7 @@ public class TestProcedureCleanup {
     private CountDownLatch latch = new CountDownLatch(1);
 
     @Override
-    protected Procedure[] execute(final TestProcEnv env)
-        throws ProcedureSuspendedException {
+    protected Procedure<Void>[] execute(Void env) throws 
ProcedureSuspendedException {
       // Always suspend the procedure
       LOG.info("suspend here");
       latch.countDown();
@@ -219,7 +266,7 @@ public class TestProcedureCleanup {
     }
   }
 
-  public static class RootProcedure extends 
ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
+  public static class RootProcedure extends 
ProcedureTestingUtility.NoopProcedure<Void> {
     private boolean childSpwaned = false;
 
     public RootProcedure() {
@@ -227,16 +274,13 @@ public class TestProcedureCleanup {
     }
 
     @Override
-    protected Procedure[] execute(final TestProcEnv env)
-        throws ProcedureSuspendedException {
+    protected Procedure<Void>[] execute(Void env) throws 
ProcedureSuspendedException {
       if (!childSpwaned) {
         childSpwaned = true;
-        return new Procedure[] {new SuspendProcedure()};
+        return new Procedure[] { new SuspendProcedure() };
       } else {
         return null;
       }
     }
   }
-
-
 }

Reply via email to