Github user anew commented on a diff in the pull request: https://github.com/apache/incubator-tephra/pull/20#discussion_r90759375 --- Diff: tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java --- @@ -58,10 +110,248 @@ public long getPruneUpperBound(byte[] regionId) throws IOException { } } + /** + * Get latest prune upper bounds for given regions. This is a batch operation of method + * {@link #getPruneUpperBoundForRegion(byte[])} + * + * @param regions a set of regions + * @return a map containing region id and its latest prune upper bound value + * @throws IOException when not able to read the data from HBase + */ + public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException { + Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); + try (Table stateTable = stateTableSupplier.get()) { + byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY); + Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP); + scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL); + + try (ResultScanner scanner = stateTable.getScanner(scan)) { + Result next; + while ((next = scanner.next()) != null) { + byte[] region = getRegionFromKey(next.getRow()); + if (regions.contains(region)) { + byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL); + if (timeBytes != null) { + long pruneUpperBoundRegion = Bytes.toLong(timeBytes); + resultMap.put(region, pruneUpperBoundRegion); + } + } + } + } + return resultMap; + } + } + + /** + * Delete all regions that are not in the given exclude set and whose prune upper bound is less than a given value. + * After the invalid list is pruned up to deletionPruneUpperBound, we do not need entries for regions that have + * prune upper bound less than deletionPruneUpperBound. We however limit the deletion to only regions that are + * no longer in existence (due to deletion, etc.), to avoid update/delete race conditions. + * + * @param deletionPruneUpperBound prune upper bound below which regions will be deleted + * @param excludeRegions set of regions that should not be deleted + * @throws IOException when not able to delete data in HBase + */ + public void deleteRegionsWithPruneUpperBoundBefore(long deletionPruneUpperBound, SortedSet<byte[]> excludeRegions) + throws IOException { + try (Table stateTable = stateTableSupplier.get()) { + byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY); + Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP); + scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL); + + try (ResultScanner scanner = stateTable.getScanner(scan)) { + Result next; + while ((next = scanner.next()) != null) { + byte[] region = getRegionFromKey(next.getRow()); + if (!excludeRegions.contains(region)) { + byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL); + if (timeBytes != null) { + long pruneUpperBoundRegion = Bytes.toLong(timeBytes); + if (pruneUpperBoundRegion < deletionPruneUpperBound) { + stateTable.delete(new Delete(next.getRow())); + } + } + } + } + } + } + } + + // --------------------------------------------------- + // ------- Methods for regions at a given time ------- + // --------------------------------------------------- + // Key: 0x2<time><region-id> + // Col 't': <empty byte array> + // --------------------------------------------------- + + /** + * Persist the regions for the given time. {@link HBaseTransactionPruningPlugin} saves the set of + * transactional regions existing in the HBase instance periodically. + * + * @param time timestamp in milliseconds + * @param regions set of regions at the time + * @throws IOException when not able to persist the data to HBase + */ + public void saveRegionsForTime(long time, Set<byte[]> regions) throws IOException { + byte[] timeBytes = Bytes.toBytes(getInvertedTime(time)); + try (Table stateTable = stateTableSupplier.get()) { + for (byte[] region : regions) { + Put put = new Put(makeTimeRegionKey(timeBytes, region)); + put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY); + stateTable.put(put); + } + } + } + + /** + * Return the set of regions saved for the time just before the given time. This method finds the greatest time + * that is less than the given time, and then returns all regions with that exact time, but none that are + * older than that. + * + * @param time timestamp in milliseconds + * @return set of regions and time at which they were recorded + * @throws IOException when not able to read the data from HBase + */ + @Nullable + public TimeRegions getRegionsOnOrBeforeTime(long time) throws IOException { + byte[] timeBytes = Bytes.toBytes(getInvertedTime(time)); + try (Table stateTable = stateTableSupplier.get()) { + Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP); + scan.addColumn(FAMILY, REGION_TIME_COL); + + SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR); + long currentRegionTime = -1; + try (ResultScanner scanner = stateTable.getScanner(scan)) { + Result next; + while ((next = scanner.next()) != null) { + Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow()); + // Stop if reached next time value + if (currentRegionTime == -1) { + currentRegionTime = timeRegion.getKey(); + } else if (timeRegion.getKey() < currentRegionTime) { + break; + } else if (timeRegion.getKey() > currentRegionTime) { + throw new IllegalStateException( + String.format("Got out of order time %d when expecting time lesser than %d", --- End diff -- less than or equal to
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---