lirui-apache commented on a change in pull request #13998:
URL: https://github.com/apache/flink/pull/13998#discussion_r526058826



##########
File path: 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
##########
@@ -158,12 +142,64 @@ public void close() throws IOException {
                }
        }
 
-       private Void monitorAndGetSplits() throws Exception {
-               stateLock.writeLock().lock();
-               try {
+       private void handleNewSplits(NewSplitsAndState<T> newSplitsAndState, 
Throwable error) {
+               if (error != null) {
+                       // we need to failover because the worker thread is 
stateful
+                       throw new FlinkHiveException("Failed to enumerate 
files", error);
+               }
+               this.currentReadOffset = newSplitsAndState.offset;
+               this.seenPartitionsSinceOffset = 
newSplitsAndState.seenPartitions;
+               splitAssigner.addSplits(new 
ArrayList<>(newSplitsAndState.newSplits));
+               assignSplits();
+       }
+
+       private void assignSplits() {
+               final Iterator<Map.Entry<Integer, String>> awaitingReader = 
readersAwaitingSplit.entrySet().iterator();
+               while (awaitingReader.hasNext()) {
+                       final Map.Entry<Integer, String> nextAwaiting = 
awaitingReader.next();
+                       final String hostname = nextAwaiting.getValue();
+                       final int awaitingSubtask = nextAwaiting.getKey();
+                       final Optional<FileSourceSplit> nextSplit = 
splitAssigner.getNext(hostname);
+                       if (nextSplit.isPresent()) {
+                               enumeratorContext.assignSplit((HiveSourceSplit) 
nextSplit.get(), awaitingSubtask);
+                               awaitingReader.remove();
+                       } else {
+                               break;
+                       }
+               }
+       }
+
+       private static class NewPartitionMonitor<T extends Comparable<T>> 
implements Callable<NewSplitsAndState<T>> {
+
+               // keep these locally so that we don't need to share state with 
main thread
+               private T currentReadOffset;
+               private final Set<List<String>> seenPartitionsSinceOffset;
+
+               private final ObjectPath tablePath;
+               private final JobConf jobConf;
+               private final ContinuousPartitionFetcher<Partition, T> fetcher;
+               private final 
HiveTableSource.HiveContinuousPartitionFetcherContext<T> fetcherContext;
+
+               NewPartitionMonitor(
+                               T currentReadOffset,
+                               Collection<List<String>> 
seenPartitionsSinceOffset,
+                               ObjectPath tablePath,
+                               JobConf jobConf,
+                               ContinuousPartitionFetcher<Partition, T> 
fetcher,
+                               
HiveTableSource.HiveContinuousPartitionFetcherContext<T> fetcherContext) {
+                       this.currentReadOffset = currentReadOffset;
+                       this.seenPartitionsSinceOffset = new 
HashSet<>(seenPartitionsSinceOffset);
+                       this.tablePath = tablePath;
+                       this.jobConf = jobConf;
+                       this.fetcher = fetcher;
+                       this.fetcherContext = fetcherContext;
+               }
+
+               @Override
+               public NewSplitsAndState<T> call() throws Exception {
                        List<Tuple2<Partition, T>> partitions = 
fetcher.fetchPartitions(fetcherContext, currentReadOffset);
                        if (partitions.isEmpty()) {
-                               return null;
+                               return new 
NewSplitsAndState<>(Collections.emptyList(), currentReadOffset, 
seenPartitionsSinceOffset);

Review comment:
       The `seenPartitionsSinceOffset` here is a field local to this class: it 
is a copy of the collection passed to the constructor, so it is not shared with 
the main thread.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to