Repository: ignite Updated Branches: refs/heads/ignite-3553 6a3e59192 -> b77bee0fe
Added message processing logic. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b5322e8a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b5322e8a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b5322e8a Branch: refs/heads/ignite-3553 Commit: b5322e8af63bd1194acc1c1fc0ea1efebd434afd Parents: 6a3e591 Author: vozerov-gridgain <[email protected]> Authored: Wed Jul 27 15:35:35 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Wed Jul 27 15:35:35 2016 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/internal/GridTopic.java | 2 +- .../communication/GridIoMessageFactory.java | 8 +- .../internal/processors/igfs/IgfsContext.java | 8 +- .../internal/processors/igfs/IgfsProcessor.java | 6 +- .../igfs/client/IgfsClientAbstractCallable.java | 10 +- .../igfs/client/IgfsClientClosureManager.java | 191 ---------------- .../client/IgfsClientClosureOutOperation.java | 76 ------- .../igfs/client/IgfsClientClosureRequest.java | 203 ----------------- .../igfs/client/IgfsClientClosureResponse.java | 133 ----------- .../client/IgfsClientClosureResponseType.java | 53 ----- .../igfs/client/IgfsClientManager.java | 228 +++++++++++++++++++ .../igfs/client/IgfsClientOutOperation.java | 76 +++++++ .../igfs/client/IgfsClientRequest.java | 203 +++++++++++++++++ .../igfs/client/IgfsClientResponse.java | 133 +++++++++++ .../igfs/client/IgfsClientResponseType.java | 53 +++++ 15 files changed, 714 insertions(+), 669 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b5322e8a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java index e5d47c0..e073d41 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java @@ -97,7 +97,7 @@ public enum GridTopic { TOPIC_TX, /** Topic to handle IGFS closures. */ - TOPIC_IGFS_CLO; + TOPIC_IGFS_CLI; /** Enum values. */ private static final GridTopic[] VALS = values(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b5322e8a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index b0f75dc..1097ad5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -120,8 +120,8 @@ import org.apache.ignite.internal.processors.igfs.IgfsFileAffinityRange; import org.apache.ignite.internal.processors.igfs.IgfsFragmentizerRequest; import org.apache.ignite.internal.processors.igfs.IgfsFragmentizerResponse; import org.apache.ignite.internal.processors.igfs.IgfsSyncMessage; -import org.apache.ignite.internal.processors.igfs.client.IgfsClientClosureRequest; -import org.apache.ignite.internal.processors.igfs.client.IgfsClientClosureResponse; +import org.apache.ignite.internal.processors.igfs.client.IgfsClientRequest; +import org.apache.ignite.internal.processors.igfs.client.IgfsClientResponse; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse; import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest; @@ -163,12 +163,12 @@ public class GridIoMessageFactory implements MessageFactory { switch (type) { case -28: - msg = new IgfsClientClosureResponse(); + msg = new IgfsClientResponse(); break; case -27: - msg = new IgfsClientClosureRequest(); + msg = new IgfsClientRequest(); break; http://git-wip-us.apache.org/repos/asf/ignite/blob/b5322e8a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java index 3f67156..ed9f302 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java @@ -24,7 +24,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.FileSystemConfiguration; import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.processors.igfs.client.IgfsClientClosureManager; +import org.apache.ignite.internal.processors.igfs.client.IgfsClientManager; import org.jetbrains.annotations.Nullable; /** @@ -41,7 +41,7 @@ public class IgfsContext { private List<IgfsManager> mgrs = new LinkedList<>(); /** Closure manager. */ - private final IgfsClientClosureManager cloMgr; + private final IgfsClientManager cloMgr; /** Meta manager. */ private final IgfsMetaManager metaMgr; @@ -73,7 +73,7 @@ public class IgfsContext { public IgfsContext( GridKernalContext ctx, FileSystemConfiguration cfg, - IgfsClientClosureManager cloMgr, + IgfsClientManager cloMgr, IgfsMetaManager metaMgr, IgfsDataManager dataMgr, IgfsServerManager srvMgr, @@ -123,7 +123,7 @@ public class IgfsContext { /** * @return Closure manager. */ - public IgfsClientClosureManager closure() { + public IgfsClientManager closure() { return cloMgr; } http://git-wip-us.apache.org/repos/asf/ignite/blob/b5322e8a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java index 7d6753d..58e0d48 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java @@ -33,7 +33,7 @@ import org.apache.ignite.igfs.mapreduce.IgfsJob; import org.apache.ignite.igfs.mapreduce.IgfsRecordResolver; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteNodeAttributes; -import org.apache.ignite.internal.processors.igfs.client.IgfsClientClosureManager; +import org.apache.ignite.internal.processors.igfs.client.IgfsClientManager; import org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.util.ipc.IpcServerEndpoint; import org.apache.ignite.internal.util.typedef.C1; @@ -85,7 +85,7 @@ public class IgfsProcessor extends IgfsProcessorAdapter { new ConcurrentHashMap8<>(); /** Client closure manager. */ - private final IgfsClientClosureManager cloMgr; + private final IgfsClientManager cloMgr; /** * @param ctx Kernal context. @@ -93,7 +93,7 @@ public class IgfsProcessor extends IgfsProcessorAdapter { public IgfsProcessor(GridKernalContext ctx) { super(ctx); - cloMgr = new IgfsClientClosureManager(ctx); + cloMgr = new IgfsClientManager(ctx); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b5322e8a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientAbstractCallable.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientAbstractCallable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientAbstractCallable.java index a184f2e..92bdf30 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientAbstractCallable.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientAbstractCallable.java @@ -136,6 +136,7 @@ public abstract class IgfsClientAbstractCallable<T> implements IgniteCallable<T> } /** {@inheritDoc} */ + // TODO. @Override public final T call() throws Exception { assert ignite != null; @@ -151,7 +152,14 @@ public abstract class IgfsClientAbstractCallable<T> implements IgniteCallable<T> * @return Result. * @throws Exception If failed. */ - protected abstract T call0(IgfsContext ctx) throws Exception; + public abstract T call0(IgfsContext ctx) throws Exception; + + /** + * @return IGFS name. + */ + public String igfsName() { + return igfsName; + } /** {@inheritDoc} */ @Override public final void writeBinary(BinaryWriter writer) throws BinaryObjectException { http://git-wip-us.apache.org/repos/asf/ignite/blob/b5322e8a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureManager.java deleted file mode 100644 index 4da47e0..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureManager.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.igfs.client; - -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.processors.igfs.IgfsContext; -import org.apache.ignite.internal.processors.igfs.IgfsManager; -import org.apache.ignite.internal.util.StripedCompositeReadWriteLock; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.marshaller.Marshaller; -import org.jetbrains.annotations.Nullable; - -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedDeque; - -/** - * Manager to handle IGFS client closures. - */ -public class IgfsClientClosureManager extends IgfsManager { - /** Pending input operations received when manager is not started yet. */ - private final ConcurrentLinkedDeque<IgfsClientClosureRequest> pending = new ConcurrentLinkedDeque<>(); - - /** Outgoing operations. */ - private final Map<Long, IgfsClientClosureOutOperation> outOps = new ConcurrentHashMap<>(); - - /** Marshaller. */ - private final Marshaller marsh; - - /** Whether manager is fully started and ready to process requests. */ - private volatile boolean ready; - - /** Stopping flag. */ - private volatile boolean stopping; - - /** RW lock for synchronization. */ - private final StripedCompositeReadWriteLock rwLock = - new StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors() * 2); - - /** - * Constructor. - * - * @param ctx Kernal context. - */ - public IgfsClientClosureManager(GridKernalContext ctx) { - super(ctx); - - marsh = ctx.config().getMarshaller(); - } - - /** {@inheritDoc} */ - @Override protected void start0() throws IgniteCheckedException { - // TODO - } - - /** {@inheritDoc} */ - @Override protected void onKernalStart0() throws IgniteCheckedException { - // TODO - } - - /** {@inheritDoc} */ - @Override protected void onKernalStop0(boolean cancel) { - // TODO - } - - /** {@inheritDoc} */ - @Override protected void stop0(boolean cancel) { - // TODO - } - - /** - * Execute IGFS closure. - * - * @param igfsCtx IGFS context. - * @param clo Closure. - * @return Result. - */ - public <T> T execute(IgfsContext igfsCtx, IgfsClientAbstractCallable<T> clo) throws IgniteCheckedException { - return executeAsync(igfsCtx, clo).get(); - } - - /** - * Execute IGFS closure asynchronously. - * - * @param igfsCtx IGFS context. - * @param clo Closure. - * @return Future. - */ - public <T> IgniteInternalFuture<T> executeAsync(IgfsContext igfsCtx, IgfsClientAbstractCallable<T> clo) { - - // TODO - - return null; - } - - /** - * Create closure response. - * - * @param msgId Message ID. - * @param res Response. - * @param marsh Marshaller. - * @return Response. - */ - private IgfsClientClosureResponse createResponse(long msgId, @Nullable Object res, @Nullable Throwable resErr, - Marshaller marsh) { - try { - if (resErr != null) - return new IgfsClientClosureResponse(msgId, IgfsClientClosureResponseType.ERR, null, - marsh.marshal(resErr)); - else { - if (res == null) - return new IgfsClientClosureResponse(msgId, IgfsClientClosureResponseType.NULL, null, null); - else if (res instanceof Boolean) - return new IgfsClientClosureResponse(msgId, IgfsClientClosureResponseType.BOOL, res, null); - else - return new IgfsClientClosureResponse(msgId, IgfsClientClosureResponseType.OBJ, null, - marsh.marshal(res)); - } - } - catch (Exception e) { - U.error(log, "Failed to marshal IGFS closure result [msgId=" + msgId + ", res=" + res + - ", resErr=" + resErr + ']', e); - - return new IgfsClientClosureResponse(msgId, IgfsClientClosureResponseType.MARSH_ERR, null, null); - } - } - - /** - * Handle node leave event. - * - * @param nodeId Node ID. - */ - private void onNodeLeft(UUID nodeId) { - // TODO - } - - /** - * Handle closure request. - * - * @param req Request. - */ - private void onClosureRequest(IgfsClientClosureRequest req) { - rwLock.readLock().lock(); - - try { - if (stopping) - return; // Discovery listener on remote node will handle node leave. - - if (ready) - onClosureRequest0(req); // Normal execution flow. - else - pending.add(req); // Add to pending set if manager is not fully started yet. - } - finally { - rwLock.readLock().unlock(); - } - } - - /** - * Actual request processing. Happens inside appropriate thread pool. - * - * @param req Request. - */ - private void onClosureRequest0(IgfsClientClosureRequest req) { - // TODO - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgfsClientClosureManager.class, this); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/b5322e8a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureOutOperation.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureOutOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureOutOperation.java deleted file mode 100644 index 2eb15a6..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureOutOperation.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.igfs.client; - -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.util.typedef.internal.S; - -import java.util.UUID; - -/** - * IGFS client closure outgoing opeartion descriptor. - */ -public class IgfsClientClosureOutOperation { - /** Target node ID. */ - private final UUID nodeId; - - /** Target operation. */ - private final IgfsClientAbstractCallable target; - - /** Future completed when operation is ready. */ - private final IgniteInternalFuture fut; - - /** - * Constructor. - * - * @param nodeId Target node ID. - * @param target Target operation. - * @param fut Future completed when operation is ready. - */ - public IgfsClientClosureOutOperation(UUID nodeId, IgfsClientAbstractCallable target, IgniteInternalFuture fut) { - this.nodeId = nodeId; - this.target = target; - this.fut = fut; - } - - /** - * @return Target node ID. - */ - public UUID nodeId() { - return nodeId; - } - - /** - * @return Target operation. - */ - public IgfsClientAbstractCallable target() { - return target; - } - - /** - * @return Future completed when operation is ready. - */ - public IgniteInternalFuture future() { - return fut; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgfsClientClosureOutOperation.class, this); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/b5322e8a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureRequest.java deleted file mode 100644 index 1eda024..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureRequest.java +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.igfs.client; - -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; - -import java.nio.ByteBuffer; -import java.util.UUID; - -/** - * IGFS client closure execute request. - */ -public class IgfsClientClosureRequest implements Message { - /** Base fields (all except of target) count. */ - private static final byte BASE_FIELDS_CNT = 3; - - /** Originating node ID. */ - private UUID nodeId; - - /** Message ID. */ - private long msgId; - - /** Target callable. */ - @GridToStringInclude - private IgfsClientAbstractCallable target; - - /** - * Default constructor. - */ - public IgfsClientClosureRequest() { - // No-op. - } - - /** - * Constructor. - * - * @param nodeId Originating node ID. - * @param msgId Message ID. - * @param target Target callable. - */ - public IgfsClientClosureRequest(UUID nodeId, long msgId, IgfsClientAbstractCallable target) { - assert nodeId != null; - assert target != null; - - this.nodeId = nodeId; - this.msgId = msgId; - this.target = target; - } - - /** - * @return Node ID. - */ - public UUID nodeId() { - return nodeId; - } - - /** - * @return Message ID. - */ - public long messageId() { - return msgId; - } - - /** - * @return Target callable. - */ - public IgfsClientAbstractCallable target() { - return target; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return -27; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return (byte)(BASE_FIELDS_CNT + target.fieldsCount()); - } - - /** {@inheritDoc} */ - @Override public void onAckReceived() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - byte fieldsCount = fieldsCount(); - - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount)) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeUuid("nodeId", nodeId)) - return false; - - writer.incrementState(); - - case 1: - if (!writer.writeLong("msgId", msgId)) - return false; - - writer.incrementState(); - - case 2: - if (!writer.writeShort("typeId", target.typeId())) - return false; - - writer.incrementState(); - - default: - while (writer.state() < fieldsCount) { - if (!target.writeTo(writer, writer.state() - BASE_FIELDS_CNT)) - return false; - - writer.incrementState(); - } - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - switch (reader.state()) { - case 0: - nodeId = reader.readUuid("nodeId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: - msgId = reader.readLong("msgId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 2: - short typeId; - - typeId = reader.readShort("typeId"); - - if (!reader.isLastRead()) - return false; - - target = IgfsClientAbstractCallable.callableForTypeId(typeId); - - reader.incrementState(); - - default: - while (reader.state() < fieldsCount()) { - target.readFrom(reader, reader.state() - BASE_FIELDS_CNT); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - } - } - - return reader.afterMessageRead(IgfsClientClosureRequest.class); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgfsClientClosureRequest.class, this); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/b5322e8a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureResponse.java deleted file mode 100644 index 40e2802..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureResponse.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.igfs.client; - -import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; -import org.jetbrains.annotations.Nullable; - -import java.nio.ByteBuffer; - -/** - * IGFS client closure execute response. - */ -public class IgfsClientClosureResponse implements Message { - /** Message ID. */ - private long msgId; - - /** Response type. */ - private IgfsClientClosureResponseType typ; - - /** Result. */ - @GridToStringInclude - private Object res; - - /** Result bytes. */ - @GridToStringExclude - private byte[] resBytes; - - /** - * Default constructor. - */ - public IgfsClientClosureResponse() { - // No-op. - } - - /** - * Constructor. - * - * @param msgId Message ID. - * @param typ Type. - * @param res Result. - * @param resBytes Result bytes. - */ - public IgfsClientClosureResponse(long msgId, IgfsClientClosureResponseType typ, @Nullable Object res, - @Nullable byte[] resBytes) { - this.msgId = msgId; - this.typ = typ; - this.res = res; - this.resBytes = resBytes; - } - - /** - * @return Message ID. - */ - public long messageId() { - return msgId; - } - - /** - * @return Type. - */ - public IgfsClientClosureResponseType type() { - return typ; - } - - /** - * @return Result. - */ - @Nullable public Object result() { - return res; - } - - /** - * @return Result bytes. - */ - @Nullable public byte[] resultBytes() { - return resBytes; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return -28; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return (byte)(typ == IgfsClientClosureResponseType.NULL || typ == IgfsClientClosureResponseType.MARSH_ERR ? - 2 : 3); - } - - /** {@inheritDoc} */ - @Override public void onAckReceived() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - // TODO - - return false; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - // TODO - - return false; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgfsClientClosureResponse.class, this); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/b5322e8a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureResponseType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureResponseType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureResponseType.java deleted file mode 100644 index 8fd45c9..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureResponseType.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.igfs.client; - -import org.jetbrains.annotations.Nullable; - -/** - * IGFS client closure response type. - */ -public enum IgfsClientClosureResponseType { - /** Object. */ - OBJ, - - /** Null result. */ - NULL, - - /** Boolean result. */ - BOOL, - - /** Error. */ - ERR, - - /** Marshalling error. */ - MARSH_ERR; - - /** Enum values. */ - private static final IgfsClientClosureResponseType[] VALS = values(); - - /** - * Efficiently gets enumerated value from its ordinal. - * - * @param ord Ordinal value. - * @return Enumerated value. - */ - @Nullable public static IgfsClientClosureResponseType fromOrdinal(int ord) { - return ord >= 0 && ord < VALS.length ? VALS[ord] : null; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/b5322e8a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java new file mode 100644 index 0000000..59297bf --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs.client; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.igfs.IgfsException; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.GridTopic; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.internal.processors.igfs.IgfsContext; +import org.apache.ignite.internal.processors.igfs.IgfsImpl; +import org.apache.ignite.internal.processors.igfs.IgfsManager; +import org.apache.ignite.internal.util.StripedCompositeReadWriteLock; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; +import org.jetbrains.annotations.Nullable; + +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; + +/** + * Manager to handle IGFS client closures. + */ +public class IgfsClientManager extends IgfsManager { + /** Pending input operations received when manager is not started yet. */ + private final ConcurrentLinkedDeque<IgfsClientRequest> pending = new ConcurrentLinkedDeque<>(); + + /** Outgoing operations. */ + private final Map<Long, IgfsClientOutOperation> outOps = new ConcurrentHashMap<>(); + + /** Marshaller. */ + private final Marshaller marsh; + + /** Whether manager is fully started and ready to process requests. */ + private volatile boolean ready; + + /** Stopping flag. */ + private volatile boolean stopping; + + /** RW lock for synchronization. */ + private final StripedCompositeReadWriteLock rwLock = + new StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors() * 2); + + /** + * Constructor. + * + * @param ctx Kernal context. + */ + public IgfsClientManager(GridKernalContext ctx) { + super(ctx); + + marsh = ctx.config().getMarshaller(); + } + + /** {@inheritDoc} */ + @Override protected void start0() throws IgniteCheckedException { + // TODO + } + + /** {@inheritDoc} */ + @Override protected void onKernalStart0() throws IgniteCheckedException { + // TODO + } + + /** {@inheritDoc} */ + @Override protected void onKernalStop0(boolean cancel) { + // TODO + } + + /** {@inheritDoc} */ + @Override protected void stop0(boolean cancel) { + // TODO + } + + /** + * Execute IGFS closure. + * + * @param igfsCtx IGFS context. + * @param clo Closure. + * @return Result. + */ + public <T> T execute(IgfsContext igfsCtx, IgfsClientAbstractCallable<T> clo) throws IgniteCheckedException { + return executeAsync(igfsCtx, clo).get(); + } + + /** + * Execute IGFS closure asynchronously. + * + * @param igfsCtx IGFS context. + * @param clo Closure. + * @return Future. + */ + public <T> IgniteInternalFuture<T> executeAsync(IgfsContext igfsCtx, IgfsClientAbstractCallable<T> clo) { + + // TODO + + return null; + } + + /** + * Create closure response. + * + * @param msgId Message ID. + * @param res Response. + * @return Response. + */ + private IgfsClientResponse createResponse(long msgId, @Nullable Object res, @Nullable Throwable resErr) { + try { + if (resErr != null) + return new IgfsClientResponse(msgId, IgfsClientResponseType.ERR, null, + marsh.marshal(resErr)); + else { + if (res == null) + return new IgfsClientResponse(msgId, IgfsClientResponseType.NULL, null, null); + else if (res instanceof Boolean) + return new IgfsClientResponse(msgId, IgfsClientResponseType.BOOL, res, null); + else + return new IgfsClientResponse(msgId, IgfsClientResponseType.OBJ, null, + marsh.marshal(res)); + } + } + catch (Exception e) { + U.error(log, "Failed to marshal IGFS closure result [msgId=" + msgId + ", res=" + res + + ", resErr=" + resErr + ']', e); + + return new IgfsClientResponse(msgId, IgfsClientResponseType.MARSH_ERR, null, null); + } + } + + /** + * Handle node leave event. + * + * @param nodeId Node ID. + */ + private void onNodeLeft(UUID nodeId) { + // TODO + } + + /** + * Handle request. + * + * @param req Request. + */ + private void onRequest(IgfsClientRequest req) { + rwLock.readLock().lock(); + + try { + if (stopping) + return; // Discovery listener on remote node will handle node leave. + + if (ready) + processRequest(req); // Normal execution flow. + else + pending.add(req); // Add to pending set if manager is not fully started yet. + } + finally { + rwLock.readLock().unlock(); + } + } + + /** + * Handle response. + * + * @param resp Response. + */ + private void onResponse(IgfsClientResponse resp) { + // TODO. + } + + /** + * Actual request processing. Happens inside appropriate thread pool. + * + * @param req Request. + */ + private void processRequest(IgfsClientRequest req) { + IgfsClientResponse resp; + + try { + IgfsClientAbstractCallable target = req.target(); + + IgfsImpl igfs = (IgfsImpl) ctx.igfs().igfs(target.igfsName()); + + if (igfs == null) + throw new IgfsException("IGFS with the given name is not configured on the node: " + target.igfsName()); + + Object res = target.call0(igfs.context()); + + resp = createResponse(req.messageId(), res, null); + } + catch (Exception e) { + // Wrap exception. + resp = createResponse(req.messageId(), null, e); + } + + // Send response. + try { + ctx.io().send(req.nodeId(), GridTopic.TOPIC_IGFS_CLI, resp, GridIoPolicy.PUBLIC_POOL); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send IGFS client response [nodeId=" + req.nodeId() + + ", msgId=" + req.messageId() + ']', e); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgfsClientManager.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b5322e8a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientOutOperation.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientOutOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientOutOperation.java new file mode 100644 index 0000000..9e1c3ec --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientOutOperation.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs.client; + +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.internal.S; + +import java.util.UUID; + +/** + * IGFS client closure outgoing opeartion descriptor. + */ +public class IgfsClientOutOperation { + /** Target node ID. */ + private final UUID nodeId; + + /** Target operation. */ + private final IgfsClientAbstractCallable target; + + /** Future completed when operation is ready. */ + private final IgniteInternalFuture fut; + + /** + * Constructor. + * + * @param nodeId Target node ID. + * @param target Target operation. + * @param fut Future completed when operation is ready. + */ + public IgfsClientOutOperation(UUID nodeId, IgfsClientAbstractCallable target, IgniteInternalFuture fut) { + this.nodeId = nodeId; + this.target = target; + this.fut = fut; + } + + /** + * @return Target node ID. + */ + public UUID nodeId() { + return nodeId; + } + + /** + * @return Target operation. + */ + public IgfsClientAbstractCallable target() { + return target; + } + + /** + * @return Future completed when operation is ready. + */ + public IgniteInternalFuture future() { + return fut; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgfsClientOutOperation.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b5322e8a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientRequest.java new file mode 100644 index 0000000..49fe523 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientRequest.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs.client; + +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +import java.nio.ByteBuffer; +import java.util.UUID; + +/** + * IGFS client closure execute request. + */ +public class IgfsClientRequest implements Message { + /** Base fields (all except of target) count. */ + private static final byte BASE_FIELDS_CNT = 3; + + /** Originating node ID. */ + private UUID nodeId; + + /** Message ID. */ + private long msgId; + + /** Target callable. */ + @GridToStringInclude + private IgfsClientAbstractCallable target; + + /** + * Default constructor. + */ + public IgfsClientRequest() { + // No-op. + } + + /** + * Constructor. + * + * @param nodeId Originating node ID. + * @param msgId Message ID. + * @param target Target callable. + */ + public IgfsClientRequest(UUID nodeId, long msgId, IgfsClientAbstractCallable target) { + assert nodeId != null; + assert target != null; + + this.nodeId = nodeId; + this.msgId = msgId; + this.target = target; + } + + /** + * @return Node ID. + */ + public UUID nodeId() { + return nodeId; + } + + /** + * @return Message ID. + */ + public long messageId() { + return msgId; + } + + /** + * @return Target callable. + */ + public IgfsClientAbstractCallable target() { + return target; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return -27; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return (byte)(BASE_FIELDS_CNT + target.fieldsCount()); + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + byte fieldsCount = fieldsCount(); + + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount)) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeUuid("nodeId", nodeId)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeLong("msgId", msgId)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeShort("typeId", target.typeId())) + return false; + + writer.incrementState(); + + default: + while (writer.state() < fieldsCount) { + if (!target.writeTo(writer, writer.state() - BASE_FIELDS_CNT)) + return false; + + writer.incrementState(); + } + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + nodeId = reader.readUuid("nodeId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + msgId = reader.readLong("msgId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + short typeId; + + typeId = reader.readShort("typeId"); + + if (!reader.isLastRead()) + return false; + + target = IgfsClientAbstractCallable.callableForTypeId(typeId); + + reader.incrementState(); + + default: + while (reader.state() < fieldsCount()) { + target.readFrom(reader, reader.state() - BASE_FIELDS_CNT); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } + } + + return reader.afterMessageRead(IgfsClientRequest.class); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgfsClientRequest.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b5322e8a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientResponse.java new file mode 100644 index 0000000..2fd500f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientResponse.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs.client; + +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.Nullable; + +import java.nio.ByteBuffer; + +/** + * IGFS client closure execute response. + */ +public class IgfsClientResponse implements Message { + /** Message ID. */ + private long msgId; + + /** Response type. */ + private IgfsClientResponseType typ; + + /** Result. */ + @GridToStringInclude + private Object res; + + /** Result bytes. */ + @GridToStringExclude + private byte[] resBytes; + + /** + * Default constructor. + */ + public IgfsClientResponse() { + // No-op. + } + + /** + * Constructor. + * + * @param msgId Message ID. + * @param typ Type. + * @param res Result. + * @param resBytes Result bytes. + */ + public IgfsClientResponse(long msgId, IgfsClientResponseType typ, @Nullable Object res, + @Nullable byte[] resBytes) { + this.msgId = msgId; + this.typ = typ; + this.res = res; + this.resBytes = resBytes; + } + + /** + * @return Message ID. + */ + public long messageId() { + return msgId; + } + + /** + * @return Type. + */ + public IgfsClientResponseType type() { + return typ; + } + + /** + * @return Result. + */ + @Nullable public Object result() { + return res; + } + + /** + * @return Result bytes. + */ + @Nullable public byte[] resultBytes() { + return resBytes; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return -28; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return (byte)(typ == IgfsClientResponseType.NULL || typ == IgfsClientResponseType.MARSH_ERR ? + 2 : 3); + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + // TODO + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + // TODO + + return false; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgfsClientResponse.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/b5322e8a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientResponseType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientResponseType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientResponseType.java new file mode 100644 index 0000000..458b5be --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientResponseType.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs.client; + +import org.jetbrains.annotations.Nullable; + +/** + * IGFS client closure response type. + */ +public enum IgfsClientResponseType { + /** Object. */ + OBJ, + + /** Null result. */ + NULL, + + /** Boolean result. */ + BOOL, + + /** Error. */ + ERR, + + /** Marshalling error. */ + MARSH_ERR; + + /** Enum values. */ + private static final IgfsClientResponseType[] VALS = values(); + + /** + * Efficiently gets enumerated value from its ordinal. + * + * @param ord Ordinal value. + * @return Enumerated value. + */ + @Nullable public static IgfsClientResponseType fromOrdinal(int ord) { + return ord >= 0 && ord < VALS.length ? VALS[ord] : null; + } +}
