Repository: phoenix
Updated Branches:
  refs/heads/4.0 64a2af142 -> 06bf8a952


PHOENIX-1096 Duplicate sequence values returned when doing upsert select 
against a salted table (Jan Fernando)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/06bf8a95
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/06bf8a95
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/06bf8a95

Branch: refs/heads/4.0
Commit: 06bf8a952ff5e48368e8160936bc5dfc72aa48c9
Parents: 64a2af1
Author: James Taylor <jtay...@salesforce.com>
Authored: Fri Jul 18 11:00:25 2014 -0700
Committer: James Taylor <jtay...@salesforce.com>
Committed: Fri Jul 18 11:00:25 2014 -0700

----------------------------------------------------------------------
 .../apache/phoenix/end2end/UpsertSelectIT.java  | 70 ++++++++++++++++++++
 .../apache/phoenix/compile/SequenceManager.java |  2 +-
 2 files changed, 71 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/06bf8a95/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
index fea4ce4..ed79fcc 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java
@@ -39,18 +39,36 @@ import java.sql.Date;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.TestUtil;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.google.common.collect.Maps;
+
 @Category(ClientManagedTimeTest.class)
 public class UpsertSelectIT extends BaseClientManagedTimeIT {
+       
+       
+  @BeforeClass
+  @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
+  public static void doSetup() throws Exception {
+      Map<String,String> props = Maps.newHashMapWithExpectedSize(5);
+      props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(500));
+      props.put(QueryServices.THREAD_POOL_SIZE_ATTRIB, Integer.toString(64));
+
+      // Must update config before starting server
+      setUpTestDriver(getUrl(), new 
ReadOnlyProps(props.entrySet().iterator()));
+  }
     
     @Test
     public void testUpsertSelectWithNoIndex() throws Exception {
@@ -726,4 +744,56 @@ public class UpsertSelectIT extends 
BaseClientManagedTimeIT {
         assertFalse(rs.next());
         conn.close();
     }
+    
+    @Test
+    public void testUpsertSelectWithSequenceAndSaltingAndLargeDataSet() throws 
Exception {
+       
+               int numOfRecords = 10000;
+        long ts = nextTimestamp();
+        Properties props = new Properties();
+             props.setProperty(QueryServices.THREAD_POOL_SIZE_ATTRIB, 
Integer.toString(64));
+             props.setProperty(QueryServices.QUEUE_SIZE_ATTRIB, 
Integer.toString(500));
+             props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String ddl = "CREATE TABLE IF NOT EXISTS DUMMY_CURSOR_STORAGE ("
+                                       + "ORGANIZATION_ID CHAR(15) NOT NULL, 
QUERY_ID CHAR(15) NOT NULL, CURSOR_ORDER BIGINT NOT NULL "
+                                       + "CONSTRAINT MAIN_PK PRIMARY KEY 
(ORGANIZATION_ID, QUERY_ID, CURSOR_ORDER) "
+                                       + ") SALT_BUCKETS = 64";
+        conn.createStatement().execute(ddl);
+        conn.createStatement().execute("CREATE TABLE DUMMY_SEQ_TEST_DATA " +
+                                                                               
                                                 "(ORGANIZATION_ID CHAR(15) NOT 
NULL, k1 integer not null, v1 integer not null " +
+                                                                               
                                                 "CONSTRAINT PK PRIMARY KEY 
(ORGANIZATION_ID, k1, v1) ) VERSIONS=1, SALT_BUCKETS = 64");
+        conn.createStatement().execute("create sequence s cache " + 
Integer.MAX_VALUE);
+        conn.close();
+
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts 
+ 10));
+        conn = DriverManager.getConnection(getUrl(), props);
+        for (int i = 0; i < numOfRecords; i++) {
+               conn.createStatement().execute("upsert into DUMMY_SEQ_TEST_DATA 
values ('00Dxx0000001gEH'," + i + "," + i + ")");
+        }
+        conn.commit();
+
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts 
+ 15));
+        conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        conn.createStatement().execute("upsert into DUMMY_CURSOR_STORAGE 
select '00Dxx0000001gEH', 'MyQueryId', next value for s FROM 
DUMMY_SEQ_TEST_DATA");
+
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts 
+ 20));
+        conn = DriverManager.getConnection(getUrl(), props);
+        ResultSet rs = conn.createStatement().executeQuery("select count(*) 
from DUMMY_CURSOR_STORAGE");
+        
+        assertTrue(rs.next());
+        assertEquals(numOfRecords, rs.getLong(1));
+        conn.close();
+
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts 
+ 25));
+        ResultSet rs2 = conn.createStatement().executeQuery("select 
cursor_order from DUMMY_CURSOR_STORAGE");
+        long seq = 1;
+        while (rs2.next()) {
+            assertEquals(seq, rs2.getLong(1));
+            seq++;
+        }
+        conn.close();
+    
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/06bf8a95/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
index 8e71c3b..de352b5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/SequenceManager.java
@@ -174,7 +174,6 @@ public class SequenceManager {
     
     private class SequenceValueExpression extends BaseTerminalExpression {
         private final int index;
-        private final byte[] valueBuffer = new 
byte[PDataType.LONG.getByteSize()];
 
         private SequenceValueExpression(int index) {
             this.index = index;
@@ -186,6 +185,7 @@ public class SequenceManager {
         
         @Override
         public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+                       byte[] valueBuffer = new 
byte[PDataType.LONG.getByteSize()];
             
PDataType.LONG.getCodec().encodeLong(tuple.getSequenceValue(index), 
valueBuffer, 0);
             ptr.set(valueBuffer);
             return true;

Reply via email to