Optimize ORDER BY and grouped aggregations by taking advantage of column encoding
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/9525c72f Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/9525c72f Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/9525c72f Branch: refs/heads/encodecolumns Commit: 9525c72fb45522c84bbb2fbde62042e1af735284 Parents: 6461e95 Author: Samarth <[email protected]> Authored: Fri May 27 17:44:53 2016 -0700 Committer: Samarth <[email protected]> Committed: Fri May 27 17:44:53 2016 -0700 ---------------------------------------------------------------------- .../apache/phoenix/end2end/AlterTableIT.java | 108 +- .../phoenix/end2end/AlterTableWithViewsIT.java | 112 +- .../apache/phoenix/end2end/CreateTableIT.java | 47 +- .../org/apache/phoenix/end2end/GroupByIT.java | 3 - .../phoenix/end2end/RowValueConstructorIT.java | 2 +- .../apache/phoenix/end2end/UpsertValuesIT.java | 2 +- .../apache/phoenix/compile/FromCompiler.java | 4 +- .../apache/phoenix/compile/JoinCompiler.java | 2 +- .../phoenix/compile/ListJarsQueryPlan.java | 1 + .../compile/TupleProjectionCompiler.java | 25 +- .../apache/phoenix/compile/UnionCompiler.java | 4 +- .../coprocessor/BaseScannerRegionObserver.java | 22 +- .../coprocessor/DelegateRegionScanner.java | 5 + .../GroupedAggregateRegionObserver.java | 26 +- .../coprocessor/HashJoinRegionScanner.java | 2 +- .../coprocessor/MetaDataEndpointImpl.java | 17 +- .../phoenix/coprocessor/ScanRegionObserver.java | 17 +- .../UngroupedAggregateRegionObserver.java | 12 +- .../coprocessor/generated/PTableProtos.java | 1080 ++---------------- .../phoenix/execute/SortMergeJoinPlan.java | 1 + .../apache/phoenix/execute/TupleProjector.java | 6 +- .../index/PhoenixTransactionalIndexer.java | 22 +- .../phoenix/iterate/BaseResultIterators.java | 21 +- .../iterate/LookAheadResultIterator.java | 2 +- .../phoenix/iterate/MappedByteBufferQueue.java | 1 + .../phoenix/iterate/OrderedResultIterator.java | 3 +- 
.../iterate/RegionScannerResultIterator.java | 14 +- .../phoenix/jdbc/PhoenixDatabaseMetaData.java | 3 +- .../apache/phoenix/jdbc/PhoenixResultSet.java | 2 +- .../apache/phoenix/join/HashCacheFactory.java | 1 + .../apache/phoenix/query/QueryConstants.java | 41 +- .../phoenix/query/QueryServicesOptions.java | 2 +- .../apache/phoenix/schema/DelegateTable.java | 5 +- .../apache/phoenix/schema/KeyValueSchema.java | 2 + .../apache/phoenix/schema/MetaDataClient.java | 189 ++- .../java/org/apache/phoenix/schema/PTable.java | 50 +- .../org/apache/phoenix/schema/PTableImpl.java | 70 +- .../schema/tuple/BoundedSkipNullCellsList.java | 354 ++++-- .../tuple/PositionBasedMultiKeyValueTuple.java | 19 +- .../phoenix/schema/tuple/ResultTuple.java | 33 +- .../java/org/apache/phoenix/util/IndexUtil.java | 8 +- .../org/apache/phoenix/util/ResultUtil.java | 60 - .../java/org/apache/phoenix/util/ScanUtil.java | 9 + .../phoenix/execute/CorrelatePlanTest.java | 4 +- .../execute/LiteralResultIteratorPlanTest.java | 4 +- .../phoenix/execute/UnnestArrayPlanTest.java | 5 +- phoenix-protocol/src/main/PTable.proto | 9 +- 47 files changed, 888 insertions(+), 1543 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/9525c72f/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java index 900a040..d588c63 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java @@ -26,6 +26,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM; import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM; import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY; +import static org.apache.phoenix.query.QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.apache.phoenix.util.TestUtil.closeConnection; import static org.apache.phoenix.util.TestUtil.closeStatement; @@ -63,6 +64,7 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTable.EncodedCQCounter; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.util.IndexUtil; @@ -2244,13 +2246,11 @@ public class AlterTableIT extends BaseOwnClusterHBaseManagedTimeIT { long initBaseTableSeqNumber = baseTable.getSequenceNumber(); // assert that the client side cache is updated. - Map<String, Integer> cqCounters = baseTable.getEncodedCQCounters(); - assertEquals(1, cqCounters.size()); - int counter = cqCounters.get(DEFAULT_COLUMN_FAMILY); - assertEquals(1, counter); - + EncodedCQCounter cqCounter = baseTable.getEncodedCQCounter(); + assertEquals((Integer)ENCODED_CQ_COUNTER_INITIAL_VALUE, cqCounter.getValue()); + // assert that the server side metadata is updated correctly. - assertEncodedCQCounter(DEFAULT_COLUMN_FAMILY, schemaName, baseTableName, 1); + assertEncodedCQCounter(DEFAULT_COLUMN_FAMILY, schemaName, baseTableName, ENCODED_CQ_COUNTER_INITIAL_VALUE, true); assertSequenceNumber(schemaName, baseTableName, initBaseTableSeqNumber); // now create a view and validate client and server side metadata @@ -2260,19 +2260,13 @@ public class AlterTableIT extends BaseOwnClusterHBaseManagedTimeIT { PTable view = phxConn.getTable(new PTableKey(phxConn.getTenantId(), fullViewName)); // verify that the client side cache is updated. 
Base table's cq counters should be updated. - cqCounters = baseTable.getEncodedCQCounters(); - counter = cqCounters.get(DEFAULT_COLUMN_FAMILY); - assertEquals(2, counter); - counter = cqCounters.get("A"); - assertEquals(2, counter); - cqCounters = view.getEncodedCQCounters(); - assertTrue("A view should always have the column qualifier counters map empty", cqCounters.isEmpty()); - + assertEquals((Integer)(ENCODED_CQ_COUNTER_INITIAL_VALUE + 2), baseTable.getEncodedCQCounter().getValue()); + assertNull("A view should always have the null cq counter", view.getEncodedCQCounter().getValue()); + // assert that the server side metadata for the base table and the view is also updated correctly. - assertEncodedCQCounter(DEFAULT_COLUMN_FAMILY, schemaName, baseTableName, 2); - assertEncodedCQCounter("A", schemaName, baseTableName, 2); - assertEncodedCQValue(DEFAULT_COLUMN_FAMILY, "VIEW_COL1", schemaName, viewName, 1); - assertEncodedCQValue("A", "VIEW_COL2", schemaName, viewName, 1); + assertEncodedCQCounter(DEFAULT_COLUMN_FAMILY, schemaName, baseTableName, ENCODED_CQ_COUNTER_INITIAL_VALUE + 2, true); + assertEncodedCQValue(DEFAULT_COLUMN_FAMILY, "VIEW_COL1", schemaName, viewName, ENCODED_CQ_COUNTER_INITIAL_VALUE); + assertEncodedCQValue("A", "VIEW_COL2", schemaName, viewName, ENCODED_CQ_COUNTER_INITIAL_VALUE + 1); assertSequenceNumber(schemaName, baseTableName, initBaseTableSeqNumber + 1); assertSequenceNumber(schemaName, viewName, PTable.INITIAL_SEQ_NUM); } @@ -2305,18 +2299,14 @@ public class AlterTableIT extends BaseOwnClusterHBaseManagedTimeIT { // assert that the client side cache is updated. 
baseTable = phxConn.getTable(new PTableKey(phxConn.getTenantId(), fullTableName)); - Map<String, Integer> cqCounters = baseTable.getEncodedCQCounters(); - int counter = cqCounters.get(DEFAULT_COLUMN_FAMILY); - assertEquals(3, counter); - counter = cqCounters.get("B"); - assertEquals(2, counter); - + EncodedCQCounter encodedCqCounter = baseTable.getEncodedCQCounter(); + assertEquals((Integer)(ENCODED_CQ_COUNTER_INITIAL_VALUE + 3), encodedCqCounter.getValue()); + // assert that the server side metadata is updated correctly. - assertEncodedCQCounter(DEFAULT_COLUMN_FAMILY, schemaName, baseTableName, 3); - assertEncodedCQCounter("B", schemaName, baseTableName, 2); - assertEncodedCQValue(DEFAULT_COLUMN_FAMILY, "COL4", schemaName, baseTableName, 1); - assertEncodedCQValue(DEFAULT_COLUMN_FAMILY, "COL5", schemaName, baseTableName, 2); - assertEncodedCQValue("B", "COL6", schemaName, baseTableName, 1); + assertEncodedCQCounter(DEFAULT_COLUMN_FAMILY, schemaName, baseTableName, ENCODED_CQ_COUNTER_INITIAL_VALUE + 3, true); + assertEncodedCQValue(DEFAULT_COLUMN_FAMILY, "COL4", schemaName, baseTableName, ENCODED_CQ_COUNTER_INITIAL_VALUE); + assertEncodedCQValue(DEFAULT_COLUMN_FAMILY, "COL5", schemaName, baseTableName, ENCODED_CQ_COUNTER_INITIAL_VALUE + 1); + assertEncodedCQValue("B", "COL6", schemaName, baseTableName, ENCODED_CQ_COUNTER_INITIAL_VALUE + 2); assertSequenceNumber(schemaName, baseTableName, initBaseTableSeqNumber + 1); // Create a view @@ -2331,26 +2321,19 @@ public class AlterTableIT extends BaseOwnClusterHBaseManagedTimeIT { // assert that the client cache for the base table is updated baseTable = phxConn.getTable(new PTableKey(phxConn.getTenantId(), fullTableName)); - cqCounters = baseTable.getEncodedCQCounters(); - counter = cqCounters.get(DEFAULT_COLUMN_FAMILY); - assertEquals(5, counter); - counter = cqCounters.get("B"); - assertEquals(3, counter); - counter = cqCounters.get("A"); - assertEquals(3, counter); - + encodedCqCounter = 
baseTable.getEncodedCQCounter(); + assertEquals((Integer)(ENCODED_CQ_COUNTER_INITIAL_VALUE + 8), encodedCqCounter.getValue()); + // assert client cache for view PTable view = phxConn.getTable(new PTableKey(phxConn.getTenantId(), fullViewName)); - cqCounters = view.getEncodedCQCounters(); - assertTrue("A view should always have the column qualifier counters map empty", cqCounters.isEmpty()); - + encodedCqCounter = view.getEncodedCQCounter(); + assertNull("A view should always have the column qualifier counter as null", view.getEncodedCQCounter().getValue()); + // assert that the server side metadata for the base table and the view is also updated correctly. - assertEncodedCQCounter(DEFAULT_COLUMN_FAMILY, schemaName, baseTableName, 5); - assertEncodedCQCounter("A", schemaName, baseTableName, 3); - assertEncodedCQCounter("B", schemaName, baseTableName, 3); - assertEncodedCQValue(DEFAULT_COLUMN_FAMILY, "VIEW_COL3", schemaName, viewName, 4); - assertEncodedCQValue("A", "VIEW_COL4", schemaName, viewName, 2); - assertEncodedCQValue("B", "VIEW_COL5", schemaName, viewName, 2); + assertEncodedCQCounter(DEFAULT_COLUMN_FAMILY, schemaName, baseTableName, ENCODED_CQ_COUNTER_INITIAL_VALUE + 8, true); + assertEncodedCQValue(DEFAULT_COLUMN_FAMILY, "VIEW_COL3", schemaName, viewName, ENCODED_CQ_COUNTER_INITIAL_VALUE + 5); + assertEncodedCQValue("A", "VIEW_COL4", schemaName, viewName, ENCODED_CQ_COUNTER_INITIAL_VALUE + 6); + assertEncodedCQValue("B", "VIEW_COL5", schemaName, viewName, ENCODED_CQ_COUNTER_INITIAL_VALUE + 7); // Adding a column to the should increment the base table's sequence number too since we update the cq counters for column families. 
assertSequenceNumber(schemaName, baseTableName, initBaseTableSeqNumber + 3); assertSequenceNumber(schemaName, viewName, PTable.INITIAL_SEQ_NUM + 1); @@ -2361,25 +2344,18 @@ public class AlterTableIT extends BaseOwnClusterHBaseManagedTimeIT { baseTable = phxConn.getTable(new PTableKey(phxConn.getTenantId(), fullTableName)); // assert that the client cache for the base table is updated - cqCounters = baseTable.getEncodedCQCounters(); - counter = cqCounters.get(DEFAULT_COLUMN_FAMILY); - assertEquals(6, counter); - counter = cqCounters.get("B"); - assertEquals(3, counter); - counter = cqCounters.get("A"); - assertEquals(4, counter); + encodedCqCounter = baseTable.getEncodedCQCounter(); + assertEquals((Integer)(ENCODED_CQ_COUNTER_INITIAL_VALUE + 10), encodedCqCounter.getValue()); // assert client cache for view view = phxConn.getTable(new PTableKey(phxConn.getTenantId(), fullViewName)); - cqCounters = view.getEncodedCQCounters(); - assertTrue("A view should always have the column qualifier counters map empty", cqCounters.isEmpty()); + encodedCqCounter = view.getEncodedCQCounter(); + assertNull("A view should always have the column qualifier counter as null", view.getEncodedCQCounter().getValue()); // assert that the server side metadata for the base table and the view is also updated correctly. 
- assertEncodedCQCounter(DEFAULT_COLUMN_FAMILY, schemaName, baseTableName, 6); - assertEncodedCQCounter("A", schemaName, baseTableName, 4); - assertEncodedCQCounter("B", schemaName, baseTableName, 3); - assertEncodedCQValue(DEFAULT_COLUMN_FAMILY, "COL10", schemaName, viewName, 5); - assertEncodedCQValue("A", "COL11", schemaName, viewName, 3); + assertEncodedCQCounter(DEFAULT_COLUMN_FAMILY, schemaName, baseTableName, (ENCODED_CQ_COUNTER_INITIAL_VALUE + 10), true); + assertEncodedCQValue(DEFAULT_COLUMN_FAMILY, "COL10", schemaName, viewName, (ENCODED_CQ_COUNTER_INITIAL_VALUE + 8)); + assertEncodedCQValue("A", "COL11", schemaName, viewName, ENCODED_CQ_COUNTER_INITIAL_VALUE + 9); assertSequenceNumber(schemaName, baseTableName, initBaseTableSeqNumber + 4); assertSequenceNumber(schemaName, viewName, PTable.INITIAL_SEQ_NUM + 2); } @@ -2401,7 +2377,7 @@ public class AlterTableIT extends BaseOwnClusterHBaseManagedTimeIT { } } - private void assertEncodedCQCounter(String columnFamily, String schemaName, String tableName, int expectedValue) throws Exception { + private void assertEncodedCQCounter(String columnFamily, String schemaName, String tableName, int expectedValue, boolean rowExists) throws Exception { String query = "SELECT " + COLUMN_QUALIFIER_COUNTER + " FROM SYSTEM.CATALOG WHERE " + TABLE_SCHEM + " = ? AND " + TABLE_NAME + " = ? " + " AND " + COLUMN_FAMILY + " = ? 
AND " + COLUMN_QUALIFIER_COUNTER + " IS NOT NULL"; try (Connection conn = DriverManager.getConnection(getUrl())) { @@ -2410,9 +2386,13 @@ public class AlterTableIT extends BaseOwnClusterHBaseManagedTimeIT { stmt.setString(2, tableName); stmt.setString(3, columnFamily); ResultSet rs = stmt.executeQuery(); - assertTrue(rs.next()); - assertEquals(expectedValue, rs.getInt(1)); - assertFalse(rs.next()); + if (rowExists) { + assertTrue(rs.next()); + assertEquals(expectedValue, rs.getInt(1)); + assertFalse(rs.next()); + } else { + assertFalse(rs.next()); + } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/9525c72f/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java index 7458ed9..2f3441f 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java @@ -544,61 +544,7 @@ public class AlterTableWithViewsIT extends BaseHBaseManagedTimeIT { - @Test - public void testAlteringViewThatHasChildViews() throws Exception { - String baseTable = "testAlteringViewThatHasChildViews"; - String childView = "childView"; - String grandChildView = "grandChildView"; - try (Connection conn = DriverManager.getConnection(getUrl()); - Connection viewConn = isMultiTenant ? 
DriverManager.getConnection(TENANT_SPECIFIC_URL1) : conn ) { - String ddlFormat = "CREATE TABLE IF NOT EXISTS " + baseTable + " (" - + " %s PK2 VARCHAR NOT NULL, V1 VARCHAR, V2 VARCHAR " - + " CONSTRAINT NAME_PK PRIMARY KEY (%s PK2)" - + " ) %s"; - conn.createStatement().execute(generateDDL(ddlFormat)); - - String childViewDDL = "CREATE VIEW " + childView + " AS SELECT * FROM " + baseTable; - viewConn.createStatement().execute(childViewDDL); - - String addColumnToChildViewDDL = - "ALTER VIEW " + childView + " ADD CHILD_VIEW_COL VARCHAR"; - viewConn.createStatement().execute(addColumnToChildViewDDL); - - String grandChildViewDDL = - "CREATE VIEW " + grandChildView + " AS SELECT * FROM " + childView; - viewConn.createStatement().execute(grandChildViewDDL); - - // dropping base table column from child view should succeed - String dropColumnFromChildView = "ALTER VIEW " + childView + " DROP COLUMN V2"; - viewConn.createStatement().execute(dropColumnFromChildView); - - // dropping view specific column from child view should succeed - dropColumnFromChildView = "ALTER VIEW " + childView + " DROP COLUMN CHILD_VIEW_COL"; - viewConn.createStatement().execute(dropColumnFromChildView); - - // Adding column to view that has child views is allowed - String addColumnToChildView = "ALTER VIEW " + childView + " ADD V5 VARCHAR"; - viewConn.createStatement().execute(addColumnToChildView); - // V5 column should be visible now for childView - viewConn.createStatement().execute("SELECT V5 FROM " + childView); - - // However, column V5 shouldn't have propagated to grandChildView. Not till PHOENIX-2054 is fixed. - try { - viewConn.createStatement().execute("SELECT V5 FROM " + grandChildView); - } catch (SQLException e) { - assertEquals(SQLExceptionCode.COLUMN_NOT_FOUND.getErrorCode(), e.getErrorCode()); - } - - // dropping column from the grand child view, however, should work. 
- String dropColumnFromGrandChildView = - "ALTER VIEW " + grandChildView + " DROP COLUMN CHILD_VIEW_COL"; - viewConn.createStatement().execute(dropColumnFromGrandChildView); - - // similarly, dropping column inherited from the base table should work. - dropColumnFromGrandChildView = "ALTER VIEW " + grandChildView + " DROP COLUMN V2"; - viewConn.createStatement().execute(dropColumnFromGrandChildView); - } - } + @Test public void testDivergedViewsStayDiverged() throws Exception { @@ -676,4 +622,60 @@ public class AlterTableWithViewsIT extends BaseHBaseManagedTimeIT { } } + @Test + public void testAlteringViewThatHasChildViews() throws Exception { + String baseTable = "testAlteringViewThatHasChildViews"; + String childView = "childView"; + String grandChildView = "grandChildView"; + try (Connection conn = DriverManager.getConnection(getUrl()); + Connection viewConn = isMultiTenant ? DriverManager.getConnection(TENANT_SPECIFIC_URL1) : conn ) { + String ddlFormat = "CREATE TABLE IF NOT EXISTS " + baseTable + " (" + + " %s PK2 VARCHAR NOT NULL, V1 VARCHAR, V2 VARCHAR " + + " CONSTRAINT NAME_PK PRIMARY KEY (%s PK2)" + + " ) %s"; + conn.createStatement().execute(generateDDL(ddlFormat)); + + String childViewDDL = "CREATE VIEW " + childView + " AS SELECT * FROM " + baseTable; + viewConn.createStatement().execute(childViewDDL); + + String addColumnToChildViewDDL = + "ALTER VIEW " + childView + " ADD CHILD_VIEW_COL VARCHAR"; + viewConn.createStatement().execute(addColumnToChildViewDDL); + + String grandChildViewDDL = + "CREATE VIEW " + grandChildView + " AS SELECT * FROM " + childView; + viewConn.createStatement().execute(grandChildViewDDL); + + // dropping base table column from child view should succeed + String dropColumnFromChildView = "ALTER VIEW " + childView + " DROP COLUMN V2"; + viewConn.createStatement().execute(dropColumnFromChildView); + + // dropping view specific column from child view should succeed + dropColumnFromChildView = "ALTER VIEW " + childView + " DROP 
COLUMN CHILD_VIEW_COL"; + viewConn.createStatement().execute(dropColumnFromChildView); + + // Adding column to view that has child views is allowed + String addColumnToChildView = "ALTER VIEW " + childView + " ADD V5 VARCHAR"; + viewConn.createStatement().execute(addColumnToChildView); + // V5 column should be visible now for childView + viewConn.createStatement().execute("SELECT V5 FROM " + childView); + + // However, column V5 shouldn't have propagated to grandChildView. Not till PHOENIX-2054 is fixed. + try { + viewConn.createStatement().execute("SELECT V5 FROM " + grandChildView); + } catch (SQLException e) { + assertEquals(SQLExceptionCode.COLUMN_NOT_FOUND.getErrorCode(), e.getErrorCode()); + } + + // dropping column from the grand child view, however, should work. + String dropColumnFromGrandChildView = + "ALTER VIEW " + grandChildView + " DROP COLUMN CHILD_VIEW_COL"; + viewConn.createStatement().execute(dropColumnFromGrandChildView); + + // similarly, dropping column inherited from the base table should work. 
+ dropColumnFromGrandChildView = "ALTER VIEW " + grandChildView + " DROP COLUMN V2"; + viewConn.createStatement().execute(dropColumnFromGrandChildView); + } + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/9525c72f/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java index dd64ea2..feccb8f 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java @@ -20,16 +20,17 @@ package org.apache.phoenix.end2end; import static org.apache.hadoop.hbase.HColumnDescriptor.DEFAULT_REPLICATION_SCOPE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODED_COLUMN_QUALIFIER; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODED_COLUMN_QUALIFIER; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM; import static org.apache.phoenix.query.QueryConstants.DEFAULT_COLUMN_FAMILY; +import static org.apache.phoenix.query.QueryConstants.ENCODED_CQ_COUNTER_INITIAL_VALUE; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -40,7 +41,6 @@ import 
java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.List; -import java.util.Map; import java.util.Properties; import org.apache.hadoop.hbase.HColumnDescriptor; @@ -50,11 +50,13 @@ import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.query.KeyRange; +import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.NewerTableAlreadyExistsException; -import org.apache.phoenix.schema.SchemaNotFoundException; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTable.EncodedCQCounter; import org.apache.phoenix.schema.PTableKey; +import org.apache.phoenix.schema.SchemaNotFoundException; import org.apache.phoenix.schema.TableAlreadyExistsException; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; @@ -535,13 +537,11 @@ public class CreateTableIT extends BaseClientManagedTimeIT { long initialSequenceNumber = table.getSequenceNumber(); // assert that the client side cache is updated. - Map<String, Integer> cqCounters = table.getEncodedCQCounters(); - assertEquals(1, cqCounters.size()); - int counter = cqCounters.get(DEFAULT_COLUMN_FAMILY); - assertEquals(1, counter); - + EncodedCQCounter cqCounter = table.getEncodedCQCounter(); + assertEquals((Integer)ENCODED_CQ_COUNTER_INITIAL_VALUE, cqCounter.getValue()); + // assert that the server side metadata is updated correctly. 
- assertColumnFamilyCounter(DEFAULT_COLUMN_FAMILY, schemaName, tableName, 1); + assertColumnFamilyCounter(DEFAULT_COLUMN_FAMILY, schemaName, tableName, ENCODED_CQ_COUNTER_INITIAL_VALUE, true); assertSequenceNumber(schemaName, tableName, initialSequenceNumber); // now add a column and validate client and server side metadata @@ -549,18 +549,15 @@ public class CreateTableIT extends BaseClientManagedTimeIT { table = phxConn.getTable(new PTableKey(phxConn.getTenantId(), fullTableName)); // verify that the client side cache is updated. - cqCounters = table.getEncodedCQCounters(); - counter = cqCounters.get(DEFAULT_COLUMN_FAMILY); - assertEquals(3, counter); - counter = cqCounters.get("A"); - assertEquals(2, counter); + cqCounter = table.getEncodedCQCounter(); + assertEquals((Integer)14, cqCounter.getValue()); + // assert that the server side metadata is also updated correctly. - assertColumnFamilyCounter(DEFAULT_COLUMN_FAMILY, schemaName, tableName, 3); - assertColumnFamilyCounter("A", schemaName, tableName, 2); - assertColumnQualifier(DEFAULT_COLUMN_FAMILY, "COL4", schemaName, tableName, 1); - assertColumnQualifier(DEFAULT_COLUMN_FAMILY, "COL6", schemaName, tableName, 2); - assertColumnQualifier("A", "COL5", schemaName, tableName, 1); + assertColumnFamilyCounter(DEFAULT_COLUMN_FAMILY, schemaName, tableName, ENCODED_CQ_COUNTER_INITIAL_VALUE + 3, true); + assertColumnQualifier(DEFAULT_COLUMN_FAMILY, "COL4", schemaName, tableName, ENCODED_CQ_COUNTER_INITIAL_VALUE); + assertColumnQualifier(DEFAULT_COLUMN_FAMILY, "COL6", schemaName, tableName, ENCODED_CQ_COUNTER_INITIAL_VALUE + 1); + assertColumnQualifier("A", "COL5", schemaName, tableName, ENCODED_CQ_COUNTER_INITIAL_VALUE + 2); assertSequenceNumber(schemaName, tableName, initialSequenceNumber + 1); } } @@ -581,7 +578,7 @@ public class CreateTableIT extends BaseClientManagedTimeIT { } } - private void assertColumnFamilyCounter(String columnFamily, String schemaName, String tableName, int expectedValue) throws Exception { + 
private void assertColumnFamilyCounter(String columnFamily, String schemaName, String tableName, int expectedValue, boolean rowExists) throws Exception { String query = "SELECT " + COLUMN_QUALIFIER_COUNTER + " FROM SYSTEM.CATALOG WHERE " + TABLE_SCHEM + " = ? AND " + TABLE_NAME + " = ? " + " AND " + COLUMN_FAMILY + " = ? AND " + COLUMN_QUALIFIER_COUNTER + " IS NOT NULL"; try (Connection conn = DriverManager.getConnection(getUrl())) { @@ -590,9 +587,11 @@ public class CreateTableIT extends BaseClientManagedTimeIT { stmt.setString(2, tableName); stmt.setString(3, columnFamily); ResultSet rs = stmt.executeQuery(); - assertTrue(rs.next()); - assertEquals(expectedValue, rs.getInt(1)); - assertFalse(rs.next()); + assertEquals(rowExists, rs.next()); + if (rowExists) { + assertEquals(expectedValue, rs.getInt(1)); + assertFalse(rs.next()); + } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/9525c72f/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByIT.java index 51ab070..8eace13 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/GroupByIT.java @@ -33,7 +33,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.sql.Connection; -import java.sql.Date; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -47,8 +46,6 @@ import java.util.Properties; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; -import org.apache.phoenix.util.ReadOnlyProps; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9525c72f/phoenix-core/src/it/java/org/apache/phoenix/end2end/RowValueConstructorIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/RowValueConstructorIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RowValueConstructorIT.java index be8ec59..6d3749f 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/RowValueConstructorIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RowValueConstructorIT.java @@ -105,7 +105,7 @@ public class RowValueConstructorIT extends BaseClientManagedTimeIT { count++; } // we have 6 values for a_integer present in the atable where a >= 4. x_integer is null for a_integer = 4. So the query should have returned 5 rows. - assertTrue(count == 5); + assertEquals(5, count); } finally { conn.close(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/9525c72f/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java index 3fec718..ead3cc0 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java @@ -958,8 +958,8 @@ public class UpsertValuesIT extends BaseClientManagedTimeIT { ResultScanner scanner = table.getScanner(new Scan()); Result next = scanner.next(); assertTrue(next.containsColumn(Bytes.toBytes("CF1"), PInteger.INSTANCE.toBytes(1))); - assertTrue(next.containsColumn(Bytes.toBytes("CF2"), PInteger.INSTANCE.toBytes(1))); assertTrue(next.containsColumn(Bytes.toBytes("CF2"), PInteger.INSTANCE.toBytes(2))); + assertTrue(next.containsColumn(Bytes.toBytes("CF2"), PInteger.INSTANCE.toBytes(3))); } } 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9525c72f/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java index eec8c8a..ccd9a03 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java @@ -71,6 +71,7 @@ import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.schema.PTable.StorageScheme; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.PTableType; @@ -340,6 +341,7 @@ public class FromCompiler { if (connection.getSchema() != null) { schema = schema != null ? schema : connection.getSchema(); } + //TODO: samarth should we change the ptableimpl constructor here to set non-encoded column name scheme and null counter PTable theTable = new PTableImpl(connection.getTenantId(), schema, table.getName().getTableName(), scn == null ? 
HConstants.LATEST_TIMESTAMP : scn, families); theTable = this.addDynamicColumns(table.getDynamicColumns(), theTable); @@ -781,7 +783,7 @@ public class FromCompiler { MetaDataProtocol.MIN_TABLE_TIMESTAMP, PTable.INITIAL_SEQ_NUM, null, null, columns, null, null, Collections.<PTable> emptyList(), false, Collections.<PName> emptyList(), null, null, false, false, false, null, null, null, false, false, 0, 0L, SchemaUtil - .isNamespaceMappingEnabled(PTableType.SUBQUERY, connection.getQueryServices().getProps()), null, null); + .isNamespaceMappingEnabled(PTableType.SUBQUERY, connection.getQueryServices().getProps()), StorageScheme.NON_ENCODED_COLUMN_NAMES, PTable.EncodedCQCounter.NULL_COUNTER); String alias = subselectNode.getAlias(); TableRef tableRef = new TableRef(alias, t, MetaDataProtocol.MIN_TABLE_TIMESTAMP, false); http://git-wip-us.apache.org/repos/asf/phoenix/blob/9525c72f/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java index 69b9bfb..a37d071 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java @@ -1308,7 +1308,7 @@ public class JoinCompiler { left.getBucketNum(), merged,left.getParentSchemaName(), left.getParentTableName(), left.getIndexes(), left.isImmutableRows(), Collections.<PName>emptyList(), null, null, PTable.DEFAULT_DISABLE_WAL, left.isMultiTenant(), left.getStoreNulls(), left.getViewType(), left.getViewIndexId(), left.getIndexType(), - left.rowKeyOrderOptimizable(), left.isTransactional(), left.getUpdateCacheFrequency(), left.getIndexDisableTimestamp(), left.isNamespaceMapped(), StorageScheme.NON_ENCODED_COLUMN_NAMES, null); + left.rowKeyOrderOptimizable(), left.isTransactional(), 
left.getUpdateCacheFrequency(), left.getIndexDisableTimestamp(), left.isNamespaceMapped(), StorageScheme.NON_ENCODED_COLUMN_NAMES, PTable.EncodedCQCounter.NULL_COUNTER); //FIXME: samarth } http://git-wip-us.apache.org/repos/asf/phoenix/blob/9525c72f/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java index cceef9a..520f9e3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java @@ -161,6 +161,7 @@ public class ListJarsQueryPlan implements QueryPlan { Type.Put.getCode(), HConstants.EMPTY_BYTE_ARRAY); List<Cell> cells = new ArrayList<Cell>(1); cells.add(cell); + //TODO: samarth confirm if passing false is the right thing to do here. 
return new ResultTuple(Result.create(cells)); } catch (IOException e) { throw new SQLException(e); http://git-wip-us.apache.org/repos/asf/phoenix/blob/9525c72f/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java index f9e7f44..75eb66f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TupleProjectionCompiler.java @@ -17,6 +17,8 @@ */ package org.apache.phoenix.compile; +import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY; + import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; @@ -24,7 +26,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.parse.AliasedNode; import org.apache.phoenix.parse.ColumnParseNode; import org.apache.phoenix.parse.FamilyWildcardParseNode; @@ -43,7 +44,9 @@ import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTable.EncodedCQCounter; import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.schema.PTable.StorageScheme; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.ProjectedColumn; @@ -154,7 +157,7 @@ public class TupleProjectionCompiler { table.getParentName(), table.getIndexes(), table.isImmutableRows(), Collections.<PName> emptyList(), null, null, table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), - 
table.getIndexType(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getStorageScheme(), table.getEncodedCQCounters()); + table.getIndexType(), table.rowKeyOrderOptimizable(), table.isTransactional(), table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getStorageScheme(), table.getEncodedCQCounter()); } public static PTable createProjectedTable(TableRef tableRef, List<ColumnRef> sourceColumnRefs, boolean retainPKColumns) throws SQLException { @@ -162,6 +165,8 @@ public class TupleProjectionCompiler { boolean hasSaltingColumn = retainPKColumns && table.getBucketNum() != null; List<PColumn> projectedColumns = new ArrayList<PColumn>(); int position = hasSaltingColumn ? 1 : 0; + StorageScheme storageScheme = StorageScheme.NON_ENCODED_COLUMN_NAMES; + Integer counter = null; for (int i = position; i < sourceColumnRefs.size(); i++) { ColumnRef sourceColumnRef = sourceColumnRefs.get(i); PColumn sourceColumn = sourceColumnRef.getColumn(); @@ -172,17 +177,29 @@ public class TupleProjectionCompiler { PColumn column = new ProjectedColumn(PNameFactory.newName(aliasedName), retainPKColumns && SchemaUtil.isPKColumn(sourceColumn) ? 
- null : PNameFactory.newName(TupleProjector.VALUE_COLUMN_FAMILY), + null : PNameFactory.newName(VALUE_COLUMN_FAMILY), position++, sourceColumn.isNullable(), sourceColumnRef); + if (EncodedColumnsUtil.hasEncodedColumnName(sourceColumn)) { + if (counter == null) { + counter = 1; + } else { + counter++; + } + } projectedColumns.add(column); } + EncodedCQCounter cqCounter = PTable.EncodedCQCounter.NULL_COUNTER; + if (counter != null) { + cqCounter = new EncodedCQCounter(counter); //TODO: samarth I am not sure whether the exact count matters here or not + storageScheme = StorageScheme.ENCODED_COLUMN_NAMES; + } return PTableImpl.makePTable(table.getTenantId(), PROJECTED_TABLE_SCHEMA, table.getName(), PTableType.PROJECTED, null, table.getTimeStamp(), table.getSequenceNumber(), table.getPKName(), retainPKColumns ? table.getBucketNum() : null, projectedColumns, null, null, Collections.<PTable> emptyList(), table.isImmutableRows(), Collections.<PName> emptyList(), null, null, table.isWALDisabled(), table.isMultiTenant(), table.getStoreNulls(), table.getViewType(), table.getViewIndexId(), null, table.rowKeyOrderOptimizable(), table.isTransactional(), - table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), table.getStorageScheme(), table.getEncodedCQCounters()); + table.getUpdateCacheFrequency(), table.getIndexDisableTimestamp(), table.isNamespaceMapped(), storageScheme, cqCounter); } // For extracting column references from single select statement http://git-wip-us.apache.org/repos/asf/phoenix/blob/9525c72f/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java index 9c89817..6376d60 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java +++ 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/UnionCompiler.java @@ -35,6 +35,7 @@ import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.PTable.StorageScheme; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.util.SchemaUtil; @@ -81,12 +82,13 @@ public class UnionCompiler { projectedColumns.add(projectedColumn); } Long scn = statement.getConnection().getSCN(); + //TODO: samarth this is likely just an in memory reference for compilation purposes. Probably ok to pass non-encoded scheme and null counter. PTable tempTable = PTableImpl.makePTable(statement.getConnection().getTenantId(), UNION_SCHEMA_NAME, UNION_TABLE_NAME, PTableType.SUBQUERY, null, HConstants.LATEST_TIMESTAMP, scn == null ? HConstants.LATEST_TIMESTAMP : scn, null, null, projectedColumns, null, null, null, true, null, null, null, true, true, true, null, null, null, false, false, 0, 0L, SchemaUtil.isNamespaceMappingEnabled(PTableType.SUBQUERY, - statement.getConnection().getQueryServices().getProps()), null, null); + statement.getConnection().getQueryServices().getProps()), StorageScheme.NON_ENCODED_COLUMN_NAMES, PTable.EncodedCQCounter.NULL_COUNTER); TableRef tableRef = new TableRef(null, tempTable, 0, false); return tableRef; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/9525c72f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index 18a2057..4b0454c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -49,6 +49,8 @@ import org.apache.phoenix.schema.KeyValueSchema; import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; import org.apache.phoenix.schema.ValueBitSet; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.PositionBasedMultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.PositionBasedResultTuple; import org.apache.phoenix.schema.tuple.ResultTuple; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.IndexUtil; @@ -246,14 +248,14 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { * @param indexMaintainer * @param viewConstants */ - protected RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c, + RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c, final RegionScanner s, final int offset, final Scan scan, final ColumnReference[] dataColumns, final TupleProjector tupleProjector, final HRegion dataRegion, final IndexMaintainer indexMaintainer, final byte[][] viewConstants, final TupleProjector projector, - final ImmutableBytesWritable ptr) { + final ImmutableBytesWritable ptr, final boolean useQualiferAsListIndex) { return getWrappedScanner(c, s, null, null, offset, scan, dataColumns, tupleProjector, - dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr); + dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr, useQualiferAsListIndex); } /** @@ -271,7 +273,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { * @param tx current transaction * @param viewConstants */ - protected RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c, + RegionScanner getWrappedScanner(final ObserverContext<RegionCoprocessorEnvironment> c, final RegionScanner s, final 
Set<KeyValueColumnExpression> arrayKVRefs, final Expression[] arrayFuncRefs, final int offset, final Scan scan, final ColumnReference[] dataColumns, final TupleProjector tupleProjector, @@ -279,7 +281,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { Transaction tx, final byte[][] viewConstants, final KeyValueSchema kvSchema, final ValueBitSet kvSchemaBitSet, final TupleProjector projector, - final ImmutableBytesWritable ptr) { + final ImmutableBytesWritable ptr, final boolean useQualifierAsListIndex) { return new RegionScanner() { @Override @@ -344,11 +346,14 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr); } if (projector != null) { - Tuple tuple = projector.projectResults(new ResultTuple(Result.create(result))); + // TODO: samarth think if this is the right thing to do here. + Tuple toProject = useQualifierAsListIndex ? new PositionBasedResultTuple(result) : new ResultTuple(Result.create(result)); + Tuple tuple = projector.projectResults(toProject); result.clear(); result.add(tuple.getValue(0)); - if(arrayElementCell != null) + if (arrayElementCell != null) { result.add(arrayElementCell); + } } // There is a scanattribute set to retrieve the specific array element return next; @@ -375,7 +380,8 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr); } if (projector != null) { - Tuple tuple = projector.projectResults(new ResultTuple(Result.create(result))); + Tuple toProject = useQualifierAsListIndex ? 
new PositionBasedMultiKeyValueTuple(result) : new ResultTuple(Result.create(result)); + Tuple tuple = projector.projectResults(toProject); result.clear(); result.add(tuple.getValue(0)); if(arrayElementCell != null) http://git-wip-us.apache.org/repos/asf/phoenix/blob/9525c72f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java index f88a931..089c4fe 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java @@ -56,22 +56,27 @@ public class DelegateRegionScanner implements RegionScanner { delegate.close(); } + @Override public long getMaxResultSize() { return delegate.getMaxResultSize(); } + @Override public boolean next(List<Cell> arg0, int arg1) throws IOException { return delegate.next(arg0, arg1); } + @Override public boolean next(List<Cell> arg0) throws IOException { return delegate.next(arg0); } + @Override public boolean nextRaw(List<Cell> arg0, int arg1) throws IOException { return delegate.nextRaw(arg0, arg1); } + @Override public boolean nextRaw(List<Cell> arg0) throws IOException { return delegate.nextRaw(arg0); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/9525c72f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java index 52a25d3..39a4ab8 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java @@ -138,6 +138,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan); final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan); + boolean useQualifierAsIndex = ScanUtil.useQualifierAsIndex(ScanUtil.getMinMaxQualifiersFromScan(scan), j != null); if (ScanUtil.isLocalIndex(scan) || (j == null && p != null)) { if (dataColumns != null) { tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns); @@ -147,7 +148,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { ImmutableBytesWritable tempPtr = new ImmutableBytesWritable(); innerScanner = getWrappedScanner(c, innerScanner, offset, scan, dataColumns, tupleProjector, - dataRegion, indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr); + dataRegion, indexMaintainers == null ? 
null : indexMaintainers.get(0), viewConstants, p, tempPtr, useQualifierAsIndex); } if (j != null) { @@ -163,9 +164,9 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { } if (keyOrdered) { // Optimize by taking advantage that the rows are // already in the required group by key order - return scanOrdered(c, scan, innerScanner, expressions, aggregators, limit); + return scanOrdered(c, scan, innerScanner, expressions, aggregators, limit, j != null); } else { // Otherwse, collect them all up in an in memory map - return scanUnordered(c, scan, innerScanner, expressions, aggregators, limit); + return scanUnordered(c, scan, innerScanner, expressions, aggregators, limit, j != null); } } @@ -371,7 +372,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { */ private RegionScanner scanUnordered(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan, final RegionScanner scanner, final List<Expression> expressions, - final ServerAggregators aggregators, long limit) throws IOException { + final ServerAggregators aggregators, long limit, boolean isJoin) throws IOException { if (logger.isDebugEnabled()) { logger.debug(LogUtil.addCustomAnnotations("Grouped aggregation over unordered rows with scan " + scan + ", group by " + expressions + ", aggregators " + aggregators, ScanUtil.getCustomAnnotations(scan))); @@ -386,7 +387,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { (int) (Bytes.toInt(estDistValsBytes) * 1.5f)); } Pair<Integer, Integer> minMaxQualifiers = getMinMaxQualifiersFromScan(scan); - boolean useEncodedScheme = minMaxQualifiers != null; + boolean useQualifierAsIndex = ScanUtil.useQualifierAsIndex(ScanUtil.getMinMaxQualifiersFromScan(scan), isJoin); final boolean spillableEnabled = conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE); @@ -397,7 +398,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { boolean success = 
false; try { boolean hasMore; - Tuple result = useEncodedScheme ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple(); + Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple(); if (logger.isDebugEnabled()) { logger.debug(LogUtil.addCustomAnnotations("Spillable groupby enabled: " + spillableEnabled, ScanUtil.getCustomAnnotations(scan))); } @@ -406,7 +407,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { try { synchronized (scanner) { do { - List<Cell> results = useEncodedScheme ? new BoundedSkipNullCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond()) : new ArrayList<Cell>(); + List<Cell> results = useQualifierAsIndex ? new BoundedSkipNullCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond()) : new ArrayList<Cell>(); // Results are potentially returned even when the return // value of s.next is false // since this is an indication of whether or not there are @@ -450,14 +451,14 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { */ private RegionScanner scanOrdered(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner scanner, final List<Expression> expressions, - final ServerAggregators aggregators, final long limit) throws IOException { + final ServerAggregators aggregators, final long limit, final boolean isJoin) throws IOException { if (logger.isDebugEnabled()) { logger.debug(LogUtil.addCustomAnnotations("Grouped aggregation over ordered rows with scan " + scan + ", group by " + expressions + ", aggregators " + aggregators, ScanUtil.getCustomAnnotations(scan))); } final Pair<Integer, Integer> minMaxQualifiers = getMinMaxQualifiersFromScan(scan); - final boolean useEncodedScheme = minMaxQualifiers != null; + final boolean useQualifierAsIndex = ScanUtil.useQualifierAsIndex(ScanUtil.getMinMaxQualifiersFromScan(scan), isJoin); return new BaseRegionScanner(scanner) { private 
long rowCount = 0; private ImmutableBytesWritable currentKey = null; @@ -467,7 +468,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { boolean hasMore; boolean atLimit; boolean aggBoundary = false; - Tuple result = useEncodedScheme ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple(); + Tuple result = useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple(); ImmutableBytesWritable key = null; Aggregator[] rowAggregators = aggregators.getAggregators(); // If we're calculating no aggregate functions, we can exit at the @@ -478,7 +479,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { try { synchronized (scanner) { do { - List<Cell> kvs = useEncodedScheme ? new BoundedSkipNullCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond()) : new ArrayList<Cell>(); + List<Cell> kvs = useQualifierAsIndex ? new BoundedSkipNullCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond()) : new ArrayList<Cell>(); // Results are potentially returned even when the return // value of s.next is false // since this is an indication of whether or not there @@ -516,6 +517,9 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { KeyValueUtil.newKeyValue(currentKey.get(), currentKey.getOffset(), currentKey.getLength(), SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length); + //TODO: samarth aaha how do we handle this. It looks like we are adding stuff like this to the results + // that we are returning. Bounded skip null cell list won't handle this properly. Interesting. So how do we + // handle this. Does having a reserved set of column qualifiers help here? 
results.add(keyValue); if (logger.isDebugEnabled()) { logger.debug(LogUtil.addCustomAnnotations("Adding new aggregate row: " http://git-wip-us.apache.org/repos/asf/phoenix/blob/9525c72f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java index 2650225..8c2c3d6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java @@ -110,7 +110,7 @@ public class HashJoinRegionScanner implements RegionScanner { private void processResults(List<Cell> result, boolean hasBatchLimit) throws IOException { if (result.isEmpty()) return; - + //TODO: samarth make joins work with position based lookup. Tuple tuple = new ResultTuple(Result.create(result)); // For backward compatibility. In new versions, HashJoinInfo.forceProjection() // always returns true. 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9525c72f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index 58a637a..3b57097 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -25,6 +25,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_COUNT_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_DEF_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME_INDEX; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TABLE_NAME_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE_BYTES; @@ -33,7 +34,6 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAM import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_VALUE_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DISABLE_WAL_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ENCODED_COLUMN_QUALIFIER_BYTES; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_QUALIFIER_COUNTER_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.FAMILY_NAME_INDEX; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_ROWS_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES; 
@@ -85,11 +85,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.Set; + import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.Coprocessor; @@ -184,6 +182,7 @@ import org.apache.phoenix.schema.PMetaDataEntity; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTable.EncodedCQCounter; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTable.LinkType; import org.apache.phoenix.schema.PTable.StorageScheme; @@ -925,7 +924,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso List<PColumn> columns = Lists.newArrayListWithExpectedSize(columnCount); List<PTable> indexes = new ArrayList<PTable>(); List<PName> physicalTables = new ArrayList<PName>(); - Map<String, Integer> encodedColumnQualifierCounters = new HashMap<>(); //TODO: samarth size properly. 
+ int counter = 0; while (true) { results.clear(); scanner.next(results); @@ -940,9 +939,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso PName famName = newPName(colKv.getRowArray(), colKv.getRowOffset() + colKeyOffset, colKeyLength-colKeyOffset); if (colName.getString().isEmpty() && famName != null) { if (isQualifierCounterKv(colKv)) { - Integer counter = (Integer)PInteger.INSTANCE.toObject(colKv.getValueArray(), - colKv.getValueOffset(), colKv.getValueLength()); - encodedColumnQualifierCounters.put(famName.getString(), counter); + counter = PInteger.INSTANCE.getCodec().decodeInt(colKv.getValueArray(), + colKv.getValueOffset(), SortOrder.ASC); } else { LinkType linkType = LinkType.fromSerializedValue(colKv.getValueArray()[colKv.getValueOffset()]); if (linkType == LinkType.INDEX_TABLE) { @@ -956,6 +954,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso } } } + EncodedCQCounter cqCounter = (storageScheme == StorageScheme.NON_ENCODED_COLUMN_NAMES || tableType == PTableType.VIEW) ? PTable.EncodedCQCounter.NULL_COUNTER : new EncodedCQCounter(counter); PName physicalTableName = physicalTables.isEmpty() ? PNameFactory.newName(SchemaUtil.getPhysicalTableName( Bytes.toBytes(SchemaUtil.getTableName(schemaName.getBytes(), tableName.getBytes())), isNamespaceMapped) .getNameAsString()) : physicalTables.get(0); @@ -980,7 +979,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso tableType == INDEX ? 
dataTableName : null, indexes, isImmutableRows, physicalTables, defaultFamilyName, viewStatement, disableWAL, multiTenant, storeNulls, viewType, viewIndexId, indexType, rowKeyOrderOptimizable, transactional, updateCacheFrequency, stats, baseColumnCount, - indexDisableTimestamp, isNamespaceMapped, storageScheme, encodedColumnQualifierCounters); + indexDisableTimestamp, isNamespaceMapped, storageScheme, cqCounter); } private PSchema getSchema(RegionScanner scanner, long clientTimeStamp) throws IOException, SQLException { http://git-wip-us.apache.org/repos/asf/phoenix/blob/9525c72f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java index 72f6d09..61b98d4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java @@ -27,6 +27,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; +import co.cask.tephra.Transaction; + import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Result; @@ -36,7 +38,6 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionScanner; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.cache.GlobalCache; import org.apache.phoenix.cache.TenantCache; @@ -68,8 +69,6 @@ import org.apache.phoenix.util.ServerUtil; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import co.cask.tephra.Transaction; - /** * 
@@ -108,7 +107,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { } } - public static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s) { + private static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s, boolean isJoin) { byte[] topN = scan.getAttribute(BaseScannerRegionObserver.TOPN); if (topN == null) { return null; @@ -126,7 +125,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { orderByExpression.readFields(input); orderByExpressions.add(orderByExpression); } - ResultIterator inner = new RegionScannerResultIterator(s); + ResultIterator inner = new RegionScannerResultIterator(s, ScanUtil.getMinMaxQualifiersFromScan(scan), isJoin); return new OrderedResultIterator(inner, orderByExpressions, thresholdBytes, limit >= 0 ? limit : null, null, estimatedRowSize); } catch (IOException e) { @@ -219,10 +218,12 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan); final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan); + //TODO: samarth get rid of this join special-casing. Joins should support position-based lookup. + boolean useQualifierAsIndex = ScanUtil.useQualifierAsIndex(ScanUtil.getMinMaxQualifiersFromScan(scan), j != null) && scan.getAttribute(BaseScannerRegionObserver.TOPN) != null; innerScanner = getWrappedScanner(c, innerScanner, arrayKVRefs, arrayFuncRefs, offset, scan, dataColumns, tupleProjector, dataRegion, indexMaintainer, tx, - viewConstants, kvSchema, kvSchemaBitSet, j == null ? p : null, ptr); + viewConstants, kvSchema, kvSchemaBitSet, j == null ? 
p : null, ptr, useQualifierAsIndex); final ImmutableBytesWritable tenantId = ScanUtil.getTenantId(scan); if (j != null) { @@ -230,10 +231,10 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { } if (scanOffset != null) { innerScanner = getOffsetScanner(c, innerScanner, - new OffsetResultIterator(new RegionScannerResultIterator(innerScanner), scanOffset), + new OffsetResultIterator(new RegionScannerResultIterator(innerScanner, ScanUtil.getMinMaxQualifiersFromScan(scan), j != null), scanOffset), scan.getAttribute(QueryConstants.LAST_SCAN) != null); } - final OrderedResultIterator iterator = deserializeFromScan(scan,innerScanner); + final OrderedResultIterator iterator = deserializeFromScan(scan, innerScanner, j != null); if (iterator == null) { return innerScanner; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/9525c72f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 7c3bd28..b412b88 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -39,6 +39,8 @@ import java.util.List; import java.util.Set; import java.util.concurrent.Callable; +import co.cask.tephra.TxConstants; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CoprocessorEnvironment; @@ -118,8 +120,6 @@ import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import co.cask.tephra.TxConstants; - /** * Region observer that aggregates ungrouped rows(i.e. 
SQL query with aggregation function and no GROUP BY). @@ -259,6 +259,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver boolean localIndexScan = ScanUtil.isLocalIndex(scan); final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan); final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan); + boolean useQualifierAsIndex = ScanUtil.useQualifierAsIndex(ScanUtil.getMinMaxQualifiersFromScan(scan), j != null) && scan.getAttribute(BaseScannerRegionObserver.TOPN) != null; if ((localIndexScan && !isDelete && !isDescRowKeyOrderUpgrade) || (j == null && p != null)) { if (dataColumns != null) { tupleProjector = IndexUtil.getTupleProjector(scan, dataColumns); @@ -268,7 +269,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver ImmutableBytesWritable tempPtr = new ImmutableBytesWritable(); theScanner = getWrappedScanner(c, theScanner, offset, scan, dataColumns, tupleProjector, - dataRegion, indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr); + dataRegion, indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr, useQualifierAsIndex); } if (j != null) { @@ -289,8 +290,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver boolean hasMore; boolean hasAny = false; Pair<Integer, Integer> minMaxQualifiers = getMinMaxQualifiersFromScan(scan); - boolean useEncodedScheme = minMaxQualifiers != null; - Tuple result = useEncodedScheme ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple(); + Tuple result = useQualifierAsIndex ? 
new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple(); if (logger.isDebugEnabled()) { logger.debug(LogUtil.addCustomAnnotations("Starting ungrouped coprocessor scan " + scan + " "+region.getRegionInfo(), ScanUtil.getCustomAnnotations(scan))); } @@ -300,7 +300,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver try { synchronized (innerScanner) { do { - List<Cell> results = useEncodedScheme ? new BoundedSkipNullCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond()) : new ArrayList<Cell>(); + List<Cell> results = useQualifierAsIndex ? new BoundedSkipNullCellsList(minMaxQualifiers.getFirst(), minMaxQualifiers.getSecond()) : new ArrayList<Cell>(); // Results are potentially returned even when the return value of s.next is false // since this is an indication of whether or not there are more values after the // ones returned
