http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java new file mode 100644 index 0000000..40e2c37 --- /dev/null +++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilter.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.hbase.coprocessor; + +import com.google.common.collect.Maps; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterBase; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.tephra.Transaction; +import org.apache.tephra.TxConstants; +import org.apache.tephra.util.TxUtils; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; + +/** + * Applies filtering of data based on transactional visibility (HBase 1.3 specific version). + * Note: this is intended for server-side use only, as additional properties need to be set on + * any {@code Scan} or {@code Get} operation performed. + */ +public class TransactionVisibilityFilter extends FilterBase { + private final Transaction tx; + // oldest visible timestamp by column family, used to apply TTL when reading + private final Map<ImmutableBytesWritable, Long> oldestTsByFamily; + // if false, empty values will be interpreted as deletes + private final boolean allowEmptyValues; + // whether or not we can remove delete markers + // these can only be safely removed when we are traversing all storefiles + private final boolean clearDeletes; + // optional sub-filter to apply to visible cells + private final Filter cellFilter; + // since we traverse KVs in order, cache the current oldest TS to avoid map lookups per KV + private final ImmutableBytesWritable currentFamily = new ImmutableBytesWritable(HConstants.EMPTY_BYTE_ARRAY); + + private long currentOldestTs; + + private DeleteTracker deleteTracker = new DeleteTracker(); + + /** + * Creates a new {@link org.apache.hadoop.hbase.filter.Filter} for returning data only from visible transactions. + * + * @param tx the current transaction to apply. 
Only data visible to this transaction will be returned. + * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name + * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} + * these will be interpreted as "delete" markers and the column will be filtered out + * @param scanType the type of scan operation being performed + */ + public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues, + ScanType scanType) { + this(tx, ttlByFamily, allowEmptyValues, scanType, null); + } + + /** + * Creates a new {@link org.apache.hadoop.hbase.filter.Filter} for returning data only from visible transactions. + * + * @param tx the current transaction to apply. Only data visible to this transaction will be returned. + * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name + * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} + * these will be interpreted as "delete" markers and the column will be filtered out + * @param scanType the type of scan operation being performed + * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by + * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then + * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. + */ + public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues, + ScanType scanType, @Nullable Filter cellFilter) { + this.tx = tx; + this.oldestTsByFamily = Maps.newTreeMap(); + for (Map.Entry<byte[], Long> ttlEntry : ttlByFamily.entrySet()) { + long familyTTL = ttlEntry.getValue(); + oldestTsByFamily.put(new ImmutableBytesWritable(ttlEntry.getKey()), + familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS); + } + this.allowEmptyValues = allowEmptyValues; + this.clearDeletes = + scanType == ScanType.COMPACT_DROP_DELETES || + (scanType == ScanType.USER_SCAN && tx.getVisibilityLevel() != Transaction.VisibilityLevel.SNAPSHOT_ALL); + this.cellFilter = cellFilter; + } + + @Override + public ReturnCode filterKeyValue(Cell cell) throws IOException { + if (!CellUtil.matchingFamily(cell, currentFamily.get(), currentFamily.getOffset(), currentFamily.getLength())) { + // column family changed + currentFamily.set(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()); + Long familyOldestTs = oldestTsByFamily.get(currentFamily); + currentOldestTs = familyOldestTs != null ? 
familyOldestTs : 0; + deleteTracker.reset(); + } + // need to apply TTL for the column family here + long kvTimestamp = cell.getTimestamp(); + if (TxUtils.getTimestampForTTL(kvTimestamp) < currentOldestTs) { + // passed TTL for this column, seek to next + return ReturnCode.NEXT_COL; + } else if (tx.isVisible(kvTimestamp)) { + // Return all writes done by current transaction (including deletes) for VisibilityLevel.SNAPSHOT_ALL + if (tx.getVisibilityLevel() == Transaction.VisibilityLevel.SNAPSHOT_ALL && tx.isCurrentWrite(kvTimestamp)) { + // cell is visible + // visibility SNAPSHOT_ALL needs all matches + return runSubFilter(ReturnCode.INCLUDE, cell); + } + if (DeleteTracker.isFamilyDelete(cell)) { + deleteTracker.addFamilyDelete(cell); + if (clearDeletes) { + return ReturnCode.NEXT_COL; + } else { + // cell is visible + // as soon as we find a KV to include we can move to the next column + return runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell); + } + } + // check if masked by family delete + if (deleteTracker.isDeleted(cell)) { + return ReturnCode.NEXT_COL; + } + // check for column delete + if (isColumnDelete(cell)) { + if (clearDeletes) { + // skip "deleted" cell + return ReturnCode.NEXT_COL; + } else { + // keep the marker but skip any remaining versions + return runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell); + } + } + // cell is visible + // as soon as we find a KV to include we can move to the next column + return runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell); + } else { + return ReturnCode.SKIP; + } + } + + private ReturnCode runSubFilter(ReturnCode txFilterCode, Cell cell) throws IOException { + if (cellFilter != null) { + ReturnCode subFilterCode = cellFilter.filterKeyValue(cell); + return determineReturnCode(txFilterCode, subFilterCode); + } + return txFilterCode; + } + + /** + * Determines the return code of TransactionVisibilityFilter based on sub-filter's return code. + * Sub-filter can only exclude cells included by TransactionVisibilityFilter, i.e., sub-filter's + * INCLUDE will be ignored. This behavior makes sure that sub-filter only sees cell versions valid for the + * given transaction. If sub-filter needs to see older versions of cell, then this method can be overridden. + * + * @param txFilterCode return code from TransactionVisibilityFilter + * @param subFilterCode return code from sub-filter + * @return final return code + */ + protected ReturnCode determineReturnCode(ReturnCode txFilterCode, ReturnCode subFilterCode) { + // Return the more restrictive of the two filter responses + switch (subFilterCode) { + case INCLUDE: + return txFilterCode; + case INCLUDE_AND_NEXT_COL: + return ReturnCode.INCLUDE_AND_NEXT_COL; + case SKIP: + return txFilterCode == ReturnCode.INCLUDE ? ReturnCode.SKIP : ReturnCode.NEXT_COL; + default: + return subFilterCode; + } + } + + @Override + public boolean filterRow() throws IOException { + if (cellFilter != null) { + return cellFilter.filterRow(); + } + return super.filterRow(); + } + + @Override + public Cell transformCell(Cell cell) throws IOException { + // Convert Tephra deletes back into HBase deletes + if (tx.getVisibilityLevel() == Transaction.VisibilityLevel.SNAPSHOT_ALL) { + if (DeleteTracker.isFamilyDelete(cell)) { + return new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell), null, cell.getTimestamp(), + KeyValue.Type.DeleteFamily); + } else if (isColumnDelete(cell)) { + // Note: in some cases KeyValue.Type.Delete is used in Delete object, + // and in some other cases KeyValue.Type.DeleteColumn is used. 
+ // Since Tephra cannot distinguish between the two, we return KeyValue.Type.DeleteColumn. + // KeyValue.Type.DeleteColumn makes both CellUtil.isDelete and CellUtil.isDeleteColumns return true, and will + // work in both cases. + return new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), + cell.getTimestamp(), KeyValue.Type.DeleteColumn); + } + } + return cell; + } + + @Override + public void reset() throws IOException { + deleteTracker.reset(); + if (cellFilter != null) { + cellFilter.reset(); + } + } + + @Override + public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException { + if (cellFilter != null) { + return cellFilter.filterRowKey(buffer, offset, length); + } + return super.filterRowKey(buffer, offset, length); + } + + @Override + public boolean filterAllRemaining() throws IOException { + if (cellFilter != null) { + return cellFilter.filterAllRemaining(); + } + return super.filterAllRemaining(); + } + + @Override + public void filterRowCells(List<Cell> kvs) throws IOException { + if (cellFilter != null) { + cellFilter.filterRowCells(kvs); + } else { + super.filterRowCells(kvs); + } + } + + @Override + public boolean hasFilterRow() { + if (cellFilter != null) { + return cellFilter.hasFilterRow(); + } + return super.hasFilterRow(); + } + + @SuppressWarnings("deprecation") + @Override + public KeyValue getNextKeyHint(KeyValue currentKV) throws IOException { + if (cellFilter != null) { + return cellFilter.getNextKeyHint(currentKV); + } + return super.getNextKeyHint(currentKV); + } + + @Override + public Cell getNextCellHint(Cell currentKV) throws IOException { + if (cellFilter != null) { + return cellFilter.getNextCellHint(currentKV); + } + return super.getNextCellHint(currentKV); + } + + @Override + public boolean isFamilyEssential(byte[] name) throws IOException { + if (cellFilter != null) { + return cellFilter.isFamilyEssential(name); + } + return super.isFamilyEssential(name); + } + + private boolean isColumnDelete(Cell cell) { + return !TxUtils.isPreExistingVersion(cell.getTimestamp()) && cell.getValueLength() == 0 && !allowEmptyValues; + } + + private static final class DeleteTracker { + private long familyDeleteTs; + private byte[] rowKey; + + public static boolean isFamilyDelete(Cell cell) { + return !TxUtils.isPreExistingVersion(cell.getTimestamp()) && + CellUtil.matchingQualifier(cell, TxConstants.FAMILY_DELETE_QUALIFIER) && + CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY); + } + + public void addFamilyDelete(Cell delete) { + this.familyDeleteTs = delete.getTimestamp(); + this.rowKey = Bytes.copy(delete.getRowArray(), delete.getRowOffset(), delete.getRowLength()); + } + + public boolean isDeleted(Cell cell) { + return rowKey != null && Bytes.compareTo(cell.getRowArray(), cell.getRowOffset(), + cell.getRowLength(), rowKey, 0, rowKey.length) == 0 && cell.getTimestamp() <= familyDeleteTs; + } + + public void reset() { + this.familyDeleteTs = 0; + this.rowKey = null; + } + } +}
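For context, a minimal sketch of how a server-side coprocessor might construct this filter for a user scan. This is illustrative only and not part of the patch: it assumes the client attached its serialized transaction to the Scan under TxConstants.TX_OPERATION_ATTRIBUTE_KEY (as Tephra's transaction-aware clients do) and that org.apache.tephra.TransactionCodec decodes it; the column family name and TTL below are hypothetical placeholders.

    // Illustrative sketch; assumes the Scan carries a serialized Tephra transaction.
    private void applyTransactionFilter(Scan scan) throws IOException {
      byte[] txBytes = scan.getAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY);
      if (txBytes == null) {
        return; // not a transactional scan, leave it untouched
      }
      Transaction tx = new TransactionCodec().decode(txBytes);
      // Hypothetical TTL map: family 'd' expires after one day
      Map<byte[], Long> ttlByFamily = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      ttlByFamily.put(Bytes.toBytes("d"), TimeUnit.DAYS.toMillis(1));
      // allowEmptyValues = false: empty values are treated as Tephra delete markers
      scan.setFilter(new TransactionVisibilityFilter(tx, ttlByFamily, false, ScanType.USER_SCAN));
    }

In practice the TransactionProcessor coprocessor performs this wiring, which is why the class javadoc above notes the filter is intended for server-side use only.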
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java new file mode 100644 index 0000000..9b856d9 --- /dev/null +++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tephra.hbase.txprune; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.tephra.Transaction; +import org.apache.tephra.persist.TransactionVisibilityState; +import org.apache.tephra.util.TxUtils; + +import java.io.IOException; +import javax.annotation.Nullable; + +/** + * Record compaction state for invalid list pruning + */ +public class CompactionState { + private static final Log LOG = LogFactory.getLog(CompactionState.class); + + private final byte[] regionName; + private final String regionNameAsString; + private final PruneUpperBoundWriterSupplier pruneUpperBoundWriterSupplier; + private final PruneUpperBoundWriter pruneUpperBoundWriter; + + private volatile long pruneUpperBound = -1; + + public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long pruneFlushInterval) { + this.regionName = env.getRegionInfo().getRegionName(); + this.regionNameAsString = env.getRegionInfo().getRegionNameAsString(); + DataJanitorState dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() { + @Override + public Table get() throws IOException { + return env.getTable(stateTable); + } + }); + this.pruneUpperBoundWriterSupplier = new PruneUpperBoundWriterSupplier(stateTable, dataJanitorState, + pruneFlushInterval); + this.pruneUpperBoundWriter = pruneUpperBoundWriterSupplier.get(); + } + + /** + * Records the transaction state used for a compaction. This method is called when the compaction starts. 
+ * + * @param request {@link CompactionRequest} for the compaction + * @param snapshot transaction state that will be used for the compaction + */ + public void record(CompactionRequest request, @Nullable TransactionVisibilityState snapshot) { + if (request.isMajor() && snapshot != null) { + Transaction tx = TxUtils.createDummyTransaction(snapshot); + pruneUpperBound = TxUtils.getPruneUpperBound(tx); + if (LOG.isDebugEnabled()) { + LOG.debug( + String.format("Computed prune upper bound %s for compaction request %s using transaction state from time %s", + pruneUpperBound, request, snapshot.getTimestamp())); + } + } else { + pruneUpperBound = -1; + } + } + + /** + * Persists the transaction state recorded by {@link #record(CompactionRequest, TransactionVisibilityState)}. + * This method is called after the compaction has successfully completed. + */ + public void persist() { + if (pruneUpperBound != -1) { + pruneUpperBoundWriter.persistPruneEntry(regionName, pruneUpperBound); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Enqueued prune upper bound %s for region %s", pruneUpperBound, regionNameAsString)); + } + } + } + + /** + * Persist that the given region is empty at the given time. + * @param time time in milliseconds + */ + public void persistRegionEmpty(long time) { + pruneUpperBoundWriter.persistRegionEmpty(regionName, time); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Enqueued empty region %s at time %s", regionNameAsString, time)); + } + } + + /** + * Releases the usage of the {@link PruneUpperBoundWriter}. + */ + public void stop() { + pruneUpperBoundWriterSupplier.release(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java new file mode 100644 index 0000000..db59d7d --- /dev/null +++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java @@ -0,0 +1,536 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + + package org.apache.tephra.hbase.txprune; + + import com.google.common.annotations.VisibleForTesting; + import com.google.common.collect.Maps; + import org.apache.commons.logging.Log; + import org.apache.commons.logging.LogFactory; + import org.apache.hadoop.hbase.Cell; + import org.apache.hadoop.hbase.CellUtil; + import org.apache.hadoop.hbase.client.Delete; + import org.apache.hadoop.hbase.client.Get; + import org.apache.hadoop.hbase.client.Put; + import org.apache.hadoop.hbase.client.Result; + import org.apache.hadoop.hbase.client.ResultScanner; + import org.apache.hadoop.hbase.client.Scan; + import org.apache.hadoop.hbase.client.Table; + import org.apache.hadoop.hbase.util.Bytes; + import org.apache.tephra.hbase.coprocessor.TransactionProcessor; + import org.apache.tephra.txprune.RegionPruneInfo; + + import java.io.IOException; + import java.util.ArrayList; + import java.util.Collections; + import java.util.List; + import java.util.Map; + import java.util.Set; + import java.util.SortedSet; + import java.util.TreeMap; + import java.util.TreeSet; + import javax.annotation.Nullable; + + /** + * Persist data janitor state into an HBase table. + * This is used by both {@link TransactionProcessor} and {@link HBaseTransactionPruningPlugin} + * to persist and read the compaction state. + */ + @SuppressWarnings("WeakerAccess") + public class DataJanitorState { + private static final Log LOG = LogFactory.getLog(DataJanitorState.class); + + public static final byte[] FAMILY = {'f'}; + public static final byte[] PRUNE_UPPER_BOUND_COL = {'p'}; + + private static final byte[] REGION_TIME_COL = {'r'}; + private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_COL = {'i'}; + private static final byte[] EMPTY_REGION_TIME_COL = {'e'}; + + private static final byte[] REGION_KEY_PREFIX = {0x1}; + private static final byte[] REGION_KEY_PREFIX_STOP = {0x2}; + + private static final byte[] REGION_TIME_KEY_PREFIX = {0x2}; + private static final byte[] REGION_TIME_KEY_PREFIX_STOP = {0x3}; + + private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX = {0x3}; + private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP = {0x4}; + + private static final byte[] EMPTY_REGION_TIME_KEY_PREFIX = {0x4}; + private static final byte[] EMPTY_REGION_TIME_KEY_PREFIX_STOP = {0x5}; + + private static final byte[] REGION_TIME_COUNT_KEY_PREFIX = {0x5}; + private static final byte[] REGION_TIME_COUNT_KEY_PREFIX_STOP = {0x6}; + + private static final byte[] EMPTY_BYTE_ARRAY = new byte[0]; + // This value can be used when we don't care about the value we write in a column + private static final byte[] COL_VAL = Bytes.toBytes('1'); + + private final TableSupplier stateTableSupplier; + + + public DataJanitorState(TableSupplier stateTableSupplier) { + this.stateTableSupplier = stateTableSupplier; + } + + // ---------------------------------------------------------------- + // ------- Methods for prune upper bound for a given region ------- + // ---------------------------------------------------------------- + // The data is stored in the following format - + // Key: 0x1<region-id> + // Col 'p': <prune upper bound> + // ---------------------------------------------------------------- + + /** + * Persist the latest prune upper bound for a given region. This is called by {@link TransactionProcessor} + * after major compaction.
+ * + * @param regionId region id + * @param pruneUpperBound the latest prune upper bound for the region + * @throws IOException when not able to persist the data to HBase + */ + public void savePruneUpperBoundForRegion(byte[] regionId, long pruneUpperBound) throws IOException { + try (Table stateTable = stateTableSupplier.get()) { + Put put = new Put(makeRegionKey(regionId)); + put.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL, Bytes.toBytes(pruneUpperBound)); + stateTable.put(put); + } + } + + /** + * Get latest prune upper bound for a given region. This indicates the largest invalid transaction that no + * longer has writes in this region. + * + * @param regionId region id + * @return latest prune upper bound for the region + * @throws IOException when not able to read the data from HBase + */ + public long getPruneUpperBoundForRegion(byte[] regionId) throws IOException { + RegionPruneInfo regionPruneInfo = getPruneInfoForRegion(regionId); + return (regionPruneInfo == null) ? -1 : regionPruneInfo.getPruneUpperBound(); + } + + /** + * Get the latest {@link RegionPruneInfo} for a given region. + * + * @param regionId region id + * @return {@link RegionPruneInfo} for the region + * @throws IOException when not able to read the data from HBase + */ + @Nullable + public RegionPruneInfo getPruneInfoForRegion(byte[] regionId) throws IOException { + try (Table stateTable = stateTableSupplier.get()) { + Get get = new Get(makeRegionKey(regionId)); + get.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL); + Cell cell = stateTable.get(get).getColumnLatestCell(FAMILY, PRUNE_UPPER_BOUND_COL); + if (cell == null) { + return null; + } + byte[] pruneUpperBoundBytes = CellUtil.cloneValue(cell); + long timestamp = cell.getTimestamp(); + return new RegionPruneInfo(regionId, Bytes.toStringBinary(regionId), + Bytes.toLong(pruneUpperBoundBytes), timestamp); + } + } + + /** + * Get latest prune upper bounds for given regions. This is a batch operation of method + * {@link #getPruneUpperBoundForRegion(byte[])} + * + * @param regions a set of regions + * @return a map containing region id and its latest prune upper bound value + * @throws IOException when not able to read the data from HBase + */ + public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException { + Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); + List<RegionPruneInfo> regionPruneInfos = getPruneInfoForRegions(regions); + for (RegionPruneInfo regionPruneInfo : regionPruneInfos) { + resultMap.put(regionPruneInfo.getRegionName(), regionPruneInfo.getPruneUpperBound()); + } + return Collections.unmodifiableMap(resultMap); + } + + /** + * Gets a list of {@link RegionPruneInfo} for given regions. Returns all regions if the given regions set is null. + * + * @param regions a set of regions + * @return list of {@link RegionPruneInfo}s. 
+ * @throws IOException when not able to read the data from HBase + */ + public List<RegionPruneInfo> getPruneInfoForRegions(@Nullable SortedSet<byte[]> regions) throws IOException { + List<RegionPruneInfo> regionPruneInfos = new ArrayList<>(); + try (Table stateTable = stateTableSupplier.get()) { + byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY); + Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP); + scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL); + + try (ResultScanner scanner = stateTable.getScanner(scan)) { + Result next; + while ((next = scanner.next()) != null) { + byte[] region = getRegionFromKey(next.getRow()); + if (regions == null || regions.contains(region)) { + Cell cell = next.getColumnLatestCell(FAMILY, PRUNE_UPPER_BOUND_COL); + if (cell != null) { + byte[] pruneUpperBoundBytes = CellUtil.cloneValue(cell); + long timestamp = cell.getTimestamp(); + regionPruneInfos.add(new RegionPruneInfo(region, Bytes.toStringBinary(region), + Bytes.toLong(pruneUpperBoundBytes), timestamp)); + } + } + } + } + } + return Collections.unmodifiableList(regionPruneInfos); + } + + /** + * Delete prune upper bounds for the regions that are not in the given exclude set and whose + * prune upper bound is less than the given value. + * After the invalid list is pruned up to deletionPruneUpperBound, we do not need entries for regions that have + * prune upper bound less than deletionPruneUpperBound. We however limit the deletion to only regions that are + * no longer in existence (due to deletion, etc.), to avoid update/delete race conditions. + * + * @param deletionPruneUpperBound prune upper bound below which regions will be deleted + * @param excludeRegions set of regions that should not be deleted + * @throws IOException when not able to delete data in HBase + */ + public void deletePruneUpperBounds(long deletionPruneUpperBound, SortedSet<byte[]> excludeRegions) + throws IOException { + try (Table stateTable = stateTableSupplier.get()) { + byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY); + Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP); + scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL); + + try (ResultScanner scanner = stateTable.getScanner(scan)) { + Result next; + while ((next = scanner.next()) != null) { + byte[] region = getRegionFromKey(next.getRow()); + if (!excludeRegions.contains(region)) { + byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL); + if (timeBytes != null) { + long pruneUpperBoundRegion = Bytes.toLong(timeBytes); + if (pruneUpperBoundRegion < deletionPruneUpperBound) { + stateTable.delete(new Delete(next.getRow())); + } + } + } + } + } + } + } + + // --------------------------------------------------- + // ------- Methods for regions at a given time ------- + // --------------------------------------------------- + // Key: 0x2<inverted time><region-id> + // Col 'r': <marker value> + // --------------------------------------------------- + + /** + * Persist the regions for the given time. {@link HBaseTransactionPruningPlugin} saves the set of + * transactional regions existing in the HBase instance periodically.
+ * + * @param time timestamp in milliseconds + * @param regions set of regions at the time + * @throws IOException when not able to persist the data to HBase + */ + public void saveRegionsForTime(long time, Set<byte[]> regions) throws IOException { + byte[] timeBytes = Bytes.toBytes(getInvertedTime(time)); + try (Table stateTable = stateTableSupplier.get()) { + for (byte[] region : regions) { + Put put = new Put(makeTimeRegionKey(timeBytes, region)); + put.addColumn(FAMILY, REGION_TIME_COL, COL_VAL); + stateTable.put(put); + } + + // Save the count of regions as a checksum + saveRegionCountForTime(stateTable, timeBytes, regions.size()); + } + } + + @VisibleForTesting + void saveRegionCountForTime(Table stateTable, byte[] timeBytes, int count) throws IOException { + Put put = new Put(makeTimeRegionCountKey(timeBytes)); + put.addColumn(FAMILY, REGION_TIME_COL, Bytes.toBytes(count)); + stateTable.put(put); + } + + /** + * Return the set of regions saved for the time at or before the given time. This method finds the greatest time + * that is less than or equal to the given time, and then returns all regions with that exact time, but none that are + * older than that. + * + * @param time timestamp in milliseconds + * @return set of regions and time at which they were recorded, or null if no regions found + * @throws IOException when not able to read the data from HBase + */ + @Nullable + public TimeRegions getRegionsOnOrBeforeTime(long time) throws IOException { + try (Table stateTable = stateTableSupplier.get()) { + TimeRegions timeRegions; + while ((timeRegions = getNextSetOfTimeRegions(stateTable, time)) != null) { + int count = getRegionCountForTime(stateTable, timeRegions.getTime()); + if (count != -1 && count == timeRegions.getRegions().size()) { + return timeRegions; + } else { + LOG.warn(String.format("Got incorrect count for regions saved at time %s, expected = %s but actual = %s", + timeRegions.getTime(), count, timeRegions.getRegions().size())); + time = timeRegions.getTime() - 1; + } + } + return null; + } + } + + @Nullable + private TimeRegions getNextSetOfTimeRegions(Table stateTable, long time) throws IOException { + byte[] timeBytes = Bytes.toBytes(getInvertedTime(time)); + Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP); + scan.addColumn(FAMILY, REGION_TIME_COL); + + + long currentRegionTime = -1; + SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR); + Result next; + try (ResultScanner scanner = stateTable.getScanner(scan)) { + while ((next = scanner.next()) != null) { + Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow()); + // Stop if reached next time value + if (currentRegionTime == -1) { + currentRegionTime = timeRegion.getKey(); + } else if (timeRegion.getKey() < currentRegionTime) { + break; + } else if (timeRegion.getKey() > currentRegionTime) { + throw new IllegalStateException( + String.format("Got out of order time %d when expecting time less than or equal to %d", + timeRegion.getKey(), currentRegionTime)); + } + regions.add(timeRegion.getValue()); + } + } + return regions.isEmpty() ? 
null : new TimeRegions(currentRegionTime, Collections.unmodifiableSortedSet(regions)); + } + + @VisibleForTesting + int getRegionCountForTime(Table stateTable, long time) throws IOException { + Get get = new Get(makeTimeRegionCountKey(Bytes.toBytes(getInvertedTime(time)))); + get.addColumn(FAMILY, REGION_TIME_COL); + Result result = stateTable.get(get); + byte[] value = result.getValue(FAMILY, REGION_TIME_COL); + return value == null ? -1 : Bytes.toInt(value); + } + + /** + * Delete all the regions that were recorded for all times equal to or less than the given time. + * + * @param time timestamp in milliseconds + * @throws IOException when not able to delete data in HBase + */ + public void deleteAllRegionsOnOrBeforeTime(long time) throws IOException { + byte[] timeBytes = Bytes.toBytes(getInvertedTime(time)); + try (Table stateTable = stateTableSupplier.get()) { + // Delete the regions + Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP); + scan.addColumn(FAMILY, REGION_TIME_COL); + deleteFromScan(stateTable, scan); + + // Delete the count + scan = new Scan(makeTimeRegionCountKey(timeBytes), REGION_TIME_COUNT_KEY_PREFIX_STOP); + scan.addColumn(FAMILY, REGION_TIME_COL); + deleteFromScan(stateTable, scan); + } + } + + // --------------------------------------------------------------------- + // ------- Methods for inactive transaction bound for given time ------- + // --------------------------------------------------------------------- + // Key: 0x3<inverted time> + // Col 'i': <inactive transaction bound> + // --------------------------------------------------------------------- + + /** + * Persist inactive transaction bound for a given time. This is the smallest not in-progress transaction that + * will not have writes in any HBase regions that are created after the given time. + * + * @param time time in milliseconds + * @param inactiveTransactionBound inactive transaction bound for the given time + * @throws IOException when not able to persist the data to HBase + */ + public void saveInactiveTransactionBoundForTime(long time, long inactiveTransactionBound) throws IOException { + try (Table stateTable = stateTableSupplier.get()) { + Put put = new Put(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time)))); + put.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL, Bytes.toBytes(inactiveTransactionBound)); + stateTable.put(put); + } + } + + /** + * Return inactive transaction bound for the given time. + * + * @param time time in milliseconds + * @return inactive transaction bound for the given time + * @throws IOException when not able to read the data from HBase + */ + public long getInactiveTransactionBoundForTime(long time) throws IOException { + try (Table stateTable = stateTableSupplier.get()) { + Get get = new Get(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time)))); + get.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL); + byte[] result = stateTable.get(get).getValue(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL); + return result == null ?
-1 : Bytes.toLong(result); + } + } + + /** + * Delete all inactive transaction bounds recorded on or before the given time. + * + * @param time time in milliseconds + * @throws IOException when not able to delete data in HBase + */ + public void deleteInactiveTransactionBoundsOnOrBeforeTime(long time) throws IOException { + try (Table stateTable = stateTableSupplier.get()) { + Scan scan = new Scan(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))), + INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP); + scan.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL); + deleteFromScan(stateTable, scan); + } + } + + // -------------------------------------------------------- + // ------- Methods for empty regions at a given time ------- + // -------------------------------------------------------- + // Key: 0x4<time><region-id> + // Col 'e': <marker value> + // -------------------------------------------------------- + + /** + * Save the given region as empty as of the given time. + * + * @param time time in milliseconds + * @param regionId region id + */ + public void saveEmptyRegionForTime(long time, byte[] regionId) throws IOException { + byte[] timeBytes = Bytes.toBytes(time); + try (Table stateTable = stateTableSupplier.get()) { + Put put = new Put(makeEmptyRegionTimeKey(timeBytes, regionId)); + put.addColumn(FAMILY, EMPTY_REGION_TIME_COL, COL_VAL); + stateTable.put(put); + } + } + + /** + * Return regions that were recorded as empty after the given time. + * + * @param time time in milliseconds + * @param includeRegions If not null, the returned set will be an intersection of the includeRegions set + * and the empty regions after the given time + */ + public SortedSet<byte[]> getEmptyRegionsAfterTime(long time, @Nullable SortedSet<byte[]> includeRegions) + throws IOException { + SortedSet<byte[]> emptyRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR); + try (Table stateTable = stateTableSupplier.get()) { + Scan scan = new Scan(makeEmptyRegionTimeKey(Bytes.toBytes(time + 1), EMPTY_BYTE_ARRAY), + EMPTY_REGION_TIME_KEY_PREFIX_STOP); + scan.addColumn(FAMILY, EMPTY_REGION_TIME_COL); + + try (ResultScanner scanner = stateTable.getScanner(scan)) { + Result next; + while ((next = scanner.next()) != null) { + byte[] emptyRegion = getEmptyRegionFromKey(next.getRow()); + if (includeRegions == null || includeRegions.contains(emptyRegion)) { + emptyRegions.add(emptyRegion); + } + } + } + } + return Collections.unmodifiableSortedSet(emptyRegions); + } + + /** + * Delete empty region records saved on or before the given time.
+ * + * @param time time in milliseconds + */ + public void deleteEmptyRegionsOnOrBeforeTime(long time) throws IOException { + try (Table stateTable = stateTableSupplier.get()) { + Scan scan = new Scan(); + scan.setStopRow(makeEmptyRegionTimeKey(Bytes.toBytes(time + 1), EMPTY_BYTE_ARRAY)); + scan.addColumn(FAMILY, EMPTY_REGION_TIME_COL); + deleteFromScan(stateTable, scan); + } + } + + @VisibleForTesting + void deleteFromScan(Table stateTable, Scan scan) throws IOException { + try (ResultScanner scanner = stateTable.getScanner(scan)) { + Result next; + while ((next = scanner.next()) != null) { + stateTable.delete(new Delete(next.getRow())); + } + } + } + + private byte[] makeRegionKey(byte[] regionId) { + return Bytes.add(REGION_KEY_PREFIX, regionId); + } + + private byte[] getRegionFromKey(byte[] regionKey) { + int prefixLen = REGION_KEY_PREFIX.length; + return Bytes.copy(regionKey, prefixLen, regionKey.length - prefixLen); + } + + private byte[] makeTimeRegionKey(byte[] time, byte[] regionId) { + return Bytes.add(REGION_TIME_KEY_PREFIX, time, regionId); + } + + private byte[] makeTimeRegionCountKey(byte[] time) { + return Bytes.add(REGION_TIME_COUNT_KEY_PREFIX, time); + } + + private byte[] makeInactiveTransactionBoundTimeKey(byte[] time) { + return Bytes.add(INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX, time); + } + + private Map.Entry<Long, byte[]> getTimeRegion(byte[] key) { + int offset = REGION_TIME_KEY_PREFIX.length; + long time = getInvertedTime(Bytes.toLong(key, offset)); + offset += Bytes.SIZEOF_LONG; + byte[] regionName = Bytes.copy(key, offset, key.length - offset); + return Maps.immutableEntry(time, regionName); + } + + private byte[] makeEmptyRegionTimeKey(byte[] time, byte[] regionId) { + return Bytes.add(EMPTY_REGION_TIME_KEY_PREFIX, time, regionId); + } + + private byte[] getEmptyRegionFromKey(byte[] key) { + int prefixLen = EMPTY_REGION_TIME_KEY_PREFIX.length + Bytes.SIZEOF_LONG; + return Bytes.copy(key, prefixLen, key.length - prefixLen); + } + + private long getInvertedTime(long time) { + return Long.MAX_VALUE - time; + } + + /** + * Supplies table for persisting state + */ + public interface TableSupplier { + Table get() throws IOException; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java new file mode 100644 index 0000000..84c480a --- /dev/null +++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + package org.apache.tephra.hbase.txprune; + + import com.google.common.base.Function; + import com.google.common.collect.Iterables; + import com.google.common.collect.Maps; + import com.google.common.collect.Sets; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.hbase.HColumnDescriptor; + import org.apache.hadoop.hbase.HRegionInfo; + import org.apache.hadoop.hbase.HTableDescriptor; + import org.apache.hadoop.hbase.TableExistsException; + import org.apache.hadoop.hbase.TableName; + import org.apache.hadoop.hbase.client.Admin; + import org.apache.hadoop.hbase.client.Connection; + import org.apache.hadoop.hbase.client.ConnectionFactory; + import org.apache.hadoop.hbase.client.Table; + import org.apache.hadoop.hbase.util.Bytes; + import org.apache.tephra.TxConstants; + import org.apache.tephra.hbase.coprocessor.TransactionProcessor; + import org.apache.tephra.txprune.TransactionPruningPlugin; + import org.apache.tephra.util.TxUtils; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + import java.io.IOException; + import java.util.Collections; + import java.util.List; + import java.util.Map; + import java.util.SortedSet; + import java.util.TreeMap; + import java.util.TreeSet; + + /** + * Default implementation of the {@link TransactionPruningPlugin} for HBase. + * + * This plugin determines the prune upper bound for transactional HBase tables that use + * the coprocessor {@link TransactionProcessor}. + * + * <h3>State storage:</h3> + * + * This plugin expects the TransactionProcessor to save the prune upper bound for invalid transactions + * after every major compaction of a region. Let's call this <i>(region, prune upper bound)</i>. + * In addition, the plugin also persists the following information on a run at time <i>t</i> + * <ul> + * <li> + * <i>(t, set of regions)</i>: Set of transactional regions at time <i>t</i>. + * Transactional regions are regions of the tables that have the coprocessor TransactionProcessor + * attached to them. + * </li> + * <li> + * <i>(t, inactive transaction bound)</i>: This is the smallest not in-progress transaction that + * will not have writes in any HBase regions that are created after time <i>t</i>. + * This value is determined by the Transaction Service based on the transaction state at time <i>t</i> + * and passed on to the plugin. + * </li> + * </ul> + * + * <h3>Computing prune upper bound:</h3> + * + * In a typical HBase instance, there can be a constant change in the number of regions due to region creations, + * splits and merges. At any given time there can always be a region on which a major compaction has not been run. + * Since the prune upper bound will get recorded for a region only after a major compaction, + * using only the latest set of regions we may not be able to find the + * prune upper bounds for all the current regions. Hence we persist the set of regions that exist at the time + * of each run of the plugin, and use the historical region sets for times <i>t</i>, <i>t - 1</i>, etc. + * to determine the prune upper bound.
+ * + * From the regions saved at time <i>t</i>, <i>t - 1</i>, etc., + * the plugin tries to find the latest <i>(t, set of regions)</i> where all regions have been major compacted, + * i.e., all regions have a prune upper bound recorded in <i>(region, prune upper bound)</i>. + * <br/> + * If such a set is found for time <i>t1</i>, the prune upper bound returned by the plugin is the minimum of + * <ul> + * <li>Prune upper bounds of regions in set <i>(t1, set of regions)</i></li> + * <li>Inactive transaction bound from <i>(t1, inactive transaction bound)</i></li> + * </ul> + * + * <p/> + * Above, when we find <i>(t1, set of regions)</i>, there may be a region that was created after time <i>t1</i>, + * but has a data write from an invalid transaction that is smaller than the prune upper bounds of all + * regions in <i>(t1, set of regions)</i>. This is possible because <i>(region, prune upper bound)</i> persisted by + * TransactionProcessor is always the latest prune upper bound for a region. + * <br/> + * However, a region created after time <i>t1</i> cannot have writes from an invalid transaction that is smaller than + * inactive transaction bound at the time the region was created. + * Since we limit the plugin prune upper bound using <i>(t1, inactive transaction bound)</i>, + * there should be no invalid transactions smaller than the plugin prune upper bound with writes in any + * transactional region of this HBase instance. + * + * <p/> + * Note: If your tables use a transactional coprocessor other than TransactionProcessor, + * then you may need to write a new plugin to compute prune upper bound for those tables. + */ + @SuppressWarnings("WeakerAccess") + public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin { + public static final Logger LOG = LoggerFactory.getLogger(HBaseTransactionPruningPlugin.class); + + protected Configuration conf; + protected Connection connection; + protected DataJanitorState dataJanitorState; + + @Override + public void initialize(Configuration conf) throws IOException { + this.conf = conf; + this.connection = ConnectionFactory.createConnection(conf); + + final TableName stateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)); + LOG.info("Initializing plugin with state table {}", stateTable.getNameWithNamespaceInclAsString()); + createPruneTable(stateTable); + this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() { + @Override + public Table get() throws IOException { + return connection.getTable(stateTable); + } + }); + } + + /** + * Determines prune upper bound for the data store as mentioned above. + */ + @Override + public long fetchPruneUpperBound(long time, long inactiveTransactionBound) throws IOException { + LOG.debug("Fetching prune upper bound for time {} and inactive transaction bound {}", + time, inactiveTransactionBound); + if (time < 0 || inactiveTransactionBound < 0) { + return -1; + } + + // Get all the current transactional regions + SortedSet<byte[]> transactionalRegions = getTransactionalRegions(); + if (!transactionalRegions.isEmpty()) { + LOG.debug("Saving {} transactional regions for time {}", transactionalRegions.size(), time); + dataJanitorState.saveRegionsForTime(time, transactionalRegions); + // Save inactive transaction bound for time as the final step.
+ // We can then use its existence to determine whether the data for a given time is complete + LOG.debug("Saving inactive transaction bound {} for time {}", inactiveTransactionBound, time); + dataJanitorState.saveInactiveTransactionBoundForTime(time, inactiveTransactionBound); + } + + return computePruneUpperBound(new TimeRegions(time, transactionalRegions)); + } + + /** + * After the invalid list has been pruned, this cleans up state information that is no longer required. + * This includes - + * <ul> + * <li> + * <i>(region, prune upper bound)</i> - prune upper bound for regions that are older + * than maxPrunedInvalid + * </li> + * <li> + * <i>(t, set of regions)</i> - Region sets that were recorded on or before the start time + * of maxPrunedInvalid + * </li> + * <li> + * <i>(t, inactive transaction bound)</i> - Inactive transaction bound information (the smallest not in-progress + * transaction without any writes in new regions) recorded on or before the start time of maxPrunedInvalid + * </li> + * </ul> + */ + @Override + public void pruneComplete(long time, long maxPrunedInvalid) throws IOException { + LOG.debug("Prune complete for time {} and prune upper bound {}", time, maxPrunedInvalid); + if (time < 0 || maxPrunedInvalid < 0) { + return; + } + + // Get regions for the current time, so as to not delete the prune upper bounds for them. + // The prune upper bounds for regions are recorded by TransactionProcessor and the deletion + // is done by this class. To avoid update/delete race condition, we only delete prune upper + // bounds for the stale regions. + TimeRegions regionsToExclude = dataJanitorState.getRegionsOnOrBeforeTime(time); + if (regionsToExclude != null) { + LOG.debug("Deleting prune upper bounds smaller than {} for stale regions", maxPrunedInvalid); + dataJanitorState.deletePruneUpperBounds(maxPrunedInvalid, regionsToExclude.getRegions()); + } else { + LOG.warn("Cannot find saved regions on or before time {}", time); + } + long pruneTime = TxUtils.getTimestamp(maxPrunedInvalid); + LOG.debug("Deleting regions recorded before time {}", pruneTime); + dataJanitorState.deleteAllRegionsOnOrBeforeTime(pruneTime); + LOG.debug("Deleting inactive transaction bounds recorded on or before time {}", pruneTime); + dataJanitorState.deleteInactiveTransactionBoundsOnOrBeforeTime(pruneTime); + LOG.debug("Deleting empty regions recorded on or before time {}", pruneTime); + dataJanitorState.deleteEmptyRegionsOnOrBeforeTime(pruneTime); + } + + @Override + public void destroy() { + LOG.info("Stopping plugin..."); + try { + connection.close(); + } catch (IOException e) { + LOG.error("Got exception while closing HBase connection", e); + } + } + + /** + * Create the prune state table given the {@link TableName} if the table doesn't exist already.
+ * + * @param stateTable prune state table name + */ + protected void createPruneTable(TableName stateTable) throws IOException { + try (Admin admin = this.connection.getAdmin()) { + if (admin.tableExists(stateTable)) { + LOG.debug("Not creating pruneStateTable {} since it already exists.", + stateTable.getNameWithNamespaceInclAsString()); + return; + } + + HTableDescriptor htd = new HTableDescriptor(stateTable); + htd.addFamily(new HColumnDescriptor(DataJanitorState.FAMILY).setMaxVersions(1)); + admin.createTable(htd); + LOG.info("Created pruneTable {}", stateTable.getNameWithNamespaceInclAsString()); + } catch (TableExistsException ex) { + // Expected if the prune state table is being created at the same time by another client + LOG.debug("Not creating pruneStateTable {} since it already exists.", + stateTable.getNameWithNamespaceInclAsString(), ex); + } + } + + /** + * Returns whether the table is a transactional table. By default, a table is identified as a transactional + * table if it has the coprocessor {@link TransactionProcessor} attached to it. Should be overridden if users + * attach a different coprocessor. + * + * @param tableDescriptor {@link HTableDescriptor} of the table + * @return true if the table is transactional + */ + protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) { + return tableDescriptor.hasCoprocessor(TransactionProcessor.class.getName()); + } + + protected SortedSet<byte[]> getTransactionalRegions() throws IOException { + SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR); + try (Admin admin = connection.getAdmin()) { + HTableDescriptor[] tableDescriptors = admin.listTables(); + LOG.debug("Got {} tables to process", tableDescriptors == null ? 0 : tableDescriptors.length); + if (tableDescriptors != null) { + for (HTableDescriptor tableDescriptor : tableDescriptors) { + if (isTransactionalTable(tableDescriptor)) { + List<HRegionInfo> tableRegions = admin.getTableRegions(tableDescriptor.getTableName()); + LOG.debug("Regions for table {}: {}", tableDescriptor.getTableName(), tableRegions); + if (tableRegions != null) { + for (HRegionInfo region : tableRegions) { + regions.add(region.getRegionName()); + } + } + } else { + LOG.debug("{} is not a transactional table", tableDescriptor.getTableName()); + } + } + } + } + return regions; + } + + /** + * Try to find the latest set of regions in which all regions have been major compacted, and + * compute prune upper bound from them. Starting from newest to oldest, this looks into the + * region set that has been saved periodically, and joins it with the prune upper bound data + * for a region recorded after a major compaction.
+ * + * @param timeRegions the latest set of regions + * @return prune upper bound + * @throws IOException when not able to talk to HBase + */ + private long computePruneUpperBound(TimeRegions timeRegions) throws IOException { + do { + LOG.debug("Computing prune upper bound for {}", timeRegions); + SortedSet<byte[]> transactionalRegions = timeRegions.getRegions(); + long time = timeRegions.getTime(); + + long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time); + LOG.debug("Got inactive transaction bound {}", inactiveTransactionBound); + // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions + if (inactiveTransactionBound == -1) { + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " + + "and hence the data must be incomplete", time); + } + // Fetch the next older region set before continuing; otherwise continue would re-evaluate + // the same unchanged timeRegions and the loop would never make progress + timeRegions = dataJanitorState.getRegionsOnOrBeforeTime(time - 1); + continue; + } + + // Get the prune upper bounds for all the transactional regions + Map<byte[], Long> pruneUpperBoundRegions = + dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions); + logPruneUpperBoundRegions(pruneUpperBoundRegions); + + // Use inactiveTransactionBound as the prune upper bound for the empty regions since the regions that are + // recorded as empty after inactiveTransactionBoundTime will not have invalid data + // for transactions started on or before inactiveTransactionBoundTime + pruneUpperBoundRegions = handleEmptyRegions(inactiveTransactionBound, transactionalRegions, + pruneUpperBoundRegions); + + // If prune upper bounds are found for all the transactional regions, then compute the prune upper bound + // across all regions + if (!transactionalRegions.isEmpty() && + pruneUpperBoundRegions.size() == transactionalRegions.size()) { + Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values()); + long pruneUpperBound = Math.min(inactiveTransactionBound, minPruneUpperBoundRegions); + LOG.debug("Found prune upper bound {} for time {}", pruneUpperBound, time); + return pruneUpperBound; + } else { + if (LOG.isDebugEnabled()) { + Sets.SetView<byte[]> difference = + Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet()); + LOG.debug("Ignoring regions for time {} because the following regions did not record a pruneUpperBound: {}", + time, Iterables.transform(difference, TimeRegions.BYTE_ARR_TO_STRING_FN)); + } + } + + timeRegions = dataJanitorState.getRegionsOnOrBeforeTime(time - 1); + } while (timeRegions != null); + return -1; + } + + private Map<byte[], Long> handleEmptyRegions(long inactiveTransactionBound, + SortedSet<byte[]> transactionalRegions, + Map<byte[], Long> pruneUpperBoundRegions) throws IOException { + long inactiveTransactionBoundTime = TxUtils.getTimestamp(inactiveTransactionBound); + SortedSet<byte[]> emptyRegions = + dataJanitorState.getEmptyRegionsAfterTime(inactiveTransactionBoundTime, transactionalRegions); + LOG.debug("Got empty transactional regions for inactive transaction bound time {}: {}", + inactiveTransactionBoundTime, Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN)); + + // The regions that are recorded as empty after inactiveTransactionBoundTime will not have invalid data + // for transactions started before or on inactiveTransactionBoundTime.
Hence we can consider the prune upper bound + // for these empty regions as inactiveTransactionBound + Map<byte[], Long> pubWithEmptyRegions = new TreeMap<>(Bytes.BYTES_COMPARATOR); + pubWithEmptyRegions.putAll(pruneUpperBoundRegions); + for (byte[] emptyRegion : emptyRegions) { + if (!pruneUpperBoundRegions.containsKey(emptyRegion)) { + pubWithEmptyRegions.put(emptyRegion, inactiveTransactionBound); + } + } + return Collections.unmodifiableMap(pubWithEmptyRegions); + } + + private void logPruneUpperBoundRegions(Map<byte[], Long> pruneUpperBoundRegions) { + if (LOG.isDebugEnabled()) { + LOG.debug("Got region - prune upper bound map: {}", + Iterables.transform(pruneUpperBoundRegions.entrySet(), + new Function<Map.Entry<byte[], Long>, Map.Entry<String, Long>>() { + @Override + public Map.Entry<String, Long> apply(Map.Entry<byte[], Long> input) { + String regionName = TimeRegions.BYTE_ARR_TO_STRING_FN.apply(input.getKey()); + return Maps.immutableEntry(regionName, input.getValue()); + } + })); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java new file mode 100644 index 0000000..443c998 --- /dev/null +++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
new file mode 100644
index 0000000..443c998
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.MinMaxPriorityQueue;
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.txprune.RegionPruneInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import javax.annotation.Nullable;
+
+/**
+ * Invalid List Pruning Debug Tool.
+ */
+public class InvalidListPruningDebug {
+  private static final Logger LOG = LoggerFactory.getLogger(InvalidListPruningDebug.class);
+  private static final Gson GSON = new Gson();
+  private DataJanitorState dataJanitorState;
+  private Connection connection;
+  private TableName tableName;
+
+  /**
+   * Initialize the Invalid List Debug Tool.
+   *
+   * @param conf {@link Configuration} of the HBase cluster
+   * @throws IOException when not able to create an HBase connection
+   */
+  public void initialize(final Configuration conf) throws IOException {
+    LOG.debug("InvalidListPruningDebugMain : initialize method called");
+    connection = ConnectionFactory.createConnection(conf);
+    tableName = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
+                                           TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
+    dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() {
+      @Override
+      public Table get() throws IOException {
+        return connection.getTable(tableName);
+      }
+    });
+  }
+
+  public void destroy() throws IOException {
+    if (connection != null) {
+      connection.close();
+    }
+  }
+
+  /**
+   * Returns the set of regions that are live but neither empty nor have a prune upper bound recorded.
+   * These regions will stop the progress of pruning.
+   *
+   * @param numRegions maximum number of regions to return, -1 to return all such regions
+   * @return {@link Set} of regions that need to be compacted and flushed
+   * @throws IOException when not able to talk to HBase
+   */
+  public Set<String> getRegionsToBeCompacted(Integer numRegions) throws IOException {
+    // Fetch the live regions
+    Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis());
+    if (latestTimeRegion.isEmpty()) {
+      return new HashSet<>();
+    }
+
+    Long timestamp = latestTimeRegion.keySet().iterator().next();
+    SortedSet<String> liveRegions = latestTimeRegion.get(timestamp);
+
+    SortedSet<byte[]> emptyRegions = dataJanitorState.getEmptyRegionsAfterTime(timestamp, null);
+    SortedSet<String> emptyRegionNames = new TreeSet<>();
+    Iterable<String> regionStrings = Iterables.transform(emptyRegions, TimeRegions.BYTE_ARR_TO_STRING_FN);
+    for (String regionString : regionStrings) {
+      emptyRegionNames.add(regionString);
+    }
+
+    Set<String> nonEmptyRegions = Sets.newHashSet(Sets.difference(liveRegions, emptyRegionNames));
+
+    // Get all pruned regions and remove them from nonEmptyRegions, resulting in a set of regions that are
+    // not empty and have not recorded a prune upper bound
+    Queue<RegionPruneInfo> prunedRegions = getIdleRegions(-1);
+    for (RegionPruneInfo prunedRegion : prunedRegions) {
+      if (nonEmptyRegions.contains(prunedRegion.getRegionNameAsString())) {
+        nonEmptyRegions.remove(prunedRegion.getRegionNameAsString());
+      }
+    }
+
+    if ((numRegions < 0) || (numRegions >= nonEmptyRegions.size())) {
+      return nonEmptyRegions;
+    }
+
+    Set<String> subsetRegions = new HashSet<>(numRegions);
+    for (String regionName : nonEmptyRegions) {
+      if (subsetRegions.size() == numRegions) {
+        break;
+      }
+      subsetRegions.add(regionName);
+    }
+    return subsetRegions;
+  }
+
+  /**
+   * Returns a list of {@link RegionPruneInfo} for the regions that have the lowest prune upper bounds.
+   * If -1 is passed in, the prune upper bounds of all regions are returned. Note that only regions
+   * that are known to be live will be returned.
+   *
+   * @param numRegions number of regions to return, -1 to return all
+   * @return {@link Queue} of {@link RegionPruneInfo} for the regions with the lowest prune upper bounds
+   * @throws IOException when not able to talk to HBase
+   */
+  public Queue<RegionPruneInfo> getIdleRegions(Integer numRegions) throws IOException {
+    List<RegionPruneInfo> regionPruneInfos = dataJanitorState.getPruneInfoForRegions(null);
+    if (regionPruneInfos.isEmpty()) {
+      return new LinkedList<>();
+    }
+
+    // Create a set with region names
+    Set<String> pruneRegionNameSet = new HashSet<>();
+    for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
+      pruneRegionNameSet.add(regionPruneInfo.getRegionNameAsString());
+    }
+
+    // Fetch the live regions
+    Map<Long, SortedSet<String>> latestTimeRegion = getRegionsOnOrBeforeTime(System.currentTimeMillis());
+    if (!latestTimeRegion.isEmpty()) {
+      SortedSet<String> liveRegions = latestTimeRegion.values().iterator().next();
+      Set<String> liveRegionsWithPruneInfo = Sets.intersection(liveRegions, pruneRegionNameSet);
+      List<RegionPruneInfo> liveRegionWithPruneInfoList = new ArrayList<>();
+      for (RegionPruneInfo regionPruneInfo : regionPruneInfos) {
+        if (liveRegionsWithPruneInfo.contains(regionPruneInfo.getRegionNameAsString())) {
+          liveRegionWithPruneInfoList.add(regionPruneInfo);
+        }
+      }
+
+      // Use the subset of regions that are both live and have prune info recorded
+      regionPruneInfos = liveRegionWithPruneInfoList;
+    }
+
+    if (numRegions < 0) {
+      numRegions = regionPruneInfos.size();
+    }
+
+    Queue<RegionPruneInfo> lowestPrunes = MinMaxPriorityQueue.orderedBy(new Comparator<RegionPruneInfo>() {
+      @Override
+      public int compare(RegionPruneInfo o1, RegionPruneInfo o2) {
+        // Long.compare avoids the overflow that casting the difference of two longs to int can cause
+        return Long.compare(o1.getPruneUpperBound(), o2.getPruneUpperBound());
+      }
+    }).maximumSize(numRegions).create();
+
+    for (RegionPruneInfo pruneInfo : regionPruneInfos) {
+      lowestPrunes.add(pruneInfo);
+    }
+    return lowestPrunes;
+  }
+
+  /**
+   * Returns the {@link RegionPruneInfo} of a given region. If no prune upper bound has been written
+   * for the region yet, returns null.
+   *
+   * @param regionId region id
+   * @return {@link RegionPruneInfo} of the region, or null if none is recorded
+   * @throws IOException if there are any errors while trying to fetch the {@link RegionPruneInfo}
+   */
+  @Nullable
+  public RegionPruneInfo getRegionPruneInfo(String regionId) throws IOException {
+    return dataJanitorState.getPruneInfoForRegion(Bytes.toBytesBinary(regionId));
+  }
+
+  /**
+   * Given a time, returns the transactional regions recorded at or before that time.
+   *
+   * @param time time in milliseconds
+   * @return transactional regions that are present at or before the given time
+   * @throws IOException if there are any errors while trying to fetch the {@link TimeRegions}
+   */
+  public Map<Long, SortedSet<String>> getRegionsOnOrBeforeTime(Long time) throws IOException {
+    Map<Long, SortedSet<String>> regionMap = new HashMap<>();
+    TimeRegions timeRegions = dataJanitorState.getRegionsOnOrBeforeTime(time);
+    if (timeRegions == null) {
+      return regionMap;
+    }
+    SortedSet<String> regionNames = new TreeSet<>();
+    Iterable<String> regionStrings = Iterables.transform(timeRegions.getRegions(), TimeRegions.BYTE_ARR_TO_STRING_FN);
+    for (String regionString : regionStrings) {
+      regionNames.add(regionString);
+    }
+    regionMap.put(timeRegions.getTime(), regionNames);
+    return regionMap;
+  }
+
+  private void printUsage(PrintWriter pw) {
+    pw.println("Usage : org.apache.tephra.hbase.txprune.InvalidListPruningDebug <command> <parameter>");
+    pw.println("Available commands and the corresponding parameters are:");
+    pw.println("****************************************************");
+    pw.println("time-region ts");
+    pw.println("Desc: Prints out the transactional regions present in HBase at time 'ts' (in milliseconds) " +
+               "or the latest time before time 'ts'.");
+    pw.println("idle-regions limit");
+    pw.println("Desc: Prints out 'limit' number of regions which have the lowest prune upper bounds. If '-1' is " +
+               "provided as the limit, prune upper bounds of all regions are returned.");
+    pw.println("prune-info region-name-as-string");
+    pw.println("Desc: Prints out the pruning information for the region 'region-name-as-string'");
+    pw.println("to-compact-regions limit");
+    pw.println("Desc: Prints out 'limit' number of regions that are active, but are not empty, " +
+               "and have not recorded a prune upper bound.");
+  }
+
+  private boolean execute(String[] args) throws IOException {
+    try (PrintWriter pw = new PrintWriter(System.out)) {
+      if (args.length != 2) {
+        printUsage(pw);
+        return false;
+      }
+
+      String command = args[0];
+      String parameter = args[1];
+      if ("time-region".equals(command)) {
+        Long time = Long.parseLong(parameter);
+        Map<Long, SortedSet<String>> timeRegion = getRegionsOnOrBeforeTime(time);
+        pw.println(GSON.toJson(timeRegion));
+        return true;
+      } else if ("idle-regions".equals(command)) {
+        Integer numRegions = Integer.parseInt(parameter);
+        Queue<RegionPruneInfo> regionPruneInfos = getIdleRegions(numRegions);
+        pw.println(GSON.toJson(regionPruneInfos));
+        return true;
+      } else if ("prune-info".equals(command)) {
+        RegionPruneInfo regionPruneInfo = getRegionPruneInfo(parameter);
+        if (regionPruneInfo != null) {
+          pw.println(GSON.toJson(regionPruneInfo));
+        } else {
+          pw.println(String.format("No prune info found for the region %s.", parameter));
+        }
+        return true;
+      } else if ("to-compact-regions".equals(command)) {
+        Integer numRegions = Integer.parseInt(parameter);
+        Set<String> toBeCompactedRegions = getRegionsToBeCompacted(numRegions);
+        pw.println(GSON.toJson(toBeCompactedRegions));
+        return true;
+      } else {
+        pw.println(String.format("%s is not a valid command.", command));
+        printUsage(pw);
+        return false;
+      }
+    }
+  }
+
+  public static void main(String[] args) {
+    Configuration hConf = HBaseConfiguration.create();
+    InvalidListPruningDebug pruningDebug = new InvalidListPruningDebug();
+    try {
+      pruningDebug.initialize(hConf);
+      boolean success = pruningDebug.execute(args);
+      pruningDebug.destroy();
+      if (!success) {
+        System.exit(1);
+      }
+    } catch (IOException ex) {
+      LOG.error("Received an exception while trying to execute the debug tool.", ex);
+    }
+  }
+}
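As a usage note: the tool takes exactly two arguments, a command and its parameter, prints JSON to standard output, and exits with status 1 when the arguments are invalid, so it can be scripted. A hypothetical invocation, assuming the HBase and Tephra jars are on the classpath (the classpath placeholder and the timestamp below are illustrative, not from this patch):

java -cp <hbase-and-tephra-classpath> \
  org.apache.tephra.hbase.txprune.InvalidListPruningDebug time-region 1500000000000

java -cp <hbase-and-tephra-classpath> \
  org.apache.tephra.hbase.txprune.InvalidListPruningDebug idle-regions 10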
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
new file mode 100644
index 0000000..677710b
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriter.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Thread that writes the prune upper bound. An instance of this class should be obtained only
+ * through {@link PruneUpperBoundWriterSupplier}, which also handles the lifecycle of this instance.
+ */
+public class PruneUpperBoundWriter extends AbstractIdleService {
+  private static final Log LOG = LogFactory.getLog(PruneUpperBoundWriter.class);
+
+  private final TableName tableName;
+  private final DataJanitorState dataJanitorState;
+  private final long pruneFlushInterval;
+  // Map of region name -> prune upper bound
+  private final ConcurrentSkipListMap<byte[], Long> pruneEntries;
+  // Map of region name -> time the region was found to be empty
+  private final ConcurrentSkipListMap<byte[], Long> emptyRegions;
+
+  private volatile Thread flushThread;
+  private volatile boolean stopped;
+
+  private long lastChecked;
+
+  @SuppressWarnings("WeakerAccess")
+  public PruneUpperBoundWriter(TableName tableName, DataJanitorState dataJanitorState, long pruneFlushInterval) {
+    this.tableName = tableName;
+    this.dataJanitorState = dataJanitorState;
+    this.pruneFlushInterval = pruneFlushInterval;
+    this.pruneEntries = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+    this.emptyRegions = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
+  }
+
+  @SuppressWarnings("WeakerAccess")
+  public void persistPruneEntry(byte[] regionName, long pruneUpperBound) {
+    warnIfNotRunning(regionName);
+    // The number of entries in this map is bounded by the number of regions in this region server,
+    // and thus it will not grow indefinitely
+    pruneEntries.put(regionName, pruneUpperBound);
+  }
+
+  @SuppressWarnings("WeakerAccess")
+  public void persistRegionEmpty(byte[] regionName, long time) {
+    warnIfNotRunning(regionName);
+    // The number of entries in this map is bounded by the number of regions in this region server,
+    // and thus it will not grow indefinitely
+    emptyRegions.put(regionName, time);
+  }
+
+  @SuppressWarnings("WeakerAccess")
+  public boolean isAlive() {
+    return flushThread != null && flushThread.isAlive();
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    LOG.info("Starting PruneUpperBoundWriter Thread.");
+    startFlushThread();
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    LOG.info("Stopping PruneUpperBoundWriter Thread.");
+    stopped = true;
+    if (flushThread != null) {
+      flushThread.interrupt();
+      flushThread.join(TimeUnit.SECONDS.toMillis(1));
+      if (flushThread.isAlive()) {
+        flushThread.interrupt();
+        flushThread.join(TimeUnit.SECONDS.toMillis(1));
+      }
+    }
+  }
+
+  private void startFlushThread() {
+    flushThread = new Thread("tephra-prune-upper-bound-writer") {
+      @Override
+      public void run() {
+        while ((!isInterrupted()) && (!stopped)) {
+          long now = System.currentTimeMillis();
+          if (now > (lastChecked + pruneFlushInterval)) {
+            // Time to flush the accumulated entries
+            try {
+              User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws Exception {
+                  // Record prune upper bounds
+                  while (!pruneEntries.isEmpty()) {
+                    Map.Entry<byte[], Long> firstEntry = pruneEntries.firstEntry();
+                    dataJanitorState.savePruneUpperBoundForRegion(firstEntry.getKey(), firstEntry.getValue());
+                    // We can remove the entry only if the key and value match what we wrote, since it is
+                    // possible that a new pruneUpperBound for the same key has been added in the meantime
+                    pruneEntries.remove(firstEntry.getKey(), firstEntry.getValue());
+                  }
+                  // Record empty regions
+                  while (!emptyRegions.isEmpty()) {
+                    Map.Entry<byte[], Long> firstEntry = emptyRegions.firstEntry();
+                    dataJanitorState.saveEmptyRegionForTime(firstEntry.getValue(), firstEntry.getKey());
+                    // We can remove the entry only if the key and value match what we wrote, since it is
+                    // possible that a new value for the same key has been added in the meantime
+                    emptyRegions.remove(firstEntry.getKey(), firstEntry.getValue());
+                  }
+                  return null;
+                }
+              });
+            } catch (IOException ex) {
+              LOG.warn("Cannot record prune upper bound for a region to table " +
+                         tableName.getNameWithNamespaceInclAsString(), ex);
+            }
+            lastChecked = now;
+          }
+
+          try {
+            TimeUnit.SECONDS.sleep(1);
+          } catch (InterruptedException ex) {
+            interrupt();
+            break;
+          }
+        }
+
+        LOG.info("PruneUpperBound Writer thread terminated.");
+      }
+    };
+
+    flushThread.setDaemon(true);
+    flushThread.start();
+  }
+
+  private void warnIfNotRunning(byte[] regionName) {
+    if (!isRunning() || !isAlive()) {
+      LOG.warn(String.format("Trying to persist prune upper bound for region %s when writer is not %s!",
+                             Bytes.toStringBinary(regionName), isRunning() ? "alive" : "running"));
+    }
+  }
+}
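A consumer, typically a server-side coprocessor, is expected to obtain the writer through the supplier defined in the next file so that a single reference-counted instance is shared per region server. The following is a minimal sketch of that call pattern only; the class name, the "tephra.state" table name, the flush interval, and the region name are illustrative, and in real use the writer's service lifecycle is driven by the coprocessor rather than by a standalone main method:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.tephra.hbase.txprune.DataJanitorState;
import org.apache.tephra.hbase.txprune.PruneUpperBoundWriter;
import org.apache.tephra.hbase.txprune.PruneUpperBoundWriterSupplier;

import java.io.IOException;

public class PruneWriterUsageSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    final Connection connection = ConnectionFactory.createConnection(conf);
    // Illustrative state table name and flush interval
    final TableName stateTable = TableName.valueOf("tephra.state");
    DataJanitorState state = new DataJanitorState(new DataJanitorState.TableSupplier() {
      @Override
      public Table get() throws IOException {
        return connection.getTable(stateTable);
      }
    });
    PruneUpperBoundWriterSupplier supplier =
      new PruneUpperBoundWriterSupplier(stateTable, state, 60 * 1000L);
    // get() returns the shared writer; its start/stop is managed by the reference-counted supplier
    PruneUpperBoundWriter writer = supplier.get();
    try {
      // Record that data of invalid transactions below id 5000 can be pruned for this region
      writer.persistPruneEntry(Bytes.toBytes("example-region"), 5000L);
    } finally {
      supplier.release();  // the last release stops the shared writer
    }
    connection.close();
  }
}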
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
new file mode 100644
index 0000000..cb93fab
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/PruneUpperBoundWriterSupplier.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.base.Supplier;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.tephra.coprocessor.CacheSupplier;
+import org.apache.tephra.coprocessor.ReferenceCountedSupplier;
+
+/**
+ * Supplies reference-counted instances of {@link PruneUpperBoundWriter}.
+ */
+public class PruneUpperBoundWriterSupplier implements CacheSupplier<PruneUpperBoundWriter> {
+
+  private static final ReferenceCountedSupplier<PruneUpperBoundWriter> referenceCountedSupplier =
+    new ReferenceCountedSupplier<>(PruneUpperBoundWriter.class.getSimpleName());
+
+  private final Supplier<PruneUpperBoundWriter> supplier;
+
+  public PruneUpperBoundWriterSupplier(final TableName tableName, final DataJanitorState dataJanitorState,
+                                       final long pruneFlushInterval) {
+    this.supplier = new Supplier<PruneUpperBoundWriter>() {
+      @Override
+      public PruneUpperBoundWriter get() {
+        return new PruneUpperBoundWriter(tableName, dataJanitorState, pruneFlushInterval);
+      }
+    };
+  }
+
+  @Override
+  public PruneUpperBoundWriter get() {
+    return referenceCountedSupplier.getOrCreate(supplier);
+  }
+
+  public void release() {
+    referenceCountedSupplier.release();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8f958edb/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/TimeRegions.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/TimeRegions.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/TimeRegions.java
new file mode 100644
index 0000000..4ac8887
--- /dev/null
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/TimeRegions.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.txprune;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.Objects;
+import java.util.SortedSet;
+
+/**
+ * Contains information on the set of transactional regions recorded at a given time.
+ */
+@SuppressWarnings("WeakerAccess")
+public class TimeRegions {
+  static final Function<byte[], String> BYTE_ARR_TO_STRING_FN =
+    new Function<byte[], String>() {
+      @Override
+      public String apply(byte[] input) {
+        return Bytes.toStringBinary(input);
+      }
+    };
+
+  private final long time;
+  private final SortedSet<byte[]> regions;
+
+  public TimeRegions(long time, SortedSet<byte[]> regions) {
+    this.time = time;
+    this.regions = regions;
+  }
+
+  public long getTime() {
+    return time;
+  }
+
+  public SortedSet<byte[]> getRegions() {
+    return regions;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    TimeRegions that = (TimeRegions) o;
+    return time == that.time &&
+      Objects.equals(regions, that.regions);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(time, regions);
+  }
+
+  @Override
+  public String toString() {
+    Iterable<String> regionStrings = Iterables.transform(regions, BYTE_ARR_TO_STRING_FN);
+    return "TimeRegions{" +
+      "time=" + time +
+      ", regions=[" + Joiner.on(" ").join(regionStrings) + "]" +
+      '}';
+  }
+}
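To illustrate how TimeRegions is meant to be populated, region names are kept in a byte[]-sorted set, matching the Bytes.BYTES_COMPARATOR ordering used elsewhere in the prune state. A small sketch in the same package; the class name, region names, and timestamp are illustrative:

package org.apache.tephra.hbase.txprune;

import org.apache.hadoop.hbase.util.Bytes;

import java.util.SortedSet;
import java.util.TreeSet;

public class TimeRegionsExample {
  public static void main(String[] args) {
    // byte[] keys need an explicit comparator; plain TreeSet ordering would not work for arrays
    SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    regions.add(Bytes.toBytes("region-a"));
    regions.add(Bytes.toBytes("region-b"));
    TimeRegions timeRegions = new TimeRegions(1500000000000L, regions);
    // Prints: TimeRegions{time=1500000000000, regions=[region-a region-b]}
    System.out.println(timeRegions);
  }
}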
