Repository: phoenix Updated Branches: refs/heads/omid2 6dbfe5d55 -> 52e1ab589
PHOENIX-4943 Write Omid shadow cells when building global index synchronous Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/52e1ab58 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/52e1ab58 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/52e1ab58 Branch: refs/heads/omid2 Commit: 52e1ab589b17aae704f825d492809730ac139737 Parents: 6dbfe5d Author: James Taylor <[email protected]> Authored: Wed Oct 3 23:00:58 2018 -0700 Committer: James Taylor <[email protected]> Committed: Wed Oct 3 23:00:58 2018 -0700 ---------------------------------------------------------------------- .../phoenix/end2end/index/BaseIndexIT.java | 75 +++++++++++++++++++- .../phoenix/end2end/index/MutableIndexIT.java | 1 - .../apache/phoenix/compile/UpsertCompiler.java | 14 ++-- .../UngroupedAggregateRegionObserver.java | 13 ++-- .../apache/phoenix/execute/MutationState.java | 7 +- .../transaction/OmidTransactionContext.java | 5 +- .../transaction/OmidTransactionProvider.java | 2 +- .../transaction/OmidTransactionTable.java | 14 ++-- .../transaction/PhoenixTransactionProvider.java | 2 +- .../transaction/TephraTransactionProvider.java | 2 +- 10 files changed, 115 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/52e1ab58/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java index 16d8cca..334828b 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseIndexIT.java @@ -67,11 +67,13 @@ import org.apache.phoenix.jdbc.PhoenixStatement; import 
org.apache.phoenix.parse.NamedTableNode; import org.apache.phoenix.parse.TableName; import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature; +import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.DateUtil; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.PhoenixRuntime; @@ -91,6 +93,7 @@ public abstract class BaseIndexIT extends ParallelStatsDisabledIT { private final boolean localIndex; private final boolean transactional; + private final TransactionFactory.Provider transactionProvider; private final boolean mutable; private final String tableDDLOptions; @@ -116,6 +119,9 @@ public abstract class BaseIndexIT extends ParallelStatsDisabledIT { if (optionBuilder.length()!=0) optionBuilder.append(","); optionBuilder.append(" TRANSACTIONAL=true,TRANSACTION_PROVIDER='" + transactionProvider + "'"); + this.transactionProvider = TransactionFactory.Provider.valueOf(transactionProvider); + } else { + this.transactionProvider = null; } this.tableDDLOptions = optionBuilder.toString(); } @@ -927,7 +933,6 @@ public abstract class BaseIndexIT extends ParallelStatsDisabledIT { stmt.setString(2, "y"); stmt.setString(3, "2"); stmt.execute(); - conn.commit(); query = "SELECT * FROM " + fullTableName + " WHERE \"v2\" = '1'"; rs = conn.createStatement().executeQuery("EXPLAIN " + query); @@ -948,6 +953,18 @@ public abstract class BaseIndexIT extends ParallelStatsDisabledIT { assertEquals("1",rs.getString("v2")); assertFalse(rs.next()); + // Shadow cells shouldn't exist yet since commit hasn't happened + if (transactional && transactionProvider == TransactionFactory.Provider.OMID) { + assertShadowCellsDoNotExist(conn, fullTableName, 
fullIndexName); + } + + conn.commit(); + + // Confirm shadow cells exist after commit + if (transactional && transactionProvider == TransactionFactory.Provider.OMID) { + assertShadowCellsExist(conn, fullTableName, fullIndexName); + } + query = "SELECT \"V1\", \"V1\" as foo1, \"v2\" as foo, \"v2\" as \"Foo1\", \"v2\" FROM " + fullTableName + " ORDER BY foo"; rs = conn.createStatement().executeQuery("EXPLAIN " + query); if(localIndex){ @@ -1033,12 +1050,22 @@ public abstract class BaseIndexIT extends ParallelStatsDisabledIT { } @Test - public void testIndexWithDecimalCol() throws Exception { + public void testIndexWithDecimalColServerSideUpsert() throws Exception { + testIndexWithDecimalCol(true); + } + + @Test + public void testIndexWithDecimalColClientSideUpsert() throws Exception { + testIndexWithDecimalCol(false); + } + + private void testIndexWithDecimalCol(boolean enableServerSideUpsert) throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); String tableName = "TBL_" + generateUniqueName(); String indexName = "IND_" + generateUniqueName(); String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName); String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName); + props.setProperty(QueryServices.ENABLE_SERVER_UPSERT_SELECT, Boolean.toString(enableServerSideUpsert)); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { conn.setAutoCommit(false); String query; @@ -1050,6 +1077,10 @@ public abstract class BaseIndexIT extends ParallelStatsDisabledIT { String ddl = null; ddl = "CREATE " + (localIndex ? 
"LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + " (decimal_pk) INCLUDE (decimal_col1, decimal_col2)"; conn.createStatement().execute(ddl); + + if (transactional && transactionProvider == TransactionFactory.Provider.OMID) { + assertShadowCellsExist(conn, fullTableName, fullIndexName); + } query = "SELECT decimal_pk, decimal_col1, decimal_col2 from " + fullTableName ; rs = conn.createStatement().executeQuery("EXPLAIN " + query); @@ -1076,6 +1107,46 @@ public abstract class BaseIndexIT extends ParallelStatsDisabledIT { } } + private static void assertShadowCellsDoNotExist(Connection conn, String fullTableName, String fullIndexName) + throws Exception { + assertShadowCells(conn, fullTableName, fullIndexName, false); + } + + private static void assertShadowCellsExist(Connection conn, String fullTableName, String fullIndexName) + throws Exception { + assertShadowCells(conn, fullTableName, fullIndexName, true); + } + + private static void assertShadowCells(Connection conn, String fullTableName, String fullIndexName, boolean exists) + throws Exception { + PTable ptable = conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, fullTableName)); + int nTableKVColumns = ptable.getColumns().size() - ptable.getPKColumns().size(); + HTableInterface hTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullTableName)); + ResultScanner tableScanner = hTable.getScanner(new Scan()); + Result tableResult; + PTable pindex = conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, fullIndexName)); + int nIndexKVColumns = pindex.getColumns().size() - pindex.getPKColumns().size(); + HTableInterface hIndex = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(fullIndexName)); + ResultScanner indexScanner = hIndex.getScanner(new Scan()); + Result indexResult; + while ((indexResult = indexScanner.next()) != null) { + int nColumns = 0; + CellScanner scanner = indexResult.cellScanner(); + while 
(scanner.advance()) { + nColumns++; + } + assertEquals(exists, nColumns > nIndexKVColumns * 2); + assertNotNull(tableResult = tableScanner.next()); + nColumns = 0; + scanner = tableResult.cellScanner(); + while (scanner.advance()) { + nColumns++; + } + assertEquals(exists, nColumns > nTableKVColumns * 2); + } + assertNull(tableScanner.next()); + } + /** * Ensure that HTD contains table priorities correctly. */ http://git-wip-us.apache.org/repos/asf/phoenix/blob/52e1ab58/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java index 2a58a6e..a994094 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java @@ -114,7 +114,6 @@ public class MutableIndexIT extends ParallelStatsDisabledIT { { false, "OMID", false }, { true, null, false }, { true, null, true }, { true, "TEPHRA", false }, { true, "TEPHRA", true }, - { true, "OMID", false }, }),1); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/52e1ab58/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 4f08846..61be561 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -109,7 +109,6 @@ import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.ExpressionUtil; import org.apache.phoenix.util.IndexUtil; 
-import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; @@ -541,7 +540,11 @@ public class UpsertCompiler { // Disable running upsert select on server side if a table has global mutable secondary indexes on it boolean hasGlobalMutableIndexes = SchemaUtil.hasGlobalIndex(table) && !table.isImmutableRows(); boolean hasWhereSubquery = select.getWhere() != null && select.getWhere().hasSubquery(); - runOnServer = (sameTable || (serverUpsertSelectEnabled && !hasGlobalMutableIndexes)) && isAutoCommit && !table.isTransactional() + runOnServer = (sameTable || (serverUpsertSelectEnabled && !hasGlobalMutableIndexes)) && isAutoCommit + // We can run the upsert select for initial index population on server side for transactional + // tables since the writes do not need to be done transactionally, since we gate the index + // usage on successfully writing all data rows. + && (!table.isTransactional() || table.getType() == PTableType.INDEX) && !(table.isImmutableRows() && !table.getIndexes().isEmpty()) && !select.isJoin() && !hasWhereSubquery && table.getRowTimestampColPos() == -1; } @@ -1039,12 +1042,15 @@ public class UpsertCompiler { byte[] txState = table.isTransactional() ? 
connection.getMutationState().encodeTransaction() : ByteUtil.EMPTY_BYTE_ARRAY; + ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION); + if (aggPlan.getTableRef().getTable().isTransactional() + || (table.getType() == PTableType.INDEX && table.isTransactional())) { + scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); + } if (ptr.getLength() > 0) { byte[] uuidValue = ServerCacheClient.generateId(); scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, ptr.get()); - scan.setAttribute(BaseScannerRegionObserver.TX_STATE, txState); - ScanUtil.setClientVersion(scan, MetaDataProtocol.PHOENIX_VERSION); } ResultIterator iterator = aggPlan.iterator(); try { http://git-wip-us.apache.org/repos/asf/phoenix/blob/52e1ab58/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 96d7504..a667316 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -431,6 +431,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver byte[] replayMutations = scan.getAttribute(BaseScannerRegionObserver.REPLAY_WRITES); byte[] indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID); byte[] txState = scan.getAttribute(BaseScannerRegionObserver.TX_STATE); + byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION); + PhoenixTransactionProvider txnProvider = null; + if (txState != null) { + int clientVersion = clientVersionBytes == null ? 
ScanUtil.UNKNOWN_CLIENT_VERSION : Bytes.toInt(clientVersionBytes); + txnProvider = TransactionFactory.getTransactionProvider(txState, clientVersion); + } List<Expression> selectExpressions = null; byte[] upsertSelectTable = scan.getAttribute(BaseScannerRegionObserver.UPSERT_SELECT_TABLE); boolean isUpsert = false; @@ -537,7 +543,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver useIndexProto = false; } - byte[] clientVersionBytes = scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION); if(needToWrite) { synchronized (lock) { if (isRegionClosingOrSplitting) { @@ -661,9 +666,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver env.getRegion().getRegionInfo().getStartKey(), env.getRegion().getRegionInfo().getEndKey()); - if (txState != null) { - int clientVersion = clientVersionBytes == null ? ScanUtil.UNKNOWN_CLIENT_VERSION : Bytes.toInt(clientVersionBytes); - PhoenixTransactionProvider txnProvider = TransactionFactory.getTransactionProvider(txState, clientVersion); + if (txnProvider != null) { put = txnProvider.markPutAsCommitted(put, ts, ts); } indexMutations.add(put); @@ -733,6 +736,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver for (Mutation mutation : row.toRowMutations()) { if (replayMutations != null) { mutation.setAttribute(REPLAY_WRITES, replayMutations); + } else if (txnProvider != null && projectedTable.getType() == PTableType.INDEX) { + mutation = txnProvider.markPutAsCommitted((Put)mutation, ts, ts); } mutations.add(mutation); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/52e1ab58/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index ee5a9c5..14f13b3 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -966,7 +966,12 @@ public class MutationState implements SQLCloseable { uncommittedPhysicalNames.add(table.getPhysicalName().getString()); phoenixTransactionContext.markDMLFence(table); } - hTable = phoenixTransactionContext.getTransactionalTableWriter(connection, table, hTable, !tableInfo.isDataTable()); + // Only pass true for last argument if the index is being written to on its own (i.e. initial + // index population), not if it's being written to for normal maintenance due to writes to + // the data table. This case is different because the initial index population does not need + // to be done transactionally since the index is only made active after all writes have + // occurred successfully. + hTable = phoenixTransactionContext.getTransactionalTableWriter(connection, table, hTable, tableInfo.isDataTable() && table.getType() == PTableType.INDEX); } numMutations = mutationList.size(); GLOBAL_MUTATION_BATCH_SIZE.update(numMutations); http://git-wip-us.apache.org/repos/asf/phoenix/blob/52e1ab58/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java index 42aeb08..f040ac1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionContext.java @@ -317,6 +317,9 @@ public class OmidTransactionContext implements PhoenixTransactionContext { @Override public Table getTransactionalTableWriter(PhoenixConnection connection, PTable table, Table htable, boolean isIndex) throws SQLException { - return new 
OmidTransactionTable(this, htable, table.isImmutableRows() || isIndex); + // When we're getting a table for writing, if the table being written to is an index, + // write the shadow cells immediately since the only time we write to an index is + // when we initially populate it synchronously. + return new OmidTransactionTable(this, htable, table.isImmutableRows() || isIndex, isIndex); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/52e1ab58/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java index 25171f2..e81991a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionProvider.java @@ -214,7 +214,7 @@ public class OmidTransactionProvider implements PhoenixTransactionProvider { } @Override - public Put markPutAsCommitted(Put put, long timestamp, long commitTimestamp) throws IOException { + public Put markPutAsCommitted(Put put, long timestamp, long commitTimestamp) { return TTable.markPutAsCommitted(put, timestamp, timestamp); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/52e1ab58/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java index bb6764c..cec7f59 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/OmidTransactionTable.java @@ -58,10 +58,12 @@ 
public class OmidTransactionTable implements Table { private TTable tTable; private Transaction tx; + private final boolean addShadowCells; public OmidTransactionTable() throws SQLException { this.tTable = null; this.tx = null; + this.addShadowCells = false; } public OmidTransactionTable(PhoenixTransactionContext ctx, Table hTable) throws SQLException { @@ -69,10 +71,14 @@ public class OmidTransactionTable implements Table { } public OmidTransactionTable(PhoenixTransactionContext ctx, Table hTable, boolean isConflictFree) throws SQLException { + this(ctx, hTable, isConflictFree, false); + } + + public OmidTransactionTable(PhoenixTransactionContext ctx, Table hTable, boolean isConflictFree, boolean addShadowCells) throws SQLException { assert(ctx instanceof OmidTransactionContext); OmidTransactionContext omidTransactionContext = (OmidTransactionContext) ctx; - + this.addShadowCells = addShadowCells; try { tTable = new TTable(hTable, true, isConflictFree); } catch (IOException e) { @@ -92,7 +98,7 @@ public class OmidTransactionTable implements Table { @Override public void put(Put put) throws IOException { - tTable.put(tx, put); + tTable.put(tx, put, addShadowCells); } @Override @@ -139,7 +145,7 @@ public class OmidTransactionTable implements Table { @Override public void put(List<Put> puts) throws IOException { - tTable.put(tx, puts); + tTable.put(tx, puts, addShadowCells); } @Override @@ -166,7 +172,7 @@ public class OmidTransactionTable implements Table { @Override public void batch(List<? 
extends Row> actions, Object[] results) throws IOException, InterruptedException { - tTable.batch(tx, actions); + tTable.batch(tx, actions, addShadowCells); Arrays.fill(results, EMPTY_RESULT_EXISTS_TRUE); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/52e1ab58/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java index 72424fb..b7f660e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionProvider.java @@ -65,5 +65,5 @@ public interface PhoenixTransactionProvider { * @return put operation with metadata * @throws IOException */ - public Put markPutAsCommitted(Put put, long timestamp, long commitTimestamp) throws IOException; + public Put markPutAsCommitted(Put put, long timestamp, long commitTimestamp); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/52e1ab58/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java index c251133..70937cf 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/TephraTransactionProvider.java @@ -205,7 +205,7 @@ public class TephraTransactionProvider implements PhoenixTransactionProvider { } @Override - public Put markPutAsCommitted(Put put, long timestamp, long commitTimestamp) throws IOException { 
+ public Put markPutAsCommitted(Put put, long timestamp, long commitTimestamp) { return put; } }
