Repository: incubator-tephra Updated Branches: refs/heads/master aeeee00be -> 69a6cc6be
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 5a355e6..015077b 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -331,8 +331,10 @@ public class TransactionProcessor extends BaseRegionObserver { conf.getLong(TxConstants.TransactionPruning.PRUNE_FLUSH_INTERVAL, TxConstants.TransactionPruning.DEFAULT_PRUNE_FLUSH_INTERVAL)); compactionState = new CompactionState(c.getEnvironment(), TableName.valueOf(pruneTable), pruneFlushInterval); - LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " - + pruneTable); + if (LOG.isDebugEnabled()) { + LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " + + pruneTable); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java index 58596be..db7880b 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java @@ -38,27 +38,25 @@ import javax.annotation.Nullable; public class CompactionState { private static final Log LOG = LogFactory.getLog(CompactionState.class); - private final TableName stateTable; private final byte[] regionName; private final String regionNameAsString; - private final DataJanitorState dataJanitorState; - private final long pruneFlushInterval; - private volatile long pruneUpperBound = -1; + private final PruneUpperBoundWriterSupplier pruneUpperBoundWriterSupplier; + private final PruneUpperBoundWriter pruneUpperBoundWriter; - private PruneUpperBoundWriter pruneUpperBoundWriter; + private volatile long pruneUpperBound = -1; public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long pruneFlushInterval) { - this.stateTable = stateTable; this.regionName = env.getRegionInfo().getRegionName(); this.regionNameAsString = env.getRegionInfo().getRegionNameAsString(); - this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() { + DataJanitorState dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() { @Override public Table get() throws IOException { return env.getTable(stateTable); } }); - this.pruneFlushInterval = pruneFlushInterval; - this.pruneUpperBoundWriter = createPruneUpperBoundWriter(); + this.pruneUpperBoundWriterSupplier = new PruneUpperBoundWriterSupplier(stateTable, dataJanitorState, + pruneFlushInterval); + this.pruneUpperBoundWriter = pruneUpperBoundWriterSupplier.get(); } /** @@ -71,38 +69,33 @@ public class CompactionState { if (request.isMajor() && snapshot != null) { Transaction tx = TxUtils.createDummyTransaction(snapshot); pruneUpperBound = TxUtils.getPruneUpperBound(tx); - LOG.debug( - String.format("Computed prune upper bound %s for compaction request %s using transaction state from time %s", - pruneUpperBound, request, snapshot.getTimestamp())); + if (LOG.isDebugEnabled()) { + LOG.debug( + String.format("Computed prune upper bound %s for compaction request %s using transaction state from time %s", + pruneUpperBound, request, snapshot.getTimestamp())); + } } else { pruneUpperBound = -1; } } /** - * Stops the current {@link PruneUpperBoundWriter}. - */ - public void stop() { - if (pruneUpperBoundWriter != null) { - pruneUpperBoundWriter.stop(); - } - } - - /** * Persists the transaction state recorded by {@link #record(CompactionRequest, TransactionVisibilityState)}. * This method is called after the compaction has successfully completed. */ public void persist() { if (pruneUpperBound != -1) { - if (!pruneUpperBoundWriter.isAlive()) { - pruneUpperBoundWriter = createPruneUpperBoundWriter(); + pruneUpperBoundWriter.persistPruneEntry(regionName, pruneUpperBound); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Enqueued prune upper bound %s for region %s", pruneUpperBound, regionNameAsString)); } - pruneUpperBoundWriter.persistPruneEntry(pruneUpperBound); - LOG.debug(String.format("Enqueued prune upper bound %s for region %s", pruneUpperBound, regionNameAsString)); } } - private PruneUpperBoundWriter createPruneUpperBoundWriter() { - return new PruneUpperBoundWriter(dataJanitorState, stateTable, regionNameAsString, regionName, pruneFlushInterval); + /** + * Releases the usage {@link PruneUpperBoundWriter}. + */ + public void stop() { + pruneUpperBoundWriterSupplier.release(); } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java index 7bceaff..7e9d1a3 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java @@ -18,57 +18,62 @@ package org.apache.tephra.hbase.txprune; +import com.google.common.util.concurrent.AbstractIdleService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.TableName; -import org.apache.tephra.hbase.coprocessor.TransactionProcessor; +import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; /** - * Thread that will write the the prune upper bound + * Thread that will write the the prune upper bound. An instance of this class should be obtained only + * through {@link PruneUpperBoundWriterSupplier} which will also handle the lifecycle of this instance. */ -public class PruneUpperBoundWriter { - private static final Log LOG = LogFactory.getLog(TransactionProcessor.class); +public class PruneUpperBoundWriter extends AbstractIdleService { + private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class); - private final TableName pruneStateTable; + private final TableName tableName; private final DataJanitorState dataJanitorState; - private final byte[] regionName; - private final String regionNameAsString; private final long pruneFlushInterval; - private final AtomicLong pruneUpperBound; - private final AtomicBoolean shouldFlush; + private final ConcurrentSkipListMap<byte[], Long> pruneEntries; + + private volatile Thread flushThread; - private Thread flushThread; private long lastChecked; - public PruneUpperBoundWriter(DataJanitorState dataJanitorState, TableName pruneStateTable, String regionNameAsString, - byte[] regionName, long pruneFlushInterval) { - this.pruneStateTable = pruneStateTable; + public PruneUpperBoundWriter(TableName tableName, DataJanitorState dataJanitorState, long pruneFlushInterval) { + this.tableName = tableName; this.dataJanitorState = dataJanitorState; - this.regionName = regionName; - this.regionNameAsString = regionNameAsString; this.pruneFlushInterval = pruneFlushInterval; - this.pruneUpperBound = new AtomicLong(); - this.shouldFlush = new AtomicBoolean(false); - startFlushThread(); + this.pruneEntries = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); + } + + public void persistPruneEntry(byte[] regionName, long pruneUpperBound) { + // The number of entries in this map is bound by the number of regions in this region server and thus it will not + // grow indefinitely + pruneEntries.put(regionName, pruneUpperBound); } public boolean isAlive() { - return flushThread.isAlive(); + return flushThread != null && flushThread.isAlive(); } - public void persistPruneEntry(long pruneUpperBound) { - this.pruneUpperBound.set(pruneUpperBound); - this.shouldFlush.set(true); + @Override + protected void startUp() throws Exception { + LOG.info("Starting PruneUpperBoundWriter Thread."); + startFlushThread(); } - public void stop() { + @Override + protected void shutDown() throws Exception { + LOG.info("Stopping PruneUpperBoundWriter Thread."); if (flushThread != null) { flushThread.interrupt(); + flushThread.join(TimeUnit.SECONDS.toMillis(1)); } } @@ -76,19 +81,21 @@ public class PruneUpperBoundWriter { flushThread = new Thread("tephra-prune-upper-bound-writer") { @Override public void run() { - while (!isInterrupted()) { + while ((!isInterrupted()) && isRunning()) { long now = System.currentTimeMillis(); if (now > (lastChecked + pruneFlushInterval)) { - if (shouldFlush.compareAndSet(true, false)) { - // should flush data - try { - dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound.get()); - } catch (IOException ex) { - LOG.warn("Cannot record prune upper bound for region " + regionNameAsString + " in the table " + - pruneStateTable.getNameWithNamespaceInclAsString() + " after compacting region.", ex); - // Retry again - shouldFlush.set(true); + // should flush data + try { + while (pruneEntries.firstEntry() != null) { + Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry(); + dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue()); + // We can now remove the entry only if the key and value match with what we wrote since it is + // possible that a new pruneUpperBound for the same key has been added + pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue()); } + } catch (IOException ex) { + LOG.warn("Cannot record prune upper bound for a region to table " + + tableName.getNameWithNamespaceInclAsString(), ex); } lastChecked = now; } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java new file mode 100644 index 0000000..98f3334 --- /dev/null +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.hbase.txprune; + + +import com.google.common.base.Supplier; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.TableName; + +/** + * Supplies instances of {@link PruneUpperBoundWriter} implementations. + */ +public class PruneUpperBoundWriterSupplier implements Supplier<PruneUpperBoundWriter> { + private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriterSupplier.class); + + private static volatile PruneUpperBoundWriter instance; + private static volatile int refCount = 0; + private static final Object lock = new Object(); + + private final TableName tableName; + private final DataJanitorState dataJanitorState; + private final long pruneFlushInterval; + + public PruneUpperBoundWriterSupplier(TableName tableName, DataJanitorState dataJanitorState, + long pruneFlushInterval) { + this.tableName = tableName; + this.dataJanitorState = dataJanitorState; + this.pruneFlushInterval = pruneFlushInterval; + } + + @Override + public PruneUpperBoundWriter get() { + synchronized (lock) { + if (instance == null) { + instance = new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval); + instance.startAndWait(); + } + refCount++; + if (LOG.isDebugEnabled()) { + LOG.debug("Incrementing Reference Count for PruneUpperBoundWriter " + refCount); + } + return instance; + } + } + + public void release() { + synchronized (lock) { + refCount--; + if (LOG.isDebugEnabled()) { + LOG.debug("Decrementing Reference Count for PruneUpperBoundWriter " + refCount); + } + + if (refCount == 0) { + try { + instance.stopAndWait(); + } catch (Exception ex) { + LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", ex); + } finally { + // If the thread is still alive (might happen if the thread was blocked on HBase PUT call), interrupt it again + if (instance.isAlive()) { + try { + instance.shutDown(); + } catch (Exception e) { + LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", e); + } + } + instance = null; + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/69a6cc6b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java new file mode 100644 index 0000000..08d3b49 --- /dev/null +++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplierTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tephra.hbase.txprune; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Tests to verify the behavior of {@link PruneUpperBoundWriterSupplier}. + */ +public class PruneUpperBoundWriterSupplierTest { + private static final Logger LOG = LoggerFactory.getLogger(PruneUpperBoundWriterSupplierTest.class); + private static final int NUM_OPS = 10000; + private static final int NUM_THREADS = 50; + + @Test + public void testSupplier() throws Exception { + final PruneUpperBoundWriterSupplier supplier = new PruneUpperBoundWriterSupplier(null, null, 10L); + // Get one instance now, for later comparisons + final PruneUpperBoundWriter writer = supplier.get(); + final AtomicInteger numOps = new AtomicInteger(NUM_OPS); + final Random random = new Random(System.currentTimeMillis()); + + // Start threads that will 'get' PruneUpperBoundWriters + ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS); + List<Future> futureList = new ArrayList<>(); + for (int i = 0; i < NUM_THREADS; i++) { + futureList.add(executor.submit(new Runnable() { + + @Override + public void run() { + // Perform NUM_OPS 'gets' of PruneUpperBoundWriter + while (numOps.decrementAndGet() > 0) { + PruneUpperBoundWriter newWriter = supplier.get(); + Assert.assertTrue(newWriter == writer); + int waitTime = random.nextInt(10); + try { + TimeUnit.MICROSECONDS.sleep(waitTime); + } catch (InterruptedException e) { + LOG.warn("Received an exception.", e); + } + } + } + })); + } + + for (Future future : futureList) { + future.get(5, TimeUnit.SECONDS); + } + executor.shutdown(); + executor.awaitTermination(2, TimeUnit.SECONDS); + + futureList.clear(); + numOps.set(NUM_OPS); + // Start thread that release PruneUpperBoundWriters + executor = Executors.newFixedThreadPool(NUM_THREADS); + for (int i = 0; i < NUM_THREADS; i++) { + futureList.add(executor.submit(new Runnable() { + + @Override + public void run() { + // We need to release all NUM_OPS 'gets' that were executed to trigger shutdown of the single instance of + // PruneUpperBoundWriter + while (numOps.decrementAndGet() > 0) { + supplier.release(); + try { + TimeUnit.MICROSECONDS.sleep(random.nextInt(10)); + } catch (InterruptedException e) { + LOG.warn("Received an exception.", e); + } + } + } + })); + } + + for (Future future : futureList) { + future.get(1, TimeUnit.SECONDS); + } + + executor.shutdown(); + executor.awaitTermination(2, TimeUnit.SECONDS); + + // Verify that the PruneUpperBoundWriter is still running and the pruneThread is still alive. + Assert.assertTrue(writer.isRunning()); + Assert.assertTrue(writer.isAlive()); + + // Since we got one instance in the beginning, we need to release it + supplier.release(); + + // Verify that the PruneUpperBoundWriter is shutdown and the pruneThread is not alive anymore. + Assert.assertFalse(writer.isRunning()); + Assert.assertFalse(writer.isAlive()); + } +}
