Thijsvandepoll opened a new issue, #7600:
URL: https://github.com/apache/iceberg/issues/7600

   ### Apache Iceberg version
   
   1.2.1 (latest release)
   
   ### Query engine
   
   Spark
   
   ### Please describe the bug 🐞
   
   Hi, I have been seeing some unexpected behavior related to the sort order of an Iceberg table. I configured a `WRITE ORDERED BY` sort order so that data within each partition is written in sorted order, but the written data files do not appear to respect this setting. To demonstrate the problem I have created a small example:
   
   ```
   import os
   
   from pyspark.sql import SparkSession
   from pyspark.sql.types import IntegerType, StructField, StructType
   
   deps = [
       "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.1",
       "org.apache.iceberg:iceberg-aws:1.2.1",
       "software.amazon.awssdk:bundle:2.17.257",
       "software.amazon.awssdk:url-connection-client:2.17.257",
   ]
   os.environ["PYSPARK_SUBMIT_ARGS"] = f"--packages {','.join(deps)} 
pyspark-shell"
   os.environ["AWS_ACCESS_KEY_ID"] = "minioadmin"
   os.environ["AWS_SECRET_ACCESS_KEY"] = "minioadmin"
   os.environ["AWS_REGION"] = "eu-east-1"
   
   
   catalog = "hive_catalog"
   spark = (
       SparkSession.builder.appName("Iceberg Reader")
       .config(
           "spark.sql.extensions",
           "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
       )
       .config(f"spark.sql.catalog.{catalog}", 
"org.apache.iceberg.spark.SparkCatalog")
       .config(f"spark.sql.catalog.{catalog}.type", "hive")
       .config(f"spark.sql.catalog.{catalog}.uri", "thrift://localhost:9083")
       .config(f"spark.sql.catalog.{catalog}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
       .config(f"spark.sql.catalog.{catalog}.s3.endpoint", "http://localhost:9000")
       .config(f"spark.sql.catalog.{catalog}.warehouse", "s3a://lakehouse")
       .config("spark.sql.defaultCatalog", catalog)
       .config("hive.metastore.uris", "thrift://localhost:9083")
       .enableHiveSupport()
       .getOrCreate()
   )
   
   # Create namespace and table
   spark.sql("CREATE NAMESPACE IF NOT EXISTS new_ns;")
   spark.sql(
       "CREATE TABLE IF NOT EXISTS new_ns.new_table (first_id INT, second_id 
INT)"
       " PARTITIONED BY (second_id);"
   )
   
   # Add sort order
   spark.sql("ALTER TABLE new_ns.new_table WRITE ORDERED BY first_id")
   
   # Describe
   spark.sql("DESCRIBE new_ns.new_table;").show()
   #+--------------+---------+-------+
   #|      col_name|data_type|comment|
   #+--------------+---------+-------+
   #|      first_id|      int|       |
   #|     second_id|      int|       |
   #|              |         |       |
   #|# Partitioning|         |       |
   #|        Part 0|second_id|       |
   #+--------------+---------+-------+
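
   # Sanity check (a sketch, not part of the original repro): in my environment
   # the table properties Spark reports for an Iceberg table include a
   # "sort-order" entry; exact property names may vary between versions.
   spark.sql("SHOW TBLPROPERTIES new_ns.new_table").show(truncate=False)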
   ```
   
   This shows that the table has been created and partitioned as intended. Inspecting the metadata.json confirms the configuration as well:
   
   ```
   {
     "format-version" : 1,
     "table-uuid" : "4a867367-6207-4e41-b8ea-45bbaf6ca4d0",
     "location" : "s3a://lakehouse/new_ns.db/new_table",
     "last-updated-ms" : 1683982109298,
     "last-column-id" : 2,
     "schema" : {
       "type" : "struct",
       "schema-id" : 0,
       "fields" : [ {
         "id" : 1,
         "name" : "first_id",
         "required" : false,
         "type" : "int"
       }, {
         "id" : 2,
         "name" : "second_id",
         "required" : false,
         "type" : "int"
       } ]
     },
     "current-schema-id" : 0,
     "schemas" : [ {
       "type" : "struct",
       "schema-id" : 0,
       "fields" : [ {
         "id" : 1,
         "name" : "first_id",
         "required" : false,
         "type" : "int"
       }, {
         "id" : 2,
         "name" : "second_id",
         "required" : false,
         "type" : "int"
       } ]
     } ],
     "partition-spec" : [ {
       "name" : "second_id",
       "transform" : "identity",
       "source-id" : 2,
       "field-id" : 1000
     } ],
     "default-spec-id" : 0,
     "partition-specs" : [ {
       "spec-id" : 0,
       "fields" : [ {
         "name" : "second_id",
         "transform" : "identity",
         "source-id" : 2,
         "field-id" : 1000
       } ]
     } ],
     "last-partition-id" : 1000,
     "default-sort-order-id" : 1,
     "sort-orders" : [ {
       "order-id" : 0,
       "fields" : [ ]
     }, {
       "order-id" : 1,
       "fields" : [ {
         "transform" : "identity",
         "source-id" : 1,
         "direction" : "asc",
         "null-order" : "nulls-first"
       } ]
     } ],
     "properties" : {
       "owner" : "thijsvandepoll",
       "write.distribution-mode" : "range"
     },
     "current-snapshot-id" : -1,
     "refs" : { },
     "snapshots" : [ ],
     "statistics" : [ ],
     "snapshot-log" : [ ],
     "metadata-log" : [ {
       "timestamp-ms" : 1683982096980,
       "metadata-file" : 
"s3a://lakehouse/new_ns.db/new_table/metadata/00000-e96e0798-ed81-4e2d-91b2-e4705067999d.metadata.json"
     } ]
   }
   ```
   
   The sort order also looks correct in the metadata: `default-sort-order-id` is set to 1, which refers to the same ordering as the one defined by the `ALTER TABLE ... WRITE ORDERED BY` statement above.
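
   For reference, this is how the metadata.json above can be fetched straight from MinIO (a minimal sketch, not part of the repro; it assumes boto3 is installed and picks the latest metadata file by sorting the object keys):

   ```
   import json

   import boto3

   # Sketch: read the current metadata.json from MinIO and confirm the
   # default sort order. Endpoint and credentials match the Spark config above.
   s3 = boto3.client(
       "s3",
       endpoint_url="http://localhost:9000",
       aws_access_key_id="minioadmin",
       aws_secret_access_key="minioadmin",
   )
   resp = s3.list_objects_v2(Bucket="lakehouse", Prefix="new_ns.db/new_table/metadata/")
   keys = sorted(o["Key"] for o in resp["Contents"] if o["Key"].endswith(".metadata.json"))
   metadata = json.loads(s3.get_object(Bucket="lakehouse", Key=keys[-1])["Body"].read())
   print(metadata["default-sort-order-id"])  # 1
   print(metadata["sort-orders"])
   ```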
   
   
   Now we continue and try to insert some data:
   ```
   # Insert some data
   spark.createDataFrame(
       [(1, 1), (2, 1), (3, 1)],
       schema=StructType(
           [
               StructField("first_id", IntegerType()),
               StructField("second_id", IntegerType()),
           ]
       ),
   ).createOrReplaceTempView("tmp")
   
   # Merge into
   spark.sql(
       """
       MERGE INTO new_ns.new_table A 
       USING (SELECT * FROM tmp) B 
       ON A.first_id=B.first_id
       WHEN MATCHED THEN UPDATE SET A.first_id=B.first_id, A.second_id=B.second_id
       WHEN NOT MATCHED THEN INSERT *
       """
   )
   
   spark.sql("SELECT * FROM new_ns.new_table;").show()
   #+--------+---------+
   #|first_id|second_id|
   #+--------+---------+
   #|       1|        1|
   #|       2|        1|
   #|       3|        1|
   #+--------+---------+
   
   # Now inspect the .files to check the sort_order_id
   spark.sql(
       "SELECT partition, record_count, sort_order_id FROM 
new_ns.new_table.files"
   ).show()
   #+---------+------------+-------------+
   #|partition|record_count|sort_order_id|
   #+---------+------------+-------------+
   #|      {1}|           3|            0|
   #+---------+------------+-------------+
   ```
   
   The files metadata table reports `sort_order_id = 0` (the table's unsorted order) instead of the expected `sort_order_id = 1`.
   
   I am not entirely sure whether the sort_order_id is merely reported incorrectly, or whether the write actually ignored the table's sort order. Does anyone have more information on this? Thanks in advance!
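
   One way to narrow this down (a sketch; it assumes plain s3a reads against MinIO work outside the Iceberg catalog, i.e. fs.s3a.endpoint and credentials are configured, which the session above does not set up by itself):

   ```
   # Sketch: read each data file back directly and check whether its rows are
   # actually sorted by first_id, to separate "wrongly reported" from
   # "wrongly written".
   for row in spark.sql("SELECT file_path FROM new_ns.new_table.files").collect():
       values = [r.first_id for r in spark.read.parquet(row.file_path).collect()]
       print(row.file_path, "sorted" if values == sorted(values) else "NOT sorted")
   ```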

