[ 
https://issues.apache.org/jira/browse/SPARK-6910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14557628#comment-14557628
 ] 

Cheolsoo Park commented on SPARK-6910:
--------------------------------------

I don't know what the best way to fix this is, but here is how I work around it 
at work.

*Fortunately*, I have an internal metadata service (similar to HCat) at work 
that takes a string representation of a filter expression and returns a list of 
partitions. So I can simply push down predicates in the Analyzer, convert them 
to a string, and call my metadata service. This approach only involves isolated 
changes to the Analyzer and the Catalog.
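
On the Catalog side, the only interface change this needs is an extra, defaulted 
{{predicates}} parameter on {{lookupRelation()}}. A minimal sketch (written 
against the 1.x {{Catalog}} trait from memory, so details may differ):
{code}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Sketch only: lookupRelation() grows a defaulted `predicates` parameter
// so the Analyzer can hand pushed-down filters to the catalog.
trait Catalog {
  def lookupRelation(
      tableIdentifier: Seq[String],
      alias: Option[String] = None,
      predicates: Seq[Expression] = Nil): LogicalPlan
}
{code}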

*Unfortunately*, I couldn't make my solution fully work with the Hive metastore. 
Even though the Hive metastore client API has a similar method called 
{{getPartitionsByFilter()}}, it only works with string-type partition keys, 
which is not very practical in the real world.
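
For anyone who wants to try the metastore route anyway, the call itself is 
simple; roughly something like this through the thrift client (a sketch with 
made-up database/table names, and it only succeeds when the partition key in 
the filter, {{ds}} here, is of string type):
{code}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient

// Sketch only: ask the metastore directly for the partitions matching a
// filter string. The filter may only reference string-type partition keys.
val client = new HiveMetaStoreClient(new HiveConf())
val partitions = client.listPartitionsByFilter(
  "default",              // database name (made up)
  "events",               // table name (made up)
  "ds = '2015-05-22'",    // filter over a string-type partition key
  (-1).toShort)           // max partitions to return; -1 means all
{code}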

Although this issue is no longer blocking me, I am curious how it can be fixed 
with the Hive metastore. The following code snippets are not specific to my 
internal metastore service. Feel free to use them if they help in any way-
# I added predicate pushdown in Analyzer.ResolveRelations.
{code}
object ResolveRelations extends Rule[LogicalPlan] {
  def getTable(
      u: UnresolvedRelation,
      predicates: Seq[Expression] = Nil): LogicalPlan = {
    try {
      // Push the predicates down into the catalog.
      catalog.lookupRelation(u.tableIdentifier, u.alias, predicates)
    } catch {
      case _: NoSuchTableException =>
        u.failAnalysis(s"no such table ${u.tableName}")
    }
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
      i.copy(table = EliminateSubQueries(getTable(u)))
    // Find any Filter operator that has an unresolved relation child.
    case f @ Filter(condition, u: UnresolvedRelation) =>
      f match {
        // Extract the conditions from the Filter operator.
        case FilteredOperation(predicates, _) =>
          f.copy(condition, getTable(u, predicates))
      }
    case u: UnresolvedRelation =>
      getTable(u)
  }
}
{code}
# In HiveMetastoreCatalog, I extract the predicates that reference only 
partition columns and build a string representation of the filter expression 
(a generic sketch of that conversion follows the snippet).
{code}
val partitionFilter: String =
  if (predicates.nonEmpty) {
    val partitionKeys: Set[String] = franklinTable.partition_keys().toSet
    val (pruningPredicates, _) = predicates.partition { predicate =>
      val refs: Set[String] = predicate.references.map(_.name).toSet
      refs.nonEmpty && refs.subsetOf(partitionKeys) &&
        predicate.isInstanceOf[BinaryComparison]
    }

    // Convert the pruning predicates to a string representation.
    pruningPredicates.foldLeft("") { (str, expr) =>
      toFranklinExpression(str, expr, fieldType)
    }
  } else {
    ""
  }
{code}
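
{{toFranklinExpression()}} above is specific to my metadata service, but a 
generic version that renders the pruning predicates as a Hive-style filter 
string could look roughly like this (illustrative only; it handles nothing 
beyond simple column-vs-literal comparisons and just drops everything else):
{code}
import org.apache.spark.sql.catalyst.expressions._

// Illustrative sketch: render simple binary comparisons over partition
// columns as a filter string, AND-ed together. Unsupported predicates
// are silently dropped by collect().
def toFilterString(predicates: Seq[Expression]): String =
  predicates.collect {
    case EqualTo(a: Attribute, Literal(v, _))            => s"${a.name} = '$v'"
    case GreaterThan(a: Attribute, Literal(v, _))        => s"${a.name} > '$v'"
    case GreaterThanOrEqual(a: Attribute, Literal(v, _)) => s"${a.name} >= '$v'"
    case LessThan(a: Attribute, Literal(v, _))           => s"${a.name} < '$v'"
    case LessThanOrEqual(a: Attribute, Literal(v, _))    => s"${a.name} <= '$v'"
  }.mkString(" and ")
{code}
The resulting string is roughly the shape that {{getPartitionsByFilter()}} 
expects, e.g. {{ds = '2015-05-22' and hr >= '00'}}.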


> Support for pushing predicates down to metastore for partition pruning
> ----------------------------------------------------------------------
>
>                 Key: SPARK-6910
>                 URL: https://issues.apache.org/jira/browse/SPARK-6910
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>            Reporter: Michael Armbrust
>



