[ https://issues.apache.org/jira/browse/PARQUET-1488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16884204#comment-16884204 ]

Ryan Blue commented on PARQUET-1488:
------------------------------------

We discussed this on SPARK-28371.

Previously, Parquet did not fail if a UserDefinedPredicate did not handle null 
values, so I think it is a regression that Parquet now causes previously 
working code to fail. I think it is correct for Parquet to call a UDP the way 
that it does, but Parquet should catch exceptions thrown by the predicate and 
should process the row group where the error was thrown. That way, Parquet 
keeps the optimization for columns that are all null without breaking existing 
code.
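
To make the intent concrete, here is a minimal sketch of that catch-and-keep 
behavior. This is illustration only: {{keepOnError}} is a hypothetical helper, 
not an actual parquet-mr API, and the real change would live inside 
parquet-mr's column-index filtering code.

{code:java}
import org.apache.parquet.filter2.predicate.UserDefinedPredicate

object KeepOnError {
  // Illustrative only: if the UDP throws (e.g. an NPE on the null value passed
  // for an all-null page), we cannot prove the rows are droppable, so we keep
  // them and process the row group instead of propagating the error.
  def keepOnError[T <: Comparable[T]](udp: UserDefinedPredicate[T], value: T): Boolean =
    try {
      udp.keep(value)
    } catch {
      case _: RuntimeException => true // keep: fall back to reading the rows
    }
}
{code}

Keeping the rows on failure is the conservative choice: the filter is only an 
optimization, so evaluating the rows later gives the same result as before the 
regression.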

[~yumwang], would you like to submit a PR for this?

> UserDefinedPredicate throw NullPointerException
> -----------------------------------------------
>
>                 Key: PARQUET-1488
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1488
>             Project: Parquet
>          Issue Type: Bug
>          Components: parquet-mr
>    Affects Versions: 1.11.0
>            Reporter: Yuming Wang
>            Assignee: Yuming Wang
>            Priority: Major
>
> It throws a {{NullPointerException}} after upgrading Parquet to 1.11.0 when 
> using a {{UserDefinedPredicate}}.
> The [UserDefinedPredicate|https://github.com/apache/spark/blob/faf73dcd33d04365c28c2846d3a1f845785f69df/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala#L548-L578] is:
> {code:java}
> new UserDefinedPredicate[Binary] with Serializable {
>   private val strToBinary = Binary.fromReusedByteArray(v.getBytes)
>   private val size = strToBinary.length
>
>   override def canDrop(statistics: Statistics[Binary]): Boolean = {
>     val comparator = PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR
>     val max = statistics.getMax
>     val min = statistics.getMin
>     comparator.compare(max.slice(0, math.min(size, max.length)), strToBinary) < 0 ||
>       comparator.compare(min.slice(0, math.min(size, min.length)), strToBinary) > 0
>   }
>
>   override def inverseCanDrop(statistics: Statistics[Binary]): Boolean = {
>     val comparator = PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR
>     val max = statistics.getMax
>     val min = statistics.getMin
>     comparator.compare(max.slice(0, math.min(size, max.length)), strToBinary) == 0 &&
>       comparator.compare(min.slice(0, math.min(size, min.length)), strToBinary) == 0
>   }
>
>   override def keep(value: Binary): Boolean = {
>     UTF8String.fromBytes(value.getBytes).startsWith(
>       UTF8String.fromBytes(strToBinary.getBytes))
>   }
> }
> {code}
> The stack trace is:
> {noformat}
> java.lang.NullPointerException
>       at org.apache.spark.sql.execution.datasources.parquet.ParquetFilters$$anon$1.keep(ParquetFilters.scala:573)
>       at org.apache.spark.sql.execution.datasources.parquet.ParquetFilters$$anon$1.keep(ParquetFilters.scala:552)
>       at org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter.visit(ColumnIndexFilter.java:152)
>       at org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter.visit(ColumnIndexFilter.java:56)
>       at org.apache.parquet.filter2.predicate.Operators$UserDefined.accept(Operators.java:377)
>       at org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter.visit(ColumnIndexFilter.java:181)
>       at org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter.visit(ColumnIndexFilter.java:56)
>       at org.apache.parquet.filter2.predicate.Operators$And.accept(Operators.java:309)
>       at org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter$1.visit(ColumnIndexFilter.java:86)
>       at org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter$1.visit(ColumnIndexFilter.java:81)
> {noformat}
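> For reference, the NPE above comes from {{keep}} being called with a null 
> {{value}} when column-index filtering evaluates pages that contain only 
> nulls. A null-guarded variant of {{keep}} (an illustrative sketch only, not 
> a committed fix) avoids the crash on the user side:
> {code:java}
> override def keep(value: Binary): Boolean = {
>   // value can be null when column-index filtering evaluates all-null pages;
>   // null never matches a startsWith predicate, so returning false is
>   // consistent with the predicate's semantics.
>   value != null &&
>     UTF8String.fromBytes(value.getBytes).startsWith(
>       UTF8String.fromBytes(strToBinary.getBytes))
> }
> {code}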



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
