Repository: incubator-tephra Updated Branches: refs/heads/master 6edfa091c -> aeeee00be
(TEPHRA-214) Tool to debug the state and progress of Invalid List Pruning This closes #31 from GitHub. Signed-off-by: Gokul Gunasekaran <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/aeeee00b Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/aeeee00b Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/aeeee00b Branch: refs/heads/master Commit: aeeee00bef25e07257b020087d5cdbe7ed5eac35 Parents: 6edfa09 Author: Gokul Gunasekaran <[email protected]> Authored: Fri Feb 3 12:10:40 2017 -0800 Committer: Gokul Gunasekaran <[email protected]> Committed: Wed Feb 8 00:44:07 2017 -0800 ---------------------------------------------------------------------- .../apache/tephra/txprune/RegionPruneInfo.java | 85 ++++++++ .../tephra/hbase/txprune/DataJanitorState.java | 58 +++++- .../hbase/txprune/InvalidListPruningDebug.java | 201 +++++++++++++++++++ .../tephra/hbase/txprune/DataJanitorState.java | 58 +++++- .../hbase/txprune/InvalidListPruningDebug.java | 201 +++++++++++++++++++ .../tephra/hbase/txprune/DataJanitorState.java | 58 +++++- .../hbase/txprune/InvalidListPruningDebug.java | 201 +++++++++++++++++++ .../tephra/hbase/txprune/DataJanitorState.java | 58 +++++- .../hbase/txprune/InvalidListPruningDebug.java | 201 +++++++++++++++++++ .../tephra/hbase/txprune/DataJanitorState.java | 58 +++++- .../hbase/txprune/InvalidListPruningDebug.java | 201 +++++++++++++++++++ 11 files changed, 1340 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/aeeee00b/tephra-core/src/main/java/org/apache/tephra/txprune/RegionPruneInfo.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/txprune/RegionPruneInfo.java 
b/tephra-core/src/main/java/org/apache/tephra/txprune/RegionPruneInfo.java new file mode 100644 index 0000000..2fdcf4f --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/txprune/RegionPruneInfo.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tephra.txprune; + +import java.util.Objects; + +/** + * Contains the region id, prune upper bound and prune record timestamp information. 
+ */ +public class RegionPruneInfo { + private transient byte[] regionName; + private final String regionNameAsString; + private final long pruneUpperBound; + private final long pruneRecordTime; + + public RegionPruneInfo(byte[] regionName, String regionNameAsString, long pruneUpperBound, long pruneRecordTime) { + this.regionName = regionName; + this.regionNameAsString = regionNameAsString; + this.pruneUpperBound = pruneUpperBound; + this.pruneRecordTime = pruneRecordTime; + } + + public byte[] getRegionName() { + return regionName; + } + + public String getRegionNameAsString() { + return regionNameAsString; + } + + public long getPruneUpperBound() { + return pruneUpperBound; + } + + public long getPruneRecordTime() { + return pruneRecordTime; + } + + @Override + public int hashCode() { + return Objects.hash(regionName, regionNameAsString, pruneUpperBound, pruneRecordTime); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + RegionPruneInfo other = (RegionPruneInfo) obj; + return Objects.equals(regionName, other.getRegionName()) + && Objects.equals(regionNameAsString, other.getRegionNameAsString()) + && Objects.equals(pruneUpperBound, other.getPruneUpperBound()) + && Objects.equals(pruneRecordTime, other.getPruneRecordTime()); + } + + @Override + public String toString() { + return "RegionPruneInfo{" + + "regionNameAsString='" + regionNameAsString + '\'' + + ", pruneUpperBound='" + pruneUpperBound + '\'' + + ", pruneRecordTime=" + pruneRecordTime + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/aeeee00b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java 
b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java index 5817fe2..979eb1a 100644 --- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java @@ -20,6 +20,8 @@ package org.apache.tephra.hbase.txprune; import com.google.common.collect.Maps; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTableInterface; @@ -29,8 +31,11 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.tephra.hbase.coprocessor.TransactionProcessor; +import org.apache.tephra.txprune.RegionPruneInfo; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedSet; @@ -102,11 +107,30 @@ public class DataJanitorState { * @throws IOException when not able to read the data from HBase */ public long getPruneUpperBoundForRegion(byte[] regionId) throws IOException { + RegionPruneInfo regionPruneInfo = getPruneInfoForRegion(regionId); + return (regionPruneInfo == null) ? -1 : regionPruneInfo.getPruneUpperBound(); + } + + /** + * Get the latest {@link RegionPruneInfo} for a given region. 
+ * + * @param regionId region id + * @return {@link RegionPruneInfo} for the region + * @throws IOException when not able to read the data from HBase + */ + @Nullable + public RegionPruneInfo getPruneInfoForRegion(byte[] regionId) throws IOException { try (HTableInterface stateTable = stateTableSupplier.get()) { Get get = new Get(makeRegionKey(regionId)); get.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL); - byte[] result = stateTable.get(get).getValue(FAMILY, PRUNE_UPPER_BOUND_COL); - return result == null ? -1 : Bytes.toLong(result); + Cell cell = stateTable.get(get).getColumnLatestCell(FAMILY, PRUNE_UPPER_BOUND_COL); + if (cell == null) { + return null; + } + byte[] pruneUpperBoundBytes = CellUtil.cloneValue(cell); + long timestamp = cell.getTimestamp(); + return new RegionPruneInfo(regionId, Bytes.toStringBinary(regionId), + Bytes.toLong(pruneUpperBoundBytes), timestamp); } } @@ -120,6 +144,22 @@ public class DataJanitorState { */ public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException { Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); + List<RegionPruneInfo> regionPruneInfos = getPruneInfoForRegions(regions); + for (RegionPruneInfo regionPruneInfo : regionPruneInfos) { + resultMap.put(regionPruneInfo.getRegionName(), regionPruneInfo.getPruneUpperBound()); + } + return resultMap; + } + + /** + * Gets a list of {@link RegionPruneInfo} for given regions. Returns all regions if the given regions set is null. + * + * @param regions a set of regions + * @return list of {@link RegionPruneInfo}s. 
+ * @throws IOException when not able to read the data from HBase + */ + public List<RegionPruneInfo> getPruneInfoForRegions(@Nullable SortedSet<byte[]> regions) throws IOException { + List<RegionPruneInfo> regionPruneInfos = new ArrayList<>(); try (HTableInterface stateTable = stateTableSupplier.get()) { byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY); Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP); @@ -129,17 +169,19 @@ public class DataJanitorState { Result next; while ((next = scanner.next()) != null) { byte[] region = getRegionFromKey(next.getRow()); - if (regions.contains(region)) { - byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL); - if (timeBytes != null) { - long pruneUpperBoundRegion = Bytes.toLong(timeBytes); - resultMap.put(region, pruneUpperBoundRegion); + if (regions == null || regions.contains(region)) { + Cell cell = next.getColumnLatestCell(FAMILY, PRUNE_UPPER_BOUND_COL); + if (cell != null) { + byte[] pruneUpperBoundBytes = CellUtil.cloneValue(cell); + long timestamp = cell.getTimestamp(); + regionPruneInfos.add(new RegionPruneInfo(region, Bytes.toStringBinary(region), + Bytes.toLong(pruneUpperBoundBytes), timestamp)); } } } } - return resultMap; } + return regionPruneInfos; } /** http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/aeeee00b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java new file mode 100644 index 0000000..d48e48d --- /dev/null +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tephra.hbase.txprune; + +import com.google.common.collect.Iterables; +import com.google.common.collect.MinMaxPriorityQueue; +import com.google.gson.Gson; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.tephra.TxConstants; +import org.apache.tephra.txprune.RegionPruneInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.SortedSet; +import java.util.TreeSet; +import javax.annotation.Nullable; + +/** + * Invalid List Pruning Debug Tool. 
+ */ +public class InvalidListPruningDebug { + private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebug.class); + private static final Gson GSON = new Gson(); + private DataJanitorState dataJanitorState; + + /** + * Initialize the Invalid List Debug Tool. + * @param conf {@link Configuration} + * @throws IOException + */ + public void initialize(final Configuration conf) throws IOException { + LOG.debug("InvalidListPruningDebugMain : initialize method called"); + final HConnection connection = new HBaseAdmin(conf).getConnection(); + dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() { + @Override + public HTableInterface get() throws IOException { + return connection.getTable(TableName.valueOf( + conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE))); + } + }); + } + + /** + * Return a list of RegionPruneInfo. These regions are the ones that have the lowest prune upper bounds. + * If -1 is passed in, all the regions and their prune upper bound will be returned. + * + * @param numRegions number of regions + * @return Map of region name and its prune upper bound + */ + public Queue<RegionPruneInfo> getIdleRegions(Integer numRegions) throws IOException { + List<RegionPruneInfo> regionPruneInfos = dataJanitorState.getPruneInfoForRegions(null); + if (regionPruneInfos.isEmpty()) { + return new LinkedList<>(); + } + + if (numRegions < 0) { + numRegions = regionPruneInfos.size(); + } + + Queue<RegionPruneInfo> lowestPrunes = MinMaxPriorityQueue.orderedBy(new Comparator<RegionPruneInfo>() { + @Override + public int compare(RegionPruneInfo o1, RegionPruneInfo o2) { + return (int) (o1.getPruneUpperBound() - o2.getPruneUpperBound()); + } + }).maximumSize(numRegions).create(); + + for (RegionPruneInfo pruneInfo : regionPruneInfos) { + lowestPrunes.add(pruneInfo); + } + return lowestPrunes; + } + + /** + * Return the prune upper bound value of a given region. 
If no prune upper bound has been written for this region yet, + * it will return a null. + * + * @param regionId region id + * @return {@link RegionPruneInfo} of the region + * @throws IOException if there are any errors while trying to fetch the {@link RegionPruneInfo} + */ + @Nullable + public RegionPruneInfo getRegionPruneInfo(String regionId) throws IOException { + return dataJanitorState.getPruneInfoForRegion(Bytes.toBytesBinary(regionId)); + } + + /** + * + * @param time Given a time, provide the {@link TimeRegions} at or before that time + * @return transactional regions that are present at or before the given time + * @throws IOException if there are any errors while trying to fetch the {@link TimeRegions} + */ + public Map<Long, SortedSet<String>> getRegionsOnOrBeforeTime(Long time) throws IOException { + Map<Long, SortedSet<String>> regionMap = new HashMap<>(); + TimeRegions timeRegions = dataJanitorState.getRegionsOnOrBeforeTime(time); + if (timeRegions == null) { + return regionMap; + } + SortedSet<String> regionNames = new TreeSet<>(); + Iterable<String> regionStrings = Iterables.transform(timeRegions.getRegions(), TimeRegions.BYTE_ARR_TO_STRING_FN); + for (String regionString : regionStrings) { + regionNames.add(regionString); + } + regionMap.put(timeRegions.getTime(), regionNames); + return regionMap; + } + + private void printUsage(PrintWriter pw) { + pw.println("Usage : org.apache.tephra.hbase.txprune.InvalidListPruning <command> <parameter>"); + pw.println("Available commands, corresponding parameters are:"); + pw.println("****************************************************"); + pw.println("time-region ts"); + pw.println("Desc: Prints out the transactional regions present in HBase at time 'ts' (in milliseconds) " + + "or the latest time before time 'ts'."); + pw.println("idle-regions limit"); + pw.println("Desc: Prints out 'limit' number of regions which has the lowest prune upper bounds. 
If '-1' is " + + "provided as the limit, prune upper bounds of all regions are returned."); + pw.println("prune-info region-name-as-string"); + pw.println("Desc: Prints out the Pruning information for the region 'region-name-as-string'"); + } + + private boolean execute(String[] args) throws IOException { + try (PrintWriter pw = new PrintWriter(System.out)) { + if (args.length != 2) { + printUsage(pw); + return false; + } + + String command = args[0]; + String parameter = args[1]; + if ("time-region".equals(command)) { + Long time = Long.parseLong(parameter); + Map<Long, SortedSet<String>> timeRegion = getRegionsOnOrBeforeTime(time); + pw.println(GSON.toJson(timeRegion)); + return true; + } else if ("idle-regions".equals(command)) { + Integer numRegions = Integer.parseInt(parameter); + Queue<RegionPruneInfo> regionPruneInfos = getIdleRegions(numRegions); + pw.println(GSON.toJson(regionPruneInfos)); + return true; + } else if ("prune-info".equals(command)) { + RegionPruneInfo regionPruneInfo = getRegionPruneInfo(parameter); + if (regionPruneInfo != null) { + pw.println(GSON.toJson(regionPruneInfo)); + } else { + pw.println(String.format("No prune info found for the region %s.", parameter)); + } + return true; + } else { + pw.println(String.format("%s is not a valid command.", command)); + printUsage(pw); + return false; + } + } + } + + public static void main(String[] args) { + Configuration hConf = HBaseConfiguration.create(); + InvalidListPruningDebug pruningDebug = new InvalidListPruningDebug(); + try { + pruningDebug.initialize(hConf); + boolean success = pruningDebug.execute(args); + if (!success) { + System.exit(1); + } + } catch (IOException ex) { + LOG.error("Received an exception while trying to execute the debug tool. 
", ex); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/aeeee00b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java index 5817fe2..979eb1a 100644 --- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java +++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java @@ -20,6 +20,8 @@ package org.apache.tephra.hbase.txprune; import com.google.common.collect.Maps; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTableInterface; @@ -29,8 +31,11 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; import org.apache.tephra.hbase.coprocessor.TransactionProcessor; +import org.apache.tephra.txprune.RegionPruneInfo; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedSet; @@ -102,11 +107,30 @@ public class DataJanitorState { * @throws IOException when not able to read the data from HBase */ public long getPruneUpperBoundForRegion(byte[] regionId) throws IOException { + RegionPruneInfo regionPruneInfo = getPruneInfoForRegion(regionId); + return (regionPruneInfo == null) ? -1 : regionPruneInfo.getPruneUpperBound(); + } + + /** + * Get the latest {@link RegionPruneInfo} for a given region. 
+ * + * @param regionId region id + * @return {@link RegionPruneInfo} for the region + * @throws IOException when not able to read the data from HBase + */ + @Nullable + public RegionPruneInfo getPruneInfoForRegion(byte[] regionId) throws IOException { try (HTableInterface stateTable = stateTableSupplier.get()) { Get get = new Get(makeRegionKey(regionId)); get.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL); - byte[] result = stateTable.get(get).getValue(FAMILY, PRUNE_UPPER_BOUND_COL); - return result == null ? -1 : Bytes.toLong(result); + Cell cell = stateTable.get(get).getColumnLatestCell(FAMILY, PRUNE_UPPER_BOUND_COL); + if (cell == null) { + return null; + } + byte[] pruneUpperBoundBytes = CellUtil.cloneValue(cell); + long timestamp = cell.getTimestamp(); + return new RegionPruneInfo(regionId, Bytes.toStringBinary(regionId), + Bytes.toLong(pruneUpperBoundBytes), timestamp); } } @@ -120,6 +144,22 @@ public class DataJanitorState { */ public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException { Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); + List<RegionPruneInfo> regionPruneInfos = getPruneInfoForRegions(regions); + for (RegionPruneInfo regionPruneInfo : regionPruneInfos) { + resultMap.put(regionPruneInfo.getRegionName(), regionPruneInfo.getPruneUpperBound()); + } + return resultMap; + } + + /** + * Gets a list of {@link RegionPruneInfo} for given regions. Returns all regions if the given regions set is null. + * + * @param regions a set of regions + * @return list of {@link RegionPruneInfo}s. 
+ * @throws IOException when not able to read the data from HBase + */ + public List<RegionPruneInfo> getPruneInfoForRegions(@Nullable SortedSet<byte[]> regions) throws IOException { + List<RegionPruneInfo> regionPruneInfos = new ArrayList<>(); try (HTableInterface stateTable = stateTableSupplier.get()) { byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY); Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP); @@ -129,17 +169,19 @@ public class DataJanitorState { Result next; while ((next = scanner.next()) != null) { byte[] region = getRegionFromKey(next.getRow()); - if (regions.contains(region)) { - byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL); - if (timeBytes != null) { - long pruneUpperBoundRegion = Bytes.toLong(timeBytes); - resultMap.put(region, pruneUpperBoundRegion); + if (regions == null || regions.contains(region)) { + Cell cell = next.getColumnLatestCell(FAMILY, PRUNE_UPPER_BOUND_COL); + if (cell != null) { + byte[] pruneUpperBoundBytes = CellUtil.cloneValue(cell); + long timestamp = cell.getTimestamp(); + regionPruneInfos.add(new RegionPruneInfo(region, Bytes.toStringBinary(region), + Bytes.toLong(pruneUpperBoundBytes), timestamp)); } } } } - return resultMap; } + return regionPruneInfos; } /** http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/aeeee00b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java new file mode 100644 index 0000000..d48e48d --- /dev/null +++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tephra.hbase.txprune; + +import com.google.common.collect.Iterables; +import com.google.common.collect.MinMaxPriorityQueue; +import com.google.gson.Gson; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.tephra.TxConstants; +import org.apache.tephra.txprune.RegionPruneInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.SortedSet; +import java.util.TreeSet; +import javax.annotation.Nullable; + +/** + * Invalid List Pruning Debug Tool. 
+ */ +public class InvalidListPruningDebug { + private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebug.class); + private static final Gson GSON = new Gson(); + private DataJanitorState dataJanitorState; + + /** + * Initialize the Invalid List Debug Tool. + * @param conf {@link Configuration} + * @throws IOException + */ + public void initialize(final Configuration conf) throws IOException { + LOG.debug("InvalidListPruningDebugMain : initialize method called"); + final HConnection connection = new HBaseAdmin(conf).getConnection(); + dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() { + @Override + public HTableInterface get() throws IOException { + return connection.getTable(TableName.valueOf( + conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE))); + } + }); + } + + /** + * Return a list of RegionPruneInfo. These regions are the ones that have the lowest prune upper bounds. + * If -1 is passed in, all the regions and their prune upper bound will be returned. + * + * @param numRegions number of regions + * @return Map of region name and its prune upper bound + */ + public Queue<RegionPruneInfo> getIdleRegions(Integer numRegions) throws IOException { + List<RegionPruneInfo> regionPruneInfos = dataJanitorState.getPruneInfoForRegions(null); + if (regionPruneInfos.isEmpty()) { + return new LinkedList<>(); + } + + if (numRegions < 0) { + numRegions = regionPruneInfos.size(); + } + + Queue<RegionPruneInfo> lowestPrunes = MinMaxPriorityQueue.orderedBy(new Comparator<RegionPruneInfo>() { + @Override + public int compare(RegionPruneInfo o1, RegionPruneInfo o2) { + return (int) (o1.getPruneUpperBound() - o2.getPruneUpperBound()); + } + }).maximumSize(numRegions).create(); + + for (RegionPruneInfo pruneInfo : regionPruneInfos) { + lowestPrunes.add(pruneInfo); + } + return lowestPrunes; + } + + /** + * Return the prune upper bound value of a given region. 
If no prune upper bound has been written for this region yet, + * it will return a null. + * + * @param regionId region id + * @return {@link RegionPruneInfo} of the region + * @throws IOException if there are any errors while trying to fetch the {@link RegionPruneInfo} + */ + @Nullable + public RegionPruneInfo getRegionPruneInfo(String regionId) throws IOException { + return dataJanitorState.getPruneInfoForRegion(Bytes.toBytesBinary(regionId)); + } + + /** + * + * @param time Given a time, provide the {@link TimeRegions} at or before that time + * @return transactional regions that are present at or before the given time + * @throws IOException if there are any errors while trying to fetch the {@link TimeRegions} + */ + public Map<Long, SortedSet<String>> getRegionsOnOrBeforeTime(Long time) throws IOException { + Map<Long, SortedSet<String>> regionMap = new HashMap<>(); + TimeRegions timeRegions = dataJanitorState.getRegionsOnOrBeforeTime(time); + if (timeRegions == null) { + return regionMap; + } + SortedSet<String> regionNames = new TreeSet<>(); + Iterable<String> regionStrings = Iterables.transform(timeRegions.getRegions(), TimeRegions.BYTE_ARR_TO_STRING_FN); + for (String regionString : regionStrings) { + regionNames.add(regionString); + } + regionMap.put(timeRegions.getTime(), regionNames); + return regionMap; + } + + private void printUsage(PrintWriter pw) { + pw.println("Usage : org.apache.tephra.hbase.txprune.InvalidListPruning <command> <parameter>"); + pw.println("Available commands, corresponding parameters are:"); + pw.println("****************************************************"); + pw.println("time-region ts"); + pw.println("Desc: Prints out the transactional regions present in HBase at time 'ts' (in milliseconds) " + + "or the latest time before time 'ts'."); + pw.println("idle-regions limit"); + pw.println("Desc: Prints out 'limit' number of regions which has the lowest prune upper bounds. 
If '-1' is " + + "provided as the limit, prune upper bounds of all regions are returned."); + pw.println("prune-info region-name-as-string"); + pw.println("Desc: Prints out the Pruning information for the region 'region-name-as-string'"); + } + + private boolean execute(String[] args) throws IOException { + try (PrintWriter pw = new PrintWriter(System.out)) { + if (args.length != 2) { + printUsage(pw); + return false; + } + + String command = args[0]; + String parameter = args[1]; + if ("time-region".equals(command)) { + Long time = Long.parseLong(parameter); + Map<Long, SortedSet<String>> timeRegion = getRegionsOnOrBeforeTime(time); + pw.println(GSON.toJson(timeRegion)); + return true; + } else if ("idle-regions".equals(command)) { + Integer numRegions = Integer.parseInt(parameter); + Queue<RegionPruneInfo> regionPruneInfos = getIdleRegions(numRegions); + pw.println(GSON.toJson(regionPruneInfos)); + return true; + } else if ("prune-info".equals(command)) { + RegionPruneInfo regionPruneInfo = getRegionPruneInfo(parameter); + if (regionPruneInfo != null) { + pw.println(GSON.toJson(regionPruneInfo)); + } else { + pw.println(String.format("No prune info found for the region %s.", parameter)); + } + return true; + } else { + pw.println(String.format("%s is not a valid command.", command)); + printUsage(pw); + return false; + } + } + } + + public static void main(String[] args) { + Configuration hConf = HBaseConfiguration.create(); + InvalidListPruningDebug pruningDebug = new InvalidListPruningDebug(); + try { + pruningDebug.initialize(hConf); + boolean success = pruningDebug.execute(args); + if (!success) { + System.exit(1); + } + } catch (IOException ex) { + LOG.error("Received an exception while trying to execute the debug tool. 
", ex); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/aeeee00b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java index 51dc181..897e00e 100644 --- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java +++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java @@ -20,6 +20,8 @@ package org.apache.tephra.hbase.txprune; import com.google.common.collect.Maps; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; @@ -29,8 +31,11 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.tephra.hbase.coprocessor.TransactionProcessor; +import org.apache.tephra.txprune.RegionPruneInfo; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedSet; @@ -102,11 +107,30 @@ public class DataJanitorState { * @throws IOException when not able to read the data from HBase */ public long getPruneUpperBoundForRegion(byte[] regionId) throws IOException { + RegionPruneInfo regionPruneInfo = getPruneInfoForRegion(regionId); + return (regionPruneInfo == null) ? -1 : regionPruneInfo.getPruneUpperBound(); + } + + /** + * Get the latest {@link RegionPruneInfo} for a given region. 
+ * + * @param regionId region id + * @return {@link RegionPruneInfo} for the region + * @throws IOException when not able to read the data from HBase + */ + @Nullable + public RegionPruneInfo getPruneInfoForRegion(byte[] regionId) throws IOException { try (Table stateTable = stateTableSupplier.get()) { Get get = new Get(makeRegionKey(regionId)); get.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL); - byte[] result = stateTable.get(get).getValue(FAMILY, PRUNE_UPPER_BOUND_COL); - return result == null ? -1 : Bytes.toLong(result); + Cell cell = stateTable.get(get).getColumnLatestCell(FAMILY, PRUNE_UPPER_BOUND_COL); + if (cell == null) { + return null; + } + byte[] pruneUpperBoundBytes = CellUtil.cloneValue(cell); + long timestamp = cell.getTimestamp(); + return new RegionPruneInfo(regionId, Bytes.toStringBinary(regionId), + Bytes.toLong(pruneUpperBoundBytes), timestamp); } } @@ -120,6 +144,22 @@ public class DataJanitorState { */ public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException { Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); + List<RegionPruneInfo> regionPruneInfos = getPruneInfoForRegions(regions); + for (RegionPruneInfo regionPruneInfo : regionPruneInfos) { + resultMap.put(regionPruneInfo.getRegionName(), regionPruneInfo.getPruneUpperBound()); + } + return resultMap; + } + + /** + * Gets a list of {@link RegionPruneInfo} for given regions. Returns all regions if the given regions set is null. + * + * @param regions a set of regions + * @return list of {@link RegionPruneInfo}s. 
+ * @throws IOException when not able to read the data from HBase + */ + public List<RegionPruneInfo> getPruneInfoForRegions(@Nullable SortedSet<byte[]> regions) throws IOException { + List<RegionPruneInfo> regionPruneInfos = new ArrayList<>(); try (Table stateTable = stateTableSupplier.get()) { byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY); Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP); @@ -129,17 +169,19 @@ public class DataJanitorState { Result next; while ((next = scanner.next()) != null) { byte[] region = getRegionFromKey(next.getRow()); - if (regions.contains(region)) { - byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL); - if (timeBytes != null) { - long pruneUpperBoundRegion = Bytes.toLong(timeBytes); - resultMap.put(region, pruneUpperBoundRegion); + if (regions == null || regions.contains(region)) { + Cell cell = next.getColumnLatestCell(FAMILY, PRUNE_UPPER_BOUND_COL); + if (cell != null) { + byte[] pruneUpperBoundBytes = CellUtil.cloneValue(cell); + long timestamp = cell.getTimestamp(); + regionPruneInfos.add(new RegionPruneInfo(region, Bytes.toStringBinary(region), + Bytes.toLong(pruneUpperBoundBytes), timestamp)); } } } } - return resultMap; } + return regionPruneInfos; } /** http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/aeeee00b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java new file mode 100644 index 0000000..e748d90 --- /dev/null +++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tephra.hbase.txprune; + +import com.google.common.collect.Iterables; +import com.google.common.collect.MinMaxPriorityQueue; +import com.google.gson.Gson; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.tephra.TxConstants; +import org.apache.tephra.txprune.RegionPruneInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.SortedSet; +import java.util.TreeSet; +import javax.annotation.Nullable; + +/** + * Invalid List Pruning Debug Tool. 
+ */ +public class InvalidListPruningDebug { + private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebug.class); + private static final Gson GSON = new Gson(); + private DataJanitorState dataJanitorState; + + /** + * Initialize the Invalid List Debug Tool. + * @param conf {@link Configuration} + * @throws IOException + */ + public void initialize(final Configuration conf) throws IOException { + LOG.debug("InvalidListPruningDebugMain : initialize method called"); + final Connection connection = ConnectionFactory.createConnection(conf); + dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() { + @Override + public Table get() throws IOException { + return connection.getTable(TableName.valueOf( + conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE))); + } + }); + } + + /** + * Return a list of RegionPruneInfo. These regions are the ones that have the lowest prune upper bounds. + * If -1 is passed in, all the regions and their prune upper bound will be returned. + * + * @param numRegions number of regions + * @return Map of region name and its prune upper bound + */ + public Queue<RegionPruneInfo> getIdleRegions(Integer numRegions) throws IOException { + List<RegionPruneInfo> regionPruneInfos = dataJanitorState.getPruneInfoForRegions(null); + if (regionPruneInfos.isEmpty()) { + return new LinkedList<>(); + } + + if (numRegions < 0) { + numRegions = regionPruneInfos.size(); + } + + Queue<RegionPruneInfo> lowestPrunes = MinMaxPriorityQueue.orderedBy(new Comparator<RegionPruneInfo>() { + @Override + public int compare(RegionPruneInfo o1, RegionPruneInfo o2) { + return (int) (o1.getPruneUpperBound() - o2.getPruneUpperBound()); + } + }).maximumSize(numRegions).create(); + + for (RegionPruneInfo pruneInfo : regionPruneInfos) { + lowestPrunes.add(pruneInfo); + } + return lowestPrunes; + } + + /** + * Return the prune upper bound value of a given region. 
If no prune upper bound has been written for this region yet, + * it will return a null. + * + * @param regionId region id + * @return {@link RegionPruneInfo} of the region + * @throws IOException if there are any errors while trying to fetch the {@link RegionPruneInfo} + */ + @Nullable + public RegionPruneInfo getRegionPruneInfo(String regionId) throws IOException { + return dataJanitorState.getPruneInfoForRegion(Bytes.toBytesBinary(regionId)); + } + + /** + * + * @param time Given a time, provide the {@link TimeRegions} at or before that time + * @return transactional regions that are present at or before the given time + * @throws IOException if there are any errors while trying to fetch the {@link TimeRegions} + */ + public Map<Long, SortedSet<String>> getRegionsOnOrBeforeTime(Long time) throws IOException { + Map<Long, SortedSet<String>> regionMap = new HashMap<>(); + TimeRegions timeRegions = dataJanitorState.getRegionsOnOrBeforeTime(time); + if (timeRegions == null) { + return regionMap; + } + SortedSet<String> regionNames = new TreeSet<>(); + Iterable<String> regionStrings = Iterables.transform(timeRegions.getRegions(), TimeRegions.BYTE_ARR_TO_STRING_FN); + for (String regionString : regionStrings) { + regionNames.add(regionString); + } + regionMap.put(timeRegions.getTime(), regionNames); + return regionMap; + } + + private void printUsage(PrintWriter pw) { + pw.println("Usage : org.apache.tephra.hbase.txprune.InvalidListPruning <command> <parameter>"); + pw.println("Available commands, corresponding parameters are:"); + pw.println("****************************************************"); + pw.println("time-region ts"); + pw.println("Desc: Prints out the transactional regions present in HBase at time 'ts' (in milliseconds) " + + "or the latest time before time 'ts'."); + pw.println("idle-regions limit"); + pw.println("Desc: Prints out 'limit' number of regions which has the lowest prune upper bounds. 
If '-1' is " + + "provided as the limit, prune upper bounds of all regions are returned."); + pw.println("prune-info region-name-as-string"); + pw.println("Desc: Prints out the Pruning information for the region 'region-name-as-string'"); + } + + private boolean execute(String[] args) throws IOException { + try (PrintWriter pw = new PrintWriter(System.out)) { + if (args.length != 2) { + printUsage(pw); + return false; + } + + String command = args[0]; + String parameter = args[1]; + if ("time-region".equals(command)) { + Long time = Long.parseLong(parameter); + Map<Long, SortedSet<String>> timeRegion = getRegionsOnOrBeforeTime(time); + pw.println(GSON.toJson(timeRegion)); + return true; + } else if ("idle-regions".equals(command)) { + Integer numRegions = Integer.parseInt(parameter); + Queue<RegionPruneInfo> regionPruneInfos = getIdleRegions(numRegions); + pw.println(GSON.toJson(regionPruneInfos)); + return true; + } else if ("prune-info".equals(command)) { + RegionPruneInfo regionPruneInfo = getRegionPruneInfo(parameter); + if (regionPruneInfo != null) { + pw.println(GSON.toJson(regionPruneInfo)); + } else { + pw.println(String.format("No prune info found for the region %s.", parameter)); + } + return true; + } else { + pw.println(String.format("%s is not a valid command.", command)); + printUsage(pw); + return false; + } + } + } + + public static void main(String[] args) { + Configuration hConf = HBaseConfiguration.create(); + InvalidListPruningDebug pruningDebug = new InvalidListPruningDebug(); + try { + pruningDebug.initialize(hConf); + boolean success = pruningDebug.execute(args); + if (!success) { + System.exit(1); + } + } catch (IOException ex) { + LOG.error("Received an exception while trying to execute the debug tool. 
", ex); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/aeeee00b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java index 51dc181..897e00e 100644 --- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java +++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java @@ -20,6 +20,8 @@ package org.apache.tephra.hbase.txprune; import com.google.common.collect.Maps; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; @@ -29,8 +31,11 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.tephra.hbase.coprocessor.TransactionProcessor; +import org.apache.tephra.txprune.RegionPruneInfo; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedSet; @@ -102,11 +107,30 @@ public class DataJanitorState { * @throws IOException when not able to read the data from HBase */ public long getPruneUpperBoundForRegion(byte[] regionId) throws IOException { + RegionPruneInfo regionPruneInfo = getPruneInfoForRegion(regionId); + return (regionPruneInfo == null) ? -1 : regionPruneInfo.getPruneUpperBound(); + } + + /** + * Get the latest {@link RegionPruneInfo} for a given region. 
+ * + * @param regionId region id + * @return {@link RegionPruneInfo} for the region + * @throws IOException when not able to read the data from HBase + */ + @Nullable + public RegionPruneInfo getPruneInfoForRegion(byte[] regionId) throws IOException { try (Table stateTable = stateTableSupplier.get()) { Get get = new Get(makeRegionKey(regionId)); get.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL); - byte[] result = stateTable.get(get).getValue(FAMILY, PRUNE_UPPER_BOUND_COL); - return result == null ? -1 : Bytes.toLong(result); + Cell cell = stateTable.get(get).getColumnLatestCell(FAMILY, PRUNE_UPPER_BOUND_COL); + if (cell == null) { + return null; + } + byte[] pruneUpperBoundBytes = CellUtil.cloneValue(cell); + long timestamp = cell.getTimestamp(); + return new RegionPruneInfo(regionId, Bytes.toStringBinary(regionId), + Bytes.toLong(pruneUpperBoundBytes), timestamp); } } @@ -120,6 +144,22 @@ public class DataJanitorState { */ public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException { Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); + List<RegionPruneInfo> regionPruneInfos = getPruneInfoForRegions(regions); + for (RegionPruneInfo regionPruneInfo : regionPruneInfos) { + resultMap.put(regionPruneInfo.getRegionName(), regionPruneInfo.getPruneUpperBound()); + } + return resultMap; + } + + /** + * Gets a list of {@link RegionPruneInfo} for given regions. Returns all regions if the given regions set is null. + * + * @param regions a set of regions + * @return list of {@link RegionPruneInfo}s. 
+ * @throws IOException when not able to read the data from HBase + */ + public List<RegionPruneInfo> getPruneInfoForRegions(@Nullable SortedSet<byte[]> regions) throws IOException { + List<RegionPruneInfo> regionPruneInfos = new ArrayList<>(); try (Table stateTable = stateTableSupplier.get()) { byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY); Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP); @@ -129,17 +169,19 @@ public class DataJanitorState { Result next; while ((next = scanner.next()) != null) { byte[] region = getRegionFromKey(next.getRow()); - if (regions.contains(region)) { - byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL); - if (timeBytes != null) { - long pruneUpperBoundRegion = Bytes.toLong(timeBytes); - resultMap.put(region, pruneUpperBoundRegion); + if (regions == null || regions.contains(region)) { + Cell cell = next.getColumnLatestCell(FAMILY, PRUNE_UPPER_BOUND_COL); + if (cell != null) { + byte[] pruneUpperBoundBytes = CellUtil.cloneValue(cell); + long timestamp = cell.getTimestamp(); + regionPruneInfos.add(new RegionPruneInfo(region, Bytes.toStringBinary(region), + Bytes.toLong(pruneUpperBoundBytes), timestamp)); } } } } - return resultMap; } + return regionPruneInfos; } /** http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/aeeee00b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java new file mode 100644 index 0000000..e748d90 --- /dev/null +++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tephra.hbase.txprune; + +import com.google.common.collect.Iterables; +import com.google.common.collect.MinMaxPriorityQueue; +import com.google.gson.Gson; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.tephra.TxConstants; +import org.apache.tephra.txprune.RegionPruneInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.SortedSet; +import java.util.TreeSet; +import javax.annotation.Nullable; + +/** + * Invalid List Pruning Debug Tool. 
+ */ +public class InvalidListPruningDebug { + private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebug.class); + private static final Gson GSON = new Gson(); + private DataJanitorState dataJanitorState; + + /** + * Initialize the Invalid List Debug Tool. + * @param conf {@link Configuration} + * @throws IOException + */ + public void initialize(final Configuration conf) throws IOException { + LOG.debug("InvalidListPruningDebugMain : initialize method called"); + final Connection connection = ConnectionFactory.createConnection(conf); + dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() { + @Override + public Table get() throws IOException { + return connection.getTable(TableName.valueOf( + conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE))); + } + }); + } + + /** + * Return a list of RegionPruneInfo. These regions are the ones that have the lowest prune upper bounds. + * If -1 is passed in, all the regions and their prune upper bound will be returned. + * + * @param numRegions number of regions + * @return Map of region name and its prune upper bound + */ + public Queue<RegionPruneInfo> getIdleRegions(Integer numRegions) throws IOException { + List<RegionPruneInfo> regionPruneInfos = dataJanitorState.getPruneInfoForRegions(null); + if (regionPruneInfos.isEmpty()) { + return new LinkedList<>(); + } + + if (numRegions < 0) { + numRegions = regionPruneInfos.size(); + } + + Queue<RegionPruneInfo> lowestPrunes = MinMaxPriorityQueue.orderedBy(new Comparator<RegionPruneInfo>() { + @Override + public int compare(RegionPruneInfo o1, RegionPruneInfo o2) { + return (int) (o1.getPruneUpperBound() - o2.getPruneUpperBound()); + } + }).maximumSize(numRegions).create(); + + for (RegionPruneInfo pruneInfo : regionPruneInfos) { + lowestPrunes.add(pruneInfo); + } + return lowestPrunes; + } + + /** + * Return the prune upper bound value of a given region. 
If no prune upper bound has been written for this region yet, + * it will return a null. + * + * @param regionId region id + * @return {@link RegionPruneInfo} of the region + * @throws IOException if there are any errors while trying to fetch the {@link RegionPruneInfo} + */ + @Nullable + public RegionPruneInfo getRegionPruneInfo(String regionId) throws IOException { + return dataJanitorState.getPruneInfoForRegion(Bytes.toBytesBinary(regionId)); + } + + /** + * + * @param time Given a time, provide the {@link TimeRegions} at or before that time + * @return transactional regions that are present at or before the given time + * @throws IOException if there are any errors while trying to fetch the {@link TimeRegions} + */ + public Map<Long, SortedSet<String>> getRegionsOnOrBeforeTime(Long time) throws IOException { + Map<Long, SortedSet<String>> regionMap = new HashMap<>(); + TimeRegions timeRegions = dataJanitorState.getRegionsOnOrBeforeTime(time); + if (timeRegions == null) { + return regionMap; + } + SortedSet<String> regionNames = new TreeSet<>(); + Iterable<String> regionStrings = Iterables.transform(timeRegions.getRegions(), TimeRegions.BYTE_ARR_TO_STRING_FN); + for (String regionString : regionStrings) { + regionNames.add(regionString); + } + regionMap.put(timeRegions.getTime(), regionNames); + return regionMap; + } + + private void printUsage(PrintWriter pw) { + pw.println("Usage : org.apache.tephra.hbase.txprune.InvalidListPruning <command> <parameter>"); + pw.println("Available commands, corresponding parameters are:"); + pw.println("****************************************************"); + pw.println("time-region ts"); + pw.println("Desc: Prints out the transactional regions present in HBase at time 'ts' (in milliseconds) " + + "or the latest time before time 'ts'."); + pw.println("idle-regions limit"); + pw.println("Desc: Prints out 'limit' number of regions which has the lowest prune upper bounds. 
If '-1' is " + + "provided as the limit, prune upper bounds of all regions are returned."); + pw.println("prune-info region-name-as-string"); + pw.println("Desc: Prints out the Pruning information for the region 'region-name-as-string'"); + } + + private boolean execute(String[] args) throws IOException { + try (PrintWriter pw = new PrintWriter(System.out)) { + if (args.length != 2) { + printUsage(pw); + return false; + } + + String command = args[0]; + String parameter = args[1]; + if ("time-region".equals(command)) { + Long time = Long.parseLong(parameter); + Map<Long, SortedSet<String>> timeRegion = getRegionsOnOrBeforeTime(time); + pw.println(GSON.toJson(timeRegion)); + return true; + } else if ("idle-regions".equals(command)) { + Integer numRegions = Integer.parseInt(parameter); + Queue<RegionPruneInfo> regionPruneInfos = getIdleRegions(numRegions); + pw.println(GSON.toJson(regionPruneInfos)); + return true; + } else if ("prune-info".equals(command)) { + RegionPruneInfo regionPruneInfo = getRegionPruneInfo(parameter); + if (regionPruneInfo != null) { + pw.println(GSON.toJson(regionPruneInfo)); + } else { + pw.println(String.format("No prune info found for the region %s.", parameter)); + } + return true; + } else { + pw.println(String.format("%s is not a valid command.", command)); + printUsage(pw); + return false; + } + } + } + + public static void main(String[] args) { + Configuration hConf = HBaseConfiguration.create(); + InvalidListPruningDebug pruningDebug = new InvalidListPruningDebug(); + try { + pruningDebug.initialize(hConf); + boolean success = pruningDebug.execute(args); + if (!success) { + System.exit(1); + } + } catch (IOException ex) { + LOG.error("Received an exception while trying to execute the debug tool. 
", ex); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/aeeee00b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java index 51dc181..897e00e 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java @@ -20,6 +20,8 @@ package org.apache.tephra.hbase.txprune; import com.google.common.collect.Maps; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; @@ -29,8 +31,11 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.tephra.hbase.coprocessor.TransactionProcessor; +import org.apache.tephra.txprune.RegionPruneInfo; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedSet; @@ -102,11 +107,30 @@ public class DataJanitorState { * @throws IOException when not able to read the data from HBase */ public long getPruneUpperBoundForRegion(byte[] regionId) throws IOException { + RegionPruneInfo regionPruneInfo = getPruneInfoForRegion(regionId); + return (regionPruneInfo == null) ? -1 : regionPruneInfo.getPruneUpperBound(); + } + + /** + * Get the latest {@link RegionPruneInfo} for a given region. 
+ * + * @param regionId region id + * @return {@link RegionPruneInfo} for the region + * @throws IOException when not able to read the data from HBase + */ + @Nullable + public RegionPruneInfo getPruneInfoForRegion(byte[] regionId) throws IOException { try (Table stateTable = stateTableSupplier.get()) { Get get = new Get(makeRegionKey(regionId)); get.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL); - byte[] result = stateTable.get(get).getValue(FAMILY, PRUNE_UPPER_BOUND_COL); - return result == null ? -1 : Bytes.toLong(result); + Cell cell = stateTable.get(get).getColumnLatestCell(FAMILY, PRUNE_UPPER_BOUND_COL); + if (cell == null) { + return null; + } + byte[] pruneUpperBoundBytes = CellUtil.cloneValue(cell); + long timestamp = cell.getTimestamp(); + return new RegionPruneInfo(regionId, Bytes.toStringBinary(regionId), + Bytes.toLong(pruneUpperBoundBytes), timestamp); } } @@ -120,6 +144,22 @@ public class DataJanitorState { */ public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException { Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); + List<RegionPruneInfo> regionPruneInfos = getPruneInfoForRegions(regions); + for (RegionPruneInfo regionPruneInfo : regionPruneInfos) { + resultMap.put(regionPruneInfo.getRegionName(), regionPruneInfo.getPruneUpperBound()); + } + return resultMap; + } + + /** + * Gets a list of {@link RegionPruneInfo} for given regions. Returns all regions if the given regions set is null. + * + * @param regions a set of regions + * @return list of {@link RegionPruneInfo}s. 
+ * @throws IOException when not able to read the data from HBase + */ + public List<RegionPruneInfo> getPruneInfoForRegions(@Nullable SortedSet<byte[]> regions) throws IOException { + List<RegionPruneInfo> regionPruneInfos = new ArrayList<>(); try (Table stateTable = stateTableSupplier.get()) { byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY); Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP); @@ -129,17 +169,19 @@ public class DataJanitorState { Result next; while ((next = scanner.next()) != null) { byte[] region = getRegionFromKey(next.getRow()); - if (regions.contains(region)) { - byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL); - if (timeBytes != null) { - long pruneUpperBoundRegion = Bytes.toLong(timeBytes); - resultMap.put(region, pruneUpperBoundRegion); + if (regions == null || regions.contains(region)) { + Cell cell = next.getColumnLatestCell(FAMILY, PRUNE_UPPER_BOUND_COL); + if (cell != null) { + byte[] pruneUpperBoundBytes = CellUtil.cloneValue(cell); + long timestamp = cell.getTimestamp(); + regionPruneInfos.add(new RegionPruneInfo(region, Bytes.toStringBinary(region), + Bytes.toLong(pruneUpperBoundBytes), timestamp)); } } } } - return resultMap; } + return regionPruneInfos; } /** http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/aeeee00b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java new file mode 100644 index 0000000..e748d90 --- /dev/null +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tephra.hbase.txprune; + +import com.google.common.collect.Iterables; +import com.google.common.collect.MinMaxPriorityQueue; +import com.google.gson.Gson; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.tephra.TxConstants; +import org.apache.tephra.txprune.RegionPruneInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.PrintWriter; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.SortedSet; +import java.util.TreeSet; +import javax.annotation.Nullable; + +/** + * Invalid List Pruning Debug Tool. 
+ */
+public class InvalidListPruningDebug {
+  private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebug.class);
+  private static final Gson GSON = new Gson();
+  private DataJanitorState dataJanitorState;
+  // Connection backing the prune state table; kept as a field so that destroy() can release it.
+  private Connection connection;
+
+  /**
+   * Initialize the Invalid List Debug Tool by opening an HBase connection and wiring a
+   * {@link DataJanitorState} that reads the prune state table named in the configuration
+   * (falling back to the default prune state table name).
+   *
+   * @param conf {@link Configuration}
+   * @throws IOException if the HBase connection cannot be created
+   */
+  public void initialize(final Configuration conf) throws IOException {
+    LOG.debug("InvalidListPruningDebugMain : initialize method called");
+    final Connection hConnection = ConnectionFactory.createConnection(conf);
+    connection = hConnection;
+    dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
+      @Override
+      public Table get() throws IOException {
+        return hConnection.getTable(TableName.valueOf(
+          conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                   TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)));
+      }
+    });
+  }
+
+  /**
+   * Release the HBase connection opened by {@link #initialize(Configuration)}. It is safe to call this
+   * method when the tool was never initialized, and safe to call it more than once.
+   */
+  public void destroy() {
+    if (connection == null) {
+      return;
+    }
+    try {
+      connection.close();
+    } catch (IOException e) {
+      // Nothing useful can be done at shutdown besides reporting the failure.
+      LOG.warn("Received an exception while closing the HBase connection.", e);
+    } finally {
+      connection = null;
+    }
+  }
+
+  /**
+   * Return the {@link RegionPruneInfo}s of the regions that have the lowest prune upper bounds.
+   * If a negative value (for example -1) is passed in, all the regions and their prune upper bounds
+   * are returned. If 0 is passed in, an empty queue is returned.
+   *
+   * NOTE(review): the returned queue is a bounded priority queue; iterating it (as the JSON output does)
+   * is presumably not guaranteed to visit the entries in sorted order -- confirm against the Guava
+   * MinMaxPriorityQueue documentation before relying on the order.
+   *
+   * @param numRegions maximum number of regions to return; negative means no limit
+   * @return queue holding at most {@code numRegions} entries with the lowest prune upper bounds
+   * @throws IOException if the prune information cannot be read from the state table
+   */
+  public Queue<RegionPruneInfo> getIdleRegions(Integer numRegions) throws IOException {
+    // MinMaxPriorityQueue rejects a maximum size of zero, so a zero limit is answered directly.
+    if (numRegions == 0) {
+      return new LinkedList<>();
+    }
+
+    List<RegionPruneInfo> regionPruneInfos = dataJanitorState.getPruneInfoForRegions(null);
+    if (regionPruneInfos.isEmpty()) {
+      return new LinkedList<>();
+    }
+
+    if (numRegions < 0) {
+      numRegions = regionPruneInfos.size();
+    }
+
+    Queue<RegionPruneInfo> lowestPrunes = MinMaxPriorityQueue.orderedBy(new Comparator<RegionPruneInfo>() {
+      @Override
+      public int compare(RegionPruneInfo o1, RegionPruneInfo o2) {
+        // Long.compare avoids the overflow/truncation of casting a long difference to int.
+        return Long.compare(o1.getPruneUpperBound(), o2.getPruneUpperBound());
+      }
+    }).maximumSize(numRegions).create();
+
+    for (RegionPruneInfo pruneInfo : regionPruneInfos) {
+      lowestPrunes.add(pruneInfo);
+    }
+    return lowestPrunes;
+  }
+
+  /**
+   * Return the prune information of a given region. If no prune upper bound has been written for this
+   * region yet, it will return a null.
+   *
+   * @param regionId region id, in the binary string form understood by {@link Bytes#toBytesBinary(String)}
+   * @return {@link RegionPruneInfo} of the region, or null if none is recorded
+   * @throws IOException if there are any errors while trying to fetch the {@link RegionPruneInfo}
+   */
+  @Nullable
+  public RegionPruneInfo getRegionPruneInfo(String regionId) throws IOException {
+    return dataJanitorState.getPruneInfoForRegion(Bytes.toBytesBinary(regionId));
+  }
+
+  /**
+   * Return the transactional regions recorded at the given time, or at the latest recorded time before it.
+   *
+   * @param time Given a time, provide the {@link TimeRegions} at or before that time
+   * @return a map with a single entry from the recorded time to the sorted region names present at that
+   *         time, or an empty map if nothing is recorded at or before the given time
+   * @throws IOException if there are any errors while trying to fetch the {@link TimeRegions}
+   */
+  public Map<Long, SortedSet<String>> getRegionsOnOrBeforeTime(Long time) throws IOException {
+    Map<Long, SortedSet<String>> regionMap = new HashMap<>();
+    TimeRegions timeRegions = dataJanitorState.getRegionsOnOrBeforeTime(time);
+    if (timeRegions == null) {
+      return regionMap;
+    }
+    SortedSet<String> regionNames = new TreeSet<>();
+    Iterable<String> regionStrings = Iterables.transform(timeRegions.getRegions(), TimeRegions.BYTE_ARR_TO_STRING_FN);
+    for (String regionString : regionStrings) {
+      regionNames.add(regionString);
+    }
+    regionMap.put(timeRegions.getTime(), regionNames);
+    return regionMap;
+  }
+
+  /**
+   * Print the command line usage of the tool.
+   *
+   * @param pw writer to print the usage to
+   */
+  private void printUsage(PrintWriter pw) {
+    pw.println("Usage : org.apache.tephra.hbase.txprune.InvalidListPruningDebug <command> <parameter>");
+    pw.println("Available commands, corresponding parameters are:");
+    pw.println("****************************************************");
+    pw.println("time-region ts");
+    pw.println("Desc: Prints out the transactional regions present in HBase at time 'ts' (in milliseconds) " +
+                 "or the latest time before time 'ts'.");
+    pw.println("idle-regions limit");
+    pw.println("Desc: Prints out 'limit' number of regions which has the lowest prune upper bounds. If '-1' is " +
+                 "provided as the limit, prune upper bounds of all regions are returned.");
+    pw.println("prune-info region-name-as-string");
+    pw.println("Desc: Prints out the Pruning information for the region 'region-name-as-string'");
+  }
+
+  /**
+   * Report a parameter that should have been a number but could not be parsed, followed by the usage.
+   *
+   * @return always false, so that callers can return the result directly as the command outcome
+   */
+  private boolean reportInvalidNumber(PrintWriter pw, String command, String parameter) {
+    pw.println(String.format("%s is not a valid number for the command %s.", parameter, command));
+    printUsage(pw);
+    return false;
+  }
+
+  /**
+   * Run the command given on the command line and print its result as JSON to standard out.
+   *
+   * @param args command line arguments; exactly a command followed by its parameter
+   * @return true if the command was recognized and executed, false if the arguments were invalid
+   * @throws IOException if the prune state cannot be read from HBase
+   */
+  private boolean execute(String[] args) throws IOException {
+    // System.out is only flushed, never closed: it belongs to the JVM and may still be needed afterwards.
+    PrintWriter pw = new PrintWriter(System.out);
+    try {
+      if (args.length != 2) {
+        printUsage(pw);
+        return false;
+      }
+
+      String command = args[0];
+      String parameter = args[1];
+      if ("time-region".equals(command)) {
+        Long time;
+        try {
+          time = Long.parseLong(parameter);
+        } catch (NumberFormatException e) {
+          return reportInvalidNumber(pw, command, parameter);
+        }
+        Map<Long, SortedSet<String>> timeRegion = getRegionsOnOrBeforeTime(time);
+        pw.println(GSON.toJson(timeRegion));
+        return true;
+      } else if ("idle-regions".equals(command)) {
+        Integer numRegions;
+        try {
+          numRegions = Integer.parseInt(parameter);
+        } catch (NumberFormatException e) {
+          return reportInvalidNumber(pw, command, parameter);
+        }
+        Queue<RegionPruneInfo> regionPruneInfos = getIdleRegions(numRegions);
+        pw.println(GSON.toJson(regionPruneInfos));
+        return true;
+      } else if ("prune-info".equals(command)) {
+        RegionPruneInfo regionPruneInfo = getRegionPruneInfo(parameter);
+        if (regionPruneInfo != null) {
+          pw.println(GSON.toJson(regionPruneInfo));
+        } else {
+          pw.println(String.format("No prune info found for the region %s.", parameter));
+        }
+        return true;
+      } else {
+        pw.println(String.format("%s is not a valid command.", command));
+        printUsage(pw);
+        return false;
+      }
+    } finally {
+      pw.flush();
+    }
+  }
+
+  /**
+   * Entry point of the debug tool. Exits with status 1 if the arguments are invalid or if the command
+   * fails with an {@link IOException}.
+   *
+   * @param args a command followed by its parameter, see the usage text
+   */
+  public static void main(String[] args) {
+    Configuration hConf = HBaseConfiguration.create();
+    InvalidListPruningDebug pruningDebug = new InvalidListPruningDebug();
+    boolean success = false;
+    try {
+      pruningDebug.initialize(hConf);
+      success = pruningDebug.execute(args);
+    } catch (IOException ex) {
+      LOG.error("Received an exception while trying to execute the debug tool. ", ex);
+    } finally {
+      // Release the connection before exiting; System.exit would skip this block if called inside the try.
+      pruningDebug.destroy();
+    }
+    if (!success) {
+      System.exit(1);
+    }
+  }
+}
