I've been writing some toy experimental strategies that add UnaryExec
nodes to the plan. For some reason, though, my "PlanLater" placeholders
are never replaced and end up in the final physical plan. Is there
anything in general that I might be missing?

I'm doing this on Spark 2.1.x and registering the strategy through
experimentalMethods.

Here is an abbreviated code sample:

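import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}

class CassandraDirectJoinStrategy extends Strategy with Serializable {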
  import CassandraDirectJoinStrategy._

  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
      if (validJoinBranch(left, leftKeys) || validJoinBranch(right, rightKeys)) =>

      //TODO Check which side should be the target
      val (otherBranch, joinTargetBranch, joinKeys, buildType) = {
        if (validJoinBranch(left, leftKeys)) {
          (right, left, leftKeys, BuildLeft)
        } else {
          (left, right, rightKeys, BuildRight)
        }
      }

      logDebug(s"Performing Direct Cassandra Join against $joinTargetBranch")

      new CassandraDirectJoinExec(
        leftKeys,
        rightKeys,
        joinType,
        buildType,
        condition,
        planLater(otherBranch),
        joinTargetBranch) :: Nil
    case _ => Nil
  }
}

== Parsed Logical Plan ==
Join Inner, (k#267 = k#270)
:- LocalRelation [k#267, v#268]
+- Relation[k#270,v#271] org.apache.spark.sql.cassandra.CassandraSourceRelation@1237db97

== Analyzed Logical Plan ==
k: int, v: int, k: int, v: int
Join Inner, (k#267 = k#270)
:- LocalRelation [k#267, v#268]
+- Relation[k#270,v#271] org.apache.spark.sql.cassandra.CassandraSourceRelation@1237db97

== Optimized Logical Plan ==
Join Inner, (k#267 = k#270)
:- LocalRelation [k#267, v#268]
+- Relation[k#270,v#271] org.apache.spark.sql.cassandra.CassandraSourceRelation@1237db97

== Physical Plan ==
CassandraDirectJoin
+- PlanLater LocalRelation [k#267, v#268]
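
A toy model of the step that is supposed to remove the placeholders may help narrow this down. It is plain Scala, not Spark code: Plan, PlanLater, Scan, GoodJoin and HiddenJoin are hypothetical stand-ins. It assumes, as in Spark 2.x's QueryPlanner.plan, that each placeholder is swapped for its planned subtree by a transformUp whose child rewriting only visits constructor arguments (productIterator) that are also children. Under that assumption, a node whose child is not a primary constructor argument keeps its PlanLater even though the child still shows up in the printed tree.

```scala
// Toy model of placeholder substitution. These are NOT Spark classes;
// all names are hypothetical stand-ins for illustration only.
object PlaceholderDemo {

  abstract class Plan extends Product {
    def children: Seq[Plan]
    // Rebuild the node from new constructor arguments (plays the role of makeCopy).
    def copyWithArgs(args: List[Any]): Plan

    // Bottom-up rewrite that, like the assumed Catalyst behaviour, only rewrites
    // constructor arguments (productIterator) that are also children.
    def transformUp(rule: PartialFunction[Plan, Plan]): Plan = {
      val oldArgs = productIterator.toList
      val newArgs = oldArgs.map {
        case p: Plan if children.contains(p) => p.transformUp(rule)
        case other => other
      }
      val rebuilt = if (newArgs != oldArgs) copyWithArgs(newArgs) else this
      rule.applyOrElse(rebuilt, identity[Plan])
    }
  }

  // Placeholder left behind by a planLater(...) call.
  case class PlanLater(name: String) extends Plan {
    def children: Seq[Plan] = Nil
    def copyWithArgs(args: List[Any]): Plan = PlanLater(args.head.asInstanceOf[String])
  }

  // Stand-in for the physical plan the placeholder should become.
  case class Scan(name: String) extends Plan {
    def children: Seq[Plan] = Nil
    def copyWithArgs(args: List[Any]): Plan = Scan(args.head.asInstanceOf[String])
  }

  // Child is a constructor argument, so transformUp can see and replace it.
  case class GoodJoin(child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
    def copyWithArgs(args: List[Any]): Plan = GoodJoin(args.head.asInstanceOf[Plan])
  }

  // Child lives in a second parameter list, which productIterator does not expose.
  case class HiddenJoin(tag: String)(val child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
    def copyWithArgs(args: List[Any]): Plan =
      HiddenJoin(args.head.asInstanceOf[String])(child)
  }

  // Replace each placeholder with its already-planned subtree.
  def substitute(candidate: Plan, planned: Map[PlanLater, Plan]): Plan =
    candidate.transformUp { case p: PlanLater if planned.contains(p) => planned(p) }

  def main(args: Array[String]): Unit = {
    val placeholder = PlanLater("LocalRelation")
    val planned: Map[PlanLater, Plan] = Map(placeholder -> Scan("LocalTableScan"))
    // prints List(Scan(LocalTableScan))
    println(substitute(GoodJoin(placeholder), planned).children)
    // prints List(PlanLater(LocalRelation))
    println(substitute(HiddenJoin("direct")(placeholder), planned).children)
  }
}
```

If the real exec node behaves like HiddenJoin in this model (for example, the child is not one of its case-class fields), that would be one thing worth checking; this is a hypothesis drawn from the toy model, not a confirmed diagnosis.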
