I've been writing some toy experimental strategies that add UnaryExec
nodes to the plan. For some reason, though, my PlanLater nodes are never
replaced and remain in the final physical plan. Is there anything in
general that I might be missing?

I'm doing my sample work on 2.1.x and adding the strategy through
experimentalMethods.

Here is an abbreviated code sample:

class CassandraDirectJoinStrategy extends Strategy with Serializable {
  import CassandraDirectJoinStrategy._

  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
        if (validJoinBranch(left, leftKeys) || validJoinBranch(right, rightKeys)) =>

      //TODO Check which side should be the target
      val (otherBranch, joinTargetBranch, joinKeys, buildType) = {
        if (validJoinBranch(left, leftKeys)) {
          (right, left, leftKeys, BuildLeft)
        } else {
          (left, right, rightKeys, BuildRight)
        }
      }

      logDebug(s"Performing Direct Cassandra Join against $joinTargetBranch")

      new CassandraDirectJoinExec(
        leftKeys,
        rightKeys,
        joinType,
        buildType,
        condition,
        planLater(otherBranch),
        joinTargetBranch) :: Nil

    case _ => Nil
  }
}
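
For context, this is roughly how a strategy gets registered through the
experimental methods on 2.1.x. It is only a sketch under assumptions:
NoOpStrategy is a hypothetical stand-in for the strategy above, and the
local session and the range join are placeholders rather than the real
Cassandra setup.

```scala
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical stand-in for the strategy above: it never matches,
// so the built-in strategies end up planning every query.
object NoOpStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

object RegisterStrategy {
  def main(args: Array[String]): Unit = {
    // Assumed local session, just for illustration.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("extra-strategy-sketch")
      .getOrCreate()

    // extraStrategies is a var on ExperimentalMethods; append to it
    // rather than overwriting whatever is already registered.
    spark.experimental.extraStrategies =
      spark.experimental.extraStrategies :+ NoOpStrategy

    // explain(true) prints the parsed, analyzed, optimized and physical plans.
    spark.range(10).join(spark.range(10), "id").explain(true)

    spark.stop()
  }
}
```

The plans below come from calling explain(true) on the join with the
real strategy registered in this way.
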
== Parsed Logical Plan ==
Join Inner, (k#267 = k#270)
:- LocalRelation [k#267, v#268]
+- Relation[k#270,v#271] org.apache.spark.sql.cassandra.CassandraSourceRelation@1237db97
== Analyzed Logical Plan ==
k: int, v: int, k: int, v: int
Join Inner, (k#267 = k#270)
:- LocalRelation [k#267, v#268]
+- Relation[k#270,v#271] org.apache.spark.sql.cassandra.CassandraSourceRelation@1237db97
== Optimized Logical Plan ==
Join Inner, (k#267 = k#270)
:- LocalRelation [k#267, v#268]
+- Relation[k#270,v#271] org.apache.spark.sql.cassandra.CassandraSourceRelation@1237db97
== Physical Plan ==
CassandraDirectJoin
+- PlanLater LocalRelation [k#267, v#268]