[ https://issues.apache.org/jira/browse/TEPHRA-272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16318414#comment-16318414 ]
ASF GitHub Bot commented on TEPHRA-272:
---------------------------------------
Github user poornachandra commented on a diff in the pull request:
https://github.com/apache/incubator-tephra/pull/67#discussion_r160407023
--- Diff: tephra-hbase-compat-2.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionProcessorTest.java ---
@@ -0,0 +1,677 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor;
+
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MockRegionServerServices;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
+import org.apache.hadoop.hbase.regionserver.ChunkCreator;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
+import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
+import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.coprocessor.TransactionStateCache;
+import org.apache.tephra.coprocessor.TransactionStateCacheSupplier;
+import org.apache.tephra.manager.InvalidTxList;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.HDFSTransactionStateStorage;
+import org.apache.tephra.persist.TransactionSnapshot;
+import org.apache.tephra.persist.TransactionVisibilityState;
+import org.apache.tephra.snapshot.DefaultSnapshotCodec;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+import org.apache.tephra.util.TxUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests filtering of invalid transaction data by the {@link TransactionProcessor} coprocessor.
+ */
+public class TransactionProcessorTest {
+ private static final Logger LOG = LoggerFactory.getLogger(TransactionProcessorTest.class);
+ protected static ChunkCreator chunkCreator;
+ // 8 versions, 1 hour apart, latest is current ts.
+ private static final long[] V;
+
+ static {
+ long now = System.currentTimeMillis();
+ V = new long[9];
+ for (int i = 0; i < V.length; i++) {
+ V[i] = (now - TimeUnit.HOURS.toMillis(8 - i)) * TxConstants.MAX_TX_PER_MS;
+ }
+ }
+
+ @ClassRule
+ public static TemporaryFolder tmpFolder = new TemporaryFolder();
+ private static MiniDFSCluster dfsCluster;
+ private static Configuration conf;
+ private static LongArrayList invalidSet = new LongArrayList(new long[]{V[3], V[5], V[7]});
+ private static TransactionVisibilityState txVisibilityState;
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ Configuration hConf = new Configuration();
+ String rootDir = tmpFolder.newFolder().getAbsolutePath();
+ hConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, rootDir);
+ hConf.set(HConstants.HBASE_DIR, rootDir + "/hbase");
+
+ dfsCluster = new MiniDFSCluster.Builder(hConf).numDataNodes(1).build();
+ dfsCluster.waitActive();
+ conf = HBaseConfiguration.create(dfsCluster.getFileSystem().getConf());
+
+ conf.unset(TxConstants.Manager.CFG_TX_HDFS_USER);
+ conf.unset(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES);
+ String localTestDir = tmpFolder.newFolder().getAbsolutePath();
+ conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, localTestDir);
+ conf.set(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES, DefaultSnapshotCodec.class.getName());
+
+ // write an initial transaction snapshot
+ InvalidTxList invalidTxList = new InvalidTxList();
+ invalidTxList.addAll(invalidSet);
+ TransactionSnapshot txSnapshot = TransactionSnapshot.copyFrom(
+ System.currentTimeMillis(), V[6] - 1, V[7], invalidTxList,
+ // this will set visibility upper bound to V[6]
+ Maps.newTreeMap(ImmutableSortedMap.of(V[6], new TransactionManager.InProgressTx(
+   V[6] - 1, Long.MAX_VALUE, TransactionManager.InProgressType.SHORT))),
+ new HashMap<Long, TransactionManager.ChangeSet>(), new TreeMap<Long, TransactionManager.ChangeSet>());
+ txVisibilityState = new TransactionSnapshot(txSnapshot.getTimestamp(), txSnapshot.getReadPointer(),
+   txSnapshot.getWritePointer(), txSnapshot.getInvalid(), txSnapshot.getInProgress());
+ HDFSTransactionStateStorage tmpStorage =
+   new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
+ tmpStorage.startAndWait();
+ tmpStorage.writeSnapshot(txSnapshot);
+ tmpStorage.stopAndWait();
+ long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
+   .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
+ chunkCreator = ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
+   globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
+ assertTrue(chunkCreator != null);
+ }
+
+ @AfterClass
+ public static void shutdownAfterClass() throws Exception {
+ dfsCluster.shutdown();
+ }
+
+ @Test
+ public void testDataJanitorRegionScanner() throws Exception {
+ String tableName = "TestRegionScanner";
+ byte[] familyBytes = Bytes.toBytes("f");
+ byte[] columnBytes = Bytes.toBytes("c");
+ HRegion region = createRegion(tableName, familyBytes, TimeUnit.HOURS.toMillis(3));
+ try {
+ region.initialize();
+ TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
+ LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
+
+ for (int i = 1; i <= 8; i++) {
+ for (int k = 1; k <= i; k++) {
+ Put p = new Put(Bytes.toBytes(i));
+ p.addColumn(familyBytes, columnBytes, V[k], Bytes.toBytes(V[k]));
+ region.put(p);
+ }
+ }
+
+ List<Cell> results = Lists.newArrayList();
+
+ // force a flush to clear the data
+ // during flush, the coprocessor should drop all KeyValues with timestamps in the invalid set
+
+ LOG.info("Flushing region " +
region.getRegionInfo().getRegionNameAsString());
+ FlushResultImpl flushResult = region.flushcache(true, false, new
FlushLifeCycleTracker() { });
+ Assert.assertTrue("Unexpected flush result: " + flushResult,
flushResult.isFlushSucceeded());
+
+ // now a normal scan should only return the valid rows
+ // do not use a filter here to test that cleanup works on flush
+ Scan scan = new Scan();
+ scan.setMaxVersions(10);
+ RegionScanner regionScanner = region.getScanner(scan);
+
+ // first returned value should be "4" with version "4"
+ results.clear();
+ assertTrue(regionScanner.next(results));
+ assertKeyValueMatches(results, 4, new long[]{V[4]});
+
+ results.clear();
+ assertTrue(regionScanner.next(results));
+ assertKeyValueMatches(results, 5, new long[] {V[4]});
+
+ results.clear();
+ assertTrue(regionScanner.next(results));
+ assertKeyValueMatches(results, 6, new long[]{V[6], V[4]});
+
+ results.clear();
+ assertTrue(regionScanner.next(results));
+ assertKeyValueMatches(results, 7, new long[]{V[6], V[4]});
+
+ results.clear();
+ assertFalse(regionScanner.next(results));
+ assertKeyValueMatches(results, 8, new long[] {V[8], V[6], V[4]});
+ } finally {
+ //region.close();
+ }
+ }
+
+ @Test
+ public void testDeleteFiltering() throws Exception {
+ String tableName = "TestDeleteFiltering";
+ byte[] familyBytes = Bytes.toBytes("f");
+ byte[] columnBytes = Bytes.toBytes("c");
+ HRegion region = createRegion(tableName, familyBytes, 0);
+ try {
+ region.initialize();
+ TransactionStateCache cache = new TransactionStateCacheSupplier(conf).get();
+ LOG.info("Coprocessor is using transaction state: " + waitForTransactionState(cache));
+
+ byte[] row = Bytes.toBytes(1);
+ for (int i = 4; i < V.length; i++) {
+ Put p = new Put(row);
+ p.addColumn(familyBytes, columnBytes, V[i], Bytes.toBytes(V[i]));
+ region.put(p);
+ }
+
+ // delete from the third entry back
+ // take that cell's timestamp + 1 to simulate a delete in a new tx
+ long deleteTs = V[5] + 1;
+ Delete d = new Delete(row, deleteTs);
+ LOG.info("Issuing delete at timestamp " + deleteTs);
+ // row deletes are not yet supported (TransactionAwareHTable normally handles this)
+ d.addColumns(familyBytes, columnBytes);
+ region.delete(d);
+
+ List<Cell> results = Lists.newArrayList();
+
+ // force a flush to clear the data
+ // during flush, we should drop the deleted version, but not the others
+ LOG.info("Flushing region " + region.getRegionInfo().getRegionNameAsString());
+ region.flushcache(true, false, new FlushLifeCycleTracker() { });
+
+ // now a normal scan should return row with versions at: V[8], V[6].
+ // V[7] is invalid and V[5] and prior are deleted.
+ Scan scan = new Scan();
+ scan.setMaxVersions(10);
+ RegionScanner regionScanner = region.getScanner(scan);
+ // should be only one row
+ assertFalse(regionScanner.next(results));
+ assertKeyValueMatches(results, 1,
+ new long[]{V[8], V[6], deleteTs},
+ new byte[][]{Bytes.toBytes(V[8]), Bytes.toBytes(V[6]), new byte[0]});
+ } finally {
+ //region.close();
+ }
+ }
+
+ @Test
+ public void testDeleteMarkerCleanup() throws Exception {
+ String tableName = "TestDeleteMarkerCleanup";
+ byte[] familyBytes = Bytes.toBytes("f");
+ HRegion region = createRegion(tableName, familyBytes, 0);
+ try {
+ region.initialize();
+
+ // all puts use a timestamp before the tx snapshot's visibility upper bound, making them eligible for removal
+ long writeTs = txVisibilityState.getVisibilityUpperBound() - 10;
+ // deletes are performed after the writes, but still before the visibility upper bound
+ long deleteTs = writeTs + 1;
+ // write separate columns to confirm that delete markers survive across flushes
+ byte[] row = Bytes.toBytes(100);
+ Put p = new Put(row);
+
+ LOG.info("Writing columns at timestamp " + writeTs);
+ for (int i = 0; i < 5; i++) {
+ byte[] iBytes = Bytes.toBytes(i);
+ p.addColumn(familyBytes, iBytes, writeTs, iBytes);
+ }
+ region.put(p);
+ // read all back
+ Scan scan = new Scan(row);
+ RegionScanner regionScanner = region.getScanner(scan);
+ List<Cell> results = Lists.newArrayList();
+ assertFalse(regionScanner.next(results));
+
+ for (int i = 0; i < 5; i++) {
+ Cell cell = results.get(i);
+ assertArrayEquals(row, CellUtil.cloneRow(cell));
+ byte[] idxBytes = Bytes.toBytes(i);
+ assertArrayEquals(idxBytes, CellUtil.cloneQualifier(cell));
+ assertArrayEquals(idxBytes, CellUtil.cloneValue(cell));
+ }
+
+ // force a flush to clear the memstore
+ LOG.info("Before delete, flushing region " +
region.getRegionInfo().getRegionNameAsString());
+ region.flushcache(false, false, new FlushLifeCycleTracker() { });
+ // delete the odd entries
+ for (int i = 0; i < 5; i++) {
+ if (i % 2 == 1) {
+ // deletes are performed as puts with empty values
+ Put deletePut = new Put(row);
+ deletePut.addColumn(familyBytes, Bytes.toBytes(i), deleteTs, new byte[0]);
+ region.put(deletePut);
+ }
+ }
+
+ // read all back
+ scan = new Scan(row);
+ scan.readVersions(1);
+ scan.setFilter(TransactionFilters.getVisibilityFilter(TxUtils.createDummyTransaction(txVisibilityState),
+   new TreeMap<byte[], Long>(), false, ScanType.USER_SCAN));
+ regionScanner = region.getScanner(scan);
+ results = Lists.newArrayList();
+ assertFalse(regionScanner.next(results));
+ assertEquals(3, results.size());
+ // only even columns should exist
+ for (int i = 0; i < 3; i++) {
+ Cell cell = results.get(i);
+ LOG.info("Got cell " + cell);
+ assertArrayEquals(row, CellUtil.cloneRow(cell));
+ byte[] idxBytes = Bytes.toBytes(i * 2);
+ assertArrayEquals(idxBytes, CellUtil.cloneQualifier(cell));
+ assertArrayEquals(idxBytes, CellUtil.cloneValue(cell));
+ }
+
+ // force another flush on the delete markers
+ // during flush, we should retain the delete markers, since they can only safely be dropped by a major compaction
+ LOG.info("After delete, flushing region " + region.getRegionInfo().getRegionNameAsString());
+ FlushResultImpl flushResultImpl = region.flushcache(true, false, new FlushLifeCycleTracker() { });
+ assertTrue(flushResultImpl.getResult() == FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED
+   || flushResultImpl.getResult() == FlushResult.Result.FLUSHED_COMPACTION_NEEDED);
+ scan = new Scan(row);
+ scan.setFilter(TransactionFilters.getVisibilityFilter(TxUtils.createDummyTransaction(txVisibilityState),
+   new TreeMap<byte[], Long>(), false, ScanType.USER_SCAN));
+ scan.readVersions(1);
+ regionScanner = region.getScanner(scan);
+ results = Lists.newArrayList();
+ assertFalse(regionScanner.next(results));
+ assertEquals(3, results.size());
+ // only even columns should exist
+ for (int i = 0; i < 3; i++) {
+ Cell cell = results.get(i);
+ assertArrayEquals(row, CellUtil.cloneRow(cell));
+ byte[] idxBytes = Bytes.toBytes(i * 2);
+ assertArrayEquals(idxBytes, CellUtil.cloneQualifier(cell));
+ assertArrayEquals(idxBytes, CellUtil.cloneValue(cell));
+ }
+
+ // force a major compaction
+ LOG.info("Forcing major compaction of region " +
region.getRegionInfo().getRegionNameAsString());
+ region.compact(true);
+
+ // perform a raw scan (no filter) to confirm that the delete markers are now gone
+ scan = new Scan(row);
+ System.out.println("scan started");
+ regionScanner = region.getScanner(scan);
+ results = Lists.newArrayList();
+ assertFalse(regionScanner.next(results));
+ assertEquals(3, results.size());
+ // only even columns should exist
+ for (int i = 0; i < 3; i++) {
+ Cell cell = results.get(i);
+ assertArrayEquals(row, CellUtil.cloneRow(cell));
+ byte[] idxBytes = Bytes.toBytes(i * 2);
+ assertArrayEquals(idxBytes, CellUtil.cloneQualifier(cell));
+ assertArrayEquals(idxBytes, CellUtil.cloneValue(cell));
+ }
+ } finally {
+// region.close();
+ }
+ }
+
+ /**
+ * Test that we correctly preserve the timestamp set for column family delete markers. This is not
+ * directly required for the TransactionAwareHTable usage, but is the right thing to do and ensures
+ * that we make it easy to interoperate with other systems.
+ */
+ @Test
+ public void testFamilyDeleteTimestamp() throws Exception {
+ String tableName = "TestFamilyDeleteTimestamp";
+ byte[] family1Bytes = Bytes.toBytes("f1");
+ byte[] columnBytes = Bytes.toBytes("c");
+ byte[] rowBytes = Bytes.toBytes("row");
+ byte[] valBytes = Bytes.toBytes("val");
+ HRegion region = createRegion(tableName, family1Bytes, 0);
+ try {
+ region.initialize();
+
+ long now = System.currentTimeMillis() * TxConstants.MAX_TX_PER_MS;
+ Put p = new Put(rowBytes);
+ p.addColumn(family1Bytes, columnBytes, now - 10, valBytes);
+ region.put(p);
+
+ // issue a family delete with an explicit timestamp
+ Delete delete = new Delete(rowBytes, now);
+ delete.addFamily(family1Bytes, now - 5);
+ region.delete(delete);
+
+ // test that the delete marker preserved the timestamp
+ Scan scan = new Scan();
+ scan.setMaxVersions();
+ RegionScanner scanner = region.getScanner(scan);
+ List<Cell> results = Lists.newArrayList();
+ scanner.next(results);
+ assertEquals(2, results.size());
+ // delete marker should appear first
+ Cell cell = results.get(0);
+ assertArrayEquals(new byte[0], CellUtil.cloneQualifier(cell));
+ assertArrayEquals(new byte[0], CellUtil.cloneValue(cell));
+ assertEquals(now - 5, cell.getTimestamp());
+ // since this is an unfiltered scan against the region, the original put should be next
+ cell = results.get(1);
+ assertArrayEquals(valBytes, CellUtil.cloneValue(cell));
+ assertEquals(now - 10, cell.getTimestamp());
+ scanner.close();
+
+
+ // with a filtered scan the original put should disappear
+ scan = new Scan();
+ scan.setMaxVersions();
+ scan.setFilter(TransactionFilters.getVisibilityFilter(TxUtils.createDummyTransaction(txVisibilityState),
+   new TreeMap<byte[], Long>(), false, ScanType.USER_SCAN));
+ scanner = region.getScanner(scan);
+ results = Lists.newArrayList();
+ scanner.next(results);
+ assertEquals(0, results.size());
+ scanner.close();
+ } finally {
+ region.close();
+ }
+ }
+
+ @Test
+ public void testPreExistingData() throws Exception {
+ String tableName = "TestPreExistingData";
+ byte[] familyBytes = Bytes.toBytes("f");
+ long ttlMillis = TimeUnit.DAYS.toMillis(14);
+ HRegion region = createRegion(tableName, familyBytes, ttlMillis);
+ try {
+ region.initialize();
+
+ // timestamps for pre-existing, non-transactional data
+ long now = txVisibilityState.getVisibilityUpperBound() / TxConstants.MAX_TX_PER_MS;
+ long older = now - ttlMillis / 2;
+ long newer = now - ttlMillis / 3;
+ // timestamps for transactional data
+ long nowTx = txVisibilityState.getVisibilityUpperBound();
+ long olderTx = nowTx - (ttlMillis / 2) * TxConstants.MAX_TX_PER_MS;
+ long newerTx = nowTx - (ttlMillis / 3) * TxConstants.MAX_TX_PER_MS;
+
+ Map<byte[], Long> ttls = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+ ttls.put(familyBytes, ttlMillis);
+
+ List<Cell> cells = new ArrayList<>();
+ cells.add(new KeyValue(Bytes.toBytes("r1"), familyBytes, Bytes.toBytes("c1"), older, Bytes.toBytes("v11")));
+ cells.add(new KeyValue(Bytes.toBytes("r1"), familyBytes, Bytes.toBytes("c2"), newer, Bytes.toBytes("v12")));
+ cells.add(new KeyValue(Bytes.toBytes("r2"), familyBytes, Bytes.toBytes("c1"), older, Bytes.toBytes("v21")));
+ cells.add(new KeyValue(Bytes.toBytes("r2"), familyBytes, Bytes.toBytes("c2"), newer, Bytes.toBytes("v22")));
+ cells.add(new KeyValue(Bytes.toBytes("r3"), familyBytes, Bytes.toBytes("c1"), olderTx, Bytes.toBytes("v31")));
+ cells.add(new KeyValue(Bytes.toBytes("r3"), familyBytes, Bytes.toBytes("c2"), newerTx, Bytes.toBytes("v32")));
+
+ // Write non-transactional and transactional data
+ for (Cell c : cells) {
+ region.put(new Put(CellUtil.cloneRow(c)).addColumn(CellUtil.cloneFamily(c), CellUtil.cloneQualifier(c),
+   c.getTimestamp(), CellUtil.cloneValue(c)));
+ }
+
+ Scan rawScan = new Scan();
+ rawScan.setMaxVersions();
+
+ Transaction dummyTransaction = TxUtils.createDummyTransaction(txVisibilityState);
+ Scan txScan = new Scan();
+ txScan.setMaxVersions();
+ txScan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttls, dummyTransaction, true),
+   TxUtils.getMaxVisibleTimestamp(dummyTransaction));
+ txScan.setFilter(TransactionFilters.getVisibilityFilter(dummyTransaction, ttls, false, ScanType.USER_SCAN));
+
+ // read all back with raw scanner
+ scanAndAssert(region, cells, rawScan);
+
+ // read all back with transaction filter
+ scanAndAssert(region, cells, txScan);
+
+ // force a flush to clear the memstore
+ region.flushcache(true, false, new FlushLifeCycleTracker() { });
+ scanAndAssert(region, cells, txScan);
+
+ // force a major compaction to remove any expired cells
+ region.compact(true);
+ scanAndAssert(region, cells, txScan);
+
+ // Reduce TTL, this should make cells with timestamps older and olderTx expire
+ long newTtl = ttlMillis / 2 - 1;
+ region = updateTtl(region, familyBytes, newTtl);
+ ttls.put(familyBytes, newTtl);
+ txScan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttls, dummyTransaction, true),
+   TxUtils.getMaxVisibleTimestamp(dummyTransaction));
+ txScan.setFilter(TransactionFilters.getVisibilityFilter(dummyTransaction, ttls, false, ScanType.USER_SCAN));
+
+ // Raw scan should still give all cells
+ scanAndAssert(region, cells, rawScan);
+ // However, tx scan should not return expired cells
+ scanAndAssert(region, select(cells, 1, 3, 5), txScan);
+
+ region.flushcache(true, false, new FlushLifeCycleTracker() { });
+ scanAndAssert(region, cells, rawScan);
+
+ // force a major compaction to remove any expired cells
+ region.compact(true);
+ // This time raw scan too should not return expired cells, as they would be dropped during major compaction
+ scanAndAssert(region, select(cells, 1, 3, 5), rawScan);
+
+ // Reduce TTL again to 1 ms, this should expire all cells
+ newTtl = 1;
+ region = updateTtl(region, familyBytes, newTtl);
+ ttls.put(familyBytes, newTtl);
+ txScan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttls, dummyTransaction, true),
+   TxUtils.getMaxVisibleTimestamp(dummyTransaction));
+ txScan.setFilter(TransactionFilters.getVisibilityFilter(dummyTransaction, ttls, false, ScanType.USER_SCAN));
+
+ // force a major compaction to remove expired cells
+ region.compact(true);
+ // This time raw scan should not return any cells, as all cells have expired.
+ scanAndAssert(region, Collections.<Cell>emptyList(), rawScan);
+ } finally {
+ region.close();
+ }
+ }
+
+ private List<Cell> select(List<Cell> cells, int... indexes) {
+ List<Cell> newCells = new ArrayList<>();
+ for (int i : indexes) {
+ newCells.add(cells.get(i));
+ }
+ return newCells;
+ }
+
+ @SuppressWarnings("StatementWithEmptyBody")
+ private void scanAndAssert(HRegion region, List<Cell> expected, Scan scan) throws Exception {
+ try (RegionScanner regionScanner = region.getScanner(scan)) {
+ List<Cell> results = Lists.newArrayList();
+ while (regionScanner.next(results)) { }
+ assertEquals(expected, results);
+ }
+ }
+
+ private HRegion updateTtl(HRegion region, byte[] family, long ttl) throws Exception {
+ region.close();
+ TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(region.getTableDescriptor());
+ ColumnFamilyDescriptorBuilder cfd =
+   ColumnFamilyDescriptorBuilder.newBuilder(tableBuilder.build().getColumnFamily(family));
+ if (ttl > 0) {
+ cfd.setValue(Bytes.toBytes(TxConstants.PROPERTY_TTL),
+ Bytes.toBytes(String.valueOf(ttl)));
+ }
+ cfd.setMaxVersions(10);
+ tableBuilder.removeColumnFamily(family);
--- End diff --
Just curious as to why we need to remove the column family and then add it
back again?
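For example, something along these lines might update the family in place (a rough sketch, not from this patch; it assumes TableDescriptorBuilder#modifyColumnFamily in the HBase 2.0 client API replaces an existing family descriptor, and the helper name withUpdatedTtl and its return type are made up for illustration, reusing the classes the test already imports plus org.apache.hadoop.hbase.client.TableDescriptor):

    // Hypothetical alternative to the remove/add sequence in updateTtl.
    // Assumption: TableDescriptorBuilder#modifyColumnFamily swaps the existing
    // family descriptor for the rebuilt one, so removeColumnFamily is not needed.
    private TableDescriptor withUpdatedTtl(TableDescriptor current, byte[] family, long ttl) {
      ColumnFamilyDescriptorBuilder cfd =
          ColumnFamilyDescriptorBuilder.newBuilder(current.getColumnFamily(family));
      if (ttl > 0) {
        cfd.setValue(Bytes.toBytes(TxConstants.PROPERTY_TTL), Bytes.toBytes(String.valueOf(ttl)));
      }
      cfd.setMaxVersions(10);
      return TableDescriptorBuilder.newBuilder(current)
          .modifyColumnFamily(cfd.build())   // update the family rather than remove + add
          .build();
    }

The region would then be re-opened with the returned descriptor, as the existing helper presumably does after the point where the diff above is cut off.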
> Add HBase 2.0 compatibility module
> ----------------------------------
>
> Key: TEPHRA-272
> URL: https://issues.apache.org/jira/browse/TEPHRA-272
> Project: Tephra
> Issue Type: Improvement
> Reporter: Ankit Singhal
> Assignee: Ankit Singhal
> Labels: HBase-2.0
> Fix For: 0.14.0-incubating
>
>