This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0189db46316eff35d3d07f57ebcf116b1f61cea6
Author: Zhijiang <wangzhijiang...@aliyun.com>
AuthorDate: Mon May 20 18:55:59 2019 +0800

    [hotfix][network] Introduce PartititonRequestClient interface for creating 
simple client instance in tests
---
 .../runtime/io/network/ConnectionManager.java      |  2 -
 .../runtime/io/network/LocalConnectionManager.java |  2 -
 .../runtime/io/network/PartitionRequestClient.java | 70 ++++++++++++++++++++++
 .../io/network/netty/NettyConnectionManager.java   |  1 +
 ...lient.java => NettyPartitionRequestClient.java} | 22 +++----
 .../netty/PartitionRequestClientFactory.java       | 21 +++----
 .../partition/consumer/RemoteInputChannel.java     |  2 +-
 .../netty/ClientTransportErrorHandlingTest.java    | 19 +++---
 ...editBasedPartitionRequestClientHandlerTest.java |  5 +-
 ...t.java => NettyPartitionRequestClientTest.java} |  9 +--
 .../network/partition/InputChannelTestUtils.java   |  2 +-
 .../partition/consumer/RemoteInputChannelTest.java |  2 +-
 12 files changed, 113 insertions(+), 44 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
index 75f39e9..c342750 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.io.network;
 
-import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
-
 import java.io.IOException;
 
 /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
index 46ca7fc..319a9ea 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.io.network;
 
-import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
-
 /**
  * A connection manager implementation to bypass setup overhead for task 
managers running in local
  * execution mode.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionRequestClient.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionRequestClient.java
new file mode 100644
index 0000000..a215700
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionRequestClient.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+
+import java.io.IOException;
+
+/**
+ * Client to send messages or task events via network for {@link 
RemoteInputChannel}.
+ */
+public interface PartitionRequestClient {
+
+       /**
+        * Requests a remote sub partition.
+        *
+        * @param partitionId The identifier of result partition to be 
requested.
+        * @param subpartitionIndex The sub partition index in the requested 
result partition.
+        * @param inputChannel The remote input channel for requesting the sub 
partition.
+        * @param delayMs The request is scheduled within a delay time.
+        */
+       void requestSubpartition(
+               ResultPartitionID partitionId,
+               int subpartitionIndex,
+               RemoteInputChannel inputChannel,
+               int delayMs) throws IOException;
+
+       /**
+        * Notifies available credits from one remote input channel.
+        *
+        * @param inputChannel The remote input channel who announces the 
available credits.
+        */
+       void notifyCreditAvailable(RemoteInputChannel inputChannel);
+
+       /**
+        * Sends a task event backwards to an intermediate result partition.
+        *
+        * @param partitionId The identifier of result partition.
+        * @param event The task event to be sent.
+        * @param inputChannel The remote input channel for sending this event.
+        */
+       void sendTaskEvent(ResultPartitionID partitionId, TaskEvent event, 
RemoteInputChannel inputChannel) throws IOException;
+
+       /**
+        * Cancels the partition request for the given remote input channel and 
removes
+        * this client from factory if it is not referenced by any other input 
channels.
+        *
+        * @param inputChannel The remote input channel for canceling partition 
and to
+        *                     be removed from network stack.
+        */
+       void close(RemoteInputChannel inputChannel) throws IOException;
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
index 73d9b11..ef3db13 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.TaskEventPublisher;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
similarity index 94%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
index 9c9deaa..4d42a3c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
@@ -19,8 +19,9 @@
 package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.io.network.NetworkClientHandler;
 import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.NetworkClientHandler;
+import org.apache.flink.runtime.io.network.PartitionRequestClient;
 import 
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
@@ -47,9 +48,9 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * <p>This client is shared by all remote input channels, which request a 
partition
  * from the same {@link ConnectionID}.
  */
-public class PartitionRequestClient {
+public class NettyPartitionRequestClient implements PartitionRequestClient {
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(PartitionRequestClient.class);
+       private static final Logger LOG = 
LoggerFactory.getLogger(NettyPartitionRequestClient.class);
 
        private final Channel tcpChannel;
 
@@ -62,7 +63,7 @@ public class PartitionRequestClient {
        /** If zero, the underlying TCP channel can be safely closed. */
        private final AtomicDisposableReferenceCounter closeReferenceCounter = 
new AtomicDisposableReferenceCounter();
 
-       PartitionRequestClient(
+       NettyPartitionRequestClient(
                        Channel tcpChannel,
                        NetworkClientHandler clientHandler,
                        ConnectionID connectionId,
@@ -94,7 +95,8 @@ public class PartitionRequestClient {
         * <p>The request goes to the remote producer, for which this partition
         * request client instance has been created.
         */
-       public ChannelFuture requestSubpartition(
+       @Override
+       public void requestSubpartition(
                        final ResultPartitionID partitionId,
                        final int subpartitionIndex,
                        final RemoteInputChannel inputChannel,
@@ -128,7 +130,6 @@ public class PartitionRequestClient {
                if (delayMs == 0) {
                        ChannelFuture f = tcpChannel.writeAndFlush(request);
                        f.addListener(listener);
-                       return f;
                } else {
                        final ChannelFuture[] f = new ChannelFuture[1];
                        tcpChannel.eventLoop().schedule(new Runnable() {
@@ -138,19 +139,18 @@ public class PartitionRequestClient {
                                        f[0].addListener(listener);
                                }
                        }, delayMs, TimeUnit.MILLISECONDS);
-
-                       return f[0];
                }
        }
 
        /**
         * Sends a task event backwards to an intermediate result partition 
producer.
-        * <p>
-        * Backwards task events flow between readers and writers and therefore
+        *
+        * <p>Backwards task events flow between readers and writers and 
therefore
         * will only work when both are running at the same time, which is only
         * guaranteed to be the case when both the respective producer and
         * consumer task run pipelined.
         */
+       @Override
        public void sendTaskEvent(ResultPartitionID partitionId, TaskEvent 
event, final RemoteInputChannel inputChannel) throws IOException {
                checkNotClosed();
 
@@ -170,10 +170,12 @@ public class PartitionRequestClient {
                                                });
        }
 
+       @Override
        public void notifyCreditAvailable(RemoteInputChannel inputChannel) {
                clientHandler.notifyCreditAvailable(inputChannel);
        }
 
+       @Override
        public void close(RemoteInputChannel inputChannel) throws IOException {
 
                clientHandler.removeInputChannel(inputChannel);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
index 2df094b..229121e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.NetworkClientHandler;
+import org.apache.flink.runtime.io.network.PartitionRequestClient;
 import 
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
 import 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
 import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
@@ -33,7 +34,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 /**
- * Factory for {@link PartitionRequestClient} instances.
+ * Factory for {@link NettyPartitionRequestClient} instances.
  *
  * <p>Instances of partition requests clients are shared among several {@link 
RemoteInputChannel}
  * instances.
@@ -50,19 +51,19 @@ class PartitionRequestClientFactory {
 
        /**
         * Atomically establishes a TCP connection to the given remote address 
and
-        * creates a {@link PartitionRequestClient} instance for this 
connection.
+        * creates a {@link NettyPartitionRequestClient} instance for this 
connection.
         */
-       PartitionRequestClient createPartitionRequestClient(ConnectionID 
connectionId) throws IOException, InterruptedException {
+       NettyPartitionRequestClient createPartitionRequestClient(ConnectionID 
connectionId) throws IOException, InterruptedException {
                Object entry;
-               PartitionRequestClient client = null;
+               NettyPartitionRequestClient client = null;
 
                while (client == null) {
                        entry = clients.get(connectionId);
 
                        if (entry != null) {
                                // Existing channel or connecting channel
-                               if (entry instanceof PartitionRequestClient) {
-                                       client = (PartitionRequestClient) entry;
+                               if (entry instanceof 
NettyPartitionRequestClient) {
+                                       client = (NettyPartitionRequestClient) 
entry;
                                }
                                else {
                                        ConnectingChannel future = 
(ConnectingChannel) entry;
@@ -92,7 +93,7 @@ class PartitionRequestClientFactory {
                                        clients.replace(connectionId, old, 
client);
                                }
                                else {
-                                       client = (PartitionRequestClient) old;
+                                       client = (NettyPartitionRequestClient) 
old;
                                }
                        }
 
@@ -166,7 +167,7 @@ class PartitionRequestClientFactory {
                        synchronized (connectLock) {
                                try {
                                        NetworkClientHandler clientHandler = 
channel.pipeline().get(NetworkClientHandler.class);
-                                       partitionRequestClient = new 
PartitionRequestClient(
+                                       partitionRequestClient = new 
NettyPartitionRequestClient(
                                                channel, clientHandler, 
connectionId, clientFactory);
 
                                        if (disposeRequestClient) {
@@ -181,11 +182,11 @@ class PartitionRequestClientFactory {
                        }
                }
 
-               private volatile PartitionRequestClient partitionRequestClient;
+               private volatile NettyPartitionRequestClient 
partitionRequestClient;
 
                private volatile Throwable error;
 
-               private PartitionRequestClient waitForChannel() throws 
IOException, InterruptedException {
+               private NettyPartitionRequestClient waitForChannel() throws 
IOException, InterruptedException {
                        synchronized (connectLock) {
                                while (error == null && partitionRequestClient 
== null) {
                                        connectLock.wait(2000);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 397c4fe..fabc495 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -24,13 +24,13 @@ import org.apache.flink.core.memory.MemorySegmentProvider;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferListener;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
-import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
 import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.util.ExceptionUtils;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
index 266461b..a0689e6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.NetworkClientHandler;
+import org.apache.flink.runtime.io.network.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import 
org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient;
 import 
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
@@ -32,7 +33,6 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
 import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
@@ -56,7 +56,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.isA;
@@ -113,7 +112,7 @@ public class ClientTransportErrorHandlingTest {
                        }
                });
 
-               PartitionRequestClient requestClient = new 
PartitionRequestClient(
+               PartitionRequestClient requestClient = new 
NettyPartitionRequestClient(
                                ch, handler, mock(ConnectionID.class), 
mock(PartitionRequestClientFactory.class));
 
                // Create input channels
@@ -134,22 +133,20 @@ public class ClientTransportErrorHandlingTest {
                }).when(rich[1]).onError(isA(LocalTransportException.class));
 
                // First request is successful
-               ChannelFuture f = requestClient.requestSubpartition(new 
ResultPartitionID(), 0, rich[0], 0);
-               assertTrue(f.await().isSuccess());
+               requestClient.requestSubpartition(new ResultPartitionID(), 0, 
rich[0], 0);
 
                // Second request is *not* successful
-               f = requestClient.requestSubpartition(new ResultPartitionID(), 
0, rich[1], 0);
-               assertFalse(f.await().isSuccess());
+               requestClient.requestSubpartition(new ResultPartitionID(), 0, 
rich[1], 0);
 
-               // Only the second channel should be notified about the error
-               verify(rich[0], 
times(0)).onError(any(LocalTransportException.class));
-
-               // Wait for the notification
+               // Wait for the notification and it could confirm all the 
request operations are done
                if (!sync.await(TestingUtils.TESTING_DURATION().toMillis(), 
TimeUnit.MILLISECONDS)) {
                        fail("Timed out after waiting for " + 
TestingUtils.TESTING_DURATION().toMillis() +
                                        " ms to be notified about the channel 
error.");
                }
 
+               // Only the second channel should be notified about the error
+               verify(rich[0], 
times(0)).onError(any(LocalTransportException.class));
+
                shutdown(serverAndClient);
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
index 92ae98d..1517dc3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferListener;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -244,7 +245,7 @@ public class CreditBasedPartitionRequestClientHandlerTest {
        public void testNotifyCreditAvailable() throws Exception {
                final CreditBasedPartitionRequestClientHandler handler = new 
CreditBasedPartitionRequestClientHandler();
                final EmbeddedChannel channel = new EmbeddedChannel(handler);
-               final PartitionRequestClient client = new 
PartitionRequestClient(
+               final PartitionRequestClient client = new 
NettyPartitionRequestClient(
                        channel, handler, mock(ConnectionID.class), 
mock(PartitionRequestClientFactory.class));
 
                final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(10, 32, 2);
@@ -344,7 +345,7 @@ public class CreditBasedPartitionRequestClientHandlerTest {
        public void testNotifyCreditAvailableAfterReleased() throws Exception {
                final CreditBasedPartitionRequestClientHandler handler = new 
CreditBasedPartitionRequestClientHandler();
                final EmbeddedChannel channel = new EmbeddedChannel(handler);
-               final PartitionRequestClient client = new 
PartitionRequestClient(
+               final PartitionRequestClient client = new 
NettyPartitionRequestClient(
                        channel, handler, mock(ConnectionID.class), 
mock(PartitionRequestClientFactory.class));
 
                final NetworkBufferPool networkBufferPool = new 
NetworkBufferPool(10, 32, 2);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java
similarity index 95%
rename from 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java
index dcc3ad4..a119b51 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyMessage.PartitionRequest;
@@ -41,9 +42,9 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
 /**
- * Tests for {@link PartitionRequestClient}.
+ * Tests for {@link NettyPartitionRequestClient}.
  */
-public class PartitionRequestClientTest {
+public class NettyPartitionRequestClientTest {
 
        @Test
        public void testRetriggerPartitionRequest() throws Exception {
@@ -51,7 +52,7 @@ public class PartitionRequestClientTest {
 
                final PartitionRequestClientHandler handler = new 
PartitionRequestClientHandler();
                final EmbeddedChannel channel = new EmbeddedChannel(handler);
-               final PartitionRequestClient client = new 
PartitionRequestClient(
+               final PartitionRequestClient client = new 
NettyPartitionRequestClient(
                        channel, handler, mock(ConnectionID.class), 
mock(PartitionRequestClientFactory.class));
 
                final int numExclusiveBuffers = 2;
@@ -110,7 +111,7 @@ public class PartitionRequestClientTest {
        public void testDoublePartitionRequest() throws Exception {
                final PartitionRequestClientHandler handler = new 
PartitionRequestClientHandler();
                final EmbeddedChannel channel = new EmbeddedChannel(handler);
-               final PartitionRequestClient client = new 
PartitionRequestClient(
+               final PartitionRequestClient client = new 
NettyPartitionRequestClient(
                        channel, handler, mock(ConnectionID.class), 
mock(PartitionRequestClientFactory.class));
 
                final int numExclusiveBuffers = 2;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
index ece1009..4ff472e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
@@ -22,8 +22,8 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentProvider;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
-import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
 import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
 import 
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
 import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index e7f0648..6c03106 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -22,11 +22,11 @@ import org.apache.flink.core.memory.MemorySegmentProvider;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import 
org.apache.flink.runtime.io.network.buffer.BufferListener.NotificationResult;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
 import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;

Reply via email to