lirui-apache commented on a change in pull request #12004:
URL: https://github.com/apache/flink/pull/12004#discussion_r421206166



##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java
##########
@@ -248,7 +263,11 @@ public RowData nextRecord(RowData reuse) throws 
IOException {
        }
 
        @Override
-       public void close() throws IOException {
+       public void close() {

Review comment:
       What's the difference between `close` and `closeInputFormat`? And could we 
add a comment explaining why we don't have to do anything in `close`?
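       Something like the following would help (just a sketch; the reason stated in the comment below is my guess and needs to be confirmed against the actual implementation):

```java
@Override
public void close() {
    // No-op: per-split cleanup is not needed here because the reader is
    // released in closeInputFormat(), which is invoked once after all
    // splits assigned to this task have been processed.
    // (This explanation is a guess, please adjust it to the real reason.)
}
```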

##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.read;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connectors.hive.HiveTablePartition;
+import org.apache.flink.connectors.hive.HiveTableSource;
+import org.apache.flink.connectors.hive.JobConfWrapper;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.filesystem.PartitionTimeExtractor;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.function.Supplier;
+
+import static 
org.apache.flink.table.filesystem.TimestampPartTimeExtractor.toLocalDateTime;
+
+/**
+ * This is the single (non-parallel) monitoring task which takes a {@link 
HiveTableInputFormat},
+ * it is responsible for:
+ *
+ * <ol>
+ *     <li>Monitoring partitions of hive meta store.</li>
+ *     <li>Deciding which partitions should be further read and processed.</li>
+ *     <li>Creating the {@link HiveTableInputSplit splits} corresponding to 
those partitions.</li>
+ *     <li>Assigning them to downstream tasks for further processing.</li>
+ * </ol>
+ *
+ * <p>The splits to be read are forwarded to the downstream {@link 
ContinuousFileReaderOperator}
+ * which can have parallelism greater than one.
+ *
+ * <p><b>IMPORTANT NOTE: </b> Splits are forwarded downstream for reading in 
ascending partition time order,
+ * based on the partition time of the partitions they belong to.
+ */
+public class HiveContinuousMonitoringFunction
+               extends RichSourceFunction<TimestampedHiveInputSplit>
+               implements CheckpointedFunction {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(HiveContinuousMonitoringFunction.class);
+
+       /** The parallelism of the downstream readers. */
+       private final int readerParallelism;
+
+       /** The interval between consecutive path scans. */
+       private final long interval;
+
+       private final HiveShim hiveShim;
+
+       private final JobConfWrapper conf;
+
+       private final ObjectPath tablePath;
+
+       private final List<String> partitionKeys;
+
+       private final String[] fieldNames;
+
+       private final DataType[] fieldTypes;
+
+       // consumer variables
+       private final String consumeOrder;
+       private final String consumeOffset;
+
+       // extractor variables
+       private final String extractorType;
+       private final String extractorClass;
+       private final String extractorPattern;
+
+       private volatile boolean isRunning = true;
+
+       /** The maximum partition read time seen so far. */
+       private volatile long currentReadTime;
+
+       private transient PartitionDiscovery.Context context;
+
+       private transient PartitionDiscovery fetcher;
+
+       private transient Object checkpointLock;
+
+       private transient ListState<Long> currReadTimeState;
+
+       private transient ListState<List<List<String>>> distinctPartsState;
+
+       private transient IMetaStoreClient client;
+
+       private transient Properties tableProps;
+
+       private transient String defaultPartitionName;
+
+       private transient Set<List<String>> distinctPartitions;
+
+       public HiveContinuousMonitoringFunction(
+                       HiveShim hiveShim,
+                       JobConf conf,
+                       ObjectPath tablePath,
+                       CatalogTable catalogTable,
+                       int readerParallelism,
+                       String consumeOrder,
+                       String consumeOffset,
+                       String extractorType,
+                       String extractorClass,
+                       String extractorPattern,
+                       long interval) {
+               this.hiveShim = hiveShim;
+               this.conf = new JobConfWrapper(conf);
+               this.tablePath = tablePath;
+               this.partitionKeys = catalogTable.getPartitionKeys();
+               this.fieldNames = catalogTable.getSchema().getFieldNames();
+               this.fieldTypes = catalogTable.getSchema().getFieldDataTypes();
+               this.consumeOrder = consumeOrder;
+               this.extractorType = extractorType;
+               this.extractorClass = extractorClass;
+               this.extractorPattern = extractorPattern;
+               this.consumeOffset = consumeOffset;
+
+               this.interval = interval;
+               this.readerParallelism = Math.max(readerParallelism, 1);
+               this.currentReadTime = 0;
+       }
+
+       @Override
+       public void initializeState(FunctionInitializationContext context) 
throws Exception {
+               this.currReadTimeState = 
context.getOperatorStateStore().getListState(
+                       new ListStateDescriptor<>(
+                               "partition-monitoring-state",
+                               LongSerializer.INSTANCE
+                       )
+               );
+               this.distinctPartsState = 
context.getOperatorStateStore().getListState(
+                       new ListStateDescriptor<>(
+                               "partition-monitoring-state",
+                               new ListSerializer<>(new 
ListSerializer<>(StringSerializer.INSTANCE))
+                       )
+               );
+
+               this.client = this.hiveShim.getHiveMetastoreClient(new 
HiveConf(conf.conf(), HiveConf.class));
+
+               Table hiveTable = client.getTable(tablePath.getDatabaseName(), 
tablePath.getObjectName());
+               this.tableProps = 
HiveReflectionUtils.getTableMetadata(hiveShim, hiveTable);
+               this.defaultPartitionName = 
conf.conf().get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+                               
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
+
+               PartitionTimeExtractor extractor = 
PartitionTimeExtractor.create(
+                               getRuntimeContext().getUserCodeClassLoader(),
+                               extractorType,
+                               extractorClass,
+                               extractorPattern);
+
+               this.fetcher = new DirectoryMonitorDiscovery();
+
+               Path location = new Path(hiveTable.getSd().getLocation());
+               FileSystem fs = location.getFileSystem(conf.conf());
+               this.context = new PartitionDiscovery.Context() {
+
+                       @Override
+                       public List<String> partitionKeys() {
+                               return partitionKeys;
+                       }
+
+                       @Override
+                       public Partition getPartition(List<String> partValues) 
throws TException {
+                               return client.getPartition(

Review comment:
       I think it's possible that a partition folder exists in FS but not in 
HMS, in which case that folder should be simply ignored instead of throwing an 
exception.
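       For example (a minimal sketch, assuming `org.apache.hadoop.hive.metastore.api.NoSuchObjectException` is imported and that callers treat a `null` return as "skip this folder"):

```java
@Override
public Partition getPartition(List<String> partValues) throws TException {
    try {
        return client.getPartition(
                tablePath.getDatabaseName(),
                tablePath.getObjectName(),
                partValues);
    } catch (NoSuchObjectException e) {
        // The folder exists on the file system but the partition is not
        // (or not yet) registered in HMS, so skip it instead of failing.
        return null;
    }
}
```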

##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.read;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connectors.hive.HiveTablePartition;
+import org.apache.flink.connectors.hive.HiveTableSource;
+import org.apache.flink.connectors.hive.JobConfWrapper;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.filesystem.PartitionTimeExtractor;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.function.Supplier;
+
+import static 
org.apache.flink.table.filesystem.TimestampPartTimeExtractor.toLocalDateTime;
+
+/**
+ * This is the single (non-parallel) monitoring task which takes a {@link 
HiveTableInputFormat},
+ * it is responsible for:
+ *
+ * <ol>
+ *     <li>Monitoring partitions of hive meta store.</li>
+ *     <li>Deciding which partitions should be further read and processed.</li>
+ *     <li>Creating the {@link HiveTableInputSplit splits} corresponding to 
those partitions.</li>
+ *     <li>Assigning them to downstream tasks for further processing.</li>
+ * </ol>
+ *
+ * <p>The splits to be read are forwarded to the downstream {@link 
ContinuousFileReaderOperator}
+ * which can have parallelism greater than one.
+ *
+ * <p><b>IMPORTANT NOTE: </b> Splits are forwarded downstream for reading in 
ascending partition time order,
+ * based on the partition time of the partitions they belong to.
+ */
+public class HiveContinuousMonitoringFunction
+               extends RichSourceFunction<TimestampedHiveInputSplit>
+               implements CheckpointedFunction {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(HiveContinuousMonitoringFunction.class);
+
+       /** The parallelism of the downstream readers. */
+       private final int readerParallelism;
+
+       /** The interval between consecutive path scans. */
+       private final long interval;
+
+       private final HiveShim hiveShim;
+
+       private final JobConfWrapper conf;
+
+       private final ObjectPath tablePath;
+
+       private final List<String> partitionKeys;
+
+       private final String[] fieldNames;
+
+       private final DataType[] fieldTypes;
+
+       // consumer variables
+       private final String consumeOrder;
+       private final String consumeOffset;
+
+       // extractor variables
+       private final String extractorType;
+       private final String extractorClass;
+       private final String extractorPattern;
+
+       private volatile boolean isRunning = true;
+
+       /** The maximum partition read time seen so far. */
+       private volatile long currentReadTime;
+
+       private transient PartitionDiscovery.Context context;
+
+       private transient PartitionDiscovery fetcher;
+
+       private transient Object checkpointLock;
+
+       private transient ListState<Long> currReadTimeState;
+
+       private transient ListState<List<List<String>>> distinctPartsState;
+
+       private transient IMetaStoreClient client;
+
+       private transient Properties tableProps;
+
+       private transient String defaultPartitionName;
+
+       private transient Set<List<String>> distinctPartitions;
+
+       public HiveContinuousMonitoringFunction(
+                       HiveShim hiveShim,
+                       JobConf conf,
+                       ObjectPath tablePath,
+                       CatalogTable catalogTable,
+                       int readerParallelism,
+                       String consumeOrder,
+                       String consumeOffset,
+                       String extractorType,
+                       String extractorClass,
+                       String extractorPattern,
+                       long interval) {
+               this.hiveShim = hiveShim;
+               this.conf = new JobConfWrapper(conf);
+               this.tablePath = tablePath;
+               this.partitionKeys = catalogTable.getPartitionKeys();
+               this.fieldNames = catalogTable.getSchema().getFieldNames();
+               this.fieldTypes = catalogTable.getSchema().getFieldDataTypes();
+               this.consumeOrder = consumeOrder;
+               this.extractorType = extractorType;
+               this.extractorClass = extractorClass;
+               this.extractorPattern = extractorPattern;
+               this.consumeOffset = consumeOffset;
+
+               this.interval = interval;
+               this.readerParallelism = Math.max(readerParallelism, 1);
+               this.currentReadTime = 0;
+       }
+
+       @Override
+       public void initializeState(FunctionInitializationContext context) 
throws Exception {
+               this.currReadTimeState = 
context.getOperatorStateStore().getListState(
+                       new ListStateDescriptor<>(
+                               "partition-monitoring-state",
+                               LongSerializer.INSTANCE
+                       )
+               );
+               this.distinctPartsState = 
context.getOperatorStateStore().getListState(
+                       new ListStateDescriptor<>(
+                               "partition-monitoring-state",
+                               new ListSerializer<>(new 
ListSerializer<>(StringSerializer.INSTANCE))
+                       )
+               );
+
+               this.client = this.hiveShim.getHiveMetastoreClient(new 
HiveConf(conf.conf(), HiveConf.class));
+
+               Table hiveTable = client.getTable(tablePath.getDatabaseName(), 
tablePath.getObjectName());
+               this.tableProps = 
HiveReflectionUtils.getTableMetadata(hiveShim, hiveTable);
+               this.defaultPartitionName = 
conf.conf().get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+                               
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
+
+               PartitionTimeExtractor extractor = 
PartitionTimeExtractor.create(
+                               getRuntimeContext().getUserCodeClassLoader(),
+                               extractorType,
+                               extractorClass,
+                               extractorPattern);
+
+               this.fetcher = new DirectoryMonitorDiscovery();
+
+               Path location = new Path(hiveTable.getSd().getLocation());
+               FileSystem fs = location.getFileSystem(conf.conf());
+               this.context = new PartitionDiscovery.Context() {
+
+                       @Override
+                       public List<String> partitionKeys() {
+                               return partitionKeys;
+                       }
+
+                       @Override
+                       public Partition getPartition(List<String> partValues) 
throws TException {
+                               return client.getPartition(
+                                               tablePath.getDatabaseName(),
+                                               tablePath.getObjectName(),
+                                               partValues);
+                       }
+
+                       @Override
+                       public FileSystem fileSystem() {
+                               return fs;
+                       }
+
+                       @Override
+                       public Path tableLocation() {
+                               return new 
Path(hiveTable.getSd().getLocation());
+                       }
+
+                       @Override
+                       public long extractTimestamp(
+                                       List<String> partKeys,
+                                       List<String> partValues,
+                                       Supplier<Long> fileTime) {
+                               switch (consumeOrder) {
+                                       case "create-time":

Review comment:
       Should add constants for `create-time` and `partition-time`.
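       For example (a minimal sketch; the class and constant names are hypothetical):

```java
/** Supported consume orders for the streaming Hive source. */
public final class ConsumeOrderConstants {

    public static final String CREATE_TIME = "create-time";
    public static final String PARTITION_TIME = "partition-time";

    private ConsumeOrderConstants() {
    }
}
```

       Since these are compile-time constants, the `switch` above could reference them directly, e.g. `case ConsumeOrderConstants.CREATE_TIME:`.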

##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
##########
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.read;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connectors.hive.HiveTablePartition;
+import org.apache.flink.connectors.hive.HiveTableSource;
+import org.apache.flink.connectors.hive.JobConfWrapper;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.filesystem.PartitionTimeExtractor;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.function.Supplier;
+
+import static 
org.apache.flink.table.filesystem.TimestampPartTimeExtractor.toLocalDateTime;
+
+/**
+ * This is the single (non-parallel) monitoring task which takes a {@link 
HiveTableInputFormat},
+ * it is responsible for:
+ *
+ * <ol>
+ *     <li>Monitoring partitions of hive meta store.</li>
+ *     <li>Deciding which partitions should be further read and processed.</li>
+ *     <li>Creating the {@link HiveTableInputSplit splits} corresponding to 
those partitions.</li>
+ *     <li>Assigning them to downstream tasks for further processing.</li>
+ * </ol>
+ *
+ * <p>The splits to be read are forwarded to the downstream {@link 
ContinuousFileReaderOperator}
+ * which can have parallelism greater than one.
+ *
+ * <p><b>IMPORTANT NOTE: </b> Splits are forwarded downstream for reading in 
ascending partition time order,
+ * based on the partition time of the partitions they belong to.
+ */
+public class HiveContinuousMonitoringFunction
+               extends RichSourceFunction<TimestampedHiveInputSplit>
+               implements CheckpointedFunction {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(HiveContinuousMonitoringFunction.class);
+
+       /** The parallelism of the downstream readers. */
+       private final int readerParallelism;
+
+       /** The interval between consecutive path scans. */
+       private final long interval;
+
+       private final HiveShim hiveShim;
+
+       private final JobConfWrapper conf;
+
+       private final ObjectPath tablePath;
+
+       private final List<String> partitionKeys;
+
+       private final String[] fieldNames;
+
+       private final DataType[] fieldTypes;
+
+       // consumer variables
+       private final String consumeOrder;
+       private final String consumeOffset;
+
+       // extractor variables
+       private final String extractorType;
+       private final String extractorClass;
+       private final String extractorPattern;
+
+       private volatile boolean isRunning = true;
+
+       /** The maximum partition read time seen so far. */
+       private volatile long currentReadTime;
+
+       private transient PartitionDiscovery.Context context;
+
+       private transient PartitionDiscovery fetcher;
+
+       private transient Object checkpointLock;
+
+       private transient ListState<Long> currReadTimeState;
+
+       private transient ListState<List<List<String>>> distinctPartsState;
+
+       private transient IMetaStoreClient client;
+
+       private transient Properties tableProps;
+
+       private transient String defaultPartitionName;
+
+       private transient Set<List<String>> distinctPartitions;
+
+       public HiveContinuousMonitoringFunction(
+                       HiveShim hiveShim,
+                       JobConf conf,
+                       ObjectPath tablePath,
+                       CatalogTable catalogTable,
+                       int readerParallelism,
+                       String consumeOrder,
+                       String consumeOffset,
+                       String extractorType,
+                       String extractorClass,
+                       String extractorPattern,
+                       long interval) {
+               this.hiveShim = hiveShim;
+               this.conf = new JobConfWrapper(conf);
+               this.tablePath = tablePath;
+               this.partitionKeys = catalogTable.getPartitionKeys();
+               this.fieldNames = catalogTable.getSchema().getFieldNames();
+               this.fieldTypes = catalogTable.getSchema().getFieldDataTypes();
+               this.consumeOrder = consumeOrder;
+               this.extractorType = extractorType;
+               this.extractorClass = extractorClass;
+               this.extractorPattern = extractorPattern;
+               this.consumeOffset = consumeOffset;
+
+               this.interval = interval;
+               this.readerParallelism = Math.max(readerParallelism, 1);
+               this.currentReadTime = 0;
+       }
+
+       @Override
+       public void initializeState(FunctionInitializationContext context) 
throws Exception {
+               this.currReadTimeState = 
context.getOperatorStateStore().getListState(
+                       new ListStateDescriptor<>(
+                               "partition-monitoring-state",
+                               LongSerializer.INSTANCE
+                       )
+               );
+               this.distinctPartsState = 
context.getOperatorStateStore().getListState(
+                       new ListStateDescriptor<>(
+                               "partition-monitoring-state",
+                               new ListSerializer<>(new 
ListSerializer<>(StringSerializer.INSTANCE))
+                       )
+               );
+
+               this.client = this.hiveShim.getHiveMetastoreClient(new 
HiveConf(conf.conf(), HiveConf.class));
+
+               Table hiveTable = client.getTable(tablePath.getDatabaseName(), 
tablePath.getObjectName());
+               this.tableProps = 
HiveReflectionUtils.getTableMetadata(hiveShim, hiveTable);
+               this.defaultPartitionName = 
conf.conf().get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+                               
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
+
+               PartitionTimeExtractor extractor = 
PartitionTimeExtractor.create(
+                               getRuntimeContext().getUserCodeClassLoader(),
+                               extractorType,
+                               extractorClass,
+                               extractorPattern);
+
+               this.fetcher = new DirectoryMonitorDiscovery();
+
+               Path location = new Path(hiveTable.getSd().getLocation());
+               FileSystem fs = location.getFileSystem(conf.conf());
+               this.context = new PartitionDiscovery.Context() {
+
+                       @Override
+                       public List<String> partitionKeys() {
+                               return partitionKeys;
+                       }
+
+                       @Override
+                       public Partition getPartition(List<String> partValues) 
throws TException {
+                               return client.getPartition(
+                                               tablePath.getDatabaseName(),
+                                               tablePath.getObjectName(),
+                                               partValues);
+                       }
+
+                       @Override
+                       public FileSystem fileSystem() {
+                               return fs;
+                       }
+
+                       @Override
+                       public Path tableLocation() {
+                               return new 
Path(hiveTable.getSd().getLocation());
+                       }
+
+                       @Override
+                       public long extractTimestamp(
+                                       List<String> partKeys,
+                                       List<String> partValues,
+                                       Supplier<Long> fileTime) {
+                               switch (consumeOrder) {
+                                       case "create-time":
+                                               return fileTime.get();
+                                       case "partition-time":
+                                               return 
toMills(extractor.extract(partKeys, partValues));
+                                       default:
+                                               throw new 
UnsupportedOperationException(
+                                                               "Unsupported 
consumer order: " + consumeOrder);
+                               }
+                       }
+               };
+
+               this.distinctPartitions = new HashSet<>();
+               if (context.isRestored()) {
+                       LOG.info("Restoring state for the {}.", 
getClass().getSimpleName());
+                       this.currentReadTime = 
this.currReadTimeState.get().iterator().next();
+                       
this.distinctPartitions.addAll(this.distinctPartsState.get().iterator().next());
+               } else {
+                       LOG.info("No state to restore for the {}.", 
getClass().getSimpleName());
+                       if (consumeOffset != null) {
+                               this.currentReadTime = 
toMills(toLocalDateTime(consumeOffset));
+                       }
+               }
+       }
+
+       @Override
+       public void run(SourceContext<TimestampedHiveInputSplit> context) 
throws Exception {
+               checkpointLock = context.getCheckpointLock();
+               while (isRunning) {
+                       synchronized (checkpointLock) {
+                               monitorAndForwardSplits(context);
+                       }
+                       Thread.sleep(interval);
+               }
+       }
+
+       private void monitorAndForwardSplits(
+                       SourceContext<TimestampedHiveInputSplit> context) 
throws Exception {
+               assert (Thread.holdsLock(checkpointLock));
+
+               List<Tuple2<Partition, Long>> partitions = 
fetcher.fetchPartitions(this.context, currentReadTime);
+
+               if (partitions.isEmpty()) {
+                       return;
+               }
+
+               partitions.sort((o1, o2) -> (int) (o1.f1 - o2.f1));
+
+               long maxTimestamp = Long.MIN_VALUE;
+               Set<List<String>> nextDistinctParts = new HashSet<>();
+               for (Tuple2<Partition, Long> tuple2 : partitions) {
+                       Partition partition = tuple2.f0;
+                       List<String> partSpec = partition.getValues();
+                       if (!this.distinctPartitions.contains(partSpec)) {
+                               
this.distinctPartitions.add(partition.getValues());

Review comment:
       Should this be `this.distinctPartitions.add(partSpec)`?

##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.read;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.connectors.hive.HiveTablePartition;
+import org.apache.flink.connectors.hive.HiveTableSource;
+import org.apache.flink.connectors.hive.JobConfWrapper;
+import 
org.apache.flink.connectors.hive.read.PartitionStrategy.PartitionStrategyFactory;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * This is the single (non-parallel) monitoring task which takes a {@link 
HiveTableInputFormat},
+ * it is responsible for:
+ *
+ * <ol>
+ *     <li>Monitoring partitions of hive meta store.</li>
+ *     <li>Deciding which partitions should be further read and processed.</li>
+ *     <li>Creating the {@link HiveTableInputSplit splits} corresponding to 
those partitions.</li>
+ *     <li>Assigning them to downstream tasks for further processing.</li>
+ * </ol>
+ *
+ * <p>The splits to be read are forwarded to the downstream {@link 
ContinuousFileReaderOperator}
+ * which can have parallelism greater than one.
+ *
+ * <p><b>IMPORTANT NOTE: </b> Splits are forwarded downstream for reading in 
ascending partition time order,
+ * based on the partition time of the partitions they belong to.
+ */
+public class HiveContinuousMonitoringFunction
+               extends RichSourceFunction<TimestampedHiveInputSplit>
+               implements CheckpointedFunction {
+
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(HiveContinuousMonitoringFunction.class);
+
+       /** The parallelism of the downstream readers. */
+       private final int readerParallelism;
+
+       /** The interval between consecutive path scans. */
+       private final long interval;
+
+       private final HiveShim hiveShim;
+
+       private final JobConfWrapper conf;
+
+       private final ObjectPath tablePath;
+
+       private final List<String> partitionKeys;
+
+       private final String[] fieldNames;
+
+       private final DataType[] fieldTypes;
+
+       private final long startupTimestampMillis;
+
+       private final PartitionStrategyFactory partStrategyFactory;
+
+       private volatile boolean isRunning = true;
+
+       /** The maximum partition read time seen so far. */
+       private volatile long currentReadTime;
+
+       private transient PartitionStrategy strategy;
+
+       private transient Object checkpointLock;
+
+       private transient ListState<Long> currReadTimeState;
+
+       private transient ListState<List<List<String>>> distinctPartsState;
+
+       private transient IMetaStoreClient client;
+
+       private transient Properties tableProps;
+
+       private transient String defaultPartitionName;
+
+       private transient Set<List<String>> distinctPartitions;
+
+       public HiveContinuousMonitoringFunction(
+                       HiveShim hiveShim,
+                       JobConf conf,
+                       ObjectPath tablePath,
+                       CatalogTable catalogTable,
+                       long startupTimestampMillis,
+                       PartitionStrategyFactory partStrategyFactory,
+                       int readerParallelism,
+                       long interval) {
+               this.hiveShim = hiveShim;
+               this.conf = new JobConfWrapper(conf);
+               this.tablePath = tablePath;
+               this.partitionKeys = catalogTable.getPartitionKeys();
+               this.fieldNames = catalogTable.getSchema().getFieldNames();
+               this.fieldTypes = catalogTable.getSchema().getFieldDataTypes();
+               this.startupTimestampMillis = startupTimestampMillis;
+               this.partStrategyFactory = partStrategyFactory;
+
+               this.interval = interval;
+               this.readerParallelism = Math.max(readerParallelism, 1);
+               this.currentReadTime = Long.MIN_VALUE;
+       }
+
+       @Override
+       public void initializeState(FunctionInitializationContext context) 
throws Exception {
+               this.strategy = 
partStrategyFactory.createStrategy(getRuntimeContext().getUserCodeClassLoader());
+
+               this.currReadTimeState = 
context.getOperatorStateStore().getListState(
+                       new ListStateDescriptor<>(
+                               "partition-monitoring-state",
+                               LongSerializer.INSTANCE
+                       )
+               );
+               this.distinctPartsState = 
context.getOperatorStateStore().getListState(
+                       new ListStateDescriptor<>(
+                               "partition-monitoring-state",
+                               new ListSerializer<>(new 
ListSerializer<>(StringSerializer.INSTANCE))
+                       )
+               );
+
+               this.client = this.hiveShim.getHiveMetastoreClient(new 
HiveConf(conf.conf(), HiveConf.class));
+
+               Table hiveTable = client.getTable(tablePath.getDatabaseName(), 
tablePath.getObjectName());
+               this.tableProps = 
HiveReflectionUtils.getTableMetadata(hiveShim, hiveTable);
+               this.defaultPartitionName = 
conf.conf().get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+                               
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
+
+               this.distinctPartitions = new HashSet<>();
+               if (context.isRestored()) {
+                       LOG.info("Restoring state for the {}.", 
getClass().getSimpleName());
+                       this.currentReadTime = 
this.currReadTimeState.get().iterator().next();
+                       
this.distinctPartitions.addAll(this.distinctPartsState.get().iterator().next());
+               } else {
+                       LOG.info("No state to restore for the {}.", 
getClass().getSimpleName());
+                       this.currentReadTime = this.startupTimestampMillis;
+               }
+       }
+
+       @Override
+       public void run(SourceContext<TimestampedHiveInputSplit> context) 
throws Exception {
+               checkpointLock = context.getCheckpointLock();
+               while (isRunning) {
+                       synchronized (checkpointLock) {
+                               monitorAndForwardSplits(context);
+                       }
+                       Thread.sleep(interval);
+               }
+       }
+
+       private void monitorAndForwardSplits(
+                       SourceContext<TimestampedHiveInputSplit> context) 
throws IOException, TException {
+               assert (Thread.holdsLock(checkpointLock));
+
+               List<Partition> partitions = client.listPartitionsByFilter(
+                               tablePath.getDatabaseName(),
+                               tablePath.getObjectName(),
+                               strategy.generateFetchFilter(partitionKeys, 
currentReadTime),
+                               (short) -1);
+
+               if (partitions.isEmpty()) {
+                       return;
+               }
+
+               long maxTime = Long.MIN_VALUE;
+               for (Partition partition : partitions) {
+                       List<String> partSpec = partition.getValues();
+                       if (!this.distinctPartitions.contains(partSpec)) {
+                               
this.distinctPartitions.add(partition.getValues());
+                               long time = 
this.strategy.extractPartTime(partitionKeys, partition.getValues());
+                               if (time > maxTime) {
+                                       maxTime = time;
+                               }
+                               HiveTableInputSplit[] splits = 
HiveTableInputFormat.createInputSplits(
+                                               this.readerParallelism,
+                                               
Collections.singletonList(toHiveTablePartition(partition)),
+                                               this.conf.conf());
+                               for (HiveTableInputSplit split : splits) {
+                                       context.collect(new 
TimestampedHiveInputSplit(time, split));
+                               }
+                       }
+               }
+               this.currentReadTime = maxTime;
+
+               this.distinctPartitions.removeIf(partSpec -> 
this.strategy.canExpireForDistinct(
+                               this.strategy.extractPartTime(partitionKeys, 
partSpec),
+                               this.currentReadTime));
+       }
+
+       private HiveTablePartition toHiveTablePartition(Partition p) {
+               return HiveTableSource.toHiveTablePartition(
+                               partitionKeys, fieldNames, fieldTypes, 
hiveShim, tableProps, defaultPartitionName, p);
+       }
+
+       @Override
+       public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+               Preconditions.checkState(this.currReadTimeState != null,
+                               "The " + getClass().getSimpleName() + " state 
has not been properly initialized.");
+
+               this.currReadTimeState.clear();
+               this.currReadTimeState.add(this.currentReadTime);
+
+               this.distinctPartsState.clear();
+               this.distinctPartsState.add(new 
ArrayList<>(this.distinctPartitions));
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("{} checkpointed {}.", 
getClass().getSimpleName(), currentReadTime);
+               }
+       }
+
+       @Override
+       public void close() throws Exception {
+               super.close();
+
+               if (checkpointLock != null) {
+                       synchronized (checkpointLock) {
+                               currentReadTime = Long.MAX_VALUE;
+                               isRunning = false;
+                       }
+               }
+       }
+
+       @Override
+       public void cancel() {
+               if (checkpointLock != null) {
+                       // this is to cover the case where cancel() is called 
before the run()

Review comment:
       Let's move this comment outside the `if` block, so that it's easier to understand.

##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/SplitReader.java
##########
@@ -46,4 +46,17 @@
         * @throws IOException Thrown, if an I/O error occurred.
         */
        RowData nextRecord(RowData reuse) throws IOException;
+
+       /**
+        * Seek to a particular row number.
+        */
+       default void seekToRow(long rowCount, RowData reuse) throws IOException 
{
+               for (int i = 0; i < rowCount; i++) {
+                       boolean end = reachedEnd();
+                       if (end) {
+                               throw new RuntimeException("Seek to many 
rows.");

Review comment:
       "Seek too many rows." ?

##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/TimestampedHiveInputSplit.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.read;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.api.functions.source.TimestampedInputSplit;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Type;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A {@link HiveTableInputSplit} with {@link TimestampedInputSplit}.
+ * Kryo serializer can not deal with hadoop split, need specific type 
information factory.
+ */
+@TypeInfo(TimestampedHiveInputSplit.SplitTypeInfoFactory.class)
+public class TimestampedHiveInputSplit extends HiveTableInputSplit implements 
TimestampedInputSplit {
+
+       private static final long serialVersionUID = 1L;
+
+       /** The modification time of the file this split belongs to. */
+       private final long modificationTime;
+
+       /**
+        * The state of the split. This information is used when
+        * restoring from a checkpoint and allows to resume reading the
+        * underlying file from the point we left off.
+        * */
+       private Serializable splitState;
+
+       public TimestampedHiveInputSplit(
+                       long modificationTime,
+                       HiveTableInputSplit split) {
+               super(
+                               split.getSplitNumber(),
+                               split.getHadoopInputSplit(),
+                               split.getJobConf(),
+                               split.getHiveTablePartition());
+               this.modificationTime = modificationTime;
+       }
+
+       @Override
+       public void setSplitState(Serializable state) {
+               this.splitState = state;
+       }
+
+       @Override
+       public Serializable getSplitState() {
+               return this.splitState;
+       }
+
+       @Override
+       public long getModificationTime() {
+               return modificationTime;
+       }
+
+       @Override
+       public int compareTo(TimestampedInputSplit o) {
+               TimestampedHiveInputSplit split = (TimestampedHiveInputSplit) o;
+               int modTimeComp = Long.compare(this.modificationTime, 
split.modificationTime);
+               if (modTimeComp != 0L) {
+                       return modTimeComp;
+               }
+
+               int pathComp = 
this.hiveTablePartition.getStorageDescriptor().compareTo(
+                               
split.hiveTablePartition.getStorageDescriptor());
+
+               return pathComp != 0 ? pathComp :
+                               this.getSplitNumber() - o.getSplitNumber();
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               if (!super.equals(o)) {

Review comment:
       For two split instances, this means `equals` can return false while 
`compareTo` returns 0. Is this intended behavior?
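       If it's not intended, it can surprise callers later, because ordered collections deduplicate by `compareTo` rather than `equals`. A generic (non-Hive) illustration of the pitfall:

```java
import java.util.Set;
import java.util.TreeSet;

public final class CompareVsEqualsDemo {
    public static void main(String[] args) {
        // A comparator that, like the split comparison, ignores some of the
        // fields that equals() considers: here it only looks at string length.
        Set<String> byLength =
                new TreeSet<>((a, b) -> Integer.compare(a.length(), b.length()));
        byLength.add("foo");
        byLength.add("bar"); // silently dropped: compare == 0 although !"foo".equals("bar")
        System.out.println(byLength); // prints [foo]
    }
}
```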

##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/TimestampedHiveInputSplit.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.read;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.api.functions.source.TimestampedInputSplit;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Type;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A {@link HiveTableInputSplit} with {@link TimestampedInputSplit}.
+ * Kryo serializer can not deal with hadoop split, need specific type 
information factory.
+ */
+@TypeInfo(TimestampedHiveInputSplit.SplitTypeInfoFactory.class)
+public class TimestampedHiveInputSplit extends HiveTableInputSplit implements 
TimestampedInputSplit {
+
+       private static final long serialVersionUID = 1L;
+
+       /** The modification time of the file this split belongs to. */
+       private final long modificationTime;
+
+       /**
+        * The state of the split. This information is used when
+        * restoring from a checkpoint and allows to resume reading the
+        * underlying file from the point we left off.
+        * */
+       private Serializable splitState;
+
+       public TimestampedHiveInputSplit(
+                       long modificationTime,
+                       HiveTableInputSplit split) {
+               super(
+                               split.getSplitNumber(),
+                               split.getHadoopInputSplit(),
+                               split.getJobConf(),
+                               split.getHiveTablePartition());
+               this.modificationTime = modificationTime;
+       }
+
+       @Override
+       public void setSplitState(Serializable state) {
+               this.splitState = state;
+       }
+
+       @Override
+       public Serializable getSplitState() {
+               return this.splitState;
+       }
+
+       @Override
+       public long getModificationTime() {
+               return modificationTime;
+       }
+
+       @Override
+       public int compareTo(TimestampedInputSplit o) {
+               TimestampedHiveInputSplit split = (TimestampedHiveInputSplit) o;
+               int modTimeComp = Long.compare(this.modificationTime, 
split.modificationTime);
+               if (modTimeComp != 0L) {
+                       return modTimeComp;
+               }
+
+               int pathComp = 
this.hiveTablePartition.getStorageDescriptor().compareTo(

Review comment:
       I think this compares more than just paths.
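       If the intent is to order by path, maybe compare the locations explicitly, e.g. (just a sketch, as a drop-in replacement for the quoted statement):

```java
int pathComp = this.hiveTablePartition.getStorageDescriptor().getLocation().compareTo(
        split.hiveTablePartition.getStorageDescriptor().getLocation());
```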

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * This class holds configuration constants used by filesystem(Including hive) 
connector.
+ */
+public class FileSystemOptions {
+
+       public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE =
+                       key("streaming-source.enable")
+                                       .booleanType()
+                                       .defaultValue(false)
+                                       .withDescription("Enable streaming 
source or not.");
+
+       public static final ConfigOption<Duration> 
STREAMING_SOURCE_MONITOR_INTERVAL =
+                       key("streaming-source.monitor-interval")
+                                       .durationType()
+                                       .defaultValue(Duration.ofMillis(1))
+                                       .withDescription("The minimum interval 
allowed between consecutive partition/file discovery.");

Review comment:
       Why is it the minimum interval? That may seem confusing to users.
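
       If the intent is simply the polling interval, one possible rewording (a 
sketch with my wording, not the PR's text) would be:

               .withDescription("The interval at which the source checks for new partitions/files.");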

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * This class holds configuration constants used by filesystem(Including hive) 
connector.
+ */
+public class FileSystemOptions {
+
+       public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE =

Review comment:
       Wondering what the relationship is between this and the execution mode. 
E.g. can I enable the streaming source in batch execution mode?

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * This class holds configuration constants used by filesystem(Including hive) 
connector.
+ */
+public class FileSystemOptions {
+
+       public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE =
+                       key("streaming-source.enable")
+                                       .booleanType()
+                                       .defaultValue(false)
+                                       .withDescription("Enable streaming 
source or not.");
+
+       public static final ConfigOption<Duration> 
STREAMING_SOURCE_MONITOR_INTERVAL =
+                       key("streaming-source.monitor-interval")
+                                       .durationType()
+                                       .defaultValue(Duration.ofMillis(1))
+                                       .withDescription("The minimum interval 
allowed between consecutive partition/file discovery.");
+
+       public static final ConfigOption<String> STREAMING_SOURCE_CONSUME_ORDER 
=
+                       key("streaming-source.consume-order")
+                                       .stringType()
+                                       .defaultValue("create-time")
+                                       .withDescription("The consume order of 
streaming source," +
+                                                       " support create-time 
and partition-time." +
+                                                       " create-time compare 
partition/file creation time;" +
+                                                       " partition-time 
compare time represented by partition name.");
+
+       public static final ConfigOption<String> 
STREAMING_SOURCE_CONSUME_START_OFFSET =
+                       key("streaming-source.consume-start-offset")
+                                       .stringType()
+                                       .defaultValue("1970-00-00")
+                                       .withDescription("Start offset for 
streaming consuming." +
+                                                       " How to parse and 
compare offsets depends on your order." +
+                                                       " For create-time and 
partition-time, should be a timestamp string.");
+
+       public static final ConfigOption<String> PARTITION_TIME_EXTRACTOR_TYPE =
+                       key("partition.time-extractor.type")
+                                       .stringType()
+                                       .defaultValue("timestamp")
+                                       .withDescription("Time extractor to 
extract time from partition values." +

Review comment:
       Should we clarify that this is only used when `STREAMING_SOURCE_CONSUME_ORDER` is 
set to `partition-time`?
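
       For illustration, a sketch of how the description could call that out (my 
wording, not the PR's):

               key("partition.time-extractor.type")
                               .stringType()
                               .defaultValue("timestamp")
                               .withDescription("Time extractor to extract time from partition values." +
                                               " Only effective when 'streaming-source.consume-order' is set to 'partition-time'." +
                                               " Supports 'timestamp' and 'custom'.");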

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * This class holds configuration constants used by filesystem(Including hive) 
connector.
+ */
+public class FileSystemOptions {
+
+       public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE =
+                       key("streaming-source.enable")
+                                       .booleanType()
+                                       .defaultValue(false)
+                                       .withDescription("Enable streaming 
source or not.");
+
+       public static final ConfigOption<Duration> 
STREAMING_SOURCE_MONITOR_INTERVAL =
+                       key("streaming-source.monitor-interval")
+                                       .durationType()
+                                       .defaultValue(Duration.ofMillis(1))
+                                       .withDescription("The minimum interval 
allowed between consecutive partition/file discovery.");
+
+       public static final ConfigOption<String> STREAMING_SOURCE_CONSUME_ORDER 
=
+                       key("streaming-source.consume-order")
+                                       .stringType()
+                                       .defaultValue("create-time")
+                                       .withDescription("The consume order of 
streaming source," +
+                                                       " support create-time 
and partition-time." +
+                                                       " create-time compare 
partition/file creation time;" +
+                                                       " partition-time 
compare time represented by partition name.");
+
+       public static final ConfigOption<String> 
STREAMING_SOURCE_CONSUME_START_OFFSET =
+                       key("streaming-source.consume-start-offset")
+                                       .stringType()
+                                       .defaultValue("1970-00-00")
+                                       .withDescription("Start offset for 
streaming consuming." +
+                                                       " How to parse and 
compare offsets depends on your order." +
+                                                       " For create-time and 
partition-time, should be a timestamp string.");
+
+       public static final ConfigOption<String> PARTITION_TIME_EXTRACTOR_TYPE =
+                       key("partition.time-extractor.type")
+                                       .stringType()
+                                       .defaultValue("timestamp")
+                                       .withDescription("Time extractor to 
extract time from partition values." +
+                                                       " Support timestamp and 
custom." +
+                                                       " For timestamp, can 
configure timestamp pattern." +
+                                                       " For custom, should 
configure extractor class.");
+
+       public static final ConfigOption<String> PARTITION_TIME_EXTRACTOR_CLASS 
=
+                       key("partition.time-extractor.class")
+                                       .stringType()
+                                       .noDefaultValue()
+                                       .withDescription("The extractor class 
for implement PartitionTimeExtractor interface.");
+
+       public static final ConfigOption<String> 
PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN =
+                       key("partition.time-extractor.timestamp-pattern")
+                                       .stringType()
+                                       .noDefaultValue()
+                                       .withDescription("The 'timestamp' 
construction way allows users to use partition" +
+                                                       " fields to get a legal 
timestamp pattern." +
+                                                       " Default support 
'yyyy-mm-dd hh:mm:ss' from first field." +
+                                                       " If timestamp in 
partition is single field 'dt', can configure: '$dt'." +
+                                                       " If timestamp in 
partition is year, month, day, hour," +
+                                                       " can configure: 
'$year-$month-$day $hour:00:00'." +
+                                                       " If timestamp in 
partition is dt and hour, can configure: '$dt $hour:00:00'.");
+
+       public static final ConfigOption<Duration> PARTITION_TIME_INTERVAL =

Review comment:
       Seems unused

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * This class holds configuration constants used by filesystem(Including hive) 
connector.
+ */
+public class FileSystemOptions {
+
+       public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE =
+                       key("streaming-source.enable")
+                                       .booleanType()
+                                       .defaultValue(false)
+                                       .withDescription("Enable streaming 
source or not.");
+
+       public static final ConfigOption<Duration> 
STREAMING_SOURCE_MONITOR_INTERVAL =
+                       key("streaming-source.monitor-interval")
+                                       .durationType()
+                                       .defaultValue(Duration.ofMillis(1))
+                                       .withDescription("The minimum interval 
allowed between consecutive partition/file discovery.");
+
+       public static final ConfigOption<String> STREAMING_SOURCE_CONSUME_ORDER 
=
+                       key("streaming-source.consume-order")
+                                       .stringType()
+                                       .defaultValue("create-time")
+                                       .withDescription("The consume order of 
streaming source," +
+                                                       " support create-time 
and partition-time." +
+                                                       " create-time compare 
partition/file creation time;" +
+                                                       " partition-time 
compare time represented by partition name.");
+
+       public static final ConfigOption<String> 
STREAMING_SOURCE_CONSUME_START_OFFSET =
+                       key("streaming-source.consume-start-offset")
+                                       .stringType()
+                                       .defaultValue("1970-00-00")
+                                       .withDescription("Start offset for 
streaming consuming." +
+                                                       " How to parse and 
compare offsets depends on your order." +
+                                                       " For create-time and 
partition-time, should be a timestamp string.");
+
+       public static final ConfigOption<String> PARTITION_TIME_EXTRACTOR_TYPE =
+                       key("partition.time-extractor.type")
+                                       .stringType()
+                                       .defaultValue("timestamp")
+                                       .withDescription("Time extractor to 
extract time from partition values." +
+                                                       " Support timestamp and 
custom." +

Review comment:
       I find `timestamp` somewhat difficult to understand. According to the 
`PartitionTimeExtractor` interface, any implementation should extract a 
"timestamp". We may as well just call it `default`?

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionTimeExtractor.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.io.Serializable;
+import java.time.LocalDateTime;
+import java.util.List;
+
+/**
+ * Time extractor to extract time from partition values.
+ */
+@Experimental
+public interface PartitionTimeExtractor extends Serializable {
+
+       /**
+        * Extract time from partition keys and values.
+        */
+       LocalDateTime extract(List<String> partitionKeys, List<String> 
partitionValues);
+
+       static PartitionTimeExtractor create(
+                       ClassLoader userClassLoader,
+                       String extractorType,
+                       String extractorClass,
+                       String extractorPattern) {
+               switch (extractorType) {
+                       case "timestamp":

Review comment:
       Should add constants for these values
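
       A sketch of what that could look like (constant names are mine, not 
necessarily what the PR will use; the actual extractor construction is omitted):

               // Illustrative constants; fields in an interface are implicitly public static final.
               String EXTRACTOR_TYPE_TIMESTAMP = "timestamp"; // or "default", per the naming comment above
               String EXTRACTOR_TYPE_CUSTOM = "custom";

               static PartitionTimeExtractor create(
                               ClassLoader userClassLoader,
                               String extractorType,
                               String extractorClass,
                               String extractorPattern) {
                       switch (extractorType) {
                               case EXTRACTOR_TYPE_TIMESTAMP:
                                       // build the pattern-based extractor from extractorPattern (omitted in this sketch)
                                       throw new UnsupportedOperationException("omitted in this sketch");
                               case EXTRACTOR_TYPE_CUSTOM:
                                       // load extractorClass via userClassLoader and instantiate it (omitted in this sketch)
                                       throw new UnsupportedOperationException("omitted in this sketch");
                               default:
                                       throw new IllegalArgumentException("Unsupported extractor type: " + extractorType);
                       }
               }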

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemOptions.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * This class holds configuration constants used by filesystem(Including hive) 
connector.
+ */
+public class FileSystemOptions {
+
+       public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE =
+                       key("streaming-source.enable")
+                                       .booleanType()
+                                       .defaultValue(false)
+                                       .withDescription("Enable streaming 
source or not.");
+
+       public static final ConfigOption<Duration> 
STREAMING_SOURCE_MONITOR_INTERVAL =
+                       key("streaming-source.monitor-interval")
+                                       .durationType()
+                                       .defaultValue(Duration.ofMillis(1))
+                                       .withDescription("The minimum interval 
allowed between consecutive partition/file discovery.");
+
+       public static final ConfigOption<String> STREAMING_SOURCE_CONSUME_ORDER 
=
+                       key("streaming-source.consume-order")
+                                       .stringType()
+                                       .defaultValue("create-time")
+                                       .withDescription("The consume order of 
streaming source," +
+                                                       " support create-time 
and partition-time." +
+                                                       " create-time compare 
partition/file creation time;" +

Review comment:
       I think we'd better clarify that this is not the partition create time in 
HMS, but the folder/file creation time in the filesystem?
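
       For illustration, the description could spell that out, e.g. (my wording, 
not the PR's):

               .withDescription("The consume order of streaming source, supports 'create-time' and 'partition-time'." +
                               " 'create-time' compares the folder/file creation time in the filesystem" +
                               " (not the partition create time in the Hive metastore);" +
                               " 'partition-time' compares the time extracted from the partition name.");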




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

