Repository: phoenix
Updated Branches:
  refs/heads/txn d905a6662 -> c7700b41d


Support rollback across tables with indexes


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c7700b41
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c7700b41
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c7700b41

Branch: refs/heads/txn
Commit: c7700b41d156049228e521da5f964f4c56c905b9
Parents: d905a66
Author: James Taylor <[email protected]>
Authored: Wed May 20 01:43:48 2015 -0700
Committer: James Taylor <[email protected]>
Committed: Wed May 20 01:43:48 2015 -0700

----------------------------------------------------------------------
 .../end2end/index/BaseMutableIndexIT.java       |   1 +
 .../end2end/index/TxGlobalMutableIndexIT.java   |  43 +++
 .../execute/DelegateHTableInterface.java        | 282 +++++++++++++++++++
 .../apache/phoenix/execute/MutationState.java   | 161 ++++++++---
 .../apache/phoenix/index/IndexMaintainer.java   |  30 ++
 .../phoenix/index/IndexMetaDataCacheClient.java |   5 +-
 .../phoenix/index/PhoenixIndexMetaData.java     |   8 +-
 .../index/PhoenixTransactionalIndexer.java      | 109 +++++--
 .../apache/phoenix/jdbc/PhoenixStatement.java   |   8 +-
 .../apache/phoenix/schema/DelegateTable.java    |   4 +-
 .../java/org/apache/phoenix/schema/PTable.java  |   2 +-
 .../org/apache/phoenix/schema/PTableImpl.java   |   3 +-
 .../java/org/apache/phoenix/util/IndexUtil.java |  20 +-
 .../java/org/apache/phoenix/util/ScanUtil.java  |   6 +-
 pom.xml                                         |  45 ---
 15 files changed, 612 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7700b41/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java
index c6aadca..2676548 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java
@@ -1171,4 +1171,5 @@ public abstract class BaseMutableIndexIT extends 
BaseHBaseManagedTimeIT {
                conn.close();
        }
     }
+    
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7700b41/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java
index a2e0412..5196b0a 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/TxGlobalMutableIndexIT.java
@@ -17,12 +17,24 @@
  */
 package org.apache.phoenix.end2end.index;
 
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.phoenix.end2end.Shadower;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.junit.BeforeClass;
+import org.junit.Test;
 
 import com.google.common.collect.Maps;
 
@@ -39,4 +51,35 @@ public class TxGlobalMutableIndexIT extends 
GlobalMutableIndexIT {
         props.put(QueryServices.DEFAULT_TRANSACTIONAL_ATTRIB, 
Boolean.toString(true));
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
+    
+    @Test
+    public void testRollbackOfUncommittedIndexChange() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+        try {
+            Statement stmt = conn.createStatement();
+            stmt.execute("CREATE TABLE DEMO(v1 VARCHAR PRIMARY KEY, v2 
VARCHAR, v3 VARCHAR)");
+            stmt.execute("CREATE INDEX DEMO_idx ON DEMO (v2) INCLUDE(v3)");
+            
+            stmt.executeUpdate("upsert into DEMO values('x', 'y', 'a')");
+            
+            //assert values in data table
+            ResultSet rs = stmt.executeQuery("select v1, v2, v3 from DEMO");
+            assertTrue(rs.next());
+            assertEquals("x", rs.getString(1));
+            assertEquals("y", rs.getString(2));
+            assertEquals("a", rs.getString(3));
+            assertFalse(rs.next());
+            
+            conn.rollback();
+            
+            //assert data table is empty after rollback
+            rs = stmt.executeQuery("select v1, v2, v3 from DEMO");
+            assertFalse(rs.next());
+            
+        } finally {
+            conn.close();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7700b41/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTableInterface.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTableInterface.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTableInterface.java
new file mode 100644
index 0000000..07bdcc8
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTableInterface.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.execute;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+
+public class DelegateHTableInterface implements HTableInterface {
+    protected final HTableInterface delegate;
+
+    public DelegateHTableInterface(HTableInterface delegate) {
+        this.delegate = delegate;
+    }
+
+    @Override
+    public byte[] getTableName() {
+        return delegate.getTableName();
+    }
+
+    @Override
+    public TableName getName() {
+        return delegate.getName();
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+        return delegate.getConfiguration();
+    }
+
+    @Override
+    public HTableDescriptor getTableDescriptor() throws IOException {
+        return delegate.getTableDescriptor();
+    }
+
+    @Override
+    public boolean exists(Get get) throws IOException {
+        return delegate.exists(get);
+    }
+
+    @Override
+    public Boolean[] exists(List<Get> gets) throws IOException {
+        return delegate.exists(gets);
+    }
+
+    @Override
+    public void batch(List<? extends Row> actions, Object[] results) throws 
IOException, InterruptedException {
+        delegate.batch(actions, results);
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public Object[] batch(List<? extends Row> actions) throws IOException, 
InterruptedException {
+        return delegate.batch(actions);
+    }
+
+    @Override
+    public <R> void batchCallback(List<? extends Row> actions, Object[] 
results, Callback<R> callback)
+            throws IOException, InterruptedException {
+        delegate.batchCallback(actions, results, callback);
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public <R> Object[] batchCallback(List<? extends Row> actions, Callback<R> 
callback) throws IOException,
+            InterruptedException {
+        return delegate.batchCallback(actions, callback);
+    }
+
+    @Override
+    public Result get(Get get) throws IOException {
+        return delegate.get(get);
+    }
+
+    @Override
+    public Result[] get(List<Get> gets) throws IOException {
+        return delegate.get(gets);
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public Result getRowOrBefore(byte[] row, byte[] family) throws IOException 
{
+        return delegate.getRowOrBefore(row, family);
+    }
+
+    @Override
+    public ResultScanner getScanner(Scan scan) throws IOException {
+        return delegate.getScanner(scan);
+    }
+
+    @Override
+    public ResultScanner getScanner(byte[] family) throws IOException {
+        return delegate.getScanner(family);
+    }
+
+    @Override
+    public ResultScanner getScanner(byte[] family, byte[] qualifier) throws 
IOException {
+        return delegate.getScanner(family, qualifier);
+    }
+
+    @Override
+    public void put(Put put) throws IOException {
+        delegate.put(put);
+    }
+
+    @Override
+    public void put(List<Put> puts) throws IOException {
+        delegate.put(puts);
+    }
+
+    @Override
+    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, 
byte[] value, Put put) throws IOException {
+        return delegate.checkAndPut(row, family, qualifier, value, put);
+    }
+
+    @Override
+    public void delete(Delete delete) throws IOException {
+        delegate.delete(delete);
+    }
+
+    @Override
+    public void delete(List<Delete> deletes) throws IOException {
+        delegate.delete(deletes);
+    }
+
+    @Override
+    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, 
byte[] value, Delete delete)
+            throws IOException {
+        return delegate.checkAndDelete(row, family, qualifier, value, delete);
+    }
+
+    @Override
+    public void mutateRow(RowMutations rm) throws IOException {
+        delegate.mutateRow(rm);
+    }
+
+    @Override
+    public Result append(Append append) throws IOException {
+        return delegate.append(append);
+    }
+
+    @Override
+    public Result increment(Increment increment) throws IOException {
+        return delegate.increment(increment);
+    }
+
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family, byte[] 
qualifier, long amount) throws IOException {
+        return delegate.incrementColumnValue(row, family, qualifier, amount);
+    }
+
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family, byte[] 
qualifier, long amount, Durability durability)
+            throws IOException {
+        return delegate.incrementColumnValue(row, family, qualifier, amount, 
durability);
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family, byte[] 
qualifier, long amount, boolean writeToWAL)
+            throws IOException {
+        return delegate.incrementColumnValue(row, family, qualifier, amount, 
writeToWAL);
+    }
+
+    @Override
+    public boolean isAutoFlush() {
+        return delegate.isAutoFlush();
+    }
+
+    @Override
+    public void flushCommits() throws IOException {
+        delegate.flushCommits();
+    }
+
+    @Override
+    public void close() throws IOException {
+        delegate.close();
+    }
+
+    @Override
+    public CoprocessorRpcChannel coprocessorService(byte[] row) {
+        return delegate.coprocessorService(row);
+    }
+
+    @Override
+    public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> 
service, byte[] startKey, byte[] endKey,
+            Call<T, R> callable) throws ServiceException, Throwable {
+        return delegate.coprocessorService(service, startKey, endKey, 
callable);
+    }
+
+    @Override
+    public <T extends Service, R> void coprocessorService(Class<T> service, 
byte[] startKey, byte[] endKey,
+            Call<T, R> callable, Callback<R> callback) throws 
ServiceException, Throwable {
+        delegate.coprocessorService(service, startKey, endKey, callable, 
callback);
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public void setAutoFlush(boolean autoFlush) {
+        delegate.setAutoFlush(autoFlush);
+    }
+
+    @Override
+    public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
+        delegate.setAutoFlush(autoFlush, clearBufferOnFail);
+    }
+
+    @Override
+    public void setAutoFlushTo(boolean autoFlush) {
+        delegate.setAutoFlushTo(autoFlush);
+    }
+
+    @Override
+    public long getWriteBufferSize() {
+        return delegate.getWriteBufferSize();
+    }
+
+    @Override
+    public void setWriteBufferSize(long writeBufferSize) throws IOException {
+        delegate.setWriteBufferSize(writeBufferSize);
+    }
+
+    @Override
+    public <R extends Message> Map<byte[], R> 
batchCoprocessorService(MethodDescriptor methodDescriptor,
+            Message request, byte[] startKey, byte[] endKey, R 
responsePrototype) throws ServiceException, Throwable {
+        return delegate.batchCoprocessorService(methodDescriptor, request, 
startKey, endKey, responsePrototype);
+    }
+
+    @Override
+    public <R extends Message> void batchCoprocessorService(MethodDescriptor 
methodDescriptor, Message request,
+            byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> 
callback) throws ServiceException,
+            Throwable {
+        delegate.batchCoprocessorService(methodDescriptor, request, startKey, 
endKey, responsePrototype, callback);
+    }
+
+    @Override
+    public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, 
CompareOp compareOp, byte[] value,
+            RowMutations mutation) throws IOException {
+        return delegate.checkAndMutate(row, family, qualifier, compareOp, 
value, mutation);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7700b41/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index b7c7850..4f1a2cd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -36,8 +36,10 @@ import co.cask.tephra.TransactionSystemClient;
 import co.cask.tephra.hbase98.TransactionAwareHTable;
 
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.cache.ServerCacheClient;
@@ -58,6 +60,8 @@ import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PRow;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.trace.util.Tracing;
@@ -90,7 +94,7 @@ public class MutationState implements SQLCloseable {
 
     private PhoenixConnection connection;
     private final long maxSize;
-    private final ImmutableBytesPtr tempPtr = new ImmutableBytesPtr();
+    private final ImmutableBytesWritable tempPtr = new 
ImmutableBytesWritable();
     // map from table to rows 
     //   rows - map from rowkey to columns
     //      columns - map from column to value
@@ -428,11 +432,76 @@ public class MutationState implements SQLCloseable {
         }
     }
     
+    private boolean hasKeyValueColumn(PTable table, PTable index) {
+        IndexMaintainer maintainer = index.getIndexMaintainer(table, 
connection);
+        return !maintainer.getAllColumns().isEmpty();
+    }
+    
+    private void divideImmutableIndexes(Iterator<PTable> 
enabledImmutableIndexes, PTable table, List<PTable> rowKeyIndexes, List<PTable> 
keyValueIndexes) {
+        while (enabledImmutableIndexes.hasNext()) {
+            PTable index = enabledImmutableIndexes.next();
+            if (index.getIndexType() != IndexType.LOCAL) {
+                if (hasKeyValueColumn(table, index)) {
+                    keyValueIndexes.add(index);
+                } else {
+                    rowKeyIndexes.add(index);
+                }
+            }
+        }
+    }
+    private class MetaDataAwareHTable extends DelegateHTableInterface {
+        private final TableRef tableRef;
+        
+        private MetaDataAwareHTable(HTableInterface delegate, TableRef 
tableRef) {
+            super(delegate);
+            this.tableRef = tableRef;
+        }
+        
+        @Override
+        public void delete(List<Delete> deletes) throws IOException {
+            try {
+                PTable table = tableRef.getTable();
+                List<PTable> indexes = table.getIndexes();
+                Iterator<PTable> enabledIndexes = 
IndexMaintainer.nonDisabledIndexIterator(indexes.iterator());
+                if (enabledIndexes.hasNext()) {
+                    List<PTable> keyValueIndexes = Collections.emptyList();
+                    ImmutableBytesWritable indexMetaDataPtr = new 
ImmutableBytesWritable();
+                    boolean attachMetaData = 
table.getIndexMaintainers(indexMetaDataPtr, connection);
+                    if (table.isImmutableRows()) {
+                        List<PTable> rowKeyIndexes = 
Lists.newArrayListWithExpectedSize(indexes.size());
+                        keyValueIndexes = 
Lists.newArrayListWithExpectedSize(indexes.size());
+                        divideImmutableIndexes(enabledIndexes, table, 
rowKeyIndexes, keyValueIndexes);
+                        // Generate index deletes for immutable indexes that 
only reference row key
+                        // columns and submit directly here.
+                        for (PTable index : rowKeyIndexes) {
+                            List<Delete> indexDeletes = 
IndexUtil.generateDeleteIndexData(table, index, deletes, tempPtr, 
connection.getKeyValueBuilder(), connection);
+                            HTableInterface hindex = 
connection.getQueryServices().getTable(index.getPhysicalName().getBytes());
+                            hindex.delete(indexDeletes);
+                        }
+                    }
+                    
+                    // If we have mutable indexes, local immutable indexes, or 
global immutable indexes
+                    // that reference key value columns, setup index meta data 
and attach here. In this
+                    // case updates to the indexes will be generated on the 
server side.
+                    if (!keyValueIndexes.isEmpty()) {
+                        attachMetaData = true;
+                        IndexMaintainer.serializeAdditional(table, 
indexMetaDataPtr, keyValueIndexes, connection);
+                    }
+                    if (attachMetaData) {
+                        setMetaDataOnMutations(tableRef, deletes, 
indexMetaDataPtr);
+                    }
+                }
+                delegate.delete(deletes);
+            } catch (SQLException e) {
+                throw new IOException(e);
+            }
+        }
+    }
+    
     @SuppressWarnings("deprecation")
     private void send(Iterator<TableRef> tableRefIterator) throws SQLException 
{
         int i = 0;
         long[] serverTimeStamps = null;
-        byte[] tenantId = connection.getTenantId() == null ? null : 
connection.getTenantId().getBytes();
         // Validate up front if not transactional so that we 
         if (tableRefIterator == null) {
             serverTimeStamps = validateAll();
@@ -449,8 +518,7 @@ public class MutationState implements SQLCloseable {
                 continue;
             }
             PTable table = tableRef.getTable();
-            table.getIndexMaintainers(tempPtr, connection);
-            boolean hasIndexMaintainers = tempPtr.getLength() > 0;
+            boolean hasIndexMaintainers = table.getIndexMaintainers(tempPtr, 
connection);
             boolean isDataTable = true;
             // Validate as we go if transactional since we can undo if a 
problem occurs (which is unlikely)
             long serverTimestamp = serverTimeStamps == null ? 
validate(tableRef, valuesMap) : serverTimeStamps[i++];
@@ -469,43 +537,23 @@ public class MutationState implements SQLCloseable {
                 do {
                     ServerCache cache = null;
                     if (hasIndexMaintainers && isDataTable) {
-                        byte[] attribValue = null;
-                        byte[] uuidValue;
-                        byte[] txState = ByteUtil.EMPTY_BYTE_ARRAY;
-                        if (table.isTransactional()) {
-                            txState = 
TransactionUtil.encodeTxnState(getTransaction());
-                        }
-                        if 
(IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, 
tempPtr.getLength() + txState.length)) {
-                            IndexMetaDataCacheClient client = new 
IndexMetaDataCacheClient(connection, tableRef);
-                            cache = client.addIndexMetadataCache(mutations, 
tempPtr, txState);
-                            child.addTimelineAnnotation("Updated index 
metadata cache");
-                            uuidValue = cache.getId();
-                            // If we haven't retried yet, retry for this case 
only, as it's possible that
-                            // a split will occur after we send the index 
metadata cache to all known
-                            // region servers.
-                            shouldRetry = true;
-                        } else {
-                            attribValue = 
ByteUtil.copyKeyBytesIfNecessary(tempPtr);
-                            uuidValue = ServerCacheClient.generateId();
-                        }
-                        // Either set the UUID to be able to access the index 
metadata from the cache
-                        // or set the index metadata directly on the Mutation
-                        for (Mutation mutation : mutations) {
-                            if (tenantId != null) {
-                                
mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
-                            }
-                            
mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                            if (attribValue != null) {
-                                
mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
-                                
mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, txState);
-                            }
-                        }
+                        cache = setMetaDataOnMutations(tableRef, mutations, 
tempPtr);
                     }
                 
+                    // If we haven't retried yet, retry for this case only, as 
it's possible that
+                    // a split will occur after we send the index metadata 
cache to all known
+                    // region servers.
+                    shouldRetry = cache != null;
                     SQLException sqlE = null;
                     HTableInterface hTable = 
connection.getQueryServices().getTable(htableName);
                     try {
                         if (table.isTransactional()) {
+                            // If we have indexes, wrap the HTable in a 
delegate HTable that
+                            // will attach the necessary index meta data in 
the event of a
+                            // rollback
+                            if (!table.getIndexes().isEmpty()) {
+                                hTable = new MetaDataAwareHTable(hTable, 
tableRef);
+                            }
                             TransactionAwareHTable txnAware = 
TransactionUtil.getTransactionAwareHTable(hTable);
                             // Don't add immutable indexes (those are the only 
ones that would participate
                             // during a commit), as we don't need conflict 
detection for these.
@@ -580,6 +628,40 @@ public class MutationState implements SQLCloseable {
         assert(numRows==0);
         assert(this.mutations.isEmpty());
     }
+
+    private ServerCache setMetaDataOnMutations(TableRef tableRef, List<? 
extends Mutation> mutations,
+            ImmutableBytesWritable indexMetaDataPtr) throws SQLException {
+        PTable table = tableRef.getTable();
+        byte[] tenantId = connection.getTenantId() == null ? null : 
connection.getTenantId().getBytes();
+        ServerCache cache = null;
+        byte[] attribValue = null;
+        byte[] uuidValue;
+        byte[] txState = ByteUtil.EMPTY_BYTE_ARRAY;
+        if (table.isTransactional()) {
+            txState = TransactionUtil.encodeTxnState(getTransaction());
+        }
+        if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, 
mutations, indexMetaDataPtr.getLength() + txState.length)) {
+            IndexMetaDataCacheClient client = new 
IndexMetaDataCacheClient(connection, tableRef);
+            cache = client.addIndexMetadataCache(mutations, indexMetaDataPtr, 
txState);
+            uuidValue = cache.getId();
+        } else {
+            attribValue = ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
+            uuidValue = ServerCacheClient.generateId();
+        }
+        // Either set the UUID to be able to access the index metadata from 
the cache
+        // or set the index metadata directly on the Mutation
+        for (Mutation mutation : mutations) {
+            if (tenantId != null) {
+                mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, 
tenantId);
+            }
+            mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
+            if (attribValue != null) {
+                mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue);
+                mutation.setAttribute(BaseScannerRegionObserver.TX_STATE, 
txState);
+            }
+        }
+        return cache;
+    }
     
     public void clear() throws SQLException {
         this.mutations.clear();
@@ -649,7 +731,14 @@ public class MutationState implements SQLCloseable {
             // We really should be keying the tables based on the physical 
table name.
             List<TableRef> strippedAliases = 
Lists.newArrayListWithExpectedSize(mutations.keySet().size());
             while (filteredTableRefs.hasNext()) {
-                strippedAliases.add(new TableRef(filteredTableRefs.next(), 
null));
+                /*
+                 * We'll have a PROJECTED table here, but we need the TABLE 
instead as otherwise we can't
+                 * get the cf:cq which we need for IndexMaintainer.
+                 */
+                TableRef tableRef = filteredTableRefs.next();
+                PTable projectedTable = tableRef.getTable();
+                PTable nonProjectedTable = 
connection.getMetaDataCache().getTable(new 
PTableKey(projectedTable.getTenantId(), projectedTable.getName().getString()));
+                strippedAliases.add(new TableRef(null, nonProjectedTable, 
tableRef.getTimeStamp(), tableRef.getLowerBoundTimeStamp(), 
tableRef.hasDynamicCols()));
             }
             startTransaction();
             send(strippedAliases.iterator());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7700b41/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java 
b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 1be0aa3..be668d6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -194,6 +194,35 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         ptr.set(stream.getBuffer(), 0, stream.size());
     }
     
+
+    public static void serializeAdditional(PTable table, 
ImmutableBytesWritable indexMetaDataPtr,
+            List<PTable> keyValueIndexes, PhoenixConnection connection) {
+        int nMutableIndexes = indexMetaDataPtr.getLength() == 0 ? 0 : 
ByteUtil.vintFromBytes(indexMetaDataPtr);
+        int nIndexes = nMutableIndexes + keyValueIndexes.size();
+        int estimatedSize = indexMetaDataPtr.getLength() + 1; // Just in case 
new size increases buffer
+        for (PTable index : keyValueIndexes) {
+            estimatedSize += index.getIndexMaintainer(table, 
connection).getEstimatedByteSize();
+        }
+        TrustedByteArrayOutputStream stream = new 
TrustedByteArrayOutputStream(estimatedSize + 1);
+        DataOutput output = new DataOutputStream(stream);
+        try {
+            // Encode data table salting in sign of number of indexes
+            WritableUtils.writeVInt(output, nIndexes * (table.getBucketNum() 
== null ? 1 : -1));
+            // Serialize current mutable indexes, subtracting the vint size 
from the length
+            // as it's still included
+            if (indexMetaDataPtr.getLength() > 0) {
+                output.write(indexMetaDataPtr.get(), 
indexMetaDataPtr.getOffset(), 
indexMetaDataPtr.getLength()-WritableUtils.getVIntSize(nMutableIndexes));
+            }
+            // Serialize the additional key value indexes afterwards
+            for (PTable index : keyValueIndexes) {
+                index.getIndexMaintainer(table, connection).write(output);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e); // Impossible
+        }
+        indexMetaDataPtr.set(stream.getBuffer(), 0, stream.size());
+    }
+    
     public static List<IndexMaintainer> deserialize(ImmutableBytesWritable 
metaDataPtr,
             KeyValueBuilder builder) {
         return deserialize(metaDataPtr.get(), metaDataPtr.getOffset(), 
metaDataPtr.getLength());
@@ -264,6 +293,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
 
     private IndexMaintainer(PTable dataTable, PTable index, PhoenixConnection 
connection) {
         this(dataTable.getRowKeySchema(), dataTable.getBucketNum() != null);
+        assert(dataTable.getType() == PTableType.SYSTEM || dataTable.getType() 
== PTableType.TABLE || dataTable.getType() == PTableType.VIEW);
         this.isMultiTenant = dataTable.isMultiTenant();
         this.viewIndexId = index.getViewIndexId() == null ? null : 
MetaDataUtil.getViewIndexIdDataType().toBytes(index.getViewIndexId());
         this.isLocalIndex = index.getIndexType() == IndexType.LOCAL;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7700b41/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
index c1135bc..1b0c599 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMetaDataCacheClient.java
@@ -24,7 +24,6 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
 import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.ScanRanges;
@@ -59,7 +58,7 @@ public class IndexMetaDataCacheClient {
      * @param mutations the list of mutations that will be sent in a batch to 
server
      * @param indexMetaDataByteLength length in bytes of the index metadata 
cache
      */
-    public static boolean useIndexMetadataCache(PhoenixConnection connection, 
List<Mutation> mutations, int indexMetaDataByteLength) {
+    public static boolean useIndexMetadataCache(PhoenixConnection connection, 
List<? extends Mutation> mutations, int indexMetaDataByteLength) {
         ReadOnlyProps props = connection.getQueryServices().getProps();
         int threshold = props.getInt(INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB, 
QueryServicesOptions.DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD);
         return (indexMetaDataByteLength > ServerCacheClient.UUID_LENGTH && 
mutations.size() > threshold);
@@ -73,7 +72,7 @@ public class IndexMetaDataCacheClient {
      * @throws MaxServerCacheSizeExceededException if size of hash cache 
exceeds max allowed
      * size
      */
-    public ServerCache addIndexMetadataCache(List<Mutation> mutations, 
ImmutableBytesWritable ptr, byte[] txState) throws SQLException {
+    public ServerCache addIndexMetadataCache(List<? extends Mutation> 
mutations, ImmutableBytesWritable ptr, byte[] txState) throws SQLException {
         /**
          * Serialize and compress hashCacheTable
          */

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7700b41/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
index 26c1c12..ff0eb14 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexMetaData.java
@@ -40,6 +40,7 @@ import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.TransactionUtil;
 
 public class PhoenixIndexMetaData implements IndexMetaData {
+    private final Map<String, byte[]> attributes;
     private final IndexMetaDataCache indexMetaDataCache;
     
     private static IndexMetaDataCache 
getIndexMetaData(RegionCoprocessorEnvironment env, Map<String, byte[]> 
attributes) throws IOException {
@@ -84,7 +85,8 @@ public class PhoenixIndexMetaData implements IndexMetaData {
     }
 
     public PhoenixIndexMetaData(RegionCoprocessorEnvironment env, 
Map<String,byte[]> attributes) throws IOException {
-        indexMetaDataCache = getIndexMetaData(env, attributes);
+        this.indexMetaDataCache = getIndexMetaData(env, attributes);
+        this.attributes = attributes;
     }
     
     public Transaction getTransaction() {
@@ -94,4 +96,8 @@ public class PhoenixIndexMetaData implements IndexMetaData {
     public List<IndexMaintainer> getIndexMaintainers() {
         return indexMetaDataCache.getIndexMaintainers();
     }
+
+    public Map<String, byte[]> getAttributes() {
+        return attributes;
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7700b41/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 6f1e28c..862c4ba 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -14,11 +14,13 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import co.cask.tephra.Transaction;
+import co.cask.tephra.TxConstants;
 import co.cask.tephra.hbase98.TransactionAwareHTable;
 
 import org.apache.commons.logging.Log;
@@ -28,6 +30,8 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Result;
@@ -40,6 +44,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.hbase.index.MultiMutation;
@@ -64,6 +69,7 @@ import org.cloudera.htrace.Span;
 import org.cloudera.htrace.Trace;
 import org.cloudera.htrace.TraceScope;
 
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -101,16 +107,39 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
         this.writer.stop(msg);
     }
 
+    private static Iterator<Mutation> getMutationIterator(final 
MiniBatchOperationInProgress<Mutation> miniBatchOp) {
+        return new Iterator<Mutation>() {
+            private int i = 0;
+            
+            @Override
+            public boolean hasNext() {
+                return i < miniBatchOp.size();
+            }
+
+            @Override
+            public Mutation next() {
+                return miniBatchOp.getOperation(i++);
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+            
+        };
+    }
     @Override
     public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
             MiniBatchOperationInProgress<Mutation> miniBatchOp) throws 
IOException {
 
         Mutation m = miniBatchOp.getOperation(0);
-        if (!codec.isEnabled(m)) {
+        if (!codec.isEnabled(m) || 
m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null) {
             super.preBatchMutate(c, miniBatchOp);
             return;
         }
 
+        Map<String,byte[]> updateAttributes = m.getAttributesMap();
+        PhoenixIndexMetaData indexMetaData = new 
PhoenixIndexMetaData(c.getEnvironment(),updateAttributes);
         Collection<Pair<Mutation, byte[]>> indexUpdates = null;
         // get the current span, or just use a null-span to avoid a bunch of 
if statements
         try (TraceScope scope = Trace.startSpan("Starting to build index 
updates")) {
@@ -120,7 +149,7 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
             }
 
             // get the index updates for all elements in this batch
-            indexUpdates = getIndexUpdates(c.getEnvironment(), miniBatchOp);
+            indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, 
getMutationIterator(miniBatchOp), false);
 
             current.addTimelineAnnotation("Built index updates, doing 
preStep");
             TracingUtils.addAnnotation(current, "index update count", 
indexUpdates.size());
@@ -159,29 +188,41 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
         return s;
     }
 
-    private Collection<Pair<Mutation, byte[]>> 
getIndexUpdates(RegionCoprocessorEnvironment env, 
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
-        // Collect the set of mutable ColumnReferences so that we can first
-        // run a scan to get the current state. We'll need this to delete
-        // the existing index rows.
-        Map<String,byte[]> updateAttributes = 
miniBatchOp.getOperation(0).getAttributesMap();
-        PhoenixIndexMetaData indexMetaData = new 
PhoenixIndexMetaData(env,updateAttributes);
-        Transaction tx = indexMetaData.getTransaction();
-        assert(tx != null);
-        List<IndexMaintainer> indexMaintainers = 
indexMetaData.getIndexMaintainers();
-        Set<ColumnReference> mutableColumns = 
Sets.newHashSetWithExpectedSize(indexMaintainers.size() * 10);
-        for (IndexMaintainer indexMaintainer : indexMaintainers) {
-            if (!indexMaintainer.isImmutableRows()) {
-                mutableColumns.addAll(indexMaintainer.getAllColumns());
+    @Override
+    public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> 
e, final Delete delete,
+        final WALEdit edit, final Durability durability) throws IOException {
+        
+        // Need to do this in preDelete as otherwise our scan won't see the 
old values unless
+        // we do a raw scan.
+        if (delete.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) == null 
|| !codec.isEnabled(delete)) {
+            super.preDelete(e, delete, edit, durability);
+            return;
+        }
+        Map<String,byte[]> updateAttributes = delete.getAttributesMap();
+        PhoenixIndexMetaData indexMetaData = new 
PhoenixIndexMetaData(e.getEnvironment(),updateAttributes);
+        Collection<Pair<Mutation, byte[]>> indexUpdates = null;
+        try {
+            indexUpdates = getIndexUpdates(e.getEnvironment(), indexMetaData, 
Iterators.<Mutation>singletonIterator(delete), true);
+            // only write when there are index updates; otherwise we are done
+            if (!indexUpdates.isEmpty()) {
+                this.writer.write(indexUpdates);
             }
+        } catch (Throwable t) {
+            String msg = "Failed to rollback index updates: " + indexUpdates;
+            LOG.error(msg, t);
+            ServerUtil.throwIOException(msg, t);
         }
+    }
+
+    private Collection<Pair<Mutation, byte[]>> 
getIndexUpdates(RegionCoprocessorEnvironment env, PhoenixIndexMetaData 
indexMetaData, Iterator<Mutation> mutationIterator, boolean readOwnWrites) 
throws IOException {
         ResultScanner scanner = null;
         TransactionAwareHTable txTable = null;
         
         // Collect up all mutations in batch
         Map<ImmutableBytesPtr, MultiMutation> mutations =
                 new HashMap<ImmutableBytesPtr, MultiMutation>();
-        for (int i = 0; i < miniBatchOp.size(); i++) {
-            Mutation m = miniBatchOp.getOperation(i);
+        while(mutationIterator.hasNext()) {
+            Mutation m = mutationIterator.next();
             // add the mutation to the batch set
             ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
             MultiMutation stored = mutations.get(row);
@@ -193,6 +234,19 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
             stored.addAll(m);
         }
         
+        // Collect the set of mutable ColumnReferences so that we can first
+        // run a scan to get the current state. We'll need this to delete
+        // the existing index rows.
+        Transaction tx = indexMetaData.getTransaction();
+        assert(tx != null);
+        List<IndexMaintainer> indexMaintainers = 
indexMetaData.getIndexMaintainers();
+        Set<ColumnReference> mutableColumns = 
Sets.newHashSetWithExpectedSize(indexMaintainers.size() * 10);
+        for (IndexMaintainer indexMaintainer : indexMaintainers) {
+            if (!indexMaintainer.isImmutableRows()) {
+                mutableColumns.addAll(indexMaintainer.getAllColumns());
+            }
+        }
+
         Collection<Pair<Mutation, byte[]>> indexUpdates = new 
ArrayList<Pair<Mutation, byte[]>>(mutations.size() * 2 * 
indexMaintainers.size());
         try {
             if (!mutableColumns.isEmpty()) {
@@ -201,7 +255,9 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
                     
keys.add(PVarbinary.INSTANCE.getKeyRange(ptr.copyBytesIfNecessary()));
                 }
                 Scan scan = new Scan();
-                scan.setAttribute(TX_NO_READ_OWN_WRITES, 
PDataType.TRUE_BYTES); // TODO: remove when Tephra allows this
+                if (!readOwnWrites) {
+                    scan.setAttribute(TX_NO_READ_OWN_WRITES, 
PDataType.TRUE_BYTES); // TODO: remove when Tephra allows this
+                }
                 // Project all mutable columns
                 for (ColumnReference ref : mutableColumns) {
                     scan.addColumn(ref.getFamily(), ref.getQualifier());
@@ -217,14 +273,17 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
                 txTable.startTx(tx);
                 scanner = txTable.getScanner(scan);
             }
+            ColumnReference emptyColRef = new 
ColumnReference(indexMaintainers.get(0).getDataEmptyKeyValueCF(), 
QueryConstants.EMPTY_COLUMN_BYTES);
             if (scanner != null) {
                 Result result;
                 while ((result = scanner.next()) != null) {
                     Mutation m = mutations.remove(new 
ImmutableBytesPtr(result.getRow()));
-                    TxTableState state = new TxTableState(env, mutableColumns, 
updateAttributes, tx.getWritePointer(), m, result);
+                    byte[] attribValue = 
m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY);
+                    TxTableState state = new TxTableState(env, mutableColumns, 
indexMetaData.getAttributes(), tx.getWritePointer(), m, emptyColRef, result);
                     Iterable<IndexUpdate> deletes = 
codec.getIndexDeletes(state, indexMetaData);
                     for (IndexUpdate delete : deletes) {
                         if (delete.isValid()) {
+                            
delete.getUpdate().setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, 
attribValue);
                             indexUpdates.add(new Pair<Mutation, 
byte[]>(delete.getUpdate(),delete.getTableName()));
                         }
                     }
@@ -238,7 +297,7 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
                 }
             }
             for (Mutation m : mutations.values()) {
-                TxTableState state = new TxTableState(env, mutableColumns, 
updateAttributes, tx.getWritePointer(), m);
+                TxTableState state = new TxTableState(env, mutableColumns, 
indexMetaData.getAttributes(), tx.getWritePointer(), m);
                 state.applyMutation();
                 Iterable<IndexUpdate> puts = codec.getIndexUpserts(state, 
indexMetaData);
                 for (IndexUpdate put : puts) {
@@ -284,7 +343,7 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
             }
         }
         
-        public TxTableState(RegionCoprocessorEnvironment env, 
Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long 
currentTimestamp, Mutation m, Result r) {
+        public TxTableState(RegionCoprocessorEnvironment env, 
Set<ColumnReference> indexedColumns, Map<String, byte[]> attributes, long 
currentTimestamp, Mutation m, ColumnReference emptyColRef, Result r) {
             this(env, indexedColumns, attributes, currentTimestamp, m);
 
             for (ColumnReference ref : indexedColumns) {
@@ -295,6 +354,14 @@ public class PhoenixTransactionalIndexer extends 
BaseRegionObserver {
                     valueMap.put(ref, ptr);
                 }
             }
+            /*
+            Cell cell = r.getColumnLatestCell(emptyColRef.getFamily(), 
emptyColRef.getQualifier());
+            if (cell != null) {
+                ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                ptr.set(cell.getValueArray(), cell.getValueOffset(), 
cell.getValueLength());
+                valueMap.put(emptyColRef, ptr);
+            }
+            */
         }
         
         @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7700b41/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index ef2a233..19beecd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -33,6 +33,7 @@ import java.text.Format;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.hbase.client.Scan;
@@ -134,6 +135,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Throwables;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 /**
@@ -236,7 +238,11 @@ public class PhoenixStatement implements Statement, 
SQLCloseable, org.apache.pho
                         QueryPlan plan = 
stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.RESERVE_SEQUENCE);
                         // Send mutations to hbase, so they are visible to 
subsequent reads.
                         // Use original plan for data table so that data and 
immutable indexes will be sent
-                        boolean isTransactional = 
connection.getMutationState().startTransaction(plan.getContext().getResolver().getTables().iterator());
+                        // TODO: for joins, we need to iterate through all 
tables, but we need the original table,
+                        // not the projected table, so 
plan.getContext().getResolver().getTables() won't work.
+                        TableRef tableRef = plan.getTableRef();
+                        Iterator<TableRef> tableRefs = tableRef == null ? 
Iterators.<TableRef>emptyIterator() : Iterators.singletonIterator(tableRef);
+                        boolean isTransactional = 
connection.getMutationState().startTransaction(tableRefs);
                         plan = 
connection.getQueryServices().getOptimizer().optimize(PhoenixStatement.this, 
plan);
                         if (isTransactional) {
                             // After optimize so that we have the right 
context object

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7700b41/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index 89c2283..98f9f4a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -162,8 +162,8 @@ public class DelegateTable implements PTable {
     }
 
     @Override
-    public void getIndexMaintainers(ImmutableBytesWritable ptr, 
PhoenixConnection connection) {
-        delegate.getIndexMaintainers(ptr, connection);
+    public boolean getIndexMaintainers(ImmutableBytesWritable ptr, 
PhoenixConnection connection) {
+        return delegate.getIndexMaintainers(ptr, connection);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7700b41/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index 6b7f8c6..4b4729f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -307,7 +307,7 @@ public interface PTable {
     PName getPhysicalName();
     boolean isImmutableRows();
 
-    void getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection 
connection);
+    boolean getIndexMaintainers(ImmutableBytesWritable ptr, PhoenixConnection 
connection);
     IndexMaintainer getIndexMaintainer(PTable dataTable, PhoenixConnection 
connection);
     PName getDefaultFamilyName();
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7700b41/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 90c68fb..a32e922 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -830,7 +830,7 @@ public class PTableImpl implements PTable {
     }
 
     @Override
-    public synchronized void getIndexMaintainers(ImmutableBytesWritable ptr, 
PhoenixConnection connection) {
+    public synchronized boolean getIndexMaintainers(ImmutableBytesWritable 
ptr, PhoenixConnection connection) {
         if (indexMaintainersPtr == null) {
             indexMaintainersPtr = new ImmutableBytesWritable();
             if (indexes.isEmpty()) {
@@ -840,6 +840,7 @@ public class PTableImpl implements PTable {
             }
         }
         ptr.set(indexMaintainersPtr.get(), indexMaintainersPtr.getOffset(), 
indexMaintainersPtr.getLength());
+        return indexMaintainersPtr.getLength() > 0;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7700b41/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 45729a2..fd111e0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -209,13 +210,30 @@ public class IndexUtil {
                             .getLength()) == 0);
     }
 
+    public static List<Delete> generateDeleteIndexData(final PTable table, 
PTable index,
+            List<Delete> dataMutations, ImmutableBytesWritable ptr, final 
KeyValueBuilder kvBuilder, PhoenixConnection connection)
+            throws SQLException {
+        try {
+            IndexMaintainer maintainer = index.getIndexMaintainer(table, 
connection);
+            List<Delete> indexMutations = 
Lists.newArrayListWithExpectedSize(dataMutations.size());
+            for (final Mutation dataMutation : dataMutations) {
+                long ts = MetaDataUtil.getClientTimeStamp(dataMutation);
+                ptr.set(dataMutation.getRow());
+                indexMutations.add(maintainer.buildDeleteMutation(kvBuilder, 
ptr, ts));
+            }
+            return indexMutations;
+        } catch (IOException e) {
+            throw new SQLException(e);
+        }
+    }
+    
     public static List<Mutation> generateIndexData(final PTable table, PTable 
index,
             List<Mutation> dataMutations, ImmutableBytesWritable ptr, final 
KeyValueBuilder kvBuilder, PhoenixConnection connection)
             throws SQLException {
         try {
             IndexMaintainer maintainer = index.getIndexMaintainer(table, 
connection);
             List<Mutation> indexMutations = 
Lists.newArrayListWithExpectedSize(dataMutations.size());
-           for (final Mutation dataMutation : dataMutations) {
+            for (final Mutation dataMutation : dataMutations) {
                 long ts = MetaDataUtil.getClientTimeStamp(dataMutation);
                 ptr.set(dataMutation.getRow());
                 /*

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7700b41/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 2dfa573..3786e6d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -45,12 +45,12 @@ import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.KeyRange.Bound;
 import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
-import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.ValueSchema.Field;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PVarbinary;
 
 import com.google.common.collect.Lists;
 
@@ -434,7 +434,7 @@ public class ScanUtil {
         }
     }
     
-    public static ScanRanges newScanRanges(List<Mutation> mutations) throws 
SQLException {
+    public static ScanRanges newScanRanges(List<? extends Mutation> mutations) 
throws SQLException {
         List<KeyRange> keys = 
Lists.newArrayListWithExpectedSize(mutations.size());
         for (Mutation m : mutations) {
             keys.add(PVarbinary.INSTANCE.getKeyRange(m.getRow()));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7700b41/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 90eba1d..9ace160 100644
--- a/pom.xml
+++ b/pom.xml
@@ -116,7 +116,6 @@
     
     <maven-dependency-plugin.version>2.1</maven-dependency-plugin.version>
     <maven.assembly.version>2.5.2</maven.assembly.version>
-    <maven.rat.version>0.8</maven.rat.version>
     
     <!-- Plugin options -->
     <numForkedUT>3</numForkedUT>
@@ -174,38 +173,6 @@
           <artifactId>maven-assembly-plugin</artifactId>
           <version>${maven.assembly.version}</version>
         </plugin>
-        <plugin>
-          <groupId>org.apache.rat</groupId>
-          <artifactId>apache-rat-plugin</artifactId>
-          <version>${maven.rat.version}</version>
-          <configuration>
-            <excludes>
-              <exclude>CHANGES</exclude>
-              <exclude>README.md</exclude>
-              <exclude>README</exclude>
-                 <exclude>dev/phoenix.importorder</exclude>
-              <exclude>dev/release_files/**</exclude>
-              <exclude>**/target/**</exclude>
-              <exclude>**/*.versionsBackup</exclude>
-              <!-- exclude docs -->
-              <exclude>docs/**</exclude>
-              <!-- exclude examples -->
-              <exclude>examples/**</exclude>
-              <!-- exclude source control files -->
-              <exclude>.gitignore</exclude>
-              <exclude>.git/**</exclude>
-              <!-- exclude IDE files -->
-              <exclude>**/.idea/**</exclude>
-              <exclude>**/*.iml</exclude>
-              <exclude>.project</exclude>
-              <exclude>.classpath</exclude>
-              <exclude>.settings/**</exclude>
-              <exclude>**/resources/java.sql.Driver</exclude>
-              <!-- exclude protobuf files -->
-              <exclude>**/generated/**</exclude>
-            </excludes>
-          </configuration>
-        </plugin>
         <!-- We put slow-running tests into src/it and run them during the
             integration-test phase using the failsafe plugin. This way
             developers can run unit tests conveniently from the IDE or via
@@ -661,18 +628,6 @@
       <id>release</id>
       <build>
         <plugins>
-          <plugin>
-            <groupId>org.apache.rat</groupId>
-            <artifactId>apache-rat-plugin</artifactId>
-            <executions>
-              <execution>
-                <phase>package</phase>
-                <goals>
-                  <goal>check</goal>
-                </goals>
-              </execution>
-            </executions>
-          </plugin>
         </plugins>
       </build>
     </profile>

Reply via email to