umehrot2 commented on a change in pull request #1848:
URL: https://github.com/apache/hudi/pull/1848#discussion_r459294057
##########
File path: hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
##########
@@ -58,26 +61,20 @@ class DefaultSource extends RelationProvider
throw new HoodieException("'path' must be specified.")
}
+ val fs = FSUtils.getFs(path.get,
sqlContext.sparkContext.hadoopConfiguration)
+ val globPaths = HudiSparkUtils.checkAndGlobPathIfNecessary(Seq(path.get),
fs)
+ val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray)
+ val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath)
Review comment:
Can we try to avoid having to do this part always ? It is not required
for the `BaseFileOnly` case, so it would be good if we can avoid it to decrease
the unnecessary overhead for `Read Optimized` queries.
##########
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class HudiMergeOnReadFileSplit(dataFile: PartitionedFile,
+ logPaths: Option[List[String]],
+ latestCommit: String,
+ tablePath: String,
+ maxCompactionMemoryInBytes: Long,
+ skipMerge: Boolean)
+
+class SnapshotRelation (val sqlContext: SQLContext,
+ val optParams: Map[String, String],
+ val userSchema: StructType,
+ val globPaths: Seq[Path],
+ val metaClient: HoodieTableMetaClient)
+ extends BaseRelation with TableScan with Logging{
+
+ private val conf = sqlContext.sparkContext.hadoopConfiguration
+
+ // use schema from latest metadata, if not present, read schema from the
data file
+ private val latestSchema = {
+ val schemaUtil = new TableSchemaResolver(metaClient)
+ val tableSchema =
HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields)
+ AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+ }
+
+ private val skipMerge = optParams.getOrElse(
+ DataSourceReadOptions.REALTIME_SKIP_MERGE_KEY,
+ DataSourceReadOptions.DEFAULT_REALTIME_SKIP_MERGE_VAL).toBoolean
+ private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new
JobConf(conf))
+ private val fileIndex = buildFileIndex()
+
+ override def schema: StructType = latestSchema
+
+ override def needConversion: Boolean = false
+
+ override def buildScan(): RDD[Row] = {
+ val parquetReaderFunction = new
ParquetFileFormat().buildReaderWithPartitionValues(
+ sparkSession = sqlContext.sparkSession,
+ dataSchema = latestSchema,
+ partitionSchema = StructType(Nil),
+ requiredSchema = latestSchema,
+ filters = Seq.empty,
Review comment:
We should try to support filter push-down. Again for the more straight
forward scenarios which I mentioned in my other comment, it should be possible
to just pass down the user filters in the reader.
However, for Log file merging scenario we will may have to take care of the
following scenario:
- Reading base file filtered out say `Row X` because of filter push-down.
- `Row X` had been updated in the log file and has an entry.
- While merging we need some way to tell that `Row X` should be filtered out
from log file as well, otherwise we may end up still returning it in the
result, because based on the merging logic I see that any remaining rows in log
file which did not have corresponding key in base file are just appended and
returned in the result.
##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.config.SerializableConfiguration
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner,
LogReaderUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
+import
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericRecord, IndexedRecord}
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, AvroSerializer,
SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+case class HoodieMergeOnReadPartition(index: Int, split:
HoodieMergeOnReadFileSplit) extends Partition
+
+class HoodieMergeOnReadRDD(sc: SparkContext,
+ broadcastedConf:
Broadcast[SerializableConfiguration],
+ baseFileReadFunction: PartitionedFile =>
Iterator[Any],
+ dataSchema: StructType,
+ hoodieRealtimeFileSplits:
List[HoodieMergeOnReadFileSplit])
+ extends RDD[InternalRow](sc, Nil) {
+
+ // Broadcast the hadoop Configuration to executors.
+ def this(sc: SparkContext,
+ config: Configuration,
+ dataReadFunction: PartitionedFile => Iterator[Any],
+ dataSchema: StructType,
+ hoodieRealtimeFileSplits: List[HoodieMergeOnReadFileSplit]) = {
+ this(
+ sc,
+ sc.broadcast(new SerializableConfiguration(config))
+ .asInstanceOf[Broadcast[SerializableConfiguration]],
+ dataReadFunction,
+ dataSchema,
+ hoodieRealtimeFileSplits)
+ }
+
+ override def compute(split: Partition, context: TaskContext):
Iterator[InternalRow] = {
+ val mergeParquetPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
+ val baseFileIterator = read(mergeParquetPartition.split.dataFile,
baseFileReadFunction)
+ mergeParquetPartition.split match {
+ case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
+ baseFileIterator
+ case unMergeSplit if unMergeSplit.payload
+ .equals(DataSourceReadOptions.DEFAULT_MERGE_ON_READ_PAYLOAD_VAL) =>
+ unMergeFileIterator(unMergeSplit, baseFileIterator)
Review comment:
Just for my understanding, what is this use-case where we want to return
un-merged rows ? Do we do something similar for MOR queries through input
format where we want to return un-merged rows ?
##########
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class HoodieMergeOnReadFileSplit(dataFile: PartitionedFile,
+ logPaths: Option[List[String]],
+ latestCommit: String,
+ tablePath: String,
+ maxCompactionMemoryInBytes: Long,
+ payload: String,
+ orderingVal: String)
+
+class SnapshotRelation (val sqlContext: SQLContext,
Review comment:
Shouldn't we call it something like `MergeOnReadSnapshotRelation` ?
##########
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class HoodieMergeOnReadFileSplit(dataFile: PartitionedFile,
+ logPaths: Option[List[String]],
+ latestCommit: String,
+ tablePath: String,
+ maxCompactionMemoryInBytes: Long,
+ payload: String,
+ orderingVal: String)
+
+class SnapshotRelation (val sqlContext: SQLContext,
+ val optParams: Map[String, String],
+ val userSchema: StructType,
+ val globPaths: Seq[Path],
+ val metaClient: HoodieTableMetaClient)
+ extends BaseRelation with TableScan with Logging {
+
+ private val conf = sqlContext.sparkContext.hadoopConfiguration
+
+ // use schema from latest metadata, if not present, read schema from the
data file
+ private val latestSchema = {
+ val schemaUtil = new TableSchemaResolver(metaClient)
+ val tableSchema =
HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields)
Review comment:
Instead of doing this, you can just do:
```
val tableSchema = schemaUtil.getTableAvroSchema
```
##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.config.SerializableConfiguration
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner,
LogReaderUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
+import
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericRecord, IndexedRecord}
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, AvroSerializer,
SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+case class HoodieMergeOnReadPartition(index: Int, split:
HoodieMergeOnReadFileSplit) extends Partition
+
+class HoodieMergeOnReadRDD(sc: SparkContext,
Review comment:
Instead of using prefix `Hoodie` for all the newly defined classes,
shouldn't we be using `Hudi`. Isn't that where the community is headed towards ?
##########
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class HudiMergeOnReadFileSplit(dataFile: PartitionedFile,
+ logPaths: Option[List[String]],
+ latestCommit: String,
+ tablePath: String,
+ maxCompactionMemoryInBytes: Long,
+ skipMerge: Boolean)
+
+class SnapshotRelation (val sqlContext: SQLContext,
+ val optParams: Map[String, String],
+ val userSchema: StructType,
+ val globPaths: Seq[Path],
+ val metaClient: HoodieTableMetaClient)
+ extends BaseRelation with TableScan with Logging{
+
+ private val conf = sqlContext.sparkContext.hadoopConfiguration
+
+ // use schema from latest metadata, if not present, read schema from the
data file
+ private val latestSchema = {
+ val schemaUtil = new TableSchemaResolver(metaClient)
+ val tableSchema =
HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields)
+ AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+ }
+
+ private val skipMerge = optParams.getOrElse(
+ DataSourceReadOptions.REALTIME_SKIP_MERGE_KEY,
+ DataSourceReadOptions.DEFAULT_REALTIME_SKIP_MERGE_VAL).toBoolean
+ private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new
JobConf(conf))
+ private val fileIndex = buildFileIndex()
+
+ override def schema: StructType = latestSchema
+
+ override def needConversion: Boolean = false
+
+ override def buildScan(): RDD[Row] = {
+ val parquetReaderFunction = new
ParquetFileFormat().buildReaderWithPartitionValues(
Review comment:
Possibly something like `HoodieLogFileFormat` might make sense to do in
future, as it will clearly extract out the Log files reading logic.
##########
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class HudiMergeOnReadFileSplit(dataFile: PartitionedFile,
+ logPaths: Option[List[String]],
+ latestCommit: String,
+ tablePath: String,
+ maxCompactionMemoryInBytes: Long,
+ skipMerge: Boolean)
+
+class SnapshotRelation (val sqlContext: SQLContext,
+ val optParams: Map[String, String],
+ val userSchema: StructType,
+ val globPaths: Seq[Path],
+ val metaClient: HoodieTableMetaClient)
+ extends BaseRelation with TableScan with Logging{
+
+ private val conf = sqlContext.sparkContext.hadoopConfiguration
+
+ // use schema from latest metadata, if not present, read schema from the
data file
+ private val latestSchema = {
+ val schemaUtil = new TableSchemaResolver(metaClient)
+ val tableSchema =
HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields)
+ AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+ }
+
+ private val skipMerge = optParams.getOrElse(
+ DataSourceReadOptions.REALTIME_SKIP_MERGE_KEY,
+ DataSourceReadOptions.DEFAULT_REALTIME_SKIP_MERGE_VAL).toBoolean
+ private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(new
JobConf(conf))
+ private val fileIndex = buildFileIndex()
+
+ override def schema: StructType = latestSchema
+
+ override def needConversion: Boolean = false
+
+ override def buildScan(): RDD[Row] = {
+ val parquetReaderFunction = new
ParquetFileFormat().buildReaderWithPartitionValues(
+ sparkSession = sqlContext.sparkSession,
+ dataSchema = latestSchema,
+ partitionSchema = StructType(Nil),
+ requiredSchema = latestSchema,
Review comment:
We may want to support `Column Pruning` here. @garyli1019 atleast for
the more straight forward parts like reading:
- Reading `base parquet files` only without `log files to be merged`
- `Unmerge` reading logic
It should be possible to push the user request columns/schema down without
complications. You can possibly introduce another `parquetReaderFunction` which
passes the requested schema down and use it for the above defined cases.
For the case where merging is required I agree it may be more complicated.
What if the user does not even request reading a column which has been updated
in the log file ? Merging in such cases may not even be required.
##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.config.SerializableConfiguration
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner,
LogReaderUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
+import
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericRecord, IndexedRecord}
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, AvroSerializer,
SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+case class HoodieMergeOnReadPartition(index: Int, split:
HoodieMergeOnReadFileSplit) extends Partition
+
+class HoodieMergeOnReadRDD(sc: SparkContext,
+ broadcastedConf:
Broadcast[SerializableConfiguration],
+ baseFileReadFunction: PartitionedFile =>
Iterator[Any],
+ dataSchema: StructType,
+ hoodieRealtimeFileSplits:
List[HoodieMergeOnReadFileSplit])
+ extends RDD[InternalRow](sc, Nil) {
+
+ // Broadcast the hadoop Configuration to executors.
+ def this(sc: SparkContext,
+ config: Configuration,
+ dataReadFunction: PartitionedFile => Iterator[Any],
+ dataSchema: StructType,
+ hoodieRealtimeFileSplits: List[HoodieMergeOnReadFileSplit]) = {
+ this(
+ sc,
+ sc.broadcast(new SerializableConfiguration(config))
+ .asInstanceOf[Broadcast[SerializableConfiguration]],
+ dataReadFunction,
+ dataSchema,
+ hoodieRealtimeFileSplits)
+ }
+
+ override def compute(split: Partition, context: TaskContext):
Iterator[InternalRow] = {
+ val mergeParquetPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
+ val baseFileIterator = read(mergeParquetPartition.split.dataFile,
baseFileReadFunction)
+ mergeParquetPartition.split match {
+ case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
+ baseFileIterator
+ case unMergeSplit if unMergeSplit.payload
+ .equals(DataSourceReadOptions.DEFAULT_MERGE_ON_READ_PAYLOAD_VAL) =>
+ unMergeFileIterator(unMergeSplit, baseFileIterator)
+ case mergeSplit if !mergeSplit.payload.isEmpty =>
+ mergeFileIterator(mergeSplit, baseFileIterator)
+ case _ => throw new HoodieException(s"Unable to select an Iterator to
read the Hoodie MOR File Split for " +
+ s"file path: ${mergeParquetPartition.split.dataFile.filePath}" +
+ s"log paths: ${mergeParquetPartition.split.logPaths.toString}" +
+ s"hoodie table path: ${mergeParquetPartition.split.tablePath}" +
+ s"spark partition Index: ${mergeParquetPartition.index}")
+ }
+ }
+
+ override protected def getPartitions: Array[Partition] = {
+ hoodieRealtimeFileSplits.zipWithIndex.map(file =>
HoodieMergeOnReadPartition(file._2, file._1)).toArray
+ }
+
+ private def getConfig(): Configuration = {
+ broadcastedConf.value.get()
+ }
+
+ private def read(partitionedFile: PartitionedFile,
+ readFileFunction: PartitionedFile => Iterator[Any]):
Iterator[InternalRow] = {
+ val fileIterator = readFileFunction(partitionedFile)
+ val rows = fileIterator.flatMap(_ match {
+ case r: InternalRow => Seq(r)
+ case b: ColumnarBatch => b.rowIterator().asScala
+ })
+ rows
+ }
+
+ private def unMergeFileIterator(split: HoodieMergeOnReadFileSplit,
+ baseFileIterator: Iterator[InternalRow]):
Iterator[InternalRow] =
+ new Iterator[InternalRow] {
+ private val logSchema = getLogAvroSchema(split)
+ private val sparkTypes =
SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType]
+ private val converter = new AvroDeserializer(logSchema, sparkTypes)
+ private val logRecords = scanLog(split, logSchema).getRecords
+ private val logRecordsIterator = logRecords.keySet().iterator().asScala
+
+ override def hasNext: Boolean = {
+ baseFileIterator.hasNext || logRecordsIterator.hasNext
+ }
+
+ override def next(): InternalRow = {
+ if (baseFileIterator.hasNext) {
+ baseFileIterator.next()
+ } else {
+ val curAvrokey = logRecordsIterator.next()
+ val curAvroRecord =
logRecords.get(curAvrokey).getData.getInsertValue(logSchema).get()
+ converter.deserialize(curAvroRecord).asInstanceOf[InternalRow]
+ }
+ }
+ }
+
+ private def mergeFileIterator(split: HoodieMergeOnReadFileSplit,
+ baseFileIterator: Iterator[InternalRow]):
Iterator[InternalRow] =
+ new Iterator[InternalRow] {
+ private val avroSchema = getLogAvroSchema(split)
+ private val sparkSchema =
SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
+ private val avroToRowConverter = new AvroDeserializer(avroSchema,
sparkSchema)
+ private val rowToAvroConverter = new AvroSerializer(sparkSchema,
avroSchema, false)
+ private val logRecords = scanLog(split, avroSchema).getRecords
+ private val logRecordToRead = logRecords.keySet()
+
+ private var baseFileFinished = false
+ private var logRecordsIterator: Iterator[String] = _
+ private var recordToLoad: InternalRow = _
+
+ @scala.annotation.tailrec
+ override def hasNext: Boolean = {
+ if (baseFileIterator.hasNext) {
Review comment:
- Instead of looping over `baseFileIterator` and performing the check
whether that key exists in `logRecords`, will it be more efficient to do it the
other way round. Loop over `logRecords` and perform merge. In the end append
all the remaining base file rows.
- Also is this a good practice to perform the actual fetching in `hasNext`
function, instead of `next` ?
##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.config.SerializableConfiguration
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner,
LogReaderUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
+import
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericRecord, IndexedRecord}
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, AvroSerializer,
SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+case class HoodieMergeOnReadPartition(index: Int, split:
HoodieMergeOnReadFileSplit) extends Partition
+
+class HoodieMergeOnReadRDD(sc: SparkContext,
+ broadcastedConf:
Broadcast[SerializableConfiguration],
+ baseFileReadFunction: PartitionedFile =>
Iterator[Any],
+ dataSchema: StructType,
+ hoodieRealtimeFileSplits:
List[HoodieMergeOnReadFileSplit])
+ extends RDD[InternalRow](sc, Nil) {
+
+ // Broadcast the hadoop Configuration to executors.
+ def this(sc: SparkContext,
+ config: Configuration,
+ dataReadFunction: PartitionedFile => Iterator[Any],
+ dataSchema: StructType,
+ hoodieRealtimeFileSplits: List[HoodieMergeOnReadFileSplit]) = {
+ this(
+ sc,
+ sc.broadcast(new SerializableConfiguration(config))
+ .asInstanceOf[Broadcast[SerializableConfiguration]],
+ dataReadFunction,
+ dataSchema,
+ hoodieRealtimeFileSplits)
+ }
+
+ override def compute(split: Partition, context: TaskContext):
Iterator[InternalRow] = {
+ val mergeParquetPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
+ val baseFileIterator = read(mergeParquetPartition.split.dataFile,
baseFileReadFunction)
+ mergeParquetPartition.split match {
+ case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
+ baseFileIterator
+ case unMergeSplit if unMergeSplit.payload
+ .equals(DataSourceReadOptions.DEFAULT_MERGE_ON_READ_PAYLOAD_VAL) =>
+ unMergeFileIterator(unMergeSplit, baseFileIterator)
+ case mergeSplit if !mergeSplit.payload.isEmpty =>
+ mergeFileIterator(mergeSplit, baseFileIterator)
+ case _ => throw new HoodieException(s"Unable to select an Iterator to
read the Hoodie MOR File Split for " +
+ s"file path: ${mergeParquetPartition.split.dataFile.filePath}" +
+ s"log paths: ${mergeParquetPartition.split.logPaths.toString}" +
+ s"hoodie table path: ${mergeParquetPartition.split.tablePath}" +
+ s"spark partition Index: ${mergeParquetPartition.index}")
+ }
+ }
+
+ override protected def getPartitions: Array[Partition] = {
+ hoodieRealtimeFileSplits.zipWithIndex.map(file =>
HoodieMergeOnReadPartition(file._2, file._1)).toArray
+ }
+
+ private def getConfig(): Configuration = {
+ broadcastedConf.value.get()
+ }
+
+ private def read(partitionedFile: PartitionedFile,
+ readFileFunction: PartitionedFile => Iterator[Any]):
Iterator[InternalRow] = {
+ val fileIterator = readFileFunction(partitionedFile)
+ val rows = fileIterator.flatMap(_ match {
+ case r: InternalRow => Seq(r)
+ case b: ColumnarBatch => b.rowIterator().asScala
+ })
+ rows
+ }
+
+ private def unMergeFileIterator(split: HoodieMergeOnReadFileSplit,
+ baseFileIterator: Iterator[InternalRow]):
Iterator[InternalRow] =
+ new Iterator[InternalRow] {
+ private val logSchema = getLogAvroSchema(split)
+ private val sparkTypes =
SchemaConverters.toSqlType(logSchema).dataType.asInstanceOf[StructType]
+ private val converter = new AvroDeserializer(logSchema, sparkTypes)
Review comment:
@vinothchandar This seems like something we can consider using at other
places in Hudi code like `AvroConversionHelper` to convert Avro Records to
Rows, instead of maintaining the conversion code in-house.
##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.config.SerializableConfiguration
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner,
LogReaderUtils}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
+import
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericRecord, IndexedRecord}
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, AvroSerializer,
SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+case class HoodieMergeOnReadPartition(index: Int, split:
HoodieMergeOnReadFileSplit) extends Partition
+
+class HoodieMergeOnReadRDD(sc: SparkContext,
+ broadcastedConf:
Broadcast[SerializableConfiguration],
+ baseFileReadFunction: PartitionedFile =>
Iterator[Any],
+ dataSchema: StructType,
+ hoodieRealtimeFileSplits:
List[HoodieMergeOnReadFileSplit])
+ extends RDD[InternalRow](sc, Nil) {
+
+ // Broadcast the hadoop Configuration to executors.
+ def this(sc: SparkContext,
+ config: Configuration,
+ dataReadFunction: PartitionedFile => Iterator[Any],
+ dataSchema: StructType,
+ hoodieRealtimeFileSplits: List[HoodieMergeOnReadFileSplit]) = {
+ this(
+ sc,
+ sc.broadcast(new SerializableConfiguration(config))
+ .asInstanceOf[Broadcast[SerializableConfiguration]],
Review comment:
Does this need explicit casting ?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]