Ma Jian created HUDI-7246:
-----------------------------
Summary: Data Skipping Issue: No Results When Query Conditions
Involve Both Columns with and without Column Stats
Key: HUDI-7246
URL: https://issues.apache.org/jira/browse/HUDI-7246
Project: Apache Hudi
Issue Type: Bug
Reporter: Ma Jian
In the current code version, support for column stats has not yet been extended
to handle complex nested data types, such as map-type data structures. Take the
table tbl as an example, which is defined with three fields: an integer field
id, a string field name, and a map-type field attributes. Within this table
structure, the id and name fields support column stats, and as such, HUDI will
generate the corresponding column stats indices for these two fields at the
time of table creation. However, no corresponding index will be generated for
the attributes field. The specific table creation statement is as follows:
create table tbl (
id int,
name string,
attributes map<string, string>
) ...
To elaborate further, consider the following insert operation:
insert into tbl values
(1, 'a1', map('color', 'red', 'size', 'M')),
(2, 'a2', map('color', 'blue', 'size', 'L'));
After the execution of the insert, the content of the column stats should be as
follows:
a.parquet id min: 1 max: 1 null: 0
b.parquet id min: 2 max: 2 null: 0
a.parquet name min: 'a1' max: 'a1' null: 0
b.parquet name min: 'a2' max: 'a2' null: 0
This means that there is no column stats index for the attributes column.
Based on the table tbl, consider the following kinds of queries.
h3. 1. Queries containing only columns supported by column stats
Suppose the query filters only on name, for example name = 'a1'. At this point,
the data skipping code looks like this:
columnStatsIndex.loadTransposed(queryReferencedColumns, shouldReadInMemory) {
  transposedColStatsDF =>
    Some(getCandidateFiles(transposedColStatsDF, queryFilters))
}
The content of queryReferencedColumns is name, and the content of queryFilters
is isnotnull(name#94) and (name#94 = a1). transposedColStatsDF then contains
only the column stats selected by queryReferencedColumns:
+--------------------+----------+-------------+-------------+--------------+
|            fileName|valueCount|name_minValue|name_maxValue|name_nullCount|
+--------------------+----------+-------------+-------------+--------------+
|688f0d1e-527b-480...|         1|           a1|           a1|             0|
|08999951-faa4-48e...|         1|           a2|           a2|             0|
+--------------------+----------+-------------+-------------+--------------+
Inside the getCandidateFiles function, indexSchema and indexFilter correspond
to the two parameters above. The main difference is that in indexFilter,
isnotnull(name#94) is converted into ('name_nullCount < 'valueCount): the name
column is judged to contain non-null values when its null count is less than
the total value count. Thus, prunedCandidateFileNames correctly selects the
required files.
private def getCandidateFiles(indexDf: DataFrame, queryFilters: Seq[Expression]): Set[String] = {
  val indexSchema = indexDf.schema
  val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema)).reduce(And)
  // Files whose column stats satisfy the translated filter
  val prunedCandidateFileNames =
    indexDf.where(new Column(indexFilter))
      .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
      .collect()
      .map(_.getString(0))
      .toSet
  // All files that have a row in the index
  val allIndexedFileNames =
    indexDf.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
      .collect()
      .map(_.getString(0))
      .toSet
  // Files missing from the index cannot be skipped, so they are always kept
  val notIndexedFileNames = lookupFileNamesMissingFromIndex(allIndexedFileNames)
  prunedCandidateFileNames ++ notIndexedFileNames
}
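As a rough illustration of case 1, the pruning can be modelled without Spark. This is only a sketch: NameStats, hasNonNull, mayContain and the shortened file names are hypothetical, and the translation of name = 'a1' into a min/max range check is an assumption about how an equality filter is evaluated against column stats, not code taken from Hudi.

```scala
object Case1Pruning {
  // Hypothetical, simplified row of the transposed column stats table for `name`.
  final case class NameStats(fileName: String, valueCount: Long,
                             minValue: String, maxValue: String, nullCount: Long)

  // The two rows of transposedColStatsDF from case 1 (file names shortened).
  val index: Seq[NameStats] = Seq(
    NameStats("688f0d1e", 1L, "a1", "a1", 0L),
    NameStats("08999951", 1L, "a2", "a2", 0L)
  )

  // isnotnull(name) is evaluated as name_nullCount < valueCount.
  def hasNonNull(s: NameStats): Boolean = s.nullCount < s.valueCount

  // Assumption: name = v is evaluated as name_minValue <= v <= name_maxValue.
  def mayContain(s: NameStats, v: String): Boolean =
    s.minValue.compareTo(v) <= 0 && s.maxValue.compareTo(v) >= 0

  // Files that may contain rows matching isnotnull(name) AND name = v.
  def candidateFiles(v: String): Set[String] =
    index.filter(s => hasNonNull(s) && mayContain(s, v)).map(_.fileName).toSet
}
```

With these rows, Case1Pruning.candidateFiles("a1") keeps only the file whose name range covers a1.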
h3. 2. Queries containing columns not supported by column stats
Suppose our query is adjusted to:
select * from tbl where attributes.color = 'red'
At this time, queryReferencedColumns is attributes, and queryFilters are
isnotnull(attributes#95) and (attributes#95[color] = red). Since
transposedColStatsDF does not have column stats for this column, it will be
empty. No matter what the query conditions are, prunedCandidateFileNames and
allIndexedFileNames in getCandidateFiles will be empty, so every file falls
into notIndexedFileNames and all files are returned. Thus, the query is still
correct.
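The fallback in case 2 can be sketched as plain set arithmetic. The helper below is a hypothetical stand-in for lookupFileNamesMissingFromIndex, assuming it returns every table file that has no row in the index; the file names are made up.

```scala
object Case2Fallback {
  // Hypothetical stand-in for lookupFileNamesMissingFromIndex:
  // assumed to return all table files that the index does not mention.
  def missingFromIndex(allTableFiles: Set[String], indexed: Set[String]): Set[String] =
    allTableFiles -- indexed

  // Mirrors the last line of getCandidateFiles: pruned ++ notIndexed.
  def candidateFiles(allTableFiles: Set[String],
                     pruned: Set[String],
                     indexed: Set[String]): Set[String] =
    pruned ++ missingFromIndex(allTableFiles, indexed)
}
```

When the index DataFrame is empty, both pruned and indexed are empty sets, so the result equals the full file listing and no file is skipped.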
h3. 3. Queries containing both columns supported and not supported by column stats
Let's consider a query adjusted to:
select * from tbl where attributes.color = 'red' and name = 'a1'
At this point, queryReferencedColumns includes attributes and name, and the
queryFilters combine the conditions from the two previous cases. The content of
transposedColStatsDF is shown below:
+--------------------+----------+-------------------+-------------------+--------------------+-------------+-------------+--------------+
|            fileName|valueCount|attributes_minValue|attributes_maxValue|attributes_nullCount|name_minValue|name_maxValue|name_nullCount|
+--------------------+----------+-------------------+-------------------+--------------------+-------------+-------------+--------------+
|3363b50e-ca25-4fb...|         1|               null|               null|                   1|           a2|           a2|             0|
|876af2ed-d529-4df...|         1|               null|               null|                   1|           a1|           a1|             0|
+--------------------+----------+-------------------+-------------------+--------------------+-------------+-------------+--------------+
This is due to the transpose method called within loadTransposed, where the row
is returned in the following manner. In case 1, since targetIndexedColumns only
contains name, the final transposedRows will only contain name. In case 2,
colStatsRecords is empty, so all subsequent stream code won't execute, and
transposedRows will be empty.
val transposedRows: HoodieData[Row] = colStatsRecords
.filter(/* filter logic */)
.mapToPair(/* map logic */)
.groupByKey()
.map(/* map logic */)
In case 3, because colStatsRecords have content, the function inside the last
map will be executed. The targetIndexedColumns are assigned as follows
(selecting relevant code from different functions in sequence):
private lazy val indexedColumns: Set[String] = {
  val customIndexedColumns = metadataConfig.getColumnsEnabledForColumnStatsIndex
  if (customIndexedColumns.isEmpty) {
    tableSchema.fieldNames.toSet
  } else {
    customIndexedColumns.asScala.toSet
  }
}
This implies that targetIndexedColumns is the intersection of targetColumnNames
and indexedColumns, and indexedColumns, by default, includes all the schema
fields of the table (tableSchema.fieldNames.toSet) when we haven't explicitly
configured a column stats list. This includes the map-type attributes column.
Since targetColumnNames holds the columns relevant to the query, the
intersection still contains both attributes and name.
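The intersection can be checked with a small standalone sketch. The function names and parameters here are illustrative; only the empty-versus-configured branching is taken from the indexedColumns snippet above.

```scala
object TargetColumns {
  // Same branching as indexedColumns: fall back to every schema field
  // when no column list has been configured.
  def indexedColumns(configured: Seq[String], tableFields: Seq[String]): Set[String] =
    if (configured.isEmpty) tableFields.toSet else configured.toSet

  // targetIndexedColumns = query columns intersected with indexedColumns.
  def targetIndexedColumns(queryColumns: Seq[String],
                           configured: Seq[String],
                           tableFields: Seq[String]): Set[String] =
    queryColumns.toSet.intersect(indexedColumns(configured, tableFields))
}
```

With the fields id, name and attributes and no configured list, a query on attributes and name keeps both columns as targets. With an explicit list of id and name, only name survives.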
Returning to the code for transposedRows, because there are no corresponding
column stats for attributes, rows where min=null, max=null, and
null_count=valueCount are added to avoid errors during subsequent where
filtering. This is the origin of attributes_minValue, attributes_maxValue, and
attributes_nullCount in the resulting DataFrame.
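A minimal model of this padding step, assuming the transpose emits one (min, max, nullCount) triple per target column in sorted order. ColStats and transposeRow are hypothetical names and the tuple layout is only for illustration; the real code builds Spark rows.

```scala
object TransposePadding {
  // Hypothetical stats record for one column of one file.
  final case class ColStats(min: String, max: String, nullCount: Long)

  // One entry per target column. A column without a stats record is padded
  // with (no min, no max, nullCount = valueCount), as described above.
  def transposeRow(valueCount: Long,
                   targets: Seq[String],
                   stats: Map[String, ColStats]): Seq[(String, Option[String], Option[String], Long)] =
    targets.sorted.map { col =>
      stats.get(col) match {
        case Some(s) => (col, Some(s.min), Some(s.max), s.nullCount)
        case None    => (col, None, None, valueCount)
      }
    }
}
```

For a file with one row and stats only for name, the attributes entry comes back with no min, no max and a null count equal to valueCount.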
Let's revisit getCandidateFiles and take a closer look at the assembly of
indexFilter. The queryFilters mentioned earlier are processed in
translateIntoColumnStatsIndexFilterExpr, where expressions are transformed,
defaulting to True if they cannot be converted. We can skip over name since
it's a string type and can be converted directly.
For the map-type column, there are two expressions. The first,
isnotnull(attributes#95), hits the case:
case IsNotNull(attribute: AttributeReference) =>
getTargetIndexedColumnName(attribute, indexSchema)
.map(colName => LessThan(genColNumNullsExpr(colName), genColValueCountExpr))
The corresponding value is 'attributes_nullCount < 'valueCount.
However, EqualTo expressions like (attributes#95[color] = red) should,
theoretically, match the EqualTo cases but actually do not, since
AllowedTransformationExpression does not support the getMapValue type.
Nothing matches, the translation returns None, and the expression defaults to
True. The resulting filter for attributes is therefore
'attributes_nullCount < 'valueCount And True.
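The translate-or-default behaviour can be sketched with a toy filter type. Everything here (Filter, IsNotNull, MapValueEquals, the string rendering of the result) is hypothetical and much simpler than the Catalyst expressions handled by translateIntoColumnStatsIndexFilterExpr.

```scala
object FilterTranslation {
  sealed trait Filter
  final case class IsNotNull(column: String) extends Filter
  final case class MapValueEquals(column: String, key: String, value: String) extends Filter

  // Returns an index-side condition when the filter can be translated,
  // None otherwise (map lookups are never translated in this sketch).
  def translate(f: Filter, indexColumns: Set[String]): Option[String] = f match {
    case IsNotNull(c) if indexColumns.contains(c) => Some(s"${c}_nullCount < valueCount")
    case _                                        => None
  }

  // Untranslatable filters default to "true", then everything is AND-ed.
  def indexFilter(filters: Seq[Filter], indexColumns: Set[String]): String =
    filters.map(f => translate(f, indexColumns).getOrElse("true")).mkString(" AND ")
}
```

When attributes is part of the index schema, the two attributes filters produce a null-count check followed by true. When it is not, both default to true and the column no longer influences pruning.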
Herein lies a problem:
acc ++= Seq(null, null, valueCount)
The null_count for attributes always equals valueCount, so the expression
'attributes_nullCount < 'valueCount is always False. As a result,
prunedCandidateFileNames is always empty. Moreover, since indexDf does contain
rows, allIndexedFileNames is not empty and covers every file, which leaves
notIndexedFileNames empty as well.
Consequently, getCandidateFiles always returns an empty set! Rather than simply
not pruning on the attributes column, the indexFilter rejects every file,
because the padded null_count always equals valueCount and the isnotnull
condition is therefore perpetually false.
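The whole failure can be reproduced in a few lines of plain Scala. This is a simplified model: Row, indexFilter and the shortened file names are hypothetical, and the name = v range check is an assumed translation of the equality filter.

```scala
object MixedFilterBug {
  // Hypothetical, simplified row of the transposed index from case 3.
  final case class Row(fileName: String, valueCount: Long, attributesNullCount: Long,
                       nameMin: String, nameMax: String, nameNullCount: Long)

  // Padded attributes stats: nullCount == valueCount for every file.
  val indexRows: Seq[Row] = Seq(
    Row("3363b50e", 1L, 1L, "a2", "a2", 0L),
    Row("876af2ed", 1L, 1L, "a1", "a1", 0L)
  )

  // attributes_nullCount < valueCount AND true
  //   AND name_nullCount < valueCount AND name range check (assumed).
  def indexFilter(r: Row, name: String): Boolean =
    r.attributesNullCount < r.valueCount && true &&
      r.nameNullCount < r.valueCount &&
      r.nameMin.compareTo(name) <= 0 && r.nameMax.compareTo(name) >= 0

  def candidateFiles(allTableFiles: Set[String], name: String): Set[String] = {
    val pruned  = indexRows.filter(indexFilter(_, name)).map(_.fileName).toSet
    val indexed = indexRows.map(_.fileName).toSet
    pruned ++ (allTableFiles -- indexed)
  }
}
```

Because the first conjunct is false for every row and every file is already indexed, candidateFiles returns an empty set even for name = a1, which does exist in the table.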
h3. 4. Explicitly specifying the supported index columns based on case 3
Here is an additional scenario to consider, building on case 3: if the index
columns are explicitly specified, the issue mentioned will not occur.
This is because the following code will use the result of
metadataConfig.getColumnsEnabledForColumnStatsIndex, so indexedColumns does not
contain any unspecified columns. Consequently, columns that are not supported
will not be added to indexDf, which will contain no entries for them.
private lazy val indexedColumns: Set[String] = {
  val customIndexedColumns = metadataConfig.getColumnsEnabledForColumnStatsIndex
  // Column Stats Index could index either
  // - The whole table
  // - Only configured columns
  if (customIndexedColumns.isEmpty) {
    tableSchema.fieldNames.toSet
  } else {
    customIndexedColumns.asScala.toSet
  }
}
Simultaneously, indexSchema is also obtained from the schema of indexDf, so it
will not include any unsupported columns. Therefore, query conditions for
unsupported columns will also be filtered out in the indexFilter.
val indexSchema = indexDf.schema
val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema)).reduce(And)
h3. Problem Summary
Under the existing code implementation, when data skipping is enabled and the
index columns have not been explicitly specified in the parameters, if the
query conditions include both columns that are supported by column stats and
those that are not, no data will ever be retrieved!
h3. Problem Fix
We can see that the variables directly causing the issue are indexDf and
indexFilter:
* indexDf includes unsupported columns, filled with inappropriate values.
* indexFilter also contains query conditions not supported by the index. And
indexDf is filled with inappropriate values to adapt to these conditions.
* Moreover, the unsupported query conditions in indexFilter are obtained from
filtering the schema derived from indexDf.
Thus, the root of all problems lies in indexDf being filled with unsupported
columns.
Therefore, when fixing the issue, we consider removing the columns that are not
supported from the relevant schema before obtaining the column stats records.
There are two scenarios to consider:
* If index columns are not specified, then by default all columns that support
column stats are indexed. By removing the unsupported columns, we effectively
remove exactly the columns that are not indexed. This will not affect the query
results.
* If specific index columns are specified, it might be possible that some
columns that do support indexing are not indexed. In this case, what we remove
is not the complete set of non-indexed columns. But as argued in case 4, this
situation will also not affect the query results.
If subsequent changes or an incomplete consideration of types cause columns
that do have indexes to be removed, at worst, it might temporarily affect the
data skipping performance. However, the filter will return to the computation
layer for further filtering, so it will not impact the query results.