This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3354fac [HUDI-2449] Incremental read for Flink (#3686)
3354fac is described below
commit 3354fac42f9a2c4dbc8ac73ca4749160e9b9459b
Author: Danny Chan <[email protected]>
AuthorDate: Sun Sep 19 09:06:46 2021 +0800
[HUDI-2449] Incremental read for Flink (#3686)
---
.../apache/hudi/configuration/FlinkOptions.java | 14 +-
.../java/org/apache/hudi/source/FileIndex.java | 23 +-
...ngFunction.java => IncrementalInputSplits.java} | 376 +++++++++------------
.../hudi/source/StreamReadMonitoringFunction.java | 211 +-----------
.../org/apache/hudi/table/HoodieTableFactory.java | 12 +
.../org/apache/hudi/table/HoodieTableSource.java | 213 ++++++------
.../table/format/mor/MergeOnReadInputFormat.java | 29 +-
.../java/org/apache/hudi/source/TestFileIndex.java | 14 +
.../source/TestStreamReadMonitoringFunction.java | 4 +-
.../apache/hudi/source/TestStreamReadOperator.java | 4 -
.../apache/hudi/table/HoodieDataSourceITCase.java | 35 +-
.../apache/hudi/table/TestHoodieTableFactory.java | 26 ++
.../apache/hudi/table/TestHoodieTableSource.java | 10 +-
.../apache/hudi/table/format/TestInputFormat.java | 81 ++++-
.../test/java/org/apache/hudi/utils/TestData.java | 10 +-
.../test/java/org/apache/hudi/utils/TestUtils.java | 3 +-
16 files changed, 480 insertions(+), 585 deletions(-)
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 3a2e615..c736821 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -197,12 +197,18 @@ public class FlinkOptions extends HoodieConfig {
.withDescription("Check interval for streaming read of SECOND, default 1
minute");
public static final String START_COMMIT_EARLIEST = "earliest";
- public static final ConfigOption<String> READ_STREAMING_START_COMMIT =
ConfigOptions
- .key("read.streaming.start-commit")
+ public static final ConfigOption<String> READ_START_COMMIT = ConfigOptions
+ .key("read.start-commit")
.stringType()
.noDefaultValue()
- .withDescription("Start commit instant for streaming read, the commit
time format should be 'yyyyMMddHHmmss', "
- + "by default reading from the latest instant");
+ .withDescription("Start commit instant for reading, the commit time
format should be 'yyyyMMddHHmmss', "
+ + "by default reading from the latest instant for streaming read");
+
+ public static final ConfigOption<String> READ_END_COMMIT = ConfigOptions
+ .key("read.end-commit")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("End commit instant for reading, the commit time format
should be 'yyyyMMddHHmmss'");
// ------------------------------------------------------------------------
// Write Options
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
b/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
index be02fc4..f1abf4b 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
@@ -28,6 +28,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import javax.annotation.Nullable;
+
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
@@ -36,6 +38,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
/**
* A file index which supports listing files efficiently through metadata
table.
@@ -138,10 +141,28 @@ public class FileIndex {
}
// -------------------------------------------------------------------------
+ // Getter/Setter
+ // -------------------------------------------------------------------------
+
+ /**
+ * Sets up explicit partition paths for pruning.
+ */
+ public void setPartitionPaths(@Nullable Set<String> partitionPaths) {
+ if (partitionPaths != null) {
+ this.partitionPaths = new ArrayList<>(partitionPaths);
+ }
+ }
+
+ // -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
- private List<String> getOrBuildPartitionPaths() {
+ /**
+ * Returns all the relative partition paths.
+ *
+ * <p>The partition paths are cached once invoked.
+ */
+ public List<String> getOrBuildPartitionPaths() {
if (this.partitionPaths != null) {
return this.partitionPaths;
}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
b/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
similarity index 53%
copy from
hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
copy to
hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
index c5610d2..7d31920 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -28,23 +28,12 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
-import org.apache.hudi.util.StreamerUtil;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.hadoop.fs.FileStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,180 +44,101 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
/**
- * This is the single (non-parallel) monitoring task which takes a {@link
MergeOnReadInputSplit}
- * , it is responsible for:
+ * Utilities to generate incremental input splits {@link
MergeOnReadInputSplit}.
+ * The input splits are used for streaming and incremental read.
*
+ * <p>How to generate the input splits:
* <ol>
- * <li>Monitoring a user-provided hoodie table path.</li>
- * <li>Deciding which files(or split) should be further read and
processed.</li>
- * <li>Creating the {@link MergeOnReadInputSplit splits} corresponding to
those files.</li>
- * <li>Assigning them to downstream tasks for further processing.</li>
+ * <li>first fetch all the commit metadata for the incremental instants;</li>
+ * <li>resolve the incremental commit file paths;</li>
+ * <li>filter the full file paths by required partitions;</li>
+ * <li>use the file paths from #step 3 as the back-up of the filesystem
view.</li>
* </ol>
- *
- * <p>The splits to be read are forwarded to the downstream {@link
StreamReadOperator}
- * which can have parallelism greater than one.
- *
- * <p><b>IMPORTANT NOTE: </b> Splits are forwarded downstream for reading in
ascending instant commits time order,
- * in each downstream task, the splits are also read in receiving sequence. We
do not ensure split consuming sequence
- * among the downstream tasks.
*/
-public class StreamReadMonitoringFunction
- extends RichSourceFunction<MergeOnReadInputSplit> implements
CheckpointedFunction {
- private static final Logger LOG =
LoggerFactory.getLogger(StreamReadMonitoringFunction.class);
-
- private static final long serialVersionUID = 1L;
-
- /**
- * The path to monitor.
- */
- private final Path path;
-
- /**
- * The interval between consecutive path scans.
- */
- private final long interval;
-
- private transient Object checkpointLock;
-
- private volatile boolean isRunning = true;
-
- private String issuedInstant;
-
- private transient ListState<String> instantState;
-
+public class IncrementalInputSplits {
+ private static final Logger LOG =
LoggerFactory.getLogger(IncrementalInputSplits.class);
private final Configuration conf;
-
- private transient org.apache.hadoop.conf.Configuration hadoopConf;
-
- private HoodieTableMetaClient metaClient;
-
+ private final Path path;
private final long maxCompactionMemoryInBytes;
-
// for partition pruning
- private final Set<String> requiredPartitionPaths;
+ private final Set<String> requiredPartitions;
- public StreamReadMonitoringFunction(
+ private IncrementalInputSplits(
Configuration conf,
Path path,
long maxCompactionMemoryInBytes,
- Set<String> requiredPartitionPaths) {
+ @Nullable Set<String> requiredPartitions) {
this.conf = conf;
this.path = path;
- this.interval =
conf.getInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL);
this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
- this.requiredPartitionPaths = requiredPartitionPaths;
+ this.requiredPartitions = requiredPartitions;
}
- @Override
- public void initializeState(FunctionInitializationContext context) throws
Exception {
-
- ValidationUtils.checkState(this.instantState == null,
- "The " + getClass().getSimpleName() + " has already been
initialized.");
-
- this.instantState = context.getOperatorStateStore().getListState(
- new ListStateDescriptor<>(
- "file-monitoring-state",
- StringSerializer.INSTANCE
- )
- );
-
- if (context.isRestored()) {
- LOG.info("Restoring state for the class {} with table {} and base path
{}.",
- getClass().getSimpleName(), conf.getString(FlinkOptions.TABLE_NAME),
path);
-
- List<String> retrievedStates = new ArrayList<>();
- for (String entry : this.instantState.get()) {
- retrievedStates.add(entry);
- }
-
- ValidationUtils.checkArgument(retrievedStates.size() <= 1,
- getClass().getSimpleName() + " retrieved invalid state.");
-
- if (retrievedStates.size() == 1 && issuedInstant != null) {
- // this is the case where we have both legacy and new state.
- // the two should be mutually exclusive for the operator, thus we
throw the exception.
-
- throw new IllegalArgumentException(
- "The " + getClass().getSimpleName() + " has already restored from
a previous Flink version.");
-
- } else if (retrievedStates.size() == 1) {
- this.issuedInstant = retrievedStates.get(0);
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} retrieved a issued instant of time {} for table {}
with path {}.",
- getClass().getSimpleName(), issuedInstant,
conf.get(FlinkOptions.TABLE_NAME), path);
- }
- }
- }
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- this.hadoopConf = StreamerUtil.getHadoopConf();
- }
-
- @Override
- public void run(SourceFunction.SourceContext<MergeOnReadInputSplit> context)
throws Exception {
- checkpointLock = context.getCheckpointLock();
- while (isRunning) {
- synchronized (checkpointLock) {
- monitorDirAndForwardSplits(context);
- }
- TimeUnit.SECONDS.sleep(interval);
- }
+ /**
+ * Returns the builder.
+ */
+ public static Builder builder() {
+ return new Builder();
}
- @Nullable
- private HoodieTableMetaClient getOrCreateMetaClient() {
- if (this.metaClient != null) {
- return this.metaClient;
- }
- if (StreamerUtil.tableExists(this.path.toString(), hadoopConf)) {
- this.metaClient = StreamerUtil.createMetaClient(this.path.toString(),
hadoopConf);
- return this.metaClient;
- }
- // fallback
- return null;
+ /**
+ * Returns the incremental input splits.
+ *
+ * @param metaClient The meta client
+ * @param hadoopConf The hadoop configuration
+ * @return The list of incremental input splits or empty if there are no new
instants
+ */
+ public Result inputSplits(
+ HoodieTableMetaClient metaClient,
+ org.apache.hadoop.conf.Configuration hadoopConf) {
+ return inputSplits(metaClient, hadoopConf, null);
}
- @VisibleForTesting
- public void monitorDirAndForwardSplits(SourceContext<MergeOnReadInputSplit>
context) {
- HoodieTableMetaClient metaClient = getOrCreateMetaClient();
- if (metaClient == null) {
- // table does not exist
- return;
- }
+ /**
+ * Returns the incremental input splits.
+ *
+ * @param metaClient The meta client
+ * @param hadoopConf The hadoop configuration
+ * @param issuedInstant The last issued instant, only valid in streaming read
+ * @return The list of incremental input splits or empty if there are no new
instants
+ */
+ public Result inputSplits(
+ HoodieTableMetaClient metaClient,
+ org.apache.hadoop.conf.Configuration hadoopConf,
+ String issuedInstant) {
metaClient.reloadActiveTimeline();
HoodieTimeline commitTimeline =
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
if (commitTimeline.empty()) {
LOG.warn("No splits found for the table under path " + path);
- return;
+ return Result.EMPTY;
}
- List<HoodieInstant> instants = filterInstantsWithStart(commitTimeline,
this.issuedInstant);
+ List<HoodieInstant> instants = filterInstantsWithRange(commitTimeline,
issuedInstant);
// get the latest instant that satisfies condition
final HoodieInstant instantToIssue = instants.size() == 0 ? null :
instants.get(instants.size() - 1);
final InstantRange instantRange;
if (instantToIssue != null) {
- if (this.issuedInstant != null) {
- // had already consumed an instant
- instantRange = InstantRange.getInstance(this.issuedInstant,
instantToIssue.getTimestamp(),
+ if (issuedInstant != null) {
+ // the streaming reader may record the last issued instant, if the
issued instant is present,
+ // the instant range should be: (issued instant, the latest instant].
+ instantRange = InstantRange.getInstance(issuedInstant,
instantToIssue.getTimestamp(),
InstantRange.RangeType.OPEN_CLOSE);
- } else if
(this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()) {
+ } else if
(this.conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()) {
// first time consume and has a start commit
- final String specifiedStart =
this.conf.getString(FlinkOptions.READ_STREAMING_START_COMMIT);
- instantRange =
specifiedStart.equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)
+ final String startCommit =
this.conf.getString(FlinkOptions.READ_START_COMMIT);
+ instantRange =
startCommit.equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)
? null
- : InstantRange.getInstance(specifiedStart,
instantToIssue.getTimestamp(), InstantRange.RangeType.CLOSE_CLOSE);
+ : InstantRange.getInstance(startCommit,
instantToIssue.getTimestamp(), InstantRange.RangeType.CLOSE_CLOSE);
} else {
// first time consume and no start commit, consumes the latest
incremental data set.
instantRange = InstantRange.getInstance(instantToIssue.getTimestamp(),
instantToIssue.getTimestamp(),
@@ -236,18 +146,13 @@ public class StreamReadMonitoringFunction
}
} else {
LOG.info("No new instant found for the table under path " + path + ",
skip reading");
- return;
+ return Result.EMPTY;
}
- // generate input split:
- // 1. first fetch all the commit metadata for the incremental instants;
- // 2. filter the relative partition paths
- // 3. filter the full file paths
- // 4. use the file paths from #step 3 as the back-up of the filesystem view
String tableName = conf.getString(FlinkOptions.TABLE_NAME);
List<HoodieCommitMetadata> activeMetadataList = instants.stream()
.map(instant -> WriteProfiles.getCommitMetadata(tableName, path,
instant, commitTimeline)).collect(Collectors.toList());
- List<HoodieCommitMetadata> archivedMetadataList =
getArchivedMetadata(instantRange, commitTimeline, tableName);
+ List<HoodieCommitMetadata> archivedMetadataList =
getArchivedMetadata(metaClient, instantRange, commitTimeline, tableName);
if (archivedMetadataList.size() > 0) {
LOG.warn(""
+
"--------------------------------------------------------------------------------\n"
@@ -261,22 +166,22 @@ public class StreamReadMonitoringFunction
Set<String> writePartitions = getWritePartitionPaths(metadataList);
// apply partition push down
- if (this.requiredPartitionPaths.size() > 0) {
+ if (this.requiredPartitions != null) {
writePartitions = writePartitions.stream()
-
.filter(this.requiredPartitionPaths::contains).collect(Collectors.toSet());
+
.filter(this.requiredPartitions::contains).collect(Collectors.toSet());
}
FileStatus[] fileStatuses = WriteProfiles.getWritePathsOfInstants(path,
hadoopConf, metadataList);
if (fileStatuses.length == 0) {
LOG.warn("No files found for reading in user provided path.");
- return;
+ return Result.EMPTY;
}
HoodieTableFileSystemView fsView = new
HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses);
- final String commitToIssue = instantToIssue.getTimestamp();
+ final String endInstant = instantToIssue.getTimestamp();
final AtomicInteger cnt = new AtomicInteger(0);
final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
List<MergeOnReadInputSplit> inputSplits = writePartitions.stream()
- .map(relPartitionPath ->
fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, commitToIssue)
+ .map(relPartitionPath ->
fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, endInstant)
.map(fileSlice -> {
Option<List<String>> logPaths =
Option.ofNullable(fileSlice.getLogFiles()
.sorted(HoodieLogFile.getLogFileComparator())
@@ -284,64 +189,12 @@ public class StreamReadMonitoringFunction
.collect(Collectors.toList()));
String basePath =
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
return new MergeOnReadInputSplit(cnt.getAndAdd(1),
- basePath, logPaths, commitToIssue,
+ basePath, logPaths, endInstant,
metaClient.getBasePath(), maxCompactionMemoryInBytes,
mergeType, instantRange);
}).collect(Collectors.toList()))
.flatMap(Collection::stream)
.collect(Collectors.toList());
-
- for (MergeOnReadInputSplit split : inputSplits) {
- context.collect(split);
- }
- // update the issues instant time
- this.issuedInstant = commitToIssue;
- LOG.info(""
- + "------------------------------------------------------------\n"
- + "---------- consumed to instant: {}\n"
- + "------------------------------------------------------------",
- commitToIssue);
- }
-
- @Override
- public void close() throws Exception {
- super.close();
-
- if (checkpointLock != null) {
- synchronized (checkpointLock) {
- issuedInstant = null;
- isRunning = false;
- }
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Closed File Monitoring Source for path: " + path + ".");
- }
- }
-
- @Override
- public void cancel() {
- if (checkpointLock != null) {
- // this is to cover the case where cancel() is called before the run()
- synchronized (checkpointLock) {
- issuedInstant = null;
- isRunning = false;
- }
- } else {
- issuedInstant = null;
- isRunning = false;
- }
- }
-
- // -------------------------------------------------------------------------
- // Checkpointing
- // -------------------------------------------------------------------------
-
- @Override
- public void snapshotState(FunctionSnapshotContext context) throws Exception {
- this.instantState.clear();
- if (this.issuedInstant != null) {
- this.instantState.add(this.issuedInstant);
- }
+ return Result.instance(inputSplits, endInstant);
}
/**
@@ -350,12 +203,14 @@ public class StreamReadMonitoringFunction
*
* <p>Note: should improve it with metadata table when the metadata table is
stable enough.
*
+ * @param metaClient The meta client
* @param instantRange The instant range to filter the timeline instants
* @param commitTimeline The commit timeline
* @param tableName The table name
* @return the list of archived metadata, or empty if there is no need to
read the archived timeline
*/
private List<HoodieCommitMetadata> getArchivedMetadata(
+ HoodieTableMetaClient metaClient,
InstantRange instantRange,
HoodieTimeline commitTimeline,
String tableName) {
@@ -389,23 +244,30 @@ public class StreamReadMonitoringFunction
* @param issuedInstant The last issued instant that has already been
delivered to downstream
* @return the filtered hoodie instants
*/
- private List<HoodieInstant> filterInstantsWithStart(
+ private List<HoodieInstant> filterInstantsWithRange(
HoodieTimeline commitTimeline,
final String issuedInstant) {
HoodieTimeline completedTimeline =
commitTimeline.filterCompletedInstants();
if (issuedInstant != null) {
+ // returns early for streaming mode
return completedTimeline.getInstants()
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(),
GREATER_THAN, issuedInstant))
.collect(Collectors.toList());
- } else if
(this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()
- &&
!this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT).equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST))
{
- String definedStartCommit =
this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT);
- return completedTimeline.getInstants()
- .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(),
GREATER_THAN_OR_EQUALS, definedStartCommit))
- .collect(Collectors.toList());
- } else {
- return completedTimeline.getInstants().collect(Collectors.toList());
}
+
+ Stream<HoodieInstant> instantStream = completedTimeline.getInstants();
+
+ if (this.conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()
+ &&
!this.conf.get(FlinkOptions.READ_START_COMMIT).equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST))
{
+ final String startCommit = this.conf.get(FlinkOptions.READ_START_COMMIT);
+ instantStream = instantStream
+ .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(),
GREATER_THAN_OR_EQUALS, startCommit));
+ }
+ if (this.conf.getOptional(FlinkOptions.READ_END_COMMIT).isPresent()) {
+ final String endCommit = this.conf.get(FlinkOptions.READ_END_COMMIT);
+ instantStream = instantStream.filter(s ->
HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN_OR_EQUALS,
endCommit));
+ }
+ return instantStream.collect(Collectors.toList());
}
/**
@@ -426,4 +288,78 @@ public class StreamReadMonitoringFunction
merged.addAll(list2);
return merged;
}
+
+ // -------------------------------------------------------------------------
+ // Inner Class
+ // -------------------------------------------------------------------------
+
+ /**
+ * Represents a result of calling {@link #inputSplits}.
+ */
+ public static class Result {
+ private final List<MergeOnReadInputSplit> inputSplits; // input splits
+ private final String endInstant; // end instant to consume to
+
+ public static final Result EMPTY = instance(Collections.emptyList(), "");
+
+ public boolean isEmpty() {
+ return this.inputSplits.size() == 0;
+ }
+
+ public List<MergeOnReadInputSplit> getInputSplits() {
+ return this.inputSplits;
+ }
+
+ public String getEndInstant() {
+ return this.endInstant;
+ }
+
+ private Result(List<MergeOnReadInputSplit> inputSplits, String endInstant)
{
+ this.inputSplits = inputSplits;
+ this.endInstant = endInstant;
+ }
+
+ public static Result instance(List<MergeOnReadInputSplit> inputSplits,
String endInstant) {
+ return new Result(inputSplits, endInstant);
+ }
+ }
+
+ /**
+ * Builder for {@link IncrementalInputSplits}.
+ */
+ public static class Builder {
+ private Configuration conf;
+ private Path path;
+ private long maxCompactionMemoryInBytes;
+ // for partition pruning
+ private Set<String> requiredPartitions;
+
+ public Builder() {
+ }
+
+ public Builder conf(Configuration conf) {
+ this.conf = conf;
+ return this;
+ }
+
+ public Builder path(Path path) {
+ this.path = path;
+ return this;
+ }
+
+ public Builder maxCompactionMemoryInBytes(long maxCompactionMemoryInBytes)
{
+ this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
+ return this;
+ }
+
+ public Builder requiredPartitions(@Nullable Set<String>
requiredPartitions) {
+ this.requiredPartitions = requiredPartitions;
+ return this;
+ }
+
+ public IncrementalInputSplits build() {
+ return new IncrementalInputSplits(Objects.requireNonNull(this.conf),
Objects.requireNonNull(this.path),
+ this.maxCompactionMemoryInBytes, this.requiredPartitions);
+ }
+ }
}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index c5610d2..bfd7452 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -18,19 +18,9 @@
package org.apache.hudi.source;
-import org.apache.hudi.common.model.BaseFile;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.InstantRange;
-import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.StreamerUtil;
@@ -45,24 +35,15 @@ import
org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.hadoop.fs.FileStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
-import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
/**
* This is the single (non-parallel) monitoring task which takes a {@link
MergeOnReadInputSplit}
@@ -112,21 +93,21 @@ public class StreamReadMonitoringFunction
private HoodieTableMetaClient metaClient;
- private final long maxCompactionMemoryInBytes;
-
- // for partition pruning
- private final Set<String> requiredPartitionPaths;
+ private final IncrementalInputSplits incrementalInputSplits;
public StreamReadMonitoringFunction(
Configuration conf,
Path path,
long maxCompactionMemoryInBytes,
- Set<String> requiredPartitionPaths) {
+ @Nullable Set<String> requiredPartitionPaths) {
this.conf = conf;
this.path = path;
this.interval =
conf.getInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL);
- this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
- this.requiredPartitionPaths = requiredPartitionPaths;
+ this.incrementalInputSplits = IncrementalInputSplits.builder()
+ .conf(conf)
+ .path(path)
+ .maxCompactionMemoryInBytes(maxCompactionMemoryInBytes)
+ .requiredPartitions(requiredPartitionPaths).build();
}
@Override
@@ -208,98 +189,23 @@ public class StreamReadMonitoringFunction
// table does not exist
return;
}
- metaClient.reloadActiveTimeline();
- HoodieTimeline commitTimeline =
metaClient.getCommitsAndCompactionTimeline().filterCompletedAndCompactionInstants();
- if (commitTimeline.empty()) {
- LOG.warn("No splits found for the table under path " + path);
- return;
- }
- List<HoodieInstant> instants = filterInstantsWithStart(commitTimeline,
this.issuedInstant);
- // get the latest instant that satisfies condition
- final HoodieInstant instantToIssue = instants.size() == 0 ? null :
instants.get(instants.size() - 1);
- final InstantRange instantRange;
- if (instantToIssue != null) {
- if (this.issuedInstant != null) {
- // had already consumed an instant
- instantRange = InstantRange.getInstance(this.issuedInstant,
instantToIssue.getTimestamp(),
- InstantRange.RangeType.OPEN_CLOSE);
- } else if
(this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()) {
- // first time consume and has a start commit
- final String specifiedStart =
this.conf.getString(FlinkOptions.READ_STREAMING_START_COMMIT);
- instantRange =
specifiedStart.equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST)
- ? null
- : InstantRange.getInstance(specifiedStart,
instantToIssue.getTimestamp(), InstantRange.RangeType.CLOSE_CLOSE);
- } else {
- // first time consume and no start commit, consumes the latest
incremental data set.
- instantRange = InstantRange.getInstance(instantToIssue.getTimestamp(),
instantToIssue.getTimestamp(),
- InstantRange.RangeType.CLOSE_CLOSE);
- }
- } else {
- LOG.info("No new instant found for the table under path " + path + ",
skip reading");
- return;
- }
- // generate input split:
- // 1. first fetch all the commit metadata for the incremental instants;
- // 2. filter the relative partition paths
- // 3. filter the full file paths
- // 4. use the file paths from #step 3 as the back-up of the filesystem view
-
- String tableName = conf.getString(FlinkOptions.TABLE_NAME);
- List<HoodieCommitMetadata> activeMetadataList = instants.stream()
- .map(instant -> WriteProfiles.getCommitMetadata(tableName, path,
instant, commitTimeline)).collect(Collectors.toList());
- List<HoodieCommitMetadata> archivedMetadataList =
getArchivedMetadata(instantRange, commitTimeline, tableName);
- if (archivedMetadataList.size() > 0) {
- LOG.warn(""
- +
"--------------------------------------------------------------------------------\n"
- + "---------- caution: the reader has fall behind too much from the
writer,\n"
- + "---------- tweak 'read.tasks' option to add parallelism of read
tasks.\n"
- +
"--------------------------------------------------------------------------------");
- }
- List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
- ? mergeList(activeMetadataList, archivedMetadataList)
- : activeMetadataList;
-
- Set<String> writePartitions = getWritePartitionPaths(metadataList);
- // apply partition push down
- if (this.requiredPartitionPaths.size() > 0) {
- writePartitions = writePartitions.stream()
-
.filter(this.requiredPartitionPaths::contains).collect(Collectors.toSet());
- }
- FileStatus[] fileStatuses = WriteProfiles.getWritePathsOfInstants(path,
hadoopConf, metadataList);
- if (fileStatuses.length == 0) {
- LOG.warn("No files found for reading in user provided path.");
+ IncrementalInputSplits.Result result =
+ incrementalInputSplits.inputSplits(metaClient, this.hadoopConf,
this.issuedInstant);
+ if (result.isEmpty()) {
+ // no new instants, returns early
return;
}
- HoodieTableFileSystemView fsView = new
HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses);
- final String commitToIssue = instantToIssue.getTimestamp();
- final AtomicInteger cnt = new AtomicInteger(0);
- final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
- List<MergeOnReadInputSplit> inputSplits = writePartitions.stream()
- .map(relPartitionPath ->
fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, commitToIssue)
- .map(fileSlice -> {
- Option<List<String>> logPaths =
Option.ofNullable(fileSlice.getLogFiles()
- .sorted(HoodieLogFile.getLogFileComparator())
- .map(logFile -> logFile.getPath().toString())
- .collect(Collectors.toList()));
- String basePath =
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
- return new MergeOnReadInputSplit(cnt.getAndAdd(1),
- basePath, logPaths, commitToIssue,
- metaClient.getBasePath(), maxCompactionMemoryInBytes,
mergeType, instantRange);
- }).collect(Collectors.toList()))
- .flatMap(Collection::stream)
- .collect(Collectors.toList());
-
- for (MergeOnReadInputSplit split : inputSplits) {
+ for (MergeOnReadInputSplit split : result.getInputSplits()) {
context.collect(split);
}
// update the issues instant time
- this.issuedInstant = commitToIssue;
+ this.issuedInstant = result.getEndInstant();
LOG.info(""
- + "------------------------------------------------------------\n"
- + "---------- consumed to instant: {}\n"
- + "------------------------------------------------------------",
- commitToIssue);
+ + "------------------------------------------------------------\n"
+ + "---------- consumed to instant: {}\n"
+ + "------------------------------------------------------------",
+ this.issuedInstant);
}
@Override
@@ -343,87 +249,4 @@ public class StreamReadMonitoringFunction
this.instantState.add(this.issuedInstant);
}
}
-
- /**
- * Returns the archived metadata in case the reader consumes untimely or it
wants
- * to read from the earliest.
- *
- * <p>Note: should improve it with metadata table when the metadata table is
stable enough.
- *
- * @param instantRange The instant range to filter the timeline instants
- * @param commitTimeline The commit timeline
- * @param tableName The table name
- * @return the list of archived metadata, or empty if there is no need to
read the archived timeline
- */
- private List<HoodieCommitMetadata> getArchivedMetadata(
- InstantRange instantRange,
- HoodieTimeline commitTimeline,
- String tableName) {
- if (instantRange == null ||
commitTimeline.isBeforeTimelineStarts(instantRange.getStartInstant())) {
- // read the archived metadata if:
- // 1. the start commit is 'earliest';
- // 2. the start instant is archived.
- HoodieArchivedTimeline archivedTimeline =
metaClient.getArchivedTimeline();
- HoodieTimeline archivedCompleteTimeline =
archivedTimeline.getCommitsTimeline().filterCompletedInstants();
- if (!archivedCompleteTimeline.empty()) {
- final String endTs =
archivedCompleteTimeline.lastInstant().get().getTimestamp();
- Stream<HoodieInstant> instantStream =
archivedCompleteTimeline.getInstants();
- if (instantRange != null) {
-
archivedTimeline.loadInstantDetailsInMemory(instantRange.getStartInstant(),
endTs);
- instantStream = instantStream.filter(s ->
HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS,
instantRange.getStartInstant()));
- } else {
- final String startTs =
archivedCompleteTimeline.firstInstant().get().getTimestamp();
- archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
- }
- return instantStream
- .map(instant -> WriteProfiles.getCommitMetadata(tableName, path,
instant, archivedTimeline)).collect(Collectors.toList());
- }
- }
- return Collections.emptyList();
- }
-
- /**
- * Returns the instants with a given issuedInstant to start from.
- *
- * @param commitTimeline The completed commits timeline
- * @param issuedInstant The last issued instant that has already been
delivered to downstream
- * @return the filtered hoodie instants
- */
- private List<HoodieInstant> filterInstantsWithStart(
- HoodieTimeline commitTimeline,
- final String issuedInstant) {
- HoodieTimeline completedTimeline =
commitTimeline.filterCompletedInstants();
- if (issuedInstant != null) {
- return completedTimeline.getInstants()
- .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(),
GREATER_THAN, issuedInstant))
- .collect(Collectors.toList());
- } else if
(this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()
- &&
!this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT).equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST))
{
- String definedStartCommit =
this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT);
- return completedTimeline.getInstants()
- .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(),
GREATER_THAN_OR_EQUALS, definedStartCommit))
- .collect(Collectors.toList());
- } else {
- return completedTimeline.getInstants().collect(Collectors.toList());
- }
- }
-
- /**
- * Returns all the incremental write partition paths as a set with the given
commits metadata.
- *
- * @param metadataList The commits metadata
- * @return the partition path set
- */
- private Set<String> getWritePartitionPaths(List<HoodieCommitMetadata>
metadataList) {
- return metadataList.stream()
- .map(HoodieCommitMetadata::getWritePartitionPaths)
- .flatMap(Collection::stream)
- .collect(Collectors.toSet());
- }
-
- private static <T> List<T> mergeList(List<T> list1, List<T> list2) {
- List<T> merged = new ArrayList<>(list1);
- merged.addAll(list2);
- return merged;
- }
}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index cf1cbd5..a2d0960 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -155,6 +155,8 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
setupCompactionOptions(conf);
// hive options
setupHiveOptions(conf);
+ // read options
+ setupReadOptions(conf);
// infer avro schema from physical DDL schema
inferAvroSchema(conf,
schema.toPhysicalRowDataType().notNull().getLogicalType());
}
@@ -271,6 +273,16 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
}
/**
+ * Sets up the read options from the table definition.
+ */
+ private static void setupReadOptions(Configuration conf) {
+ if (!conf.getBoolean(FlinkOptions.READ_AS_STREAMING)
+ && (conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent() ||
conf.getOptional(FlinkOptions.READ_END_COMMIT).isPresent())) {
+ conf.setString(FlinkOptions.QUERY_TYPE,
FlinkOptions.QUERY_TYPE_INCREMENTAL);
+ }
+ }
+
+ /**
* Inferences the deserialization Avro schema from the table schema (e.g.
the DDL)
* if both options {@link FlinkOptions#SOURCE_AVRO_SCHEMA_PATH} and
* {@link FlinkOptions#SOURCE_AVRO_SCHEMA} are not specified.
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 43743fc..0494143 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -19,7 +19,6 @@
package org.apache.hudi.table;
import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableType;
@@ -31,6 +30,7 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.HoodieROTablePathFilter;
import org.apache.hudi.source.FileIndex;
+import org.apache.hudi.source.IncrementalInputSplits;
import org.apache.hudi.source.StreamReadMonitoringFunction;
import org.apache.hudi.source.StreamReadOperator;
import org.apache.hudi.table.format.FilePathUtils;
@@ -108,6 +108,9 @@ public class HoodieTableSource implements
private static final int NO_LIMIT_CONSTANT = -1;
+ private static final InputFormat<RowData, ?> EMPTY_INPUT_FORMAT =
+ new CollectionInputFormat<>(Collections.emptyList(), null);
+
private final transient org.apache.hadoop.conf.Configuration hadoopConf;
private final transient HoodieTableMetaClient metaClient;
private final long maxCompactionMemoryInBytes;
@@ -220,7 +223,7 @@ public class HoodieTableSource implements
public Result applyFilters(List<ResolvedExpression> filters) {
this.filters = new ArrayList<>(filters);
// refuse all the filters now
- return Result.of(Collections.emptyList(), new ArrayList<>(filters));
+ return SupportsFilterPushDown.Result.of(Collections.emptyList(), new
ArrayList<>(filters));
}
@Override
@@ -256,8 +259,8 @@ public class HoodieTableSource implements
DataType[] schemaTypes = this.schema.getColumnDataTypes().toArray(new
DataType[0]);
return DataTypes.ROW(Arrays.stream(this.requiredPos)
- .mapToObj(i -> DataTypes.FIELD(schemaFieldNames[i], schemaTypes[i]))
- .toArray(DataTypes.Field[]::new))
+ .mapToObj(i -> DataTypes.FIELD(schemaFieldNames[i],
schemaTypes[i]))
+ .toArray(DataTypes.Field[]::new))
.bridgedTo(RowData.class);
}
@@ -268,16 +271,21 @@ public class HoodieTableSource implements
return requiredPartitions;
}
+ @Nullable
private Set<String> getRequiredPartitionPaths() {
if (this.requiredPartitions == null) {
- return Collections.emptySet();
+ // returns null for non partition pruning
+ return null;
}
return FilePathUtils.toRelativePartitionPaths(this.partitionKeys,
this.requiredPartitions,
conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING));
}
- private List<MergeOnReadInputSplit> buildFileIndex(Path[] paths) {
- if (paths.length == 0) {
+ private List<MergeOnReadInputSplit> buildFileIndex() {
+ Set<String> requiredPartitionPaths = getRequiredPartitionPaths();
+ fileIndex.setPartitionPaths(requiredPartitionPaths);
+ List<String> relPartitionPaths = fileIndex.getOrBuildPartitionPaths();
+ if (relPartitionPaths.size() == 0) {
return Collections.emptyList();
}
FileStatus[] fileStatuses = fileIndex.getFilesInPartitions();
@@ -292,19 +300,17 @@ public class HoodieTableSource implements
final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
final AtomicInteger cnt = new AtomicInteger(0);
// generates one input split for each file group
- return Arrays.stream(paths).map(partitionPath -> {
- String relPartitionPath = FSUtils.getRelativePartitionPath(path,
partitionPath);
- return fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath,
latestCommit)
- .map(fileSlice -> {
- String basePath =
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
- Option<List<String>> logPaths =
Option.ofNullable(fileSlice.getLogFiles()
- .sorted(HoodieLogFile.getLogFileComparator())
- .map(logFile -> logFile.getPath().toString())
- .collect(Collectors.toList()));
- return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath,
logPaths, latestCommit,
- metaClient.getBasePath(), maxCompactionMemoryInBytes,
mergeType, null);
- }).collect(Collectors.toList());
- })
+ return relPartitionPaths.stream()
+ .map(relPartitionPath ->
fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, latestCommit)
+ .map(fileSlice -> {
+ String basePath =
fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
+ Option<List<String>> logPaths =
Option.ofNullable(fileSlice.getLogFiles()
+ .sorted(HoodieLogFile.getLogFileComparator())
+ .map(logFile -> logFile.getPath().toString())
+ .collect(Collectors.toList()));
+ return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath,
logPaths, latestCommit,
+ metaClient.getBasePath(), maxCompactionMemoryInBytes,
mergeType, null);
+ }).collect(Collectors.toList()))
.flatMap(Collection::stream)
.collect(Collectors.toList());
}
@@ -319,16 +325,6 @@ public class HoodieTableSource implements
}
private InputFormat<RowData, ?> getBatchInputFormat() {
- // When this table has no partition, just return an empty source.
- if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) {
- return new CollectionInputFormat<>(Collections.emptyList(), null);
- }
-
- final Path[] paths = getReadPaths();
- if (paths.length == 0) {
- return new CollectionInputFormat<>(Collections.emptyList(), null);
- }
-
final Schema tableAvroSchema = getTableAvroSchema();
final DataType rowDataType =
AvroSchemaConverter.convertToDataType(tableAvroSchema);
final RowType rowType = (RowType) rowDataType.getLogicalType();
@@ -340,62 +336,37 @@ public class HoodieTableSource implements
final HoodieTableType tableType =
HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
switch (tableType) {
case MERGE_ON_READ:
- final List<MergeOnReadInputSplit> inputSplits =
buildFileIndex(paths);
+ final List<MergeOnReadInputSplit> inputSplits = buildFileIndex();
if (inputSplits.size() == 0) {
// When there is no input splits, just return an empty source.
LOG.warn("No input splits generate for MERGE_ON_READ input
format, returns empty collection instead");
- return new CollectionInputFormat<>(Collections.emptyList(),
null);
+ return EMPTY_INPUT_FORMAT;
}
- final MergeOnReadTableState hoodieTableState = new
MergeOnReadTableState(
- rowType,
- requiredRowType,
- tableAvroSchema.toString(),
-
AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
- inputSplits,
- conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","));
- return MergeOnReadInputFormat.builder()
- .config(this.conf)
- .paths(FilePathUtils.toFlinkPaths(paths))
- .tableState(hoodieTableState)
- // use the explicit fields data type because the
AvroSchemaConverter
- // is not very stable.
- .fieldTypes(rowDataType.getChildren())
-
.defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
- .limit(this.limit)
- .emitDelete(false)
- .build();
+ return mergeOnReadInputFormat(rowType, requiredRowType,
tableAvroSchema,
+ rowDataType, inputSplits, false);
case COPY_ON_WRITE:
- FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
- FilePathUtils.toFlinkPaths(paths),
- this.schema.getColumnNames().toArray(new String[0]),
- this.schema.getColumnDataTypes().toArray(new DataType[0]),
- this.requiredPos,
- this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
- this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit,
// ParquetInputFormat always uses the limit value
- getParquetConf(this.conf, this.hadoopConf),
- this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE)
- );
- format.setFilesFilter(new LatestFileFilter(this.hadoopConf));
- return format;
+ return baseFileOnlyInputFormat();
default:
throw new HoodieException("Unexpected table type: " +
this.conf.getString(FlinkOptions.TABLE_TYPE));
}
case FlinkOptions.QUERY_TYPE_READ_OPTIMIZED:
- FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
- FilePathUtils.toFlinkPaths(paths),
- this.schema.getColumnNames().toArray(new String[0]),
- this.schema.getColumnDataTypes().toArray(new DataType[0]),
- this.requiredPos,
- "default",
- this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, //
ParquetInputFormat always uses the limit value
- getParquetConf(this.conf, this.hadoopConf),
- this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE)
- );
- format.setFilesFilter(new LatestFileFilter(this.hadoopConf));
- return format;
+ return baseFileOnlyInputFormat();
+ case FlinkOptions.QUERY_TYPE_INCREMENTAL:
+ IncrementalInputSplits incrementalInputSplits =
IncrementalInputSplits.builder()
+ .conf(conf).path(FilePathUtils.toFlinkPath(path))
+ .maxCompactionMemoryInBytes(maxCompactionMemoryInBytes)
+ .requiredPartitions(getRequiredPartitionPaths()).build();
+ final IncrementalInputSplits.Result result =
incrementalInputSplits.inputSplits(metaClient, hadoopConf);
+ if (result.isEmpty()) {
+ // When there is no input splits, just return an empty source.
+ LOG.warn("No input splits generate for incremental read, returns
empty collection instead");
+ return new CollectionInputFormat<>(Collections.emptyList(), null);
+ }
+ return mergeOnReadInputFormat(rowType, requiredRowType,
tableAvroSchema,
+ rowDataType, result.getInputSplits(), false);
default:
- String errMsg = String.format("Invalid query type : '%s', options
['%s', '%s'] are supported now", queryType,
- FlinkOptions.QUERY_TYPE_SNAPSHOT,
FlinkOptions.QUERY_TYPE_READ_OPTIMIZED);
+ String errMsg = String.format("Invalid query type : '%s', options
['%s', '%s', '%s'] are supported now", queryType,
+ FlinkOptions.QUERY_TYPE_SNAPSHOT,
FlinkOptions.QUERY_TYPE_READ_OPTIMIZED, FlinkOptions.QUERY_TYPE_INCREMENTAL);
throw new HoodieException(errMsg);
}
}
@@ -408,56 +379,62 @@ public class HoodieTableSource implements
final RowType requiredRowType = (RowType)
getProducedDataType().notNull().getLogicalType();
final String queryType = this.conf.getString(FlinkOptions.QUERY_TYPE);
- org.apache.flink.core.fs.Path[] paths = new
org.apache.flink.core.fs.Path[0];
if (FlinkOptions.QUERY_TYPE_SNAPSHOT.equals(queryType)) {
final HoodieTableType tableType =
HoodieTableType.valueOf(this.conf.getString(FlinkOptions.TABLE_TYPE));
- switch (tableType) {
- case MERGE_ON_READ:
- final MergeOnReadTableState hoodieTableState = new
MergeOnReadTableState(
- rowType,
- requiredRowType,
- tableAvroSchema.toString(),
- AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
- Collections.emptyList(),
- conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","));
- return MergeOnReadInputFormat.builder()
- .config(this.conf)
- .paths(paths)
- .tableState(hoodieTableState)
- // use the explicit fields data type because the
AvroSchemaConverter
- // is not very stable.
- .fieldTypes(rowDataType.getChildren())
-
.defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
- .limit(this.limit)
- .emitDelete(true)
- .build();
- case COPY_ON_WRITE:
- final MergeOnReadTableState hoodieTableState2 = new
MergeOnReadTableState(
- rowType,
- requiredRowType,
- tableAvroSchema.toString(),
- AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
- Collections.emptyList(),
- conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","));
- return MergeOnReadInputFormat.builder()
- .config(this.conf)
- .paths(paths)
- .tableState(hoodieTableState2)
- // use the explicit fields data type because the
AvroSchemaConverter
- // is not very stable.
- .fieldTypes(rowDataType.getChildren())
-
.defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
- .limit(this.limit)
- .build();
- default:
- throw new HoodieException("Unexpected table type: " +
this.conf.getString(FlinkOptions.TABLE_TYPE));
- }
+ boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ;
+ return mergeOnReadInputFormat(rowType, requiredRowType, tableAvroSchema,
+ rowDataType, Collections.emptyList(), emitDelete);
}
String errMsg = String.format("Invalid query type : '%s', options ['%s']
are supported now", queryType,
FlinkOptions.QUERY_TYPE_SNAPSHOT);
throw new HoodieException(errMsg);
}
+ private MergeOnReadInputFormat mergeOnReadInputFormat(
+ RowType rowType,
+ RowType requiredRowType,
+ Schema tableAvroSchema,
+ DataType rowDataType,
+ List<MergeOnReadInputSplit> inputSplits,
+ boolean emitDelete) {
+ final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState(
+ rowType,
+ requiredRowType,
+ tableAvroSchema.toString(),
+ AvroSchemaConverter.convertToSchema(requiredRowType).toString(),
+ inputSplits,
+ conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","));
+ return MergeOnReadInputFormat.builder()
+ .config(this.conf)
+ .tableState(hoodieTableState)
+ // use the explicit fields' data type because the AvroSchemaConverter
+ // is not very stable.
+ .fieldTypes(rowDataType.getChildren())
+ .defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
+ .limit(this.limit)
+ .emitDelete(emitDelete)
+ .build();
+ }
+
+ private InputFormat<RowData, ?> baseFileOnlyInputFormat() {
+ final Path[] paths = getReadPaths();
+ if (paths.length == 0) {
+ return EMPTY_INPUT_FORMAT;
+ }
+ FileInputFormat<RowData> format = new CopyOnWriteInputFormat(
+ FilePathUtils.toFlinkPaths(paths),
+ this.schema.getColumnNames().toArray(new String[0]),
+ this.schema.getColumnDataTypes().toArray(new DataType[0]),
+ this.requiredPos,
+ this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
+ this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, //
ParquetInputFormat always uses the limit value
+ getParquetConf(this.conf, this.hadoopConf),
+ this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE)
+ );
+ format.setFilesFilter(new LatestFileFilter(this.hadoopConf));
+ return format;
+ }
+
private Schema inferSchemaFromDdl() {
Schema schema =
AvroSchemaConverter.convertToSchema(this.schema.toPhysicalRowDataType().getLogicalType());
return HoodieAvroUtils.addMetadataFields(schema,
conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 2042b96..e3a8eee 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -45,7 +45,6 @@ import
org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
@@ -85,8 +84,6 @@ public class MergeOnReadInputFormat
private transient org.apache.hadoop.conf.Configuration hadoopConf;
- private Path[] paths;
-
private final MergeOnReadTableState tableState;
/**
@@ -134,14 +131,12 @@ public class MergeOnReadInputFormat
private MergeOnReadInputFormat(
Configuration conf,
- Path[] paths,
MergeOnReadTableState tableState,
List<DataType> fieldTypes,
String defaultPartName,
long limit,
boolean emitDelete) {
this.conf = conf;
- this.paths = paths;
this.tableState = tableState;
this.fieldNames = tableState.getRowType().getFieldNames();
this.fieldTypes = fieldTypes;
@@ -165,7 +160,7 @@ public class MergeOnReadInputFormat
this.currentReadCount = 0L;
this.hadoopConf = StreamerUtil.getHadoopConf();
if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size()
> 0)) {
- if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) {
+ if (split.getInstantRange() != null) {
// base file only with commit time filtering
this.iterator = new BaseFileOnlyFilteringIterator(
split.getInstantRange(),
@@ -212,16 +207,8 @@ public class MergeOnReadInputFormat
@Override
public void configure(Configuration configuration) {
- if (this.paths.length == 0) {
- // file path was not specified yet. Try to set it from the parameters.
- String filePath = configuration.getString(FlinkOptions.PATH, null);
- if (filePath == null) {
- throw new IllegalArgumentException("File path was not specified in
input format or configuration.");
- } else {
- this.paths = new Path[] {new Path(filePath)};
- }
- }
- // may supports nested files in the future.
+ // no operation
+ // may support nested files in the future.
}
@Override
@@ -750,7 +737,6 @@ public class MergeOnReadInputFormat
*/
public static class Builder {
private Configuration conf;
- private Path[] paths;
private MergeOnReadTableState tableState;
private List<DataType> fieldTypes;
private String defaultPartName;
@@ -762,11 +748,6 @@ public class MergeOnReadInputFormat
return this;
}
- public Builder paths(Path[] paths) {
- this.paths = paths;
- return this;
- }
-
public Builder tableState(MergeOnReadTableState tableState) {
this.tableState = tableState;
return this;
@@ -793,8 +774,8 @@ public class MergeOnReadInputFormat
}
public MergeOnReadInputFormat build() {
- return new MergeOnReadInputFormat(conf, paths, tableState,
- fieldTypes, defaultPartName, limit, emitDelete);
+ return new MergeOnReadInputFormat(conf, tableState, fieldTypes,
+ defaultPartName, limit, emitDelete);
}
}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
b/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
index f229f2d..334df59 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
@@ -88,4 +88,18 @@ public class TestFileIndex {
assertThat(fileStatuses.length, is(1));
assertTrue(fileStatuses[0].getPath().toString().endsWith(HoodieFileFormat.PARQUET.getFileExtension()));
}
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testFileListingEmptyTable(boolean enableMetadata) {
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.setBoolean(FlinkOptions.METADATA_ENABLED, enableMetadata);
+ FileIndex fileIndex = FileIndex.instance(new
Path(tempFile.getAbsolutePath()), conf);
+ List<String> partitionKeys = Collections.singletonList("partition");
+ List<Map<String, String>> partitions =
fileIndex.getPartitions(partitionKeys, "default", false);
+ assertThat(partitions.size(), is(0));
+
+ FileStatus[] fileStatuses = fileIndex.getFilesInPartitions();
+ assertThat(fileStatuses.length, is(0));
+ }
}
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
index d13f683..3687e9d 100644
---
a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
+++
b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
@@ -144,7 +144,7 @@ public class TestStreamReadMonitoringFunction {
TestData.writeData(TestData.DATA_SET_INSERT, conf);
TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
String specifiedCommit =
TestUtils.getLatestCommit(tempFile.getAbsolutePath());
- conf.setString(FlinkOptions.READ_STREAMING_START_COMMIT, specifiedCommit);
+ conf.setString(FlinkOptions.READ_START_COMMIT, specifiedCommit);
StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness =
createHarness(function)) {
harness.setup();
@@ -175,7 +175,7 @@ public class TestStreamReadMonitoringFunction {
TestData.writeData(TestData.DATA_SET_INSERT, conf);
TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
String specifiedCommit =
TestUtils.getLatestCommit(tempFile.getAbsolutePath());
- conf.setString(FlinkOptions.READ_STREAMING_START_COMMIT,
FlinkOptions.START_COMMIT_EARLIEST);
+ conf.setString(FlinkOptions.READ_START_COMMIT,
FlinkOptions.START_COMMIT_EARLIEST);
StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness =
createHarness(function)) {
harness.setup();
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
index 233e6fa..911c685 100644
---
a/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
+++
b/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
@@ -22,7 +22,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadTableState;
@@ -45,7 +44,6 @@ import
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
-import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -263,10 +261,8 @@ public class TestStreamReadOperator {
AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE).toString(),
Collections.emptyList(),
new String[0]);
- Path[] paths = FilePathUtils.getReadPaths(new Path(basePath), conf,
hadoopConf, partitionKeys);
MergeOnReadInputFormat inputFormat = MergeOnReadInputFormat.builder()
.config(conf)
- .paths(FilePathUtils.toFlinkPaths(paths))
.tableState(hoodieTableState)
.fieldTypes(rowDataType.getChildren())
.defaultPartName("default").limit(1000L)
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index 9d0bcab..db7111b 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -113,7 +113,7 @@ public class HoodieDataSourceITCase extends
AbstractTestBase {
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.READ_AS_STREAMING, true)
.option(FlinkOptions.TABLE_TYPE, tableType)
- .option(FlinkOptions.READ_STREAMING_START_COMMIT, firstCommit)
+ .option(FlinkOptions.READ_START_COMMIT, firstCommit)
.end();
streamTableEnv.executeSql(hoodieTableDDL);
List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
@@ -186,7 +186,7 @@ public class HoodieDataSourceITCase extends
AbstractTestBase {
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.READ_AS_STREAMING, true)
.option(FlinkOptions.TABLE_TYPE, tableType)
- .option(FlinkOptions.READ_STREAMING_START_COMMIT, specifiedCommit)
+ .option(FlinkOptions.READ_START_COMMIT, specifiedCommit)
.end();
streamTableEnv.executeSql(createHoodieTable2);
List<Row> rows = execSelectSql(streamTableEnv, "select * from t2", 10);
@@ -289,7 +289,7 @@ public class HoodieDataSourceITCase extends
AbstractTestBase {
.option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
.option(FlinkOptions.READ_AS_STREAMING, true)
.option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2)
- .option(FlinkOptions.READ_STREAMING_START_COMMIT, latestCommit)
+ .option(FlinkOptions.READ_START_COMMIT, latestCommit)
.option(FlinkOptions.CHANGELOG_ENABLED, true)
.end();
streamTableEnv.executeSql(hoodieTableDDL);
@@ -343,7 +343,7 @@ public class HoodieDataSourceITCase extends
AbstractTestBase {
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.TABLE_TYPE, FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
.option(FlinkOptions.READ_AS_STREAMING, true)
- .option(FlinkOptions.READ_STREAMING_START_COMMIT,
FlinkOptions.START_COMMIT_EARLIEST)
+ .option(FlinkOptions.READ_START_COMMIT,
FlinkOptions.START_COMMIT_EARLIEST)
.option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2)
// close the async compaction
.option(FlinkOptions.COMPACTION_ASYNC_ENABLED, false)
@@ -879,6 +879,33 @@ public class HoodieDataSourceITCase extends
AbstractTestBase {
assertRowsEquals(result1, "[+I[1.23, 12345678.12, 12345.12,
123456789.123450000000000000]]");
}
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableType.class)
+ void testIncrementalRead(HoodieTableType tableType) throws Exception {
+ TableEnvironment tableEnv = batchTableEnv;
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.setString(FlinkOptions.TABLE_NAME, "t1");
+ conf.setString(FlinkOptions.TABLE_TYPE, tableType.name());
+
+ // write 3 batches of data set
+ TestData.writeData(TestData.dataSetInsert(1, 2), conf);
+ TestData.writeData(TestData.dataSetInsert(3, 4), conf);
+ TestData.writeData(TestData.dataSetInsert(5, 6), conf);
+
+ String latestCommit =
TestUtils.getLatestCommit(tempFile.getAbsolutePath());
+
+ String hoodieTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.TABLE_TYPE, tableType)
+ .option(FlinkOptions.READ_START_COMMIT, latestCommit)
+ .end();
+ tableEnv.executeSql(hoodieTableDDL);
+
+ List<Row> result = CollectionUtil.iterableToList(
+ () -> tableEnv.sqlQuery("select * from t1").execute().collect());
+ assertRowsEquals(result, TestData.dataSetInsert(5, 6));
+ }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index 1572dd4..bbbb49d 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -232,6 +232,32 @@ public class TestHoodieTableFactory {
}
@Test
+ void testSetupReadOptionsForSource() {
+ // definition with simple primary key and partition path
+ ResolvedSchema schema1 = SchemaBuilder.instance()
+ .field("f0", DataTypes.INT().notNull())
+ .field("f1", DataTypes.VARCHAR(20))
+ .field("f2", DataTypes.TIMESTAMP(3))
+ .field("ts", DataTypes.TIMESTAMP(3))
+ .primaryKey("f0")
+ .build();
+ // set up the read end commit so that the query type switches to incremental
+ this.conf.setString(FlinkOptions.READ_END_COMMIT, "123");
+
+ final MockContext sourceContext1 = MockContext.getInstance(this.conf,
schema1, "f2");
+ final HoodieTableSource tableSource1 = (HoodieTableSource) new
HoodieTableFactory().createDynamicTableSource(sourceContext1);
+ final Configuration conf1 = tableSource1.getConf();
+ assertThat(conf1.getString(FlinkOptions.QUERY_TYPE),
is(FlinkOptions.QUERY_TYPE_INCREMENTAL));
+
+ this.conf.removeConfig(FlinkOptions.READ_END_COMMIT);
+ this.conf.setString(FlinkOptions.READ_START_COMMIT, "123");
+ final MockContext sourceContext2 = MockContext.getInstance(this.conf,
schema1, "f2");
+ final HoodieTableSource tableSource2 = (HoodieTableSource) new
HoodieTableFactory().createDynamicTableSource(sourceContext2);
+ final Configuration conf2 = tableSource2.getConf();
+ assertThat(conf2.getString(FlinkOptions.QUERY_TYPE),
is(FlinkOptions.QUERY_TYPE_INCREMENTAL));
+ }
+
+ @Test
void testInferAvroSchemaForSink() {
// infer the schema if not specified
final HoodieTableSink tableSink1 =
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
index d50a716..8ee18a9 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
@@ -19,7 +19,6 @@
package org.apache.hudi.table;
import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
@@ -31,6 +30,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.function.ThrowingSupplier;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,9 +46,9 @@ import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
/**
* Test cases for HoodieTableSource.
@@ -112,9 +112,9 @@ public class TestHoodieTableSource {
inputFormat = tableSource.getInputFormat();
assertThat(inputFormat, is(instanceOf(MergeOnReadInputFormat.class)));
conf.setString(FlinkOptions.QUERY_TYPE.key(),
FlinkOptions.QUERY_TYPE_INCREMENTAL);
- assertThrows(HoodieException.class,
- () -> tableSource.getInputFormat(),
- "Invalid query type : 'incremental'. Only 'snapshot' is supported
now");
+ assertDoesNotThrow(
+ (ThrowingSupplier<? extends InputFormat<RowData, ?>>)
tableSource::getInputFormat,
+ "Query type: 'incremental' should be supported");
}
@Test
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index f83b2d9..d469205 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -19,6 +19,8 @@
package org.apache.hudi.table.format;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.table.HoodieTableSource;
import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat;
@@ -44,6 +46,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
@@ -71,12 +74,7 @@ public class TestInputFormat {
options.forEach((key, value) -> conf.setString(key, value));
StreamerUtil.initTableIfNotExists(conf);
- this.tableSource = new HoodieTableSource(
- TestConfigurations.TABLE_SCHEMA,
- new Path(tempFile.getAbsolutePath()),
- Collections.singletonList("partition"),
- "default",
- conf);
+ this.tableSource = getTableSource(conf);
}
@ParameterizedTest
@@ -385,10 +383,81 @@ public class TestInputFormat {
assertThat(actual, is(expected));
}
+ @ParameterizedTest
+ @EnumSource(value = HoodieTableType.class)
+ void testReadIncrementally(HoodieTableType tableType) throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.QUERY_TYPE.key(),
FlinkOptions.QUERY_TYPE_INCREMENTAL);
+ beforeEach(tableType, options);
+
+ // write 3 commits of insert data set to read incrementally
+ for (int i = 0; i < 6; i += 2) {
+ List<RowData> dataset = TestData.dataSetInsert(i + 1, i + 2);
+ TestData.writeData(dataset, conf);
+ }
+
+ HoodieTableMetaClient metaClient =
StreamerUtil.createMetaClient(tempFile.getAbsolutePath());
+ List<String> commits =
metaClient.getCommitsTimeline().filterCompletedInstants().getInstants()
+ .map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+
+ assertThat(commits.size(), is(3));
+
+ // only the start commit
+ conf.setString(FlinkOptions.READ_START_COMMIT, commits.get(1));
+ this.tableSource = getTableSource(conf);
+ InputFormat<RowData, ?> inputFormat1 = this.tableSource.getInputFormat();
+ assertThat(inputFormat1, instanceOf(MergeOnReadInputFormat.class));
+
+ List<RowData> actual1 = readData(inputFormat1);
+ final List<RowData> expected1 = TestData.dataSetInsert(3, 4, 5, 6);
+ TestData.assertRowDataEquals(actual1, expected1);
+
+ // only the start commit: earliest
+ conf.setString(FlinkOptions.READ_START_COMMIT,
FlinkOptions.START_COMMIT_EARLIEST);
+ this.tableSource = getTableSource(conf);
+ InputFormat<RowData, ?> inputFormat2 = this.tableSource.getInputFormat();
+ assertThat(inputFormat2, instanceOf(MergeOnReadInputFormat.class));
+
+ List<RowData> actual2 = readData(inputFormat2);
+ final List<RowData> expected2 = TestData.dataSetInsert(1, 2, 3, 4, 5, 6);
+ TestData.assertRowDataEquals(actual2, expected2);
+
+ // start and end commit: [start commit, end commit]
+ conf.setString(FlinkOptions.READ_START_COMMIT, commits.get(0));
+ conf.setString(FlinkOptions.READ_END_COMMIT, commits.get(1));
+ this.tableSource = getTableSource(conf);
+ InputFormat<RowData, ?> inputFormat3 = this.tableSource.getInputFormat();
+ assertThat(inputFormat3, instanceOf(MergeOnReadInputFormat.class));
+
+ List<RowData> actual3 = readData(inputFormat3);
+ final List<RowData> expected3 = TestData.dataSetInsert(1, 2, 3, 4);
+ TestData.assertRowDataEquals(actual3, expected3);
+
+ // only the end commit: point in time query
+ conf.removeConfig(FlinkOptions.READ_START_COMMIT);
+ conf.setString(FlinkOptions.READ_END_COMMIT, commits.get(1));
+ this.tableSource = getTableSource(conf);
+ InputFormat<RowData, ?> inputFormat4 = this.tableSource.getInputFormat();
+ assertThat(inputFormat4, instanceOf(MergeOnReadInputFormat.class));
+
+ List<RowData> actual4 = readData(inputFormat4);
+ final List<RowData> expected4 = TestData.dataSetInsert(3, 4);
+ TestData.assertRowDataEquals(actual4, expected4);
+ }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
+ private HoodieTableSource getTableSource(Configuration conf) {
+ return new HoodieTableSource(
+ TestConfigurations.TABLE_SCHEMA,
+ new Path(tempFile.getAbsolutePath()),
+ Collections.singletonList("partition"),
+ "default",
+ conf);
+ }
+
@SuppressWarnings("unchecked, rawtypes")
private static List<RowData> readData(InputFormat inputFormat) throws
IOException {
InputSplit[] inputSplits = inputFormat.createInputSplits(1);
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 3e0afc2..b0f7b5f 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -284,6 +284,14 @@ public class TestData {
TimestampData.fromEpochMillis(2), StringData.fromString("par1"))
);
+ public static List<RowData> dataSetInsert(int... ids) {
+ List<RowData> inserts = new ArrayList<>();
+ Arrays.stream(ids).forEach(i -> inserts.add(
+ insertRow(StringData.fromString("id" + i),
StringData.fromString("Danny"), 23,
+ TimestampData.fromEpochMillis(i), StringData.fromString("par1"))));
+ return inserts;
+ }
+
private static Integer toIdSafely(Object id) {
if (id == null) {
return -1;
@@ -424,7 +432,7 @@ public class TestData {
*/
public static void assertRowDataEquals(List<RowData> rows, List<RowData>
expected) {
String rowsString = rowDataToString(rows);
- assertThat(rowDataToString(expected), is(rowsString));
+ assertThat(rowsString, is(rowDataToString(expected)));
}
/**
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
b/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
index 4e9ad51..3719705 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestUtils.java
@@ -28,7 +28,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import java.io.File;
-import java.util.Collections;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -58,6 +57,6 @@ public class TestUtils {
public static StreamReadMonitoringFunction getMonitorFunc(Configuration
conf) {
final String basePath = conf.getString(FlinkOptions.PATH);
- return new StreamReadMonitoringFunction(conf, new Path(basePath), 1024 *
1024L, Collections.emptySet());
+ return new StreamReadMonitoringFunction(conf, new Path(basePath), 1024 *
1024L, null);
}
}