and124578963 opened a new issue, #4590:
URL: https://github.com/apache/datafusion-comet/issues/4590
### Describe the bug
Comet native Iceberg scan can return duplicate rows when an Iceberg
`FileScanTask` byte range splits a Parquet file that contains a single row
group.
The issue appears when:
- the Iceberg table has a Parquet data file with one row group;
- the file is planned into multiple byte-range scan tasks, e.g. because the
`split-size` read option is smaller than the row group size;
- Comet uses `CometIcebergNativeScanExec`.
Expected behavior: the row group should be read by exactly one split.
Actual behavior: the same row group is read by multiple split tasks, so
matching rows are returned multiple times.
Vanilla Spark/Iceberg does not duplicate rows.
### Steps to reproduce
Minimal Scala regression test:
```scala
test("native Iceberg scan does not duplicate a row group split by byte range") {
  assume(icebergAvailable, "Iceberg not available in classpath")
  withTempIcebergDir { warehouseDir =>
    withSQLConf(
      "spark.sql.catalog.split_cat" -> "org.apache.iceberg.spark.SparkCatalog",
      "spark.sql.catalog.split_cat.type" -> "hadoop",
      "spark.sql.catalog.split_cat.warehouse" -> warehouseDir.getAbsolutePath,
      CometConf.COMET_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
      // Write a one-row Parquet file, so it contains a single row group.
      val dataPath = s"${warehouseDir.getAbsolutePath}/single_row_group_parquet"
      spark
        .sql("SELECT CAST(0 AS INT) AS id, repeat('x', 1024) AS payload")
        .coalesce(1)
        .write
        .mode("overwrite")
        .parquet(dataPath)

      spark.sql("""
        CREATE TABLE split_cat.db.single_row_group_split (
          id INT,
          payload STRING
        ) USING iceberg
      """)

      val parquetFiles = new java.io.File(dataPath)
        .listFiles()
        .filter(file => file.getName.startsWith("part-") && file.getName.endsWith(".parquet"))
      assert(parquetFiles.length == 1)
      val sourceParquetFile = parquetFiles.head

      // Register the existing Parquet file as an Iceberg data file.
      val catalog = spark.sessionState.catalogManager.catalog("split_cat")
      val ident = org.apache.spark.sql.connector.catalog.Identifier
        .of(Array("db"), "single_row_group_split")
      val table = catalog
        .asInstanceOf[org.apache.iceberg.spark.SparkCatalog]
        .loadTable(ident)
        .asInstanceOf[org.apache.iceberg.spark.source.SparkTable]
        .table()
      val dataFile = org.apache.iceberg.DataFiles
        .builder(table.spec())
        .withPath(sourceParquetFile.getAbsolutePath)
        .withFormat(org.apache.iceberg.FileFormat.PARQUET)
        .withFileSizeInBytes(sourceParquetFile.length())
        .withRecordCount(1)
        .build()
      table.newAppend().appendFile(dataFile).commit()

      // A 64-byte split size forces several byte-range tasks over the one row group.
      val df = spark.read
        .format("iceberg")
        .option("split-size", "64")
        .option("file-open-cost", "64")
        .load("split_cat.db.single_row_group_split")
        .where("id = 0")
        .select("id")
      val rows = df.collect()
      assert(rows.length == 1, s"Expected 1 row, got ${rows.length}: ${rows.mkString(", ")}")
    }
  }
}
```
### Actual behavior
The query returns the same row multiple times.
Example failure:
```text
Expected 1 row, got 4: [0], [0], [0], [0]
```
### Expected behavior
The query should return exactly one row:
```text
[0]
```
### Additional context
The same Parquet row group appears to be selected by multiple byte-range
tasks.
This happens because the native iceberg-rust reader treats row group
selection as byte-range overlap:
```text
row_group_start < split_end && split_start < row_group_end
```
For a single row group split into N byte ranges, all N ranges overlap the
row group, so all N tasks read and emit it.
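As a rough illustration of the overlap rule above, the following Java sketch counts how many fixed-size splits would select a single row group. It is not iceberg-rust code: the class, the method names, and the byte offsets are hypothetical and chosen only to make the arithmetic visible.

```java
// Hypothetical illustration, not iceberg-rust code: all names and offsets are made up.
class OverlapSelectionDemo {

  // The overlap predicate quoted in this issue:
  // row_group_start < split_end && split_start < row_group_end
  static boolean overlaps(long rgStart, long rgEnd, long splitStart, long splitEnd) {
    return rgStart < splitEnd && splitStart < rgEnd;
  }

  // Cuts [0, fileLength) into consecutive splits of splitSize bytes and counts
  // how many of them select the row group [rgStart, rgEnd) under the overlap rule.
  static int countSelectingSplits(long rgStart, long rgEnd, long fileLength, long splitSize) {
    int count = 0;
    for (long start = 0; start < fileLength; start += splitSize) {
      long end = Math.min(start + splitSize, fileLength);
      if (overlaps(rgStart, rgEnd, start, end)) {
        count++;
      }
    }
    return count;
  }

  public static void main(String[] args) {
    // Assumed layout: a 256-byte file whose only row group spans bytes [4, 200),
    // planned with a split size of 64 bytes.
    System.out.println(countSelectingSplits(4, 200, 256, 64)); // prints 4
  }
}
```

With these assumed offsets, each of the four splits satisfies the predicate, so the row group would be read and emitted four times.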
parquet-java (used by vanilla Spark) avoids this by assigning each row group
to exactly one split, using row-group midpoint ownership semantics. In
parquet-java, split filtering keeps a row group only when the split's byte
range contains the row group's midpoint:
```java
long midPoint = startIndex + totalSize / 2;
if (filter.contains(midPoint)) {
  newRowGroups.add(rowGroup);
}
```
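To make the midpoint rule concrete, here is a minimal, self-contained Java sketch that applies it to a hypothetical layout. The class, the method names, and the byte offsets are assumptions for illustration, and the half-open `[splitStart, splitEnd)` containment check is an assumption about how `filter.contains` behaves rather than a copy of the parquet-java source.

```java
// Hypothetical illustration of midpoint ownership; names and offsets are made up.
class MidpointOwnershipDemo {

  // A split owns the row group only if its range contains the row group's midpoint.
  // Assumption: the split range is half-open, [splitStart, splitEnd).
  static boolean owns(long rgStart, long rgSize, long splitStart, long splitEnd) {
    long midPoint = rgStart + rgSize / 2;
    return splitStart <= midPoint && midPoint < splitEnd;
  }

  // Cuts [0, fileLength) into consecutive splits of splitSize bytes and counts
  // how many of them own the row group under the midpoint rule.
  static int countOwningSplits(long rgStart, long rgSize, long fileLength, long splitSize) {
    int count = 0;
    for (long start = 0; start < fileLength; start += splitSize) {
      long end = Math.min(start + splitSize, fileLength);
      if (owns(rgStart, rgSize, start, end)) {
        count++;
      }
    }
    return count;
  }

  public static void main(String[] args) {
    // Assumed layout: a 256-byte file whose only row group starts at byte 4 and is
    // 196 bytes long, so its midpoint is byte 102. Split size is 64 bytes.
    System.out.println(countOwningSplits(4, 196, 256, 64)); // prints 1
  }
}
```

Because consecutive half-open splits partition the file, the midpoint falls into exactly one of them (here `[64, 128)`), so the row group is read once regardless of how many splits its bytes span.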
### Environment
- Comet version: current local 0.16.0-SNAPSHOT / 0.16.0.001
- Spark: 3.5.8
- Iceberg catalog: Hadoop catalog
- Native Iceberg scan enabled:
```text
spark.comet.scan.icebergNative.enabled=true
```