Repository: giraph
Updated Branches:
  refs/heads/trunk b5284cd9b -> b90b59d2b


http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
index 1e76f2e..df50e2a 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.comm.netty.handler;
 
-import org.apache.giraph.comm.netty.NettyClient;
+import org.apache.giraph.comm.flow_control.FlowControl;
 import org.apache.giraph.comm.requests.WritableRequest;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -52,6 +52,8 @@ public abstract class RequestServerHandler<R> extends
       Logger.getLogger(RequestServerHandler.class);
   /** Already closed first request? */
   private static volatile boolean ALREADY_CLOSED_FIRST_REQUEST = false;
+  /** Flow control used in sending requests */
+  protected FlowControl flowControl;
   /** Close connection on first request (used for simulating failure) */
   private final boolean closeFirstRequest;
   /** Request reserved map (for exactly one semantics) */
@@ -62,8 +64,6 @@ public abstract class RequestServerHandler<R> extends
   private long startProcessingNanoseconds = -1;
   /** Handler for uncaught exceptions */
   private final Thread.UncaughtExceptionHandler exceptionHandler;
-  /** Do we have a limit on the number of open requests per worker */
-  private final boolean limitOpenRequestsPerWorker;
   /** Whether it is the first time reading/handling a request*/
   private final AtomicBoolean firstRead = new AtomicBoolean(true);
   /** Cached value for NETTY_AUTO_READ configuration option */
@@ -86,8 +86,6 @@ public abstract class RequestServerHandler<R> extends
     closeFirstRequest = NETTY_SIMULATE_FIRST_REQUEST_CLOSED.get(conf);
     this.myTaskInfo = myTaskInfo;
     this.exceptionHandler = exceptionHandler;
-    this.limitOpenRequestsPerWorker =
-        NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.get(conf);
     this.nettyAutoRead = GiraphConstants.NETTY_AUTO_READ.get(conf);
   }
 
@@ -139,15 +137,9 @@ public abstract class RequestServerHandler<R> extends
     ByteBuf buffer = ctx.alloc().buffer(RESPONSE_BYTES);
     buffer.writeInt(myTaskInfo.getTaskId());
     buffer.writeLong(request.getRequestId());
-    short signal;
-    if (limitOpenRequestsPerWorker) {
-      signal = NettyClient.calculateResponse(alreadyDone,
-          shouldIgnoreCredit(request.getClientId()), getCurrentMaxCredit());
-    } else {
-      signal = (short) alreadyDone.ordinal();
-    }
+    short signal =
+        flowControl.calculateResponse(alreadyDone, request.getClientId());
     buffer.writeShort(signal);
-
     ctx.write(buffer);
     // NettyServer is bootstrapped with auto-read set to true by default. After
     // the first request is processed, we set auto-read to false. This prevents
@@ -170,25 +162,6 @@ public abstract class RequestServerHandler<R> extends
   }
 
   /**
-   * Get the maximum number of open requests per worker (credit) at the moment
-   * the method is called. This number should generally depend on the available
-   * memory and processing rate.
-   *
-   * @return maximum number of open requests for each worker
-   */
-  protected abstract short getCurrentMaxCredit();
-
-  /**
-   * Whether we should ignore credit-based control flow in communicating with
-   * task with a given id. Generally, communication with master node does not
-   * require any control-flow mechanism.
-   *
-   * @param taskId id of the task on the other end of the communication
-   * @return 0 if credit should be ignored, 1 otherwise
-   */
-  protected abstract boolean shouldIgnoreCredit(int taskId);
-
-  /**
    * Set the flag indicating already closed first request
    */
   private static void setAlreadyClosedFirstRequest() {
@@ -244,5 +217,13 @@ public abstract class RequestServerHandler<R> extends
         ImmutableClassesGiraphConfiguration conf,
         TaskInfo myTaskInfo,
         Thread.UncaughtExceptionHandler exceptionHandler);
+
+    /**
+     * Inform the factory about the flow control policy used (this method 
should
+     * be called before any call to `#newHandle()`)
+     *
+     * @param flowControl reference to flow control used
+     */
+    void setFlowControl(FlowControl flowControl);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
index 18e79ce..2e1e9e4 100644
--- 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
+++ 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.comm.netty.handler;
 
-import org.apache.giraph.comm.netty.NettyWorkerClient;
+import org.apache.giraph.comm.flow_control.FlowControl;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.requests.WorkerRequest;
@@ -48,25 +48,17 @@ public class WorkerRequestServerHandler<I extends 
WritableComparable,
    * @param conf                     Configuration
    * @param myTaskInfo               Current task info
    * @param exceptionHandler         Handles uncaught exceptions
+   * @param flowControl              Reference to the flow control used
    */
   public WorkerRequestServerHandler(ServerData<I, V, E> serverData,
       WorkerRequestReservedMap workerRequestReservedMap,
       ImmutableClassesGiraphConfiguration conf,
       TaskInfo myTaskInfo,
-      Thread.UncaughtExceptionHandler exceptionHandler) {
+      Thread.UncaughtExceptionHandler exceptionHandler,
+      FlowControl flowControl) {
     super(workerRequestReservedMap, conf, myTaskInfo, exceptionHandler);
     this.serverData = serverData;
-  }
-
-  @Override
-  protected short getCurrentMaxCredit() {
-    return ((NettyWorkerClient) (serverData.getServiceWorker()
-        .getWorkerClient())).getMaxOpenRequestsPerWorker();
-  }
-
-  @Override
-  protected boolean shouldIgnoreCredit(int taskId) {
-    return taskId == serverData.getServiceWorker().getMasterInfo().getTaskId();
+    this.flowControl = flowControl;
   }
 
   @Override
@@ -80,6 +72,8 @@ public class WorkerRequestServerHandler<I extends 
WritableComparable,
       RequestServerHandler.Factory {
     /** Data that can be accessed for handling requests */
     private final ServerData<I, V, E> serverData;
+    /** Flow control used in sending requests */
+    private FlowControl flowControl;
 
     /**
      * Constructor
@@ -97,7 +91,13 @@ public class WorkerRequestServerHandler<I extends 
WritableComparable,
         TaskInfo myTaskInfo,
         Thread.UncaughtExceptionHandler exceptionHandler) {
       return new WorkerRequestServerHandler<I, V, E, Writable>(serverData,
-          workerRequestReservedMap, conf, myTaskInfo, exceptionHandler);
+          workerRequestReservedMap, conf, myTaskInfo, exceptionHandler,
+          flowControl);
+    }
+
+    @Override
+    public void setFlowControl(FlowControl flowControl) {
+      this.flowControl = flowControl;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java 
b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index e9ece66..cf8d1bd 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -853,6 +853,7 @@ public class BspServiceMaster<I extends WritableComparable,
           masterClient =
               new NettyMasterClient(getContext(), getConfiguration(), this,
                   getGraphTaskManager().createUncaughtExceptionHandler());
+          masterServer.setFlowControl(masterClient.getFlowControl());
 
           if (LOG.isInfoEnabled()) {
             LOG.info("becomeMaster: I am now the master!");

http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/main/java/org/apache/giraph/master/MasterInfo.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterInfo.java 
b/giraph-core/src/main/java/org/apache/giraph/master/MasterInfo.java
index 90b49c3..f733208 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterInfo.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterInfo.java
@@ -24,6 +24,8 @@ import org.apache.giraph.graph.TaskInfo;
  * Information about the master that is sent to other workers.
  */
 public class MasterInfo extends TaskInfo {
+  /** Master task id is always -1 */
+  public static final int MASTER_TASK_ID = -1;
   /**
    * Constructor
    */
@@ -42,7 +44,7 @@ public class MasterInfo extends TaskInfo {
    */
   @Override
   public int getTaskId() {
-    return -1;
+    return MASTER_TASK_ID;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/main/java/org/apache/giraph/utils/AdjustableSemaphore.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/utils/AdjustableSemaphore.java 
b/giraph-core/src/main/java/org/apache/giraph/utils/AdjustableSemaphore.java
new file mode 100644
index 0000000..09c1bdf
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/AdjustableSemaphore.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import java.util.concurrent.Semaphore;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Implementation of a semaphore where number of available permits can change
+ */
+public final class AdjustableSemaphore extends Semaphore {
+  /** Maximum number of available permits */
+  private int maxPermits;
+
+  /**
+   * Constructor
+   * @param permits initial number of available permits
+   */
+  public AdjustableSemaphore(int permits) {
+    super(permits);
+    maxPermits = permits;
+  }
+
+  /**
+   * Adjusts the maximum number of available permits.
+   *
+   * @param newMax max number of permits
+   */
+  public synchronized void setMaxPermits(int newMax) {
+    checkState(newMax >= 0, "setMaxPermits: number of permits cannot be " +
+        "less than 0");
+    int delta = newMax - this.maxPermits;
+    if (delta > 0) {
+      // Releasing semaphore to make room for 'delta' more permits
+      release(delta);
+    } else if (delta < 0) {
+      // Reducing number of permits in the semaphore
+      reducePermits(-delta);
+    }
+    this.maxPermits = newMax;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java 
b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 678d99e..d29e46d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -212,6 +212,7 @@ public class BspServiceWorker<I extends WritableComparable,
     workerInfo.setTaskId(getTaskPartition());
     workerClient = new NettyWorkerClient<I, V, E>(context, conf, this,
         graphTaskManager.createUncaughtExceptionHandler());
+    workerServer.setFlowControl(workerClient.getFlowControl());
 
     workerAggregatorRequestProcessor =
         new NettyWorkerAggregatorRequestProcessor(getContext(), conf, this);
@@ -1840,17 +1841,6 @@ else[HADOOP_NON_SECURE]*/
   }
 
   @Override
-  public int getNumPartitionsOwned() {
-    int count = 0;
-    for (PartitionOwner partitionOwner : getPartitionOwners()) {
-      if (partitionOwner.getWorkerInfo().equals(getWorkerInfo())) {
-        count++;
-      }
-    }
-    return count;
-  }
-
-  @Override
   public WorkerInputSplitsHandler getInputSplitsHandler() {
     return inputSplitsHandler;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java 
b/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java
index 920935d..689d281 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java
@@ -76,6 +76,7 @@ public class ConnectionTest {
 
     NettyClient client = new NettyClient(context, conf, new WorkerInfo(),
         new MockExceptionHandler());
+    server.setFlowControl(client.getFlowControl());
     client.connectAllAddresses(
         Lists.<WorkerInfo>newArrayList(workerInfo));
 
@@ -125,6 +126,9 @@ public class ConnectionTest {
 
     NettyClient client = new NettyClient(context, conf, new WorkerInfo(),
         new MockExceptionHandler());
+    server1.setFlowControl(client.getFlowControl());
+    server2.setFlowControl(client.getFlowControl());
+    server3.setFlowControl(client.getFlowControl());
     List<WorkerInfo> addresses = Lists.<WorkerInfo>newArrayList(workerInfo1,
         workerInfo2, workerInfo3);
     client.connectAllAddresses(addresses);
@@ -165,6 +169,7 @@ public class ConnectionTest {
     NettyClient client3 = new NettyClient(context, conf, new WorkerInfo(),
         new MockExceptionHandler());
     client3.connectAllAddresses(addresses);
+    server.setFlowControl(client1.getFlowControl());
 
     client1.stop();
     client2.stop();

http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java 
b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
index c88aac7..22594fc 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java
@@ -144,7 +144,7 @@ public class RequestFailureTest {
   @Test
   public void resendRequest() throws IOException {
     // Force a drop of the first request
-    GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED.set(conf, true);
+    GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED.set(conf, false);
     // One second to finish a request
     GiraphConstants.MAX_REQUEST_MILLISECONDS.set(conf, 1000);
     // Loop every 2 seconds
@@ -165,6 +165,7 @@ public class RequestFailureTest {
     workerInfo.setInetSocketAddress(server.getMyAddress(), 
server.getLocalHostOrIp());
     client = new NettyClient(context, conf, new WorkerInfo(),
         new MockExceptionHandler());
+    server.setFlowControl(client.getFlowControl());
     client.connectAllAddresses(
         Lists.<WorkerInfo>newArrayList(workerInfo));
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java 
b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
index b782fe5..2d86aee 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
@@ -102,6 +102,7 @@ public class RequestTest {
     workerInfo.setInetSocketAddress(server.getMyAddress(), 
server.getLocalHostOrIp());
     client = new NettyClient(context, conf, new WorkerInfo(),
         new MockExceptionHandler());
+    server.setFlowControl(client.getFlowControl());
     client.connectAllAddresses(
         Lists.<WorkerInfo>newArrayList(workerInfo));
   }
@@ -313,14 +314,4 @@ public class RequestTest {
     }
     assertEquals(55, keySum);
   }
-
-  @Test
-
-  public void creditBasedResponseTest() throws IOException {
-    short response = NettyClient.calculateResponse(AckSignalFlag.NEW_REQUEST,
-        true, (short) 256);
-    assertEquals(NettyClient.getAckSignalFlag(response), 
AckSignalFlag.NEW_REQUEST);
-    assertEquals(NettyClient.shouldIgnoreCredit(response), true);
-    assertEquals(NettyClient.getCredit(response), (short) 256);
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java 
b/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
index c63d538..970d14b 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java
@@ -92,6 +92,7 @@ public class SaslConnectionTest {
 
     NettyClient client = new NettyClient(context, conf, new WorkerInfo(),
         new MockExceptionHandler());
+    server.setFlowControl(client.getFlowControl());
     client.connectAllAddresses(Lists.<WorkerInfo>newArrayList(workerInfo));
 
     client.stop();

Reply via email to