JingsongLi commented on a change in pull request #13998: URL: https://github.com/apache/flink/pull/13998#discussion_r525980833
########## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java ########## @@ -158,12 +142,64 @@ public void close() throws IOException { } } - private Void monitorAndGetSplits() throws Exception { - stateLock.writeLock().lock(); - try { + private void handleNewSplits(NewSplitsAndState<T> newSplitsAndState, Throwable error) { + if (error != null) { + // we need to failover because the worker thread is stateful + throw new FlinkHiveException("Failed to enumerate files", error); + } + this.currentReadOffset = newSplitsAndState.offset; + this.seenPartitionsSinceOffset = newSplitsAndState.seenPartitions; + splitAssigner.addSplits(new ArrayList<>(newSplitsAndState.newSplits)); + assignSplits(); + } + + private void assignSplits() { + final Iterator<Map.Entry<Integer, String>> awaitingReader = readersAwaitingSplit.entrySet().iterator(); + while (awaitingReader.hasNext()) { + final Map.Entry<Integer, String> nextAwaiting = awaitingReader.next(); + final String hostname = nextAwaiting.getValue(); + final int awaitingSubtask = nextAwaiting.getKey(); + final Optional<FileSourceSplit> nextSplit = splitAssigner.getNext(hostname); + if (nextSplit.isPresent()) { + enumeratorContext.assignSplit((HiveSourceSplit) nextSplit.get(), awaitingSubtask); + awaitingReader.remove(); + } else { + break; + } + } + } + + private static class NewPartitionMonitor<T extends Comparable<T>> implements Callable<NewSplitsAndState<T>> { Review comment: Just `PartitionMonitor`? 
########## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java ########## @@ -158,12 +142,64 @@ public void close() throws IOException { } } - private Void monitorAndGetSplits() throws Exception { - stateLock.writeLock().lock(); - try { + private void handleNewSplits(NewSplitsAndState<T> newSplitsAndState, Throwable error) { + if (error != null) { + // we need to failover because the worker thread is stateful + throw new FlinkHiveException("Failed to enumerate files", error); + } + this.currentReadOffset = newSplitsAndState.offset; + this.seenPartitionsSinceOffset = newSplitsAndState.seenPartitions; + splitAssigner.addSplits(new ArrayList<>(newSplitsAndState.newSplits)); + assignSplits(); + } + + private void assignSplits() { + final Iterator<Map.Entry<Integer, String>> awaitingReader = readersAwaitingSplit.entrySet().iterator(); + while (awaitingReader.hasNext()) { + final Map.Entry<Integer, String> nextAwaiting = awaitingReader.next(); + final String hostname = nextAwaiting.getValue(); + final int awaitingSubtask = nextAwaiting.getKey(); + final Optional<FileSourceSplit> nextSplit = splitAssigner.getNext(hostname); + if (nextSplit.isPresent()) { + enumeratorContext.assignSplit((HiveSourceSplit) nextSplit.get(), awaitingSubtask); + awaitingReader.remove(); + } else { + break; + } + } + } + + private static class NewPartitionMonitor<T extends Comparable<T>> implements Callable<NewSplitsAndState<T>> { + + // keep these locally so that we don't need to share state with main thread + private T currentReadOffset; + private final Set<List<String>> seenPartitionsSinceOffset; + + private final ObjectPath tablePath; + private final JobConf jobConf; + private final ContinuousPartitionFetcher<Partition, T> fetcher; + private final HiveTableSource.HiveContinuousPartitionFetcherContext<T> fetcherContext; + + NewPartitionMonitor( Review comment: This constructor should be declared `private`.
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org