GitHub user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r90759375
  
    --- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
 ---
    @@ -58,10 +110,248 @@ public long getPruneUpperBound(byte[] regionId) 
throws IOException {
         }
       }
     
    +  /**
    +   * Get latest prune upper bounds for given regions. This is a batch 
operation of method
    +   * {@link #getPruneUpperBoundForRegion(byte[])}
    +   *
    +   * @param regions a set of regions
    +   * @return a map containing region id and its latest prune upper bound 
value
    +   * @throws IOException when not able to read the data from HBase
    +   */
    +  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> 
regions) throws IOException {
    +    Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
    +      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          byte[] region = getRegionFromKey(next.getRow());
    +          if (regions.contains(region)) {
    +            byte[] timeBytes = next.getValue(FAMILY, 
PRUNE_UPPER_BOUND_COL);
    +            if (timeBytes != null) {
    +              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
    +              resultMap.put(region, pruneUpperBoundRegion);
    +            }
    +          }
    +        }
    +      }
    +      return resultMap;
    +    }
    +  }
    +
    +  /**
    +   * Delete all regions that are not in the given exclude set and whose 
prune upper bound is less than a given value.
    +   * After the invalid list is pruned up to deletionPruneUpperBound, we do 
not need entries for regions that have
    +   * prune upper bound less than deletionPruneUpperBound. We however limit 
the deletion to only regions that are
    +   * no longer in existence (due to deletion, etc.), to avoid 
update/delete race conditions.
    +   *
    +   * @param deletionPruneUpperBound prune upper bound below which regions 
will be deleted
    +   * @param excludeRegions set of regions that should not be deleted
    +   * @throws IOException when not able to delete data in HBase
    +   */
    +  public void deleteRegionsWithPruneUpperBoundBefore(long 
deletionPruneUpperBound, SortedSet<byte[]> excludeRegions)
    +    throws IOException {
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
    +      Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
    +
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          byte[] region = getRegionFromKey(next.getRow());
    +          if (!excludeRegions.contains(region)) {
    +            byte[] timeBytes = next.getValue(FAMILY, 
PRUNE_UPPER_BOUND_COL);
    +            if (timeBytes != null) {
    +              long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
    +              if (pruneUpperBoundRegion < deletionPruneUpperBound) {
    +                stateTable.delete(new Delete(next.getRow()));
    +              }
    +            }
    +          }
    +        }
    +      }
    +    }
    +  }
    +
    +  // ---------------------------------------------------
    +  // ------- Methods for regions at a given time -------
    +  // ---------------------------------------------------
    +  // Key: 0x2<time><region-id>
    +  // Col 't': <empty byte array>
    +  // ---------------------------------------------------
    +
    +  /**
    +   * Persist the regions for the given time. {@link 
HBaseTransactionPruningPlugin} saves the set of
    +   * transactional regions existing in the HBase instance periodically.
    +   *
    +   * @param time timestamp in milliseconds
    +   * @param regions set of regions at the time
    +   * @throws IOException when not able to persist the data to HBase
    +   */
    +  public void saveRegionsForTime(long time, Set<byte[]> regions) throws 
IOException {
    +    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      for (byte[] region : regions) {
    +        Put put = new Put(makeTimeRegionKey(timeBytes, region));
    +        put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
    +        stateTable.put(put);
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Return the set of regions saved for the time just before the given 
time. This method finds the greatest time
    +   * that is less than the given time, and then returns all regions with 
that exact time, but none that are
    +   * older than that.
    +   *
    +   * @param time timestamp in milliseconds
    +   * @return set of regions and time at which they were recorded
    +   * @throws IOException when not able to read the data from HBase
    +   */
    +  @Nullable
    +  public TimeRegions getRegionsOnOrBeforeTime(long time) throws 
IOException {
    +    byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
    +    try (Table stateTable = stateTableSupplier.get()) {
    +      Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), 
REGION_TIME_KEY_PREFIX_STOP);
    +      scan.addColumn(FAMILY, REGION_TIME_COL);
    +
    +      SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    +      long currentRegionTime = -1;
    +      try (ResultScanner scanner = stateTable.getScanner(scan)) {
    +        Result next;
    +        while ((next = scanner.next()) != null) {
    +          Map.Entry<Long, byte[]> timeRegion = 
getTimeRegion(next.getRow());
    +          // Stop if reached next time value
    +          if (currentRegionTime == -1) {
    +            currentRegionTime = timeRegion.getKey();
    +          } else if (timeRegion.getKey() < currentRegionTime) {
    +            break;
    +          } else if (timeRegion.getKey() > currentRegionTime) {
    +            throw new IllegalStateException(
    +              String.format("Got out of order time %d when expecting time 
lesser than %d",
    --- End diff --
    
    less than or equal to (suggested wording in place of "lesser than" in the
    exception message quoted above)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastruct...@apache.org or file a JIRA
ticket with INFRA.
---

Reply via email to