Repository: phoenix
Updated Branches:
  refs/heads/master 168ab7ad9 -> 97a906bc7


PHOENIX-1516 Add RAND() built-in function.


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/97a906bc
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/97a906bc
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/97a906bc

Branch: refs/heads/master
Commit: 97a906bc7f6459e23409ba95533752456a3acce4
Parents: 168ab7a
Author: Lars Hofhansl <la...@apache.org>
Authored: Tue Jan 20 17:33:29 2015 -0800
Committer: Lars Hofhansl <la...@apache.org>
Committed: Tue Jan 20 17:33:29 2015 -0800

----------------------------------------------------------------------
 .../phoenix/end2end/ArithmeticQueryIT.java      | 100 ++++++++++++++
 .../apache/phoenix/end2end/UpsertValuesIT.java  |  34 +++++
 .../apache/phoenix/compile/RowProjector.java    |  73 +++++++++-
 .../apache/phoenix/compile/UpsertCompiler.java  |   8 +-
 .../UngroupedAggregateRegionObserver.java       |   5 +-
 .../apache/phoenix/expression/Determinism.java  |   2 +-
 .../phoenix/expression/ExpressionType.java      |   4 +-
 .../expression/function/FunctionExpression.java |   2 +-
 .../expression/function/RandomFunction.java     | 134 +++++++++++++++++++
 .../apache/phoenix/jdbc/PhoenixResultSet.java   |   9 +-
 .../phoenix/mapreduce/PhoenixRecordReader.java  |   5 +-
 11 files changed, 359 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/97a906bc/phoenix-core/src/it/java/org/apache/phoenix/end2end/ArithmeticQueryIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ArithmeticQueryIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ArithmeticQueryIT.java
index 21af737..2df1827 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ArithmeticQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ArithmeticQueryIT.java
@@ -295,6 +295,106 @@ public class ArithmeticQueryIT extends 
BaseHBaseManagedTimeIT {
     }
 
     @Test
+    public void testRandomFunction() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(false);
+        try {
+            String ddl = "CREATE TABLE testRandomFunction (pk VARCHAR NOT NULL 
PRIMARY KEY)";
+            createTestTable(getUrl(), ddl);
+            conn.createStatement().execute("upsert into testRandomFunction 
values ('x')");
+            conn.createStatement().execute("upsert into testRandomFunction 
values ('y')");
+            conn.createStatement().execute("upsert into testRandomFunction 
values ('z')");
+            conn.commit();
+
+            ResultSet rs = conn.createStatement().executeQuery("select rand(), 
rand(), rand(1), rand(2), rand(1) from testRandomFunction");
+            assertTrue(rs.next());
+            double rand0 = rs.getDouble(1);
+            double rand1 = rs.getDouble(3);
+            double rand2 = rs.getDouble(4);
+            assertTrue(rs.getDouble(1) != rs.getDouble(2));
+            assertTrue(rs.getDouble(2) != rs.getDouble(3));
+            assertTrue(rs.getDouble(3) == rs.getDouble(5));
+            assertTrue(rs.getDouble(4) != rs.getDouble(5));
+            assertTrue(rs.next());
+            assertTrue(rand0 != rs.getDouble(1));
+            assertTrue(rand1 != rs.getDouble(3));
+            assertTrue(rand2 != rs.getDouble(4));
+            double rand01 = rs.getDouble(1);
+            double rand11 = rs.getDouble(3);
+            double rand21 = rs.getDouble(4);
+            assertTrue(rs.getDouble(1) != rs.getDouble(2));
+            assertTrue(rs.getDouble(2) != rs.getDouble(3));
+            assertTrue(rs.getDouble(3) == rs.getDouble(5));
+            assertTrue(rs.getDouble(4) != rs.getDouble(5));
+            assertTrue(rs.next());
+            assertTrue(rand01 != rs.getDouble(1));
+            assertTrue(rand11 != rs.getDouble(3));
+            assertTrue(rand21 != rs.getDouble(4));
+            assertTrue(rs.getDouble(1) != rs.getDouble(2));
+            assertTrue(rs.getDouble(2) != rs.getDouble(3));
+            assertTrue(rs.getDouble(3) == rs.getDouble(5));
+            assertTrue(rs.getDouble(4) != rs.getDouble(5));
+            double rand12 = rs.getDouble(3);
+
+            rs = conn.createStatement().executeQuery("select rand(), rand(), 
rand(1), rand(2), rand(1) from testRandomFunction");
+            assertTrue(rs.next());
+            assertTrue(rs.getDouble(1) != rs.getDouble(2));
+            assertTrue(rs.getDouble(2) != rs.getDouble(3));
+            assertTrue(rs.getDouble(3) == rs.getDouble(5));
+            assertTrue(rs.getDouble(4) != rs.getDouble(5));
+            assertTrue(rand0 != rs.getDouble(1));
+            assertTrue(rand1 == rs.getDouble(3));
+            assertTrue(rand2 == rs.getDouble(4));
+            assertTrue(rs.next());
+            assertTrue(rand01 != rs.getDouble(1));
+            assertTrue(rand11 == rs.getDouble(3));
+            assertTrue(rand21 == rs.getDouble(4));
+            assertTrue(rs.next());
+            assertTrue(rand12 == rs.getDouble(3));
+
+            ddl = "CREATE TABLE testRandomFunction1 (pk VARCHAR NOT NULL 
PRIMARY KEY, v1 UNSIGNED_DOUBLE)";
+            createTestTable(getUrl(), ddl);
+            conn.createStatement().execute("upsert into testRandomFunction1 
select pk, rand(1) from testRandomFunction");
+            conn.commit();
+
+            rs = conn.createStatement().executeQuery("select count(*) from 
testRandomFunction1 where v1 = rand(1)");
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+
+            rs = conn.createStatement().executeQuery("select count(*) from 
testRandomFunction1 where v1 = rand(2)");
+            assertTrue(rs.next());
+            assertEquals(0, rs.getInt(1));
+
+            conn.createStatement().execute("delete from testRandomFunction1 
where v1 = rand(2)");
+            conn.commit();
+
+            rs = conn.createStatement().executeQuery("select count(*) from 
testRandomFunction1 where v1 = rand(1)");
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+
+            conn.setAutoCommit(true);
+            conn.createStatement().execute("upsert into testRandomFunction1 
select pk, rand(2) from testRandomFunction1");
+
+            rs = conn.createStatement().executeQuery("select count(*) from 
testRandomFunction1 where v1 = rand(1)");
+            assertTrue(rs.next());
+            assertEquals(0, rs.getInt(1));
+
+            rs = conn.createStatement().executeQuery("select count(*) from 
testRandomFunction1 where v1 = rand(2)");
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+
+            conn.createStatement().execute("delete from testRandomFunction1 
where v1 = rand(2)");
+
+            rs = conn.createStatement().executeQuery("select count(*) from 
testRandomFunction1 where v1 = rand(2)");
+            assertTrue(rs.next());
+            assertEquals(0, rs.getInt(1));
+        } finally {
+            conn.close();
+        }
+    }
+
+    @Test
     public void testDecimalArithmeticWithIntAndLong() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         Connection conn = DriverManager.getConnection(getUrl(), props);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/97a906bc/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
index 6389088..7c3c073 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java
@@ -161,6 +161,40 @@ public class UpsertValuesIT extends 
BaseClientManagedTimeIT {
     }
 
     @Test
+    public void testUpsertRandomValues() throws Exception {
+        long ts = nextTimestamp();
+        Properties props = new Properties();
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts));
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.createStatement().execute("create table UpsertRandomTest (k 
UNSIGNED_DOUBLE not null primary key, v1 UNSIGNED_DOUBLE, v2 UNSIGNED_DOUBLE, 
v3 UNSIGNED_DOUBLE, v4 UNSIGNED_DOUBLE)");
+        conn.close();
+
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts+5));
+        conn = DriverManager.getConnection(getUrl(), props);
+        conn.createStatement().execute("upsert into UpsertRandomTest values 
(RAND(), RAND(), RAND(1), RAND(2), RAND(1))");
+        conn.createStatement().execute("upsert into UpsertRandomTest values 
(RAND(), RAND(), RAND(1), RAND(2), RAND(1))");
+        conn.commit();
+        conn.close();
+
+        props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(ts+10));
+        conn = DriverManager.getConnection(getUrl(), props);
+        ResultSet rs = conn.createStatement().executeQuery("select 
k,v1,v2,v3,v4 from UpsertRandomTest");
+        assertTrue(rs.next());
+        double rand0 = rs.getDouble(1);
+        double rand1 = rs.getDouble(3);
+        double rand2 = rs.getDouble(4);
+        assertTrue(rs.getDouble(1) != rs.getDouble(2));
+        assertTrue(rs.getDouble(2) != rs.getDouble(3));
+        assertTrue(rand1 == rs.getDouble(5));
+        assertTrue(rs.getDouble(4) != rs.getDouble(5));
+        assertTrue(rs.next());
+        assertTrue(rand0 != rs.getDouble(1));
+        assertTrue(rand1 == rs.getDouble(3) && rand1 == rs.getDouble(5));
+        assertTrue(rand2 == rs.getDouble(4));
+        conn.close();
+    }
+
+    @Test
     public void testUpsertVarCharWithMaxLength() throws Exception {
         long ts = nextTimestamp();
         Properties props = new Properties();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/97a906bc/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java
index ee575f1..d1a9180 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java
@@ -17,18 +17,25 @@
  */
 package org.apache.phoenix.compile;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.ExpressionType;
 import org.apache.phoenix.schema.AmbiguousColumnException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TrustedByteArrayOutputStream;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Maps;
 
 
 /**
@@ -40,7 +47,7 @@ import com.google.common.collect.Maps;
  * 
  * @since 0.1
  */
-public class RowProjector {
+public class RowProjector implements Cloneable {
     public static final RowProjector EMPTY_PROJECTOR = new 
RowProjector(Collections.<ColumnProjector>emptyList(),0, true);
 
     private final List<? extends ColumnProjector> columnProjectors;
@@ -49,6 +56,7 @@ public class RowProjector {
     private final boolean someCaseSensitive;
     private final int estimatedSize;
     private final boolean isProjectEmptyKeyValue;
+    private volatile List<byte[]> rowExpressions;
     
     public RowProjector(RowProjector projector, boolean 
isProjectEmptyKeyValue) {
         this(projector.getColumnProjectors(), 
projector.getEstimatedRowByteSize(), isProjectEmptyKeyValue);
@@ -80,6 +88,54 @@ public class RowProjector {
         this.estimatedSize = estimatedRowSize;
         this.isProjectEmptyKeyValue = isProjectEmptyKeyValue;
     }
+
+    @Override
+    public RowProjector clone() {
+        return cloneRowProjector();
+    }
+
+    private RowProjector cloneRowProjector() {
+        if (rowExpressions == null) {
+            synchronized(this) {
+                if (rowExpressions == null) {
+                    List<byte[]> localRowExpressions = new 
ArrayList<byte[]>(this.getColumnCount());
+                    try {
+                        for (int i = 0; i < this.getColumnCount(); i++) {
+                             TrustedByteArrayOutputStream bytesOut = new 
TrustedByteArrayOutputStream(1024);
+                             DataOutputStream output = new 
DataOutputStream(bytesOut);
+                             Expression expression = 
this.getColumnProjector(i).getExpression();
+                             WritableUtils.writeVInt(output, 
ExpressionType.valueOf(expression).ordinal());
+                             expression.write(output);
+                             output.flush();
+                             localRowExpressions.add(bytesOut.getBuffer());
+                        }
+                    } catch (IOException io) {
+                        throw new RuntimeException(io);
+                    }
+                    rowExpressions = localRowExpressions;
+                }
+            }
+        }
+        List<ColumnProjector> colProjectors = new 
ArrayList<ColumnProjector>(rowExpressions.size());
+        try {
+            for (int i=0; i<rowExpressions.size(); i++) {
+                ByteArrayInputStream bytesIn = new 
ByteArrayInputStream(rowExpressions.get(i));
+                DataInputStream input = new DataInputStream(bytesIn);
+                Expression expression = 
ExpressionType.values()[WritableUtils.readVInt(input)].newInstance();
+                expression.readFields(input);
+                ColumnProjector colProjector = this.getColumnProjector(i);
+                colProjectors.add(new 
ExpressionProjector(colProjector.getName(),
+                    colProjector.getTableName(), 
+                    expression,
+                    colProjector.isCaseSensitive()));
+            }
+            return new RowProjector(colProjectors, 
+                this.getEstimatedRowByteSize(),
+                this.isProjectEmptyKeyValue());
+        } catch (IOException io) {
+            throw new RuntimeException(io);
+        }
+   }
     
     public boolean isProjectEmptyKeyValue() {
         return isProjectEmptyKeyValue;
@@ -135,4 +191,13 @@ public class RowProjector {
     public int getEstimatedRowByteSize() {
         return estimatedSize;
     }
-}
\ No newline at end of file
+
+    /**
+     * Allows individual expressions to reset their state between rows.
+     */
+    public void reset() {
+        for (ColumnProjector projector : columnProjectors) {
+            projector.getExpression().reset();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/97a906bc/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 5395210..e0ec3fd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -73,7 +73,6 @@ import org.apache.phoenix.schema.MetaDataClient;
 import org.apache.phoenix.schema.MetaDataEntityNotFoundException;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnImpl;
-import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.ViewType;
@@ -84,6 +83,7 @@ import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.TypeMismatchException;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.MetaDataUtil;
@@ -186,11 +186,13 @@ public class UpsertCompiler {
 
         @Override
         protected MutationState mutate(StatementContext context, 
ResultIterator iterator, PhoenixConnection connection) throws SQLException {
-            PhoenixStatement statement = new PhoenixStatement(connection);
             if (context.getSequenceManager().getSequenceCount() > 0) {
                 throw new IllegalStateException("Cannot pipeline upsert when 
sequence is referenced");
             }
-            return upsertSelect(statement, tableRef, projector, iterator, 
columnIndexes, pkSlotIndexes);
+            PhoenixStatement statement = new PhoenixStatement(connection);
+            // Clone the row projector as it's not thread safe and would be 
used simultaneously by
+            // multiple threads otherwise.
+            return upsertSelect(statement, tableRef, projector.clone(), 
iterator, columnIndexes, pkSlotIndexes);
         }
         
         public void setRowProjector(RowProjector projector) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/97a906bc/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 7e46370..5bbd5d3 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -75,13 +75,13 @@ import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.ConstraintViolationException;
 import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.PRow;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.stats.StatisticsCollector;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.KeyValueUtil;
@@ -323,6 +323,9 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver{
                             for (Mutation mutation : row.toRowMutations()) {
                                 mutations.add(mutation);
                             }
+                            for (i = 0; i < selectExpressions.size(); i++) {
+                                selectExpressions.get(i).reset();
+                            }
                         } else if (deleteCF != null && deleteCQ != null) {
                             // No need to search for delete column, since we 
project only it
                             // if no empty key value is being set

http://git-wip-us.apache.org/repos/asf/phoenix/blob/97a906bc/phoenix-core/src/main/java/org/apache/phoenix/expression/Determinism.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/Determinism.java 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/Determinism.java
index b2f3524..5acbfc7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/Determinism.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/Determinism.java
@@ -11,7 +11,7 @@ package org.apache.phoenix.expression;
 
 public enum Determinism {
        
-       ALWAYS, PER_STATEMENT, PER_ROW;
+       ALWAYS, PER_STATEMENT, PER_ROW, PER_INVOCATION;
        
        public Determinism combine (Determinism that) {
                return Determinism.values()[Math.max(this.ordinal(), 
that.ordinal())];

http://git-wip-us.apache.org/repos/asf/phoenix/blob/97a906bc/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
index 2893655..839ef40 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
@@ -55,6 +55,7 @@ import 
org.apache.phoenix.expression.function.PercentRankAggregateFunction;
 import org.apache.phoenix.expression.function.PercentileContAggregateFunction;
 import org.apache.phoenix.expression.function.PercentileDiscAggregateFunction;
 import org.apache.phoenix.expression.function.RTrimFunction;
+import org.apache.phoenix.expression.function.RandomFunction;
 import org.apache.phoenix.expression.function.RegexpReplaceFunction;
 import org.apache.phoenix.expression.function.RegexpSplitFunction;
 import org.apache.phoenix.expression.function.RegexpSubstrFunction;
@@ -185,7 +186,8 @@ public enum ExpressionType {
     SQLIndexTypeFunction(SQLIndexTypeFunction.class),
     ModulusExpression(ModulusExpression.class),
     DistinctValueAggregateFunction(DistinctValueAggregateFunction.class),
-    RegexpSplitFunctiond(RegexpSplitFunction.class);
+    RegexpSplitFunctiond(RegexpSplitFunction.class),
+    RandomFunction(RandomFunction.class);
     ExpressionType(Class<? extends Expression> clazz) {
         this.clazz = clazz;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/97a906bc/phoenix-core/src/main/java/org/apache/phoenix/expression/function/FunctionExpression.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/FunctionExpression.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/FunctionExpression.java
index 45c3e15..b45706a 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/FunctionExpression.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/FunctionExpression.java
@@ -57,7 +57,7 @@ public abstract class FunctionExpression extends 
BaseCompoundExpression {
     abstract public String getName();
     
     @Override
-    public final String toString() {
+    public String toString() {
         StringBuilder buf = new StringBuilder(getName() + "(");
         if (children.size()==0)
             return buf.append(")").toString();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/97a906bc/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RandomFunction.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RandomFunction.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RandomFunction.java
new file mode 100644
index 0000000..ffa405b
--- /dev/null
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RandomFunction.java
@@ -0,0 +1,134 @@
+package org.apache.phoenix.expression.function;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.Determinism;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.parse.FunctionParseNode.Argument;
+import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PUnsignedDouble;
+
+/**
+ * Random function that produces a different value for each invocation and for each row unless a seed is provided.
+ * If a seed is provided, the returned value is identical across all invocations with that seed within a single row, but differs from row to row; re-executing the statement yields the same sequence of values (see the example below).
+ * The seed must be a constant.
+ * <p>
+ * Example:
+ * <pre>
+ * 0: jdbc:phoenix:localhost> select rand(), rand(), rand(1), rand(2), rand(1) 
from t;
+ * 
+----------------------------+----------------------------+----------------------------+----------------------------+-----------------------+
+ * |           RAND()           |           RAND()           |          
RAND(1)           |          RAND(2)           |          RAND(1)      |
+ * 
+----------------------------+----------------------------+----------------------------+----------------------------+-----------------------+
+ * | 0.18927325291276054        | 0.19335253869230284        | 
0.7308781907032909         | 0.7311469360199058         | 0.7308781907032909    
|
+ * | 0.08156917775368278        | 0.10178318739559034        | 
0.41008081149220166        | 0.9014476240300544         | 0.41008081149220166   
|
+ * 
+----------------------------+----------------------------+----------------------------+----------------------------+-----------------------+
+ * 2 rows selected (0.096 seconds)
+ * 0: jdbc:phoenix:localhost> select rand(), rand(), rand(1), rand(2), rand(1) 
from t;
+ * 
+----------------------------+----------------------------+----------------------------+----------------------------+-----------------------+
+ * |           RAND()           |           RAND()           |          
RAND(1)           |          RAND(2)           |          RAND(1)      |
+ * 
+----------------------------+----------------------------+----------------------------+----------------------------+-----------------------+
+ * | 0.6452639556507597         | 0.8167638693890659         | 
0.7308781907032909         | 0.7311469360199058         | 0.7308781907032909    
|
+ * | 0.8084646053276106         | 0.6969504742211767         | 
0.41008081149220166        | 0.9014476240300544         | 0.41008081149220166   
|
+ * 
+----------------------------+----------------------------+----------------------------+----------------------------+-----------------------+
+ * 2 rows selected (0.098 seconds)
+ * </pre>
+ */
+@BuiltInFunction(name = RandomFunction.NAME, args = 
{@Argument(allowedTypes={PLong.class},defaultValue="null",isConstant=true)})
+public class RandomFunction extends ScalarFunction {
+    public static final String NAME = "RAND";
+    private Random random;
+    private boolean hasSeed;
+    private Double current;
+
+    public RandomFunction() {
+    }
+
+    public RandomFunction(List<Expression> children) {
+        super(children);
+        init();
+    }
+
+    private void init() {
+        Number seed = (Number)((LiteralExpression)children.get(0)).getValue();
+        random = seed == null ? new Random() : new Random(seed.longValue());
+        hasSeed = seed != null;
+        current = null;
+    }
+
+    @Override
+    public void readFields(DataInput input) throws IOException {
+        super.readFields(input);
+        init();
+    }
+
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        if (current == null) {
+            current = random.nextDouble();
+        }
+        ptr.set(PUnsignedDouble.INSTANCE.toBytes(current));
+        return true;
+    }
+
+    // produce a new random value for each row
+    @Override
+    public void reset() {
+        super.reset();
+        current = null;
+    }
+
+    @Override
+    public PDataType<?> getDataType() {
+        return PUnsignedDouble.INSTANCE;
+    }
+
+    @Override
+    public String getName() {
+        return NAME;
+    }
+
+    @Override
+    public Determinism getDeterminism() {
+        return Determinism.PER_INVOCATION;
+    }
+
+    @Override
+    public boolean isStateless() {
+        return true;
+    }
+
+    // take the random object into account
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + children.hashCode() + random.hashCode();
+        return result;
+    }
+
+    // take the random object into account
+    @Override
+    public boolean equals(Object obj) {
+        return super.equals(obj) && 
random.equals(((RandomFunction)obj).random);
+    }
+
+    // make sure we do not show the default 'null' parameter
+    @Override
+    public final String toString() {
+        StringBuilder buf = new StringBuilder(getName() + "(");
+        if (!hasSeed) return buf.append(")").toString();
+        for (int i = 0; i < children.size() - 1; i++) {
+            buf.append(children.get(i) + ", ");
+        }
+        buf.append(children.get(children.size()-1) + ")");
+        return buf.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/97a906bc/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
index 1630dbd..7d91dbb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
@@ -49,13 +49,15 @@ import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.iterate.ResultIterator;
-import org.apache.phoenix.schema.types.PDecimal;
+import org.apache.phoenix.schema.tuple.ResultTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PBoolean;
+import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PDate;
+import org.apache.phoenix.schema.types.PDecimal;
 import org.apache.phoenix.schema.types.PDouble;
 import org.apache.phoenix.schema.types.PFloat;
 import org.apache.phoenix.schema.types.PInteger;
-import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.schema.types.PSmallint;
 import org.apache.phoenix.schema.types.PTime;
@@ -63,8 +65,6 @@ import org.apache.phoenix.schema.types.PTimestamp;
 import org.apache.phoenix.schema.types.PTinyint;
 import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
-import org.apache.phoenix.schema.tuple.ResultTuple;
-import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.SQLCloseable;
 
@@ -762,6 +762,7 @@ public class PhoenixResultSet implements ResultSet, 
SQLCloseable, org.apache.pho
         checkOpen();
         try {
             currentRow = scanner.next();
+            rowProjector.reset();
         } catch (RuntimeException e) {
             // FIXME: Expression.evaluate does not throw SQLException
             // so this will unwrap throws from that.

http://git-wip-us.apache.org/repos/asf/phoenix/blob/97a906bc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
index 2c206ab..308f0ad 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
@@ -108,8 +108,9 @@ public class PhoenixRecordReader<T extends DBWritable> 
extends RecordReader<Null
             if(queryPlan.getContext().getSequenceManager().getSequenceCount() 
> 0) {
                 iterator = new SequenceResultIterator(iterator, 
queryPlan.getContext().getSequenceManager());
             }
-            this.resultIterator = iterator;
-            this.resultSet = new PhoenixResultSet(this.resultIterator, 
queryPlan.getProjector(),queryPlan.getContext().getStatement());
+            // Clone the row projector as it's not thread safe and would be 
used simultaneously by
+            // multiple threads otherwise.
+            this.resultSet = new PhoenixResultSet(this.resultIterator, 
queryPlan.getProjector().clone(),queryPlan.getContext().getStatement());
         } catch (SQLException e) {
             LOG.error(String.format(" Error [%s] initializing 
PhoenixRecordReader. ",e.getMessage()));
             Throwables.propagate(e);

Reply via email to