Repository: incubator-tephra Updated Branches: refs/heads/master 827a70c68 -> b3370b662
TEPHRA-244 Remove regions of deleted tables when computing prune upper bound This closes #55 Signed-off-by: poorna <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/b3370b66 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/b3370b66 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/b3370b66 Branch: refs/heads/master Commit: b3370b662c2df0fbe8581944754fd2376ad70dc8 Parents: 827a70c Author: poorna <[email protected]> Authored: Sun Sep 10 21:38:08 2017 -0700 Committer: poorna <[email protected]> Committed: Mon Sep 11 11:38:21 2017 -0700 ---------------------------------------------------------------------- .../txprune/HBaseTransactionPruningPlugin.java | 35 +++++++++++ .../hbase/txprune/InvalidListPruneTest.java | 65 ++++++++++++++++++++ .../src/test/resources/logback-test.xml | 39 ++++++++++++ .../txprune/HBaseTransactionPruningPlugin.java | 35 +++++++++++ .../hbase/txprune/InvalidListPruneTest.java | 65 ++++++++++++++++++++ .../src/test/resources/logback-test.xml | 39 ++++++++++++ .../txprune/HBaseTransactionPruningPlugin.java | 35 +++++++++++ .../hbase/txprune/InvalidListPruneTest.java | 65 ++++++++++++++++++++ .../src/test/resources/logback-test.xml | 39 ++++++++++++ .../txprune/HBaseTransactionPruningPlugin.java | 35 +++++++++++ .../hbase/txprune/InvalidListPruneTest.java | 65 ++++++++++++++++++++ .../src/test/resources/logback-test.xml | 39 ++++++++++++ .../txprune/HBaseTransactionPruningPlugin.java | 35 +++++++++++ .../hbase/txprune/InvalidListPruneTest.java | 65 ++++++++++++++++++++ .../src/test/resources/logback-test.xml | 39 ++++++++++++ .../txprune/HBaseTransactionPruningPlugin.java | 35 +++++++++++ .../hbase/txprune/InvalidListPruneTest.java | 65 ++++++++++++++++++++ .../src/test/resources/logback-test.xml | 39 ++++++++++++ 18 files changed, 834 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java index 021f1b2..44b4bac 100644 --- a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java @@ -20,6 +20,7 @@ package org.apache.tephra.hbase.txprune; import com.google.common.base.Function; +import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -43,8 +44,10 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; @@ -293,6 +296,10 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin { * @throws IOException when not able to talk to HBase */ private long computePruneUpperBound(TimeRegions timeRegions) throws IOException { + // Get the tables for the current time from the latest regions set + final Set<TableName> existingTables = getTableNamesForRegions(timeRegions.getRegions()); + LOG.debug("Tables for time {} = {}", timeRegions.getTime(), existingTables); + do { LOG.debug("Computing prune upper bound for {}", timeRegions); SortedSet<byte[]> transactionalRegions = timeRegions.getRegions(); @@ -309,6 +316,15 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin { continue; } + // Remove non-existing tables from the transactional regions set, so that we don't lookup prune upper bounds + // for them. Since the deleted tables do not exist anymore, there is no need to make sure they have been + // compacted. This ensures that transient tables do not block pruning progress. + transactionalRegions = filterDeletedTableRegions(existingTables, transactionalRegions); + if (LOG.isDebugEnabled()) { + LOG.debug("Transactional regions after removing the regions of non-existing tables = {}", + Iterables.transform(transactionalRegions, TimeRegions.BYTE_ARR_TO_STRING_FN)); + } + // Get the prune upper bounds for all the transactional regions Map<byte[], Long> pruneUpperBoundRegions = dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions); @@ -342,6 +358,25 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin { return -1; } + private SortedSet<byte[]> filterDeletedTableRegions(final Set<TableName> existingTables, + SortedSet<byte[]> transactionalRegions) { + return Sets.filter(transactionalRegions, + new Predicate<byte[]>() { + @Override + public boolean apply(byte[] region) { + return existingTables.contains(HRegionInfo.getTable(region)); + } + }); + } + + private Set<TableName> getTableNamesForRegions(Set<byte[]> regions) { + Set<TableName> tableNames = new HashSet<>(regions.size()); + for (byte[] region : regions) { + tableNames.add(HRegionInfo.getTable(region)); + } + return tableNames; + } + private Map<byte[], Long> handleEmptyRegions(long inactiveTransactionBound, SortedSet<byte[]> transactionalRegions, Map<byte[], Long> pruneUpperBoundRegions) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java index 91bbc1a..cea9b5c 100644 --- a/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java +++ b/tephra-hbase-compat-0.96/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java @@ -391,6 +391,71 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { } } + @Test + public void testPruneTransientTable() throws Exception { + // Make sure that transient tables do not block the progress of pruning + + // Create a temp table + TableName txTempTable = TableName.valueOf("tempTable"); + createTable(txTempTable.getName(), new byte[][]{family}, false, + Collections.singletonList(TestTransactionProcessor.class.getName())); + + TableName txDataTable2 = null; + + TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin(); + transactionPruningPlugin.initialize(conf); + + try { + long now1 = System.currentTimeMillis(); + long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS; + long noPruneUpperBound = -1; + long expectedPruneUpperBound1 = (now1 - 200) * TxConstants.MAX_TX_PER_MS; + InMemoryTransactionStateCache.setTransactionSnapshot( + new TransactionSnapshot(expectedPruneUpperBound1, expectedPruneUpperBound1, expectedPruneUpperBound1, + ImmutableSet.of(expectedPruneUpperBound1), + ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of())); + + // fetch prune upper bound, there should be no prune upper bound since nothing has been compacted yet. + // This run is only to store the initial set of regions + long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1); + Assert.assertEquals(noPruneUpperBound, pruneUpperBound1); + transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound); + + // Now delete the transient table + hBaseAdmin.disableTable(txTempTable); + hBaseAdmin.deleteTable(txTempTable); + txTempTable = null; + + // Compact the data table now + testUtil.compact(txDataTable1, true); + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); + + // Create a new table that will not be compacted + txDataTable2 = TableName.valueOf("invalidListPruneTestTable2"); + createTable(txDataTable2.getName(), new byte[][]{family}, false, + Collections.singletonList(TestTransactionProcessor.class.getName())); + + // fetch prune upper bound, there should be a prune upper bound even though txTempTable does not exist anymore, + // and txDataTable2 has not been compacted/flushed yet + long now2 = System.currentTimeMillis(); + long inactiveTxTimeNow2 = (now1 - 150) * TxConstants.MAX_TX_PER_MS; + long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2); + Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound2); + transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound1); + } finally { + transactionPruningPlugin.destroy(); + if (txDataTable2 != null) { + hBaseAdmin.disableTable(txDataTable2); + hBaseAdmin.deleteTable(txDataTable2); + } + if (txTempTable != null) { + hBaseAdmin.disableTable(txTempTable); + hBaseAdmin.deleteTable(txTempTable); + } + } + } + private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException { HRegionLocation regionLocation = connection.getRegionLocation(dataTable, row, true); return regionLocation.getRegionInfo().getRegionName(); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-0.96/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/test/resources/logback-test.xml b/tephra-hbase-compat-0.96/src/test/resources/logback-test.xml new file mode 100644 index 0000000..36f0a37 --- /dev/null +++ b/tephra-hbase-compat-0.96/src/test/resources/logback-test.xml @@ -0,0 +1,39 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern> + </encoder> + </appender> + + <logger name="org.apache.hadoop" level="WARN" /> + <!-- BlockStateChange is used by org.apache.hadoop.hdfs.server.blockmanagement.BlockManager --> + <logger name="BlockStateChange" level="WARN" /> + <logger name="org.apache.zookeeper" level="WARN" /> + <logger name="org.mortbay.log" level="WARN" /> + <logger name="org.apache.tephra.hbase.txprune" level="DEBUG" /> + + <root level="INFO"> + <appender-ref ref="STDOUT"/> + </root> + +</configuration> http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java index 021f1b2..44b4bac 100644 --- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java +++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java @@ -20,6 +20,7 @@ package org.apache.tephra.hbase.txprune; import com.google.common.base.Function; +import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -43,8 +44,10 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; @@ -293,6 +296,10 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin { * @throws IOException when not able to talk to HBase */ private long computePruneUpperBound(TimeRegions timeRegions) throws IOException { + // Get the tables for the current time from the latest regions set + final Set<TableName> existingTables = getTableNamesForRegions(timeRegions.getRegions()); + LOG.debug("Tables for time {} = {}", timeRegions.getTime(), existingTables); + do { LOG.debug("Computing prune upper bound for {}", timeRegions); SortedSet<byte[]> transactionalRegions = timeRegions.getRegions(); @@ -309,6 +316,15 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin { continue; } + // Remove non-existing tables from the transactional regions set, so that we don't lookup prune upper bounds + // for them. Since the deleted tables do not exist anymore, there is no need to make sure they have been + // compacted. This ensures that transient tables do not block pruning progress. + transactionalRegions = filterDeletedTableRegions(existingTables, transactionalRegions); + if (LOG.isDebugEnabled()) { + LOG.debug("Transactional regions after removing the regions of non-existing tables = {}", + Iterables.transform(transactionalRegions, TimeRegions.BYTE_ARR_TO_STRING_FN)); + } + // Get the prune upper bounds for all the transactional regions Map<byte[], Long> pruneUpperBoundRegions = dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions); @@ -342,6 +358,25 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin { return -1; } + private SortedSet<byte[]> filterDeletedTableRegions(final Set<TableName> existingTables, + SortedSet<byte[]> transactionalRegions) { + return Sets.filter(transactionalRegions, + new Predicate<byte[]>() { + @Override + public boolean apply(byte[] region) { + return existingTables.contains(HRegionInfo.getTable(region)); + } + }); + } + + private Set<TableName> getTableNamesForRegions(Set<byte[]> regions) { + Set<TableName> tableNames = new HashSet<>(regions.size()); + for (byte[] region : regions) { + tableNames.add(HRegionInfo.getTable(region)); + } + return tableNames; + } + private Map<byte[], Long> handleEmptyRegions(long inactiveTransactionBound, SortedSet<byte[]> transactionalRegions, Map<byte[], Long> pruneUpperBoundRegions) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java index 91bbc1a..cea9b5c 100644 --- a/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java +++ b/tephra-hbase-compat-0.98/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java @@ -391,6 +391,71 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { } } + @Test + public void testPruneTransientTable() throws Exception { + // Make sure that transient tables do not block the progress of pruning + + // Create a temp table + TableName txTempTable = TableName.valueOf("tempTable"); + createTable(txTempTable.getName(), new byte[][]{family}, false, + Collections.singletonList(TestTransactionProcessor.class.getName())); + + TableName txDataTable2 = null; + + TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin(); + transactionPruningPlugin.initialize(conf); + + try { + long now1 = System.currentTimeMillis(); + long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS; + long noPruneUpperBound = -1; + long expectedPruneUpperBound1 = (now1 - 200) * TxConstants.MAX_TX_PER_MS; + InMemoryTransactionStateCache.setTransactionSnapshot( + new TransactionSnapshot(expectedPruneUpperBound1, expectedPruneUpperBound1, expectedPruneUpperBound1, + ImmutableSet.of(expectedPruneUpperBound1), + ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of())); + + // fetch prune upper bound, there should be no prune upper bound since nothing has been compacted yet. + // This run is only to store the initial set of regions + long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1); + Assert.assertEquals(noPruneUpperBound, pruneUpperBound1); + transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound); + + // Now delete the transient table + hBaseAdmin.disableTable(txTempTable); + hBaseAdmin.deleteTable(txTempTable); + txTempTable = null; + + // Compact the data table now + testUtil.compact(txDataTable1, true); + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); + + // Create a new table that will not be compacted + txDataTable2 = TableName.valueOf("invalidListPruneTestTable2"); + createTable(txDataTable2.getName(), new byte[][]{family}, false, + Collections.singletonList(TestTransactionProcessor.class.getName())); + + // fetch prune upper bound, there should be a prune upper bound even though txTempTable does not exist anymore, + // and txDataTable2 has not been compacted/flushed yet + long now2 = System.currentTimeMillis(); + long inactiveTxTimeNow2 = (now1 - 150) * TxConstants.MAX_TX_PER_MS; + long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2); + Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound2); + transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound1); + } finally { + transactionPruningPlugin.destroy(); + if (txDataTable2 != null) { + hBaseAdmin.disableTable(txDataTable2); + hBaseAdmin.deleteTable(txDataTable2); + } + if (txTempTable != null) { + hBaseAdmin.disableTable(txTempTable); + hBaseAdmin.deleteTable(txTempTable); + } + } + } + private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException { HRegionLocation regionLocation = connection.getRegionLocation(dataTable, row, true); return regionLocation.getRegionInfo().getRegionName(); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-0.98/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.98/src/test/resources/logback-test.xml b/tephra-hbase-compat-0.98/src/test/resources/logback-test.xml new file mode 100644 index 0000000..36f0a37 --- /dev/null +++ b/tephra-hbase-compat-0.98/src/test/resources/logback-test.xml @@ -0,0 +1,39 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern> + </encoder> + </appender> + + <logger name="org.apache.hadoop" level="WARN" /> + <!-- BlockStateChange is used by org.apache.hadoop.hdfs.server.blockmanagement.BlockManager --> + <logger name="BlockStateChange" level="WARN" /> + <logger name="org.apache.zookeeper" level="WARN" /> + <logger name="org.mortbay.log" level="WARN" /> + <logger name="org.apache.tephra.hbase.txprune" level="DEBUG" /> + + <root level="INFO"> + <appender-ref ref="STDOUT"/> + </root> + +</configuration> http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java index 84c480a..42c9f84 100644 --- a/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java +++ b/tephra-hbase-compat-1.0-cdh/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java @@ -20,6 +20,7 @@ package org.apache.tephra.hbase.txprune; import com.google.common.base.Function; +import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -43,8 +44,10 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; @@ -286,6 +289,10 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin { * @throws IOException when not able to talk to HBase */ private long computePruneUpperBound(TimeRegions timeRegions) throws IOException { + // Get the tables for the current time from the latest regions set + final Set<TableName> existingTables = getTableNamesForRegions(timeRegions.getRegions()); + LOG.debug("Tables for time {} = {}", timeRegions.getTime(), existingTables); + do { LOG.debug("Computing prune upper bound for {}", timeRegions); SortedSet<byte[]> transactionalRegions = timeRegions.getRegions(); @@ -302,6 +309,15 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin { continue; } + // Remove non-existing tables from the transactional regions set, so that we don't lookup prune upper bounds + // for them. Since the deleted tables do not exist anymore, there is no need to make sure they have been + // compacted. This ensures that transient tables do not block pruning progress. + transactionalRegions = filterDeletedTableRegions(existingTables, transactionalRegions); + if (LOG.isDebugEnabled()) { + LOG.debug("Transactional regions after removing the regions of non-existing tables = {}", + Iterables.transform(transactionalRegions, TimeRegions.BYTE_ARR_TO_STRING_FN)); + } + // Get the prune upper bounds for all the transactional regions Map<byte[], Long> pruneUpperBoundRegions = dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions); @@ -335,6 +351,25 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin { return -1; } + private SortedSet<byte[]> filterDeletedTableRegions(final Set<TableName> existingTables, + SortedSet<byte[]> transactionalRegions) { + return Sets.filter(transactionalRegions, + new Predicate<byte[]>() { + @Override + public boolean apply(byte[] region) { + return existingTables.contains(HRegionInfo.getTable(region)); + } + }); + } + + private Set<TableName> getTableNamesForRegions(Set<byte[]> regions) { + Set<TableName> tableNames = new HashSet<>(regions.size()); + for (byte[] region : regions) { + tableNames.add(HRegionInfo.getTable(region)); + } + return tableNames; + } + private Map<byte[], Long> handleEmptyRegions(long inactiveTransactionBound, SortedSet<byte[]> transactionalRegions, Map<byte[], Long> pruneUpperBoundRegions) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java index f2c1abc..55348b0 100644 --- a/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java +++ b/tephra-hbase-compat-1.0-cdh/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java @@ -386,6 +386,71 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { } } + @Test + public void testPruneTransientTable() throws Exception { + // Make sure that transient tables do not block the progress of pruning + + // Create a temp table + TableName txTempTable = TableName.valueOf("tempTable"); + createTable(txTempTable.getName(), new byte[][]{family}, false, + Collections.singletonList(TestTransactionProcessor.class.getName())); + + TableName txDataTable2 = null; + + TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin(); + transactionPruningPlugin.initialize(conf); + + try { + long now1 = System.currentTimeMillis(); + long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS; + long noPruneUpperBound = -1; + long expectedPruneUpperBound1 = (now1 - 200) * TxConstants.MAX_TX_PER_MS; + InMemoryTransactionStateCache.setTransactionSnapshot( + new TransactionSnapshot(expectedPruneUpperBound1, expectedPruneUpperBound1, expectedPruneUpperBound1, + ImmutableSet.of(expectedPruneUpperBound1), + ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of())); + + // fetch prune upper bound, there should be no prune upper bound since nothing has been compacted yet. + // This run is only to store the initial set of regions + long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1); + Assert.assertEquals(noPruneUpperBound, pruneUpperBound1); + transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound); + + // Now delete the transient table + hBaseAdmin.disableTable(txTempTable); + hBaseAdmin.deleteTable(txTempTable); + txTempTable = null; + + // Compact the data table now + testUtil.compact(txDataTable1, true); + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); + + // Create a new table that will not be compacted + txDataTable2 = TableName.valueOf("invalidListPruneTestTable2"); + createTable(txDataTable2.getName(), new byte[][]{family}, false, + Collections.singletonList(TestTransactionProcessor.class.getName())); + + // fetch prune upper bound, there should be a prune upper bound even though txTempTable does not exist anymore, + // and txDataTable2 has not been compacted/flushed yet + long now2 = System.currentTimeMillis(); + long inactiveTxTimeNow2 = (now1 - 150) * TxConstants.MAX_TX_PER_MS; + long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2); + Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound2); + transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound1); + } finally { + transactionPruningPlugin.destroy(); + if (txDataTable2 != null) { + hBaseAdmin.disableTable(txDataTable2); + hBaseAdmin.deleteTable(txDataTable2); + } + if (txTempTable != null) { + hBaseAdmin.disableTable(txTempTable); + hBaseAdmin.deleteTable(txTempTable); + } + } + } + private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException { HRegionLocation regionLocation = testUtil.getConnection().getRegionLocator(dataTable).getRegionLocation(row); http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-1.0-cdh/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0-cdh/src/test/resources/logback-test.xml b/tephra-hbase-compat-1.0-cdh/src/test/resources/logback-test.xml new file mode 100644 index 0000000..36f0a37 --- /dev/null +++ b/tephra-hbase-compat-1.0-cdh/src/test/resources/logback-test.xml @@ -0,0 +1,39 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern> + </encoder> + </appender> + + <logger name="org.apache.hadoop" level="WARN" /> + <!-- BlockStateChange is used by org.apache.hadoop.hdfs.server.blockmanagement.BlockManager --> + <logger name="BlockStateChange" level="WARN" /> + <logger name="org.apache.zookeeper" level="WARN" /> + <logger name="org.mortbay.log" level="WARN" /> + <logger name="org.apache.tephra.hbase.txprune" level="DEBUG" /> + + <root level="INFO"> + <appender-ref ref="STDOUT"/> + </root> + +</configuration> http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java index 8142601..1a895b2 100644 --- a/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java +++ b/tephra-hbase-compat-1.0/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java @@ -20,6 +20,7 @@ package org.apache.tephra.hbase.txprune; import com.google.common.base.Function; +import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -43,8 +44,10 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; @@ -287,6 +290,10 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin { * @throws IOException when not able to talk to HBase */ private long computePruneUpperBound(TimeRegions timeRegions) throws IOException { + // Get the tables for the current time from the latest regions set + final Set<TableName> existingTables = getTableNamesForRegions(timeRegions.getRegions()); + LOG.debug("Tables for time {} = {}", timeRegions.getTime(), existingTables); + do { LOG.debug("Computing prune upper bound for {}", timeRegions); SortedSet<byte[]> transactionalRegions = timeRegions.getRegions(); @@ -303,6 +310,15 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin { continue; } + // Remove non-existing tables from the transactional regions set, so that we don't lookup prune upper bounds + // for them. Since the deleted tables do not exist anymore, there is no need to make sure they have been + // compacted. This ensures that transient tables do not block pruning progress. + transactionalRegions = filterDeletedTableRegions(existingTables, transactionalRegions); + if (LOG.isDebugEnabled()) { + LOG.debug("Transactional regions after removing the regions of non-existing tables = {}", + Iterables.transform(transactionalRegions, TimeRegions.BYTE_ARR_TO_STRING_FN)); + } + // Get the prune upper bounds for all the transactional regions Map<byte[], Long> pruneUpperBoundRegions = dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions); @@ -336,6 +352,25 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin { return -1; } + private SortedSet<byte[]> filterDeletedTableRegions(final Set<TableName> existingTables, + SortedSet<byte[]> transactionalRegions) { + return Sets.filter(transactionalRegions, + new Predicate<byte[]>() { + @Override + public boolean apply(byte[] region) { + return existingTables.contains(HRegionInfo.getTable(region)); + } + }); + } + + private Set<TableName> getTableNamesForRegions(Set<byte[]> regions) { + Set<TableName> tableNames = new HashSet<>(regions.size()); + for (byte[] region : regions) { + tableNames.add(HRegionInfo.getTable(region)); + } + return tableNames; + } + private Map<byte[], Long> handleEmptyRegions(long inactiveTransactionBound, SortedSet<byte[]> transactionalRegions, Map<byte[], Long> pruneUpperBoundRegions) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java index ac5e923..55348b0 100644 --- a/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java +++ b/tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java @@ -384,7 +384,72 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { hBaseAdmin.disableTable(txEmptyTable); hBaseAdmin.deleteTable(txEmptyTable); } + } + + @Test + public void testPruneTransientTable() throws Exception { + // Make sure that transient tables do not block the progress of pruning + + // Create a temp table + TableName txTempTable = TableName.valueOf("tempTable"); + createTable(txTempTable.getName(), new byte[][]{family}, false, + Collections.singletonList(TestTransactionProcessor.class.getName())); + + TableName txDataTable2 = null; + + TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin(); + transactionPruningPlugin.initialize(conf); + + try { + long now1 = System.currentTimeMillis(); + long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS; + long noPruneUpperBound = -1; + long expectedPruneUpperBound1 = (now1 - 200) * TxConstants.MAX_TX_PER_MS; + InMemoryTransactionStateCache.setTransactionSnapshot( + new TransactionSnapshot(expectedPruneUpperBound1, expectedPruneUpperBound1, expectedPruneUpperBound1, + ImmutableSet.of(expectedPruneUpperBound1), + ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of())); + + // fetch prune upper bound, there should be no prune upper bound since nothing has been compacted yet. + // This run is only to store the initial set of regions + long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1); + Assert.assertEquals(noPruneUpperBound, pruneUpperBound1); + transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound); + + // Now delete the transient table + hBaseAdmin.disableTable(txTempTable); + hBaseAdmin.deleteTable(txTempTable); + txTempTable = null; + + // Compact the data table now + testUtil.compact(txDataTable1, true); + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); + + // Create a new table that will not be compacted + txDataTable2 = TableName.valueOf("invalidListPruneTestTable2"); + createTable(txDataTable2.getName(), new byte[][]{family}, false, + Collections.singletonList(TestTransactionProcessor.class.getName())); + + // fetch prune upper bound, there should be a prune upper bound even though txTempTable does not exist anymore, + // and txDataTable2 has not been compacted/flushed yet + long now2 = System.currentTimeMillis(); + long inactiveTxTimeNow2 = (now1 - 150) * TxConstants.MAX_TX_PER_MS; + long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2); + Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound2); + transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound1); + } finally { + transactionPruningPlugin.destroy(); + if (txDataTable2 != null) { + hBaseAdmin.disableTable(txDataTable2); + hBaseAdmin.deleteTable(txDataTable2); + } + if (txTempTable != null) { + hBaseAdmin.disableTable(txTempTable); + hBaseAdmin.deleteTable(txTempTable); + } } + } private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException { HRegionLocation regionLocation = http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-1.0/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.0/src/test/resources/logback-test.xml b/tephra-hbase-compat-1.0/src/test/resources/logback-test.xml new file mode 100644 index 0000000..36f0a37 --- /dev/null +++ b/tephra-hbase-compat-1.0/src/test/resources/logback-test.xml @@ -0,0 +1,39 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern> + </encoder> + </appender> + + <logger name="org.apache.hadoop" level="WARN" /> + <!-- BlockStateChange is used by org.apache.hadoop.hdfs.server.blockmanagement.BlockManager --> + <logger name="BlockStateChange" level="WARN" /> + <logger name="org.apache.zookeeper" level="WARN" /> + <logger name="org.mortbay.log" level="WARN" /> + <logger name="org.apache.tephra.hbase.txprune" level="DEBUG" /> + + <root level="INFO"> + <appender-ref ref="STDOUT"/> + </root> + +</configuration> http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java index 84c480a..42c9f84 100644 --- a/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java +++ b/tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java @@ -20,6 +20,7 @@ package org.apache.tephra.hbase.txprune; import com.google.common.base.Function; +import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -43,8 +44,10 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; @@ -286,6 +289,10 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin { * @throws IOException when not able to talk to HBase */ private long computePruneUpperBound(TimeRegions timeRegions) throws IOException { + // Get the tables for the current time from the latest regions set + final Set<TableName> existingTables = getTableNamesForRegions(timeRegions.getRegions()); + LOG.debug("Tables for time {} = {}", timeRegions.getTime(), existingTables); + do { LOG.debug("Computing prune upper bound for {}", timeRegions); SortedSet<byte[]> transactionalRegions = timeRegions.getRegions(); @@ -302,6 +309,15 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin { continue; } + // Remove non-existing tables from the transactional regions set, so that we don't lookup prune upper bounds + // for them. Since the deleted tables do not exist anymore, there is no need to make sure they have been + // compacted. This ensures that transient tables do not block pruning progress. + transactionalRegions = filterDeletedTableRegions(existingTables, transactionalRegions); + if (LOG.isDebugEnabled()) { + LOG.debug("Transactional regions after removing the regions of non-existing tables = {}", + Iterables.transform(transactionalRegions, TimeRegions.BYTE_ARR_TO_STRING_FN)); + } + // Get the prune upper bounds for all the transactional regions Map<byte[], Long> pruneUpperBoundRegions = dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions); @@ -335,6 +351,25 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin { return -1; } + private SortedSet<byte[]> filterDeletedTableRegions(final Set<TableName> existingTables, + SortedSet<byte[]> transactionalRegions) { + return Sets.filter(transactionalRegions, + new Predicate<byte[]>() { + @Override + public boolean apply(byte[] region) { + return existingTables.contains(HRegionInfo.getTable(region)); + } + }); + } + + private Set<TableName> getTableNamesForRegions(Set<byte[]> regions) { + Set<TableName> tableNames = new HashSet<>(regions.size()); + for (byte[] region : regions) { + tableNames.add(HRegionInfo.getTable(region)); + } + return tableNames; + } + private Map<byte[], Long> handleEmptyRegions(long inactiveTransactionBound, SortedSet<byte[]> transactionalRegions, Map<byte[], Long> pruneUpperBoundRegions) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java index ac5e923..55348b0 100644 --- a/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java +++ b/tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java @@ -384,7 +384,72 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { hBaseAdmin.disableTable(txEmptyTable); hBaseAdmin.deleteTable(txEmptyTable); } + } + + @Test + public void testPruneTransientTable() throws Exception { + // Make sure that transient tables do not block the progress of pruning + + // Create a temp table + TableName txTempTable = TableName.valueOf("tempTable"); + createTable(txTempTable.getName(), new byte[][]{family}, false, + Collections.singletonList(TestTransactionProcessor.class.getName())); + + TableName txDataTable2 = null; + + TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin(); + transactionPruningPlugin.initialize(conf); + + try { + long now1 = System.currentTimeMillis(); + long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS; + long noPruneUpperBound = -1; + long expectedPruneUpperBound1 = (now1 - 200) * TxConstants.MAX_TX_PER_MS; + InMemoryTransactionStateCache.setTransactionSnapshot( + new TransactionSnapshot(expectedPruneUpperBound1, expectedPruneUpperBound1, expectedPruneUpperBound1, + ImmutableSet.of(expectedPruneUpperBound1), + ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of())); + + // fetch prune upper bound, there should be no prune upper bound since nothing has been compacted yet. + // This run is only to store the initial set of regions + long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1); + Assert.assertEquals(noPruneUpperBound, pruneUpperBound1); + transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound); + + // Now delete the transient table + hBaseAdmin.disableTable(txTempTable); + hBaseAdmin.deleteTable(txTempTable); + txTempTable = null; + + // Compact the data table now + testUtil.compact(txDataTable1, true); + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); + + // Create a new table that will not be compacted + txDataTable2 = TableName.valueOf("invalidListPruneTestTable2"); + createTable(txDataTable2.getName(), new byte[][]{family}, false, + Collections.singletonList(TestTransactionProcessor.class.getName())); + + // fetch prune upper bound, there should be a prune upper bound even though txTempTable does not exist anymore, + // and txDataTable2 has not been compacted/flushed yet + long now2 = System.currentTimeMillis(); + long inactiveTxTimeNow2 = (now1 - 150) * TxConstants.MAX_TX_PER_MS; + long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2); + Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound2); + transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound1); + } finally { + transactionPruningPlugin.destroy(); + if (txDataTable2 != null) { + hBaseAdmin.disableTable(txDataTable2); + hBaseAdmin.deleteTable(txDataTable2); + } + if (txTempTable != null) { + hBaseAdmin.disableTable(txTempTable); + hBaseAdmin.deleteTable(txTempTable); + } } + } private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException { HRegionLocation regionLocation = http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-1.1-base/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.1-base/src/test/resources/logback-test.xml b/tephra-hbase-compat-1.1-base/src/test/resources/logback-test.xml new file mode 100644 index 0000000..36f0a37 --- /dev/null +++ b/tephra-hbase-compat-1.1-base/src/test/resources/logback-test.xml @@ -0,0 +1,39 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern> + </encoder> + </appender> + + <logger name="org.apache.hadoop" level="WARN" /> + <!-- BlockStateChange is used by org.apache.hadoop.hdfs.server.blockmanagement.BlockManager --> + <logger name="BlockStateChange" level="WARN" /> + <logger name="org.apache.zookeeper" level="WARN" /> + <logger name="org.mortbay.log" level="WARN" /> + <logger name="org.apache.tephra.hbase.txprune" level="DEBUG" /> + + <root level="INFO"> + <appender-ref ref="STDOUT"/> + </root> + +</configuration> http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java index 84c480a..42c9f84 100644 --- a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java +++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java @@ -20,6 +20,7 @@ package org.apache.tephra.hbase.txprune; import com.google.common.base.Function; +import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -43,8 +44,10 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; @@ -286,6 +289,10 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin { * @throws IOException when not able to talk to HBase */ private long computePruneUpperBound(TimeRegions timeRegions) throws IOException { + // Get the tables for the current time from the latest regions set + final Set<TableName> existingTables = getTableNamesForRegions(timeRegions.getRegions()); + LOG.debug("Tables for time {} = {}", timeRegions.getTime(), existingTables); + do { LOG.debug("Computing prune upper bound for {}", timeRegions); SortedSet<byte[]> transactionalRegions = timeRegions.getRegions(); @@ -302,6 +309,15 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin { continue; } + // Remove non-existing tables from the transactional regions set, so that we don't lookup prune upper bounds + // for them. Since the deleted tables do not exist anymore, there is no need to make sure they have been + // compacted. This ensures that transient tables do not block pruning progress. + transactionalRegions = filterDeletedTableRegions(existingTables, transactionalRegions); + if (LOG.isDebugEnabled()) { + LOG.debug("Transactional regions after removing the regions of non-existing tables = {}", + Iterables.transform(transactionalRegions, TimeRegions.BYTE_ARR_TO_STRING_FN)); + } + // Get the prune upper bounds for all the transactional regions Map<byte[], Long> pruneUpperBoundRegions = dataJanitorState.getPruneUpperBoundForRegions(transactionalRegions); @@ -335,6 +351,25 @@ public class HBaseTransactionPruningPlugin implements TransactionPruningPlugin { return -1; } + private SortedSet<byte[]> filterDeletedTableRegions(final Set<TableName> existingTables, + SortedSet<byte[]> transactionalRegions) { + return Sets.filter(transactionalRegions, + new Predicate<byte[]>() { + @Override + public boolean apply(byte[] region) { + return existingTables.contains(HRegionInfo.getTable(region)); + } + }); + } + + private Set<TableName> getTableNamesForRegions(Set<byte[]> regions) { + Set<TableName> tableNames = new HashSet<>(regions.size()); + for (byte[] region : regions) { + tableNames.add(HRegionInfo.getTable(region)); + } + return tableNames; + } + private Map<byte[], Long> handleEmptyRegions(long inactiveTransactionBound, SortedSet<byte[]> transactionalRegions, Map<byte[], Long> pruneUpperBoundRegions) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java index ac5e923..55348b0 100644 --- a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java +++ b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java @@ -384,7 +384,72 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest { hBaseAdmin.disableTable(txEmptyTable); hBaseAdmin.deleteTable(txEmptyTable); } + } + + @Test + public void testPruneTransientTable() throws Exception { + // Make sure that transient tables do not block the progress of pruning + + // Create a temp table + TableName txTempTable = TableName.valueOf("tempTable"); + createTable(txTempTable.getName(), new byte[][]{family}, false, + Collections.singletonList(TestTransactionProcessor.class.getName())); + + TableName txDataTable2 = null; + + TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin(); + transactionPruningPlugin.initialize(conf); + + try { + long now1 = System.currentTimeMillis(); + long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS; + long noPruneUpperBound = -1; + long expectedPruneUpperBound1 = (now1 - 200) * TxConstants.MAX_TX_PER_MS; + InMemoryTransactionStateCache.setTransactionSnapshot( + new TransactionSnapshot(expectedPruneUpperBound1, expectedPruneUpperBound1, expectedPruneUpperBound1, + ImmutableSet.of(expectedPruneUpperBound1), + ImmutableSortedMap.<Long, TransactionManager.InProgressTx>of())); + + // fetch prune upper bound, there should be no prune upper bound since nothing has been compacted yet. + // This run is only to store the initial set of regions + long pruneUpperBound1 = transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1); + Assert.assertEquals(noPruneUpperBound, pruneUpperBound1); + transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound); + + // Now delete the transient table + hBaseAdmin.disableTable(txTempTable); + hBaseAdmin.deleteTable(txTempTable); + txTempTable = null; + + // Compact the data table now + testUtil.compact(txDataTable1, true); + // Since the write to prune table happens async, we need to sleep a bit before checking the state of the table + TimeUnit.SECONDS.sleep(2); + + // Create a new table that will not be compacted + txDataTable2 = TableName.valueOf("invalidListPruneTestTable2"); + createTable(txDataTable2.getName(), new byte[][]{family}, false, + Collections.singletonList(TestTransactionProcessor.class.getName())); + + // fetch prune upper bound, there should be a prune upper bound even though txTempTable does not exist anymore, + // and txDataTable2 has not been compacted/flushed yet + long now2 = System.currentTimeMillis(); + long inactiveTxTimeNow2 = (now1 - 150) * TxConstants.MAX_TX_PER_MS; + long pruneUpperBound2 = transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2); + Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound2); + transactionPruningPlugin.pruneComplete(now2, expectedPruneUpperBound1); + } finally { + transactionPruningPlugin.destroy(); + if (txDataTable2 != null) { + hBaseAdmin.disableTable(txDataTable2); + hBaseAdmin.deleteTable(txDataTable2); + } + if (txTempTable != null) { + hBaseAdmin.disableTable(txTempTable); + hBaseAdmin.deleteTable(txTempTable); + } } + } private byte[] getRegionName(TableName dataTable, byte[] row) throws IOException { HRegionLocation regionLocation = http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/b3370b66/tephra-hbase-compat-1.3/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-1.3/src/test/resources/logback-test.xml b/tephra-hbase-compat-1.3/src/test/resources/logback-test.xml new file mode 100644 index 0000000..36f0a37 --- /dev/null +++ b/tephra-hbase-compat-1.3/src/test/resources/logback-test.xml @@ -0,0 +1,39 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n</pattern> + </encoder> + </appender> + + <logger name="org.apache.hadoop" level="WARN" /> + <!-- BlockStateChange is used by org.apache.hadoop.hdfs.server.blockmanagement.BlockManager --> + <logger name="BlockStateChange" level="WARN" /> + <logger name="org.apache.zookeeper" level="WARN" /> + <logger name="org.mortbay.log" level="WARN" /> + <logger name="org.apache.tephra.hbase.txprune" level="DEBUG" /> + + <root level="INFO"> + <appender-ref ref="STDOUT"/> + </root> + +</configuration>
