garyli1019 commented on a change in pull request #1722:
URL: https://github.com/apache/hudi/pull/1722#discussion_r444623018
##########
File path: hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -48,7 +49,7 @@ object DataSourceReadOptions {
val QUERY_TYPE_SNAPSHOT_OPT_VAL = "snapshot"
val QUERY_TYPE_READ_OPTIMIZED_OPT_VAL = "read_optimized"
val QUERY_TYPE_INCREMENTAL_OPT_VAL = "incremental"
- val DEFAULT_QUERY_TYPE_OPT_VAL: String = QUERY_TYPE_SNAPSHOT_OPT_VAL
+ val DEFAULT_QUERY_TYPE_OPT_VAL: String = QUERY_TYPE_READ_OPTIMIZED_OPT_VAL
Review comment:
This is like the incremental relation. We need the base path to get the Hudi
table view. We need a smarter path handler to handle glob paths, and Udit's
PR has this.
##########
File path:
hudi-spark/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieRealtimeInputFormat.scala
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.{FileSplit, JobConf}
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
+import org.apache.parquet.filter2.compat.FilterCompat
+import org.apache.parquet.filter2.predicate.FilterApi
+import
org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat,
ParquetRecordReader}
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+import java.net.URI
+import scala.collection.JavaConverters._
+
+/**
+ * This class is an extension of ParquetFileFormat from Spark SQL.
+ * The file split, record reader, record reader iterator are customized to
read Hudi MOR table.
+ */
+class HoodieRealtimeInputFormat extends ParquetFileFormat {
+ //TODO: Better usage of this short name.
+ override def shortName(): String = "hudi.snapshot"
+ override def toString(): String = "hudi.snapshot"
+
+ override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+ dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
+ filters: Seq[Filter], options:
Map[String, String],
+ hadoopConf: Configuration):
(PartitionedFile) =>
+ Iterator[InternalRow] = {
+ hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS,
classOf[ParquetReadSupport].getName)
+ hadoopConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
requiredSchema.json)
+ hadoopConf.set(ParquetWriteSupport.SPARK_ROW_SCHEMA, requiredSchema.json)
+ hadoopConf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key,
+ sparkSession.sessionState.conf.sessionLocalTimeZone)
+ hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE.key,
+ sparkSession.sessionState.conf.caseSensitiveAnalysis)
+
+ ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
+
+ // Sets flags for `ParquetToSparkSchemaConverter`
+ hadoopConf.setBoolean(
+ SQLConf.PARQUET_BINARY_AS_STRING.key,
+ sparkSession.sessionState.conf.isParquetBinaryAsString)
+ hadoopConf.setBoolean(
+ SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+ sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new
SerializableConfiguration(hadoopConf))
+
+ // TODO: if you move this into the closure it reverts to the default
values.
+ // If true, enable using the custom RecordReader for parquet. This only
works for
+ // a subset of the types (no complex types).
+ //val resultSchema = StructType(partitionSchema.fields ++
requiredSchema.fields)
+ val sqlConf = sparkSession.sessionState.conf
+ val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled
+ val timestampConversion: Boolean =
sqlConf.isParquetINT96TimestampConversion
+ val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown
+ // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
+ //val returningBatch = supportBatch(sparkSession, resultSchema)
+ val pushDownDate = sqlConf.parquetFilterPushDownDate
+ val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
+ val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
+ val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+ val pushDownInFilterThreshold =
sqlConf.parquetFilterPushDownInFilterThreshold
+ val isCaseSensitive = sqlConf.caseSensitiveAnalysis
+
+ (file: PartitionedFile) => {
+ assert(file.partitionValues.numFields == partitionSchema.size)
+
+ val sharedConf = broadcastedHadoopConf.value.value
+ val fileSplit =
+ new FileSplit(new Path(new URI(file.filePath)), file.start,
file.length, new Array[String](0))
Review comment:
It could be only a portion of a file. From the Spark description: `A part (i.e.
"block") of a single file that should be read, along with partition column
values that need to be prepended to each row.`
##########
File path:
hudi-spark/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetRecordReaderIterator.scala
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
Review comment:
correct
##########
File path: hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##########
@@ -65,7 +66,7 @@ object DataSourceReadOptions {
* This eases migration from old configs to new configs.
*/
def translateViewTypesToQueryTypes(optParams: Map[String, String]) :
Map[String, String] = {
- val translation = Map(VIEW_TYPE_READ_OPTIMIZED_OPT_VAL ->
QUERY_TYPE_SNAPSHOT_OPT_VAL,
+ val translation = Map(VIEW_TYPE_READ_OPTIMIZED_OPT_VAL ->
QUERY_TYPE_READ_OPTIMIZED_OPT_VAL,
Review comment:
I get confused by the naming sometimes...
So for a COW table, snapshot view = read optimized view;
for MOR, snapshot view and read optimized view are different things.
With bootstrap, we will have one more view.
Can we call `read optimized view -> parquet only(including bootstrap)` and
`snapshot view -> parquet(with bootstrap) merge with log` regardless of table
type?
##########
File path:
hudi-spark/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetRecordReaderIterator.scala
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner,
LogReaderUtils}
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit
+import org.apache.parquet.hadoop.ParquetRecordReader
+
+import org.apache.avro.Schema
+import
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.types.StructType
+import java.io.Closeable
+import java.util
+
+import scala.util.Try
+
+/**
+ * This class is the iterator for Hudi MOR table.
+ * Log files are scanned on initialization.
+ * This iterator will read the parquet file first and skip the record if it
present in the log file.
+ * Then read the log file.
+ * Custom payload is not supported yet. This combining logic is matching with
[OverwriteWithLatestAvroPayload]
+ * @param rowReader HoodieRealtimeParquetRecordReader
+ */
+class HoodieParquetRecordReaderIterator(private[this] var rowReader:
ParquetRecordReader[UnsafeRow],
+ private[this] val split:
HoodieRealtimeFileSplit,
+ private[this] val jobConf: JobConf)
extends Iterator[UnsafeRow] with Closeable with Logging {
+ private[this] var havePair = false
+ private[this] var finished = false
+ private[this] var parquetFinished = false
+
+ private[this] var deltaRecordMap:
util.Map[String,org.apache.hudi.common.model
+ .HoodieRecord[_ <: org.apache.hudi.common.model.HoodieRecordPayload[_ <:
org.apache.hudi.common.model.HoodieRecordPayload[_ <: AnyRef]]]] = _
Review comment:
good catch
##########
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.hadoop.{HoodieParquetInputFormat,
HoodieROTablePathFilter}
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.table.HoodieTable
+
+import org.apache.hadoop.mapred.JobConf
+import org.apache.log4j.LogManager
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import java.util
+import scala.collection.JavaConverters._
+
+/**
+ * This is the Spark DataSourceV1 relation to read Hudi MOR table.
+ * @param sqlContext
+ * @param basePath
+ * @param optParams
+ * @param userSchema
+ */
+class SnapshotRelation(val sqlContext: SQLContext,
+ val basePath: String,
+ val optParams: Map[String, String],
+ val userSchema: StructType) extends BaseRelation with
TableScan {
+
+ private val log = LogManager.getLogger(classOf[SnapshotRelation])
+ private val conf = sqlContext.sparkContext.hadoopConfiguration
+
+ // Set config for listStatus() in HoodieParquetInputFormat
+ // TODO(garyli): Switch to bootstrap file listing methods
+ conf.setClass(
+ "mapreduce.input.pathFilter.class",
+ classOf[HoodieROTablePathFilter],
+ classOf[org.apache.hadoop.fs.PathFilter])
+ conf.setStrings("mapreduce.input.fileinputformat.inputdir", basePath)
+ conf.setStrings("mapreduce.input.fileinputformat.input.dir.recursive",
"true")
+
+ private val HoodieInputFormat = new HoodieParquetInputFormat
+ HoodieInputFormat.setConf(conf)
+ private val fileStatus = HoodieInputFormat.listStatus(new JobConf(conf))
+ private val fileGroup =
HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf,
util.Arrays.stream(fileStatus)).asScala
+
+ // Split the file group to: parquet file without a matching log file,
parquet file need to merge with log files
+ private val parquetWithoutLogPaths: List[String] = fileGroup.filter(p =>
p._2.size() == 0).keys.toList
+ private val fileWithLogMap: Map[String, String] = fileGroup.filter(p =>
p._2.size() > 0).map{ case(k, v) => (k, v.asScala.toList.mkString(","))}.toMap
+
+ if (log.isDebugEnabled) {
+ log.debug("All parquet files" + fileStatus.map(s =>
s.getPath.toString).mkString(","))
+ log.debug("ParquetWithoutLogPaths" + parquetWithoutLogPaths.mkString(","))
+ log.debug("ParquetWithLogPaths" + fileWithLogMap.map(m =>
s"${m._1}:${m._2}").mkString(","))
+ }
+
+ // Add log file map to options
+ private val finalOps = optParams ++ fileWithLogMap
+
+ // Load Hudi metadata
+ val metaClient = new HoodieTableMetaClient(conf, basePath, true)
+ private val hoodieTable = HoodieTable.create(metaClient,
HoodieWriteConfig.newBuilder().withPath(basePath).build(), conf)
+
+ private val commitTimeline =
hoodieTable.getMetaClient.getCommitsAndCompactionTimeline
+ if (commitTimeline.empty()) {
Review comment:
good point.
##########
File path:
hudi-spark/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetRecordReaderIterator.scala
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.{HoodieMergedLogRecordScanner,
LogReaderUtils}
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit
+import org.apache.parquet.hadoop.ParquetRecordReader
+
+import org.apache.avro.Schema
+import
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.types.StructType
+import java.io.Closeable
+import java.util
+
+import scala.util.Try
+
+/**
+ * This class is the iterator for Hudi MOR table.
+ * Log files are scanned on initialization.
+ * This iterator will read the parquet file first and skip the record if it
present in the log file.
+ * Then read the log file.
+ * Custom payload is not supported yet. This combining logic is matching with
[OverwriteWithLatestAvroPayload]
+ * @param rowReader HoodieRealtimeParquetRecordReader
+ */
+class HoodieParquetRecordReaderIterator(private[this] var rowReader:
ParquetRecordReader[UnsafeRow],
+ private[this] val split:
HoodieRealtimeFileSplit,
+ private[this] val jobConf: JobConf)
extends Iterator[UnsafeRow] with Closeable with Logging {
+ private[this] var havePair = false
+ private[this] var finished = false
+ private[this] var parquetFinished = false
+
+ private[this] var deltaRecordMap:
util.Map[String,org.apache.hudi.common.model
+ .HoodieRecord[_ <: org.apache.hudi.common.model.HoodieRecordPayload[_ <:
org.apache.hudi.common.model.HoodieRecordPayload[_ <: AnyRef]]]] = _
+ private[this] var deltaRecordKeys: util.Set[String] = _
+ private[this] var deltaIter: util.Iterator[String] = _
+ private[this] var avroSchema: Schema = _
+ private[this] var sparkTypes: StructType = _
+ private[this] var converter: AvroDeserializer = _
+
+ // SPARK-23457 Register a task completion lister before `initialization`.
Review comment:
Yeah, it doesn't really make sense to put it here.
##########
File path:
hudi-spark/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetRecordReaderIterator.scala
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.avro.Schema
+import
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+import org.apache.hudi.realtime.HoodieRealtimeParquetRecordReader
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.types.StructType
+import java.io.Closeable
+import java.util
+
+/**
+ * This class is the iterator for Hudi MOR table.
+ * This iterator will read the parquet file first and skip the record if it
present in the log file.
+ * Then read the log file.
+ * Custom payload is not supported yet. This combining logic is matching with
[OverwriteWithLatestAvroPayload]
Review comment:
This goes back to the MapReduce V1 vs. V2 API. Hive uses V1 and Spark uses
V2. Spark also wraps V2 (from parquet) in its own record reader. I couldn't
find a way to make Hive and Spark share one record reader. The vectorized
reader would be another story as well.
##########
File path: hudi-spark/src/main/scala/org/apache/hudi/SnapshotRelation.scala
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.hadoop.{HoodieParquetInputFormat,
HoodieROTablePathFilter}
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.table.HoodieTable
+
+import org.apache.hadoop.mapred.JobConf
+import org.apache.log4j.LogManager
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+
+import java.util
+import scala.collection.JavaConverters._
+
+/**
+ * This is the Spark DataSourceV1 relation to read Hudi MOR table.
+ * @param sqlContext
+ * @param basePath
+ * @param optParams
+ * @param userSchema
+ */
+class SnapshotRelation(val sqlContext: SQLContext,
+ val basePath: String,
+ val optParams: Map[String, String],
+ val userSchema: StructType) extends BaseRelation with
TableScan {
+
+ private val log = LogManager.getLogger(classOf[SnapshotRelation])
+ private val conf = sqlContext.sparkContext.hadoopConfiguration
+
+ // Set config for listStatus() in HoodieParquetInputFormat
+ // TODO(garyli): Switch to bootstrap file listing methods
+ conf.setClass(
+ "mapreduce.input.pathFilter.class",
+ classOf[HoodieROTablePathFilter],
+ classOf[org.apache.hadoop.fs.PathFilter])
+ conf.setStrings("mapreduce.input.fileinputformat.inputdir", basePath)
+ conf.setStrings("mapreduce.input.fileinputformat.input.dir.recursive",
"true")
+
+ private val HoodieInputFormat = new HoodieParquetInputFormat
+ HoodieInputFormat.setConf(conf)
+ private val fileStatus = HoodieInputFormat.listStatus(new JobConf(conf))
+ private val fileGroup =
HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf,
util.Arrays.stream(fileStatus)).asScala
+
+ // Split the file group to: parquet file without a matching log file,
parquet file need to merge with log files
+ private val parquetWithoutLogPaths: List[String] = fileGroup.filter(p =>
p._2.size() == 0).keys.toList
+ private val fileWithLogMap: Map[String, String] = fileGroup.filter(p =>
p._2.size() > 0).map{ case(k, v) => (k, v.asScala.toList.mkString(","))}.toMap
+
+ if (log.isDebugEnabled) {
+ log.debug("All parquet files" + fileStatus.map(s =>
s.getPath.toString).mkString(","))
+ log.debug("ParquetWithoutLogPaths" + parquetWithoutLogPaths.mkString(","))
+ log.debug("ParquetWithLogPaths" + fileWithLogMap.map(m =>
s"${m._1}:${m._2}").mkString(","))
+ }
+
+ // Add log file map to options
+ private val finalOps = optParams ++ fileWithLogMap
+
+ // Load Hudi metadata
+ val metaClient = new HoodieTableMetaClient(conf, basePath, true)
+ private val hoodieTable = HoodieTable.create(metaClient,
HoodieWriteConfig.newBuilder().withPath(basePath).build(), conf)
+
+ private val commitTimeline =
hoodieTable.getMetaClient.getCommitsAndCompactionTimeline
+ if (commitTimeline.empty()) {
+ throw new HoodieException("No Valid Hudi timeline exists")
+ }
+ private val completedCommitTimeline =
hoodieTable.getMetaClient.getCommitsTimeline.filterCompletedInstants()
+ private val lastInstant = completedCommitTimeline.lastInstant().get()
+ conf.setStrings("hoodie.realtime.last.commit", lastInstant.getTimestamp)
+
+ // use schema from latest metadata, if not present, read schema from the
data file
+ private val latestSchema = {
+ val schemaUtil = new TableSchemaResolver(metaClient)
+ val tableSchema =
HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields);
+ AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
+ }
+
+ override def schema: StructType = latestSchema
+
+ override def buildScan(): RDD[Row] = {
+ if (fileWithLogMap.isEmpty) {
+ sqlContext
+ .read
+ .options(finalOps)
+ .schema(schema)
+ .format("parquet")
+ .load(parquetWithoutLogPaths:_*)
+ .toDF()
+ .rdd
Review comment:
Removing `toDF()` worked as well. This will likely be optimized by Spark.
I forgot why I put it here; I was probably following the incremental relation
implementation.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]