Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.0 388630f80 -> 637fefee0


PHOENIX-2478 Rows committed in transaction overlapping index creation are not 
populated


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/637fefee
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/637fefee
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/637fefee

Branch: refs/heads/4.x-HBase-1.0
Commit: 637fefee06e3675870070934339ec8006877d10b
Parents: 388630f
Author: James Taylor <[email protected]>
Authored: Tue Jan 19 08:19:44 2016 -0800
Committer: James Taylor <[email protected]>
Committed: Tue Jan 19 08:22:14 2016 -0800

----------------------------------------------------------------------
 .../phoenix/end2end/index/ImmutableIndexIT.java | 239 ++++++++++++++-----
 .../apache/phoenix/execute/MutationState.java   | 115 +++++----
 .../java/org/apache/phoenix/query/BaseTest.java | 103 +++-----
 3 files changed, 285 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/637fefee/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
index 0d329fe..c4ecfbb 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
@@ -20,7 +20,6 @@ package org.apache.phoenix.end2end.index;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -29,12 +28,26 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
 import org.apache.phoenix.end2end.Shadower;
-import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
@@ -47,80 +60,194 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 
 @RunWith(Parameterized.class)
 public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
-       
-       private final boolean localIndex;
-       private final String tableDDLOptions;
-       private final String tableName;
+
+    private final boolean localIndex;
+    private final String tableDDLOptions;
+    private final String tableName;
     private final String indexName;
     private final String fullTableName;
     private final String fullIndexName;
-       
-       public ImmutableIndexIT(boolean localIndex, boolean transactional) {
-               this.localIndex = localIndex;
-               StringBuilder optionBuilder = new 
StringBuilder("IMMUTABLE_ROWS=true");
-               if (transactional) {
-                       optionBuilder.append(", TRANSACTIONAL=true");
-               }
-               this.tableDDLOptions = optionBuilder.toString();
-               this.tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + ( 
transactional ?  "_TXN" : "");
+
+    private static String TABLE_NAME;
+    private static String INDEX_DDL;
+    public static final AtomicInteger NUM_ROWS = new AtomicInteger(1);
+
+    public ImmutableIndexIT(boolean localIndex, boolean transactional) {
+        this.localIndex = localIndex;
+        StringBuilder optionBuilder = new StringBuilder("IMMUTABLE_ROWS=true");
+        if (transactional) {
+            optionBuilder.append(", TRANSACTIONAL=true");
+        }
+        this.tableDDLOptions = optionBuilder.toString();
+        this.tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + ( transactional ?  
"_TXN" : "");
         this.indexName = "IDX" + ( transactional ?  "_TXN" : "");
         this.fullTableName = 
SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
         this.fullIndexName = 
SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
-       }
-       
-       @BeforeClass
+    }
+
+    @BeforeClass
     @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class)
     public static void doSetup() throws Exception {
-        Map<String,String> props = Maps.newHashMapWithExpectedSize(1);
-        props.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.toString(true));
-        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(1);
+        serverProps.put("hbase.coprocessor.region.classes", 
CreateIndexRegionObserver.class.getName());
+        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
+        clientProps.put(QueryServices.TRANSACTIONS_ENABLED, "true");
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), 
new ReadOnlyProps(clientProps.entrySet().iterator()));
     }
-       
-       @Parameters(name="localIndex = {0} , transactional = {1}")
+
+    @Parameters(name="localIndex = {0} , transactional = {1}")
     public static Collection<Boolean[]> data() {
         return Arrays.asList(new Boolean[][] {     
-                 { false, false }, { false, true }, { true, false }, { true, 
true }
-           });
+            { false, true }, { true, true }
+        });
     }
-   
+
+
     @Test
-    public void testDropIfImmutableKeyValueColumn() throws Exception {
-       Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+    public void testCreateIndexDuringUpsertSelect() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, 
Integer.toString(100));
+        TABLE_NAME = fullTableName + "_testCreateIndexDuringUpsertSelect";
+        String ddl ="CREATE TABLE " + TABLE_NAME + BaseTest.TEST_TABLE_SCHEMA 
+ tableDDLOptions;
+        INDEX_DDL = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX IF NOT 
EXISTS " + indexName + " ON " + TABLE_NAME
+                + " (long_pk, varchar_pk)"
+                + " INCLUDE (long_col1, long_col2)";
+
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        try {
+            conn.setAutoCommit(false);
+            Statement stmt = conn.createStatement();
+            stmt.execute(ddl);
+
+            upsertRows(conn, TABLE_NAME, 220);
+            conn.commit();
+
+            // run the upsert select and also create an index
+            conn.setAutoCommit(true);
+            String upsertSelect = "UPSERT INTO " + TABLE_NAME + "(varchar_pk, 
char_pk, int_pk, long_pk, decimal_pk, date_pk) " + 
+                    "SELECT varchar_pk||'_upsert_select', char_pk, int_pk, 
long_pk, decimal_pk, date_pk FROM "+ TABLE_NAME;    
+            conn.createStatement().execute(upsertSelect);
+
+            ResultSet rs;
+            rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ 
COUNT(*) FROM " + TABLE_NAME);
+            assertTrue(rs.next());
+            assertEquals(440,rs.getInt(1));
+            rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + 
TABLE_NAME);
+            assertTrue(rs.next());
+            assertEquals(440,rs.getInt(1));
+        }
+        finally {
+            conn.close();
+        }
+    }
+
+    // used to create an index while a batch of rows is being written
+    public static class CreateIndexRegionObserver extends SimpleRegionObserver 
{
+        @Override
+        public void postPut(ObserverContext<RegionCoprocessorEnvironment> c,
+                Put put, WALEdit edit, final Durability durability)
+                        throws HBaseIOException {
+            String tableName = c.getEnvironment().getRegion().getRegionInfo()
+                    .getTable().getNameAsString();
+            if (tableName.equalsIgnoreCase(TABLE_NAME)
+                    // create the index after the second batch of 1000 rows
+                    && Bytes.startsWith(put.getRow(), 
Bytes.toBytes("varchar200_upsert_select"))) {
+                try {
+                    Properties props = 
PropertiesUtil.deepCopy(TEST_PROPERTIES);
+                    try (Connection conn = 
DriverManager.getConnection(getUrl(), props)) {
+                        conn.createStatement().execute(INDEX_DDL);
+                    }
+                } catch (SQLException e) {
+                    throw new DoNotRetryIOException(e);
+                } 
+            }
+        }
+    }
+
+    private static class UpsertRunnable implements Runnable {
+        private static final int NUM_ROWS_IN_BATCH = 10000;
+        private final String fullTableName;
+
+        public UpsertRunnable(String fullTableName) {
+            this.fullTableName = fullTableName;
+        }
+
+        public void run() {
+            Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+            try (Connection conn = DriverManager.getConnection(getUrl(), 
props)) {
+                while (true) {
+                    // write a large batch of rows
+                    boolean fistRowInBatch = true;
+                    for (int i=0; i<NUM_ROWS_IN_BATCH; ++i) {
+                        BaseTest.upsertRow(conn, fullTableName, 
NUM_ROWS.intValue(), fistRowInBatch);
+                        NUM_ROWS.incrementAndGet();
+                        fistRowInBatch = false;
+                    }
+                    conn.commit();
+                    Thread.sleep(500);
+                }
+            } catch (SQLException e) {
+                throw new RuntimeException(e);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    @Test
+    public void testCreateIndexWhileUpsertingData() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String ddl ="CREATE TABLE " + fullTableName + 
BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions;
+        String indexDDL = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX IF 
NOT EXISTS " + indexName + " ON " + fullTableName
+                + " (long_pk, varchar_pk)"
+                + " INCLUDE (long_col1, long_col2)";
+        int numThreads = 3;
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-               conn.setAutoCommit(false);
-               String ddl ="CREATE TABLE " + fullTableName + 
BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions;
-               Statement stmt = conn.createStatement();
-               stmt.execute(ddl);
-               populateTestTable(fullTableName);
-               ddl = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + 
indexName + " ON " + fullTableName + " (long_col1)";
-               stmt.execute(ddl);
-               
-               ResultSet rs;
-               
-               rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM 
" + fullTableName);
-               assertTrue(rs.next());
-               assertEquals(3,rs.getInt(1));
-               rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM 
" + fullIndexName);
-               assertTrue(rs.next());
-               assertEquals(3,rs.getInt(1));
-               
-               conn.setAutoCommit(true);
-               String dml = "DELETE from " + fullTableName + " WHERE long_col2 
= 4";
-               try {
-                   conn.createStatement().execute(dml);
-                   fail();
-               } catch (SQLException e) {
-                   
assertEquals(SQLExceptionCode.INVALID_FILTER_ON_IMMUTABLE_ROWS.getErrorCode(), 
e.getErrorCode());
-               }
-                   
-               conn.createStatement().execute("DROP TABLE " + fullTableName);
+            conn.setAutoCommit(false);
+            Statement stmt = conn.createStatement();
+            stmt.execute(ddl);
+
+            ExecutorService threadPool = 
Executors.newFixedThreadPool(numThreads);
+            List<Future<?>> futureList = 
Lists.newArrayListWithExpectedSize(numThreads);
+            for (int i =0; i<numThreads; ++i) {
+                futureList.add(threadPool.submit(new 
UpsertRunnable(fullTableName)));
+            }
+            // upsert some rows before creating the index 
+            Thread.sleep(5000);
+
+            // create the index 
+            try (Connection conn2 = DriverManager.getConnection(getUrl(), 
props)) {
+                conn2.setAutoCommit(false);
+                Statement stmt2 = conn2.createStatement();
+                stmt2.execute(indexDDL);
+                conn2.commit();
+            }
+
+            // upsert some rows after creating the index
+            Thread.sleep(1000);
+            // cancel the running threads
+            for (Future<?> future : futureList) {
+                future.cancel(true);
+            }
+            threadPool.shutdownNow();
+            threadPool.awaitTermination(30, TimeUnit.SECONDS);
+            Thread.sleep(1000);
+
+            ResultSet rs;
+            rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ 
COUNT(*) FROM " + fullTableName);
+            assertTrue(rs.next());
+            int dataTableRowCount = rs.getInt(1);
+            rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + 
fullIndexName);
+            assertTrue(rs.next());
+            int indexTableRowCount = rs.getInt(1);
+            assertEquals("Data and Index table should have the same number of 
rows ", dataTableRowCount, indexTableRowCount);
         }
     }
-    
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/637fefee/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index be896f8..d4d893b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -135,7 +135,7 @@ public class MutationState implements SQLCloseable {
     private int numRows = 0;
     private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
     private boolean isExternalTxContext = false;
-    private Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations;
+    private Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations 
= Collections.emptyMap();
     
     private final MutationMetricQueue mutationMetricQueue;
     private ReadMetricQueue readMetricQueue;
@@ -435,6 +435,59 @@ public class MutationState implements SQLCloseable {
         return sizeOffset + numRows;
     }
     
+    private void joinMutationState(TableRef tableRef, 
Map<ImmutableBytesPtr,RowMutationState> srcRows,
+            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> 
dstMutations) {
+        PTable table = tableRef.getTable();
+        boolean isIndex = table.getType() == PTableType.INDEX;
+        boolean incrementRowCount = dstMutations == this.mutations;
+        Map<ImmutableBytesPtr,RowMutationState> existingRows = 
dstMutations.put(tableRef, srcRows);
+        if (existingRows != null) { // Rows for that table already exist
+            // Loop through new rows and replace existing with new
+            for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : 
srcRows.entrySet()) {
+                // Replace existing row with new row
+                RowMutationState existingRowMutationState = 
existingRows.put(rowEntry.getKey(), rowEntry.getValue());
+                if (existingRowMutationState != null) {
+                    Map<PColumn,byte[]> existingValues = 
existingRowMutationState.getColumnValues();
+                    if (existingValues != PRow.DELETE_MARKER) {
+                        Map<PColumn,byte[]> newRow = 
rowEntry.getValue().getColumnValues();
+                        // if new row is PRow.DELETE_MARKER, it means delete, 
and we don't need to merge it with existing row. 
+                        if (newRow != PRow.DELETE_MARKER) {
+                            // Merge existing column values with new column 
values
+                            existingRowMutationState.join(rowEntry.getValue());
+                            // Now that the existing row has been merged with 
the new row, replace it back
+                            // again (since it was merged with the new one 
above).
+                            existingRows.put(rowEntry.getKey(), 
existingRowMutationState);
+                        }
+                    }
+                } else {
+                    if (incrementRowCount && !isIndex) { // Don't count index 
rows in row count
+                        numRows++;
+                    }
+                }
+            }
+            // Put the existing one back now that it's merged
+            dstMutations.put(tableRef, existingRows);
+        } else {
+            // Size new map at batch size as that's what it'll likely grow to.
+            Map<ImmutableBytesPtr,RowMutationState> newRows = 
Maps.newHashMapWithExpectedSize(connection.getMutateBatchSize());
+            newRows.putAll(srcRows);
+            dstMutations.put(tableRef, newRows);
+            if (incrementRowCount && !isIndex) {
+                numRows += srcRows.size();
+            }
+        }
+    }
+    
+    private void joinMutationState(Map<TableRef, Map<ImmutableBytesPtr, 
RowMutationState>> srcMutations, 
+            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> 
dstMutations) {
+        // Merge newMutation with this one, keeping state from newMutation for 
any overlaps
+        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> 
entry : srcMutations.entrySet()) {
+            // Replace existing entries for the table with new entries
+            TableRef tableRef = entry.getKey();
+            Map<ImmutableBytesPtr,RowMutationState> srcRows = entry.getValue();
+            joinMutationState(tableRef, srcRows, dstMutations);
+        }
+    }
     /**
      * Combine a newer mutation with this one, where in the event of overlaps, 
the newer one will take precedence.
      * Combine any metrics collected for the newer mutation.
@@ -453,48 +506,12 @@ public class MutationState implements SQLCloseable {
             txAwares.addAll(newMutationState.txAwares);
         }
         this.sizeOffset += newMutationState.sizeOffset;
-        // Merge newMutation with this one, keeping state from newMutation for 
any overlaps
-        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> 
entry : newMutationState.mutations.entrySet()) {
-            // Replace existing entries for the table with new entries
-            TableRef tableRef = entry.getKey();
-            PTable table = tableRef.getTable();
-            boolean isIndex = table.getType() == PTableType.INDEX;
-            Map<ImmutableBytesPtr,RowMutationState> existingRows = 
this.mutations.put(tableRef, entry.getValue());
-            if (existingRows != null) { // Rows for that table already exist
-                // Loop through new rows and replace existing with new
-                for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : 
entry.getValue().entrySet()) {
-                    // Replace existing row with new row
-                    RowMutationState existingRowMutationState = 
existingRows.put(rowEntry.getKey(), rowEntry.getValue());
-                    if (existingRowMutationState != null) {
-                        Map<PColumn,byte[]> existingValues = 
existingRowMutationState.getColumnValues();
-                        if (existingValues != PRow.DELETE_MARKER) {
-                            Map<PColumn,byte[]> newRow = 
rowEntry.getValue().getColumnValues();
-                            // if new row is PRow.DELETE_MARKER, it means 
delete, and we don't need to merge it with existing row. 
-                            if (newRow != PRow.DELETE_MARKER) {
-                                // Merge existing column values with new 
column values
-                                
existingRowMutationState.join(rowEntry.getValue());
-                                // Now that the existing row has been merged 
with the new row, replace it back
-                                // again (since it was merged with the new one 
above).
-                                existingRows.put(rowEntry.getKey(), 
existingRowMutationState);
-                            }
-                        }
-                    } else {
-                        if (!isIndex) { // Don't count index rows in row count
-                            numRows++;
-                        }
-                    }
-                }
-                // Put the existing one back now that it's merged
-                this.mutations.put(entry.getKey(), existingRows);
-            } else {
-                // Size new map at batch size as that's what it'll likely grow 
to.
-                Map<ImmutableBytesPtr,RowMutationState> newRows = 
Maps.newHashMapWithExpectedSize(connection.getMutateBatchSize());
-                newRows.putAll(entry.getValue());
-                this.mutations.put(tableRef, newRows);
-                if (!isIndex) {
-                    numRows += entry.getValue().size();
-                }
+        joinMutationState(newMutationState.mutations, this.mutations);
+        if (!newMutationState.txMutations.isEmpty()) {
+            if (txMutations.isEmpty()) {
+                txMutations = 
Maps.newHashMapWithExpectedSize(mutations.size());
             }
+            joinMutationState(newMutationState.txMutations, this.txMutations);
         }
         
mutationMetricQueue.combineMetricQueues(newMutationState.mutationMetricQueue);
         if (readMetricQueue == null) {
@@ -915,6 +932,7 @@ public class MutationState implements SQLCloseable {
                             long startTime = System.currentTimeMillis();
                             child.addTimelineAnnotation("Attempt " + 
retryCount);
                             hTable.batch(mutationList);
+                            if (logger.isDebugEnabled()) logger.debug("Sent 
batch of " + numMutations + " for " + Bytes.toString(htableName));
                             child.stop();
                             child.stop();
                             shouldRetry = false;
@@ -980,13 +998,13 @@ public class MutationState implements SQLCloseable {
                 // committed in the event of a failure.
                 if (isTransactional) {
                     addUncommittedStatementIndexes(valuesMap.values());
-                    if (txMutations == null) {
+                    if (txMutations.isEmpty()) {
                         txMutations = 
Maps.newHashMapWithExpectedSize(mutations.size());
                     }
                     // Keep all mutations we've encountered until a commit or 
rollback.
                     // This is not ideal, but there's not good way to get the 
values back
                     // in the event that we need to replay the commit.
-                    txMutations.put(tableRef, valuesMap);
+                    joinMutationState(tableRef, valuesMap, txMutations);
                 }
                 // Remove batches as we process them
                 if (sendAll) {
@@ -1082,7 +1100,7 @@ public class MutationState implements SQLCloseable {
     private void resetTransactionalState() {
         tx = null;
         txAwares.clear();
-        txMutations = null;
+        txMutations = Collections.emptyMap();
         uncommittedPhysicalNames.clear();
         uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
     }
@@ -1187,9 +1205,7 @@ public class MutationState implements SQLCloseable {
                 break;
             }
             retryCount++;
-            if (txMutations != null) {
-                mutations.putAll(txMutations);
-            }
+            mutations.putAll(txMutations);
         } while (true);
     }
 
@@ -1214,6 +1230,7 @@ public class MutationState implements SQLCloseable {
             if (result.getTable() == null) {
                 throw new 
TableNotFoundException(dataTable.getSchemaName().getString(), 
dataTable.getTableName().getString());
             }
+            tableRef.setTable(result.getTable());
             if (!result.wasUpdated()) {
                 if (logger.isInfoEnabled()) logger.info("No updates to " + 
dataTable.getName().getString() + " as of "  + timestamp);
                 continue;
@@ -1223,7 +1240,7 @@ public class MutationState implements SQLCloseable {
                 // that an index was dropped and recreated with the same name 
but different
                 // indexed/covered columns.
                 addedIndexes = 
(!oldIndexes.equals(result.getTable().getIndexes()));
-                if (logger.isInfoEnabled()) logger.info((addedIndexes ? 
"Updates " : "No updates ") + "as of "  + timestamp + " to " + 
dataTable.getName().getString() + " with indexes " + dataTable.getIndexes());
+                if (logger.isInfoEnabled()) logger.info((addedIndexes ? 
"Updates " : "No updates ") + "as of "  + timestamp + " to " + 
dataTable.getName().getString() + " with indexes " + 
tableRef.getTable().getIndexes());
             }
         }
         if (logger.isInfoEnabled()) logger.info((addedIndexes ? "Updates " : 
"No updates ") + "to indexes as of "  + getInitialWritePointer());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/637fefee/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 35bb8ce..951bfce 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.query;
 
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
 import static org.apache.phoenix.util.PhoenixRuntime.CURRENT_SCN_ATTRIB;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
@@ -150,6 +151,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.hbase.index.balancer.IndexLoadBalancer;
 import org.apache.phoenix.hbase.index.master.IndexMasterObserver;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.jdbc.PhoenixDriver;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo;
 import org.apache.phoenix.jdbc.PhoenixTestDriver;
@@ -200,7 +202,7 @@ import com.google.inject.util.Providers;
 public abstract class BaseTest {
     protected static final String TEST_TABLE_SCHEMA = "(" +
             "   varchar_pk VARCHAR NOT NULL, " +
-            "   char_pk CHAR(6) NOT NULL, " +
+            "   char_pk CHAR(10) NOT NULL, " +
             "   int_pk INTEGER NOT NULL, "+ 
             "   long_pk BIGINT NOT NULL, " +
             "   decimal_pk DECIMAL(31, 10) NOT NULL, " +
@@ -1805,77 +1807,44 @@ public abstract class BaseTest {
     public HBaseTestingUtility getUtility() {
         return utility;
     }
+    
+    public static void upsertRows(Connection conn, String fullTableName, int 
numRows) throws SQLException {
+       for (int i=1; i<=numRows; ++i) {
+               upsertRow(conn, fullTableName, i, false);
+       }
+    }
+
+    public static void upsertRow(Connection conn, String fullTableName, int 
index, boolean firstRowInBatch) throws SQLException {
+       String upsert = "UPSERT INTO " + fullTableName
+                + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?)";
+               PreparedStatement stmt = conn.prepareStatement(upsert);
+               stmt.setString(1, firstRowInBatch ? "firstRowInBatch_" : "" + 
"varchar"+index);
+               stmt.setString(2, "char"+index);
+               stmt.setInt(3, index);
+               stmt.setLong(4, index);
+               stmt.setBigDecimal(5, new BigDecimal(index));
+               Date date = DateUtil.parseDate("2015-01-01 00:00:00");
+               stmt.setDate(6, date);
+               stmt.setString(7, "varchar_a");
+               stmt.setString(8, "chara");
+               stmt.setInt(9, index+1);
+               stmt.setLong(10, index+1);
+               stmt.setBigDecimal(11, new BigDecimal(index+1));
+               stmt.setDate(12, date);
+               stmt.setString(13, "varchar_b");
+               stmt.setString(14, "charb");
+               stmt.setInt(15, index+2);
+               stmt.setLong(16, index+2);
+               stmt.setBigDecimal(17, new BigDecimal(index+2));
+               stmt.setDate(18, date);
+               stmt.executeUpdate();
+       }
 
     // Populate the test table with data.
     public static void populateTestTable(String fullTableName) throws 
SQLException {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
-            String upsert = "UPSERT INTO " + fullTableName
-                    + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?)";
-            PreparedStatement stmt = conn.prepareStatement(upsert);
-            stmt.setString(1, "varchar1");
-            stmt.setString(2, "char1");
-            stmt.setInt(3, 1);
-            stmt.setLong(4, 1L);
-            stmt.setBigDecimal(5, new BigDecimal(1.0));
-            Date date = DateUtil.parseDate("2015-01-01 00:00:00");
-            stmt.setDate(6, date);
-            stmt.setString(7, "varchar_a");
-            stmt.setString(8, "chara");
-            stmt.setInt(9, 2);
-            stmt.setLong(10, 2L);
-            stmt.setBigDecimal(11, new BigDecimal(2.0));
-            stmt.setDate(12, date);
-            stmt.setString(13, "varchar_b");
-            stmt.setString(14, "charb");
-            stmt.setInt(15, 3);
-            stmt.setLong(16, 3L);
-            stmt.setBigDecimal(17, new BigDecimal(3.0));
-            stmt.setDate(18, date);
-            stmt.executeUpdate();
-            
-            stmt.setString(1, "varchar2");
-            stmt.setString(2, "char2");
-            stmt.setInt(3, 2);
-            stmt.setLong(4, 2L);
-            stmt.setBigDecimal(5, new BigDecimal(2.0));
-            date = DateUtil.parseDate("2015-01-02 00:00:00");
-            stmt.setDate(6, date);
-            stmt.setString(7, "varchar_a");
-            stmt.setString(8, "chara");
-            stmt.setInt(9, 3);
-            stmt.setLong(10, 3L);
-            stmt.setBigDecimal(11, new BigDecimal(3.0));
-            stmt.setDate(12, date);
-            stmt.setString(13, "varchar_b");
-            stmt.setString(14, "charb");
-            stmt.setInt(15, 4);
-            stmt.setLong(16, 4L);
-            stmt.setBigDecimal(17, new BigDecimal(4.0));
-            stmt.setDate(18, date);
-            stmt.executeUpdate();
-            
-            stmt.setString(1, "varchar3");
-            stmt.setString(2, "char3");
-            stmt.setInt(3, 3);
-            stmt.setLong(4, 3L);
-            stmt.setBigDecimal(5, new BigDecimal(3.0));
-            date = DateUtil.parseDate("2015-01-03 00:00:00");
-            stmt.setDate(6, date);
-            stmt.setString(7, "varchar_a");
-            stmt.setString(8, "chara");
-            stmt.setInt(9, 4);
-            stmt.setLong(10, 4L);
-            stmt.setBigDecimal(11, new BigDecimal(4.0));
-            stmt.setDate(12, date);
-            stmt.setString(13, "varchar_b");
-            stmt.setString(14, "charb");
-            stmt.setInt(15, 5);
-            stmt.setLong(16, 5L);
-            stmt.setBigDecimal(17, new BigDecimal(5.0));
-            stmt.setDate(18, date);
-            stmt.executeUpdate();
-            
+               upsertRows(conn, fullTableName, 3);
             conn.commit();
         }
     }

Reply via email to