PHOENIX-3112 Partial row scan not handled correctly

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/aaa41a33
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/aaa41a33
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/aaa41a33

Branch: refs/heads/master
Commit: aaa41a33d025ad6daa832fe8b42fc235e7154648
Parents: bd21ed3
Author: Sergey Soldatov <s...@apache.org>
Authored: Wed Aug 2 16:56:04 2017 -0700
Committer: Sergey Soldatov <s...@apache.org>
Committed: Tue Oct 3 00:44:26 2017 -0700

----------------------------------------------------------------------
 .../PartialResultServerConfigurationIT.java     | 148 ++++++++++++++
 .../PartialScannerResultsDisabledIT.java        | 193 +++++++++++++++++++
 .../DataTableLocalIndexRegionScanner.java       |   7 +-
 .../hbase/regionserver/ScannerContextUtil.java  |  41 ++++
 .../phoenix/coprocessor/BaseRegionScanner.java  |   4 +-
 .../coprocessor/BaseScannerRegionObserver.java  |  11 +-
 .../coprocessor/DelegateRegionScanner.java      |   4 +-
 .../coprocessor/HashJoinRegionScanner.java      |  38 +---
 .../phoenix/iterate/RegionScannerFactory.java   |  51 +----
 .../phoenix/schema/stats/StatisticsScanner.java |   4 +-
 10 files changed, 411 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/aaa41a33/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialResultServerConfigurationIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialResultServerConfigurationIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialResultServerConfigurationIT.java
new file mode 100644
index 0000000..1c9ac38
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialResultServerConfigurationIT.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.query.ConnectionQueryServices.Feature;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+
+import static 
org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
+import static org.apache.phoenix.query.BaseTest.generateUniqueName;
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
+import static org.apache.phoenix.query.QueryServices.*;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+
+/**
+ * This is a separate from @PartialResultDisabledIT because it requires server 
side configuration
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class PartialResultServerConfigurationIT {
+    private static HBaseTestingUtility hbaseTestUtil;
+    private static String zkQuorum;
+    private static String url;
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        Configuration conf = HBaseConfiguration.create();
+        hbaseTestUtil = new HBaseTestingUtility(conf);
+        setUpConfigForMiniCluster(conf);
+
+        //Enforce the limit of the result, so scans will stop between cells.
+        conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 5);
+        conf.setLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, 5);
+
+        hbaseTestUtil.startMiniCluster();
+        zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
+        url = PhoenixRuntime.JDBC_PROTOCOL + 
PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
+
+        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+        try {
+            DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
+        } finally {
+            hbaseTestUtil.shutdownMiniCluster();
+        }
+    }
+
+    /**
+     * This test creates two tables with a single record at the end of each 
tables that match the join condition
+     * if scanner context is used during the scan, it would produce a partial 
row with NULL values.
+     * @throws Exception
+     */
+    @Test
+    public void testJoinScan() throws Exception {
+        String tableNameR = generateUniqueName();
+        String tableNameL = generateUniqueName();
+
+        int numRecords = 1000;
+        try (Connection conn = DriverManager.getConnection(url)) {
+            conn.createStatement().execute(
+                    "CREATE TABLE " + tableNameR + " (PK1 INTEGER NOT NULL 
PRIMARY KEY, KV1 VARCHAR)");
+            int i = 0;
+            String upsert = "UPSERT INTO " + tableNameR + " VALUES (?, ?)";
+            Random random = new Random();
+            PreparedStatement stmt = conn.prepareStatement(upsert);
+            while (i < numRecords) {
+                stmt.setInt(1, i);
+                stmt.setString(2, UUID.randomUUID().toString());
+                stmt.executeUpdate();
+                i++;
+            }
+            stmt.setInt(1, 9999);
+            stmt.setString(2, UUID.randomUUID().toString());
+            stmt.executeUpdate();
+            conn.commit();
+            conn.createStatement().execute(
+                    "CREATE TABLE " + tableNameL + " (PK1 INTEGER NOT NULL 
PRIMARY KEY, KV1 VARCHAR)");
+            upsert = "UPSERT INTO " + tableNameL + " VALUES (?, ?)";
+            stmt = conn.prepareStatement(upsert);
+            while (i < numRecords * 2) {
+                stmt.setInt(1, random.nextInt());
+                stmt.setString(2, "KV" + random.nextInt());
+                stmt.executeUpdate();
+                i++;
+            }
+            stmt.setInt(1, 9999);
+            stmt.setString(2, UUID.randomUUID().toString());
+            stmt.executeUpdate();
+            conn.commit();
+
+            String sql = "SELECT * FROM " + tableNameR + " A JOIN " + 
tableNameL + " B ON A.PK1 = B.PK1";
+            Statement s = conn.createStatement();
+            s.setFetchSize(2);
+            ResultSet rs = s.executeQuery(sql);
+            int count = 0;
+            while (rs.next()) {
+                if (rs.getString(2) == null || rs.getString(4) == null)
+                    fail("Null value because of partial result from scan");
+                count++;
+            }
+            assertEquals(count, 1);
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aaa41a33/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java
new file mode 100644
index 0000000..817b0bd
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
required by applicable
+ * law or agreed to in writing, software distributed under the License is 
distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied. See the License
+ * for the specific language governing permissions and limitations under the 
License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.util.IndexScrutiny;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PartialScannerResultsDisabledIT extends ParallelStatsDisabledIT {
+    public static final String TEST_TABLE_DDL =
+            "CREATE TABLE IF NOT EXISTS %s\n" + "(\n" + "    ORGANIZATION_ID 
CHAR(15) NOT NULL,\n"
+                    + "    FEED_ELEMENT_ID CHAR(15) NOT NULL,\n"
+                    + "    CONTAINER_ID CHAR(15) NOT NULL,\n"
+                    + "    FEED_TYPE VARCHAR(1) NOT NULL, \n"
+                    + "    NETWORK_ID CHAR(15) NOT NULL,\n" + "    USER_ID 
CHAR(15) NOT NULL,\n"
+                    + "    CREATED_TIME TIMESTAMP,\n" + "    LAST_UPDATE 
TIMESTAMP,\n"
+                    + "    RELEVANCE_SCORE DOUBLE,\n" + "    FEED_ITEM_TYPE 
VARCHAR(1),\n"
+                    + "    FEED_ELEMENT_TYPE VARCHAR(1),\n"
+                    + "    FEED_ELEMENT_IS_SYS_GEN BOOLEAN,\n"
+                    + "    FEED_ELEMENT_STATUS VARCHAR(1),\n"
+                    + "    FEED_ELEMENT_VISIBILITY VARCHAR(1),\n" + "    
PARENT_ID CHAR(15),\n"
+                    + "    CREATED_BY CHAR(15),\n" + "    BEST_COMMENT_ID 
CHAR(15),\n"
+                    + "    COMMENT_COUNT INTEGER,\n" + "    CONSTRAINT PK 
PRIMARY KEY\n" + "    (\n"
+                    + "        ORGANIZATION_ID,\n" + "        
FEED_ELEMENT_ID,\n"
+                    + "        CONTAINER_ID,\n" + "        FEED_TYPE,\n" + "   
     NETWORK_ID,\n"
+                    + "        USER_ID\n" + "    )\n" + ") 
COLUMN_ENCODED_BYTES = 0";
+
+    public static final String INDEX_1_DDL =
+            "CREATE INDEX IF NOT EXISTS %s\n" + "ON %s (\n" + "    
NETWORK_ID,\n"
+                    + "    CONTAINER_ID,\n" + "    FEED_TYPE,\n" + "    
USER_ID,\n"
+                    + "    CREATED_TIME DESC,\n" + "    FEED_ELEMENT_ID 
DESC,\n"
+                    + "    CREATED_BY\n" + ") "
+                    + "    INCLUDE (\n" + "    FEED_ITEM_TYPE,\n"
+                    + "    FEED_ELEMENT_TYPE,\n" + "    
FEED_ELEMENT_IS_SYS_GEN,\n"
+                    + "    FEED_ELEMENT_STATUS,\n" + "    
FEED_ELEMENT_VISIBILITY,\n"
+                    + "    PARENT_ID,\n" + "    BEST_COMMENT_ID,\n" + "    
COMMENT_COUNT\n" + ")";
+
+    private static final String UPSERT_INTO_DATA_TABLE =
+            "UPSERT INTO %s\n" + "(\n" + "    ORGANIZATION_ID,\n" + "    
FEED_ELEMENT_ID,\n"
+                    + "    CONTAINER_ID,\n" + "    FEED_TYPE,\n" + "    
NETWORK_ID,\n"
+                    + "    USER_ID,\n" + "    CREATED_TIME,\n" + "    
LAST_UPDATE,\n"
+                    + "    FEED_ITEM_TYPE,\n" + "    FEED_ELEMENT_TYPE,\n"
+                    + "    FEED_ELEMENT_IS_SYS_GEN,\n" + "    
FEED_ELEMENT_STATUS,\n"
+                    + "    FEED_ELEMENT_VISIBILITY,\n" + "    PARENT_ID,\n" + 
"    CREATED_BY,\n"
+                    + "    BEST_COMMENT_ID,\n" + "    COMMENT_COUNT\n" + ")"
+                    + "VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
+
+    private String dataTableName;
+    private String indexTableName;
+    private String schemaName;
+    private String dataTableFullName;
+    private static String indexTableFullName;
+    private static final Logger logger = 
LoggerFactory.getLogger(PartialScannerResultsDisabledIT.class);
+    private static Random random = new Random(1);
+    // background writer threads
+    private static Random sourceOfRandomness = new Random(0);
+    private static AtomicInteger upsertIdCounter = new AtomicInteger(1);
+
+    @Before
+    public void setup() throws Exception {
+        // create the tables
+        generateUniqueTableNames();
+        createTestTable(getUrl(), String.format(TEST_TABLE_DDL, 
dataTableFullName));
+        createTestTable(getUrl(), String.format(INDEX_1_DDL, indexTableName, 
dataTableFullName));
+    }
+    
+    @Test
+    public void testWithEnoughData() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            // Write enough data to trigger partial scanner results
+            // TODO: it's likely that less data could be written if whatever
+            // config parameters decide this are lowered.
+            writeSingleBatch(conn, 100, 20, dataTableFullName);
+            logger.info("Running scrutiny");
+            // Scutunize index to see if partial results are silently returned
+            // In that case we'll get a false positive on the scrutiny run.
+            long rowCount = IndexScrutiny.scrutinizeIndex(conn, 
dataTableFullName, indexTableFullName);
+            assertEquals(2000,rowCount);
+        }
+    }
+
+    /**
+     * Simple select query with fetch size that exceed the result size. In 
that case scan would start to produce
+     * partial result sets that from Phoenix perspective are the rows with 
NULL values.
+     * @throws SQLException
+     */
+    @Test
+    public void partialResultDuringSelect () throws SQLException {
+        String tableName = generateUniqueName();
+        Properties props = new Properties();
+        props.setProperty(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 
"5");
+        int numRecords = 10;
+        try (Connection conn = DriverManager.getConnection(url, props)) {
+            conn.createStatement().execute(
+                    "CREATE TABLE " + tableName + " (PK1 INTEGER NOT NULL 
PRIMARY KEY, KV1 VARCHAR)");
+            int i = 0;
+            String upsert = "UPSERT INTO " + tableName + " VALUES (?, ?)";
+            PreparedStatement stmt = conn.prepareStatement(upsert);
+            while (i < numRecords) {
+                stmt.setInt(1, i);
+                stmt.setString(2, UUID.randomUUID().toString());
+                stmt.executeUpdate();
+                i++;
+            }
+            conn.commit();
+
+            String sql = "SELECT * FROM " + tableName;
+            // at every next call wait for this period. This will cause lease 
to expire.
+            Statement s = conn.createStatement();
+            s.setFetchSize(100);
+            ResultSet rs = s.executeQuery(sql);
+            int count = 0;
+            while (rs.next()) {
+                if (rs.getString(2) == null)
+                    fail("Null value because of partial row scan");
+            }
+            count++;
+        }
+
+    }
+
+    private static String randString(int length, Random random) {
+        return RandomStringUtils.randomAlphabetic(length);
+    }
+    
+    private void writeSingleBatch(Connection connection, int batchSize, int 
numBatches, String tableName) throws Exception {
+        for (int j = 0; j < numBatches; j++) {
+            try (PreparedStatement statement =
+                    
connection.prepareStatement(String.format(UPSERT_INTO_DATA_TABLE, tableName))) {
+                for (int i = 0; i < batchSize; i++) {
+                    int index = 0;
+                    String id = "" + upsertIdCounter.getAndIncrement();
+                    statement.setString(++index, id); // ORGANIZATION_ID
+                    statement.setString(++index, id); // FEED_ELEMENT_ID,\n"
+                    statement.setString(++index, id); // CONTAINER_ID,\n"
+                    statement.setString(++index, randString(1, 
sourceOfRandomness)); // FEED_TYPE,\n"
+                    statement.setString(++index, randString(15, 
sourceOfRandomness)); // NETWORK_ID,\n"
+                    statement.setString(++index, randString(15, 
sourceOfRandomness)); // USER_ID,\n"
+                    statement.setTimestamp(++index, new 
Timestamp(System.currentTimeMillis())); // CREATED_TIME,\n"
+                    statement.setTimestamp(++index, new 
Timestamp(System.currentTimeMillis())); // LAST_UPDATE\n"
+                    statement.setString(++index, randString(1, 
sourceOfRandomness)); // FEED_ITEM_TYPE\n"
+                    statement.setString(++index, randString(1, 
sourceOfRandomness)); // FEED_ELEMENT_TYPE\n"
+                    statement.setBoolean(++index, false); // 
FEED_ELEMENT_IS_SYS_GEN\n"
+                    statement.setString(++index, randString(1, 
sourceOfRandomness)); // FEED_ELEMENT_STATUS\n"
+                    statement.setString(++index, randString(1, 
sourceOfRandomness)); // FEED_ELEMENT_VISIBILITY\n"
+                    statement.setString(++index, randString(15, 
sourceOfRandomness)); // PARENT_ID\n"
+                    statement.setString(++index, randString(15, 
sourceOfRandomness)); // CREATED_BY\n"
+                    statement.setString(++index, randString(15, 
sourceOfRandomness)); // BEST_COMMENT_ID\n"
+                    statement.setInt(++index, random.nextInt()); // 
COMMENT_COUNT\n" + ")"
+                    statement.execute();
+                }
+                connection.commit();
+            }
+        }
+    }
+    
+    private void generateUniqueTableNames() {
+        schemaName = generateUniqueName();
+        dataTableName = generateUniqueName() + "_DATA";
+        dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        indexTableName = generateUniqueName() + "_IDX";
+        indexTableFullName = SchemaUtil.getTableName(schemaName, 
indexTableName);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aaa41a33/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java
 
b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java
index 4c44e82..64d4ac4 100644
--- 
a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java
+++ 
b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java
@@ -85,8 +85,13 @@ public class DataTableLocalIndexRegionScanner extends 
DelegateRegionScanner {
 
     @Override
     public boolean next(List<Cell> outResult, ScannerContext scannerContext) 
throws IOException {
+        return next(outResult);
+    }
+
+    @Override
+    public boolean next(List<Cell> results) throws IOException {
         List<Cell> dataTableResults = new ArrayList<Cell>();
-        boolean next = super.next(dataTableResults, scannerContext);
+        boolean next = super.next(dataTableResults);
         addMutations(dataTableResults);
         if (ServerUtil.readyToCommit(mutationList.size(), 
mutationList.byteSize(), maxBatchSize, maxBatchSizeBytes)||!next) {
             region.batchMutate(mutationList.toArray(new 
Mutation[mutationList.size()]), HConstants.NO_NONCE,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aaa41a33/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
 
b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
new file mode 100644
index 0000000..126e0b1
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+
+import java.util.List;
+
+/**
+ * @ScannerContext has all methods package visible. To properly update the 
context progress for our scanners we
+ * need this helper
+ */
+public class ScannerContextUtil {
+    public static void incrementSizeProgress(ScannerContext sc, List<Cell> 
cells) {
+        for (Cell cell : cells) {
+            
sc.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
+        }
+    }
+
+    public static void updateTimeProgress(ScannerContext sc) {
+        sc.updateTimeProgress();
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aaa41a33/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
index b5e9c9f..945c1c4 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
@@ -41,7 +41,7 @@ public abstract class BaseRegionScanner extends 
DelegateRegionScanner {
 
     @Override
     public boolean next(List<Cell> result, ScannerContext scannerContext) 
throws IOException {
-        return next(result);
+        throw new IOException("Next with scannerContext should not be called 
in Phoenix environment");
     }
 
     @Override
@@ -56,6 +56,6 @@ public abstract class BaseRegionScanner extends 
DelegateRegionScanner {
 
     @Override
     public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) 
throws IOException {
-        return next(result, scannerContext);
+        throw new IOException("NextRaw with scannerContext should not be 
called in Phoenix environment");
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aaa41a33/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 34361ac..d3b257b 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
@@ -267,7 +268,10 @@ abstract public class BaseScannerRegionObserver extends 
BaseRegionObserver {
             @Override
             public boolean next(List<Cell> result, ScannerContext 
scannerContext) throws IOException {
                 overrideDelegate();
-                return super.next(result, scannerContext);
+                boolean res = super.next(result);
+                ScannerContextUtil.incrementSizeProgress(scannerContext, 
result);
+                ScannerContextUtil.updateTimeProgress(scannerContext);
+                return res;
             }
 
             @Override
@@ -279,7 +283,10 @@ abstract public class BaseScannerRegionObserver extends 
BaseRegionObserver {
             @Override
             public boolean nextRaw(List<Cell> result, ScannerContext 
scannerContext) throws IOException {
                 overrideDelegate();
-                return super.nextRaw(result, scannerContext);
+                boolean res = super.nextRaw(result);
+                ScannerContextUtil.incrementSizeProgress(scannerContext, 
result);
+                ScannerContextUtil.updateTimeProgress(scannerContext);
+                return res;
             }
             
             @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aaa41a33/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
index 0ddabed..95d449a 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
@@ -64,7 +64,7 @@ public class DelegateRegionScanner implements RegionScanner {
 
     @Override
     public boolean next(List<Cell> result, ScannerContext scannerContext) 
throws IOException {
-        return delegate.next(result, scannerContext);
+        throw new IOException("Next with scannerContext should not be called 
in Phoenix environment");
     }
 
     @Override
@@ -74,7 +74,7 @@ public class DelegateRegionScanner implements RegionScanner {
 
     @Override
     public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) 
throws IOException {
-        return delegate.nextRaw(result, scannerContext);
+        throw new IOException("NextRaw with scannerContext should not be 
called in Phoenix environment");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aaa41a33/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index 5061d94..59f844d 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -276,18 +276,7 @@ public class HashJoinRegionScanner implements 
RegionScanner {
     @Override
     public boolean nextRaw(List<Cell> result, ScannerContext scannerContext)
             throws IOException {
-        try {
-            while (shouldAdvance()) {
-                hasMore = scanner.nextRaw(result, scannerContext);
-                processResults(result, false); // TODO detect if limit used 
here
-                result.clear();
-            }
-            
-            return nextInQueue(result);
-        } catch (Throwable t) {
-            
ServerUtil.throwIOException(env.getRegion().getRegionInfo().getRegionNameAsString(),
 t);
-            return false; // impossible
-        }
+        throw new IOException("Next with scannerContext should not be called 
in Phoenix environment");
     }
 
     @Override
@@ -302,33 +291,12 @@ public class HashJoinRegionScanner implements 
RegionScanner {
 
     @Override
     public boolean next(List<Cell> result) throws IOException {
-        try {
-            while (shouldAdvance()) {
-                hasMore = scanner.next(result);
-                processResults(result, false);
-                result.clear();
-            }
-            
-            return nextInQueue(result);
-        } catch (Throwable t) {
-            
ServerUtil.throwIOException(env.getRegion().getRegionInfo().getRegionNameAsString(),
 t);
-            return false; // impossible
-        }
+        throw new IOException("Next should not be used in HashJoin scanner");
     }
 
     @Override
     public boolean next(List<Cell> result, ScannerContext scannerContext) 
throws IOException {
-        try {
-            while (shouldAdvance()) {
-                hasMore = scanner.next(result, scannerContext);
-                processResults(result, false); // TODO detect if limit used 
here
-                result.clear();
-            }
-            return nextInQueue(result);
-        } catch (Throwable t) {
-            
ServerUtil.throwIOException(env.getRegion().getRegionInfo().getRegionNameAsString(),
 t);
-            return false; // impossible
-        }
+        throw new IOException("Next with scannerContext should not be called 
in Phoenix environment");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aaa41a33/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index 898a573..3dcbef9 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
@@ -140,12 +141,7 @@ public abstract class RegionScannerFactory {
 
       @Override
       public boolean next(List<Cell> result, ScannerContext scannerContext) 
throws IOException {
-        try {
-          return s.next(result, scannerContext);
-        } catch (Throwable t) {
-          
ServerUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(),
 t);
-          return false; // impossible
-        }
+          throw new IOException("Next with scannerContext should not be called 
in Phoenix environment");
       }
 
       @Override
@@ -221,45 +217,10 @@ public abstract class RegionScannerFactory {
       @Override
       public boolean nextRaw(List<Cell> result, ScannerContext scannerContext)
           throws IOException {
-        try {
-          boolean next = s.nextRaw(result, scannerContext);
-          Cell arrayElementCell = null;
-          if (result.size() == 0) {
-            return next;
-          }
-          if (arrayFuncRefs != null && arrayFuncRefs.length > 0 && 
arrayKVRefs.size() > 0) {
-            int arrayElementCellPosition = 
replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result);
-            arrayElementCell = result.get(arrayElementCellPosition);
-          }
-          if ((offset > 0 || ScanUtil.isLocalIndex(scan))  && 
!ScanUtil.isAnalyzeTable(scan)) {
-            if(hasReferences && actualStartKey!=null) {
-              next = scanTillScanStartRow(s, arrayKVRefs, arrayFuncRefs, 
result,
-                  scannerContext, arrayElementCell);
-              if (result.isEmpty()) {
-                return next;
-              }
-            }
-            /* In the following, c is only used when data region is null.
-            dataRegion will never be null in case of non-coprocessor call,
-            therefore no need to refactor
-             */
-            IndexUtil.wrapResultUsingOffset(env, result, offset, dataColumns,
-                tupleProjector, dataRegion, indexMaintainer, viewConstants, 
ptr);
-          }
-          if (projector != null) {
-            Tuple toProject = useQualifierAsListIndex ? new 
PositionBasedMultiKeyValueTuple(result) : new 
ResultTuple(Result.create(result));
-            Tuple tuple = projector.projectResults(toProject, 
useNewValueColumnQualifier);
-            result.clear();
-            result.add(tuple.getValue(0));
-            if(arrayElementCell != null)
-              result.add(arrayElementCell);
-          }
-          // There is a scanattribute set to retrieve the specific array 
element
-          return next;
-        } catch (Throwable t) {
-          
ServerUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(),
 t);
-          return false; // impossible
-        }
+        boolean res = next(result);
+        ScannerContextUtil.incrementSizeProgress(scannerContext, result);
+        ScannerContextUtil.updateTimeProgress(scannerContext);
+        return res;
       }
 
       /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/aaa41a33/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
index ab6b3db..2fb6f14 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java
@@ -73,9 +73,7 @@ public class StatisticsScanner implements InternalScanner {
 
     @Override
     public boolean next(List<Cell> result, ScannerContext scannerContext) 
throws IOException {
-        boolean ret = delegate.next(result, scannerContext);
-        updateStats(result);
-        return ret;
+        return next(result);
     }
 
     /**

Reply via email to