[ 
https://issues.apache.org/jira/browse/PARQUET-1330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zejun Li updated PARQUET-1330:
------------------------------
    Description: 
I got a Parquet file written by Hive with this schema:
{code:java}
file schema: hive_schema
--------------------------------------------------------------------------------
taxi_id: OPTIONAL BINARY O:UTF8 R:0 D:1
date: OPTIONAL BINARY O:UTF8 R:0 D:1
start_time: OPTIONAL INT64 R:0 D:1
end_time: OPTIONAL INT64 R:0 D:1
min_lat_wgs: OPTIONAL DOUBLE R:0 D:1
min_lng_wgs: OPTIONAL DOUBLE R:0 D:1
max_lat_wgs: OPTIONAL DOUBLE R:0 D:1
max_lng_wgs: OPTIONAL DOUBLE R:0 D:1
first_lat_wgs: OPTIONAL DOUBLE R:0 D:1
first_lng_wgs: OPTIONAL DOUBLE R:0 D:1
last_lat_wgs: OPTIONAL DOUBLE R:0 D:1
last_lng_wgs: OPTIONAL DOUBLE R:0 D:1
gps_log: OPTIONAL F:1
.bag: REPEATED F:1
..array_element: OPTIONAL F:6
...timestamp: OPTIONAL INT64 R:1 D:4
...lat_wgs: OPTIONAL DOUBLE R:1 D:4
...lng_wgs: OPTIONAL DOUBLE R:1 D:4
...item: OPTIONAL INT32 R:1 D:4
...direction: OPTIONAL INT32 R:1 D:4
...vflag: OPTIONAL INT32 R:1 D:4
{code}
I want to use parquet-avro to read it, and use 
`AvroReadSupport.setRequestedProjection` to select a subset of fields.
{code:java}
{
  "type": "record",
  "name": "test",
  "fields": [
    {
      "name": "taxi_id",
      "type": ["null", "string"]
    },
    {
      "name": "gps_log",
      "type": [{
        "type": "array",
        "items": ["null", {
          "name": "point",
          "type": "record",
          "fields": [
            {
              "name": "lat_wgs",
              "type": ["null", "double"]
            },
            {
              "name": "lng_wgs",
              "type": ["null", "double"]
            }
          ]
        }]
      }],
      "default": "null"
    }
  ]
}{code}
I try to read data with code: 
{code:java}
Configuration conf = new Configuration(); 
AvroReadSupport.setRequestedProjection(conf, schema); 
conf.setBoolean(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, false); 
conf.setBoolean(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, false); 
AvroParquetReader<GenericRecord> reader = new AvroParquetReader<>(conf, path);
{code}
And I got errors:
{code:java}
Exception in thread "main" parquet.io.ParquetDecodingException: The requested 
schema is not compatible with the file schema. incompatible types: 
required group gps_log (LIST) {
  repeated group list {
    optional group element {
      optional double lat_wgs;
      optional double lng_wgs;
    }
  }
} != optional group gps_log (LIST) {
  repeated group bag {
    optional group array_element {
      optional int64 timestamp;
      optional double lat_wgs;
      optional double lng_wgs;
      optional int32 item;
      optional int32 direction;
      optional int32 vflag;
    }
  }
}
{code}
This error isn't caused by the nullability of `gps_log`. If I mark it 
nullable in the Avro schema, I'll always get a null value.

 

I try to add some code in `AvroSchemaConverter.convertField`:
{code:java}
diff --git a/AvroSchemaConverter.java b/AvroSchemaConverterNew.java
index 0b8076b..48b56dd 100644
--- a/AvroSchemaConverter.java
+++ b/AvroSchemaConverterNew.java
@@ -50,12 +50,17 @@ public class AvroSchemaConverter {
     "parquet.avro.add-list-element-records";
private static final boolean ADD_LIST_ELEMENT_RECORDS_DEFAULT = true;
+ public static final String READ_HIVE_WRITE_FILE =
+    "parquet.avro.read-hive-write-file";
+ private static final boolean READ_HIVE_WRITE_FILE_DEFAULT = false;
+
private final boolean assumeRepeatedIsListElement;
private final boolean writeOldListStructure;
public AvroSchemaConverter() {
     this.assumeRepeatedIsListElement = ADD_LIST_ELEMENT_RECORDS_DEFAULT;
     this.writeOldListStructure = WRITE_OLD_LIST_STRUCTURE_DEFAULT;
+   this.isReadHiveWriteFileDefault = READ_HIVE_WRITE_FILE_DEFAULT;
}
public AvroSchemaConverter(Configuration conf) {
@@ -63,6 +68,9 @@ public class AvroSchemaConverter {
         ADD_LIST_ELEMENT_RECORDS, ADD_LIST_ELEMENT_RECORDS_DEFAULT);
     this.writeOldListStructure = conf.getBoolean(
         WRITE_OLD_LIST_STRUCTURE, WRITE_OLD_LIST_STRUCTURE_DEFAULT);
+   this.isReadHiveWriteFileDefault = conf.getBoolean(
+       READ_HIVE_WRITE_FILE, READ_HIVE_WRITE_FILE_DEFAULT
+   );
}
/**
@@ -137,7 +145,14 @@ public class AvroSchemaConverter {
     if (writeOldListStructure) {
         return ConversionPatterns.listType(repetition, fieldName,
             convertField("array", schema.getElementType(), REPEATED));
-    } else {
+    } else if (isReadHiveWriteFileDefault) {
+       Type elementType = convertField("array_element", 
schema.getElementType());
+       return new GroupType(
+           repetition,
+           fieldName,
+           LIST,
+           new GroupType(Type.Repetition.REPEATED, "bag", elementType));
+       } else {
         return ConversionPatterns.listOfElements(repetition, fieldName,
             convertField(AvroWriteSupport.LIST_ELEMENT_NAME, 
schema.getElementType()));
     }
{code}
It can read data with this file.

 

So is this a compatibility problem in parquet-avro, or did I just miss some 
configuration?

 

 

  was:
I got a Parquet file written by Hive with this schema:
{code:java}
file schema: hive_schema
--------------------------------------------------------------------------------
taxi_id: OPTIONAL BINARY O:UTF8 R:0 D:1
date: OPTIONAL BINARY O:UTF8 R:0 D:1
start_time: OPTIONAL INT64 R:0 D:1
end_time: OPTIONAL INT64 R:0 D:1
min_lat_wgs: OPTIONAL DOUBLE R:0 D:1
min_lng_wgs: OPTIONAL DOUBLE R:0 D:1
max_lat_wgs: OPTIONAL DOUBLE R:0 D:1
max_lng_wgs: OPTIONAL DOUBLE R:0 D:1
first_lat_wgs: OPTIONAL DOUBLE R:0 D:1
first_lng_wgs: OPTIONAL DOUBLE R:0 D:1
last_lat_wgs: OPTIONAL DOUBLE R:0 D:1
last_lng_wgs: OPTIONAL DOUBLE R:0 D:1
gps_log: OPTIONAL F:1
.bag: REPEATED F:1
..array_element: OPTIONAL F:6
...timestamp: OPTIONAL INT64 R:1 D:4
...lat_wgs: OPTIONAL DOUBLE R:1 D:4
...lng_wgs: OPTIONAL DOUBLE R:1 D:4
...item: OPTIONAL INT32 R:1 D:4
...direction: OPTIONAL INT32 R:1 D:4
...vflag: OPTIONAL INT32 R:1 D:4
{code}
I want to use parquet-avro to read it, and use 
`AvroReadSupport.setRequestedProjection` to select a subset of fields.
{code:java}
{
  "type": "record",
  "name": "test",
  "fields": [
    {
      "name": "taxi_id",
      "type": ["null", "string"]
    },
    {
      "name": "gps_log",
      "type": [{
        "type": "array",
        "items": ["null", {
          "name": "point",
          "type": "record",
          "fields": [
            {
              "name": "lat_wgs",
              "type": ["null", "double"]
            },
            {
              "name": "lng_wgs",
              "type": ["null", "double"]
            }
          ]
        }]
      }],
      "default": "null"
    }
  ]
}{code}
I try to read data with code:

 
{code:java}
Configuration conf = new Configuration(); 
AvroReadSupport.setRequestedProjection(conf, schema); 
conf.setBoolean(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, false); 
conf.setBoolean(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, false); 
AvroParquetReader<GenericRecord> reader = new AvroParquetReader<>(conf, path);
{code}
And I got errors:

 
{code:java}
Exception in thread "main" parquet.io.ParquetDecodingException: The requested 
schema is not compatible with the file schema. incompatible types: 
required group gps_log (LIST) {
  repeated group list {
    optional group element {
      optional double lat_wgs;
      optional double lng_wgs;
    }
  }
} != optional group gps_log (LIST) {
  repeated group bag {
    optional group array_element {
      optional int64 timestamp;
      optional double lat_wgs;
      optional double lng_wgs;
      optional int32 item;
      optional int32 direction;
      optional int32 vflag;
    }
  }
}
{code}
This error isn't caused by the nullability of `gps_log`. If I mark it 
nullable in the Avro schema, I'll always get a null value.

 

 

I try to add some code in `AvroSchemaConverter.convertField`:

 
{code:java}
diff --git a/AvroSchemaConverter.java b/AvroSchemaConverterNew.java
index 0b8076b..48b56dd 100644
--- a/AvroSchemaConverter.java
+++ b/AvroSchemaConverterNew.java
@@ -50,12 +50,17 @@ public class AvroSchemaConverter {
     "parquet.avro.add-list-element-records";
private static final boolean ADD_LIST_ELEMENT_RECORDS_DEFAULT = true;
+ public static final String READ_HIVE_WRITE_FILE =
+    "parquet.avro.read-hive-write-file";
+ private static final boolean READ_HIVE_WRITE_FILE_DEFAULT = false;
+
private final boolean assumeRepeatedIsListElement;
private final boolean writeOldListStructure;
public AvroSchemaConverter() {
     this.assumeRepeatedIsListElement = ADD_LIST_ELEMENT_RECORDS_DEFAULT;
     this.writeOldListStructure = WRITE_OLD_LIST_STRUCTURE_DEFAULT;
+   this.isReadHiveWriteFileDefault = READ_HIVE_WRITE_FILE_DEFAULT;
}
public AvroSchemaConverter(Configuration conf) {
@@ -63,6 +68,9 @@ public class AvroSchemaConverter {
         ADD_LIST_ELEMENT_RECORDS, ADD_LIST_ELEMENT_RECORDS_DEFAULT);
     this.writeOldListStructure = conf.getBoolean(
         WRITE_OLD_LIST_STRUCTURE, WRITE_OLD_LIST_STRUCTURE_DEFAULT);
+   this.isReadHiveWriteFileDefault = conf.getBoolean(
+       READ_HIVE_WRITE_FILE, READ_HIVE_WRITE_FILE_DEFAULT
+   );
}
/**
@@ -137,7 +145,14 @@ public class AvroSchemaConverter {
     if (writeOldListStructure) {
         return ConversionPatterns.listType(repetition, fieldName,
             convertField("array", schema.getElementType(), REPEATED));
-    } else {
+    } else if (isReadHiveWriteFileDefault) {
+       Type elementType = convertField("array_element", 
schema.getElementType());
+       return new GroupType(
+           repetition,
+           fieldName,
+           LIST,
+           new GroupType(Type.Repetition.REPEATED, "bag", elementType));
+       } else {
         return ConversionPatterns.listOfElements(repetition, fieldName,
             convertField(AvroWriteSupport.LIST_ELEMENT_NAME, 
schema.getElementType()));
     }
{code}
It can read data with this file.

 

So is this a compatibility problem in parquet-avro, or did I just miss some 
configuration?

 

 


> Avro RequestedProjection incompatible with Hive written data
> ------------------------------------------------------------
>
>                 Key: PARQUET-1330
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1330
>             Project: Parquet
>          Issue Type: Bug
>          Components: parquet-avro
>    Affects Versions: 1.9.0
>         Environment: parquet-mr in CDH 5.14.2 (base version is 1.5.0)
> parquet-mr 1.9.0
>            Reporter: Zejun Li
>            Priority: Major
>
> I got a Parquet file written by Hive with this schema:
> {code:java}
> file schema: hive_schema
> --------------------------------------------------------------------------------
> taxi_id: OPTIONAL BINARY O:UTF8 R:0 D:1
> date: OPTIONAL BINARY O:UTF8 R:0 D:1
> start_time: OPTIONAL INT64 R:0 D:1
> end_time: OPTIONAL INT64 R:0 D:1
> min_lat_wgs: OPTIONAL DOUBLE R:0 D:1
> min_lng_wgs: OPTIONAL DOUBLE R:0 D:1
> max_lat_wgs: OPTIONAL DOUBLE R:0 D:1
> max_lng_wgs: OPTIONAL DOUBLE R:0 D:1
> first_lat_wgs: OPTIONAL DOUBLE R:0 D:1
> first_lng_wgs: OPTIONAL DOUBLE R:0 D:1
> last_lat_wgs: OPTIONAL DOUBLE R:0 D:1
> last_lng_wgs: OPTIONAL DOUBLE R:0 D:1
> gps_log: OPTIONAL F:1
> .bag: REPEATED F:1
> ..array_element: OPTIONAL F:6
> ...timestamp: OPTIONAL INT64 R:1 D:4
> ...lat_wgs: OPTIONAL DOUBLE R:1 D:4
> ...lng_wgs: OPTIONAL DOUBLE R:1 D:4
> ...item: OPTIONAL INT32 R:1 D:4
> ...direction: OPTIONAL INT32 R:1 D:4
> ...vflag: OPTIONAL INT32 R:1 D:4
> {code}
> I want to use parquet-avro to read it, and use 
> `AvroReadSupport.setRequestedProjection` to select a subset of fields.
> {code:java}
> {
>   "type": "record",
>   "name": "test",
>   "fields": [
>     {
>       "name": "taxi_id",
>       "type": ["null", "string"]
>     },
>     {
>       "name": "gps_log",
>       "type": [{
>         "type": "array",
>         "items": ["null", {
>           "name": "point",
>           "type": "record",
>           "fields": [
>             {
>               "name": "lat_wgs",
>               "type": ["null", "double"]
>             },
>             {
>               "name": "lng_wgs",
>               "type": ["null", "double"]
>             }
>           ]
>         }]
>       }],
>       "default": "null"
>     }
>   ]
> }{code}
> I try to read data with code: 
> {code:java}
> Configuration conf = new Configuration(); 
> AvroReadSupport.setRequestedProjection(conf, schema); 
> conf.setBoolean(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, false); 
> conf.setBoolean(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, false); 
> AvroParquetReader<GenericRecord> reader = new AvroParquetReader<>(conf, path);
> {code}
> And I got errors:
> {code:java}
> Exception in thread "main" parquet.io.ParquetDecodingException: The requested 
> schema is not compatible with the file schema. incompatible types: 
> required group gps_log (LIST) {
>   repeated group list {
>     optional group element {
>       optional double lat_wgs;
>       optional double lng_wgs;
>     }
>   }
> } != optional group gps_log (LIST) {
>   repeated group bag {
>     optional group array_element {
>       optional int64 timestamp;
>       optional double lat_wgs;
>       optional double lng_wgs;
>       optional int32 item;
>       optional int32 direction;
>       optional int32 vflag;
>     }
>   }
> }
> {code}
> This error isn't caused by the nullability of `gps_log`. If I mark it 
> nullable in the Avro schema, I'll always get a null value.
>  
> I try to add some code in `AvroSchemaConverter.convertField`:
> {code:java}
> diff --git a/AvroSchemaConverter.java b/AvroSchemaConverterNew.java
> index 0b8076b..48b56dd 100644
> --- a/AvroSchemaConverter.java
> +++ b/AvroSchemaConverterNew.java
> @@ -50,12 +50,17 @@ public class AvroSchemaConverter {
>      "parquet.avro.add-list-element-records";
> private static final boolean ADD_LIST_ELEMENT_RECORDS_DEFAULT = true;
> + public static final String READ_HIVE_WRITE_FILE =
> +    "parquet.avro.read-hive-write-file";
> + private static final boolean READ_HIVE_WRITE_FILE_DEFAULT = false;
> +
> private final boolean assumeRepeatedIsListElement;
> private final boolean writeOldListStructure;
> public AvroSchemaConverter() {
>      this.assumeRepeatedIsListElement = ADD_LIST_ELEMENT_RECORDS_DEFAULT;
>      this.writeOldListStructure = WRITE_OLD_LIST_STRUCTURE_DEFAULT;
> +   this.isReadHiveWriteFileDefault = READ_HIVE_WRITE_FILE_DEFAULT;
> }
> public AvroSchemaConverter(Configuration conf) {
> @@ -63,6 +68,9 @@ public class AvroSchemaConverter {
>          ADD_LIST_ELEMENT_RECORDS, ADD_LIST_ELEMENT_RECORDS_DEFAULT);
>      this.writeOldListStructure = conf.getBoolean(
>          WRITE_OLD_LIST_STRUCTURE, WRITE_OLD_LIST_STRUCTURE_DEFAULT);
> +   this.isReadHiveWriteFileDefault = conf.getBoolean(
> +       READ_HIVE_WRITE_FILE, READ_HIVE_WRITE_FILE_DEFAULT
> +   );
> }
> /**
> @@ -137,7 +145,14 @@ public class AvroSchemaConverter {
>      if (writeOldListStructure) {
>          return ConversionPatterns.listType(repetition, fieldName,
>              convertField("array", schema.getElementType(), REPEATED));
> -    } else {
> +    } else if (isReadHiveWriteFileDefault) {
> +       Type elementType = convertField("array_element", 
> schema.getElementType());
> +       return new GroupType(
> +           repetition,
> +           fieldName,
> +           LIST,
> +           new GroupType(Type.Repetition.REPEATED, "bag", elementType));
> +       } else {
>          return ConversionPatterns.listOfElements(repetition, fieldName,
>              convertField(AvroWriteSupport.LIST_ELEMENT_NAME, 
> schema.getElementType()));
>      }
> {code}
> It can read data with this file.
>  
> So is this a compatibility problem in parquet-avro, or did I just miss some 
> configuration?
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to