Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.4 b0231dfc3 -> bb2e77db8
PHOENIX-4930 Add test for ORDER BY and LIMIT queries during a split Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/bb2e77db Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/bb2e77db Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/bb2e77db Branch: refs/heads/4.x-HBase-1.4 Commit: bb2e77db8ec513cb74ed92c9afbb3f69725d1a40 Parents: b0231df Author: Thomas D'Silva <[email protected]> Authored: Sat Sep 15 14:27:14 2018 -0700 Committer: Thomas D'Silva <[email protected]> Committed: Thu Sep 27 21:14:11 2018 -0700 ---------------------------------------------------------------------- .../org/apache/phoenix/end2end/SplitIT.java | 260 +++++++++++++++++++ 1 file changed, 260 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/bb2e77db/phoenix-core/src/it/java/org/apache/phoenix/end2end/SplitIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SplitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SplitIT.java new file mode 100644 index 0000000..73cf1f0 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SplitIT.java @@ -0,0 +1,260 @@ +package org.apache.phoenix.end2end; + +import com.google.common.collect.Maps; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.phoenix.hbase.index.Indexer; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; +import org.apache.phoenix.schema.types.PInteger; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.TestUtil; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.sql.*; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.*; + +public class SplitIT extends BaseUniqueNamesOwnClusterIT { + private static final String SPLIT_TABLE_NAME_PREFIX = "SPLIT_TABLE_"; + private static boolean tableWasSplitDuringScannerNext = false; + private static byte[] splitPoint = null; + + @BeforeClass + public static void doSetup() throws Exception { + Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(1); + serverProps.put("hbase.coprocessor.region.classes", TestRegionObserver.class.getName()); + serverProps.put(Indexer.CHECK_VERSION_CONF_KEY, "false"); + Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(3); + clientProps.put(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(10)); + // read rows in batches 3 at time + clientProps.put(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3)); + setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator())); + } + + public static class TestRegionObserver extends BaseRegionObserver { + + @Override + public boolean postScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c, + final InternalScanner s, final List<Result> results, final int limit, + final boolean hasMore) throws IOException { + Region region = c.getEnvironment().getRegion(); + String tableName = region.getRegionInfo().getTable().getNameAsString(); + if (tableName.startsWith(SPLIT_TABLE_NAME_PREFIX) && results.size()>1) { + int pk = (Integer)PInteger.INSTANCE.toObject(results.get(0).getRow()); + // split when row 10 is read + if (pk==10 && !tableWasSplitDuringScannerNext) { + try { + // split on the first row being scanned if splitPoint is null + splitPoint = splitPoint!=null ? splitPoint : results.get(0).getRow(); + splitTable(splitPoint, tableName); + tableWasSplitDuringScannerNext = true; + } + catch (SQLException e) { + throw new IOException(e); + } + } + } + return hasMore; + } + + } + + public static void splitTable(byte[] splitPoint, String tableName) throws SQLException, IOException { + HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin(); + int nRegions = admin.getTableRegions(tableName.getBytes()).size(); + int nInitialRegions = nRegions; + admin.split(tableName.getBytes(), splitPoint); + admin.disableTable(tableName); + admin.enableTable(tableName); + nRegions = admin.getTableRegions(tableName.getBytes()).size(); + if (nRegions == nInitialRegions) + throw new IOException("Could not split for " + tableName); + } + + /** + * Runs an UPSERT SELECT into the same table while the table is split + */ + public void helpTestUpsertSelectWithSplit(boolean splitTableBeforeUpsertSelect) throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + conn.setAutoCommit(true); + String keys = generateUniqueName(); + conn.createStatement().execute("CREATE SEQUENCE " + keys + " CACHE 1000"); + String tableName = SPLIT_TABLE_NAME_PREFIX + generateUniqueName(); + conn.createStatement().execute( + "CREATE TABLE " + tableName + " (pk INTEGER PRIMARY KEY, val INTEGER)"); + + conn.createStatement().execute( + "UPSERT INTO " + tableName + " VALUES (NEXT VALUE FOR " + keys + ",1)"); + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " SELECT NEXT VALUE FOR " + keys + ", pk FROM " + tableName); + for (int i=0; i<7; i++) { + if (splitTableBeforeUpsertSelect) { + // split the table and then run the UPSERT SELECT + splitTable(PInteger.INSTANCE.toBytes(Math.pow(2, i)), tableName); + } + int upsertCount = stmt.executeUpdate(); + assertEquals((int) Math.pow(2, i), upsertCount); + } + conn.close(); + } + + /** + * Runs SELECT to a table that is being written to while a SPLIT happens + */ + public void helpTestSelectWithSplit(boolean splitTableBeforeSelect, boolean orderBy, boolean limit) throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + conn.setAutoCommit(true); + String tableName = SPLIT_TABLE_NAME_PREFIX + generateUniqueName(); + int pk = 1; + conn.createStatement().execute( + "CREATE TABLE " + tableName + " (pk INTEGER PRIMARY KEY, val INTEGER)"); + + conn.createStatement().execute( + "UPSERT INTO " + tableName + " VALUES (1,1)"); + PreparedStatement stmt = conn.prepareStatement(" UPSERT INTO " + tableName + " VALUES (?,?) "); + for (int i=0; i<5; i++) { + if (splitTableBeforeSelect) { + // split the table and then run the SELECT + splitTable(PInteger.INSTANCE.toBytes(Math.pow(2, i)), tableName); + } + + int count = 0; + while (count<2) { + String query = "SELECT * FROM " + tableName + (orderBy ? " ORDER BY val" : "") + (limit ? " LIMIT 32" : ""); + try { + ResultSet rs = conn.createStatement().executeQuery(query); + while (rs.next()) { + stmt.setInt(1, ++pk); + stmt.setInt(2, rs.getInt(1)); + stmt.execute(); + } + break; + } catch (StaleRegionBoundaryCacheException e) { + if (!orderBy) + fail("Simple selects should not check for splits, they let HBase restart the scan"); + if (count>0) + throw e; + ++count; + } + } + + ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(1) FROM " + tableName); + assertTrue(rs.next()); + int rowCount = rs.getInt(1); + assertFalse(rs.next()); + + // for ORDER BY a StaleRegionBoundaryException is thrown when a sp[it happens + if (orderBy) { + // if the table splits before the SELECT we always detect this so we never see rows written after the scan started + if (splitTableBeforeSelect) + assertEquals((int) Math.pow(2, i + 1), rowCount); + // else we see rows written after the SELECT started + else if (i == 4) { + assert ((int) Math.pow(2, i + 1) < rowCount); + } + } + // verify that we will see more rows written after the scan started after a split happens for simple select + else if ((splitTableBeforeSelect && i == 3) || i == 4) { + assert ((int) Math.pow(2, i + 1) < rowCount); + } + } + conn.close(); + } + + @Test + public void testUpsertSelectAfterTableSplit() throws Exception { + // no need to split the table during the UPSERT SELECT for this test so just set the flag to true + tableWasSplitDuringScannerNext = true; + helpTestUpsertSelectWithSplit(true); + } + + @Test + public void testUpsertSelectDuringSplitOnRowScanned() throws Exception { + tableWasSplitDuringScannerNext = false; + splitPoint = null; + helpTestUpsertSelectWithSplit(false); + } + + @Test + public void testUpsertSelectDuringSplitOnRowInMiddleOfRegionBeingScanned() throws Exception { + tableWasSplitDuringScannerNext = false; + // when the table has 16 rows, split the table in the middle of the region on row 14 + splitPoint = PInteger.INSTANCE.toBytes(14); + helpTestUpsertSelectWithSplit(false); + } + + @Test + public void testSimpleSelectAfterTableSplit() throws Exception { + // no need to split the table while running the SELECT and the UPSERT so just set the flag to true + tableWasSplitDuringScannerNext = true; + helpTestSelectWithSplit(true, false, false); + } + + @Test + public void testSimpleSelectDuringSplitOnRowScanned() throws Exception { + tableWasSplitDuringScannerNext = false; + splitPoint = null; + helpTestSelectWithSplit(false, false, false); + } + + @Test + public void testSimpleSelectDuringSplitOnRowInMiddleOfRegionBeingScanned() throws Exception { + tableWasSplitDuringScannerNext = false; + // when the table has 16 rows, split the table in the middle of the region on row 14 + splitPoint = PInteger.INSTANCE.toBytes(14); + helpTestSelectWithSplit(false, false, false); + } + + @Test + public void testOrderByAfterTableSplit() throws Exception { + // no need to split the table while running the SELECT and the UPSERT so just set the flag to true + tableWasSplitDuringScannerNext = true; + helpTestSelectWithSplit(true, true, false); + } + + @Test + public void testOrderByDuringSplitOnRowScanned() throws Exception { + tableWasSplitDuringScannerNext = false; + splitPoint = null; + helpTestSelectWithSplit(false, true, false); + } + + @Test + public void testOrderByDuringSplitOnRowInMiddleOfRegionBeingScanned() throws Exception { + tableWasSplitDuringScannerNext = false; + // when the table has 16 rows, split the table in the middle of the region on row 14 + splitPoint = PInteger.INSTANCE.toBytes(14); + helpTestSelectWithSplit(false, true, false); + } + + @Test + public void testLimitAfterTableSplit() throws Exception { + // no need to split the table while running the SELECT and the UPSERT so just set the flag to true + tableWasSplitDuringScannerNext = true; + helpTestSelectWithSplit(true, false, true); + } + + @Test + public void testLimitDuringSplitOnRowScanned() throws Exception { + tableWasSplitDuringScannerNext = false; + splitPoint = null; + helpTestSelectWithSplit(false, false, true); + } + + @Test + public void testLimitDuringSplitOnRowInMiddleOfRegionBeingScanned() throws Exception { + tableWasSplitDuringScannerNext = false; + // when the table has 16 rows, split the table in the middle of the region on row 14 + splitPoint = PInteger.INSTANCE.toBytes(14); + helpTestSelectWithSplit(false, false, true); + } +}
