This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 301046ad36aeb2de0c97aeecc4fabc9dd2eb1520
Author: Stephan Ewen <[email protected]>
AuthorDate: Fri Oct 2 13:49:38 2020 +0200

    [FLINK-19492][core] Consolidate Source Events between Source API and Split 
Reader API
---
 .../base/source/reader/SourceReaderBase.java       |  2 +-
 .../source/reader/mocks/MockSplitEnumerator.java   |  2 +-
 .../connector}/source/event/NoMoreSplitsEvent.java |  2 +-
 .../connector}/source/event/RequestSplitEvent.java | 15 ++++++++++-
 .../source/lib/util/IteratorSourceEnumerator.java  |  6 +++--
 .../source/lib/util/IteratorSourceReader.java      |  6 +++--
 .../source/lib/util/NoSplitAvailableEvent.java     | 31 ----------------------
 .../source/lib/util/SplitRequestEvent.java         | 31 ----------------------
 8 files changed, 25 insertions(+), 70 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 97a6a95..1182cae 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -24,8 +24,8 @@ import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.base.source.event.NoMoreSplitsEvent;
 import 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java
index 2c84a70..8df3394 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
-import org.apache.flink.connector.base.source.event.NoMoreSplitsEvent;
 
 import java.io.IOException;
 import java.util.ArrayList;
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/event/NoMoreSplitsEvent.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/event/NoMoreSplitsEvent.java
similarity index 96%
rename from 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/event/NoMoreSplitsEvent.java
rename to 
flink-core/src/main/java/org/apache/flink/api/connector/source/event/NoMoreSplitsEvent.java
index e2ca2e4..aa26808 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/event/NoMoreSplitsEvent.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/event/NoMoreSplitsEvent.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.base.source.event;
+package org.apache.flink.api.connector.source.event;
 
 import org.apache.flink.api.connector.source.SourceEvent;
 
diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/event/RequestSplitEvent.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/event/RequestSplitEvent.java
similarity index 83%
rename from 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/event/RequestSplitEvent.java
rename to 
flink-core/src/main/java/org/apache/flink/api/connector/source/event/RequestSplitEvent.java
index 4bdb92a..9ff2293 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/event/RequestSplitEvent.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/event/RequestSplitEvent.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.connector.base.source.event;
+package org.apache.flink.api.connector.source.event;
 
 import org.apache.flink.api.connector.source.SourceEvent;
 
@@ -26,6 +26,9 @@ import java.util.Objects;
 
 /**
  * An event to request splits, sent typically from the Source Reader to the 
Source Enumerator.
+ *
+ * <p>This event optionally carries the hostname of the location where the 
reader runs, to support
+ * locality-aware work assignment.
  */
 public final class RequestSplitEvent implements SourceEvent {
 
@@ -34,6 +37,16 @@ public final class RequestSplitEvent implements SourceEvent {
        @Nullable
        private final String hostName;
 
+       /**
+        * Creates a new {@code RequestSplitEvent} with no host information.
+        */
+       public RequestSplitEvent() {
+               this(null);
+       }
+
+       /**
+        * Creates a new {@code RequestSplitEvent} with a hostname.
+        */
        public RequestSplitEvent(@Nullable String hostName) {
                this.hostName = hostName;
        }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java
index 3ee2597..91bc283 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java
@@ -21,6 +21,8 @@ package org.apache.flink.api.connector.source.lib.util;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent;
+import org.apache.flink.api.connector.source.event.RequestSplitEvent;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import java.util.ArrayDeque;
@@ -56,7 +58,7 @@ public class IteratorSourceEnumerator<SplitT extends 
IteratorSourceSplit<?, ?>>
 
        @Override
        public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
-               if (!(sourceEvent instanceof SplitRequestEvent)) {
+               if (!(sourceEvent instanceof RequestSplitEvent)) {
                        throw new FlinkRuntimeException("Unrecognized event: " 
+ sourceEvent);
                }
 
@@ -64,7 +66,7 @@ public class IteratorSourceEnumerator<SplitT extends 
IteratorSourceSplit<?, ?>>
                if (nextSplit != null) {
                        context.assignSplit(nextSplit, subtaskId);
                } else {
-                       context.sendEventToSourceReader(subtaskId, new 
NoSplitAvailableEvent());
+                       context.sendEventToSourceReader(subtaskId, new 
NoMoreSplitsEvent());
                }
        }
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
index f9d7461..54ddb35 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent;
+import org.apache.flink.api.connector.source.event.RequestSplitEvent;
 import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.util.FlinkRuntimeException;
 
@@ -84,7 +86,7 @@ public class IteratorSourceReader<E, IterT extends 
Iterator<E>, SplitT extends I
        public void start() {
                // request a split only if we did not get one during restore
                if (iterator == null) {
-                       context.sendSourceEventToCoordinator(new 
SplitRequestEvent());
+                       context.sendSourceEventToCoordinator(new 
RequestSplitEvent());
                }
        }
 
@@ -133,7 +135,7 @@ public class IteratorSourceReader<E, IterT extends 
Iterator<E>, SplitT extends I
 
        @Override
        public void handleSourceEvents(SourceEvent sourceEvent) {
-               if (sourceEvent instanceof NoSplitAvailableEvent) {
+               if (sourceEvent instanceof NoMoreSplitsEvent) {
                        // non-null queue signals splits were assigned, in this 
case no splits
                        remainingSplits = new ArrayDeque<>();
                } else {
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/NoSplitAvailableEvent.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/NoSplitAvailableEvent.java
deleted file mode 100644
index 5a28565..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/NoSplitAvailableEvent.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.connector.source.lib.util;
-
-import org.apache.flink.api.connector.source.SourceEvent;
-import org.apache.flink.api.connector.source.SourceReader;
-import org.apache.flink.api.connector.source.SplitEnumerator;
-
-/**
- * A simple {@link SourceEvent} indicating that there is no split available 
for the reader (any more).
- * This event is typically sent from the {@link SplitEnumerator} to the {@link 
SourceReader}.
- */
-public final class NoSplitAvailableEvent implements SourceEvent {
-       private static final long serialVersionUID = 1L;
-}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/SplitRequestEvent.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/SplitRequestEvent.java
deleted file mode 100644
index 3e561b7..0000000
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/SplitRequestEvent.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.connector.source.lib.util;
-
-import org.apache.flink.api.connector.source.SourceEvent;
-import org.apache.flink.api.connector.source.SourceReader;
-import org.apache.flink.api.connector.source.SplitEnumerator;
-
-/**
- * A {@link SourceEvent} representing the request for a split, typically sent 
from the
- * {@link SourceReader} to the {@link SplitEnumerator}.
- */
-public final class SplitRequestEvent implements SourceEvent {
-       private static final long serialVersionUID = 1L;
-}

Reply via email to