Repository: phoenix
Updated Branches:
  refs/heads/master 2da904ebc -> 701c447d3
Revert "PHOENIX-4682 UngroupedAggregateRegionObserver preCompactScannerOpen hook should not throw exceptions"

This reverts commit 2da904ebcb84d03231cccae298d78b0add1012ba.


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/49610d18
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/49610d18
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/49610d18

Branch: refs/heads/master
Commit: 49610d188a34e078514cfc61560e2389933b80b0
Parents: 2da904e
Author: Vincent Poon <[email protected]>
Authored: Thu Apr 5 10:02:58 2018 -0700
Committer: Vincent Poon <[email protected]>
Committed: Thu Apr 5 10:02:58 2018 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/index/MutableIndexIT.java   | 41 +---------
 .../UngroupedAggregateRegionObserver.java       | 81 ++++++++------------
 2 files changed, 35 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/49610d18/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
index 7456ba6..efae15e 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
@@ -41,25 +41,22 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
 import org.apache.phoenix.end2end.PartialScannerResultsDisabledIT;
 import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -870,42 +867,6 @@ public class MutableIndexIT extends ParallelStatsDisabledIT {
         }
     }
 
-    // some tables (e.g. indexes on views) have UngroupedAgg coproc loaded, but don't have a
-    // corresponding row in syscat.  This tests that compaction isn't blocked
-    @Test(timeout=120000)
-    public void testCompactNonPhoenixTable() throws Exception {
-        try (Connection conn = getConnection()) {
-            // create a vanilla HBase table (non-Phoenix)
-            String randomTable = generateUniqueName();
-            TableName hbaseTN = TableName.valueOf(randomTable);
-            byte[] famBytes = Bytes.toBytes("fam");
-            HTable hTable = getUtility().createTable(hbaseTN, famBytes);
-            TestUtil.addCoprocessor(conn, randomTable, UngroupedAggregateRegionObserver.class);
-            Put put = new Put(Bytes.toBytes("row"));
-            byte[] value = new byte[1];
-            Bytes.random(value);
-            put.add(famBytes, Bytes.toBytes("colQ"), value);
-            hTable.put(put);
-            hTable.flushCommits();
-
-            // major compaction shouldn't cause a timeout or RS abort
-            List<HRegion> regions = getUtility().getHBaseCluster().getRegions(hbaseTN);
-            HRegion hRegion = regions.get(0);
-            hRegion.flush(true);
-            HStore store = (HStore) hRegion.getStore(famBytes);
-            store.triggerMajorCompaction();
-            store.compactRecentForTestingAssumingDefaultPolicy(1);
-
-            // we should be able to compact syscat itself as well
-            regions = getUtility().getHBaseCluster().getRegions(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME));
-            hRegion = regions.get(0);
-            hRegion.flush(true);
-            store = (HStore) hRegion.getStore(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES);
-            store.triggerMajorCompaction();
-            store.compactRecentForTestingAssumingDefaultPolicy(1);
-        }
-    }
-
     private void upsertRow(String dml, Connection tenantConn, int i) throws SQLException {
         PreparedStatement stmt = tenantConn.prepareStatement(dml);
         stmt.setString(1, "00000000000000" + String.valueOf(i));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/49610d18/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 965ba1b..72ca58d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -99,7 +99,6 @@ import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
 import org.apache.phoenix.index.PhoenixIndexFailurePolicy.MutateCommand;
 import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.query.QueryConstants;
@@ -113,7 +112,6 @@ import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.SortOrder;
-import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;
@@ -964,36 +962,35 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     }
 
     @Override
-    public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
-            final Store store, final InternalScanner scanner, final ScanType scanType)
-            throws IOException {
-        if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
-            final TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
-            // Compaction and split upcalls run with the effective user context of the requesting user.
-            // This will lead to failure of cross cluster RPC if the effective user is not
-            // the login user. Switch to the login user context to ensure we have the expected
-            // security context.
-            return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
-                @Override public InternalScanner run() throws Exception {
-                    InternalScanner internalScanner = scanner;
+    public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
+            final InternalScanner scanner, final ScanType scanType) throws IOException {
+        // Compaction and split upcalls run with the effective user context of the requesting user.
+        // This will lead to failure of cross cluster RPC if the effective user is not
+        // the login user. Switch to the login user context to ensure we have the expected
+        // security context.
+        return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
+            @Override
+            public InternalScanner run() throws Exception {
+                TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
+                InternalScanner internalScanner = scanner;
+                if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
                     try {
                         long clientTimeStamp = EnvironmentEdgeManager.currentTimeMillis();
                         StatisticsCollector stats = StatisticsCollectorFactory.createStatisticsCollector(
                             c.getEnvironment(), table.getNameAsString(), clientTimeStamp,
                             store.getFamily().getName());
                         internalScanner = stats.createCompactionScanner(c.getEnvironment(), store, scanner);
-                    } catch (Exception e) {
+                    } catch (IOException e) {
                         // If we can't reach the stats table, don't interrupt the normal
-                        // compaction operation, just log a warning.
-                        if (logger.isWarnEnabled()) {
-                            logger.warn("Unable to collect stats for " + table, e);
-                        }
+                      // compaction operation, just log a warning.
+                      if (logger.isWarnEnabled()) {
+                          logger.warn("Unable to collect stats for " + table, e);
+                      }
                     }
-                    return internalScanner;
                 }
-            });
-        }
-        return scanner;
+                return internalScanner;
+            }
+        });
     }
 
     private static PTable deserializeTable(byte[] b) {
@@ -1365,23 +1362,23 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         // This will lead to failure of cross cluster RPC if the effective user is not
         // the login user. Switch to the login user context to ensure we have the expected
         // security context.
-        final String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
-        // since we will make a call to syscat, do nothing if we are compacting syscat itself
-        if (request.isMajor() && !PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME.equals(fullTableName)) {
-            return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
-                @Override
-                public InternalScanner run() throws Exception {
-                    // If the index is disabled, keep the deleted cells so the rebuild doesn't corrupt the index
-                    try (PhoenixConnection conn =
-                            QueryUtil.getConnectionOnServer(compactionConfig).unwrap(PhoenixConnection.class)) {
-                        PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName);
+        return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
+            @Override
+            public InternalScanner run() throws Exception {
+                // If the index is disabled, keep the deleted cells so the rebuild doesn't corrupt the index
+                if (request.isMajor()) {
+                    String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
+                    try (PhoenixConnection conn =
+                            QueryUtil.getConnectionOnServer(compactionConfig).unwrap(PhoenixConnection.class)) {
+                        String baseTable = fullTableName;
+                        PTable table = PhoenixRuntime.getTableNoCache(conn, baseTable);
                         List<PTable> indexes = PTableType.INDEX.equals(table.getType()) ? Lists.newArrayList(table) : table.getIndexes();
                         // FIXME need to handle views and indexes on views as well
                         for (PTable index : indexes) {
                             if (index.getIndexDisableTimestamp() != 0) {
                                 logger.info(
                                     "Modifying major compaction scanner to retain deleted cells for a table with disabled index: "
-                                            + fullTableName);
+                                        + baseTable);
                                 Scan scan = new Scan();
                                 scan.setMaxVersions();
                                 return new StoreScanner(store, store.getScanInfo(), scan, scanners,
@@ -1389,20 +1386,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                     HConstants.OLDEST_TIMESTAMP);
                             }
                         }
-                    } catch (Exception e) {
-                        if (e instanceof TableNotFoundException) {
-                            logger.debug("Ignoring HBase table that is not a Phoenix table: " + fullTableName);
-                            // non-Phoenix HBase tables won't be found, do nothing
-                        } else {
-                            logger.error("Unable to modify compaction scanner to retain deleted cells for a table with disabled Index; "
-                                    + fullTableName,
-                                    e);
-                        }
                     }
-                    return s;
                 }
-            });
-        }
-        return s;
+                return s;
+            }
+        });
     }
 }
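
For readers following along: the "security context" comment restored above describes a general HBase coprocessor concern, namely that compaction and split upcalls may run as the requesting user, so cross-cluster RPCs have to be re-wrapped in the region server's login user context. A minimal sketch of that pattern, using only the HBase client API (the LoginUserSketch class and wrapForCompaction method are illustrative names, not part of this commit):

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.hbase.regionserver.InternalScanner;
    import org.apache.hadoop.hbase.security.User;

    // Illustrative only: the User.runAsLoginUser pattern used by preCompact above.
    public class LoginUserSketch {
        public InternalScanner wrapForCompaction(final InternalScanner scanner) throws IOException {
            return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() {
                @Override
                public InternalScanner run() throws Exception {
                    // Issue RPCs (e.g. to the stats table) as the login user here.
                    return scanner;
                }
            });
        }
    }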
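
The second hunk restores the disabled-index check that decides whether a major compaction must retain deleted cells so a later index rebuild can replay them. A hedged sketch of just that decision, lifted from the hunk above (the mustRetainDeletes helper is hypothetical; PhoenixRuntime.getTableNoCache and getIndexDisableTimestamp are the calls visible in the diff):

    import java.sql.SQLException;
    import java.util.List;

    import com.google.common.collect.Lists;
    import org.apache.phoenix.jdbc.PhoenixConnection;
    import org.apache.phoenix.schema.PTable;
    import org.apache.phoenix.schema.PTableType;
    import org.apache.phoenix.util.PhoenixRuntime;

    // Illustrative only: true if deleted cells should be kept because some
    // index is disabled and will need a rebuild.
    class RetainDeletesSketch {
        static boolean mustRetainDeletes(PhoenixConnection conn, String fullTableName) throws SQLException {
            PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName);
            // Compacting an index table: check it directly; otherwise check
            // every index of the data table.
            List<PTable> indexes = PTableType.INDEX.equals(table.getType())
                    ? Lists.newArrayList(table)
                    : table.getIndexes();
            for (PTable index : indexes) {
                if (index.getIndexDisableTimestamp() != 0) {
                    return true;
                }
            }
            return false;
        }
    }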
