Repository: phoenix
Updated Branches:
  refs/heads/txn 3d9c786ed -> d49c3bf6f


Modify MutationState commit to use transactions


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d49c3bf6
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d49c3bf6
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d49c3bf6

Branch: refs/heads/txn
Commit: d49c3bf6f3c25f8ec35659e18d345a3c17281154
Parents: 3d9c786
Author: Thomas D'Silva <[email protected]>
Authored: Wed Mar 11 17:47:44 2015 -0700
Committer: Thomas D'Silva <[email protected]>
Committed: Wed Mar 11 18:53:25 2015 -0700

----------------------------------------------------------------------
 .../apache/phoenix/compile/DeleteCompiler.java  |   2 +-
 .../phoenix/exception/SQLExceptionCode.java     |   6 +
 .../apache/phoenix/execute/MutationState.java   | 120 +++++++++++++++----
 .../apache/phoenix/jdbc/PhoenixConnection.java  |  17 +--
 .../phoenix/jdbc/PhoenixEmbeddedDriver.java     |   5 +
 5 files changed, 112 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d49c3bf6/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index b15bfd8..0452af4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -426,7 +426,7 @@ public class DeleteCompiler {
                     }
     
                     @Override
-                    public MutationState execute() {
+                    public MutationState execute() throws SQLException {
                         // We have a point lookup, so we know we have a simple 
set of fully qualified
                         // keys for our ranges
                         ScanRanges ranges = context.getScanRanges();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d49c3bf6/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java 
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 207f80b..278a41c 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -157,6 +157,12 @@ public enum SQLExceptionCode {
      AGGREGATE_EXPRESSION_NOT_ALLOWED_IN_INDEX(520, "42897", "Aggreagaate 
expression not allowed in an index"),
      NON_DETERMINISTIC_EXPRESSION_NOT_ALLOWED_IN_INDEX(521, "42898", 
"Non-deterministic expression not allowed in an index"),
      STATELESS_EXPRESSION_NOT_ALLOWED_IN_INDEX(522, "42899", "Stateless 
expression not allowed in an index"),
+     
+     /**
+      *  Transaction exceptions.
+      */
+     TRANSACTION_FINISH_EXCEPTION(523, "42900", "Exception while finishing 
transaction"),
+     TRANSACTION_ABORT_EXCEPTION(524, "42901", "Exception while aborting 
transaction"),
 
      /** 
      * HBase and Phoenix specific implementation defined sub-classes.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d49c3bf6/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 4270ef9..6e37cc5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -27,9 +27,9 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
-import co.cask.tephra.hbase98.TransactionAwareHTable;
-
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -39,12 +39,15 @@ import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.IndexMetaDataCacheClient;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.monitoring.PhoenixMetrics;
+import org.apache.phoenix.query.HBaseFactoryProvider;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.IllegalDataException;
 import org.apache.phoenix.schema.MetaDataClient;
@@ -60,11 +63,24 @@ import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.SQLCloseable;
 import org.apache.phoenix.util.ServerUtil;
+import org.apache.twill.discovery.ZKDiscoveryService;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
 import org.cloudera.htrace.Span;
 import org.cloudera.htrace.TraceScope;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import co.cask.tephra.TransactionAware;
+import co.cask.tephra.TransactionContext;
+import co.cask.tephra.TransactionFailureException;
+import co.cask.tephra.distributed.PooledClientProvider;
+import co.cask.tephra.distributed.TransactionServiceClient;
+import co.cask.tephra.hbase98.TransactionAwareHTable;
+
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -80,40 +96,53 @@ public class MutationState implements SQLCloseable {
     private static final Logger logger = 
LoggerFactory.getLogger(MutationState.class);
 
     private PhoenixConnection connection;
+    private final TransactionServiceClient transactionServiceClient;
     private final long maxSize;
     private final ImmutableBytesPtr tempPtr = new ImmutableBytesPtr();
-    private final Map<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> 
mutations = Maps.newHashMapWithExpectedSize(3); // TODO: Sizing?
+    private final Map<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> 
mutations; // TODO: Sizing?
     private long sizeOffset;
-    private int numRows = 0;
+    private int numRows;
     
-    public MutationState(int maxSize, PhoenixConnection connection) {
+    public MutationState(int maxSize, PhoenixConnection connection) throws 
SQLException {
         this(maxSize,connection,0);
     }
     
-    public MutationState(int maxSize, PhoenixConnection connection, long 
sizeOffset) {
-        this.maxSize = maxSize;
-        this.connection = connection;
-        this.sizeOffset = sizeOffset;
+    public MutationState(int maxSize, PhoenixConnection connection, long 
sizeOffset) throws SQLException {
+       this(maxSize, connection, sizeOffset, Maps.<TableRef, 
Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>newHashMapWithExpectedSize(3), 0);
     }
     
-    public MutationState(TableRef table, 
Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations, long sizeOffset, long 
maxSize, PhoenixConnection connection) {
-        this.maxSize = maxSize;
-        this.connection = connection;
-        this.mutations.put(table, mutations);
-        this.sizeOffset = sizeOffset;
-        this.numRows = mutations.size();
-        throwIfTooBig();
+    public MutationState(TableRef table, 
Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations, long sizeOffset, long 
maxSize, PhoenixConnection connection) throws SQLException {
+       this(maxSize, connection, sizeOffset, 
Maps.newHashMap(ImmutableMap.of(table, mutations)), mutations.size());
     }
     
-    private MutationState(List<Map.Entry<TableRef, 
Map<ImmutableBytesPtr,Map<PColumn,byte[]>>>> entries, long sizeOffset, long 
maxSize, PhoenixConnection connection) {
+    public MutationState(long maxSize, PhoenixConnection connection, long 
sizeOffset, Map<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> 
mutations, int numRows) throws SQLException {
         this.maxSize = maxSize;
         this.connection = connection;
         this.sizeOffset = sizeOffset;
-        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> 
entry : entries) {
-            numRows += entry.getValue().size();
-            this.mutations.put(entry.getKey(), entry.getValue());
-        }
+        this.mutations = mutations; 
+        this.numRows = numRows;
         throwIfTooBig();
+        
+        //create a transaction service client
+//        Configuration config = 
HBaseFactoryProvider.getConfigurationFactory().getConfiguration();
+//             String zkQuorumServersString = 
ConnectionInfo.getZookeeperConnectionString(connection.getURL());
+//             ZKClientService zkClientService = ZKClientServices.delegate(
+//                           ZKClients.reWatchOnExpire(
+//                             ZKClients.retryOnFailure(
+//                               
ZKClientService.Builder.of(zkQuorumServersString)
+//                                 
.setSessionTimeout(config.getInt(HConstants.ZK_SESSION_TIMEOUT, 
HConstants.DEFAULT_ZK_SESSION_TIMEOUT))
+//                                 .build(),
+//                               RetryStrategies.exponentialDelay(500, 2000, 
TimeUnit.MILLISECONDS)
+//                             )
+//                           )
+//                         );
+//             zkClientService.startAndWait();
+//             ZKDiscoveryService zkDiscoveryService = new 
ZKDiscoveryService(zkClientService);
+//             PooledClientProvider pooledClientProvider = new 
PooledClientProvider(
+//                             config, zkDiscoveryService);
+//             this.transactionServiceClient = new 
TransactionServiceClient(config,
+//                             pooledClientProvider);
+               this.transactionServiceClient = null;
     }
     
     private void throwIfTooBig() {
@@ -354,6 +383,47 @@ public class MutationState implements SQLCloseable {
     
     @SuppressWarnings("deprecation")
     public void commit() throws SQLException {
+       // create list of transaction aware htables
+        List<TransactionAware> txAwareHTables = 
Lists.newArrayListWithExpectedSize(mutations.size());
+        // create list of htables (some of which could be transactional)
+        List<HTableInterface> hTables = 
Lists.newArrayListWithExpectedSize(mutations.size());
+        for ( TableRef tableRef : this.mutations.keySet()) {
+               PTable table = tableRef.getTable();
+                       byte[] hTableName = table.getPhysicalName().getBytes();
+               HTableInterface hTable = 
connection.getQueryServices().getTable(hTableName);
+               if (table.isTransactional()) {
+                       TransactionAwareHTable transactionAwareHTable = new 
TransactionAwareHTable(hTable);
+               txAwareHTables.add(transactionAwareHTable);
+               hTable = transactionAwareHTable;
+               }
+               hTables.add(hTable);
+        }
+        
+        if (txAwareHTables.isEmpty()) {
+               commitMutations(hTables);
+        }
+        else {
+               TransactionContext transactionContext = new 
TransactionContext(transactionServiceClient, txAwareHTables);
+                       try {
+                               transactionContext.start();
+                               commitMutations(hTables);
+                               transactionContext.finish();
+                       } catch (TransactionFailureException e) {
+                               try {
+                                       transactionContext.abort();
+                                       throw new SQLExceptionInfo.Builder(
+                                                       
SQLExceptionCode.TRANSACTION_FINISH_EXCEPTION)
+                                                       
.setRootCause(e).build().buildException();
+                               } catch (TransactionFailureException e1) {
+                                       throw new SQLExceptionInfo.Builder(
+                                                       
SQLExceptionCode.TRANSACTION_ABORT_EXCEPTION)
+                                                       
.setRootCause(e1).build().buildException();
+                               }
+                       }
+        }
+    }
+    
+    public void commitMutations(List<HTableInterface> hTables) throws 
SQLException {
         int i = 0;
         byte[] tenantId = connection.getTenantId() == null ? null : 
connection.getTenantId().getBytes();
         long[] serverTimeStamps = validate();
@@ -454,7 +524,13 @@ public class MutationState implements SQLCloseable {
                         }
                         // Throw to client with both what was committed so far 
and what is left to be committed.
                         // That way, client can either undo what was done or 
try again with what was not done.
-                        sqlE = new CommitException(e, this, new 
MutationState(committedList, this.sizeOffset, this.maxSize, this.connection));
+                        int numCommitedMutations = 0;
+                        Map<TableRef, 
Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> commitedMutations = 
Maps.newHashMapWithExpectedSize(committedList.size());
+                        for (Map.Entry<TableRef, 
Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> committedEntry : committedList) {
+                               numCommitedMutations += 
committedEntry.getValue().size();
+                               commitedMutations.put(committedEntry.getKey(), 
committedEntry.getValue());
+                        }
+                        sqlE = new CommitException(e, this, new 
MutationState(this.maxSize, this.connection, this.sizeOffset, 
commitedMutations, numCommitedMutations));
                     } finally {
                         try {
                             hTable.close();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d49c3bf6/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index 3f6b1b2..d5687f0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -95,9 +95,6 @@ import org.apache.phoenix.util.SQLCloseables;
 import org.cloudera.htrace.Sampler;
 import org.cloudera.htrace.TraceScope;
 
-//import co.cask.tephra.TransactionContext;
-//import co.cask.tephra.TransactionFailureException;
-
 import com.google.common.base.Objects;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
@@ -126,7 +123,6 @@ public class PhoenixConnection implements Connection, 
org.apache.phoenix.jdbc.Jd
     private List<SQLCloseable> statements = new ArrayList<SQLCloseable>();
     private final Map<PDataType<?>, Format> formatters = new HashMap<>();
     private final MutationState mutationState;
-//    private TransactionContext transactionContext;
     private final int mutateBatchSize;
     private final Long scn;
     private boolean isAutoCommit = false;
@@ -245,6 +241,7 @@ public class PhoenixConnection implements Connection, 
org.apache.phoenix.jdbc.Jd
         // setup tracing, if its enabled
         this.sampler = Tracing.getConfiguredSampler(this);
         this.customTracingAnnotations = getImmutableCustomTracingAnnotations();
+               
     }
     
     private ImmutableMap<String, String> 
getImmutableCustomTracingAnnotations() {
@@ -429,18 +426,13 @@ public class PhoenixConnection implements Connection, 
org.apache.phoenix.jdbc.Jd
             isClosed = true;
         }
     }
-
+    
     @Override
     public void commit() throws SQLException {
         CallRunner.run(new CallRunner.CallableThrowable<Void, SQLException>() {
             @Override
             public Void call() throws SQLException {
                 mutationState.commit();
-//                try {
-//                                     transactionContext.finish();
-//                             } catch (TransactionFailureException e) {
-//                                     throw new SQLException(e);
-//                             }
                 return null;
             }
         }, Tracing.withTracing(this, "committing mutations"));
@@ -644,11 +636,6 @@ public class PhoenixConnection implements Connection, 
org.apache.phoenix.jdbc.Jd
     @Override
     public void rollback() throws SQLException {
         mutationState.rollback(this);
-//        try {
-//                     transactionContext.abort();
-//             } catch (TransactionFailureException e) {
-//                     throw new SQLException(e);
-//             }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d49c3bf6/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
index ff25fae..4a65b76 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixEmbeddedDriver.java
@@ -251,6 +251,11 @@ public abstract class PhoenixEmbeddedDriver implements 
Driver, org.apache.phoeni
             return new ConnectionInfo(quorum,port,rootNode, principal, 
keytabFile);
         }
         
+        public static String getZookeeperConnectionString(String url) throws 
SQLException {
+               ConnectionInfo connInfo = ConnectionInfo.create(url);
+               return connInfo.getZookeeperQuorum()+":"+connInfo.getPort();
+        }
+        
         public ConnectionInfo normalize(ReadOnlyProps props) throws 
SQLException {
             String zookeeperQuorum = this.getZookeeperQuorum();
             Integer port = this.getPort();

Reply via email to