This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 301046ad36aeb2de0c97aeecc4fabc9dd2eb1520 Author: Stephan Ewen <[email protected]> AuthorDate: Fri Oct 2 13:49:38 2020 +0200 [FLINK-19492][core] Consolidate Source Events between Source API and Split Reader API --- .../base/source/reader/SourceReaderBase.java | 2 +- .../source/reader/mocks/MockSplitEnumerator.java | 2 +- .../connector}/source/event/NoMoreSplitsEvent.java | 2 +- .../connector}/source/event/RequestSplitEvent.java | 15 ++++++++++- .../source/lib/util/IteratorSourceEnumerator.java | 6 +++-- .../source/lib/util/IteratorSourceReader.java | 6 +++-- .../source/lib/util/NoSplitAvailableEvent.java | 31 ---------------------- .../source/lib/util/SplitRequestEvent.java | 31 ---------------------- 8 files changed, 25 insertions(+), 70 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java index 97a6a95..1182cae 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java @@ -24,8 +24,8 @@ import org.apache.flink.api.connector.source.SourceOutput; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent; import org.apache.flink.configuration.Configuration; -import org.apache.flink.connector.base.source.event.NoMoreSplitsEvent; import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java index 2c84a70..8df3394 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java @@ -22,8 +22,8 @@ import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; -import org.apache.flink.connector.base.source.event.NoMoreSplitsEvent; import java.io.IOException; import java.util.ArrayList; diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/event/NoMoreSplitsEvent.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/event/NoMoreSplitsEvent.java similarity index 96% rename from flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/event/NoMoreSplitsEvent.java rename to flink-core/src/main/java/org/apache/flink/api/connector/source/event/NoMoreSplitsEvent.java index e2ca2e4..aa26808 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/event/NoMoreSplitsEvent.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/event/NoMoreSplitsEvent.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.connector.base.source.event; +package org.apache.flink.api.connector.source.event; import org.apache.flink.api.connector.source.SourceEvent; diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/event/RequestSplitEvent.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/event/RequestSplitEvent.java similarity index 83% rename from flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/event/RequestSplitEvent.java rename to flink-core/src/main/java/org/apache/flink/api/connector/source/event/RequestSplitEvent.java index 4bdb92a..9ff2293 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/event/RequestSplitEvent.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/event/RequestSplitEvent.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.connector.base.source.event; +package org.apache.flink.api.connector.source.event; import org.apache.flink.api.connector.source.SourceEvent; @@ -26,6 +26,9 @@ import java.util.Objects; /** * An event to request splits, sent typically from the Source Reader to the Source Enumerator. + * + * <p>This event optionally carries the hostname of the location where the reader runs, to support + * locality-aware work assignment. */ public final class RequestSplitEvent implements SourceEvent { @@ -34,6 +37,16 @@ public final class RequestSplitEvent implements SourceEvent { @Nullable private final String hostName; + /** + * Creates a new {@code RequestSplitEvent} with no host information. + */ + public RequestSplitEvent() { + this(null); + } + + /** + * Creates a new {@code RequestSplitEvent} with a hostname. + */ public RequestSplitEvent(@Nullable String hostName) { this.hostName = hostName; } diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java index 3ee2597..91bc283 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java @@ -21,6 +21,8 @@ package org.apache.flink.api.connector.source.lib.util; import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent; +import org.apache.flink.api.connector.source.event.RequestSplitEvent; import org.apache.flink.util.FlinkRuntimeException; import java.util.ArrayDeque; @@ -56,7 +58,7 @@ public class IteratorSourceEnumerator<SplitT extends IteratorSourceSplit<?, ?>> @Override public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { - if (!(sourceEvent instanceof SplitRequestEvent)) { + if (!(sourceEvent instanceof RequestSplitEvent)) { throw new FlinkRuntimeException("Unrecognized event: " + sourceEvent); } @@ -64,7 +66,7 @@ public class IteratorSourceEnumerator<SplitT extends IteratorSourceSplit<?, ?>> if (nextSplit != null) { context.assignSplit(nextSplit, subtaskId); } else { - context.sendEventToSourceReader(subtaskId, new NoSplitAvailableEvent()); + context.sendEventToSourceReader(subtaskId, new NoMoreSplitsEvent()); } } diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java index f9d7461..54ddb35 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java @@ -22,6 +22,8 @@ import org.apache.flink.api.connector.source.ReaderOutput; import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent; +import org.apache.flink.api.connector.source.event.RequestSplitEvent; import org.apache.flink.core.io.InputStatus; import org.apache.flink.util.FlinkRuntimeException; @@ -84,7 +86,7 @@ public class IteratorSourceReader<E, IterT extends Iterator<E>, SplitT extends I public void start() { // request a split only if we did not get one during restore if (iterator == null) { - context.sendSourceEventToCoordinator(new SplitRequestEvent()); + context.sendSourceEventToCoordinator(new RequestSplitEvent()); } } @@ -133,7 +135,7 @@ public class IteratorSourceReader<E, IterT extends Iterator<E>, SplitT extends I @Override public void handleSourceEvents(SourceEvent sourceEvent) { - if (sourceEvent instanceof NoSplitAvailableEvent) { + if (sourceEvent instanceof NoMoreSplitsEvent) { // non-null queue signals splits were assigned, in this case no splits remainingSplits = new ArrayDeque<>(); } else { diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/NoSplitAvailableEvent.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/NoSplitAvailableEvent.java deleted file mode 100644 index 5a28565..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/NoSplitAvailableEvent.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.connector.source.lib.util; - -import org.apache.flink.api.connector.source.SourceEvent; -import org.apache.flink.api.connector.source.SourceReader; -import org.apache.flink.api.connector.source.SplitEnumerator; - -/** - * A simple {@link SourceEvent} indicating that there is no split available for the reader (any more). - * This event is typically sent from the {@link SplitEnumerator} to the {@link SourceReader}. - */ -public final class NoSplitAvailableEvent implements SourceEvent { - private static final long serialVersionUID = 1L; -} diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/SplitRequestEvent.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/SplitRequestEvent.java deleted file mode 100644 index 3e561b7..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/SplitRequestEvent.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.connector.source.lib.util; - -import org.apache.flink.api.connector.source.SourceEvent; -import org.apache.flink.api.connector.source.SourceReader; -import org.apache.flink.api.connector.source.SplitEnumerator; - -/** - * A {@link SourceEvent} representing the request for a split, typically sent from the - * {@link SourceReader} to the {@link SplitEnumerator}. - */ -public final class SplitRequestEvent implements SourceEvent { - private static final long serialVersionUID = 1L; -}
