Thijsvandepoll opened a new issue, #7600:
URL: https://github.com/apache/iceberg/issues/7600
### Apache Iceberg version
1.2.1 (latest release)
### Query engine
Spark
### Please describe the bug 🐞
Hi I have been seeing some unexpected behavior related to the sort ordering
of a Iceberg table. The problem is that I set up SORT ORDER correctly such that
the partitions are ordered. However, it seems from the data that it does not
respect this setting. To showcase the problem I have created a small example:
```
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StructField, StructType
deps = [
"org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.1",
"org.apache.iceberg:iceberg-aws:1.2.1",
"software.amazon.awssdk:bundle:2.17.257",
"software.amazon.awssdk:url-connection-client:2.17.257",
]
os.environ["PYSPARK_SUBMIT_ARGS"] = f"--packages {','.join(deps)}
pyspark-shell"
os.environ["AWS_ACCESS_KEY_ID"] = "minioadmin"
os.environ["AWS_SECRET_ACCESS_KEY"] = "minioadmin"
os.environ["AWS_REGION"] = "eu-east-1"
catalog = "hive_catalog"
spark = (
SparkSession.builder.appName("Iceberg Reader")
.config(
"spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
)
.config(f"spark.sql.catalog.{catalog}",
"org.apache.iceberg.spark.SparkCatalog")
.config(f"spark.sql.catalog.{catalog}.type", "hive")
.config(f"spark.sql.catalog.{catalog}.uri", "thrift://localhost:9083")
.config(
f"spark.sql.catalog.{catalog}.io-impl",
"org.apache.iceberg.aws.s3.S3FileIO"
)
.config(f"spark.sql.catalog.{catalog}.s3.endpoint",
"http://localhost:9000")
.config(f"spark.sql.catalog.{catalog}.warehouse", "s3a://lakehouse")
.config("spark.sql.defaultCatalog", catalog)
.config("hive.metastore.uris", "thrift://localhost:9083")
.enableHiveSupport()
.getOrCreate()
)
# Create namespace and table
spark.sql("CREATE NAMESPACE IF NOT EXISTS new_ns;")
spark.sql(
"CREATE TABLE IF NOT EXISTS new_ns.new_table (first_id INT, second_id
INT)"
" PARTITIONED BY (second_id);"
)
# Add sort order
spark.sql("ALTER TABLE new_ns.new_table WRITE ORDERED BY first_id")
# Describe
spark.sql("DESCRIBE new_ns.new_table;").show()
#+--------------+---------+-------+
#| col_name|data_type|comment|
#+--------------+---------+-------+
#| first_id| int| |
#| second_id| int| |
#| | | |
#|# Partitioning| | |
#| Part 0|second_id| |
#+--------------+---------+-------+
```
This shows that the table has been created properly and the partitioning is
defined correctly. Also inspecting the metadata.json:
```{
"format-version" : 1,
"table-uuid" : "4a867367-6207-4e41-b8ea-45bbaf6ca4d0",
"location" : "s3a://lakehouse/new_ns.db/new_table",
"last-updated-ms" : 1683982109298,
"last-column-id" : 2,
"schema" : {
"type" : "struct",
"schema-id" : 0,
"fields" : [ {
"id" : 1,
"name" : "first_id",
"required" : false,
"type" : "int"
}, {
"id" : 2,
"name" : "second_id",
"required" : false,
"type" : "int"
} ]
},
"current-schema-id" : 0,
"schemas" : [ {
"type" : "struct",
"schema-id" : 0,
"fields" : [ {
"id" : 1,
"name" : "first_id",
"required" : false,
"type" : "int"
}, {
"id" : 2,
"name" : "second_id",
"required" : false,
"type" : "int"
} ]
} ],
"partition-spec" : [ {
"name" : "second_id",
"transform" : "identity",
"source-id" : 2,
"field-id" : 1000
} ],
"default-spec-id" : 0,
"partition-specs" : [ {
"spec-id" : 0,
"fields" : [ {
"name" : "second_id",
"transform" : "identity",
"source-id" : 2,
"field-id" : 1000
} ]
} ],
"last-partition-id" : 1000,
"default-sort-order-id" : 1,
"sort-orders" : [ {
"order-id" : 0,
"fields" : [ ]
}, {
"order-id" : 1,
"fields" : [ {
"transform" : "identity",
"source-id" : 1,
"direction" : "asc",
"null-order" : "nulls-first"
} ]
} ],
"properties" : {
"owner" : "thijsvandepoll",
"write.distribution-mode" : "range"
},
"current-snapshot-id" : -1,
"refs" : { },
"snapshots" : [ ],
"statistics" : [ ],
"snapshot-log" : [ ],
"metadata-log" : [ {
"timestamp-ms" : 1683982096980,
"metadata-file" :
"s3a://lakehouse/new_ns.db/new_table/metadata/00000-e96e0798-ed81-4e2d-91b2-e4705067999d.metadata.json"
} ]
}```
It seems like the `sort_order_id` is also configured correctly. The
`default-sort-order-id` is set to 1, which refers to the same ordering as the
one defined in the SQL.
Now we continue and try to insert some data:
```
# Insert some data
spark.createDataFrame(
[(1, 1), (2, 1), (3, 1)],
schema=StructType(
[
StructField("first_id", IntegerType()),
StructField("second_id", IntegerType()),
]
),
).createOrReplaceTempView("tmp")
# Merge into
spark.sql(
"""
MERGE INTO new_ns.new_table A
USING (SELECT * FROM tmp) B
ON A.first_id=B.first_id
WHEN MATCHED THEN UPDATE SET A.first_id=B.first_id,
A.second_id=B.second_id
WHEN NOT MATCHED THEN INSERT *
"""
)
spark.sql("SELECT * FROM new_ns.new_table;").show()
#+--------+---------+
#|first_id|second_id|
#+--------+---------+
#| 1| 1|
#| 2| 1|
#| 3| 1|
#+--------+---------+
# Now inspect the .files to check the sort_order_id
spark.sql(
"SELECT partition, record_count, sort_order_id FROM
new_ns.new_table.files"
).show()
#+---------+------------+-------------+
#|partition|record_count|sort_order_id|
#+---------+------------+-------------+
#| {1}| 3| 0|
#+---------+------------+-------------+
```
It shows that the wrong sort_order_id is being used.
I am not entirely sure if this is just wrongly reported or that it actually
uses the wrong sort_order_id. Does anyone have more information on this? Thanks
in advance!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]