PHOENIX-1257 Upserted data seen by SELECT in UPSERT SELECT execution (Lars 
Hofhansl)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e49e8dcf
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e49e8dcf
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e49e8dcf

Branch: refs/heads/4.0
Commit: e49e8dcfbed740e13515c0b9aaf79db602059fd4
Parents: c9101f8
Author: James Taylor <jtay...@salesforce.com>
Authored: Sun Oct 5 13:26:52 2014 -0700
Committer: James Taylor <jtay...@salesforce.com>
Committed: Sun Oct 5 18:11:37 2014 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/CoalesceFunctionIT.java     | 67 ++++++++------------
 .../apache/phoenix/end2end/ReverseScanIT.java   |  2 +-
 ...ipRangeParallelIteratorRegionSplitterIT.java |  3 +-
 .../end2end/TenantSpecificTablesDDLIT.java      |  2 +-
 .../phoenix/end2end/ToCharFunctionIT.java       |  4 +-
 .../phoenix/end2end/ToNumberFunctionIT.java     |  4 +-
 .../end2end/UpsertSelectAutoCommitIT.java       | 23 +++++++
 .../salted/SaltedTableVarLengthRowKeyIT.java    |  8 +--
 .../apache/phoenix/compile/FromCompiler.java    | 32 +++++++---
 .../apache/phoenix/compile/UpsertCompiler.java  | 19 ++++++
 .../apache/phoenix/execute/BaseQueryPlan.java   |  6 --
 11 files changed, 104 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/it/java/org/apache/phoenix/end2end/CoalesceFunctionIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CoalesceFunctionIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CoalesceFunctionIT.java
index 57599e6..45fcb48 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CoalesceFunctionIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CoalesceFunctionIT.java
@@ -67,7 +67,7 @@ public class CoalesceFunctionIT extends 
BaseHBaseManagedTimeIT {
     public void coalesceWithSumExplicitLong() throws Exception {
         Connection conn = DriverManager.getConnection(getUrl());
 
-        String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE("
+        String ddl = "CREATE TABLE TEST_COALESCE("
                 + "    ID BIGINT NOT NULL, "
                 + "    COUNT BIGINT "
                 + "    CONSTRAINT pk PRIMARY KEY(ID))";
@@ -91,7 +91,7 @@ public class CoalesceFunctionIT extends 
BaseHBaseManagedTimeIT {
     public void coalesceWithSumImplicitLong() throws Exception {
         Connection conn = DriverManager.getConnection(getUrl());
 
-        String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE("
+        String ddl = "CREATE TABLE TEST_COALESCE("
                 + "    ID BIGINT NOT NULL, "
                 + "    COUNT BIGINT "
                 + "    CONSTRAINT pk PRIMARY KEY(ID))";
@@ -115,7 +115,7 @@ public class CoalesceFunctionIT extends 
BaseHBaseManagedTimeIT {
     public void coalesceWithSecondParamAsExpression() throws Exception {
         Connection conn = DriverManager.getConnection(getUrl());
 
-        String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE("
+        String ddl = "CREATE TABLE TEST_COALESCE("
                 + "    ID BIGINT NOT NULL, "
                 + "    COUNT BIGINT "
                 + "    CONSTRAINT pk PRIMARY KEY(ID))";
@@ -139,7 +139,7 @@ public class CoalesceFunctionIT extends 
BaseHBaseManagedTimeIT {
     public void nonTypedSecondParameterLong() throws Exception {
         Connection conn = DriverManager.getConnection(getUrl());
 
-        String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE("
+        String ddl = "CREATE TABLE TEST_COALESCE("
                 + "    ID BIGINT NOT NULL, "
                 + "    COUNT BIGINT " //first parameter to coalesce
                 + "    CONSTRAINT pk PRIMARY KEY(ID))";
@@ -163,47 +163,32 @@ public class CoalesceFunctionIT extends 
BaseHBaseManagedTimeIT {
     public void nonTypedSecondParameterUnsignedDataTypes() throws Exception {
         Connection conn = DriverManager.getConnection(getUrl());
 
-        String[] dataTypes = {
-            "UNSIGNED_INT",
-            "UNSIGNED_LONG",
-            "UNSIGNED_TINYINT",
-            "UNSIGNED_SMALLINT",
-            "UNSIGNED_FLOAT",
-            "UNSIGNED_DOUBLE",
-            "UNSIGNED_TIME",
-            "UNSIGNED_DATE",
-            "UNSIGNED_TIMESTAMP"
-        };
-
-        for (String dataType : dataTypes) {
-
-            String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE("
-                    + "    ID BIGINT NOT NULL, "
-                    + "    COUNT " + dataType //first parameter to coalesce
-                    + "    CONSTRAINT pk PRIMARY KEY(ID))";
-            conn.createStatement().execute(ddl);
-
-            conn.createStatement().execute("UPSERT INTO TEST_COALESCE(ID, 
COUNT) VALUES(2, null)");
-            conn.commit();
-
-            //second param to coalesce is signed int
-            ResultSet rs = conn.createStatement().executeQuery(
-                    "SELECT "
-                    + "COALESCE(NTH_VALUE(COUNT, 100) WITHIN GROUP (ORDER BY 
COUNT DESC), 1) "
-                    + "FROM TEST_COALESCE "
-                    + "GROUP BY ID");
+        String ddl = "CREATE TABLE TEST_COALESCE ("
+                + "    ID BIGINT NOT NULL, "
+                + "    COUNT UNSIGNED_INT " //first parameter to coalesce
+                + "    CONSTRAINT pk PRIMARY KEY(ID))";
+        conn.createStatement().execute(ddl);
 
-            assertTrue(rs.next());
-            assertEquals(1, rs.getInt(1));
-            assertFalse(rs.wasNull());
-        }
+        conn.createStatement().execute("UPSERT INTO TEST_COALESCE (ID, COUNT) 
VALUES(2, null)");
+        conn.commit();
+
+        //second param to coalesce is signed int
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT "
+                + " COALESCE(NTH_VALUE(COUNT, 100) WITHIN GROUP (ORDER BY 
COUNT DESC), 1) "
+                + " FROM TEST_COALESCE" 
+                + " GROUP BY ID");
+
+        assertTrue(rs.next());
+        assertEquals(1, rs.getInt(1));
+        assertFalse(rs.wasNull());
     }
 
     @Test
     public void testWithNthValueAggregationFunction() throws Exception {
         Connection conn = DriverManager.getConnection(getUrl());
 
-        String ddl = "CREATE TABLE IF NOT EXISTS TEST_NTH("
+        String ddl = "CREATE TABLE TEST_NTH("
                 + "    ID BIGINT NOT NULL, "
                 + "    DATE TIMESTAMP NOT NULL, "
                 + "    COUNT BIGINT "
@@ -234,7 +219,7 @@ public class CoalesceFunctionIT extends 
BaseHBaseManagedTimeIT {
     public void wrongDataTypeOfSecondParameter() throws Exception {
         Connection conn = DriverManager.getConnection(getUrl());
 
-        String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE("
+        String ddl = "CREATE TABLE TEST_COALESCE("
                 + "    ID UNSIGNED_INT NOT NULL, "
                 + "    COUNT UNSIGNED_INT "
                 + "    CONSTRAINT pk PRIMARY KEY(ID))";
@@ -260,7 +245,7 @@ public class CoalesceFunctionIT extends 
BaseHBaseManagedTimeIT {
     public void testImplicitSecondArgCastingException() throws Exception {
         Connection conn = DriverManager.getConnection(getUrl());
 
-        String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE("
+        String ddl = "CREATE TABLE TEST_COALESCE("
                 + "    ID INTEGER NOT NULL, "
                 + "    COUNT UNSIGNED_INT " //first parameter to coalesce
                 + "    CONSTRAINT pk PRIMARY KEY(ID))";
@@ -288,7 +273,7 @@ public class CoalesceFunctionIT extends 
BaseHBaseManagedTimeIT {
     public void testImplicitSecondArgCasting() throws Exception {
         Connection conn = DriverManager.getConnection(getUrl());
 
-        String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE("
+        String ddl = "CREATE TABLE TEST_COALESCE("
                 + "    ID DOUBLE NOT NULL, "
                 + "    COUNT INTEGER " //first parameter to coalesce
                 + "    CONSTRAINT pk PRIMARY KEY(ID))";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java
index f738773..26d6d4b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java
@@ -47,7 +47,7 @@ import org.junit.experimental.categories.Category;
 
 import com.google.common.collect.Maps;
 
-@Category(ClientManagedTimeTest.class)
+@Category(HBaseManagedTimeTest.class)
 public class ReverseScanIT extends BaseHBaseManagedTimeIT {
     @BeforeClass
     @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java
index 18d7910..a760f74 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java
@@ -357,7 +357,8 @@ public class SkipRangeParallelIteratorRegionSplitterIT 
extends BaseClientManaged
             }
             
         };
-        PhoenixConnection connection = DriverManager.getConnection(getUrl(), 
PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class);
+        String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" 
+ nextTimestamp();
+        PhoenixConnection connection = DriverManager.getConnection(url, 
PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class);
         final PhoenixStatement statement = new PhoenixStatement(connection);
         final StatementContext context = new StatementContext(statement, 
resolver, scan, new SequenceManager(statement));
         context.setScanRanges(scanRanges);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
index 2d6c30b..0e2b75c 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
@@ -506,7 +506,7 @@ public class TenantSpecificTablesDDLIT extends 
BaseTenantSpecificTablesIT {
             conn.close();
             
             // Global connection sees all tenant tables
-            conn = DriverManager.getConnection(getUrl());
+            conn = DriverManager.getConnection(getUrl(), props);
             rs = conn.getMetaData().getSuperTables(TENANT_ID, null, null);
             assertTrue(rs.next());
             assertEquals(TENANT_ID, 
rs.getString(PhoenixDatabaseMetaData.TABLE_CAT));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToCharFunctionIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToCharFunctionIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToCharFunctionIT.java
index 10c34ee..1fa88f5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToCharFunctionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToCharFunctionIT.java
@@ -218,7 +218,9 @@ public class ToCharFunctionIT extends 
BaseClientManagedTimeIT {
     }
     
     private void runOneRowQueryTest(String oneRowQuery, Integer pkValue, 
String projectedValue) throws Exception {
-        Connection conn = DriverManager.getConnection(getUrl());
+        long ts = nextTimestamp();
+        String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" 
+ ts;
+        Connection conn = DriverManager.getConnection(url);
         try {
             PreparedStatement statement = conn.prepareStatement(oneRowQuery);
             ResultSet rs = statement.executeQuery();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToNumberFunctionIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToNumberFunctionIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToNumberFunctionIT.java
index 9f415cf..431cbda 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToNumberFunctionIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToNumberFunctionIT.java
@@ -284,7 +284,9 @@ public class ToNumberFunctionIT extends 
BaseClientManagedTimeIT {
     }
     
     private void runOneRowQueryTest(String oneRowQuery, boolean 
isIntegerColumn, Integer expectedIntValue, BigDecimal expectedDecimalValue) 
throws Exception {
-        Connection conn = DriverManager.getConnection(getUrl());
+        long ts = nextTimestamp();
+        String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" 
+ ts;
+        Connection conn = DriverManager.getConnection(url);
         try {
             PreparedStatement statement = conn.prepareStatement(oneRowQuery);
             ResultSet rs = statement.executeQuery();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java
index 86a52d0..e39619c 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java
@@ -29,8 +29,10 @@ import java.sql.Date;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.Statement;
 import java.util.Properties;
 
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -135,4 +137,25 @@ public class UpsertSelectAutoCommitIT extends 
BaseHBaseManagedTimeIT {
         conn.commit();
     }
     
+    
+    @Test
+    public void testUpsertSelectDoesntSeeUpsertedData() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, 
Integer.toString(3));
+        props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, 
Integer.toString(3));
+        props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, 
Integer.toString(3));
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        conn.createStatement().execute("CREATE SEQUENCE keys");
+        conn.createStatement().execute("CREATE TABLE foo (pk INTEGER PRIMARY 
KEY, val INTEGER)");
+
+        conn.createStatement().execute("UPSERT INTO foo VALUES (NEXT VALUE FOR 
keys,1)");
+        for (int i=0; i<6; i++) {
+            Statement stmt = conn.createStatement();
+            int upsertCount = stmt.executeUpdate("UPSERT INTO foo SELECT NEXT 
VALUE FOR keys, val FROM foo");
+            assertEquals((int)Math.pow(2, i), upsertCount);
+        }
+        conn.close();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableVarLengthRowKeyIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableVarLengthRowKeyIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableVarLengthRowKeyIT.java
index ae696eb..db517a6 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableVarLengthRowKeyIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableVarLengthRowKeyIT.java
@@ -29,14 +29,14 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.util.Properties;
 
-import org.apache.phoenix.end2end.BaseClientManagedTimeIT;
-import org.apache.phoenix.end2end.ClientManagedTimeTest;
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.end2end.HBaseManagedTimeTest;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category(ClientManagedTimeTest.class)
-public class SaltedTableVarLengthRowKeyIT extends BaseClientManagedTimeIT {
+@Category(HBaseManagedTimeTest.class)
+public class SaltedTableVarLengthRowKeyIT extends BaseHBaseManagedTimeIT {
 
     private static void initTableValues() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index 5ee29e2..6f7b006 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -156,9 +156,9 @@ public class FromCompiler {
                throws SQLException {
        List<TableNode> fromNodes = statement.getFrom();
         if (!statement.isJoin() && fromNodes.get(0) instanceof NamedTableNode)
-            return new SingleTableColumnResolver(connection, (NamedTableNode) 
fromNodes.get(0), true);
+            return new SingleTableColumnResolver(connection, (NamedTableNode) 
fromNodes.get(0), true, 1);
         
-        MultiTableColumnResolver visitor = new 
MultiTableColumnResolver(connection);
+        MultiTableColumnResolver visitor = new 
MultiTableColumnResolver(connection, 1);
         for (TableNode node : fromNodes) {
             node.accept(visitor);
         }
@@ -187,11 +187,11 @@ public class FromCompiler {
     }
     
     private static class SingleTableColumnResolver extends BaseColumnResolver {
-               private final List<TableRef> tableRefs;
-               private final String alias;
+       private final List<TableRef> tableRefs;
+       private final String alias;
        
        public SingleTableColumnResolver(PhoenixConnection connection, 
NamedTableNode table, long timeStamp) throws SQLException  {
-           super(connection);
+           super(connection, 0);
            List<PColumnFamily> families = 
Lists.newArrayListWithExpectedSize(table.getDynamicColumns().size());
            for (ColumnDef def : table.getDynamicColumns()) {
                if (def.getColumnDefName().getFamilyName() != null) {
@@ -206,13 +206,17 @@ public class FromCompiler {
        }
        
         public SingleTableColumnResolver(PhoenixConnection connection, 
NamedTableNode tableNode, boolean updateCacheImmediately) throws SQLException {
-            super(connection);
+            this(connection, tableNode, updateCacheImmediately, 0);
+        }
+
+        public SingleTableColumnResolver(PhoenixConnection connection, 
NamedTableNode tableNode, boolean updateCacheImmediately, int tsAddition) 
throws SQLException {
+            super(connection, tsAddition);
             alias = tableNode.getAlias();
             TableRef tableRef = createTableRef(tableNode, 
updateCacheImmediately);
             tableRefs = ImmutableList.of(tableRef);
         }
 
-               @Override
+        @Override
                public List<TableRef> getTables() {
                        return tableRefs;
                }
@@ -273,10 +277,15 @@ public class FromCompiler {
     private static abstract class BaseColumnResolver implements ColumnResolver 
{
         protected final PhoenixConnection connection;
         protected final MetaDataClient client;
+        // Fudge factor to add to current time we calculate. We need this when 
we do a SELECT
+        // on Windows because the millis timestamp granularity is so bad we 
sometimes won't
+        // get the data back that we just upserted.
+        private final int tsAddition;
         
-        private BaseColumnResolver(PhoenixConnection connection) {
+        private BaseColumnResolver(PhoenixConnection connection, int 
tsAddition) {
                this.connection = connection;
             this.client = new MetaDataClient(connection);
+            this.tsAddition = tsAddition;
         }
 
         protected TableRef createTableRef(NamedTableNode tableNode, boolean 
updateCacheImmediately) throws SQLException {
@@ -319,6 +328,9 @@ public class FromCompiler {
             // Add any dynamic columns to the table declaration
             List<ColumnDef> dynamicColumns = tableNode.getDynamicColumns();
             theTable = addDynamicColumns(dynamicColumns, theTable);
+            if (timeStamp != QueryConstants.UNSET_TIMESTAMP) {
+                timeStamp += tsAddition;
+            }
             TableRef tableRef = new TableRef(tableNode.getAlias(), theTable, 
timeStamp, !dynamicColumns.isEmpty());
             if (logger.isDebugEnabled() && timeStamp != 
QueryConstants.UNSET_TIMESTAMP) {
                 logger.debug(LogUtil.addCustomAnnotations("Re-resolved stale 
table " + fullTableName + " with seqNum " + 
tableRef.getTable().getSequenceNumber() + " at timestamp " + 
tableRef.getTable().getTimeStamp() + " with " + 
tableRef.getTable().getColumns().size() + " columns: " + 
tableRef.getTable().getColumns(), connection));
@@ -359,8 +371,8 @@ public class FromCompiler {
         private final ListMultimap<String, TableRef> tableMap;
         private final List<TableRef> tables;
 
-        private MultiTableColumnResolver(PhoenixConnection connection) {
-               super(connection);
+        private MultiTableColumnResolver(PhoenixConnection connection, int 
tsAddition) {
+               super(connection, tsAddition);
             tableMap = ArrayListMultimap.<String, TableRef> create();
             tables = Lists.newArrayList();
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 3a48a93..f363bdc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -374,6 +374,7 @@ public class UpsertCompiler {
                     select = prependTenantAndViewConstants(table, select, 
tenantId, addViewColumnsToBe);
                     sameTable = select.getFrom().size() == 1
                         && 
tableRefToBe.equals(selectResolver.getTables().get(0));
+                    tableRefToBe = 
adjustTimestampToMinOfSameTable(tableRefToBe, selectResolver.getTables());
                     /* We can run the upsert in a coprocessor if:
                      * 1) from has only 1 table and the into table matches 
from table
                      * 2) the select query isn't doing aggregation (which 
requires a client-side final merge)
@@ -797,6 +798,24 @@ public class UpsertCompiler {
         };
     }
     
+    private TableRef adjustTimestampToMinOfSameTable(TableRef upsertRef, 
List<TableRef> selectRefs) {
+        long minTimestamp = Long.MAX_VALUE;
+        for (TableRef selectRef : selectRefs) {
+            if (selectRef.equals(upsertRef)) {
+                minTimestamp = Math.min(minTimestamp, 
selectRef.getTimeStamp());
+            }
+        }
+        if (minTimestamp != Long.MAX_VALUE) {
+            // If the table being upserted to is also being selected from,
+            // reset the timestamp of the upsert (which controls the Put 
timestamp)
+            // to the lowest timestamp we found to ensure that the data being 
selected
+            // will not see the data being upserted. This prevents infinite 
loops
+            // like the one in PHOENIX-1257.
+            return new TableRef(upsertRef, minTimestamp);
+        }
+        return upsertRef;
+    }
+
     private static final class UpsertValuesCompiler extends ExpressionCompiler 
{
         private PColumn column;
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index d35ee8d..9a3e399 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -28,7 +28,6 @@ import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -177,11 +176,6 @@ public abstract class BaseQueryPlan implements QueryPlan {
           Long scn = connection.getSCN();
           if (scn == null) {
             scn = context.getCurrentTime();
-            // Add one to server time since max of time range is exclusive
-            // and we need to account of OSs with lower resolution clocks.
-            if (scn < HConstants.LATEST_TIMESTAMP) {
-              scn++;
-            }
           }
           ScanUtil.setTimeRange(scan, scn);
         } else {

Reply via email to