PHOENIX-3112 Partial row scan not handled correctly
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/aaa41a33 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/aaa41a33 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/aaa41a33 Branch: refs/heads/master Commit: aaa41a33d025ad6daa832fe8b42fc235e7154648 Parents: bd21ed3 Author: Sergey Soldatov <s...@apache.org> Authored: Wed Aug 2 16:56:04 2017 -0700 Committer: Sergey Soldatov <s...@apache.org> Committed: Tue Oct 3 00:44:26 2017 -0700 ---------------------------------------------------------------------- .../PartialResultServerConfigurationIT.java | 148 ++++++++++++++ .../PartialScannerResultsDisabledIT.java | 193 +++++++++++++++++++ .../DataTableLocalIndexRegionScanner.java | 7 +- .../hbase/regionserver/ScannerContextUtil.java | 41 ++++ .../phoenix/coprocessor/BaseRegionScanner.java | 4 +- .../coprocessor/BaseScannerRegionObserver.java | 11 +- .../coprocessor/DelegateRegionScanner.java | 4 +- .../coprocessor/HashJoinRegionScanner.java | 38 +--- .../phoenix/iterate/RegionScannerFactory.java | 51 +---- .../phoenix/schema/stats/StatisticsScanner.java | 4 +- 10 files changed, 411 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/aaa41a33/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialResultServerConfigurationIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialResultServerConfigurationIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialResultServerConfigurationIT.java new file mode 100644 index 0000000..1c9ac38 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialResultServerConfigurationIT.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixDriver; +import org.apache.phoenix.query.ConnectionQueryServices.Feature; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.PropertiesUtil; +import org.junit.AfterClass; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Properties; +import java.util.Random; +import java.util.UUID; + +import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; +import static org.apache.phoenix.query.BaseTest.generateUniqueName; +import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster; +import static org.apache.phoenix.query.QueryServices.*; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + + +/** + * This is a separate from @PartialResultDisabledIT because it requires server side configuration + */ +@Category(NeedsOwnMiniClusterTest.class) +public class PartialResultServerConfigurationIT { + private static HBaseTestingUtility hbaseTestUtil; + private static String zkQuorum; + private static String url; + + @BeforeClass + public static void setUp() throws Exception { + Configuration conf = HBaseConfiguration.create(); + hbaseTestUtil = new HBaseTestingUtility(conf); + setUpConfigForMiniCluster(conf); + + //Enforce the limit of the result, so scans will stop between cells. + conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 5); + conf.setLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, 5); + + hbaseTestUtil.startMiniCluster(); + zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort(); + url = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum; + + DriverManager.registerDriver(PhoenixDriver.INSTANCE); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + try { + DriverManager.deregisterDriver(PhoenixDriver.INSTANCE); + } finally { + hbaseTestUtil.shutdownMiniCluster(); + } + } + + /** + * This test creates two tables with a single record at the end of each tables that match the join condition + * if scanner context is used during the scan, it would produce a partial row with NULL values. + * @throws Exception + */ + @Test + public void testJoinScan() throws Exception { + String tableNameR = generateUniqueName(); + String tableNameL = generateUniqueName(); + + int numRecords = 1000; + try (Connection conn = DriverManager.getConnection(url)) { + conn.createStatement().execute( + "CREATE TABLE " + tableNameR + " (PK1 INTEGER NOT NULL PRIMARY KEY, KV1 VARCHAR)"); + int i = 0; + String upsert = "UPSERT INTO " + tableNameR + " VALUES (?, ?)"; + Random random = new Random(); + PreparedStatement stmt = conn.prepareStatement(upsert); + while (i < numRecords) { + stmt.setInt(1, i); + stmt.setString(2, UUID.randomUUID().toString()); + stmt.executeUpdate(); + i++; + } + stmt.setInt(1, 9999); + stmt.setString(2, UUID.randomUUID().toString()); + stmt.executeUpdate(); + conn.commit(); + conn.createStatement().execute( + "CREATE TABLE " + tableNameL + " (PK1 INTEGER NOT NULL PRIMARY KEY, KV1 VARCHAR)"); + upsert = "UPSERT INTO " + tableNameL + " VALUES (?, ?)"; + stmt = conn.prepareStatement(upsert); + while (i < numRecords * 2) { + stmt.setInt(1, random.nextInt()); + stmt.setString(2, "KV" + random.nextInt()); + stmt.executeUpdate(); + i++; + } + stmt.setInt(1, 9999); + stmt.setString(2, UUID.randomUUID().toString()); + stmt.executeUpdate(); + conn.commit(); + + String sql = "SELECT * FROM " + tableNameR + " A JOIN " + tableNameL + " B ON A.PK1 = B.PK1"; + Statement s = conn.createStatement(); + s.setFetchSize(2); + ResultSet rs = s.executeQuery(sql); + int count = 0; + while (rs.next()) { + if (rs.getString(2) == null || rs.getString(4) == null) + fail("Null value because of partial result from scan"); + count++; + } + assertEquals(count, 1); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/aaa41a33/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java new file mode 100644 index 0000000..817b0bd --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.phoenix.end2end; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Timestamp; +import java.util.Properties; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.lang.RandomStringUtils; +import org.apache.hadoop.hbase.HConstants; +import org.apache.phoenix.util.IndexScrutiny; +import org.apache.phoenix.util.SchemaUtil; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PartialScannerResultsDisabledIT extends ParallelStatsDisabledIT { + public static final String TEST_TABLE_DDL = + "CREATE TABLE IF NOT EXISTS %s\n" + "(\n" + " ORGANIZATION_ID CHAR(15) NOT NULL,\n" + + " FEED_ELEMENT_ID CHAR(15) NOT NULL,\n" + + " CONTAINER_ID CHAR(15) NOT NULL,\n" + + " FEED_TYPE VARCHAR(1) NOT NULL, \n" + + " NETWORK_ID CHAR(15) NOT NULL,\n" + " USER_ID CHAR(15) NOT NULL,\n" + + " CREATED_TIME TIMESTAMP,\n" + " LAST_UPDATE TIMESTAMP,\n" + + " RELEVANCE_SCORE DOUBLE,\n" + " FEED_ITEM_TYPE VARCHAR(1),\n" + + " FEED_ELEMENT_TYPE VARCHAR(1),\n" + + " FEED_ELEMENT_IS_SYS_GEN BOOLEAN,\n" + + " FEED_ELEMENT_STATUS VARCHAR(1),\n" + + " FEED_ELEMENT_VISIBILITY VARCHAR(1),\n" + " PARENT_ID CHAR(15),\n" + + " CREATED_BY CHAR(15),\n" + " BEST_COMMENT_ID CHAR(15),\n" + + " COMMENT_COUNT INTEGER,\n" + " CONSTRAINT PK PRIMARY KEY\n" + " (\n" + + " ORGANIZATION_ID,\n" + " FEED_ELEMENT_ID,\n" + + " CONTAINER_ID,\n" + " FEED_TYPE,\n" + " NETWORK_ID,\n" + + " USER_ID\n" + " )\n" + ") COLUMN_ENCODED_BYTES = 0"; + + public static final String INDEX_1_DDL = + "CREATE INDEX IF NOT EXISTS %s\n" + "ON %s (\n" + " NETWORK_ID,\n" + + " CONTAINER_ID,\n" + " FEED_TYPE,\n" + " USER_ID,\n" + + " CREATED_TIME DESC,\n" + " FEED_ELEMENT_ID DESC,\n" + + " CREATED_BY\n" + ") " + + " INCLUDE (\n" + " FEED_ITEM_TYPE,\n" + + " FEED_ELEMENT_TYPE,\n" + " FEED_ELEMENT_IS_SYS_GEN,\n" + + " FEED_ELEMENT_STATUS,\n" + " FEED_ELEMENT_VISIBILITY,\n" + + " PARENT_ID,\n" + " BEST_COMMENT_ID,\n" + " COMMENT_COUNT\n" + ")"; + + private static final String UPSERT_INTO_DATA_TABLE = + "UPSERT INTO %s\n" + "(\n" + " ORGANIZATION_ID,\n" + " FEED_ELEMENT_ID,\n" + + " CONTAINER_ID,\n" + " FEED_TYPE,\n" + " NETWORK_ID,\n" + + " USER_ID,\n" + " CREATED_TIME,\n" + " LAST_UPDATE,\n" + + " FEED_ITEM_TYPE,\n" + " FEED_ELEMENT_TYPE,\n" + + " FEED_ELEMENT_IS_SYS_GEN,\n" + " FEED_ELEMENT_STATUS,\n" + + " FEED_ELEMENT_VISIBILITY,\n" + " PARENT_ID,\n" + " CREATED_BY,\n" + + " BEST_COMMENT_ID,\n" + " COMMENT_COUNT\n" + ")" + + "VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; + + private String dataTableName; + private String indexTableName; + private String schemaName; + private String dataTableFullName; + private static String indexTableFullName; + private static final Logger logger = LoggerFactory.getLogger(PartialScannerResultsDisabledIT.class); + private static Random random = new Random(1); + // background writer threads + private static Random sourceOfRandomness = new Random(0); + private static AtomicInteger upsertIdCounter = new AtomicInteger(1); + + @Before + public void setup() throws Exception { + // create the tables + generateUniqueTableNames(); + createTestTable(getUrl(), String.format(TEST_TABLE_DDL, dataTableFullName)); + createTestTable(getUrl(), String.format(INDEX_1_DDL, indexTableName, dataTableFullName)); + } + + @Test + public void testWithEnoughData() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + // Write enough data to trigger partial scanner results + // TODO: it's likely that less data could be written if whatever + // config parameters decide this are lowered. + writeSingleBatch(conn, 100, 20, dataTableFullName); + logger.info("Running scrutiny"); + // Scutunize index to see if partial results are silently returned + // In that case we'll get a false positive on the scrutiny run. + long rowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); + assertEquals(2000,rowCount); + } + } + + /** + * Simple select query with fetch size that exceed the result size. In that case scan would start to produce + * partial result sets that from Phoenix perspective are the rows with NULL values. + * @throws SQLException + */ + @Test + public void partialResultDuringSelect () throws SQLException { + String tableName = generateUniqueName(); + Properties props = new Properties(); + props.setProperty(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, "5"); + int numRecords = 10; + try (Connection conn = DriverManager.getConnection(url, props)) { + conn.createStatement().execute( + "CREATE TABLE " + tableName + " (PK1 INTEGER NOT NULL PRIMARY KEY, KV1 VARCHAR)"); + int i = 0; + String upsert = "UPSERT INTO " + tableName + " VALUES (?, ?)"; + PreparedStatement stmt = conn.prepareStatement(upsert); + while (i < numRecords) { + stmt.setInt(1, i); + stmt.setString(2, UUID.randomUUID().toString()); + stmt.executeUpdate(); + i++; + } + conn.commit(); + + String sql = "SELECT * FROM " + tableName; + // at every next call wait for this period. This will cause lease to expire. + Statement s = conn.createStatement(); + s.setFetchSize(100); + ResultSet rs = s.executeQuery(sql); + int count = 0; + while (rs.next()) { + if (rs.getString(2) == null) + fail("Null value because of partial row scan"); + } + count++; + } + + } + + private static String randString(int length, Random random) { + return RandomStringUtils.randomAlphabetic(length); + } + + private void writeSingleBatch(Connection connection, int batchSize, int numBatches, String tableName) throws Exception { + for (int j = 0; j < numBatches; j++) { + try (PreparedStatement statement = + connection.prepareStatement(String.format(UPSERT_INTO_DATA_TABLE, tableName))) { + for (int i = 0; i < batchSize; i++) { + int index = 0; + String id = "" + upsertIdCounter.getAndIncrement(); + statement.setString(++index, id); // ORGANIZATION_ID + statement.setString(++index, id); // FEED_ELEMENT_ID,\n" + statement.setString(++index, id); // CONTAINER_ID,\n" + statement.setString(++index, randString(1, sourceOfRandomness)); // FEED_TYPE,\n" + statement.setString(++index, randString(15, sourceOfRandomness)); // NETWORK_ID,\n" + statement.setString(++index, randString(15, sourceOfRandomness)); // USER_ID,\n" + statement.setTimestamp(++index, new Timestamp(System.currentTimeMillis())); // CREATED_TIME,\n" + statement.setTimestamp(++index, new Timestamp(System.currentTimeMillis())); // LAST_UPDATE\n" + statement.setString(++index, randString(1, sourceOfRandomness)); // FEED_ITEM_TYPE\n" + statement.setString(++index, randString(1, sourceOfRandomness)); // FEED_ELEMENT_TYPE\n" + statement.setBoolean(++index, false); // FEED_ELEMENT_IS_SYS_GEN\n" + statement.setString(++index, randString(1, sourceOfRandomness)); // FEED_ELEMENT_STATUS\n" + statement.setString(++index, randString(1, sourceOfRandomness)); // FEED_ELEMENT_VISIBILITY\n" + statement.setString(++index, randString(15, sourceOfRandomness)); // PARENT_ID\n" + statement.setString(++index, randString(15, sourceOfRandomness)); // CREATED_BY\n" + statement.setString(++index, randString(15, sourceOfRandomness)); // BEST_COMMENT_ID\n" + statement.setInt(++index, random.nextInt()); // COMMENT_COUNT\n" + ")" + statement.execute(); + } + connection.commit(); + } + } + } + + private void generateUniqueTableNames() { + schemaName = generateUniqueName(); + dataTableName = generateUniqueName() + "_DATA"; + dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName); + indexTableName = generateUniqueName() + "_IDX"; + indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/aaa41a33/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java index 4c44e82..64d4ac4 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java @@ -85,8 +85,13 @@ public class DataTableLocalIndexRegionScanner extends DelegateRegionScanner { @Override public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException { + return next(outResult); + } + + @Override + public boolean next(List<Cell> results) throws IOException { List<Cell> dataTableResults = new ArrayList<Cell>(); - boolean next = super.next(dataTableResults, scannerContext); + boolean next = super.next(dataTableResults); addMutations(dataTableResults); if (ServerUtil.readyToCommit(mutationList.size(), mutationList.byteSize(), maxBatchSize, maxBatchSizeBytes)||!next) { region.batchMutate(mutationList.toArray(new Mutation[mutationList.size()]), HConstants.NO_NONCE, http://git-wip-us.apache.org/repos/asf/phoenix/blob/aaa41a33/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java new file mode 100644 index 0000000..126e0b1 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; + +import java.util.List; + +/** + * @ScannerContext has all methods package visible. To properly update the context progress for our scanners we + * need this helper + */ +public class ScannerContextUtil { + public static void incrementSizeProgress(ScannerContext sc, List<Cell> cells) { + for (Cell cell : cells) { + sc.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell)); + } + } + + public static void updateTimeProgress(ScannerContext sc) { + sc.updateTimeProgress(); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/aaa41a33/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java index b5e9c9f..945c1c4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java @@ -41,7 +41,7 @@ public abstract class BaseRegionScanner extends DelegateRegionScanner { @Override public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { - return next(result); + throw new IOException("Next with scannerContext should not be called in Phoenix environment"); } @Override @@ -56,6 +56,6 @@ public abstract class BaseRegionScanner extends DelegateRegionScanner { @Override public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException { - return next(result, scannerContext); + throw new IOException("NextRaw with scannerContext should not be called in Phoenix environment"); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/aaa41a33/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index 34361ac..d3b257b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScannerContext; +import org.apache.hadoop.hbase.regionserver.ScannerContextUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.htrace.Span; import org.apache.htrace.Trace; @@ -267,7 +268,10 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { @Override public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { overrideDelegate(); - return super.next(result, scannerContext); + boolean res = super.next(result); + ScannerContextUtil.incrementSizeProgress(scannerContext, result); + ScannerContextUtil.updateTimeProgress(scannerContext); + return res; } @Override @@ -279,7 +283,10 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { @Override public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException { overrideDelegate(); - return super.nextRaw(result, scannerContext); + boolean res = super.nextRaw(result); + ScannerContextUtil.incrementSizeProgress(scannerContext, result); + ScannerContextUtil.updateTimeProgress(scannerContext); + return res; } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/aaa41a33/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java index 0ddabed..95d449a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java @@ -64,7 +64,7 @@ public class DelegateRegionScanner implements RegionScanner { @Override public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { - return delegate.next(result, scannerContext); + throw new IOException("Next with scannerContext should not be called in Phoenix environment"); } @Override @@ -74,7 +74,7 @@ public class DelegateRegionScanner implements RegionScanner { @Override public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException { - return delegate.nextRaw(result, scannerContext); + throw new IOException("NextRaw with scannerContext should not be called in Phoenix environment"); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/aaa41a33/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java index 5061d94..59f844d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java @@ -276,18 +276,7 @@ public class HashJoinRegionScanner implements RegionScanner { @Override public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException { - try { - while (shouldAdvance()) { - hasMore = scanner.nextRaw(result, scannerContext); - processResults(result, false); // TODO detect if limit used here - result.clear(); - } - - return nextInQueue(result); - } catch (Throwable t) { - ServerUtil.throwIOException(env.getRegion().getRegionInfo().getRegionNameAsString(), t); - return false; // impossible - } + throw new IOException("Next with scannerContext should not be called in Phoenix environment"); } @Override @@ -302,33 +291,12 @@ public class HashJoinRegionScanner implements RegionScanner { @Override public boolean next(List<Cell> result) throws IOException { - try { - while (shouldAdvance()) { - hasMore = scanner.next(result); - processResults(result, false); - result.clear(); - } - - return nextInQueue(result); - } catch (Throwable t) { - ServerUtil.throwIOException(env.getRegion().getRegionInfo().getRegionNameAsString(), t); - return false; // impossible - } + throw new IOException("Next should not be used in HashJoin scanner"); } @Override public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { - try { - while (shouldAdvance()) { - hasMore = scanner.next(result, scannerContext); - processResults(result, false); // TODO detect if limit used here - result.clear(); - } - return nextInQueue(result); - } catch (Throwable t) { - ServerUtil.throwIOException(env.getRegion().getRegionInfo().getRegionNameAsString(), t); - return false; // impossible - } + throw new IOException("Next with scannerContext should not be called in Phoenix environment"); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/aaa41a33/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java index 898a573..3dcbef9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScannerContext; +import org.apache.hadoop.hbase.regionserver.ScannerContextUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.execute.TupleProjector; import org.apache.phoenix.expression.Expression; @@ -140,12 +141,7 @@ public abstract class RegionScannerFactory { @Override public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { - try { - return s.next(result, scannerContext); - } catch (Throwable t) { - ServerUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), t); - return false; // impossible - } + throw new IOException("Next with scannerContext should not be called in Phoenix environment"); } @Override @@ -221,45 +217,10 @@ public abstract class RegionScannerFactory { @Override public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException { - try { - boolean next = s.nextRaw(result, scannerContext); - Cell arrayElementCell = null; - if (result.size() == 0) { - return next; - } - if (arrayFuncRefs != null && arrayFuncRefs.length > 0 && arrayKVRefs.size() > 0) { - int arrayElementCellPosition = replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result); - arrayElementCell = result.get(arrayElementCellPosition); - } - if ((offset > 0 || ScanUtil.isLocalIndex(scan)) && !ScanUtil.isAnalyzeTable(scan)) { - if(hasReferences && actualStartKey!=null) { - next = scanTillScanStartRow(s, arrayKVRefs, arrayFuncRefs, result, - scannerContext, arrayElementCell); - if (result.isEmpty()) { - return next; - } - } - /* In the following, c is only used when data region is null. - dataRegion will never be null in case of non-coprocessor call, - therefore no need to refactor - */ - IndexUtil.wrapResultUsingOffset(env, result, offset, dataColumns, - tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr); - } - if (projector != null) { - Tuple toProject = useQualifierAsListIndex ? new PositionBasedMultiKeyValueTuple(result) : new ResultTuple(Result.create(result)); - Tuple tuple = projector.projectResults(toProject, useNewValueColumnQualifier); - result.clear(); - result.add(tuple.getValue(0)); - if(arrayElementCell != null) - result.add(arrayElementCell); - } - // There is a scanattribute set to retrieve the specific array element - return next; - } catch (Throwable t) { - ServerUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), t); - return false; // impossible - } + boolean res = next(result); + ScannerContextUtil.incrementSizeProgress(scannerContext, result); + ScannerContextUtil.updateTimeProgress(scannerContext); + return res; } /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/aaa41a33/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java index ab6b3db..2fb6f14 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java @@ -73,9 +73,7 @@ public class StatisticsScanner implements InternalScanner { @Override public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { - boolean ret = delegate.next(result, scannerContext); - updateStats(result); - return ret; + return next(result); } /**