Is it required for DataReader to support all known DataFormat?

Hopefully, not, as assumed by the 'throw' in the interface. Then specifically 
how are we going to express capability of the given reader of its supported 
format(s), or specific support for each of "real-time data in row format, and 
history data in columnar format"?

From: Wenchen Fan <>
Sent: Sunday, April 15, 2018 7:45:01 PM
To: Spark dev list
Subject: [discuss][data source v2] remove type parameter in 

Hi all,

I'd like to propose an API change to the data source v2.

One design goal of data source v2 is API type safety. The FileFormat API is a 
bad example, it asks the implementation to return InternalRow even it's 
actually ColumnarBatch. In data source v2 we add a type parameter to 
DataReader/WriterFactoty and DataReader/Writer, so that data source supporting 
columnar scan returns ColumnarBatch at API level.

However, we met some problems when migrating streaming and file-based data 
source to data source v2.

For the streaming side, we need a variant of DataReader/WriterFactory to add 
streaming specific concept like epoch id and offset. For details please see 
ContinuousDataReaderFactory and

But this conflicts with the special format mixin traits like 
SupportsScanColumnarBatch. We have to make the streaming variant of 
DataReader/WriterFactory to extend the original DataReader/WriterFactory, and 
do type cast at runtime, which is unnecessary and violate the type safety.

For the file-based data source side, we have a problem with code duplication. 
Let's take ORC data source as an example. To support both unsafe row and 
columnar batch scan, we need something like

// A lot of parameters to carry to the executor side
class OrcUnsafeRowFactory(...) extends DataReaderFactory[UnsafeRow] {
  def createDataReader ...

class OrcColumnarBatchFactory(...) extends DataReaderFactory[ColumnarBatch] {
  def createDataReader ...

class OrcDataSourceReader extends DataSourceReader {
  def createUnsafeRowFactories = ... // logic to prepare the parameters and 
create factories

  def createColumnarBatchFactories = ... // logic to prepare the parameters and 
create factories

You can see that we have duplicated logic for preparing parameters and defining 
the factory.

Here I propose to remove all the special format mixin traits and change the 
factory interface to

public enum DataFormat {

interface DataReaderFactory {
  DataFormat dataFormat;

  default DataReader<Row> createRowDataReader() {
    throw new IllegalStateException();

  default DataReader<UnsafeRow> createUnsafeRowDataReader() {
    throw new IllegalStateException();

  default DataReader<ColumnarBatch> createColumnarBatchDataReader() {
    throw new IllegalStateException();

Spark will look at the dataFormat and decide which create data reader method to 

Now we don't have the problem for the streaming side as these special format 
mixin traits go away. And the ORC data source can also be simplified to

class OrcReaderFactory(...) extends DataReaderFactory {
  def createUnsafeRowReader ...

  def createColumnarBatchReader ...

class OrcDataSourceReader extends DataSourceReader {
  def createReadFactories = ... // logic to prepare the parameters and create 

We also have a potential benefit of supporting hybrid storage data source, 
which may keep real-time data in row format, and history data in columnar 
format. Then they can make some DataReaderFactory output InternalRow and some 
output ColumnarBatch.


Reply via email to