PHOENIX-1257 Upserted data seen by SELECT in UPSERT SELECT execution (Lars Hofhansl)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e49e8dcf Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e49e8dcf Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e49e8dcf Branch: refs/heads/4.0 Commit: e49e8dcfbed740e13515c0b9aaf79db602059fd4 Parents: c9101f8 Author: James Taylor <jtay...@salesforce.com> Authored: Sun Oct 5 13:26:52 2014 -0700 Committer: James Taylor <jtay...@salesforce.com> Committed: Sun Oct 5 18:11:37 2014 -0700 ---------------------------------------------------------------------- .../phoenix/end2end/CoalesceFunctionIT.java | 67 ++++++++------------ .../apache/phoenix/end2end/ReverseScanIT.java | 2 +- ...ipRangeParallelIteratorRegionSplitterIT.java | 3 +- .../end2end/TenantSpecificTablesDDLIT.java | 2 +- .../phoenix/end2end/ToCharFunctionIT.java | 4 +- .../phoenix/end2end/ToNumberFunctionIT.java | 4 +- .../end2end/UpsertSelectAutoCommitIT.java | 23 +++++++ .../salted/SaltedTableVarLengthRowKeyIT.java | 8 +-- .../apache/phoenix/compile/FromCompiler.java | 32 +++++++--- .../apache/phoenix/compile/UpsertCompiler.java | 19 ++++++ .../apache/phoenix/execute/BaseQueryPlan.java | 6 -- 11 files changed, 104 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/it/java/org/apache/phoenix/end2end/CoalesceFunctionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CoalesceFunctionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CoalesceFunctionIT.java index 57599e6..45fcb48 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CoalesceFunctionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CoalesceFunctionIT.java @@ -67,7 +67,7 @@ public class CoalesceFunctionIT extends BaseHBaseManagedTimeIT { public void coalesceWithSumExplicitLong() throws Exception { Connection conn = DriverManager.getConnection(getUrl()); - String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE(" + String ddl = "CREATE TABLE TEST_COALESCE(" + " ID BIGINT NOT NULL, " + " COUNT BIGINT " + " CONSTRAINT pk PRIMARY KEY(ID))"; @@ -91,7 +91,7 @@ public class CoalesceFunctionIT extends BaseHBaseManagedTimeIT { public void coalesceWithSumImplicitLong() throws Exception { Connection conn = DriverManager.getConnection(getUrl()); - String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE(" + String ddl = "CREATE TABLE TEST_COALESCE(" + " ID BIGINT NOT NULL, " + " COUNT BIGINT " + " CONSTRAINT pk PRIMARY KEY(ID))"; @@ -115,7 +115,7 @@ public class CoalesceFunctionIT extends BaseHBaseManagedTimeIT { public void coalesceWithSecondParamAsExpression() throws Exception { Connection conn = DriverManager.getConnection(getUrl()); - String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE(" + String ddl = "CREATE TABLE TEST_COALESCE(" + " ID BIGINT NOT NULL, " + " COUNT BIGINT " + " CONSTRAINT pk PRIMARY KEY(ID))"; @@ -139,7 +139,7 @@ public class CoalesceFunctionIT extends BaseHBaseManagedTimeIT { public void nonTypedSecondParameterLong() throws Exception { Connection conn = DriverManager.getConnection(getUrl()); - String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE(" + String ddl = "CREATE TABLE TEST_COALESCE(" + " ID BIGINT NOT NULL, " + " COUNT BIGINT " //first parameter to coalesce + " CONSTRAINT pk PRIMARY KEY(ID))"; @@ -163,47 +163,32 @@ public class CoalesceFunctionIT extends BaseHBaseManagedTimeIT { public void nonTypedSecondParameterUnsignedDataTypes() throws Exception { Connection conn = DriverManager.getConnection(getUrl()); - String[] dataTypes = { - "UNSIGNED_INT", - "UNSIGNED_LONG", - "UNSIGNED_TINYINT", - "UNSIGNED_SMALLINT", - "UNSIGNED_FLOAT", - "UNSIGNED_DOUBLE", - "UNSIGNED_TIME", - "UNSIGNED_DATE", - "UNSIGNED_TIMESTAMP" - }; - - for (String dataType : dataTypes) { - - String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE(" - + " ID BIGINT NOT NULL, " - + " COUNT " + dataType //first parameter to coalesce - + " CONSTRAINT pk PRIMARY KEY(ID))"; - conn.createStatement().execute(ddl); - - conn.createStatement().execute("UPSERT INTO TEST_COALESCE(ID, COUNT) VALUES(2, null)"); - conn.commit(); - - //second param to coalesce is signed int - ResultSet rs = conn.createStatement().executeQuery( - "SELECT " - + "COALESCE(NTH_VALUE(COUNT, 100) WITHIN GROUP (ORDER BY COUNT DESC), 1) " - + "FROM TEST_COALESCE " - + "GROUP BY ID"); + String ddl = "CREATE TABLE TEST_COALESCE (" + + " ID BIGINT NOT NULL, " + + " COUNT UNSIGNED_INT " //first parameter to coalesce + + " CONSTRAINT pk PRIMARY KEY(ID))"; + conn.createStatement().execute(ddl); - assertTrue(rs.next()); - assertEquals(1, rs.getInt(1)); - assertFalse(rs.wasNull()); - } + conn.createStatement().execute("UPSERT INTO TEST_COALESCE (ID, COUNT) VALUES(2, null)"); + conn.commit(); + + //second param to coalesce is signed int + ResultSet rs = conn.createStatement().executeQuery( + "SELECT " + + " COALESCE(NTH_VALUE(COUNT, 100) WITHIN GROUP (ORDER BY COUNT DESC), 1) " + + " FROM TEST_COALESCE" + + " GROUP BY ID"); + + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertFalse(rs.wasNull()); } @Test public void testWithNthValueAggregationFunction() throws Exception { Connection conn = DriverManager.getConnection(getUrl()); - String ddl = "CREATE TABLE IF NOT EXISTS TEST_NTH(" + String ddl = "CREATE TABLE TEST_NTH(" + " ID BIGINT NOT NULL, " + " DATE TIMESTAMP NOT NULL, " + " COUNT BIGINT " @@ -234,7 +219,7 @@ public class CoalesceFunctionIT extends BaseHBaseManagedTimeIT { public void wrongDataTypeOfSecondParameter() throws Exception { Connection conn = DriverManager.getConnection(getUrl()); - String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE(" + String ddl = "CREATE TABLE TEST_COALESCE(" + " ID UNSIGNED_INT NOT NULL, " + " COUNT UNSIGNED_INT " + " CONSTRAINT pk PRIMARY KEY(ID))"; @@ -260,7 +245,7 @@ public class CoalesceFunctionIT extends BaseHBaseManagedTimeIT { public void testImplicitSecondArgCastingException() throws Exception { Connection conn = DriverManager.getConnection(getUrl()); - String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE(" + String ddl = "CREATE TABLE TEST_COALESCE(" + " ID INTEGER NOT NULL, " + " COUNT UNSIGNED_INT " //first parameter to coalesce + " CONSTRAINT pk PRIMARY KEY(ID))"; @@ -288,7 +273,7 @@ public class CoalesceFunctionIT extends BaseHBaseManagedTimeIT { public void testImplicitSecondArgCasting() throws Exception { Connection conn = DriverManager.getConnection(getUrl()); - String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE(" + String ddl = "CREATE TABLE TEST_COALESCE(" + " ID DOUBLE NOT NULL, " + " COUNT INTEGER " //first parameter to coalesce + " CONSTRAINT pk PRIMARY KEY(ID))"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java index f738773..26d6d4b 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java @@ -47,7 +47,7 @@ import org.junit.experimental.categories.Category; import com.google.common.collect.Maps; -@Category(ClientManagedTimeTest.class) +@Category(HBaseManagedTimeTest.class) public class ReverseScanIT extends BaseHBaseManagedTimeIT { @BeforeClass @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class) http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java index 18d7910..a760f74 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java @@ -357,7 +357,8 @@ public class SkipRangeParallelIteratorRegionSplitterIT extends BaseClientManaged } }; - PhoenixConnection connection = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class); + String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + nextTimestamp(); + PhoenixConnection connection = DriverManager.getConnection(url, PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class); final PhoenixStatement statement = new PhoenixStatement(connection); final StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement)); context.setScanRanges(scanRanges); http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java index 2d6c30b..0e2b75c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java @@ -506,7 +506,7 @@ public class TenantSpecificTablesDDLIT extends BaseTenantSpecificTablesIT { conn.close(); // Global connection sees all tenant tables - conn = DriverManager.getConnection(getUrl()); + conn = DriverManager.getConnection(getUrl(), props); rs = conn.getMetaData().getSuperTables(TENANT_ID, null, null); assertTrue(rs.next()); assertEquals(TENANT_ID, rs.getString(PhoenixDatabaseMetaData.TABLE_CAT)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToCharFunctionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToCharFunctionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToCharFunctionIT.java index 10c34ee..1fa88f5 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToCharFunctionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToCharFunctionIT.java @@ -218,7 +218,9 @@ public class ToCharFunctionIT extends BaseClientManagedTimeIT { } private void runOneRowQueryTest(String oneRowQuery, Integer pkValue, String projectedValue) throws Exception { - Connection conn = DriverManager.getConnection(getUrl()); + long ts = nextTimestamp(); + String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts; + Connection conn = DriverManager.getConnection(url); try { PreparedStatement statement = conn.prepareStatement(oneRowQuery); ResultSet rs = statement.executeQuery(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToNumberFunctionIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToNumberFunctionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToNumberFunctionIT.java index 9f415cf..431cbda 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToNumberFunctionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToNumberFunctionIT.java @@ -284,7 +284,9 @@ public class ToNumberFunctionIT extends BaseClientManagedTimeIT { } private void runOneRowQueryTest(String oneRowQuery, boolean isIntegerColumn, Integer expectedIntValue, BigDecimal expectedDecimalValue) throws Exception { - Connection conn = DriverManager.getConnection(getUrl()); + long ts = nextTimestamp(); + String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts; + Connection conn = DriverManager.getConnection(url); try { PreparedStatement statement = conn.prepareStatement(oneRowQuery); ResultSet rs = statement.executeQuery(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java index 86a52d0..e39619c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java @@ -29,8 +29,10 @@ import java.sql.Date; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.Statement; import java.util.Properties; +import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.PropertiesUtil; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -135,4 +137,25 @@ public class UpsertSelectAutoCommitIT extends BaseHBaseManagedTimeIT { conn.commit(); } + + @Test + public void testUpsertSelectDoesntSeeUpsertedData() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(3)); + props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3)); + props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3)); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(true); + conn.createStatement().execute("CREATE SEQUENCE keys"); + conn.createStatement().execute("CREATE TABLE foo (pk INTEGER PRIMARY KEY, val INTEGER)"); + + conn.createStatement().execute("UPSERT INTO foo VALUES (NEXT VALUE FOR keys,1)"); + for (int i=0; i<6; i++) { + Statement stmt = conn.createStatement(); + int upsertCount = stmt.executeUpdate("UPSERT INTO foo SELECT NEXT VALUE FOR keys, val FROM foo"); + assertEquals((int)Math.pow(2, i), upsertCount); + } + conn.close(); + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableVarLengthRowKeyIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableVarLengthRowKeyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableVarLengthRowKeyIT.java index ae696eb..db517a6 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableVarLengthRowKeyIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableVarLengthRowKeyIT.java @@ -29,14 +29,14 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.util.Properties; -import org.apache.phoenix.end2end.BaseClientManagedTimeIT; -import org.apache.phoenix.end2end.ClientManagedTimeTest; +import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; +import org.apache.phoenix.end2end.HBaseManagedTimeTest; import org.apache.phoenix.util.PropertiesUtil; import org.junit.Test; import org.junit.experimental.categories.Category; -@Category(ClientManagedTimeTest.class) -public class SaltedTableVarLengthRowKeyIT extends BaseClientManagedTimeIT { +@Category(HBaseManagedTimeTest.class) +public class SaltedTableVarLengthRowKeyIT extends BaseHBaseManagedTimeIT { private static void initTableValues() throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java index 5ee29e2..6f7b006 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java @@ -156,9 +156,9 @@ public class FromCompiler { throws SQLException { List<TableNode> fromNodes = statement.getFrom(); if (!statement.isJoin() && fromNodes.get(0) instanceof NamedTableNode) - return new SingleTableColumnResolver(connection, (NamedTableNode) fromNodes.get(0), true); + return new SingleTableColumnResolver(connection, (NamedTableNode) fromNodes.get(0), true, 1); - MultiTableColumnResolver visitor = new MultiTableColumnResolver(connection); + MultiTableColumnResolver visitor = new MultiTableColumnResolver(connection, 1); for (TableNode node : fromNodes) { node.accept(visitor); } @@ -187,11 +187,11 @@ public class FromCompiler { } private static class SingleTableColumnResolver extends BaseColumnResolver { - private final List<TableRef> tableRefs; - private final String alias; + private final List<TableRef> tableRefs; + private final String alias; public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode table, long timeStamp) throws SQLException { - super(connection); + super(connection, 0); List<PColumnFamily> families = Lists.newArrayListWithExpectedSize(table.getDynamicColumns().size()); for (ColumnDef def : table.getDynamicColumns()) { if (def.getColumnDefName().getFamilyName() != null) { @@ -206,13 +206,17 @@ public class FromCompiler { } public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode tableNode, boolean updateCacheImmediately) throws SQLException { - super(connection); + this(connection, tableNode, updateCacheImmediately, 0); + } + + public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode tableNode, boolean updateCacheImmediately, int tsAddition) throws SQLException { + super(connection, tsAddition); alias = tableNode.getAlias(); TableRef tableRef = createTableRef(tableNode, updateCacheImmediately); tableRefs = ImmutableList.of(tableRef); } - @Override + @Override public List<TableRef> getTables() { return tableRefs; } @@ -273,10 +277,15 @@ public class FromCompiler { private static abstract class BaseColumnResolver implements ColumnResolver { protected final PhoenixConnection connection; protected final MetaDataClient client; + // Fudge factor to add to current time we calculate. We need this when we do a SELECT + // on Windows because the millis timestamp granularity is so bad we sometimes won't + // get the data back that we just upsert. + private final int tsAddition; - private BaseColumnResolver(PhoenixConnection connection) { + private BaseColumnResolver(PhoenixConnection connection, int tsAddition) { this.connection = connection; this.client = new MetaDataClient(connection); + this.tsAddition = tsAddition; } protected TableRef createTableRef(NamedTableNode tableNode, boolean updateCacheImmediately) throws SQLException { @@ -319,6 +328,9 @@ public class FromCompiler { // Add any dynamic columns to the table declaration List<ColumnDef> dynamicColumns = tableNode.getDynamicColumns(); theTable = addDynamicColumns(dynamicColumns, theTable); + if (timeStamp != QueryConstants.UNSET_TIMESTAMP) { + timeStamp += tsAddition; + } TableRef tableRef = new TableRef(tableNode.getAlias(), theTable, timeStamp, !dynamicColumns.isEmpty()); if (logger.isDebugEnabled() && timeStamp != QueryConstants.UNSET_TIMESTAMP) { logger.debug(LogUtil.addCustomAnnotations("Re-resolved stale table " + fullTableName + " with seqNum " + tableRef.getTable().getSequenceNumber() + " at timestamp " + tableRef.getTable().getTimeStamp() + " with " + tableRef.getTable().getColumns().size() + " columns: " + tableRef.getTable().getColumns(), connection)); @@ -359,8 +371,8 @@ public class FromCompiler { private final ListMultimap<String, TableRef> tableMap; private final List<TableRef> tables; - private MultiTableColumnResolver(PhoenixConnection connection) { - super(connection); + private MultiTableColumnResolver(PhoenixConnection connection, int tsAddition) { + super(connection, tsAddition); tableMap = ArrayListMultimap.<String, TableRef> create(); tables = Lists.newArrayList(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 3a48a93..f363bdc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -374,6 +374,7 @@ public class UpsertCompiler { select = prependTenantAndViewConstants(table, select, tenantId, addViewColumnsToBe); sameTable = select.getFrom().size() == 1 && tableRefToBe.equals(selectResolver.getTables().get(0)); + tableRefToBe = adjustTimestampToMinOfSameTable(tableRefToBe, selectResolver.getTables()); /* We can run the upsert in a coprocessor if: * 1) from has only 1 table and the into table matches from table * 2) the select query isn't doing aggregation (which requires a client-side final merge) @@ -797,6 +798,24 @@ public class UpsertCompiler { }; } + private TableRef adjustTimestampToMinOfSameTable(TableRef upsertRef, List<TableRef> selectRefs) { + long minTimestamp = Long.MAX_VALUE; + for (TableRef selectRef : selectRefs) { + if (selectRef.equals(upsertRef)) { + minTimestamp = Math.min(minTimestamp, selectRef.getTimeStamp()); + } + } + if (minTimestamp != Long.MAX_VALUE) { + // If we found the same table is selected from that is being upserted to, + // reset the timestamp of the upsert (which controls the Put timestamp) + // to the lowest timestamp we found to ensure that the data being selected + // will not see the data being upserted. This prevents infinite loops + // like the one in PHOENIX-1257. + return new TableRef(upsertRef, minTimestamp); + } + return upsertRef; + } + private static final class UpsertValuesCompiler extends ExpressionCompiler { private PColumn column; http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java index d35ee8d..9a3e399 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java @@ -28,7 +28,6 @@ import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; @@ -177,11 +176,6 @@ public abstract class BaseQueryPlan implements QueryPlan { Long scn = connection.getSCN(); if (scn == null) { scn = context.getCurrentTime(); - // Add one to server time since max of time range is exclusive - // and we need to account of OSs with lower resolution clocks. - if (scn < HConstants.LATEST_TIMESTAMP) { - scn++; - } } ScanUtil.setTimeRange(scan, scn); } else {