[
https://issues.apache.org/jira/browse/SPARK-6910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14557628#comment-14557628
]
Cheolsoo Park commented on SPARK-6910:
--------------------------------------
I don't know the best way to fix this, but here is how I work around it at
work.
*Fortunately*, I have an internal metadata service (similar to HCat) at work
that takes a string representation of a filter expression and returns a list of
partitions. So I can simply push predicates down in the Analyzer, convert them
to a string, and call my metadata service. This approach only involves isolated
changes to the Analyzer and Catalog.
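For reference, the Catalog change amounts to one extra parameter on
{{lookupRelation}}. A minimal sketch of what I mean (the default argument is
just one way to keep existing call sites compiling; treat it as an assumption,
not the exact code I have):
{code}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Sketch of the Catalog change: lookupRelation accepts the pushed-down
// predicates and defaults to Nil so that existing callers are unaffected.
trait Catalog {
  def lookupRelation(
      tableIdentifier: Seq[String],
      alias: Option[String] = None,
      predicates: Seq[Expression] = Nil): LogicalPlan
}
{code}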
*Unfortunately*, I couldn't make my solution fully work with the Hive metastore.
Even though the Hive metastore client API has a similar method called
{{getPartitionsByFilter()}}, it only works with string-type partition keys,
which is not very practical in the real world.
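To make the limitation concrete, calling the metastore with a filter string
looks roughly like the sketch below ({{listPartitionsByFilter()}} is the
thrift-client counterpart of {{getPartitionsByFilter()}}; the database/table
names and the {{ds}} partition column are placeholders, and as noted above it
only works when {{ds}} is a string-typed partition key):
{code}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient

// Rough sketch: ask the metastore only for the partitions matching a filter
// string. This only works when the filtered partition keys are string-typed.
val client = new HiveMetaStoreClient(new HiveConf())
val partitions = client.listPartitionsByFilter(
  "default", "my_table", "ds >= '2015-05-01'", Short.MaxValue)
{code}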
Although this issue is no longer blocking me, I am curious how it can be fixed
with the Hive metastore. The following code snippets are not specific to my
internal metastore service. Feel free to use them if they help in any way:
# I added predicate pushdown in Analyzer.ResolveRelations.
{code}
object ResolveRelations extends Rule[LogicalPlan] {
  def getTable(u: UnresolvedRelation, predicates: Seq[Expression] = Nil): LogicalPlan = {
    try {
      // Push the predicates down into the catalog lookup.
      catalog.lookupRelation(u.tableIdentifier, u.alias, predicates)
    } catch {
      case _: NoSuchTableException =>
        u.failAnalysis(s"no such table ${u.tableName}")
    }
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
      i.copy(table = EliminateSubQueries(getTable(u)))
    // Find any Filter operator that has an unresolved relation child.
    case f @ Filter(condition, u: UnresolvedRelation) =>
      f match {
        // Extract the conditions from the Filter operator.
        case FilteredOperation(predicates, _) =>
          f.copy(condition, getTable(u, predicates))
      }
    case u: UnresolvedRelation =>
      getTable(u)
  }
}
{code}
# In HiveMetastoreCatalog, I extract partition columns from predicates and
build a string that represents the filter expression.
{code}
val partitionFilter: String =
  if (predicates.nonEmpty) {
    val partitionKeys: Set[String] = franklinTable.partition_keys().toSet
    // Keep only the predicates that are simple binary comparisons referencing
    // partition columns exclusively.
    val (pruningPredicates, _) = predicates.partition { predicate =>
      val refs: Set[String] = predicate.references.map(_.name).toSet
      refs.nonEmpty && refs.subsetOf(partitionKeys) &&
        predicate.isInstanceOf[BinaryComparison]
    }
    // Convert the pruning predicates to a string representation of the filter.
    pruningPredicates.foldLeft("") { (str, expr) =>
      toFranklinExpression(str, expr, fieldType)
    }
  } else {
    ""
  }
{code}
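{{toFranklinExpression}} is specific to my internal service, but the conversion
itself is mechanical. Below is a rough, generic stand-in for it that only
handles binary comparisons between a partition column and a literal; the
operator spelling and the single-quoting of values are assumptions about the
target filter syntax, not what my service actually uses:
{code}
import org.apache.spark.sql.catalyst.expressions._

// Hypothetical stand-in for toFranklinExpression: render one binary comparison
// as "<column> <op> '<literal>'" and AND it onto the filter built so far.
def toFilterString(soFar: String, expr: Expression): String = {
  def render(e: Expression): Option[String] = e match {
    case EqualTo(a: Attribute, Literal(value, _))            => Some(s"${a.name} = '$value'")
    case GreaterThan(a: Attribute, Literal(value, _))        => Some(s"${a.name} > '$value'")
    case GreaterThanOrEqual(a: Attribute, Literal(value, _)) => Some(s"${a.name} >= '$value'")
    case LessThan(a: Attribute, Literal(value, _))           => Some(s"${a.name} < '$value'")
    case LessThanOrEqual(a: Attribute, Literal(value, _))    => Some(s"${a.name} <= '$value'")
    case _ => None // anything else is left out of the pushed-down filter
  }
  render(expr) match {
    case Some(clause) if soFar.isEmpty => clause
    case Some(clause)                  => s"$soFar and $clause"
    case None                          => soFar
  }
}

// Used the same way as above, minus the fieldType argument:
// pruningPredicates.foldLeft("")(toFilterString)
{code}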
> Support for pushing predicates down to metastore for partition pruning
> ----------------------------------------------------------------------
>
> Key: SPARK-6910
> URL: https://issues.apache.org/jira/browse/SPARK-6910
> Project: Spark
> Issue Type: Sub-task
> Components: SQL
> Reporter: Michael Armbrust
>