Repository: incubator-tephra
Updated Branches:
  refs/heads/master 87cb21a0f -> 0016b2034


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git 
a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
 
b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
new file mode 100644
index 0000000..c981e15
--- /dev/null
+++ 
b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Thread that will write the the prune upper bound
+ */
+public class PruneUpperBoundWriter {
+  private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
+
+  private final TableName pruneStateTable;
+  private final DataJanitorState dataJanitorState;
+  private final byte[] regionName;
+  private final String regionNameAsString;
+  private final long pruneFlushInterval;
+  private final AtomicLong pruneUpperBound;
+  private final AtomicBoolean shouldFlush;
+
+  private Thread flushThread;
+  private long lastChecked;
+
+  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, TableName 
pruneStateTable, String regionNameAsString,
+                               byte[] regionName, long pruneFlushInterval) {
+    this.pruneStateTable = pruneStateTable;
+    this.dataJanitorState = dataJanitorState;
+    this.regionName = regionName;
+    this.regionNameAsString = regionNameAsString;
+    this.pruneFlushInterval = pruneFlushInterval;
+    this.pruneUpperBound = new AtomicLong();
+    this.shouldFlush = new AtomicBoolean(false);
+    startFlushThread();
+  }
+
+  public boolean isAlive() {
+    return flushThread.isAlive();
+  }
+
+  public void persistPruneEntry(long pruneUpperBound) {
+    this.pruneUpperBound.set(pruneUpperBound);
+    this.shouldFlush.set(true);
+  }
+
+  public void stop() {
+    if (flushThread != null) {
+      flushThread.interrupt();
+    }
+  }
+
+  private void startFlushThread() {
+    flushThread = new Thread("tephra-prune-upper-bound-writer") {
+      @Override
+      public void run() {
+        while (!isInterrupted()) {
+          long now = System.currentTimeMillis();
+          if (now > (lastChecked + pruneFlushInterval)) {
+            if (shouldFlush.compareAndSet(true, false)) {
+              // should flush data
+              try {
+                dataJanitorState.savePruneUpperBoundForRegion(regionName, 
pruneUpperBound.get());
+              } catch (IOException ex) {
+                LOG.warn("Cannot record prune upper bound for region " + 
regionNameAsString + " in the table " +
+                           pruneStateTable.getNamespaceAsString() + ":" + 
pruneStateTable.getNameAsString() +
+                           " after compacting region.", ex);
+                // Retry again
+                shouldFlush.set(true);
+              }
+            }
+            lastChecked = now;
+          }
+
+          try {
+            TimeUnit.SECONDS.sleep(1);
+          } catch (InterruptedException ex) {
+            interrupt();
+            break;
+          }
+        }
+
+        LOG.info("PruneUpperBound Writer thread terminated.");
+      }
+    };
+
+    flushThread.setDaemon(true);
+    flushThread.start();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git 
a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
 
b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index 310c710..a431ee3 100644
--- 
a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ 
b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -59,6 +59,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -78,6 +79,8 @@ public class InvalidListPruneTest extends 
AbstractHBaseTableTest {
     // Setup the configuration to start HBase cluster with the invalid list 
pruning enabled
     conf = HBaseConfiguration.create();
     conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true);
+    // Flush prune data to the table quickly, so that tests don't have to wait long to see updates
+    conf.setLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL, 0L);
     AbstractHBaseTableTest.startMiniCluster();
 
     TransactionStateStorage txStateStorage = new 
InMemoryTransactionStateStorage();
@@ -135,6 +138,15 @@ public class InvalidListPruneTest extends 
AbstractHBaseTableTest {
     }
   }
 
+  private void truncatePruneStateTable() throws Exception {
+    if (hBaseAdmin.tableExists(pruneStateTable)) {
+      if (hBaseAdmin.isTableEnabled(pruneStateTable)) {
+        hBaseAdmin.disableTable(pruneStateTable);
+      }
+      hBaseAdmin.truncateTable(pruneStateTable, true);
+    }
+  }
+
   @Test
   public void testRecordCompactionState() throws Exception {
     DataJanitorState dataJanitorState =
@@ -145,6 +157,13 @@ public class InvalidListPruneTest extends 
AbstractHBaseTableTest {
         }
       });
 
+    // Since the write to prune table happens async, we need to sleep a bit 
before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
+    // Truncate prune state table to clear any data that might have been 
written by the previous test
+    // This is required because during the shutdown of the previous test, 
compaction might have kicked in and the
+    // coprocessor still had some data to flush and it might be flushed at the 
beginning of this test.
+    truncatePruneStateTable();
+
     // No prune upper bound initially
     Assert.assertEquals(-1,
                         
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, 
Bytes.toBytes(0))));
@@ -155,17 +174,23 @@ public class InvalidListPruneTest extends 
AbstractHBaseTableTest {
                               ImmutableSortedMap.<Long, 
TransactionManager.InProgressTx>of()));
     // Run minor compaction
     testUtil.compact(txDataTable1, false);
+    // Since the write to prune table happens async, we need to sleep a bit 
before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     // No prune upper bound after minor compaction too
     Assert.assertEquals(-1,
                         
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, 
Bytes.toBytes(0))));
 
     // Run major compaction, and verify prune upper bound
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit 
before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     Assert.assertEquals(50,
                         
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, 
Bytes.toBytes(0))));
 
     // Run major compaction again with same snapshot, prune upper bound should 
not change
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit 
before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     Assert.assertEquals(50,
                         
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, 
Bytes.toBytes(0))));
 
@@ -179,6 +204,8 @@ public class InvalidListPruneTest extends 
AbstractHBaseTableTest {
 
     // Run major compaction again, now prune upper bound should change
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit 
before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     Assert.assertEquals(104,
                         
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, 
Bytes.toBytes(0))));
   }
@@ -196,6 +223,8 @@ public class InvalidListPruneTest extends 
AbstractHBaseTableTest {
     // Run major compaction, and verify it completes
     long now = System.currentTimeMillis();
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit 
before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     long lastMajorCompactionTime = 
TestTransactionProcessor.lastMajorCompactionTime.get();
     Assert.assertTrue(String.format("Expected %d, but was %d", now, 
lastMajorCompactionTime),
                       lastMajorCompactionTime >= now);
@@ -209,6 +238,8 @@ public class InvalidListPruneTest extends 
AbstractHBaseTableTest {
     // Run major compaction, and verify it completes
     long now = System.currentTimeMillis();
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit 
before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     long lastMajorCompactionTime = 
TestTransactionProcessor.lastMajorCompactionTime.get();
     Assert.assertTrue(String.format("Expected %d, but was %d", now, 
lastMajorCompactionTime),
                       lastMajorCompactionTime >= now);
@@ -226,6 +257,14 @@ public class InvalidListPruneTest extends 
AbstractHBaseTableTest {
 
     TransactionPruningPlugin transactionPruningPlugin = new 
TestTransactionPruningPlugin();
     transactionPruningPlugin.initialize(conf);
+
+    // Since the write to prune table happens async, we need to sleep a bit 
before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
+    // Truncate prune state table to clear any data that might have been 
written by the previous test
+    // This is required because during the shutdown of the previous test, 
compaction might have kicked in and the
+    // coprocessor still had some data to flush and it might be flushed at the 
beginning of this test.
+    truncatePruneStateTable();
+
     try {
       // Run without a transaction snapshot first
       long now1 = 200;
@@ -270,6 +309,8 @@ public class InvalidListPruneTest extends 
AbstractHBaseTableTest {
                           .add(getRegionName(txDataTable1, Bytes.toBytes(0)))
                           .build());
       testUtil.compact(txDataTable1, true);
+      // Since the write to prune table happens async, we need to sleep a bit 
before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
       long pruneUpperBound2 = 
transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
       Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git 
a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
 
b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 45eed50..5a355e6 100644
--- 
a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ 
b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -170,7 +170,9 @@ public class TransactionProcessor extends 
BaseRegionObserver {
 
   @Override
   public void stop(CoprocessorEnvironment e) throws IOException {
-    // nothing to do
+    if (compactionState != null) {
+      compactionState.stop();
+    }
   }
 
   @Override
@@ -191,7 +193,7 @@ public class TransactionProcessor extends 
BaseRegionObserver {
   public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, 
WALEdit edit, Durability durability)
     throws IOException {
     Transaction tx = getFromOperation(put);
-    ensureValidTxLifetime(e.getEnvironment(), tx);
+    ensureValidTxLifetime(e.getEnvironment(), put, tx);
   }
 
   @Override
@@ -208,7 +210,7 @@ public class TransactionProcessor extends 
BaseRegionObserver {
     }
 
     Transaction tx = getFromOperation(delete);
-    ensureValidTxLifetime(e.getEnvironment(), tx);
+    ensureValidTxLifetime(e.getEnvironment(), delete, tx);
 
     // Other deletes are client-initiated and need to be translated into our 
own tombstones
     // TODO: this should delegate to the DeleteStrategy implementation.
@@ -322,11 +324,16 @@ public class TransactionProcessor extends 
BaseRegionObserver {
       if (conf != null) {
         pruneEnable = 
conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE,
                                       
TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE);
-        String pruneTable = 
conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
-                                     
TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
-        compactionState = new CompactionState(c.getEnvironment(), 
TableName.valueOf(pruneTable));
-        LOG.debug("Automatic invalid list pruning is enabled. Compaction state 
will be recorded in table " +
-                    pruneTable);
+        if (Boolean.TRUE.equals(pruneEnable)) {
+          String pruneTable = 
conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                       
TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE);
+          long pruneFlushInterval = TimeUnit.SECONDS.toMillis(
+            conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL,
+                         
TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL));
+          compactionState = new CompactionState(c.getEnvironment(), 
TableName.valueOf(pruneTable), pruneFlushInterval);
+          LOG.debug("Automatic invalid list pruning is enabled. Compaction 
state will be recorded in table "
+                      + pruneTable);
+        }
       }
     }
 
@@ -390,11 +397,13 @@ public class TransactionProcessor extends 
BaseRegionObserver {
    * Make sure that the transaction is within the max valid transaction 
lifetime.
    *
    * @param env {@link RegionCoprocessorEnvironment} of the Region to which 
the coprocessor is associated
+   * @param op {@link OperationWithAttributes} HBase operation to access its 
attributes if required
    * @param tx {@link Transaction} supplied by the
    * @throws DoNotRetryIOException thrown if the transaction is older than the 
max lifetime of a transaction
    *         IOException throw if the value of max lifetime of transaction is 
unavailable
    */
   protected void ensureValidTxLifetime(RegionCoprocessorEnvironment env,
+                                       @SuppressWarnings("unused") 
OperationWithAttributes op,
                                        @Nullable Transaction tx) throws 
IOException {
     if (tx == null) {
       return;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
----------------------------------------------------------------------
diff --git 
a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
 
b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
index 850f508..58596be 100644
--- 
a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
+++ 
b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java
@@ -38,22 +38,27 @@ import javax.annotation.Nullable;
 public class CompactionState {
   private static final Log LOG = LogFactory.getLog(CompactionState.class);
 
+  private final TableName stateTable;
   private final byte[] regionName;
   private final String regionNameAsString;
-  private final TableName stateTable;
   private final DataJanitorState dataJanitorState;
+  private final long pruneFlushInterval;
   private volatile long pruneUpperBound = -1;
 
-  public CompactionState(final RegionCoprocessorEnvironment env, final 
TableName stateTable) {
+  private PruneUpperBoundWriter pruneUpperBoundWriter;
+
+  public CompactionState(final RegionCoprocessorEnvironment env, final 
TableName stateTable, long pruneFlushInterval) {
+    this.stateTable = stateTable;
     this.regionName = env.getRegionInfo().getRegionName();
     this.regionNameAsString = env.getRegionInfo().getRegionNameAsString();
-    this.stateTable = stateTable;
     this.dataJanitorState = new DataJanitorState(new 
DataJanitorState.TableSupplier() {
       @Override
       public Table get() throws IOException {
         return env.getTable(stateTable);
       }
     });
+    this.pruneFlushInterval = pruneFlushInterval;
+    this.pruneUpperBoundWriter = createPruneUpperBoundWriter();
   }
 
   /**
@@ -75,18 +80,29 @@ public class CompactionState {
   }
 
   /**
+   * Stops the current {@link PruneUpperBoundWriter}.
+   */
+  public void stop() {
+    if (pruneUpperBoundWriter != null) {
+      pruneUpperBoundWriter.stop();
+    }
+  }
+
+  /**
    * Persists the transaction state recorded by {@link 
#record(CompactionRequest, TransactionVisibilityState)}.
    * This method is called after the compaction has successfully completed.
    */
   public void persist() {
     if (pruneUpperBound != -1) {
-      try {
-        dataJanitorState.savePruneUpperBoundForRegion(regionName, 
pruneUpperBound);
-        LOG.debug(String.format("Saved prune upper bound %s for region %s", 
pruneUpperBound, regionNameAsString));
-      } catch (IOException e) {
-        LOG.warn(String.format("Cannot record prune upper bound in table %s 
after compacting region %s",
-                               stateTable, regionNameAsString), e);
+      if (!pruneUpperBoundWriter.isAlive()) {
+        pruneUpperBoundWriter = createPruneUpperBoundWriter();
       }
+      pruneUpperBoundWriter.persistPruneEntry(pruneUpperBound);
+      LOG.debug(String.format("Enqueued prune upper bound %s for region %s", 
pruneUpperBound, regionNameAsString));
     }
   }
+
+  private PruneUpperBoundWriter createPruneUpperBoundWriter() {
+    return new PruneUpperBoundWriter(dataJanitorState, stateTable, 
regionNameAsString, regionName, pruneFlushInterval);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
----------------------------------------------------------------------
diff --git 
a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
 
b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
index c6d03c4..51dc181 100644
--- 
a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
+++ 
b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
@@ -46,8 +46,8 @@ import javax.annotation.Nullable;
 @SuppressWarnings("WeakerAccess")
 public class DataJanitorState {
   public static final byte[] FAMILY = {'f'};
+  public static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
 
-  private static final byte[] PRUNE_UPPER_BOUND_COL = {'p'};
   private static final byte[] REGION_TIME_COL = {'r'};
   private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_COL = {'i'};
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
----------------------------------------------------------------------
diff --git 
a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
 
b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
index 83e3948..99c514f 100644
--- 
a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
+++ 
b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
@@ -24,8 +24,10 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
@@ -119,10 +121,11 @@ public class HBaseTransactionPruningPlugin implements 
TransactionPruningPlugin {
   public void initialize(Configuration conf) throws IOException {
     this.conf = conf;
     this.connection = ConnectionFactory.createConnection(conf);
-
+    
     final TableName stateTable = 
TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
                                                             
TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
     LOG.info("Initializing plugin with state table {}", 
stateTable.getNameWithNamespaceInclAsString());
+    createPruneTable(stateTable);
     this.dataJanitorState = new DataJanitorState(new 
DataJanitorState.TableSupplier() {
       @Override
       public Table get() throws IOException {
@@ -209,6 +212,38 @@ public class HBaseTransactionPruningPlugin implements 
TransactionPruningPlugin {
     }
   }
 
+  /**
+   * Create the prune state table given the {@link TableName} if the table 
doesn't exist already.
+   *
+   * @param stateTable prune state table name
+   */
+  protected void createPruneTable(TableName stateTable) throws IOException {
+    try (Admin admin = this.connection.getAdmin()) {
+      if (admin.tableExists(stateTable)) {
+        LOG.debug("Not creating pruneStateTable {} since it already exists.",
+                  stateTable.getNameWithNamespaceInclAsString());
+        return;
+      }
+
+      HTableDescriptor htd = new HTableDescriptor(stateTable);
+      htd.addFamily(new 
HColumnDescriptor(DataJanitorState.FAMILY).setMaxVersions(1));
+      admin.createTable(htd);
+      LOG.info("Created pruneTable {}", 
stateTable.getNameWithNamespaceInclAsString());
+    } catch (TableExistsException ex) {
+      // Expected if the prune state table is being created at the same time 
by another client
+      LOG.debug("Not creating pruneStateTable {} since it already exists.",
+                stateTable.getNameWithNamespaceInclAsString(), ex);
+    }
+  }
+
+  /**
+   * Returns whether the table is a transactional table. By default, a table is identified as a transactional
+   * table if it has the coprocessor {@link TransactionProcessor} attached to it. Should be overridden if users
+   * attach a different coprocessor.
+   *
+   * @param tableDescriptor {@link HTableDescriptor} of the table
+   * @return true if the table is transactional
+   */
   protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) {
     return 
tableDescriptor.hasCoprocessor(TransactionProcessor.class.getName());
   }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git 
a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
 
b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
new file mode 100644
index 0000000..7bceaff
--- /dev/null
+++ 
b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Thread that will write the the prune upper bound
+ */
+public class PruneUpperBoundWriter {
+  private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);
+
+  private final TableName pruneStateTable;
+  private final DataJanitorState dataJanitorState;
+  private final byte[] regionName;
+  private final String regionNameAsString;
+  private final long pruneFlushInterval;
+  private final AtomicLong pruneUpperBound;
+  private final AtomicBoolean shouldFlush;
+
+  private Thread flushThread;
+  private long lastChecked;
+
+  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, TableName 
pruneStateTable, String regionNameAsString,
+                               byte[] regionName, long pruneFlushInterval) {
+    this.pruneStateTable = pruneStateTable;
+    this.dataJanitorState = dataJanitorState;
+    this.regionName = regionName;
+    this.regionNameAsString = regionNameAsString;
+    this.pruneFlushInterval = pruneFlushInterval;
+    this.pruneUpperBound = new AtomicLong();
+    this.shouldFlush = new AtomicBoolean(false);
+    startFlushThread();
+  }
+
+  public boolean isAlive() {
+    return flushThread.isAlive();
+  }
+
+  public void persistPruneEntry(long pruneUpperBound) {
+    this.pruneUpperBound.set(pruneUpperBound);
+    this.shouldFlush.set(true);
+  }
+
+  public void stop() {
+    if (flushThread != null) {
+      flushThread.interrupt();
+    }
+  }
+
+  private void startFlushThread() {
+    flushThread = new Thread("tephra-prune-upper-bound-writer") {
+      @Override
+      public void run() {
+        while (!isInterrupted()) {
+          long now = System.currentTimeMillis();
+          if (now > (lastChecked + pruneFlushInterval)) {
+            if (shouldFlush.compareAndSet(true, false)) {
+              // should flush data
+              try {
+                dataJanitorState.savePruneUpperBoundForRegion(regionName, 
pruneUpperBound.get());
+              } catch (IOException ex) {
+                LOG.warn("Cannot record prune upper bound for region " + 
regionNameAsString + " in the table " +
+                           pruneStateTable.getNameWithNamespaceInclAsString() 
+ " after compacting region.", ex);
+                // Retry again
+                shouldFlush.set(true);
+              }
+            }
+            lastChecked = now;
+          }
+
+          try {
+            TimeUnit.SECONDS.sleep(1);
+          } catch (InterruptedException ex) {
+            interrupt();
+            break;
+          }
+        }
+
+        LOG.info("PruneUpperBound Writer thread terminated.");
+      }
+    };
+
+    flushThread.setDaemon(true);
+    flushThread.start();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git 
a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
 
b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index 310c710..a431ee3 100644
--- 
a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ 
b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -59,6 +59,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -78,6 +79,8 @@ public class InvalidListPruneTest extends 
AbstractHBaseTableTest {
     // Setup the configuration to start HBase cluster with the invalid list 
pruning enabled
     conf = HBaseConfiguration.create();
     conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true);
+    // Flush prune data to the table quickly, so that tests don't have to wait long to see updates
+    conf.setLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL, 0L);
     AbstractHBaseTableTest.startMiniCluster();
 
     TransactionStateStorage txStateStorage = new 
InMemoryTransactionStateStorage();
@@ -135,6 +138,15 @@ public class InvalidListPruneTest extends 
AbstractHBaseTableTest {
     }
   }
 
+  private void truncatePruneStateTable() throws Exception {
+    if (hBaseAdmin.tableExists(pruneStateTable)) {
+      if (hBaseAdmin.isTableEnabled(pruneStateTable)) {
+        hBaseAdmin.disableTable(pruneStateTable);
+      }
+      hBaseAdmin.truncateTable(pruneStateTable, true);
+    }
+  }
+
   @Test
   public void testRecordCompactionState() throws Exception {
     DataJanitorState dataJanitorState =
@@ -145,6 +157,13 @@ public class InvalidListPruneTest extends 
AbstractHBaseTableTest {
         }
       });
 
+    // Since the write to prune table happens async, we need to sleep a bit 
before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
+    // Truncate prune state table to clear any data that might have been 
written by the previous test
+    // This is required because during the shutdown of the previous test, 
compaction might have kicked in and the
+    // coprocessor still had some data to flush and it might be flushed at the 
beginning of this test.
+    truncatePruneStateTable();
+
     // No prune upper bound initially
     Assert.assertEquals(-1,
                         
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, 
Bytes.toBytes(0))));
@@ -155,17 +174,23 @@ public class InvalidListPruneTest extends 
AbstractHBaseTableTest {
                               ImmutableSortedMap.<Long, 
TransactionManager.InProgressTx>of()));
     // Run minor compaction
     testUtil.compact(txDataTable1, false);
+    // Since the write to prune table happens async, we need to sleep a bit 
before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     // No prune upper bound after minor compaction too
     Assert.assertEquals(-1,
                         
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, 
Bytes.toBytes(0))));
 
     // Run major compaction, and verify prune upper bound
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit 
before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     Assert.assertEquals(50,
                         
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, 
Bytes.toBytes(0))));
 
     // Run major compaction again with same snapshot, prune upper bound should 
not change
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit 
before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     Assert.assertEquals(50,
                         
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, 
Bytes.toBytes(0))));
 
@@ -179,6 +204,8 @@ public class InvalidListPruneTest extends 
AbstractHBaseTableTest {
 
     // Run major compaction again, now prune upper bound should change
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit 
before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     Assert.assertEquals(104,
                         
dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, 
Bytes.toBytes(0))));
   }
@@ -196,6 +223,8 @@ public class InvalidListPruneTest extends 
AbstractHBaseTableTest {
     // Run major compaction, and verify it completes
     long now = System.currentTimeMillis();
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit 
before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     long lastMajorCompactionTime = 
TestTransactionProcessor.lastMajorCompactionTime.get();
     Assert.assertTrue(String.format("Expected %d, but was %d", now, 
lastMajorCompactionTime),
                       lastMajorCompactionTime >= now);
@@ -209,6 +238,8 @@ public class InvalidListPruneTest extends 
AbstractHBaseTableTest {
     // Run major compaction, and verify it completes
     long now = System.currentTimeMillis();
     testUtil.compact(txDataTable1, true);
+    // Since the write to prune table happens async, we need to sleep a bit 
before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
     long lastMajorCompactionTime = 
TestTransactionProcessor.lastMajorCompactionTime.get();
     Assert.assertTrue(String.format("Expected %d, but was %d", now, 
lastMajorCompactionTime),
                       lastMajorCompactionTime >= now);
@@ -226,6 +257,14 @@ public class InvalidListPruneTest extends 
AbstractHBaseTableTest {
 
     TransactionPruningPlugin transactionPruningPlugin = new 
TestTransactionPruningPlugin();
     transactionPruningPlugin.initialize(conf);
+
+    // Since the write to prune table happens async, we need to sleep a bit 
before checking the state of the table
+    TimeUnit.SECONDS.sleep(2);
+    // Truncate prune state table to clear any data that might have been 
written by the previous test
+    // This is required because during the shutdown of the previous test, 
compaction might have kicked in and the
+    // coprocessor still had some data to flush and it might be flushed at the 
beginning of this test.
+    truncatePruneStateTable();
+
     try {
       // Run without a transaction snapshot first
       long now1 = 200;
@@ -270,6 +309,8 @@ public class InvalidListPruneTest extends 
AbstractHBaseTableTest {
                           .add(getRegionName(txDataTable1, Bytes.toBytes(0)))
                           .build());
       testUtil.compact(txDataTable1, true);
+      // Since the write to prune table happens async, we need to sleep a bit 
before checking the state of the table
+      TimeUnit.SECONDS.sleep(2);
       long pruneUpperBound2 = 
transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
       Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
 

Reply via email to