Ryan Blue created SPARK-23325:

             Summary: DataSourceV2 readers should always produce InternalRow.
                 Key: SPARK-23325
                 URL: https://issues.apache.org/jira/browse/SPARK-23325
             Project: Spark
          Issue Type: Sub-task
          Components: SQL
    Affects Versions: 2.3.0
            Reporter: Ryan Blue

DataSourceV2 row-oriented implementations are limited to producing either 
{{Row}} instances, or {{UnsafeRow}} instances if they implement 
{{SupportsScanUnsafeRow}}. Instead, I think that implementations should always 
produce {{InternalRow}}.

The problem with the choice between {{Row}} and {{UnsafeRow}} is that neither 
one is appropriate for implementers.

File formats don't produce {{Row}} instances or the data values used by 
{{Row}}, like {{java.sql.Timestamp}} and {{java.sql.Date}}. An implementation 
that uses {{Row}} must translate its data into those external representations, 
only for Spark to immediately translate them back to its internal ones. In my 
experience, it made little sense to convert a timestamp in microseconds to a 
(milliseconds, nanoseconds) pair, create a {{Timestamp}} instance, and pass 
that instance to Spark for immediate conversion back to microseconds.
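To make that round trip concrete, here is a minimal sketch of the conversions 
the {{Row}} path forces, assuming a format that stores timestamps as 
microseconds since the epoch (the helper names are illustrative, not part of 
any Spark API):

{code:scala}
import java.sql.Timestamp

// What a Row-based reader must do: widen micros-since-epoch into a
// (milliseconds, nanoseconds) pair to build a java.sql.Timestamp.
def microsToTimestamp(micros: Long): Timestamp = {
  val seconds = Math.floorDiv(micros, 1000000L)        // floorDiv handles pre-epoch values
  val microsOfSecond = Math.floorMod(micros, 1000000L)
  val ts = new Timestamp(seconds * 1000L)
  ts.setNanos((microsOfSecond * 1000L).toInt)
  ts
}

// What Spark does immediately afterward: collapse the Timestamp back
// into microseconds for its internal representation.
def timestampToMicros(ts: Timestamp): Long =
  Math.floorDiv(ts.getTime, 1000L) * 1000000L + ts.getNanos / 1000L
{code}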

On the other hand, {{UnsafeRow}} is very difficult to produce unless the data 
is already held in memory. Even the Parquet support built into Spark 
deserializes to {{InternalRow}} and then uses {{UnsafeProjection}} to produce 
unsafe rows. When I tried to build implementations that deserialize Parquet 
and Avro directly to {{UnsafeRow}}, I found it couldn't be done without first 
deserializing into memory, because the unsafe format requires the size of an 
array to be known before any of its values are written.

I ended up deciding to deserialize to {{InternalRow}} and use 
{{UnsafeProjection}} to convert to unsafe. There are two problems with this: 
first, {{UnsafeProjection}} is Scala and was difficult to call from Java (it 
required reflection), and second, it causes double projection in the physical 
plan (an unsafe-to-unsafe copy) if there is a projection that wasn't fully 
pushed to the data source.
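For reference, this is roughly what that conversion looks like from Scala; a 
minimal sketch assuming the reader already yields {{InternalRow}} for a known 
schema:

{code:scala}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("ts", TimestampType)))

// Code-generated projection from any InternalRow to UnsafeRow.
val toUnsafe = UnsafeProjection.create(schema)

// Note: the projection reuses its output buffer, so rows must be copied
// if they are buffered downstream rather than consumed one at a time.
def toUnsafeRows(rows: Iterator[InternalRow]): Iterator[InternalRow] =
  rows.map(toUnsafe)
{code}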

I think the solution is to have a single interface for readers that expects 
{{InternalRow}}. Then, a projection should be added in the Spark plan to 
convert to unsafe, avoiding duplicate projections in both the plan and the 
data source. If the data source already produces unsafe rows by deserializing 
directly, this still minimizes the number of copies because the unsafe 
projection will check whether the incoming data is already {{UnsafeRow}}.
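Illustrative only (not actual planner code): the plan-level conversion can 
short-circuit when rows are already unsafe, so sources that produce 
{{UnsafeRow}} directly pay no extra copy:

{code:scala}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}

// Hypothetical helper showing the intended short-circuit.
def ensureUnsafe(row: InternalRow, toUnsafe: UnsafeProjection): UnsafeRow =
  row match {
    case u: UnsafeRow => u          // already unsafe: no copy needed
    case other        => toUnsafe(other)
  }
{code}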

Using {{InternalRow}} would also match the interface on the write side.
