This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 585ce00 [HUDI-1301] use spark INCREMENTAL mode query hudi dataset
support schema version. (#2125)
585ce00 is described below
commit 585ce0094d6527bab988f7657b4e84d12274ee28
Author: lw0090 <[email protected]>
AuthorDate: Sat Oct 10 20:53:41 2020 +0800
[HUDI-1301] use spark INCREMENTAL mode query hudi dataset support schema
version. (#2125)
---
.../hudi/common/table/TableSchemaResolver.java | 29 ++++++++++++++++++++--
.../scala/org/apache/hudi/DataSourceOptions.scala | 9 +++++++
.../org/apache/hudi/IncrementalRelation.scala | 23 +++++++++++------
3 files changed, 51 insertions(+), 10 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index db68667..372b393 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -175,20 +175,45 @@ public class TableSchemaResolver {
* @throws Exception
*/
public Schema getTableAvroSchemaWithoutMetadataFields() throws Exception {
- Option<Schema> schemaFromCommitMetadata =
getTableSchemaFromCommitMetadata(false);
+ HoodieTimeline timeline =
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+ Option<Schema> schemaFromCommitMetadata =
getTableSchemaFromCommitMetadata(timeline.lastInstant().get(), false);
return schemaFromCommitMetadata.isPresent() ?
schemaFromCommitMetadata.get() :
HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
}
/**
+ * Gets the user's data schema for a hoodie table, in Avro format, as of the given instant.
+ *
+ * @param instant the instant whose commit metadata supplies the schema
+ * @return Avro user data schema
+ * @throws Exception
+ */
+ public Schema getTableAvroSchemaWithoutMetadataFields(HoodieInstant instant)
throws Exception {
+ Option<Schema> schemaFromCommitMetadata =
getTableSchemaFromCommitMetadata(instant, false);
+ return schemaFromCommitMetadata.isPresent() ?
schemaFromCommitMetadata.get() :
+ HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
+ }
+
+ /**
* Gets the schema for a hoodie table in Avro format from the
HoodieCommitMetadata of the last commit.
*
* @return Avro schema for this table
*/
private Option<Schema> getTableSchemaFromCommitMetadata(boolean
includeMetadataFields) {
+ HoodieTimeline timeline =
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+ return getTableSchemaFromCommitMetadata(timeline.lastInstant().get(),
includeMetadataFields);
+ }
+
+
+ /**
+ * Gets the schema for a hoodie table in Avro format from the
HoodieCommitMetadata of the instant.
+ *
+ * @return Avro schema for this table
+ */
+ private Option<Schema> getTableSchemaFromCommitMetadata(HoodieInstant
instant, boolean includeMetadataFields) {
try {
HoodieTimeline timeline =
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
- byte[] data =
timeline.getInstantDetails(timeline.lastInstant().get()).get();
+ byte[] data = timeline.getInstantDetails(instant).get();
HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data,
HoodieCommitMetadata.class);
String existingSchemaStr =
metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index e153e92..d66fa96 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -109,6 +109,15 @@ object DataSourceReadOptions {
val END_INSTANTTIME_OPT_KEY = "hoodie.datasource.read.end.instanttime"
/**
+ * Whether to use the schema of the end instant when incrementally fetching data.
+ *
+ * Default: false (use latest instant schema)
+ *
+ */
+ val INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME_OPT_KEY =
"hoodie.datasource.read.schema.use.end.instanttime"
+ val DEFAULT_INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME_OPT_VAL = "false"
+
+ /**
* For use-cases like DeltaStreamer which reads from Hoodie Incremental
table and applies opaque map functions,
* filters appearing late in the sequence of transformations cannot be
automatically pushed down.
* This option allows setting filters directly on Hoodie Source
diff --git
a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index e113d4a..ff68ef0 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -55,7 +55,6 @@ class IncrementalRelation(val sqlContext: SQLContext,
private val log = LogManager.getLogger(classOf[IncrementalRelation])
-
val skeletonSchema: StructType = HoodieSparkUtils.getMetaSchema
private val metaClient = new
HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath,
true)
@@ -76,6 +75,9 @@ class IncrementalRelation(val sqlContext: SQLContext,
s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY}")
}
+ val useEndInstantSchema =
optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME_OPT_KEY,
+
DataSourceReadOptions.DEFAULT_INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME_OPT_VAL).toBoolean
+
private val lastInstant = commitTimeline.lastInstant().get()
private val commitsToReturn = commitTimeline.findInstantsInRange(
@@ -83,11 +85,16 @@ class IncrementalRelation(val sqlContext: SQLContext,
optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY,
lastInstant.getTimestamp))
.getInstants.iterator().toList
- // use schema from a file produced in the latest instant
- val latestSchema: StructType = {
+ // use schema from a file produced in the end/latest instant
+ val usedSchema: StructType = {
log.info("Inferring schema..")
val schemaResolver = new TableSchemaResolver(metaClient)
- val tableSchema = schemaResolver.getTableAvroSchemaWithoutMetadataFields
+ val tableSchema = if (useEndInstantSchema) {
+ if (commitsToReturn.isEmpty)
schemaResolver.getTableAvroSchemaWithoutMetadataFields() else
+
schemaResolver.getTableAvroSchemaWithoutMetadataFields(commitsToReturn.last)
+ } else {
+ schemaResolver.getTableAvroSchemaWithoutMetadataFields()
+ }
val dataSchema =
AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
StructType(skeletonSchema.fields ++ dataSchema.fields)
}
@@ -104,7 +111,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
}
}
- override def schema: StructType = latestSchema
+ override def schema: StructType = usedSchema
override def buildScan(): RDD[Row] = {
val regularFileIdToFullPath = mutable.HashMap[String, String]()
@@ -148,12 +155,12 @@ class IncrementalRelation(val sqlContext: SQLContext,
} else {
log.info("Additional Filters to be applied to incremental source are :"
+ filters)
- var df: DataFrame =
sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], latestSchema)
+ var df: DataFrame =
sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], usedSchema)
if (metaBootstrapFileIdToFullPath.nonEmpty) {
df = sqlContext.sparkSession.read
.format("hudi")
- .schema(latestSchema)
+ .schema(usedSchema)
.option(DataSourceReadOptions.READ_PATHS_OPT_KEY,
filteredMetaBootstrapFullPaths.mkString(","))
.load()
}
@@ -161,7 +168,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
if (regularFileIdToFullPath.nonEmpty)
{
df = df.union(sqlContext.read.options(sOpts)
- .schema(latestSchema)
+ .schema(usedSchema)
.parquet(filteredRegularFullPaths.toList: _*)
.filter(String.format("%s >= '%s'",
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
commitsToReturn.head.getTimestamp))