Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.0 388630f80 -> 637fefee0
PHOENIX-2478 Rows committed in transaction overlapping index creation are not populated Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/637fefee Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/637fefee Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/637fefee Branch: refs/heads/4.x-HBase-1.0 Commit: 637fefee06e3675870070934339ec8006877d10b Parents: 388630f Author: James Taylor <[email protected]> Authored: Tue Jan 19 08:19:44 2016 -0800 Committer: James Taylor <[email protected]> Committed: Tue Jan 19 08:22:14 2016 -0800 ---------------------------------------------------------------------- .../phoenix/end2end/index/ImmutableIndexIT.java | 239 ++++++++++++++----- .../apache/phoenix/execute/MutationState.java | 115 +++++---- .../java/org/apache/phoenix/query/BaseTest.java | 103 +++----- 3 files changed, 285 insertions(+), 172 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/637fefee/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java index 0d329fe..c4ecfbb 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java @@ -20,7 +20,6 @@ package org.apache.phoenix.end2end.index; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import java.sql.Connection; import java.sql.DriverManager; @@ -29,12 +28,26 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.Arrays; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; import org.apache.phoenix.end2end.Shadower; -import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.PropertiesUtil; @@ -47,80 +60,194 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; @RunWith(Parameterized.class) public class ImmutableIndexIT extends BaseHBaseManagedTimeIT { - - private final boolean localIndex; - private final String tableDDLOptions; - private final String tableName; + + private final boolean localIndex; + private final String tableDDLOptions; + private final String tableName; private final String indexName; private final String fullTableName; private final String fullIndexName; - - public ImmutableIndexIT(boolean localIndex, boolean transactional) { - this.localIndex = localIndex; - StringBuilder optionBuilder = new StringBuilder("IMMUTABLE_ROWS=true"); - if (transactional) { - optionBuilder.append(", TRANSACTIONAL=true"); - } - this.tableDDLOptions = optionBuilder.toString(); - this.tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + ( transactional ? "_TXN" : ""); + + private static String TABLE_NAME; + private static String INDEX_DDL; + public static final AtomicInteger NUM_ROWS = new AtomicInteger(1); + + public ImmutableIndexIT(boolean localIndex, boolean transactional) { + this.localIndex = localIndex; + StringBuilder optionBuilder = new StringBuilder("IMMUTABLE_ROWS=true"); + if (transactional) { + optionBuilder.append(", TRANSACTIONAL=true"); + } + this.tableDDLOptions = optionBuilder.toString(); + this.tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + ( transactional ? "_TXN" : ""); this.indexName = "IDX" + ( transactional ? "_TXN" : ""); this.fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName); this.fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName); - } - - @BeforeClass + } + + @BeforeClass @Shadower(classBeingShadowed = BaseHBaseManagedTimeIT.class) public static void doSetup() throws Exception { - Map<String,String> props = Maps.newHashMapWithExpectedSize(1); - props.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.toString(true)); - setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(1); + serverProps.put("hbase.coprocessor.region.classes", CreateIndexRegionObserver.class.getName()); + Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2); + clientProps.put(QueryServices.TRANSACTIONS_ENABLED, "true"); + setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); } - - @Parameters(name="localIndex = {0} , transactional = {1}") + + @Parameters(name="localIndex = {0} , transactional = {1}") public static Collection<Boolean[]> data() { return Arrays.asList(new Boolean[][] { - { false, false }, { false, true }, { true, false }, { true, true } - }); + { false, true }, { true, true } + }); } - + + @Test - public void testDropIfImmutableKeyValueColumn() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + public void testCreateIndexDuringUpsertSelect() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(100)); + TABLE_NAME = fullTableName + "_testCreateIndexDuringUpsertSelect"; + String ddl ="CREATE TABLE " + TABLE_NAME + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions; + INDEX_DDL = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX IF NOT EXISTS " + indexName + " ON " + TABLE_NAME + + " (long_pk, varchar_pk)" + + " INCLUDE (long_col1, long_col2)"; + + Connection conn = DriverManager.getConnection(getUrl(), props); + try { + conn.setAutoCommit(false); + Statement stmt = conn.createStatement(); + stmt.execute(ddl); + + upsertRows(conn, TABLE_NAME, 220); + conn.commit(); + + // run the upsert select and also create an index + conn.setAutoCommit(true); + String upsertSelect = "UPSERT INTO " + TABLE_NAME + "(varchar_pk, char_pk, int_pk, long_pk, decimal_pk, date_pk) " + + "SELECT varchar_pk||'_upsert_select', char_pk, int_pk, long_pk, decimal_pk, date_pk FROM "+ TABLE_NAME; + conn.createStatement().execute(upsertSelect); + + ResultSet rs; + rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ COUNT(*) FROM " + TABLE_NAME); + assertTrue(rs.next()); + assertEquals(440,rs.getInt(1)); + rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + TABLE_NAME); + assertTrue(rs.next()); + assertEquals(440,rs.getInt(1)); + } + finally { + conn.close(); + } + } + + // used to create an index while a batch of rows are being written + public static class CreateIndexRegionObserver extends SimpleRegionObserver { + @Override + public void postPut(ObserverContext<RegionCoprocessorEnvironment> c, + Put put, WALEdit edit, final Durability durability) + throws HBaseIOException { + String tableName = c.getEnvironment().getRegion().getRegionInfo() + .getTable().getNameAsString(); + if (tableName.equalsIgnoreCase(TABLE_NAME) + // create the index after the second batch of 1000 rows + && Bytes.startsWith(put.getRow(), Bytes.toBytes("varchar200_upsert_select"))) { + try { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.createStatement().execute(INDEX_DDL); + } + } catch (SQLException e) { + throw new DoNotRetryIOException(e); + } + } + } + } + + private static class UpsertRunnable implements Runnable { + private static final int NUM_ROWS_IN_BATCH = 10000; + private final String fullTableName; + + public UpsertRunnable(String fullTableName) { + this.fullTableName = fullTableName; + } + + public void run() { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + while (true) { + // write a large batch of rows + boolean fistRowInBatch = true; + for (int i=0; i<NUM_ROWS_IN_BATCH; ++i) { + BaseTest.upsertRow(conn, fullTableName, NUM_ROWS.intValue(), fistRowInBatch); + NUM_ROWS.incrementAndGet(); + fistRowInBatch = false; + } + conn.commit(); + Thread.sleep(500); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + @Test + public void testCreateIndexWhileUpsertingData() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + String ddl ="CREATE TABLE " + fullTableName + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions; + String indexDDL = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX IF NOT EXISTS " + indexName + " ON " + fullTableName + + " (long_pk, varchar_pk)" + + " INCLUDE (long_col1, long_col2)"; + int numThreads = 3; try (Connection conn = DriverManager.getConnection(getUrl(), props)) { - conn.setAutoCommit(false); - String ddl ="CREATE TABLE " + fullTableName + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions; - Statement stmt = conn.createStatement(); - stmt.execute(ddl); - populateTestTable(fullTableName); - ddl = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName + " ON " + fullTableName + " (long_col1)"; - stmt.execute(ddl); - - ResultSet rs; - - rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullTableName); - assertTrue(rs.next()); - assertEquals(3,rs.getInt(1)); - rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName); - assertTrue(rs.next()); - assertEquals(3,rs.getInt(1)); - - conn.setAutoCommit(true); - String dml = "DELETE from " + fullTableName + " WHERE long_col2 = 4"; - try { - conn.createStatement().execute(dml); - fail(); - } catch (SQLException e) { - assertEquals(SQLExceptionCode.INVALID_FILTER_ON_IMMUTABLE_ROWS.getErrorCode(), e.getErrorCode()); - } - - conn.createStatement().execute("DROP TABLE " + fullTableName); + conn.setAutoCommit(false); + Statement stmt = conn.createStatement(); + stmt.execute(ddl); + + ExecutorService threadPool = Executors.newFixedThreadPool(numThreads); + List<Future<?>> futureList = Lists.newArrayListWithExpectedSize(numThreads); + for (int i =0; i<numThreads; ++i) { + futureList.add(threadPool.submit(new UpsertRunnable(fullTableName))); + } + // upsert some rows before creating the index + Thread.sleep(5000); + + // create the index + try (Connection conn2 = DriverManager.getConnection(getUrl(), props)) { + conn2.setAutoCommit(false); + Statement stmt2 = conn2.createStatement(); + stmt2.execute(indexDDL); + conn2.commit(); + } + + // upsert some rows after creating the index + Thread.sleep(1000); + // cancel the running threads + for (Future<?> future : futureList) { + future.cancel(true); + } + threadPool.shutdownNow(); + threadPool.awaitTermination(30, TimeUnit.SECONDS); + Thread.sleep(1000); + + ResultSet rs; + rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ COUNT(*) FROM " + fullTableName); + assertTrue(rs.next()); + int dataTableRowCount = rs.getInt(1); + rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName); + assertTrue(rs.next()); + int indexTableRowCount = rs.getInt(1); + assertEquals("Data and Index table should have the same number of rows ", dataTableRowCount, indexTableRowCount); } } - } http://git-wip-us.apache.org/repos/asf/phoenix/blob/637fefee/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index be896f8..d4d893b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -135,7 +135,7 @@ public class MutationState implements SQLCloseable { private int numRows = 0; private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY; private boolean isExternalTxContext = false; - private Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations; + private Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap(); private final MutationMetricQueue mutationMetricQueue; private ReadMetricQueue readMetricQueue; @@ -435,6 +435,59 @@ public class MutationState implements SQLCloseable { return sizeOffset + numRows; } + private void joinMutationState(TableRef tableRef, Map<ImmutableBytesPtr,RowMutationState> srcRows, + Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> dstMutations) { + PTable table = tableRef.getTable(); + boolean isIndex = table.getType() == PTableType.INDEX; + boolean incrementRowCount = dstMutations == this.mutations; + Map<ImmutableBytesPtr,RowMutationState> existingRows = dstMutations.put(tableRef, srcRows); + if (existingRows != null) { // Rows for that table already exist + // Loop through new rows and replace existing with new + for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : srcRows.entrySet()) { + // Replace existing row with new row + RowMutationState existingRowMutationState = existingRows.put(rowEntry.getKey(), rowEntry.getValue()); + if (existingRowMutationState != null) { + Map<PColumn,byte[]> existingValues = existingRowMutationState.getColumnValues(); + if (existingValues != PRow.DELETE_MARKER) { + Map<PColumn,byte[]> newRow = rowEntry.getValue().getColumnValues(); + // if new row is PRow.DELETE_MARKER, it means delete, and we don't need to merge it with existing row. + if (newRow != PRow.DELETE_MARKER) { + // Merge existing column values with new column values + existingRowMutationState.join(rowEntry.getValue()); + // Now that the existing row has been merged with the new row, replace it back + // again (since it was merged with the new one above). + existingRows.put(rowEntry.getKey(), existingRowMutationState); + } + } + } else { + if (incrementRowCount && !isIndex) { // Don't count index rows in row count + numRows++; + } + } + } + // Put the existing one back now that it's merged + dstMutations.put(tableRef, existingRows); + } else { + // Size new map at batch size as that's what it'll likely grow to. + Map<ImmutableBytesPtr,RowMutationState> newRows = Maps.newHashMapWithExpectedSize(connection.getMutateBatchSize()); + newRows.putAll(srcRows); + dstMutations.put(tableRef, newRows); + if (incrementRowCount && !isIndex) { + numRows += srcRows.size(); + } + } + } + + private void joinMutationState(Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> srcMutations, + Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> dstMutations) { + // Merge newMutation with this one, keeping state from newMutation for any overlaps + for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : srcMutations.entrySet()) { + // Replace existing entries for the table with new entries + TableRef tableRef = entry.getKey(); + Map<ImmutableBytesPtr,RowMutationState> srcRows = entry.getValue(); + joinMutationState(tableRef, srcRows, dstMutations); + } + } /** * Combine a newer mutation with this one, where in the event of overlaps, the newer one will take precedence. * Combine any metrics collected for the newer mutation. @@ -453,48 +506,12 @@ public class MutationState implements SQLCloseable { txAwares.addAll(newMutationState.txAwares); } this.sizeOffset += newMutationState.sizeOffset; - // Merge newMutation with this one, keeping state from newMutation for any overlaps - for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : newMutationState.mutations.entrySet()) { - // Replace existing entries for the table with new entries - TableRef tableRef = entry.getKey(); - PTable table = tableRef.getTable(); - boolean isIndex = table.getType() == PTableType.INDEX; - Map<ImmutableBytesPtr,RowMutationState> existingRows = this.mutations.put(tableRef, entry.getValue()); - if (existingRows != null) { // Rows for that table already exist - // Loop through new rows and replace existing with new - for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : entry.getValue().entrySet()) { - // Replace existing row with new row - RowMutationState existingRowMutationState = existingRows.put(rowEntry.getKey(), rowEntry.getValue()); - if (existingRowMutationState != null) { - Map<PColumn,byte[]> existingValues = existingRowMutationState.getColumnValues(); - if (existingValues != PRow.DELETE_MARKER) { - Map<PColumn,byte[]> newRow = rowEntry.getValue().getColumnValues(); - // if new row is PRow.DELETE_MARKER, it means delete, and we don't need to merge it with existing row. - if (newRow != PRow.DELETE_MARKER) { - // Merge existing column values with new column values - existingRowMutationState.join(rowEntry.getValue()); - // Now that the existing row has been merged with the new row, replace it back - // again (since it was merged with the new one above). - existingRows.put(rowEntry.getKey(), existingRowMutationState); - } - } - } else { - if (!isIndex) { // Don't count index rows in row count - numRows++; - } - } - } - // Put the existing one back now that it's merged - this.mutations.put(entry.getKey(), existingRows); - } else { - // Size new map at batch size as that's what it'll likely grow to. - Map<ImmutableBytesPtr,RowMutationState> newRows = Maps.newHashMapWithExpectedSize(connection.getMutateBatchSize()); - newRows.putAll(entry.getValue()); - this.mutations.put(tableRef, newRows); - if (!isIndex) { - numRows += entry.getValue().size(); - } + joinMutationState(newMutationState.mutations, this.mutations); + if (!newMutationState.txMutations.isEmpty()) { + if (txMutations.isEmpty()) { + txMutations = Maps.newHashMapWithExpectedSize(mutations.size()); } + joinMutationState(newMutationState.txMutations, this.txMutations); } mutationMetricQueue.combineMetricQueues(newMutationState.mutationMetricQueue); if (readMetricQueue == null) { @@ -915,6 +932,7 @@ public class MutationState implements SQLCloseable { long startTime = System.currentTimeMillis(); child.addTimelineAnnotation("Attempt " + retryCount); hTable.batch(mutationList); + if (logger.isDebugEnabled()) logger.debug("Sent batch of " + numMutations + " for " + Bytes.toString(htableName)); child.stop(); child.stop(); shouldRetry = false; @@ -980,13 +998,13 @@ public class MutationState implements SQLCloseable { // committed in the event of a failure. if (isTransactional) { addUncommittedStatementIndexes(valuesMap.values()); - if (txMutations == null) { + if (txMutations.isEmpty()) { txMutations = Maps.newHashMapWithExpectedSize(mutations.size()); } // Keep all mutations we've encountered until a commit or rollback. // This is not ideal, but there's not good way to get the values back // in the event that we need to replay the commit. - txMutations.put(tableRef, valuesMap); + joinMutationState(tableRef, valuesMap, txMutations); } // Remove batches as we process them if (sendAll) { @@ -1082,7 +1100,7 @@ public class MutationState implements SQLCloseable { private void resetTransactionalState() { tx = null; txAwares.clear(); - txMutations = null; + txMutations = Collections.emptyMap(); uncommittedPhysicalNames.clear(); uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY; } @@ -1187,9 +1205,7 @@ public class MutationState implements SQLCloseable { break; } retryCount++; - if (txMutations != null) { - mutations.putAll(txMutations); - } + mutations.putAll(txMutations); } while (true); } @@ -1214,6 +1230,7 @@ public class MutationState implements SQLCloseable { if (result.getTable() == null) { throw new TableNotFoundException(dataTable.getSchemaName().getString(), dataTable.getTableName().getString()); } + tableRef.setTable(result.getTable()); if (!result.wasUpdated()) { if (logger.isInfoEnabled()) logger.info("No updates to " + dataTable.getName().getString() + " as of " + timestamp); continue; @@ -1223,7 +1240,7 @@ public class MutationState implements SQLCloseable { // that an index was dropped and recreated with the same name but different // indexed/covered columns. addedIndexes = (!oldIndexes.equals(result.getTable().getIndexes())); - if (logger.isInfoEnabled()) logger.info((addedIndexes ? "Updates " : "No updates ") + "as of " + timestamp + " to " + dataTable.getName().getString() + " with indexes " + dataTable.getIndexes()); + if (logger.isInfoEnabled()) logger.info((addedIndexes ? "Updates " : "No updates ") + "as of " + timestamp + " to " + dataTable.getName().getString() + " with indexes " + tableRef.getTable().getIndexes()); } } if (logger.isInfoEnabled()) logger.info((addedIndexes ? "Updates " : "No updates ") + "to indexes as of " + getInitialWritePointer()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/637fefee/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index 35bb8ce..951bfce 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.query; +import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster; import static org.apache.phoenix.util.PhoenixRuntime.CURRENT_SCN_ATTRIB; import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL; import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR; @@ -150,6 +151,7 @@ import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.hbase.index.balancer.IndexLoadBalancer; import org.apache.phoenix.hbase.index.master.IndexMasterObserver; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.jdbc.PhoenixDriver; import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver; import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver.ConnectionInfo; import org.apache.phoenix.jdbc.PhoenixTestDriver; @@ -200,7 +202,7 @@ import com.google.inject.util.Providers; public abstract class BaseTest { protected static final String TEST_TABLE_SCHEMA = "(" + " varchar_pk VARCHAR NOT NULL, " + - " char_pk CHAR(6) NOT NULL, " + + " char_pk CHAR(10) NOT NULL, " + " int_pk INTEGER NOT NULL, "+ " long_pk BIGINT NOT NULL, " + " decimal_pk DECIMAL(31, 10) NOT NULL, " + @@ -1805,77 +1807,44 @@ public abstract class BaseTest { public HBaseTestingUtility getUtility() { return utility; } + + public static void upsertRows(Connection conn, String fullTableName, int numRows) throws SQLException { + for (int i=1; i<=numRows; ++i) { + upsertRow(conn, fullTableName, i, false); + } + } + + public static void upsertRow(Connection conn, String fullTableName, int index, boolean firstRowInBatch) throws SQLException { + String upsert = "UPSERT INTO " + fullTableName + + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + PreparedStatement stmt = conn.prepareStatement(upsert); + stmt.setString(1, firstRowInBatch ? "firstRowInBatch_" : "" + "varchar"+index); + stmt.setString(2, "char"+index); + stmt.setInt(3, index); + stmt.setLong(4, index); + stmt.setBigDecimal(5, new BigDecimal(index)); + Date date = DateUtil.parseDate("2015-01-01 00:00:00"); + stmt.setDate(6, date); + stmt.setString(7, "varchar_a"); + stmt.setString(8, "chara"); + stmt.setInt(9, index+1); + stmt.setLong(10, index+1); + stmt.setBigDecimal(11, new BigDecimal(index+1)); + stmt.setDate(12, date); + stmt.setString(13, "varchar_b"); + stmt.setString(14, "charb"); + stmt.setInt(15, index+2); + stmt.setLong(16, index+2); + stmt.setBigDecimal(17, new BigDecimal(index+2)); + stmt.setDate(18, date); + stmt.executeUpdate(); + } // Populate the test table with data. public static void populateTestTable(String fullTableName) throws SQLException { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { - String upsert = "UPSERT INTO " + fullTableName - + " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; - PreparedStatement stmt = conn.prepareStatement(upsert); - stmt.setString(1, "varchar1"); - stmt.setString(2, "char1"); - stmt.setInt(3, 1); - stmt.setLong(4, 1L); - stmt.setBigDecimal(5, new BigDecimal(1.0)); - Date date = DateUtil.parseDate("2015-01-01 00:00:00"); - stmt.setDate(6, date); - stmt.setString(7, "varchar_a"); - stmt.setString(8, "chara"); - stmt.setInt(9, 2); - stmt.setLong(10, 2L); - stmt.setBigDecimal(11, new BigDecimal(2.0)); - stmt.setDate(12, date); - stmt.setString(13, "varchar_b"); - stmt.setString(14, "charb"); - stmt.setInt(15, 3); - stmt.setLong(16, 3L); - stmt.setBigDecimal(17, new BigDecimal(3.0)); - stmt.setDate(18, date); - stmt.executeUpdate(); - - stmt.setString(1, "varchar2"); - stmt.setString(2, "char2"); - stmt.setInt(3, 2); - stmt.setLong(4, 2L); - stmt.setBigDecimal(5, new BigDecimal(2.0)); - date = DateUtil.parseDate("2015-01-02 00:00:00"); - stmt.setDate(6, date); - stmt.setString(7, "varchar_a"); - stmt.setString(8, "chara"); - stmt.setInt(9, 3); - stmt.setLong(10, 3L); - stmt.setBigDecimal(11, new BigDecimal(3.0)); - stmt.setDate(12, date); - stmt.setString(13, "varchar_b"); - stmt.setString(14, "charb"); - stmt.setInt(15, 4); - stmt.setLong(16, 4L); - stmt.setBigDecimal(17, new BigDecimal(4.0)); - stmt.setDate(18, date); - stmt.executeUpdate(); - - stmt.setString(1, "varchar3"); - stmt.setString(2, "char3"); - stmt.setInt(3, 3); - stmt.setLong(4, 3L); - stmt.setBigDecimal(5, new BigDecimal(3.0)); - date = DateUtil.parseDate("2015-01-03 00:00:00"); - stmt.setDate(6, date); - stmt.setString(7, "varchar_a"); - stmt.setString(8, "chara"); - stmt.setInt(9, 4); - stmt.setLong(10, 4L); - stmt.setBigDecimal(11, new BigDecimal(4.0)); - stmt.setDate(12, date); - stmt.setString(13, "varchar_b"); - stmt.setString(14, "charb"); - stmt.setInt(15, 5); - stmt.setLong(16, 5L); - stmt.setBigDecimal(17, new BigDecimal(5.0)); - stmt.setDate(18, date); - stmt.executeUpdate(); - + upsertRows(conn, fullTableName, 3); conn.commit(); } }
