Repository: phoenix
Updated Branches:
  refs/heads/4.0 d2cef2bd3 -> 349d36bce
PHOENIX-1336 Exception when select from local index:Cache of region boundaries are out of date(Rajeshbabu) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/349d36bc Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/349d36bc Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/349d36bc Branch: refs/heads/4.0 Commit: 349d36bcee61ad3200c2072983c9430fdcfaaa95 Parents: d2cef2b Author: Rajeshbabu Chintaguntla <rajeshb...@apache.org> Authored: Thu Jan 22 17:36:03 2015 +0530 Committer: Rajeshbabu Chintaguntla <rajeshb...@apache.org> Committed: Thu Jan 22 17:36:03 2015 +0530 ---------------------------------------------------------------------- .../phoenix/end2end/index/LocalIndexIT.java | 45 +++++++++++++++++++- .../org/apache/phoenix/compile/ScanRanges.java | 7 ++- .../coprocessor/BaseScannerRegionObserver.java | 1 + .../phoenix/iterate/ChunkedResultIterator.java | 24 +++++++++++ 4 files changed, 75 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/349d36bc/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java index 03323f1..6ff0475 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java @@ -32,6 +32,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.concurrent.CountDownLatch; import org.apache.hadoop.hbase.HRegionInfo; @@ -101,7 +102,7 @@ public class LocalIndexIT extends BaseHBaseManagedTimeIT { "k3 INTEGER,\n" + 
"v1 VARCHAR,\n" + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))\n" - + (saltBuckets != null && splits == null ? (",salt_buckets=" + saltBuckets) : "" + + (saltBuckets != null && splits == null ? (" salt_buckets=" + saltBuckets) : "" + (saltBuckets == null && splits != null ? (" split on " + splits) : "")); conn.createStatement().execute(ddl); conn.close(); @@ -786,6 +787,48 @@ public class LocalIndexIT extends BaseHBaseManagedTimeIT { } @Test + public void testLocalIndexScanWithSmallChunks() throws Exception { + createBaseTable(TestUtil.DEFAULT_DATA_TABLE_NAME, 3, null); + Properties props = new Properties(); + props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, "2"); + Connection conn1 = DriverManager.getConnection(getUrl(), props); + try{ + String[] strings = {"a","b","c","d","e","f","g","h","i","j","k","l","m","n","o","p","q","r","s","t","u","v","w","x","y","z"}; + for (int i = 0; i < 26; i++) { + conn1.createStatement().execute( + "UPSERT INTO " + TestUtil.DEFAULT_DATA_TABLE_NAME + " values('"+strings[i]+"'," + i + "," + + (i + 1) + "," + (i + 2) + ",'" + strings[25 - i] + "')"); + } + conn1.commit(); + conn1.createStatement().execute("CREATE LOCAL INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + " ON " + TestUtil.DEFAULT_DATA_TABLE_NAME + "(v1)"); + conn1.createStatement().execute("CREATE LOCAL INDEX " + TestUtil.DEFAULT_INDEX_TABLE_NAME + "_2 ON " + TestUtil.DEFAULT_DATA_TABLE_NAME + "(k3)"); + + ResultSet rs = conn1.createStatement().executeQuery("SELECT * FROM " + TestUtil.DEFAULT_DATA_TABLE_NAME); + assertTrue(rs.next()); + + String query = "SELECT t_id,k1,v1 FROM " + TestUtil.DEFAULT_DATA_TABLE_NAME; + rs = conn1.createStatement().executeQuery(query); + for (int j = 0; j < 26; j++) { + assertTrue(rs.next()); + assertEquals(strings[25 - j], rs.getString("t_id")); + assertEquals(25 - j, rs.getInt("k1")); + assertEquals(strings[j], rs.getString("V1")); + } + query = "SELECT t_id,k1,k3 FROM " + TestUtil.DEFAULT_DATA_TABLE_NAME; + rs = 
conn1.createStatement().executeQuery(query); + Thread.sleep(1000); + for (int j = 0; j < 26; j++) { + assertTrue(rs.next()); + assertEquals(strings[j], rs.getString("t_id")); + assertEquals(j, rs.getInt("k1")); + assertEquals(j + 2, rs.getInt("k3")); + } + } finally { + conn1.close(); + } + } + + @Test public void testLocalIndexScanAfterRegionsMerge() throws Exception { createBaseTable(TestUtil.DEFAULT_DATA_TABLE_NAME, null, "('e','j','o')"); Connection conn1 = DriverManager.getConnection(getUrl()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/349d36bc/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java index 0842f6a..473e579 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java @@ -17,6 +17,8 @@ */ package org.apache.phoenix.compile; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.STARTKEY_OFFSET; + import java.util.Arrays; import java.util.Collections; import java.util.Iterator; @@ -356,7 +358,10 @@ public class ScanRanges { } newScan.setStartRow(scanStartKey); newScan.setStopRow(scanStopKey); - + if(keyOffset > 0) { + newScan.setAttribute(STARTKEY_OFFSET, Bytes.toBytes(keyOffset)); + } + return newScan; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/349d36bc/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index 4033c54..1647e5c 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -78,6 +78,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { public static final String LOCAL_INDEX_JOIN_SCHEMA = "_LocalIndexJoinSchema"; public static final String DATA_TABLE_COLUMNS_TO_JOIN = "_DataTableColumnsToJoin"; public static final String VIEW_CONSTANTS = "_ViewConstants"; + public static final String STARTKEY_OFFSET = "_StartKeyOffset"; public static final String EXPECTED_UPPER_REGION_KEY = "_ExpectedUpperRegionKey"; public static final String REVERSE_SCAN = "_ReverseScan"; public static final String ANALYZE_TABLE = "_ANALYZETABLE"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/349d36bc/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java index fecb0d1..e1ee8db 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java @@ -18,6 +18,8 @@ package org.apache.phoenix.iterate; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.STARTKEY_OFFSET; + import java.sql.SQLException; import java.util.List; @@ -152,6 +154,9 @@ public class ChunkedResultIterator implements PeekingResultIterator { // be able to start the next chunk on the next row key if (rowCount == chunkSize) { next.getKey(lastKey); + if (scan.getAttribute(STARTKEY_OFFSET) != null) { + addRegionStartKeyToLaskKey(); + } } else if (rowCount > chunkSize && rowKeyChanged(next)) { chunkComplete = true; return null; @@ -178,10 +183,29 @@ public class ChunkedResultIterator 
implements PeekingResultIterator { int offset = lastKey.getOffset(); int length = lastKey.getLength(); newTuple.getKey(lastKey); + if (scan.getAttribute(STARTKEY_OFFSET) != null) { + addRegionStartKeyToLaskKey(); + } return Bytes.compareTo(currentKey, offset, length, lastKey.get(), lastKey.getOffset(), lastKey.getLength()) != 0; } + /** + * Prefix region start key to last key to form actual row key in case of local index scan. + */ + private void addRegionStartKeyToLaskKey() { + byte[] offsetBytes = scan.getAttribute(STARTKEY_OFFSET); + if (offsetBytes != null) { + int startKeyOffset = Bytes.toInt(offsetBytes); + byte[] actualLastkey = + new byte[startKeyOffset + lastKey.getLength() - lastKey.getOffset()]; + System.arraycopy(scan.getStartRow(), 0, actualLastkey, 0, startKeyOffset); + System.arraycopy(lastKey.get(), lastKey.getOffset(), actualLastkey, + startKeyOffset, lastKey.getLength()); + lastKey.set(actualLastkey); + } + } + @Override public String toString() { return "SingleChunkResultIterator [rowCount=" + rowCount