This is an automated email from the ASF dual-hosted git repository.
jonvex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 668d86d948e6 [HUDI-9609] Add support for V1 incremental queries with fg reader (#13529)
668d86d948e6 is described below
commit 668d86d948e632ea17f540a18f123a4904473ae5
Author: Jon Vexler <[email protected]>
AuthorDate: Thu Jul 24 13:21:27 2025 -0400
[HUDI-9609] Add support for V1 incremental queries with fg reader (#13529)
---
.../org/apache/hudi/BaseHoodieTableFileIndex.java | 47 +++++++++----
.../main/scala/org/apache/hudi/DefaultSource.scala | 66 ++++++-------------
.../scala/org/apache/hudi/HoodieCDCFileIndex.scala | 16 ++---
.../hudi/HoodieHadoopFsRelationFactory.scala | 76 ++++++++++++++++------
.../apache/hudi/HoodieIncrementalFileIndex.scala | 6 +-
.../hudi/MergeOnReadIncrementalRelationV1.scala | 6 +-
.../hudi/MergeOnReadIncrementalRelationV2.scala | 11 +++-
7 files changed, 130 insertions(+), 98 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 9919e0bde3cb..096a11ccf5dd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableQueryType;
import org.apache.hudi.common.serialization.HoodieFileSliceSerializer;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
@@ -97,8 +98,8 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
private final TypedProperties configProperties;
private final HoodieTableQueryType queryType;
private final Option<String> specifiedQueryInstant;
- private final Option<String> startCompletionTime;
- private final Option<String> endCompletionTime;
+ private final Option<String> incrementalQueryStartTime;
+ private final Option<String> incrementalQueryEndTime;
private final List<StoragePath> queryPaths;
private final boolean shouldIncludePendingCommits;
@@ -117,6 +118,7 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
private final HoodieTableMetaClient metaClient;
private final HoodieEngineContext engineContext;
+ private final boolean isCompletionTimeBasedQuery;
private final transient FileStatusCache fileStatusCache;
@@ -139,8 +141,8 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
    * @param shouldIncludePendingCommits flags whether file-index should exclude any pending operations
    * @param shouldValidateInstant       flags to validate whether query instant is present in the timeline
    * @param fileStatusCache             transient cache of fetched [[FileStatus]]es
-   * @param startCompletionTime         start completion time for incremental query (optional)
-   * @param endCompletionTime           end completion time for incremental query (optional)
+   * @param incrementalQueryStartTime   start completion time for incremental query (optional)
+   * @param incrementalQueryEndTime     end completion time for incremental query (optional)
*/
public BaseHoodieTableFileIndex(HoodieEngineContext engineContext,
HoodieTableMetaClient metaClient,
@@ -152,8 +154,8 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
boolean shouldValidateInstant,
FileStatusCache fileStatusCache,
boolean shouldListLazily,
- Option<String> startCompletionTime,
- Option<String> endCompletionTime) {
+ Option<String> incrementalQueryStartTime,
+ Option<String> incrementalQueryEndTime) {
this.partitionColumns = metaClient.getTableConfig().getPartitionFields()
.orElseGet(() -> new String[0]);
@@ -169,8 +171,8 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
this.shouldIncludePendingCommits = shouldIncludePendingCommits;
this.shouldValidateInstant = shouldValidateInstant;
this.shouldListLazily = shouldListLazily;
- this.startCompletionTime = startCompletionTime;
- this.endCompletionTime = endCompletionTime;
+ this.incrementalQueryStartTime = incrementalQueryStartTime;
+ this.incrementalQueryEndTime = incrementalQueryEndTime;
this.basePath = metaClient.getBasePath();
@@ -178,6 +180,9 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
this.engineContext = engineContext;
this.fileStatusCache = fileStatusCache;
this.configProperties = configProperties;
+    HoodieTableVersion tableVersion = HoodieTableVersion.fromVersionCode(configProperties.getInteger("hoodie.datasource.read.incr.table.version",
+        metaClient.getTableConfig().getTableVersion().versionCode()));
+    this.isCompletionTimeBasedQuery = tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT);
doRefresh();
}
@@ -324,12 +329,8 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
     List<String> matchedPartitionPaths;
     try {
       if (isPartitionedTable()) {
-        if (queryType == HoodieTableQueryType.INCREMENTAL && startCompletionTime.isPresent()
-            && !metaClient.getActiveTimeline().isBeforeTimelineStartsByCompletionTime(startCompletionTime.get())) {
-          HoodieTimeline timelineToQuery = metaClient.getActiveTimeline().getWriteTimeline()
-              .findInstantsInRangeByCompletionTime(
-                  startCompletionTime.get(),
-                  endCompletionTime.orElse(String.valueOf(Long.MAX_VALUE)));
+        if (queryType == HoodieTableQueryType.INCREMENTAL && incrementalQueryStartTime.isPresent() && !isBeforeTimelineStarts()) {
+          HoodieTimeline timelineToQuery = findInstantsInRange();
           matchedPartitionPaths = TimelineUtils.getWrittenPartitions(timelineToQuery);
         } else {
           matchedPartitionPaths = tableMetadata.getPartitionPathWithPathPrefixes(relativePartitionPaths);
@@ -347,6 +348,24 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
.collect(Collectors.toList());
}
+ private boolean isBeforeTimelineStarts() {
+ if (isCompletionTimeBasedQuery) {
+      return metaClient.getActiveTimeline().isBeforeTimelineStartsByCompletionTime(incrementalQueryStartTime.get());
+    } else {
+      return metaClient.getActiveTimeline().isBeforeTimelineStarts(incrementalQueryStartTime.get());
+ }
+ }
+
+ private HoodieTimeline findInstantsInRange() {
+ if (isCompletionTimeBasedQuery) {
+      return metaClient.getActiveTimeline().getWriteTimeline()
+          .findInstantsInRangeByCompletionTime(incrementalQueryStartTime.get(), incrementalQueryEndTime.orElse(String.valueOf(Long.MAX_VALUE)));
+    } else {
+      return metaClient.getActiveTimeline().getWriteTimeline()
+          .findInstantsInRange(incrementalQueryStartTime.get(), incrementalQueryEndTime.orElse(String.valueOf(Long.MAX_VALUE)));
+ }
+ }
+
protected void refresh() {
fileStatusCache.invalidate();
doRefresh();
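
A minimal standalone sketch of the version gate introduced above, using hypothetical types (IncrementalRangeSketch, Instant) that are not Hudi's API: tables at version eight or later resolve the incremental window by completion time, while older tables keep the requested-time semantics.

object IncrementalRangeSketch {
  // Hypothetical stand-in for a timeline instant; Hudi's HoodieInstant carries more state.
  case class Instant(requestedTime: String, completionTime: String)

  def isCompletionTimeBased(tableVersion: Int): Boolean = tableVersion >= 8

  // Keep instants whose chosen timestamp falls in the (start, end] window.
  def findInstantsInRange(timeline: Seq[Instant], start: String, end: Option[String],
                          completionTimeBased: Boolean): Seq[Instant] = {
    val upper = end.getOrElse(String.valueOf(Long.MaxValue))
    timeline.filter { i =>
      val t = if (completionTimeBased) i.completionTime else i.requestedTime
      t > start && t <= upper
    }
  }

  def main(args: Array[String]): Unit = {
    val timeline = Seq(Instant("001", "004"), Instant("002", "003"))
    println(findInstantsInRange(timeline, "001", None, isCompletionTimeBased(6))) // requested-time window
    println(findInstantsInRange(timeline, "001", None, isCompletionTimeBased(8))) // completion-time window
  }
}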
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index a0d7ed059ddf..75eaa7e8a00d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -307,6 +307,12 @@ object DefaultSource {
     val useNewParquetFileFormat = parameters.getOrElse(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),
       HoodieReaderConfig.FILE_GROUP_READER_ENABLED.defaultValue().toString).toBoolean &&
       !metaClient.isMetadataTable && (globPaths == null || globPaths.isEmpty)
+    lazy val tableVersion = if (SparkConfigUtils.containsConfigProperty(parameters, INCREMENTAL_READ_TABLE_VERSION)) {
+      Integer.parseInt(parameters(INCREMENTAL_READ_TABLE_VERSION.key))
+    } else {
+      metaClient.getTableConfig.getTableVersion.versionCode()
+    }
+    lazy val hoodieTableSupportsCompletionTime = tableVersion >= HoodieTableVersion.EIGHT.versionCode()
     if (metaClient.getCommitsTimeline.filterCompletedInstants.countInstants() == 0) {
       new EmptyRelation(sqlContext, resolveSchema(metaClient, parameters, Some(schema)))
     } else if (isCdcQuery) {
@@ -334,29 +340,13 @@ object DefaultSource {
           resolveBaseFileOnlyRelation(sqlContext, globPaths, userSchema, metaClient, parameters)
         }
       case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
-        if (SparkConfigUtils.containsConfigProperty(parameters, INCREMENTAL_READ_TABLE_VERSION)) {
-          val writeTableVersion = Integer.parseInt(parameters(INCREMENTAL_READ_TABLE_VERSION.key))
-          if (writeTableVersion >= HoodieTableVersion.EIGHT.versionCode()) {
-            if (useNewParquetFileFormat) {
-              new HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(
-                sqlContext, metaClient, parameters, userSchema, isBootstrappedTable).build()
-            } else {
-              new IncrementalRelationV2(sqlContext, parameters, userSchema, metaClient, RangeType.CLOSED_CLOSED)
-            }
-          } else {
-            new IncrementalRelationV1(sqlContext, parameters, userSchema, metaClient)
-          }
-        } else {
-          if (metaClient.getTableConfig.getTableVersion.versionCode() >= HoodieTableVersion.EIGHT.versionCode()) {
-            if (useNewParquetFileFormat) {
-              new HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(
-                sqlContext, metaClient, parameters, userSchema, isBootstrappedTable).build()
-            } else {
-              new IncrementalRelationV2(sqlContext, parameters, userSchema, metaClient, RangeType.CLOSED_CLOSED)
-            }
-          } else {
-            new IncrementalRelationV1(sqlContext, parameters, userSchema, metaClient)
-          }
+        (hoodieTableSupportsCompletionTime, useNewParquetFileFormat) match {
+          case (true, true) => new HoodieCopyOnWriteIncrementalHadoopFsRelationFactoryV2(
+            sqlContext, metaClient, parameters, userSchema, isBootstrappedTable, RangeType.CLOSED_CLOSED).build()
+          case (true, false) => new IncrementalRelationV2(sqlContext, parameters, userSchema, metaClient, RangeType.CLOSED_CLOSED)
+          case (false, true) => new HoodieCopyOnWriteIncrementalHadoopFsRelationFactoryV1(
+            sqlContext, metaClient, parameters, userSchema, isBootstrappedTable).build()
+          case (false, false) => new IncrementalRelationV1(sqlContext, parameters, userSchema, metaClient)
         }
       case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
@@ -376,29 +366,13 @@ object DefaultSource {
}
       case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
-        if (SparkConfigUtils.containsConfigProperty(parameters, INCREMENTAL_READ_TABLE_VERSION)) {
-          val writeTableVersion = Integer.parseInt(parameters(INCREMENTAL_READ_TABLE_VERSION.key))
-          if (writeTableVersion >= HoodieTableVersion.EIGHT.versionCode()) {
-            if (useNewParquetFileFormat) {
-              new HoodieMergeOnReadIncrementalHadoopFsRelationFactory(
-                sqlContext, metaClient, parameters, userSchema, isBootstrappedTable).build()
-            } else {
-              MergeOnReadIncrementalRelationV2(sqlContext, parameters, metaClient, userSchema)
-            }
-          } else {
-            MergeOnReadIncrementalRelationV1(sqlContext, parameters, metaClient, userSchema)
-          }
-        } else {
-          if (metaClient.getTableConfig.getTableVersion.versionCode() >= HoodieTableVersion.EIGHT.versionCode()) {
-            if (useNewParquetFileFormat) {
-              new HoodieMergeOnReadIncrementalHadoopFsRelationFactory(
-                sqlContext, metaClient, parameters, userSchema, isBootstrappedTable).build()
-            } else {
-              MergeOnReadIncrementalRelationV2(sqlContext, parameters, metaClient, userSchema)
-            }
-          } else {
-            MergeOnReadIncrementalRelationV1(sqlContext, parameters, metaClient, userSchema)
-          }
+        (hoodieTableSupportsCompletionTime, useNewParquetFileFormat) match {
+          case (true, true) => new HoodieMergeOnReadIncrementalHadoopFsRelationFactoryV2(
+            sqlContext, metaClient, parameters, userSchema, isBootstrappedTable).build()
+          case (true, false) => MergeOnReadIncrementalRelationV2(sqlContext, parameters, metaClient, userSchema)
+          case (false, true) => new HoodieMergeOnReadIncrementalHadoopFsRelationFactoryV1(
+            sqlContext, metaClient, parameters, userSchema, isBootstrappedTable).build()
+          case (false, false) => MergeOnReadIncrementalRelationV1(sqlContext, parameters, metaClient, userSchema)
         }
       case (_, _, true) =>
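
A compact standalone sketch of the two-by-two dispatch that replaces the nested if/else blocks above; the names are illustrative, not Hudi's classes. The table-version flag selects V1 or V2 query semantics and the file-group-reader flag selects the relation flavor, so the previously missing V1 + file-group-reader combination now has an explicit branch.

object IncrementalDispatchSketch {
  sealed trait IncrementalReader
  case object RelationV1 extends IncrementalReader
  case object RelationV2 extends IncrementalReader
  case object FileGroupReaderFactoryV1 extends IncrementalReader
  case object FileGroupReaderFactoryV2 extends IncrementalReader

  def pick(supportsCompletionTime: Boolean, useFileGroupReader: Boolean): IncrementalReader =
    (supportsCompletionTime, useFileGroupReader) match {
      case (true, true)   => FileGroupReaderFactoryV2
      case (true, false)  => RelationV2
      case (false, true)  => FileGroupReaderFactoryV1 // the new V1 + file-group-reader path
      case (false, false) => RelationV1
    }

  def main(args: Array[String]): Unit =
    println(pick(supportsCompletionTime = false, useFileGroupReader = true))
}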
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
index 07d8b224d977..c2413f33d427 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCDCFileIndex.scala
@@ -33,21 +33,21 @@ import org.apache.spark.sql.types.StructType
import scala.collection.JavaConverters._
-class HoodieCDCFileIndex (override val spark: SparkSession,
-                          override val metaClient: HoodieTableMetaClient,
-                          override val schemaSpec: Option[StructType],
-                          override val options: Map[String, String],
-                          @transient override val fileStatusCache: FileStatusCache = NoopCache,
-                          override val includeLogFiles: Boolean,
-                          override val shouldEmbedFileSlices: Boolean)
+class HoodieCDCFileIndex(override val spark: SparkSession,
+                         override val metaClient: HoodieTableMetaClient,
+                         override val schemaSpec: Option[StructType],
+                         override val options: Map[String, String],
+                         @transient override val fileStatusCache: FileStatusCache = NoopCache,
+                         override val includeLogFiles: Boolean)
   extends HoodieFileIndex(
-    spark, metaClient, schemaSpec, options, fileStatusCache, includeLogFiles, shouldEmbedFileSlices
+    spark, metaClient, schemaSpec, options, fileStatusCache, includeLogFiles, shouldEmbedFileSlices = true
   ) with FileIndex {
   private val emptyPartitionPath: String = "empty_partition_path";
   val cdcRelation: CDCRelation = CDCRelation.getCDCRelation(spark.sqlContext, metaClient, options)
   val cdcExtractor: HoodieCDCExtractor = cdcRelation.cdcExtractor
   override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+    hasPushedDownPartitionPredicates = true
     cdcExtractor.extractCDCFileSplits().asScala.map {
       case (fileGroupId, fileSplits) =>
         val partitionPath = if (fileGroupId.getPartitionPath.isEmpty) emptyPartitionPath else fileGroupId.getPartitionPath
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
index cacc8fc36c6b..ff6322dc6fad 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
@@ -25,6 +25,7 @@ import org.apache.hudi.common.config.{ConfigProperty, HoodieMetadataConfig, Hood
 import org.apache.hudi.common.config.HoodieMetadataConfig.{DEFAULT_METADATA_ENABLE_FOR_READERS, ENABLE}
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.log.InstantRange.RangeType
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.common.util.{ConfigUtils, StringUtils}
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
@@ -329,7 +330,7 @@ class HoodieMergeOnReadSnapshotHadoopFsRelationFactory(override val sqlContext:
override def buildDataSchema(): StructType = fileIndex.dataSchema
}
-abstract class BaseHoodieMergeOnReadIncrementalHadoopFsRelationFactory(override val sqlContext: SQLContext,
+abstract class HoodieBaseMergeOnReadIncrementalHadoopFsRelationFactory(override val sqlContext: SQLContext,
   override val metaClient: HoodieTableMetaClient,
   override val options: Map[String, String],
   override val schemaSpec: Option[StructType],
@@ -343,15 +344,16 @@ abstract class BaseHoodieMergeOnReadIncrementalHadoopFsRelationFactory(override
   override protected def isIncremental: Boolean = true
 }
-class HoodieMergeOnReadIncrementalHadoopFsRelationFactory(override val sqlContext: SQLContext,
-                                                          override val metaClient: HoodieTableMetaClient,
-                                                          override val options: Map[String, String],
-                                                          override val schemaSpec: Option[StructType],
-                                                          isBootstrap: Boolean)
-  extends BaseHoodieMergeOnReadIncrementalHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {
+abstract class HoodieMergeOnReadIncrementalHadoopFsRelationFactory(override val sqlContext: SQLContext,
+                                                                   override val metaClient: HoodieTableMetaClient,
+                                                                   override val options: Map[String, String],
+                                                                   override val schemaSpec: Option[StructType],
+                                                                   isBootstrap: Boolean,
+                                                                   mergeOnReadIncrementalRelation: MergeOnReadIncrementalRelation)
+  extends HoodieBaseMergeOnReadIncrementalHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {
   private val incrementalFileIndex = new HoodieIncrementalFileIndex(
-    sparkSession, metaClient, schemaSpec, options, fileStatusCache, true, true)
+    sparkSession, metaClient, schemaSpec, options, fileStatusCache, true, mergeOnReadIncrementalRelation)
   override def buildFileIndex(): HoodieFileIndex = incrementalFileIndex
@@ -362,14 +364,30 @@ class HoodieMergeOnReadIncrementalHadoopFsRelationFactory(override val sqlContex
   override def buildDataSchema(): StructType = incrementalFileIndex.dataSchema
 }
+class HoodieMergeOnReadIncrementalHadoopFsRelationFactoryV1(override val sqlContext: SQLContext,
+                                                            override val metaClient: HoodieTableMetaClient,
+                                                            override val options: Map[String, String],
+                                                            override val schemaSpec: Option[StructType],
+                                                            isBootstrap: Boolean)
+  extends HoodieMergeOnReadIncrementalHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap,
+    MergeOnReadIncrementalRelationV1(sqlContext, options, metaClient, schemaSpec))
+
+class HoodieMergeOnReadIncrementalHadoopFsRelationFactoryV2(override val sqlContext: SQLContext,
+                                                            override val metaClient: HoodieTableMetaClient,
+                                                            override val options: Map[String, String],
+                                                            override val schemaSpec: Option[StructType],
+                                                            isBootstrap: Boolean)
+  extends HoodieMergeOnReadIncrementalHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap,
+    MergeOnReadIncrementalRelationV2(sqlContext, options, metaClient, schemaSpec))
+
 class HoodieMergeOnReadCDCHadoopFsRelationFactory(override val sqlContext: SQLContext,
                                                   override val metaClient: HoodieTableMetaClient,
                                                   override val options: Map[String, String],
                                                   override val schemaSpec: Option[StructType],
                                                   isBootstrap: Boolean)
-  extends BaseHoodieMergeOnReadIncrementalHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {
+  extends HoodieBaseMergeOnReadIncrementalHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {
   private val hoodieCDCFileIndex = new HoodieCDCFileIndex(
-    sparkSession, metaClient, schemaSpec, options, fileStatusCache, true, true)
+    sparkSession, metaClient, schemaSpec, options, fileStatusCache, true)
   override def buildFileIndex(): HoodieFileIndex = hoodieCDCFileIndex
@@ -408,7 +426,7 @@ class HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(override val sqlContext:
   override def buildDataSchema(): StructType = fileIndex.dataSchema
 }
-abstract class BaseHoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override val sqlContext: SQLContext,
+abstract class HoodieBaseCopyOnWriteIncrementalHadoopFsRelationFactory(override val sqlContext: SQLContext,
   override val metaClient: HoodieTableMetaClient,
   override val options: Map[String, String],
   override val schemaSpec: Option[StructType],
@@ -424,15 +442,16 @@ abstract class BaseHoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override
 }
-class HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override val sqlContext: SQLContext,
-                                                          override val metaClient: HoodieTableMetaClient,
-                                                          override val options: Map[String, String],
-                                                          override val schemaSpec: Option[StructType],
-                                                          isBootstrap: Boolean)
-  extends BaseHoodieCopyOnWriteIncrementalHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {
+abstract class HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override val sqlContext: SQLContext,
+                                                                   override val metaClient: HoodieTableMetaClient,
+                                                                   override val options: Map[String, String],
+                                                                   override val schemaSpec: Option[StructType],
+                                                                   isBootstrap: Boolean,
+                                                                   mergeOnReadIncrementalRelation: MergeOnReadIncrementalRelation)
+  extends HoodieBaseCopyOnWriteIncrementalHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {
   private val incrementalFileIndex = new HoodieIncrementalFileIndex(
-    sparkSession, metaClient, schemaSpec, options, fileStatusCache, false, true)
+    sparkSession, metaClient, schemaSpec, options, fileStatusCache, false, mergeOnReadIncrementalRelation)
   override def buildFileIndex(): HoodieFileIndex = incrementalFileIndex
@@ -443,15 +462,32 @@ class HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override val sqlContex
   override def buildDataSchema(): StructType = incrementalFileIndex.dataSchema
 }
+class HoodieCopyOnWriteIncrementalHadoopFsRelationFactoryV1(override val sqlContext: SQLContext,
+                                                            override val metaClient: HoodieTableMetaClient,
+                                                            override val options: Map[String, String],
+                                                            override val schemaSpec: Option[StructType],
+                                                            isBootstrap: Boolean)
+extends HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap,
+    MergeOnReadIncrementalRelationV1(sqlContext, options, metaClient, schemaSpec))
+
+class HoodieCopyOnWriteIncrementalHadoopFsRelationFactoryV2(override val sqlContext: SQLContext,
+                                                            override val metaClient: HoodieTableMetaClient,
+                                                            override val options: Map[String, String],
+                                                            override val schemaSpec: Option[StructType],
+                                                            isBootstrap: Boolean,
+                                                            rangeType: RangeType)
+  extends HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap,
+    MergeOnReadIncrementalRelationV2(sqlContext, options, metaClient, schemaSpec, None, rangeType))
+
 class HoodieCopyOnWriteCDCHadoopFsRelationFactory(override val sqlContext: SQLContext,
                                                   override val metaClient: HoodieTableMetaClient,
                                                   override val options: Map[String, String],
                                                   override val schemaSpec: Option[StructType],
                                                   isBootstrap: Boolean)
-  extends BaseHoodieCopyOnWriteIncrementalHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {
+  extends HoodieBaseCopyOnWriteIncrementalHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {
   private val hoodieCDCFileIndex = new HoodieCDCFileIndex(
-    sparkSession, metaClient, schemaSpec, options, fileStatusCache, false, true)
+    sparkSession, metaClient, schemaSpec, options, fileStatusCache, false)
   override def buildFileIndex(): HoodieFileIndex = hoodieCDCFileIndex
   override def buildDataSchema(): StructType = hoodieCDCFileIndex.cdcRelation.schema
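
A small standalone sketch, with hypothetical names, of the factory split applied in this file: the shared wiring stays in one abstract incremental factory, and thin V1/V2 subclasses only choose which incremental relation backs the file index.

object FactorySplitSketch {
  trait IncrementalRelation { def version: String }
  case object RelationV1 extends IncrementalRelation { val version = "v1" }
  case object RelationV2 extends IncrementalRelation { val version = "v2" }

  // The shared wiring lives once in the abstract factory; subclasses only pick the relation.
  abstract class IncrementalFactory(relation: IncrementalRelation) {
    def buildFileIndex(): String = s"incremental file index backed by ${relation.version} relation"
  }
  class IncrementalFactoryV1 extends IncrementalFactory(RelationV1)
  class IncrementalFactoryV2 extends IncrementalFactory(RelationV2)

  def main(args: Array[String]): Unit = {
    println(new IncrementalFactoryV1().buildFileIndex())
    println(new IncrementalFactoryV2().buildFileIndex())
  }
}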
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
index d5717aaafff8..4c2091e8feda 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala
@@ -38,12 +38,10 @@ class HoodieIncrementalFileIndex(override val spark: SparkSession,
                                  override val options: Map[String, String],
                                  @transient override val fileStatusCache: FileStatusCache = NoopCache,
                                  override val includeLogFiles: Boolean,
-                                 override val shouldEmbedFileSlices: Boolean)
+                                 mergeOnReadIncrementalRelation: MergeOnReadIncrementalRelation)
   extends HoodieFileIndex(
-    spark, metaClient, schemaSpec, options, fileStatusCache, includeLogFiles, shouldEmbedFileSlices
+    spark, metaClient, schemaSpec, options, fileStatusCache, includeLogFiles, shouldEmbedFileSlices = true
   ) with FileIndex {
-  val mergeOnReadIncrementalRelation: MergeOnReadIncrementalRelationV2 = MergeOnReadIncrementalRelationV2(
-    spark.sqlContext, options, metaClient, schemaSpec, schemaSpec)
   override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
     val fileSlices = mergeOnReadIncrementalRelation.listFileSplits(partitionFilters, dataFilters).toSeq.flatMap(
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala
index 4538d7c31821..2e0b14ecd968 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala
@@ -51,7 +51,7 @@ case class MergeOnReadIncrementalRelationV1(override val sqlContext: SQLContext,
                                             private val userSchema: Option[StructType],
                                             private val prunedDataSchema: Option[StructType] = None)
   extends BaseMergeOnReadSnapshotRelation(sqlContext, optParams, metaClient, Seq(), userSchema, prunedDataSchema)
-    with HoodieIncrementalRelationV1Trait {
+    with HoodieIncrementalRelationV1Trait with MergeOnReadIncrementalRelation {
   override type Relation = MergeOnReadIncrementalRelationV1
@@ -123,7 +123,7 @@ case class MergeOnReadIncrementalRelationV1(override val sqlContext: SQLContext,
}
}
-  def listFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[InternalRow, Seq[FileSlice]] = {
+  override def listFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[InternalRow, Seq[FileSlice]] = {
     val slices = if (includedCommits.isEmpty) {
       List()
     } else {
@@ -152,7 +152,7 @@ case class MergeOnReadIncrementalRelationV1(override val sqlContext: SQLContext,
})
}
- def getRequiredFilters: Seq[Filter] = {
+ override def getRequiredFilters: Seq[Filter] = {
if (includedCommits.isEmpty) {
Seq.empty
} else {
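
A minimal standalone sketch, with simplified signatures that are not Hudi's, of the contract the new MergeOnReadIncrementalRelation trait (introduced in the next file) expresses: V1 and V2 relations both provide file-split listing and required filters, so the incremental file index can hold either one.

object RelationContractSketch {
  // Hypothetical, simplified counterpart of the MergeOnReadIncrementalRelation trait.
  trait IncrementalRelationContract {
    def listFileSplits(partitionFilters: Seq[String], dataFilters: Seq[String]): Map[String, Seq[String]]
    def getRequiredFilters: Seq[String]
  }

  class V1StyleRelation extends IncrementalRelationContract {
    override def listFileSplits(partitionFilters: Seq[String], dataFilters: Seq[String]): Map[String, Seq[String]] =
      Map("partition=2025-07-24" -> Seq("file-slice-1"))
    override def getRequiredFilters: Seq[String] = Seq("commit time within the queried range")
  }

  // The file index depends only on the contract, so it can hold a V1 or V2 relation.
  class FileIndexSketch(relation: IncrementalRelationContract) {
    def listFiles(): Map[String, Seq[String]] = relation.listFileSplits(Nil, Nil)
  }

  def main(args: Array[String]): Unit =
    println(new FileIndexSketch(new V1StyleRelation).listFiles())
}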
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala
index 216ead57f8f3..9946e11d707b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala
@@ -42,6 +42,11 @@ import org.apache.spark.sql.types.StructType
import scala.collection.JavaConverters._
import scala.collection.immutable
+trait MergeOnReadIncrementalRelation {
+  def listFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[InternalRow, Seq[FileSlice]]
+  def getRequiredFilters: Seq[Filter]
+}
+
 case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext,
                                             override val optParams: Map[String, String],
                                             override val metaClient: HoodieTableMetaClient,
@@ -49,7 +54,7 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext,
                                             private val prunedDataSchema: Option[StructType] = None,
                                             override val rangeType: RangeType = RangeType.CLOSED_CLOSED)
   extends BaseMergeOnReadSnapshotRelation(sqlContext, optParams, metaClient, Seq(), userSchema, prunedDataSchema)
-    with HoodieIncrementalRelationV2Trait {
+    with HoodieIncrementalRelationV2Trait with MergeOnReadIncrementalRelation {
   override type Relation = MergeOnReadIncrementalRelationV2
@@ -120,7 +125,7 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext,
}
}
-  def listFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[InternalRow, Seq[FileSlice]] = {
+  override def listFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Map[InternalRow, Seq[FileSlice]] = {
     val slices = if (includedCommits.isEmpty) {
       List()
     } else {
@@ -149,7 +154,7 @@ case class MergeOnReadIncrementalRelationV2(override val sqlContext: SQLContext,
})
}
- def getRequiredFilters: Seq[Filter] = {
+ override def getRequiredFilters: Seq[Filter] = {
if (includedCommits.isEmpty) {
Seq.empty
} else {