PHOENIX-4849 Phoenix may incorrectly replace TableResultIterators after HBase region splits.


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/019ef373
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/019ef373
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/019ef373

Branch: refs/heads/4.14-cdh5.11
Commit: 019ef3733990972e3b9ab9a2712c3e213109a8d1
Parents: 1e4cb6f
Author: Lars Hofhansl <la...@apache.org>
Authored: Wed Sep 26 19:18:05 2018 +0100
Committer: Pedro Boado <pbo...@apache.org>
Committed: Wed Oct 17 20:33:14 2018 +0100

----------------------------------------------------------------------
 .../end2end/UpsertSelectAutoCommitIT.java       | 22 ++++++---
 .../apache/phoenix/compile/UpsertCompiler.java  | 17 +++++++
 .../phoenix/coprocessor/ScanRegionObserver.java |  9 +++-
 .../phoenix/iterate/TableResultIterator.java    | 50 +++++++-------------
 .../java/org/apache/phoenix/util/ScanUtil.java  |  8 ++++
 5 files changed, 63 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
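For illustration, a minimal JDBC sketch of the failure mode this commit
addresses (the connection URL, class, table, and sequence names are
hypothetical; the integration test below exercises the same path with
explicit splits):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class Phoenix4849Sketch {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                conn.setAutoCommit(true);
                Statement stmt = conn.createStatement();
                stmt.execute("CREATE SEQUENCE keys");
                stmt.execute("CREATE TABLE T (pk INTEGER PRIMARY KEY, val INTEGER)");
                stmt.execute("UPSERT INTO T VALUES (NEXT VALUE FOR keys, 1)");
                // If a region splits mid-scan, the client replaces its
                // TableResultIterator and restarts the scan; before this fix
                // the restarted scan could see rows this very statement had
                // already written, inflating the reported count.
                int count = stmt.executeUpdate(
                        "UPSERT INTO T SELECT NEXT VALUE FOR keys, val FROM T");
                System.out.println("upserted " + count + " rows");
            }
        }
    }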


http://git-wip-us.apache.org/repos/asf/phoenix/blob/019ef373/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java
index 38d48d6..d81c2d0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java
@@ -34,10 +34,13 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Properties;
 
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.TestUtil;
 import org.junit.Test;
 
 
@@ -161,19 +164,24 @@ public class UpsertSelectAutoCommitIT extends ParallelStatsDisabledIT {
         props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3));
         Connection conn = DriverManager.getConnection(getUrl(), props);
         conn.setAutoCommit(true);
-        conn.createStatement().execute("CREATE SEQUENCE keys");
+        conn.createStatement().execute("CREATE SEQUENCE keys CACHE 1000");
         String tableName = generateUniqueName();
-        conn.createStatement().execute(
-            "CREATE TABLE " + tableName + " (pk INTEGER PRIMARY KEY, val INTEGER)");
+        conn.createStatement().execute("CREATE TABLE " + tableName
+                + " (pk INTEGER PRIMARY KEY, val INTEGER) UPDATE_CACHE_FREQUENCY=3600000");
 
         conn.createStatement().execute(
             "UPSERT INTO " + tableName + " VALUES (NEXT VALUE FOR keys,1)");
-        for (int i=0; i<6; i++) {
-            Statement stmt = conn.createStatement();
-            int upsertCount = stmt.executeUpdate(
-                "UPSERT INTO " + tableName + " SELECT NEXT VALUE FOR keys, val FROM " + tableName);
+        PreparedStatement stmt =
+                conn.prepareStatement("UPSERT INTO " + tableName
+                        + " SELECT NEXT VALUE FOR keys, val FROM " + tableName);
+        HBaseAdmin admin =
+                driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin();
+        for (int i=0; i<12; i++) {
+            admin.split(TableName.valueOf(tableName));
+            int upsertCount = stmt.executeUpdate();
             assertEquals((int)Math.pow(2, i), upsertCount);
         }
+        admin.close();
         conn.close();
     }
 
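A note on the tightened test: the table starts with one row, and each
UPSERT ... SELECT re-inserts every existing row under a fresh sequence key,
so iteration i must report exactly 2^i upserted rows (1, 2, 4, ..., 2048 for
i = 0..11). Splitting the table before every round means an iterator that
was incorrectly replaced mid-scan, and thereby saw the statement's own
writes, would break the assertEquals on the doubling count.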

http://git-wip-us.apache.org/repos/asf/phoenix/blob/019ef373/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index c3cfa10..fb1169d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -76,12 +76,14 @@ import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.parse.SequenceValueParseNode;
 import org.apache.phoenix.parse.UpsertStatement;
 import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.ColumnRef;
 import org.apache.phoenix.schema.ConstraintViolationException;
 import org.apache.phoenix.schema.DelegateColumn;
 import org.apache.phoenix.schema.IllegalDataException;
+import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnImpl;
 import org.apache.phoenix.schema.PName;
@@ -551,6 +553,21 @@ public class UpsertCompiler {
             // Use optimizer to choose the best plan
             QueryCompiler compiler = new QueryCompiler(statement, select, selectResolver, targetColumns, parallelIteratorFactoryToBe, new SequenceManager(statement), true, false, null);
             queryPlanToBe = compiler.compile();
+
+            if (sameTable) {
+                // in the UPSERT INTO X ... SELECT FROM X case enforce the source tableRef's TS
+                // as max TS, so that the query can safely be restarted and still work off a snapshot
+                // (so it won't see its own data in case of concurrent splits)
+                // see PHOENIX-4849
+                long serverTime = selectResolver.getTables().get(0).getCurrentTime();
+                if (serverTime == QueryConstants.UNSET_TIMESTAMP) {
+                    // if this is the first time this table is resolved the ref's current time
+                    // might not be defined yet; in that case force an RPC to get the server time
+                    serverTime = new MetaDataClient(connection).getCurrentTime(schemaName, tableName);
+                }
+                Scan scan = queryPlanToBe.getContext().getScan();
+                ScanUtil.setTimeRange(scan, scan.getTimeRange().getMin(), serverTime);
+            }
             // This is post-fix: if the tableRef is a projected table, this means there are post-processing
             // steps and parallelIteratorFactory did not take effect.
             if (queryPlanToBe.getTableRef().getTable().getType() == PTableType.PROJECTED || queryPlanToBe.getTableRef().getTable().getType() == PTableType.SUBQUERY) {

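The core of the UpsertCompiler change, restated as a standalone sketch
against the raw HBase client Scan API (the class and method names here are
illustrative, not part of the commit): capping the scan's time range at a
server timestamp taken before the statement writes anything turns the read
side of the UPSERT SELECT into a snapshot read, so a scan restarted after a
split cannot observe the statement's own, later-stamped writes.

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Scan;

    public class SnapshotScanSketch {
        // Cells written by the running statement carry timestamps >= serverTime,
        // so they fall outside [min, serverTime) and are never returned.
        static Scan clampToServerTime(Scan scan, long serverTime) throws IOException {
            scan.setTimeRange(scan.getTimeRange().getMin(), serverTime);
            return scan;
        }
    }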
http://git-wip-us.apache.org/repos/asf/phoenix/blob/019ef373/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index 2d9cd4f..c2dfc4c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -32,6 +32,7 @@ import org.apache.phoenix.expression.OrderByExpression;
 import org.apache.phoenix.iterate.NonAggregateRegionScannerFactory;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.EncodedColumnsUtil;
+import org.apache.phoenix.util.ScanUtil;
 
 /**
  *
@@ -75,8 +76,12 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
     }
 
     @Override
-    protected boolean isRegionObserverFor(Scan scan) {
-        return scan.getAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY) != null;
+    protected boolean skipRegionBoundaryCheck(Scan scan) {
+        return super.skipRegionBoundaryCheck(scan) || ScanUtil.isSimpleScan(scan);
     }
 
+    @Override
+    protected boolean isRegionObserverFor(Scan scan) {
+        return ScanUtil.isNonAggregateScan(scan);
+    }
 }

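In effect ScanRegionObserver still attaches to the same non-aggregate scans
as before (ScanUtil.isNonAggregateScan tests the same NON_AGGREGATE_QUERY
attribute), but it now also exempts "simple" scans from the base observer's
region boundary check, so a plain scan straddling a just-split region keeps
going instead of failing with a stale region boundary error; see the
isSimpleScan sketch after the ScanUtil hunk below.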
http://git-wip-us.apache.org/repos/asf/phoenix/blob/019ef373/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index 06f612a..e6b94fb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -170,7 +170,7 @@ public class TableResultIterator implements ResultIterator {
             } catch (SQLException e) {
                 try {
                     throw ServerUtil.parseServerException(e);
-                } catch(StaleRegionBoundaryCacheException | HashJoinCacheNotFoundException e1) {
+                } catch(HashJoinCacheNotFoundException e1) {
                     if(ScanUtil.isNonAggregateScan(scan) && plan.getContext().getAggregationManager().isEmpty()) {
                         // For non aggregate queries if we get stale region boundary exception we can
                         // continue scanning from the next value of lasted fetched result.
@@ -188,42 +188,24 @@ public class TableResultIterator implements ResultIterator {
                             }
                         }
                         plan.getContext().getConnection().getQueryServices().clearTableRegionCache(htable.getTableName());
-                        if (e1 instanceof HashJoinCacheNotFoundException) {
-                            logger.debug(
-                                    "Retrying when Hash Join cache is not found on the server ,by sending the cache again");
-                            if (retry <= 0) {
+                        logger.debug(
+                                "Retrying when Hash Join cache is not found on the server ,by sending the cache again");
+                        if (retry <= 0) {
+                            throw e1;
+                        }
+                        Long cacheId = ((HashJoinCacheNotFoundException) e1).getCacheId();
+                        retry--;
+                        try {
+                            ServerCache cache = caches == null ? null :
+                                    caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId)));
+                            if (!hashCacheClient.addHashCacheToServer(newScan.getStartRow(),
+                                    cache, plan.getTableRef().getTable())) {
                                 throw e1;
                             }
-                            retry--;
-                            try {
-                                Long cacheId = ((HashJoinCacheNotFoundException) e1).getCacheId();
-
-                                ServerCache cache = caches == null ? null :
-                                    caches.get(new ImmutableBytesPtr(Bytes.toBytes(cacheId)));
+                            this.scanIterator = ((BaseQueryPlan) plan).iterator(caches, scanGrouper, newScan);
 
-                                if (!hashCacheClient.addHashCacheToServer(newScan.getStartRow(),
-                                        cache, plan.getTableRef().getTable())) {
-                                    throw e1;
-                                }
-                                this.scanIterator = ((BaseQueryPlan) plan).iterator(caches, scanGrouper, newScan);
-
-                            } catch (Exception ex) {
-                                throw ServerUtil.parseServerException(ex);
-                            }
-                        } else {
-                            try {
-                                if(plan.getContext().isClientSideUpsertSelect()) {
-                                    if(ScanUtil.isLocalIndex(newScan)) {
-                                        throw e;
-                                    }
-                                    this.scanIterator =
-                                            new ScanningResultIterator(htable.getScanner(newScan), newScan, scanMetricsHolder);
-                                } else {
-                                    this.scanIterator = plan.iterator(scanGrouper, newScan);
-                                }
-                            } catch (IOException ex) {
-                                throw ServerUtil.parseServerException(ex);
-                            }
+                        } catch (Exception ex) {
+                            throw ServerUtil.parseServerException(ex);
                         }
                         lastTuple = scanIterator.next();
                     } else {

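With the boundary check skipped server-side for simple scans and the UPSERT
SELECT pinned to a snapshot, the client no longer needs its
StaleRegionBoundaryCacheException recovery path here: the only failure still
retried is a missing server-side hash-join cache, handled by resending the
cache and reopening the iterator, exactly as the surviving branch above does.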
http://git-wip-us.apache.org/repos/asf/phoenix/blob/019ef373/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 62ecebd..2ac08e8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -119,6 +119,14 @@ public class ScanUtil {
         return scan.getAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY) != null;
     }
 
+    // Designates a "simple scan", i.e. a scan that does not need to be scoped
+    // to a single region.
+    public static boolean isSimpleScan(Scan scan) {
+        return  ScanUtil.isNonAggregateScan(scan) &&
+                scan.getAttribute(BaseScannerRegionObserver.TOPN) == null &&
+                scan.getAttribute(BaseScannerRegionObserver.SCAN_OFFSET) == null;
+    }
+
+
     // Use getTenantId and pass in column name to match against
     // in as PSchema attribute. If column name matches in 
     // KeyExpressions, set on scan as attribute

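A small usage sketch of the new predicate (an illustrative snippet, not part
of the commit; the attribute values are placeholders for what Phoenix sets
internally): TopN and row-offset scans depend on being evaluated within a
single region, so they remain excluded from the boundary-check exemption.

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
    import org.apache.phoenix.util.ScanUtil;

    public class SimpleScanSketch {
        public static void main(String[] args) {
            Scan scan = new Scan();
            scan.setAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY, Bytes.toBytes(true));
            System.out.println(ScanUtil.isSimpleScan(scan)); // true: plain non-aggregate scan
            scan.setAttribute(BaseScannerRegionObserver.TOPN, Bytes.toBytes(10));
            System.out.println(ScanUtil.isSimpleScan(scan)); // false: TopN must stay region-local
        }
    }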