TEPHRA-203 Invalid transaction pruning service

Signed-off-by: poorna <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/79b97198 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/79b97198 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/79b97198 Branch: refs/heads/master Commit: 79b97198ca92655b26f09d79304b798007a5dc45 Parents: 7c8267c Author: poorna <[email protected]> Authored: Tue Dec 6 01:55:55 2016 -0800 Committer: poorna <[email protected]> Committed: Wed Dec 28 16:08:11 2016 +0530 ---------------------------------------------------------------------- .../java/org/apache/tephra/TxConstants.java | 26 +- .../tephra/distributed/TransactionService.java | 18 + .../janitor/TransactionPruningPlugin.java | 91 ----- .../txprune/TransactionPruningPlugin.java | 88 +++++ .../txprune/TransactionPruningRunnable.java | 128 +++++++ .../txprune/TransactionPruningService.java | 144 ++++++++ .../java/org/apache/tephra/util/TxUtils.java | 12 + .../txprune/TransactionPruningServiceTest.java | 337 +++++++++++++++++ .../hbase/coprocessor/TransactionProcessor.java | 12 +- .../coprocessor/janitor/CompactionState.java | 92 ----- .../coprocessor/janitor/DataJanitorState.java | 362 ------------------- .../janitor/HBaseTransactionPruningPlugin.java | 299 --------------- .../hbase/coprocessor/janitor/TimeRegions.java | 85 ----- .../tephra/hbase/txprune/CompactionState.java | 94 +++++ .../tephra/hbase/txprune/DataJanitorState.java | 362 +++++++++++++++++++ .../txprune/HBaseTransactionPruningPlugin.java | 299 +++++++++++++++ .../tephra/hbase/txprune/TimeRegions.java | 85 +++++ .../janitor/DataJanitorStateTest.java | 205 ----------- .../janitor/InvalidListPruneTest.java | 361 ------------------ .../hbase/txprune/DataJanitorStateTest.java | 205 +++++++++++ .../hbase/txprune/InvalidListPruneTest.java | 361 ++++++++++++++++++ 21 files changed, 2163 insertions(+), 1503 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/79b97198/tephra-core/src/main/java/org/apache/tephra/TxConstants.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java index bc02936..512e93c 100644 --- a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java +++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java @@ -354,13 +354,35 @@ public class TxConstants { } /** - * Configuration for data janitor + * Configuration for invalid transaction pruning */ - public static final class DataJanitor { + public static final class TransactionPruning { + /** + * Flag to enable automatic invalid transaction pruning. + */ public static final String PRUNE_ENABLE = "data.tx.prune.enable"; + /** + * The table used to store intermediate state when pruning is enabled. + */ public static final String PRUNE_STATE_TABLE = "data.tx.prune.state.table"; + /** + * Interval in seconds to schedule prune run. 
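As a rough illustration of how the new TxConstants.TransactionPruning keys are meant to be used together, the fragment below sets them on a Hadoop Configuration. It is only a sketch: the interval value and the "myStore" plugin name and class are hypothetical, not part of this change.

  // Sketch only -- assumes org.apache.hadoop.conf.Configuration is available.
  Configuration conf = new Configuration();
  conf.setBoolean("data.tx.prune.enable", true);                      // TransactionPruning.PRUNE_ENABLE
  conf.setLong("data.tx.prune.interval", 6 * 60 * 60);                // PRUNE_INTERVAL, in seconds
  conf.set("data.tx.prune.plugins", "data.tx.prune.plugin.myStore");  // PLUGINS (comma separated names)
  // each plugin name maps to its implementation class via the ".class" suffix (PLUGIN_CLASS_SUFFIX)
  conf.set("data.tx.prune.plugin.myStore.class", "com.example.MyStorePruningPlugin");

If no plugins are configured, the service falls back to the default plugin entry, which resolves to the HBase implementation named by DEFAULT_PLUGIN_CLASS.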
+ */ + public static final String PRUNE_INTERVAL = "data.tx.prune.interval"; + /** + * Comma separated list of invalid transaction pruning plugins to load + */ + public static final String PLUGINS = "data.tx.prune.plugins"; + /** + * Class name for the plugins will be plugin-name + ".class" suffix + */ + public static final String PLUGIN_CLASS_SUFFIX = ".class"; public static final boolean DEFAULT_PRUNE_ENABLE = false; public static final String DEFAULT_PRUNE_STATE_TABLE = "data_tx_janitor_state"; + public static final long DEFAULT_PRUNE_INTERVAL = TimeUnit.HOURS.toSeconds(6); + public static final String DEFAULT_PLUGIN = "data.tx.prune.plugin.default"; + public static final String DEFAULT_PLUGIN_CLASS = + "org.apache.tephra.hbase.txprune.HBaseTransactionPruningPlugin"; } } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/79b97198/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java index 4061c4d..d4a0f87 100644 --- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java +++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java @@ -19,6 +19,8 @@ package org.apache.tephra.distributed; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; import com.google.inject.Inject; @@ -28,6 +30,7 @@ import org.apache.tephra.TransactionManager; import org.apache.tephra.distributed.thrift.TTransactionServer; import org.apache.tephra.inmemory.InMemoryTransactionService; import org.apache.tephra.rpc.ThriftRPCServer; +import org.apache.tephra.txprune.TransactionPruningService; import org.apache.twill.api.ElectionHandler; import org.apache.twill.discovery.DiscoveryService; import org.apache.twill.internal.ServiceListenerAdapter; @@ -50,9 +53,11 @@ import javax.annotation.Nullable; public final class TransactionService extends InMemoryTransactionService { private static final Logger LOG = LoggerFactory.getLogger(TransactionService.class); private LeaderElection leaderElection; + private final Configuration conf; private final ZKClient zkClient; private ThriftRPCServer<TransactionServiceThriftHandler, TTransactionServer> server; + private TransactionPruningService pruningService; @Inject public TransactionService(Configuration conf, @@ -60,6 +65,7 @@ public final class TransactionService extends InMemoryTransactionService { DiscoveryService discoveryService, Provider<TransactionManager> txManagerProvider) { super(conf, discoveryService, txManagerProvider); + this.conf = conf; this.zkClient = zkClient; } @@ -91,6 +97,8 @@ public final class TransactionService extends InMemoryTransactionService { } }, MoreExecutors.sameThreadExecutor()); + pruningService = new TransactionPruningService(conf, txManager); + server = ThriftRPCServer.builder(TTransactionServer.class) .setHost(address) .setPort(port) @@ -100,6 +108,7 @@ public final class TransactionService extends InMemoryTransactionService { .build(new TransactionServiceThriftHandler(txManager)); try { server.startAndWait(); + pruningService.startAndWait(); doRegister(); LOG.info("Transaction Thrift Service 
started successfully on " + getAddress()); } catch (Throwable t) { @@ -111,12 +120,21 @@ public final class TransactionService extends InMemoryTransactionService { @Override public void follower() { + ListenableFuture<State> stopFuture = null; // First stop the transaction server as un-registering from discovery can block sometimes. // That can lead to multiple transaction servers being active at the same time. if (server != null && server.isRunning()) { server.stopAndWait(); } + if (pruningService != null && pruningService.isRunning()) { + // Wait for pruning service to stop after un-registering from discovery + stopFuture = pruningService.stop(); + } undoRegister(); + + if (stopFuture != null) { + Futures.getUnchecked(stopFuture); + } } }); leaderElection.start(); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/79b97198/tephra-core/src/main/java/org/apache/tephra/janitor/TransactionPruningPlugin.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/janitor/TransactionPruningPlugin.java b/tephra-core/src/main/java/org/apache/tephra/janitor/TransactionPruningPlugin.java deleted file mode 100644 index 7ccceec..0000000 --- a/tephra-core/src/main/java/org/apache/tephra/janitor/TransactionPruningPlugin.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tephra.janitor; - -import org.apache.hadoop.conf.Configuration; - -import java.io.IOException; - -/** - * Data janitor interface to manage the invalid transaction list. - * - * <p/> - * An invalid transaction can only be removed from the invalid list after the data written - * by the invalid transactions has been removed from all the data stores. - * The term data store is used here to represent a set of tables in a database that have - * the same data clean up policy, like all Apache Phoenix tables in an HBase instance. - * - * <p/> - * Typically every data store will have a background job which cleans up the data written by invalid transactions. - * Prune upper bound for a data store is defined as the largest invalid transaction whose data has been - * cleaned up from that data store. - * <pre> - * prune-upper-bound = min(max(invalid list), min(in-progress list) - 1) - * </pre> - * where invalid list and in-progress list are from the transaction snapshot used to clean up the invalid data in the - * data store. - * - * <p/> - * There will be one such plugin per data store. The plugins will be executed as part of the Transaction Service. - * Each plugin will be invoked periodically to fetch the prune upper bound for its data store. - * Invalid transaction list can pruned up to the minimum of prune upper bounds returned by all the plugins. 
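To make the prune-upper-bound formula quoted above concrete, a data store's cleanup job could derive the value from the transaction snapshot it used, roughly as in the following sketch (the helper is illustrative only and not part of Tephra):

  // prune-upper-bound = min(max(invalid list), min(in-progress list) - 1),
  // where both lists come from the snapshot used for the data clean up.
  static long pruneUpperBound(long[] invalidList, long[] inProgressList) {
    if (invalidList.length == 0) {
      return -1;  // nothing invalid, nothing can be pruned
    }
    long maxInvalid = invalidList[0];
    for (long id : invalidList) {
      maxInvalid = Math.max(maxInvalid, id);
    }
    long minInProgress = Long.MAX_VALUE;
    for (long id : inProgressList) {
      minInProgress = Math.min(minInProgress, id);
    }
    return Math.min(maxInvalid, minInProgress - 1);
  }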
- */ -public interface TransactionPruningPlugin { - /** - * Called once when the Transaction Service starts up. - * - * @param conf configuration for the plugin - */ - void initialize(Configuration conf) throws IOException; - - /** - * Called periodically to fetch prune upper bound for a data store. The plugin examines the state of data cleanup - * in the data store, and determines an upper bound for invalid transactions such that any invalid transaction - * smaller than or equal to this upper bound is guaranteed to have all its writes removed from the data store. - * It then returns this upper bound as the prune upper bound for this data store. - * - * @param time start time of this prune iteration in milliseconds - * @param inactiveTransactionBound the largest invalid transaction that can be possibly removed - * from the invalid list for the given time. This is an upper bound determined - * by the Transaction Service, based on its knowledge of in-progress and invalid - * transactions that may still have active processes and therefore future writes. - * The plugin will typically return a reduced upper bound based on the state of - * the invalid transaction data clean up in the data store. - * @return prune upper bound for the data store - */ - long fetchPruneUpperBound(long time, long inactiveTransactionBound) throws IOException; - - /** - * Called after successfully pruning the invalid list using the prune upper bound returned by - * {@link #fetchPruneUpperBound(long, long)}. - * The largest invalid transaction that was removed from the invalid list is passed as a parameter in this call. - * The plugin can use this information to clean up its state. - * - * @param time start time of this prune iteration in milliseconds (same value as passed to - * {@link #fetchPruneUpperBound(long, long)} in the same run) - * @param maxPrunedInvalid the largest invalid transaction that was removed from the invalid list - */ - void pruneComplete(long time, long maxPrunedInvalid) throws IOException; - - /** - * Called once during the shutdown of the Transaction Service. - */ - void destroy(); -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/79b97198/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningPlugin.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningPlugin.java b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningPlugin.java new file mode 100644 index 0000000..2261331 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningPlugin.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.tephra.txprune; + +import com.google.common.annotations.Beta; +import org.apache.hadoop.conf.Configuration; + +import java.io.IOException; + +/** + * Interface to manage the invalid transaction list. + * + * <p/> + * An invalid transaction can only be removed from the invalid list after the data written + * by the invalid transactions has been removed from all the data stores. + * The term data store is used here to represent a set of tables in a database that have + * the same data clean up policy, like all Apache Phoenix tables in an HBase instance. + * + * <p/> + * Typically every data store will have a background job which cleans up the data written by invalid transactions. + * Prune upper bound for a data store is defined as the largest invalid transaction whose data has been + * cleaned up from that data store. + * + * <p/> + * There will be one such plugin per data store. The plugins will be executed as part of the Transaction Service. + * Each plugin will be invoked periodically to fetch the prune upper bound for its data store. + * Invalid transaction list can pruned up to the minimum of prune upper bounds returned by all the plugins. + */ +@Beta +public interface TransactionPruningPlugin { + /** + * Called once when the Transaction Service starts up. + * + * @param conf configuration for the plugin + */ + void initialize(Configuration conf) throws IOException; + + /** + * Called periodically to fetch prune upper bound for a data store. The plugin examines the state of data cleanup + * in the data store, and determines an upper bound for invalid transactions such that any invalid transaction + * smaller than or equal to this upper bound is guaranteed to have all its writes removed from the data store. + * It then returns this upper bound as the prune upper bound for this data store. + * + * @param time start time of this prune iteration in milliseconds + * @param inactiveTransactionBound the largest invalid transaction that can be possibly removed + * from the invalid list for the given time. This is an upper bound determined + * by the Transaction Service, based on its knowledge of in-progress and invalid + * transactions that may still have active processes and therefore future writes. + * The plugin will typically return a reduced upper bound based on the state of + * the invalid transaction data clean up in the data store. + * @return prune upper bound for the data store + */ + long fetchPruneUpperBound(long time, long inactiveTransactionBound) throws IOException; + + /** + * Called after successfully pruning the invalid list using the prune upper bound returned by + * {@link #fetchPruneUpperBound(long, long)}. + * The largest invalid transaction that was removed from the invalid list is passed as a parameter in this call. + * The plugin can use this information to clean up its state. + * + * @param time start time of this prune iteration in milliseconds (same value as passed to + * {@link #fetchPruneUpperBound(long, long)} in the same run) + * @param maxPrunedInvalid the largest invalid transaction that was removed from the invalid list + */ + void pruneComplete(long time, long maxPrunedInvalid) throws IOException; + + /** + * Called once during the shutdown of the Transaction Service. 
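Putting the lifecycle described above together, a plugin for a hypothetical data store might be structured as in this sketch; the class name, the field, and the way the cleanup bound is tracked are invented for illustration (the concrete HBase implementation is HBaseTransactionPruningPlugin, moved under txprune in this change).

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.tephra.txprune.TransactionPruningPlugin;

  public class MyStorePruningPlugin implements TransactionPruningPlugin {
    // highest invalid transaction whose writes the store's cleanup job has already removed;
    // how this gets updated is store specific and omitted here
    private volatile long lastCleanedUpperBound = -1;

    @Override
    public void initialize(Configuration conf) throws IOException {
      // open connections, read plugin-specific configuration
    }

    @Override
    public long fetchPruneUpperBound(long time, long inactiveTransactionBound) throws IOException {
      // never report more than the bound the Transaction Service considers safe
      return Math.min(lastCleanedUpperBound, inactiveTransactionBound);
    }

    @Override
    public void pruneComplete(long time, long maxPrunedInvalid) throws IOException {
      // state for transactions up to maxPrunedInvalid is no longer needed
    }

    @Override
    public void destroy() {
      // release resources
    }
  }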
+ */ + void destroy(); +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/79b97198/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java new file mode 100644 index 0000000..8ea5a11 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tephra.txprune; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.tephra.Transaction; +import org.apache.tephra.TransactionManager; +import org.apache.tephra.util.TxUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; + +/** + * This class executes one run of transaction pruning every time it is invoked. + * Typically, this class will be scheduled to run periodically. 
+ */ +public class TransactionPruningRunnable implements Runnable { + private static final Logger LOG = LoggerFactory.getLogger(TransactionPruningRunnable.class); + + private final TransactionManager txManager; + private final Map<String, TransactionPruningPlugin> plugins; + private final long txMaxLifetimeMillis; + + public TransactionPruningRunnable(TransactionManager txManager, Map<String, TransactionPruningPlugin> plugins, + long txMaxLifetimeMillis) { + this.txManager = txManager; + this.plugins = plugins; + this.txMaxLifetimeMillis = txMaxLifetimeMillis; + } + + @Override + public void run() { + try { + // TODO: TEPHRA-159 Start a read only transaction here + Transaction tx = txManager.startShort(); + txManager.abort(tx); + + long now = getTime(); + long inactiveTransactionBound = TxUtils.getInactiveTxBound(now, txMaxLifetimeMillis); + LOG.info("Starting invalid prune run for time {} and inactive transaction bound {}", + now, inactiveTransactionBound); + + List<Long> pruneUpperBounds = new ArrayList<>(); + for (Map.Entry<String, TransactionPruningPlugin> entry : plugins.entrySet()) { + String name = entry.getKey(); + TransactionPruningPlugin plugin = entry.getValue(); + try { + LOG.debug("Fetching prune upper bound using plugin {}", name); + long pruneUpperBound = plugin.fetchPruneUpperBound(now, inactiveTransactionBound); + LOG.debug("Got prune upper bound {} from plugin {}", pruneUpperBound, name); + pruneUpperBounds.add(pruneUpperBound); + } catch (Exception e) { + LOG.error("Aborting invalid prune run for time {} due to exception from plugin {}", now, name, e); + return; + } + } + + long minPruneUpperBound = Collections.min(pruneUpperBounds); + LOG.info("Got minimum prune upper bound {} across all plugins", minPruneUpperBound); + if (minPruneUpperBound <= 0) { + LOG.info("Not pruning invalid list since minimum prune upper bound ({}) is less than 1", minPruneUpperBound); + return; + } + + long[] invalids = tx.getInvalids(); + TreeSet<Long> toTruncate = new TreeSet<>(); + LOG.debug("Invalid list: {}", invalids); + for (long invalid : invalids) { + if (invalid <= minPruneUpperBound) { + toTruncate.add(invalid); + } + } + if (toTruncate.isEmpty()) { + LOG.info("Not pruning invalid list since no invalid id is less than or equal to the minimum prune upper bound"); + return; + } + + LOG.debug("Removing the following invalid ids from the invalid list", toTruncate); + txManager.truncateInvalidTx(toTruncate); + LOG.info("Removed {} invalid ids from the invalid list", toTruncate.size()); + + // Call prune complete on all plugins + Long maxPrunedInvalid = toTruncate.last(); + for (Map.Entry<String, TransactionPruningPlugin> entry : plugins.entrySet()) { + String name = entry.getKey(); + TransactionPruningPlugin plugin = entry.getValue(); + try { + LOG.debug("Calling prune complete on plugin {}", name); + plugin.pruneComplete(now, maxPrunedInvalid); + } catch (Exception e) { + // Ignore any exceptions and continue with other plugins + LOG.error("Got error while calling prune complete on plugin {}", name, e); + } + } + + LOG.info("Invalid prune run for time {} is complete", now); + } catch (Exception e) { + LOG.error("Got exception during invalid list prune run", e); + } + } + + @VisibleForTesting + long getTime() { + return System.currentTimeMillis(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/79b97198/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java ---------------------------------------------------------------------- diff --git 
a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java new file mode 100644 index 0000000..52d7279 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tephra.txprune; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.AbstractIdleService; +import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.TransactionManager; +import org.apache.tephra.TxConstants; +import org.apache.twill.internal.utils.Instances; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Service to prune the invalid list periodically. 
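The service is wired into TransactionService earlier in this diff (constructed with the Configuration and TransactionManager, started after the Thrift server, and stopped on leadership change). An embedder outside that service could drive it the same way; a sketch, assuming a conf and txManager are already available:

  // Sketch only: mirrors the wiring added to TransactionService in this change.
  TransactionPruningService pruningService = new TransactionPruningService(conf, txManager);
  pruningService.startAndWait();   // scheduling is skipped when data.tx.prune.enable is false
  // ... on shutdown:
  pruningService.stopAndWait();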
+ */ +public class TransactionPruningService extends AbstractIdleService { + private static final Logger LOG = LoggerFactory.getLogger(TransactionPruningService.class); + + private final Configuration conf; + private final TransactionManager txManager; + private final long scheduleInterval; + private final boolean pruneEnabled; + private ScheduledExecutorService scheduledExecutorService; + + public TransactionPruningService(Configuration conf, TransactionManager txManager) { + this.conf = conf; + this.txManager = txManager; + this.pruneEnabled = conf.getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE); + this.scheduleInterval = conf.getLong(TxConstants.TransactionPruning.PRUNE_INTERVAL, + TxConstants.TransactionPruning.DEFAULT_PRUNE_INTERVAL); + } + + @Override + protected void startUp() throws Exception { + if (!pruneEnabled) { + LOG.info("Transaction pruning is not enabled"); + return; + } + + LOG.info("Starting {}...", this.getClass().getSimpleName()); + scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + + Map<String, TransactionPruningPlugin> plugins = initializePlugins(); + long txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, + TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME)); + scheduledExecutorService.scheduleAtFixedRate( + getTxPruneRunnable(txManager, plugins, txMaxLifetimeMillis), + scheduleInterval, scheduleInterval, TimeUnit.SECONDS); + LOG.info("Scheduled {} plugins with interval {} seconds", plugins.size(), scheduleInterval); + } + + @Override + protected void shutDown() throws Exception { + if (!pruneEnabled) { + return; + } + + LOG.info("Stopping {}...", this.getClass().getSimpleName()); + try { + scheduledExecutorService.shutdown(); + scheduledExecutorService.awaitTermination(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + scheduledExecutorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + LOG.info("Stopped {}", this.getClass().getSimpleName()); + } + + @VisibleForTesting + TransactionPruningRunnable getTxPruneRunnable(TransactionManager txManager, + Map<String, TransactionPruningPlugin> plugins, + long txMaxLifetimeMillis) { + return new TransactionPruningRunnable(txManager, plugins, txMaxLifetimeMillis); + } + + private Map<String, TransactionPruningPlugin> initializePlugins() + throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, + InstantiationException, IOException { + Map<String, TransactionPruningPlugin> initializedPlugins = new HashMap<>(); + + // Read set of plugin names from configuration + Set<String> plugins = + new HashSet<>(Arrays.asList(conf.getTrimmedStrings(TxConstants.TransactionPruning.PLUGINS, + TxConstants.TransactionPruning.DEFAULT_PLUGIN))); + + LOG.info("Initializing invalid list prune plugins {}", plugins); + for (String plugin : plugins) { + // Load the class for the plugin + // TODO: TEPHRA-205 classloader isolation for plugins + Class<? extends TransactionPruningPlugin> clazz = null; + if (TxConstants.TransactionPruning.DEFAULT_PLUGIN.equals(plugin)) { + Class<?> defaultClass = Class.forName(TxConstants.TransactionPruning.DEFAULT_PLUGIN_CLASS); + if (TransactionPruningPlugin.class.isAssignableFrom(defaultClass)) { + //noinspection unchecked + clazz = (Class<? 
extends TransactionPruningPlugin>) defaultClass; + } + } else { + clazz = conf.getClass(plugin + TxConstants.TransactionPruning.PLUGIN_CLASS_SUFFIX, + null, TransactionPruningPlugin.class); + } + if (clazz == null) { + throw new IllegalStateException("No class specified in configuration for invalid pruning plugin " + plugin); + } + LOG.debug("Got class {} for plugin {}", clazz.getName(), plugin); + + TransactionPruningPlugin instance = Instances.newInstance(clazz); + instance.initialize(conf); + LOG.debug("Plugin {} initialized", plugin); + initializedPlugins.put(plugin, instance); + } + + return initializedPlugins; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/79b97198/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java b/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java index 3a1a071..aaca23d 100644 --- a/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java +++ b/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java @@ -168,6 +168,18 @@ public class TxUtils { } /** + * Returns the greatest transaction that has passed the maximum duration a transaction can be used for data writes. + * In other words, at <code>timeMills</code> there can be no writes from transactions equal to or smaller + * than the returned bound. + * + * @param timeMills time in milliseconds for which the inactive transaction bound needs to be determined + * @param txMaxLifetimeMillis maximum duration a transaction can be used for data writes, in milliseconds + */ + public static long getInactiveTxBound(long timeMills, long txMaxLifetimeMillis) { + return (timeMills - txMaxLifetimeMillis) * TxConstants.MAX_TX_PER_MS - 1; + } + + /** * Returns the timestamp at which the given transaction id was generated. * * @param txId transaction id http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/79b97198/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java b/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java new file mode 100644 index 0000000..9c23ab7 --- /dev/null +++ b/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
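A worked example of the TxUtils.getInactiveTxBound() method added above, using the same values as the test that follows: with the current time at 120,000 ms and a transaction max lifetime of 10 seconds, the bound is (120000 - 10000) * TxConstants.MAX_TX_PER_MS - 1, i.e. 110000 * MAX_TX_PER_MS - 1, which is the 110L * n - 1 the assertions below expect.

  // Worked example (values taken from the test below).
  long txMaxLifetimeMillis = 10 * 1000L;                                // CFG_TX_MAX_LIFETIME = 10 seconds
  long bound = TxUtils.getInactiveTxBound(120000L, txMaxLifetimeMillis);
  // bound == (120000L - 10000L) * TxConstants.MAX_TX_PER_MS - 1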
+ */ + +package org.apache.tephra.txprune; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.Transaction; +import org.apache.tephra.TransactionManager; +import org.apache.tephra.TxConstants; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * Test {@link TransactionPruningService}. + */ +public class TransactionPruningServiceTest { + + @Before + public void resetData() { + MockTxManager.getPrunedInvalidsList().clear(); + + MockPlugin1.getInactiveTransactionBoundList().clear(); + MockPlugin1.getMaxPrunedInvalidList().clear(); + + MockPlugin2.getInactiveTransactionBoundList().clear(); + MockPlugin2.getMaxPrunedInvalidList().clear(); + } + + @Test + public void testTransactionPruningService() throws Exception { + // Setup plugins + Configuration conf = new Configuration(); + conf.set(TxConstants.TransactionPruning.PLUGINS, + "data.tx.txprune.plugin.mockPlugin1, data.tx.txprune.plugin.mockPlugin2"); + conf.set("data.tx.txprune.plugin.mockPlugin1.class", + "org.apache.tephra.txprune.TransactionPruningServiceTest$MockPlugin1"); + conf.set("data.tx.txprune.plugin.mockPlugin2.class", + "org.apache.tephra.txprune.TransactionPruningServiceTest$MockPlugin2"); + // Setup schedule to run every second + conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true); + conf.setInt(TxConstants.TransactionPruning.PRUNE_INTERVAL, 1); + conf.setInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, 10); + + // Setup mock data + long m = 1000; + long n = m * TxConstants.MAX_TX_PER_MS; + // Current time to be returned + Iterator<Long> currentTime = Iterators.cycle(120L * m, 220L * m); + // Transaction objects to be returned by mock tx manager + Iterator<Transaction> txns = + Iterators.cycle(new Transaction(100 * n, 110 * n, new long[]{40 * n, 50 * n, 60 * n, 70 * n}, + new long[]{80 * n, 90 * n}, 80 * n), + new Transaction(200 * n, 210 * n, new long[]{60 * n, 75 * n, 78 * n, 100 * n, 110 * n, 120 * n}, + new long[]{80 * n, 90 * n}, 80 * n)); + // Prune upper bounds to be returned by the mock plugins + Iterator<Long> pruneUpperBoundsPlugin1 = Iterators.cycle(60L * n, 80L * n); + Iterator<Long> pruneUpperBoundsPlugin2 = Iterators.cycle(70L * n, 77L * n); + + TestTransactionPruningRunnable.setCurrentTime(currentTime); + MockTxManager.setTxIter(txns); + MockPlugin1.setPruneUpperBoundIter(pruneUpperBoundsPlugin1); + MockPlugin2.setPruneUpperBoundIter(pruneUpperBoundsPlugin2); + + MockTxManager mockTxManager = new MockTxManager(conf); + TransactionPruningService pruningService = new TestTransactionPruningService(conf, mockTxManager); + pruningService.startAndWait(); + // This will cause the pruning run to happen three times, + // but we are interested in only first two runs for the assertions later + TimeUnit.SECONDS.sleep(3); + pruningService.stopAndWait(); + + // Assert inactive transaction bound that the plugins receive. 
+ // Both the plugins should get the same inactive transaction bound since it is + // computed and passed by the transaction service + Assert.assertEquals(ImmutableList.of(110L * n - 1, 210L * n - 1), + limitTwo(MockPlugin1.getInactiveTransactionBoundList())); + Assert.assertEquals(ImmutableList.of(110L * n - 1, 210L * n - 1), + limitTwo(MockPlugin2.getInactiveTransactionBoundList())); + + // Assert invalid list entries that got pruned + // The min prune upper bound for the first run should be 60, and for the second run 77 + Assert.assertEquals(ImmutableList.of(ImmutableSet.of(40L * n, 50L * n, 60L * n), ImmutableSet.of(60L * n, 75L * n)), + limitTwo(MockTxManager.getPrunedInvalidsList())); + + // Assert max invalid tx pruned that the plugins receive for the prune complete call + // Both the plugins should get the same max invalid tx pruned value since it is + // computed and passed by the transaction service + Assert.assertEquals(ImmutableList.of(60L * n, 75L * n), limitTwo(MockPlugin1.getMaxPrunedInvalidList())); + Assert.assertEquals(ImmutableList.of(60L * n, 75L * n), limitTwo(MockPlugin2.getMaxPrunedInvalidList())); + } + + @Test + public void testNoPruning() throws Exception { + // Setup plugins + Configuration conf = new Configuration(); + conf.set(TxConstants.TransactionPruning.PLUGINS, + "data.tx.txprune.plugin.mockPlugin1, data.tx.txprune.plugin.mockPlugin2"); + conf.set("data.tx.txprune.plugin.mockPlugin1.class", + "org.apache.tephra.txprune.TransactionPruningServiceTest$MockPlugin1"); + conf.set("data.tx.txprune.plugin.mockPlugin2.class", + "org.apache.tephra.txprune.TransactionPruningServiceTest$MockPlugin2"); + // Setup schedule to run every second + conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true); + conf.setInt(TxConstants.TransactionPruning.PRUNE_INTERVAL, 1); + conf.setInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, 10); + + // Setup mock data + long m = 1000; + long n = m * TxConstants.MAX_TX_PER_MS; + // Current time to be returned + Iterator<Long> currentTime = Iterators.cycle(120L * m, 220L * m); + // Transaction objects to be returned by mock tx manager + Iterator<Transaction> txns = + Iterators.cycle(new Transaction(100 * n, 110 * n, new long[]{40 * n, 50 * n, 60 * n, 70 * n}, + new long[]{80 * n, 90 * n}, 80 * n), + new Transaction(200 * n, 210 * n, new long[]{60 * n, 75 * n, 78 * n, 100 * n, 110 * n, 120 * n}, + new long[]{80 * n, 90 * n}, 80 * n)); + // Prune upper bounds to be returned by the mock plugins + Iterator<Long> pruneUpperBoundsPlugin1 = Iterators.cycle(35L * n, -1L); + Iterator<Long> pruneUpperBoundsPlugin2 = Iterators.cycle(70L * n, 100L * n); + + TestTransactionPruningRunnable.setCurrentTime(currentTime); + MockTxManager.setTxIter(txns); + MockPlugin1.setPruneUpperBoundIter(pruneUpperBoundsPlugin1); + MockPlugin2.setPruneUpperBoundIter(pruneUpperBoundsPlugin2); + + MockTxManager mockTxManager = new MockTxManager(conf); + TransactionPruningService pruningService = new TestTransactionPruningService(conf, mockTxManager); + pruningService.startAndWait(); + // This will cause the pruning run to happen three times, + // but we are interested in only first two runs for the assertions later + TimeUnit.SECONDS.sleep(3); + pruningService.stopAndWait(); + + // Assert inactive transaction bound + Assert.assertEquals(ImmutableList.of(110L * n - 1, 210L * n - 1), + limitTwo(MockPlugin1.getInactiveTransactionBoundList())); + Assert.assertEquals(ImmutableList.of(110L * n - 1, 210L * n - 1), + limitTwo(MockPlugin2.getInactiveTransactionBoundList())); 
+ + // Invalid entries should not be pruned in any run + Assert.assertEquals(ImmutableList.of(), MockTxManager.getPrunedInvalidsList()); + + // No max invalid tx pruned for any run + Assert.assertEquals(ImmutableList.of(), limitTwo(MockPlugin1.getMaxPrunedInvalidList())); + Assert.assertEquals(ImmutableList.of(), limitTwo(MockPlugin2.getMaxPrunedInvalidList())); + } + + /** + * Mock transaction manager for testing + */ + private static class MockTxManager extends TransactionManager { + private static Iterator<Transaction> txIter; + private static List<Set<Long>> prunedInvalidsList = new ArrayList<>(); + + MockTxManager(Configuration config) { + super(config); + } + + @Override + public Transaction startShort() { + return txIter.next(); + } + + @Override + public void abort(Transaction tx) { + // do nothing + } + + @Override + public boolean truncateInvalidTx(Set<Long> invalidTxIds) { + prunedInvalidsList.add(invalidTxIds); + return true; + } + + static void setTxIter(Iterator<Transaction> txIter) { + MockTxManager.txIter = txIter; + } + + static List<Set<Long>> getPrunedInvalidsList() { + return prunedInvalidsList; + } + } + + /** + * Extends {@link TransactionPruningService} to use mock time to help in testing. + */ + private static class TestTransactionPruningService extends TransactionPruningService { + TestTransactionPruningService(Configuration conf, TransactionManager txManager) { + super(conf, txManager); + } + + @Override + TransactionPruningRunnable getTxPruneRunnable(TransactionManager txManager, + Map<String, TransactionPruningPlugin> plugins, + long txMaxLifetimeMillis) { + return new TestTransactionPruningRunnable(txManager, plugins, txMaxLifetimeMillis); + } + } + + /** + * Extends {@link TransactionPruningRunnable} to use mock time to help in testing. + */ + private static class TestTransactionPruningRunnable extends TransactionPruningRunnable { + private static Iterator<Long> currentTime; + TestTransactionPruningRunnable(TransactionManager txManager, Map<String, TransactionPruningPlugin> plugins, + long txMaxLifetimeMillis) { + super(txManager, plugins, txMaxLifetimeMillis); + } + + @Override + long getTime() { + return currentTime.next(); + } + + static void setCurrentTime(Iterator<Long> currentTime) { + TestTransactionPruningRunnable.currentTime = currentTime; + } + } + + /** + * Mock transaction pruning plugin for testing. 
+ */ + private static class MockPlugin1 implements TransactionPruningPlugin { + private static Iterator<Long> pruneUpperBoundIter; + private static List<Long> inactiveTransactionBoundList = new ArrayList<>(); + private static List<Long> maxPrunedInvalidList = new ArrayList<>(); + + @Override + public void initialize(Configuration conf) throws IOException { + // Nothing to do + } + + @Override + public long fetchPruneUpperBound(long time, long inactiveTransactionBound) throws IOException { + inactiveTransactionBoundList.add(inactiveTransactionBound); + return pruneUpperBoundIter.next(); + } + + @Override + public void pruneComplete(long time, long maxPrunedInvalid) throws IOException { + maxPrunedInvalidList.add(maxPrunedInvalid); + } + + @Override + public void destroy() { + // Nothing to do + } + + static void setPruneUpperBoundIter(Iterator<Long> pruneUpperBoundIter) { + MockPlugin1.pruneUpperBoundIter = pruneUpperBoundIter; + } + + static List<Long> getInactiveTransactionBoundList() { + return inactiveTransactionBoundList; + } + + static List<Long> getMaxPrunedInvalidList() { + return maxPrunedInvalidList; + } + } + + /** + * Mock transaction pruning plugin for testing. + */ + private static class MockPlugin2 implements TransactionPruningPlugin { + private static Iterator<Long> pruneUpperBoundIter; + private static List<Long> inactiveTransactionBoundList = new ArrayList<>(); + private static List<Long> maxPrunedInvalidList = new ArrayList<>(); + + @Override + public void initialize(Configuration conf) throws IOException { + // Nothing to do + } + + @Override + public long fetchPruneUpperBound(long time, long inactiveTransactionBound) throws IOException { + inactiveTransactionBoundList.add(inactiveTransactionBound); + return pruneUpperBoundIter.next(); + } + + @Override + public void pruneComplete(long time, long maxPrunedInvalid) throws IOException { + maxPrunedInvalidList.add(maxPrunedInvalid); + } + + @Override + public void destroy() { + // Nothing to do + } + + static void setPruneUpperBoundIter(Iterator<Long> pruneUpperBoundIter) { + MockPlugin2.pruneUpperBoundIter = pruneUpperBoundIter; + } + + static List<Long> getInactiveTransactionBoundList() { + return inactiveTransactionBoundList; + } + + static List<Long> getMaxPrunedInvalidList() { + return maxPrunedInvalidList; + } + } + + private static <T> List<T> limitTwo(Iterable<T> iterable) { + return ImmutableList.copyOf(Iterables.limit(iterable, 2)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/79b97198/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 132c157..e495692 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -57,7 +57,7 @@ import org.apache.tephra.TransactionCodec; import org.apache.tephra.TxConstants; import org.apache.tephra.coprocessor.TransactionStateCache; import org.apache.tephra.coprocessor.TransactionStateCacheSupplier; -import org.apache.tephra.hbase.coprocessor.janitor.CompactionState; +import org.apache.tephra.hbase.txprune.CompactionState; import 
org.apache.tephra.persist.TransactionVisibilityState; import org.apache.tephra.util.TxUtils; @@ -151,12 +151,12 @@ public class TransactionProcessor extends BaseRegionObserver { TimeUnit.SECONDS.toMillis(env.getConfiguration().getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME)); - boolean pruneEnabled = env.getConfiguration().getBoolean(TxConstants.DataJanitor.PRUNE_ENABLE, - TxConstants.DataJanitor.DEFAULT_PRUNE_ENABLE); + boolean pruneEnabled = env.getConfiguration().getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE); if (pruneEnabled) { - String pruneTable = env.getConfiguration().get(TxConstants.DataJanitor.PRUNE_STATE_TABLE, - TxConstants.DataJanitor.DEFAULT_PRUNE_STATE_TABLE); - compactionState = new CompactionState(env, TableName.valueOf(pruneTable)); + String pruneTable = env.getConfiguration().get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE); + compactionState = new CompactionState(env, TableName.valueOf(pruneTable), txMaxLifetimeMillis); LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " + pruneTable); } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/79b97198/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/CompactionState.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/CompactionState.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/CompactionState.java deleted file mode 100644 index 7412a18..0000000 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/CompactionState.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.tephra.hbase.coprocessor.janitor; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; -import org.apache.tephra.Transaction; -import org.apache.tephra.persist.TransactionVisibilityState; -import org.apache.tephra.util.TxUtils; - -import java.io.IOException; -import javax.annotation.Nullable; - -/** - * Record compaction state for invalid list pruning - */ -public class CompactionState { - private static final Log LOG = LogFactory.getLog(CompactionState.class); - - private final byte[] regionName; - private final String regionNameAsString; - private final TableName stateTable; - private final DataJanitorState dataJanitorState; - private volatile long pruneUpperBound = -1; - - public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable) { - this.regionName = env.getRegionInfo().getRegionName(); - this.regionNameAsString = env.getRegionInfo().getRegionNameAsString(); - this.stateTable = stateTable; - this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() { - @Override - public Table get() throws IOException { - return env.getTable(stateTable); - } - }); - } - - /** - * Records the transaction state used for a compaction. This method is called when the compaction starts. - * - * @param request {@link CompactionRequest} for the compaction - * @param snapshot transaction state that will be used for the compaction - */ - public void record(CompactionRequest request, @Nullable TransactionVisibilityState snapshot) { - if (request.isMajor() && snapshot != null) { - Transaction tx = TxUtils.createDummyTransaction(snapshot); - pruneUpperBound = TxUtils.getPruneUpperBound(tx); - LOG.debug( - String.format("Computed prune upper bound %s for compaction request %s using transaction state from time %s", - pruneUpperBound, request, snapshot.getTimestamp())); - } else { - pruneUpperBound = -1; - } - } - - /** - * Persists the transaction state recorded by {@link #record(CompactionRequest, TransactionVisibilityState)}. - * This method is called after the compaction has successfully completed. 
- */ - public void persist() { - if (pruneUpperBound != -1) { - try { - dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound); - LOG.debug(String.format("Saved prune upper bound %s for region %s", pruneUpperBound, regionNameAsString)); - } catch (IOException e) { - LOG.warn(String.format("Cannot record prune upper bound in table %s after compacting region %s", - stateTable, regionNameAsString), e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/79b97198/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java deleted file mode 100644 index 7daac94..0000000 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java +++ /dev/null @@ -1,362 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tephra.hbase.coprocessor.janitor; - -import com.google.common.collect.Maps; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.tephra.hbase.coprocessor.TransactionProcessor; - -import java.io.IOException; -import java.util.Map; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeMap; -import java.util.TreeSet; -import javax.annotation.Nullable; - -/** - * Persist data janitor state into an HBase table. - * This is used by both {@link TransactionProcessor} and by the {@link HBaseTransactionPruningPlugin} - * to persist and read the compaction state. 
- */ -@SuppressWarnings("WeakerAccess") -public class DataJanitorState { - public static final byte[] FAMILY = {'f'}; - - private static final byte[] PRUNE_UPPER_BOUND_COL = {'p'}; - private static final byte[] REGION_TIME_COL = {'r'}; - private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_COL = {'i'}; - - private static final byte[] REGION_KEY_PREFIX = {0x1}; - private static final byte[] REGION_KEY_PREFIX_STOP = {0x2}; - - private static final byte[] REGION_TIME_KEY_PREFIX = {0x2}; - private static final byte[] REGION_TIME_KEY_PREFIX_STOP = {0x3}; - - private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX = {0x3}; - private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP = {0x4}; - - private static final byte[] EMPTY_BYTE_ARRAY = new byte[0]; - - private final TableSupplier stateTableSupplier; - - - public DataJanitorState(TableSupplier stateTableSupplier) { - this.stateTableSupplier = stateTableSupplier; - } - - // ---------------------------------------------------------------- - // ------- Methods for prune upper bound for a given region ------- - // ---------------------------------------------------------------- - // The data is stored in the following format - - // Key: 0x1<region-id> - // Col 'u': <prune upper bound> - // ---------------------------------------------------------------- - - /** - * Persist the latest prune upper bound for a given region. This is called by {@link TransactionProcessor} - * after major compaction. - * - * @param regionId region id - * @param pruneUpperBound the latest prune upper bound for the region - * @throws IOException when not able to persist the data to HBase - */ - public void savePruneUpperBoundForRegion(byte[] regionId, long pruneUpperBound) throws IOException { - try (Table stateTable = stateTableSupplier.get()) { - Put put = new Put(makeRegionKey(regionId)); - put.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL, Bytes.toBytes(pruneUpperBound)); - stateTable.put(put); - } - } - - /** - * Get latest prune upper bound for a given region. This indicates the largest invalid transaction that no - * longer has writes in this region. - * - * @param regionId region id - * @return latest prune upper bound for the region - * @throws IOException when not able to read the data from HBase - */ - public long getPruneUpperBoundForRegion(byte[] regionId) throws IOException { - try (Table stateTable = stateTableSupplier.get()) { - Get get = new Get(makeRegionKey(regionId)); - get.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL); - byte[] result = stateTable.get(get).getValue(FAMILY, PRUNE_UPPER_BOUND_COL); - return result == null ? -1 : Bytes.toLong(result); - } - } - - /** - * Get latest prune upper bounds for given regions. 
This is a batch operation of method - * {@link #getPruneUpperBoundForRegion(byte[])} - * - * @param regions a set of regions - * @return a map containing region id and its latest prune upper bound value - * @throws IOException when not able to read the data from HBase - */ - public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException { - Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); - try (Table stateTable = stateTableSupplier.get()) { - byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY); - Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP); - scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL); - - try (ResultScanner scanner = stateTable.getScanner(scan)) { - Result next; - while ((next = scanner.next()) != null) { - byte[] region = getRegionFromKey(next.getRow()); - if (regions.contains(region)) { - byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL); - if (timeBytes != null) { - long pruneUpperBoundRegion = Bytes.toLong(timeBytes); - resultMap.put(region, pruneUpperBoundRegion); - } - } - } - } - return resultMap; - } - } - - /** - * Delete prune upper bounds for the regions that are not in the given exclude set, and the - * prune upper bound is less than the given value. - * After the invalid list is pruned up to deletionPruneUpperBound, we do not need entries for regions that have - * prune upper bound less than deletionPruneUpperBound. We however limit the deletion to only regions that are - * no longer in existence (due to deletion, etc.), to avoid update/delete race conditions. - * - * @param deletionPruneUpperBound prune upper bound below which regions will be deleted - * @param excludeRegions set of regions that should not be deleted - * @throws IOException when not able to delete data in HBase - */ - public void deletePruneUpperBounds(long deletionPruneUpperBound, SortedSet<byte[]> excludeRegions) - throws IOException { - try (Table stateTable = stateTableSupplier.get()) { - byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY); - Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP); - scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL); - - try (ResultScanner scanner = stateTable.getScanner(scan)) { - Result next; - while ((next = scanner.next()) != null) { - byte[] region = getRegionFromKey(next.getRow()); - if (!excludeRegions.contains(region)) { - byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL); - if (timeBytes != null) { - long pruneUpperBoundRegion = Bytes.toLong(timeBytes); - if (pruneUpperBoundRegion < deletionPruneUpperBound) { - stateTable.delete(new Delete(next.getRow())); - } - } - } - } - } - } - } - - // --------------------------------------------------- - // ------- Methods for regions at a given time ------- - // --------------------------------------------------- - // Key: 0x2<time><region-id> - // Col 't': <empty byte array> - // --------------------------------------------------- - - /** - * Persist the regions for the given time. {@link HBaseTransactionPruningPlugin} saves the set of - * transactional regions existing in the HBase instance periodically. 
- * - * @param time timestamp in milliseconds - * @param regions set of regions at the time - * @throws IOException when not able to persist the data to HBase - */ - public void saveRegionsForTime(long time, Set<byte[]> regions) throws IOException { - byte[] timeBytes = Bytes.toBytes(getInvertedTime(time)); - try (Table stateTable = stateTableSupplier.get()) { - for (byte[] region : regions) { - Put put = new Put(makeTimeRegionKey(timeBytes, region)); - put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY); - stateTable.put(put); - } - } - } - - /** - * Return the set of regions saved for the time at or before the given time. This method finds the greatest time - * that is less than or equal to the given time, and then returns all regions with that exact time, but none that are - * older than that. - * - * @param time timestamp in milliseconds - * @return set of regions and time at which they were recorded, or null if no regions found - * @throws IOException when not able to read the data from HBase - */ - @Nullable - public TimeRegions getRegionsOnOrBeforeTime(long time) throws IOException { - byte[] timeBytes = Bytes.toBytes(getInvertedTime(time)); - try (Table stateTable = stateTableSupplier.get()) { - Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP); - scan.addColumn(FAMILY, REGION_TIME_COL); - - SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR); - long currentRegionTime = -1; - try (ResultScanner scanner = stateTable.getScanner(scan)) { - Result next; - while ((next = scanner.next()) != null) { - Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow()); - // Stop if reached next time value - if (currentRegionTime == -1) { - currentRegionTime = timeRegion.getKey(); - } else if (timeRegion.getKey() < currentRegionTime) { - break; - } else if (timeRegion.getKey() > currentRegionTime) { - throw new IllegalStateException( - String.format("Got out of order time %d when expecting time less than or equal to %d", - timeRegion.getKey(), currentRegionTime)); - } - regions.add(timeRegion.getValue()); - } - } - return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, regions); - } - } - - /** - * Delete all the regions that were recorded for all times equal or less than the given time. - * - * @param time timestamp in milliseconds - * @throws IOException when not able to delete data in HBase - */ - public void deleteAllRegionsOnOrBeforeTime(long time) throws IOException { - byte[] timeBytes = Bytes.toBytes(getInvertedTime(time)); - try (Table stateTable = stateTableSupplier.get()) { - Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP); - scan.addColumn(FAMILY, REGION_TIME_COL); - - try (ResultScanner scanner = stateTable.getScanner(scan)) { - Result next; - while ((next = scanner.next()) != null) { - stateTable.delete(new Delete(next.getRow())); - } - } - } - } - - // --------------------------------------------------------------------- - // ------- Methods for inactive transaction bound for given time ------- - // --------------------------------------------------------------------- - // Key: 0x3<inverted time> - // Col 'p': <inactive transaction bound> - // --------------------------------------------------------------------- - - /** - * Persist inactive transaction bound for a given time. This is the smallest not in-progress transaction that - * will not have writes in any HBase regions that are created after the given time. 
- * - * @param time time in milliseconds - * @param inactiveTransactionBound inactive transaction bound for the given time - * @throws IOException when not able to persist the data to HBase - */ - public void saveInactiveTransactionBoundForTime(long time, long inactiveTransactionBound) throws IOException { - try (Table stateTable = stateTableSupplier.get()) { - Put put = new Put(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time)))); - put.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL, Bytes.toBytes(inactiveTransactionBound)); - stateTable.put(put); - } - } - - /** - * Return inactive transaction bound for the given time. - * - * @param time time in milliseconds - * @return inactive transaction bound for the given time - * @throws IOException when not able to read the data from HBase - */ - public long getInactiveTransactionBoundForTime(long time) throws IOException { - try (Table stateTable = stateTableSupplier.get()) { - Get get = new Get(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time)))); - get.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL); - byte[] result = stateTable.get(get).getValue(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL); - return result == null ? -1 : Bytes.toLong(result); - } - } - - /** - * Delete all inactive transaction bounds recorded for a time less than the given time - * - * @param time time in milliseconds - * @throws IOException when not able to delete data in HBase - */ - public void deleteInactiveTransactionBoundsOnOrBeforeTime(long time) throws IOException { - try (Table stateTable = stateTableSupplier.get()) { - Scan scan = new Scan(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))), - INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP); - scan.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL); - - try (ResultScanner scanner = stateTable.getScanner(scan)) { - Result next; - while ((next = scanner.next()) != null) { - stateTable.delete(new Delete(next.getRow())); - } - } - } - } - - private byte[] makeRegionKey(byte[] regionId) { - return Bytes.add(REGION_KEY_PREFIX, regionId); - } - - private byte[] getRegionFromKey(byte[] regionKey) { - int prefixLen = REGION_KEY_PREFIX.length; - return Bytes.copy(regionKey, prefixLen, regionKey.length - prefixLen); - } - - private byte[] makeTimeRegionKey(byte[] time, byte[] regionId) { - return Bytes.add(REGION_TIME_KEY_PREFIX, time, regionId); - } - - private byte[] makeInactiveTransactionBoundTimeKey(byte[] time) { - return Bytes.add(INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX, time); - } - - private Map.Entry<Long, byte[]> getTimeRegion(byte[] key) { - int offset = REGION_TIME_KEY_PREFIX.length; - long time = getInvertedTime(Bytes.toLong(key, offset)); - offset += Bytes.SIZEOF_LONG; - byte[] regionName = Bytes.copy(key, offset, key.length - offset); - return Maps.immutableEntry(time, regionName); - } - - private long getInvertedTime(long time) { - return Long.MAX_VALUE - time; - } - - /** - * Supplies table for persisting state - */ - public interface TableSupplier { - Table get() throws IOException; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/79b97198/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java 
b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java deleted file mode 100644 index f662e37..0000000 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java +++ /dev/null @@ -1,299 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tephra.hbase.coprocessor.janitor; - -import com.google.common.base.Function; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.tephra.TxConstants; -import org.apache.tephra.hbase.coprocessor.TransactionProcessor; -import org.apache.tephra.janitor.TransactionPruningPlugin; -import org.apache.tephra.util.TxUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.SortedSet; -import java.util.TreeSet; - -/** - * Default implementation of the {@link TransactionPruningPlugin} for HBase. - * - * This plugin determines the prune upper bound for transactional HBase tables that use - * coprocessor {@link TransactionProcessor}. - * - * <h3>State storage:</h3> - * - * This plugin expects the TransactionProcessor to save the prune upper bound for invalid transactions - * after every major compaction of a region. Let's call this <i>(region, prune upper bound)</i>. - * In addition, the plugin also persists the following information on a run at time <i>t</i> - * <ul> - * <li> - * <i>(t, set of regions)</i>: Set of transactional regions at time <i>t</i>. - * Transactional regions are regions of the tables that have the coprocessor TransactionProcessor - * attached to them. - * </li> - * <li> - * <i>(t, inactive transaction bound)</i>: This is the smallest not in-progress transaction that - * will not have writes in any HBase regions that are created after time <i>t</i>. - * This value is determined by the Transaction Service based on the transaction state at time <i>t</i> - * and passed on to the plugin. - * </li> - * </ul> - * - * <h3>Computing prune upper bound:</h3> - * - * In a typical HBase instance, there can be a constant change in the number of regions due to region creations, - * splits and merges. 
At any given time there can always be a region on which a major compaction has not been run. - * Since the prune upper bound will get recorded for a region only after a major compaction, - * using only the latest set of regions we may not be able to find the - * prune upper bounds for all the current regions. Hence we persist the set of regions that exist at that time - * of each run of the plugin, and use historical region set for time <i>t</i>, <i>t - 1</i>, etc. - * to determine the prune upper bound. - * - * From the regions saved at time <i>t</i>, <i>t - 1</i>, etc., - * the plugin tries to find the latest <i>(t, set of regions)</i> where all regions have been major compacted, - * i.e, all regions have prune upper bound recorded in <i>(region, prune upper bound)</i>. - * <br/> - * If such a set is found for time <i>t1</i>, the prune upper bound returned by the plugin is the minimum of - * <ul> - * <li>Prune upper bounds of regions in set <i>(t1, set of regions)</i></li> - * <li>Inactive transaction bound from <i>(t1, inactive transaction bound)</i></li> - * </ul> - * - * <p/> - * Above, when we find <i>(t1, set of regions)</i>, there may a region that was created after time <i>t1</i>, - * but has a data write from an invalid transaction that is smaller than the prune upper bounds of all - * regions in <i>(t1, set of regions)</i>. This is possible because <i>(region, prune upper bound)</i> persisted by - * TransactionProcessor is always the latest prune upper bound for a region. - * <br/> - * However a region created after time <i>t1</i> cannot have writes from an invalid transaction that is smaller than - * inactive transaction bound at the time the region was created. - * Since we limit the plugin prune upper bound using <i>(t1, inactive transaction bound)</i>, - * there should be no invalid transactions smaller than the plugin prune upper bound with writes in any - * transactional region of this HBase instance. - * - * <p/> - * Note: If your tables uses a transactional coprocessor other than TransactionProcessor, - * then you may need to write a new plugin to compute prune upper bound for those tables. - */ -@SuppressWarnings("WeakerAccess") -public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin { - public static final Logger LOG = LoggerFactory.getLogger(HBaseTransactionPruningPlugin.class); - - protected Configuration conf; - protected Connection connection; - protected DataJanitorState dataJanitorState; - - @Override - public void initialize(Configuration conf) throws IOException { - this.conf = conf; - this.connection = ConnectionFactory.createConnection(conf); - - final TableName stateTable = TableName.valueOf(conf.get(TxConstants.DataJanitor.PRUNE_STATE_TABLE, - TxConstants.DataJanitor.DEFAULT_PRUNE_STATE_TABLE)); - LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString()); - this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() { - @Override - public Table get() throws IOException { - return connection.getTable(stateTable); - } - }); - } - - /** - * Determines prune upper bound for the data store as mentioned above. 
- */ - @Override - public long fetchPruneUpperBound(long time, long inactiveTransactionBound) throws IOException { - LOG.debug("Fetching prune upper bound for time {} and inactive transaction bound {}", - time, inactiveTransactionBound); - if (time < 0 || inactiveTransactionBound < 0) { - return -1; - } - - // Get all the current transactional regions - SortedSet<byte[]> transactionalRegions = getTransactionalRegions(); - if (!transactionalRegions.isEmpty()) { - LOG.debug("Saving {} transactional regions for time {}", transactionalRegions.size(), time); - dataJanitorState.saveRegionsForTime(time, transactionalRegions); - // Save inactive transaction bound for time as the final step. - // We can then use its existence to make sure that the data for a given time is complete or not - LOG.debug("Saving inactive transaction bound {} for time {}", inactiveTransactionBound, time); - dataJanitorState.saveInactiveTransactionBoundForTime(time, inactiveTransactionBound); - } - - return computePruneUpperBound(new TimeRegions(time, transactionalRegions)); - } - - /** - * After invalid list has been pruned, this cleans up state information that is no longer required. - * This includes - - * <ul> - * <li> - * <i>(region, prune upper bound)</i> - prune upper bound for regions that are older - * than maxPrunedInvalid - * </li> - * <li> - * <i>(t, set of regions) - Regions set that were recorded on or before the start time - * of maxPrunedInvalid - * </li> - * <li> - * (t, inactive transaction bound) - Smallest not in-progress transaction without any writes in new regions - * information recorded on or before the start time of maxPrunedInvalid - * </li> - * </ul> - */ - @Override - public void pruneComplete(long time, long maxPrunedInvalid) throws IOException { - LOG.debug("Prune complete for time {} and prune upper bound {}", time, maxPrunedInvalid); - if (time < 0 || maxPrunedInvalid < 0) { - return; - } - - // Get regions for the current time, so as to not delete the prune upper bounds for them. - // The prune upper bounds for regions are recorded by TransactionProcessor and the deletion - // is done by this class. To avoid update/delete race condition, we only delete prune upper - // bounds for the stale regions. 
- TimeRegions regionsToExclude = dataJanitorState.getRegionsOnOrBeforeTime(time); - if (regionsToExclude != null) { - LOG.debug("Deleting prune upper bounds smaller than {} for stale regions", maxPrunedInvalid); - dataJanitorState.deletePruneUpperBounds(maxPrunedInvalid, regionsToExclude.getRegions()); - } else { - LOG.warn("Cannot find saved regions on or before time {}", time); - } - long pruneTime = TxUtils.getTimestamp(maxPrunedInvalid); - LOG.debug("Deleting regions recorded before time {}", pruneTime); - dataJanitorState.deleteAllRegionsOnOrBeforeTime(pruneTime); - LOG.debug("Deleting inactive transaction bounds recorded on or before time {}", pruneTime); - dataJanitorState.deleteInactiveTransactionBoundsOnOrBeforeTime(pruneTime); - } - - @Override - public void destroy() { - LOG.info("Stopping plugin..."); - try { - connection.close(); - } catch (IOException e) { - LOG.error("Got exception while closing HBase connection", e); - } - } - - protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) { - return tableDescriptor.hasCoprocessor(TransactionProcessor.class.getName()); - } - - protected SortedSet<byte[]> getTransactionalRegions() throws IOException { - SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR); - try (Admin admin = connection.getAdmin()) { - HTableDescriptor[] tableDescriptors = admin.listTables(); - LOG.debug("Got {} tables to process", tableDescriptors == null ? 0 : tableDescriptors.length); - if (tableDescriptors != null) { - for (HTableDescriptor tableDescriptor : tableDescriptors) { - if (isTransactionalTable(tableDescriptor)) { - List<HRegionInfo> tableRegions = admin.getTableRegions(tableDescriptor.getTableName()); - LOG.debug("Regions for table {}: {}", tableDescriptor.getTableName(), tableRegions); - if (tableRegions != null) { - for (HRegionInfo region : tableRegions) { - regions.add(region.getRegionName()); - } - } - } else { - LOG.debug("{} is not a transactional table", tableDescriptor.getTableName()); - } - } - } - } - return regions; - } - - /** - * Try to find the latest set of regions in which all regions have been major compacted, and - * compute prune upper bound from them. Starting from newest to oldest, this looks into the - * region set that has been saved periodically, and joins it with the prune upper bound data - * for a region recorded after a major compaction. 
- * - * @param timeRegions the latest set of regions - * @return prune upper bound - * @throws IOException when not able to talk to HBase - */ - private long computePruneUpperBound(TimeRegions timeRegions) throws IOException { - do { - LOG.debug("Computing prune upper bound for {}", timeRegions); - SortedSet<byte[]> transactionalRegions = timeRegions.getRegions(); - long time = timeRegions.getTime(); - - Map<byte[], Long> pruneUpperBoundRegions = dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions); - logPruneUpperBoundRegions(pruneUpperBoundRegions); - // If prune upper bounds are found for all the transactional regions, then compute the prune upper bound - // across all regions - if (!transactionalRegions.isEmpty() && pruneUpperBoundRegions.size() == transactionalRegions.size()) { - long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time); - LOG.debug("Found max prune upper bound {} for time {}", inactiveTransactionBound, time); - // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions - if (inactiveTransactionBound != -1) { - Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values()); - return Math.min(inactiveTransactionBound, minPruneUpperBoundRegions); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " + - "and hence the data must be incomplete", time); - } - } - } else { - if (LOG.isDebugEnabled()) { - Sets.SetView<byte[]> difference = Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet()); - LOG.debug("Ignoring regions for time {} because the following regions did not record a pruneUpperBound: {}", - time, Iterables.transform(difference, TimeRegions.BYTE_ARR_TO_STRING_FN)); - } - } - - timeRegions = dataJanitorState.getRegionsOnOrBeforeTime(time - 1); - } while (timeRegions != null); - return -1; - } - - private void logPruneUpperBoundRegions(Map<byte[], Long> pruneUpperBoundRegions) { - if (LOG.isDebugEnabled()) { - LOG.debug("Got region - prune upper bound map: {}", - Iterables.transform(pruneUpperBoundRegions.entrySet(), - new Function<Map.Entry<byte[], Long>, Map.Entry<String, Long>>() { - @Override - public Map.Entry<String, Long> apply(Map.Entry<byte[], Long> input) { - String regionName = TimeRegions.BYTE_ARR_TO_STRING_FN.apply(input.getKey()); - return Maps.immutableEntry(regionName, input.getValue()); - } - })); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/79b97198/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/TimeRegions.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/TimeRegions.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/TimeRegions.java deleted file mode 100644 index 813f5dd..0000000 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/TimeRegions.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tephra.hbase.coprocessor.janitor; - -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.collect.Iterables; -import org.apache.hadoop.hbase.util.Bytes; - -import java.util.Objects; -import java.util.SortedSet; - -/** - * Contains information on the set of transactional regions recorded at a given time - */ -@SuppressWarnings("WeakerAccess") -public class TimeRegions { - static final Function<byte[], String> BYTE_ARR_TO_STRING_FN = - new Function<byte[], String>() { - @Override - public String apply(byte[] input) { - return Bytes.toStringBinary(input); - } - }; - - private final long time; - private final SortedSet<byte[]> regions; - - public TimeRegions(long time, SortedSet<byte[]> regions) { - this.time = time; - this.regions = regions; - } - - public long getTime() { - return time; - } - - public SortedSet<byte[]> getRegions() { - return regions; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - TimeRegions that = (TimeRegions) o; - return time == that.time && - Objects.equals(regions, that.regions); - } - - @Override - public int hashCode() { - return Objects.hash(time, regions); - } - - @Override - public String toString() { - Iterable<String> regionStrings = Iterables.transform(regions, BYTE_ARR_TO_STRING_FN); - return "TimeRegions{" + - "time=" + time + - ", regions=[" + Joiner.on(" ").join(regionStrings) + "]" + - '}'; - } -}
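----------------------------------------------------------------------
For reference, a minimal sketch of how the DataJanitorState shown in the diff above is typically consumed. This is not part of the commit: it assumes the post-move package org.apache.tephra.hbase.txprune, the default state table name "data_tx_janitor_state", and an illustrative class name PruneStateInspector. It opens the prune state table through a TableSupplier (the same pattern HBaseTransactionPruningPlugin.initialize() uses) and prints the per-region prune upper bounds and the inactive transaction bound that the plugin combines into its overall prune upper bound.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.tephra.hbase.txprune.DataJanitorState;
import org.apache.tephra.hbase.txprune.TimeRegions;

import java.io.IOException;

public class PruneStateInspector {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    try (final Connection connection = ConnectionFactory.createConnection(conf)) {
      // Hand the state table to DataJanitorState lazily via a TableSupplier,
      // so a fresh Table instance is obtained for each operation.
      // "data_tx_janitor_state" is assumed to be the configured prune state table.
      final TableName stateTable = TableName.valueOf("data_tx_janitor_state");
      DataJanitorState state = new DataJanitorState(new DataJanitorState.TableSupplier() {
        @Override
        public Table get() throws IOException {
          return connection.getTable(stateTable);
        }
      });

      // Latest snapshot of transactional regions recorded at or before the current time
      TimeRegions timeRegions = state.getRegionsOnOrBeforeTime(System.currentTimeMillis());
      if (timeRegions == null) {
        System.out.println("No region snapshot recorded yet");
        return;
      }
      System.out.println("Snapshot time: " + timeRegions.getTime());

      // Per-region prune upper bounds written by TransactionProcessor after major compactions;
      // -1 means no prune upper bound has been recorded for that region yet
      for (byte[] region : timeRegions.getRegions()) {
        System.out.println(Bytes.toStringBinary(region) + " -> "
            + state.getPruneUpperBoundForRegion(region));
      }

      // Inactive transaction bound persisted together with the snapshot; the plugin's
      // final prune upper bound is the minimum of this value and the per-region bounds
      System.out.println("Inactive transaction bound: "
          + state.getInactiveTransactionBoundForTime(timeRegions.getTime()));
    }
  }
}

The same reads drive HBaseTransactionPruningPlugin.computePruneUpperBound(): it walks snapshots backwards in time until it finds one where every region has a recorded bound, then takes the minimum of those bounds and the snapshot's inactive transaction bound.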