Ma Jian created HUDI-7246:
-----------------------------

             Summary: Data Skipping Issue: No Results When Query Conditions 
Involve Both Columns with and without Column Stats
                 Key: HUDI-7246
                 URL: https://issues.apache.org/jira/browse/HUDI-7246
             Project: Apache Hudi
          Issue Type: Bug
            Reporter: Ma Jian


In the current code version, support for column stats has not yet been extended 
to handle complex nested data types, such as map-type data structures. Take the 
table tbl as an example, which is defined with three fields: an integer field 
id, a string field name, and a map-type field attributes. Within this table 
structure, the id and name fields support column stats, and as such, HUDI will 
generate the corresponding column stats indices for these two fields at the 
time of table creation. However, no corresponding index will be generated for 
the attributes field. The specific table creation statement is as follows:
create table tbl (
  id int,
  name string,
  attributes map<string, string>
) ...
To elaborate further, consider the following insert operation:
insert into tbl values
(1, 'a1', map('color', 'red', 'size', 'M')),
(2, 'a2', map('color', 'blue', 'size', 'L'));After the execution of the insert, 
the content of the column stats should be as follows:
a.parquet   id     min: 1   max: 1   null: 0
b.parquet   id     min: 2   max: 2   null: 0
a.parquet   name   min: 'a1' max: 'a1' null: 0
b.parquet   name   min: 'a2' max: 'a2' null: 0{{}}
This means that there is no column stats index for the attributes column.

Based on the table tbl, when we execute a query:


h3. 1.Queries containing only columns supported by column stats:

At this point, the data skipping code looks like this:
columnStatsIndex.loadTransposed(queryReferencedColumns, shouldReadInMemory) \{ 
transposedColStatsDF =>
  Some(getCandidateFiles(transposedColStatsDF, queryFilters))
}
The content of queryReferencedColumns is name, and the content of queryFilters 
is isnotnull(name#94) and (name#94 = a1). The transposedColStatsDF is then 
based on the queryReferencedColumns to select the corresponding column stats:
+--------------------+----------+-------------+-------------+--------------+
|            fileName|valueCount|name_minValue|name_maxValue|name_nullCount|
+--------------------+----------+-------------+-------------+--------------+
|688f0d1e-527b-480...|         1|           a1|           a1|             0|
|08999951-faa4-48e...|         1|           a2|           a2|             0|
+--------------------+----------+-------------+-------------+--------------+{{}}
Inside the getCandidateFiles function, indexSchema and indexFilter are similar 
to the two parameters above, with the main difference being that in 
indexFilter, isnotnull(name#94) is converted into ('name_nullCount < 
'valueCount). This judges that the name column is not null based on the number 
of non-nulls being less than the total count. Thus, prunedCandidateFileNames 
can correctly filter out the required files.
private def getCandidateFiles(indexDf: DataFrame, queryFilters: 
Seq[Expression]): Set[String] = \{
  val indexSchema = indexDf.schema
  val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, 
indexSchema)).reduce(And)
  val prunedCandidateFileNames =
    indexDf.where(new Column(indexFilter))
      .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
      .collect()
      .map(_.getString(0))
      .toSet
  val allIndexedFileNames =
    indexDf.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
      .collect()
      .map(_.getString(0))
      .toSet
  val notIndexedFileNames = lookupFileNamesMissingFromIndex(allIndexedFileNames)
  prunedCandidateFileNames ++ notIndexedFileNames
}{{}}
h3. 2.Queries containing columns not supported by column stats Suppose our 
query is adjusted to:
select * from tbl where attributes.color = 'red'
At this time, queryReferencedColumns is attributes, and queryFilters are 
isnotnull(attributes#95) and (attributes#95[color] = red). Since 
transposedColStatsDF does not have column stats for this column, it will be 
empty. No matter what the query conditions are, prunedCandidateFileNames and 
allIndexedFileNames in getCandidateFiles will be empty, hitting the logic of 
notIndexedFileNames and returning all files. Thus, the query is still correct.




h3. 3.Queries containing both columns supported and not supported by column 
stats

Let's consider a query adjusted to:
select * from tbl where attributes.color = 'red' and name = 'a1'{{}}
At this point, queryReferencedColumns includes attributes and name, and the 
queryFilters are as follows. The content of transposedColStatsDF is shown below:
+--------------------+----------+-------------------+-------------------+--------------------+-------------+-------------+--------------+
|            
fileName|valueCount|attributes_minValue|attributes_maxValue|attributes_nullCount|name_minValue|name_maxValue|name_nullCount|
+--------------------+----------+-------------------+-------------------+--------------------+-------------+-------------+--------------+
|3363b50e-ca25-4fb...|         1|               null|               null|       
            1|           a2|           a2|             0|
|876af2ed-d529-4df...|         1|               null|               null|       
            1|           a1|           a1|             0|
+--------------------+----------+-------------------+-------------------+--------------------+-------------+-------------+--------------+
This is due to the transpose method called within loadTransposed, where the row 
is returned in the following manner. In case 1, since targetIndexedColumns only 
contains name, the final transposedRows will only contain name. In case 2, 
colStatsRecords is empty, so all subsequent stream code won't execute, and 
transposedRows will be empty.
val transposedRows: HoodieData[Row] = colStatsRecords
  .filter(/* filter logic */)
  .mapToPair(/* map logic */)
  .groupByKey()
  .map(/* map logic */)
In case 3, because colStatsRecords have content, the function inside the last 
map will be executed. The targetIndexedColumns are assigned as follows 
(selecting relevant code from different functions in sequence):
private lazy val indexedColumns: Set[String] = \{
  val customIndexedColumns = metadataConfig.getColumnsEnabledForColumnStatsIndex
  if (customIndexedColumns.isEmpty) {
    tableSchema.fieldNames.toSet
  } else \{
    customIndexedColumns.asScala.toSet
  }
}{{}}
This implies that targetIndexedColumns is the intersection of targetColumnNames 
and indexedColumns, and indexedColumns, by default, includes all the schema 
fields of the table (tableSchema.fieldNames.toSet) when we haven't explicitly 
configured a column stats list. This includes the Map type attributes, and 
targetColumnNames includes columns relevant to the query, so the intersection 
still results in targetColumnNames and name.

Returning to the code for transposedRows, because there are no corresponding 
column stats for attributes, rows where min=null, max=null, and 
null_count=valueCount are added to avoid errors during subsequent where 
filtering. This is the origin of attributes_minValue, attributes_maxValue, and 
attributes_nullCount in the resulting DataFrame.

Let's revisit getCandidateFiles and take a closer look at the assembly of 
indexFilter. The queryFilters mentioned earlier are processed in 
translateIntoColumnStatsIndexFilterExpr, where expressions are transformed, 
defaulting to True if they cannot be converted. We can skip over name since 
it's a string type and can be converted directly.

For map types, there are two expressions: isnotnull(attributes#95) This hits 
the case:
case IsNotNull(attribute: AttributeReference) =>
  getTargetIndexedColumnName(attribute, indexSchema)
    .map(colName => LessThan(genColNumNullsExpr(colName), genColValueCountExpr))
The corresponding value is 'attributes_nullCount < 'valueCount.

However, EqualTo expressions like (attributes#95[color] = red) should, 
theoretically, match equalTo cases but actually do not since 
AllowedTransformationExpression does not support the getMapValue type, 
resulting in a None and no match.

Ultimately, nothing matches, resulting in None, which means the expression for 
attributes ends up being 'attributes_nullCount < 'valueCount And True.

Herein lies a problem:
acc ++= Seq(null, null, valueCount)
The null_count for attributes always equals valueCount, so the expression 
'attributes_nullCount < 'valueCount will always be False. As a result, 
prunedCandidateFileNames will always be empty. However, since indexDf does have 
value, allIndexedFileNames will not be empty, leading notIndexedFileNames to be 
empty.

Consequently, the end result is that getCandidateFiles will always return an 
empty set! This presents a significant issue because the indexFilter 
effectively fails to filter any files based on the attributes column, due to 
the fact that its null_count is always set to match valueCount, rendering the 
condition perpetually false. 


h3. 4.Explicitly specifying the supported index columns based on case 3.

Here is an additional scenario to consider, building on case 3: if the index 
class is explicitly specified, the issue mentioned will not occur.

This is because the following code will hit 
metadataConfig.getColumnsEnabledForColumnStatsIndex, resulting in 
indexedColumns not containing any unspecified columns. Consequently, columns 
that are not supported will not be added to indexDf, and it will remain empty.
private lazy val indexedColumns: Set[String] = \{
  val customIndexedColumns = metadataConfig.getColumnsEnabledForColumnStatsIndex
  // Column Stats Index could index either
  //    - The whole table
  //    - Only configured columns
  if (customIndexedColumns.isEmpty) {
    tableSchema.fieldNames.toSet
  } else \{
    customIndexedColumns.asScala.toSet
  }
}
Simultaneously, indexSchema is also obtained from the schema of indexDf, so it 
will not include any unsupported columns. Therefore, query conditions for 
unsupported columns will also be filtered out in the indexFilter.
val indexSchema = indexDf.schema
val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, 
indexSchema)).reduce(And){{}}

h3. Problem Summary

Under the existing code implementation, when data skipping is enabled and the 
index columns have not been explicitly specified in the parameters, if the 
query conditions include both columns that are supported by column stats and 
those that are not, no data will ever be retrieved!
h3. Problem Fix

We can see that the variables directly causing the issue are indexDf and 
indexFilter:
 * indexDf includes unsupported columns, filled with inappropriate values.
 * indexFilter also contains query conditions not supported by the index. And 
indexDf is filled with inappropriate values to adapt to these conditions.
 * Moreover, the unsupported query conditions in indexFilter are obtained from 
filtering the schema derived from indexDf.

Thus, the root of all problems lies in indexDf being filled with unsupported 
columns.

Therefore, when fixing the issue, we consider removing the columns that are not 
supported from the relevant schema before obtaining the column stats records. 
There are two scenarios to consider:
 * If index columns are not specified by default, all columns that support 
column stats are indexed. By removing the unsupported columns, we effectively 
remove the columns that are not indexed. This will not affect the query results.
 * If specific index columns are specified, it might be possible that some 
columns that do support indexing are not indexed. In this case, what we remove 
is not the complete set of non-indexed columns. But as argued in case 4, this 
situation will also not affect the query results.

If subsequent changes or an incomplete consideration of types cause columns 
that do have indexes to be removed, at worst, it might temporarily affect the 
data skipping performance. However, the filter will return to the computation 
layer for further filtering, so it will not impact the query results.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to