Repository: phoenix Updated Branches: refs/heads/master e4c479ab0 -> 30c2e7555
PHOENIX-2478 Rows committed in transaction overlapping index creation are not populated Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/30c2e755 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/30c2e755 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/30c2e755 Branch: refs/heads/master Commit: 30c2e75556c365b059f23d43de5ac272f92aa285 Parents: e4c479a Author: James Taylor <[email protected]> Authored: Tue Jan 12 17:50:34 2016 -0800 Committer: James Taylor <[email protected]> Committed: Tue Jan 12 17:51:40 2016 -0800 ---------------------------------------------------------------------- .../apache/phoenix/end2end/index/IndexIT.java | 73 +++--- .../org/apache/phoenix/tx/TransactionIT.java | 2 - .../phoenix/compile/DelegateMutationPlan.java | 70 ++++++ .../phoenix/compile/PostIndexDDLCompiler.java | 15 +- .../phoenix/exception/SQLExceptionCode.java | 1 + .../apache/phoenix/execute/MutationState.java | 230 +++++++++++++++---- .../query/ConnectionQueryServicesImpl.java | 12 +- .../apache/phoenix/schema/MetaDataClient.java | 41 ++-- .../org/apache/phoenix/schema/TableRef.java | 6 +- .../org/apache/phoenix/util/SchemaUtil.java | 21 +- pom.xml | 2 +- 11 files changed, 357 insertions(+), 116 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/30c2e755/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java index f48b847..e369dae 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java @@ -45,6 +45,7 @@ import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; import org.apache.phoenix.end2end.Shadower; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.parse.NamedTableNode; import org.apache.phoenix.parse.TableName; import org.apache.phoenix.query.BaseTest; @@ -222,15 +223,19 @@ public class IndexIT extends BaseHBaseManagedTimeIT { @Test public void testCreateIndexAfterUpsertStarted() throws Exception { - if (transactional) { // FIXME: PHOENIX-2446 - return; + testCreateIndexAfterUpsertStarted(false, fullTableName + "1", fullIndexName + "1"); + if (transactional) { + testCreateIndexAfterUpsertStarted(true, fullTableName + "2", fullIndexName + "2"); } + } + + private void testCreateIndexAfterUpsertStarted(boolean readOwnWrites, String fullTableName, String fullIndexName) throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); try (Connection conn1 = DriverManager.getConnection(getUrl(), props)) { - conn1.setAutoCommit(false); + conn1.setAutoCommit(true); String ddl ="CREATE TABLE " + fullTableName + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions; - Statement stmt = conn1.createStatement(); - stmt.execute(ddl); + Statement stmt1 = conn1.createStatement(); + stmt1.execute(ddl); BaseTest.populateTestTable(fullTableName); ResultSet rs; @@ -243,32 +248,40 @@ public class IndexIT extends BaseHBaseManagedTimeIT { String upsert = "UPSERT INTO " + fullTableName + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; - PreparedStatement pstmt = conn2.prepareStatement(upsert); - pstmt.setString(1, "varchar4"); - pstmt.setString(2, "char4"); - pstmt.setInt(3, 4); - pstmt.setLong(4, 4L); - pstmt.setBigDecimal(5, new BigDecimal(4.0)); + PreparedStatement pstmt2 = conn2.prepareStatement(upsert); + pstmt2.setString(1, "varchar4"); + pstmt2.setString(2, "char4"); + pstmt2.setInt(3, 4); + pstmt2.setLong(4, 4L); + pstmt2.setBigDecimal(5, new BigDecimal(4.0)); Date date = DateUtil.parseDate("2015-01-01 00:00:00"); - pstmt.setDate(6, date); - pstmt.setString(7, "varchar_a"); - pstmt.setString(8, "chara"); - pstmt.setInt(9, 2); - pstmt.setLong(10, 2L); - pstmt.setBigDecimal(11, new BigDecimal(2.0)); - pstmt.setDate(12, date); - pstmt.setString(13, "varchar_b"); - pstmt.setString(14, "charb"); - pstmt.setInt(15, 3); - pstmt.setLong(16, 3L); - pstmt.setBigDecimal(17, new BigDecimal(3.0)); - pstmt.setDate(18, date); - pstmt.executeUpdate(); + pstmt2.setDate(6, date); + pstmt2.setString(7, "varchar_a"); + pstmt2.setString(8, "chara"); + pstmt2.setInt(9, 2); + pstmt2.setLong(10, 2L); + pstmt2.setBigDecimal(11, new BigDecimal(2.0)); + pstmt2.setDate(12, date); + pstmt2.setString(13, "varchar_b"); + pstmt2.setString(14, "charb"); + pstmt2.setInt(15, 3); + pstmt2.setLong(16, 3L); + pstmt2.setBigDecimal(17, new BigDecimal(3.0)); + pstmt2.setDate(18, date); + pstmt2.executeUpdate(); + + if (readOwnWrites) { + String query = "SELECT long_pk FROM " + fullTableName + " WHERE long_pk=4"; + rs = conn2.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertFalse(rs.next()); + } + String indexName = SchemaUtil.getTableNameFromFullName(fullIndexName); ddl = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName + " ON " + fullTableName + " (long_pk, varchar_pk)" + " INCLUDE (long_col1, long_col2)"; - stmt.execute(ddl); + stmt1.execute(ddl); /* * Commit upsert after index created through different connection. @@ -276,16 +289,18 @@ public class IndexIT extends BaseHBaseManagedTimeIT { * at commit time, recognize the new index, and generate the correct metadata (or index * rows for immutable indexes). * - * FIXME: PHOENIX-2446. For transactional data, this is problematic because the index + * For transactional data, this is problematic because the index * gets a timestamp *after* the commit timestamp of conn2 and thus won't be seen during * the commit. Also, when the index is being built, the data hasn't yet been committed - * and thus won't be part of the initial index build. + * and thus won't be part of the initial index build (fixed by PHOENIX-2446). */ conn2.commit(); - rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName); + stmt1 = conn1.createStatement(); + rs = stmt1.executeQuery("SELECT COUNT(*) FROM " + fullTableName); assertTrue(rs.next()); assertEquals(4,rs.getInt(1)); + assertEquals(fullIndexName, stmt1.unwrap(PhoenixStatement.class).getQueryPlan().getTableRef().getTable().getName().getString()); String query = "SELECT /*+ NO_INDEX */ long_pk FROM " + fullTableName; rs = conn1.createStatement().executeQuery(query); http://git-wip-us.apache.org/repos/asf/phoenix/blob/30c2e755/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java index b1c3510..e08225c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java @@ -772,7 +772,6 @@ public class TransactionIT extends BaseHBaseManagedTimeIT { } } - @Ignore("Add back once TEPHRA-162 gets fixed") @Test public void testInflightUpdateNotSeen() throws Exception { String selectSQL = "SELECT * FROM " + FULL_TABLE_NAME; @@ -824,7 +823,6 @@ public class TransactionIT extends BaseHBaseManagedTimeIT { } } - @Ignore("Add back once TEPHRA-162 gets fixed") @Test public void testInflightDeleteNotSeen() throws Exception { String selectSQL = "SELECT * FROM " + FULL_TABLE_NAME; http://git-wip-us.apache.org/repos/asf/phoenix/blob/30c2e755/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java new file mode 100644 index 0000000..7ef5c48 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DelegateMutationPlan.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.compile; + +import java.sql.ParameterMetaData; +import java.sql.SQLException; +import java.util.Set; + +import org.apache.phoenix.execute.MutationState; +import org.apache.phoenix.jdbc.PhoenixStatement.Operation; +import org.apache.phoenix.schema.TableRef; + +public class DelegateMutationPlan implements MutationPlan { + @Override + public MutationState execute() throws SQLException { + return plan.execute(); + } + + @Override + public StatementContext getContext() { + return plan.getContext(); + } + + @Override + public TableRef getTargetRef() { + return plan.getTargetRef(); + } + + @Override + public ParameterMetaData getParameterMetaData() { + return plan.getParameterMetaData(); + } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + return plan.getExplainPlan(); + } + + @Override + public Set<TableRef> getSourceRefs() { + return plan.getSourceRefs(); + } + + @Override + public Operation getOperation() { + return plan.getOperation(); + } + + private final MutationPlan plan; + + public DelegateMutationPlan(MutationPlan plan) { + this.plan = plan; + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/30c2e755/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java index f5bb4c4..bb0b595 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java @@ -20,6 +20,7 @@ package org.apache.phoenix.compile; import java.sql.SQLException; import java.util.List; +import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.schema.PColumn; @@ -100,9 +101,10 @@ public class PostIndexDDLCompiler { } } + final PTable dataTable = dataTableRef.getTable(); dataColumns.setLength(dataColumns.length()-1); indexColumns.setLength(indexColumns.length()-1); - String schemaName = dataTableRef.getTable().getSchemaName().getString(); + String schemaName = dataTable.getSchemaName().getString(); String tableName = indexTable.getTableName().getString(); StringBuilder updateStmtStr = new StringBuilder(); @@ -110,12 +112,19 @@ public class PostIndexDDLCompiler { .append(indexColumns).append(") "); final StringBuilder selectQueryBuilder = new StringBuilder(); selectQueryBuilder.append(" SELECT ").append(dataColumns).append(" FROM ") - .append(schemaName.length() == 0 ? "" : '"' + schemaName + "\".").append('"').append(dataTableRef.getTable().getTableName().getString()).append('"'); + .append(schemaName.length() == 0 ? "" : '"' + schemaName + "\".").append('"').append(dataTable.getTableName().getString()).append('"'); this.selectQuery = selectQueryBuilder.toString(); updateStmtStr.append(this.selectQuery); try (final PhoenixStatement statement = new PhoenixStatement(connection)) { - return statement.compileMutation(updateStmtStr.toString()); + DelegateMutationPlan delegate = new DelegateMutationPlan(statement.compileMutation(updateStmtStr.toString())) { + @Override + public MutationState execute() throws SQLException { + connection.getMutationState().commitWriteFence(dataTable); + return super.execute(); + } + }; + return delegate; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/30c2e755/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index 38e8ea0..9767cbe 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -280,6 +280,7 @@ public enum SQLExceptionCode { TX_MUST_BE_ENABLED_TO_SET_TX_CONTEXT(1082, "44A13", "Cannot set transaction context if transactions are disabled"), TX_MUST_BE_ENABLED_TO_SET_AUTO_FLUSH(1083, "44A14", "Cannot set auto flush if transactions are disabled"), TX_MUST_BE_ENABLED_TO_SET_ISOLATION_LEVEL(1084, "44A15", "Cannot set isolation level to TRANSACTION_READ_COMMITTED or TRANSACTION_SERIALIZABLE if transactions are disabled"), + TX_UNABLE_TO_GET_WRITE_FENCE(1085, "44A16", "Unable to obtain write fence for DDL operation"), /** Sequence related */ SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/30c2e755/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 6cac825..41d677a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -31,6 +31,8 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; import javax.annotation.concurrent.Immutable; @@ -66,9 +68,11 @@ import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.IllegalDataException; import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PMetaData; import org.apache.phoenix.schema.PRow; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.schema.PTableRef; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.RowKeySchema; import org.apache.phoenix.schema.TableNotFoundException; @@ -82,6 +86,7 @@ import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.SQLCloseable; import org.apache.phoenix.util.SQLCloseables; +import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.TransactionUtil; import org.slf4j.Logger; @@ -97,10 +102,13 @@ import co.cask.tephra.Transaction; import co.cask.tephra.Transaction.VisibilityLevel; import co.cask.tephra.TransactionAware; import co.cask.tephra.TransactionCodec; +import co.cask.tephra.TransactionConflictException; import co.cask.tephra.TransactionContext; import co.cask.tephra.TransactionFailureException; import co.cask.tephra.TransactionSystemClient; import co.cask.tephra.hbase11.TransactionAwareHTable; +import co.cask.tephra.visibility.FenceWait; +import co.cask.tephra.visibility.VisibilityFence; /** * @@ -126,6 +134,7 @@ public class MutationState implements SQLCloseable { private int numRows = 0; private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY; private boolean isExternalTxContext = false; + private Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations; private final MutationMetricQueue mutationMetricQueue; private ReadMetricQueue readMetricQueue; @@ -151,7 +160,7 @@ public class MutationState implements SQLCloseable { } private MutationState(long maxSize, PhoenixConnection connection, Transaction tx, TransactionContext txContext, long sizeOffset) { - this(maxSize, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(connection.getMutateBatchSize()), tx, txContext); + this(maxSize, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(5), tx, txContext); this.sizeOffset = sizeOffset; } @@ -195,6 +204,53 @@ public class MutationState implements SQLCloseable { return maxSize; } + /** + * Commit a write fence when creating an index so that we can detect + * when a data table transaction is started before the create index + * but completes after it. In this case, we need to rerun the data + * table transaction after the index creation so that the index rows + * are generated. See {@link #addReadFence(PTable)} and TEPHRA-157 + * for more information. + * @param dataTable the data table upon which an index is being added + * @throws SQLException + */ + public void commitWriteFence(PTable dataTable) throws SQLException { + if (dataTable.isTransactional()) { + byte[] key = SchemaUtil.getTableKey(dataTable); + try { + FenceWait fenceWait = VisibilityFence.prepareWait(key, connection.getQueryServices().getTransactionSystemClient()); + fenceWait.await(10000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build().buildException(); + } catch (TimeoutException | TransactionFailureException e) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_UNABLE_TO_GET_WRITE_FENCE) + .setSchemaName(dataTable.getSchemaName().getString()) + .setTableName(dataTable.getTableName().getString()) + .build().buildException(); + } finally { + // The client expects a transaction to be in progress on the txContext while the + // VisibilityFence.prepareWait() starts a new tx and finishes/aborts it. After it's + // finished, we start a new one here. + // TODO: seems like an autonomous tx capability in Tephra would be useful here. + try { + txContext.start(); + } catch (TransactionFailureException e) { + throw TransactionUtil.getTransactionFailureException(e); + } + } + } + } + + private void addReadFence(PTable dataTable) throws SQLException { + byte[] logicalKey = SchemaUtil.getTableKey(dataTable); + this.txContext.addTransactionAware(VisibilityFence.create(logicalKey)); + byte[] physicalKey = dataTable.getPhysicalName().getBytes(); + if (Bytes.compareTo(physicalKey, logicalKey) != 0) { + this.txContext.addTransactionAware(VisibilityFence.create(physicalKey)); + } + } + public boolean checkpointIfNeccessary(MutationPlan plan) throws SQLException { Transaction currentTx = getTransaction(); if (getTransaction() == null || plan.getTargetRef() == null || plan.getTargetRef().getTable() == null || !plan.getTargetRef().getTable().isTransactional()) { @@ -332,6 +388,12 @@ public class MutationState implements SQLCloseable { try { if (!isTransactionStarted()) { + // Clear any transactional state in case transaction was ended outside + // of Phoenix so we don't carry the old transaction state forward. We + // cannot call reset() here due to the case of having mutations and + // then transitioning from non transactional to transactional (which + // would end up clearing our uncommitted state). + resetTransactionalState(); txContext.start(); return true; } @@ -410,6 +472,10 @@ public class MutationState implements SQLCloseable { // Put the existing one back now that it's merged this.mutations.put(entry.getKey(), existingRows); } else { + // Size new map at batch size as that's what it'll likely grow to. + Map<ImmutableBytesPtr,RowMutationState> newRows = Maps.newHashMapWithExpectedSize(connection.getMutateBatchSize()); + newRows.putAll(entry.getValue()); + this.mutations.put(tableRef, newRows); if (!isIndex) { numRows += entry.getValue().size(); } @@ -705,7 +771,6 @@ public class MutationState implements SQLCloseable { @Override public void delete(List<Delete> deletes) throws IOException { ServerCache cache = null; - SQLException sqlE = null; try { PTable table = tableRef.getTable(); List<PTable> indexes = table.getIndexes(); @@ -764,6 +829,7 @@ public class MutationState implements SQLCloseable { sendAll = true; } + Map<ImmutableBytesPtr, RowMutationState> valuesMap; List<TableRef> txTableRefs = Lists.newArrayListWithExpectedSize(mutations.size()); // add tracing for this operation try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) { @@ -773,7 +839,7 @@ public class MutationState implements SQLCloseable { while (tableRefIterator.hasNext()) { // at this point we are going through mutations for each table final TableRef tableRef = tableRefIterator.next(); - Map<ImmutableBytesPtr, RowMutationState> valuesMap = mutations.get(tableRef); + valuesMap = mutations.get(tableRef); if (valuesMap == null || valuesMap.isEmpty()) { continue; } @@ -782,6 +848,7 @@ public class MutationState implements SQLCloseable { final PTable table = tableRef.getTable(); // Track tables to which we've sent uncommitted data if (isTransactional = table.isTransactional()) { + addReadFence(table); txTableRefs.add(tableRef); uncommittedPhysicalNames.add(table.getPhysicalName().getString()); } @@ -832,7 +899,7 @@ public class MutationState implements SQLCloseable { GLOBAL_MUTATION_BATCH_SIZE.update(numMutations); long startTime = System.currentTimeMillis(); - child.addTimelineAnnotation("Attempt " + retryCount);; + child.addTimelineAnnotation("Attempt " + retryCount); hTable.batch(mutationList); child.stop(); child.stop(); @@ -899,6 +966,13 @@ public class MutationState implements SQLCloseable { // committed in the event of a failure. if (isTransactional) { addUncommittedStatementIndexes(valuesMap.values()); + if (txMutations == null) { + txMutations = Maps.newHashMapWithExpectedSize(mutations.size()); + } + // Keep all mutations we've encountered until a commit or rollback. + // This is not ideal, but there's not good way to get the values back + // in the event that we need to replay the commit. + txMutations.put(tableRef, valuesMap); } // Remove batches as we process them if (sendAll) { @@ -985,12 +1059,17 @@ public class MutationState implements SQLCloseable { public void close() throws SQLException { } - private void reset() { + private void resetState() { + numRows = 0; + this.mutations.clear(); + resetTransactionalState(); + } + + private void resetTransactionalState() { tx = null; txAwares.clear(); + txMutations = null; uncommittedPhysicalNames.clear(); - this.mutations.clear(); - numRows = 0; uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY; } @@ -1004,62 +1083,121 @@ public class MutationState implements SQLCloseable { } } } finally { - reset(); + resetState(); } } public void commit() throws SQLException { - boolean sendSuccessful=false; - SQLException sqlE = null; - try { - send(); - sendSuccessful=true; - } catch (SQLException e) { - sqlE = e; - } finally { + Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap(); + int retryCount = 0; + do { + boolean sendSuccessful=false; + boolean retryCommit = false; + SQLException sqlE = null; try { - if (txContext != null && isTransactionStarted()) { - TransactionFailureException txFailure = null; - boolean finishSuccessful=false; - try { - if (sendSuccessful) { - txContext.finish(); - finishSuccessful = true; - } - } catch (TransactionFailureException e) { - txFailure = e; - SQLException nextE = TransactionUtil.getTransactionFailureException(e); - if (sqlE == null) { - sqlE = nextE; - } else { - sqlE.setNextException(nextE); + send(); + txMutations = this.txMutations; + sendSuccessful=true; + } catch (SQLException e) { + sqlE = e; + } finally { + try { + if (txContext != null && isTransactionStarted()) { + TransactionFailureException txFailure = null; + boolean finishSuccessful=false; + try { + if (sendSuccessful) { + txContext.finish(); + finishSuccessful = true; + } + } catch (TransactionFailureException e) { + retryCommit = (e instanceof TransactionConflictException && retryCount == 0); + txFailure = e; + SQLException nextE = TransactionUtil.getTransactionFailureException(e); + if (sqlE == null) { + sqlE = nextE; + } else { + sqlE.setNextException(nextE); + } + } finally { + // If send fails or finish fails, abort the tx + if (!finishSuccessful) { + try { + txContext.abort(txFailure); + } catch (TransactionFailureException e) { + SQLException nextE = TransactionUtil.getTransactionFailureException(e); + if (sqlE == null) { + sqlE = nextE; + } else { + sqlE.setNextException(nextE); + } + } + } } + } + } finally { + try { + resetState(); } finally { - // If send fails or finish fails, abort the tx - if (!finishSuccessful) { + if (retryCommit) { + startTransaction(); try { - txContext.abort(txFailure); - } catch (TransactionFailureException e) { - SQLException nextE = TransactionUtil.getTransactionFailureException(e); + retryCommit = wasIndexAdded(txMutations.keySet()); + } catch (SQLException e) { + retryCommit = false; if (sqlE == null) { - sqlE = nextE; + sqlE = e; } else { - sqlE.setNextException(nextE); + sqlE.setNextException(e); } } } + if (sqlE != null && !retryCommit) { + throw sqlE; + } } } - } finally { - try { - reset(); - } finally { - if (sqlE != null) { - throw sqlE; - } - } + } + // Retry commit once if conflict occurred and index was added + if (!retryCommit) { + break; + } + retryCount++; + mutations.putAll(txMutations); + } while (true); + } + + /** + * Determines whether indexes were added to mutated tables while the transaction was in progress. + * @return true if indexes were added and false otherwise. + * @throws SQLException + */ + private boolean wasIndexAdded(Set<TableRef> txTableRefs) throws SQLException { + MetaDataClient client = new MetaDataClient(connection); + PMetaData cache = connection.getMetaDataCache(); + boolean addedIndexes = false; + for (TableRef tableRef : txTableRefs) { + PTable dataTable = tableRef.getTable(); + List<PTable> oldIndexes; + PTableRef ptableRef = cache.getTableRef(dataTable.getKey()); + oldIndexes = ptableRef.getTable().getIndexes(); + MetaDataMutationResult result = client.updateCache(dataTable.getTenantId(), dataTable.getSchemaName().getString(), dataTable.getTableName().getString()); + long timestamp = TransactionUtil.getResolvedTime(connection, result); + tableRef.setTimeStamp(timestamp); + if (result.getTable() == null) { + throw new TableNotFoundException(dataTable.getSchemaName().getString(), dataTable.getTableName().getString()); + } + if (!result.wasUpdated()) { + continue; + } + if (!addedIndexes) { + // TODO: in theory we should do a deep equals check here, as it's possible + // that an index was dropped and recreated with the same name but different + // indexed/covered columns. + addedIndexes = (!oldIndexes.equals(result.getTable().getIndexes())); } } + return addedIndexes; } /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/30c2e755/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index fb29e2c..530299b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -58,12 +58,6 @@ import java.util.concurrent.TimeoutException; import javax.annotation.concurrent.GuardedBy; -import co.cask.tephra.TransactionSystemClient; -import co.cask.tephra.TxConstants; -import co.cask.tephra.distributed.PooledClientProvider; -import co.cask.tephra.distributed.TransactionServiceClient; -import co.cask.tephra.hbase11.coprocessor.TransactionProcessor; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -205,6 +199,12 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import co.cask.tephra.TransactionSystemClient; +import co.cask.tephra.TxConstants; +import co.cask.tephra.distributed.PooledClientProvider; +import co.cask.tephra.distributed.TransactionServiceClient; +import co.cask.tephra.hbase11.coprocessor.TransactionProcessor; + public class ConnectionQueryServicesImpl extends DelegateQueryServices implements ConnectionQueryServices { private static final Logger logger = LoggerFactory.getLogger(ConnectionQueryServicesImpl.class); http://git-wip-us.apache.org/repos/asf/phoenix/blob/30c2e755/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 2a514d5..7ee30fb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -109,8 +109,6 @@ import java.util.Map; import java.util.Properties; import java.util.Set; -import co.cask.tephra.TxConstants; - import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -213,6 +211,8 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.primitives.Ints; +import co.cask.tephra.TxConstants; + public class MetaDataClient { private static final Logger logger = LoggerFactory.getLogger(MetaDataClient.class); @@ -1053,7 +1053,7 @@ public class MetaDataClient { throw new SQLException(e); } ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - PTable dataTable = tableRef.getTable(); + final PTable dataTable = tableRef.getTable(); for(PTable idx: dataTable.getIndexes()) { if(idx.getName().equals(index.getName())) { index = idx; @@ -1078,6 +1078,7 @@ public class MetaDataClient { @Override public MutationState execute() throws SQLException { + connection.getMutationState().commitWriteFence(dataTable); Cell kv = plan.iterator().next().getValue(0); ImmutableBytesWritable tmpPtr = new ImmutableBytesWritable(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()); // A single Cell will be returned with the count(*) - we decode that here @@ -1581,24 +1582,22 @@ public class MetaDataClient { if (parent != null && tableType == PTableType.INDEX) { timestamp = TransactionUtil.getTableTimestamp(connection, transactional); storeNulls = parent.getStoreNulls(); - if (tableType == PTableType.INDEX) { - // Index on view - // TODO: Can we support a multi-tenant index directly on a multi-tenant - // table instead of only a view? We don't have anywhere to put the link - // from the table to the index, though. - if (indexType == IndexType.LOCAL || (parent.getType() == PTableType.VIEW && parent.getViewType() != ViewType.MAPPED)) { - PName physicalName = parent.getPhysicalName(); - saltBucketNum = parent.getBucketNum(); - addSaltColumn = (saltBucketNum != null && indexType != IndexType.LOCAL); - defaultFamilyName = parent.getDefaultFamilyName() == null ? null : parent.getDefaultFamilyName().getString(); - if (indexType == IndexType.LOCAL) { - saltBucketNum = null; - // Set physical name of local index table - physicalNames = Collections.singletonList(PNameFactory.newName(MetaDataUtil.getLocalIndexPhysicalName(physicalName.getBytes()))); - } else { - // Set physical name of view index table - physicalNames = Collections.singletonList(PNameFactory.newName(MetaDataUtil.getViewIndexPhysicalName(physicalName.getBytes()))); - } + // Index on view + // TODO: Can we support a multi-tenant index directly on a multi-tenant + // table instead of only a view? We don't have anywhere to put the link + // from the table to the index, though. + if (indexType == IndexType.LOCAL || (parent.getType() == PTableType.VIEW && parent.getViewType() != ViewType.MAPPED)) { + PName physicalName = parent.getPhysicalName(); + saltBucketNum = parent.getBucketNum(); + addSaltColumn = (saltBucketNum != null && indexType != IndexType.LOCAL); + defaultFamilyName = parent.getDefaultFamilyName() == null ? null : parent.getDefaultFamilyName().getString(); + if (indexType == IndexType.LOCAL) { + saltBucketNum = null; + // Set physical name of local index table + physicalNames = Collections.singletonList(PNameFactory.newName(MetaDataUtil.getLocalIndexPhysicalName(physicalName.getBytes()))); + } else { + // Set physical name of view index table + physicalNames = Collections.singletonList(PNameFactory.newName(MetaDataUtil.getViewIndexPhysicalName(physicalName.getBytes()))); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/30c2e755/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java index 26cbbc3..35e2f77 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableRef.java @@ -31,8 +31,8 @@ public class TableRef { public static final TableRef EMPTY_TABLE_REF = new TableRef(new PTableImpl()); private PTable table; + private long upperBoundTimeStamp; private final String alias; - private final long upperBoundTimeStamp; private final long lowerBoundTimeStamp; private final boolean hasDynamicCols; @@ -73,6 +73,10 @@ public class TableRef { this.table = value; } + public void setTimeStamp(long timeStamp) { + this.upperBoundTimeStamp = timeStamp; + } + public String getTableAlias() { return alias; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/30c2e755/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java index c6a20b3..de92046 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java @@ -17,6 +17,13 @@ */ package org.apache.phoenix.util; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Strings.isNullOrEmpty; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES; + import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.ResultSet; @@ -31,6 +38,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeSet; + import javax.annotation.Nullable; import org.apache.hadoop.hbase.KeyValue; @@ -66,13 +74,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Strings.isNullOrEmpty; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_NAME_BYTES; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_STATS_NAME_BYTES; - /** * * Static class for various schema-related utilities @@ -890,4 +891,10 @@ public class SchemaUtil { public static boolean hasRowTimestampColumn(PTable table) { return table.getRowTimestampColPos()>0; } + + public static byte[] getTableKey(PTable dataTable) { + PName tenantId = dataTable.getTenantId(); + PName schemaName = dataTable.getSchemaName(); + return getTableKey(tenantId == null ? ByteUtil.EMPTY_BYTE_ARRAY : tenantId.getBytes(), schemaName == null ? ByteUtil.EMPTY_BYTE_ARRAY : schemaName.getBytes(), dataTable.getTableName().getBytes()); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/30c2e755/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 8860451..72551a4 100644 --- a/pom.xml +++ b/pom.xml @@ -112,7 +112,7 @@ <joni.version>2.1.2</joni.version> <calcite.version>1.5.0</calcite.version> <jettyVersion>8.1.7.v20120910</jettyVersion> - <tephra.version>0.6.3</tephra.version> + <tephra.version>0.6.4</tephra.version> <!-- Test Dependencies --> <mockito-all.version>1.8.5</mockito-all.version>
