Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.1 650666075 -> d78023017
PHOENIX-4530 Do not collect delete markers during major compaction of table with disabled mutable indexes Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d7802301 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d7802301 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d7802301 Branch: refs/heads/4.x-HBase-1.1 Commit: d7802301707fe575612098c9f54b6659c87c91a0 Parents: 6506660 Author: Vincent Poon <vincentp...@apache.org> Authored: Thu Feb 22 16:56:36 2018 -0800 Committer: Vincent Poon <vincentp...@apache.org> Committed: Thu Feb 22 17:15:49 2018 -0800 ---------------------------------------------------------------------- .../PartialScannerResultsDisabledIT.java | 2 +- .../UngroupedAggregateRegionObserverIT.java | 171 ------------------- .../phoenix/end2end/index/MutableIndexIT.java | 57 +++++++ .../end2end/index/PartialIndexRebuilderIT.java | 39 ----- .../UngroupedAggregateRegionObserver.java | 121 +++++-------- .../java/org/apache/phoenix/util/TestUtil.java | 19 +++ 6 files changed, 118 insertions(+), 291 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/d7802301/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java index 817b0bd..59471dd 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java @@ -151,7 +151,7 @@ public class PartialScannerResultsDisabledIT extends ParallelStatsDisabledIT { return RandomStringUtils.randomAlphabetic(length); } - private void writeSingleBatch(Connection connection, int batchSize, int numBatches, String tableName) throws Exception { + public static void writeSingleBatch(Connection connection, int batchSize, int numBatches, String tableName) throws Exception { for (int j = 0; j < numBatches; j++) { try (PreparedStatement statement = connection.prepareStatement(String.format(UPSERT_INTO_DATA_TABLE, tableName))) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/d7802301/phoenix-core/src/it/java/org/apache/phoenix/end2end/UngroupedAggregateRegionObserverIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UngroupedAggregateRegionObserverIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UngroupedAggregateRegionObserverIT.java deleted file mode 100644 index 0ae1bb5..0000000 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UngroupedAggregateRegionObserverIT.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.end2end; - -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.never; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; - -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.log4j.Appender; -import org.apache.log4j.Level; -import org.apache.log4j.LogManager; -import org.apache.log4j.spi.LoggingEvent; -import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver; -import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.schema.PIndexState; -import org.apache.phoenix.util.EnvironmentEdgeManager; -import org.apache.phoenix.util.IndexUtil; -import org.apache.phoenix.util.PropertiesUtil; -import org.apache.phoenix.util.SchemaUtil; -import org.apache.phoenix.util.TestUtil; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.runners.MockitoJUnitRunner; - -@RunWith(MockitoJUnitRunner.class) -public class UngroupedAggregateRegionObserverIT extends ParallelStatsDisabledIT { - - private String dataTableName; - private String indexTableName; - private String schemaName; - private String dataTableFullName; - private static String indexTableFullName; - - @Mock - private Appender mockAppender; - - @Captor - private ArgumentCaptor<LoggingEvent> captorLoggingEvent; - private UngroupedAggregateRegionObserver ungroupedObserver; - - @Before - public void setup() { - ungroupedObserver = new UngroupedAggregateRegionObserver(); - ungroupedObserver.setCompactionConfig(PropertiesUtil.cloneConfig(config)); - } - - /** - * Tests the that post compact hook doesn't log any NPE for a System table - */ - @Test - public void testPostCompactSystemSequence() throws Exception { - try (Connection conn = DriverManager.getConnection(getUrl())) { - startCapturingIndexLog(); - // run the post-compact hook - ungroupedObserver.clearTsOnDisabledIndexes("SYSTEM.SEQUENCE"); - stopCapturingIndexLog(); - // uneventful - nothing should be logged - Mockito.verify(mockAppender, never()) - .doAppend(captorLoggingEvent.capture()); - } - } - - /** - * Tests that calling the post compact hook on the data table permanently disables an index that - * is being rebuilt (i.e. already disabled or inactive) - */ - @Test - public void testPostCompactDataTableDuringRebuild() throws Exception { - try (Connection conn = DriverManager.getConnection(getUrl())) { - generateUniqueTableNames(); - testRebuildPostCompact(conn, dataTableFullName); - } - } - - /** - * Tests that calling the post compact hook on the index table permanently disables an index - * that is being rebuilt (i.e. already disabled or inactive) - */ - @Test - public void testPostCompactIndexTableDuringRebuild() throws Exception { - try (Connection conn = DriverManager.getConnection(getUrl())) { - generateUniqueTableNames(); - testRebuildPostCompact(conn, indexTableFullName); - } - } - - private void testRebuildPostCompact(Connection conn, String tableToCompact) - throws SQLException { - conn.createStatement().execute( - String.format(PartialScannerResultsDisabledIT.TEST_TABLE_DDL, dataTableFullName)); - conn.createStatement().execute(String.format(PartialScannerResultsDisabledIT.INDEX_1_DDL, - indexTableName, dataTableFullName)); - // disable the index, simulating an index write failure - PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class); - IndexUtil.updateIndexState(pConn, indexTableFullName, PIndexState.DISABLE, - EnvironmentEdgeManager.currentTimeMillis()); - - // run the post-compact hook on the data table - startCapturingIndexLog(); - ungroupedObserver.clearTsOnDisabledIndexes(tableToCompact); - stopCapturingIndexLog(); - // an event should've been logged - Mockito.verify(mockAppender).doAppend(captorLoggingEvent.capture()); - LoggingEvent loggingEvent = captorLoggingEvent.getValue(); - assertThat(loggingEvent.getLevel(), is(Level.INFO)); - // index should be permanently disabled (disabletime of 0) - assertTrue(TestUtil.checkIndexState(pConn, indexTableFullName, PIndexState.DISABLE, 0L)); - } - - /** - * Tests that a non-Phoenix table (created purely through HBase) doesn't log a warning in - * postCompact - */ - @Test - public void testPostCompactTableNotFound() throws Exception { - try (Connection conn = DriverManager.getConnection(getUrl())) { - HBaseTestingUtility utility = getUtility(); - String nonPhoenixTable = "NOT_A_PHOENIX_TABLE"; - utility.getHBaseAdmin().createTable(utility.createTableDescriptor(nonPhoenixTable)); - startCapturingIndexLog(); - ungroupedObserver.clearTsOnDisabledIndexes(nonPhoenixTable); - stopCapturingIndexLog(); - // a debug level event should've been logged - Mockito.verify(mockAppender).doAppend(captorLoggingEvent.capture()); - LoggingEvent loggingEvent = captorLoggingEvent.getValue(); - assertThat(loggingEvent.getLevel(), is(Level.DEBUG)); - } - } - - private void stopCapturingIndexLog() { - LogManager.getLogger(UngroupedAggregateRegionObserver.class).removeAppender(mockAppender); - } - - private void startCapturingIndexLog() { - LogManager.getLogger(UngroupedAggregateRegionObserver.class).addAppender(mockAppender); - } - - private void generateUniqueTableNames() { - schemaName = generateUniqueName(); - dataTableName = generateUniqueName() + "_DATA"; - dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); - indexTableName = generateUniqueName() + "_IDX"; - indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/d7802301/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java index b1e5183..253a134 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java @@ -40,14 +40,23 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; import org.apache.phoenix.end2end.ParallelStatsDisabledIT; +import org.apache.phoenix.end2end.PartialScannerResultsDisabledIT; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PTableKey; +import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; @@ -743,6 +752,54 @@ public class MutableIndexIT extends ParallelStatsDisabledIT { } } + // Tests that if major compaction is run on a table with a disabled index, + // deleted cells are kept + @Test + public void testCompactDisabledIndex() throws Exception { + try (Connection conn = getConnection()) { + String schemaName = generateUniqueName(); + String dataTableName = generateUniqueName() + "_DATA"; + String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + String indexTableName = generateUniqueName() + "_IDX"; + String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); + conn.createStatement().execute( + String.format(PartialScannerResultsDisabledIT.TEST_TABLE_DDL, dataTableFullName)); + conn.createStatement().execute(String.format(PartialScannerResultsDisabledIT.INDEX_1_DDL, + indexTableName, dataTableFullName)); + + //insert a row, and delete it + PartialScannerResultsDisabledIT.writeSingleBatch(conn, 1, 1, dataTableFullName); + conn.createStatement().execute("DELETE FROM " + dataTableFullName); + conn.commit(); + + // disable the index, simulating an index write failure + PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class); + IndexUtil.updateIndexState(pConn, indexTableFullName, PIndexState.DISABLE, + EnvironmentEdgeManager.currentTimeMillis()); + + // major compaction should not remove the deleted row + List<HRegion> regions = getUtility().getHBaseCluster().getRegions(TableName.valueOf(dataTableFullName)); + HRegion hRegion = regions.get(0); + hRegion.flush(true); + HStore store = (HStore) hRegion.getStore(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES); + store.triggerMajorCompaction(); + store.compactRecentForTestingAssumingDefaultPolicy(1); + HTableInterface dataHTI = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(dataTableFullName)); + assertEquals(1, TestUtil.getRawRowCount(dataHTI)); + + // reenable the index + IndexUtil.updateIndexState(pConn, indexTableFullName, PIndexState.INACTIVE, + EnvironmentEdgeManager.currentTimeMillis()); + IndexUtil.updateIndexState(pConn, indexTableFullName, PIndexState.ACTIVE, 0L); + + // now major compaction should remove the deleted row + store.triggerMajorCompaction(); + store.compactRecentForTestingAssumingDefaultPolicy(1); + dataHTI = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(dataTableFullName)); + assertEquals(0, TestUtil.getRawRowCount(dataHTI)); + } + } + private void upsertRow(String dml, Connection tenantConn, int i) throws SQLException { PreparedStatement stmt = tenantConn.prepareStatement(dml); stmt.setString(1, "00000000000000" + String.valueOf(i)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/d7802301/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java index 3961d32..46443e3 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java @@ -318,45 +318,6 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT { conn.commit(); return hasInactiveIndex; } - - @Test - public void testCompactionDuringRebuild() throws Throwable { - String schemaName = generateUniqueName(); - String tableName = generateUniqueName(); - String indexName1 = generateUniqueName(); - String indexName2 = generateUniqueName(); - final String fullTableName = SchemaUtil.getTableName(schemaName, tableName); - String fullIndexName1 = SchemaUtil.getTableName(schemaName, indexName1); - String fullIndexName2 = SchemaUtil.getTableName(schemaName, indexName2); - final MyClock clock = new MyClock(1000); - // Use our own clock to prevent race between partial rebuilder and compaction - EnvironmentEdgeManager.injectEdge(clock); - try (Connection conn = DriverManager.getConnection(getUrl())) { - conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k INTEGER PRIMARY KEY, v1 INTEGER, v2 INTEGER) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true, GUIDE_POSTS_WIDTH=1000"); - clock.time += 100; - conn.createStatement().execute("CREATE INDEX " + indexName1 + " ON " + fullTableName + " (v1) INCLUDE (v2)"); - clock.time += 100; - conn.createStatement().execute("CREATE INDEX " + indexName2 + " ON " + fullTableName + " (v2) INCLUDE (v1)"); - clock.time += 100; - conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES(1, 2, 3)"); - conn.commit(); - clock.time += 100; - long disableTS = EnvironmentEdgeManager.currentTimeMillis(); - HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); - IndexUtil.updateIndexState(fullIndexName1, disableTS, metaTable, PIndexState.DISABLE); - IndexUtil.updateIndexState(fullIndexName2, disableTS, metaTable, PIndexState.DISABLE); - clock.time += 100; - TestUtil.doMajorCompaction(conn, fullIndexName1); - clock.time += 100; - assertTrue(TestUtil.checkIndexState(conn, fullIndexName1, PIndexState.DISABLE, 0L)); - assertFalse(TestUtil.checkIndexState(conn, fullIndexName2, PIndexState.DISABLE, 0L)); - TestUtil.doMajorCompaction(conn, fullTableName); - clock.time += 100; - assertTrue(TestUtil.checkIndexState(conn, fullIndexName2, PIndexState.DISABLE, 0L)); - } finally { - EnvironmentEdgeManager.injectEdge(null); - } - } @Test @Repeat(5) http://git-wip-us.apache.org/repos/asf/phoenix/blob/d7802301/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 93b42bc..6108aca 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -65,11 +65,12 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; -import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; @@ -102,14 +103,12 @@ import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.ColumnFamilyNotFoundException; import org.apache.phoenix.schema.PColumn; -import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PRow; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.RowKeySchema; import org.apache.phoenix.schema.SortOrder; -import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.ValueSchema.Field; import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker; @@ -144,7 +143,6 @@ import org.apache.phoenix.util.StringUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -986,82 +984,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver }); } - @Override - public void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store, - final StoreFile resultFile, CompactionRequest request) throws IOException { - // If we're compacting all files, then delete markers are removed - // and we must permanently disable an index that needs to be - // partially rebuild because we're potentially losing the information - // we need to successfully rebuilt it. - if (request.isAllFiles() || request.isMajor()) { - // Compaction and split upcalls run with the effective user context of the requesting user. - // This will lead to failure of cross cluster RPC if the effective user is not - // the login user. Switch to the login user context to ensure we have the expected - // security context. - User.runAsLoginUser(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString(); - clearTsOnDisabledIndexes(fullTableName); - return null; - } - }); - } - } - - @VisibleForTesting - public void clearTsOnDisabledIndexes(final String fullTableName) { - try (PhoenixConnection conn = - QueryUtil.getConnectionOnServer(compactionConfig).unwrap(PhoenixConnection.class)) { - String baseTable = fullTableName; - PTable table = PhoenixRuntime.getTableNoCache(conn, baseTable); - List<PTable> indexes; - // if it's an index table, we just need to check if it's disabled - if (PTableType.INDEX.equals(table.getType())) { - indexes = Lists.newArrayList(table.getIndexes()); - indexes.add(table); - } else { - // for a data table, check all its indexes - indexes = table.getIndexes(); - } - // FIXME need handle views and indexes on views as well - // if any index is disabled, we won't have all the data for a rebuild after compaction - for (PTable index : indexes) { - if (index.getIndexDisableTimestamp() != 0) { - try { - logger.info( - "Major compaction running while index on table is disabled. Clearing index disable timestamp: " - + index); - IndexUtil.updateIndexState(conn, index.getName().getString(), - PIndexState.DISABLE, Long.valueOf(0L)); - } catch (SQLException e) { - logger.warn( - "Unable to permanently disable index " + index.getName().getString(), - e); - } - } - } - } catch (Exception e) { - if (e instanceof TableNotFoundException) { - logger.debug("Ignoring HBase table that is not a Phoenix table: " + fullTableName); - // non-Phoenix HBase tables won't be found, do nothing - return; - } - // If we can't reach the stats table, don't interrupt the normal - // compaction operation, just log a warning. - if (logger.isWarnEnabled()) { - logger.warn("Unable to permanently disable indexes being partially rebuild for " - + fullTableName, - e); - } - } - } - - @VisibleForTesting - public void setCompactionConfig(Configuration compactionConfig) { - this.compactionConfig = compactionConfig; - } - private static PTable deserializeTable(byte[] b) { try { PTableProtos.PTable ptableProto = PTableProtos.PTable.parseFrom(b); @@ -1422,4 +1344,43 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver protected boolean isRegionObserverFor(Scan scan) { return scan.getAttribute(BaseScannerRegionObserver.UNGROUPED_AGG) != null; } + + @Override + public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, + final Store store, final List<? extends KeyValueScanner> scanners, ScanType scanType, + long earliestPutTs, final InternalScanner s, final CompactionRequest request) throws IOException { + // Compaction and split upcalls run with the effective user context of the requesting user. + // This will lead to failure of cross cluster RPC if the effective user is not + // the login user. Switch to the login user context to ensure we have the expected + // security context. + return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() { + @Override + public InternalScanner run() throws Exception { + // If the index is disabled, keep the deleted cells so the rebuild doesn't corrupt the index + if (request.isMajor()) { + String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString(); + try (PhoenixConnection conn = + QueryUtil.getConnectionOnServer(compactionConfig).unwrap(PhoenixConnection.class)) { + String baseTable = fullTableName; + PTable table = PhoenixRuntime.getTableNoCache(conn, baseTable); + List<PTable> indexes = PTableType.INDEX.equals(table.getType()) ? Lists.newArrayList(table) : table.getIndexes(); + // FIXME need to handle views and indexes on views as well + for (PTable index : indexes) { + if (index.getIndexDisableTimestamp() != 0) { + logger.info( + "Modifying major compaction scanner to retain deleted cells for a table with disabled index: " + + baseTable); + Scan scan = new Scan(); + scan.setMaxVersions(); + return new StoreScanner(store, store.getScanInfo(), scan, scanners, + ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), + HConstants.OLDEST_TIMESTAMP); + } + } + } + } + return s; + } + }); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d7802301/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java index 4a105f6..d50589f 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java @@ -853,6 +853,25 @@ public class TestUtil { System.out.println("-----------------------------------------------"); } + public static int getRawRowCount(HTableInterface table) throws IOException { + Scan s = new Scan(); + s.setRaw(true);; + s.setMaxVersions(); + int rows = 0; + try (ResultScanner scanner = table.getScanner(s)) { + Result result = null; + while ((result = scanner.next()) != null) { + rows++; + CellScanner cellScanner = result.cellScanner(); + Cell current = null; + while (cellScanner.advance()) { + current = cellScanner.current(); + } + } + } + return rows; + } + public static void dumpIndexStatus(Connection conn, String indexName) throws IOException, SQLException { try (HTableInterface table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)) { System.out.println("************ dumping index status for " + indexName + " **************");