Removed nodeId from the IgfsClientRequest message; IgfsClientManager now passes the sender node ID alongside the request.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d0b4aa63 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d0b4aa63 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d0b4aa63 Branch: refs/heads/ignite-3553 Commit: d0b4aa633854dbb7358259591f24bd3814096ddf Parents: 16d4283 Author: vozerov-gridgain <[email protected]> Authored: Wed Jul 27 15:57:40 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Wed Jul 27 15:57:40 2016 +0300 ---------------------------------------------------------------------- .../igfs/client/IgfsClientInOperation.java | 64 ++++++++++++++++++++ .../igfs/client/IgfsClientManager.java | 19 +++--- .../igfs/client/IgfsClientOutOperation.java | 2 +- .../igfs/client/IgfsClientRequest.java | 31 +--------- 4 files changed, 79 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d0b4aa63/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientInOperation.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientInOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientInOperation.java new file mode 100644 index 0000000..9a53067 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientInOperation.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs.client; + +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.internal.S; + +import java.util.UUID; + +/** + * IGFS client closure incoming operation descriptor. + */ +public class IgfsClientInOperation { + /** Target node ID. */ + private final UUID nodeId; + + /** Request. */ + private final IgfsClientRequest req; + + /** + * Constructor. + * + * @param nodeId Target node ID. + * @param req Request. + */ + public IgfsClientInOperation(UUID nodeId, IgfsClientRequest req) { + this.nodeId = nodeId; + this.req = req; + } + + /** + * @return Target node ID. + */ + public UUID nodeId() { + return nodeId; + } + + /** + * @return Target operation. 
+ */ + public IgfsClientRequest request() { + return req; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgfsClientInOperation.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/d0b4aa63/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java index 9504cf4..357d5d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java @@ -43,7 +43,7 @@ import java.util.concurrent.ConcurrentLinkedDeque; */ public class IgfsClientManager extends IgfsManager { /** Pending input operations received when manager is not started yet. */ - private final ConcurrentLinkedDeque<IgfsClientRequest> pending = new ConcurrentLinkedDeque<>(); + private final ConcurrentLinkedDeque<IgfsClientInOperation> pending = new ConcurrentLinkedDeque<>(); /** Outgoing operations. */ private final Map<Long, IgfsClientOutOperation> outOps = new ConcurrentHashMap<>(); @@ -168,9 +168,10 @@ public class IgfsClientManager extends IgfsManager { /** * Handle request. * + * @param nodeId Node ID. * @param req Request. */ - private void onRequest(IgfsClientRequest req) { + private void onRequest(UUID nodeId, IgfsClientRequest req) { rwLock.readLock().lock(); try { @@ -178,9 +179,10 @@ public class IgfsClientManager extends IgfsManager { return; // Discovery listener on remote node will handle node leave. if (ready) - processRequest(req); // Normal execution flow. + processRequest(nodeId, req); // Normal execution flow. 
else - pending.add(req); // Add to pending set if manager is not fully started yet. + // Add to pending set if manager is not operational yet. + pending.add(new IgfsClientInOperation(nodeId, req)); } finally { rwLock.readLock().unlock(); @@ -248,9 +250,10 @@ public class IgfsClientManager extends IgfsManager { /** * Actual request processing. Happens inside appropriate thread pool. * + * @param nodeId Node ID. * @param req Request. */ - private void processRequest(IgfsClientRequest req) { + private void processRequest(UUID nodeId, IgfsClientRequest req) { IgfsClientResponse resp; try { @@ -272,10 +275,10 @@ public class IgfsClientManager extends IgfsManager { // Send response. try { - ctx.io().send(req.nodeId(), GridTopic.TOPIC_IGFS_CLI, resp, GridIoPolicy.PUBLIC_POOL); + ctx.io().send(nodeId, GridTopic.TOPIC_IGFS_CLI, resp, GridIoPolicy.PUBLIC_POOL); } catch (IgniteCheckedException e) { - U.error(log, "Failed to send IGFS client response [nodeId=" + req.nodeId() + + U.error(log, "Failed to send IGFS client response [nodeId=" + nodeId + ", msgId=" + req.messageId() + ']', e); } } @@ -295,7 +298,7 @@ public class IgfsClientManager extends IgfsManager { assert msg != null; if (msg instanceof IgfsClientRequest) - onRequest((IgfsClientRequest)msg); + onRequest(nodeId, (IgfsClientRequest)msg); else if (msg instanceof IgfsClientResponse) onResponse((IgfsClientResponse)msg); else http://git-wip-us.apache.org/repos/asf/ignite/blob/d0b4aa63/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientOutOperation.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientOutOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientOutOperation.java index 03c066b..47cea16 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientOutOperation.java 
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientOutOperation.java @@ -24,7 +24,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; import java.util.UUID; /** - * IGFS client closure outgoing opeartion descriptor. + * IGFS client closure outgoing operation descriptor. */ public class IgfsClientOutOperation { /** Target node ID. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/d0b4aa63/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientRequest.java index 49fe523..f7f7ebf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientRequest.java @@ -31,10 +31,7 @@ import java.util.UUID; */ public class IgfsClientRequest implements Message { /** Base fields (all except of target) count. */ - private static final byte BASE_FIELDS_CNT = 3; - - /** Originating node ID. */ - private UUID nodeId; + private static final byte BASE_FIELDS_CNT = 2; /** Message ID. */ private long msgId; @@ -61,19 +58,11 @@ public class IgfsClientRequest implements Message { assert nodeId != null; assert target != null; - this.nodeId = nodeId; this.msgId = msgId; this.target = target; } /** - * @return Node ID. - */ - public UUID nodeId() { - return nodeId; - } - - /** * @return Message ID. 
*/ public long messageId() { @@ -117,18 +106,12 @@ public class IgfsClientRequest implements Message { switch (writer.state()) { case 0: - if (!writer.writeUuid("nodeId", nodeId)) - return false; - - writer.incrementState(); - - case 1: if (!writer.writeLong("msgId", msgId)) return false; writer.incrementState(); - case 2: + case 1: if (!writer.writeShort("typeId", target.typeId())) return false; @@ -155,14 +138,6 @@ public class IgfsClientRequest implements Message { switch (reader.state()) { case 0: - nodeId = reader.readUuid("nodeId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: msgId = reader.readLong("msgId"); if (!reader.isLastRead()) @@ -170,7 +145,7 @@ public class IgfsClientRequest implements Message { reader.incrementState(); - case 2: + case 1: short typeId; typeId = reader.readShort("typeId");
