yihua commented on code in PR #9276:
URL: https://github.com/apache/hudi/pull/9276#discussion_r1280272430
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java:
##########
@@ -67,13 +67,6 @@ public class HoodieBootstrapConfig extends HoodieConfig {
.sinceVersion("0.6.0")
.withDocumentation("Selects the mode in which each file/partition in the
bootstrapped dataset gets bootstrapped");
- public static final ConfigProperty<String> DATA_QUERIES_ONLY = ConfigProperty
- .key("hoodie.bootstrap.data.queries.only")
- .defaultValue("false")
- .markAdvanced()
- .sinceVersion("0.14.0")
- .withDocumentation("Improves query performance, but queries cannot use
hudi metadata fields");
Review Comment:
Could you add elaboration on what this config controls? Currently, it's not
apparent.
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala:
##########
@@ -166,6 +170,17 @@ trait SparkAdapter extends Serializable {
* Create instance of [[ParquetFileFormat]]
*/
def createHoodieParquetFileFormat(appendPartitionValues: Boolean):
Option[ParquetFileFormat]
+ def createMORBootstrapFileFormat(appendPartitionValues: Boolean,
Review Comment:
In terms of the naming, could we do this:
existing file format: `HoodieParquetFileFormat` ->
`LegacyHoodieParquetFileFormat`
new file format: `HoodieParquetFileFormat`
The goal is that the new file format should be used for all query types by
default in the future, so we can pick the name `HoodieParquetFileFormat` for
the new format, while renaming the existing classes to different names and
deprecating and removing them once the new file format is stable. wdyt?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieTypes.scala:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.internal.schema.InternalSchema
+import org.apache.spark.sql.types.StructType
+
+case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr:
String, internalSchema: Option[InternalSchema] = None)
+
+case class HoodieTableState(tablePath: String,
Review Comment:
Could you add Scaladocs mentioning that this is used for broadcasting in Spark?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##########
@@ -247,6 +245,9 @@ object DefaultSource {
Option(schema)
}
+ val useMORBootstrapFF = parameters.getOrElse(MOR_BOOTSTRAP_FILE_READER.key,
+ MOR_BOOTSTRAP_FILE_READER.defaultValue).toBoolean && (globPaths == null
|| globPaths.isEmpty)
Review Comment:
Any reason not to use the new file format with globbed paths?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##########
@@ -262,16 +263,30 @@ object DefaultSource {
new IncrementalRelation(sqlContext, parameters, userSchema,
metaClient)
case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
- new MergeOnReadSnapshotRelation(sqlContext, parameters, metaClient,
globPaths, userSchema)
+ val relation = new MergeOnReadSnapshotRelation(sqlContext,
parameters, metaClient, globPaths, userSchema)
+ if (useMORBootstrapFF && !relation.hasSchemaOnRead) {
+ relation.toHadoopFsRelation
+ } else {
+ relation
+ }
case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
new MergeOnReadIncrementalRelation(sqlContext, parameters,
metaClient, userSchema)
case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, true) =>
- new HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths,
metaClient, parameters)
-
+ val relation = new HoodieBootstrapMORRelation(sqlContext,
userSchema, globPaths, metaClient, parameters)
+ if (useMORBootstrapFF && !relation.hasSchemaOnRead) {
+ relation.toHadoopFsRelation
+ } else {
+ relation
+ }
case (_, _, true) =>
- resolveHoodieBootstrapRelation(sqlContext, globPaths, userSchema,
metaClient, parameters)
+ val relation = new HoodieBootstrapRelation(sqlContext, userSchema,
globPaths, metaClient, parameters)
+ if (useMORBootstrapFF && !relation.hasSchemaOnRead) {
+ relation.toHadoopFsRelation
+ } else {
+ relation
Review Comment:
Existing logic may still use `HadoopFsRelation` based on multiple conditions:
```
!enableFileIndex || isSchemaEvolutionEnabledOnRead
|| globPaths.nonEmpty || !parameters.getOrElse(DATA_QUERIES_ONLY.key,
DATA_QUERIES_ONLY.defaultValue).toBoolean
```
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala:
##########
@@ -59,12 +59,16 @@ case class HoodieBootstrapRelation(override val sqlContext:
SQLContext,
this.copy(prunedDataSchema = Some(prunedSchema))
def toHadoopFsRelation: HadoopFsRelation = {
+ fileIndex.shouldBroadcast = true
HadoopFsRelation(
location = fileIndex,
partitionSchema = fileIndex.partitionSchema,
dataSchema = fileIndex.dataSchema,
bucketSpec = None,
- fileFormat = fileFormat,
+ fileFormat =
sparkAdapter.createMORBootstrapFileFormat(shouldExtractPartitionValuesFromPartitionPath,
Review Comment:
same here
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala:
##########
@@ -88,18 +83,10 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
/**
* Get the schema of the table.
*/
- lazy val schema: StructType = if (shouldFastBootstrap) {
- StructType(rawSchema.fields.filterNot(f =>
HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name)))
- } else {
- rawSchema
- }
-
- private lazy val rawSchema: StructType = schemaSpec.getOrElse({
- val schemaUtil = new TableSchemaResolver(metaClient)
-
AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema)
- })
Review Comment:
Is this not used?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -87,6 +87,12 @@ object DataSourceReadOptions {
s"payload implementation to merge (${REALTIME_PAYLOAD_COMBINE_OPT_VAL})
or skip merging altogether" +
s"${REALTIME_SKIP_MERGE_OPT_VAL}")
+ val MOR_BOOTSTRAP_FILE_READER: ConfigProperty[String] = ConfigProperty
+ .key("hoodie.datasource.read.mor.bootstrap.file.reader")
Review Comment:
rename to `hoodie.datasource.read.use.new.file.format`?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##########
@@ -102,8 +101,7 @@ class DefaultSource extends RelationProvider
)
} else {
Map()
- }) ++ DataSourceOptionsHelper.parametersWithReadDefaults(optParams +
- (DATA_QUERIES_ONLY.key() -> sqlContext.getConf(DATA_QUERIES_ONLY.key(),
optParams.getOrElse(DATA_QUERIES_ONLY.key(),
DATA_QUERIES_ONLY.defaultValue()))))
Review Comment:
Do we still need this logic with the legacy file format and MOR RDD approach?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala:
##########
@@ -77,6 +78,9 @@ trait HoodieCatalystPlansUtils {
*/
def unapplyMergeIntoTable(plan: LogicalPlan): Option[(LogicalPlan,
LogicalPlan, Expression)]
+
+ def applyMORBootstrapFileFormatProjection(plan: LogicalPlan): LogicalPlan
Review Comment:
Could we rename this to `applyNewHoodieParquetFileFormatProjection` to align
with `NewHoodieParquetFileFormat`?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/InternalRowBroadcast.scala:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.model.FileSlice
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+import org.apache.spark.sql.types.{DataType, Decimal}
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+
+class InternalRowBroadcast(internalRow: InternalRow,
Review Comment:
Should this be renamed to `PartitionFileSliceMapping`?
##########
hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/MORBootstrap24FileFormat.scala:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hudi.{HoodieTableSchema, HoodieTableState}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.BootstrapMORIteratorFactory.MORBootstrapFileFormat
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{BootstrapMORIteratorFactory, SparkSession}
+
+class MORBootstrap24FileFormat(shouldAppendPartitionValues: Boolean,
Review Comment:
Could we make the new file format implementation independent of existing
implementation and directly extending `ParquetFileFormat`? That way, we can
keep the current file format implementation intact and remove current file
format classes easily in the future.
For simplicity, not all features need to be supported in the new file
format, such as full schema evolution (schema-on-read).
##########
hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/spark/sql/execution/datasources/parquet/Spark24SpecificParquetRecordReaderBase.java:
##########
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetInputSplit;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskContext$;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.StructType$;
+import org.apache.spark.util.AccumulatorV2;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import scala.Option;
+
+import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
+import static
org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static
org.apache.parquet.format.converter.ParquetMetadataConverter.range;
+import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
+import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
+
+/**
+ * Base class for custom RecordReaders for Parquet that directly materialize
to `T`.
+ * This class handles computing row groups, filtering on them, setting up the
column readers,
+ * etc.
+ * This is heavily based on parquet-mr's RecordReader.
+ * TODO: move this to the parquet-mr project. There are performance benefits
of doing it
+ * this way, albeit at a higher cost to implement. This base class is reusable.
+ */
+public abstract class Spark24SpecificParquetRecordReaderBase<T> extends
RecordReader<Void, T> {
Review Comment:
Why are `Spark24SpecificParquetRecordReaderBase` and
`Spark24VectorizedParquetRecordReader` copied over?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -87,6 +87,12 @@ object DataSourceReadOptions {
s"payload implementation to merge (${REALTIME_PAYLOAD_COMBINE_OPT_VAL})
or skip merging altogether" +
s"${REALTIME_SKIP_MERGE_OPT_VAL}")
+ val MOR_BOOTSTRAP_FILE_READER: ConfigProperty[String] = ConfigProperty
+ .key("hoodie.datasource.read.mor.bootstrap.file.reader")
+ .defaultValue("true")
+ .markAdvanced()
Review Comment:
Don't forget to mark the since version with `.sinceVersion()`.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapMORRelation.scala:
##########
@@ -108,4 +111,18 @@ case class HoodieBootstrapMORRelation(override val
sqlContext: SQLContext,
override def updatePrunedDataSchema(prunedSchema: StructType):
HoodieBootstrapMORRelation =
this.copy(prunedDataSchema = Some(prunedSchema))
+
+ def toHadoopFsRelation: HadoopFsRelation = {
+ fileIndex.shouldBroadcast = true
+ HadoopFsRelation(
+ location = fileIndex,
+ partitionSchema = fileIndex.partitionSchema,
+ dataSchema = fileIndex.dataSchema,
+ bucketSpec = None,
+ fileFormat =
sparkAdapter.createMORBootstrapFileFormat(shouldExtractPartitionValuesFromPartitionPath,
Review Comment:
This should be controlled by the config, i.e., using the existing Hudi file
format implementation by default.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -226,9 +214,7 @@ abstract class HoodieBaseRelation(val sqlContext:
SQLContext,
val shouldExtractPartitionValueFromPath =
optParams.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key,
DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean
- val shouldUseBootstrapFastRead =
optParams.getOrElse(DATA_QUERIES_ONLY.key(), "false").toBoolean
Review Comment:
We should keep this assuming that the new file format for bootstrap queries
is not turned on by default.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala:
##########
@@ -45,13 +45,27 @@ case class MergeOnReadSnapshotRelation(override val
sqlContext: SQLContext,
private val globPaths: Seq[Path],
private val userSchema:
Option[StructType],
private val prunedDataSchema:
Option[StructType] = None)
- extends BaseMergeOnReadSnapshotRelation(sqlContext, optParams, metaClient,
globPaths, userSchema, prunedDataSchema) {
+ extends BaseMergeOnReadSnapshotRelation(sqlContext, optParams, metaClient,
globPaths, userSchema, prunedDataSchema) with SparkAdapterSupport {
override type Relation = MergeOnReadSnapshotRelation
override def updatePrunedDataSchema(prunedSchema: StructType):
MergeOnReadSnapshotRelation =
this.copy(prunedDataSchema = Some(prunedSchema))
+ def toHadoopFsRelation: HadoopFsRelation = {
+ fileIndex.shouldBroadcast = true
+ HadoopFsRelation(
+ location = fileIndex,
+ partitionSchema = fileIndex.partitionSchema,
+ dataSchema = fileIndex.dataSchema,
+ bucketSpec = None,
+ fileFormat =
sparkAdapter.createMORBootstrapFileFormat(shouldExtractPartitionValuesFromPartitionPath,
+ sparkSession.sparkContext.broadcast(tableState),
+
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt)),
+ metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
isMOR = true, isBootstrap = false).get,
+ optParams)(sparkSession)
+ }
Review Comment:
Could the `HadoopFsRelation` object be directly created in `DefaultSource`
without going through `MergeOnReadSnapshotRelation`? (I have a similar
question on bootstrap query too)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]