JonasJ-ap opened a new pull request, #6997:
URL: https://github.com/apache/iceberg/pull/6997

   ## Problem Addressed:
   This PR fixes #6505 and #6647.
   It adds support for inferring the Iceberg schema from the Parquet schema when the Parquet file does not contain metadata holding the encoded Iceberg schema.
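
A minimal sketch of the fallback described above, not the code in this PR. It assumes the encoded schema lives in the Parquet key-value metadata under the key `iceberg.schema`; `resolve_file_schema` and `infer_from_parquet` are hypothetical names used only for illustration, and a plain dict stands in for the file metadata.

```python
import json
from typing import Callable, Mapping, Optional

# Assumption: metadata key under which writers embed the JSON-encoded Iceberg schema.
ICEBERG_SCHEMA_KEY = b"iceberg.schema"


def resolve_file_schema(
    kv_metadata: Optional[Mapping[bytes, bytes]],
    infer_from_parquet: Callable[[], dict],
) -> dict:
    """Return the embedded schema if present, otherwise fall back to inference.

    `infer_from_parquet` is a hypothetical callback that derives a schema
    from the physical Parquet schema of the file.
    """
    raw = (kv_metadata or {}).get(ICEBERG_SCHEMA_KEY)
    if raw is not None:
        # The writer embedded the schema: decode it instead of guessing.
        return json.loads(raw.decode("utf-8"))
    # No embedded schema (e.g. a file rewritten by another engine): infer it.
    return infer_from_parquet()


# Usage: a file without the key takes the inference path.
print(resolve_file_schema({}, lambda: {"type": "struct", "fields": []}))
```

The callback keeps the sketch independent of any Parquet library; in practice the inference step would walk the file's physical schema.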
   
   This PR is still under development. I am opening it early to get initial feedback on the overall design and structure. Thank you in advance for your help.
   
   ## Tests:
   Working on unit tests.
   
   Manual test on AWS Athena (reproducing the bug with the procedure from #6505, then verifying the fix):
   1. Create a table on AWS Glue:
   ```scala
   import org.apache.spark.sql.functions.{current_date, date_add, expr}

   // Build a five-row DataFrame with one column per type under test.
   val type_frame = spark
     .range(0, 5, 1, 5)
     .withColumnRenamed("id", "longCol")
     .withColumn("intCol", expr("CAST(longCol AS INT)"))
     .withColumn("floatCol", expr("CAST(longCol AS FLOAT)"))
     .withColumn("doubleCol", expr("CAST(longCol AS DOUBLE)"))
     .withColumn("dateCol", date_add(current_date(), 1))
     .withColumn("timestampCol", expr("TO_TIMESTAMP(dateCol)"))
     .withColumn("stringCol", expr("CAST(dateCol AS STRING)"))
     .withColumn("booleanCol", expr("longCol > 5"))
     .withColumn("binaryCol", expr("CAST(longCol AS BINARY)"))
     .withColumn("byteCol", expr("CAST(longCol AS BYTE)"))
     .withColumn("decimalCol", expr("CAST(longCol AS DECIMAL(10, 2))"))
     .withColumn("shortCol", expr("CAST(longCol AS SHORT)"))
     .withColumn("mapCol", expr("MAP(longCol, decimalCol)"))
     .withColumn("arrayCol", expr("ARRAY(longCol)"))
     .withColumn("structCol", expr("STRUCT(mapCol, arrayCol)"))

   // Write it as an Iceberg format-version 2 table.
   // DB_NAME is assumed to be defined earlier in the session.
   type_frame
     .writeTo(s"demo.$DB_NAME.type_test_ref_unpartitioned2")
     .tableProperty("format-version", "2")
     .createOrReplace()
   ```
   2. Use AWS Athena to optimize the table:
   ```sql
   OPTIMIZE type_test_ref_unpartitioned REWRITE DATA USING BIN_PACK;
   ```
   3. Running the following code snippet then fails with an error:
   ```python
   from pyiceberg.catalog import load_catalog
   
   catalog = load_catalog("default", warehouse="s3://gluetestjonas/warehouse")
   table = catalog.load_table("iceberg_ref.type_test_ref_unpartitioned")
   
   df = table.scan().to_arrow()
   print(df)
   ```
   On the current master branch:
   ```bash
   issue_6505 python3.10 parquet_schema.py
   Traceback (most recent call last):
     File "/Users/jonasjiang/Workspace/Apache_Iceberg_ws/python_test/issue_6505/parquet_schema.py", line 6, in <module>
       df = table.scan().to_arrow()
     File "/Users/jonasjiang/Workspace/Apache_Iceberg_ws/iceberg_python/python/pyiceberg/table/__init__.py", line 404, in to_arrow
       return project_table(
     File "/Users/jonasjiang/Workspace/Apache_Iceberg_ws/iceberg_python/python/pyiceberg/io/pyarrow.py", line 777, in project_table
       for table in pool.starmap(
     File "/opt/homebrew/Cellar/python@3.10/3.10.10_1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/pool.py", line 375, in starmap
       return self._map_async(func, iterable, starmapstar, chunksize).get()
     File "/opt/homebrew/Cellar/python@3.10/3.10.10_1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/pool.py", line 774, in get
       raise self._value
     File "/opt/homebrew/Cellar/python@3.10/3.10.10_1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/pool.py", line 125, in worker
       result = (True, func(*args, **kwds))
     File "/opt/homebrew/Cellar/python@3.10/3.10.10_1/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/pool.py", line 51, in starmapstar
       return list(itertools.starmap(args[0], args[1]))
     File "/Users/jonasjiang/Workspace/Apache_Iceberg_ws/iceberg_python/python/pyiceberg/io/pyarrow.py", line 714, in _file_to_table
       raise ValueError(
   ValueError: Iceberg schema is not embedded into the Parquet file, see https://github.com/apache/iceberg/issues/6505
   ```
   On this PR:
   ```bash
   issue_6505 python3.10 parquet_schema.py
   pyarrow.Table
   longCol: int64
   intCol: int32
   floatCol: float
   doubleCol: double
   dateCol: date32[day]
   timestampCol: timestamp[us, tz=UTC]
   stringCol: string
   booleanCol: bool
   binaryCol: binary
   byteCol: int32
   decimalCol: decimal128(10, 2)
   shortCol: int32
   mapCol: map<int64, decimal128(10, 2)>
     child 0, entries: struct<key: int64 not null, value: decimal128(10, 2)> not null
         child 0, key: int64 not null
         child 1, value: decimal128(10, 2)
   arrayCol: list<item: int64>
     child 0, item: int64
   structCol: struct<mapCol: map<int64, decimal128(10, 2)>, arrayCol: list<item: int64>>
     child 0, mapCol: map<int64, decimal128(10, 2)>
         child 0, entries: struct<key: int64 not null, value: decimal128(10, 2)> not null
             child 0, key: int64 not null
             child 1, value: decimal128(10, 2)
     child 1, arrayCol: list<item: int64>
         child 0, item: int64
   ----
   longCol: [[3,0,2,1,4]]
   intCol: [[3,0,2,1,4]]
   floatCol: [[3,0,2,1,4]]
   doubleCol: [[3,0,2,1,4]]
   dateCol: [[2023-03-04,2023-03-04,2023-03-04,2023-03-04,2023-03-04]]
   timestampCol: [[2023-03-04 00:00:00.000000,2023-03-04 00:00:00.000000,2023-03-04 00:00:00.000000,2023-03-04 00:00:00.000000,2023-03-04 00:00:00.000000]]
   stringCol: [["2023-03-04","2023-03-04","2023-03-04","2023-03-04","2023-03-04"]]
   booleanCol: [[false,false,false,false,false]]
   binaryCol: [[0000000000000003,0000000000000000,0000000000000002,0000000000000001,0000000000000004]]
   byteCol: [[3,0,2,1,4]]
   ```
   This indicates that the table can now be read normally.
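
The Arrow types printed above correspond to Iceberg primitive types, which is the kind of mapping schema inference has to perform. The following is a rough sketch under stated assumptions, not the implementation in this PR: `iceberg_primitive_for` is a hypothetical helper that works on the type strings exactly as printed in the output, and it covers only the primitive types that appear there.

```python
import re

# Arrow type string (as printed above) -> Iceberg primitive type name.
# Iceberg has no byte or short type, which is why byteCol and shortCol
# appear as int32 in the output.
_SIMPLE = {
    "bool": "boolean",
    "int32": "int",
    "int64": "long",
    "float": "float",
    "double": "double",
    "date32[day]": "date",
    "string": "string",
    "binary": "binary",
    "timestamp[us]": "timestamp",
    "timestamp[us, tz=UTC]": "timestamptz",
}
_DECIMAL = re.compile(r"decimal128\((\d+), (\d+)\)")


def iceberg_primitive_for(arrow_type: str) -> str:
    """Hypothetical helper: map a printed Arrow type to an Iceberg type name."""
    if arrow_type in _SIMPLE:
        return _SIMPLE[arrow_type]
    match = _DECIMAL.fullmatch(arrow_type)
    if match:
        precision, scale = match.groups()
        return f"decimal({precision}, {scale})"
    raise ValueError(f"no mapping sketched for {arrow_type!r}")


# Usage with types taken from the output above.
for arrow_type in ("int64", "decimal128(10, 2)", "timestamp[us, tz=UTC]"):
    print(arrow_type, "->", iceberg_primitive_for(arrow_type))
```

Nested types (map, list, struct) would need a recursive walk over their children and are left out of this sketch.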
   
   `.pyiceberg.yaml`:
   ```yaml
   catalog:
     default:
       type: glue
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

