Repository: giraph Updated Branches: refs/heads/trunk b5284cd9b -> b90b59d2b
http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java index 1e76f2e..df50e2a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/RequestServerHandler.java @@ -18,7 +18,7 @@ package org.apache.giraph.comm.netty.handler; -import org.apache.giraph.comm.netty.NettyClient; +import org.apache.giraph.comm.flow_control.FlowControl; import org.apache.giraph.comm.requests.WritableRequest; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; @@ -52,6 +52,8 @@ public abstract class RequestServerHandler<R> extends Logger.getLogger(RequestServerHandler.class); /** Already closed first request? */ private static volatile boolean ALREADY_CLOSED_FIRST_REQUEST = false; + /** Flow control used in sending requests */ + protected FlowControl flowControl; /** Close connection on first request (used for simulating failure) */ private final boolean closeFirstRequest; /** Request reserved map (for exactly one semantics) */ @@ -62,8 +64,6 @@ public abstract class RequestServerHandler<R> extends private long startProcessingNanoseconds = -1; /** Handler for uncaught exceptions */ private final Thread.UncaughtExceptionHandler exceptionHandler; - /** Do we have a limit on the number of open requests per worker */ - private final boolean limitOpenRequestsPerWorker; /** Whether it is the first time reading/handling a request*/ private final AtomicBoolean firstRead = new AtomicBoolean(true); /** Cached value for NETTY_AUTO_READ configuration option */ @@ -86,8 +86,6 @@ public abstract class RequestServerHandler<R> extends closeFirstRequest = NETTY_SIMULATE_FIRST_REQUEST_CLOSED.get(conf); this.myTaskInfo = myTaskInfo; this.exceptionHandler = exceptionHandler; - this.limitOpenRequestsPerWorker = - NettyClient.LIMIT_OPEN_REQUESTS_PER_WORKER.get(conf); this.nettyAutoRead = GiraphConstants.NETTY_AUTO_READ.get(conf); } @@ -139,15 +137,9 @@ public abstract class RequestServerHandler<R> extends ByteBuf buffer = ctx.alloc().buffer(RESPONSE_BYTES); buffer.writeInt(myTaskInfo.getTaskId()); buffer.writeLong(request.getRequestId()); - short signal; - if (limitOpenRequestsPerWorker) { - signal = NettyClient.calculateResponse(alreadyDone, - shouldIgnoreCredit(request.getClientId()), getCurrentMaxCredit()); - } else { - signal = (short) alreadyDone.ordinal(); - } + short signal = + flowControl.calculateResponse(alreadyDone, request.getClientId()); buffer.writeShort(signal); - ctx.write(buffer); // NettyServer is bootstrapped with auto-read set to true by default. After // the first request is processed, we set auto-read to false. This prevents @@ -170,25 +162,6 @@ public abstract class RequestServerHandler<R> extends } /** - * Get the maximum number of open requests per worker (credit) at the moment - * the method is called. This number should generally depend on the available - * memory and processing rate. - * - * @return maximum number of open requests for each worker - */ - protected abstract short getCurrentMaxCredit(); - - /** - * Whether we should ignore credit-based control flow in communicating with - * task with a given id. Generally, communication with master node does not - * require any control-flow mechanism. - * - * @param taskId id of the task on the other end of the communication - * @return 0 if credit should be ignored, 1 otherwise - */ - protected abstract boolean shouldIgnoreCredit(int taskId); - - /** * Set the flag indicating already closed first request */ private static void setAlreadyClosedFirstRequest() { @@ -244,5 +217,13 @@ public abstract class RequestServerHandler<R> extends ImmutableClassesGiraphConfiguration conf, TaskInfo myTaskInfo, Thread.UncaughtExceptionHandler exceptionHandler); + + /** + * Inform the factory about the flow control policy used (this method should + * be called before any call to `#newHandle()`) + * + * @param flowControl reference to flow control used + */ + void setFlowControl(FlowControl flowControl); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java index 18e79ce..2e1e9e4 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/handler/WorkerRequestServerHandler.java @@ -18,7 +18,7 @@ package org.apache.giraph.comm.netty.handler; -import org.apache.giraph.comm.netty.NettyWorkerClient; +import org.apache.giraph.comm.flow_control.FlowControl; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.comm.ServerData; import org.apache.giraph.comm.requests.WorkerRequest; @@ -48,25 +48,17 @@ public class WorkerRequestServerHandler<I extends WritableComparable, * @param conf Configuration * @param myTaskInfo Current task info * @param exceptionHandler Handles uncaught exceptions + * @param flowControl Reference to the flow control used */ public WorkerRequestServerHandler(ServerData<I, V, E> serverData, WorkerRequestReservedMap workerRequestReservedMap, ImmutableClassesGiraphConfiguration conf, TaskInfo myTaskInfo, - Thread.UncaughtExceptionHandler exceptionHandler) { + Thread.UncaughtExceptionHandler exceptionHandler, + FlowControl flowControl) { super(workerRequestReservedMap, conf, myTaskInfo, exceptionHandler); this.serverData = serverData; - } - - @Override - protected short getCurrentMaxCredit() { - return ((NettyWorkerClient) (serverData.getServiceWorker() - .getWorkerClient())).getMaxOpenRequestsPerWorker(); - } - - @Override - protected boolean shouldIgnoreCredit(int taskId) { - return taskId == serverData.getServiceWorker().getMasterInfo().getTaskId(); + this.flowControl = flowControl; } @Override @@ -80,6 +72,8 @@ public class WorkerRequestServerHandler<I extends WritableComparable, RequestServerHandler.Factory { /** Data that can be accessed for handling requests */ private final ServerData<I, V, E> serverData; + /** Flow control used in sending requests */ + private FlowControl flowControl; /** * Constructor @@ -97,7 +91,13 @@ public class WorkerRequestServerHandler<I extends WritableComparable, TaskInfo myTaskInfo, Thread.UncaughtExceptionHandler exceptionHandler) { return new WorkerRequestServerHandler<I, V, E, Writable>(serverData, - workerRequestReservedMap, conf, myTaskInfo, exceptionHandler); + workerRequestReservedMap, conf, myTaskInfo, exceptionHandler, + flowControl); + } + + @Override + public void setFlowControl(FlowControl flowControl) { + this.flowControl = flowControl; } } } http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index e9ece66..cf8d1bd 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -853,6 +853,7 @@ public class BspServiceMaster<I extends WritableComparable, masterClient = new NettyMasterClient(getContext(), getConfiguration(), this, getGraphTaskManager().createUncaughtExceptionHandler()); + masterServer.setFlowControl(masterClient.getFlowControl()); if (LOG.isInfoEnabled()) { LOG.info("becomeMaster: I am now the master!"); http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/main/java/org/apache/giraph/master/MasterInfo.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterInfo.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterInfo.java index 90b49c3..f733208 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/MasterInfo.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterInfo.java @@ -24,6 +24,8 @@ import org.apache.giraph.graph.TaskInfo; * Information about the master that is sent to other workers. */ public class MasterInfo extends TaskInfo { + /** Master task id is always -1 */ + public static final int MASTER_TASK_ID = -1; /** * Constructor */ @@ -42,7 +44,7 @@ public class MasterInfo extends TaskInfo { */ @Override public int getTaskId() { - return -1; + return MASTER_TASK_ID; } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/main/java/org/apache/giraph/utils/AdjustableSemaphore.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/AdjustableSemaphore.java b/giraph-core/src/main/java/org/apache/giraph/utils/AdjustableSemaphore.java new file mode 100644 index 0000000..09c1bdf --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/utils/AdjustableSemaphore.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.utils; + +import java.util.concurrent.Semaphore; + +import static com.google.common.base.Preconditions.checkState; + +/** + * Implementation of a semaphore where number of available permits can change + */ +public final class AdjustableSemaphore extends Semaphore { + /** Maximum number of available permits */ + private int maxPermits; + + /** + * Constructor + * @param permits initial number of available permits + */ + public AdjustableSemaphore(int permits) { + super(permits); + maxPermits = permits; + } + + /** + * Adjusts the maximum number of available permits. + * + * @param newMax max number of permits + */ + public synchronized void setMaxPermits(int newMax) { + checkState(newMax >= 0, "setMaxPermits: number of permits cannot be " + + "less than 0"); + int delta = newMax - this.maxPermits; + if (delta > 0) { + // Releasing semaphore to make room for 'delta' more permits + release(delta); + } else if (delta < 0) { + // Reducing number of permits in the semaphore + reducePermits(-delta); + } + this.maxPermits = newMax; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index 678d99e..d29e46d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -212,6 +212,7 @@ public class BspServiceWorker<I extends WritableComparable, workerInfo.setTaskId(getTaskPartition()); workerClient = new NettyWorkerClient<I, V, E>(context, conf, this, graphTaskManager.createUncaughtExceptionHandler()); + workerServer.setFlowControl(workerClient.getFlowControl()); workerAggregatorRequestProcessor = new NettyWorkerAggregatorRequestProcessor(getContext(), conf, this); @@ -1840,17 +1841,6 @@ else[HADOOP_NON_SECURE]*/ } @Override - public int getNumPartitionsOwned() { - int count = 0; - for (PartitionOwner partitionOwner : getPartitionOwners()) { - if (partitionOwner.getWorkerInfo().equals(getWorkerInfo())) { - count++; - } - } - return count; - } - - @Override public WorkerInputSplitsHandler getInputSplitsHandler() { return inputSplitsHandler; } http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java index 920935d..689d281 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/ConnectionTest.java @@ -76,6 +76,7 @@ public class ConnectionTest { NettyClient client = new NettyClient(context, conf, new WorkerInfo(), new MockExceptionHandler()); + server.setFlowControl(client.getFlowControl()); client.connectAllAddresses( Lists.<WorkerInfo>newArrayList(workerInfo)); @@ -125,6 +126,9 @@ public class ConnectionTest { NettyClient client = new NettyClient(context, conf, new WorkerInfo(), new MockExceptionHandler()); + server1.setFlowControl(client.getFlowControl()); + server2.setFlowControl(client.getFlowControl()); + server3.setFlowControl(client.getFlowControl()); List<WorkerInfo> addresses = Lists.<WorkerInfo>newArrayList(workerInfo1, workerInfo2, workerInfo3); client.connectAllAddresses(addresses); @@ -165,6 +169,7 @@ public class ConnectionTest { NettyClient client3 = new NettyClient(context, conf, new WorkerInfo(), new MockExceptionHandler()); client3.connectAllAddresses(addresses); + server.setFlowControl(client1.getFlowControl()); client1.stop(); client2.stop(); http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java index c88aac7..22594fc 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestFailureTest.java @@ -144,7 +144,7 @@ public class RequestFailureTest { @Test public void resendRequest() throws IOException { // Force a drop of the first request - GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED.set(conf, true); + GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED.set(conf, false); // One second to finish a request GiraphConstants.MAX_REQUEST_MILLISECONDS.set(conf, 1000); // Loop every 2 seconds @@ -165,6 +165,7 @@ public class RequestFailureTest { workerInfo.setInetSocketAddress(server.getMyAddress(), server.getLocalHostOrIp()); client = new NettyClient(context, conf, new WorkerInfo(), new MockExceptionHandler()); + server.setFlowControl(client.getFlowControl()); client.connectAllAddresses( Lists.<WorkerInfo>newArrayList(workerInfo)); http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java index b782fe5..2d86aee 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java @@ -102,6 +102,7 @@ public class RequestTest { workerInfo.setInetSocketAddress(server.getMyAddress(), server.getLocalHostOrIp()); client = new NettyClient(context, conf, new WorkerInfo(), new MockExceptionHandler()); + server.setFlowControl(client.getFlowControl()); client.connectAllAddresses( Lists.<WorkerInfo>newArrayList(workerInfo)); } @@ -313,14 +314,4 @@ public class RequestTest { } assertEquals(55, keySum); } - - @Test - - public void creditBasedResponseTest() throws IOException { - short response = NettyClient.calculateResponse(AckSignalFlag.NEW_REQUEST, - true, (short) 256); - assertEquals(NettyClient.getAckSignalFlag(response), AckSignalFlag.NEW_REQUEST); - assertEquals(NettyClient.shouldIgnoreCredit(response), true); - assertEquals(NettyClient.getCredit(response), (short) 256); - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/giraph/blob/b90b59d2/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java index c63d538..970d14b 100644 --- a/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java +++ b/giraph-core/src/test/java/org/apache/giraph/comm/SaslConnectionTest.java @@ -92,6 +92,7 @@ public class SaslConnectionTest { NettyClient client = new NettyClient(context, conf, new WorkerInfo(), new MockExceptionHandler()); + server.setFlowControl(client.getFlowControl()); client.connectAllAddresses(Lists.<WorkerInfo>newArrayList(workerInfo)); client.stop();
