Repository: incubator-tephra Updated Branches: refs/heads/release/0.11.0-incubating 95c6bfb6b -> 66a2fce79
(TEPHRA-227) Add new command to get the set of regions that have not been compacted This closes #40 from GitHub. Signed-off-by: Gokul Gunasekaran <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/66a2fce7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/66a2fce7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/66a2fce7 Branch: refs/heads/release/0.11.0-incubating Commit: 66a2fce795dec34da7b992aeda883345fbbf3083 Parents: 95c6bfb Author: Gokul Gunasekaran <[email protected]> Authored: Wed Mar 1 14:54:40 2017 -0800 Committer: Gokul Gunasekaran <[email protected]> Committed: Mon Mar 6 13:47:14 2017 -0800 ---------------------------------------------------------------------- .../hbase/txprune/InvalidListPruningDebug.java | 105 +++++++++++++++++-- .../hbase/txprune/InvalidListPruningDebug.java | 105 +++++++++++++++++-- .../hbase/txprune/InvalidListPruningDebug.java | 105 +++++++++++++++++-- .../hbase/txprune/InvalidListPruningDebug.java | 105 +++++++++++++++++-- .../hbase/txprune/InvalidListPruningDebug.java | 105 +++++++++++++++++-- 5 files changed, 495 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/66a2fce7/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java index d48e48d..620885b 100644 --- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java @@ -21,6 +21,7 @@ package org.apache.tephra.hbase.txprune; import com.google.common.collect.Iterables; import com.google.common.collect.MinMaxPriorityQueue; +import com.google.common.collect.Sets; import com.google.gson.Gson; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -36,12 +37,15 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.PrintWriter; +import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import javax.annotation.Nullable; @@ -53,6 +57,8 @@ public class InvalidListPruningDebug { private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebug.class); private static final Gson GSON = new Gson(); private DataJanitorState dataJanitorState; + private HConnection connection; + private TableName tableName; /** * Initialize the Invalid List Debug Tool. @@ -61,20 +67,76 @@ public class InvalidListPruningDebug { */ public void initialize(final Configuration conf) throws IOException { LOG.debug("InvalidListPruningDebugMain : initialize method called"); - final HConnection connection = new HBaseAdmin(conf).getConnection(); + connection = new HBaseAdmin(conf).getConnection(); + tableName = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)); dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() { @Override public HTableInterface get() throws IOException { - return connection.getTable(TableName.valueOf( - conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, - TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE))); + return connection.getTable(tableName); } }); } + public void destroy() throws IOException { + if (connection != null) { + connection.close(); + } + } + + /** + * Returns a set of regions that are live but are not empty nor have a prune upper bound recorded. These regions + * will stop the progress of pruning. + * + * @param numRegions number of regions + * @return {@link Set} of regions that needs to be compacted and flushed + */ + public Set<String> getRegionsToBeCompacted(Integer numRegions) throws IOException { + // Fetch the live regions + Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis()); + if (latestTimeRegion.isEmpty()) { + return new HashSet<>(); + } + + Long timestamp = latestTimeRegion.keySet().iterator().next(); + SortedSet<String> liveRegions = latestTimeRegion.get(timestamp); + + SortedSet<byte[]> emptyRegions = dataJanitorState.getEmptyRegionsAfterTime(timestamp, null); + SortedSet<String> emptyRegionNames = new TreeSet<>(); + Iterable<String> regionStrings = Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN); + for (String regionString : regionStrings) { + emptyRegionNames.add(regionString); + } + + Set<String> nonEmptyRegions = Sets.newHashSet(Sets.difference(liveRegions, emptyRegionNames)); + + // Get all pruned regions and remove them from the nonEmptyRegions, resulting in a set of regions that are + // not empty and have not been registered prune upper bound + Queue<RegionPruneInfo> prunedRegions = getIdleRegions(-1); + for (RegionPruneInfo prunedRegion : prunedRegions) { + if (nonEmptyRegions.contains(prunedRegion.getRegionNameAsString())) { + nonEmptyRegions.remove(prunedRegion.getRegionNameAsString()); + } + } + + if ((numRegions < 0) || (numRegions >= nonEmptyRegions.size())) { + return nonEmptyRegions; + } + + Set<String> subsetRegions = new HashSet<>(numRegions); + for (String regionName : nonEmptyRegions) { + if (subsetRegions.size() == numRegions) { + break; + } + subsetRegions.add(regionName); + } + return subsetRegions; + } + /** * Return a list of RegionPruneInfo. These regions are the ones that have the lowest prune upper bounds. - * If -1 is passed in, all the regions and their prune upper bound will be returned. + * If -1 is passed in, all the regions and their prune upper bound will be returned. Note that only the regions + * that are known to be live will be returned. * * @param numRegions number of regions * @return Map of region name and its prune upper bound @@ -85,10 +147,32 @@ public class InvalidListPruningDebug { return new LinkedList<>(); } + // Create a set with region names + Set<String> pruneRegionNameSet = new HashSet<>(); + for (RegionPruneInfo regionPruneInfo : regionPruneInfos) { + pruneRegionNameSet.add(regionPruneInfo.getRegionNameAsString()); + } + + // Fetch the live regions + Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis()); + if (!latestTimeRegion.isEmpty()) { + SortedSet<String> liveRegions = latestTimeRegion.values().iterator().next(); + Set<String> liveRegionsWithPruneInfo = Sets.intersection(liveRegions, pruneRegionNameSet); + List<RegionPruneInfo> liveRegionWithPruneInfoList = new ArrayList<>(); + for (RegionPruneInfo regionPruneInfo : regionPruneInfos) { + if (liveRegionsWithPruneInfo.contains(regionPruneInfo.getRegionNameAsString())) { + liveRegionWithPruneInfoList.add(regionPruneInfo); + } + } + + // Use the subset of live regions and prune regions + regionPruneInfos = liveRegionWithPruneInfoList; + } + if (numRegions < 0) { numRegions = regionPruneInfos.size(); } - + Queue<RegionPruneInfo> lowestPrunes = MinMaxPriorityQueue.orderedBy(new Comparator<RegionPruneInfo>() { @Override public int compare(RegionPruneInfo o1, RegionPruneInfo o2) { @@ -148,6 +232,9 @@ public class InvalidListPruningDebug { "provided as the limit, prune upper bounds of all regions are returned."); pw.println("prune-info region-name-as-string"); pw.println("Desc: Prints out the Pruning information for the region 'region-name-as-string'"); + pw.println("to-compact-regions limit"); + pw.println("Desc: Prints out 'limit' number of regions that are active, but are not empty, " + + "and have not registered a prune upper bound."); } private boolean execute(String[] args) throws IOException { @@ -177,6 +264,11 @@ public class InvalidListPruningDebug { pw.println(String.format("No prune info found for the region %s.", parameter)); } return true; + } else if ("to-compact-regions".equals(command)) { + Integer numRegions = Integer.parseInt(parameter); + Set<String> toBeCompactedRegions = getRegionsToBeCompacted(numRegions); + pw.println(GSON.toJson(toBeCompactedRegions)); + return true; } else { pw.println(String.format("%s is not a valid command.", command)); printUsage(pw); @@ -191,6 +283,7 @@ public class InvalidListPruningDebug { try { pruningDebug.initialize(hConf); boolean success = pruningDebug.execute(args); + pruningDebug.destroy(); if (!success) { System.exit(1); } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/66a2fce7/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java index d48e48d..620885b 100644 --- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java +++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java @@ -21,6 +21,7 @@ package org.apache.tephra.hbase.txprune; import com.google.common.collect.Iterables; import com.google.common.collect.MinMaxPriorityQueue; +import com.google.common.collect.Sets; import com.google.gson.Gson; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -36,12 +37,15 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.PrintWriter; +import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import javax.annotation.Nullable; @@ -53,6 +57,8 @@ public class InvalidListPruningDebug { private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebug.class); private static final Gson GSON = new Gson(); private DataJanitorState dataJanitorState; + private HConnection connection; + private TableName tableName; /** * Initialize the Invalid List Debug Tool. @@ -61,20 +67,76 @@ public class InvalidListPruningDebug { */ public void initialize(final Configuration conf) throws IOException { LOG.debug("InvalidListPruningDebugMain : initialize method called"); - final HConnection connection = new HBaseAdmin(conf).getConnection(); + connection = new HBaseAdmin(conf).getConnection(); + tableName = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)); dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() { @Override public HTableInterface get() throws IOException { - return connection.getTable(TableName.valueOf( - conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, - TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE))); + return connection.getTable(tableName); } }); } + public void destroy() throws IOException { + if (connection != null) { + connection.close(); + } + } + + /** + * Returns a set of regions that are live but are not empty nor have a prune upper bound recorded. These regions + * will stop the progress of pruning. + * + * @param numRegions number of regions + * @return {@link Set} of regions that needs to be compacted and flushed + */ + public Set<String> getRegionsToBeCompacted(Integer numRegions) throws IOException { + // Fetch the live regions + Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis()); + if (latestTimeRegion.isEmpty()) { + return new HashSet<>(); + } + + Long timestamp = latestTimeRegion.keySet().iterator().next(); + SortedSet<String> liveRegions = latestTimeRegion.get(timestamp); + + SortedSet<byte[]> emptyRegions = dataJanitorState.getEmptyRegionsAfterTime(timestamp, null); + SortedSet<String> emptyRegionNames = new TreeSet<>(); + Iterable<String> regionStrings = Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN); + for (String regionString : regionStrings) { + emptyRegionNames.add(regionString); + } + + Set<String> nonEmptyRegions = Sets.newHashSet(Sets.difference(liveRegions, emptyRegionNames)); + + // Get all pruned regions and remove them from the nonEmptyRegions, resulting in a set of regions that are + // not empty and have not been registered prune upper bound + Queue<RegionPruneInfo> prunedRegions = getIdleRegions(-1); + for (RegionPruneInfo prunedRegion : prunedRegions) { + if (nonEmptyRegions.contains(prunedRegion.getRegionNameAsString())) { + nonEmptyRegions.remove(prunedRegion.getRegionNameAsString()); + } + } + + if ((numRegions < 0) || (numRegions >= nonEmptyRegions.size())) { + return nonEmptyRegions; + } + + Set<String> subsetRegions = new HashSet<>(numRegions); + for (String regionName : nonEmptyRegions) { + if (subsetRegions.size() == numRegions) { + break; + } + subsetRegions.add(regionName); + } + return subsetRegions; + } + /** * Return a list of RegionPruneInfo. These regions are the ones that have the lowest prune upper bounds. - * If -1 is passed in, all the regions and their prune upper bound will be returned. + * If -1 is passed in, all the regions and their prune upper bound will be returned. Note that only the regions + * that are known to be live will be returned. * * @param numRegions number of regions * @return Map of region name and its prune upper bound @@ -85,10 +147,32 @@ public class InvalidListPruningDebug { return new LinkedList<>(); } + // Create a set with region names + Set<String> pruneRegionNameSet = new HashSet<>(); + for (RegionPruneInfo regionPruneInfo : regionPruneInfos) { + pruneRegionNameSet.add(regionPruneInfo.getRegionNameAsString()); + } + + // Fetch the live regions + Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis()); + if (!latestTimeRegion.isEmpty()) { + SortedSet<String> liveRegions = latestTimeRegion.values().iterator().next(); + Set<String> liveRegionsWithPruneInfo = Sets.intersection(liveRegions, pruneRegionNameSet); + List<RegionPruneInfo> liveRegionWithPruneInfoList = new ArrayList<>(); + for (RegionPruneInfo regionPruneInfo : regionPruneInfos) { + if (liveRegionsWithPruneInfo.contains(regionPruneInfo.getRegionNameAsString())) { + liveRegionWithPruneInfoList.add(regionPruneInfo); + } + } + + // Use the subset of live regions and prune regions + regionPruneInfos = liveRegionWithPruneInfoList; + } + if (numRegions < 0) { numRegions = regionPruneInfos.size(); } - + Queue<RegionPruneInfo> lowestPrunes = MinMaxPriorityQueue.orderedBy(new Comparator<RegionPruneInfo>() { @Override public int compare(RegionPruneInfo o1, RegionPruneInfo o2) { @@ -148,6 +232,9 @@ public class InvalidListPruningDebug { "provided as the limit, prune upper bounds of all regions are returned."); pw.println("prune-info region-name-as-string"); pw.println("Desc: Prints out the Pruning information for the region 'region-name-as-string'"); + pw.println("to-compact-regions limit"); + pw.println("Desc: Prints out 'limit' number of regions that are active, but are not empty, " + + "and have not registered a prune upper bound."); } private boolean execute(String[] args) throws IOException { @@ -177,6 +264,11 @@ public class InvalidListPruningDebug { pw.println(String.format("No prune info found for the region %s.", parameter)); } return true; + } else if ("to-compact-regions".equals(command)) { + Integer numRegions = Integer.parseInt(parameter); + Set<String> toBeCompactedRegions = getRegionsToBeCompacted(numRegions); + pw.println(GSON.toJson(toBeCompactedRegions)); + return true; } else { pw.println(String.format("%s is not a valid command.", command)); printUsage(pw); @@ -191,6 +283,7 @@ public class InvalidListPruningDebug { try { pruningDebug.initialize(hConf); boolean success = pruningDebug.execute(args); + pruningDebug.destroy(); if (!success) { System.exit(1); } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/66a2fce7/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java index e748d90..443c998 100644 --- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java +++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java @@ -21,6 +21,7 @@ package org.apache.tephra.hbase.txprune; import com.google.common.collect.Iterables; import com.google.common.collect.MinMaxPriorityQueue; +import com.google.common.collect.Sets; import com.google.gson.Gson; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -36,12 +37,15 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.PrintWriter; +import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import javax.annotation.Nullable; @@ -53,6 +57,8 @@ public class InvalidListPruningDebug { private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebug.class); private static final Gson GSON = new Gson(); private DataJanitorState dataJanitorState; + private Connection connection; + private TableName tableName; /** * Initialize the Invalid List Debug Tool. @@ -61,20 +67,76 @@ public class InvalidListPruningDebug { */ public void initialize(final Configuration conf) throws IOException { LOG.debug("InvalidListPruningDebugMain : initialize method called"); - final Connection connection = ConnectionFactory.createConnection(conf); + connection = ConnectionFactory.createConnection(conf); + tableName = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)); dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() { @Override public Table get() throws IOException { - return connection.getTable(TableName.valueOf( - conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, - TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE))); + return connection.getTable(tableName); } }); } + public void destroy() throws IOException { + if (connection != null) { + connection.close(); + } + } + + /** + * Returns a set of regions that are live but are not empty nor have a prune upper bound recorded. These regions + * will stop the progress of pruning. + * + * @param numRegions number of regions + * @return {@link Set} of regions that needs to be compacted and flushed + */ + public Set<String> getRegionsToBeCompacted(Integer numRegions) throws IOException { + // Fetch the live regions + Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis()); + if (latestTimeRegion.isEmpty()) { + return new HashSet<>(); + } + + Long timestamp = latestTimeRegion.keySet().iterator().next(); + SortedSet<String> liveRegions = latestTimeRegion.get(timestamp); + + SortedSet<byte[]> emptyRegions = dataJanitorState.getEmptyRegionsAfterTime(timestamp, null); + SortedSet<String> emptyRegionNames = new TreeSet<>(); + Iterable<String> regionStrings = Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN); + for (String regionString : regionStrings) { + emptyRegionNames.add(regionString); + } + + Set<String> nonEmptyRegions = Sets.newHashSet(Sets.difference(liveRegions, emptyRegionNames)); + + // Get all pruned regions and remove them from the nonEmptyRegions, resulting in a set of regions that are + // not empty and have not been registered prune upper bound + Queue<RegionPruneInfo> prunedRegions = getIdleRegions(-1); + for (RegionPruneInfo prunedRegion : prunedRegions) { + if (nonEmptyRegions.contains(prunedRegion.getRegionNameAsString())) { + nonEmptyRegions.remove(prunedRegion.getRegionNameAsString()); + } + } + + if ((numRegions < 0) || (numRegions >= nonEmptyRegions.size())) { + return nonEmptyRegions; + } + + Set<String> subsetRegions = new HashSet<>(numRegions); + for (String regionName : nonEmptyRegions) { + if (subsetRegions.size() == numRegions) { + break; + } + subsetRegions.add(regionName); + } + return subsetRegions; + } + /** * Return a list of RegionPruneInfo. These regions are the ones that have the lowest prune upper bounds. - * If -1 is passed in, all the regions and their prune upper bound will be returned. + * If -1 is passed in, all the regions and their prune upper bound will be returned. Note that only the regions + * that are known to be live will be returned. * * @param numRegions number of regions * @return Map of region name and its prune upper bound @@ -85,10 +147,32 @@ public class InvalidListPruningDebug { return new LinkedList<>(); } + // Create a set with region names + Set<String> pruneRegionNameSet = new HashSet<>(); + for (RegionPruneInfo regionPruneInfo : regionPruneInfos) { + pruneRegionNameSet.add(regionPruneInfo.getRegionNameAsString()); + } + + // Fetch the live regions + Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis()); + if (!latestTimeRegion.isEmpty()) { + SortedSet<String> liveRegions = latestTimeRegion.values().iterator().next(); + Set<String> liveRegionsWithPruneInfo = Sets.intersection(liveRegions, pruneRegionNameSet); + List<RegionPruneInfo> liveRegionWithPruneInfoList = new ArrayList<>(); + for (RegionPruneInfo regionPruneInfo : regionPruneInfos) { + if (liveRegionsWithPruneInfo.contains(regionPruneInfo.getRegionNameAsString())) { + liveRegionWithPruneInfoList.add(regionPruneInfo); + } + } + + // Use the subset of live regions and prune regions + regionPruneInfos = liveRegionWithPruneInfoList; + } + if (numRegions < 0) { numRegions = regionPruneInfos.size(); } - + Queue<RegionPruneInfo> lowestPrunes = MinMaxPriorityQueue.orderedBy(new Comparator<RegionPruneInfo>() { @Override public int compare(RegionPruneInfo o1, RegionPruneInfo o2) { @@ -148,6 +232,9 @@ public class InvalidListPruningDebug { "provided as the limit, prune upper bounds of all regions are returned."); pw.println("prune-info region-name-as-string"); pw.println("Desc: Prints out the Pruning information for the region 'region-name-as-string'"); + pw.println("to-compact-regions limit"); + pw.println("Desc: Prints out 'limit' number of regions that are active, but are not empty, " + + "and have not registered a prune upper bound."); } private boolean execute(String[] args) throws IOException { @@ -177,6 +264,11 @@ public class InvalidListPruningDebug { pw.println(String.format("No prune info found for the region %s.", parameter)); } return true; + } else if ("to-compact-regions".equals(command)) { + Integer numRegions = Integer.parseInt(parameter); + Set<String> toBeCompactedRegions = getRegionsToBeCompacted(numRegions); + pw.println(GSON.toJson(toBeCompactedRegions)); + return true; } else { pw.println(String.format("%s is not a valid command.", command)); printUsage(pw); @@ -191,6 +283,7 @@ public class InvalidListPruningDebug { try { pruningDebug.initialize(hConf); boolean success = pruningDebug.execute(args); + pruningDebug.destroy(); if (!success) { System.exit(1); } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/66a2fce7/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java index e748d90..443c998 100644 --- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java +++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java @@ -21,6 +21,7 @@ package org.apache.tephra.hbase.txprune; import com.google.common.collect.Iterables; import com.google.common.collect.MinMaxPriorityQueue; +import com.google.common.collect.Sets; import com.google.gson.Gson; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -36,12 +37,15 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.PrintWriter; +import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import javax.annotation.Nullable; @@ -53,6 +57,8 @@ public class InvalidListPruningDebug { private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebug.class); private static final Gson GSON = new Gson(); private DataJanitorState dataJanitorState; + private Connection connection; + private TableName tableName; /** * Initialize the Invalid List Debug Tool. @@ -61,20 +67,76 @@ public class InvalidListPruningDebug { */ public void initialize(final Configuration conf) throws IOException { LOG.debug("InvalidListPruningDebugMain : initialize method called"); - final Connection connection = ConnectionFactory.createConnection(conf); + connection = ConnectionFactory.createConnection(conf); + tableName = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)); dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() { @Override public Table get() throws IOException { - return connection.getTable(TableName.valueOf( - conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, - TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE))); + return connection.getTable(tableName); } }); } + public void destroy() throws IOException { + if (connection != null) { + connection.close(); + } + } + + /** + * Returns a set of regions that are live but are not empty nor have a prune upper bound recorded. These regions + * will stop the progress of pruning. + * + * @param numRegions number of regions + * @return {@link Set} of regions that needs to be compacted and flushed + */ + public Set<String> getRegionsToBeCompacted(Integer numRegions) throws IOException { + // Fetch the live regions + Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis()); + if (latestTimeRegion.isEmpty()) { + return new HashSet<>(); + } + + Long timestamp = latestTimeRegion.keySet().iterator().next(); + SortedSet<String> liveRegions = latestTimeRegion.get(timestamp); + + SortedSet<byte[]> emptyRegions = dataJanitorState.getEmptyRegionsAfterTime(timestamp, null); + SortedSet<String> emptyRegionNames = new TreeSet<>(); + Iterable<String> regionStrings = Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN); + for (String regionString : regionStrings) { + emptyRegionNames.add(regionString); + } + + Set<String> nonEmptyRegions = Sets.newHashSet(Sets.difference(liveRegions, emptyRegionNames)); + + // Get all pruned regions and remove them from the nonEmptyRegions, resulting in a set of regions that are + // not empty and have not been registered prune upper bound + Queue<RegionPruneInfo> prunedRegions = getIdleRegions(-1); + for (RegionPruneInfo prunedRegion : prunedRegions) { + if (nonEmptyRegions.contains(prunedRegion.getRegionNameAsString())) { + nonEmptyRegions.remove(prunedRegion.getRegionNameAsString()); + } + } + + if ((numRegions < 0) || (numRegions >= nonEmptyRegions.size())) { + return nonEmptyRegions; + } + + Set<String> subsetRegions = new HashSet<>(numRegions); + for (String regionName : nonEmptyRegions) { + if (subsetRegions.size() == numRegions) { + break; + } + subsetRegions.add(regionName); + } + return subsetRegions; + } + /** * Return a list of RegionPruneInfo. These regions are the ones that have the lowest prune upper bounds. - * If -1 is passed in, all the regions and their prune upper bound will be returned. + * If -1 is passed in, all the regions and their prune upper bound will be returned. Note that only the regions + * that are known to be live will be returned. * * @param numRegions number of regions * @return Map of region name and its prune upper bound @@ -85,10 +147,32 @@ public class InvalidListPruningDebug { return new LinkedList<>(); } + // Create a set with region names + Set<String> pruneRegionNameSet = new HashSet<>(); + for (RegionPruneInfo regionPruneInfo : regionPruneInfos) { + pruneRegionNameSet.add(regionPruneInfo.getRegionNameAsString()); + } + + // Fetch the live regions + Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis()); + if (!latestTimeRegion.isEmpty()) { + SortedSet<String> liveRegions = latestTimeRegion.values().iterator().next(); + Set<String> liveRegionsWithPruneInfo = Sets.intersection(liveRegions, pruneRegionNameSet); + List<RegionPruneInfo> liveRegionWithPruneInfoList = new ArrayList<>(); + for (RegionPruneInfo regionPruneInfo : regionPruneInfos) { + if (liveRegionsWithPruneInfo.contains(regionPruneInfo.getRegionNameAsString())) { + liveRegionWithPruneInfoList.add(regionPruneInfo); + } + } + + // Use the subset of live regions and prune regions + regionPruneInfos = liveRegionWithPruneInfoList; + } + if (numRegions < 0) { numRegions = regionPruneInfos.size(); } - + Queue<RegionPruneInfo> lowestPrunes = MinMaxPriorityQueue.orderedBy(new Comparator<RegionPruneInfo>() { @Override public int compare(RegionPruneInfo o1, RegionPruneInfo o2) { @@ -148,6 +232,9 @@ public class InvalidListPruningDebug { "provided as the limit, prune upper bounds of all regions are returned."); pw.println("prune-info region-name-as-string"); pw.println("Desc: Prints out the Pruning information for the region 'region-name-as-string'"); + pw.println("to-compact-regions limit"); + pw.println("Desc: Prints out 'limit' number of regions that are active, but are not empty, " + + "and have not registered a prune upper bound."); } private boolean execute(String[] args) throws IOException { @@ -177,6 +264,11 @@ public class InvalidListPruningDebug { pw.println(String.format("No prune info found for the region %s.", parameter)); } return true; + } else if ("to-compact-regions".equals(command)) { + Integer numRegions = Integer.parseInt(parameter); + Set<String> toBeCompactedRegions = getRegionsToBeCompacted(numRegions); + pw.println(GSON.toJson(toBeCompactedRegions)); + return true; } else { pw.println(String.format("%s is not a valid command.", command)); printUsage(pw); @@ -191,6 +283,7 @@ public class InvalidListPruningDebug { try { pruningDebug.initialize(hConf); boolean success = pruningDebug.execute(args); + pruningDebug.destroy(); if (!success) { System.exit(1); } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/66a2fce7/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java index e748d90..443c998 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java @@ -21,6 +21,7 @@ package org.apache.tephra.hbase.txprune; import com.google.common.collect.Iterables; import com.google.common.collect.MinMaxPriorityQueue; +import com.google.common.collect.Sets; import com.google.gson.Gson; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -36,12 +37,15 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.PrintWriter; +import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import javax.annotation.Nullable; @@ -53,6 +57,8 @@ public class InvalidListPruningDebug { private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebug.class); private static final Gson GSON = new Gson(); private DataJanitorState dataJanitorState; + private Connection connection; + private TableName tableName; /** * Initialize the Invalid List Debug Tool. @@ -61,20 +67,76 @@ public class InvalidListPruningDebug { */ public void initialize(final Configuration conf) throws IOException { LOG.debug("InvalidListPruningDebugMain : initialize method called"); - final Connection connection = ConnectionFactory.createConnection(conf); + connection = ConnectionFactory.createConnection(conf); + tableName = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)); dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() { @Override public Table get() throws IOException { - return connection.getTable(TableName.valueOf( - conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, - TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE))); + return connection.getTable(tableName); } }); } + public void destroy() throws IOException { + if (connection != null) { + connection.close(); + } + } + + /** + * Returns a set of regions that are live but are not empty nor have a prune upper bound recorded. These regions + * will stop the progress of pruning. + * + * @param numRegions number of regions + * @return {@link Set} of regions that needs to be compacted and flushed + */ + public Set<String> getRegionsToBeCompacted(Integer numRegions) throws IOException { + // Fetch the live regions + Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis()); + if (latestTimeRegion.isEmpty()) { + return new HashSet<>(); + } + + Long timestamp = latestTimeRegion.keySet().iterator().next(); + SortedSet<String> liveRegions = latestTimeRegion.get(timestamp); + + SortedSet<byte[]> emptyRegions = dataJanitorState.getEmptyRegionsAfterTime(timestamp, null); + SortedSet<String> emptyRegionNames = new TreeSet<>(); + Iterable<String> regionStrings = Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN); + for (String regionString : regionStrings) { + emptyRegionNames.add(regionString); + } + + Set<String> nonEmptyRegions = Sets.newHashSet(Sets.difference(liveRegions, emptyRegionNames)); + + // Get all pruned regions and remove them from the nonEmptyRegions, resulting in a set of regions that are + // not empty and have not been registered prune upper bound + Queue<RegionPruneInfo> prunedRegions = getIdleRegions(-1); + for (RegionPruneInfo prunedRegion : prunedRegions) { + if (nonEmptyRegions.contains(prunedRegion.getRegionNameAsString())) { + nonEmptyRegions.remove(prunedRegion.getRegionNameAsString()); + } + } + + if ((numRegions < 0) || (numRegions >= nonEmptyRegions.size())) { + return nonEmptyRegions; + } + + Set<String> subsetRegions = new HashSet<>(numRegions); + for (String regionName : nonEmptyRegions) { + if (subsetRegions.size() == numRegions) { + break; + } + subsetRegions.add(regionName); + } + return subsetRegions; + } + /** * Return a list of RegionPruneInfo. These regions are the ones that have the lowest prune upper bounds. - * If -1 is passed in, all the regions and their prune upper bound will be returned. + * If -1 is passed in, all the regions and their prune upper bound will be returned. Note that only the regions + * that are known to be live will be returned. * * @param numRegions number of regions * @return Map of region name and its prune upper bound @@ -85,10 +147,32 @@ public class InvalidListPruningDebug { return new LinkedList<>(); } + // Create a set with region names + Set<String> pruneRegionNameSet = new HashSet<>(); + for (RegionPruneInfo regionPruneInfo : regionPruneInfos) { + pruneRegionNameSet.add(regionPruneInfo.getRegionNameAsString()); + } + + // Fetch the live regions + Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis()); + if (!latestTimeRegion.isEmpty()) { + SortedSet<String> liveRegions = latestTimeRegion.values().iterator().next(); + Set<String> liveRegionsWithPruneInfo = Sets.intersection(liveRegions, pruneRegionNameSet); + List<RegionPruneInfo> liveRegionWithPruneInfoList = new ArrayList<>(); + for (RegionPruneInfo regionPruneInfo : regionPruneInfos) { + if (liveRegionsWithPruneInfo.contains(regionPruneInfo.getRegionNameAsString())) { + liveRegionWithPruneInfoList.add(regionPruneInfo); + } + } + + // Use the subset of live regions and prune regions + regionPruneInfos = liveRegionWithPruneInfoList; + } + if (numRegions < 0) { numRegions = regionPruneInfos.size(); } - + Queue<RegionPruneInfo> lowestPrunes = MinMaxPriorityQueue.orderedBy(new Comparator<RegionPruneInfo>() { @Override public int compare(RegionPruneInfo o1, RegionPruneInfo o2) { @@ -148,6 +232,9 @@ public class InvalidListPruningDebug { "provided as the limit, prune upper bounds of all regions are returned."); pw.println("prune-info region-name-as-string"); pw.println("Desc: Prints out the Pruning information for the region 'region-name-as-string'"); + pw.println("to-compact-regions limit"); + pw.println("Desc: Prints out 'limit' number of regions that are active, but are not empty, " + + "and have not registered a prune upper bound."); } private boolean execute(String[] args) throws IOException { @@ -177,6 +264,11 @@ public class InvalidListPruningDebug { pw.println(String.format("No prune info found for the region %s.", parameter)); } return true; + } else if ("to-compact-regions".equals(command)) { + Integer numRegions = Integer.parseInt(parameter); + Set<String> toBeCompactedRegions = getRegionsToBeCompacted(numRegions); + pw.println(GSON.toJson(toBeCompactedRegions)); + return true; } else { pw.println(String.format("%s is not a valid command.", command)); printUsage(pw); @@ -191,6 +283,7 @@ public class InvalidListPruningDebug { try { pruningDebug.initialize(hConf); boolean success = pruningDebug.execute(args); + pruningDebug.destroy(); if (!success) { System.exit(1); }
