This is an automated email from the ASF dual-hosted git repository. gokcen pushed a commit to branch 4.x in repository https://gitbox.apache.org/repos/asf/phoenix.git
commit 67268793412789e3806664f90845e074b1f21a36 Author: Gokcen Iskender <gisken...@salesforce.com> AuthorDate: Wed Nov 25 21:12:46 2020 -0800 PHOENIX-6219 GlobalIndexChecker doesn't work for SingleCell indexes Signed-off-by: Gokcen Iskender <gisken...@salesforce.com> --- .../end2end/ConcurrentMutationsExtendedIT.java | 3 + .../apache/phoenix/end2end/IndexExtendedIT.java | 33 +++++-- .../end2end/IndexRepairRegionScannerIT.java | 20 +++- .../phoenix/end2end/IndexScrutinyToolIT.java | 8 ++ .../org/apache/phoenix/end2end/IndexToolIT.java | 4 + .../end2end/index/GlobalIndexCheckerIT.java | 40 ++++---- .../phoenix/end2end/index/SingleCellIndexIT.java | 108 ++++++++++++++++++++- .../apache/phoenix/index/GlobalIndexChecker.java | 4 +- .../org/apache/phoenix/index/IndexMaintainer.java | 54 ++++++++--- .../java/org/apache/phoenix/util/IndexUtil.java | 4 +- 10 files changed, 228 insertions(+), 50 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java index 52ba058..f39520d 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java @@ -209,11 +209,13 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT { public void testConcurrentDeletesAndUpsertValues() throws Exception { final String tableName = generateUniqueName(); final String indexName = generateUniqueName(); + final String singleCellindexName = "SC_" + generateUniqueName(); Connection conn = DriverManager.getConnection(getUrl()); conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, CONSTRAINT pk PRIMARY KEY (k1,k2))"); TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class); conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(v1)"); + conn.createStatement().execute("CREATE INDEX " + singleCellindexName + " ON " + tableName + "(v1) IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2"); final CountDownLatch doneSignal = new CountDownLatch(2); Runnable r1 = new Runnable() { @@ -264,6 +266,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT { doneSignal.await(60, TimeUnit.SECONDS); verifyIndexTable(tableName, indexName, conn); + verifyIndexTable(tableName, singleCellindexName, conn); } @Test diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexExtendedIT.java index 8821fd6..a99e49c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexExtendedIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexExtendedIT.java @@ -59,6 +59,8 @@ import org.junit.runners.Parameterized.Parameters; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Tests for the {@link IndexTool} @@ -69,19 +71,30 @@ public class IndexExtendedIT extends BaseTest { private final boolean localIndex; private final boolean useViewIndex; private final String tableDDLOptions; + private final String indexDDLOptions; private final boolean mutable; private final boolean useSnapshot; - + public IndexExtendedIT( boolean mutable, boolean localIndex, boolean useViewIndex, boolean useSnapshot) { this.localIndex = localIndex; this.useViewIndex = useViewIndex; this.mutable = mutable; this.useSnapshot = useSnapshot; StringBuilder optionBuilder = new StringBuilder(); + StringBuilder indexOptionBuilder = new StringBuilder(); if (!mutable) { optionBuilder.append(" IMMUTABLE_ROWS=true "); } + + if (!localIndex) { + if (!(optionBuilder.length() == 0)) { + optionBuilder.append(","); + } + optionBuilder.append(" IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, COLUMN_ENCODED_BYTES=0 "); + indexOptionBuilder.append(" IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS,COLUMN_ENCODED_BYTES=2 "); + } optionBuilder.append(" SPLIT ON(1,2)"); + this.indexDDLOptions = indexOptionBuilder.toString(); this.tableDDLOptions = optionBuilder.toString(); } @@ -105,7 +118,7 @@ public class IndexExtendedIT extends BaseTest { for (boolean localIndex : Booleans) { for (boolean useViewIndex : Booleans) { for (boolean useSnapshot : Booleans) { - list.add(new Boolean[] { mutable, localIndex, useViewIndex, useSnapshot }); + list.add(new Boolean[] { mutable, localIndex, useViewIndex, useSnapshot}); } } } @@ -142,7 +155,7 @@ public class IndexExtendedIT extends BaseTest { IndexToolIT.upsertRow(stmt1, id++); conn.commit(); - stmt.execute(String.format("CREATE " + (localIndex ? "LOCAL" : "") + " INDEX %s ON %s (UPPER(NAME, 'en_US')) ASYNC ", indexTableName,dataTableFullName)); + stmt.execute(String.format("CREATE " + (localIndex ? "LOCAL" : "") + " INDEX %s ON %s (UPPER(NAME, 'en_US')) ASYNC %s" , indexTableName,dataTableFullName, this.indexDDLOptions)); //update a row stmt1.setInt(1, 1); @@ -196,7 +209,7 @@ public class IndexExtendedIT extends BaseTest { conn.close(); } } - + @Test public void testDeleteFromImmutable() throws Exception { if (mutable) { @@ -208,7 +221,7 @@ public class IndexExtendedIT extends BaseTest { String schemaName = generateUniqueName(); String dataTableName = generateUniqueName(); String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); - String indexTableName = generateUniqueName(); + String indexTableName = "IDX_" + generateUniqueName(); String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { @@ -226,7 +239,7 @@ public class IndexExtendedIT extends BaseTest { conn.createStatement().execute("upsert into " + dataTableFullName + " (pk1, pk2, pk3) values ('a', '1', '1')"); conn.createStatement().execute("upsert into " + dataTableFullName + " (pk1, pk2, pk3) values ('b', '2', '2')"); conn.commit(); - conn.createStatement().execute("CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexTableName + " ON " + dataTableFullName + " (pk3, pk2) ASYNC"); + conn.createStatement().execute("CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexTableName + " ON " + dataTableFullName + " (pk3, pk2) ASYNC " + this.indexDDLOptions); // this delete will be issued at a timestamp later than the above timestamp of the index table conn.createStatement().execute("delete from " + dataTableFullName + " where pk1 = 'a'"); @@ -315,8 +328,8 @@ public class IndexExtendedIT extends BaseTest { Table hIndexTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(physicalTableNameOfIndex)); stmt.execute( - String.format("CREATE INDEX %s ON %s (UPPER(NAME, 'en_US')) ", indexName, - baseTableFullNameOfIndex)); + String.format("CREATE INDEX %s ON %s (UPPER(NAME, 'en_US')) %s", indexName, + baseTableFullNameOfIndex, this.indexDDLOptions)); long dataCnt = getRowCount(conn, dataTableFullName); long indexCnt = getUtility().countRows(hIndexTable); assertEquals(dataCnt, indexCnt); @@ -372,8 +385,8 @@ public class IndexExtendedIT extends BaseTest { // lead to any change on index and thus index verify during index rebuild should fail IndexRebuildRegionScanner.setIgnoreIndexRebuildForTesting(true); stmt.execute(String.format( - "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC", - indexTableName, dataTableFullName)); + "CREATE INDEX %s ON %s (NAME) INCLUDE (ZIP) ASYNC %s", + indexTableName, dataTableFullName, this.indexDDLOptions)); // Verify that the index table is not in the ACTIVE state assertFalse(checkIndexState(conn, indexFullName, PIndexState.ACTIVE, 0L)); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRepairRegionScannerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRepairRegionScannerIT.java index 5018cdd..fbea615 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRepairRegionScannerIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexRepairRegionScannerIT.java @@ -96,25 +96,37 @@ import static org.junit.Assert.fail; public class IndexRepairRegionScannerIT extends ParallelStatsDisabledIT { private final String tableDDLOptions; + private final String indexDDLOptions; private boolean mutable; @Rule public ExpectedException exceptionRule = ExpectedException.none(); - public IndexRepairRegionScannerIT(boolean mutable) { + public IndexRepairRegionScannerIT(boolean mutable, boolean singleCellIndex) { StringBuilder optionBuilder = new StringBuilder(); + StringBuilder indexOptionBuilder = new StringBuilder(); this.mutable = mutable; if (!mutable) { optionBuilder.append(" IMMUTABLE_ROWS=true "); } + if (singleCellIndex) { + if (!(optionBuilder.length() == 0)) { + optionBuilder.append(","); + } + optionBuilder.append(" IMMUTABLE_STORAGE_SCHEME=ONE_CELL_PER_COLUMN, COLUMN_ENCODED_BYTES=0 "); + indexOptionBuilder.append(" IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS,COLUMN_ENCODED_BYTES=2"); + } optionBuilder.append(" SPLIT ON(1,2)"); + this.indexDDLOptions = indexOptionBuilder.toString(); this.tableDDLOptions = optionBuilder.toString(); } - @Parameterized.Parameters(name = "mutable={0}") + @Parameterized.Parameters(name = "mutable={0}, singleCellIndex={1}") public static synchronized Collection<Object[]> data() { return Arrays.asList(new Object[][] { - {true}, - {false} }); + {true, true}, + {true, false}, + {false, true}, + {false, false}}); } @BeforeClass diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java index 8237fd1..03b6bba 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexScrutinyToolIT.java @@ -305,6 +305,9 @@ public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT { * a covered index value is incorrect. Scrutiny should report the invalid row */ @Test public void testCoveredValueIncorrect() throws Exception { + if (isOnlyIndexSingleCell()) { + return; + } // insert one valid row upsertRow(dataTableUpsertStmt, 1, "name-1", 94010); conn.commit(); @@ -322,6 +325,7 @@ public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT { List<Job> completedJobs = runScrutiny(schemaName, dataTableName, indexTableName); Job job = completedJobs.get(0); assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); assertEquals(1, getCounterValue(counters, INVALID_ROW_COUNT)); @@ -405,6 +409,7 @@ public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT { runScrutiny(schemaName, dataTableName, indexTableName, 10L, SourceTable.INDEX_TABLE_SOURCE); Job job = completedJobs.get(0); assertTrue(job.isSuccessful()); + Counters counters = job.getCounters(); assertEquals(1, getCounterValue(counters, VALID_ROW_COUNT)); assertEquals(2, getCounterValue(counters, INVALID_ROW_COUNT)); @@ -415,6 +420,9 @@ public class IndexScrutinyToolIT extends IndexScrutinyToolBaseIT { * incorrectly indexed row, it should be reported in each direction */ @Test public void testBothDataAndIndexAsSource() throws Exception { + if (isOnlyIndexSingleCell()) { + return; + } // insert one valid row upsertRow(dataTableUpsertStmt, 1, "name-1", 94010); conn.commit(); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java index 5c62837..8fb5ea7 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java @@ -134,6 +134,10 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { this.tableDDLOptions = optionBuilder.toString(); StringBuilder indexOptionBuilder = new StringBuilder(); if (!localIndex && transactionProvider == null) { + if (!(optionBuilder.length() == 0)) { + optionBuilder.append(","); + } + optionBuilder.append(" COLUMN_ENCODED_BYTES=0"); indexOptionBuilder.append(" IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS,COLUMN_ENCODED_BYTES=2"); } this.indexDDLOptions = indexOptionBuilder.toString(); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java index 97efeee..985e182 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java @@ -43,6 +43,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import com.google.common.base.Strings; import com.google.common.collect.Maps; import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; @@ -71,8 +72,10 @@ import org.slf4j.LoggerFactory; public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT { private static final Logger LOG = LoggerFactory.getLogger(GlobalIndexCheckerIT.class); private final boolean async; + private String indexDDLOptions; private String tableDDLOptions; private StringBuilder optionBuilder; + private StringBuilder indexOptionBuilder; private final boolean encoded; public GlobalIndexCheckerIT(boolean async, boolean encoded) { this.async = async; @@ -89,10 +92,13 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT { @Before public void beforeTest(){ optionBuilder = new StringBuilder(); + indexOptionBuilder = new StringBuilder(); if (!encoded) { optionBuilder.append(" COLUMN_ENCODED_BYTES=0"); + indexOptionBuilder.append(" IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2"); } this.tableDDLOptions = optionBuilder.toString(); + this.indexDDLOptions = indexOptionBuilder.toString(); } @Parameters( @@ -143,7 +149,7 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT { conn.commit(); String indexTableName = generateUniqueName(); conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " + - dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : "")); + dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : "") + this.indexDDLOptions); if (async) { // run the index MR job. IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName); @@ -170,7 +176,7 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT { populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde') String indexTableName = generateUniqueName(); conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " + - dataTableName + " (val1) include (val2, val3)"); + dataTableName + " (val1) include (val2, val3)" + this.indexDDLOptions); String dml = "DELETE from " + dataTableName + " WHERE id = 'a'"; conn.createStatement().executeUpdate(dml); @@ -196,7 +202,7 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT { try (Connection conn = DriverManager.getConnection(getUrl())) { populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde') conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " + - dataTableName + " (val1) include (val2, val3)"); + dataTableName + " (val1) include (val2, val3)" + this.indexDDLOptions); scn = EnvironmentEdgeManager.currentTimeMillis(); // Configure IndexRegionObserver to fail the data write phase IndexRegionObserver.setFailDataTableUpdatesForTesting(true); @@ -235,7 +241,7 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT { try (Connection conn = DriverManager.getConnection(getUrl())) { String indexTableName = generateUniqueName(); conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " + - dataTableName + " (val1) include (val2, val3)"); + dataTableName + " (val1) include (val2, val3)" + this.indexDDLOptions); conn.commit(); // Read all index rows and rewrite them back directly. This will overwrite existing rows with newer // timestamps and set the empty column to value "x". This will make them unverified @@ -287,7 +293,7 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT { populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde') String indexTableName = generateUniqueName(); conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " + - dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : "")); + dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : "") + this.indexDDLOptions); if (async) { // run the index MR job. IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName); @@ -331,7 +337,7 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT { try (Connection conn = DriverManager.getConnection(getUrl())) { String indexTableName = generateUniqueName(); conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " + - dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : "")); + dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : "") + this.indexDDLOptions); if (async) { // run the index MR job. IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName); @@ -376,7 +382,7 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT { Connection conn = DriverManager.getConnection(getUrl()); String indexTableName = generateUniqueName(); conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " + - dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : "")); + dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : "") + this.indexDDLOptions); if (async) { // run the index MR job. IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName); @@ -446,7 +452,7 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT { conn.commit(); String indexTableName = generateUniqueName(); conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " + - dataTableName + " (val1) include (val2, val3)"); + dataTableName + " (val1) include (val2, val3)" + this.indexDDLOptions); conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val2) values ('a', 'ab', 'abcc')"); conn.commit(); String selectSql = "SELECT * from " + dataTableName + " WHERE val1 = 'ab'"; @@ -469,7 +475,7 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT { try (Connection conn = DriverManager.getConnection(getUrl())) { String indexTableName = generateUniqueName(); conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " + - dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : "")); + dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : "") + this.indexDDLOptions); if (async) { // run the index MR job. IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName); @@ -502,7 +508,7 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT { try (Connection conn = DriverManager.getConnection(getUrl())) { String indexTableName = generateUniqueName(); conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " + - dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : "")); + dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : "") + this.indexDDLOptions); if (async) { // run the index MR job. IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName); @@ -553,7 +559,7 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT { " (id varchar(10) not null primary key, val1 varchar(10), val2 varchar(10), val3 varchar(10))" + tableDDLOptions); String indexTableName = generateUniqueName(); conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " + - dataTableName + " (val1) include (val2, val3)"); + dataTableName + " (val1) include (val2, val3)" + this.indexDDLOptions); // Configure IndexRegionObserver to fail the data write phase IndexRegionObserver.setFailDataTableUpdatesForTesting(true); @@ -591,7 +597,7 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT { " (id varchar(10) not null primary key, a.val1 varchar(10), b.val2 varchar(10), c.val3 varchar(10))" + tableDDLOptions); String indexTableName = generateUniqueName(); conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " + - dataTableName + " (val1) include (val2, val3)"); + dataTableName + " (val1) include (val2, val3)" + this.indexDDLOptions); // Configure IndexRegionObserver to fail the data write phase IndexRegionObserver.setFailDataTableUpdatesForTesting(true); conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val3) values ('a', 'ab','abcde')"); @@ -647,9 +653,9 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT { populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde') String indexTableName = generateUniqueName(); conn.createStatement().execute("CREATE INDEX " + indexTableName + "1 on " + - dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : "")); + dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : "") + this.indexDDLOptions); conn.createStatement().execute("CREATE INDEX " + indexTableName + "2 on " + - dataTableName + " (val2) include (val1, val3)" + (async ? "ASYNC" : "")); + dataTableName + " (val2) include (val1, val3)" + (async ? "ASYNC" : "") + this.indexDDLOptions); if (async) { // run the index MR job. IndexToolIT.runIndexTool(true, false, null, dataTableName, indexTableName + "1"); @@ -765,10 +771,10 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT { populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde') conn.createStatement().execute("CREATE INDEX " + indexTableName + "1 on " + dataTableName + " (val1) include (val2, val3)" + (async ? "ASYNC" : "") + - " VERSIONS=" + indexVersions); + " VERSIONS=" + indexVersions + (Strings.isNullOrEmpty(this.indexDDLOptions) ? "" : "," + this.indexDDLOptions)); conn.createStatement().execute("CREATE INDEX " + indexTableName + "2 on " + dataTableName + " (val2) include (val1, val3)" + (async ? "ASYNC" : "")+ - " VERSIONS=" + indexVersions); + " VERSIONS=" + indexVersions + (Strings.isNullOrEmpty(this.indexDDLOptions) ? "" : "," + this.indexDDLOptions)); conn.commit(); if (async) { // run the index MR job. @@ -835,7 +841,7 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT { // Create an index on the view String indexName = generateUniqueName(); conn.createStatement().execute("CREATE INDEX " + indexName + " on " + - viewName + " (val2) include (val3)"); + viewName + " (val2) include (val3)" + this.indexDDLOptions); Properties props = new Properties(); props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, "o1"); try (Connection tenantConn = DriverManager.getConnection(getUrl(), props)) { diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java index 1d6763a..9fb6e7e 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/SingleCellIndexIT.java @@ -84,7 +84,7 @@ public class SingleCellIndexIT extends ParallelStatsDisabledIT { String tableName = "TBL_" + generateUniqueName(); String idxName = "IND_" + generateUniqueName(); - createTableAndIndex(conn, tableName, idxName, this.tableDDLOptions, 3); + createTableAndIndex(conn, tableName, idxName, this.tableDDLOptions, false,3); assertMetadata(conn, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, tableName); assertMetadata(conn, SINGLE_CELL_ARRAY_WITH_OFFSETS, TWO_BYTE_QUALIFIERS, idxName); @@ -134,7 +134,7 @@ public class SingleCellIndexIT extends ParallelStatsDisabledIT { String tableName = "TBL_" + generateUniqueName(); String idxName = "IND_" + generateUniqueName(); - createTableAndIndex(conn, tableName, idxName, null, 3); + createTableAndIndex(conn, tableName, idxName, null, false,3); assertMetadata(conn, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, tableName); assertMetadata(conn, SINGLE_CELL_ARRAY_WITH_OFFSETS, TWO_BYTE_QUALIFIERS, idxName); @@ -170,7 +170,7 @@ public class SingleCellIndexIT extends ParallelStatsDisabledIT { String tableName = "TBL_" + generateUniqueName(); String idxName = "IND_" + generateUniqueName(); - createTableAndIndex(conn, tableName, idxName, null, 1); + createTableAndIndex(conn, tableName, idxName, null, false,1); assertMetadata(conn, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, tableName); assertMetadata(conn, SINGLE_CELL_ARRAY_WITH_OFFSETS, TWO_BYTE_QUALIFIERS, idxName); @@ -303,6 +303,102 @@ public class SingleCellIndexIT extends ParallelStatsDisabledIT { } } + @Test + public void testUpsertSelect() throws Exception { + String tableName = generateUniqueName(); + String idxName = "IDX_" + generateUniqueName(); + + try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) { + conn.setAutoCommit(true); + createTableAndIndex(conn, tableName, idxName, null, true, 2); + assertMetadata(conn, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, tableName); + assertMetadata(conn, SINGLE_CELL_ARRAY_WITH_OFFSETS, TWO_BYTE_QUALIFIERS, idxName); + + // this delete will be issued at a timestamp later than the above timestamp of the index table + conn.createStatement().execute("delete from " + tableName + " where pk1 = 'PK1'"); + conn.commit(); + String + sql = + "UPSERT INTO " + idxName + "(\":PK1\",\":INT_PK\",\"0:V1\",\"0:V2\",\"0:V4\") " + + " SELECT /*+ NO_INDEX */ PK1,INT_PK, V1, V2,V4 FROM " + + tableName; + conn.createStatement().executeUpdate(sql); + conn.commit(); + + // validate that delete markers were issued correctly and only ('a', '1', 'value1') was + // deleted + String query = "SELECT \":PK1\" from " + idxName + " ORDER BY \":PK1\""; + ResultSet rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("PK2", rs.getString(1)); + assertFalse(rs.next()); + } + } + + @Test + public void testMultipleColumnFamilies() throws Exception { + String tableName = generateUniqueName(); + String idxName = "IDX_" + generateUniqueName(); + int numOfRows = 2; + + try (Connection conn = DriverManager.getConnection(getUrl(), testProps)) { + conn.setAutoCommit(true); + String createTableSql = "CREATE TABLE " + tableName + + " (PK1 VARCHAR NOT NULL, INT_PK INTEGER NOT NULL, V1 VARCHAR, A.V2 INTEGER, B.V3 INTEGER, A.V4 VARCHAR, B.V5 VARCHAR CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) "; + conn.createStatement().execute(createTableSql); + String createIndexSql = "CREATE INDEX " + idxName + " ON " + tableName + " (A.V2) include (B.V3, A.V4, B.V5) " + + " IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2"; + LOGGER.debug(createIndexSql); + conn.createStatement().execute(createIndexSql); + String upsert = "UPSERT INTO " + tableName + " (PK1, INT_PK, V1, A.V2, B.V3, A.V4, B.V5) VALUES (?,?,?,?,?,?,?)"; + PreparedStatement upsertStmt = conn.prepareStatement(upsert); + + for (int i=1; i <= numOfRows; i++) { + upsertStmt.setString(1, "PK"+i); + upsertStmt.setInt(2, i); + upsertStmt.setString(3, "V1"+i); + upsertStmt.setInt(4, i+1); + upsertStmt.setInt(5, i+2); + upsertStmt.setString(6, "V4"+i); + upsertStmt.setString(7, "V5"+i); + upsertStmt.executeUpdate(); + } + assertMetadata(conn, ONE_CELL_PER_COLUMN, NON_ENCODED_QUALIFIERS, tableName); + assertMetadata(conn, SINGLE_CELL_ARRAY_WITH_OFFSETS, TWO_BYTE_QUALIFIERS, idxName); + + String query = "SELECT * from " + idxName + " ORDER BY \":PK1\""; + ResultSet rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("2", rs.getString(1)); + assertEquals("PK1", rs.getString(2)); + assertEquals("1", rs.getString(3)); + assertEquals("3", rs.getString(4)); + assertEquals("V41", rs.getString(5)); + assertEquals("V51", rs.getString(6)); + assertTrue(rs.next()); + assertEquals("3", rs.getString(1)); + assertEquals("PK2", rs.getString(2)); + assertEquals("2", rs.getString(3)); + assertEquals("4", rs.getString(4)); + assertEquals("V42", rs.getString(5)); + assertEquals("V52", rs.getString(6)); + + String selectFromIndex = "SELECT PK1, INT_PK, A.V2, B.V3, A.V4, B.V5 FROM " + tableName + " where A.V4='V42' and B.V3 >= 3"; + rs = conn.createStatement().executeQuery("EXPLAIN " + selectFromIndex); + String actualExplainPlan = QueryUtil.getExplainPlan(rs); + assertTrue(actualExplainPlan.contains(idxName)); + rs = conn.createStatement().executeQuery(selectFromIndex); + assertTrue(rs.next()); + assertEquals("PK2", rs.getString(1)); + assertEquals("2", rs.getString(2)); + assertEquals("3", rs.getString(3)); + assertEquals("4", rs.getString(4)); + assertEquals("V42", rs.getString(5)); + assertEquals("V52", rs.getString(6)); + assertFalse(rs.next()); + } + } + private Connection getTenantConnection(String tenantId) throws Exception { Properties tenantProps = PropertiesUtil.deepCopy(testProps); tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); @@ -318,14 +414,16 @@ public class SingleCellIndexIT extends ParallelStatsDisabledIT { assertEquals(expectedColumnEncoding, table.getEncodingScheme()); } - private void createTableAndIndex(Connection conn, String tableName, String indexName, String tableDDL, int numOfRows) + private void createTableAndIndex(Connection conn, String tableName, String indexName, String tableDDL, boolean async, int numOfRows) throws SQLException { String createTableSql = "CREATE TABLE " + tableName + " (PK1 VARCHAR NOT NULL, INT_PK INTEGER NOT NULL, V1 VARCHAR, V2 INTEGER, V3 INTEGER, V4 VARCHAR, V5 VARCHAR CONSTRAINT NAME_PK PRIMARY KEY(PK1, INT_PK)) " + (tableDDL == null ? "" : tableDDL); LOGGER.debug(createTableSql); conn.createStatement().execute(createTableSql); - String createIndexSql = "CREATE INDEX " + indexName + " ON " + tableName + " (PK1, INT_PK) include (V1,V2,V4) IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2"; + String createIndexSql = "CREATE INDEX " + indexName + " ON " + tableName + " (PK1, INT_PK) include (V1,V2,V4) " + (async? "ASYNC":"") + + " IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2"; + LOGGER.debug(createIndexSql); conn.createStatement().execute(createIndexSql); String upsert = "UPSERT INTO " + tableName + " (PK1, INT_PK, V1, V2, V3, V4, V5) VALUES (?,?,?,?,?,?,?)"; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java index 6961861..12592c0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java @@ -328,10 +328,10 @@ public class GlobalIndexChecker extends BaseRegionObserver { buildIndexScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES); buildIndexScan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true)); // Scan only columns included in the index table plus the empty column - for (ColumnReference column : indexMaintainer.getAllColumns()) { + for (ColumnReference column : indexMaintainer.getAllColumnsForDataTable()) { buildIndexScan.addColumn(column.getFamily(), column.getQualifier()); } - buildIndexScan.addColumn(indexMaintainer.getDataEmptyKeyValueCF(), indexMaintainer.getEmptyKeyValueQualifier()); + buildIndexScan.addColumn(indexMaintainer.getDataEmptyKeyValueCF(), indexMaintainer.getEmptyKeyValueQualifierForDataTable()); } // Rebuild the index row from the corresponding the row in the the data table // Get the data row key from the index row key diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index c7a5e70..c0bd10f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -160,15 +160,27 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { }); } - public static Iterator<PTable> maintainedGlobalIndexes(Iterator<PTable> indexes) { + public static Iterator<PTable> maintainedGlobalIndexesWithMatchingStorageScheme(final PTable dataTable, Iterator<PTable> indexes) { return Iterators.filter(indexes, new Predicate<PTable>() { @Override public boolean apply(PTable index) { - return sendIndexMaintainer(index) && index.getIndexType() == IndexType.GLOBAL; + return sendIndexMaintainer(index) && index.getIndexType() == IndexType.GLOBAL + && dataTable.getImmutableStorageScheme() == index.getImmutableStorageScheme(); } }); } - + + public static Iterator<PTable> maintainedLocalOrGlobalIndexesWithoutMatchingStorageScheme(final PTable dataTable, Iterator<PTable> indexes) { + return Iterators.filter(indexes, new Predicate<PTable>() { + @Override + public boolean apply(PTable index) { + return sendIndexMaintainer(index) && ((index.getIndexType() == IndexType.GLOBAL + && dataTable.getImmutableStorageScheme() != index.getImmutableStorageScheme()) + || index.getIndexType() == IndexType.LOCAL); + } + }); + } + public static Iterator<PTable> maintainedLocalIndexes(Iterator<PTable> indexes) { return Iterators.filter(indexes, new Predicate<PTable>() { @Override @@ -195,7 +207,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { if (onlyLocalIndexes) { if (!dataTable.isTransactional() || !dataTable.getTransactionProvider().getTransactionProvider().isUnsupported(Feature.MAINTAIN_LOCAL_INDEX_ON_SERVER)) { - indexesItr = maintainedLocalIndexes(indexes.iterator()); + indexesItr = maintainedLocalOrGlobalIndexesWithoutMatchingStorageScheme(dataTable, indexes.iterator()); } } else { indexesItr = maintainedIndexes(indexes.iterator()); @@ -360,6 +372,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { private int estimatedExpressionSize; private int[] dataPkPosition; private int maxTrailingNulls; + private ColumnReference indexEmptyKeyValueRef; private ColumnReference dataEmptyKeyValueRef; private boolean rowKeyOrderOptimizable; @@ -1019,7 +1032,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { Put put = null; // New row being inserted: add the empty key value ImmutableBytesWritable latestValue = null; - if (valueGetter==null || (latestValue = valueGetter.getLatestValue(dataEmptyKeyValueRef, ts)) == null || latestValue == ValueGetter.HIDDEN_BY_DELETE) { + if (valueGetter==null || (latestValue = valueGetter.getLatestValue(indexEmptyKeyValueRef, ts)) == null || latestValue == ValueGetter.HIDDEN_BY_DELETE) { // We need to track whether or not our empty key value is hidden by a Delete Family marker at the same timestamp. // If it is, these Puts will be masked so should not be emitted. if (latestValue == ValueGetter.HIDDEN_BY_DELETE) { @@ -1028,7 +1041,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { put = new Put(indexRowKey); // add the keyvalue for the empty row put.add(kvBuilder.buildPut(new ImmutableBytesPtr(indexRowKey), - this.getEmptyKeyValueFamily(), dataEmptyKeyValueRef.getQualifierWritable(), ts, + this.getEmptyKeyValueFamily(), indexEmptyKeyValueRef.getQualifierWritable(), ts, QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR)); put.setDurability(!indexWALDisabled ? Durability.USE_DEFAULT : Durability.SKIP_WAL); } @@ -1282,7 +1295,20 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { public Set<ColumnReference> getAllColumns() { return allColumns; } - + + public Set<ColumnReference> getAllColumnsForDataTable() { + Set<ColumnReference> result = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size() + coveredColumnsMap.size()); + result.addAll(indexedColumns); + for (ColumnReference colRef : coveredColumnsMap.keySet()) { + if (getDataImmutableStorageScheme()==ImmutableStorageScheme.ONE_CELL_PER_COLUMN) { + result.add(colRef); + } else { + result.add(new ColumnReference(colRef.getFamily(), QueryConstants.SINGLE_KEYVALUE_COLUMN_QUALIFIER_BYTES)); + } + } + return result; + } + public ImmutableBytesPtr getEmptyKeyValueFamily() { // Since the metadata of an index table will never change, // we can infer this based on the family of the first covered column @@ -1682,8 +1708,10 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { * Init calculated state reading/creating */ private void initCachedState() { - byte[] emptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(encodingScheme).getFirst(); - dataEmptyKeyValueRef = new ColumnReference(dataEmptyKeyValueCF, emptyKvQualifier); + byte[] indexEmptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(encodingScheme).getFirst(); + byte[] dataEmptyKvQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(dataEncodingScheme).getFirst(); + indexEmptyKeyValueRef = new ColumnReference(dataEmptyKeyValueCF, indexEmptyKvQualifier); + dataEmptyKeyValueRef = new ColumnReference(dataEmptyKeyValueCF, dataEmptyKvQualifier); this.allColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size() + coveredColumnsMap.size()); // columns that are required to evaluate all expressions in indexedExpressions (not including columns in data row key) this.indexedColumns = Sets.newLinkedHashSetWithExpectedSize(indexedExpressions.size()); @@ -1928,7 +1956,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { return new ValueGetter() { @Override public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) { - if(ref.equals(dataEmptyKeyValueRef)) return null; + if(ref.equals(indexEmptyKeyValueRef)) return null; return valueMap.get(ref); } @Override @@ -1982,9 +2010,13 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } public byte[] getEmptyKeyValueQualifier() { + return indexEmptyKeyValueRef.getQualifier(); + } + + public byte[] getEmptyKeyValueQualifierForDataTable() { return dataEmptyKeyValueRef.getQualifier(); } - + public Set<Pair<String, String>> getIndexedColumnInfo() { return indexedColumnsInfo; } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index 1cba74f..6e9e8f0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -878,7 +878,9 @@ public class IndexUtil { (table.isTransactional() && table.getTransactionProvider().getTransactionProvider().isUnsupported(Feature.MAINTAIN_LOCAL_INDEX_ON_SERVER)) ? IndexMaintainer.maintainedIndexes(table.getIndexes().iterator()) : (table.isImmutableRows() || table.isTransactional()) ? - IndexMaintainer.maintainedGlobalIndexes(table.getIndexes().iterator()) : + // If the data table has a different storage scheme than index table, don't maintain this on the client + // For example, if the index is single cell but the data table is one_cell, if there is a partial update on the data table, index can't be built on the client. + IndexMaintainer.maintainedGlobalIndexesWithMatchingStorageScheme(table, table.getIndexes().iterator()) : Collections.<PTable>emptyIterator(); return Lists.newArrayList(indexIterator); }