Repository: phoenix Updated Branches: refs/heads/4.x-HBase-0.98 56810a661 -> 69032041f
PHOENIX-2316 Add region server name as attribute on parallel Scan Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/69032041 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/69032041 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/69032041 Branch: refs/heads/4.x-HBase-0.98 Commit: 69032041ff58f9577a8359cfe09b0503ee2ec1c5 Parents: 56810a6 Author: James Taylor <[email protected]> Authored: Mon Oct 26 13:58:32 2015 -0700 Committer: James Taylor <[email protected]> Committed: Mon Oct 26 15:16:03 2015 -0700 ---------------------------------------------------------------------- .../phoenix/end2end/ParallelIteratorsIT.java | 29 ++++++++++++++++++++ .../coprocessor/BaseScannerRegionObserver.java | 1 + .../phoenix/iterate/BaseResultIterators.java | 10 ++++--- 3 files changed, 36 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/69032041/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelIteratorsIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelIteratorsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelIteratorsIT.java index df25c46..e86cf27 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelIteratorsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParallelIteratorsIT.java @@ -25,13 +25,20 @@ import static org.apache.phoenix.util.TestUtil.analyzeTableIndex; import static org.apache.phoenix.util.TestUtil.getAllSplits; import static org.apache.phoenix.util.TestUtil.getSplits; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.util.List; import java.util.Map; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.types.PChar; @@ -106,6 +113,28 @@ public class ParallelIteratorsIT extends BaseOwnClusterHBaseManagedTimeIT { } @Test + public void testServerNameOnScan() throws Exception { + Connection conn = DriverManager.getConnection(getUrl(), TEST_PROPERTIES); + byte[][] splits = new byte[][] { K3, K9, KR }; + ensureTableCreated(getUrl(), STABLE_NAME, splits); + + PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class); + ResultSet rs = stmt.executeQuery("SELECT * FROM " + STABLE_NAME + " LIMIT 1"); + rs.next(); + QueryPlan plan = stmt.getQueryPlan(); + List<List<Scan>> nestedScans = plan.getScans(); + assertNotNull(nestedScans); + for (List<Scan> scans : nestedScans) { + for (Scan scan : scans) { + byte[] serverNameBytes = scan.getAttribute(BaseScannerRegionObserver.SCAN_REGION_SERVER); + assertNotNull(serverNameBytes); + ServerName serverName = ServerName.parseVersionedServerName(serverNameBytes); + assertNotNull(serverName.getHostname()); + } + } + } + + @Test public void testGuidePostsLifeCycle() throws Exception { Connection conn = DriverManager.getConnection(getUrl(), TEST_PROPERTIES); byte[][] splits = new byte[][] { K3, K9, KR }; http://git-wip-us.apache.org/repos/asf/phoenix/blob/69032041/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index 8e94c78..49c6076 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -88,6 +88,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { public static final String GUIDEPOST_WIDTH_BYTES = "_GUIDEPOST_WIDTH_BYTES"; public static final String GUIDEPOST_PER_REGION = "_GUIDEPOST_PER_REGION"; public static final String UPGRADE_DESC_ROW_KEY = "_UPGRADE_DESC_ROW_KEY"; + public static final String SCAN_REGION_SERVER = "_SCAN_REGION_SERVER"; /** * Attribute name used to pass custom annotations in Scans and Mutations (later). Custom annotations http://git-wip-us.apache.org/repos/asf/phoenix/blob/69032041/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index ec961d5..57e84f0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -401,9 +401,10 @@ public abstract class BaseResultIterators extends ExplainTable implements Result return buf.toString(); } - private List<Scan> addNewScan(List<List<Scan>> parallelScans, List<Scan> scans, Scan scan, byte[] startKey, boolean crossedRegionBoundary) { + private List<Scan> addNewScan(List<List<Scan>> parallelScans, List<Scan> scans, Scan scan, byte[] startKey, boolean crossedRegionBoundary, HRegionLocation regionLocation) { boolean startNewScan = scanGrouper.shouldStartNewScan(plan, scans, startKey, crossedRegionBoundary); if (scan != null) { + scan.setAttribute(BaseScannerRegionObserver.SCAN_REGION_SERVER, regionLocation.getServerName().getVersionedBytes()); scans.add(scan); } if (startNewScan && !scans.isEmpty()) { @@ -476,15 +477,16 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } else { endKey = regionBoundaries.get(regionIndex); } + HRegionLocation regionLocation = regionLocations.get(regionIndex); if (isLocalIndex) { - HRegionInfo regionInfo = regionLocations.get(regionIndex).getRegionInfo(); + HRegionInfo regionInfo = regionLocation.getRegionInfo(); endRegionKey = regionInfo.getEndKey(); keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), endRegionKey); } while (guideIndex < gpsSize && (Bytes.compareTo(currentGuidePost = gps.get(guideIndex), endKey) <= 0 || endKey.length == 0)) { Scan newScan = scanRanges.intersectScan(scan, currentKey, currentGuidePost, keyOffset, false); - scans = addNewScan(parallelScans, scans, newScan, currentGuidePost, false); + scans = addNewScan(parallelScans, scans, newScan, currentGuidePost, false, regionLocation); currentKey = currentGuidePost; guideIndex++; } @@ -496,7 +498,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result scans.get(scans.size()-1).setAttribute(EXPECTED_UPPER_REGION_KEY, endRegionKey); } } - scans = addNewScan(parallelScans, scans, newScan, endKey, true); + scans = addNewScan(parallelScans, scans, newScan, endKey, true, regionLocation); currentKey = endKey; regionIndex++; }
