yanghua commented on a change in pull request #2640:
URL: https://github.com/apache/hudi/pull/2640#discussion_r590077931



##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java
##########
@@ -112,6 +118,25 @@ private FlinkOptions() {
           + " time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But 
Hive 3.x"
           + " use UTC timezone, by default true");
 
+  public static final ConfigOption<Boolean> READ_AS_STREAMING = ConfigOptions
+      .key("read.streaming.enabled")
+      .booleanType()
+      .defaultValue(false)// default read as batch
+      .withDescription("Whether to read as streaming source, default false");
+
+  public static final ConfigOption<Integer> STREAMING_CHECK_INTERVAL = 
ConfigOptions
+      .key("read.streaming.check-interval")
+      .intType()
+      .defaultValue(60)// default 1 minute
+      .withDescription("Check interval for streaming read of SECOND, default 1 
minute");
+
+  public static final ConfigOption<String> STREAMING_START_COMMIT = 
ConfigOptions

Review comment:
       Ditto: same as the `READ_` prefix comment on `STREAMING_CHECK_INTERVAL`.

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/operator/StreamReadMonitoringFunction.java
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.source.format.FilePathUtils;
+import org.apache.hudi.source.format.mor.InstantRange;
+import org.apache.hudi.source.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
+
+/**
+ * This is the single (non-parallel) monitoring task which takes a {@link 
MergeOnReadInputSplit}
+ * , it is responsible for:
+ *
+ * <ol>
+ *     <li>Monitoring a user-provided hoodie table path.</li>
+ *     <li>Deciding which files(or split) should be further read and 
processed.</li>
+ *     <li>Creating the {@link MergeOnReadInputSplit splits} corresponding to 
those files.</li>
+ *     <li>Assigning them to downstream tasks for further processing.</li>
+ * </ol>
+ *
+ * <p>The splits to be read are forwarded to the downstream {@link 
StreamReadOperator}
+ * which can have parallelism greater than one.
+ *
+ * <p><b>IMPORTANT NOTE: </b> Splits are forwarded downstream for reading in 
ascending instant commits time order,
+ * in each downstream task, the splits are also read in receiving sequence. We 
do not ensure split consuming sequence
+ * among the downstream tasks.
+ */
+public class StreamReadMonitoringFunction

Review comment:
       This is very similar to the `ContinuousFileMonitoringFunction` in the Flink 
project, and the name `ContinuousFileMonitoringFunction` seems more 
reasonable, because we monitor the file (or the path), right?

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/operator/StreamReadMonitoringFunction.java
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.source.format.FilePathUtils;
+import org.apache.hudi.source.format.mor.InstantRange;
+import org.apache.hudi.source.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
+
+/**
+ * This is the single (non-parallel) monitoring task which takes a {@link 
MergeOnReadInputSplit}
+ * , it is responsible for:
+ *
+ * <ol>
+ *     <li>Monitoring a user-provided hoodie table path.</li>
+ *     <li>Deciding which files(or split) should be further read and 
processed.</li>
+ *     <li>Creating the {@link MergeOnReadInputSplit splits} corresponding to 
those files.</li>
+ *     <li>Assigning them to downstream tasks for further processing.</li>
+ * </ol>
+ *
+ * <p>The splits to be read are forwarded to the downstream {@link 
StreamReadOperator}
+ * which can have parallelism greater than one.
+ *
+ * <p><b>IMPORTANT NOTE: </b> Splits are forwarded downstream for reading in 
ascending instant commits time order,
+ * in each downstream task, the splits are also read in receiving sequence. We 
do not ensure split consuming sequence
+ * among the downstream tasks.
+ */
+public class StreamReadMonitoringFunction
+    extends RichSourceFunction<MergeOnReadInputSplit> implements 
CheckpointedFunction {
+  private static final Logger LOG = 
LoggerFactory.getLogger(StreamReadMonitoringFunction.class);
+
+  private static final long serialVersionUID = 1L;
+
+  /** The path to monitor. */
+  private final Path path;
+
+  /** The interval between consecutive path scans. */
+  private final long interval;
+
+  private transient Object checkpointLock;
+
+  private volatile boolean isRunning = true;
+
+  private String issuedInstant;
+
+  private transient ListState<String> instantState;
+
+  private final Configuration conf;
+
+  private transient org.apache.hadoop.conf.Configuration hadoopConf;
+
+  private final HoodieTableMetaClient metaClient;
+
+  private final List<String> partitionKeys;
+
+  private final String defaultParName;
+
+  private final long maxCompactionMemoryInBytes;
+
+  public StreamReadMonitoringFunction(
+      Configuration conf,
+      Path path,
+      HoodieTableMetaClient metaClient,
+      List<String> partitionKeys,
+      long maxCompactionMemoryInBytes) {
+    this.conf = conf;
+    this.path = path;
+    this.metaClient = metaClient;
+    this.partitionKeys = partitionKeys;
+    this.defaultParName = conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME);
+    this.interval = conf.getInteger(FlinkOptions.STREAMING_CHECK_INTERVAL);
+    this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
+  }
+
+  @Override
+  public void initializeState(FunctionInitializationContext context) throws 
Exception {
+
+    ValidationUtils.checkState(this.instantState == null,
+        "The " + getClass().getSimpleName() + " has already been 
initialized.");
+
+    this.instantState = context.getOperatorStateStore().getListState(
+        new ListStateDescriptor<>(
+            "file-monitoring-state",
+            StringSerializer.INSTANCE
+        )
+    );
+
+    if (context.isRestored()) {
+      LOG.info("Restoring state for the {}.", getClass().getSimpleName());

Review comment:
       IMO, this log is missing some key information, for example the table, the path, 
and so on.

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/operator/StreamReadMonitoringFunction.java
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.source.format.FilePathUtils;
+import org.apache.hudi.source.format.mor.InstantRange;
+import org.apache.hudi.source.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
+
+/**
+ * This is the single (non-parallel) monitoring task which takes a {@link 
MergeOnReadInputSplit}
+ * , it is responsible for:
+ *
+ * <ol>
+ *     <li>Monitoring a user-provided hoodie table path.</li>
+ *     <li>Deciding which files(or split) should be further read and 
processed.</li>
+ *     <li>Creating the {@link MergeOnReadInputSplit splits} corresponding to 
those files.</li>
+ *     <li>Assigning them to downstream tasks for further processing.</li>
+ * </ol>
+ *
+ * <p>The splits to be read are forwarded to the downstream {@link 
StreamReadOperator}
+ * which can have parallelism greater than one.
+ *
+ * <p><b>IMPORTANT NOTE: </b> Splits are forwarded downstream for reading in 
ascending instant commits time order,
+ * in each downstream task, the splits are also read in receiving sequence. We 
do not ensure split consuming sequence
+ * among the downstream tasks.
+ */
+public class StreamReadMonitoringFunction
+    extends RichSourceFunction<MergeOnReadInputSplit> implements 
CheckpointedFunction {
+  private static final Logger LOG = 
LoggerFactory.getLogger(StreamReadMonitoringFunction.class);
+
+  private static final long serialVersionUID = 1L;
+
+  /** The path to monitor. */
+  private final Path path;
+
+  /** The interval between consecutive path scans. */
+  private final long interval;
+
+  private transient Object checkpointLock;
+
+  private volatile boolean isRunning = true;
+
+  private String issuedInstant;
+
+  private transient ListState<String> instantState;
+
+  private final Configuration conf;
+
+  private transient org.apache.hadoop.conf.Configuration hadoopConf;
+
+  private final HoodieTableMetaClient metaClient;
+
+  private final List<String> partitionKeys;
+
+  private final String defaultParName;
+
+  private final long maxCompactionMemoryInBytes;
+
+  public StreamReadMonitoringFunction(
+      Configuration conf,
+      Path path,
+      HoodieTableMetaClient metaClient,
+      List<String> partitionKeys,
+      long maxCompactionMemoryInBytes) {
+    this.conf = conf;
+    this.path = path;
+    this.metaClient = metaClient;
+    this.partitionKeys = partitionKeys;
+    this.defaultParName = conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME);
+    this.interval = conf.getInteger(FlinkOptions.STREAMING_CHECK_INTERVAL);
+    this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
+  }
+
+  @Override
+  public void initializeState(FunctionInitializationContext context) throws 
Exception {
+
+    ValidationUtils.checkState(this.instantState == null,
+        "The " + getClass().getSimpleName() + " has already been 
initialized.");
+
+    this.instantState = context.getOperatorStateStore().getListState(
+        new ListStateDescriptor<>(
+            "file-monitoring-state",
+            StringSerializer.INSTANCE
+        )
+    );
+
+    if (context.isRestored()) {
+      LOG.info("Restoring state for the {}.", getClass().getSimpleName());
+
+      List<String> retrievedStates = new ArrayList<>();
+      for (String entry : this.instantState.get()) {
+        retrievedStates.add(entry);
+      }
+
+      ValidationUtils.checkArgument(retrievedStates.size() <= 1,
+          getClass().getSimpleName() + " retrieved invalid state.");
+
+      if (retrievedStates.size() == 1 && issuedInstant != null) {
+        // this is the case where we have both legacy and new state.
+        // the two should be mutually exclusive for the operator, thus we 
throw the exception.
+
+        throw new IllegalArgumentException(
+            "The " + getClass().getSimpleName() + " has already restored from 
a previous Flink version.");
+
+      } else if (retrievedStates.size() == 1) {
+        this.issuedInstant = retrievedStates.get(0);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("{} retrieved a issued instant of time {}.",
+              getClass().getSimpleName(), issuedInstant);
+        }
+      }
+    } else {
+      LOG.info("No state to restore for the {}.", getClass().getSimpleName());
+    }
+  }
+
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    super.open(parameters);
+    this.hadoopConf = StreamerUtil.getHadoopConf();
+  }
+
+  @Override
+  public void run(SourceFunction.SourceContext<MergeOnReadInputSplit> context) 
throws Exception {
+    checkpointLock = context.getCheckpointLock();
+    while (isRunning) {
+      synchronized (checkpointLock) {
+        monitorDirAndForwardSplits(context);
+      }
+      TimeUnit.SECONDS.sleep(interval);
+    }
+  }
+
+  @VisibleForTesting
+  public void monitorDirAndForwardSplits(SourceContext<MergeOnReadInputSplit> 
context) {
+    // list files

Review comment:
       Do we list files here?

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/operator/StreamReadMonitoringFunction.java
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.source.format.FilePathUtils;
+import org.apache.hudi.source.format.mor.InstantRange;
+import org.apache.hudi.source.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
+
+/**
+ * This is the single (non-parallel) monitoring task which takes a {@link 
MergeOnReadInputSplit}
+ * , it is responsible for:
+ *
+ * <ol>
+ *     <li>Monitoring a user-provided hoodie table path.</li>
+ *     <li>Deciding which files(or split) should be further read and 
processed.</li>
+ *     <li>Creating the {@link MergeOnReadInputSplit splits} corresponding to 
those files.</li>
+ *     <li>Assigning them to downstream tasks for further processing.</li>
+ * </ol>
+ *
+ * <p>The splits to be read are forwarded to the downstream {@link 
StreamReadOperator}
+ * which can have parallelism greater than one.
+ *
+ * <p><b>IMPORTANT NOTE: </b> Splits are forwarded downstream for reading in 
ascending instant commits time order,
+ * in each downstream task, the splits are also read in receiving sequence. We 
do not ensure split consuming sequence
+ * among the downstream tasks.
+ */
+public class StreamReadMonitoringFunction
+    extends RichSourceFunction<MergeOnReadInputSplit> implements 
CheckpointedFunction {
+  private static final Logger LOG = 
LoggerFactory.getLogger(StreamReadMonitoringFunction.class);
+
+  private static final long serialVersionUID = 1L;
+
+  /** The path to monitor. */
+  private final Path path;
+
+  /** The interval between consecutive path scans. */
+  private final long interval;
+
+  private transient Object checkpointLock;
+
+  private volatile boolean isRunning = true;
+
+  private String issuedInstant;
+
+  private transient ListState<String> instantState;
+
+  private final Configuration conf;
+
+  private transient org.apache.hadoop.conf.Configuration hadoopConf;
+
+  private final HoodieTableMetaClient metaClient;
+
+  private final List<String> partitionKeys;
+
+  private final String defaultParName;
+
+  private final long maxCompactionMemoryInBytes;
+
+  public StreamReadMonitoringFunction(
+      Configuration conf,
+      Path path,
+      HoodieTableMetaClient metaClient,
+      List<String> partitionKeys,
+      long maxCompactionMemoryInBytes) {
+    this.conf = conf;
+    this.path = path;
+    this.metaClient = metaClient;
+    this.partitionKeys = partitionKeys;
+    this.defaultParName = conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME);
+    this.interval = conf.getInteger(FlinkOptions.STREAMING_CHECK_INTERVAL);
+    this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
+  }
+
+  @Override
+  public void initializeState(FunctionInitializationContext context) throws 
Exception {
+
+    ValidationUtils.checkState(this.instantState == null,
+        "The " + getClass().getSimpleName() + " has already been 
initialized.");
+
+    this.instantState = context.getOperatorStateStore().getListState(
+        new ListStateDescriptor<>(
+            "file-monitoring-state",
+            StringSerializer.INSTANCE
+        )
+    );
+
+    if (context.isRestored()) {
+      LOG.info("Restoring state for the {}.", getClass().getSimpleName());
+
+      List<String> retrievedStates = new ArrayList<>();
+      for (String entry : this.instantState.get()) {
+        retrievedStates.add(entry);
+      }
+
+      ValidationUtils.checkArgument(retrievedStates.size() <= 1,
+          getClass().getSimpleName() + " retrieved invalid state.");
+
+      if (retrievedStates.size() == 1 && issuedInstant != null) {
+        // this is the case where we have both legacy and new state.
+        // the two should be mutually exclusive for the operator, thus we 
throw the exception.
+
+        throw new IllegalArgumentException(
+            "The " + getClass().getSimpleName() + " has already restored from 
a previous Flink version.");
+
+      } else if (retrievedStates.size() == 1) {
+        this.issuedInstant = retrievedStates.get(0);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("{} retrieved a issued instant of time {}.",
+              getClass().getSimpleName(), issuedInstant);
+        }
+      }
+    } else {
+      LOG.info("No state to restore for the {}.", getClass().getSimpleName());

Review comment:
       Do we need this log?

##########
File path: hudi-flink/src/main/java/org/apache/hudi/operator/FlinkOptions.java
##########
@@ -112,6 +118,25 @@ private FlinkOptions() {
           + " time and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But 
Hive 3.x"
           + " use UTC timezone, by default true");
 
+  public static final ConfigOption<Boolean> READ_AS_STREAMING = ConfigOptions
+      .key("read.streaming.enabled")
+      .booleanType()
+      .defaultValue(false)// default read as batch
+      .withDescription("Whether to read as streaming source, default false");
+
+  public static final ConfigOption<Integer> STREAMING_CHECK_INTERVAL = 
ConfigOptions

Review comment:
       It would be better to add the `READ_` prefix to this constant's name.

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/operator/StreamReadMonitoringFunction.java
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.source.format.FilePathUtils;
+import org.apache.hudi.source.format.mor.InstantRange;
+import org.apache.hudi.source.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
+
+/**
+ * This is the single (non-parallel) monitoring task which takes a {@link 
MergeOnReadInputSplit}
+ * , it is responsible for:
+ *
+ * <ol>
+ *     <li>Monitoring a user-provided hoodie table path.</li>
+ *     <li>Deciding which files(or split) should be further read and 
processed.</li>
+ *     <li>Creating the {@link MergeOnReadInputSplit splits} corresponding to 
those files.</li>
+ *     <li>Assigning them to downstream tasks for further processing.</li>
+ * </ol>
+ *
+ * <p>The splits to be read are forwarded to the downstream {@link 
StreamReadOperator}
+ * which can have parallelism greater than one.
+ *
+ * <p><b>IMPORTANT NOTE: </b> Splits are forwarded downstream for reading in 
ascending instant commits time order,
+ * in each downstream task, the splits are also read in receiving sequence. We 
do not ensure split consuming sequence
+ * among the downstream tasks.
+ */
+public class StreamReadMonitoringFunction
+    extends RichSourceFunction<MergeOnReadInputSplit> implements 
CheckpointedFunction {
+  private static final Logger LOG = 
LoggerFactory.getLogger(StreamReadMonitoringFunction.class);
+
+  private static final long serialVersionUID = 1L;
+
+  /** The path to monitor. */
+  private final Path path;
+
+  /** The interval between consecutive path scans. */
+  private final long interval;
+
+  private transient Object checkpointLock;
+
+  private volatile boolean isRunning = true;
+
+  private String issuedInstant;
+
+  private transient ListState<String> instantState;
+
+  private final Configuration conf;
+
+  private transient org.apache.hadoop.conf.Configuration hadoopConf;
+
+  private final HoodieTableMetaClient metaClient;
+
+  private final List<String> partitionKeys;
+
+  private final String defaultParName;
+
+  private final long maxCompactionMemoryInBytes;
+
+  public StreamReadMonitoringFunction(
+      Configuration conf,
+      Path path,
+      HoodieTableMetaClient metaClient,
+      List<String> partitionKeys,
+      long maxCompactionMemoryInBytes) {
+    this.conf = conf;
+    this.path = path;
+    this.metaClient = metaClient;
+    this.partitionKeys = partitionKeys;
+    this.defaultParName = conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME);
+    this.interval = conf.getInteger(FlinkOptions.STREAMING_CHECK_INTERVAL);
+    this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
+  }
+
+  /**
+   * Obtains the operator list state named {@code "file-monitoring-state"} and, when the
+   * context reports a restore, reads the stored instant back into {@code issuedInstant}.
+   *
+   * <p>At most one state entry is accepted. When nothing is restored, only a log line is written.
+   *
+   * @param context the initialization context providing the operator state store
+   * @throws IllegalArgumentException if exactly one entry is restored while
+   *                                  {@code issuedInstant} is already set
+   * @throws Exception                declared by the signature; NOTE(review): the validation
+   *                                  helpers presumably throw unchecked exceptions - confirm
+   */
+  @Override
+  public void initializeState(FunctionInitializationContext context) throws 
Exception {
+
+    // guard: the state handle must not have been assigned before
+    ValidationUtils.checkState(this.instantState == null,
+        "The " + getClass().getSimpleName() + " has already been 
initialized.");
+
+    this.instantState = context.getOperatorStateStore().getListState(
+        new ListStateDescriptor<>(
+            "file-monitoring-state",
+            StringSerializer.INSTANCE
+        )
+    );
+
+    if (context.isRestored()) {
+      LOG.info("Restoring state for the {}.", getClass().getSimpleName());
+
+      // copy the state entries into a list so that they can be counted
+      List<String> retrievedStates = new ArrayList<>();
+      for (String entry : this.instantState.get()) {
+        retrievedStates.add(entry);
+      }
+
+      // zero or one entry is valid; anything more is rejected
+      ValidationUtils.checkArgument(retrievedStates.size() <= 1,
+          getClass().getSimpleName() + " retrieved invalid state.");
+
+      if (retrievedStates.size() == 1 && issuedInstant != null) {
+        // this is the case where we have both legacy and new state.
+        // the two should be mutually exclusive for the operator, thus we 
throw the exception.
+
+        throw new IllegalArgumentException(
+            "The " + getClass().getSimpleName() + " has already restored from 
a previous Flink version.");
+
+      } else if (retrievedStates.size() == 1) {
+        // the single restored entry becomes the last issued instant
+        this.issuedInstant = retrievedStates.get(0);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("{} retrieved a issued instant of time {}.",
+              getClass().getSimpleName(), issuedInstant);
+        }
+      }
+    } else {
+      LOG.info("No state to restore for the {}.", getClass().getSimpleName());
+    }
+  }
+
+  /**
+   * Delegates to the parent {@code open} and then assigns the Hadoop configuration
+   * returned by {@code StreamerUtil.getHadoopConf()}.
+   *
+   * <p>The {@code hadoopConf} field is declared {@code transient}; presumably that is why
+   * it is populated here rather than in the constructor - TODO confirm.
+   *
+   * @param parameters passed through to the parent {@code open}
+   * @throws Exception declared by the signature
+   */
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    super.open(parameters);
+    this.hadoopConf = StreamerUtil.getHadoopConf();
+  }
+
+  /**
+   * Scan loop of the source: while {@code isRunning} is true, each round calls
+   * {@link #monitorDirAndForwardSplits} while holding the checkpoint lock, then sleeps
+   * for {@code interval} seconds after releasing it.
+   *
+   * @param context the source context; supplies the checkpoint lock and is handed on to
+   *                {@link #monitorDirAndForwardSplits}
+   * @throws Exception declared by the signature, e.g. an {@link InterruptedException}
+   *                   when the sleep between rounds is interrupted
+   */
+  @Override
+  public void run(SourceFunction.SourceContext<MergeOnReadInputSplit> context) 
throws Exception {
+    checkpointLock = context.getCheckpointLock();
+    while (isRunning) {
+      synchronized (checkpointLock) {
+        monitorDirAndForwardSplits(context);
+      }
+      // the wait between rounds happens outside the synchronized block
+      TimeUnit.SECONDS.sleep(interval);
+    }
+  }
+
+  @VisibleForTesting
+  public void monitorDirAndForwardSplits(SourceContext<MergeOnReadInputSplit> 
context) {
+    // list files
+    metaClient.reloadActiveTimeline();
+    HoodieTimeline commitTimeline = 
metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
+    if (commitTimeline.empty()) {
+      LOG.warn("No splits found for the table under path " + path);
+      return;
+    }
+    List<HoodieInstant> instants = getUncompactedInstants(commitTimeline, 
this.issuedInstant);
+    // get the latest instant that satisfies condition
+    final HoodieInstant instantToIssue = instants.size() == 0 ? null : 
instants.get(instants.size() - 1);
+    final InstantRange instantRange;
+    if (instantToIssue != null) {
+      if (this.issuedInstant != null) {
+        // had already consumed an instant
+        instantRange = InstantRange.getInstance(this.issuedInstant, 
instantToIssue.getTimestamp(),
+            InstantRange.RangeType.OPEN_CLOSE);
+      } else if 
(this.conf.getOptional(FlinkOptions.STREAMING_START_COMMIT).isPresent()) {
+        // first time consume and has a start commit
+        final String specifiedStart = 
this.conf.getString(FlinkOptions.STREAMING_START_COMMIT);
+        instantRange = InstantRange.getInstance(specifiedStart, 
instantToIssue.getTimestamp(),
+            InstantRange.RangeType.CLOSE_CLOSE);
+      } else {
+        // first time consume and no start commit,
+        // would consume all the snapshot data PLUS incremental data set
+        instantRange = null;
+      }
+    } else {
+      LOG.info("No new instant found for the table under path " + path + ", 
skip reading");
+      return;
+    }
+    // generate input split
+    Path[] paths = FilePathUtils.getReadPaths(path, conf, partitionKeys, 
defaultParName);
+    org.apache.hadoop.fs.FileStatus[] fileStatuses = Arrays.stream(paths)
+        .flatMap(path -> 
Arrays.stream(FilePathUtils.getHadoopFileStatusRecursively(path, 1, 
hadoopConf)))
+        .toArray(org.apache.hadoop.fs.FileStatus[]::new);
+    if (fileStatuses.length == 0) {
+      throw new HoodieException("No files found for reading in user provided 
path.");
+    }
+
+    HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(metaClient, commitTimeline, fileStatuses);
+    final String commitToIssue = instantToIssue.getTimestamp();
+    final AtomicInteger cnt = new AtomicInteger(0);
+    final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
+    List<MergeOnReadInputSplit> inputSplits = 
Arrays.stream(paths).map(partitionPath -> {
+      String relPartitionPath = FSUtils.getRelativePartitionPath(
+          new org.apache.hadoop.fs.Path(path.toUri()),
+          new org.apache.hadoop.fs.Path(partitionPath.toUri()));
+      return fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, 
commitToIssue)
+          .map(fileSlice -> {
+            Option<List<String>> logPaths = 
Option.ofNullable(fileSlice.getLogFiles()
+                .sorted(HoodieLogFile.getLogFileComparator())
+                .map(logFile -> logFile.getPath().toString())
+                .collect(Collectors.toList()));
+            return new MergeOnReadInputSplit(cnt.getAndAdd(1),
+                null, logPaths, commitToIssue,
+                metaClient.getBasePath(), maxCompactionMemoryInBytes, 
mergeType, instantRange);
+          }).collect(Collectors.toList()); })
+        .flatMap(Collection::stream)
+        .collect(Collectors.toList());
+
+    for (MergeOnReadInputSplit split : inputSplits) {
+      LOG.info("Forwarding split: " + split);

Review comment:
       Do we really need to log each split even for a big table?

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/operator/StreamReadMonitoringFunction.java
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.operator;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.source.format.FilePathUtils;
+import org.apache.hudi.source.format.mor.InstantRange;
+import org.apache.hudi.source.format.mor.MergeOnReadInputSplit;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
+
+/**
+ * This is the single (non-parallel) monitoring task which takes a {@link 
MergeOnReadInputSplit};
+ * it is responsible for:
+ *
+ * <ol>
+ *     <li>Monitoring a user-provided hoodie table path.</li>
+ *     <li>Deciding which files (or splits) should be further read and 
processed.</li>
+ *     <li>Creating the {@link MergeOnReadInputSplit splits} corresponding to 
those files.</li>
+ *     <li>Assigning them to downstream tasks for further processing.</li>
+ * </ol>
+ *
+ * <p>The splits to be read are forwarded to the downstream {@link 
StreamReadOperator}
+ * which can have parallelism greater than one.
+ *
+ * <p><b>IMPORTANT NOTE: </b> Splits are forwarded downstream for reading in 
ascending instant commits time order,
+ * in each downstream task, the splits are also read in receiving sequence. We 
do not ensure split consuming sequence
+ * among the downstream tasks.
+ */
+public class StreamReadMonitoringFunction
+    extends RichSourceFunction<MergeOnReadInputSplit> implements 
CheckpointedFunction {
+  private static final Logger LOG = 
LoggerFactory.getLogger(StreamReadMonitoringFunction.class);
+
+  private static final long serialVersionUID = 1L;
+
+  /** The path to monitor. */
+  private final Path path;
+
+  /** The interval between consecutive path scans. */
+  private final long interval;
+
+  private transient Object checkpointLock;
+
+  private volatile boolean isRunning = true;
+
+  private String issuedInstant;
+
+  private transient ListState<String> instantState;
+
+  private final Configuration conf;
+
+  private transient org.apache.hadoop.conf.Configuration hadoopConf;
+
+  private final HoodieTableMetaClient metaClient;
+
+  private final List<String> partitionKeys;
+
+  private final String defaultParName;
+
+  private final long maxCompactionMemoryInBytes;
+
+  /**
+   * Creates the monitoring function.
+   *
+   * <p>The default partition name ({@code FlinkOptions.PARTITION_DEFAULT_NAME}) and the
+   * scan interval ({@code FlinkOptions.STREAMING_CHECK_INTERVAL}) are looked up in
+   * {@code conf} right away; every other argument is kept unchanged in a field.
+   *
+   * @param conf                       the Flink configuration; the reference is retained as well
+   * @param path                       the path to monitor
+   * @param metaClient                 the table meta client, kept unchanged
+   * @param partitionKeys              the partition keys, kept unchanged
+   * @param maxCompactionMemoryInBytes kept unchanged; not interpreted here
+   */
+  public StreamReadMonitoringFunction(
+      Configuration conf,
+      Path path,
+      HoodieTableMetaClient metaClient,
+      List<String> partitionKeys,
+      long maxCompactionMemoryInBytes) {
+    this.conf = conf;
+    this.path = path;
+    this.metaClient = metaClient;
+    this.partitionKeys = partitionKeys;
+    this.defaultParName = conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME);
+    // integer option value assigned to the long 'interval' field
+    this.interval = conf.getInteger(FlinkOptions.STREAMING_CHECK_INTERVAL);
+    this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
+  }
+
+  @Override
+  public void initializeState(FunctionInitializationContext context) throws 
Exception {
+
+    ValidationUtils.checkState(this.instantState == null,
+        "The " + getClass().getSimpleName() + " has already been 
initialized.");
+
+    this.instantState = context.getOperatorStateStore().getListState(
+        new ListStateDescriptor<>(
+            "file-monitoring-state",
+            StringSerializer.INSTANCE
+        )
+    );
+
+    if (context.isRestored()) {
+      LOG.info("Restoring state for the {}.", getClass().getSimpleName());
+
+      List<String> retrievedStates = new ArrayList<>();
+      for (String entry : this.instantState.get()) {
+        retrievedStates.add(entry);
+      }
+
+      ValidationUtils.checkArgument(retrievedStates.size() <= 1,
+          getClass().getSimpleName() + " retrieved invalid state.");
+
+      if (retrievedStates.size() == 1 && issuedInstant != null) {
+        // this is the case where we have both legacy and new state.
+        // the two should be mutually exclusive for the operator, thus we 
throw the exception.
+
+        throw new IllegalArgumentException(
+            "The " + getClass().getSimpleName() + " has already restored from 
a previous Flink version.");
+
+      } else if (retrievedStates.size() == 1) {
+        this.issuedInstant = retrievedStates.get(0);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("{} retrieved a issued instant of time {}.",

Review comment:
       ditto, we need more information in the log.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to