majian1998 opened a new pull request, #10389:
URL: https://github.com/apache/hudi/pull/10389

   In the current code version, support for column stats has not yet been 
extended to handle complex nested data types, such as map-type data structures. 
Take the table tbl as an example, which is defined with three fields: an 
integer field id, a string field name, and a map-type field attributes. Within 
this table structure, the id and name fields support column stats, and as such, 
HUDI will generate the corresponding column stats indices for these two fields 
at the time of table creation. However, no corresponding index will be 
generated for the attributes field. The specific table creation statement is as 
follows:
   ```
   create table tbl (
     id int,
     name string,
     attributes map<string, string>
   ) ...
   ```
   To elaborate further, consider the following insert operation:
   ```
   insert into tbl values
   (1, 'a1', map('color', 'red', 'size', 'M')),
   (2, 'a2', map('color', 'blue', 'size', 'L'));
   ```
   After the execution of the insert, the content of the column stats should be 
as follows:
   ```
   a.parquet   id     min: 1   max: 1   null: 0
   b.parquet   id     min: 2   max: 2   null: 0
   a.parquet   name   min: 'a1' max: 'a1' null: 0
   b.parquet   name   min: 'a2' max: 'a2' null: 0
   ```
   This means that there is no column stats index for the attributes column.
   Based on the table tbl, when we execute a query:
   
   #### 1.Queries containing only columns supported by column stats:
   `select * from tbl where name = 'a1'`
   At this point, the data skipping code looks like this:
   ```
   columnStatsIndex.loadTransposed(queryReferencedColumns, shouldReadInMemory) { transposedColStatsDF =>
     Some(getCandidateFiles(transposedColStatsDF, queryFilters))
   }
   ```
   Here queryReferencedColumns contains name, and queryFilters contains 
isnotnull(name#94) and (name#94 = a1). transposedColStatsDF then holds the 
column stats selected for the columns in queryReferencedColumns:
   ```
   +--------------------+----------+-------------+-------------+--------------+
   |            fileName|valueCount|name_minValue|name_maxValue|name_nullCount|
   +--------------------+----------+-------------+-------------+--------------+
   |688f0d1e-527b-480...|         1|           a1|           a1|             0|
   |08999951-faa4-48e...|         1|           a2|           a2|             0|
   +--------------------+----------+-------------+-------------+--------------+
   ```
   Inside the getCandidateFiles function, indexSchema and indexFilter are 
similar to the two parameters above. The main difference is that in 
indexFilter, isnotnull(name#94) is converted into ('name_nullCount < 
'valueCount): a file can contain a non-null name only if its number of nulls 
is less than its total value count. Thus, prunedCandidateFileNames correctly 
keeps only the required files.
   ```
   private def getCandidateFiles(indexDf: DataFrame, queryFilters: Seq[Expression]): Set[String] = {
     val indexSchema = indexDf.schema
     val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema)).reduce(And)
     val prunedCandidateFileNames =
       indexDf.where(new Column(indexFilter))
         .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
         .collect()
         .map(_.getString(0))
         .toSet
     val allIndexedFileNames =
       indexDf.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
         .collect()
         .map(_.getString(0))
         .toSet
     val notIndexedFileNames = lookupFileNamesMissingFromIndex(allIndexedFileNames)
     prunedCandidateFileNames ++ notIndexedFileNames
   }
   ```
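
   The translated filter from case 1 can be modeled without Spark. The 
following is a minimal plain-Scala sketch, not Hudi code: FileStats, 
Case1Sketch, and the helper names are hypothetical, and the min/max range 
check used for name = 'a1' is an assumption about how an equality filter maps 
onto column stats.

   ```scala
   // One row of the transposed index for the name column (hypothetical model).
   final case class FileStats(
     fileName: String,
     valueCount: Long,
     nameMin: String,
     nameMax: String,
     nameNullCount: Long
   )

   object Case1Sketch {
     // Mirrors the two index rows shown for case 1.
     val index: Seq[FileStats] = Seq(
       FileStats("a.parquet", 1L, "a1", "a1", 0L),
       FileStats("b.parquet", 1L, "a2", "a2", 0L)
     )

     // isnotnull(name) becomes: name_nullCount < valueCount
     def mayHaveNonNull(s: FileStats): Boolean = s.nameNullCount < s.valueCount

     // Assumed translation of name = v: name_minValue <= v <= name_maxValue
     def mayContain(s: FileStats, v: String): Boolean =
       s.nameMin <= v && v <= s.nameMax

     // Keep only files whose stats cannot rule out a match.
     def candidateFiles(v: String): Set[String] =
       index.filter(s => mayHaveNonNull(s) && mayContain(s, v)).map(_.fileName).toSet
   }
   ```

   In this model, Case1Sketch.candidateFiles("a1") keeps only a.parquet.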
   
   #### 2.Queries containing columns not supported by column stats
   Suppose our query is adjusted to:
   `select * from tbl where attributes.color = 'red'`
   At this time, queryReferencedColumns is attributes, and queryFilters are 
isnotnull(attributes#95) and (attributes#95[color] = red). Since 
transposedColStatsDF does not have column stats for this column, it will be 
empty. No matter what the query conditions are, prunedCandidateFileNames and 
allIndexedFileNames in getCandidateFiles will be empty, hitting the logic of 
notIndexedFileNames and returning all files. Thus, the query is still correct.
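
   The fallback in case 2 can be sketched in plain Scala. This is an 
illustration only: Case2Sketch, allTableFiles, and missingFromIndex are 
hypothetical names, and missingFromIndex merely stands in for 
lookupFileNamesMissingFromIndex, whose real implementation is not shown here.

   ```scala
   object Case2Sketch {
     // All files of the table (hypothetical; Hudi would list them itself).
     val allTableFiles: Set[String] = Set("a.parquet", "b.parquet")

     // Stand-in for lookupFileNamesMissingFromIndex: files without an index row.
     def missingFromIndex(indexed: Set[String]): Set[String] =
       allTableFiles -- indexed

     // indexRows: file names present in the transposed index.
     // keep: the translated index filter, applied per file.
     def candidateFiles(indexRows: Seq[String], keep: String => Boolean): Set[String] = {
       val pruned = indexRows.filter(keep).toSet
       val allIndexed = indexRows.toSet
       pruned ++ missingFromIndex(allIndexed)
     }
   }
   ```

   With an empty index, every file counts as not indexed and is returned, 
whatever the filter says.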
   
   
   #### 3.Queries containing both columns supported and not supported by column stats
   Let's consider a query adjusted to:
   `select * from tbl where attributes.color = 'red' and name = 'a1'`
   At this point, queryReferencedColumns includes attributes and name, and 
queryFilters combine the filters from cases 1 and 2. The content of 
transposedColStatsDF is shown below:
   ```
   +--------------------+----------+-------------------+-------------------+--------------------+-------------+-------------+--------------+
   |            fileName|valueCount|attributes_minValue|attributes_maxValue|attributes_nullCount|name_minValue|name_maxValue|name_nullCount|
   +--------------------+----------+-------------------+-------------------+--------------------+-------------+-------------+--------------+
   |3363b50e-ca25-4fb...|         1|               null|               null|                   1|           a2|           a2|             0|
   |876af2ed-d529-4df...|         1|               null|               null|                   1|           a1|           a1|             0|
   +--------------------+----------+-------------------+-------------------+--------------------+-------------+-------------+--------------+
   ```
   This is due to the transpose method called within loadTransposed, where the 
row is returned in the following manner. In case 1, since targetIndexedColumns 
only contains name, the final transposedRows will only contain name. In case 2, 
colStatsRecords is empty, so all subsequent stream code won't execute, and 
transposedRows will be empty.
   ```
   val transposedRows: HoodieData[Row] = colStatsRecords
     .filter(/* filter logic */)
     .mapToPair(/* map logic */)
     .groupByKey()
     .map(/* map logic */)
   ```
   
   In case 3, because colStatsRecords have content, the function inside the 
last map will be executed. The targetIndexedColumns are assigned as follows 
(selecting relevant code from different functions in sequence):
   ```
   private lazy val indexedColumns: Set[String] = {
     val customIndexedColumns = metadataConfig.getColumnsEnabledForColumnStatsIndex
     if (customIndexedColumns.isEmpty) {
       tableSchema.fieldNames.toSet
     } else {
       customIndexedColumns.asScala.toSet
     }
   }
   ```
   
   targetIndexedColumns is the intersection of targetColumnNames and 
indexedColumns. When no column stats list is explicitly configured, 
indexedColumns defaults to all schema fields of the table 
(tableSchema.fieldNames.toSet), which includes the map-type attributes column. 
targetColumnNames contains the columns referenced by the query, so the 
intersection still contains both attributes and name.
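
   The intersection can be checked with plain sets. A minimal sketch, assuming 
the three-field table from the example; TargetColumnsSketch and its method 
signatures are hypothetical and only mirror the logic of indexedColumns 
quoted above.

   ```scala
   object TargetColumnsSketch {
     // Field names of the example table tbl.
     val tableSchemaFields: Set[String] = Set("id", "name", "attributes")

     // No custom list configured: fall back to every schema field.
     def indexedColumns(custom: Set[String]): Set[String] =
       if (custom.isEmpty) tableSchemaFields else custom

     // Columns referenced by the query, restricted to the indexed columns.
     def targetIndexedColumns(queryColumns: Set[String], custom: Set[String]): Set[String] =
       queryColumns.intersect(indexedColumns(custom))
   }
   ```

   With no custom list, a query on attributes and name keeps both columns. 
With a custom list containing only name, attributes drops out.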
   Returning to the code for transposedRows, because there are no corresponding 
column stats for attributes, rows where min=null, max=null, and 
null_count=valueCount are added to avoid errors during subsequent where 
filtering. This is the origin of attributes_minValue, attributes_maxValue, and 
attributes_nullCount in the resulting DataFrame.
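
   The padding can be sketched as follows. This is a simplified plain-Scala 
model, not the actual transpose code: PaddedColStats, PaddingSketch, and 
transposeRow are hypothetical names, and None stands in for a null min or 
max value.

   ```scala
   // Stats of one column within one transposed row (hypothetical model).
   final case class PaddedColStats(min: Option[String], max: Option[String], nullCount: Long)

   object PaddingSketch {
     // For every target column, use the recorded stats if they exist.
     // Otherwise emit the placeholder (null, null, valueCount).
     def transposeRow(
         valueCount: Long,
         recorded: Map[String, PaddedColStats],
         targetColumns: Seq[String]): Seq[(String, PaddedColStats)] =
       targetColumns.sorted.map { c =>
         c -> recorded.getOrElse(c, PaddedColStats(None, None, valueCount))
       }
   }
   ```

   For a file that only has stats for name, the attributes entry comes back 
with no min, no max, and a null count equal to valueCount.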
   Let's revisit getCandidateFiles and take a closer look at the assembly of 
indexFilter. The queryFilters mentioned earlier are processed in 
translateIntoColumnStatsIndexFilterExpr, where expressions are transformed, 
defaulting to True if they cannot be converted. We can skip over name since 
it's a string type and can be converted directly.
   For the map column, there are two expressions. The first, 
isnotnull(attributes#95), hits this case:
   ```
   case IsNotNull(attribute: AttributeReference) =>
     getTargetIndexedColumnName(attribute, indexSchema)
       .map(colName => LessThan(genColNumNullsExpr(colName), genColValueCountExpr))
   ```
   The corresponding value is 'attributes_nullCount < 'valueCount.
   However, the EqualTo expression (attributes#95[color] = red) should, in 
theory, match one of the EqualTo cases. It does not, because 
AllowedTransformationExpression does not support the getMapValue type, so the 
match yields None and the expression defaults to True.
   The resulting index filter for attributes is therefore 
'attributes_nullCount < 'valueCount And True.
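
   The default-to-True behaviour can be illustrated with a small expression 
model. This is a sketch under stated assumptions, not Spark's Catalyst or 
Hudi's translator: SketchExpr and its subclasses, TranslateSketch, and the 
textual predicates are all hypothetical, and the min/max form for an 
equality filter is assumed.

   ```scala
   // Minimal, hypothetical expression model.
   sealed trait SketchExpr
   final case class NotNull(column: String) extends SketchExpr
   final case class Equals(column: String, value: String) extends SketchExpr
   final case class MapValueEquals(column: String, key: String, value: String) extends SketchExpr

   object TranslateSketch {
     // Returns the index-side predicate as text, or None when unsupported.
     def tryTranslate(e: SketchExpr): Option[String] = e match {
       case NotNull(c)        => Some(s"${c}_nullCount < valueCount")
       case Equals(c, v)      => Some(s"${c}_minValue <= '$v' AND ${c}_maxValue >= '$v'")
       case _: MapValueEquals => None // e.g. attributes[color] = red
     }

     // Unsupported expressions fall back to TRUE so they never prune a file.
     def translate(e: SketchExpr): String = tryTranslate(e).getOrElse("TRUE")

     // All translated filters are combined with AND, as reduce(And) does.
     def indexFilter(filters: Seq[SketchExpr]): String =
       filters.map(translate).mkString(" AND ")
   }
   ```

   For the two attributes filters, this yields 
attributes_nullCount < valueCount AND TRUE.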
   Herein lies a problem:
   `acc ++= Seq(null, null, valueCount)`
   The null_count for attributes always equals valueCount, so the expression 
'attributes_nullCount < 'valueCount is always False and 
prunedCandidateFileNames is always empty. However, since indexDf does contain 
rows, allIndexedFileNames is not empty, which leaves notIndexedFileNames 
empty.
   Consequently, getCandidateFiles always returns an empty set! The 
indexFilter rejects every file because of the attributes column: its 
null_count is always set to valueCount, which makes the condition perpetually 
false.
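
   The failure can be reproduced in a plain-Scala model of the case 3 index. 
This is a sketch only: Case3Row and Case3Sketch are hypothetical names, the 
file names are shortened, and the min/max check for name = 'a1' is an assumed 
translation of the equality filter.

   ```scala
   // Hypothetical model of a transposed index row for the case 3 query.
   final case class Case3Row(
     fileName: String,
     valueCount: Long,
     attributesNullCount: Long,
     nameMin: String,
     nameMax: String,
     nameNullCount: Long
   )

   object Case3Sketch {
     // attributes has no real stats, so its null count is padded with valueCount.
     val index: Seq[Case3Row] = Seq(
       Case3Row("a.parquet", 1L, 1L, "a1", "a1", 0L),
       Case3Row("b.parquet", 1L, 1L, "a2", "a2", 0L)
     )

     // 'attributes_nullCount < 'valueCount And True And <name conditions>
     def indexFilter(r: Case3Row): Boolean =
       r.attributesNullCount < r.valueCount && true &&
         r.nameNullCount < r.valueCount && r.nameMin <= "a1" && "a1" <= r.nameMax

     val pruned: Set[String] = index.filter(indexFilter).map(_.fileName).toSet
     val allIndexed: Set[String] = index.map(_.fileName).toSet
     // Every file has an index row, so nothing is added back.
     val notIndexed: Set[String] = Set("a.parquet", "b.parquet") -- allIndexed
     val candidates: Set[String] = pruned ++ notIndexed
   }
   ```

   Case3Sketch.candidates is empty, even though a.parquet holds the row with 
name = 'a1' and color = 'red'.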
   
   #### 4.Explicitly specifying the supported index columns based on case 3.
   Here is an additional scenario to consider, building on case 3: if the index 
columns are explicitly specified, the issue described above does not occur.
   This is because the following code will hit 
metadataConfig.getColumnsEnabledForColumnStatsIndex, resulting in 
indexedColumns not containing any unspecified columns. Consequently, columns 
that are not supported will not be added to indexDf, and it will remain empty.
   ```
   private lazy val indexedColumns: Set[String] = {
     val customIndexedColumns = metadataConfig.getColumnsEnabledForColumnStatsIndex
     // Column Stats Index could index either
     //    - The whole table
     //    - Only configured columns
     if (customIndexedColumns.isEmpty) {
       tableSchema.fieldNames.toSet
     } else {
       customIndexedColumns.asScala.toSet
     }
   }
   
   ```
   Simultaneously, indexSchema is also obtained from the schema of indexDf, so 
it will not include any unsupported columns. Therefore, query conditions for 
unsupported columns will also be filtered out in the indexFilter.
   ```
   val indexSchema = indexDf.schema
   val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema)).reduce(And)
   
   ```
   #### Problem Summary
   Under the existing code implementation, when data skipping is enabled and 
the index columns have not been explicitly specified in the parameters, if the 
query conditions include both columns that are supported by column stats and 
those that are not, no data will ever be retrieved!
   #### Problem Fix
   We can see that the variables directly causing the issue are indexDf and 
indexFilter:
   ● indexDf includes unsupported columns, filled with inappropriate values.
   ● indexFilter contains query conditions that the index does not support, 
and indexDf is filled with inappropriate values only to accommodate these 
conditions.
   ● Moreover, these unsupported conditions survive in indexFilter because 
they are translated against the schema derived from indexDf.
   Thus, the root of all problems lies in indexDf being filled with unsupported 
columns.
   Therefore, when fixing the issue, we consider removing the columns that are 
not supported from the relevant schema before obtaining the column stats 
records. There are two scenarios to consider:
   ● If index columns are not specified, then by default all columns that 
support column stats are indexed. By removing the unsupported columns, we 
remove exactly the columns that are not indexed. This will not affect the 
query results.
   ● If specific index columns are specified, some columns that do support 
indexing may not be indexed. In this case, what we remove is not the complete 
set of non-indexed columns. But as argued in case 4, this situation will also 
not affect the query results.
   If subsequent changes or an incomplete consideration of types cause columns 
that do have indexes to be removed, the worst case is a temporary loss of data 
skipping performance. The filter is still applied in the computation layer, so 
the query results are not affected.
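
   The idea of the fix can be sketched in plain Scala. This is not the actual 
patch: SketchType, SketchField, FixSketch, and supportsColumnStats are 
hypothetical, and the rule that only the int and string fields carry column 
stats is an assumption taken from the example table.

   ```scala
   // Hypothetical, simplified field types. Real code would inspect Spark's DataType.
   sealed trait SketchType
   case object IntSketchType extends SketchType
   case object StringSketchType extends SketchType
   final case class MapSketchType(key: SketchType, value: SketchType) extends SketchType

   final case class SketchField(name: String, tpe: SketchType)

   object FixSketch {
     // Assumption: only these primitive types carry column stats in this model.
     def supportsColumnStats(t: SketchType): Boolean = t match {
       case IntSketchType | StringSketchType => true
       case _: MapSketchType                 => false
     }

     // Drop unsupported columns before any column stats records are loaded.
     def prunableColumns(schema: Seq[SketchField], queryColumns: Seq[String]): Seq[String] = {
       val supported = schema.filter(f => supportsColumnStats(f.tpe)).map(_.name).toSet
       queryColumns.filter(supported.contains)
     }
   }
   ```

   For the example table, a query referencing attributes and name is reduced 
to name before the index is consulted, so no padded attributes columns reach 
indexDf.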
   
   ### Change Logs
   
   During data skipping, filter out the columns that column stats do not support.
   
   ### Impact
   
   None
   
   ### Risk level (write none, low medium or high below)
   
   medium
   
   ### Documentation Update
   
   None
   
   ### Contributor's checklist
   
   - [ ] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [ ] Change Logs and Impact were stated clearly
   - [ ] Adequate tests were added if applicable
   - [ ] CI passed
   

