http://git-wip-us.apache.org/repos/asf/phoenix/blob/8bc58328/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index b81b904..18e543c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -64,6 +64,7 @@ import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.compile.WhereCompiler; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; +import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest; @@ -113,8 +114,8 @@ import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.schema.types.PVarchar; import org.apache.phoenix.transaction.PhoenixTransactionContext; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.google.protobuf.ServiceException; public class IndexUtil { public static final String INDEX_COLUMN_NAME_SEP = ":"; @@ -308,7 +309,7 @@ public class IndexUtil { } @Override - public ImmutableBytesWritable getLatestValue(ColumnReference ref) { + public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) { // Always return null for our empty key value, as this will cause the index // maintainer to always treat this Put as a new row. if (isEmptyKeyValue(table, ref)) { @@ -712,8 +713,8 @@ public class IndexUtil { HConstants.NO_NONCE, HConstants.NO_NONCE); } - public static MetaDataMutationResult setIndexDisableTimeStamp(String indexTableName, long minTimeStamp, - HTableInterface metaTable, PIndexState newState) throws ServiceException, Throwable { + public static MetaDataMutationResult updateIndexState(String indexTableName, long minTimeStamp, + HTableInterface metaTable, PIndexState newState) throws Throwable { byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName); // Mimic the Put that gets generated by the client on an update of the index state Put put = new Put(indexTableKey); @@ -786,5 +787,41 @@ public class IndexUtil { public static boolean isLocalIndexFamily(String family) { return family.indexOf(LOCAL_INDEX_COLUMN_FAMILY_PREFIX) != -1; } + + public static void updateIndexState(PhoenixConnection conn, String indexTableName, + PIndexState newState, Long indexDisableTimestamp) throws SQLException { + if (newState == PIndexState.ACTIVE) { + Preconditions.checkArgument(indexDisableTimestamp == 0, + "Index disable timestamp has to be 0 when marking an index as active"); + } + byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName); + String schemaName = SchemaUtil.getSchemaNameFromFullName(indexTableName); + String indexName = SchemaUtil.getTableNameFromFullName(indexTableName); + // Mimic the Put that gets generated by the client on an update of the + // index state + Put put = new Put(indexTableKey); + put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES, + newState.getSerializedBytes()); + if (indexDisableTimestamp != null) { + put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, + PLong.INSTANCE.toBytes(indexDisableTimestamp)); + } + if (newState == PIndexState.ACTIVE) { + put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, + PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP_BYTES, PLong.INSTANCE.toBytes(0)); + } + final List<Mutation> tableMetadata = Collections.<Mutation> singletonList(put); + MetaDataMutationResult result = conn.getQueryServices().updateIndexState(tableMetadata, null); + MutationCode code = result.getMutationCode(); + if (code == MutationCode.TABLE_NOT_FOUND) { + throw new TableNotFoundException(schemaName, indexName); + } + if (code == MutationCode.UNALLOWED_TABLE_MUTATION) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_INDEX_STATE_TRANSITION) + .setMessage("indexState=" + newState).setSchemaName(schemaName) + .setTableName(indexName).build().buildException(); + } + } }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8bc58328/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java index 941493e..dd027b0 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java @@ -37,6 +37,7 @@ import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState; import org.apache.phoenix.hbase.index.covered.data.LocalTable; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.scanner.Scanner; +import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner; import org.junit.Test; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; @@ -102,7 +103,7 @@ public class TestLocalTableState { ColumnReference col = new ColumnReference(fam, qual); table.setCurrentTimestamp(ts); //check that our value still shows up first on scan, even though this is a lazy load - Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData); + Pair<CoveredDeleteScanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData); Scanner s = p.getFirst(); assertEquals("Didn't get the pending mutation's value first", m.get(fam, qual).get(0), s.next()); } @@ -185,7 +186,7 @@ public class TestLocalTableState { ColumnReference col = new ColumnReference(fam, qual); table.setCurrentTimestamp(ts); //check that our value still shows up first on scan, even though this is a lazy load - Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData); + Pair<CoveredDeleteScanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData); Scanner s = p.getFirst(); assertEquals("Didn't get the pending mutation's value first", m.get(fam, qual).get(0), s.next()); } @@ -229,7 +230,7 @@ public class TestLocalTableState { ColumnReference col = new ColumnReference(fam, qual); table.setCurrentTimestamp(ts); // check that the value is there - Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData); + Pair<CoveredDeleteScanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData); Scanner s = p.getFirst(); assertEquals("Didn't get the pending mutation's value first", kv, s.next()); @@ -273,7 +274,7 @@ public class TestLocalTableState { ColumnReference col = new ColumnReference(fam, qual); table.setCurrentTimestamp(ts); // check that the value is there - Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData); + Pair<CoveredDeleteScanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col), false, false, indexMetaData); Scanner s = p.getFirst(); // make sure it read the table the one time assertEquals("Didn't get the stored keyvalue!", storedKv, s.next()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/8bc58328/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java index 6aabacf..dbf67fc 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; @@ -77,7 +78,7 @@ public class IndexMaintainerTest extends BaseConnectionlessQueryTest { return new ValueGetter() { @Override - public ImmutableBytesWritable getLatestValue(ColumnReference ref) { + public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) { return new ImmutableBytesPtr(valueMap.get(ref)); } @@ -140,7 +141,7 @@ public class IndexMaintainerTest extends BaseConnectionlessQueryTest { Mutation indexMutation = indexMutations.get(0); ImmutableBytesWritable indexKeyPtr = new ImmutableBytesWritable(indexMutation.getRow()); ptr.set(rowKeyPtr.get(), rowKeyPtr.getOffset(), rowKeyPtr.getLength()); - byte[] mutablelndexRowKey = im1.buildRowKey(valueGetter, ptr, null, null); + byte[] mutablelndexRowKey = im1.buildRowKey(valueGetter, ptr, null, null, HConstants.LATEST_TIMESTAMP); byte[] immutableIndexRowKey = indexKeyPtr.copyBytes(); assertArrayEquals(immutableIndexRowKey, mutablelndexRowKey); for (ColumnReference ref : im1.getCoveredColumns()) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/8bc58328/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java index 8a5a8e4..266f4da 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java @@ -41,6 +41,7 @@ import java.sql.Date; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; @@ -107,6 +108,7 @@ import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PLongColumn; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PTable; @@ -120,6 +122,7 @@ import org.apache.phoenix.schema.stats.GuidePostsKey; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PDataType; +import com.google.common.base.Objects; import com.google.common.collect.Lists; @@ -854,4 +857,141 @@ public class TestUtil { System.out.println("-----------------------------------------------"); } + public static void waitForIndexRebuild(Connection conn, String fullIndexName, PIndexState expectedIndexState) throws InterruptedException, SQLException { + boolean isActive = false; + String schema = SchemaUtil.getSchemaNameFromFullName(fullIndexName); + String index = SchemaUtil.getTableNameFromFullName(fullIndexName); + int maxTries = 12, nTries = 0; + do { + Thread.sleep(5 * 1000); // sleep 5 secs + String query = "SELECT CAST(" + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP + " AS BIGINT) FROM " + + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " WHERE (" + PhoenixDatabaseMetaData.TABLE_SCHEM + "," + PhoenixDatabaseMetaData.TABLE_NAME + + ") = (" + "'" + schema + "','" + index + "') " + + "AND " + PhoenixDatabaseMetaData.COLUMN_FAMILY + " IS NULL AND " + PhoenixDatabaseMetaData.COLUMN_NAME + " IS NULL" + + " AND " + PhoenixDatabaseMetaData.INDEX_STATE + " = '" + expectedIndexState.getSerializedValue() + "'"; + ResultSet rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + if (expectedIndexState == PIndexState.ACTIVE) { + if (rs.getLong(1) == 0 && !rs.wasNull()) { + isActive = true; + break; + } + } + } while (++nTries < maxTries); + if (expectedIndexState == PIndexState.ACTIVE) { + assertTrue(isActive); + } + } + + public static long scutinizeIndex(Connection conn, String fullTableName, String fullIndexName) throws SQLException { + PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); + PTable ptable = pconn.getTable(new PTableKey(pconn.getTenantId(), fullTableName)); + PTable pindex = pconn.getTable(new PTableKey(pconn.getTenantId(), fullIndexName)); + StringBuilder indexQueryBuf = new StringBuilder("SELECT "); + for (PColumn dcol : ptable.getPKColumns()) { + indexQueryBuf.append("CAST(\"" + IndexUtil.getIndexColumnName(dcol) + "\" AS " + dcol.getDataType().getSqlTypeName() + ")"); + indexQueryBuf.append(","); + } + for (PColumn icol : pindex.getColumns()) { + PColumn dcol = IndexUtil.getDataColumn(ptable, icol.getName().getString()); + if (SchemaUtil.isPKColumn(icol) && !SchemaUtil.isPKColumn(dcol)) { + indexQueryBuf.append("CAST (\"" + icol.getName().getString() + "\" AS " + dcol.getDataType().getSqlTypeName() + ")"); + indexQueryBuf.append(","); + } + } + for (PColumn icol : pindex.getColumns()) { + if (!SchemaUtil.isPKColumn(icol)) { + PColumn dcol = IndexUtil.getDataColumn(ptable, icol.getName().getString()); + indexQueryBuf.append("CAST (\"" + icol.getName().getString() + "\" AS " + dcol.getDataType().getSqlTypeName() + ")"); + indexQueryBuf.append(","); + } + } + indexQueryBuf.setLength(indexQueryBuf.length()-1); + indexQueryBuf.append("\nFROM " + fullIndexName); + + StringBuilder tableQueryBuf = new StringBuilder("SELECT "); + for (PColumn dcol : ptable.getPKColumns()) { + tableQueryBuf.append("\"" + dcol.getName().getString() + "\""); + tableQueryBuf.append(","); + } + for (PColumn icol : pindex.getColumns()) { + PColumn dcol = IndexUtil.getDataColumn(ptable, icol.getName().getString()); + if (SchemaUtil.isPKColumn(icol) && !SchemaUtil.isPKColumn(dcol)) { + if (dcol.getFamilyName() != null) { + tableQueryBuf.append("\"" + dcol.getFamilyName().getString() + "\""); + tableQueryBuf.append("."); + } + tableQueryBuf.append("\"" + dcol.getName().getString() + "\""); + tableQueryBuf.append(","); + } + } + for (PColumn icol : pindex.getColumns()) { + if (!SchemaUtil.isPKColumn(icol)) { + PColumn dcol = IndexUtil.getDataColumn(ptable, icol.getName().getString()); + if (dcol.getFamilyName() != null) { + tableQueryBuf.append("\"" + dcol.getFamilyName().getString() + "\""); + tableQueryBuf.append("."); + } + tableQueryBuf.append("\"" + dcol.getName().getString() + "\""); + tableQueryBuf.append(","); + } + } + tableQueryBuf.setLength(tableQueryBuf.length()-1); + tableQueryBuf.append("\nFROM " + fullTableName + "\nWHERE ("); + for (PColumn dcol : ptable.getPKColumns()) { + tableQueryBuf.append("\"" + dcol.getName().getString() + "\""); + tableQueryBuf.append(","); + } + tableQueryBuf.setLength(tableQueryBuf.length()-1); + tableQueryBuf.append(") = (("); + for (int i = 0; i < ptable.getPKColumns().size(); i++) { + tableQueryBuf.append("?"); + tableQueryBuf.append(","); + } + tableQueryBuf.setLength(tableQueryBuf.length()-1); + tableQueryBuf.append("))"); + + String tableQuery = tableQueryBuf.toString(); + PreparedStatement istmt = conn.prepareStatement(tableQuery); + + String indexQuery = indexQueryBuf.toString(); + ResultSet irs = conn.createStatement().executeQuery(indexQuery); + ResultSetMetaData irsmd = irs.getMetaData(); + long icount = 0; + while (irs.next()) { + icount++; + StringBuilder pkBuf = new StringBuilder("("); + for (int i = 0; i < ptable.getPKColumns().size(); i++) { + PColumn dcol = ptable.getPKColumns().get(i); + Object pkVal = irs.getObject(i+1); + PDataType pkType = PDataType.fromTypeId(irsmd.getColumnType(i + 1)); + istmt.setObject(i+1, pkVal, dcol.getDataType().getSqlType()); + pkBuf.append(pkType.toStringLiteral(pkVal)); + pkBuf.append(","); + } + pkBuf.setLength(pkBuf.length()-1); + pkBuf.append(")"); + ResultSet drs = istmt.executeQuery(); + ResultSetMetaData drsmd = drs.getMetaData(); + assertTrue("Expected to find PK in data table: " + pkBuf, drs.next()); + for (int i = 0; i < irsmd.getColumnCount(); i++) { + Object iVal = irs.getObject(i + 1); + PDataType iType = PDataType.fromTypeId(irsmd.getColumnType(i + 1)); + Object dVal = drs.getObject(i + 1); + PDataType dType = PDataType.fromTypeId(drsmd.getColumnType(i + 1)); + assertTrue("Expected equality for " + drsmd.getColumnName(i + 1) + ", but " + iType.toStringLiteral(iVal) + "!=" + dType.toStringLiteral(dVal), Objects.equal(iVal, dVal)); + } + } + + long dcount = getRowCount(conn, fullTableName); + assertEquals("Expected data table row count to match", icount, dcount); + return icount; + } + + private static long getRowCount(Connection conn, String tableName) throws SQLException { + ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ count(*) FROM " + tableName); + assertTrue(rs.next()); + return rs.getLong(1); + } + }
