lirui-apache commented on a change in pull request #13998:
URL: https://github.com/apache/flink/pull/13998#discussion_r526058826
##########
File path:
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
##########
@@ -158,12 +142,64 @@ public void close() throws IOException {
}
}
- private Void monitorAndGetSplits() throws Exception {
- stateLock.writeLock().lock();
- try {
+ private void handleNewSplits(NewSplitsAndState<T> newSplitsAndState,
Throwable error) {
+ if (error != null) {
+ // we need to failover because the worker thread is
stateful
+ throw new FlinkHiveException("Failed to enumerate
files", error);
+ }
+ this.currentReadOffset = newSplitsAndState.offset;
+ this.seenPartitionsSinceOffset =
newSplitsAndState.seenPartitions;
+ splitAssigner.addSplits(new
ArrayList<>(newSplitsAndState.newSplits));
+ assignSplits();
+ }
+
+ private void assignSplits() {
+ final Iterator<Map.Entry<Integer, String>> awaitingReader =
readersAwaitingSplit.entrySet().iterator();
+ while (awaitingReader.hasNext()) {
+ final Map.Entry<Integer, String> nextAwaiting =
awaitingReader.next();
+ final String hostname = nextAwaiting.getValue();
+ final int awaitingSubtask = nextAwaiting.getKey();
+ final Optional<FileSourceSplit> nextSplit =
splitAssigner.getNext(hostname);
+ if (nextSplit.isPresent()) {
+ enumeratorContext.assignSplit((HiveSourceSplit)
nextSplit.get(), awaitingSubtask);
+ awaitingReader.remove();
+ } else {
+ break;
+ }
+ }
+ }
+
+ private static class NewPartitionMonitor<T extends Comparable<T>>
implements Callable<NewSplitsAndState<T>> {
+
+ // keep these locally so that we don't need to share state with
main thread
+ private T currentReadOffset;
+ private final Set<List<String>> seenPartitionsSinceOffset;
+
+ private final ObjectPath tablePath;
+ private final JobConf jobConf;
+ private final ContinuousPartitionFetcher<Partition, T> fetcher;
+ private final
HiveTableSource.HiveContinuousPartitionFetcherContext<T> fetcherContext;
+
+ NewPartitionMonitor(
+ T currentReadOffset,
+ Collection<List<String>>
seenPartitionsSinceOffset,
+ ObjectPath tablePath,
+ JobConf jobConf,
+ ContinuousPartitionFetcher<Partition, T>
fetcher,
+
HiveTableSource.HiveContinuousPartitionFetcherContext<T> fetcherContext) {
+ this.currentReadOffset = currentReadOffset;
+ this.seenPartitionsSinceOffset = new
HashSet<>(seenPartitionsSinceOffset);
+ this.tablePath = tablePath;
+ this.jobConf = jobConf;
+ this.fetcher = fetcher;
+ this.fetcherContext = fetcherContext;
+ }
+
+ @Override
+ public NewSplitsAndState<T> call() throws Exception {
List<Tuple2<Partition, T>> partitions =
fetcher.fetchPartitions(fetcherContext, currentReadOffset);
if (partitions.isEmpty()) {
- return null;
+ return new
NewSplitsAndState<>(Collections.emptyList(), currentReadOffset,
seenPartitionsSinceOffset);
Review comment:
The `seenPartitionsSinceOffset` here is local to this class: it is a copy of
the collection passed to the constructor, so it's not shared with the main thread.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]