[
https://issues.apache.org/jira/browse/PHOENIX-1096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14065682#comment-14065682
]
ASF GitHub Bot commented on PHOENIX-1096:
-----------------------------------------
GitHub user jfernandosf opened a pull request:
https://github.com/apache/phoenix/pull/4
PHOENIX-1096 Fix for concurrency bug caused by a cached SequenceExpression
using the same byte[] buffer in multiple threads
This is a fix for the Upsert...Select issue we ran into when using
Sequences in the SELECT portion of the statement. We were generating mutations
with identical row keys because multiple rows were being assigned the same
sequence number. I tracked this down to the fact that we cache
SequenceExpression and have a member field byte[] where we store the sequence
after reading it. This buffer was being written to by multiple threads, so
there was no guarantee that a thread would see the sequence it had read;
depending on timing, multiple threads would read the same sequence when
generating the row key for the mutation.
Moving the buffer to a local variable addresses this issue.
I have also included a test that Samarth and I created to repro the issue
that is now passing.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/jfernandosf/phoenix 3.0
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/phoenix/pull/4.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #4
----
commit dad771fa7be4320a20fb508a495c6478dcca53f6
Author: Jan Fernando <[email protected]>
Date: 2014-07-17T21:35:00Z
Fix for concurrency bug caused by a cached SequenceExpression using the same
byte[] buffer in multiple threads to store sequence values
when processing upsert...select result sets
----
> Duplicate sequence values returned when doing upsert select against a salted
> table.
> -----------------------------------------------------------------------------------
>
> Key: PHOENIX-1096
> URL: https://issues.apache.org/jira/browse/PHOENIX-1096
> Project: Phoenix
> Issue Type: Bug
> Reporter: Samarth Jain
> Fix For: 3.0.0, 4.0.0, 5.0.0
>
>
> {code}
> @Test
> public void testUpsertSelectWithSequenceAndLargeDataSet() throws
> Exception {
> long ts = nextTimestamp();
> Properties props = new Properties();
> //props.setProperty(QueryServices.THREAD_POOL_SIZE_ATTRIB,
> Integer.toString(64));
> props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
> Long.toString(ts));
> Connection conn = DriverManager.getConnection(getUrl(), props);
> String ddl = "CREATE TABLE IF NOT EXISTS DUMMY_CURSOR_STORAGE ("
> + "ORGANIZATION_ID CHAR(15) NOT NULL, QUERY_ID CHAR(15) NOT NULL,
> CURSOR_ORDER BIGINT NOT NULL "
> + "CONSTRAINT MAIN_PK PRIMARY KEY (ORGANIZATION_ID, QUERY_ID,
> CURSOR_ORDER) "
> + ") SALT_BUCKETS = 64";
> conn.createStatement().execute(ddl);
> conn.createStatement().execute("CREATE TABLE DUMMY_SEQ_TEST_DATA
> (ORGANIZATION_ID CHAR(15) NOT NULL, k1 integer not null, v1 integer not null
> CONSTRAINT PK PRIMARY KEY (ORGANIZATION_ID, k1, v1) ) VERSIONS=1,
> SALT_BUCKETS=64");
> conn.createStatement().execute("create sequence s cache " +
> Long.MAX_VALUE);
> conn.close();
> props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts
> + 10));
> conn = DriverManager.getConnection(getUrl(), props);
> for (int i = 0; i < 500000; i++) {
> conn.createStatement().execute("upsert into DUMMY_SEQ_TEST_DATA
> values ('00Dxx0000001gEH'," + i + "," + i + ")");
> }
> conn.commit();
> props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts
> + 15));
> conn = DriverManager.getConnection(getUrl(), props);
> conn.setAutoCommit(true);
> conn.createStatement().execute("upsert into DUMMY_CURSOR_STORAGE
> select ORGANIZATION_ID, 'MyQueryId', next value for s FROM
> DUMMY_SEQ_TEST_DATA");
> //conn.commit();
> props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts
> + 20));
> conn = DriverManager.getConnection(getUrl(), props);
> ResultSet rs = conn.createStatement().executeQuery("select count(*)
> from DUMMY_CURSOR_STORAGE");
>
> assertTrue(rs.next());
> assertEquals(500000, rs.getLong(1));
> conn.close();
> props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts
> + 25));
> ResultSet rs2 = conn.createStatement().executeQuery("select
> cursor_order from DUMMY_CURSOR_STORAGE");
> long seq = 1;
> while (rs2.next()) {
> assertEquals(seq, rs2.getLong(1));
> seq++;
> }
> conn.close();
>
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.2#6252)