anuragmantri commented on code in PR #14948:
URL: https://github.com/apache/iceberg/pull/14948#discussion_r3282175925
##########
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java:
##########
@@ -94,21 +97,40 @@ public InputPartition[] planInputPartitions() {
InputPartition[] partitions = new InputPartition[taskGroups.size()];
for (int index = 0; index < taskGroups.size(); index++) {
+ ScanTaskGroup<?> taskGroup = taskGroups.get(index);
+
partitions[index] =
new SparkInputPartition(
groupingKeyType,
- taskGroups.get(index),
+ taskGroup,
tableBroadcast,
fileIOBroadcast,
projectionString,
caseSensitive,
locations != null ? locations[index] :
SparkPlanningUtil.NO_LOCATION_PREFERENCE,
- cacheDeleteFilesOnExecutors);
+ cacheDeleteFilesOnExecutors,
+ shouldUseMergingSortedReader(taskGroup));
}
return partitions;
}
+ /** Returns whether sort ordering was reported for this batch's scan. */
+ private boolean isOrderingEnabled() {
Review Comment:
Inlined it with the `orderingEnabled` boolean.
##########
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SortOrderAnalyzer.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructLikeSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Analyzes whether sort ordering can be reported for a table's task groups.
+ *
+ * <p>For sort ordering to be reported, ALL of these conditions must hold:
+ *
+ * <ul>
+ * <li>The table has a defined sort order (non-null and {@code sortOrder.isSorted() == true})
+ * <li>Each partition key maps to exactly ONE task group (Spark drops the ordering guarantee when
+ * multiple {@code InputPartition}s share the same partition key)
+ * <li>Every {@link FileScanTask} in every task group carries the current sort order ID
+ * </ul>
+ */
+class SortOrderAnalyzer {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SortOrderAnalyzer.class);
+
+ private SortOrderAnalyzer() {}
+
+ /**
+ * Returns {@code true} only when sort ordering can be safely reported to
Spark for the given
+ * table and task groups.
+ */
+ static boolean canReportOrdering(
+ Table table, List<? extends ScanTaskGroup<?>> taskGroups,
Types.StructType groupingKeyType) {
+
+ SortOrder sortOrder = table.sortOrder();
+
+ if (sortOrder == null || sortOrder.isUnsorted()) {
+ LOG.debug("Cannot report ordering: table {} has no sort order defined",
table.name());
+ return false;
+ }
+
+ if (taskGroups == null || taskGroups.isEmpty()) {
+ LOG.debug("Cannot report ordering: no task groups for table {}",
table.name());
+ return false;
+ }
+
+ if (!hasUniquePartitionKeys(taskGroups, groupingKeyType)) {
+ LOG.debug(
+ "Cannot report ordering: table {} has multiple task groups sharing
the same partition"
+ + " key.",
+ table.name());
+ return false;
+ }
+
+ for (ScanTaskGroup<?> taskGroup : taskGroups) {
+ if (!allFilesHaveSortOrder(taskGroup, sortOrder.orderId())) {
+ LOG.debug(
+ "Cannot report ordering: table {} has files whose sort order ID
does not match the"
+ + " current table sort order {}",
+ table.name(),
+ sortOrder.orderId());
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Checks that each partition key appears in at most one task group.
+ *
+ * <p>When multiple {@code InputPartition}s share the same partition key, Spark's {@code
+ * EnsureRequirements} coalesces them into a single task at join/aggregate time. Because this
+ * coalescing simply concatenates the partitions rather than merge-sorting them, it destroys the
+ * within-partition ordering guarantee. Reporting ordering in this situation would cause incorrect
+ * query results.
+ */
+ private static boolean hasUniquePartitionKeys(
+ List<? extends ScanTaskGroup<?>> taskGroups, Types.StructType
groupingKeyType) {
+
+ if (groupingKeyType == null || groupingKeyType.fields().isEmpty()) {
+ return true;
+ }
+
+ StructLikeSet seenKeys = StructLikeSet.create(groupingKeyType);
+ for (ScanTaskGroup<?> taskGroup : taskGroups) {
+ StructLike key = taskGroup.groupingKey();
+ if (key != null && !seenKeys.add(key)) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Checks that every {@link FileScanTask} in the task group carries a sort order ID that matches
+ * the table's current sort order.
+ *
+ * <p>Non-{@code FileScanTask} entries (e.g. changelog tasks) are skipped.
+ */
+ private static boolean allFilesHaveSortOrder(
+ ScanTaskGroup<?> taskGroup, int expectedSortOrderId) {
+ for (ScanTask task : taskGroup.tasks()) {
+ if (!(task instanceof FileScanTask)) {
+ continue;
+ }
+
+ FileScanTask fileTask = (FileScanTask) task;
Review Comment:
Done.
##########
spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSupportsReportOrdering.java:
##########
@@ -0,0 +1,958 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.ReplaceSortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.SparkSQLProperties;
+import org.apache.iceberg.spark.TestBaseWithCatalog;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.SupportsReportOrdering;
+import org.apache.spark.sql.execution.SortExec;
+import org.apache.spark.sql.execution.SparkPlan;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ParameterizedTestExtension.class)
+class TestSupportsReportOrdering extends TestBaseWithCatalog {
+
+ private static final Map<String, String> ENABLED_ORDERING_SQL_CONF =
orderingConfig(true);
+ private static final Map<String, String> DISABLED_ORDERING_SQL_CONF =
orderingConfig(false);
+
+ private static Map<String, String> orderingConfig(boolean preserveOrdering) {
+ return ImmutableMap.<String, String>builder()
+ .put(SparkSQLProperties.PRESERVE_DATA_ORDERING,
String.valueOf(preserveOrdering))
+ .put(SparkSQLProperties.PRESERVE_DATA_GROUPING, "true")
Review Comment:
Fixed
##########
spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSupportsReportOrdering.java:
##########
@@ -0,0 +1,958 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.ReplaceSortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.SparkSQLProperties;
+import org.apache.iceberg.spark.TestBaseWithCatalog;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.SupportsReportOrdering;
+import org.apache.spark.sql.execution.SortExec;
+import org.apache.spark.sql.execution.SparkPlan;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ParameterizedTestExtension.class)
+class TestSupportsReportOrdering extends TestBaseWithCatalog {
+
+ private static final Map<String, String> ENABLED_ORDERING_SQL_CONF =
orderingConfig(true);
+ private static final Map<String, String> DISABLED_ORDERING_SQL_CONF =
orderingConfig(false);
+
+ private static Map<String, String> orderingConfig(boolean preserveOrdering) {
+ return ImmutableMap.<String, String>builder()
+ .put(SparkSQLProperties.PRESERVE_DATA_ORDERING,
String.valueOf(preserveOrdering))
+ .put(SparkSQLProperties.PRESERVE_DATA_GROUPING, "true")
+ .put("spark.sql.autoBroadcastJoinThreshold", "-1")
+ .put("spark.sql.adaptive.enabled", "false")
+ .put("spark.sql.sources.v2.bucketing.enabled", "true")
+ .put("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "true")
+ .put("spark.sql.requireAllClusterKeysForCoPartition", "false")
+ .buildOrThrow();
+ }
+
+ @BeforeEach
+ void useCatalog() {
+ sql("USE %s", catalogName);
+ }
+
+ @AfterEach
+ void removeTables() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ sql("DROP TABLE IF EXISTS %s", tableName("table_source"));
+ spark.conf().unset(SparkSQLProperties.PRESERVE_DATA_ORDERING);
+ }
+
+ @TestTemplate
+ void testMergingMultipleSortedFiles() throws NoSuchTableException {
+ Table table = createSimpleTable(tableName);
+ setSortOrder(table, "id");
+
+ writeBatches(
+ tableName,
+ SimpleRecord.class,
+ ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")),
+ ImmutableList.of(new SimpleRecord(3, "c"), new SimpleRecord(4, "d")),
+ ImmutableList.of(new SimpleRecord(5, "e"), new SimpleRecord(6, "f")),
+ ImmutableList.of(new SimpleRecord(7, "g"), new SimpleRecord(8, "h")));
+
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_ORDERING, "true");
+
+ Dataset<Row> result =
+ spark.sql(String.format("SELECT id, data FROM %s ORDER BY id",
tableName));
+ List<Object[]> rows = rowsToJava(result.collectAsList());
+
+ assertThat(rows)
+ .hasSize(8)
+ .containsExactly(
+ row(1, "a"),
+ row(2, "b"),
+ row(3, "c"),
+ row(4, "d"),
+ row(5, "e"),
+ row(6, "f"),
+ row(7, "g"),
+ row(8, "h"));
+ }
+
+ @TestTemplate
+ void testMergingWithDuplicateSortKeyValues() throws NoSuchTableException {
+ Table table = createSimpleTable(tableName);
+ setSortOrder(table, "id");
+
+ // The same id values appear across multiple files — the k-way merge must
correctly
+ // interleave rows with equal keys rather than dropping or mis-ordering
them.
+ writeBatches(
+ tableName,
+ SimpleRecord.class,
+ ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")),
+ ImmutableList.of(new SimpleRecord(1, "c"), new SimpleRecord(2, "d")),
+ ImmutableList.of(new SimpleRecord(1, "e"), new SimpleRecord(3, "f")));
+
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_ORDERING, "true");
+
+ Dataset<Row> result =
+ spark.sql(String.format("SELECT id, data FROM %s ORDER BY id, data",
tableName));
+ List<Object[]> rows = rowsToJava(result.collectAsList());
+
+ assertThat(rows)
+ .hasSize(6)
+ .containsExactly(
+ row(1, "a"), row(1, "c"), row(1, "e"), row(2, "b"), row(2, "d"),
row(3, "f"));
+ }
+
+ @TestTemplate
+ void testMergingWithNullsInSortKeyColumn() throws NoSuchTableException {
+ sql(
+ "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (c3)",
+ tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ setSortOrder(table, "c1"); // ASC NULLS FIRST (Iceberg default)
+
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_ORDERING, "true");
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_GROUPING, "true");
+
+ sql("INSERT INTO %s VALUES (null, 'x', 'P1'), (3, 'c', 'P1')", tableName);
+ sql("INSERT INTO %s VALUES (null, 'y', 'P1'), (1, 'a', 'P1'), (2, 'b',
'P1')", tableName);
+
+ Dataset<Row> result =
+ spark.sql(
+ String.format(
+ "SELECT c1, c2 FROM %s WHERE c3 = 'P1' ORDER BY c1 ASC NULLS
FIRST, c2",
+ tableName));
+ List<Object[]> rows = rowsToJava(result.collectAsList());
+
+ assertThat(rows)
+ .hasSize(5)
+ .containsExactly(row(null, "x"), row(null, "y"), row(1, "a"), row(2,
"b"), row(3, "c"));
+ }
+
+ @TestTemplate
+ void testMergingWithNullsInDescendingSortKeyColumn() throws
NoSuchTableException {
+ sql(
+ "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (c3)",
+ tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ table.replaceSortOrder().desc("c1").commit(); // DESC NULLS LAST (Iceberg
default for DESC)
+
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_ORDERING, "true");
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_GROUPING, "true");
+
+ sql("INSERT INTO %s VALUES (null, 'x', 'P1'), (1, 'a', 'P1')", tableName);
+ sql("INSERT INTO %s VALUES (null, 'y', 'P1'), (3, 'c', 'P1'), (2, 'b',
'P1')", tableName);
+
+ Dataset<Row> result =
+ spark.sql(
+ String.format(
+ "SELECT c1, c2 FROM %s WHERE c3 = 'P1' ORDER BY c1 DESC NULLS
LAST, c2",
+ tableName));
+ List<Object[]> rows = rowsToJava(result.collectAsList());
+
+ assertThat(rows)
+ .hasSize(5)
+ .containsExactly(row(3, "c"), row(2, "b"), row(1, "a"), row(null,
"x"), row(null, "y"));
+ }
+
+ @TestTemplate
+ void testDescendingSortOrder() throws NoSuchTableException {
+ Table table = createSimpleTable(tableName);
+ table.replaceSortOrder().desc("id").commit();
+
+ writeBatches(
+ tableName,
+ SimpleRecord.class,
+ ImmutableList.of(new SimpleRecord(10, "j"), new SimpleRecord(9, "i")),
+ ImmutableList.of(new SimpleRecord(8, "h"), new SimpleRecord(7, "g")),
+ ImmutableList.of(new SimpleRecord(6, "f"), new SimpleRecord(4, "d")));
+
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_ORDERING, "true");
+
+ Dataset<Row> result = spark.sql(String.format("SELECT id FROM %s ORDER BY
id DESC", tableName));
+ List<Object[]> rows = rowsToJava(result.collectAsList());
+
+ assertThat(rows).hasSize(6).containsExactly(row(10), row(9), row(8),
row(7), row(6), row(4));
+ }
+
+ @TestTemplate
+ void testMultiColumnSortOrder() throws NoSuchTableException {
+ Table table = createThreeColumnTable(tableName);
+ setSortOrder(table, "c3", "c1");
+
+ writeBatches(
+ tableName,
+ ThreeColumnRecord.class,
+ ImmutableList.of(new ThreeColumnRecord(1, "a", "A"), new
ThreeColumnRecord(3, "c", "A")),
+ ImmutableList.of(new ThreeColumnRecord(2, "b", "A"), new
ThreeColumnRecord(1, "a", "B")),
+ ImmutableList.of(new ThreeColumnRecord(2, "b", "B"), new
ThreeColumnRecord(3, "c", "B")));
+
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_ORDERING, "true");
+
+ Dataset<Row> result =
+ spark.sql(String.format("SELECT c3, c1, c2 FROM %s ORDER BY c3, c1",
tableName));
+ List<Object[]> rows = rowsToJava(result.collectAsList());
+
+ assertThat(rows)
+ .hasSize(6)
+ .containsExactly(
+ row("A", 1, "a"),
+ row("A", 2, "b"),
+ row("A", 3, "c"),
+ row("B", 1, "a"),
+ row("B", 2, "b"),
+ row("B", 3, "c"));
+ }
+
+ @TestTemplate
+ void testSingleFileDoesNotRequireMerging() throws NoSuchTableException {
+ Table table = createSimpleTable(tableName);
+ setSortOrder(table, "id");
+
+ List<SimpleRecord> batch = ImmutableList.of(new SimpleRecord(1, "a"), new
SimpleRecord(2, "b"));
+ spark.createDataFrame(batch,
SimpleRecord.class).coalesce(1).writeTo(tableName).append();
+
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_ORDERING, "true");
+
+ Dataset<Row> result = spark.sql(String.format("SELECT * FROM %s",
tableName));
+ List<Object[]> rows = rowsToJava(result.collectAsList());
+
+ assertThat(rows).hasSize(2);
+ }
+
+ @TestTemplate
+ void testPartitionedTableWithMultipleFilesPerPartition() throws
NoSuchTableException {
+ sql(
+ "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (c3)",
+ tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ setSortOrder(table, "c1");
+
+ writeBatches(
+ tableName,
+ ThreeColumnRecord.class,
+ ImmutableList.of(new ThreeColumnRecord(1, "a", "P1"), new
ThreeColumnRecord(3, "c", "P1")),
+ ImmutableList.of(new ThreeColumnRecord(2, "b", "P1"), new
ThreeColumnRecord(4, "d", "P1")),
+ ImmutableList.of(new ThreeColumnRecord(5, "e", "P2"), new
ThreeColumnRecord(7, "g", "P2")),
+ ImmutableList.of(new ThreeColumnRecord(6, "f", "P2"), new
ThreeColumnRecord(8, "h", "P2")));
+
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_ORDERING, "true");
+
+ Dataset<Row> p1Result =
+ spark.sql(String.format("SELECT c1, c2 FROM %s WHERE c3 = 'P1' ORDER
BY c1", tableName));
+ List<Object[]> p1Rows = rowsToJava(p1Result.collectAsList());
+
+ assertThat(p1Rows)
+ .hasSize(4)
+ .containsExactly(row(1, "a"), row(2, "b"), row(3, "c"), row(4, "d"));
+
+ Dataset<Row> p2Result =
+ spark.sql(String.format("SELECT c1, c2 FROM %s WHERE c3 = 'P2' ORDER
BY c1", tableName));
+ List<Object[]> p2Rows = rowsToJava(p2Result.collectAsList());
+
+ assertThat(p2Rows)
+ .hasSize(4)
+ .containsExactly(row(5, "e"), row(6, "f"), row(7, "g"), row(8, "h"));
+ }
+
+ @TestTemplate
+ void testOrderingNotReportedWhenDisabled() throws NoSuchTableException {
+ Table table = createSimpleTable(tableName);
+ setSortOrder(table, "id");
+
+ List<SimpleRecord> batch = ImmutableList.of(new SimpleRecord(1, "a"), new
SimpleRecord(2, "b"));
+ spark.createDataFrame(batch,
SimpleRecord.class).coalesce(1).writeTo(tableName).append();
+
+ spark.conf().unset(SparkSQLProperties.PRESERVE_DATA_ORDERING);
+
+ Dataset<Row> result = spark.sql(String.format("SELECT * FROM %s",
tableName));
+ List<Object[]> rows = rowsToJava(result.collectAsList());
+
+ assertThat(rows).hasSize(2);
+ }
+
+ @TestTemplate
+ void testOrderingNotReportedWhenGroupingDisabled() throws
NoSuchTableException {
+ sql(
+ "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (c3)",
+ tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ setSortOrder(table, "c1");
+
+ writeBatches(
+ tableName,
+ ThreeColumnRecord.class,
+ ImmutableList.of(new ThreeColumnRecord(1, "a", "P1"), new
ThreeColumnRecord(3, "c", "P1")),
+ ImmutableList.of(new ThreeColumnRecord(2, "b", "P1"), new
ThreeColumnRecord(4, "d", "P1")));
+
+ // Ordering enabled but grouping explicitly disabled — outputOrdering()
must return empty
+ withSQLConf(
+ ImmutableMap.of(
+ SparkSQLProperties.PRESERVE_DATA_ORDERING,
+ "true",
+ SparkSQLProperties.PRESERVE_DATA_GROUPING,
+ "false",
+ "spark.sql.autoBroadcastJoinThreshold",
+ "-1",
+ "spark.sql.adaptive.enabled",
+ "false"),
+ () -> {
+ SparkPlan plan =
+ executeAndKeepPlan(String.format("SELECT c1, c2 FROM %s ORDER BY
c1", tableName));
+ List<SortExec> sorts = collectPlans(plan, SortExec.class);
+ assertThat(sorts).isNotEmpty();
+ });
+ }
+
+ @TestTemplate
+ void testOrderingNotReportedForUnsortedTable() throws NoSuchTableException {
+ createSimpleTable(tableName);
+
+ List<SimpleRecord> batch = ImmutableList.of(new SimpleRecord(1, "a"), new
SimpleRecord(2, "b"));
+ spark.createDataFrame(batch,
SimpleRecord.class).coalesce(1).writeTo(tableName).append();
+
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_ORDERING, "true");
+
+ Dataset<Row> result = spark.sql(String.format("SELECT * FROM %s",
tableName));
+ List<Object[]> rows = rowsToJava(result.collectAsList());
+
+ assertThat(rows).hasSize(2);
+ }
+
+ @TestTemplate
+ void testNoMergeReaderForUnpartitionedSortedTable() throws
NoSuchTableException {
+ Table table = createSimpleTable(tableName); // unpartitioned
+ setSortOrder(table, "id");
+
+ // Multiple files so that a merge reader *would* be created if the bug
were present
+ writeBatches(
+ tableName,
+ SimpleRecord.class,
+ ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(3, "c")),
+ ImmutableList.of(new SimpleRecord(2, "b"), new SimpleRecord(4, "d")));
+
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_ORDERING, "true");
+
+ SparkPlan plan =
+ executeAndKeepPlan(String.format("SELECT id, data FROM %s ORDER BY
id", tableName));
+ List<SortExec> sorts = collectPlans(plan, SortExec.class);
+
+ assertThat(sorts).isNotEmpty();
+
+ Dataset<Row> result = spark.sql(String.format("SELECT id FROM %s ORDER BY
id", tableName));
+ List<Object[]> rows = rowsToJava(result.collectAsList());
+ assertThat(rows).containsExactly(row(1), row(2), row(3), row(4));
+ }
+
+ @TestTemplate
+ void testSortRequiredWhenOrderingNotReported() throws NoSuchTableException {
+ Table table = createSimpleTable(tableName);
+ setSortOrder(table, "id");
+
+ writeBatches(
+ tableName,
+ SimpleRecord.class,
+ ImmutableList.of(new SimpleRecord(1, "a"), new SimpleRecord(2, "b")),
+ ImmutableList.of(new SimpleRecord(3, "c"), new SimpleRecord(4, "d")));
+
+ spark.conf().unset(SparkSQLProperties.PRESERVE_DATA_ORDERING);
+
+ SparkPlan plan =
+ executeAndKeepPlan(String.format("SELECT id, data FROM %s ORDER BY
id", tableName));
+
+ List<SortExec> sorts = collectPlans(plan, SortExec.class);
+
+ assertThat(sorts).isNotEmpty();
+ }
+
+ @TestTemplate
+ void testSortMergeJoinWithSortedTables() throws NoSuchTableException {
+ createBucketedTable(tableName, "c1");
+ createBucketedTable(tableName("table_source"), "c1");
+
+ writeBatches(
+ tableName,
+ ThreeColumnRecord.class,
+ ImmutableList.of(new ThreeColumnRecord(1, "a", "X"), new
ThreeColumnRecord(2, "b", "X")),
+ ImmutableList.of(new ThreeColumnRecord(3, "c", "X"), new
ThreeColumnRecord(4, "d", "X")));
+
+ writeBatches(
+ tableName("table_source"),
+ ThreeColumnRecord.class,
+ ImmutableList.of(new ThreeColumnRecord(1, "A", "Y"), new
ThreeColumnRecord(2, "B", "Y")),
+ ImmutableList.of(new ThreeColumnRecord(3, "C", "Y"), new
ThreeColumnRecord(4, "D", "Y")));
+
+ assertPlanWithoutSort(
+ 0,
+ 2,
+ null,
+ "SELECT t1.c1, t1.c2, t2.c2 FROM %s t1 JOIN %s t2 ON t1.c1 = t2.c1",
+ tableName,
+ tableName("table_source"));
+ }
+
+ @TestTemplate
+ void testMergeWithSortedBucketedTables() throws NoSuchTableException {
+ sql(
+ "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (bucket(4, c1)) "
+ + "TBLPROPERTIES('write.merge.mode' = 'merge-on-read', '%s' =
'%d')",
+ tableName, TableProperties.SPLIT_SIZE, 1024);
+
+ Table targetTable = validationCatalog.loadTable(tableIdent);
+ setSortOrder(targetTable, "c1");
+
+ writeBatches(
+ tableName,
+ ThreeColumnRecord.class,
+ ImmutableList.of(
+ new ThreeColumnRecord(1, "old1", "data1"), new
ThreeColumnRecord(2, "old2", "data2")),
+ ImmutableList.of(
+ new ThreeColumnRecord(3, "old3", "data3"), new
ThreeColumnRecord(4, "old4", "data4")));
+
+ String sourceTableName = tableName("table_source");
+ TableIdentifier sourceTableIdent =
TableIdentifier.of(Namespace.of("default"), "table_source");
+ sql(
+ "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (bucket(4, c1)) "
+ + "TBLPROPERTIES('%s' = '%d')",
+ sourceTableName, TableProperties.SPLIT_SIZE, 1024);
+
+ Table sourceTable = validationCatalog.loadTable(sourceTableIdent);
+ setSortOrder(sourceTable, "c1");
+
+ writeBatches(
+ sourceTableName,
+ ThreeColumnRecord.class,
+ ImmutableList.of(
+ new ThreeColumnRecord(2, "new2", "data2"), new
ThreeColumnRecord(3, "new3", "data3")),
+ ImmutableList.of(
+ new ThreeColumnRecord(5, "new5", "data5"), new
ThreeColumnRecord(6, "new6", "data6")));
+
+ refreshTables(tableName, sourceTableName);
+
+ validationCatalog.loadTable(tableIdent).refresh();
+ validationCatalog.loadTable(sourceTableIdent).refresh();
+
+ assertPlanWithoutSort(
+ 1,
+ 3,
+ this::verifyMergeResults,
+ "MERGE INTO %s t USING %s s ON t.c1 = s.c1 "
+ + "WHEN MATCHED THEN UPDATE SET t.c2 = s.c2, t.c3 = s.c3 "
+ + "WHEN NOT MATCHED THEN INSERT *",
+ tableName,
+ sourceTableName);
+ }
+
+ @TestTemplate
+ void testHistoricalSortOrderInJoin() throws NoSuchTableException {
+ sql(
+ "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (bucket(4, c1)) "
+ + "TBLPROPERTIES('%s' = '%d')",
+ tableName, TableProperties.SPLIT_SIZE, 1024);
+
+ Table table1 = validationCatalog.loadTable(tableIdent);
+ setSortOrder(table1, "c1");
+
+ writeBatches(
+ tableName,
+ ThreeColumnRecord.class,
+ ImmutableList.of(new ThreeColumnRecord(1, "a", "X"), new
ThreeColumnRecord(2, "b", "X")),
+ ImmutableList.of(new ThreeColumnRecord(3, "c", "X"), new
ThreeColumnRecord(4, "d", "X")));
+
+ table1.replaceSortOrder().asc("c2").asc("c1").commit();
+
+ String table2Name = tableName("table_source");
+ sql(
+ "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (bucket(4, c1)) "
+ + "TBLPROPERTIES('%s' = '%d')",
+ table2Name, TableProperties.SPLIT_SIZE, 1024);
+
+ TableIdentifier table2Ident = TableIdentifier.of(Namespace.of("default"),
"table_source");
+ Table table2 = validationCatalog.loadTable(table2Ident);
+ setSortOrder(table2, "c1");
+
+ writeBatches(
+ table2Name,
+ ThreeColumnRecord.class,
+ ImmutableList.of(new ThreeColumnRecord(1, "A", "Y"), new
ThreeColumnRecord(2, "B", "Y")),
+ ImmutableList.of(new ThreeColumnRecord(3, "C", "Y"), new
ThreeColumnRecord(4, "D", "Y")));
+
+ table2.replaceSortOrder().asc("c2").asc("c1").commit();
+
+ // Both tables have files with historical sort order [c1 ASC]
+ // but current table sort order is [c2 ASC, c1 ASC].
+ // Verify neither scan reports ordering — SortOrderAnalyzer must decline
due to mismatched IDs.
+ TableIdentifier table2Ident2 = TableIdentifier.of(Namespace.of("default"),
"table_source");
+ withSQLConf(
+ ENABLED_ORDERING_SQL_CONF,
+ () -> {
+ assertScanReportsNoOrdering(tableIdent);
+ assertScanReportsNoOrdering(table2Ident2);
+ });
+ assertPlanWithoutSort(
+ 2,
+ 2,
+ null,
+ "SELECT t1.c1, t1.c2, t2.c2 FROM %s t1 JOIN %s t2 ON t1.c1 = t2.c1",
+ tableName,
+ table2Name);
+ }
+
+ @TestTemplate
+ void testMixedSortOrdersNoReporting() throws NoSuchTableException {
+ sql(
+ "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (bucket(4, c1)) "
+ + "TBLPROPERTIES('%s' = '%d')",
+ tableName, TableProperties.SPLIT_SIZE, 1024);
+
+ Table table1 = validationCatalog.loadTable(tableIdent);
+ setSortOrder(table1, "c1");
+
+ writeBatches(
+ tableName,
+ ThreeColumnRecord.class,
+ ImmutableList.of(new ThreeColumnRecord(1, "a", "X"), new
ThreeColumnRecord(2, "b", "X")));
+
+ table1.replaceSortOrder().asc("c2").asc("c1").commit();
+
+ writeBatches(
+ tableName,
+ ThreeColumnRecord.class,
+ ImmutableList.of(new ThreeColumnRecord(3, "c", "X"), new
ThreeColumnRecord(4, "d", "X")));
+
+ String table2Name = tableName("table_source");
+ sql(
+ "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (bucket(4, c1)) "
+ + "TBLPROPERTIES('%s' = '%d')",
+ table2Name, TableProperties.SPLIT_SIZE, 1024);
+
+ TableIdentifier table2Ident = TableIdentifier.of(Namespace.of("default"),
"table_source");
+ Table table2 = validationCatalog.loadTable(table2Ident);
+ setSortOrder(table2, "c1");
+
+ writeBatches(
+ table2Name,
+ ThreeColumnRecord.class,
+ ImmutableList.of(new ThreeColumnRecord(1, "A", "Y"), new
ThreeColumnRecord(2, "B", "Y")));
+
+ table2.replaceSortOrder().asc("c2").asc("c1").commit();
+
+ writeBatches(
+ table2Name,
+ ThreeColumnRecord.class,
+ ImmutableList.of(new ThreeColumnRecord(3, "C", "Y"), new
ThreeColumnRecord(4, "D", "Y")));
+
+ assertPlanWithoutSort(
+ 2,
+ 2,
+ null,
+ "SELECT t1.c1, t1.c2, t2.c2 FROM %s t1 JOIN %s t2 ON t1.c1 = t2.c1",
+ tableName,
+ table2Name);
+ }
+
+ @TestTemplate
+ void testSPJWithDifferentPartitionAndSortKeys() throws NoSuchTableException {
+ createBucketedTable(tableName, "c3", "c1");
+ createBucketedTable(tableName("table_source"), "c3", "c1");
+
+ writeBatches(
+ tableName,
+ ThreeColumnRecord.class,
+ ImmutableList.of(
+ new ThreeColumnRecord(1, "a", "2024-01-01"),
+ new ThreeColumnRecord(2, "b", "2024-01-02")),
+ ImmutableList.of(
+ new ThreeColumnRecord(1, "c", "2024-01-03"),
+ new ThreeColumnRecord(2, "d", "2024-01-04")));
+
+ writeBatches(
+ tableName("table_source"),
+ ThreeColumnRecord.class,
+ ImmutableList.of(
+ new ThreeColumnRecord(1, "A", "2024-01-01"),
+ new ThreeColumnRecord(2, "B", "2024-01-02")),
+ ImmutableList.of(
+ new ThreeColumnRecord(1, "C", "2024-01-03"),
+ new ThreeColumnRecord(2, "D", "2024-01-04")));
+
+ refreshTables(tableName, tableName("table_source"));
+
+ assertPlanWithoutSort(
+ 0,
+ 2,
+ null,
+ "SELECT t1.c1, t1.c2, t2.c2 FROM %s t1 JOIN %s t2 ON t1.c3 = t2.c3 AND
t1.c1 = t2.c1",
+ tableName,
+ tableName("table_source"));
+ }
+
+ @TestTemplate
+ void testHistoricalSortOrderInMerge() throws NoSuchTableException {
+ sql(
+ "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (bucket(4, c1)) "
+ + "TBLPROPERTIES('write.merge.mode' = 'merge-on-read', '%s' =
'%d')",
+ tableName, TableProperties.SPLIT_SIZE, 1024);
+
+ Table targetTable = validationCatalog.loadTable(tableIdent);
+ setSortOrder(targetTable, "c1");
+
+ writeBatches(
+ tableName,
+ ThreeColumnRecord.class,
+ ImmutableList.of(
+ new ThreeColumnRecord(1, "old1", "data1"), new
ThreeColumnRecord(2, "old2", "data2")),
+ ImmutableList.of(
+ new ThreeColumnRecord(3, "old3", "data3"), new
ThreeColumnRecord(4, "old4", "data4")));
+
+ targetTable.replaceSortOrder().asc("c2").asc("c1").commit();
+
+ String sourceTableName = tableName("table_source");
+ TableIdentifier sourceTableIdent =
TableIdentifier.of(Namespace.of("default"), "table_source");
+ sql(
+ "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (bucket(4, c1)) "
+ + "TBLPROPERTIES('%s' = '%d')",
+ sourceTableName, TableProperties.SPLIT_SIZE, 1024);
+
+ Table sourceTable = validationCatalog.loadTable(sourceTableIdent);
+ setSortOrder(sourceTable, "c1");
+
+ writeBatches(
+ sourceTableName,
+ ThreeColumnRecord.class,
+ ImmutableList.of(
+ new ThreeColumnRecord(2, "new2", "data2"), new
ThreeColumnRecord(3, "new3", "data3")),
+ ImmutableList.of(
+ new ThreeColumnRecord(5, "new5", "data5"), new
ThreeColumnRecord(6, "new6", "data6")));
+
+ sourceTable.replaceSortOrder().asc("c2").asc("c1").commit();
+
+ refreshTables(tableName, sourceTableName);
+
+ validationCatalog.loadTable(tableIdent).refresh();
+ validationCatalog.loadTable(sourceTableIdent).refresh();
+
+ // Files have historical sort order [c1 ASC] but tables have [c2 ASC, c1
ASC].
+ // Verify neither scan reports ordering — SortOrderAnalyzer must decline
due to mismatched IDs.
+ withSQLConf(
+ ENABLED_ORDERING_SQL_CONF,
+ () -> {
+ assertScanReportsNoOrdering(tableIdent);
+ assertScanReportsNoOrdering(sourceTableIdent);
+ });
+ assertPlanWithoutSort(
+ 3,
+ 3,
+ this::verifyMergeResults,
+ "MERGE INTO %s t USING %s s ON t.c1 = s.c1 "
+ + "WHEN MATCHED THEN UPDATE SET t.c2 = s.c2, t.c3 = s.c3 "
+ + "WHEN NOT MATCHED THEN INSERT *",
+ tableName,
+ sourceTableName);
+ }
+
+ @TestTemplate
+ void testProjectionPushdownSortKeyNotProjected() throws NoSuchTableException
{
+ sql(
+ "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
+ + "USING iceberg "
+ + "PARTITIONED BY (c3)",
+ tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ setSortOrder(table, "c1");
+
+ writeBatches(
+ tableName,
+ ThreeColumnRecord.class,
+ ImmutableList.of(new ThreeColumnRecord(1, "a", "P1"), new
ThreeColumnRecord(3, "c", "P1")),
+ ImmutableList.of(new ThreeColumnRecord(2, "b", "P1"), new
ThreeColumnRecord(4, "d", "P1")));
+
+ spark.conf().set(SparkSQLProperties.PRESERVE_DATA_ORDERING, "true");
+
+ // c1 is the sort key but is NOT in the SELECT list.
+ Dataset<Row> result = spark.sql(String.format("SELECT c2 FROM %s WHERE c3
= 'P1'", tableName));
+ List<Object[]> rows = rowsToJava(result.collectAsList());
+
+ assertThat(rows).hasSize(4).containsExactlyInAnyOrder(row("a"), row("b"),
row("c"), row("d"));
Review Comment:
Good catch. This test was not actually exercising the merging reader code at
all, because grouping was turned off. I now set up grouping and changed the
assertion to `containsExactly`.
##########
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java:
##########
@@ -94,21 +97,40 @@ public InputPartition[] planInputPartitions() {
InputPartition[] partitions = new InputPartition[taskGroups.size()];
for (int index = 0; index < taskGroups.size(); index++) {
+ ScanTaskGroup<?> taskGroup = taskGroups.get(index);
+
partitions[index] =
new SparkInputPartition(
groupingKeyType,
- taskGroups.get(index),
+ taskGroup,
tableBroadcast,
fileIOBroadcast,
projectionString,
caseSensitive,
locations != null ? locations[index] :
SparkPlanningUtil.NO_LOCATION_PREFERENCE,
- cacheDeleteFilesOnExecutors);
+ cacheDeleteFilesOnExecutors,
+ shouldUseMergingSortedReader(taskGroup));
}
return partitions;
}
+ /** Returns whether sort ordering was reported for this batch's scan. */
+ private boolean isOrderingEnabled() {
+ return orderingEnabled;
+ }
+
+ /**
+ * Returns true if this task group should use a k-way merging reader. This
requires ordering to be
+ * enabled at the table level (validated by {@link #isOrderingEnabled()},
multiple files in the
+ * group, and all tasks being {@link FileScanTask}s.
+ */
+ private boolean shouldUseMergingSortedReader(ScanTaskGroup<?> taskGroup) {
Review Comment:
You are right. We are disabling bin packing anyway, so a task group is 1:1
with an input partition. I refactored this into a check for all readers,
`anyGroupNeedsMergingReader()`.
##########
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/MergingSortedRowDataReader.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseScanTaskGroup;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortField;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderComparators;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.source.metrics.TaskNumDeletes;
+import org.apache.iceberg.spark.source.metrics.TaskNumSplits;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SortedMerge;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PartitionReader} that reads multiple sorted files and merges them
into a single sorted
+ * stream using a k-way heap merge ({@link SortedMerge}).
+ *
+ * <p>This reader is used when {@code preserve-data-ordering} is enabled and
the task group contains
+ * multiple files that all have the same sort order.
+ *
+ * <p>Sort key columns absent from the requested projection are temporarily
added to the read schema
+ * so that {@link SortOrderComparators} can access them during the merge. The
extra columns are
+ * stripped from each row before it is returned to Spark.
+ */
+class MergingSortedRowDataReader implements PartitionReader<InternalRow> {
+ private static final Logger LOG =
LoggerFactory.getLogger(MergingSortedRowDataReader.class);
+
+ private final CloseableGroup resources;
+ private final CloseableIterator<InternalRow> mergedIterator;
+ private final List<RowDataReader> fileReaders;
+ // non-null only when sort key columns were added to the read schema beyond
what Spark projected
+ private final int[] outputPositions;
+ private final DataType[] outputDataTypes;
+ private final Object[] outputValues; // reused per row to avoid per-row
allocation
+ private InternalRow current;
+
+ MergingSortedRowDataReader(SparkInputPartition partition) {
+ Table table = partition.table();
+ ScanTaskGroup<FileScanTask> taskGroup = partition.taskGroup();
+ Schema projection = partition.projection();
+ SortOrder sortOrder = table.sortOrder();
+
+ Preconditions.checkState(
+ sortOrder.isSorted(), "Cannot create merging reader for unsorted table
%s", table.name());
+ Preconditions.checkState(
+ taskGroup.tasks().size() > 1,
Review Comment:
Done
##########
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/MergingSortedRowDataReader.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseScanTaskGroup;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortField;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderComparators;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.source.metrics.TaskNumDeletes;
+import org.apache.iceberg.spark.source.metrics.TaskNumSplits;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SortedMerge;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PartitionReader} that reads multiple sorted files and merges them
into a single sorted
+ * stream using a k-way heap merge ({@link SortedMerge}).
+ *
+ * <p>This reader is used when {@code preserve-data-ordering} is enabled and
the task group contains
+ * multiple files that all have the same sort order.
+ *
+ * <p>Sort key columns absent from the requested projection are temporarily
added to the read schema
+ * so that {@link SortOrderComparators} can access them during the merge. The
extra columns are
+ * stripped from each row before it is returned to Spark.
+ */
+class MergingSortedRowDataReader implements PartitionReader<InternalRow> {
+ private static final Logger LOG =
LoggerFactory.getLogger(MergingSortedRowDataReader.class);
+
+ private final CloseableGroup resources;
+ private final CloseableIterator<InternalRow> mergedIterator;
+ private final List<RowDataReader> fileReaders;
+ // non-null only when sort key columns were added to the read schema beyond
what Spark projected
+ private final int[] outputPositions;
+ private final DataType[] outputDataTypes;
+ private final Object[] outputValues; // reused per row to avoid per-row
allocation
+ private InternalRow current;
+
+ MergingSortedRowDataReader(SparkInputPartition partition) {
+ Table table = partition.table();
+ ScanTaskGroup<FileScanTask> taskGroup = partition.taskGroup();
+ Schema projection = partition.projection();
+ SortOrder sortOrder = table.sortOrder();
+
+ Preconditions.checkState(
+ sortOrder.isSorted(), "Cannot create merging reader for unsorted table
%s", table.name());
+ Preconditions.checkState(
+ taskGroup.tasks().size() > 1,
+ "Merging reader requires multiple files, got %s",
+ taskGroup.tasks().size());
+
+ LOG.info(
+ "Creating merging reader for {} files with sort order {} in table {}",
+ taskGroup.tasks().size(),
+ sortOrder.orderId(),
+ table.name());
+
+ // Augment the projected schema with any sort key columns Spark did not
request so that
+ // SortOrderComparators can access every sort key field during the merge.
+ Schema mergeReadSchema = mergeReadSchema(projection, sortOrder, table);
+ this.outputPositions = buildOutputPositions(projection, mergeReadSchema);
+ this.outputDataTypes = buildOutputDataTypes(projection, outputPositions);
+ this.outputValues = outputPositions != null ? new
Object[outputPositions.length] : null;
+
+ this.resources = new CloseableGroup();
+ this.fileReaders =
+ taskGroup.tasks().stream()
+ .map(
+ task ->
+ new RowDataReader(
+ table,
+ partition.io(),
+ new
BaseScanTaskGroup<>(Collections.singletonList(task)),
+ mergeReadSchema,
+ partition.isCaseSensitive(),
+ partition.cacheDeleteFilesOnExecutors()))
+ .collect(Collectors.toList());
+ // Wrap each reader as a CloseableIterable and feed into SortedMerge.
+ List<CloseableIterable<InternalRow>> fileIterables =
+
fileReaders.stream().map(this::readerToIterable).collect(Collectors.toList());
+ SortedMerge<InternalRow> sortedMerge =
+ new SortedMerge<>(buildComparator(mergeReadSchema, sortOrder),
fileIterables);
+ resources.addCloseable(sortedMerge);
+ this.mergedIterator = sortedMerge.iterator();
Review Comment:
Hmm, this is an interesting one that I had not thought of. I'm still trying to
understand `InputFileBlockHolder` better.
One option is to track which file each merged row came from and update
`InputFileBlockHolder` in `MergingSortedRowDataReader.next()`. Each sub-reader
wraps a single file, so we can capture the file path when building the
iterables and tag each row with its source.
What do you think?
##########
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/MergingSortedRowDataReader.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseScanTaskGroup;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortField;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderComparators;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.source.metrics.TaskNumDeletes;
+import org.apache.iceberg.spark.source.metrics.TaskNumSplits;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SortedMerge;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PartitionReader} that reads multiple sorted files and merges them
into a single sorted
+ * stream using a k-way heap merge ({@link SortedMerge}).
+ *
+ * <p>This reader is used when {@code preserve-data-ordering} is enabled and
the task group contains
+ * multiple files that all have the same sort order.
+ *
+ * <p>Sort key columns absent from the requested projection are temporarily
added to the read schema
+ * so that {@link SortOrderComparators} can access them during the merge. The
extra columns are
+ * stripped from each row before it is returned to Spark.
+ */
+class MergingSortedRowDataReader implements PartitionReader<InternalRow> {
+ private static final Logger LOG =
LoggerFactory.getLogger(MergingSortedRowDataReader.class);
+
+ private final CloseableGroup resources;
+ private final CloseableIterator<InternalRow> mergedIterator;
+ private final List<RowDataReader> fileReaders;
+ // non-null only when sort key columns were added to the read schema beyond
what Spark projected
+ private final int[] outputPositions;
+ private final DataType[] outputDataTypes;
+ private final Object[] outputValues; // reused per row to avoid per-row
allocation
+ private InternalRow current;
+
+ MergingSortedRowDataReader(SparkInputPartition partition) {
+ Table table = partition.table();
+ ScanTaskGroup<FileScanTask> taskGroup = partition.taskGroup();
+ Schema projection = partition.projection();
+ SortOrder sortOrder = table.sortOrder();
+
+ Preconditions.checkState(
+ sortOrder.isSorted(), "Cannot create merging reader for unsorted table
%s", table.name());
+ Preconditions.checkState(
+ taskGroup.tasks().size() > 1,
+ "Merging reader requires multiple files, got %s",
+ taskGroup.tasks().size());
+
+ LOG.info(
+ "Creating merging reader for {} files with sort order {} in table {}",
+ taskGroup.tasks().size(),
+ sortOrder.orderId(),
+ table.name());
+
+ // Augment the projected schema with any sort key columns Spark did not
request so that
+ // SortOrderComparators can access every sort key field during the merge.
+ Schema mergeReadSchema = mergeReadSchema(projection, sortOrder, table);
+ this.outputPositions = buildOutputPositions(projection, mergeReadSchema);
+ this.outputDataTypes = buildOutputDataTypes(projection, outputPositions);
+ this.outputValues = outputPositions != null ? new
Object[outputPositions.length] : null;
+
+ this.resources = new CloseableGroup();
+ this.fileReaders =
+ taskGroup.tasks().stream()
+ .map(
+ task ->
+ new RowDataReader(
+ table,
+ partition.io(),
+ new
BaseScanTaskGroup<>(Collections.singletonList(task)),
+ mergeReadSchema,
+ partition.isCaseSensitive(),
+ partition.cacheDeleteFilesOnExecutors()))
+ .collect(Collectors.toList());
+ // Wrap each reader as a CloseableIterable and feed into SortedMerge.
+ List<CloseableIterable<InternalRow>> fileIterables =
+
fileReaders.stream().map(this::readerToIterable).collect(Collectors.toList());
Review Comment:
Done.
##########
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/MergingSortedRowDataReader.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseScanTaskGroup;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortField;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderComparators;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.source.metrics.TaskNumDeletes;
+import org.apache.iceberg.spark.source.metrics.TaskNumSplits;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SortedMerge;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PartitionReader} that reads multiple sorted files and merges them
into a single sorted
+ * stream using a k-way heap merge ({@link SortedMerge}).
+ *
+ * <p>This reader is used when {@code preserve-data-ordering} is enabled and
the task group contains
+ * multiple files that all have the same sort order.
+ *
+ * <p>Sort key columns absent from the requested projection are temporarily
added to the read schema
+ * so that {@link SortOrderComparators} can access them during the merge. The
extra columns are
+ * stripped from each row before it is returned to Spark.
+ */
+class MergingSortedRowDataReader implements PartitionReader<InternalRow> {
+ private static final Logger LOG =
LoggerFactory.getLogger(MergingSortedRowDataReader.class);
+
+ private final CloseableGroup resources;
+ private final CloseableIterator<InternalRow> mergedIterator;
+ private final List<RowDataReader> fileReaders;
+ // non-null only when sort key columns were added to the read schema beyond
what Spark projected
+ private final int[] outputPositions;
+ private final DataType[] outputDataTypes;
+ private final Object[] outputValues; // reused per row to avoid per-row
allocation
+ private InternalRow current;
+
+ MergingSortedRowDataReader(SparkInputPartition partition) {
+ Table table = partition.table();
+ ScanTaskGroup<FileScanTask> taskGroup = partition.taskGroup();
+ Schema projection = partition.projection();
+ SortOrder sortOrder = table.sortOrder();
+
+ Preconditions.checkState(
+ sortOrder.isSorted(), "Cannot create merging reader for unsorted table
%s", table.name());
+ Preconditions.checkState(
+ taskGroup.tasks().size() > 1,
+ "Merging reader requires multiple files, got %s",
+ taskGroup.tasks().size());
+
+ LOG.info(
+ "Creating merging reader for {} files with sort order {} in table {}",
+ taskGroup.tasks().size(),
+ sortOrder.orderId(),
+ table.name());
+
+ // Augment the projected schema with any sort key columns Spark did not
request so that
+ // SortOrderComparators can access every sort key field during the merge.
+ Schema mergeReadSchema = mergeReadSchema(projection, sortOrder, table);
+ this.outputPositions = buildOutputPositions(projection, mergeReadSchema);
+ this.outputDataTypes = buildOutputDataTypes(projection, outputPositions);
+ this.outputValues = outputPositions != null ? new
Object[outputPositions.length] : null;
+
+ this.resources = new CloseableGroup();
+ this.fileReaders =
+ taskGroup.tasks().stream()
+ .map(
+ task ->
+ new RowDataReader(
+ table,
+ partition.io(),
+ new
BaseScanTaskGroup<>(Collections.singletonList(task)),
+ mergeReadSchema,
+ partition.isCaseSensitive(),
+ partition.cacheDeleteFilesOnExecutors()))
+ .collect(Collectors.toList());
Review Comment:
Done.
##########
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/MergingSortedRowDataReader.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.iceberg.BaseScanTaskGroup;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ScanTaskGroup;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortField;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderComparators;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.source.metrics.TaskNumDeletes;
+import org.apache.iceberg.spark.source.metrics.TaskNumSplits;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SortedMerge;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PartitionReader} that reads multiple sorted files and merges them
into a single sorted
+ * stream using a k-way heap merge ({@link SortedMerge}).
+ *
+ * <p>This reader is used when {@code preserve-data-ordering} is enabled and
the task group contains
+ * multiple files that all have the same sort order.
+ *
+ * <p>Sort key columns absent from the requested projection are temporarily
added to the read schema
+ * so that {@link SortOrderComparators} can access them during the merge. The
extra columns are
+ * stripped from each row before it is returned to Spark.
+ */
+class MergingSortedRowDataReader implements PartitionReader<InternalRow> {
+ private static final Logger LOG =
LoggerFactory.getLogger(MergingSortedRowDataReader.class);
+
+ private final CloseableGroup resources;
+ private final CloseableIterator<InternalRow> mergedIterator;
+ private final List<RowDataReader> fileReaders;
+ // non-null only when sort key columns were added to the read schema beyond
what Spark projected
+ private final int[] outputPositions;
+ private final DataType[] outputDataTypes;
+ private final Object[] outputValues; // reused per row to avoid per-row
allocation
+ private InternalRow current;
+
+ MergingSortedRowDataReader(SparkInputPartition partition) {
+ Table table = partition.table();
+ ScanTaskGroup<FileScanTask> taskGroup = partition.taskGroup();
+ Schema projection = partition.projection();
+ SortOrder sortOrder = table.sortOrder();
+
+ Preconditions.checkState(
+ sortOrder.isSorted(), "Cannot create merging reader for unsorted table
%s", table.name());
+ Preconditions.checkState(
+ taskGroup.tasks().size() > 1,
+ "Merging reader requires multiple files, got %s",
+ taskGroup.tasks().size());
+
+ LOG.info(
+ "Creating merging reader for {} files with sort order {} in table {}",
+ taskGroup.tasks().size(),
+ sortOrder.orderId(),
+ table.name());
+
+ // Augment the projected schema with any sort key columns Spark did not
request so that
+ // SortOrderComparators can access every sort key field during the merge.
+ Schema mergeReadSchema = mergeReadSchema(projection, sortOrder, table);
+ this.outputPositions = buildOutputPositions(projection, mergeReadSchema);
+ this.outputDataTypes = buildOutputDataTypes(projection, outputPositions);
+ this.outputValues = outputPositions != null ? new
Object[outputPositions.length] : null;
+
+ this.resources = new CloseableGroup();
+ this.fileReaders =
+ taskGroup.tasks().stream()
+ .map(
+ task ->
+ new RowDataReader(
+ table,
+ partition.io(),
+ new
BaseScanTaskGroup<>(Collections.singletonList(task)),
+ mergeReadSchema,
+ partition.isCaseSensitive(),
+ partition.cacheDeleteFilesOnExecutors()))
+ .collect(Collectors.toList());
+ // Wrap each reader as a CloseableIterable and feed into SortedMerge.
+ List<CloseableIterable<InternalRow>> fileIterables =
+
fileReaders.stream().map(this::readerToIterable).collect(Collectors.toList());
+ SortedMerge<InternalRow> sortedMerge =
+ new SortedMerge<>(buildComparator(mergeReadSchema, sortOrder),
fileIterables);
+ resources.addCloseable(sortedMerge);
+ this.mergedIterator = sortedMerge.iterator();
+ }
+
+ /**
+ * Adapts a {@link RowDataReader} to a {@link CloseableIterable} for use
with {@link SortedMerge}.
+ * Each row is copied before it enters the priority queue because Spark's
Parquet/ORC readers
+ * reuse {@link InternalRow} instances for performance.
+ */
+ private CloseableIterable<InternalRow> readerToIterable(RowDataReader
reader) {
+ return CloseableIterable.withNoopClose(
+ () ->
+ new CloseableIterator<>() {
+ private boolean advanced = false;
+ private boolean hasNext = false;
+
+ @Override
+ public boolean hasNext() {
+ if (!advanced) {
+ try {
+ hasNext = reader.next();
+ advanced = true;
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to advance reader",
e);
+ }
+ }
+ return hasNext;
+ }
+
+ @Override
+ public InternalRow next() {
+ if (!advanced) {
+ hasNext();
+ }
+ advanced = false;
+ return reader.get().copy();
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+ });
+ }
+
+ @Override
+ public boolean next() throws IOException {
+ if (!mergedIterator.hasNext()) {
+ return false;
+ }
+
+ InternalRow merged = mergedIterator.next();
+ if (outputPositions == null) {
+ this.current = merged;
+ } else {
+ // Strip the extra sort key columns that were added for comparison
purposes.
+ for (int i = 0; i < outputPositions.length; i++) {
+ outputValues[i] = merged.get(outputPositions[i], outputDataTypes[i]);
+ }
+ this.current = new GenericInternalRow(outputValues);
+ }
+
+ return true;
+ }
+
+ @Override
+ public InternalRow get() {
+ return current;
+ }
+
+ @Override
+ public void close() throws IOException {
+ resources.close();
+ }
+
+ @Override
+ public CustomTaskMetric[] currentMetricsValues() {
+ long totalDeletes =
+ fileReaders.stream()
+ .flatMap(reader -> Arrays.stream(reader.currentMetricsValues()))
+ .filter(metric -> metric instanceof TaskNumDeletes)
+ .mapToLong(CustomTaskMetric::value)
+ .sum();
+ return new CustomTaskMetric[] {
+ new TaskNumSplits(fileReaders.size()), new TaskNumDeletes(totalDeletes)
+ };
+ }
+
+ /**
+ * Builds a comparator for merging {@link InternalRow}s by the given sort
order. Uses {@link
+ * SortOrderComparators} which handles all transform types (identity,
bucket, truncate), ASC/DESC
+ * directions, and null ordering. The two {@link InternalRowWrapper}
instances are allocated once
+ * and reused — {@code wrap()} just updates an internal reference.
+ */
+ private static Comparator<InternalRow> buildComparator(
+ Schema mergeReadSchema, SortOrder sortOrder) {
+ StructType sparkSchema = SparkSchemaUtil.convert(mergeReadSchema);
+ Comparator<StructLike> keyComparator =
+ SortOrderComparators.forSchema(mergeReadSchema, sortOrder);
+ InternalRowWrapper left = new InternalRowWrapper(sparkSchema,
mergeReadSchema.asStruct());
+ InternalRowWrapper right = new InternalRowWrapper(sparkSchema,
mergeReadSchema.asStruct());
+ return (r1, r2) -> keyComparator.compare(left.wrap(r1), right.wrap(r2));
+ }
+
+ /**
+ * Returns the Spark {@link DataType}s for each column in {@code
projection}, or {@code null} when
+ * {@code outputPositions} is {@code null} (no extra columns were added, no
projection needed).
+ */
+ private static DataType[] buildOutputDataTypes(Schema projection, int[]
outputPositions) {
+ if (outputPositions == null) {
+ return null;
+ }
+ StructType sparkSchema = SparkSchemaUtil.convert(projection);
+ DataType[] dataTypes = new DataType[sparkSchema.fields().length];
+ for (int i = 0; i < sparkSchema.fields().length; i++) {
+ dataTypes[i] = sparkSchema.fields()[i].dataType();
+ }
+ return dataTypes;
+ }
+
+ /**
+ * Returns the schema to use when reading each file. This is the requested
{@code projection}
+ * augmented with any sort key columns that are not already present, so the
merge comparator can
+ * access every sort key field regardless of what Spark projected.
+ */
+ private static Schema mergeReadSchema(Schema projection, SortOrder
sortOrder, Table table) {
Review Comment:
I added a test for a nested structure and it failed. Thanks for pointing this
out. I have some ideas on how we can make it work for nested structs, but I
would like to make that change as a follow-up.
For this PR, if the sort key is a nested field, we do not report the
ordering. I added a `hasNestedSortFields()` check in `canReportOrdering()`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]