Repository: phoenix Updated Branches: refs/heads/master 168ab7ad9 -> 97a906bc7
PHOENIX-1516 Add RAND() built-in function. Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/97a906bc Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/97a906bc Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/97a906bc Branch: refs/heads/master Commit: 97a906bc7f6459e23409ba95533752456a3acce4 Parents: 168ab7a Author: Lars Hofhansl <la...@apache.org> Authored: Tue Jan 20 17:33:29 2015 -0800 Committer: Lars Hofhansl <la...@apache.org> Committed: Tue Jan 20 17:33:29 2015 -0800 ---------------------------------------------------------------------- .../phoenix/end2end/ArithmeticQueryIT.java | 100 ++++++++++++++ .../apache/phoenix/end2end/UpsertValuesIT.java | 34 +++++ .../apache/phoenix/compile/RowProjector.java | 73 +++++++++- .../apache/phoenix/compile/UpsertCompiler.java | 8 +- .../UngroupedAggregateRegionObserver.java | 5 +- .../apache/phoenix/expression/Determinism.java | 2 +- .../phoenix/expression/ExpressionType.java | 4 +- .../expression/function/FunctionExpression.java | 2 +- .../expression/function/RandomFunction.java | 134 +++++++++++++++++++ .../apache/phoenix/jdbc/PhoenixResultSet.java | 9 +- .../phoenix/mapreduce/PhoenixRecordReader.java | 5 +- 11 files changed, 359 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/97a906bc/phoenix-core/src/it/java/org/apache/phoenix/end2end/ArithmeticQueryIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ArithmeticQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ArithmeticQueryIT.java index 21af737..2df1827 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ArithmeticQueryIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ArithmeticQueryIT.java @@ -295,6 +295,106 @@ public class ArithmeticQueryIT extends BaseHBaseManagedTimeIT { } @Test + public void testRandomFunction() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(false); + try { + String ddl = "CREATE TABLE testRandomFunction (pk VARCHAR NOT NULL PRIMARY KEY)"; + createTestTable(getUrl(), ddl); + conn.createStatement().execute("upsert into testRandomFunction values ('x')"); + conn.createStatement().execute("upsert into testRandomFunction values ('y')"); + conn.createStatement().execute("upsert into testRandomFunction values ('z')"); + conn.commit(); + + ResultSet rs = conn.createStatement().executeQuery("select rand(), rand(), rand(1), rand(2), rand(1) from testRandomFunction"); + assertTrue(rs.next()); + double rand0 = rs.getDouble(1); + double rand1 = rs.getDouble(3); + double rand2 = rs.getDouble(4); + assertTrue(rs.getDouble(1) != rs.getDouble(2)); + assertTrue(rs.getDouble(2) != rs.getDouble(3)); + assertTrue(rs.getDouble(3) == rs.getDouble(5)); + assertTrue(rs.getDouble(4) != rs.getDouble(5)); + assertTrue(rs.next()); + assertTrue(rand0 != rs.getDouble(1)); + assertTrue(rand1 != rs.getDouble(3)); + assertTrue(rand2 != rs.getDouble(4)); + double rand01 = rs.getDouble(1); + double rand11 = rs.getDouble(3); + double rand21 = rs.getDouble(4); + assertTrue(rs.getDouble(1) != rs.getDouble(2)); + assertTrue(rs.getDouble(2) != rs.getDouble(3)); + assertTrue(rs.getDouble(3) == rs.getDouble(5)); + assertTrue(rs.getDouble(4) != rs.getDouble(5)); + assertTrue(rs.next()); + assertTrue(rand01 != rs.getDouble(1)); + assertTrue(rand11 != rs.getDouble(3)); + assertTrue(rand21 != rs.getDouble(4)); + assertTrue(rs.getDouble(1) != rs.getDouble(2)); + assertTrue(rs.getDouble(2) != rs.getDouble(3)); + assertTrue(rs.getDouble(3) == rs.getDouble(5)); + assertTrue(rs.getDouble(4) != rs.getDouble(5)); + double rand12 = rs.getDouble(3); + + rs = conn.createStatement().executeQuery("select rand(), rand(), rand(1), rand(2), rand(1) from testRandomFunction"); + assertTrue(rs.next()); + assertTrue(rs.getDouble(1) != rs.getDouble(2)); + assertTrue(rs.getDouble(2) != rs.getDouble(3)); + assertTrue(rs.getDouble(3) == rs.getDouble(5)); + assertTrue(rs.getDouble(4) != rs.getDouble(5)); + assertTrue(rand0 != rs.getDouble(1)); + assertTrue(rand1 == rs.getDouble(3)); + assertTrue(rand2 == rs.getDouble(4)); + assertTrue(rs.next()); + assertTrue(rand01 != rs.getDouble(1)); + assertTrue(rand11 == rs.getDouble(3)); + assertTrue(rand21 == rs.getDouble(4)); + assertTrue(rs.next()); + assertTrue(rand12 == rs.getDouble(3)); + + ddl = "CREATE TABLE testRandomFunction1 (pk VARCHAR NOT NULL PRIMARY KEY, v1 UNSIGNED_DOUBLE)"; + createTestTable(getUrl(), ddl); + conn.createStatement().execute("upsert into testRandomFunction1 select pk, rand(1) from testRandomFunction"); + conn.commit(); + + rs = conn.createStatement().executeQuery("select count(*) from testRandomFunction1 where v1 = rand(1)"); + assertTrue(rs.next()); + assertEquals(3, rs.getInt(1)); + + rs = conn.createStatement().executeQuery("select count(*) from testRandomFunction1 where v1 = rand(2)"); + assertTrue(rs.next()); + assertEquals(0, rs.getInt(1)); + + conn.createStatement().execute("delete from testRandomFunction1 where v1 = rand(2)"); + conn.commit(); + + rs = conn.createStatement().executeQuery("select count(*) from testRandomFunction1 where v1 = rand(1)"); + assertTrue(rs.next()); + assertEquals(3, rs.getInt(1)); + + conn.setAutoCommit(true); + conn.createStatement().execute("upsert into testRandomFunction1 select pk, rand(2) from testRandomFunction1"); + + rs = conn.createStatement().executeQuery("select count(*) from testRandomFunction1 where v1 = rand(1)"); + assertTrue(rs.next()); + assertEquals(0, rs.getInt(1)); + + rs = conn.createStatement().executeQuery("select count(*) from testRandomFunction1 where v1 = rand(2)"); + assertTrue(rs.next()); + assertEquals(3, rs.getInt(1)); + + conn.createStatement().execute("delete from testRandomFunction1 where v1 = rand(2)"); + + rs = conn.createStatement().executeQuery("select count(*) from testRandomFunction1 where v1 = rand(2)"); + assertTrue(rs.next()); + assertEquals(0, rs.getInt(1)); + } finally { + conn.close(); + } + } + + @Test public void testDecimalArithmeticWithIntAndLong() throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); Connection conn = DriverManager.getConnection(getUrl(), props); http://git-wip-us.apache.org/repos/asf/phoenix/blob/97a906bc/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java index 6389088..7c3c073 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java @@ -161,6 +161,40 @@ public class UpsertValuesIT extends BaseClientManagedTimeIT { } @Test + public void testUpsertRandomValues() throws Exception { + long ts = nextTimestamp(); + Properties props = new Properties(); + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts)); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.createStatement().execute("create table UpsertRandomTest (k UNSIGNED_DOUBLE not null primary key, v1 UNSIGNED_DOUBLE, v2 UNSIGNED_DOUBLE, v3 UNSIGNED_DOUBLE, v4 UNSIGNED_DOUBLE)"); + conn.close(); + + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts+5)); + conn = DriverManager.getConnection(getUrl(), props); + conn.createStatement().execute("upsert into UpsertRandomTest values (RAND(), RAND(), RAND(1), RAND(2), RAND(1))"); + conn.createStatement().execute("upsert into UpsertRandomTest values (RAND(), RAND(), RAND(1), RAND(2), RAND(1))"); + conn.commit(); + conn.close(); + + props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(ts+10)); + conn = DriverManager.getConnection(getUrl(), props); + ResultSet rs = conn.createStatement().executeQuery("select k,v1,v2,v3,v4 from UpsertRandomTest"); + assertTrue(rs.next()); + double rand0 = rs.getDouble(1); + double rand1 = rs.getDouble(3); + double rand2 = rs.getDouble(4); + assertTrue(rs.getDouble(1) != rs.getDouble(2)); + assertTrue(rs.getDouble(2) != rs.getDouble(3)); + assertTrue(rand1 == rs.getDouble(5)); + assertTrue(rs.getDouble(4) != rs.getDouble(5)); + assertTrue(rs.next()); + assertTrue(rand0 != rs.getDouble(1)); + assertTrue(rand1 == rs.getDouble(3) && rand1 == rs.getDouble(5)); + assertTrue(rand2 == rs.getDouble(4)); + conn.close(); + } + + @Test public void testUpsertVarCharWithMaxLength() throws Exception { long ts = nextTimestamp(); Properties props = new Properties(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/97a906bc/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java index ee575f1..d1a9180 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/RowProjector.java @@ -17,18 +17,25 @@ */ package org.apache.phoenix.compile; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; +import org.apache.hadoop.io.WritableUtils; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.ExpressionType; import org.apache.phoenix.schema.AmbiguousColumnException; import org.apache.phoenix.schema.ColumnNotFoundException; import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.TrustedByteArrayOutputStream; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ListMultimap; -import com.google.common.collect.Maps; /** @@ -40,7 +47,7 @@ import com.google.common.collect.Maps; * * @since 0.1 */ -public class RowProjector { +public class RowProjector implements Cloneable { public static final RowProjector EMPTY_PROJECTOR = new RowProjector(Collections.<ColumnProjector>emptyList(),0, true); private final List<? extends ColumnProjector> columnProjectors; @@ -49,6 +56,7 @@ public class RowProjector { private final boolean someCaseSensitive; private final int estimatedSize; private final boolean isProjectEmptyKeyValue; + private volatile List<byte[]> rowExpressions; public RowProjector(RowProjector projector, boolean isProjectEmptyKeyValue) { this(projector.getColumnProjectors(), projector.getEstimatedRowByteSize(), isProjectEmptyKeyValue); @@ -80,6 +88,54 @@ public class RowProjector { this.estimatedSize = estimatedRowSize; this.isProjectEmptyKeyValue = isProjectEmptyKeyValue; } + + @Override + public RowProjector clone() { + return cloneRowProjector(); + } + + private RowProjector cloneRowProjector() { + if (rowExpressions == null) { + synchronized(this) { + if (rowExpressions == null) { + List<byte[]> localRowExpressions = new ArrayList<byte[]>(this.getColumnCount()); + try { + for (int i = 0; i < this.getColumnCount(); i++) { + TrustedByteArrayOutputStream bytesOut = new TrustedByteArrayOutputStream(1024); + DataOutputStream output = new DataOutputStream(bytesOut); + Expression expression = this.getColumnProjector(i).getExpression(); + WritableUtils.writeVInt(output, ExpressionType.valueOf(expression).ordinal()); + expression.write(output); + output.flush(); + localRowExpressions.add(bytesOut.getBuffer()); + } + } catch (IOException io) { + throw new RuntimeException(io); + } + rowExpressions = localRowExpressions; + } + } + } + List<ColumnProjector> colProjectors = new ArrayList<ColumnProjector>(rowExpressions.size()); + try { + for (int i=0; i<rowExpressions.size(); i++) { + ByteArrayInputStream bytesIn = new ByteArrayInputStream(rowExpressions.get(i)); + DataInputStream input = new DataInputStream(bytesIn); + Expression expression = ExpressionType.values()[WritableUtils.readVInt(input)].newInstance(); + expression.readFields(input); + ColumnProjector colProjector = this.getColumnProjector(i); + colProjectors.add(new ExpressionProjector(colProjector.getName(), + colProjector.getTableName(), + expression, + colProjector.isCaseSensitive())); + } + return new RowProjector(colProjectors, + this.getEstimatedRowByteSize(), + this.isProjectEmptyKeyValue()); + } catch (IOException io) { + throw new RuntimeException(io); + } + } public boolean isProjectEmptyKeyValue() { return isProjectEmptyKeyValue; @@ -135,4 +191,13 @@ public class RowProjector { public int getEstimatedRowByteSize() { return estimatedSize; } -} \ No newline at end of file + + /** + * allow individual expressions to reset their state between rows + */ + public void reset() { + for (ColumnProjector projector : columnProjectors) { + projector.getExpression().reset(); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/97a906bc/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 5395210..e0ec3fd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -73,7 +73,6 @@ import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.MetaDataEntityNotFoundException; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnImpl; -import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.ViewType; @@ -84,6 +83,7 @@ import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.TypeMismatchException; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.MetaDataUtil; @@ -186,11 +186,13 @@ public class UpsertCompiler { @Override protected MutationState mutate(StatementContext context, ResultIterator iterator, PhoenixConnection connection) throws SQLException { - PhoenixStatement statement = new PhoenixStatement(connection); if (context.getSequenceManager().getSequenceCount() > 0) { throw new IllegalStateException("Cannot pipeline upsert when sequence is referenced"); } - return upsertSelect(statement, tableRef, projector, iterator, columnIndexes, pkSlotIndexes); + PhoenixStatement statement = new PhoenixStatement(connection); + // Clone the row projector as it's not thread safe and would be used simultaneously by + // multiple threads otherwise. + return upsertSelect(statement, tableRef, projector.clone(), iterator, columnIndexes, pkSlotIndexes); } public void setRowProjector(RowProjector projector) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/97a906bc/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 7e46370..5bbd5d3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -75,13 +75,13 @@ import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.ConstraintViolationException; import org.apache.phoenix.schema.PColumn; -import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.PRow; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.stats.StatisticsCollector; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.KeyValueUtil; @@ -323,6 +323,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ for (Mutation mutation : row.toRowMutations()) { mutations.add(mutation); } + for (i = 0; i < selectExpressions.size(); i++) { + selectExpressions.get(i).reset(); + } } else if (deleteCF != null && deleteCQ != null) { // No need to search for delete column, since we project only it // if no empty key value is being set http://git-wip-us.apache.org/repos/asf/phoenix/blob/97a906bc/phoenix-core/src/main/java/org/apache/phoenix/expression/Determinism.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/Determinism.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/Determinism.java index b2f3524..5acbfc7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/Determinism.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/Determinism.java @@ -11,7 +11,7 @@ package org.apache.phoenix.expression; public enum Determinism { - ALWAYS, PER_STATEMENT, PER_ROW; + ALWAYS, PER_STATEMENT, PER_ROW, PER_INVOCATION; public Determinism combine (Determinism that) { return Determinism.values()[Math.max(this.ordinal(), that.ordinal())]; http://git-wip-us.apache.org/repos/asf/phoenix/blob/97a906bc/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java index 2893655..839ef40 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java @@ -55,6 +55,7 @@ import org.apache.phoenix.expression.function.PercentRankAggregateFunction; import org.apache.phoenix.expression.function.PercentileContAggregateFunction; import org.apache.phoenix.expression.function.PercentileDiscAggregateFunction; import org.apache.phoenix.expression.function.RTrimFunction; +import org.apache.phoenix.expression.function.RandomFunction; import org.apache.phoenix.expression.function.RegexpReplaceFunction; import org.apache.phoenix.expression.function.RegexpSplitFunction; import org.apache.phoenix.expression.function.RegexpSubstrFunction; @@ -185,7 +186,8 @@ public enum ExpressionType { SQLIndexTypeFunction(SQLIndexTypeFunction.class), ModulusExpression(ModulusExpression.class), DistinctValueAggregateFunction(DistinctValueAggregateFunction.class), - RegexpSplitFunctiond(RegexpSplitFunction.class); + RegexpSplitFunctiond(RegexpSplitFunction.class), + RandomFunction(RandomFunction.class); ExpressionType(Class<? extends Expression> clazz) { this.clazz = clazz; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/97a906bc/phoenix-core/src/main/java/org/apache/phoenix/expression/function/FunctionExpression.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/FunctionExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/FunctionExpression.java index 45c3e15..b45706a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/FunctionExpression.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/FunctionExpression.java @@ -57,7 +57,7 @@ public abstract class FunctionExpression extends BaseCompoundExpression { abstract public String getName(); @Override - public final String toString() { + public String toString() { StringBuilder buf = new StringBuilder(getName() + "("); if (children.size()==0) return buf.append(")").toString(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/97a906bc/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RandomFunction.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RandomFunction.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RandomFunction.java new file mode 100644 index 0000000..ffa405b --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/RandomFunction.java @@ -0,0 +1,134 @@ +package org.apache.phoenix.expression.function; + +import java.io.DataInput; +import java.io.IOException; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.expression.Determinism; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.LiteralExpression; +import org.apache.phoenix.parse.FunctionParseNode.Argument; +import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.schema.types.PLong; +import org.apache.phoenix.schema.types.PUnsignedDouble; + +/** + * Random function that produces a unique value upon each invocation unless a seed is provided. + * If a seed is provided the returned value is identical across each invocation for a single row, but different across multiple rows. + * The seed must be a constant. + * <p> + * Example: + * <pre> + * 0: jdbc:phoenix:localhost> select rand(), rand(), rand(1), rand(2), rand(1) from t; + * +----------------------------+----------------------------+----------------------------+----------------------------+-----------------------+ + * | RAND() | RAND() | RAND(1) | RAND(2) | RAND(1) | + * +----------------------------+----------------------------+----------------------------+----------------------------+-----------------------+ + * | 0.18927325291276054 | 0.19335253869230284 | 0.7308781907032909 | 0.7311469360199058 | 0.7308781907032909 | + * | 0.08156917775368278 | 0.10178318739559034 | 0.41008081149220166 | 0.9014476240300544 | 0.41008081149220166 | + * +----------------------------+----------------------------+----------------------------+----------------------------+-----------------------+ + * 2 rows selected (0.096 seconds) + * 0: jdbc:phoenix:localhost> select rand(), rand(), rand(1), rand(2), rand(1) from t; + * +----------------------------+----------------------------+----------------------------+----------------------------+-----------------------+ + * | RAND() | RAND() | RAND(1) | RAND(2) | RAND(1) | + * +----------------------------+----------------------------+----------------------------+----------------------------+-----------------------+ + * | 0.6452639556507597 | 0.8167638693890659 | 0.7308781907032909 | 0.7311469360199058 | 0.7308781907032909 | + * | 0.8084646053276106 | 0.6969504742211767 | 0.41008081149220166 | 0.9014476240300544 | 0.41008081149220166 | + * +----------------------------+----------------------------+----------------------------+----------------------------+-----------------------+ + * 2 rows selected (0.098 seconds) + * </pre> + */ +@BuiltInFunction(name = RandomFunction.NAME, args = {@Argument(allowedTypes={PLong.class},defaultValue="null",isConstant=true)}) +public class RandomFunction extends ScalarFunction { + public static final String NAME = "RAND"; + private Random random; + private boolean hasSeed; + private Double current; + + public RandomFunction() { + } + + public RandomFunction(List<Expression> children) { + super(children); + init(); + } + + private void init() { + Number seed = (Number)((LiteralExpression)children.get(0)).getValue(); + random = seed == null ? new Random() : new Random(seed.longValue()); + hasSeed = seed != null; + current = null; + } + + @Override + public void readFields(DataInput input) throws IOException { + super.readFields(input); + init(); + } + + @Override + public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) { + if (current == null) { + current = random.nextDouble(); + } + ptr.set(PUnsignedDouble.INSTANCE.toBytes(current)); + return true; + } + + // produce a new random value for each row + @Override + public void reset() { + super.reset(); + current = null; + } + + @Override + public PDataType<?> getDataType() { + return PUnsignedDouble.INSTANCE; + } + + @Override + public String getName() { + return NAME; + } + + @Override + public Determinism getDeterminism() { + return Determinism.PER_INVOCATION; + } + + @Override + public boolean isStateless() { + return true; + } + + // take the random object onto account + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + children.hashCode() + random.hashCode(); + return result; + } + + // take the random object onto account + @Override + public boolean equals(Object obj) { + return super.equals(obj) && random.equals(((RandomFunction)obj).random); + } + + // make sure we do not show the default 'null' parameter + @Override + public final String toString() { + StringBuilder buf = new StringBuilder(getName() + "("); + if (!hasSeed) return buf.append(")").toString(); + for (int i = 0; i < children.size() - 1; i++) { + buf.append(children.get(i) + ", "); + } + buf.append(children.get(children.size()-1) + ")"); + return buf.toString(); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/97a906bc/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java index 1630dbd..7d91dbb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java @@ -49,13 +49,15 @@ import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.iterate.ResultIterator; -import org.apache.phoenix.schema.types.PDecimal; +import org.apache.phoenix.schema.tuple.ResultTuple; +import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PBoolean; +import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PDate; +import org.apache.phoenix.schema.types.PDecimal; import org.apache.phoenix.schema.types.PDouble; import org.apache.phoenix.schema.types.PFloat; import org.apache.phoenix.schema.types.PInteger; -import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.schema.types.PSmallint; import org.apache.phoenix.schema.types.PTime; @@ -63,8 +65,6 @@ import org.apache.phoenix.schema.types.PTimestamp; import org.apache.phoenix.schema.types.PTinyint; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.schema.types.PVarchar; -import org.apache.phoenix.schema.tuple.ResultTuple; -import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.DateUtil; import org.apache.phoenix.util.SQLCloseable; @@ -762,6 +762,7 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable, org.apache.pho checkOpen(); try { currentRow = scanner.next(); + rowProjector.reset(); } catch (RuntimeException e) { // FIXME: Expression.evaluate does not throw SQLException // so this will unwrap throws from that. http://git-wip-us.apache.org/repos/asf/phoenix/blob/97a906bc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java index 2c206ab..308f0ad 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java @@ -108,8 +108,9 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null if(queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) { iterator = new SequenceResultIterator(iterator, queryPlan.getContext().getSequenceManager()); } - this.resultIterator = iterator; - this.resultSet = new PhoenixResultSet(this.resultIterator, queryPlan.getProjector(),queryPlan.getContext().getStatement()); + // Clone the row projector as it's not thread safe and would be used simultaneously by + // multiple threads otherwise. + this.resultSet = new PhoenixResultSet(this.resultIterator, queryPlan.getProjector().clone(),queryPlan.getContext().getStatement()); } catch (SQLException e) { LOG.error(String.format(" Error [%s] initializing PhoenixRecordReader. ",e.getMessage())); Throwables.propagate(e);