PHOENIX-4943 Write Omid shadow cells when building global index synchronously
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c2446305 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c2446305 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c2446305 Branch: refs/heads/parameterize-tx-provider Commit: c24463050fbb79a8a605117caeeddd9f74882143 Parents: 5585cec Author: James Taylor <[email protected]> Authored: Wed Oct 3 23:05:40 2018 -0700 Committer: James Taylor <[email protected]> Committed: Wed Oct 3 23:05:40 2018 -0700 ---------------------------------------------------------------------- .../phoenix/end2end/index/BaseIndexIT.java | 75 +++++++++++++++++++- .../phoenix/end2end/index/MutableIndexIT.java | 1 - .../apache/phoenix/compile/UpsertCompiler.java | 14 ++-- .../UngroupedAggregateRegionObserver.java | 13 ++-- .../apache/phoenix/execute/MutationState.java | 7 +- .../transaction/OmidTransactionProvider.java | 2 +- .../transaction/PhoenixTransactionProvider.java | 2 +- .../transaction/TephraTransactionProvider.java | 2 +- 8 files changed, 101 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2446305/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java index 16d8cca..334828b 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java @@ -67,11 +67,13 @@ import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.parse.NamedTableNode; import org.apache.phoenix.parse.TableName; import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.QueryServices; 
import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature; +import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.DateUtil; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.PhoenixRuntime; @@ -91,6 +93,7 @@ public abstract class BaseIndexIT extends ParallelStatsDisabledIT { private final boolean localIndex; private final boolean transactional; + private final TransactionFactory.Provider transactionProvider; private final boolean mutable; private final String tableDDLOptions; @@ -116,6 +119,9 @@ public abstract class BaseIndexIT extends ParallelStatsDisabledIT { if (optionBuilder.length()!=0) optionBuilder.append(","); optionBuilder.append(" TRANSACTIONAL=true,TRANSACTION_PROVIDER='" + transactionProvider + "'"); + this.transactionProvider = TransactionFactory.Provider.valueOf(transactionProvider); + } else { + this.transactionProvider = null; } this.tableDDLOptions = optionBuilder.toString(); } @@ -927,7 +933,6 @@ public abstract class BaseIndexIT extends ParallelStatsDisabledIT { stmt.setString(2, "y"); stmt.setString(3, "2"); stmt.execute(); - conn.commit(); query = "SELECT * FROM " + fullTableName + " WHERE \"v2\" = '1'"; rs = conn.createStatement().executeQuery("EXPLAIN " + query); @@ -948,6 +953,18 @@ public abstract class BaseIndexIT extends ParallelStatsDisabledIT { assertEquals("1",rs.getString("v2")); assertFalse(rs.next()); + // Shadow cells shouldn't exist yet since commit hasn't happened + if (transactional && transactionProvider == TransactionFactory.Provider.OMID) { + assertShadowCellsDoNotExist(conn, fullTableName, fullIndexName); + } + + conn.commit(); + + // Confirm shadow cells exist after commit + if (transactional && transactionProvider == TransactionFactory.Provider.OMID) { + 
assertShadowCellsExist(conn, fullTableName, fullIndexName); + } + query = "SELECT \"V1\", \"V1\" as foo1, \"v2\" as foo, \"v2\" as \"Foo1\", \"v2\" FROM " + fullTableName + " ORDER BY foo"; rs = conn.createStatement().executeQuery("EXPLAIN " + query); if(localIndex){ @@ -1033,12 +1050,22 @@ public abstract class BaseIndexIT extends ParallelStatsDisabledIT { } @Test - public void testIndexWithDecimalCol() throws Exception { + public void testIndexWithDecimalColServerSideUpsert() throws Exception { + testIndexWithDecimalCol(true); + } + + @Test + public void testIndexWithDecimalColClientSideUpsert() throws Exception { + testIndexWithDecimalCol(false); + } + + private void testIndexWithDecimalCol(boolean enableServerSideUpsert) throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); String tableName = "TBL_" + generateUniqueName(); String indexName = "IND_" + generateUniqueName(); String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName); String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName); + props.setProperty(QueryServices.ENABLE_SERVER_UPSERT_SELECT, Boolean.toString(enableServerSideUpsert)); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { conn.setAutoCommit(false); String query; @@ -1050,6 +1077,10 @@ public abstract class BaseIndexIT extends ParallelStatsDisabledIT { String ddl = null; ddl = "CREATE " + (localIndex ? 
"LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + " (decimal_pk) INCLUDE (decimal_col1, decimal_col2)"; conn.createStatement().execute(ddl); + + if (transactional && transactionProvider == TransactionFactory.Provider.OMID) { + assertShadowCellsExist(conn, fullTableName, fullIndexName); + } query = "SELECT decimal_pk, decimal_col1, decimal_col2 from " + fullTableName ; rs = conn.createStatement().executeQuery("EXPLAIN " + query); @@ -1076,6 +1107,46 @@ public abstract class BaseIndexIT extends ParallelStatsDisabledIT { } } + private static void assertShadowCellsDoNotExist(Connection conn, String fullTableName, String fullIndexName) + throws Exception { + assertShadowCells(conn, fullTableName, fullIndexName, false); + } + + private static void assertShadowCellsExist(Connection conn, String fullTableName, String fullIndexName) + throws Exception { + assertShadowCells(conn, fullTableName, fullIndexName, true); + } + + private static void assertShadowCells(Connection conn, String fullTableName, String fullIndexName, boolean exists) + throws Exception { + PTable ptable = conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, fullTableName)); + int nTableKVColumns = ptable.getColumns().size() - ptable.getPKColumns().size(); + HTableInterface hTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName)); + ResultScanner tableScanner = hTable.getScanner(new Scan()); + Result tableResult; + PTable pindex = conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, fullIndexName)); + int nIndexKVColumns = pindex.getColumns().size() - pindex.getPKColumns().size(); + HTableInterface hIndex = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName)); + ResultScanner indexScanner = hIndex.getScanner(new Scan()); + Result indexResult; + while ((indexResult = indexScanner.next()) != null) { + int nColumns = 0; + CellScanner scanner = indexResult.cellScanner(); + while 
(scanner.advance()) { + nColumns++; + } + assertEquals(exists, nColumns > nIndexKVColumns * 2); + assertNotNull(tableResult = tableScanner.next()); + nColumns = 0; + scanner = tableResult.cellScanner(); + while (scanner.advance()) { + nColumns++; + } + assertEquals(exists, nColumns > nTableKVColumns * 2); + } + assertNull(tableScanner.next()); + } + /** * Ensure that HTD contains table priorities correctly. */ http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2446305/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java index 2a58a6e..a994094 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java @@ -114,7 +114,6 @@ public class MutableIndexIT extends ParallelStatsDisabledIT { { false, "OMID", false }, { true, null, false }, { true, null, true }, { true, "TEPHRA", false }, { true, "TEPHRA", true }, - { true, "OMID", false }, }),1); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2446305/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 4f08846..61be561 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -109,7 +109,6 @@ import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.ExpressionUtil; import org.apache.phoenix.util.IndexUtil; 
-import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; @@ -541,7 +540,11 @@ public class UpsertCompiler { // Disable running upsert select on server side if a table has global mutable secondary indexes on it boolean hasGlobalMutableIndexes = SchemaUtil.hasGlobalIndex(table) && !table.isImmutableRows(); boolean hasWhereSubquery = select.getWhere() != null && select.getWhere().hasSubquery(); - runOnServer = (sameTable || (serverUpsertSelectEnabled && !hasGlobalMutableIndexes)) && isAutoCommit && !table.isTransactional() + runOnServer = (sameTable || (serverUpsertSelectEnabled && !hasGlobalMutableIndexes)) && isAutoCommit + // We can run the upsert select for initial index population on server side for transactional + // tables since the writes do not need to be done transactionally, since we gate the index + // usage on successfully writing all data rows. + && (!table.isTransactional() || table.getType() == PTableType.INDEX) && !(table.isImmutableRows() && !table.getIndexes().isEmpty()) && !select.isJoin() && !hasWhereSubquery && table.getRowTimestampColPos() == -1; } @@ -1039,12 +1042,15 @@ public class UpsertCompiler { byte[] txState = table.isTransactional() ? 
connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY; + ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION); + if (aggPlan.getTableRef().getTable().isTransactional() + || (table.getType() == PTableType.INDEX && table.isTransactional())) { + scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); + } if (ptr.getLength() > 0) { byte[] uuidValue = ServerCacheClient.generateId(); scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get()); - scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); - ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION); } ResultIterator iterator = aggPlan.iterator(); try { http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2446305/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 96d7504..a667316 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -431,6 +431,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver byte[] replayMutations = scan.getAttribute(BaseScannerRegionObserver.REPLAY_WRITES); byte[] indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID); byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE); + byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION); + PhoenixTransactionProvider txnProvider = null; + if (txState != null) { + int clientVersion = clientVersionBytes == null ? 
ScanUtil.UNKNOWN_CLIENT_VERSION : Bytes.toInt(clientVersionBytes); + txnProvider = TransactionFactory.getTransactionProvider(txState, clientVersion); + } List<Expression> selectExpressions = null; byte[] upsertSelectTable = scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE); boolean isUpsert = false; @@ -537,7 +543,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver useIndexProto = false; } - byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION); if(needToWrite) { synchronized (lock) { if (isRegionClosingOrSplitting) { @@ -661,9 +666,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver env.getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey()); - if (txState != null) { - int clientVersion = clientVersionBytes == null ? ScanUtil.UNKNOWN_CLIENT_VERSION : Bytes.toInt(clientVersionBytes); - PhoenixTransactionProvider txnProvider = TransactionFactory.getTransactionProvider(txState, clientVersion); + if (txnProvider != null) { put = txnProvider.markPutAsCommitted(put, ts, ts); } indexMutations.add(put); @@ -733,6 +736,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver for (Mutation mutation : row.toRowMutations()) { if (replayMutations != null) { mutation.setAttribute(REPLAY_WRITES, replayMutations); + } else if (txnProvider != null && projectedTable.getType() == PTableType.INDEX) { + mutation = txnProvider.markPutAsCommitted((Put)mutation, ts, ts); } mutations.add(mutation); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2446305/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index ee5a9c5..14f13b3 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -966,7 +966,12 @@ public class MutationState implements SQLCloseable { uncommittedPhysicalNames.add(table.getPhysicalName().getString()); phoenixTransactionContext.markDMLFence(table); } - hTable = phoenixTransactionContext.getTransactionalTableWriter(connection, table, hTable, !tableInfo.isDataTable()); + // Only pass true for last argument if the index is being written to on its own (i.e. initial + // index population), not if it's being written to for normal maintenance due to writes to + // the data table. This case is different because the initial index population does not need + // to be done transactionally since the index is only made active after all writes have + // occurred successfully. + hTable = phoenixTransactionContext.getTransactionalTableWriter(connection, table, hTable, tableInfo.isDataTable() && table.getType() == PTableType.INDEX); } numMutations = mutationList.size(); GLOBAL_MUTATION_BATCH_SIZE.update(numMutations); http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2446305/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java index be49e24..7831ccc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java @@ -105,7 +105,7 @@ public class OmidTransactionProvider implements PhoenixTransactionProvider { } @Override - public Put markPutAsCommitted(Put put, long timestamp, long commitTimestamp) throws IOException { + public Put markPutAsCommitted(Put put, long timestamp, 
long commitTimestamp) { return null; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2446305/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java index 72424fb..b7f660e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java @@ -65,5 +65,5 @@ public interface PhoenixTransactionProvider { * @return put operation with metadata * @throws IOException */ - public Put markPutAsCommitted(Put put, long timestamp, long commitTimestamp) throws IOException; + public Put markPutAsCommitted(Put put, long timestamp, long commitTimestamp); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c2446305/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java index c251133..70937cf 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java @@ -205,7 +205,7 @@ public class TephraTransactionProvider implements PhoenixTransactionProvider { } @Override - public Put markPutAsCommitted(Put put, long timestamp, long commitTimestamp) throws IOException { + public Put markPutAsCommitted(Put put, long timestamp, long commitTimestamp) { return put; } }
