Porting Pruning changes to hbase-compat-0.96, hbase-compat-0.98, hbase-compat-1.0, hbase-compat-1.0-cdh
This closes #25 from GitHub.

Signed-off-by: Gokul Gunasekaran <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/92c61e7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/92c61e7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/92c61e7c

Branch: refs/heads/master
Commit: 92c61e7c4ec15e4e0ac94590542eb9c34e396542
Parents: 79b9719
Author: Gokul Gunasekaran <[email protected]>
Authored: Wed Dec 21 13:28:51 2016 -0800
Committer: Gokul Gunasekaran <[email protected]>
Committed: Thu Jan 19 12:16:23 2017 -0800

----------------------------------------------------------------------
 .../tephra/hbase/TransactionAwareHTable.java    |   1 +
 .../hbase/coprocessor/TransactionProcessor.java |  67 +++-
 .../tephra/hbase/txprune/CompactionState.java   |  94 +++++
 .../tephra/hbase/txprune/DataJanitorState.java  | 362 ++++++++++++++++++
 .../txprune/HBaseTransactionPruningPlugin.java  | 306 ++++++++++++++++
 .../tephra/hbase/txprune/TimeRegions.java       |  85 +++++
 .../tephra/hbase/AbstractHBaseTableTest.java    | 106 ++++++
 .../hbase/TransactionAwareHTableTest.java       | 250 +++++++------
 .../hbase/txprune/DataJanitorStateTest.java     | 210 +++++++++++
 .../hbase/txprune/InvalidListPruneTest.java     | 366 +++++++++++++++++++
 .../tephra/hbase/TransactionAwareHTable.java    |   1 +
 .../hbase/coprocessor/TransactionProcessor.java |  67 +++-
 .../tephra/hbase/txprune/CompactionState.java   |  94 +++++
 .../tephra/hbase/txprune/DataJanitorState.java  | 362 ++++++++++++++++++
 .../txprune/HBaseTransactionPruningPlugin.java  | 306 ++++++++++++++++
 .../tephra/hbase/txprune/TimeRegions.java       |  85 +++++
 .../tephra/hbase/AbstractHBaseTableTest.java    | 106 ++++++
 .../hbase/TransactionAwareHTableTest.java       | 136 +++---
 .../hbase/txprune/DataJanitorStateTest.java     | 210 +++++++++++
 .../hbase/txprune/InvalidListPruneTest.java     | 366 +++++++++++++++++++
 .../tephra/hbase/TransactionAwareHTable.java    |   1 +
 .../hbase/coprocessor/TransactionProcessor.java |  67 +++-
 .../tephra/hbase/txprune/CompactionState.java   |  94 +++++
 .../tephra/hbase/txprune/DataJanitorState.java  | 362 ++++++++++++++++++
 .../txprune/HBaseTransactionPruningPlugin.java  | 299 +++++++++++++++
 .../tephra/hbase/txprune/TimeRegions.java       |  85 +++++
 .../tephra/hbase/AbstractHBaseTableTest.java    | 106 ++++++
 .../hbase/TransactionAwareHTableTest.java       | 136 +++---
 .../hbase/txprune/DataJanitorStateTest.java     | 205 +++++++++++
 .../hbase/txprune/InvalidListPruneTest.java     | 361 ++++++++++++++++++
 .../tephra/hbase/TransactionAwareHTable.java    |   1 +
 .../hbase/coprocessor/TransactionProcessor.java |  67 +++-
 .../tephra/hbase/txprune/CompactionState.java   |  94 +++++
 .../tephra/hbase/txprune/DataJanitorState.java  | 362 ++++++++++++++++++
 .../txprune/HBaseTransactionPruningPlugin.java  | 300 +++++++++++++++
 .../tephra/hbase/txprune/TimeRegions.java       |  85 +++++
 .../tephra/hbase/AbstractHBaseTableTest.java    | 106 ++++++
 .../hbase/TransactionAwareHTableTest.java       | 136 +++---
 .../hbase/txprune/DataJanitorStateTest.java     | 205 +++++++++++
 .../hbase/txprune/InvalidListPruneTest.java     | 361 ++++++++++++++++++
 .../hbase/coprocessor/TransactionProcessor.java |   3 +-
 41 files changed, 6689 insertions(+), 327 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/92c61e7c/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
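For context before the diffs: the ported pruning feature is driven by two configuration properties that TransactionProcessor reads at coprocessor start. A minimal sketch of turning it on (not part of this commit; the state table name is only an example value):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.tephra.TxConstants;

    public class PruneConfigSketch {
      public static Configuration pruneEnabledConf() {
        Configuration conf = HBaseConfiguration.create();
        // Enable invalid list pruning; TransactionProcessor then records compaction
        // state after every major compaction.
        conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true);
        // Table where DataJanitorState keeps prune state; "tephra.prune.state" is a
        // made-up name, the default comes from DEFAULT_PRUNE_STATE_TABLE.
        conf.set(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, "tephra.prune.state");
        return conf;
      }
    }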
---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java index 0fcd85c..e1e5d7d 100644 --- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java @@ -590,6 +590,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable txDelete.setAttribute(entry.getKey(), entry.getValue()); } txDelete.setDurability(delete.getDurability()); + addToOperation(txDelete, tx); return txDelete; } http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/92c61e7c/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java index 931032f..6e89571 100644 --- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java @@ -26,9 +26,11 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; @@ -45,6 +47,7 @@ import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -54,6 +57,7 @@ import org.apache.tephra.TransactionCodec; import org.apache.tephra.TxConstants; import org.apache.tephra.coprocessor.TransactionStateCache; import org.apache.tephra.coprocessor.TransactionStateCacheSupplier; +import org.apache.tephra.hbase.txprune.CompactionState; import org.apache.tephra.persist.TransactionVisibilityState; import org.apache.tephra.util.TxUtils; @@ -64,6 +68,8 @@ import java.util.List; import java.util.Map; import java.util.NavigableSet; import java.util.Set; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; /** * {@code org.apache.hadoop.hbase.coprocessor.RegionObserver} coprocessor that handles server-side processing @@ -97,11 +103,13 @@ import java.util.Set; public class TransactionProcessor extends BaseRegionObserver { private static final Log LOG = LogFactory.getLog(TransactionProcessor.class); - private TransactionStateCache cache; private final TransactionCodec txCodec; + private TransactionStateCache cache; + private CompactionState compactionState; protected 
Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT; protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA; + protected long txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME); public TransactionProcessor() { this.txCodec = new TransactionCodec(); @@ -138,6 +146,20 @@ public class TransactionProcessor extends BaseRegionObserver { if (readNonTxnData) { LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString()); } + + this.txMaxLifetimeMillis = + TimeUnit.SECONDS.toMillis(env.getConfiguration().getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, + TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME)); + + boolean pruneEnabled = env.getConfiguration().getBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_ENABLE); + if (pruneEnabled) { + String pruneTable = env.getConfiguration().get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE); + compactionState = new CompactionState(env, TableName.valueOf(pruneTable), txMaxLifetimeMillis); + LOG.debug("Automatic invalid list pruning is enabled. Compaction state will be recorded in table " + + pruneTable); + } } } @@ -165,6 +187,13 @@ public class TransactionProcessor extends BaseRegionObserver { } @Override + public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) + throws IOException { + Transaction tx = getFromOperation(put); + ensureValidTxLifetime(tx); + } + + @Override public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException { // Translate deletes into our own delete tombstones @@ -177,6 +206,9 @@ public class TransactionProcessor extends BaseRegionObserver { return; } + Transaction tx = getFromOperation(delete); + ensureValidTxLifetime(tx); + // Other deletes are client-initiated and need to be translated into our own tombstones // TODO: this should delegate to the DeleteStrategy implementation. Put deleteMarkers = new Put(delete.getRow(), delete.getTimeStamp()); @@ -269,8 +301,24 @@ public class TransactionProcessor extends BaseRegionObserver { List<? 
extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s, CompactionRequest request) throws IOException { - return createStoreScanner(c.getEnvironment(), "compaction", cache.getLatestState(), store, scanners, scanType, - earliestPutTs); + // Get the latest tx snapshot state for the compaction + TransactionVisibilityState snapshot = cache.getLatestState(); + + // Record tx state before the compaction + if (compactionState != null) { + compactionState.record(request, snapshot); + } + // Also make sure to use the same snapshot for the compaction + return createStoreScanner(c.getEnvironment(), "compaction", snapshot, store, scanners, scanType, earliestPutTs); + } + + @Override + public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, Store store, StoreFile resultFile, + CompactionRequest request) throws IOException { + // Persist the compaction state after a successful compaction + if (compactionState != null) { + compactionState.persist(); + } } protected InternalScanner createStoreScanner(RegionCoprocessorEnvironment env, String action, @@ -311,6 +359,19 @@ public class TransactionProcessor extends BaseRegionObserver { return null; } + private void ensureValidTxLifetime(@Nullable Transaction tx) throws DoNotRetryIOException { + if (tx == null) { + return; + } + + boolean validLifetime = + TxUtils.getTimestamp(tx.getTransactionId()) + txMaxLifetimeMillis > System.currentTimeMillis(); + if (!validLifetime) { + throw new DoNotRetryIOException(String.format("Transaction %s has exceeded max lifetime %s ms", + tx.getTransactionId(), txMaxLifetimeMillis)); + } + } + private boolean isRollbackOperation(OperationWithAttributes op) throws IOException { return op.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null || // to support old clients http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/92c61e7c/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java new file mode 100644 index 0000000..1df754b --- /dev/null +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/CompactionState.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.tephra.hbase.txprune; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.tephra.Transaction; +import org.apache.tephra.persist.TransactionVisibilityState; +import org.apache.tephra.util.TxUtils; + +import java.io.IOException; +import javax.annotation.Nullable; + +/** + * Record compaction state for invalid list pruning + */ +public class CompactionState { + private static final Log LOG = LogFactory.getLog(CompactionState.class); + + private final byte[] regionName; + private final String regionNameAsString; + private final TableName stateTable; + private final long txMaxLifetimeMillis; + private final DataJanitorState dataJanitorState; + private volatile long pruneUpperBound = -1; + + public CompactionState(final RegionCoprocessorEnvironment env, final TableName stateTable, long txMaxLifetimeMillis) { + this.regionName = env.getRegion().getRegionName(); + this.regionNameAsString = env.getRegion().getRegionNameAsString(); + this.stateTable = stateTable; + this.txMaxLifetimeMillis = txMaxLifetimeMillis; + this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() { + @Override + public HTableInterface get() throws IOException { + return env.getTable(stateTable); + } + }); + } + + /** + * Records the transaction state used for a compaction. This method is called when the compaction starts. + * + * @param request {@link CompactionRequest} for the compaction + * @param snapshot transaction state that will be used for the compaction + */ + public void record(CompactionRequest request, @Nullable TransactionVisibilityState snapshot) { + if (request.isMajor() && snapshot != null) { + Transaction tx = TxUtils.createDummyTransaction(snapshot); + pruneUpperBound = TxUtils.getPruneUpperBound(tx); + LOG.debug( + String.format("Computed prune upper bound %s for compaction request %s using transaction state from time %s", + pruneUpperBound, request, snapshot.getTimestamp())); + } else { + pruneUpperBound = -1; + } + } + + /** + * Persists the transaction state recorded by {@link #record(CompactionRequest, TransactionVisibilityState)}. + * This method is called after the compaction has successfully completed.
+ */ + public void persist() { + if (pruneUpperBound != -1) { + try { + dataJanitorState.savePruneUpperBoundForRegion(regionName, pruneUpperBound); + LOG.debug(String.format("Saved prune upper bound %s for region %s", pruneUpperBound, regionNameAsString)); + } catch (IOException e) { + LOG.warn(String.format("Cannot record prune upper bound in table %s after compacting region %s", + stateTable, regionNameAsString), e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/92c61e7c/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java new file mode 100644 index 0000000..bde843b --- /dev/null +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tephra.hbase.txprune; + +import com.google.common.collect.Maps; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.tephra.hbase.coprocessor.TransactionProcessor; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; +import javax.annotation.Nullable; + +/** + * Persist data janitor state into an HBase table. + * This is used by both {@link TransactionProcessor} and by the {@link HBaseTransactionPruningPlugin} + * to persist and read the compaction state. 
+ */ +@SuppressWarnings("WeakerAccess") +public class DataJanitorState { + public static final byte[] FAMILY = {'f'}; + + private static final byte[] PRUNE_UPPER_BOUND_COL = {'p'}; + private static final byte[] REGION_TIME_COL = {'r'}; + private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_COL = {'i'}; + + private static final byte[] REGION_KEY_PREFIX = {0x1}; + private static final byte[] REGION_KEY_PREFIX_STOP = {0x2}; + + private static final byte[] REGION_TIME_KEY_PREFIX = {0x2}; + private static final byte[] REGION_TIME_KEY_PREFIX_STOP = {0x3}; + + private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX = {0x3}; + private static final byte[] INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP = {0x4}; + + private static final byte[] EMPTY_BYTE_ARRAY = new byte[0]; + + private final TableSupplier stateTableSupplier; + + + public DataJanitorState(TableSupplier stateTableSupplier) { + this.stateTableSupplier = stateTableSupplier; + } + + // ---------------------------------------------------------------- + // ------- Methods for prune upper bound for a given region ------- + // ---------------------------------------------------------------- + // The data is stored in the following format - + // Key: 0x1<region-id> + // Col 'p': <prune upper bound> + // ---------------------------------------------------------------- + + /** + * Persist the latest prune upper bound for a given region. This is called by {@link TransactionProcessor} + * after major compaction. + * + * @param regionId region id + * @param pruneUpperBound the latest prune upper bound for the region + * @throws IOException when not able to persist the data to HBase + */ + public void savePruneUpperBoundForRegion(byte[] regionId, long pruneUpperBound) throws IOException { + try (HTableInterface stateTable = stateTableSupplier.get()) { + Put put = new Put(makeRegionKey(regionId)); + put.add(FAMILY, PRUNE_UPPER_BOUND_COL, Bytes.toBytes(pruneUpperBound)); + stateTable.put(put); + } + } + + /** + * Get latest prune upper bound for a given region. This indicates the largest invalid transaction that no + * longer has writes in this region. + * + * @param regionId region id + * @return latest prune upper bound for the region + * @throws IOException when not able to read the data from HBase + */ + public long getPruneUpperBoundForRegion(byte[] regionId) throws IOException { + try (HTableInterface stateTable = stateTableSupplier.get()) { + Get get = new Get(makeRegionKey(regionId)); + get.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL); + byte[] result = stateTable.get(get).getValue(FAMILY, PRUNE_UPPER_BOUND_COL); + return result == null ? -1 : Bytes.toLong(result); + } + } + + /** + * Get latest prune upper bounds for given regions.
This is a batch operation of method + * {@link #getPruneUpperBoundForRegion(byte[])} + * + * @param regions a set of regions + * @return a map containing region id and its latest prune upper bound value + * @throws IOException when not able to read the data from HBase + */ + public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> regions) throws IOException { + Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); + try (HTableInterface stateTable = stateTableSupplier.get()) { + byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY); + Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP); + scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL); + + try (ResultScanner scanner = stateTable.getScanner(scan)) { + Result next; + while ((next = scanner.next()) != null) { + byte[] region = getRegionFromKey(next.getRow()); + if (regions.contains(region)) { + byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL); + if (timeBytes != null) { + long pruneUpperBoundRegion = Bytes.toLong(timeBytes); + resultMap.put(region, pruneUpperBoundRegion); + } + } + } + } + return resultMap; + } + } + + /** + * Delete prune upper bounds for the regions that are not in the given exclude set, and the + * prune upper bound is less than the given value. + * After the invalid list is pruned up to deletionPruneUpperBound, we do not need entries for regions that have + * prune upper bound less than deletionPruneUpperBound. We however limit the deletion to only regions that are + * no longer in existence (due to deletion, etc.), to avoid update/delete race conditions. + * + * @param deletionPruneUpperBound prune upper bound below which regions will be deleted + * @param excludeRegions set of regions that should not be deleted + * @throws IOException when not able to delete data in HBase + */ + public void deletePruneUpperBounds(long deletionPruneUpperBound, SortedSet<byte[]> excludeRegions) + throws IOException { + try (HTableInterface stateTable = stateTableSupplier.get()) { + byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY); + Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP); + scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL); + + try (ResultScanner scanner = stateTable.getScanner(scan)) { + Result next; + while ((next = scanner.next()) != null) { + byte[] region = getRegionFromKey(next.getRow()); + if (!excludeRegions.contains(region)) { + byte[] timeBytes = next.getValue(FAMILY, PRUNE_UPPER_BOUND_COL); + if (timeBytes != null) { + long pruneUpperBoundRegion = Bytes.toLong(timeBytes); + if (pruneUpperBoundRegion < deletionPruneUpperBound) { + stateTable.delete(new Delete(next.getRow())); + } + } + } + } + } + } + } + + // --------------------------------------------------- + // ------- Methods for regions at a given time ------- + // --------------------------------------------------- + // Key: 0x2<time><region-id> + // Col 't': <empty byte array> + // --------------------------------------------------- + + /** + * Persist the regions for the given time. {@link HBaseTransactionPruningPlugin} saves the set of + * transactional regions existing in the HBase instance periodically. 
+ * + * @param time timestamp in milliseconds + * @param regions set of regions at the time + * @throws IOException when not able to persist the data to HBase + */ + public void saveRegionsForTime(long time, Set<byte[]> regions) throws IOException { + byte[] timeBytes = Bytes.toBytes(getInvertedTime(time)); + try (HTableInterface stateTable = stateTableSupplier.get()) { + for (byte[] region : regions) { + Put put = new Put(makeTimeRegionKey(timeBytes, region)); + put.add(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY); + stateTable.put(put); + } + } + } + + /** + * Return the set of regions saved for the time at or before the given time. This method finds the greatest time + * that is less than or equal to the given time, and then returns all regions with that exact time, but none that are + * older than that. + * + * @param time timestamp in milliseconds + * @return set of regions and time at which they were recorded, or null if no regions found + * @throws IOException when not able to read the data from HBase + */ + @Nullable + public TimeRegions getRegionsOnOrBeforeTime(long time) throws IOException { + byte[] timeBytes = Bytes.toBytes(getInvertedTime(time)); + try (HTableInterface stateTable = stateTableSupplier.get()) { + Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP); + scan.addColumn(FAMILY, REGION_TIME_COL); + + SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR); + long currentRegionTime = -1; + try (ResultScanner scanner = stateTable.getScanner(scan)) { + Result next; + while ((next = scanner.next()) != null) { + Map.Entry<Long, byte[]> timeRegion = getTimeRegion(next.getRow()); + // Stop if reached next time value + if (currentRegionTime == -1) { + currentRegionTime = timeRegion.getKey(); + } else if (timeRegion.getKey() < currentRegionTime) { + break; + } else if (timeRegion.getKey() > currentRegionTime) { + throw new IllegalStateException( + String.format("Got out of order time %d when expecting time less than or equal to %d", + timeRegion.getKey(), currentRegionTime)); + } + regions.add(timeRegion.getValue()); + } + } + return regions.isEmpty() ? null : new TimeRegions(currentRegionTime, regions); + } + } + + /** + * Delete all the regions that were recorded for all times less than or equal to the given time. + * + * @param time timestamp in milliseconds + * @throws IOException when not able to delete data in HBase + */ + public void deleteAllRegionsOnOrBeforeTime(long time) throws IOException { + byte[] timeBytes = Bytes.toBytes(getInvertedTime(time)); + try (HTableInterface stateTable = stateTableSupplier.get()) { + Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), REGION_TIME_KEY_PREFIX_STOP); + scan.addColumn(FAMILY, REGION_TIME_COL); + + try (ResultScanner scanner = stateTable.getScanner(scan)) { + Result next; + while ((next = scanner.next()) != null) { + stateTable.delete(new Delete(next.getRow())); + } + } + } + } + + // --------------------------------------------------------------------- + // ------- Methods for inactive transaction bound for given time ------- + // --------------------------------------------------------------------- + // Key: 0x3<inverted time> + // Col 'i': <inactive transaction bound> + // --------------------------------------------------------------------- + + /** + * Persist inactive transaction bound for a given time. This is the smallest not in-progress transaction that + * will not have writes in any HBase regions that are created after the given time.
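A short usage sketch of the save/read pair that follows, with invented values (the DataJanitorState instance would be built with the TableSupplier pattern shown earlier):

    import java.io.IOException;
    import org.apache.tephra.hbase.txprune.DataJanitorState;

    public class InactiveBoundSketch {
      static void roundTrip(DataJanitorState state) throws IOException {
        long runTime = 1484000000000L;     // invented wall-clock millis of a plugin run
        long inactiveTxBound = 987654321L; // invented bound supplied by the transaction service
        state.saveInactiveTransactionBoundForTime(runTime, inactiveTxBound);
        // Reading back for the same time returns the saved bound; -1 means none recorded.
        long bound = state.getInactiveTransactionBoundForTime(runTime);
        assert bound == inactiveTxBound;
      }
    }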
+ * + * @param time time in milliseconds + * @param inactiveTransactionBound inactive transaction bound for the given time + * @throws IOException when not able to persist the data to HBase + */ + public void saveInactiveTransactionBoundForTime(long time, long inactiveTransactionBound) throws IOException { + try (HTableInterface stateTable = stateTableSupplier.get()) { + Put put = new Put(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time)))); + put.add(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL, Bytes.toBytes(inactiveTransactionBound)); + stateTable.put(put); + } + } + + /** + * Return inactive transaction bound for the given time. + * + * @param time time in milliseconds + * @return inactive transaction bound for the given time + * @throws IOException when not able to read the data from HBase + */ + public long getInactiveTransactionBoundForTime(long time) throws IOException { + try (HTableInterface stateTable = stateTableSupplier.get()) { + Get get = new Get(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time)))); + get.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL); + byte[] result = stateTable.get(get).getValue(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL); + return result == null ? -1 : Bytes.toLong(result); + } + } + + /** + * Delete all inactive transaction bounds recorded for a time less than or equal to the given time. + * + * @param time time in milliseconds + * @throws IOException when not able to delete data in HBase + */ + public void deleteInactiveTransactionBoundsOnOrBeforeTime(long time) throws IOException { + try (HTableInterface stateTable = stateTableSupplier.get()) { + Scan scan = new Scan(makeInactiveTransactionBoundTimeKey(Bytes.toBytes(getInvertedTime(time))), + INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX_STOP); + scan.addColumn(FAMILY, INACTIVE_TRANSACTION_BOUND_TIME_COL); + + try (ResultScanner scanner = stateTable.getScanner(scan)) { + Result next; + while ((next = scanner.next()) != null) { + stateTable.delete(new Delete(next.getRow())); + } + } + } + } + + private byte[] makeRegionKey(byte[] regionId) { + return Bytes.add(REGION_KEY_PREFIX, regionId); + } + + private byte[] getRegionFromKey(byte[] regionKey) { + int prefixLen = REGION_KEY_PREFIX.length; + return Bytes.copy(regionKey, prefixLen, regionKey.length - prefixLen); + } + + private byte[] makeTimeRegionKey(byte[] time, byte[] regionId) { + return Bytes.add(REGION_TIME_KEY_PREFIX, time, regionId); + } + + private byte[] makeInactiveTransactionBoundTimeKey(byte[] time) { + return Bytes.add(INACTIVE_TRANSACTION_BOUND_TIME_KEY_PREFIX, time); + } + + private Map.Entry<Long, byte[]> getTimeRegion(byte[] key) { + int offset = REGION_TIME_KEY_PREFIX.length; + long time = getInvertedTime(Bytes.toLong(key, offset)); + offset += Bytes.SIZEOF_LONG; + byte[] regionName = Bytes.copy(key, offset, key.length - offset); + return Maps.immutableEntry(time, regionName); + } + + private long getInvertedTime(long time) { + return Long.MAX_VALUE - time; + } + + /** + * Supplies table for persisting state + */ + public interface TableSupplier { + HTableInterface get() throws IOException; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/92c61e7c/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java new file mode 100644 index 0000000..44dadc3 --- /dev/null +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tephra.hbase.txprune; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.tephra.TxConstants; +import org.apache.tephra.hbase.coprocessor.TransactionProcessor; +import org.apache.tephra.txprune.TransactionPruningPlugin; +import org.apache.tephra.util.TxUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; + +/** + * Default implementation of the {@link TransactionPruningPlugin} for HBase. + * + * This plugin determines the prune upper bound for transactional HBase tables that use + * coprocessor {@link TransactionProcessor}. + * + * <h3>State storage:</h3> + * + * This plugin expects the TransactionProcessor to save the prune upper bound for invalid transactions + * after every major compaction of a region. Let's call this <i>(region, prune upper bound)</i>. + * In addition, the plugin also persists the following information on a run at time <i>t</i> + * <ul> + * <li> + * <i>(t, set of regions)</i>: Set of transactional regions at time <i>t</i>. + * Transactional regions are regions of the tables that have the coprocessor TransactionProcessor + * attached to them. + * </li> + * <li> + * <i>(t, inactive transaction bound)</i>: This is the smallest not in-progress transaction that + * will not have writes in any HBase regions that are created after time <i>t</i>. + * This value is determined by the Transaction Service based on the transaction state at time <i>t</i> + * and passed on to the plugin. + * </li> + * </ul> + * + * <h3>Computing prune upper bound:</h3> + * + * In a typical HBase instance, there can be a constant change in the number of regions due to region creations, + * splits and merges. 
At any given time there can always be a region on which a major compaction has not been run. + * Since the prune upper bound will get recorded for a region only after a major compaction, + * using only the latest set of regions we may not be able to find the + * prune upper bounds for all the current regions. Hence we persist the set of regions that exist at the time + * of each run of the plugin, and use the historical region sets for times <i>t</i>, <i>t - 1</i>, etc. + * to determine the prune upper bound. + * + * From the regions saved at time <i>t</i>, <i>t - 1</i>, etc., + * the plugin tries to find the latest <i>(t, set of regions)</i> where all regions have been major compacted, + * i.e., all regions have a prune upper bound recorded in <i>(region, prune upper bound)</i>. + * <br/> + * If such a set is found for time <i>t1</i>, the prune upper bound returned by the plugin is the minimum of + * <ul> + * <li>Prune upper bounds of regions in set <i>(t1, set of regions)</i></li> + * <li>Inactive transaction bound from <i>(t1, inactive transaction bound)</i></li> + * </ul> + * + * <p/> + * Above, when we find <i>(t1, set of regions)</i>, there may be a region that was created after time <i>t1</i>, + * but has a data write from an invalid transaction that is smaller than the prune upper bounds of all + * regions in <i>(t1, set of regions)</i>. This is possible because <i>(region, prune upper bound)</i> persisted by + * TransactionProcessor is always the latest prune upper bound for a region. + * <br/> + * However, a region created after time <i>t1</i> cannot have writes from an invalid transaction that is smaller than + * the inactive transaction bound at the time the region was created. + * Since we limit the plugin prune upper bound using <i>(t1, inactive transaction bound)</i>, + * there should be no invalid transactions smaller than the plugin prune upper bound with writes in any + * transactional region of this HBase instance. + * + * <p/> + * Note: If your tables use a transactional coprocessor other than TransactionProcessor, + * then you may need to write a new plugin to compute the prune upper bound for those tables. + */ +@SuppressWarnings("WeakerAccess") +public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin { + public static final Logger LOG = LoggerFactory.getLogger(HBaseTransactionPruningPlugin.class); + + protected Configuration conf; + protected HBaseAdmin hBaseAdmin; + protected HConnection connection; + protected DataJanitorState dataJanitorState; + + @Override + public void initialize(Configuration conf) throws IOException { + this.conf = conf; + this.hBaseAdmin = new HBaseAdmin(conf); + this.connection = HConnectionManager.createConnection(conf); + + final TableName stateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)); + LOG.info("Initializing plugin with state table {}:{}", stateTable.getNamespaceAsString(), + stateTable.getNameAsString()); + this.dataJanitorState = new DataJanitorState(new DataJanitorState.TableSupplier() { + @Override + public HTableInterface get() throws IOException { + return connection.getTable(stateTable); + } + }); + } + + /** + * Determines prune upper bound for the data store as mentioned above.
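A worked example of the minimum rule above, with invented transaction ids: if the newest fully compacted snapshot (t1, set of regions) has per-region prune upper bounds 100, 120 and 90, and the inactive transaction bound saved for t1 is 95, the plugin returns min(90, 95) = 90:

    import java.util.Arrays;
    import java.util.Collections;

    public class PruneBoundExample {
      public static void main(String[] args) {
        long minRegionBound = Collections.min(Arrays.asList(100L, 120L, 90L)); // 90
        long pruneUpperBound = Math.min(95L, minRegionBound);                  // 90
        System.out.println(pruneUpperBound);
      }
    }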
+ */ + @Override + public long fetchPruneUpperBound(long time, long inactiveTransactionBound) throws IOException { + LOG.debug("Fetching prune upper bound for time {} and inactive transaction bound {}", + time, inactiveTransactionBound); + if (time < 0 || inactiveTransactionBound < 0) { + return -1; + } + + // Get all the current transactional regions + SortedSet<byte[]> transactionalRegions = getTransactionalRegions(); + if (!transactionalRegions.isEmpty()) { + LOG.debug("Saving {} transactional regions for time {}", transactionalRegions.size(), time); + dataJanitorState.saveRegionsForTime(time, transactionalRegions); + // Save inactive transaction bound for time as the final step. + // We can then use its existence to determine whether the data for a given time is complete + LOG.debug("Saving inactive transaction bound {} for time {}", inactiveTransactionBound, time); + dataJanitorState.saveInactiveTransactionBoundForTime(time, inactiveTransactionBound); + } + + return computePruneUpperBound(new TimeRegions(time, transactionalRegions)); + } + + /** + * After the invalid list has been pruned, this cleans up state information that is no longer required. + * This includes - + * <ul> + * <li> + * <i>(region, prune upper bound)</i> - prune upper bounds for stale regions that are smaller + * than maxPrunedInvalid + * </li> + * <li> + * <i>(t, set of regions)</i> - Region sets that were recorded on or before the start time + * of maxPrunedInvalid + * </li> + * <li> + * <i>(t, inactive transaction bound)</i> - Smallest not in-progress transaction without any writes in new + * regions, recorded on or before the start time of maxPrunedInvalid + * </li> + * </ul> + */ + @Override + public void pruneComplete(long time, long maxPrunedInvalid) throws IOException { + LOG.debug("Prune complete for time {} and prune upper bound {}", time, maxPrunedInvalid); + if (time < 0 || maxPrunedInvalid < 0) { + return; + } + + // Get regions for the current time, so as to not delete the prune upper bounds for them. + // The prune upper bounds for regions are recorded by TransactionProcessor and the deletion + // is done by this class. To avoid update/delete race conditions, we only delete prune upper + // bounds for the stale regions.
+ TimeRegions regionsToExclude = dataJanitorState.getRegionsOnOrBeforeTime(time); + if (regionsToExclude != null) { + LOG.debug("Deleting prune upper bounds smaller than {} for stale regions", maxPrunedInvalid); + dataJanitorState.deletePruneUpperBounds(maxPrunedInvalid, regionsToExclude.getRegions()); + } else { + LOG.warn("Cannot find saved regions on or before time {}", time); + } + long pruneTime = TxUtils.getTimestamp(maxPrunedInvalid); + LOG.debug("Deleting regions recorded before time {}", pruneTime); + dataJanitorState.deleteAllRegionsOnOrBeforeTime(pruneTime); + LOG.debug("Deleting inactive transaction bounds recorded on or before time {}", pruneTime); + dataJanitorState.deleteInactiveTransactionBoundsOnOrBeforeTime(pruneTime); + } + + @Override + public void destroy() { + LOG.info("Stopping plugin..."); + try { + connection.close(); + } catch (IOException e) { + LOG.error("Got exception while closing HConnection", e); + } + + try { + hBaseAdmin.close(); + } catch (IOException e) { + LOG.error("Got exception while closing HBase admin", e); + } + } + + protected boolean isTransactionalTable(HTableDescriptor tableDescriptor) { + return tableDescriptor.hasCoprocessor(TransactionProcessor.class.getName()); + } + + protected SortedSet<byte[]> getTransactionalRegions() throws IOException { + SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR); + HTableDescriptor[] tableDescriptors = hBaseAdmin.listTables(); + LOG.debug("Got {} tables to process", tableDescriptors == null ? 0 : tableDescriptors.length); + if (tableDescriptors != null) { + for (HTableDescriptor tableDescriptor : tableDescriptors) { + if (isTransactionalTable(tableDescriptor)) { + List<HRegionInfo> tableRegions = hBaseAdmin.getTableRegions(tableDescriptor.getTableName()); + LOG.debug("Regions for table {}: {}", tableDescriptor.getTableName(), tableRegions); + if (tableRegions != null) { + for (HRegionInfo region : tableRegions) { + regions.add(region.getRegionName()); + } + } + } else { + LOG.debug("{} is not a transactional table", tableDescriptor.getTableName()); + } + } + } + return regions; + } + + /** + * Try to find the latest set of regions in which all regions have been major compacted, and + * compute prune upper bound from them. Starting from newest to oldest, this looks into the + * region set that has been saved periodically, and joins it with the prune upper bound data + * for a region recorded after a major compaction. 
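In miniature, the newest-to-oldest fallback that computePruneUpperBound below implements; isIncomplete is a hypothetical stand-in for the real check that every region in a snapshot has a recorded prune upper bound:

    import java.io.IOException;
    import org.apache.tephra.hbase.txprune.DataJanitorState;
    import org.apache.tephra.hbase.txprune.TimeRegions;

    public class WalkBackSketch {
      static TimeRegions newestCompleteSnapshot(DataJanitorState state, long time) throws IOException {
        TimeRegions tr = state.getRegionsOnOrBeforeTime(time);
        while (tr != null && isIncomplete(tr)) {
          // Fall back to the snapshot saved just before this one
          tr = state.getRegionsOnOrBeforeTime(tr.getTime() - 1);
        }
        return tr;
      }
      static boolean isIncomplete(TimeRegions tr) { return false; } // hypothetical placeholder
    }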
+ * + * @param timeRegions the latest set of regions + * @return prune upper bound + * @throws IOException when not able to talk to HBase + */ + private long computePruneUpperBound(TimeRegions timeRegions) throws IOException { + do { + LOG.debug("Computing prune upper bound for {}", timeRegions); + SortedSet<byte[]> transactionalRegions = timeRegions.getRegions(); + long time = timeRegions.getTime(); + + Map<byte[], Long> pruneUpperBoundRegions = dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions); + logPruneUpperBoundRegions(pruneUpperBoundRegions); + // If prune upper bounds are found for all the transactional regions, then compute the prune upper bound + // across all regions + if (!transactionalRegions.isEmpty() && pruneUpperBoundRegions.size() == transactionalRegions.size()) { + long inactiveTransactionBound = dataJanitorState.getInactiveTransactionBoundForTime(time); + LOG.debug("Found max prune upper bound {} for time {}", inactiveTransactionBound, time); + // If inactiveTransactionBound is not recorded then that means the data is not complete for these regions + if (inactiveTransactionBound != -1) { + Long minPruneUpperBoundRegions = Collections.min(pruneUpperBoundRegions.values()); + return Math.min(inactiveTransactionBound, minPruneUpperBoundRegions); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring regions for time {} as no inactiveTransactionBound was found for that time, " + + "and hence the data must be incomplete", time); + } + } + } else { + if (LOG.isDebugEnabled()) { + Sets.SetView<byte[]> difference = Sets.difference(transactionalRegions, pruneUpperBoundRegions.keySet()); + LOG.debug("Ignoring regions for time {} because the following regions did not record a pruneUpperBound: {}", + time, Iterables.transform(difference, TimeRegions.BYTE_ARR_TO_STRING_FN)); + } + } + + timeRegions = dataJanitorState.getRegionsOnOrBeforeTime(time - 1); + } while (timeRegions != null); + return -1; + } + + private void logPruneUpperBoundRegions(Map<byte[], Long> pruneUpperBoundRegions) { + if (LOG.isDebugEnabled()) { + LOG.debug("Got region - prune upper bound map: {}", + Iterables.transform(pruneUpperBoundRegions.entrySet(), + new Function<Map.Entry<byte[], Long>, Map.Entry<String, Long>>() { + @Override + public Map.Entry<String, Long> apply(Map.Entry<byte[], Long> input) { + String regionName = TimeRegions.BYTE_ARR_TO_STRING_FN.apply(input.getKey()); + return Maps.immutableEntry(regionName, input.getValue()); + } + })); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/92c61e7c/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/TimeRegions.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/TimeRegions.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/TimeRegions.java new file mode 100644 index 0000000..4ac8887 --- /dev/null +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/TimeRegions.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tephra.hbase.txprune; + +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.collect.Iterables; +import org.apache.hadoop.hbase.util.Bytes; + +import java.util.Objects; +import java.util.SortedSet; + +/** + * Contains information on the set of transactional regions recorded at a given time + */ +@SuppressWarnings("WeakerAccess") +public class TimeRegions { + static final Function<byte[], String> BYTE_ARR_TO_STRING_FN = + new Function<byte[], String>() { + @Override + public String apply(byte[] input) { + return Bytes.toStringBinary(input); + } + }; + + private final long time; + private final SortedSet<byte[]> regions; + + public TimeRegions(long time, SortedSet<byte[]> regions) { + this.time = time; + this.regions = regions; + } + + public long getTime() { + return time; + } + + public SortedSet<byte[]> getRegions() { + return regions; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TimeRegions that = (TimeRegions) o; + return time == that.time && + Objects.equals(regions, that.regions); + } + + @Override + public int hashCode() { + return Objects.hash(time, regions); + } + + @Override + public String toString() { + Iterable<String> regionStrings = Iterables.transform(regions, BYTE_ARR_TO_STRING_FN); + return "TimeRegions{" + + "time=" + time + + ", regions=[" + Joiner.on(" ").join(regionStrings) + "]" + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/92c61e7c/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java new file mode 100644 index 0000000..560b0fe --- /dev/null +++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.tephra.hbase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.tephra.TxConstants; +import org.apache.tephra.hbase.coprocessor.TransactionProcessor; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.util.Collections; +import java.util.List; + +/** + * Base class for tests that need an HBase cluster + */ +@SuppressWarnings("WeakerAccess") +public abstract class AbstractHBaseTableTest { + protected static HBaseTestingUtility testUtil; + protected static HBaseAdmin hBaseAdmin; + protected static Configuration conf; + + @BeforeClass + public static void startMiniCluster() throws Exception { + testUtil = conf == null ? new HBaseTestingUtility() : new HBaseTestingUtility(conf); + conf = testUtil.getConfiguration(); + + // Tune down the connection thread pool size + conf.setInt("hbase.hconnection.threads.core", 5); + conf.setInt("hbase.hconnection.threads.max", 10); + // Tune down handler threads in regionserver + conf.setInt("hbase.regionserver.handler.count", 10); + + // Set to random port + conf.setInt("hbase.master.port", 0); + conf.setInt("hbase.master.info.port", 0); + conf.setInt("hbase.regionserver.port", 0); + conf.setInt("hbase.regionserver.info.port", 0); + + testUtil.startMiniCluster(); + hBaseAdmin = testUtil.getHBaseAdmin(); + } + + @AfterClass + public static void shutdownMiniCluster() throws Exception { + try { + if (hBaseAdmin != null) { + hBaseAdmin.close(); + } + } finally { + testUtil.shutdownMiniCluster(); + } + } + + protected static HTable createTable(byte[] tableName, byte[][] columnFamilies) throws Exception { + return createTable(tableName, columnFamilies, false, + Collections.singletonList(TransactionProcessor.class.getName())); + } + + protected static HTable createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData, + List<String> coprocessors) throws Exception { + HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName)); + for (byte[] family : columnFamilies) { + HColumnDescriptor columnDesc = new HColumnDescriptor(family); + columnDesc.setMaxVersions(Integer.MAX_VALUE); + columnDesc.setValue(TxConstants.PROPERTY_TTL, String.valueOf(100000)); // in millis + desc.addFamily(columnDesc); + } + if (existingData) { + desc.setValue(TxConstants.READ_NON_TX_DATA, "true"); + } + // Divide individually to prevent any overflow + int priority = Coprocessor.PRIORITY_USER; + // order in list is the same order that coprocessors will be invoked + for (String coprocessor : coprocessors) { + desc.addCoprocessor(coprocessor, null, ++priority, null); + } + hBaseAdmin.createTable(desc); + testUtil.waitTableAvailable(tableName, 5000); + return new HTable(testUtil.getConfiguration(), tableName); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/92c61e7c/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java index
7efdd2d..afd7c01 100644 --- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java +++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java @@ -18,86 +18,77 @@ package org.apache.tephra.hbase; import com.google.common.collect.ImmutableList; - import com.google.common.collect.Lists; - import com.google.common.primitives.Longs; - import org.apache.hadoop.conf.Configuration; - import org.apache.hadoop.hbase.Cell; - import org.apache.hadoop.hbase.CellUtil; - import org.apache.hadoop.hbase.Coprocessor; - import org.apache.hadoop.hbase.DoNotRetryIOException; - import org.apache.hadoop.hbase.HBaseTestingUtility; - import org.apache.hadoop.hbase.HColumnDescriptor; - import org.apache.hadoop.hbase.HConstants; - import org.apache.hadoop.hbase.HTableDescriptor; - import org.apache.hadoop.hbase.KeyValue; - import org.apache.hadoop.hbase.TableName; - import org.apache.hadoop.hbase.client.Delete; - import org.apache.hadoop.hbase.client.Durability; - import org.apache.hadoop.hbase.client.Get; - import org.apache.hadoop.hbase.client.HBaseAdmin; - import org.apache.hadoop.hbase.client.HTable; - import org.apache.hadoop.hbase.client.HTableInterface; - import org.apache.hadoop.hbase.client.OperationWithAttributes; - import org.apache.hadoop.hbase.client.Put; - import org.apache.hadoop.hbase.client.Result; - import org.apache.hadoop.hbase.client.ResultScanner; - import org.apache.hadoop.hbase.client.Scan; - import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; - import org.apache.hadoop.hbase.coprocessor.ObserverContext; - import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; - import org.apache.hadoop.hbase.filter.BinaryComparator; - import org.apache.hadoop.hbase.filter.CompareFilter; - import org.apache.hadoop.hbase.filter.ValueFilter; - import org.apache.hadoop.hbase.regionserver.wal.WALEdit; - import org.apache.hadoop.hbase.util.Bytes; - import org.apache.tephra.Transaction; - import org.apache.tephra.TransactionConflictException; - import org.apache.tephra.TransactionContext; - import org.apache.tephra.TransactionManager; - import org.apache.tephra.TransactionSystemClient; - import org.apache.tephra.TxConstants; +import com.google.common.collect.Lists; +import com.google.common.primitives.Longs; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.OperationWithAttributes; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.ValueFilter; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; +import 
org.apache.tephra.Transaction; +import org.apache.tephra.TransactionConflictException; +import org.apache.tephra.TransactionContext; +import org.apache.tephra.TransactionManager; +import org.apache.tephra.TransactionSystemClient; +import org.apache.tephra.TxConstants; import org.apache.tephra.hbase.coprocessor.TransactionProcessor; import org.apache.tephra.inmemory.InMemoryTxSystemClient; - import org.apache.tephra.metrics.TxMetricsCollector; - import org.apache.tephra.persist.InMemoryTransactionStateStorage; - import org.apache.tephra.persist.TransactionStateStorage; - import org.junit.After; - import org.junit.AfterClass; - import org.junit.Assert; - import org.junit.Before; - import org.junit.BeforeClass; - import org.junit.Test; - import org.slf4j.Logger; - import org.slf4j.LoggerFactory; - - import java.io.IOException; - import java.util.ArrayList; - import java.util.Collection; - import java.util.Collections; - import java.util.Iterator; - import java.util.List; - import java.util.Map; +import org.apache.tephra.metrics.TxMetricsCollector; +import org.apache.tephra.persist.InMemoryTransactionStateStorage; +import org.apache.tephra.persist.TransactionStateStorage; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertArrayEquals; - import static org.junit.Assert.assertEquals; - import static org.junit.Assert.assertFalse; - import static org.junit.Assert.assertNotEquals; - import static org.junit.Assert.assertNotNull; - import static org.junit.Assert.assertNull; - import static org.junit.Assert.assertTrue; - import static org.junit.Assert.fail; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * Tests for TransactionAwareHTables. 
*/ -public class TransactionAwareHTableTest { +public class TransactionAwareHTableTest extends AbstractHBaseTableTest { private static final Logger LOG = LoggerFactory.getLogger(TransactionAwareHTableTest.class); - private static HBaseTestingUtility testUtil; - private static HBaseAdmin hBaseAdmin; - private static TransactionStateStorage txStateStorage; - private static TransactionManager txManager; - private static Configuration conf; + static TransactionStateStorage txStateStorage; + static TransactionManager txManager; private TransactionContext transactionContext; private TransactionAwareHTable transactionAwareHTable; private HTable hTable; @@ -147,23 +138,6 @@ public class TransactionAwareHTableTest { @BeforeClass public static void setupBeforeClass() throws Exception { - testUtil = new HBaseTestingUtility(); - conf = testUtil.getConfiguration(); - - // Tune down the connection thread pool size - conf.setInt("hbase.hconnection.threads.core", 5); - conf.setInt("hbase.hconnection.threads.max", 10); - // Tunn down handler threads in regionserver - conf.setInt("hbase.regionserver.handler.count", 10); - - // Set to random port - conf.setInt("hbase.master.port", 0); - conf.setInt("hbase.master.info.port", 0); - conf.setInt("hbase.regionserver.port", 0); - conf.setInt("hbase.regionserver.info.port", 0); - - testUtil.startMiniCluster(); - hBaseAdmin = testUtil.getHBaseAdmin(); txStateStorage = new InMemoryTransactionStateStorage(); txManager = new TransactionManager(conf, txStateStorage, new TxMetricsCollector()); txManager.startAndWait(); @@ -171,8 +145,9 @@ public class TransactionAwareHTableTest { @AfterClass public static void shutdownAfterClass() throws Exception { - testUtil.shutdownMiniCluster(); - hBaseAdmin.close(); + if (txManager != null) { + txManager.stopAndWait(); + } } @Before @@ -188,34 +163,6 @@ public class TransactionAwareHTableTest { hBaseAdmin.deleteTable(TestBytes.table); } - private HTable createTable(byte[] tableName, byte[][] columnFamilies) throws Exception { - return createTable(tableName, columnFamilies, false, Collections.<String>emptyList()); - } - - private HTable createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData, - List<String> coprocessors) throws Exception { - HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName)); - for (byte[] family : columnFamilies) { - HColumnDescriptor columnDesc = new HColumnDescriptor(family); - columnDesc.setMaxVersions(Integer.MAX_VALUE); - columnDesc.setValue(TxConstants.PROPERTY_TTL, String.valueOf(100000)); // in millis - desc.addFamily(columnDesc); - } - if (existingData) { - desc.setValue(TxConstants.READ_NON_TX_DATA, "true"); - } - // Divide individually to prevent any overflow - int priority = Coprocessor.PRIORITY_USER; - desc.addCoprocessor(TransactionProcessor.class.getName(), null, priority, null); - // order in list is the same order that coprocessors will be invoked - for (String coprocessor : coprocessors) { - desc.addCoprocessor(coprocessor, null, ++priority, null); - } - hBaseAdmin.createTable(desc); - testUtil.waitTableAvailable(tableName, 5000); - return new HTable(testUtil.getConfiguration(), tableName); - } - /** * Test transactional put and get requests. 
* @@ -410,7 +357,7 @@ public class TransactionAwareHTableTest { public void testAttributesPreserved() throws Exception { HTable hTable = createTable(Bytes.toBytes("TestAttributesPreserved"), new byte[][]{TestBytes.family, TestBytes.family2}, false, - Lists.newArrayList(TestRegionObserver.class.getName())); + Lists.newArrayList(TransactionProcessor.class.getName(), TestRegionObserver.class.getName())); try { TransactionAwareHTable txTable = new TransactionAwareHTable(hTable); TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable); @@ -1124,7 +1071,7 @@ public class TransactionAwareHTableTest { TransactionAwareHTable txTable = new TransactionAwareHTable(createTable(Bytes.toBytes("testExistingData"), new byte[][]{TestBytes.family}, true, - Collections.<String>emptyList())); + Collections.singletonList(TransactionProcessor.class.getName()))); TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable); // Add some pre-existing, non-transactional data @@ -1273,8 +1220,9 @@ public class TransactionAwareHTableTest { @Test public void testVisibilityAll() throws Exception { - HTable nonTxTable = createTable(Bytes.toBytes("testVisibilityAll"), - new byte[][]{TestBytes.family, TestBytes.family2}, true, Collections.<String>emptyList()); + HTable nonTxTable = + createTable(Bytes.toBytes("testVisibilityAll"), new byte[][]{TestBytes.family, TestBytes.family2}, + true, Collections.singletonList(TransactionProcessor.class.getName())); TransactionAwareHTable txTable = new TransactionAwareHTable(nonTxTable, TxConstants.ConflictDetection.ROW); // ROW conflict detection to verify family deletes @@ -1552,6 +1500,66 @@ public class TransactionAwareHTableTest { transactionContext.finish(); } + @Test + public void testTxLifetime() throws Exception { + // Add some initial values + transactionContext.start(); + Put put = new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, TestBytes.value); + transactionAwareHTable.put(put); + put = new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier2, TestBytes.value); + transactionAwareHTable.put(put); + transactionContext.finish(); + + // Simulate writing with a transaction past its max lifetime + transactionContext.start(); + Transaction currentTx = transactionContext.getCurrentTransaction(); + Assert.assertNotNull(currentTx); + + // Create a transaction that is past the max lifetime + long txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, + TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME)); + long oldTxId = currentTx.getTransactionId() - ((txMaxLifetimeMillis + 10000) * TxConstants.MAX_TX_PER_MS); + Transaction oldTx = new Transaction(currentTx.getReadPointer(), oldTxId, + currentTx.getInvalids(), currentTx.getInProgress(), + currentTx.getFirstShortInProgress()); + transactionAwareHTable.updateTx(oldTx); + // Put with the old transaction should fail + put = new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value); + try { + transactionAwareHTable.put(put); + Assert.fail("Expected exception with old transaction!"); + } catch (IOException e) { + // Expected exception + } + + // Delete with the old transaction should also fail + Delete delete = new Delete(TestBytes.row).deleteColumn(TestBytes.family, TestBytes.qualifier); + try { + transactionAwareHTable.delete(delete); + Assert.fail("Expected exception with old transaction!"); + } catch (IOException e) { + // Expected exception + } + + // Now
update the table to use the current transaction + transactionAwareHTable.updateTx(currentTx); + put = new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier2, TestBytes.value); + transactionAwareHTable.put(put); + delete = new Delete(TestBytes.row).deleteColumn(TestBytes.family, TestBytes.qualifier2); + transactionAwareHTable.delete(delete); + + // Verify values with the same transaction since we cannot commit the old transaction + verifyRow(transactionAwareHTable, + new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier), TestBytes.value); + verifyRow(transactionAwareHTable, + new Get(TestBytes.row).addColumn(TestBytes.family, TestBytes.qualifier2), null); + verifyRow(transactionAwareHTable, + new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier), null); + verifyRow(transactionAwareHTable, + new Get(TestBytes.row2).addColumn(TestBytes.family, TestBytes.qualifier2), TestBytes.value); + transactionContext.finish(); + } + /** * Tests that transaction co-processor works with older clients * http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/92c61e7c/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java new file mode 100644 index 0000000..3ae0423 --- /dev/null +++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.tephra.hbase.txprune; + + +import com.google.common.collect.ImmutableSortedMap; +import com.google.common.collect.ImmutableSortedSet; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.tephra.TxConstants; +import org.apache.tephra.hbase.AbstractHBaseTableTest; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; + +/** + * Test methods of {@link DataJanitorState} + */ +// TODO: Group all the tests that need HBase mini cluster into a suite, so that we start the mini-cluster only once +public class DataJanitorStateTest extends AbstractHBaseTableTest { + + private TableName pruneStateTable; + private DataJanitorState dataJanitorState; + private HConnection connection; + + @Before + public void beforeTest() throws Exception { + pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE, + TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE)); + HTable table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false, + // Prune state table is a non-transactional table, hence no transaction co-processor + Collections.<String>emptyList()); + table.close(); + connection = HConnectionManager.createConnection(conf); + + dataJanitorState = + new DataJanitorState(new DataJanitorState.TableSupplier() { + @Override + public HTableInterface get() throws IOException { + return connection.getTable(pruneStateTable); + } + }); + + } + + @After + public void afterTest() throws Exception { + hBaseAdmin.disableTable(pruneStateTable); + hBaseAdmin.deleteTable(pruneStateTable); + connection.close(); + } + + @Test + public void testSavePruneUpperBound() throws Exception { + int max = 20; + + // Nothing should be present in the beginning + Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(10L))); + + // Save some region - prune upper bound values + // We should have values for regions 0, 2, 4, 6, ..., max-2 after this + for (long i = 0; i < max; i += 2) { + dataJanitorState.savePruneUpperBoundForRegion(Bytes.toBytes(i), i); + } + + Assert.assertEquals(10L, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(10L))); + + // Verify all the saved values + for (long i = 0; i < max; ++i) { + long expected = i % 2 == 0 ? 
i : -1; + Assert.assertEquals(expected, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(i))); + } + // Regions not present should give -1 + Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(max + 50L))); + Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes((max + 10L) * -1))); + Assert.assertEquals(-1, dataJanitorState.getPruneUpperBoundForRegion(Bytes.toBytes(3L))); + + SortedSet<byte[]> allRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR); + Map<byte[], Long> expectedMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (long i = 0; i < max; ++i) { + allRegions.add(Bytes.toBytes(i)); + if (i % 2 == 0) { + expectedMap.put(Bytes.toBytes(i), i); + } + } + Assert.assertEquals(max / 2, expectedMap.size()); + Assert.assertEquals(expectedMap, dataJanitorState.getPruneUpperBoundForRegions(allRegions)); + + SortedSet<byte[]> regions = ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR) + .add(Bytes.toBytes((max + 20L) * -1)) + .add(Bytes.toBytes(6L)) + .add(Bytes.toBytes(15L)) + .add(Bytes.toBytes(18L)) + .add(Bytes.toBytes(max + 33L)) + .build(); + expectedMap = ImmutableSortedMap.<byte[], Long>orderedBy(Bytes.BYTES_COMPARATOR) + .put(Bytes.toBytes(6L), 6L) + .put(Bytes.toBytes(18L), 18L) + .build(); + Assert.assertEquals(expectedMap, dataJanitorState.getPruneUpperBoundForRegions(regions)); + + // Delete regions that have prune upper bound before 15 and not in set (4, 8) + ImmutableSortedSet<byte[]> excludeRegions = + ImmutableSortedSet.orderedBy(Bytes.BYTES_COMPARATOR).add(Bytes.toBytes(4L)).add(Bytes.toBytes(8L)).build(); + dataJanitorState.deletePruneUpperBounds(15, excludeRegions); + // Regions 0, 2, 6, 10, 12 and 14 should have been deleted now + expectedMap = ImmutableSortedMap.<byte[], Long>orderedBy(Bytes.BYTES_COMPARATOR) + .put(Bytes.toBytes(4L), 4L) + .put(Bytes.toBytes(8L), 8L) + .put(Bytes.toBytes(16L), 16L) + .put(Bytes.toBytes(18L), 18L) + .build(); + Assert.assertEquals(expectedMap, dataJanitorState.getPruneUpperBoundForRegions(allRegions)); + } + + @Test + public void testSaveRegionTime() throws Exception { + int maxTime = 100; + + // Nothing should be present in the beginning + Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(maxTime)); + + // Save regions for time + Map<Long, SortedSet<byte[]>> regionsTime = new TreeMap<>(); + for (long time = 0; time < maxTime; time += 10) { + SortedSet<byte[]> regions = new TreeSet<>(Bytes.BYTES_COMPARATOR); + for (long region = 0; region < 10; region += 2) { + regions.add(Bytes.toBytes((time * 10) + region)); + } + regionsTime.put(time, regions); + dataJanitorState.saveRegionsForTime(time, regions); + } + + // Verify saved regions + Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(30)); + Assert.assertEquals(new TimeRegions(20, regionsTime.get(20L)), dataJanitorState.getRegionsOnOrBeforeTime(25)); + Assert.assertEquals(new TimeRegions(30, regionsTime.get(30L)), dataJanitorState.getRegionsOnOrBeforeTime(31)); + Assert.assertEquals(new TimeRegions(90, regionsTime.get(90L)), + dataJanitorState.getRegionsOnOrBeforeTime(maxTime + 1000)); + Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(-10)); + + // Delete regions saved on or before time 30 + dataJanitorState.deleteAllRegionsOnOrBeforeTime(30); + // Values on or before time 30 should be deleted + Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(30)); + Assert.assertNull(dataJanitorState.getRegionsOnOrBeforeTime(25)); + // Values after time 30 
should still exist + Assert.assertEquals(new TimeRegions(40, regionsTime.get(40L)), dataJanitorState.getRegionsOnOrBeforeTime(40)); + } + + @Test + public void testSaveInactiveTransactionBoundTime() throws Exception { + int maxTime = 100; + + // Nothing should be present in the beginning + Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(10)); + + // Save inactive transaction bounds for various time values + for (long time = 0; time < maxTime; time += 10) { + dataJanitorState.saveInactiveTransactionBoundForTime(time, time + 2); + } + + // Verify written values + Assert.assertEquals(2, dataJanitorState.getInactiveTransactionBoundForTime(0)); + Assert.assertEquals(12, dataJanitorState.getInactiveTransactionBoundForTime(10)); + Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(15)); + Assert.assertEquals(92, dataJanitorState.getInactiveTransactionBoundForTime(90)); + Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(maxTime + 100)); + Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime((maxTime + 55) * -1L)); + + // Delete values saved on or before time 20 + dataJanitorState.deleteInactiveTransactionBoundsOnOrBeforeTime(20); + // Values on or before time 20 should be deleted + Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(0)); + Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(10)); + Assert.assertEquals(-1, dataJanitorState.getInactiveTransactionBoundForTime(20)); + // Values after time 20 should still exist + Assert.assertEquals(32, dataJanitorState.getInactiveTransactionBoundForTime(30)); + Assert.assertEquals(92, dataJanitorState.getInactiveTransactionBoundForTime(90)); + } +}
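The DataJanitorStateTest above drives a small key-value API over the prune state table. As orientation for readers of this port, here is a minimal, hypothetical sketch, not part of this commit, of how a caller wires up a DataJanitorState through its TableSupplier and round-trips a region's prune upper bound, mirroring the beforeTest() setup; the PruneStateExample class name and the region key are invented for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.tephra.TxConstants;
import org.apache.tephra.hbase.txprune.DataJanitorState;
import java.io.IOException;

public final class PruneStateExample {        // hypothetical helper, not in the commit
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    final HConnection connection = HConnectionManager.createConnection(conf);
    // Resolve the prune state table name the same way beforeTest() does
    final TableName pruneStateTable = TableName.valueOf(
        conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
                 TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
    DataJanitorState state = new DataJanitorState(new DataJanitorState.TableSupplier() {
      @Override
      public HTableInterface get() throws IOException {
        return connection.getTable(pruneStateTable);
      }
    });
    byte[] regionName = Bytes.toBytes(42L);   // hypothetical region key
    state.savePruneUpperBoundForRegion(regionName, 100L);
    // getPruneUpperBoundForRegion returns -1 when no bound has been recorded
    long bound = state.getPruneUpperBoundForRegion(regionName);
    System.out.println("prune upper bound: " + bound);
    connection.close();
  }
}

Note that the prune state table itself is created without the transaction coprocessor, as the comment in beforeTest() explains: the janitor's bookkeeping is deliberately non-transactional.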

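The testTxLifetime test added to TransactionAwareHTableTest leans on Tephra's id scheme: transaction ids advance at TxConstants.MAX_TX_PER_MS ids per millisecond of wall-clock time, so subtracting (max lifetime + slack) * MAX_TX_PER_MS from a fresh id fabricates a transaction that looks older than its configured lifetime. Below is a minimal sketch of that arithmetic under those assumptions; TxLifetimeMath and its methods are hypothetical helpers, and the 10-second slack mirrors the cushion used by the test.

import org.apache.hadoop.conf.Configuration;
import org.apache.tephra.Transaction;
import org.apache.tephra.TxConstants;
import java.util.concurrent.TimeUnit;

final class TxLifetimeMath {                   // hypothetical helper, not in the commit
  // Returns a transaction id that appears older than the configured max lifetime,
  // using the same configuration keys as testTxLifetime.
  static long rewindPastMaxLifetime(Configuration conf, Transaction currentTx) {
    long txMaxLifetimeMillis = TimeUnit.SECONDS.toMillis(
        conf.getInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME,
                    TxConstants.Manager.DEFAULT_TX_MAX_LIFETIME));
    long slackMillis = 10000L;                 // cushion so the id is safely past the limit
    return currentTx.getTransactionId()
        - (txMaxLifetimeMillis + slackMillis) * TxConstants.MAX_TX_PER_MS;
  }

  // The inverse mapping recovers an approximate start time in milliseconds from an id.
  static long approxStartTimeMillis(long txId) {
    return txId / TxConstants.MAX_TX_PER_MS;
  }
}

Writes issued through TransactionAwareHTable under such a rewound transaction fail with an IOException, which is exactly what the test asserts for both the Put and the Delete before switching back to the current transaction.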