Repository: phoenix Updated Branches: refs/heads/txn 3d9c786ed -> d49c3bf6f
Modify MutationState commit to use transactions Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d49c3bf6 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d49c3bf6 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d49c3bf6 Branch: refs/heads/txn Commit: d49c3bf6f3c25f8ec35659e18d345a3c17281154 Parents: 3d9c786 Author: Thomas D'Silva <[email protected]> Authored: Wed Mar 11 17:47:44 2015 -0700 Committer: Thomas D'Silva <[email protected]> Committed: Wed Mar 11 18:53:25 2015 -0700 ---------------------------------------------------------------------- .../apache/phoenix/compile/DeleteCompiler.java | 2 +- .../phoenix/exception/SQLExceptionCode.java | 6 + .../apache/phoenix/execute/MutationState.java | 120 +++++++++++++++---- .../apache/phoenix/jdbc/PhoenixConnection.java | 17 +-- .../phoenix/jdbc/PhoenixEmbeddedDriver.java | 5 + 5 files changed, 112 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/d49c3bf6/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index b15bfd8..0452af4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -426,7 +426,7 @@ public class DeleteCompiler { } @Override - public MutationState execute() { + public MutationState execute() throws SQLException { // We have a point lookup, so we know we have a simple set of fully qualified // keys for our ranges ScanRanges ranges = context.getScanRanges(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/d49c3bf6/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index 207f80b..278a41c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -157,6 +157,12 @@ public enum SQLExceptionCode { AGGREGATE_EXPRESSION_NOT_ALLOWED_IN_INDEX(520, "42897", "Aggreagaate expression not allowed in an index"), NON_DETERMINISTIC_EXPRESSION_NOT_ALLOWED_IN_INDEX(521, "42898", "Non-deterministic expression not allowed in an index"), STATELESS_EXPRESSION_NOT_ALLOWED_IN_INDEX(522, "42899", "Stateless expression not allowed in an index"), + + /** + * Transaction exceptions. + */ + TRANSACTION_FINISH_EXCEPTION(523, "42900", "Exception while finishing transaction"), + TRANSACTION_ABORT_EXCEPTION(524, "42901", "Exception while aborting transaction"), /** * HBase and Phoenix specific implementation defined sub-classes. http://git-wip-us.apache.org/repos/asf/phoenix/blob/d49c3bf6/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 4270ef9..6e37cc5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -27,9 +27,9 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; -import co.cask.tephra.hbase98.TransactionAwareHTable; - +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Mutation; @@ -39,12 +39,15 @@ import org.apache.phoenix.cache.ServerCacheClient; import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.exception.SQLExceptionCode; +import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.IndexMetaDataCacheClient; import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; import org.apache.phoenix.monitoring.PhoenixMetrics; +import org.apache.phoenix.query.HBaseFactoryProvider; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.IllegalDataException; import org.apache.phoenix.schema.MetaDataClient; @@ -60,11 +63,24 @@ import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.SQLCloseable; import org.apache.phoenix.util.ServerUtil; +import org.apache.twill.discovery.ZKDiscoveryService; +import org.apache.twill.zookeeper.RetryStrategies; +import org.apache.twill.zookeeper.ZKClientService; +import org.apache.twill.zookeeper.ZKClientServices; +import org.apache.twill.zookeeper.ZKClients; import org.cloudera.htrace.Span; import org.cloudera.htrace.TraceScope; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import co.cask.tephra.TransactionAware; +import co.cask.tephra.TransactionContext; +import co.cask.tephra.TransactionFailureException; +import co.cask.tephra.distributed.PooledClientProvider; +import co.cask.tephra.distributed.TransactionServiceClient; +import co.cask.tephra.hbase98.TransactionAwareHTable; + +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -80,40 +96,53 @@ public class MutationState implements SQLCloseable { private static final Logger logger = LoggerFactory.getLogger(MutationState.class); private PhoenixConnection connection; + private final TransactionServiceClient transactionServiceClient; private final long maxSize; private final ImmutableBytesPtr tempPtr = new ImmutableBytesPtr(); - private final Map<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> mutations = Maps.newHashMapWithExpectedSize(3); // TODO: Sizing? + private final Map<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> mutations; // TODO: Sizing? private long sizeOffset; - private int numRows = 0; + private int numRows; - public MutationState(int maxSize, PhoenixConnection connection) { + public MutationState(int maxSize, PhoenixConnection connection) throws SQLException { this(maxSize,connection,0); } - public MutationState(int maxSize, PhoenixConnection connection, long sizeOffset) { - this.maxSize = maxSize; - this.connection = connection; - this.sizeOffset = sizeOffset; + public MutationState(int maxSize, PhoenixConnection connection, long sizeOffset) throws SQLException { + this(maxSize, connection, sizeOffset, Maps.<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>newHashMapWithExpectedSize(3), 0); } - public MutationState(TableRef table, Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) { - this.maxSize = maxSize; - this.connection = connection; - this.mutations.put(table, mutations); - this.sizeOffset = sizeOffset; - this.numRows = mutations.size(); - throwIfTooBig(); + public MutationState(TableRef table, Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) throws SQLException { + this(maxSize, connection, sizeOffset, Maps.newHashMap(ImmutableMap.of(table, mutations)), mutations.size()); } - private MutationState(List<Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> entries, long sizeOffset, long maxSize, PhoenixConnection connection) { + public MutationState(long maxSize, PhoenixConnection connection, long sizeOffset, Map<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> mutations, int numRows) throws SQLException { this.maxSize = maxSize; this.connection = connection; this.sizeOffset = sizeOffset; - for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : entries) { - numRows += entry.getValue().size(); - this.mutations.put(entry.getKey(), entry.getValue()); - } + this.mutations = mutations; + this.numRows = numRows; throwIfTooBig(); + + //create a transaction service client +// Configuration config = HBaseFactoryProvider.getConfigurationFactory().getConfiguration(); +// String zkQuorumServersString = ConnectionInfo.getZookeeperConnectionString(connection.getURL()); +// ZKClientService zkClientService = ZKClientServices.delegate( +// ZKClients.reWatchOnExpire( +// ZKClients.retryOnFailure( +// ZKClientService.Builder.of(zkQuorumServersString) +// .setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT)) +// .build(), +// RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS) +// ) +// ) +// ); +// zkClientService.startAndWait(); +// ZKDiscoveryService zkDiscoveryService = new ZKDiscoveryService(zkClientService); +// PooledClientProvider pooledClientProvider = new PooledClientProvider( +// config, zkDiscoveryService); +// this.transactionServiceClient = new TransactionServiceClient(config, +// pooledClientProvider); + this.transactionServiceClient = null; } private void throwIfTooBig() { @@ -354,6 +383,47 @@ public class MutationState implements SQLCloseable { @SuppressWarnings("deprecation") public void commit() throws SQLException { + // create list of transaction aware htables + List<TransactionAware> txAwareHTables = Lists.newArrayListWithExpectedSize(mutations.size()); + // create list of htables (some of which could be transactional) + List<HTableInterface> hTables = Lists.newArrayListWithExpectedSize(mutations.size()); + for ( TableRef tableRef : this.mutations.keySet()) { + PTable table = tableRef.getTable(); + byte[] hTableName = table.getPhysicalName().getBytes(); + HTableInterface hTable = connection.getQueryServices().getTable(hTableName); + if (table.isTransactional()) { + TransactionAwareHTable transactionAwareHTable = new TransactionAwareHTable(hTable); + txAwareHTables.add(transactionAwareHTable); + hTable = transactionAwareHTable; + } + hTables.add(hTable); + } + + if (txAwareHTables.isEmpty()) { + commitMutations(hTables); + } + else { + TransactionContext transactionContext = new TransactionContext(transactionServiceClient, txAwareHTables); + try { + transactionContext.start(); + commitMutations(hTables); + transactionContext.finish(); + } catch (TransactionFailureException e) { + try { + transactionContext.abort(); + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.TRANSACTION_FINISH_EXCEPTION) + .setRootCause(e).build().buildException(); + } catch (TransactionFailureException e1) { + throw new SQLExceptionInfo.Builder( + SQLExceptionCode.TRANSACTION_ABORT_EXCEPTION) + .setRootCause(e1).build().buildException(); + } + } + } + } + + public void commitMutations(List<HTableInterface> hTables) throws SQLException { int i = 0; byte[] tenantId = connection.getTenantId() == null ? null : connection.getTenantId().getBytes(); long[] serverTimeStamps = validate(); @@ -454,7 +524,13 @@ public class MutationState implements SQLCloseable { } // Throw to client with both what was committed so far and what is left to be committed. // That way, client can either undo what was done or try again with what was not done. - sqlE = new CommitException(e, this, new MutationState(committedList, this.sizeOffset, this.maxSize, this.connection)); + int numCommitedMutations = 0; + Map<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> commitedMutations = Maps.newHashMapWithExpectedSize(committedList.size()); + for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> committedEntry : committedList) { + numCommitedMutations += committedEntry.getValue().size(); + commitedMutations.put(committedEntry.getKey(), committedEntry.getValue()); + } + sqlE = new CommitException(e, this, new MutationState(this.maxSize, this.connection, this.sizeOffset, commitedMutations, numCommitedMutations)); } finally { try { hTable.close(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/d49c3bf6/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index 3f6b1b2..d5687f0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -95,9 +95,6 @@ import org.apache.phoenix.util.SQLCloseables; import org.cloudera.htrace.Sampler; import org.cloudera.htrace.TraceScope; -//import co.cask.tephra.TransactionContext; -//import co.cask.tephra.TransactionFailureException; - import com.google.common.base.Objects; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; @@ -126,7 +123,6 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd private List<SQLCloseable> statements = new ArrayList<SQLCloseable>(); private final Map<PDataType<?>, Format> formatters = new HashMap<>(); private final MutationState mutationState; -// private TransactionContext transactionContext; private final int mutateBatchSize; private final Long scn; private boolean isAutoCommit = false; @@ -245,6 +241,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd // setup tracing, if its enabled this.sampler = Tracing.getConfiguredSampler(this); this.customTracingAnnotations = getImmutableCustomTracingAnnotations(); + } private ImmutableMap<String, String> getImmutableCustomTracingAnnotations() { @@ -429,18 +426,13 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd isClosed = true; } } - + @Override public void commit() throws SQLException { CallRunner.run(new CallRunner.CallableThrowable<Void, SQLException>() { @Override public Void call() throws SQLException { mutationState.commit(); -// try { -// transactionContext.finish(); -// } catch (TransactionFailureException e) { -// throw new SQLException(e); -// } return null; } }, Tracing.withTracing(this, "committing mutations")); @@ -644,11 +636,6 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd @Override public void rollback() throws SQLException { mutationState.rollback(this); -// try { -// transactionContext.abort(); -// } catch (TransactionFailureException e) { -// throw new SQLException(e); -// } } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/d49c3bf6/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java index ff25fae..4a65b76 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java @@ -251,6 +251,11 @@ public abstract class PhoenixEmbeddedDriver implements Driver, org.apache.phoeni return new ConnectionInfo(quorum,port,rootNode, principal, keytabFile); } + public static String getZookeeperConnectionString(String url) throws SQLException { + ConnectionInfo connInfo = ConnectionInfo.create(url); + return connInfo.getZookeeperQuorum()+":"+connInfo.getPort(); + } + public ConnectionInfo normalize(ReadOnlyProps props) throws SQLException { String zookeeperQuorum = this.getZookeeperQuorum(); Integer port = this.getPort();
