This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 74907b44a9 [GLUTEN-8948][VL] Rename the iceberg test classes to follow
surefire plugin's name pattern and fix tests (#9927)
74907b44a9 is described below
commit 74907b44a901978940542d5aae90210fa365ff68
Author: Jin Chengcheng <[email protected]>
AuthorDate: Wed Jun 25 07:55:45 2025 +0100
[GLUTEN-8948][VL] Rename the iceberg test classes to follow surefire
plugin's name pattern and fix tests (#9927)
---
.../extensions/GlutenTestMergeOnReadMerge.java | 62 ---------
...elete.java => TestGlutenCopyOnWriteDelete.java} | 4 +-
...elete.java => TestGlutenMergeOnReadDelete.java} | 4 +-
.../extensions/TestGlutenMergeOnReadMerge.java | 145 +++++++++++++++++++++
...pdate.java => TestGlutenMergeOnReadUpdate.java} | 4 +-
...oragePartitionedJoinsInRowLevelOperations.java} | 4 +-
...va => TestGlutenSystemFunctionPushDownDQL.java} | 4 +-
...ystemFunctionPushDownInRowLevelOperations.java} | 6 +-
...terV2.java => TestGlutenDataFrameWriterV2.java} | 2 +-
...va => TestGlutenDataFrameWriterV2Coercion.java} | 4 +-
...tions.java => TestGlutenDataSourceOptions.java} | 2 +-
...java => TestGlutenIcebergSourceHiveTables.java} | 2 +-
...a.java => TestGlutenIdentityPartitionData.java} | 4 +-
...le.java => TestGlutenPositionDeletesTable.java} | 4 +-
...tering.java => TestGlutenRuntimeFiltering.java} | 4 +-
...ns.java => TestGlutenSparkMetadataColumns.java} | 4 +-
...gedScan.java => TestGlutenSparkStagedScan.java} | 4 +-
...hDown.java => TestGlutenAggregatePushDown.java} | 4 +-
...stDeleteFrom.java => TestGlutenDeleteFrom.java} | 4 +-
...va => TestGlutenPartitionedWritesAsSelect.java} | 2 +-
...va => TestGlutenPartitionedWritesToBranch.java} | 4 +-
...=> TestGlutenPartitionedWritesToWapBranch.java} | 4 +-
...GlutenTestSelect.java => TestGlutenSelect.java} | 4 +-
...ne.java => TestGlutenTimestampWithoutZone.java} | 4 +-
.../spark/source/TestSparkReaderDeletes.java | 3 +-
.../gluten/execution/IcebergScanTransformer.scala | 119 +++++++++++++++--
pom.xml | 2 +-
27 files changed, 298 insertions(+), 115 deletions(-)
diff --git
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/GlutenTestMergeOnReadMerge.java
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/GlutenTestMergeOnReadMerge.java
deleted file mode 100644
index f4774a08bf..0000000000
---
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/GlutenTestMergeOnReadMerge.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.extensions;
-
-import org.apache.iceberg.PlanningMode;
-import org.apache.iceberg.spark.extensions.TestMergeOnReadMerge;
-import org.junit.Test;
-
-import java.util.Map;
-
-public class GlutenTestMergeOnReadMerge extends TestMergeOnReadMerge {
- public GlutenTestMergeOnReadMerge(
- String catalogName,
- String implementation,
- Map<String, String> config,
- String fileFormat,
- boolean vectorized,
- String distributionMode,
- boolean fanoutEnabled,
- String branch,
- PlanningMode planningMode) {
- super(
- catalogName,
- implementation,
- config,
- fileFormat,
- vectorized,
- distributionMode,
- fanoutEnabled,
- branch,
- planningMode);
- }
-
- @Test
- public synchronized void testMergeWithConcurrentTableRefresh() {
- System.out.println("Run timeout");
- }
-
- @Test
- public synchronized void testMergeWithSerializableIsolation() {
- System.out.println("Run timeout");
- }
-
- @Test
- public synchronized void testMergeWithSnapshotIsolation() {
- System.out.println("Run timeout");
- }
-}
diff --git
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/GlutenTestCopyOnWriteDelete.java
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenCopyOnWriteDelete.java
similarity index 94%
rename from
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/GlutenTestCopyOnWriteDelete.java
rename to
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenCopyOnWriteDelete.java
index d161fca882..e03d4aba8c 100644
---
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/GlutenTestCopyOnWriteDelete.java
+++
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenCopyOnWriteDelete.java
@@ -23,8 +23,8 @@ import org.junit.Test;
import java.util.Map;
import java.util.concurrent.ExecutionException;
-public class GlutenTestCopyOnWriteDelete extends TestCopyOnWriteDelete {
- public GlutenTestCopyOnWriteDelete(
+public class TestGlutenCopyOnWriteDelete extends TestCopyOnWriteDelete {
+ public TestGlutenCopyOnWriteDelete(
String catalogName,
String implementation,
Map<String, String> config,
diff --git
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/GlutenTestMergeOnReadDelete.java
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenMergeOnReadDelete.java
similarity index 94%
rename from
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/GlutenTestMergeOnReadDelete.java
rename to
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenMergeOnReadDelete.java
index 7a2932a321..f2fe3e3341 100644
---
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/GlutenTestMergeOnReadDelete.java
+++
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenMergeOnReadDelete.java
@@ -23,8 +23,8 @@ import org.junit.Test;
import java.util.Map;
import java.util.concurrent.ExecutionException;
-public class GlutenTestMergeOnReadDelete extends TestMergeOnReadDelete {
- public GlutenTestMergeOnReadDelete(
+public class TestGlutenMergeOnReadDelete extends TestMergeOnReadDelete {
+ public TestGlutenMergeOnReadDelete(
String catalogName,
String implementation,
Map<String, String> config,
diff --git
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenMergeOnReadMerge.java
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenMergeOnReadMerge.java
new file mode 100644
index 0000000000..efb919f1b4
--- /dev/null
+++
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenMergeOnReadMerge.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extensions;
+
+import org.apache.iceberg.PlanningMode;
+import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.extensions.TestMergeOnReadMerge;
+import org.apache.spark.sql.execution.SparkPlan;
+import org.apache.spark.sql.internal.SQLConf;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
+import static org.apache.iceberg.TableProperties.MERGE_MODE;
+import static org.apache.iceberg.TableProperties.MERGE_MODE_DEFAULT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TestGlutenMergeOnReadMerge extends TestMergeOnReadMerge {
+ public TestGlutenMergeOnReadMerge(
+ String catalogName,
+ String implementation,
+ Map<String, String> config,
+ String fileFormat,
+ boolean vectorized,
+ String distributionMode,
+ boolean fanoutEnabled,
+ String branch,
+ PlanningMode planningMode) {
+ super(
+ catalogName,
+ implementation,
+ config,
+ fileFormat,
+ vectorized,
+ distributionMode,
+ fanoutEnabled,
+ branch,
+ planningMode);
+ }
+
+ @Test
+ public synchronized void testMergeWithConcurrentTableRefresh() {
+ System.out.println("Run timeout");
+ }
+
+ @Test
+ public synchronized void testMergeWithSerializableIsolation() {
+ System.out.println("Run timeout");
+ }
+
+ @Test
+ public synchronized void testMergeWithSnapshotIsolation() {
+ System.out.println("Run timeout");
+ }
+
+ // The matched join string is changed from Join to
ShuffledHashJoinExecTransformer
+ @Test
+ public void testMergeConditionSplitIntoTargetPredicateAndJoinCondition() {
+ createAndInitTable(
+ "id INT, salary INT, dep STRING, sub_dep STRING",
+ "PARTITIONED BY (dep, sub_dep)",
+ "{ \"id\": 1, \"salary\": 100, \"dep\": \"d1\", \"sub_dep\": \"sd1\"
}\n"
+ + "{ \"id\": 6, \"salary\": 600, \"dep\": \"d6\", \"sub_dep\":
\"sd6\" }");
+
+ createOrReplaceView(
+ "source",
+ "id INT, salary INT, dep STRING, sub_dep STRING",
+ "{ \"id\": 1, \"salary\": 101, \"dep\": \"d1\", \"sub_dep\": \"sd1\"
}\n"
+ + "{ \"id\": 2, \"salary\": 200, \"dep\": \"d2\", \"sub_dep\":
\"sd2\" }\n"
+ + "{ \"id\": 3, \"salary\": 300, \"dep\": \"d3\", \"sub_dep\":
\"sd3\" }");
+
+ String query =
+ String.format(
+ "MERGE INTO %s AS t USING source AS s "
+ + "ON t.id == s.id AND ((t.dep = 'd1' AND t.sub_dep IN ('sd1',
'sd3')) OR (t.dep = 'd6' AND t.sub_dep IN ('sd2', 'sd6'))) "
+ + "WHEN MATCHED THEN "
+ + " UPDATE SET salary = s.salary "
+ + "WHEN NOT MATCHED THEN "
+ + " INSERT *",
+ commitTarget());
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ if (mode(table) == COPY_ON_WRITE) {
+ checkJoinAndFilterConditions(
+ query,
+ "ShuffledHashJoinExecTransformer [id], [id], FullOuter",
+ "((dep = 'd1' AND sub_dep IN ('sd1', 'sd3')) OR (dep = 'd6' AND
sub_dep IN ('sd2', 'sd6')))");
+ } else {
+ checkJoinAndFilterConditions(
+ query,
+ "ShuffledHashJoinExecTransformer [id], [id], RightOuter",
+ "((dep = 'd1' AND sub_dep IN ('sd1', 'sd3')) OR (dep = 'd6' AND
sub_dep IN ('sd2', 'sd6')))");
+ }
+
+ assertEquals(
+ "Should have expected rows",
+ ImmutableList.of(
+ row(1, 101, "d1", "sd1"), // updated
+ row(2, 200, "d2", "sd2"), // new
+ row(3, 300, "d3", "sd3"), // new
+ row(6, 600, "d6", "sd6")), // existing
+ sql("SELECT * FROM %s ORDER BY id", selectTarget()));
+ }
+
+ private void checkJoinAndFilterConditions(String query, String join, String
icebergFilters) {
+ // disable runtime filtering for easier validation
+ withSQLConf(
+ ImmutableMap.of(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED().key(),
"false"),
+ () -> {
+ SparkPlan sparkPlan = executeAndKeepPlan(() -> sql(query));
+ String planAsString = sparkPlan.toString().replaceAll("#(\\d+L?)",
"");
+
+ // Remove "\n" because gluten prints BuildRight or BuildLeft in the
end.
+ assertThat(planAsString).as("Join should match").contains(join);
+
+ assertThat(planAsString)
+ .as("Pushed filters must match")
+ .contains("[filters=" + icebergFilters + ",");
+ });
+ }
+
+ private RowLevelOperationMode mode(Table table) {
+ String modeName = table.properties().getOrDefault(MERGE_MODE,
MERGE_MODE_DEFAULT);
+ return RowLevelOperationMode.fromName(modeName);
+ }
+}
diff --git
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/GlutenTestMergeOnReadUpdate.java
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenMergeOnReadUpdate.java
similarity index 94%
rename from
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/GlutenTestMergeOnReadUpdate.java
rename to
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenMergeOnReadUpdate.java
index b74157d3f3..f2db135cec 100644
---
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/GlutenTestMergeOnReadUpdate.java
+++
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenMergeOnReadUpdate.java
@@ -23,8 +23,8 @@ import org.junit.Test;
import java.util.Map;
import java.util.concurrent.ExecutionException;
-public class GlutenTestMergeOnReadUpdate extends TestMergeOnReadUpdate {
- public GlutenTestMergeOnReadUpdate(
+public class TestGlutenMergeOnReadUpdate extends TestMergeOnReadUpdate {
+ public TestGlutenMergeOnReadUpdate(
String catalogName,
String implementation,
Map<String, String> config,
diff --git
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/GlutenTestStoragePartitionedJoinsInRowLevelOperations.java
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenStoragePartitionedJoinsInRowLevelOperations.java
similarity index 90%
rename from
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/GlutenTestStoragePartitionedJoinsInRowLevelOperations.java
rename to
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenStoragePartitionedJoinsInRowLevelOperations.java
index 3f324b7827..9d650c6f6c 100644
---
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/GlutenTestStoragePartitionedJoinsInRowLevelOperations.java
+++
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenStoragePartitionedJoinsInRowLevelOperations.java
@@ -20,9 +20,9 @@ import
org.apache.iceberg.spark.extensions.TestStoragePartitionedJoinsInRowLevel
import java.util.Map;
-public class GlutenTestStoragePartitionedJoinsInRowLevelOperations
+public class TestGlutenStoragePartitionedJoinsInRowLevelOperations
extends TestStoragePartitionedJoinsInRowLevelOperations {
- public GlutenTestStoragePartitionedJoinsInRowLevelOperations(
+ public TestGlutenStoragePartitionedJoinsInRowLevelOperations(
String catalogName, String implementation, Map<String, String> config) {
super(catalogName, implementation, config);
}
diff --git
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/GlutenTestSystemFunctionPushDownDQL.java
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenSystemFunctionPushDownDQL.java
similarity index 90%
rename from
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/GlutenTestSystemFunctionPushDownDQL.java
rename to
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenSystemFunctionPushDownDQL.java
index d195b3e21b..059da14725 100644
---
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/GlutenTestSystemFunctionPushDownDQL.java
+++
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenSystemFunctionPushDownDQL.java
@@ -20,8 +20,8 @@ import
org.apache.iceberg.spark.extensions.TestSystemFunctionPushDownDQL;
import java.util.Map;
-public class GlutenTestSystemFunctionPushDownDQL extends
TestSystemFunctionPushDownDQL {
- public GlutenTestSystemFunctionPushDownDQL(
+public class TestGlutenSystemFunctionPushDownDQL extends
TestSystemFunctionPushDownDQL {
+ public TestGlutenSystemFunctionPushDownDQL(
String catalogName, String implementation, Map<String, String> config) {
super(catalogName, implementation, config);
}
diff --git
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/GlutenTestSystemFunctionPushDownInRowLevelOperations.java
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenSystemFunctionPushDownInRowLevelOperations.java
similarity index 84%
rename from
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/GlutenTestSystemFunctionPushDownInRowLevelOperations.java
rename to
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenSystemFunctionPushDownInRowLevelOperations.java
index 85480eac0c..2eaaa6e5fe 100644
---
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/GlutenTestSystemFunctionPushDownInRowLevelOperations.java
+++
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/extensions/TestGlutenSystemFunctionPushDownInRowLevelOperations.java
@@ -18,9 +18,9 @@ package org.apache.gluten.extensions;
import java.util.Map;
-public class GlutenTestSystemFunctionPushDownInRowLevelOperations
- extends GlutenTestSystemFunctionPushDownDQL {
- public GlutenTestSystemFunctionPushDownInRowLevelOperations(
+public class TestGlutenSystemFunctionPushDownInRowLevelOperations
+ extends TestGlutenSystemFunctionPushDownDQL {
+ public TestGlutenSystemFunctionPushDownInRowLevelOperations(
String catalogName, String implementation, Map<String, String> config) {
super(catalogName, implementation, config);
}
diff --git
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/GlutenTestDataFrameWriterV2.java
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenDataFrameWriterV2.java
similarity index 93%
rename from
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/GlutenTestDataFrameWriterV2.java
rename to
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenDataFrameWriterV2.java
index f192c9be40..b66015515d 100644
---
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/GlutenTestDataFrameWriterV2.java
+++
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenDataFrameWriterV2.java
@@ -18,4 +18,4 @@ package org.apache.gluten.source;
import org.apache.iceberg.spark.source.TestDataFrameWriterV2;
-public class GlutenTestDataFrameWriterV2 extends TestDataFrameWriterV2 {}
+public class TestGlutenDataFrameWriterV2 extends TestDataFrameWriterV2 {}
diff --git
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/GlutenTestDataFrameWriterV2Coercion.java
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenDataFrameWriterV2Coercion.java
similarity index 88%
rename from
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/GlutenTestDataFrameWriterV2Coercion.java
rename to
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenDataFrameWriterV2Coercion.java
index aeeb9245a7..f40b98bf18 100644
---
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/GlutenTestDataFrameWriterV2Coercion.java
+++
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenDataFrameWriterV2Coercion.java
@@ -19,8 +19,8 @@ package org.apache.gluten.source;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.spark.source.TestDataFrameWriterV2Coercion;
-public class GlutenTestDataFrameWriterV2Coercion extends
TestDataFrameWriterV2Coercion {
- public GlutenTestDataFrameWriterV2Coercion(FileFormat format, String
dataType) {
+public class TestGlutenDataFrameWriterV2Coercion extends
TestDataFrameWriterV2Coercion {
+ public TestGlutenDataFrameWriterV2Coercion(FileFormat format, String
dataType) {
super(format, dataType);
}
}
diff --git
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/GlutenTestDataSourceOptions.java
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenDataSourceOptions.java
similarity index 93%
rename from
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/GlutenTestDataSourceOptions.java
rename to
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenDataSourceOptions.java
index 44da14b7d6..11c2ecfe71 100644
---
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/GlutenTestDataSourceOptions.java
+++
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenDataSourceOptions.java
@@ -18,4 +18,4 @@ package org.apache.gluten.source;
import org.apache.iceberg.spark.source.TestDataSourceOptions;
-public class GlutenTestDataSourceOptions extends TestDataSourceOptions {}
+public class TestGlutenDataSourceOptions extends TestDataSourceOptions {}
diff --git
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/GlutenTestIcebergSourceHiveTables.java
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenIcebergSourceHiveTables.java
similarity index 94%
rename from
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/GlutenTestIcebergSourceHiveTables.java
rename to
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenIcebergSourceHiveTables.java
index 99a79e8352..c3e921e324 100644
---
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/GlutenTestIcebergSourceHiveTables.java
+++
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenIcebergSourceHiveTables.java
@@ -19,4 +19,4 @@ package org.apache.gluten.source;
import org.apache.iceberg.spark.source.TestIcebergSourceHiveTables;
// Fallback all the table scan because source table is metadata table with
format avro.
-public class GlutenTestIcebergSourceHiveTables extends
TestIcebergSourceHiveTables {}
+public class TestGlutenIcebergSourceHiveTables extends
TestIcebergSourceHiveTables {}
diff --git
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/GlutenTestIdentityPartitionData.java
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenIdentityPartitionData.java
similarity index 91%
rename from
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/GlutenTestIdentityPartitionData.java
rename to
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenIdentityPartitionData.java
index 02811e5afd..506f8a5226 100644
---
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/GlutenTestIdentityPartitionData.java
+++
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenIdentityPartitionData.java
@@ -19,8 +19,8 @@ package org.apache.gluten.source;
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.spark.source.TestIdentityPartitionData;
-public class GlutenTestIdentityPartitionData extends TestIdentityPartitionData
{
- public GlutenTestIdentityPartitionData(
+public class TestGlutenIdentityPartitionData extends TestIdentityPartitionData
{
+ public TestGlutenIdentityPartitionData(
String format, boolean vectorized, PlanningMode planningMode) {
super(format, vectorized, planningMode);
}
diff --git
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/GlutenTestPositionDeletesTable.java
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenPositionDeletesTable.java
similarity index 91%
rename from
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/GlutenTestPositionDeletesTable.java
rename to
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenPositionDeletesTable.java
index 3dd9df6d1a..02d348544d 100644
---
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/GlutenTestPositionDeletesTable.java
+++
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenPositionDeletesTable.java
@@ -21,8 +21,8 @@ import
org.apache.iceberg.spark.source.TestPositionDeletesTable;
import java.util.Map;
-public class GlutenTestPositionDeletesTable extends TestPositionDeletesTable {
- public GlutenTestPositionDeletesTable(
+public class TestGlutenPositionDeletesTable extends TestPositionDeletesTable {
+ public TestGlutenPositionDeletesTable(
String catalogName, String implementation, Map<String, String> config,
FileFormat format) {
super(catalogName, implementation, config, format);
}
diff --git
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/GlutenTestRuntimeFiltering.java
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenRuntimeFiltering.java
similarity index 88%
rename from
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/GlutenTestRuntimeFiltering.java
rename to
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenRuntimeFiltering.java
index 950db498ed..90e3828991 100644
---
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/GlutenTestRuntimeFiltering.java
+++
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenRuntimeFiltering.java
@@ -19,8 +19,8 @@ package org.apache.gluten.source;
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.spark.source.TestRuntimeFiltering;
-public class GlutenTestRuntimeFiltering extends TestRuntimeFiltering {
- public GlutenTestRuntimeFiltering(PlanningMode planningMode) {
+public class TestGlutenRuntimeFiltering extends TestRuntimeFiltering {
+ public TestGlutenRuntimeFiltering(PlanningMode planningMode) {
super(planningMode);
}
}
diff --git
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/GlutenTestSparkMetadataColumns.java
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenSparkMetadataColumns.java
similarity index 91%
rename from
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/GlutenTestSparkMetadataColumns.java
rename to
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenSparkMetadataColumns.java
index b090b10b1d..8e49b5876b 100644
---
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/GlutenTestSparkMetadataColumns.java
+++
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenSparkMetadataColumns.java
@@ -19,8 +19,8 @@ package org.apache.gluten.source;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.spark.source.TestSparkMetadataColumns;
-public class GlutenTestSparkMetadataColumns extends TestSparkMetadataColumns {
- public GlutenTestSparkMetadataColumns(
+public class TestGlutenSparkMetadataColumns extends TestSparkMetadataColumns {
+ public TestGlutenSparkMetadataColumns(
FileFormat fileFormat, boolean vectorized, int formatVersion) {
super(fileFormat, vectorized, formatVersion);
}
diff --git
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/GlutenTestSparkStagedScan.java
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenSparkStagedScan.java
similarity index 91%
rename from
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/GlutenTestSparkStagedScan.java
rename to
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenSparkStagedScan.java
index 32c1441432..09a6583320 100644
---
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/GlutenTestSparkStagedScan.java
+++
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/source/TestGlutenSparkStagedScan.java
@@ -20,8 +20,8 @@ import org.apache.iceberg.spark.source.TestSparkStagedScan;
import java.util.Map;
-public class GlutenTestSparkStagedScan extends TestSparkStagedScan {
- public GlutenTestSparkStagedScan(
+public class TestGlutenSparkStagedScan extends TestSparkStagedScan {
+ public TestGlutenSparkStagedScan(
String catalogName, String implementation, Map<String, String> config) {
super(catalogName, implementation, config);
}
diff --git
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/GlutenTestAggregatePushDown.java
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenAggregatePushDown.java
similarity index 95%
rename from
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/GlutenTestAggregatePushDown.java
rename to
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenAggregatePushDown.java
index 6b332ddab1..17a578bad8 100644
---
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/GlutenTestAggregatePushDown.java
+++
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenAggregatePushDown.java
@@ -31,8 +31,8 @@ import org.junit.BeforeClass;
import java.util.Map;
-public class GlutenTestAggregatePushDown extends TestAggregatePushDown {
- public GlutenTestAggregatePushDown(
+public class TestGlutenAggregatePushDown extends TestAggregatePushDown {
+ public TestGlutenAggregatePushDown(
String catalogName, String implementation, Map<String, String> config) {
super(catalogName, implementation, config);
}
diff --git
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/GlutenTestDeleteFrom.java
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenDeleteFrom.java
similarity index 92%
rename from
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/GlutenTestDeleteFrom.java
rename to
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenDeleteFrom.java
index af68cbbdf7..f52f0ddb8a 100644
---
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/GlutenTestDeleteFrom.java
+++
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenDeleteFrom.java
@@ -20,8 +20,8 @@ import org.apache.iceberg.spark.sql.TestDeleteFrom;
import java.util.Map;
-public class GlutenTestDeleteFrom extends TestDeleteFrom {
- public GlutenTestDeleteFrom(
+public class TestGlutenDeleteFrom extends TestDeleteFrom {
+ public TestGlutenDeleteFrom(
String catalogName, String implementation, Map<String, String> config) {
super(catalogName, implementation, config);
}
diff --git
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/GlutenTestPartitionedWritesAsSelect.java
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenPartitionedWritesAsSelect.java
similarity index 93%
rename from
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/GlutenTestPartitionedWritesAsSelect.java
rename to
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenPartitionedWritesAsSelect.java
index 31266eabe2..52221d6e85 100644
---
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/GlutenTestPartitionedWritesAsSelect.java
+++
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenPartitionedWritesAsSelect.java
@@ -18,4 +18,4 @@ package org.apache.gluten.sql;
import org.apache.iceberg.spark.sql.TestPartitionedWritesAsSelect;
-public class GlutenTestPartitionedWritesAsSelect extends
TestPartitionedWritesAsSelect {}
+public class TestGlutenPartitionedWritesAsSelect extends
TestPartitionedWritesAsSelect {}
diff --git
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/GlutenTestPartitionedWritesToBranch.java
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenPartitionedWritesToBranch.java
similarity index 90%
rename from
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/GlutenTestPartitionedWritesToBranch.java
rename to
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenPartitionedWritesToBranch.java
index b0aeb1e332..6711a7fd22 100644
---
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/GlutenTestPartitionedWritesToBranch.java
+++
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenPartitionedWritesToBranch.java
@@ -20,8 +20,8 @@ import
org.apache.iceberg.spark.sql.TestPartitionedWritesToBranch;
import java.util.Map;
-public class GlutenTestPartitionedWritesToBranch extends
TestPartitionedWritesToBranch {
- public GlutenTestPartitionedWritesToBranch(
+public class TestGlutenPartitionedWritesToBranch extends
TestPartitionedWritesToBranch {
+ public TestGlutenPartitionedWritesToBranch(
String catalogName, String implementation, Map<String, String> config) {
super(catalogName, implementation, config);
}
diff --git
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/GlutenTestPartitionedWritesToWapBranch.java
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenPartitionedWritesToWapBranch.java
similarity index 90%
rename from
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/GlutenTestPartitionedWritesToWapBranch.java
rename to
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenPartitionedWritesToWapBranch.java
index 11a4778161..935ca6872e 100644
---
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/GlutenTestPartitionedWritesToWapBranch.java
+++
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenPartitionedWritesToWapBranch.java
@@ -20,8 +20,8 @@ import
org.apache.iceberg.spark.sql.TestPartitionedWritesToWapBranch;
import java.util.Map;
-public class GlutenTestPartitionedWritesToWapBranch extends
TestPartitionedWritesToWapBranch {
- public GlutenTestPartitionedWritesToWapBranch(
+public class TestGlutenPartitionedWritesToWapBranch extends
TestPartitionedWritesToWapBranch {
+ public TestGlutenPartitionedWritesToWapBranch(
String catalogName, String implementation, Map<String, String> config) {
super(catalogName, implementation, config);
}
diff --git
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/GlutenTestSelect.java
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenSelect.java
similarity index 89%
rename from
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/GlutenTestSelect.java
rename to
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenSelect.java
index b7072a0771..eff29920df 100644
---
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/GlutenTestSelect.java
+++
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenSelect.java
@@ -20,8 +20,8 @@ import org.apache.iceberg.spark.sql.TestSelect;
import java.util.Map;
-public class GlutenTestSelect extends TestSelect {
- public GlutenTestSelect(String catalogName, String implementation,
Map<String, String> config) {
+public class TestGlutenSelect extends TestSelect {
+ public TestGlutenSelect(String catalogName, String implementation,
Map<String, String> config) {
super(catalogName, implementation, config);
}
}
diff --git
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/GlutenTestTimestampWithoutZone.java
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenTimestampWithoutZone.java
similarity index 91%
rename from
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/GlutenTestTimestampWithoutZone.java
rename to
backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenTimestampWithoutZone.java
index a1a394bab0..af83dafd1d 100644
---
a/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/GlutenTestTimestampWithoutZone.java
+++
b/backends-velox/src-iceberg-spark34/test/java/org/apache/gluten/sql/TestGlutenTimestampWithoutZone.java
@@ -20,8 +20,8 @@ import org.apache.iceberg.spark.sql.TestTimestampWithoutZone;
import java.util.Map;
-public class GlutenTestTimestampWithoutZone extends TestTimestampWithoutZone {
- public GlutenTestTimestampWithoutZone(
+public class TestGlutenTimestampWithoutZone extends TestTimestampWithoutZone {
+ public TestGlutenTimestampWithoutZone(
String catalogName, String implementation, Map<String, String> config) {
super(catalogName, implementation, config);
}
diff --git
a/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
b/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
index 9a610dc340..d3350ddf25 100644
---
a/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
+++
b/backends-velox/src-iceberg-spark34/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java
@@ -173,8 +173,9 @@ public class TestSparkReaderDeletes extends DeleteReadTests
{
catalog.dropTable(TableIdentifier.of("default", name));
}
+ // The native side does not report the numDeletes metric.
protected boolean countDeletes() {
- return true;
+ return false;
}
@Override
diff --git
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
index 78b1848b6b..0c0fecc98d 100644
---
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
+++
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
@@ -19,8 +19,8 @@ package org.apache.gluten.execution
import org.apache.gluten.backendsapi.BackendsApiManager
import
org.apache.gluten.execution.IcebergScanTransformer.{containsMetadataColumn,
containsUuidOrFixedType}
import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.gluten.substrait.rel.{LocalFilesNode, SplitInfo}
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
-import org.apache.gluten.substrait.rel.SplitInfo
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
DynamicPruningExpression, Expression, Literal}
@@ -28,12 +28,14 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.read.{InputPartition, Scan}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.types.{ArrayType, DataType, StructType}
-import org.apache.iceberg.{BaseTable, MetadataColumns, SnapshotSummary}
+import org.apache.iceberg.{BaseTable, MetadataColumns, Schema, SnapshotSummary}
import org.apache.iceberg.avro.AvroSchemaUtil
import org.apache.iceberg.spark.source.{GlutenIcebergSourceUtil, SparkTable}
-import org.apache.iceberg.types.Type
+import org.apache.iceberg.spark.source.metrics.NumSplits
+import org.apache.iceberg.types.{Type, Types}
import org.apache.iceberg.types.Type.TypeID
import org.apache.iceberg.types.Types.{ListType, MapType, NestedField}
@@ -53,6 +55,11 @@ case class IcebergScanTransformer(
commonPartitionValues = commonPartitionValues
) {
+ // PartitionReader reports the metric by currentMetricsValues,
+ // but the implementation is different.
+ // So use Metric to get NumSplits, NumDeletes is not reported by native
metric
+ private val numSplits = SQLMetrics.createMetric(sparkContext, new
NumSplits().description())
+
protected[this] def supportsBatchScan(scan: Scan): Boolean = {
IcebergScanTransformer.supportsBatchScan(scan)
}
@@ -91,12 +98,18 @@ case class IcebergScanTransformer(
case t: SparkTable =>
t.table() match {
case t: BaseTable =>
- t.operations()
+ val snapshot = t
+ .operations()
.current()
.currentSnapshot()
- .summary()
- .getOrDefault(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0")
- .toInt > 0
+ if (snapshot == null) {
+ false
+ } else {
+ snapshot
+ .summary()
+ .getOrDefault(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0")
+ .toInt > 0
+ }
case _ => false
}
case _ => false
@@ -104,6 +117,11 @@ case class IcebergScanTransformer(
if (containsEqualityDelete) {
return ValidationResult.failed("Contains equality delete files")
}
+
+ if (hasRenamedColumn) {
+ return ValidationResult.failed(
+ "The column is renamed or data type mismatch, cannot read it.")
+ }
}
ValidationResult.succeeded
@@ -120,10 +138,12 @@ case class IcebergScanTransformer(
override lazy val fileFormat: ReadFileFormat =
GlutenIcebergSourceUtil.getFileFormat(scan)
override def getSplitInfosWithIndex: Seq[SplitInfo] = {
- getPartitionsWithIndex.zipWithIndex.map {
+ val splitInfos = getPartitionsWithIndex.zipWithIndex.map {
case (partitions, index) =>
GlutenIcebergSourceUtil.genSplitInfo(partitions, index,
getPartitionSchema)
}
+ numSplits.add(splitInfos.map(s =>
s.asInstanceOf[LocalFilesNode].getPaths.size()).sum)
+ splitInfos
}
override def getSplitInfosFromPartitions(partitions: Seq[InputPartition]):
Seq[SplitInfo] = {
@@ -138,10 +158,12 @@ case class IcebergScanTransformer(
applyPartialClustering,
replicatePartitions)
.flatten
- groupedPartitions.zipWithIndex.map {
+ val splitInfos = groupedPartitions.zipWithIndex.map {
case (p, index) =>
GlutenIcebergSourceUtil.genSplitInfoForPartition(p, index,
getPartitionSchema)
}
+ numSplits.add(splitInfos.map(s =>
s.asInstanceOf[LocalFilesNode].getPaths.size()).sum)
+ splitInfos
}
override def doCanonicalize(): IcebergScanTransformer = {
@@ -156,6 +178,83 @@ case class IcebergScanTransformer(
private[execution] def getKeyGroupPartitioning: Option[Seq[Expression]] =
keyGroupedPartitioning
override def nodeName: String = "Iceberg" + super.nodeName
+
+ private def hasRenamedColumn: Boolean = {
+ val icebergTable = table match {
+ case t: SparkTable =>
+ t.table() match {
+ case t: BaseTable => t
+ case _ => null
+ }
+ case _ => null
+ }
+ if (icebergTable == null) {
+ return false
+ }
+
+ // The read fields always should be found in current schema,
+ // but may have different id in history schemas
+ val ops = icebergTable.operations().current()
+ val currentSchema = ops.schema()
+ val oldSchemas = icebergTable.operations().current().schemas()
+ oldSchemas
+ .stream()
+ .filter(s => s.schemaId() != ops.currentSchemaId())
+ .anyMatch(s => !typesMatch(s.asStruct(), currentSchema.asStruct(),
scan.readSchema()))
+ }
+
+ private def typesMatch(icebergType: Type, currentType: Type, sparkType:
DataType): Boolean = {
+ if (icebergType.isPrimitiveType) {
+ if (!currentType.isPrimitiveType) {
+ return false
+ }
+ sparkType match {
+ case _: StructType => return false
+ case _: ArrayType => return false
+ case _: org.apache.spark.sql.types.MapType => return false
+ case _ => return true
+ }
+ }
+ (icebergType, currentType, sparkType) match {
+ case (iceberg: Types.StructType, currentType: Types.StructType,
sparkStruct: StructType) =>
+ sparkStruct.fields.forall {
+ f =>
+ val currentField = new
Schema(currentType.fields()).findField(f.name)
+ // Find not exists column
+ if (currentField == null) {
+ false
+ } else {
+ val field = new
Schema(iceberg.fields()).findField(currentField.fieldId())
+ // The field does not exist in old schema, add column case
+ if (field == null) {
+ true
+ } else {
+ // Maybe rename column
+ field.name() == f.name &&
+ typesMatch(field.`type`(), currentField.`type`(), f.dataType)
+ }
+ }
+ }
+
+ // Array types
+ case (iceberg: Types.ListType, current: Types.ListType, spark:
ArrayType) =>
+ iceberg.elementId() == current.elementId() &&
+ typesMatch(iceberg.elementType(), current.elementType(),
spark.elementType)
+
+ // Map types
+ case (
+ iceberg: Types.MapType,
+ current: Types.MapType,
+ spark: org.apache.spark.sql.types.MapType) =>
+ iceberg.keyId() == current.keyId() && iceberg.valueId() ==
current.valueId() &&
+ typesMatch(iceberg.keyType(), current.keyType(), spark.keyType) &&
typesMatch(
+ iceberg.valueType(),
+ current.valueType(),
+ spark.valueType)
+
+ case _ => false
+ }
+ }
}
object IcebergScanTransformer {
diff --git a/pom.xml b/pom.xml
index 4c5d5d4c47..35d47c72e9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -808,7 +808,7 @@
<source>${project.basedir}/src-iceberg/test/java</source>
<source>${project.basedir}/src-iceberg-spark${spark.plain.version}/test/scala</source>
<!-- // TODO: temporary mark disable to pass CI -->
- <!--
<source>${project.basedir}/src-iceberg-spark${spark.plain.version}/test/java</source>
-->
+
<source>${project.basedir}/src-iceberg-spark${spark.plain.version}/test/java</source>
</sources>
</configuration>
</execution>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]