jeongyooneo closed pull request #22: [NEMO-68] Restrict the number of parallel connections between executors
URL: https://github.com/apache/incubator-nemo/pull/22
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
index 19a654d7..da3d6713 100644
--- a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
+++ b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
@@ -141,6 +141,15 @@
   public final class IORequestHandleThreadsTotal implements Name<Integer> {
   }
 
+  /**
+   * Maximum number of block downloads that may be in flight at the same time for a single runtime edge.
+   * Requests beyond this limit are queued and granted in FIFO order as ongoing downloads finish.
+   */
+  @NamedParameter(doc = "Maximum number of parallel downloads for a runtime edge.", short_name = "max_downloads",
+      default_value = "30")
+  public final class MaxNumDownloadsForARuntimeEdge implements Name<Integer> {
+  }
+
   /**
    * Max number of attempts for task scheduling.
    */
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
index bdc616fe..d7b13f29 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -72,6 +72,7 @@
   private final Map<String, AtomicInteger> blockToRemainingRead;
   private final SerializerManager serializerManager;
   private final Map<String, CompletableFuture<ControlMessage.Message>> pendingBlockLocationRequest;
+  private final BlockTransferConnectionQueue blockTransferConnectionQueue;
 
   /**
    * Constructor.
@@ -85,6 +86,7 @@
    * @param persistentConnectionToMasterMap the connection map.
    * @param byteTransfer                    the byte transfer.
    * @param serializerManager               the serializer manager.
+   * @param blockTransferConnectionQueue    limits the number of parallel block downloads per runtime edge.
    */
   @Inject
   private BlockManagerWorker(@Parameter(JobConf.ExecutorId.class) final String executorId,
@@ -95,7 +97,8 @@ private BlockManagerWorker(@Parameter(JobConf.ExecutorId.class) final String exe
                              final RemoteFileStore remoteFileStore,
                              final PersistentConnectionToMasterMap persistentConnectionToMasterMap,
                              final ByteTransfer byteTransfer,
-                             final SerializerManager serializerManager) {
+                             final SerializerManager serializerManager,
+                             final BlockTransferConnectionQueue blockTransferConnectionQueue) {
     this.executorId = executorId;
     this.memoryStore = memoryStore;
     this.serializedMemoryStore = serializedMemoryStore;
@@ -107,6 +110,7 @@ private BlockManagerWorker(@Parameter(JobConf.ExecutorId.class) final String exe
     this.blockToRemainingRead = new ConcurrentHashMap<>();
     this.serializerManager = serializerManager;
     this.pendingBlockLocationRequest = new ConcurrentHashMap<>();
+    this.blockTransferConnectionQueue = blockTransferConnectionQueue;
   }
 
   /**
@@ -229,9 +233,22 @@ public Block createBlock(final String blockId,
             .setRuntimeEdgeId(runtimeEdgeId)
             .setKeyRange(ByteString.copyFrom(SerializationUtils.serialize(keyRange)))
             .build();
-        return byteTransfer.newInputContext(targetExecutorId, descriptor.toByteArray())
-            .thenCompose(context -> context.getCompletedFuture())
-            .thenApply(streams -> new DataUtil.InputStreamIterator(streams,
+        final CompletableFuture<ByteInputContext> contextFuture = blockTransferConnectionQueue
+            .requestConnectPermission(runtimeEdgeId)
+            .thenCompose(obj -> byteTransfer.newInputContext(targetExecutorId, descriptor.toByteArray()));
+        // Release the permit exactly once per granted connection: when the connection cannot be
+        // established, or when the transfer completes (normally or exceptionally). thenApply/thenAccept
+        // would skip exceptional completions, leaking the permit and stalling later reads on this edge.
+        contextFuture.whenComplete((context, connectionThrowable) -> {
+          if (connectionThrowable != null) {
+            blockTransferConnectionQueue.onConnectionFinished(runtimeEdgeId);
+          } else {
+            context.getCompletedFuture().whenComplete((streams, transferThrowable) ->
+                blockTransferConnectionQueue.onConnectionFinished(runtimeEdgeId));
+          }
+        });
+        return contextFuture
+            .thenApply(context -> new DataUtil.InputStreamIterator(context.getInputStreams(),
                 serializerManager.getSerializer(runtimeEdgeId)));
       }
     });
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueue.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueue.java
new file mode 100644
index 00000000..0870082c
--- /dev/null
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockTransferConnectionQueue.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.executor.data;
+
+import edu.snu.nemo.conf.JobConf;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+import java.util.ArrayDeque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Restricts the number of parallel connections (block downloads) per runtime edge.
+ * Executors can suffer from performance degradation and network-related exceptions when there are massive connections,
+ * especially under low network bandwidth or high volume of data.
+ *
+ * <p>Callers obtain a permit with {@link #requestConnectPermission(String)} and must return it with
+ * {@link #onConnectionFinished(String)} exactly once, whether the connection succeeds or fails.
+ * Requests beyond the limit are granted in FIFO order as permits are returned.
+ *
+ * <p>This class is thread-safe: all mutable state is guarded by the instance monitor.
+ */
+public final class BlockTransferConnectionQueue {
+  /** Number of granted, unfinished connections per runtime edge; an absent entry means zero. */
+  private final Map<String, Integer> runtimeEdgeIdToNumCurrentConnections = new HashMap<>();
+  /** Requests waiting for a permit per runtime edge, in arrival order; an absent entry means none. */
+  private final Map<String, Queue<CompletableFuture<Void>>> runtimeEdgeIdToPendingConnections = new HashMap<>();
+  private final int maxNum;
+
+  /**
+   * Constructor.
+   * @param maxNum maximum number of parallel connections per runtime edge.
+   * @throws IllegalArgumentException if {@code maxNum} is not positive, as no request could ever be granted.
+   */
+  @Inject
+  private BlockTransferConnectionQueue(@Parameter(JobConf.MaxNumDownloadsForARuntimeEdge.class) final int maxNum) {
+    if (maxNum <= 0) {
+      throw new IllegalArgumentException("maxNum must be positive, but was " + maxNum);
+    }
+    this.maxNum = maxNum;
+  }
+
+  /**
+   * Request a permission to make a connection.
+   * The permit must be returned with {@link #onConnectionFinished(String)} once the connection is over.
+   * @param runtimeEdgeId the corresponding runtime edge id.
+   * @return a future that will be completed when the connection is granted.
+   */
+  public synchronized CompletableFuture<Void> requestConnectPermission(final String runtimeEdgeId) {
+    final int currentOutstandingConnections = runtimeEdgeIdToNumCurrentConnections.getOrDefault(runtimeEdgeId, 0);
+    if (currentOutstandingConnections < maxNum) {
+      // grant immediately
+      runtimeEdgeIdToNumCurrentConnections.put(runtimeEdgeId, currentOutstandingConnections + 1);
+      return CompletableFuture.completedFuture(null);
+    }
+    // add to pending queue; onConnectionFinished completes it when a permit is handed over
+    final CompletableFuture<Void> future = new CompletableFuture<>();
+    runtimeEdgeIdToPendingConnections.computeIfAbsent(runtimeEdgeId, id -> new ArrayDeque<>()).add(future);
+    return future;
+  }
+
+  /**
+   * Indicates the connection has finished, returning its permit.
+   * If another request is pending on the same runtime edge, the permit is handed over to it;
+   * otherwise the number of current connections is decreased.
+   *
+   * @param runtimeEdgeId the corresponding runtime edge id.
+   * @throws IllegalStateException if no connection is currently granted for {@code runtimeEdgeId}.
+   */
+  public void onConnectionFinished(final String runtimeEdgeId) {
+    final CompletableFuture<Void> nextFuture;
+    synchronized (this) {
+      final Integer numCurrentConnections = runtimeEdgeIdToNumCurrentConnections.get(runtimeEdgeId);
+      if (numCurrentConnections == null) {
+        throw new IllegalStateException("No outstanding connection for runtime edge " + runtimeEdgeId);
+      }
+      final Queue<CompletableFuture<Void>> pendingConnections = runtimeEdgeIdToPendingConnections.get(runtimeEdgeId);
+      nextFuture = pendingConnections == null ? null : pendingConnections.poll();
+      if (nextFuture != null) {
+        // One connection finished and the next one takes over its permit; the count stays the same.
+        if (pendingConnections.isEmpty()) {
+          runtimeEdgeIdToPendingConnections.remove(runtimeEdgeId);
+        }
+      } else if (numCurrentConnections == 1) {
+        // The last connection on this edge finished: drop its entry so the maps do not grow with every edge seen.
+        runtimeEdgeIdToNumCurrentConnections.remove(runtimeEdgeId);
+      } else {
+        runtimeEdgeIdToNumCurrentConnections.put(runtimeEdgeId, numCurrentConnections - 1);
+      }
+    }
+    // Complete outside the monitor: stages chained on the future (e.g. opening the connection) may run
+    // synchronously inside complete() and must not execute while holding this queue's lock.
+    if (nextFuture != null) {
+      nextFuture.complete(null);
+    }
+  }
+}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockTransferConnectionQueueTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockTransferConnectionQueueTest.java
new file mode 100644
index 00000000..bc1ba30c
--- /dev/null
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/data/BlockTransferConnectionQueueTest.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.tests.runtime.executor.data;
+
+import edu.snu.nemo.conf.JobConf;
+import edu.snu.nemo.runtime.executor.data.BlockTransferConnectionQueue;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests {@link BlockTransferConnectionQueue}.
+ */
+public final class BlockTransferConnectionQueueTest {
+  private static final String RUNTIME_EDGE_0 = "RuntimeEdge0";
+  private static final String RUNTIME_EDGE_1 = "RuntimeEdge1";
+  private static final int WAIT_TIME = 1000;
+
+  /**
+   * Creates {@link BlockTransferConnectionQueue} for testing.
+   * @param maxNum value for {@link JobConf.MaxNumDownloadsForARuntimeEdge} parameter.
+   * @return {@link BlockTransferConnectionQueue} object created.
+   */
+  private BlockTransferConnectionQueue getQueue(final int maxNum) {
+    final Configuration conf = Tang.Factory.getTang().newConfigurationBuilder()
+        .bindNamedParameter(JobConf.MaxNumDownloadsForARuntimeEdge.class, String.valueOf(maxNum))
+        .build();
+    final Injector injector = Tang.Factory.getTang().newInjector(conf);
+    try {
+      return injector.getInstance(BlockTransferConnectionQueue.class);
+    } catch (final InjectionException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Requests beyond the limit stay pending until a granted connection on the same runtime edge finishes;
+   * connections on other runtime edges are counted separately.
+   */
+  @Test(timeout = WAIT_TIME)
+  public void test() {
+    final BlockTransferConnectionQueue queue = getQueue(3);
+    for (int i = 0; i < 3; i++) {
+      assertTrue(queue.requestConnectPermission(RUNTIME_EDGE_0).isDone());
+    }
+    // The fourth request on the same edge exceeds the limit and must wait.
+    final CompletableFuture<Void> pending = queue.requestConnectPermission(RUNTIME_EDGE_0);
+    assertFalse(pending.isDone());
+
+    // Another edge has its own limit, and finishing its connection does not release the pending one.
+    assertTrue(queue.requestConnectPermission(RUNTIME_EDGE_1).isDone());
+    queue.onConnectionFinished(RUNTIME_EDGE_1);
+    assertFalse(pending.isDone());
+
+    // Finishing a connection on the same edge hands its permit over to the pending request.
+    queue.onConnectionFinished(RUNTIME_EDGE_0);
+    assertTrue(pending.isDone());
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to