JingsongLi commented on a change in pull request #13998:
URL: https://github.com/apache/flink/pull/13998#discussion_r525980833



##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
##########
@@ -158,12 +142,64 @@ public void close() throws IOException {
                }
        }
 
-       private Void monitorAndGetSplits() throws Exception {
-               stateLock.writeLock().lock();
-               try {
+       private void handleNewSplits(NewSplitsAndState<T> newSplitsAndState, 
Throwable error) {
+               if (error != null) {
+                       // we need to failover because the worker thread is 
stateful
+                       throw new FlinkHiveException("Failed to enumerate 
files", error);
+               }
+               this.currentReadOffset = newSplitsAndState.offset;
+               this.seenPartitionsSinceOffset = 
newSplitsAndState.seenPartitions;
+               splitAssigner.addSplits(new 
ArrayList<>(newSplitsAndState.newSplits));
+               assignSplits();
+       }
+
+       private void assignSplits() {
+               final Iterator<Map.Entry<Integer, String>> awaitingReader = 
readersAwaitingSplit.entrySet().iterator();
+               while (awaitingReader.hasNext()) {
+                       final Map.Entry<Integer, String> nextAwaiting = 
awaitingReader.next();
+                       final String hostname = nextAwaiting.getValue();
+                       final int awaitingSubtask = nextAwaiting.getKey();
+                       final Optional<FileSourceSplit> nextSplit = 
splitAssigner.getNext(hostname);
+                       if (nextSplit.isPresent()) {
+                               enumeratorContext.assignSplit((HiveSourceSplit) 
nextSplit.get(), awaitingSubtask);
+                               awaitingReader.remove();
+                       } else {
+                               break;
+                       }
+               }
+       }
+
+       private static class NewPartitionMonitor<T extends Comparable<T>> 
implements Callable<NewSplitsAndState<T>> {

Review comment:
       Could this class simply be named `PartitionMonitor`?

##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
##########
@@ -158,12 +142,64 @@ public void close() throws IOException {
                }
        }
 
-       private Void monitorAndGetSplits() throws Exception {
-               stateLock.writeLock().lock();
-               try {
+       private void handleNewSplits(NewSplitsAndState<T> newSplitsAndState, 
Throwable error) {
+               if (error != null) {
+                       // we need to failover because the worker thread is 
stateful
+                       throw new FlinkHiveException("Failed to enumerate 
files", error);
+               }
+               this.currentReadOffset = newSplitsAndState.offset;
+               this.seenPartitionsSinceOffset = 
newSplitsAndState.seenPartitions;
+               splitAssigner.addSplits(new 
ArrayList<>(newSplitsAndState.newSplits));
+               assignSplits();
+       }
+
+       private void assignSplits() {
+               final Iterator<Map.Entry<Integer, String>> awaitingReader = 
readersAwaitingSplit.entrySet().iterator();
+               while (awaitingReader.hasNext()) {
+                       final Map.Entry<Integer, String> nextAwaiting = 
awaitingReader.next();
+                       final String hostname = nextAwaiting.getValue();
+                       final int awaitingSubtask = nextAwaiting.getKey();
+                       final Optional<FileSourceSplit> nextSplit = 
splitAssigner.getNext(hostname);
+                       if (nextSplit.isPresent()) {
+                               enumeratorContext.assignSplit((HiveSourceSplit) 
nextSplit.get(), awaitingSubtask);
+                               awaitingReader.remove();
+                       } else {
+                               break;
+                       }
+               }
+       }
+
+       private static class NewPartitionMonitor<T extends Comparable<T>> 
implements Callable<NewSplitsAndState<T>> {
+
+               // keep these locally so that we don't need to share state with 
main thread
+               private T currentReadOffset;
+               private final Set<List<String>> seenPartitionsSinceOffset;
+
+               private final ObjectPath tablePath;
+               private final JobConf jobConf;
+               private final ContinuousPartitionFetcher<Partition, T> fetcher;
+               private final 
HiveTableSource.HiveContinuousPartitionFetcherContext<T> fetcherContext;
+
+               NewPartitionMonitor(

Review comment:
       This constructor can be declared `private`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to