This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0ba4a30d5241 [SPARK-54656][SQL] Refactor SupportsPushDownVariants to
be a ScanBuilder mix-in
0ba4a30d5241 is described below
commit 0ba4a30d52410de550f674ad2a5ab4eba478b132
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Wed Dec 10 13:34:45 2025 -0800
[SPARK-54656][SQL] Refactor SupportsPushDownVariants to be a ScanBuilder
mix-in
### What changes were proposed in this pull request?
SupportsPushDownVariants was a Scan mix-in in #52578. This patch changes it
to be a ScanBuilder mix-in to follow the established patterns in the codebase.
### Why are the changes needed?
SupportsPushDownVariants was a Scan mix-in in #52578. This patch changes it
to be a ScanBuilder mix-in to follow the established patterns in the codebase,
e.g., join pushdown, aggregate pushdown, etc.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit tests
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code v2.0.14
Closes #53276 from viirya/pushvariantdsv2-pr-refactor.
Lead-authored-by: Liang-Chi Hsieh <[email protected]>
Co-authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../read/SupportsPushDownVariantExtractions.java | 63 +++++++
.../connector/read/SupportsPushDownVariants.java | 77 ---------
.../sql/connector/read/VariantAccessInfo.java | 105 -----------
.../sql/connector/read/VariantExtraction.java | 64 +++++++
.../internal/connector/VariantExtractionImpl.scala | 45 +++++
.../datasources/PushVariantIntoScan.scala | 116 +------------
.../datasources/v2/V2ScanRelationPushDown.scala | 191 ++++++++++++++++++++-
.../datasources/v2/parquet/ParquetScan.scala | 110 ++++++------
.../v2/parquet/ParquetScanBuilder.scala | 17 +-
9 files changed, 440 insertions(+), 348 deletions(-)
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownVariantExtractions.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownVariantExtractions.java
new file mode 100644
index 000000000000..750e0479e542
--- /dev/null
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownVariantExtractions.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read;
+
+import org.apache.spark.annotation.Experimental;
+
+/**
+ * A mix-in interface for {@link ScanBuilder}. Data sources can implement this
interface to
+ * support pushing down variant field extraction operations to the data source.
+ * <p>
+ * When variant columns are accessed with specific field extractions (e.g.,
variant_get,
+ * try_variant_get), the optimizer can push these extractions down to the data
source.
+ * The data source can then read only the required fields from variant
columns, reducing
+ * I/O and improving performance.
+ * <p>
+ * Each {@link VariantExtraction} in the input array represents one field
extraction operation.
+ * Data sources should examine each extraction and determine which ones can be
handled efficiently.
+ * The return value is a boolean array of the same length, where each element
indicates whether
+ * the corresponding extraction was accepted.
+ *
+ * @since 4.1.0
+ */
+@Experimental
+public interface SupportsPushDownVariantExtractions extends ScanBuilder {
+
+ /**
+ * Pushes down variant field extractions to the data source.
+ * <p>
+ * Each element in the input array represents one field extraction operation
from a variant
+ * column. Data sources should examine each extraction and determine whether
it can be
+ * pushed down based on the data source's capabilities (e.g., supported data
types,
+ * path complexity, etc.).
+ * <p>
+ * The return value is a boolean array of the same length as the input
array, where each
+ * element indicates whether the corresponding extraction was accepted:
+ * <ul>
+ * <li>true: The extraction will be handled by the data source</li>
+ * <li>false: The extraction will be handled by Spark after reading</li>
+ * </ul>
+ * <p>
+ * Data sources can choose to accept all, some, or none of the extractions.
Spark will
+ * handle any extractions that are not pushed down.
+ *
+ * @param extractions Array of variant extractions, one per field extraction
operation
+ * @return Boolean array indicating which extractions were accepted (same
length as input)
+ */
+ boolean[] pushVariantExtractions(VariantExtraction[] extractions);
+}
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownVariants.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownVariants.java
deleted file mode 100644
index ff82e71bfd58..000000000000
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownVariants.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.connector.read;
-
-import org.apache.spark.annotation.Evolving;
-
-/**
- * A mix-in interface for {@link Scan}. Data sources can implement this
interface to
- * support pushing down variant field access operations to the data source.
- * <p>
- * When variant columns are accessed with specific field extractions (e.g.,
variant_get),
- * the optimizer can push these accesses down to the data source. The data
source can then
- * read only the required fields from variant columns, reducing I/O and
improving performance.
- * <p>
- * The typical workflow is:
- * <ol>
- * <li>Optimizer analyzes the query plan and identifies variant field
accesses</li>
- * <li>Optimizer calls {@link #pushVariantAccess} with the access
information</li>
- * <li>Data source validates and stores the variant access information</li>
- * <li>Optimizer retrieves pushed information via {@link
#pushedVariantAccess}</li>
- * <li>Data source uses the information to optimize reading in {@link
#readSchema()}
- * and readers</li>
- * </ol>
- *
- * @since 4.1.0
- */
-@Evolving
-public interface SupportsPushDownVariants extends Scan {
-
- /**
- * Pushes down variant field access information to the data source.
- * <p>
- * Implementations should validate if the variant accesses can be pushed
down based on
- * the data source's capabilities. If some accesses cannot be pushed down,
the implementation
- * can choose to:
- * <ul>
- * <li>Push down only the supported accesses and return true</li>
- * <li>Reject all pushdown and return false</li>
- * </ul>
- * <p>
- * The implementation should store the variant access information that can
be pushed down.
- * The stored information will be retrieved later via {@link
#pushedVariantAccess()}.
- *
- * @param variantAccessInfo Array of variant access information, one per
variant column
- * @return true if at least some variant accesses were pushed down, false if
none were pushed
- */
- boolean pushVariantAccess(VariantAccessInfo[] variantAccessInfo);
-
- /**
- * Returns the variant access information that has been pushed down to this
scan.
- * <p>
- * This method is called by the optimizer after {@link #pushVariantAccess}
to retrieve
- * what variant accesses were actually accepted by the data source. The
optimizer uses
- * this information to rewrite the query plan.
- * <p>
- * If {@link #pushVariantAccess} was not called or returned false, this
should return
- * an empty array.
- *
- * @return Array of pushed down variant access information
- */
- VariantAccessInfo[] pushedVariantAccess();
-}
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/VariantAccessInfo.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/VariantAccessInfo.java
deleted file mode 100644
index 4f61a42d0519..000000000000
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/VariantAccessInfo.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.connector.read;
-
-import java.io.Serializable;
-import java.util.Objects;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * Variant access information that describes how variant fields are accessed
in a query.
- * <p>
- * This class captures the information needed by data sources to optimize
reading variant columns.
- * Instead of reading the entire variant value, the data source can read only
the fields that
- * are actually accessed, represented as a structured schema.
- * <p>
- * For example, if a query accesses `variant_get(v, '$.a', 'int')` and
- * `variant_get(v, '$.b', 'string')`, the extracted schema would be
- * `struct<0:int, 1:string>` where field ordinals correspond to the
access order.
- *
- * @since 4.1.0
- */
-@Evolving
-public final class VariantAccessInfo implements Serializable {
- private final String columnName;
- private final StructType extractedSchema;
-
- /**
- * Creates variant access information for a variant column.
- *
- * @param columnName The name of the variant column
- * @param extractedSchema The schema representing extracted fields from the
variant.
- * Each field represents one variant field access,
with field names
- * typically being ordinals (e.g., "0", "1", "2") and
metadata
- * containing variant-specific information like JSON
path.
- */
- public VariantAccessInfo(String columnName, StructType extractedSchema) {
- this.columnName = Objects.requireNonNull(columnName, "columnName cannot be
null");
- this.extractedSchema =
- Objects.requireNonNull(extractedSchema, "extractedSchema cannot be
null");
- }
-
- /**
- * Returns the name of the variant column.
- */
- public String columnName() {
- return columnName;
- }
-
- /**
- * Returns the schema representing fields extracted from the variant column.
- * <p>
- * The schema structure is:
- * <ul>
- * <li>Field names: Typically ordinals ("0", "1", "2", ...) representing
access order</li>
- * <li>Field types: The target data type for each field extraction</li>
- * <li>Field metadata: Contains variant-specific information such as JSON
path,
- * timezone, and error handling mode</li>
- * </ul>
- * <p>
- * Data sources should use this schema to determine what fields to extract
from the variant
- * and what types they should be converted to.
- */
- public StructType extractedSchema() {
- return extractedSchema;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- VariantAccessInfo that = (VariantAccessInfo) o;
- return columnName.equals(that.columnName) &&
- extractedSchema.equals(that.extractedSchema);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(columnName, extractedSchema);
- }
-
- @Override
- public String toString() {
- return "VariantAccessInfo{" +
- "columnName='" + columnName + '\'' +
- ", extractedSchema=" + extractedSchema +
- '}';
- }
-}
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/VariantExtraction.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/VariantExtraction.java
new file mode 100644
index 000000000000..64987299934a
--- /dev/null
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/VariantExtraction.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read;
+
+import java.io.Serializable;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Metadata;
+
+/**
+ * Variant extraction information that describes a single field extraction
from a variant column.
+ * <p>
+ * This interface captures the information needed by data sources to optimize
reading variant
+ * columns. Each instance represents one field extraction operation (e.g.,
from variant_get or
+ * try_variant_get).
+ * <p>
+ * For example, if a query contains `variant_get(v, '$.a', 'int')`, this would
be represented
+ * as a VariantExtraction with columnName=["v"], path="$.a", and
expectedDataType=IntegerType.
+ *
+ * @since 4.1.0
+ */
+@Experimental
+public interface VariantExtraction extends Serializable {
+ /**
+ * Returns the path to the variant column. For top-level variant columns,
this is a single
+ * element array containing the column name. For nested variant columns
within structs,
+ * this is an array representing the path (e.g., ["structCol",
"innerStruct", "variantCol"]).
+ */
+ String[] columnName();
+
+ /**
+ * Returns the expected data type for the extracted value.
+ * This is the target type specified in variant_get (e.g., IntegerType,
StringType).
+ */
+ DataType expectedDataType();
+
+ /**
+ * Returns the metadata associated with this variant extraction.
+ * This may include additional information needed by the data source:
+ * - "path": the extraction path from variant_get or try_variant_get.
+ * This follows JSON path syntax (e.g., "$.a", "$.b.c", "$[0]").
+ * - "failOnError": whether the extraction to expected data type should
throw an exception
+ * or return null if the cast fails.
+ * - "timeZoneId": a string identifier of a time zone. It is required by
timestamp-related casts.
+ *
+ */
+ Metadata metadata();
+}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/VariantExtractionImpl.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/VariantExtractionImpl.scala
new file mode 100644
index 000000000000..87db41a3217e
--- /dev/null
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/connector/VariantExtractionImpl.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal.connector
+
+import org.apache.spark.sql.connector.read.VariantExtraction
+import org.apache.spark.sql.types.{DataType, Metadata}
+
+/**
+ * Implementation of [[VariantExtraction]].
+ *
+ * @param columnName Path to the variant column (e.g., Array("v") for
top-level,
+ * Array("struct1", "v") for nested)
+ * @param metadata The metadata for extraction including JSON path,
failOnError, and timeZoneId
+ * @param expectedDataType The expected data type for the extracted value
+ */
+case class VariantExtractionImpl(
+ columnName: Array[String],
+ metadata: Metadata,
+ expectedDataType: DataType) extends VariantExtraction {
+
+ require(columnName != null, "columnName cannot be null")
+ require(metadata != null, "metadata cannot be null")
+ require(expectedDataType != null, "expectedDataType cannot be null")
+ require(columnName.nonEmpty, "columnName cannot be empty")
+
+ override def toString: String = {
+ s"VariantExtraction{columnName=${columnName.mkString("[", ", ", "]")}, " +
+ s"metadata='$metadata', expectedDataType=$expectedDataType}"
+ }
+}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PushVariantIntoScan.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PushVariantIntoScan.scala
index 2cf1a5e9b8cd..b0b20d08dccb 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PushVariantIntoScan.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PushVariantIntoScan.scala
@@ -26,10 +26,8 @@ import
org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan,
Project, Subquery}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
-import org.apache.spark.sql.connector.read.{SupportsPushDownVariants,
VariantAccessInfo}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -225,8 +223,11 @@ class VariantInRelation {
case Some(variants) =>
variants.get(path) match {
case Some(fields) =>
+ // Accessing the full variant value
addField(fields, RequestedVariantField.fullVariant)
case _ =>
+ // Accessing the struct containing a variant.
+ // This variant is not eligible for push down.
// Remove non-eligible variants.
variants.filterInPlace { case (key, _) => !key.startsWith(path) }
}
@@ -281,11 +282,6 @@ object PushVariantIntoScan extends Rule[LogicalPlan] {
relation @ LogicalRelationWithTable(
hadoopFsRelation@HadoopFsRelation(_, _, _, _, _: ParquetFileFormat, _),
_)) =>
rewritePlan(p, projectList, filters, relation, hadoopFsRelation)
-
- case p@PhysicalOperation(projectList, filters,
- scanRelation @ DataSourceV2ScanRelation(
- relation, scan: SupportsPushDownVariants, output, _, _)) =>
- rewritePlanV2(p, projectList, filters, scanRelation, scan)
}
}
@@ -333,112 +329,10 @@ object PushVariantIntoScan extends Rule[LogicalPlan] {
hadoopFsRelation.sparkSession)
val newRelation = relation.copy(relation = newHadoopFsRelation, output =
newOutput.toIndexedSeq)
- buildFilterAndProject(newRelation, projectList, filters, variants,
attributeMap)
- }
-
- // DataSource V2 rewrite method using SupportsPushDownVariants API
- // Key differences from V1 implementation:
- // 1. V2 uses DataSourceV2ScanRelation instead of LogicalRelation
- // 2. Uses SupportsPushDownVariants API instead of directly manipulating scan
- // 3. Schema is already resolved in scanRelation.output (no need for
relation.resolve())
- // 4. Scan rebuilding is handled by the scan implementation via the API
- // Data sources like Delta and Iceberg can implement this API to support
variant pushdown.
- private def rewritePlanV2(
- originalPlan: LogicalPlan,
- projectList: Seq[NamedExpression],
- filters: Seq[Expression],
- scanRelation: DataSourceV2ScanRelation,
- scan: SupportsPushDownVariants): LogicalPlan = {
- val variants = new VariantInRelation
-
- // Extract schema attributes from V2 scan relation
- val schemaAttributes = scanRelation.output
-
- // Construct schema for default value resolution
- val structSchema = StructType(schemaAttributes.map(a =>
- StructField(a.name, a.dataType, a.nullable, a.metadata)))
-
- val defaultValues =
ResolveDefaultColumns.existenceDefaultValues(structSchema)
-
- // Add variant fields from the V2 scan schema
- for ((a, defaultValue) <- schemaAttributes.zip(defaultValues)) {
- variants.addVariantFields(a.exprId, a.dataType, defaultValue, Nil)
- }
- if (variants.mapping.isEmpty) return originalPlan
-
- // Collect requested fields from project list and filters
- projectList.foreach(variants.collectRequestedFields)
- filters.foreach(variants.collectRequestedFields)
-
- // If no variant columns remain after collection, return original plan
- if (variants.mapping.forall(_._2.isEmpty)) return originalPlan
-
- // Build VariantAccessInfo array for the API
- val variantAccessInfoArray = schemaAttributes.flatMap { attr =>
- variants.mapping.get(attr.exprId).flatMap(_.get(Nil)).map { fields =>
- // Build extracted schema for this variant column
- val extractedFields = fields.toArray.sortBy(_._2).map { case (field,
ordinal) =>
- StructField(ordinal.toString, field.targetType, metadata =
field.path.toMetadata)
- }
- val extractedSchema = if (extractedFields.isEmpty) {
- // Add placeholder field to avoid empty struct
- val placeholder = VariantMetadata("$.__placeholder_field__",
- failOnError = false, timeZoneId = "UTC")
- StructType(Array(StructField("0", BooleanType, metadata =
placeholder.toMetadata)))
- } else {
- StructType(extractedFields)
- }
- new VariantAccessInfo(attr.name, extractedSchema)
- }
- }.toArray
-
- // Call the API to push down variant access
- if (variantAccessInfoArray.isEmpty) return originalPlan
-
- val pushed = scan.pushVariantAccess(variantAccessInfoArray)
- if (!pushed) return originalPlan
-
- // Get what was actually pushed
- val pushedVariantAccess = scan.pushedVariantAccess()
- if (pushedVariantAccess.isEmpty) return originalPlan
-
- // Build new attribute mapping based on pushed variant access
- val pushedColumnNames = pushedVariantAccess.map(_.columnName()).toSet
- val attributeMap = schemaAttributes.map { a =>
- if (pushedColumnNames.contains(a.name) &&
variants.mapping.get(a.exprId).exists(_.nonEmpty)) {
- val newType = variants.rewriteType(a.exprId, a.dataType, Nil)
- val newAttr = AttributeReference(a.name, newType, a.nullable,
a.metadata)(
- qualifier = a.qualifier)
- (a.exprId, newAttr)
- } else {
- (a.exprId, a)
- }
- }.toMap
-
- val newOutput = scanRelation.output.map(a =>
attributeMap.getOrElse(a.exprId, a))
-
- // The scan implementation should have updated its readSchema() based on
the pushed info
- // We just need to create a new scan relation with the updated output
- val newScanRelation = scanRelation.copy(
- output = newOutput
- )
-
- buildFilterAndProject(newScanRelation, projectList, filters, variants,
attributeMap)
- }
-
- /**
- * Build the final Project(Filter(relation)) plan with rewritten expressions.
- */
- private def buildFilterAndProject(
- relation: LogicalPlan,
- projectList: Seq[NamedExpression],
- filters: Seq[Expression],
- variants: VariantInRelation,
- attributeMap: Map[ExprId, AttributeReference]): LogicalPlan = {
val withFilter = if (filters.nonEmpty) {
- Filter(filters.map(variants.rewriteExpr(_, attributeMap)).reduce(And),
relation)
+ Filter(filters.map(variants.rewriteExpr(_, attributeMap)).reduce(And),
newRelation)
} else {
- relation
+ newRelation
}
val newProjectList = projectList.map { e =>
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index 31a98e1ff96c..adfd5ceacd67 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -21,8 +21,9 @@ import java.util.Locale
import scala.collection.mutable
+import org.apache.spark.SparkException
import org.apache.spark.internal.LogKeys.{AGGREGATE_FUNCTIONS, COLUMN_NAMES,
GROUP_BY_EXPRS, JOIN_CONDITION, JOIN_TYPE, POST_SCAN_FILTERS, PUSHED_FILTERS,
RELATION_NAME, RELATION_OUTPUT}
-import org.apache.spark.sql.catalyst.expressions.{aggregate, Alias, And,
Attribute, AttributeMap, AttributeReference, AttributeSet, Cast, Expression,
IntegerLiteral, Literal, NamedExpression, PredicateHelper,
ProjectionOverSchema, SortOrder, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{aggregate, Alias, And,
Attribute, AttributeMap, AttributeReference, AttributeSet, Cast, Expression,
ExprId, IntegerLiteral, Literal, NamedExpression, PredicateHelper,
ProjectionOverSchema, SortOrder, SubqueryExpression}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.optimizer.CollapseProject
import org.apache.spark.sql.catalyst.planning.{PhysicalOperation,
ScanOperation}
@@ -32,10 +33,11 @@ import
org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
import org.apache.spark.sql.connector.expressions.{SortOrder => V2SortOrder}
import org.apache.spark.sql.connector.expressions.aggregate.{Aggregation, Avg,
Count, CountStar, Max, Min, Sum}
import org.apache.spark.sql.connector.expressions.filter.Predicate
-import org.apache.spark.sql.connector.read.{Scan, ScanBuilder,
SupportsPushDownAggregates, SupportsPushDownFilters, SupportsPushDownJoin,
V1Scan}
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.connector.read.{Scan, ScanBuilder,
SupportsPushDownAggregates, SupportsPushDownFilters, SupportsPushDownJoin,
SupportsPushDownVariantExtractions, V1Scan, VariantExtraction}
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy,
VariantInRelation}
+import org.apache.spark.sql.internal.connector.VariantExtractionImpl
import org.apache.spark.sql.sources
-import org.apache.spark.sql.types.{DataType, DecimalType, IntegerType,
StructType}
+import org.apache.spark.sql.types.{DataType, DecimalType, IntegerType,
StructField, StructType}
import org.apache.spark.sql.util.SchemaUtils._
import org.apache.spark.util.ArrayImplicits._
@@ -49,9 +51,11 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with
PredicateHelper {
pushDownFilters,
pushDownJoin,
pushDownAggregates,
+ pushDownVariants,
pushDownLimitAndOffset,
buildScanWithPushedAggregate,
buildScanWithPushedJoin,
+ buildScanWithPushedVariants,
pruneColumns)
pushdownRules.foldLeft(plan) { (newPlan, pushDownRule) =>
@@ -318,6 +322,139 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan]
with PredicateHelper {
case agg: Aggregate => rewriteAggregate(agg)
}
+ def pushDownVariants(plan: LogicalPlan): LogicalPlan = plan.transformDown {
+ case p@PhysicalOperation(projectList, filters, sHolder @
ScanBuilderHolder(_, _,
+ builder: SupportsPushDownVariantExtractions))
+ if
conf.getConf(org.apache.spark.sql.internal.SQLConf.PUSH_VARIANT_INTO_SCAN) =>
+ pushVariantExtractions(p, projectList, filters, sHolder, builder)
+ }
+
+ /**
+ * Converts an ordinal path to a field name path.
+ *
+ * @param structType The top-level struct type
+ * @param ordinals The ordinal path (e.g., [1, 1] for nested.field)
+ * @return The field name path (e.g., ["nested", "field"])
+ */
+ private def getColumnName(structType: StructType, ordinals: Seq[Int]):
Seq[String] = {
+ ordinals match {
+ case Seq() =>
+ // Base case: no more ordinals
+ Seq.empty
+ case ordinal +: rest =>
+ // Get the field at this ordinal
+ val field = structType.fields(ordinal)
+ if (rest.isEmpty) {
+ // Last ordinal in the path
+ Seq(field.name)
+ } else {
+ // Recurse into nested struct
+ field.dataType match {
+ case nestedStruct: StructType =>
+ field.name +: getColumnName(nestedStruct, rest)
+ case _ =>
+ throw SparkException.internalError(
+ s"Expected StructType at field '${field.name}' but got
${field.dataType}")
+ }
+ }
+ }
+ }
+
+ private def pushVariantExtractions(
+ originalPlan: LogicalPlan,
+ projectList: Seq[NamedExpression],
+ filters: Seq[Expression],
+ sHolder: ScanBuilderHolder,
+ builder: SupportsPushDownVariantExtractions): LogicalPlan = {
+ val variants = new VariantInRelation
+
+ // Extract schema attributes from scan builder holder
+ val schemaAttributes = sHolder.output
+
+ // Construct schema for default value resolution
+ val structSchema = StructType(schemaAttributes.map(a =>
+ StructField(a.name, a.dataType, a.nullable, a.metadata)))
+
+ val defaultValues =
org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.
+ existenceDefaultValues(structSchema)
+
+ // Add variant fields from the V2 scan schema
+ for ((a, defaultValue) <- schemaAttributes.zip(defaultValues)) {
+ variants.addVariantFields(a.exprId, a.dataType, defaultValue, Nil)
+ }
+ if (variants.mapping.isEmpty) return originalPlan
+
+ // Collect requested fields from project list and filters
+ projectList.foreach(variants.collectRequestedFields)
+ filters.foreach(variants.collectRequestedFields)
+
+ // If no variant columns remain after collection, return original plan
+ if (variants.mapping.forall(_._2.isEmpty)) return originalPlan
+
+ // Build individual VariantExtraction for each field access
+ // Track which extraction corresponds to which (attr, field, ordinal)
+ val extractionInfo = schemaAttributes.flatMap { topAttr =>
+ val variantFields = variants.mapping.get(topAttr.exprId)
+ if (variantFields.isEmpty || variantFields.get.isEmpty) {
+ // No variant fields for this attribute
+ Seq.empty
+ } else {
+ variantFields.get.toSeq.flatMap { case (pathToVariant, fields) =>
+ val columnName = if (pathToVariant.isEmpty) {
+ Seq(topAttr.name)
+ } else {
+ Seq(topAttr.name) ++
+ getColumnName(topAttr.dataType.asInstanceOf[StructType],
pathToVariant)
+ }
+ fields.toArray.sortBy(_._2).map { case (field, ordinal) =>
+ val extraction = new VariantExtractionImpl(
+ columnName.toArray,
+ field.path.toMetadata,
+ field.targetType
+ )
+ (extraction, topAttr, field, ordinal)
+ }
+ }
+ }
+ }
+
+ // Call the API to push down variant extractions
+ if (extractionInfo.isEmpty) return originalPlan
+
+ val extractions: Array[VariantExtraction] =
extractionInfo.map(_._1).toArray
+ val pushedResults = builder.pushVariantExtractions(extractions)
+
+ // Filter to only the accepted extractions
+ val acceptedExtractions =
extractionInfo.zip(pushedResults).filter(_._2).map(_._1)
+ if (acceptedExtractions.isEmpty) return originalPlan
+
+ // Group accepted extractions by attribute to rebuild the struct schemas
+ val extractionsByAttr = acceptedExtractions.groupBy(_._2)
+ val pushedColumnNames = extractionsByAttr.keys.map(_.name).toSet
+
+ // Build new attribute mapping based on pushed variant extractions
+ val attributeMap = schemaAttributes.map { a =>
+ if (pushedColumnNames.contains(a.name) &&
variants.mapping.get(a.exprId).exists(_.nonEmpty)) {
+ val newType = variants.rewriteType(a.exprId, a.dataType, Nil)
+ val newAttr = AttributeReference(a.name, newType, a.nullable,
a.metadata)(
+ qualifier = a.qualifier)
+ (a.exprId, newAttr)
+ } else {
+ (a.exprId, a.asInstanceOf[AttributeReference])
+ }
+ }.toMap
+
+ val newOutput = sHolder.output.map(a => attributeMap.getOrElse(a.exprId,
a))
+
+ // Store the transformation info on the holder for later use
+ sHolder.pushedVariants = Some(variants)
+ sHolder.pushedVariantAttributeMap = attributeMap
+ sHolder.output = newOutput
+
+ // Return the original plan unchanged - transformation happens in
buildScanWithPushedVariants
+ originalPlan
+ }
+
private def rewriteAggregate(agg: Aggregate): LogicalPlan = agg.child match {
case PhysicalOperation(project, Nil, holder @ ScanBuilderHolder(_, _,
r: SupportsPushDownAggregates)) if
CollapseProject.canCollapseExpressions(
@@ -589,6 +726,48 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan]
with PredicateHelper {
Project(projectList, scanRelation)
}
+ def buildScanWithPushedVariants(plan: LogicalPlan): LogicalPlan =
plan.transform {
+ case p@PhysicalOperation(projectList, filters, holder: ScanBuilderHolder)
+ if holder.pushedVariants.isDefined =>
+ val variants = holder.pushedVariants.get
+ val attributeMap = holder.pushedVariantAttributeMap
+
+ // Build the scan
+ val scan = holder.builder.build()
+ val realOutput = toAttributes(scan.readSchema())
+ val wrappedScan = getWrappedScan(scan, holder)
+ val scanRelation = DataSourceV2ScanRelation(holder.relation,
wrappedScan, realOutput)
+
+ // Create projection to map real output to expected output (with
transformed types)
+ val outputProjection = realOutput.zip(holder.output).map { case
(realAttr, expectedAttr) =>
+ Alias(realAttr, expectedAttr.name)(expectedAttr.exprId)
+ }
+
+ // Rewrite filter expressions using the variant transformation
+ val rewrittenFilters = if (filters.nonEmpty) {
+ val rewrittenFilterExprs = filters.map(variants.rewriteExpr(_,
attributeMap))
+ Some(rewrittenFilterExprs.reduce(And))
+ } else {
+ None
+ }
+
+ // Rewrite project list expressions using the variant transformation
+ val rewrittenProjectList = projectList.map { e =>
+ val rewritten = variants.rewriteExpr(e, attributeMap)
+ rewritten match {
+ case n: NamedExpression => n
+ // When the variant column is directly selected, we replace the
attribute
+ // reference with a struct access, which is not a NamedExpression.
Wrap it with Alias.
+ case _ => Alias(rewritten, e.name)(e.exprId, e.qualifier)
+ }
+ }
+
+ // Build the plan: Project(outputProjection) -> [Filter?] -> scanRelation
+ val withProjection = Project(outputProjection, scanRelation)
+ val withFilter = rewrittenFilters.map(Filter(_,
withProjection)).getOrElse(withProjection)
+ Project(rewrittenProjectList, withFilter)
+ }
+
def pruneColumns(plan: LogicalPlan): LogicalPlan = plan.transform {
case ScanOperation(project, filtersStayUp, filtersPushDown, sHolder:
ScanBuilderHolder) =>
// column pruning
@@ -834,6 +1013,10 @@ case class ScanBuilderHolder(
var joinedRelationsPushedDownOperators: Seq[PushedDownOperators] =
Seq.empty[PushedDownOperators]
var pushedJoinOutputMap: AttributeMap[Expression] =
AttributeMap.empty[Expression]
+
+ var pushedVariantAttributeMap: Map[ExprId, AttributeReference] = Map.empty
+
+ var pushedVariants: Option[VariantInRelation] = None
}
// A wrapper for v1 scan to carry the translated filters and the handled ones,
along with
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
index d347cb04f0bc..5a427aad5f89 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScan.scala
@@ -25,13 +25,13 @@ import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
-import org.apache.spark.sql.connector.read.{PartitionReaderFactory,
SupportsPushDownVariants, VariantAccessInfo}
-import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils,
PartitioningAwareFileIndex}
+import org.apache.spark.sql.connector.read.{PartitionReaderFactory,
VariantExtraction}
+import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils,
PartitioningAwareFileIndex, VariantMetadata}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions,
ParquetReadSupport, ParquetWriteSupport}
import org.apache.spark.sql.execution.datasources.v2.FileScan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{BooleanType, DataType, StructField,
StructType, VariantType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.SerializableConfiguration
@@ -48,8 +48,7 @@ case class ParquetScan(
pushedAggregate: Option[Aggregation] = None,
partitionFilters: Seq[Expression] = Seq.empty,
dataFilters: Seq[Expression] = Seq.empty,
- pushedVariantAccessInfo: Array[VariantAccessInfo] = Array.empty) extends
FileScan
- with SupportsPushDownVariants {
+ pushedVariantExtractions: Array[VariantExtraction] = Array.empty) extends
FileScan {
override def isSplitable(path: Path): Boolean = {
// If aggregate is pushed down, only the file footer will be read once,
// so file should not be split across multiple tasks.
@@ -58,20 +57,57 @@ case class ParquetScan(
// Build transformed schema if variant pushdown is active
private def effectiveReadDataSchema: StructType = {
- if (_pushedVariantAccess.isEmpty) {
+ if (pushedVariantExtractions.isEmpty) {
readDataSchema
} else {
- // Build a mapping from column name to extracted schema
- val variantSchemaMap = _pushedVariantAccess.map(info =>
- info.columnName() -> info.extractedSchema()).toMap
-
- // Transform the read data schema by replacing variant columns with
their extracted schemas
- StructType(readDataSchema.map { field =>
- variantSchemaMap.get(field.name) match {
- case Some(extractedSchema) => field.copy(dataType = extractedSchema)
- case None => field
+ rewriteVariantPushdownSchema(readDataSchema)
+ }
+ }
+
+ private def rewriteVariantPushdownSchema(schema: StructType): StructType = {
+ // Group extractions by column name and build extracted schemas
+ val variantSchemaMap: Map[Seq[String], StructType] =
pushedVariantExtractions
+ .groupBy(e => e.columnName().toSeq)
+ .map { case (colName, extractions) =>
+ // Build struct schema with ordinal-named fields for each extraction
+ var fields = extractions.zipWithIndex.map { case (extraction, idx) =>
+ // Attach VariantMetadata so Parquet reader knows this is a variant
extraction
+ StructField(idx.toString, extraction.expectedDataType(), nullable =
true,
+ extraction.metadata())
+ }
+
+ // Avoid producing an empty struct of requested fields. This happens
+ // if the variant is not used, or only used in `IsNotNull/IsNull`
expressions.
+ // The value of the placeholder field doesn't matter.
+ if (fields.size == 1 &&
fields.head.dataType.isInstanceOf[VariantType]) {
+ val placeholder = VariantMetadata("$.__placeholder_field__",
+ failOnError = false, timeZoneId = "UTC")
+ fields = Array(StructField("0", BooleanType,
+ metadata = placeholder.toMetadata))
}
- })
+
+ colName -> StructType(fields)
+ }.toMap
+
+ rewriteType(schema, Seq.empty, variantSchemaMap).asInstanceOf[StructType]
+ }
+
+ private def rewriteType(
+ dataType: DataType,
+ path: Seq[String],
+ mapping: Map[Seq[String], StructType]): DataType = {
+ dataType match {
+ case structType: StructType if
!VariantMetadata.isVariantStruct(structType) =>
+ val fields = structType.fields.map { field =>
+ mapping.get(path :+ field.name) match {
+ case Some(extractedSchema) =>
+ field.copy(dataType = extractedSchema)
+ case None =>
+ field.copy(dataType = rewriteType(field.dataType, path :+
field.name, mapping))
+ }
+ }
+ StructType(fields)
+ case otherType => otherType
}
}
@@ -84,38 +120,14 @@ case class ParquetScan(
// super.readSchema() combines readDataSchema + readPartitionSchema
// Apply variant transformation if variant pushdown is active
val baseSchema = super.readSchema()
- if (_pushedVariantAccess.isEmpty) {
+ if (pushedVariantExtractions.isEmpty) {
baseSchema
} else {
- val variantSchemaMap = _pushedVariantAccess.map(info =>
- info.columnName() -> info.extractedSchema()).toMap
- StructType(baseSchema.map { field =>
- variantSchemaMap.get(field.name) match {
- case Some(extractedSchema) => field.copy(dataType =
extractedSchema)
- case None => field
- }
- })
+ rewriteVariantPushdownSchema(baseSchema)
}
}
}
- // SupportsPushDownVariants API implementation
- private var _pushedVariantAccess: Array[VariantAccessInfo] =
pushedVariantAccessInfo
-
- override def pushVariantAccess(variantAccessInfo: Array[VariantAccessInfo]):
Boolean = {
- // Parquet supports variant pushdown for all variant accesses
- if (variantAccessInfo.nonEmpty) {
- _pushedVariantAccess = variantAccessInfo
- true
- } else {
- false
- }
- }
-
- override def pushedVariantAccess(): Array[VariantAccessInfo] = {
- _pushedVariantAccess
- }
-
override def createReaderFactory(): PartitionReaderFactory = {
val effectiveSchema = effectiveReadDataSchema
val readDataSchemaAsJson = effectiveSchema.json
@@ -171,8 +183,8 @@ case class ParquetScan(
pushedAggregate.isEmpty && p.pushedAggregate.isEmpty
}
val pushedVariantEqual =
-
java.util.Arrays.equals(_pushedVariantAccess.asInstanceOf[Array[Object]],
- p._pushedVariantAccess.asInstanceOf[Array[Object]])
+
java.util.Arrays.equals(pushedVariantExtractions.asInstanceOf[Array[Object]],
+ p.pushedVariantExtractions.asInstanceOf[Array[Object]])
super.equals(p) && dataSchema == p.dataSchema && options == p.options &&
equivalentFilters(pushedFilters, p.pushedFilters) &&
pushedDownAggEqual &&
pushedVariantEqual
@@ -189,15 +201,17 @@ case class ParquetScan(
}
override def getMetaData(): Map[String, String] = {
- val variantAccessStr = if (_pushedVariantAccess.nonEmpty) {
- _pushedVariantAccess.map(info =>
- s"${info.columnName()}->${info.extractedSchema()}").mkString("[", ",
", "]")
+ val variantExtractionStr = if (pushedVariantExtractions.nonEmpty) {
+ pushedVariantExtractions.map { extraction =>
+ val colName = extraction.columnName().mkString(".")
+ s"$colName:${extraction.metadata()}:${extraction.expectedDataType()}"
+ }.mkString("[", ", ", "]")
} else {
"[]"
}
super.getMetaData() ++ Map("PushedFilters" ->
seqToString(pushedFilters.toImmutableArraySeq)) ++
Map("PushedAggregation" -> pushedAggregationsStr) ++
Map("PushedGroupBy" -> pushedGroupByStr) ++
- Map("PushedVariantAccess" -> variantAccessStr)
+ Map("PushedVariantExtractions" -> variantExtractionStr)
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
index 01367675e65b..94da53f22934 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
@@ -22,7 +22,7 @@ import scala.jdk.CollectionConverters._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
import org.apache.spark.sql.connector.expressions.aggregate.Aggregation
-import org.apache.spark.sql.connector.read.SupportsPushDownAggregates
+import org.apache.spark.sql.connector.read.{SupportsPushDownAggregates,
SupportsPushDownVariantExtractions, VariantExtraction}
import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils,
PartitioningAwareFileIndex}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters,
SparkToParquetSchemaConverter}
import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
@@ -39,7 +39,8 @@ case class ParquetScanBuilder(
dataSchema: StructType,
options: CaseInsensitiveStringMap)
extends FileScanBuilder(sparkSession, fileIndex, dataSchema)
- with SupportsPushDownAggregates {
+ with SupportsPushDownAggregates
+ with SupportsPushDownVariantExtractions {
lazy val hadoopConf = {
val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
// Hadoop Configurations are case sensitive.
@@ -50,6 +51,8 @@ case class ParquetScanBuilder(
private var pushedAggregations = Option.empty[Aggregation]
+ private var pushedVariantExtractions = Array.empty[VariantExtraction]
+
override protected val supportsNestedSchemaPruning: Boolean = true
override def pushDataFilters(dataFilters: Array[Filter]): Array[Filter] = {
@@ -99,6 +102,14 @@ case class ParquetScanBuilder(
}
}
+ // SupportsPushDownVariantExtractions API implementation
+ override def pushVariantExtractions(extractions: Array[VariantExtraction]):
Array[Boolean] = {
+ // Parquet supports variant pushdown for all variant extractions
+ pushedVariantExtractions = extractions
+ // Return true for all extractions (Parquet can handle all of them)
+ Array.fill(extractions.length)(true)
+ }
+
override def build(): ParquetScan = {
// the `finalSchema` is either pruned in pushAggregation (if aggregates are
// pushed down), or pruned in readDataSchema() (in regular column
pruning). These
@@ -108,6 +119,6 @@ case class ParquetScanBuilder(
}
ParquetScan(sparkSession, hadoopConf, fileIndex, dataSchema, finalSchema,
readPartitionSchema(), pushedDataFilters, options, pushedAggregations,
- partitionFilters, dataFilters)
+ partitionFilters, dataFilters, pushedVariantExtractions)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]