[ 
https://issues.apache.org/jira/browse/PARQUET-577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Remek Zajac updated PARQUET-577:
--------------------------------
    Description: 
The Avro spec's schema [resolution rules](https://avro.apache.org/docs/1.7.7/spec.html#schema_record) say: 
"if the reader's record schema has a field with no default value, and writer's 
schema does not have a field with the same name, an error is signalled."

I can't find an implementation of this rule in parquet.avro, and I do observe 
the rule being ignored. I am using 1.6.0 because that's the version we can get 
from Maven. 

My writer's schema:
{code}
{
  "type" : "record",
  "name" : "SampleSchema_v1",
  "namespace" : "com.xxxx.spark",
  "fields" : [ {
    "name" : "stringField",
    "type" : "string",
    "doc"  : "Sample string field"
  },{
    "name" : "longField",
    "type" : "long",
    "doc"  : "Sample long field"
  } ],
  "doc:" : "A sample/test schema"
}
{code}

My reader schema:
{code}
{
  "type" : "record",
  "name" : "SampleSchema_newDefaultlessCol",
  "namespace" : "com.xxxx.spark",
  "fields" : [ {
    "name" : "stringField",
    "type" : "string",
    "doc"  : "Sample string field"
  },{
    "name" : "longField",
    "type" : "long",
    "doc"  : "Sample long field"
  },{
    "name" : "mandatoryIntField",
    "type" : "int",
    "doc"  : "Sample mandatory! int field"
  }],
  "doc:" : "v1 + one extra column that has no default"
}
{code}
This is my test case:
{code}
    "accept new column w/o a default [schema-evolution, undesired]" in new 
MockAvroParquetGrid {
      //TODO: the behaviour this test case exercises is UNDESIRED, i.e.: a new column with no default value should
      //constitute an incompatible schema break, instead, this thing uses 0 for the default
      //TODO: Ticket to track this: https://jira.xxxx.io/browse/ADR-610
      val inputSampleRecordsV1  = Seq(new SampleSchema_v1(s"string", 1))
      dao.writeParquet[SampleSchema_v1](
        SparkBase.sc.parallelize(inputSampleRecordsV1),
        SampleSchema_v1.SCHEMA$,
        parquetFolder
      )

      dao
        .readParquet[SampleSchema_newDefaultlessCol](parquetFolder, SampleSchema_newDefaultlessCol.SCHEMA$)
        .collect().toSeq.head
        .getMandatoryIntField must equalTo(0) //TODO: zero is an unwelcome guess
    }
{code}
This is the implementation of writeParquet and readParquet
{code}
  def writeParquet[C](source: RDD[C], schema: org.apache.avro.Schema, dstPath: String)
                     (implicit ctag: ClassTag[C]): Unit = {
    val hadoopJob = Job.getInstance()
    ParquetOutputFormat.setWriteSupportClass(hadoopJob, classOf[AvroWriteSupport])
    ParquetOutputFormat.setCompression(hadoopJob, CompressionCodecName.GZIP)
    AvroWriteSupport.setSchema(hadoopJob.getConfiguration, schema)

    new PairRDDFunctions[Void,C](
      source.map(sourceRecord => (null, sourceRecord))
    ).saveAsNewAPIHadoopFile(
      bucketDAO.uri(dstPath),
      classOf[Void],                            //K
      ctag.runtimeClass.asInstanceOf[Class[C]], //V
      classOf[AvroParquetOutputFormat],
      hadoopJob.getConfiguration
    )
  }

  def readParquet[C](srcPath: String, schema: org.apache.avro.Schema)(implicit ctag: ClassTag[C]): RDD[C] = {
    val hadoopJob = Job.getInstance()
    ParquetInputFormat.setReadSupportClass(hadoopJob, classOf[AvroReadSupport[C]])
    AvroReadSupport.setAvroReadSchema(hadoopJob.getConfiguration, schema)
    sc.newAPIHadoopFile(
      bucketDAO.uri(srcPath),
      classOf[ParquetInputFormat[C]],
      classOf[Void],                            //K
      ctag.runtimeClass.asInstanceOf[Class[C]], //V
      hadoopJob.getConfiguration
    ).map { _._2 }
  }
{code}
We use avro-tools to generate Java classes from our Avro schemas:
java -jar /path/to/avro-tools-1.8.0.jar compile schema <schema file> <destination>

The test case harvests zeroes as values of mandatoryIntField.

Naively, I see a problem in the [indexed record converter](https://git-wip-us.apache.org/repos/asf?p=parquet-mr.git;a=blob;f=parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java;h=06c66d692571da08298ae1da4f9967446c4864ee;hb=HEAD#l105) 
in that it cheerfully accepts a condition doomed to fail. The condition being: 
the reader schema has a column with no default value that is absent in the 
writer schema.
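
To make the suspicion concrete, here is a minimal Scala sketch of the mechanism I think is at play. It is not the parquet-avro code: `SampleRecord` is a hypothetical stand-in for the avro-tools generated class and `LenientConverterSketch` is a made-up name. The assumption is that the converter only assigns the columns it finds in the file, so a primitive int that is never assigned keeps its JVM default of 0.

```scala
// Hypothetical stand-in for the generated class; field names follow the reader schema.
final class SampleRecord {
  var stringField: String = null
  var longField: Long = 0L
  var mandatoryIntField: Int = 0 // primitive: stays 0 until something assigns it

  // Assigns one field by name, roughly what a record converter would do per column.
  def put(name: String, value: Any): Unit = name match {
    case "stringField"       => stringField = value.asInstanceOf[String]
    case "longField"         => longField = value.asInstanceOf[Long]
    case "mandatoryIntField" => mandatoryIntField = value.asInstanceOf[Int]
    case other               => throw new IllegalArgumentException("unknown field: " + other)
  }
}

object LenientConverterSketch {
  // Assumed behaviour: copy only the columns present in the file and never
  // complain about reader fields that received no value.
  def materialize(fileColumns: Map[String, Any]): SampleRecord = {
    val record = new SampleRecord
    fileColumns.foreach { case (name, value) => record.put(name, value) }
    record
  }
}
```

Materializing a v1 row such as `Map("stringField" -> "string", "longField" -> 1L)` leaves `mandatoryIntField` at 0, which matches what the test case sees.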

I am writing mainly to confirm my diagnosis and to learn why it is implemented 
the way it is. Is it fixable, or do others depend on this behaviour as a 
feature? Can anyone think of a workaround?
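
As a stopgap while this is open, one option might be to compare the two schemas up front and refuse to read when the spec rule is violated. The sketch below uses a toy model rather than the Avro API: `FieldSpec`, `unresolvableFields` and `requireResolvable` are hypothetical names, and a field is reduced to its name plus a flag saying whether a default is declared.

```scala
object SchemaGuard {
  // Toy model of a record field: its name and whether the schema declares a default.
  final case class FieldSpec(name: String, hasDefault: Boolean)

  // Reader fields that the writer lacks and that declare no default.
  // Per the quoted spec rule, any such field should be an error.
  def unresolvableFields(reader: Seq[FieldSpec], writer: Seq[FieldSpec]): Seq[String] = {
    val writerNames = writer.map(_.name).toSet
    reader.filter(f => !writerNames.contains(f.name) && !f.hasDefault).map(_.name)
  }

  // Fails fast, before any read is attempted, if the rule is violated.
  def requireResolvable(reader: Seq[FieldSpec], writer: Seq[FieldSpec]): Unit = {
    val missing = unresolvableFields(reader, writer)
    val names = missing.mkString(", ")
    require(missing.isEmpty, "reader fields with no default are absent from the writer schema: " + names)
  }
}
```

With real schemas, the `FieldSpec` lists would have to be derived from each schema's field list and the field's default accessor, whose name differs between Avro versions, so that mapping is left out here. Calling `requireResolvable` at the top of `readParquet` would turn the silent zero into an `IllegalArgumentException`.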


  was:
The Avro spec's schema [resolution rules](https://avro.apache.org/docs/1.7.7/spec.html#schema_record) say: 
"if the reader's record schema has a field with no default value, and writer's 
schema does not have a field with the same name, an error is signalled."

I can't find an implementation of this rule in parquet.avro, and I do observe 
the rule being ignored. I am using 1.6.0 because that's the version we can get 
from Maven. 

My writer's schema:
{code}
{
  "type" : "record",
  "name" : "SampleSchema_v1",
  "namespace" : "com.xxxx.spark",
  "fields" : [ {
    "name" : "stringField",
    "type" : "string",
    "doc"  : "Sample string field"
  },{
    "name" : "longField",
    "type" : "long",
    "doc"  : "Sample long field"
  } ],
  "doc:" : "A sample/test schema"
}
{code}

My reader schema:
{code}
{
  "type" : "record",
  "name" : "SampleSchema_newDefaultlessCol",
  "namespace" : "com.xxxx.spark",
  "fields" : [ {
    "name" : "stringField",
    "type" : "string",
    "doc"  : "Sample string field"
  },{
    "name" : "longField",
    "type" : "long",
    "doc"  : "Sample long field"
  },{
    "name" : "mandatoryIntField",
    "type" : "int",
    "doc"  : "Sample mandatory! int field"
  }],
  "doc:" : "v1 + one extra column that has no default"
}
{code}
This is my test case:
{code}
    "accept new column w/o a default [schema-evolution, undesired]" in new 
MockAvroParquetGrid {
      //TODO: the behaviour this test case exercises is UNDESIRED, i.e.: a new column with no default value should
      //constitute an incompatible schema break, instead, this thing uses 0 for the default
      //TODO: Ticket to track this: https://jira.xxxx.io/browse/ADR-610
      val inputSampleRecordsV1  = Seq(new SampleSchema_v1(s"string", 1))
      dao.writeParquet[SampleSchema_v1](
        SparkBase.sc.parallelize(inputSampleRecordsV1),
        SampleSchema_v1.SCHEMA$,
        parquetFolder
      )

      dao
        .readParquet[SampleSchema_newDefaultlessCol](parquetFolder, SampleSchema_newDefaultlessCol.SCHEMA$)
        .collect().toSeq.head
        .getMandatoryIntField must equalTo(0) //TODO: zero is an unwelcome guess
    }
{code}
This is the implementation of writeParquet and readParquet
{code}
  def writeParquet[C](source: RDD[C], schema: org.apache.avro.Schema, dstPath: String)
                     (implicit ctag: ClassTag[C]): Unit = {
    val hadoopJob = Job.getInstance()
    ParquetOutputFormat.setWriteSupportClass(hadoopJob, classOf[AvroWriteSupport])
    ParquetOutputFormat.setCompression(hadoopJob, CompressionCodecName.GZIP)
    AvroWriteSupport.setSchema(hadoopJob.getConfiguration, schema)

    new PairRDDFunctions[Void,C](
      source.map(sourceRecord => (null, sourceRecord))
    ).saveAsNewAPIHadoopFile(
      bucketDAO.uri(dstPath),
      classOf[Void],                            //K
      ctag.runtimeClass.asInstanceOf[Class[C]], //V
      classOf[AvroParquetOutputFormat],
      hadoopJob.getConfiguration
    )
  }

  def readParquet[C](srcPath: String, schema: org.apache.avro.Schema)(implicit ctag: ClassTag[C]): RDD[C] = {
    val hadoopJob = Job.getInstance()
    ParquetInputFormat.setReadSupportClass(hadoopJob, classOf[AvroReadSupport[C]])
    AvroReadSupport.setAvroReadSchema(hadoopJob.getConfiguration, schema)
    sc.newAPIHadoopFile(
      bucketDAO.uri(srcPath),
      classOf[ParquetInputFormat[C]],
      classOf[Void],                            //K
      ctag.runtimeClass.asInstanceOf[Class[C]], //V
      hadoopJob.getConfiguration
    ).map { _._2 }
  }
{code}
We use avro-tools to generate Java classes from our Avro schemas:
java -jar /path/to/avro-tools-1.8.0.jar compile schema <schema file> <destination>

The test case harvests zeroes as values of mandatoryIntField.

Naively, I see a problem in the [indexed record converter](https://git-wip-us.apache.org/repos/asf?p=parquet-mr.git;a=blob;f=parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java;h=06c66d692571da08298ae1da4f9967446c4864ee;hb=HEAD#l99) 
in that it cheerfully accepts a condition doomed to fail. The condition being: 
the reader schema has a column with no default value that is absent in the 
writer schema.

I am writing mainly to confirm my diagnosis and to learn why it is implemented 
the way it is. Is it fixable, or do others depend on this behaviour as a 
feature? Can anyone think of a workaround?



> mandatory status of avro columns ignored
> ----------------------------------------
>
>                 Key: PARQUET-577
>                 URL: https://issues.apache.org/jira/browse/PARQUET-577
>             Project: Parquet
>          Issue Type: Bug
>          Components: parquet-avro
>    Affects Versions: 1.6.0
>            Reporter: Remek Zajac
>
> The Avro spec's schema [resolution rules](https://avro.apache.org/docs/1.7.7/spec.html#schema_record) say: 
> "if the reader's record schema has a field with no default value, and 
> writer's schema does not have a field with the same name, an error is 
> signalled."
> I can't find an implementation of this rule in parquet.avro, and I do observe 
> the rule being ignored. I am using 1.6.0 because that's the version we can get 
> from Maven. 
> My writer's schema:
> {code}
> {
>   "type" : "record",
>   "name" : "SampleSchema_v1",
>   "namespace" : "com.xxxx.spark",
>   "fields" : [ {
>     "name" : "stringField",
>     "type" : "string",
>     "doc"  : "Sample string field"
>   },{
>     "name" : "longField",
>     "type" : "long",
>     "doc"  : "Sample long field"
>   } ],
>   "doc:" : "A sample/test schema"
> }
> {code}
> My reader schema:
> {code}
> {
>   "type" : "record",
>   "name" : "SampleSchema_newDefaultlessCol",
>   "namespace" : "com.xxxx.spark",
>   "fields" : [ {
>     "name" : "stringField",
>     "type" : "string",
>     "doc"  : "Sample string field"
>   },{
>     "name" : "longField",
>     "type" : "long",
>     "doc"  : "Sample long field"
>   },{
>     "name" : "mandatoryIntField",
>     "type" : "int",
>     "doc"  : "Sample mandatory! int field"
>   }],
>   "doc:" : "v1 + one extra column that has no default"
> }
> {code}
> This is my test case:
> {code}
>     "accept new column w/o a default [schema-evolution, undesired]" in new 
> MockAvroParquetGrid {
>       //TODO: the behaviour this test case exercises is UNDESIRED, i.e.: a new column with no default value should
>       //constitute an incompatible schema break, instead, this thing uses 0 for the default
>       //TODO: Ticket to track this: https://jira.xxxx.io/browse/ADR-610
>       val inputSampleRecordsV1  = Seq(new SampleSchema_v1(s"string", 1))
>       dao.writeParquet[SampleSchema_v1](
>         SparkBase.sc.parallelize(inputSampleRecordsV1),
>         SampleSchema_v1.SCHEMA$,
>         parquetFolder
>       )
>       dao
>         .readParquet[SampleSchema_newDefaultlessCol](parquetFolder, SampleSchema_newDefaultlessCol.SCHEMA$)
>         .collect().toSeq.head
>         .getMandatoryIntField must equalTo(0) //TODO: zero is an unwelcome guess
>     }
> {code}
> This is the implementation of writeParquet and readParquet
> {code}
>   def writeParquet[C](source: RDD[C], schema: org.apache.avro.Schema, dstPath: String)
>                      (implicit ctag: ClassTag[C]): Unit = {
>     val hadoopJob = Job.getInstance()
>     ParquetOutputFormat.setWriteSupportClass(hadoopJob, classOf[AvroWriteSupport])
>     ParquetOutputFormat.setCompression(hadoopJob, CompressionCodecName.GZIP)
>     AvroWriteSupport.setSchema(hadoopJob.getConfiguration, schema)
>     new PairRDDFunctions[Void,C](
>       source.map(sourceRecord => (null, sourceRecord))
>     ).saveAsNewAPIHadoopFile(
>       bucketDAO.uri(dstPath),
>       classOf[Void],                            //K
>       ctag.runtimeClass.asInstanceOf[Class[C]], //V
>       classOf[AvroParquetOutputFormat],
>       hadoopJob.getConfiguration
>     )
>   }
>   def readParquet[C](srcPath: String, schema: org.apache.avro.Schema)(implicit ctag: ClassTag[C]): RDD[C] = {
>     val hadoopJob = Job.getInstance()
>     ParquetInputFormat.setReadSupportClass(hadoopJob, classOf[AvroReadSupport[C]])
>     AvroReadSupport.setAvroReadSchema(hadoopJob.getConfiguration, schema)
>     sc.newAPIHadoopFile(
>       bucketDAO.uri(srcPath),
>       classOf[ParquetInputFormat[C]],
>       classOf[Void],                            //K
>       ctag.runtimeClass.asInstanceOf[Class[C]], //V
>       hadoopJob.getConfiguration
>     ).map { _._2 }
>   }
> {code}
> We use avro-tools to generate Java classes from our Avro schemas:
> java -jar /path/to/avro-tools-1.8.0.jar compile schema <schema file> <destination>
> The test case harvests zeroes as values of mandatoryIntField.
> Naively, I see a problem in the [indexed record converter](https://git-wip-us.apache.org/repos/asf?p=parquet-mr.git;a=blob;f=parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java;h=06c66d692571da08298ae1da4f9967446c4864ee;hb=HEAD#l105) 
> in that it cheerfully accepts a condition doomed to fail. The condition 
> being: the reader schema has a column with no default value that is absent in 
> the writer schema.
> I am writing mainly to confirm my diagnosis and to learn why it is 
> implemented the way it is. Is it fixable, or do others depend on this 
> behaviour as a feature? Can anyone think of a workaround?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
