Repository: spark
Updated Branches:
  refs/heads/master 674eb2a4c -> affbe329a


[SPARK-9071][SQL] MonotonicallyIncreasingID and SparkPartitionID should be marked as nondeterministic.

I also took the chance to more explicitly define the semantics of deterministic.

Author: Reynold Xin <[email protected]>

Closes #7428 from rxin/non-deterministic and squashes the following commits:

a760827 [Reynold Xin] [SPARK-9071][SQL] MonotonicallyIncreasingID and SparkPartitionID should be marked as nondeterministic.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/affbe329
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/affbe329
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/affbe329

Branch: refs/heads/master
Commit: affbe329ae0100bd50a3c3fb081b0f2b07efce33
Parents: 674eb2a
Author: Reynold Xin <[email protected]>
Authored: Wed Jul 15 14:52:02 2015 -0700
Committer: Reynold Xin <[email protected]>
Committed: Wed Jul 15 14:52:02 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/expressions/Expression.scala       | 10 ++++++++--
 .../execution/expressions/MonotonicallyIncreasingID.scala |  4 +++-
 .../sql/execution/expressions/SparkPartitionID.scala      |  4 +++-
 3 files changed, 14 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/affbe329/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index 3f19ac2..7b37ae7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -61,9 +61,15 @@ abstract class Expression 
extends TreeNode[Expression] {
   def foldable: Boolean = false
 
   /**
-   * Returns true when the current expression always return the same result for fixed input values.
+   * Returns true when the current expression always return the same result for fixed inputs from
+   * children.
+   *
+   * Note that this means that an expression should be considered as non-deterministic if:
+   * - if it relies on some mutable internal state, or
+   * - if it relies on some implicit input that is not part of the children expression list.
+   *
+   * An example would be `SparkPartitionID` that relies on the partition id returned by TaskContext.
    */
-  // TODO: Need to define explicit input values vs implicit input values.
   def deterministic: Boolean = true
 
   def nullable: Boolean


http://git-wip-us.apache.org/repos/asf/spark/blob/affbe329/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
index 69a3775..fec403f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
@@ -41,7 +41,9 @@ private[sql] case class MonotonicallyIncreasingID() extends LeafExpression {
    */
   @transient private[this] var count: Long = 0L
 
-  @transient private lazy val partitionMask = TaskContext.getPartitionId.toLong << 33
+  @transient private lazy val partitionMask = TaskContext.getPartitionId().toLong << 33
+
+  override def deterministic: Boolean = false
 
   override def nullable: Boolean = false
 

http://git-wip-us.apache.org/repos/asf/spark/blob/affbe329/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
index 5f1b514..7c790c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
@@ -29,11 +29,13 @@ import org.apache.spark.sql.types.{IntegerType, DataType}
  */
 private[sql] case object SparkPartitionID extends LeafExpression {
 
+  override def deterministic: Boolean = false
+
   override def nullable: Boolean = false
 
   override def dataType: DataType = IntegerType
 
-  @transient private lazy val partitionId = TaskContext.getPartitionId
+  @transient private lazy val partitionId = TaskContext.getPartitionId()
 
   override def eval(input: InternalRow): Int = partitionId
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
