This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new fc581da  [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming
fc581da is described below

commit fc581da73fcb9bbaa443604d36498c9a5dbdf0cd
Author: Rui Li <[email protected]>
AuthorDate: Sun Nov 8 21:13:37 2020 +0800

    [FLINK-19888][hive] Migrate Hive source to FLIP-27 source interface for streaming
    
    This closes #13963
---
 .../connector/file/src/AbstractFileSource.java     |  14 ++
 .../file/src/ContinuousEnumerationSettings.java    |   4 +-
 .../file/src/PendingSplitsCheckpoint.java          |   4 +-
 .../ContinuousHivePendingSplitsCheckpoint.java     |  53 +++++
 ...nuousHivePendingSplitsCheckpointSerializer.java | 139 ++++++++++++
 .../hive/ContinuousHiveSplitEnumerator.java        | 230 ++++++++++++++++++++
 .../apache/flink/connectors/hive/HiveSource.java   | 205 +++++++++++++++---
 .../flink/connectors/hive/HiveTableSource.java     | 195 +++++------------
 .../read/HiveContinuousMonitoringFunction.java     | 241 ---------------------
 .../hive/read/HiveContinuousPartitionFetcher.java  |   2 +-
 .../filesystem/ContinuousPartitionFetcher.java     |   4 +-
 .../flink/table/filesystem/PartitionFetcher.java   |   2 +-
 12 files changed, 669 insertions(+), 424 deletions(-)

diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java
index 147691b..46d3d37 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/AbstractFileSource.java
@@ -100,6 +100,20 @@ public abstract class AbstractFileSource<T, SplitT extends FileSourceSplit>
        }
 
        // 
------------------------------------------------------------------------
+       //  Getters
+       // 
------------------------------------------------------------------------
+
+       public FileSplitAssigner.Provider getAssignerFactory() {
+               return assignerFactory;
+       }
+
+       @Nullable
+       public ContinuousEnumerationSettings getContinuousEnumerationSettings() 
{
+               return continuousEnumerationSettings;
+       }
+
+
+       // 
------------------------------------------------------------------------
        //  Source API Methods
        // 
------------------------------------------------------------------------
 
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/ContinuousEnumerationSettings.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/ContinuousEnumerationSettings.java
index 4123c43..72ab516 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/ContinuousEnumerationSettings.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/ContinuousEnumerationSettings.java
@@ -30,13 +30,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * file source's continuous discovery and streaming mode.
  */
 @Internal
-final class ContinuousEnumerationSettings implements Serializable {
+public final class ContinuousEnumerationSettings implements Serializable {
 
        private static final long serialVersionUID = 1L;
 
        private final Duration discoveryInterval;
 
-       ContinuousEnumerationSettings(Duration discoveryInterval) {
+       public ContinuousEnumerationSettings(Duration discoveryInterval) {
                this.discoveryInterval = checkNotNull(discoveryInterval);
        }
 
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpoint.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpoint.java
index 847f99e..5950df2 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpoint.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpoint.java
@@ -33,7 +33,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * A checkpoint of the current state of the containing the currently pending 
splits that are not yet assigned.
  */
 @PublicEvolving
-public final class PendingSplitsCheckpoint<SplitT extends FileSourceSplit> {
+public class PendingSplitsCheckpoint<SplitT extends FileSourceSplit> {
 
        /** The splits in the checkpoint. */
        private final Collection<SplitT> splits;
@@ -48,7 +48,7 @@ public final class PendingSplitsCheckpoint<SplitT extends FileSourceSplit> {
        @Nullable
        byte[] serializedFormCache;
 
-       private PendingSplitsCheckpoint(Collection<SplitT> splits, 
Collection<Path> alreadyProcessedPaths) {
+       protected PendingSplitsCheckpoint(Collection<SplitT> splits, 
Collection<Path> alreadyProcessedPaths) {
                this.splits = Collections.unmodifiableCollection(splits);
                this.alreadyProcessedPaths = 
Collections.unmodifiableCollection(alreadyProcessedPaths);
        }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHivePendingSplitsCheckpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHivePendingSplitsCheckpoint.java
new file mode 100644
index 0000000..e562c75
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHivePendingSplitsCheckpoint.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
+import org.apache.flink.connectors.hive.read.HiveSourceSplit;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * The checkpoint of current state of continuous hive source reading.
+ */
+public class ContinuousHivePendingSplitsCheckpoint extends 
PendingSplitsCheckpoint<HiveSourceSplit> {
+
+       private final Comparable<?> currentReadOffset;
+       private final Collection<List<String>> seenPartitionsSinceOffset;
+
+       public ContinuousHivePendingSplitsCheckpoint(
+                       Collection<HiveSourceSplit> splits,
+                       Comparable<?> currentReadOffset,
+                       Collection<List<String>> seenPartitionsSinceOffset) {
+               super(new ArrayList<>(splits), Collections.emptyList());
+               this.currentReadOffset = currentReadOffset;
+               this.seenPartitionsSinceOffset = 
Collections.unmodifiableCollection(new ArrayList<>(seenPartitionsSinceOffset));
+       }
+
+       public Comparable<?> getCurrentReadOffset() {
+               return currentReadOffset;
+       }
+
+       public Collection<List<String>> getSeenPartitionsSinceOffset() {
+               return seenPartitionsSinceOffset;
+       }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHivePendingSplitsCheckpointSerializer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHivePendingSplitsCheckpointSerializer.java
new file mode 100644
index 0000000..ab3f54f
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHivePendingSplitsCheckpointSerializer.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
+import org.apache.flink.connector.file.src.PendingSplitsCheckpointSerializer;
+import org.apache.flink.connectors.hive.read.HiveSourceSplit;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * SerDe for {@link ContinuousHivePendingSplitsCheckpoint}.
+ */
+public class ContinuousHivePendingSplitsCheckpointSerializer implements 
SimpleVersionedSerializer<PendingSplitsCheckpoint<HiveSourceSplit>> {
+
+       private static final int VERSION = 1;
+
+       private final PendingSplitsCheckpointSerializer<HiveSourceSplit> 
superSerDe;
+       // SerDe for a single partition
+       private final ListSerializer<String> partitionSerDe = new 
ListSerializer<>(StringSerializer.INSTANCE);
+       // SerDe for the current read offset
+       private final ReadOffsetSerDe readOffsetSerDe = 
ReadOffsetSerDeImpl.INSTANCE;
+
+       public 
ContinuousHivePendingSplitsCheckpointSerializer(SimpleVersionedSerializer<HiveSourceSplit>
 splitSerDe) {
+               superSerDe = new 
PendingSplitsCheckpointSerializer<>(splitSerDe);
+       }
+
+       @Override
+       public int getVersion() {
+               return VERSION;
+       }
+
+       @Override
+       public byte[] serialize(PendingSplitsCheckpoint<HiveSourceSplit> 
checkpoint) throws IOException {
+               Preconditions.checkArgument(checkpoint.getClass() == 
ContinuousHivePendingSplitsCheckpoint.class,
+                               "Only supports %s", 
ContinuousHivePendingSplitsCheckpoint.class.getName());
+
+               ContinuousHivePendingSplitsCheckpoint hiveCheckpoint = 
(ContinuousHivePendingSplitsCheckpoint) checkpoint;
+               PendingSplitsCheckpoint<HiveSourceSplit> superCP = 
PendingSplitsCheckpoint.fromCollectionSnapshot(
+                               hiveCheckpoint.getSplits(), 
hiveCheckpoint.getAlreadyProcessedPaths());
+               byte[] superBytes = superSerDe.serialize(superCP);
+               ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
+
+               try (DataOutputViewStreamWrapper outputWrapper = new 
DataOutputViewStreamWrapper(byteArrayOutputStream)) {
+                       outputWrapper.writeInt(superBytes.length);
+                       outputWrapper.write(superBytes);
+                       
readOffsetSerDe.serialize(hiveCheckpoint.getCurrentReadOffset(), outputWrapper);
+                       
outputWrapper.writeInt(hiveCheckpoint.getSeenPartitionsSinceOffset().size());
+                       for (List<String> partition : 
hiveCheckpoint.getSeenPartitionsSinceOffset()) {
+                               partitionSerDe.serialize(partition, 
outputWrapper);
+                       }
+               }
+               return byteArrayOutputStream.toByteArray();
+       }
+
+       @Override
+       public PendingSplitsCheckpoint<HiveSourceSplit> deserialize(int 
version, byte[] serialized) throws IOException {
+               if (version == 1) {
+                       try (DataInputViewStreamWrapper inputWrapper = new 
DataInputViewStreamWrapper(new ByteArrayInputStream(serialized))) {
+                               return deserializeV1(inputWrapper);
+                       }
+               }
+               throw new IOException("Unknown version: " + version);
+       }
+
+       private PendingSplitsCheckpoint<HiveSourceSplit> 
deserializeV1(DataInputViewStreamWrapper inputWrapper) throws IOException {
+               byte[] superBytes = new byte[inputWrapper.readInt()];
+               inputWrapper.readFully(superBytes);
+               PendingSplitsCheckpoint<HiveSourceSplit> superCP = 
superSerDe.deserialize(superSerDe.getVersion(), superBytes);
+               try {
+                       Comparable<?> currentReadOffset = 
readOffsetSerDe.deserialize(inputWrapper);
+                       int numParts = inputWrapper.readInt();
+                       List<List<String>> parts = new ArrayList<>(numParts);
+                       for (int i = 0; i < numParts; i++) {
+                               
parts.add(partitionSerDe.deserialize(inputWrapper));
+                       }
+                       return new 
ContinuousHivePendingSplitsCheckpoint(superCP.getSplits(), currentReadOffset, 
parts);
+               } catch (ClassNotFoundException e) {
+                       throw new IOException("Failed to deserialize " + 
getClass().getName(), e);
+               }
+       }
+
+       private static class ReadOffsetSerDeImpl implements ReadOffsetSerDe {
+
+               private static final ReadOffsetSerDeImpl INSTANCE = new 
ReadOffsetSerDeImpl();
+
+               private ReadOffsetSerDeImpl() {
+               }
+
+               @Override
+               public void serialize(Comparable<?> offset, OutputStream 
outputStream) throws IOException {
+                       InstantiationUtil.serializeObject(outputStream, offset);
+               }
+
+               @Override
+               public Comparable<?> deserialize(InputStream inputStream) 
throws IOException, ClassNotFoundException {
+                       return InstantiationUtil.deserializeObject(inputStream, 
Thread.currentThread().getContextClassLoader());
+               }
+       }
+
+       /**
+        * SerDe for the current read offset.
+        */
+       interface ReadOffsetSerDe {
+
+               void serialize(Comparable<?> offset, OutputStream outputStream) 
throws IOException;
+
+               Comparable<?> deserialize(InputStream inputStream) throws 
IOException, ClassNotFoundException;
+       }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
new file mode 100644
index 0000000..628203f
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.event.RequestSplitEvent;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
+import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
+import org.apache.flink.connectors.hive.read.HiveSourceSplit;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.filesystem.ContinuousPartitionFetcher;
+
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.mapred.JobConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A continuously monitoring {@link SplitEnumerator} for hive source.
+ */
+public class ContinuousHiveSplitEnumerator<T extends Comparable<T>> implements 
SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>> {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ContinuousHiveSplitEnumerator.class);
+
+       private final SplitEnumeratorContext<HiveSourceSplit> enumeratorContext;
+       private final LinkedHashMap<Integer, String> readersAwaitingSplit;
+       private final FileSplitAssigner splitAssigner;
+       private final long discoveryInterval;
+
+       private final JobConf jobConf;
+       private final ObjectPath tablePath;
+
+       private final ContinuousPartitionFetcher<Partition, T> fetcher;
+       private final HiveTableSource.HiveContinuousPartitionFetcherContext<T> 
fetcherContext;
+
+       private final ReadWriteLock stateLock = new ReentrantReadWriteLock();
+       // the maximum partition read offset seen so far
+       private volatile T currentReadOffset;
+       // the partitions that have been processed for the current read offset
+       private final Set<List<String>> seenPartitionsSinceOffset;
+
+       public ContinuousHiveSplitEnumerator(
+                       SplitEnumeratorContext<HiveSourceSplit> 
enumeratorContext,
+                       T currentReadOffset,
+                       Collection<List<String>> seenPartitionsSinceOffset,
+                       FileSplitAssigner splitAssigner,
+                       long discoveryInterval,
+                       JobConf jobConf,
+                       ObjectPath tablePath,
+                       ContinuousPartitionFetcher<Partition, T> fetcher,
+                       
HiveTableSource.HiveContinuousPartitionFetcherContext<T> fetcherContext) {
+               this.enumeratorContext = enumeratorContext;
+               this.currentReadOffset = currentReadOffset;
+               this.seenPartitionsSinceOffset = new 
HashSet<>(seenPartitionsSinceOffset);
+               this.splitAssigner = splitAssigner;
+               this.discoveryInterval = discoveryInterval;
+               this.jobConf = jobConf;
+               this.tablePath = tablePath;
+               this.fetcher = fetcher;
+               this.fetcherContext = fetcherContext;
+               readersAwaitingSplit = new LinkedHashMap<>();
+       }
+
+       @Override
+       public void start() {
+               try {
+                       fetcherContext.open();
+                       enumeratorContext.callAsync(
+                                       this::monitorAndGetSplits,
+                                       this::handleNewSplits,
+                                       discoveryInterval,
+                                       discoveryInterval);
+               } catch (Exception e) {
+                       throw new FlinkHiveException("Failed to start 
continuous split enumerator", e);
+               }
+       }
+
+       @Override
+       public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+               if (sourceEvent instanceof RequestSplitEvent) {
+                       readersAwaitingSplit.put(subtaskId, 
((RequestSplitEvent) sourceEvent).hostName());
+                       assignSplits();
+               } else {
+                       LOG.error("Received unrecognized event: {}", 
sourceEvent);
+               }
+       }
+
+       @Override
+       public void addSplitsBack(List<HiveSourceSplit> splits, int subtaskId) {
+               LOG.debug("Continuous Hive Source Enumerator adds splits back: 
{}", splits);
+               stateLock.writeLock().lock();
+               try {
+                       splitAssigner.addSplits(new ArrayList<>(splits));
+               } finally {
+                       stateLock.writeLock().unlock();
+               }
+       }
+
+       @Override
+       public void addReader(int subtaskId) {
+               // this source is purely lazy-pull-based, nothing to do upon 
registration
+       }
+
+       @Override
+       public PendingSplitsCheckpoint<HiveSourceSplit> snapshotState() throws 
Exception {
+               stateLock.readLock().lock();
+               try {
+                       Collection<HiveSourceSplit> remainingSplits = 
(Collection<HiveSourceSplit>) (Collection<?>) splitAssigner.remainingSplits();
+                       return new 
ContinuousHivePendingSplitsCheckpoint(remainingSplits, currentReadOffset, 
seenPartitionsSinceOffset);
+               } finally {
+                       stateLock.readLock().unlock();
+               }
+       }
+
+       @Override
+       public void close() throws IOException {
+               try {
+                       fetcherContext.close();
+               } catch (Exception e) {
+                       throw new IOException(e);
+               }
+       }
+
+       private Void monitorAndGetSplits() throws Exception {
+               stateLock.writeLock().lock();
+               try {
+                       List<Tuple2<Partition, T>> partitions = 
fetcher.fetchPartitions(fetcherContext, currentReadOffset);
+                       if (partitions.isEmpty()) {
+                               return null;
+                       }
+
+                       partitions.sort(Comparator.comparing(o -> o.f1));
+                       List<HiveSourceSplit> newSplits = new ArrayList<>();
+                       // the max offset of new partitions
+                       T maxOffset = currentReadOffset;
+                       Set<List<String>> nextSeen = new HashSet<>();
+                       for (Tuple2<Partition, T> tuple2 : partitions) {
+                               Partition partition = tuple2.f0;
+                               List<String> partSpec = partition.getValues();
+                               if (seenPartitionsSinceOffset.add(partSpec)) {
+                                       T offset = tuple2.f1;
+                                       if (offset.compareTo(currentReadOffset) 
> 0) {
+                                               nextSeen.add(partSpec);
+                                       }
+                                       if (offset.compareTo(maxOffset) > 0) {
+                                               maxOffset = offset;
+                                       }
+                                       LOG.info("Found new partition {} of 
table {}, generating splits for it",
+                                                       partSpec, 
tablePath.getFullName());
+                                       
newSplits.addAll(HiveSourceFileEnumerator.createInputSplits(
+                                                       0, 
Collections.singletonList(fetcherContext.toHiveTablePartition(partition)), 
jobConf));
+                               }
+                       }
+                       currentReadOffset = maxOffset;
+                       splitAssigner.addSplits(new ArrayList<>(newSplits));
+                       if (!nextSeen.isEmpty()) {
+                               seenPartitionsSinceOffset.clear();
+                               seenPartitionsSinceOffset.addAll(nextSeen);
+                       }
+                       return null;
+               } finally {
+                       stateLock.writeLock().unlock();
+               }
+       }
+
+       private void handleNewSplits(Void v, Throwable error) {
+               if (error != null) {
+                       LOG.error("Failed to enumerate files", error);
+                       return;
+               }
+               assignSplits();
+       }
+
+       private void assignSplits() {
+               final Iterator<Map.Entry<Integer, String>> awaitingReader = 
readersAwaitingSplit.entrySet().iterator();
+
+               stateLock.writeLock().lock();
+               try {
+                       while (awaitingReader.hasNext()) {
+                               final Map.Entry<Integer, String> nextAwaiting = 
awaitingReader.next();
+                               final String hostname = nextAwaiting.getValue();
+                               final int awaitingSubtask = 
nextAwaiting.getKey();
+                               final Optional<FileSourceSplit> nextSplit = 
splitAssigner.getNext(hostname);
+                               if (nextSplit.isPresent()) {
+                                       
enumeratorContext.assignSplit((HiveSourceSplit) nextSplit.get(), 
awaitingSubtask);
+                                       awaitingReader.remove();
+                               } else {
+                                       break;
+                               }
+                       }
+               } finally {
+                       stateLock.writeLock().unlock();
+               }
+       }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
index a765646..243e4a4 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java
@@ -18,22 +18,37 @@
 
 package org.apache.flink.connectors.hive;
 
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.connector.file.src.AbstractFileSource;
+import org.apache.flink.connector.file.src.ContinuousEnumerationSettings;
+import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
+import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
+import org.apache.flink.connector.file.src.assigners.SimpleSplitAssigner;
+import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.connectors.hive.read.HiveBulkFormatAdapter;
 import org.apache.flink.connectors.hive.read.HiveSourceSplit;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.filesystem.ContinuousPartitionFetcher;
 import org.apache.flink.table.filesystem.LimitableBulkFormat;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.mapred.JobConf;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 
 import static 
org.apache.flink.connector.file.src.FileSource.DEFAULT_SPLIT_ASSIGNER;
@@ -46,22 +61,34 @@ public class HiveSource extends AbstractFileSource<RowData, HiveSourceSplit> imp
 
        private static final long serialVersionUID = 1L;
 
+       private final JobConfWrapper jobConfWrapper;
+       private final List<String> partitionKeys;
+       private final ContinuousPartitionFetcher<Partition, ?> fetcher;
+       private final HiveTableSource.HiveContinuousPartitionFetcherContext<?> 
fetcherContext;
+       private final ObjectPath tablePath;
+
        HiveSource(
+                       Path[] inputPaths,
+                       FileEnumerator.Provider fileEnumerator,
+                       FileSplitAssigner.Provider splitAssigner,
+                       BulkFormat<RowData, HiveSourceSplit> readerFormat,
+                       @Nullable ContinuousEnumerationSettings 
continuousEnumerationSettings,
                        JobConf jobConf,
-                       CatalogTable catalogTable,
-                       List<HiveTablePartition> partitions,
-                       @Nullable Long limit,
-                       String hiveVersion,
-                       boolean useMapRedReader,
-                       boolean isStreamingSource,
-                       RowType producedRowType) {
+                       ObjectPath tablePath,
+                       List<String> partitionKeys,
+                       @Nullable ContinuousPartitionFetcher<Partition, ?> 
fetcher,
+                       @Nullable 
HiveTableSource.HiveContinuousPartitionFetcherContext<?> fetcherContext) {
                super(
-                               new org.apache.flink.core.fs.Path[1],
-                               new 
HiveSourceFileEnumerator.Provider(partitions, new JobConfWrapper(jobConf)),
-                               DEFAULT_SPLIT_ASSIGNER,
-                               createBulkFormat(new JobConf(jobConf), 
catalogTable, hiveVersion, producedRowType, useMapRedReader, limit),
-                               null);
-               Preconditions.checkArgument(!isStreamingSource, "HiveSource 
currently only supports bounded mode");
+                               inputPaths,
+                               fileEnumerator,
+                               splitAssigner,
+                               readerFormat,
+                               continuousEnumerationSettings);
+               this.jobConfWrapper = new JobConfWrapper(jobConf);
+               this.tablePath = tablePath;
+               this.partitionKeys = partitionKeys;
+               this.fetcher = fetcher;
+               this.fetcherContext = fetcherContext;
        }
 
        @Override
@@ -69,24 +96,140 @@ public class HiveSource extends 
AbstractFileSource<RowData, HiveSourceSplit> imp
                return HiveSourceSplitSerializer.INSTANCE;
        }
 
-       private static BulkFormat<RowData, HiveSourceSplit> createBulkFormat(
-                       JobConf jobConf,
-                       CatalogTable catalogTable,
-                       String hiveVersion,
-                       RowType producedRowType,
-                       boolean useMapRedReader,
-                       Long limit) {
-               checkNotNull(catalogTable, "catalogTable can not be null.");
-               return LimitableBulkFormat.create(
-                               new HiveBulkFormatAdapter(
-                                               new JobConfWrapper(jobConf),
-                                               catalogTable.getPartitionKeys(),
-                                               
catalogTable.getSchema().getFieldNames(),
-                                               
catalogTable.getSchema().getFieldDataTypes(),
-                                               hiveVersion,
-                                               producedRowType,
-                                               useMapRedReader),
-                               limit
+       @Override
+       public 
SimpleVersionedSerializer<PendingSplitsCheckpoint<HiveSourceSplit>> 
getEnumeratorCheckpointSerializer() {
+               if (continuousPartitionedEnumerator()) {
+                       return new 
ContinuousHivePendingSplitsCheckpointSerializer(getSplitSerializer());
+               } else {
+                       return super.getEnumeratorCheckpointSerializer();
+               }
+       }
+
+       @Override
+       public SplitEnumerator<HiveSourceSplit, 
PendingSplitsCheckpoint<HiveSourceSplit>> createEnumerator(
+                       SplitEnumeratorContext<HiveSourceSplit> enumContext) {
+               if (continuousPartitionedEnumerator()) {
+                       return createContinuousSplitEnumerator(
+                                       enumContext, 
fetcherContext.getConsumeStartOffset(), Collections.emptyList(), 
Collections.emptyList());
+               } else {
+                       return super.createEnumerator(enumContext);
+               }
+       }
+
+       @Override
+       public SplitEnumerator<HiveSourceSplit, 
PendingSplitsCheckpoint<HiveSourceSplit>> restoreEnumerator(
+                       SplitEnumeratorContext<HiveSourceSplit> enumContext, 
PendingSplitsCheckpoint<HiveSourceSplit> checkpoint) {
+               if (continuousPartitionedEnumerator()) {
+                       Preconditions.checkState(checkpoint instanceof 
ContinuousHivePendingSplitsCheckpoint,
+                                       "Illegal type of splits checkpoint %s 
for streaming read partitioned table", checkpoint.getClass().getName());
+                       ContinuousHivePendingSplitsCheckpoint hiveCheckpoint = 
(ContinuousHivePendingSplitsCheckpoint) checkpoint;
+                       return createContinuousSplitEnumerator(
+                                       enumContext, 
hiveCheckpoint.getCurrentReadOffset(), 
hiveCheckpoint.getSeenPartitionsSinceOffset(), hiveCheckpoint.getSplits());
+               } else {
+                       return super.restoreEnumerator(enumContext, checkpoint);
+               }
+       }
+
+       private boolean continuousPartitionedEnumerator() {
+               return getBoundedness() == Boundedness.CONTINUOUS_UNBOUNDED && 
!partitionKeys.isEmpty();
+       }
+
+       private SplitEnumerator<HiveSourceSplit, 
PendingSplitsCheckpoint<HiveSourceSplit>> createContinuousSplitEnumerator(
+                       SplitEnumeratorContext<HiveSourceSplit> enumContext,
+                       Comparable<?> currentReadOffset,
+                       Collection<List<String>> seenPartitions,
+                       Collection<HiveSourceSplit> splits) {
+               return new ContinuousHiveSplitEnumerator(
+                               enumContext,
+                               currentReadOffset,
+                               seenPartitions,
+                               getAssignerFactory().create(new 
ArrayList<>(splits)),
+                               
getContinuousEnumerationSettings().getDiscoveryInterval().toMillis(),
+                               jobConfWrapper.conf(),
+                               tablePath,
+                               fetcher,
+                               fetcherContext
                );
        }
+
+       /**
+        * Builder to build HiveSource instances.
+        */
+       public static class HiveSourceBuilder extends 
AbstractFileSourceBuilder<RowData, HiveSourceSplit, HiveSourceBuilder> {
+
+               private final JobConf jobConf;
+               private final ObjectPath tablePath;
+               private final List<String> partitionKeys;
+
+               private ContinuousPartitionFetcher<Partition, ?> fetcher = null;
+               private 
HiveTableSource.HiveContinuousPartitionFetcherContext<?> fetcherContext = null;
+
+               HiveSourceBuilder(
+                               JobConf jobConf,
+                               ObjectPath tablePath,
+                               CatalogTable catalogTable,
+                               List<HiveTablePartition> partitions,
+                               @Nullable Long limit,
+                               String hiveVersion,
+                               boolean useMapRedReader,
+                               RowType producedRowType) {
+                       super(
+                                       new Path[1],
+                                       createBulkFormat(new JobConf(jobConf), 
catalogTable, hiveVersion, producedRowType, useMapRedReader, limit),
+                                       new 
HiveSourceFileEnumerator.Provider(partitions, new JobConfWrapper(jobConf)),
+                                       null);
+                       this.jobConf = jobConf;
+                       this.tablePath = tablePath;
+                       this.partitionKeys = catalogTable.getPartitionKeys();
+               }
+
+               @Override
+               public HiveSource build() {
+                       FileSplitAssigner.Provider splitAssigner = 
continuousSourceSettings == null || partitionKeys.isEmpty() ?
+                                       DEFAULT_SPLIT_ASSIGNER : 
SimpleSplitAssigner::new;
+                       return new HiveSource(
+                                       inputPaths,
+                                       fileEnumerator,
+                                       splitAssigner,
+                                       readerFormat,
+                                       continuousSourceSettings,
+                                       jobConf,
+                                       tablePath,
+                                       partitionKeys,
+                                       fetcher,
+                                       fetcherContext
+                       );
+               }
+
+               public HiveSourceBuilder 
setFetcher(ContinuousPartitionFetcher<Partition, ?> fetcher) {
+                       this.fetcher = fetcher;
+                       return this;
+               }
+
+               public HiveSourceBuilder 
setFetcherContext(HiveTableSource.HiveContinuousPartitionFetcherContext<?> 
fetcherContext) {
+                       this.fetcherContext = fetcherContext;
+                       return this;
+               }
+
+               private static BulkFormat<RowData, HiveSourceSplit> 
createBulkFormat(
+                               JobConf jobConf,
+                               CatalogTable catalogTable,
+                               String hiveVersion,
+                               RowType producedRowType,
+                               boolean useMapRedReader,
+                               Long limit) {
+                       checkNotNull(catalogTable, "catalogTable can not be 
null.");
+                       return LimitableBulkFormat.create(
+                                       new HiveBulkFormatAdapter(
+                                                       new 
JobConfWrapper(jobConf),
+                                                       
catalogTable.getPartitionKeys(),
+                                                       
catalogTable.getSchema().getFieldNames(),
+                                                       
catalogTable.getSchema().getFieldDataTypes(),
+                                                       hiveVersion,
+                                                       producedRowType,
+                                                       useMapRedReader),
+                                       limit
+                       );
+               }
+       }
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index ec127e5..f158625 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -20,27 +20,18 @@ package org.apache.flink.connectors.hive;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.connectors.hive.read.HiveContinuousMonitoringFunction;
 import org.apache.flink.connectors.hive.read.HiveContinuousPartitionFetcher;
 import org.apache.flink.connectors.hive.read.HivePartitionFetcherContextBase;
-import org.apache.flink.connectors.hive.read.HiveTableFileInputFormat;
 import org.apache.flink.connectors.hive.read.HiveTableInputFormat;
-import org.apache.flink.connectors.hive.read.TimestampedHiveInputSplit;
 import org.apache.flink.connectors.hive.util.HivePartitionUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import 
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
-import 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperatorFactory;
-import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
-import 
org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -55,9 +46,7 @@ import 
org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
 import 
org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
 import 
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.filesystem.ContinuousPartitionFetcher;
-import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
@@ -77,11 +66,9 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
 import static 
org.apache.flink.connectors.hive.util.HivePartitionUtils.getAllPartitions;
 import static 
org.apache.flink.table.catalog.hive.util.HiveTableUtil.checkAcidTable;
-import static 
org.apache.flink.table.filesystem.DefaultPartTimeExtractor.toLocalDateTime;
 import static 
org.apache.flink.table.filesystem.DefaultPartTimeExtractor.toMills;
 import static 
org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_CONSUME_START_OFFSET;
 import static 
org.apache.flink.table.filesystem.FileSystemOptions.STREAMING_SOURCE_ENABLE;
@@ -151,23 +138,62 @@ public class HiveTableSource implements
                                catalogTable,
                                hiveShim,
                                remainingPartitions);
+               Configuration configuration = 
Configuration.fromMap(catalogTable.getOptions());
 
-               @SuppressWarnings("unchecked")
-               TypeInformation<RowData> typeInfo =
-                               (TypeInformation<RowData>) 
TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
-
-               HiveTableInputFormat inputFormat = getInputFormat(
+               HiveSource.HiveSourceBuilder sourceBuilder = new 
HiveSource.HiveSourceBuilder(
+                               jobConf,
+                               tablePath,
+                               catalogTable,
                                allHivePartitions,
-                               
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
-
+                               limit,
+                               hiveVersion,
+                               
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER),
+                               (RowType) 
getProducedDataType().getLogicalType());
                if (isStreamingSource()) {
                        if (catalogTable.getPartitionKeys().isEmpty()) {
-                               return 
createStreamSourceForNonPartitionTable(execEnv, typeInfo, inputFormat, 
allHivePartitions.get(0));
-                       } else {
-                               return 
createStreamSourceForPartitionTable(execEnv, typeInfo, inputFormat);
+                               String consumeOrderStr = 
configuration.get(STREAMING_SOURCE_PARTITION_ORDER);
+                               ConsumeOrder consumeOrder = 
ConsumeOrder.getConsumeOrder(consumeOrderStr);
+                               if (consumeOrder != 
ConsumeOrder.CREATE_TIME_ORDER) {
+                                       throw new UnsupportedOperationException(
+                                                       "Only " + 
ConsumeOrder.CREATE_TIME_ORDER + " is supported for non partition table.");
+                               }
                        }
+
+                       Duration monitorInterval = 
configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL) == null
+                                       ? DEFAULT_SCAN_MONITOR_INTERVAL
+                                       : 
configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL);
+                       sourceBuilder.monitorContinuously(monitorInterval);
+
+                       if (!catalogTable.getPartitionKeys().isEmpty()) {
+                               sourceBuilder.setFetcher(new 
HiveContinuousPartitionFetcher());
+                               final String defaultPartitionName = 
jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
+                                               
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
+                               HiveContinuousPartitionFetcherContext<?> 
fetcherContext = new HiveContinuousPartitionFetcherContext(
+                                               tablePath,
+                                               hiveShim,
+                                               new JobConfWrapper(jobConf),
+                                               catalogTable.getPartitionKeys(),
+                                               
getProducedTableSchema().getFieldDataTypes(),
+                                               
getProducedTableSchema().getFieldNames(),
+                                               configuration,
+                                               defaultPartitionName);
+                               sourceBuilder.setFetcherContext(fetcherContext);
+                       }
+               }
+
+               HiveSource hiveSource = sourceBuilder.build();
+               DataStreamSource<RowData> source = execEnv.fromSource(
+                               hiveSource, WatermarkStrategy.noWatermarks(), 
"HiveSource-" + tablePath.getFullName());
+
+               if (isStreamingSource()) {
+                       return source;
                } else {
-                       return createBatchSource(execEnv, allHivePartitions);
+                       int parallelism = new 
HiveParallelismInference(tablePath, flinkConf)
+                                       .infer(
+                                                       () -> 
HiveSourceFileEnumerator.getNumFiles(allHivePartitions, jobConf),
+                                                       () -> 
HiveSourceFileEnumerator.createInputSplits(0, allHivePartitions, 
jobConf).size())
+                                       .limit(limit);
+                       return source.setParallelism(parallelism);
                }
        }
 
@@ -189,119 +215,6 @@ public class HiveTableSource implements
                                
STREAMING_SOURCE_ENABLE.defaultValue().toString()));
        }
 
-       private DataStream<RowData> 
createBatchSource(StreamExecutionEnvironment execEnv,
-                       List<HiveTablePartition> allHivePartitions) {
-               HiveSource hiveSource = new HiveSource(
-                               jobConf,
-                               catalogTable,
-                               allHivePartitions,
-                               limit,
-                               hiveVersion,
-                               
flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER),
-                               isStreamingSource(),
-                               (RowType) 
getProducedDataType().getLogicalType());
-               DataStreamSource<RowData> source = execEnv.fromSource(
-                               hiveSource, WatermarkStrategy.noWatermarks(), 
"HiveSource-" + tablePath.getFullName());
-
-               int parallelism = new HiveParallelismInference(tablePath, 
flinkConf)
-                               .infer(
-                                               () -> 
HiveSourceFileEnumerator.getNumFiles(allHivePartitions, jobConf),
-                                               () -> 
HiveSourceFileEnumerator.createInputSplits(0, allHivePartitions, 
jobConf).size())
-                               .limit(limit);
-
-               return source.setParallelism(parallelism);
-       }
-
-       @SuppressWarnings("unchecked")
-       private DataStream<RowData> createStreamSourceForPartitionTable(
-                       StreamExecutionEnvironment execEnv,
-                       TypeInformation<RowData> typeInfo,
-                       HiveTableInputFormat inputFormat) {
-               Configuration configuration = new Configuration();
-               catalogTable.getOptions().forEach(configuration::setString);
-
-               Duration monitorInterval = 
configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL) == null
-                               ? DEFAULT_SCAN_MONITOR_INTERVAL
-                               : 
configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL);
-
-               ContinuousPartitionFetcher<Partition, Comparable> fetcher = new 
HiveContinuousPartitionFetcher();
-
-               final String defaultPartitionName = 
jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
-                               
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
-               HiveContinuousPartitionFetcherContext fetcherContext = new 
HiveContinuousPartitionFetcherContext(
-                               tablePath,
-                               hiveShim,
-                               new JobConfWrapper(jobConf),
-                               catalogTable.getPartitionKeys(),
-                               getProducedTableSchema().getFieldDataTypes(),
-                               getProducedTableSchema().getFieldNames(),
-                               configuration,
-                               defaultPartitionName);
-
-               HiveContinuousMonitoringFunction monitoringFunction = new 
HiveContinuousMonitoringFunction(
-                               fetcher,
-                               fetcherContext,
-                               jobConf,
-                               tablePath,
-                               execEnv.getParallelism(),
-                               monitorInterval.toMillis());
-
-               ContinuousFileReaderOperatorFactory<RowData, 
TimestampedHiveInputSplit> factory =
-                               new 
ContinuousFileReaderOperatorFactory<>(inputFormat);
-
-               String sourceName = "HiveMonitoringFunction";
-               SingleOutputStreamOperator<RowData> source = execEnv
-                               .addSource(monitoringFunction, sourceName)
-                               .transform("Split Reader: " + sourceName, 
typeInfo, factory);
-
-               return new DataStreamSource<>(source);
-       }
-
-       private DataStream<RowData> createStreamSourceForNonPartitionTable(
-                       StreamExecutionEnvironment execEnv,
-                       TypeInformation<RowData> typeInfo,
-                       HiveTableInputFormat inputFormat,
-                       HiveTablePartition hiveTable) {
-               HiveTableFileInputFormat fileInputFormat = new 
HiveTableFileInputFormat(inputFormat, hiveTable);
-
-               Configuration configuration = new Configuration();
-               catalogTable.getOptions().forEach(configuration::setString);
-               String consumeOrderStr = 
configuration.get(STREAMING_SOURCE_PARTITION_ORDER);
-               ConsumeOrder consumeOrder = 
ConsumeOrder.getConsumeOrder(consumeOrderStr);
-               if (consumeOrder != ConsumeOrder.CREATE_TIME_ORDER) {
-                       throw new UnsupportedOperationException(
-                                       "Only " + 
ConsumeOrder.CREATE_TIME_ORDER + " is supported for non partition table.");
-               }
-
-               String consumeOffset = 
configuration.get(STREAMING_SOURCE_CONSUME_START_OFFSET);
-               long currentReadTime = 0L;
-               if 
(configuration.contains(STREAMING_SOURCE_CONSUME_START_OFFSET)) {
-                       currentReadTime = 
TimestampData.fromLocalDateTime(toLocalDateTime(consumeOffset))
-                                       .toTimestamp().getTime();
-               }
-
-               Duration monitorInterval = 
configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL) == null
-                               ? DEFAULT_SCAN_MONITOR_INTERVAL
-                               : 
configuration.get(STREAMING_SOURCE_MONITOR_INTERVAL);
-
-               ContinuousFileMonitoringFunction<RowData> monitoringFunction =
-                               new ContinuousFileMonitoringFunction<>(
-                                               fileInputFormat,
-                                               
FileProcessingMode.PROCESS_CONTINUOUSLY,
-                                               execEnv.getParallelism(),
-                                               monitorInterval.toMillis(),
-                                               currentReadTime);
-
-               ContinuousFileReaderOperatorFactory<RowData, 
TimestampedFileInputSplit> factory =
-                               new 
ContinuousFileReaderOperatorFactory<>(fileInputFormat);
-
-               String sourceName = "HiveFileMonitoringFunction";
-               SingleOutputStreamOperator<RowData> source = 
execEnv.addSource(monitoringFunction, sourceName)
-                               .transform("Split Reader: " + sourceName, 
typeInfo, factory);
-
-               return new DataStreamSource<>(source);
-       }
-
        @VisibleForTesting
        HiveTableInputFormat getInputFormat(List<HiveTablePartition> 
allHivePartitions, boolean useMapRedReader) {
                return new HiveTableInputFormat(
@@ -365,12 +278,6 @@ public class HiveTableSource implements
                this.projectedFields = 
Arrays.stream(projectedFields).mapToInt(value -> value[0]).toArray();
        }
 
-       private static List<String> partitionSpecToValues(Map<String, String> 
spec, List<String> partitionColNames) {
-               Preconditions.checkArgument(spec.size() == 
partitionColNames.size() && spec.keySet().containsAll(partitionColNames),
-                               "Partition spec (%s) and partition column names 
(%s) doesn't match", spec, partitionColNames);
-               return 
partitionColNames.stream().map(spec::get).collect(Collectors.toList());
-       }
-
        @Override
        public String asSummaryString() {
                return "HiveSource";
@@ -394,7 +301,7 @@ public class HiveTableSource implements
         * PartitionFetcher.Context for {@link ContinuousPartitionFetcher}.
         */
        @SuppressWarnings("unchecked")
-       public static class HiveContinuousPartitionFetcherContext<T extends 
Comparable> extends HivePartitionFetcherContextBase<Partition>
+       public static class HiveContinuousPartitionFetcherContext<T extends 
Comparable<T>> extends HivePartitionFetcherContextBase<Partition>
                        implements 
ContinuousPartitionFetcher.Context<Partition, T> {
 
                private static final long serialVersionUID = 1L;
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
deleted file mode 100644
index 5abb67a..0000000
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connectors.hive.read;
-
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.ListSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import 
org.apache.flink.connectors.hive.HiveTableSource.HiveContinuousPartitionFetcherContext;
-import org.apache.flink.connectors.hive.JobConfWrapper;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.catalog.ObjectPath;
-import org.apache.flink.table.filesystem.ContinuousPartitionFetcher;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.mapred.JobConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-/**
- * This is the single (non-parallel) monitoring task which takes a {@link 
HiveTableInputFormat},
- * it is responsible for:
- *
- * <ol>
- *     <li>Monitoring partitions of hive meta store.</li>
- *     <li>Deciding which partitions should be further read and processed.</li>
- *     <li>Creating the {@link HiveTableInputSplit splits} corresponding to 
those partitions.</li>
- *     <li>Assigning them to downstream tasks for further processing.</li>
- * </ol>
- *
- * <p>The splits to be read are forwarded to the downstream {@link 
ContinuousFileReaderOperator}
- * which can have parallelism greater than one.
- *
- * <p><b>IMPORTANT NOTE: </b> Splits are forwarded downstream for reading in 
ascending partition time order,
- * based on the partition time of the partitions they belong to.
- *
- * @param <T> The serializer type of partition offset.
- */
-public class HiveContinuousMonitoringFunction<T extends Comparable>
-               extends RichSourceFunction<TimestampedHiveInputSplit>
-               implements CheckpointedFunction {
-
-       private static final long serialVersionUID = 1L;
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(HiveContinuousMonitoringFunction.class);
-
-       private final ContinuousPartitionFetcher<Partition, T> fetcher;
-       private final HiveContinuousPartitionFetcherContext<T> fetcherContext;
-
-       /** The parallelism of the downstream readers. */
-       private final int readerParallelism;
-
-       /** The interval between consecutive path scans. */
-       private final long interval;
-       private ObjectPath tablePath;
-       private final JobConfWrapper conf;
-
-       private volatile boolean isRunning = true;
-
-       /** The maximum partition read offset seen so far. */
-       private volatile T currentReadOffset;
-
-       private transient Object checkpointLock;
-
-       private transient ListState<T> currReadOffsetState;
-
-       private transient ListState<List<List<String>>> distinctPartsState;
-
-       private transient Set<List<String>> distinctPartitions;
-
-       public HiveContinuousMonitoringFunction(
-                       ContinuousPartitionFetcher<Partition, T> fetcher,
-                       HiveContinuousPartitionFetcherContext<T> fetcherContext,
-                       JobConf conf,
-                       ObjectPath tablePath,
-                       int readerParallelism,
-                       long interval) {
-               this.conf = new JobConfWrapper(conf);
-               this.tablePath = tablePath;
-               this.interval = interval;
-               this.readerParallelism = Math.max(readerParallelism, 1);
-               this.fetcher = fetcher;
-               this.fetcherContext = fetcherContext;
-               this.currentReadOffset = fetcherContext.getConsumeStartOffset();
-       }
-
-       @Override
-       public void initializeState(FunctionInitializationContext context) 
throws Exception {
-               this.fetcherContext.open();
-
-               this.currReadOffsetState = 
context.getOperatorStateStore().getListState(
-                       new ListStateDescriptor<>(
-                               "current-read-offset-state",
-                               fetcherContext.getTypeSerializer()
-                       )
-               );
-               this.distinctPartsState = 
context.getOperatorStateStore().getListState(
-                       new ListStateDescriptor<>(
-                               "distinct-partitions-state",
-                               new ListSerializer<>(new 
ListSerializer<>(StringSerializer.INSTANCE))
-                       )
-               );
-
-               this.distinctPartitions = new HashSet<>();
-               if (context.isRestored()) {
-                       LOG.info("Restoring state for the {}.", 
getClass().getSimpleName());
-                       this.currentReadOffset = 
this.currReadOffsetState.get().iterator().next();
-                       
this.distinctPartitions.addAll(this.distinctPartsState.get().iterator().next());
-               } else {
-                       LOG.info("No state to restore for the {}.", 
getClass().getSimpleName());
-                       this.currentReadOffset = 
fetcherContext.getConsumeStartOffset();
-               }
-       }
-
-       @Override
-       public void run(SourceContext<TimestampedHiveInputSplit> context) 
throws Exception {
-               checkpointLock = context.getCheckpointLock();
-               while (isRunning) {
-                       synchronized (checkpointLock) {
-                               if (isRunning) {
-                                       monitorAndForwardSplits(context);
-                               }
-                       }
-                       Thread.sleep(interval);
-               }
-       }
-
-       private void monitorAndForwardSplits(
-                       SourceContext<TimestampedHiveInputSplit> context) 
throws Exception {
-               assert (Thread.holdsLock(checkpointLock));
-
-               List<Tuple2<Partition, T>> partitions = 
fetcher.fetchPartitions(this.fetcherContext, currentReadOffset);
-               if (partitions.isEmpty()) {
-                       return;
-               }
-
-               partitions.sort(Comparator.comparing(o -> o.f1));
-
-               T maxPartitionOffset = null;
-               Set<List<String>> nextDistinctParts = new HashSet<>();
-               for (Tuple2<Partition, T> tuple2 : partitions) {
-                       Partition partition = tuple2.f0;
-                       List<String> partSpec = partition.getValues();
-                       if (!this.distinctPartitions.contains(partSpec)) {
-                               this.distinctPartitions.add(partSpec);
-                               T partitionOffset = tuple2.f1;
-                               if 
(partitionOffset.compareTo(currentReadOffset) > 0) {
-                                       nextDistinctParts.add(partSpec);
-                               }
-                               if (maxPartitionOffset == null || 
partitionOffset.compareTo(maxPartitionOffset) > 0) {
-                                       maxPartitionOffset = partitionOffset;
-                               }
-                               LOG.info("Found new partition {} of table {}, 
forwarding splits to downstream readers",
-                                               partSpec, 
tablePath.getFullName());
-                               HiveTableInputSplit[] splits = 
HiveTableInputFormat.createInputSplits(
-                                               this.readerParallelism,
-                                               
Collections.singletonList(fetcherContext.toHiveTablePartition(partition)),
-                                               this.conf.conf());
-                               long modificationTime = 
fetcherContext.getModificationTime(partition, tuple2.f1);
-                               for (HiveTableInputSplit split : splits) {
-                                       context.collect(new 
TimestampedHiveInputSplit(modificationTime, split));
-                               }
-                       }
-               }
-
-               if (maxPartitionOffset != null || 
maxPartitionOffset.compareTo(currentReadOffset) > 0) {
-                       currentReadOffset = maxPartitionOffset;
-                       distinctPartitions.clear();
-                       distinctPartitions.addAll(nextDistinctParts);
-               }
-       }
-
-       @Override
-       public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
-               Preconditions.checkState(this.currReadOffsetState != null,
-                               "The " + getClass().getSimpleName() + " state 
has not been properly initialized.");
-
-               this.currReadOffsetState.clear();
-               this.currReadOffsetState.add(this.currentReadOffset);
-
-               this.distinctPartsState.clear();
-               this.distinctPartsState.add(new 
ArrayList<>(this.distinctPartitions));
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("{} checkpointed {}.", 
getClass().getSimpleName(), currentReadOffset);
-               }
-       }
-
-       @Override
-       public void close() {
-               cancel();
-       }
-
-       @Override
-       public void cancel() {
-               // this is to cover the case where cancel() is called before 
the run()
-               if (checkpointLock != null) {
-                       synchronized (checkpointLock) {
-                               isRunning = false;
-                       }
-               } else {
-                       isRunning = false;
-               }
-               try {
-                       if (this.fetcherContext != null) {
-                               this.fetcherContext.close();
-                       }
-               } catch (Exception e) {
-                       throw new TableException("failed to close the 
context.");
-               }
-       }
-}
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousPartitionFetcher.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousPartitionFetcher.java
index 67e0d5f..1d487be 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousPartitionFetcher.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousPartitionFetcher.java
@@ -33,7 +33,7 @@ import java.util.List;
  * A {@link ContinuousPartitionFetcher} for hive table.
  */
 @Internal
-public class HiveContinuousPartitionFetcher<T extends Comparable> implements 
ContinuousPartitionFetcher<Partition, T> {
+public class HiveContinuousPartitionFetcher<T extends Comparable<T>> 
implements ContinuousPartitionFetcher<Partition, T> {
 
        private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/ContinuousPartitionFetcher.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/ContinuousPartitionFetcher.java
index 67a2dde..921e12e 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/ContinuousPartitionFetcher.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/ContinuousPartitionFetcher.java
@@ -33,7 +33,7 @@ import java.util.List;
  *          or create-time order, be String when fetches in partition-name 
order.
  */
 @Internal
-public interface ContinuousPartitionFetcher<P, T extends Comparable> extends 
PartitionFetcher<P> {
+public interface ContinuousPartitionFetcher<P, T extends Comparable<T>> 
extends PartitionFetcher<P> {
 
        /**
         * Fetch partitions by previous partition offset (Including).
@@ -47,7 +47,7 @@ public interface ContinuousPartitionFetcher<P, T extends 
Comparable> extends Par
         * @param <T> The type of partition offset, the type could be Long when 
fetches in partition-time
         *          or create-time order, be String when fetches in 
partition-name order.
         */
-       interface Context<P, T extends Comparable> extends 
PartitionFetcher.Context<P>  {
+       interface Context<P, T extends Comparable<T>> extends 
PartitionFetcher.Context<P>  {
                /**
                 * The table full path.
                 */
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionFetcher.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionFetcher.java
index 6e322de..4b94af0 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionFetcher.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/PartitionFetcher.java
@@ -73,7 +73,7 @@ public interface PartitionFetcher<P> extends Serializable {
                 * @param <P> The type of partition value.
                 * @param <T> The tye of the partition offset, the partition 
offset is comparable.
                 */
-               interface ComparablePartitionValue<P, T extends Comparable> 
extends Serializable {
+               interface ComparablePartitionValue<P, T extends Comparable<T>> 
extends Serializable {
 
                        /**
                         * Get the partition value.

Reply via email to