This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new aa14aae011 Spark 4.0: Support recursive delegate unwrapping to find ExtendedParser in parser chains (#13625)
aa14aae011 is described below
commit aa14aae0111d757f8cded70b87c1d58da3fe272c
Author: majian <[email protected]>
AuthorDate: Sun Nov 2 07:37:22 2025 +0800
Spark 4.0: Support recursive delegate unwrapping to find ExtendedParser in parser chains (#13625)
* Spark 4.0: Support recursive delegate unwrapping to find ExtendedParser
in parser chains
* Spark 4.0: Support recursive delegate unwrapping to find ExtendedParser
in parser chains
* Spark 4.0: Support recursive delegate unwrapping to find ExtendedParser
in parser chains
* Spark 4.0: Support recursive delegate unwrapping to find ExtendedParser
in parser chains
* Spark 4.0: Support recursive delegate unwrapping to find ExtendedParser
in parser chains
* Spark 4.0: Support recursive delegate unwrapping to find ExtendedParser
in parser chains
* Spark 4.0: Support recursive delegate unwrapping to find ExtendedParser
in parser chains
* Spark 4.0: Support recursive delegate unwrapping to find ExtendedParser
in parser chains
* Spark 4.0: Support recursive delegate unwrapping to find ExtendedParser
in parser chains
* Spark 4.0: Support recursive delegate unwrapping to find ExtendedParser
in parser chains
* Spark 4.0: Support recursive delegate unwrapping to find ExtendedParser
in parser chains
* Spark 4.0: Support recursive delegate unwrapping to find ExtendedParser
in parser chains
* Spark 4.0: Support recursive delegate unwrapping to find ExtendedParser
in parser chains
* Spark 4.0: Support recursive delegate unwrapping to find ExtendedParser
in parser chains
---
.../apache/iceberg/spark/TestExtendedParser.java | 231 +++++++++++++++++++++
.../org/apache/iceberg/spark/ExtendedParser.java | 45 +++-
2 files changed, 273 insertions(+), 3 deletions(-)
diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/TestExtendedParser.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/TestExtendedParser.java
new file mode 100644
index 0000000000..bfcb5af235
--- /dev/null
+++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/TestExtendedParser.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.List;
+import org.apache.iceberg.NullOrder;
+import org.apache.iceberg.SortDirection;
+import org.apache.iceberg.expressions.Term;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.parser.AbstractSqlParser;
+import org.apache.spark.sql.catalyst.parser.AstBuilder;
+import org.apache.spark.sql.catalyst.parser.ParserInterface;
+import
org.apache.spark.sql.catalyst.parser.extensions.IcebergSparkSqlExtensionsParser;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class TestExtendedParser {
+
+ private static SparkSession spark;
+ private static final String SQL_PARSER_FIELD = "sqlParser";
+ private ParserInterface originalParser;
+
+ @BeforeAll
+ public static void before() {
+ spark =
SparkSession.builder().master("local").appName("TestExtendedParser").getOrCreate();
+ }
+
+ @AfterAll
+ public static void after() {
+ if (spark != null) {
+ spark.stop();
+ }
+ }
+
+ @BeforeEach
+ public void saveOriginalParser() throws Exception {
+ Class<?> clazz = spark.sessionState().getClass();
+ Field parserField = null;
+ while (clazz != null && parserField == null) {
+ try {
+ parserField = clazz.getDeclaredField(SQL_PARSER_FIELD);
+ } catch (NoSuchFieldException e) {
+ clazz = clazz.getSuperclass();
+ }
+ }
+ parserField.setAccessible(true);
+ originalParser = (ParserInterface) parserField.get(spark.sessionState());
+ }
+
+ @AfterEach
+ public void restoreOriginalParser() throws Exception {
+ setSessionStateParser(spark.sessionState(), originalParser);
+ }
+
+ /**
+ * Tests that the Iceberg extended SQL parser can correctly parse a sort
order string and return
+ * the expected RawOrderField.
+ *
+ * @throws Exception if reflection access fails
+ */
+ @Test
+ public void testParseSortOrderWithRealIcebergExtendedParser() throws
Exception {
+ ParserInterface origParser = null;
+ Class<?> clazz = spark.sessionState().getClass();
+ while (clazz != null && origParser == null) {
+ try {
+ Field parserField = clazz.getDeclaredField(SQL_PARSER_FIELD);
+ parserField.setAccessible(true);
+ origParser = (ParserInterface) parserField.get(spark.sessionState());
+ } catch (NoSuchFieldException e) {
+ clazz = clazz.getSuperclass();
+ }
+ }
+ assertThat(origParser).isNotNull();
+
+ IcebergSparkSqlExtensionsParser icebergParser = new
IcebergSparkSqlExtensionsParser(origParser);
+
+ setSessionStateParser(spark.sessionState(), icebergParser);
+
+ List<ExtendedParser.RawOrderField> fields =
+ ExtendedParser.parseSortOrder(spark, "id ASC NULLS FIRST");
+
+ assertThat(fields).isNotEmpty();
+ ExtendedParser.RawOrderField first = fields.get(0);
+ assertThat(first.direction()).isEqualTo(SortDirection.ASC);
+ assertThat(first.nullOrder()).isEqualTo(NullOrder.NULLS_FIRST);
+ }
+
+ /**
+ * Tests that parseSortOrder can find and use an ExtendedParser that is
wrapped inside another
+ * ParserInterface implementation.
+ *
+ * @throws Exception if reflection access fails
+ */
+ @Test
+ public void testParseSortOrderFindsNestedExtendedParser() throws Exception {
+ ExtendedParser icebergParser = mock(ExtendedParser.class);
+
+ ExtendedParser.RawOrderField field =
+ new ExtendedParser.RawOrderField(
+ mock(Term.class), SortDirection.ASC, NullOrder.NULLS_FIRST);
+ List<ExtendedParser.RawOrderField> expected =
Collections.singletonList(field);
+
+ when(icebergParser.parseSortOrder("id ASC NULLS
FIRST")).thenReturn(expected);
+
+ ParserInterface wrapper = new WrapperParser(icebergParser);
+
+ setSessionStateParser(spark.sessionState(), wrapper);
+
+ List<ExtendedParser.RawOrderField> result =
+ ExtendedParser.parseSortOrder(spark, "id ASC NULLS FIRST");
+ assertThat(result).isSameAs(expected);
+
+ verify(icebergParser).parseSortOrder("id ASC NULLS FIRST");
+ }
+
+ /**
+ * Tests that parseSortOrder throws an exception if no ExtendedParser
instance can be found in the
+ * parser chain.
+ *
+ * @throws Exception if reflection access fails
+ */
+ @Test
+ public void testParseSortOrderThrowsWhenNoExtendedParserFound() throws
Exception {
+ ParserInterface dummy = mock(ParserInterface.class);
+ setSessionStateParser(spark.sessionState(), dummy);
+
+ assertThatThrownBy(() -> ExtendedParser.parseSortOrder(spark, "id ASC"))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("Iceberg ExtendedParser");
+ }
+
+ /**
+ * Tests that parseSortOrder can find an ExtendedParser in a parent class
field of the parser.
+ *
+ * @throws Exception if reflection access fails
+ */
+ @Test
+ public void testParseSortOrderFindsExtendedParserInParentClassField() throws
Exception {
+ ExtendedParser icebergParser = mock(ExtendedParser.class);
+ ExtendedParser.RawOrderField field =
+ new ExtendedParser.RawOrderField(
+ mock(Term.class), SortDirection.ASC, NullOrder.NULLS_FIRST);
+ List<ExtendedParser.RawOrderField> expected =
Collections.singletonList(field);
+ when(icebergParser.parseSortOrder("id ASC NULLS
FIRST")).thenReturn(expected);
+ ParserInterface parser = new GrandChildParser(icebergParser);
+ setSessionStateParser(spark.sessionState(), parser);
+
+ List<ExtendedParser.RawOrderField> result =
+ ExtendedParser.parseSortOrder(spark, "id ASC NULLS FIRST");
+ assertThat(result).isSameAs(expected);
+ verify(icebergParser).parseSortOrder("id ASC NULLS FIRST");
+ }
+
+ private static void setSessionStateParser(Object sessionState,
ParserInterface parser)
+ throws Exception {
+ Class<?> clazz = sessionState.getClass();
+ Field targetField = null;
+ while (clazz != null && targetField == null) {
+ try {
+ targetField = clazz.getDeclaredField(SQL_PARSER_FIELD);
+ } catch (NoSuchFieldException e) {
+ clazz = clazz.getSuperclass();
+ }
+ }
+ if (targetField == null) {
+ throw new IllegalStateException(
+ "No suitable sqlParser field found in sessionState class
hierarchy!");
+ }
+ targetField.setAccessible(true);
+ targetField.set(sessionState, parser);
+ }
+
+ private static class WrapperParser extends AbstractSqlParser {
+ private final ParserInterface delegate;
+ private String name;
+
+ WrapperParser(ParserInterface delegate) {
+ this.delegate = delegate;
+ this.name = "delegate";
+ }
+
+ public ParserInterface getDelegate() {
+ return delegate;
+ }
+
+ @Override
+ public AstBuilder astBuilder() {
+ return null;
+ }
+ }
+
+ private static class ChildParser extends WrapperParser {
+ ChildParser(ParserInterface parent) {
+ super(parent);
+ }
+ }
+
+ private static class GrandChildParser extends ChildParser {
+ GrandChildParser(ParserInterface parent) {
+ super(parent);
+ }
+ }
+}
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/ExtendedParser.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/ExtendedParser.java
index 19b3dd8f49..d852dc96a3 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/ExtendedParser.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/ExtendedParser.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.spark;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.IdentityHashMap;
import java.util.List;
+import java.util.Set;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.SortDirection;
@@ -52,10 +53,10 @@ public interface ExtendedParser extends ParserInterface {
}
static List<RawOrderField> parseSortOrder(SparkSession spark, String
orderString) {
- if (spark.sessionState().sqlParser() instanceof ExtendedParser) {
- ExtendedParser parser = (ExtendedParser)
spark.sessionState().sqlParser();
+ ExtendedParser extParser = findParser(spark.sessionState().sqlParser(),
ExtendedParser.class);
+ if (extParser != null) {
try {
- return parser.parseSortOrder(orderString);
+ return extParser.parseSortOrder(orderString);
} catch (AnalysisException e) {
throw new IllegalArgumentException(
String.format("Unable to parse sortOrder: %s", orderString), e);
@@ -66,5 +67,43 @@ public interface ExtendedParser extends ParserInterface {
}
}
+  /**
+   * Finds a parser that is an instance of {@code clazz}, starting at {@code parser} and following
+   * the parsers it wraps.
+   *
+   * <p>Every non-static field whose value is a {@link ParserInterface} is treated as a wrapped
+   * delegate. Delegates are explored depth-first: the first delegate of a parser is followed all
+   * the way down before any other delegate of that parser is tried. Parsers already visited (by
+   * identity) are not explored again, so parsers that reference each other cannot cause an
+   * infinite loop.
+   *
+   * @param parser the parser to start from; may be null
+   * @param clazz the parser type to look for
+   * @return the first reachable parser that is an instance of {@code clazz}, or null if none is
+   *     found
+   */
+  private static <T> T findParser(ParserInterface parser, Class<T> clazz) {
+    Set<ParserInterface> visited = Collections.newSetFromMap(new IdentityHashMap<>());
+    return findParser(parser, clazz, visited);
+  }
+
+  /** Depth-first step of {@link #findParser(ParserInterface, Class)}. */
+  private static <T> T findParser(
+      ParserInterface parser, Class<T> clazz, Set<ParserInterface> visited) {
+    if (parser == null || !visited.add(parser)) {
+      // nothing to search, or this parser was already explored (reference cycle)
+      return null;
+    }
+
+    if (clazz.isInstance(parser)) {
+      return clazz.cast(parser);
+    }
+
+    for (ParserInterface delegate : delegateParsers(parser)) {
+      T found = findParser(delegate, clazz, visited);
+      if (found != null) {
+        return found;
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * Returns the parsers held in non-static fields of {@code parser}.
+   *
+   * <p>Fields of the runtime class come first, then its superclasses. Within a class, fields
+   * follow the order of {@link Class#getDeclaredFields()}, which the JDK does not guarantee. A
+   * field that cannot be made accessible or read is skipped on its own, so it does not hide the
+   * remaining fields.
+   */
+  private static List<ParserInterface> delegateParsers(ParserInterface parser) {
+    List<ParserInterface> delegates = new ArrayList<>();
+    for (Class<?> cls = parser.getClass(); cls != null; cls = cls.getSuperclass()) {
+      for (Field field : cls.getDeclaredFields()) {
+        // static fields are never per-instance delegates; primitives can never hold a parser
+        if (Modifier.isStatic(field.getModifiers()) || field.getType().isPrimitive()) {
+          continue;
+        }
+
+        try {
+          if (!field.trySetAccessible()) {
+            continue;
+          }
+
+          Object value = field.get(parser);
+          if (value instanceof ParserInterface && value != parser) {
+            delegates.add((ParserInterface) value);
+          }
+        } catch (IllegalAccessException | SecurityException ignored) {
+          // best effort: an unreadable field is not a usable delegate, keep scanning the others
+        }
+      }
+    }
+
+    return delegates;
+  }
+
List<RawOrderField> parseSortOrder(String orderString) throws
AnalysisException;
}