Repository: incubator-tephra Updated Branches: refs/heads/master 3af3da869 -> 9b63985fc
TEPHRA-152 Using ReferenceCounting for TransactionStateCache refresh thread, so that it can be stopped This closes #41 from GitHub. Signed-off-by: Gokul Gunasekaran <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/9b63985f Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/9b63985f Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/9b63985f Branch: refs/heads/master Commit: 9b63985fcfbe25469270c77d2bfb17e21e27ce5c Parents: 3af3da8 Author: Gokul Gunasekaran <[email protected]> Authored: Tue Mar 21 15:37:48 2017 -0700 Committer: Gokul Gunasekaran <[email protected]> Committed: Wed May 17 17:16:32 2017 -0700 ---------------------------------------------------------------------- .../tephra/coprocessor/CacheSupplier.java | 43 +++++++++ .../coprocessor/ReferenceCountedSupplier.java | 94 ++++++++++++++++++++ .../TransactionStateCacheSupplier.java | 43 +++++---- .../hbase/coprocessor/TransactionProcessor.java | 15 +++- .../hbase/txprune/PruneUpperBoundWriter.java | 4 + .../txprune/PruneUpperBoundWriterSupplier.java | 66 ++++---------- .../hbase/txprune/InvalidListPruneTest.java | 11 ++- .../hbase/coprocessor/TransactionProcessor.java | 12 ++- .../hbase/txprune/PruneUpperBoundWriter.java | 4 + .../txprune/PruneUpperBoundWriterSupplier.java | 66 ++++---------- .../hbase/txprune/InvalidListPruneTest.java | 11 ++- .../hbase/coprocessor/TransactionProcessor.java | 15 +++- .../hbase/txprune/PruneUpperBoundWriter.java | 4 + .../txprune/PruneUpperBoundWriterSupplier.java | 66 ++++---------- .../hbase/txprune/InvalidListPruneTest.java | 11 ++- .../hbase/coprocessor/TransactionProcessor.java | 15 +++- .../hbase/txprune/PruneUpperBoundWriter.java | 4 + .../txprune/PruneUpperBoundWriterSupplier.java | 66 ++++---------- .../hbase/txprune/InvalidListPruneTest.java | 11 ++- .../hbase/coprocessor/TransactionProcessor.java | 15 +++- .../hbase/txprune/PruneUpperBoundWriter.java | 4 + .../txprune/PruneUpperBoundWriterSupplier.java | 66 ++++---------- .../hbase/txprune/InvalidListPruneTest.java | 11 ++- 23 files changed, 357 insertions(+), 300 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-core/src/main/java/org/apache/tephra/coprocessor/CacheSupplier.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/coprocessor/CacheSupplier.java b/tephra-core/src/main/java/org/apache/tephra/coprocessor/CacheSupplier.java new file mode 100644 index 0000000..db93965 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/coprocessor/CacheSupplier.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.coprocessor; + +import com.google.common.base.Supplier; +import com.google.common.util.concurrent.Service; + +/** + * Provides ability to get and release objects + * + * @param <T> type of the object supplied + */ +public interface CacheSupplier<T extends Service> extends Supplier<T> { + + /** + * @return Get an instance of T and if it is the first call, then the service will be started. Subsequent calls + * will get a reference to the same instance + */ + @Override + T get(); + + /** + * Release the object obtained through {code Supplier#get()}. If this is the last release call, then the service will + * be stopped. + */ + void release(); +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-core/src/main/java/org/apache/tephra/coprocessor/ReferenceCountedSupplier.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/coprocessor/ReferenceCountedSupplier.java b/tephra-core/src/main/java/org/apache/tephra/coprocessor/ReferenceCountedSupplier.java new file mode 100644 index 0000000..a0fa7ad --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/coprocessor/ReferenceCountedSupplier.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.coprocessor; + +import com.google.common.base.Supplier; +import com.google.common.util.concurrent.Service; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Reference counts the {@link Service} and manages the lifecycle of the {@link Service} instance. + * + * @param <T> type of {@link Service} that is reference counted + */ +public class ReferenceCountedSupplier<T extends Service> { + private static final Log LOG = LogFactory.getLog(ReferenceCountedSupplier.class); + + private final AtomicReference<T> instance = new AtomicReference<>(null); + private final AtomicInteger refCount = new AtomicInteger(0); + private final Object lock = new Object(); + + private final String instanceName; + + public ReferenceCountedSupplier(String instanceName) { + this.instanceName = instanceName; + } + + public T getOrCreate(Supplier<T> instanceSupplier) { + synchronized (lock) { + if (instance.get() == null) { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Creating and starting Service %s.", instanceName)); + } + + // Instance has not been instantiated + T serviceInstance = instanceSupplier.get(); + instance.set(serviceInstance); + serviceInstance.start(); + } + int newCount = refCount.incrementAndGet(); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Incrementing reference count for Service %s: %d", instanceName, newCount)); + } + return instance.get(); + } + } + + public void release() { + synchronized (lock) { + if (refCount.get() <= 0) { + LOG.warn(String.format("Reference Count for Service %s is already zero.", instanceName)); + return; + } + + int newCount = refCount.decrementAndGet(); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Decrementing reference count for Service %s: %d", instanceName, newCount)); + } + + if (newCount == 0) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Reference Count for Service is 0. Stopping Service %s.", instanceName)); + } + + Service serviceInstance = instance.get(); + serviceInstance.stopAndWait(); + instance.set(null); + } catch (Exception ex) { + LOG.warn(String.format("Exception while trying to stop Service %s.", instanceName), ex); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCacheSupplier.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCacheSupplier.java b/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCacheSupplier.java index d19da36..db0ca50 100644 --- a/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCacheSupplier.java +++ b/tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCacheSupplier.java @@ -24,31 +24,40 @@ import org.apache.hadoop.conf.Configuration; /** * Supplies instances of {@link TransactionStateCache} implementations. */ -public class TransactionStateCacheSupplier implements Supplier<TransactionStateCache> { - protected static volatile TransactionStateCache instance; - protected static Object lock = new Object(); +public class TransactionStateCacheSupplier implements CacheSupplier<TransactionStateCache> { - protected final Configuration conf; + private static final ReferenceCountedSupplier<TransactionStateCache> referenceCountedSupplier = + new ReferenceCountedSupplier<>(TransactionStateCache.class.getSimpleName()); - public TransactionStateCacheSupplier(Configuration conf) { - this.conf = conf; + private final Supplier<TransactionStateCache> supplier; + + public TransactionStateCacheSupplier(Supplier<TransactionStateCache> supplier) { + this.supplier = supplier; + } + + public TransactionStateCacheSupplier(final Configuration conf) { + this.supplier = new Supplier<TransactionStateCache>() { + @Override + public TransactionStateCache get() { + TransactionStateCache transactionStateCache = new TransactionStateCache(); + transactionStateCache.setConf(conf); + return transactionStateCache; + } + }; } /** * Returns a singleton instance of the transaction state cache, performing lazy initialization if necessary. - * @return A shared instance of the transaction state cache. + * + * @return A shared instance of the transaction state cache */ @Override public TransactionStateCache get() { - if (instance == null) { - synchronized (lock) { - if (instance == null) { - instance = new TransactionStateCache(); - instance.setConf(conf); - instance.start(); - } - } - } - return instance; + return referenceCountedSupplier.getOrCreate(supplier); + } + + @Override + public void release() { + referenceCountedSupplier.release(); } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index d2402a6..10ecfa4 100644 --- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -18,7 +18,6 @@ package org.apache.tephra.hbase.coprocessor; -import com.google.common.base.Supplier; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.commons.logging.Log; @@ -57,6 +56,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.tephra.Transaction; import org.apache.tephra.TransactionCodec; import org.apache.tephra.TxConstants; +import org.apache.tephra.coprocessor.CacheSupplier; import org.apache.tephra.coprocessor.TransactionStateCache; import org.apache.tephra.coprocessor.TransactionStateCacheSupplier; import org.apache.tephra.hbase.txprune.CompactionState; @@ -108,6 +108,7 @@ public class TransactionProcessor extends BaseRegionObserver { private final TransactionCodec txCodec; private TransactionStateCache cache; private volatile CompactionState compactionState; + private CacheSupplier<TransactionStateCache> cacheSupplier; protected volatile Boolean pruneEnable; protected volatile Long txMaxLifetimeMillis; @@ -125,7 +126,7 @@ public class TransactionProcessor extends BaseRegionObserver { public void start(CoprocessorEnvironment e) throws IOException { if (e instanceof RegionCoprocessorEnvironment) { RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e; - Supplier<TransactionStateCache> cacheSupplier = getTransactionStateCacheSupplier(env); + this.cacheSupplier = getTransactionStateCacheSupplier(env); this.cache = cacheSupplier.get(); HTableDescriptor tableDesc = env.getRegion().getTableDesc(); @@ -168,13 +169,19 @@ public class TransactionProcessor extends BaseRegionObserver { return env.getConfiguration(); } - protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) { + protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) { return new TransactionStateCacheSupplier(env.getConfiguration()); } @Override public void stop(CoprocessorEnvironment e) throws IOException { - resetPruneState(); + try { + resetPruneState(); + } finally { + if (cacheSupplier != null) { + cacheSupplier.release(); + } + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java index 1c26ef1..5e0d435 100644 --- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java @@ -94,6 +94,10 @@ public class PruneUpperBoundWriter extends AbstractIdleService { if (flushThread != null) { flushThread.interrupt(); flushThread.join(TimeUnit.SECONDS.toMillis(1)); + if (flushThread.isAlive()) { + flushThread.interrupt(); + flushThread.join(TimeUnit.SECONDS.toMillis(1)); + } } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java index 98f3334..cb93fab 100644 --- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java @@ -20,70 +20,36 @@ package org.apache.tephra.hbase.txprune; import com.google.common.base.Supplier; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.TableName; +import org.apache.tephra.coprocessor.CacheSupplier; +import org.apache.tephra.coprocessor.ReferenceCountedSupplier; /** * Supplies instances of {@link PruneUpperBoundWriter} implementations. */ -public class PruneUpperBoundWriterSupplier implements Supplier<PruneUpperBoundWriter> { - private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriterSupplier.class); +public class PruneUpperBoundWriterSupplier implements CacheSupplier<PruneUpperBoundWriter> { - private static volatile PruneUpperBoundWriter instance; - private static volatile int refCount = 0; - private static final Object lock = new Object(); + private static final ReferenceCountedSupplier<PruneUpperBoundWriter> referenceCountedSupplier = + new ReferenceCountedSupplier<>(PruneUpperBoundWriter.class.getSimpleName()); - private final TableName tableName; - private final DataJanitorState dataJanitorState; - private final long pruneFlushInterval; + private final Supplier<PruneUpperBoundWriter> supplier; - public PruneUpperBoundWriterSupplier(TableName tableName, DataJanitorState dataJanitorState, - long pruneFlushInterval) { - this.tableName = tableName; - this.dataJanitorState = dataJanitorState; - this.pruneFlushInterval = pruneFlushInterval; + public PruneUpperBoundWriterSupplier(final TableName tableName, final DataJanitorState dataJanitorState, + final long pruneFlushInterval) { + this.supplier = new Supplier<PruneUpperBoundWriter>() { + @Override + public PruneUpperBoundWriter get() { + return new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval); + } + }; } @Override public PruneUpperBoundWriter get() { - synchronized (lock) { - if (instance == null) { - instance = new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval); - instance.startAndWait(); - } - refCount++; - if (LOG.isDebugEnabled()) { - LOG.debug("Incrementing Reference Count for PruneUpperBoundWriter " + refCount); - } - return instance; - } + return referenceCountedSupplier.getOrCreate(supplier); } public void release() { - synchronized (lock) { - refCount--; - if (LOG.isDebugEnabled()) { - LOG.debug("Decrementing Reference Count for PruneUpperBoundWriter " + refCount); - } - - if (refCount == 0) { - try { - instance.stopAndWait(); - } catch (Exception ex) { - LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", ex); - } finally { - // If the thread is still alive (might happen if the thread was blocked on HBase PUT call), interrupt it again - if (instance.isAlive()) { - try { - instance.shutDown(); - } catch (Exception e) { - LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", e); - } - } - instance = null; - } - } - } + referenceCountedSupplier.release(); } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java index e3f5c6b..91bbc1a 100644 --- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java +++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java @@ -19,7 +19,6 @@ package org.apache.tephra.hbase.txprune; -import com.google.common.base.Supplier; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.ImmutableSortedSet; @@ -41,6 +40,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.tephra.TransactionContext; import org.apache.tephra.TransactionManager; import org.apache.tephra.TxConstants; +import org.apache.tephra.coprocessor.CacheSupplier; import org.apache.tephra.coprocessor.TransactionStateCache; import org.apache.tephra.hbase.AbstractHBaseTableTest; import org.apache.tephra.hbase.TransactionAwareHTable; @@ -404,12 +404,17 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { private static final AtomicLong lastMajorCompactionTime = new AtomicLong(-1); @Override - protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) { - return new Supplier<TransactionStateCache>() { + protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) { + return new CacheSupplier<TransactionStateCache>() { @Override public TransactionStateCache get() { return new InMemoryTransactionStateCache(); } + + @Override + public void release() { + // no-op + } }; } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 84776cf..30b69a1 100644 --- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.tephra.Transaction; import org.apache.tephra.TransactionCodec; import org.apache.tephra.TxConstants; +import org.apache.tephra.coprocessor.CacheSupplier; import org.apache.tephra.coprocessor.TransactionStateCache; import org.apache.tephra.coprocessor.TransactionStateCacheSupplier; import org.apache.tephra.hbase.txprune.CompactionState; @@ -108,6 +109,7 @@ public class TransactionProcessor extends BaseRegionObserver { private final TransactionCodec txCodec; private TransactionStateCache cache; private volatile CompactionState compactionState; + private CacheSupplier<TransactionStateCache> cacheSupplier; protected volatile Boolean pruneEnable; protected volatile Long txMaxLifetimeMillis; @@ -168,13 +170,19 @@ public class TransactionProcessor extends BaseRegionObserver { return env.getConfiguration(); } - protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) { + protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) { return new TransactionStateCacheSupplier(env.getConfiguration()); } @Override public void stop(CoprocessorEnvironment e) throws IOException { - resetPruneState(); + try { + resetPruneState(); + } finally { + if (cacheSupplier != null) { + cacheSupplier.release(); + } + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java index 1c26ef1..5e0d435 100644 --- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java +++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java @@ -94,6 +94,10 @@ public class PruneUpperBoundWriter extends AbstractIdleService { if (flushThread != null) { flushThread.interrupt(); flushThread.join(TimeUnit.SECONDS.toMillis(1)); + if (flushThread.isAlive()) { + flushThread.interrupt(); + flushThread.join(TimeUnit.SECONDS.toMillis(1)); + } } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java index 98f3334..cb93fab 100644 --- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java +++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java @@ -20,70 +20,36 @@ package org.apache.tephra.hbase.txprune; import com.google.common.base.Supplier; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.TableName; +import org.apache.tephra.coprocessor.CacheSupplier; +import org.apache.tephra.coprocessor.ReferenceCountedSupplier; /** * Supplies instances of {@link PruneUpperBoundWriter} implementations. */ -public class PruneUpperBoundWriterSupplier implements Supplier<PruneUpperBoundWriter> { - private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriterSupplier.class); +public class PruneUpperBoundWriterSupplier implements CacheSupplier<PruneUpperBoundWriter> { - private static volatile PruneUpperBoundWriter instance; - private static volatile int refCount = 0; - private static final Object lock = new Object(); + private static final ReferenceCountedSupplier<PruneUpperBoundWriter> referenceCountedSupplier = + new ReferenceCountedSupplier<>(PruneUpperBoundWriter.class.getSimpleName()); - private final TableName tableName; - private final DataJanitorState dataJanitorState; - private final long pruneFlushInterval; + private final Supplier<PruneUpperBoundWriter> supplier; - public PruneUpperBoundWriterSupplier(TableName tableName, DataJanitorState dataJanitorState, - long pruneFlushInterval) { - this.tableName = tableName; - this.dataJanitorState = dataJanitorState; - this.pruneFlushInterval = pruneFlushInterval; + public PruneUpperBoundWriterSupplier(final TableName tableName, final DataJanitorState dataJanitorState, + final long pruneFlushInterval) { + this.supplier = new Supplier<PruneUpperBoundWriter>() { + @Override + public PruneUpperBoundWriter get() { + return new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval); + } + }; } @Override public PruneUpperBoundWriter get() { - synchronized (lock) { - if (instance == null) { - instance = new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval); - instance.startAndWait(); - } - refCount++; - if (LOG.isDebugEnabled()) { - LOG.debug("Incrementing Reference Count for PruneUpperBoundWriter " + refCount); - } - return instance; - } + return referenceCountedSupplier.getOrCreate(supplier); } public void release() { - synchronized (lock) { - refCount--; - if (LOG.isDebugEnabled()) { - LOG.debug("Decrementing Reference Count for PruneUpperBoundWriter " + refCount); - } - - if (refCount == 0) { - try { - instance.stopAndWait(); - } catch (Exception ex) { - LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", ex); - } finally { - // If the thread is still alive (might happen if the thread was blocked on HBase PUT call), interrupt it again - if (instance.isAlive()) { - try { - instance.shutDown(); - } catch (Exception e) { - LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", e); - } - } - instance = null; - } - } - } + referenceCountedSupplier.release(); } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java index e3f5c6b..91bbc1a 100644 --- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java +++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java @@ -19,7 +19,6 @@ package org.apache.tephra.hbase.txprune; -import com.google.common.base.Supplier; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.ImmutableSortedSet; @@ -41,6 +40,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.tephra.TransactionContext; import org.apache.tephra.TransactionManager; import org.apache.tephra.TxConstants; +import org.apache.tephra.coprocessor.CacheSupplier; import org.apache.tephra.coprocessor.TransactionStateCache; import org.apache.tephra.hbase.AbstractHBaseTableTest; import org.apache.tephra.hbase.TransactionAwareHTable; @@ -404,12 +404,17 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { private static final AtomicLong lastMajorCompactionTime = new AtomicLong(-1); @Override - protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) { - return new Supplier<TransactionStateCache>() { + protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) { + return new CacheSupplier<TransactionStateCache>() { @Override public TransactionStateCache get() { return new InMemoryTransactionStateCache(); } + + @Override + public void release() { + // no-op + } }; } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index b73bdc1..ca96052 100644 --- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -18,7 +18,6 @@ package org.apache.tephra.hbase.coprocessor; -import com.google.common.base.Supplier; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.commons.logging.Log; @@ -57,6 +56,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.tephra.Transaction; import org.apache.tephra.TransactionCodec; import org.apache.tephra.TxConstants; +import org.apache.tephra.coprocessor.CacheSupplier; import org.apache.tephra.coprocessor.TransactionStateCache; import org.apache.tephra.coprocessor.TransactionStateCacheSupplier; import org.apache.tephra.hbase.txprune.CompactionState; @@ -108,6 +108,7 @@ public class TransactionProcessor extends BaseRegionObserver { private final TransactionCodec txCodec; private TransactionStateCache cache; private volatile CompactionState compactionState; + private CacheSupplier<TransactionStateCache> cacheSupplier; protected volatile Boolean pruneEnable; protected volatile Long txMaxLifetimeMillis; @@ -125,7 +126,7 @@ public class TransactionProcessor extends BaseRegionObserver { public void start(CoprocessorEnvironment e) throws IOException { if (e instanceof RegionCoprocessorEnvironment) { RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e; - Supplier<TransactionStateCache> cacheSupplier = getTransactionStateCacheSupplier(env); + this.cacheSupplier = getTransactionStateCacheSupplier(env); this.cache = cacheSupplier.get(); HTableDescriptor tableDesc = env.getRegion().getTableDesc(); @@ -168,13 +169,19 @@ public class TransactionProcessor extends BaseRegionObserver { return env.getConfiguration(); } - protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) { + protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) { return new TransactionStateCacheSupplier(env.getConfiguration()); } @Override public void stop(CoprocessorEnvironment e) throws IOException { - resetPruneState(); + try { + resetPruneState(); + } finally { + if (cacheSupplier != null) { + cacheSupplier.release(); + } + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java index 1c26ef1..5e0d435 100644 --- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java +++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java @@ -94,6 +94,10 @@ public class PruneUpperBoundWriter extends AbstractIdleService { if (flushThread != null) { flushThread.interrupt(); flushThread.join(TimeUnit.SECONDS.toMillis(1)); + if (flushThread.isAlive()) { + flushThread.interrupt(); + flushThread.join(TimeUnit.SECONDS.toMillis(1)); + } } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java index 98f3334..cb93fab 100644 --- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java +++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java @@ -20,70 +20,36 @@ package org.apache.tephra.hbase.txprune; import com.google.common.base.Supplier; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.TableName; +import org.apache.tephra.coprocessor.CacheSupplier; +import org.apache.tephra.coprocessor.ReferenceCountedSupplier; /** * Supplies instances of {@link PruneUpperBoundWriter} implementations. */ -public class PruneUpperBoundWriterSupplier implements Supplier<PruneUpperBoundWriter> { - private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriterSupplier.class); +public class PruneUpperBoundWriterSupplier implements CacheSupplier<PruneUpperBoundWriter> { - private static volatile PruneUpperBoundWriter instance; - private static volatile int refCount = 0; - private static final Object lock = new Object(); + private static final ReferenceCountedSupplier<PruneUpperBoundWriter> referenceCountedSupplier = + new ReferenceCountedSupplier<>(PruneUpperBoundWriter.class.getSimpleName()); - private final TableName tableName; - private final DataJanitorState dataJanitorState; - private final long pruneFlushInterval; + private final Supplier<PruneUpperBoundWriter> supplier; - public PruneUpperBoundWriterSupplier(TableName tableName, DataJanitorState dataJanitorState, - long pruneFlushInterval) { - this.tableName = tableName; - this.dataJanitorState = dataJanitorState; - this.pruneFlushInterval = pruneFlushInterval; + public PruneUpperBoundWriterSupplier(final TableName tableName, final DataJanitorState dataJanitorState, + final long pruneFlushInterval) { + this.supplier = new Supplier<PruneUpperBoundWriter>() { + @Override + public PruneUpperBoundWriter get() { + return new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval); + } + }; } @Override public PruneUpperBoundWriter get() { - synchronized (lock) { - if (instance == null) { - instance = new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval); - instance.startAndWait(); - } - refCount++; - if (LOG.isDebugEnabled()) { - LOG.debug("Incrementing Reference Count for PruneUpperBoundWriter " + refCount); - } - return instance; - } + return referenceCountedSupplier.getOrCreate(supplier); } public void release() { - synchronized (lock) { - refCount--; - if (LOG.isDebugEnabled()) { - LOG.debug("Decrementing Reference Count for PruneUpperBoundWriter " + refCount); - } - - if (refCount == 0) { - try { - instance.stopAndWait(); - } catch (Exception ex) { - LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", ex); - } finally { - // If the thread is still alive (might happen if the thread was blocked on HBase PUT call), interrupt it again - if (instance.isAlive()) { - try { - instance.shutDown(); - } catch (Exception e) { - LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", e); - } - } - instance = null; - } - } - } + referenceCountedSupplier.release(); } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java index c99904b..f2c1abc 100644 --- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java +++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java @@ -19,7 +19,6 @@ package org.apache.tephra.hbase.txprune; -import com.google.common.base.Supplier; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.ImmutableSortedSet; @@ -39,6 +38,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.tephra.TransactionContext; import org.apache.tephra.TransactionManager; import org.apache.tephra.TxConstants; +import org.apache.tephra.coprocessor.CacheSupplier; import org.apache.tephra.coprocessor.TransactionStateCache; import org.apache.tephra.hbase.AbstractHBaseTableTest; import org.apache.tephra.hbase.TransactionAwareHTable; @@ -400,12 +400,17 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { private static final AtomicLong lastMajorCompactionTime = new AtomicLong(-1); @Override - protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) { - return new Supplier<TransactionStateCache>() { + protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) { + return new CacheSupplier<TransactionStateCache>() { @Override public TransactionStateCache get() { return new InMemoryTransactionStateCache(); } + + @Override + public void release() { + // no-op + } }; } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index f9bb35e..263ee98 100644 --- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -18,7 +18,6 @@ package org.apache.tephra.hbase.coprocessor; -import com.google.common.base.Supplier; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.commons.logging.Log; @@ -57,6 +56,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.tephra.Transaction; import org.apache.tephra.TransactionCodec; import org.apache.tephra.TxConstants; +import org.apache.tephra.coprocessor.CacheSupplier; import org.apache.tephra.coprocessor.TransactionStateCache; import org.apache.tephra.coprocessor.TransactionStateCacheSupplier; import org.apache.tephra.hbase.txprune.CompactionState; @@ -108,6 +108,7 @@ public class TransactionProcessor extends BaseRegionObserver { private final TransactionCodec txCodec; private TransactionStateCache cache; private volatile CompactionState compactionState; + private CacheSupplier<TransactionStateCache> cacheSupplier; protected volatile Boolean pruneEnable; protected volatile Long txMaxLifetimeMillis; @@ -125,7 +126,7 @@ public class TransactionProcessor extends BaseRegionObserver { public void start(CoprocessorEnvironment e) throws IOException { if (e instanceof RegionCoprocessorEnvironment) { RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e; - Supplier<TransactionStateCache> cacheSupplier = getTransactionStateCacheSupplier(env); + this.cacheSupplier = getTransactionStateCacheSupplier(env); this.cache = cacheSupplier.get(); HTableDescriptor tableDesc = env.getRegion().getTableDesc(); @@ -168,13 +169,19 @@ public class TransactionProcessor extends BaseRegionObserver { return env.getConfiguration(); } - protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) { + protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) { return new TransactionStateCacheSupplier(env.getConfiguration()); } @Override public void stop(CoprocessorEnvironment e) throws IOException { - resetPruneState(); + try { + resetPruneState(); + } finally { + if (cacheSupplier != null) { + cacheSupplier.release(); + } + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java index 1c26ef1..5e0d435 100644 --- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java +++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java @@ -94,6 +94,10 @@ public class PruneUpperBoundWriter extends AbstractIdleService { if (flushThread != null) { flushThread.interrupt(); flushThread.join(TimeUnit.SECONDS.toMillis(1)); + if (flushThread.isAlive()) { + flushThread.interrupt(); + flushThread.join(TimeUnit.SECONDS.toMillis(1)); + } } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java index 98f3334..cb93fab 100644 --- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java +++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java @@ -20,70 +20,36 @@ package org.apache.tephra.hbase.txprune; import com.google.common.base.Supplier; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.TableName; +import org.apache.tephra.coprocessor.CacheSupplier; +import org.apache.tephra.coprocessor.ReferenceCountedSupplier; /** * Supplies instances of {@link PruneUpperBoundWriter} implementations. */ -public class PruneUpperBoundWriterSupplier implements Supplier<PruneUpperBoundWriter> { - private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriterSupplier.class); +public class PruneUpperBoundWriterSupplier implements CacheSupplier<PruneUpperBoundWriter> { - private static volatile PruneUpperBoundWriter instance; - private static volatile int refCount = 0; - private static final Object lock = new Object(); + private static final ReferenceCountedSupplier<PruneUpperBoundWriter> referenceCountedSupplier = + new ReferenceCountedSupplier<>(PruneUpperBoundWriter.class.getSimpleName()); - private final TableName tableName; - private final DataJanitorState dataJanitorState; - private final long pruneFlushInterval; + private final Supplier<PruneUpperBoundWriter> supplier; - public PruneUpperBoundWriterSupplier(TableName tableName, DataJanitorState dataJanitorState, - long pruneFlushInterval) { - this.tableName = tableName; - this.dataJanitorState = dataJanitorState; - this.pruneFlushInterval = pruneFlushInterval; + public PruneUpperBoundWriterSupplier(final TableName tableName, final DataJanitorState dataJanitorState, + final long pruneFlushInterval) { + this.supplier = new Supplier<PruneUpperBoundWriter>() { + @Override + public PruneUpperBoundWriter get() { + return new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval); + } + }; } @Override public PruneUpperBoundWriter get() { - synchronized (lock) { - if (instance == null) { - instance = new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval); - instance.startAndWait(); - } - refCount++; - if (LOG.isDebugEnabled()) { - LOG.debug("Incrementing Reference Count for PruneUpperBoundWriter " + refCount); - } - return instance; - } + return referenceCountedSupplier.getOrCreate(supplier); } public void release() { - synchronized (lock) { - refCount--; - if (LOG.isDebugEnabled()) { - LOG.debug("Decrementing Reference Count for PruneUpperBoundWriter " + refCount); - } - - if (refCount == 0) { - try { - instance.stopAndWait(); - } catch (Exception ex) { - LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", ex); - } finally { - // If the thread is still alive (might happen if the thread was blocked on HBase PUT call), interrupt it again - if (instance.isAlive()) { - try { - instance.shutDown(); - } catch (Exception e) { - LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", e); - } - } - instance = null; - } - } - } + referenceCountedSupplier.release(); } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java index 07746d8..ac5e923 100644 --- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java +++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java @@ -19,7 +19,6 @@ package org.apache.tephra.hbase.txprune; -import com.google.common.base.Supplier; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.ImmutableSortedSet; @@ -39,6 +38,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.tephra.TransactionContext; import org.apache.tephra.TransactionManager; import org.apache.tephra.TxConstants; +import org.apache.tephra.coprocessor.CacheSupplier; import org.apache.tephra.coprocessor.TransactionStateCache; import org.apache.tephra.hbase.AbstractHBaseTableTest; import org.apache.tephra.hbase.TransactionAwareHTable; @@ -400,12 +400,17 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { private static final AtomicLong lastMajorCompactionTime = new AtomicLong(-1); @Override - protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) { - return new Supplier<TransactionStateCache>() { + protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) { + return new CacheSupplier<TransactionStateCache>() { @Override public TransactionStateCache get() { return new InMemoryTransactionStateCache(); } + + @Override + public void release() { + // no-op + } }; } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 02e2dac..553f598 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -18,7 +18,6 @@ package org.apache.tephra.hbase.coprocessor; -import com.google.common.base.Supplier; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.commons.logging.Log; @@ -57,6 +56,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.tephra.Transaction; import org.apache.tephra.TransactionCodec; import org.apache.tephra.TxConstants; +import org.apache.tephra.coprocessor.CacheSupplier; import org.apache.tephra.coprocessor.TransactionStateCache; import org.apache.tephra.coprocessor.TransactionStateCacheSupplier; import org.apache.tephra.hbase.txprune.CompactionState; @@ -108,6 +108,7 @@ public class TransactionProcessor extends BaseRegionObserver { private final TransactionCodec txCodec; private TransactionStateCache cache; private volatile CompactionState compactionState; + private CacheSupplier<TransactionStateCache> cacheSupplier; protected volatile Boolean pruneEnable; protected volatile Long txMaxLifetimeMillis; @@ -125,7 +126,7 @@ public class TransactionProcessor extends BaseRegionObserver { public void start(CoprocessorEnvironment e) throws IOException { if (e instanceof RegionCoprocessorEnvironment) { RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e; - Supplier<TransactionStateCache> cacheSupplier = getTransactionStateCacheSupplier(env); + this.cacheSupplier = getTransactionStateCacheSupplier(env); this.cache = cacheSupplier.get(); HTableDescriptor tableDesc = env.getRegion().getTableDesc(); @@ -168,13 +169,19 @@ public class TransactionProcessor extends BaseRegionObserver { return env.getConfiguration(); } - protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) { + protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) { return new TransactionStateCacheSupplier(env.getConfiguration()); } @Override public void stop(CoprocessorEnvironment e) throws IOException { - resetPruneState(); + try { + resetPruneState(); + } finally { + if (cacheSupplier != null) { + cacheSupplier.release(); + } + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java index 6bd8bab..677710b 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java @@ -94,6 +94,10 @@ public class PruneUpperBoundWriter extends AbstractIdleService { if (flushThread != null) { flushThread.interrupt(); flushThread.join(TimeUnit.SECONDS.toMillis(1)); + if (flushThread.isAlive()) { + flushThread.interrupt(); + flushThread.join(TimeUnit.SECONDS.toMillis(1)); + } } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java index 98f3334..cb93fab 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java @@ -20,70 +20,36 @@ package org.apache.tephra.hbase.txprune; import com.google.common.base.Supplier; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.TableName; +import org.apache.tephra.coprocessor.CacheSupplier; +import org.apache.tephra.coprocessor.ReferenceCountedSupplier; /** * Supplies instances of {@link PruneUpperBoundWriter} implementations. */ -public class PruneUpperBoundWriterSupplier implements Supplier<PruneUpperBoundWriter> { - private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriterSupplier.class); +public class PruneUpperBoundWriterSupplier implements CacheSupplier<PruneUpperBoundWriter> { - private static volatile PruneUpperBoundWriter instance; - private static volatile int refCount = 0; - private static final Object lock = new Object(); + private static final ReferenceCountedSupplier<PruneUpperBoundWriter> referenceCountedSupplier = + new ReferenceCountedSupplier<>(PruneUpperBoundWriter.class.getSimpleName()); - private final TableName tableName; - private final DataJanitorState dataJanitorState; - private final long pruneFlushInterval; + private final Supplier<PruneUpperBoundWriter> supplier; - public PruneUpperBoundWriterSupplier(TableName tableName, DataJanitorState dataJanitorState, - long pruneFlushInterval) { - this.tableName = tableName; - this.dataJanitorState = dataJanitorState; - this.pruneFlushInterval = pruneFlushInterval; + public PruneUpperBoundWriterSupplier(final TableName tableName, final DataJanitorState dataJanitorState, + final long pruneFlushInterval) { + this.supplier = new Supplier<PruneUpperBoundWriter>() { + @Override + public PruneUpperBoundWriter get() { + return new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval); + } + }; } @Override public PruneUpperBoundWriter get() { - synchronized (lock) { - if (instance == null) { - instance = new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval); - instance.startAndWait(); - } - refCount++; - if (LOG.isDebugEnabled()) { - LOG.debug("Incrementing Reference Count for PruneUpperBoundWriter " + refCount); - } - return instance; - } + return referenceCountedSupplier.getOrCreate(supplier); } public void release() { - synchronized (lock) { - refCount--; - if (LOG.isDebugEnabled()) { - LOG.debug("Decrementing Reference Count for PruneUpperBoundWriter " + refCount); - } - - if (refCount == 0) { - try { - instance.stopAndWait(); - } catch (Exception ex) { - LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", ex); - } finally { - // If the thread is still alive (might happen if the thread was blocked on HBase PUT call), interrupt it again - if (instance.isAlive()) { - try { - instance.shutDown(); - } catch (Exception e) { - LOG.warn("Exception while trying to shutdown PruneUpperBoundWriter thread. ", e); - } - } - instance = null; - } - } - } + referenceCountedSupplier.release(); } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9b63985f/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java index 07746d8..ac5e923 100644 --- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java +++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java @@ -19,7 +19,6 @@ package org.apache.tephra.hbase.txprune; -import com.google.common.base.Supplier; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.ImmutableSortedSet; @@ -39,6 +38,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.tephra.TransactionContext; import org.apache.tephra.TransactionManager; import org.apache.tephra.TxConstants; +import org.apache.tephra.coprocessor.CacheSupplier; import org.apache.tephra.coprocessor.TransactionStateCache; import org.apache.tephra.hbase.AbstractHBaseTableTest; import org.apache.tephra.hbase.TransactionAwareHTable; @@ -400,12 +400,17 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { private static final AtomicLong lastMajorCompactionTime = new AtomicLong(-1); @Override - protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) { - return new Supplier<TransactionStateCache>() { + protected CacheSupplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) { + return new CacheSupplier<TransactionStateCache>() { @Override public TransactionStateCache get() { return new InMemoryTransactionStateCache(); } + + @Override + public void release() { + // no-op + } }; }
