Repository: incubator-tephra Updated Branches: refs/heads/master 87cb21a0f -> 0016b2034
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java new file mode 100644 index 0000000..c981e15 --- /dev/null +++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Asynchronously persists the latest prune upper bound of a region for the {@link TransactionProcessor} coprocessor.
+ */
+public class PruneUpperBoundWriter {
+  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class);
+
+  private final TableName pruneStateTable;
+  private final DataJanitorState dataJanitorState;
+  private final byte[] regionName;
+  private final String regionNameAsString;
+  private final long pruneFlushInterval;
+  private final AtomicLong pruneUpperBound;
+  private final AtomicBoolean shouldFlush;
+
+  private Thread flushThread;
+  private long lastChecked; // time (ms) at which the flush interval last elapsed; used only by the flush thread
+  /** Creates the writer and starts its daemon flush thread; {@code pruneFlushInterval} is in milliseconds. */
+  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, TableName pruneStateTable, String regionNameAsString,
+                               byte[] regionName, long pruneFlushInterval) {
+    this.pruneStateTable = pruneStateTable;
+    this.dataJanitorState = dataJanitorState;
+    this.regionName = regionName;
+    this.regionNameAsString = regionNameAsString;
+    this.pruneFlushInterval = pruneFlushInterval;
+    this.pruneUpperBound = new AtomicLong();
+    this.shouldFlush = new AtomicBoolean(false);
+    startFlushThread();
+  }
+  /** Returns whether the flush thread is still running (it exits once interrupted, e.g. by {@link #stop()}). */
+  public boolean isAlive() {
+    return flushThread.isAlive();
+  }
+  /** Records a bound for the flush thread to write; a later call overwrites a value not yet written. */
+  public void persistPruneEntry(long pruneUpperBound) {
+    this.pruneUpperBound.set(pruneUpperBound);
+    this.shouldFlush.set(true);
+  }
+  /** Interrupts the flush thread; a bound recorded after the last successful write may not be persisted. */
+  public void stop() {
+    if (flushThread != null) {
+      flushThread.interrupt();
+    }
+  }
+  /** Starts a daemon thread that wakes every second and writes a pending bound at most once per flush interval. */
+  private void startFlushThread() {
+    flushThread = new Thread("tephra-prune-upper-bound-writer") {
+      @Override
+      public void run() {
+        while (!isInterrupted()) {
+          long now = System.currentTimeMillis();
+          if (now > (lastChecked + pruneFlushInterval)) {
+            if (shouldFlush.compareAndSet(true, false)) {
+              // A bound is pending. Clearing the flag before reading means a concurrent update re-arms it.
+              try {
+                dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound.get());
+              } catch (IOException ex) {
+                LOG.warn("Cannot record prune upper bound for region " + regionNameAsString + " in the table " +
+                           pruneStateTable.getNamespaceAsString() + ":" + pruneStateTable.getNameAsString() +
+                           " after compacting region.", ex);
+                // Re-arm the flag so the write is retried once the flush interval elapses again
+                shouldFlush.set(true);
+              }
+            }
+            lastChecked = now;
+          }
+
+          try {
+            TimeUnit.SECONDS.sleep(1);
+          } catch (InterruptedException ex) {
+            interrupt(); // restore the interrupt status cleared by sleep() before exiting the loop
+            break;
+          }
+        }
+
+        LOG.info("PruneUpperBound Writer thread terminated.");
+      }
+    };
+
+    flushThread.setDaemon(true);
+    flushThread.start();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java index 310c710..a431ee3 100644 --- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java +++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java @@ -59,6 +59,7 @@ import org.junit.Test; import java.io.IOException; import java.util.Collections; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** @@ -78,6 +79,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { // Setup the configuration to start HBase cluster with the invalid list pruning enabled conf = HBaseConfiguration.create(); conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true); + // Flush prune data to table quickly, so that tests don't need have to wait long to see updates + 
conf.setLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL, 0L); AbstractHBaseTableTest.startMiniCluster(); TransactionStateStorage txStateStorage = new InMemoryTransactionStateStorage(); @@ -135,6 +138,15 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { } } + private void truncatePruneStateTable() throws Exception { + if (hBaseAdmin.tableExists(pruneStateTable)) { + if (hBaseAdmin.isTableEnabled(pruneStateTable)) { + hBaseAdmin.disableTable(pruneStateTable); + } + hBaseAdmin.truncateTable(pruneStateTable, true); + } + } + @Test public void testRecordCompactionState() throws Exception { DataJanitorState dataJanitorState = @@ -145,6 +157,13 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { } }); + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); + // Truncate prune state table to clear any data that might have been written by the previous test + // This is required because during the shutdown of the previous test, compaction might have kicked in and the + // coprocessor still had some data to flush and it might be flushed at the beginning of this test. 
+ truncatePruneStateTable(); + // No prune upper bound initially Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0)))); @@ -155,17 +174,23 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of())); // Run minor compaction testUtil.compact(txDataTable1, false); + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); // No prune upper bound after minor compaction too Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0)))); // Run major compaction, and verify prune upper bound testUtil.compact(txDataTable1, true); + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); Assert.assertEquals(50, dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0)))); // Run major compaction again with same snapshot, prune upper bound should not change testUtil.compact(txDataTable1, true); + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); Assert.assertEquals(50, dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0)))); @@ -179,6 +204,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { // Run major compaction again, now prune upper bound should change testUtil.compact(txDataTable1, true); + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); Assert.assertEquals(104, dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0)))); } @@ -196,6 +223,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { // Run 
major compaction, and verify it completes long now = System.currentTimeMillis(); testUtil.compact(txDataTable1, true); + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); long lastMajorCompactionTime = TestTransactionProcessor.lastMajorCompactionTime.get(); Assert.assertTrue(String.format("Expected %d, but was %d", now, lastMajorCompactionTime), lastMajorCompactionTime >= now); @@ -209,6 +238,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { // Run major compaction, and verify it completes long now = System.currentTimeMillis(); testUtil.compact(txDataTable1, true); + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); long lastMajorCompactionTime = TestTransactionProcessor.lastMajorCompactionTime.get(); Assert.assertTrue(String.format("Expected %d, but was %d", now, lastMajorCompactionTime), lastMajorCompactionTime >= now); @@ -226,6 +257,14 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin(); transactionPruningPlugin.initialize(conf); + + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); + // Truncate prune state table to clear any data that might have been written by the previous test + // This is required because during the shutdown of the previous test, compaction might have kicked in and the + // coprocessor still had some data to flush and it might be flushed at the beginning of this test. 
+ truncatePruneStateTable(); + try { // Run without a transaction snapshot first long now1 = 200; @@ -270,6 +309,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { .add(getRegionName(txDataTable1, Bytes.toBytes(0))) .build()); testUtil.compact(txDataTable1, true); + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2); Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 45eed50..5a355e6 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -170,7 +170,9 @@ public class TransactionProcessor extends BaseRegionObserver { @Override public void stop(CoprocessorEnvironment e) throws IOException { - // nothing to do + if (compactionState != null) { + compactionState.stop(); + } } @Override @@ -191,7 +193,7 @@ public class TransactionProcessor extends BaseRegionObserver { public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException { Transaction tx = getFromOperation(put); - ensureValidTxLifetime(e.getEnvironment(), tx); + ensureValidTxLifetime(e.getEnvironment(), put, tx); } @Override @@ -208,7 +210,7 @@ public class TransactionProcessor extends 
BaseRegionObserver { } Transaction tx = getFromOperation(delete); - ensureValidTxLifetime(e.getEnvironment(), tx); + ensureValidTxLifetime(e.getEnvironment(), delete, tx); // Other deletes are client-initiated and need to be translated into our own tombstones // TODO: this should delegate to the DeleteStrategy implementation. @@ -322,11 +324,16 @@ public class TransactionProcessor extends BaseRegionObserver { if (conf != null) { pruneEnable = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE); - String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, - TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE); - compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable)); - LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " + - pruneTable); + if (Boolean.TRUE.equals(pruneEnable)) { + String pruneTable = conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE); + long pruneFlushInterval = TimeUnit.SECONDS.toMillis( + conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL, + TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL)); + compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval); + LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " + + pruneTable); + } } } @@ -390,11 +397,13 @@ public class TransactionProcessor extends BaseRegionObserver { * Make sure that the transaction is within the max valid transaction lifetime. 
* * @param env {@link RegionCoprocessorEnvironment} of the Region to which the coprocessor is associated + * @param op {@link OperationWithAttributes} HBase operation to access its attributes if required * @param tx {@link Transaction} supplied by the * @throws DoNotRetryIOException thrown if the transaction is older than the max lifetime of a transaction * IOException throw if the value of max lifetime of transaction is unavailable */ protected void ensureValidTxLifetime(RegionCoprocessorEnvironment env, + @SuppressWarnings("unused") OperationWithAttributes op, @Nullable Transaction tx) throws IOException { if (tx == null) { return; http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java index 850f508..58596be 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java @@ -38,22 +38,27 @@ import javax.annotation.Nullable; public class CompactionState { private static final Log LOG = LogFactory.getLog(CompactionState.class); + private final TableName stateTable; private final byte[] regionName; private final String regionNameAsString; - private final TableName stateTable; private final DataJanitorState dataJanitorState; + private final long pruneFlushInterval; private volatile long pruneUpperBound = -1; - public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable) { + private PruneUpperBoundWriter pruneUpperBoundWriter; + + public CompactionState(final RegionCoprocessorEnvironment env, final TableName 
stateTable, long pruneFlushInterval) { + this.stateTable = stateTable; this.regionName = env.getRegionInfo().getRegionName(); this.regionNameAsString = env.getRegionInfo().getRegionNameAsString(); - this.stateTable = stateTable; this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() { @Override public Table get() throws IOException { return env.getTable(stateTable); } }); + this.pruneFlushInterval = pruneFlushInterval; + this.pruneUpperBoundWriter = createPruneUpperBoundWriter(); } /** @@ -75,18 +80,29 @@ public class CompactionState { } /** + * Stops the current {@link PruneUpperBoundWriter}. + */ + public void stop() { + if (pruneUpperBoundWriter != null) { + pruneUpperBoundWriter.stop(); + } + } + + /** * Persists the transaction state recorded by {@link #record(CompactionRequest, TransactionVisibilityState)}. * This method is called after the compaction has successfully completed. */ public void persist() { if (pruneUpperBound != -1) { - try { - dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound); - LOG.debug(String.format("Saved prune upper bound %s for region %s", pruneUpperBound, regionNameAsString)); - } catch (IOException e) { - LOG.warn(String.format("Cannot record prune upper bound in table %s after compacting region %s", - stateTable, regionNameAsString), e); + if (!pruneUpperBoundWriter.isAlive()) { + pruneUpperBoundWriter = createPruneUpperBoundWriter(); } + pruneUpperBoundWriter.persistPruneEntry(pruneUpperBound); + LOG.debug(String.format("Enqueued prune upper bound %s for region %s", pruneUpperBound, regionNameAsString)); } } + + private PruneUpperBoundWriter createPruneUpperBoundWriter() { + return new PruneUpperBoundWriter(dataJanitorState, stateTable, regionNameAsString, regionName, pruneFlushInterval); + } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java 
---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java index c6d03c4..51dc181 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java @@ -46,8 +46,8 @@ import javax.annotation.Nullable; @SuppressWarnings("WeakerAccess") public class DataJanitorState { public static final byte[] FAMILY = {'f'}; + public static final byte[] PRUNE_UPPER_BOUND_COL = {'p'}; - private static final byte[] PRUNE_UPPER_BOUND_COL = {'p'}; private static final byte[] REGION_TIME_COL = {'r'}; private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_COL = {'i'}; http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java index 83e3948..99c514f 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java @@ -24,8 +24,10 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionInfo; import 
org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; @@ -119,10 +121,11 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin { public void initialize(Configuration conf) throws IOException { this.conf = conf; this.connection = ConnectionFactory.createConnection(conf); - + final TableName stateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)); LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString()); + createPruneTable(stateTable); this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() { @Override public Table get() throws IOException { @@ -209,6 +212,38 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin { } } + /** + * Create the prune state table given the {@link TableName} if the table doesn't exist already. 
+ * + * @param stateTable prune state table name + */ + protected void createPruneTable(TableName stateTable) throws IOException { + try (Admin admin = this.connection.getAdmin()) { + if (admin.tableExists(stateTable)) { + LOG.debug("Not creating pruneStateTable {} since it already exists.", + stateTable.getNameWithNamespaceInclAsString()); + return; + } + + HTableDescriptor htd = new HTableDescriptor(stateTable); + htd.addFamily(new HColumnDescriptor(DataJanitorState.FAMILY).setMaxVersions(1)); + admin.createTable(htd); + LOG.info("Created pruneTable {}", stateTable.getNameWithNamespaceInclAsString()); + } catch (TableExistsException ex) { + // Expected if the prune state table is being created at the same time by another client + LOG.debug("Not creating pruneStateTable {} since it already exists.", + stateTable.getNameWithNamespaceInclAsString(), ex); + } + } + + /** + * Returns whether the table is a transactional table. By default, it is a table is identified as a transactional + * table if it has a the coprocessor {@link TransactionProcessor} attached to it. Should be overriden if the users + * attach a different coprocessor. 
+ * + * @param tableDescriptor {@link HTableDescriptor} of the table + * @return true if the table is transactional + */ protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) { return tableDescriptor.hasCoprocessor(TransactionProcessor.class.getName()); } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java new file mode 100644 index 0000000..7bceaff --- /dev/null +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Asynchronously persists the latest prune upper bound of a region for the {@link TransactionProcessor} coprocessor.
+ */
+public class PruneUpperBoundWriter {
+  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class);
+
+  private final TableName pruneStateTable;
+  private final DataJanitorState dataJanitorState;
+  private final byte[] regionName;
+  private final String regionNameAsString;
+  private final long pruneFlushInterval;
+  private final AtomicLong pruneUpperBound;
+  private final AtomicBoolean shouldFlush;
+
+  private Thread flushThread;
+  private long lastChecked; // time (ms) at which the flush interval last elapsed; used only by the flush thread
+  /** Creates the writer and starts its daemon flush thread; {@code pruneFlushInterval} is in milliseconds. */
+  public PruneUpperBoundWriter(DataJanitorState dataJanitorState, TableName pruneStateTable, String regionNameAsString,
+                               byte[] regionName, long pruneFlushInterval) {
+    this.pruneStateTable = pruneStateTable;
+    this.dataJanitorState = dataJanitorState;
+    this.regionName = regionName;
+    this.regionNameAsString = regionNameAsString;
+    this.pruneFlushInterval = pruneFlushInterval;
+    this.pruneUpperBound = new AtomicLong();
+    this.shouldFlush = new AtomicBoolean(false);
+    startFlushThread();
+  }
+  /** Returns whether the flush thread is still running (it exits once interrupted, e.g. by {@link #stop()}). */
+  public boolean isAlive() {
+    return flushThread.isAlive();
+  }
+  /** Records a bound for the flush thread to write; a later call overwrites a value not yet written. */
+  public void persistPruneEntry(long pruneUpperBound) {
+    this.pruneUpperBound.set(pruneUpperBound);
+    this.shouldFlush.set(true);
+  }
+  /** Interrupts the flush thread; a bound recorded after the last successful write may not be persisted. */
+  public void stop() {
+    if (flushThread != null) {
+      flushThread.interrupt();
+    }
+  }
+  /** Starts a daemon thread that wakes every second and writes a pending bound at most once per flush interval. */
+  private void startFlushThread() {
+    flushThread = new Thread("tephra-prune-upper-bound-writer") {
+      @Override
+      public void run() {
+        while (!isInterrupted()) {
+          long now = System.currentTimeMillis();
+          if (now > (lastChecked + pruneFlushInterval)) {
+            if (shouldFlush.compareAndSet(true, false)) {
+              // A bound is pending. Clearing the flag before reading means a concurrent update re-arms it.
+              try {
+                dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound.get());
+              } catch (IOException ex) {
+                LOG.warn("Cannot record prune upper bound for region " + regionNameAsString + " in the table " +
+                           pruneStateTable.getNameWithNamespaceInclAsString() + " after compacting region.", ex);
+                // Re-arm the flag so the write is retried once the flush interval elapses again
+                shouldFlush.set(true);
+              }
+            }
+            lastChecked = now;
+          }
+
+          try {
+            TimeUnit.SECONDS.sleep(1);
+          } catch (InterruptedException ex) {
+            interrupt(); // restore the interrupt status cleared by sleep() before exiting the loop
+            break;
+          }
+        }
+
+        LOG.info("PruneUpperBound Writer thread terminated.");
+      }
+    };
+
+    flushThread.setDaemon(true);
+    flushThread.start();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/0016b203/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java index 310c710..a431ee3 100644 --- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java +++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java @@ -59,6 +59,7 @@ import org.junit.Test; import java.io.IOException; import java.util.Collections; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** @@ -78,6 +79,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { // Setup the configuration to start HBase cluster with the invalid list pruning enabled conf = HBaseConfiguration.create(); conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true); + // Flush prune data to table quickly, so that tests don't need have to wait long to see updates + 
conf.setLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL, 0L); AbstractHBaseTableTest.startMiniCluster(); TransactionStateStorage txStateStorage = new InMemoryTransactionStateStorage(); @@ -135,6 +138,15 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { } } + private void truncatePruneStateTable() throws Exception { + if (hBaseAdmin.tableExists(pruneStateTable)) { + if (hBaseAdmin.isTableEnabled(pruneStateTable)) { + hBaseAdmin.disableTable(pruneStateTable); + } + hBaseAdmin.truncateTable(pruneStateTable, true); + } + } + @Test public void testRecordCompactionState() throws Exception { DataJanitorState dataJanitorState = @@ -145,6 +157,13 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { } }); + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); + // Truncate prune state table to clear any data that might have been written by the previous test + // This is required because during the shutdown of the previous test, compaction might have kicked in and the + // coprocessor still had some data to flush and it might be flushed at the beginning of this test. 
+ truncatePruneStateTable(); + // No prune upper bound initially Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0)))); @@ -155,17 +174,23 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of())); // Run minor compaction testUtil.compact(txDataTable1, false); + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); // No prune upper bound after minor compaction too Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0)))); // Run major compaction, and verify prune upper bound testUtil.compact(txDataTable1, true); + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); Assert.assertEquals(50, dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0)))); // Run major compaction again with same snapshot, prune upper bound should not change testUtil.compact(txDataTable1, true); + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); Assert.assertEquals(50, dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0)))); @@ -179,6 +204,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { // Run major compaction again, now prune upper bound should change testUtil.compact(txDataTable1, true); + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); Assert.assertEquals(104, dataJanitorState.getPruneUpperBoundForRegion(getRegionName(txDataTable1, Bytes.toBytes(0)))); } @@ -196,6 +223,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { // Run 
major compaction, and verify it completes long now = System.currentTimeMillis(); testUtil.compact(txDataTable1, true); + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); long lastMajorCompactionTime = TestTransactionProcessor.lastMajorCompactionTime.get(); Assert.assertTrue(String.format("Expected %d, but was %d", now, lastMajorCompactionTime), lastMajorCompactionTime >= now); @@ -209,6 +238,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { // Run major compaction, and verify it completes long now = System.currentTimeMillis(); testUtil.compact(txDataTable1, true); + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); long lastMajorCompactionTime = TestTransactionProcessor.lastMajorCompactionTime.get(); Assert.assertTrue(String.format("Expected %d, but was %d", now, lastMajorCompactionTime), lastMajorCompactionTime >= now); @@ -226,6 +257,14 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin(); transactionPruningPlugin.initialize(conf); + + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); + // Truncate prune state table to clear any data that might have been written by the previous test + // This is required because during the shutdown of the previous test, compaction might have kicked in and the + // coprocessor still had some data to flush and it might be flushed at the beginning of this test. 
+ truncatePruneStateTable(); + try { // Run without a transaction snapshot first long now1 = 200; @@ -270,6 +309,8 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { .add(getRegionName(txDataTable1, Bytes.toBytes(0))) .build()); testUtil.compact(txDataTable1, true); + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2); Assert.assertEquals(expectedPruneUpperBound2, pruneUpperBound2);
