This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7f837d0d0a4 [HUDI-7749] Bump Spark version 3.3.1 to 3.3.4 (#11198)
7f837d0d0a4 is described below
commit 7f837d0d0a46f937db5685bf7365fafe3be35b76
Author: Sagar Sumit <[email protected]>
AuthorDate: Tue May 14 16:19:00 2024 +0530
[HUDI-7749] Bump Spark version 3.3.1 to 3.3.4 (#11198)
* [HUDI-7749] Bump Spark version 3.3.1 to 3.3.4
* cdcFileReader should return batches for CDC reads only when batch read is supported for the schema
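
For context, the pattern behind that second fix can be sketched as follows. This is a minimal, hypothetical example rather than code from the patch; the helper name and variables are illustrative. Since Spark 3.3.2, ParquetFileFormat expects the "returning_batch" option when a reader is built, and it should only be true when supportBatch confirms the schema can actually be read as columnar batches:

    // Minimal sketch (illustrative names): gate "returning_batch" on
    // supportBatch rather than requesting batches unconditionally.
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
    import org.apache.spark.sql.types.StructType

    def withReturningBatch(spark: SparkSession,
                           dataSchema: StructType,
                           options: Map[String, String]): Map[String, String] = {
      val format = new ParquetFileFormat()
      // supportBatch is true only when the schema can be vectorized;
      // Spark 3.3.2+ expects "returning_batch" to be set accordingly.
      options + ("returning_batch" -> format.supportBatch(spark, dataSchema).toString)
    }

Requesting batches for a schema that only supports row-based reads is the failure mode the CDC change guards against.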
---
.../HoodieFileGroupReaderBasedParquetFileFormat.scala | 12 ++++++++++--
pom.xml                                               |  2 +-
2 files changed, 11 insertions(+), 3 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index fc1bcacf3be..205620a89a0 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.JoinedRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
-import org.apache.spark.sql.execution.datasources.parquet.HoodieFileGroupReaderBasedParquetFileFormat.{ROW_INDEX_TEMPORARY_COLUMN_NAME, getAppliedFilters, getAppliedRequiredSchema, getRecordKeyRelatedFilters, makeCloseableFileGroupMappingRecordIterator}
+import org.apache.spark.sql.execution.datasources.parquet.HoodieFileGroupReaderBasedParquetFileFormat.{OPTION_RETURNING_BATCH, ROW_INDEX_TEMPORARY_COLUMN_NAME, getAppliedFilters, getAppliedRequiredSchema, getRecordKeyRelatedFilters, makeCloseableFileGroupMappingRecordIterator}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{LongType, Metadata, MetadataBuilder, StringType, StructField, StructType}
@@ -312,7 +312,7 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
StructType(Nil),
tableSchema.structTypeSchema,
Nil,
- options,
+ options + (OPTION_RETURNING_BATCH -> super.supportBatch(sparkSession, tableSchema.structTypeSchema).toString),
new Configuration(hadoopConf))
//Rules for appending partitions and filtering in the bootstrap readers:
@@ -381,6 +381,14 @@ object HoodieFileGroupReaderBasedParquetFileFormat {
private val FILE_SOURCE_METADATA_COL_ATTR_KEY = "__file_source_metadata_col"
private val METADATA_COL_ATTR_KEY = "__metadata_col"
+ /**
+ * A required option (since Spark 3.3.2) to pass to buildReaderWithPartitionValues to return columnar batch output or not.
+ * For ParquetFileFormat and OrcFileFormat, passing this option is required.
+ * This should only be passed as true if it can actually be supported, which can be checked
+ * by calling supportBatch.
+ */
+ private val OPTION_RETURNING_BATCH = "returning_batch"
+
def getRecordKeyRelatedFilters(filters: Seq[Filter], recordKeyColumn: String): Seq[Filter] = {
  filters.filter(f => f.references.exists(c => c.equalsIgnoreCase(recordKeyColumn)))
}
diff --git a/pom.xml b/pom.xml
index 2fd99672a2f..0fc50e07b71 100644
--- a/pom.xml
+++ b/pom.xml
@@ -166,7 +166,7 @@
<spark30.version>3.0.2</spark30.version>
<spark31.version>3.1.3</spark31.version>
<spark32.version>3.2.3</spark32.version>
- <spark33.version>3.3.1</spark33.version>
+ <spark33.version>3.3.4</spark33.version>
<spark34.version>3.4.3</spark34.version>
<spark35.version>3.5.1</spark35.version>
<hudi.spark.module>hudi-spark3.2.x</hudi.spark.module>