[
https://issues.apache.org/jira/browse/FLINK-19103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
fa zheng updated FLINK-19103:
-----------------------------
Description:
The PushPartitionIntoTableSourceScanRule will obtain new statistic after
pruning, however, it uses a for loop to get statistics of each partitions and
then merge them together. During this process, flink will try to call
metastore's interface four times in one loop. When remaining partitions are
huge, it spends a lot of time to get new statistic.
{code:scala}
val newStatistic = {
val tableStats = catalogOption match {
case Some(catalog) =>
def mergePartitionStats(): TableStats = {
var stats: TableStats = null
for (p <- remainingPartitions) {
getPartitionStats(catalog, tableIdentifier, p) match {
case Some(currStats) =>
if (stats == null) {
stats = currStats
} else {
stats = stats.merge(currStats)
}
case None => return null
}
}
stats
}
mergePartitionStats()
case None => null
}
FlinkStatistic.builder().statistic(statistic).tableStats(tableStats).build()
}
{code}
was:
The PushPartitionIntoTableSourceScanRule will obtain new statistic after
pruning, however, it use a for loop to get statistics of each partitions and
then merge them together. During this process, flink will try to call
metastore's interface four times in one loop. When remaining partitions are
huge, it spends a lot of time to get new statistic.
{code:scala}
val newStatistic = {
val tableStats = catalogOption match {
case Some(catalog) =>
def mergePartitionStats(): TableStats = {
var stats: TableStats = null
for (p <- remainingPartitions) {
getPartitionStats(catalog, tableIdentifier, p) match {
case Some(currStats) =>
if (stats == null) {
stats = currStats
} else {
stats = stats.merge(currStats)
}
case None => return null
}
}
stats
}
mergePartitionStats()
case None => null
}
FlinkStatistic.builder().statistic(statistic).tableStats(tableStats).build()
}
{code}
> The PushPartitionIntoTableSourceScanRule will lead a performance problem when
> there are still many partitions after pruning
> ---------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-19103
> URL: https://issues.apache.org/jira/browse/FLINK-19103
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / Planner
> Affects Versions: 1.10.2, 1.11.1
> Reporter: fa zheng
> Priority: Major
> Fix For: 1.12.0
>
>
> The PushPartitionIntoTableSourceScanRule will obtain new statistic after
> pruning, however, it uses a for loop to get statistics of each partitions and
> then merge them together. During this process, flink will try to call
> metastore's interface four times in one loop. When remaining partitions are
> huge, it spends a lot of time to get new statistic.
>
> {code:scala}
> val newStatistic = {
> val tableStats = catalogOption match {
> case Some(catalog) =>
> def mergePartitionStats(): TableStats = {
> var stats: TableStats = null
> for (p <- remainingPartitions) {
> getPartitionStats(catalog, tableIdentifier, p) match {
> case Some(currStats) =>
> if (stats == null) {
> stats = currStats
> } else {
> stats = stats.merge(currStats)
> }
> case None => return null
> }
> }
> stats
> }
> mergePartitionStats()
> case None => null
> }
>
> FlinkStatistic.builder().statistic(statistic).tableStats(tableStats).build()
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)