http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/package.html b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/package.html new file mode 100644 index 0000000..3122418 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/package.html @@ -0,0 +1,23 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<body> + <!-- Package description. --> + REST protocol handlers. +</body> +</html>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridClientPacketType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridClientPacketType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridClientPacketType.java new file mode 100644 index 0000000..fef6fe3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridClientPacketType.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest.protocols.tcp; + +/** + * Type of message being parsed. + */ +public enum GridClientPacketType { + /** Memcache protocol message. */ + MEMCACHE, + + /** GridGain handshake. */ + GRIDGAIN_HANDSHAKE, + + /** GridGain message. */ + GRIDGAIN +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridMemcachedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridMemcachedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridMemcachedMessage.java new file mode 100644 index 0000000..3587f3a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridMemcachedMessage.java @@ -0,0 +1,485 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest.protocols.tcp; + +import org.apache.ignite.internal.processors.rest.client.message.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +/** + * Memcached protocol request. + */ +public class GridMemcachedMessage implements GridClientMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Random UUID used for memcached clients authentication. */ + private static final UUID MEMCACHED_ID = UUID.randomUUID(); + + /** Header length. */ + public static final int HDR_LEN = 24; + + /** Flags length. */ + public static final byte FLAGS_LENGTH = 4; + + /** Memcache client request flag. */ + public static final byte MEMCACHE_REQ_FLAG = (byte)0x80; + + /** Response flag. */ + public static final byte MEMCACHE_RES_FLAG = (byte)0x81; + + /** Custom client request flag. */ + public static final byte GRIDGAIN_REQ_FLAG = (byte)0x90; + + /** Client handshake flag. */ + public static final byte GRIDGAIN_HANDSHAKE_FLAG = (byte)0x91; + + /** Success status. */ + public static final int SUCCESS = 0x0000; + + /** Key not found status. */ + public static final int KEY_NOT_FOUND = 0x0001; + + /** Failure status. */ + public static final int FAILURE = 0x0004; + + /** Serialized flag. */ + public static final int SERIALIZED_FLAG = 1; + + /** Boolean flag. */ + public static final int BOOLEAN_FLAG = (1 << 8); + + /** Integer flag. */ + public static final int INT_FLAG = (2 << 8); + + /** Long flag. */ + public static final int LONG_FLAG = (3 << 8); + + /** Date flag. */ + public static final int DATE_FLAG = (4 << 8); + + /** Byte flag. */ + public static final int BYTE_FLAG = (5 << 8); + + /** Float flag. */ + public static final int FLOAT_FLAG = (6 << 8); + + /** Double flag. */ + public static final int DOUBLE_FLAG = (7 << 8); + + /** Byte array flag. */ + public static final int BYTE_ARR_FLAG = (8 << 8); + + /** Request flag. */ + private byte reqFlag; + + /** Operation code. */ + private byte opCode; + + /** Key length. */ + private short keyLen; + + /** Extras length. */ + private byte extrasLen; + + /** Status. */ + private int status; + + /** Total body length. */ + private int totalLen; + + /** Opaque. */ + private byte[] opaque; + + /** Extras. */ + private byte[] extras; + + /** Key. */ + private Object key; + + /** Value. */ + private Object val; + + /** Value to add/subtract */ + private Long delta; + + /** Initial value for increment and decrement commands. */ + private Long init; + + /** Expiration time. */ + private Long expiration; + + /** Cache name. */ + private String cacheName; + + /** + * Creates empty packet which will be filled in parser. + */ + GridMemcachedMessage() { + } + + /** + * Creates copy of request packet for easy response construction. + * + * @param req Source request packet. + */ + GridMemcachedMessage(GridMemcachedMessage req) { + assert req != null; + + reqFlag = req.reqFlag; + opCode = req.opCode; + + opaque = new byte[req.opaque.length]; + U.arrayCopy(req.opaque, 0, opaque, 0, req.opaque.length); + } + + /** {@inheritDoc} */ + @Override public long requestId() { + return U.bytesToInt(opaque, 0); + } + + /** {@inheritDoc} */ + @Override public void requestId(long reqId) { + U.intToBytes((int)reqId, opaque, 0); + } + + /** {@inheritDoc} */ + @Override public UUID clientId() { + return MEMCACHED_ID; + } + + /** {@inheritDoc} */ + @Override public void clientId(UUID id) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public UUID destinationId() { + return null; // No destination available for memcached packets. + } + + /** {@inheritDoc} */ + @Override public void destinationId(UUID id) { + throw new UnsupportedOperationException("destId is not supported by memcached packets."); + } + + /** {@inheritDoc} */ + @Override public byte[] sessionToken() { + return null; + } + + /** {@inheritDoc} */ + @Override public void sessionToken(byte[] sesTok) { + // No-op. + } + + /** + * @return Request flag. + */ + public byte requestFlag() { + return reqFlag; + } + + /** + * @param reqFlag Request flag. + */ + public void requestFlag(byte reqFlag) { + this.reqFlag = reqFlag; + } + + /** + * @return Operation code. + */ + public byte operationCode() { + return opCode; + } + + /** + * @param opCode Operation code. + */ + public void operationCode(byte opCode) { + assert opCode >= 0; + + this.opCode = opCode; + } + + /** + * @return Key length. + */ + public short keyLength() { + return keyLen; + } + + /** + * @param keyLen Key length. + */ + public void keyLength(short keyLen) { + assert keyLen >= 0; + + this.keyLen = keyLen; + } + + /** + * @return Extras length. + */ + public byte extrasLength() { + return extrasLen; + } + + /** + * @param extrasLen Extras length. + */ + public void extrasLength(byte extrasLen) { + assert extrasLen >= 0; + + this.extrasLen = extrasLen; + } + + /** + * @return Status. + */ + public int status() { + return status; + } + + /** + * @param status Status. + */ + public void status(int status) { + this.status = status; + } + + /** + * @return Total length. + */ + public int totalLength() { + return totalLen; + } + + /** + * @param totalLen Total length. + */ + public void totalLength(int totalLen) { + assert totalLen >= 0; + + this.totalLen = totalLen; + } + + /** + * @return Opaque. + */ + public byte[] opaque() { + return opaque; + } + + /** + * @param opaque Opaque. + */ + public void opaque(byte[] opaque) { + assert opaque != null; + + this.opaque = opaque; + } + + /** + * @return Extras. + */ + public byte[] extras() { + return extras; + } + + /** + * @param extras Extras. + */ + public void extras(byte[] extras) { + assert extras != null; + + this.extras = extras; + } + + /** + * @return Key. + */ + public Object key() { + return key; + } + + /** + * @param key Key. + */ + public void key(Object key) { + assert key != null; + + this.key = key; + } + + /** + * @return Value. + */ + public Object value() { + return val; + } + + /** + * @param val Value. + */ + public void value(Object val) { + assert val != null; + + this.val = val; + } + + /** + * @return Expiration. + */ + @Nullable public Long expiration() { + return expiration; + } + + /** + * @param expiration Expiration. + */ + public void expiration(long expiration) { + this.expiration = expiration; + } + + /** + * @return Delta for increment and decrement commands. + */ + @Nullable public Long delta() { + return delta; + } + + /** + * @param delta Delta for increment and decrement commands. + */ + public void delta(long delta) { + this.delta = delta; + } + + /** + * @return Initial value for increment and decrement commands. + */ + @Nullable public Long initial() { + return init; + } + + /** + * @param init Initial value for increment and decrement commands. + */ + public void initial(long init) { + this.init = init; + } + + /** + * @return Cache name. + */ + @Nullable public String cacheName() { + return cacheName; + } + + /** + * @param cacheName Cache name. + */ + public void cacheName(String cacheName) { + assert cacheName != null; + + this.cacheName = cacheName; + } + + /** + * @return Whether request MUST have flags in extras. + */ + public boolean hasFlags() { + return opCode == 0x01 || + opCode == 0x02 || + opCode == 0x03 || + opCode == 0x11 || + opCode == 0x12 || + opCode == 0x13; + } + + /** + * @return Whether request has expiration field. + */ + public boolean hasExpiration() { + return opCode == 0x01 || + opCode == 0x02 || + opCode == 0x03 || + opCode == 0x11 || + opCode == 0x12 || + opCode == 0x13; + } + + /** + * @return Whether request has delta field. + */ + public boolean hasDelta() { + return opCode == 0x05 || + opCode == 0x06 || + opCode == 0x15 || + opCode == 0x16; + } + + /** + * @return Whether request has initial field. + */ + public boolean hasInitial() { + return opCode == 0x05 || + opCode == 0x06 || + opCode == 0x15 || + opCode == 0x16; + } + + /** + * @return Whether to add data to response. + */ + public boolean addData() { + return opCode == 0x00 || + opCode == 0x05 || + opCode == 0x06 || + opCode == 0x09 || + opCode == 0x0B || + opCode == 0x0C || + opCode == 0x0D || + opCode == 0x20 || + opCode == 0x24 || + opCode == 0x25 || + opCode == 0x26 || + opCode == 0x27 || + opCode == 0x28 || + opCode == 0x29; + } + + /** + * @return Whether to add flags to response. + */ + public boolean addFlags() { + return opCode == 0x00 || + opCode == 0x09 || + opCode == 0x0C || + opCode == 0x0D; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridMemcachedMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridMemcachedMessageWrapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridMemcachedMessageWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridMemcachedMessageWrapper.java new file mode 100644 index 0000000..55f04aa --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridMemcachedMessageWrapper.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest.protocols.tcp; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.internal.util.direct.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.nio.*; +import java.nio.charset.*; +import java.util.*; + +import static org.apache.ignite.internal.processors.rest.protocols.tcp.GridMemcachedMessage.*; + +/** + * Memcached message wrapper for direct marshalling. + */ +public class GridMemcachedMessageWrapper extends GridTcpCommunicationMessageAdapter { + /** */ + private static final long serialVersionUID = 3053626103006980626L; + + /** UTF-8 charset. */ + private static final Charset UTF_8 = Charset.forName("UTF-8"); + + /** + * Memcached message bytes. + */ + private byte[] bytes; + + /** + * + */ + public GridMemcachedMessageWrapper() { + // No-op. + } + + /** + * @param msg Message. + * @param jdkMarshaller JDK marshaller. + * @throws IgniteCheckedException If failed to marshal. + */ + public GridMemcachedMessageWrapper(GridMemcachedMessage msg, IgniteMarshaller jdkMarshaller) throws IgniteCheckedException { + bytes = encodeMemcache(msg, jdkMarshaller); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf) { + commState.setBuffer(buf); + + if (!commState.typeWritten) { + if (!commState.putByte(directType())) + return false; + + commState.typeWritten = true; + } + + switch (commState.idx) { + case 0: + if (!commState.putByteArrayClient(bytes)) + return false; + + commState.idx++; + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return MEMCACHE_RES_FLAG; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) + @Override public GridTcpCommunicationMessageAdapter clone() { + GridMemcachedMessageWrapper _clone = new GridMemcachedMessageWrapper(); + + clone0(_clone); + + return _clone; + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + GridMemcachedMessageWrapper _clone = (GridMemcachedMessageWrapper)_msg; + + _clone.bytes = bytes; + } + + /** + * Encodes memcache message to a raw byte array. + * + * @param msg Message being serialized. + * @param jdkMarshaller JDK marshaller. + * @return Serialized message. + * @throws IgniteCheckedException If serialization failed. + */ + private byte[] encodeMemcache(GridMemcachedMessage msg, IgniteMarshaller jdkMarshaller) throws IgniteCheckedException { + GridByteArrayList res = new GridByteArrayList(HDR_LEN - 1); + + int keyLen = 0; + + int keyFlags = 0; + + if (msg.key() != null) { + ByteArrayOutputStream rawKey = new ByteArrayOutputStream(); + + keyFlags = encodeObj(msg.key(), rawKey, jdkMarshaller); + + msg.key(rawKey.toByteArray()); + + keyLen = rawKey.size(); + } + + int dataLen = 0; + + int valFlags = 0; + + if (msg.value() != null) { + ByteArrayOutputStream rawVal = new ByteArrayOutputStream(); + + valFlags = encodeObj(msg.value(), rawVal, jdkMarshaller); + + msg.value(rawVal.toByteArray()); + + dataLen = rawVal.size(); + } + + int flagsLen = 0; + + if (msg.addFlags()) + flagsLen = FLAGS_LENGTH; + + res.add(msg.operationCode()); + + // Cast is required due to packet layout. + res.add((short)keyLen); + + // Cast is required due to packet layout. + res.add((byte)flagsLen); + + // Data type is always 0x00. + res.add((byte)0x00); + + res.add((short)msg.status()); + + res.add(keyLen + flagsLen + dataLen); + + res.add(msg.opaque(), 0, msg.opaque().length); + + // CAS, unused. + res.add(0L); + + assert res.size() == HDR_LEN - 1; + + if (flagsLen > 0) { + res.add((short) keyFlags); + res.add((short) valFlags); + } + + assert msg.key() == null || msg.key() instanceof byte[]; + assert msg.value() == null || msg.value() instanceof byte[]; + + if (keyLen > 0) + res.add((byte[])msg.key(), 0, ((byte[])msg.key()).length); + + if (dataLen > 0) + res.add((byte[])msg.value(), 0, ((byte[])msg.value()).length); + + return res.entireArray(); + } + + /** + * Encodes given object to a byte array and returns flags that describe the type of serialized object. + * + * @param obj Object to serialize. + * @param out Output stream to which object should be written. + * @param jdkMarshaller JDK marshaller. + * @return Serialization flags. + * @throws IgniteCheckedException If JDK serialization failed. + */ + private int encodeObj(Object obj, ByteArrayOutputStream out, IgniteMarshaller jdkMarshaller) throws IgniteCheckedException { + int flags = 0; + + byte[] data = null; + + if (obj instanceof String) + data = ((String)obj).getBytes(UTF_8); + else if (obj instanceof Boolean) { + data = new byte[] {(byte)((Boolean)obj ? '1' : '0')}; + + flags |= BOOLEAN_FLAG; + } + else if (obj instanceof Integer) { + data = U.intToBytes((Integer) obj); + + flags |= INT_FLAG; + } + else if (obj instanceof Long) { + data = U.longToBytes((Long)obj); + + flags |= LONG_FLAG; + } + else if (obj instanceof Date) { + data = U.longToBytes(((Date)obj).getTime()); + + flags |= DATE_FLAG; + } + else if (obj instanceof Byte) { + data = new byte[] {(Byte)obj}; + + flags |= BYTE_FLAG; + } + else if (obj instanceof Float) { + data = U.intToBytes(Float.floatToIntBits((Float)obj)); + + flags |= FLOAT_FLAG; + } + else if (obj instanceof Double) { + data = U.longToBytes(Double.doubleToLongBits((Double)obj)); + + flags |= DOUBLE_FLAG; + } + else if (obj instanceof byte[]) { + data = (byte[])obj; + + flags |= BYTE_ARR_FLAG; + } + else { + jdkMarshaller.marshal(obj, out); + + flags |= SERIALIZED_FLAG; + } + + if (data != null) + out.write(data, 0, data.length); + + return flags; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridMemcachedMessageWrapper.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java new file mode 100644 index 0000000..8fbe916 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpMemcachedNioListener.java @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest.protocols.tcp; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.marshaller.jdk.*; +import org.apache.ignite.internal.processors.rest.*; +import org.apache.ignite.internal.processors.rest.handlers.cache.*; +import org.apache.ignite.internal.processors.rest.request.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.nio.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.util.*; + +import static org.apache.ignite.internal.processors.rest.GridRestCommand.*; +import static org.apache.ignite.internal.processors.rest.protocols.tcp.GridMemcachedMessage.*; +import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.*; + +/** + * Handles memcache requests. + */ +public class GridTcpMemcachedNioListener extends GridNioServerListenerAdapter<GridMemcachedMessage> { + /** Logger */ + private final IgniteLogger log; + + /** Handler. */ + private final GridRestProtocolHandler hnd; + + /** JDK marshaller. */ + private final IgniteMarshaller jdkMarshaller = new IgniteJdkMarshaller(); + + /** Context. */ + private final GridKernalContext ctx; + + /** + * Creates listener which will convert incoming tcp packets to rest requests and forward them to + * a given rest handler. + * + * @param log Logger to use. + * @param hnd Rest handler. + * @param ctx Context. + */ + public GridTcpMemcachedNioListener(IgniteLogger log, GridRestProtocolHandler hnd, GridKernalContext ctx) { + this.log = log; + this.hnd = hnd; + this.ctx = ctx; + } + + /** {@inheritDoc} */ + @Override public void onConnected(GridNioSession ses) { + // No-op, never called. + assert false; + } + + /** {@inheritDoc} */ + @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { + // No-op, never called. + assert false; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"IfMayBeConditional"}) + @Override public void onMessage(final GridNioSession ses, final GridMemcachedMessage req) { + assert req != null; + + final GridTuple3<GridRestCommand, Boolean, Boolean> cmd = command(req.operationCode()); + + if (cmd == null) { + U.warn(log, "Cannot find corresponding REST command for op code (session will be closed) [ses=" + ses + + ", opCode=" + Integer.toHexString(req.operationCode()) + ']'); + + ses.close(); + + return; + } + + assert req.requestFlag() == MEMCACHE_REQ_FLAG; + assert cmd.get2() != null && cmd.get3() != null; + + // Close connection on 'Quit' command. + if (cmd.get1() == QUIT) { + try { + if (cmd.get2()) { + GridMemcachedMessage res = new GridMemcachedMessage(req); + + sendResponse(ses, res).get(); + } + } + // Catch all when quitting. + catch (Exception e) { + U.warn(log, "Failed to send quit response packet (session will be closed anyway) [ses=" + ses + + ", msg=" + e.getMessage() + "]"); + } + finally { + ses.close(); + } + + return; + } + + IgniteFuture<GridRestResponse> lastFut = ses.removeMeta(LAST_FUT.ordinal()); + + if (lastFut != null && lastFut.isDone()) + lastFut = null; + + IgniteFuture<GridRestResponse> f; + + if (lastFut == null) + f = handleRequest0(ses, req, cmd); + else { + f = new GridEmbeddedFuture<>( + lastFut, + new C2<GridRestResponse, Exception, IgniteFuture<GridRestResponse>>() { + @Override public IgniteFuture<GridRestResponse> apply(GridRestResponse res, Exception e) { + return handleRequest0(ses, req, cmd); + } + }, + ctx); + } + + if (f != null) + ses.addMeta(LAST_FUT.ordinal(), f); + } + + /** + * @param ses Session. + * @param req Request. + * @param cmd Command. + * @return Future or {@code null} if processed immediately. + */ + @Nullable private IgniteFuture<GridRestResponse> handleRequest0( + final GridNioSession ses, + final GridMemcachedMessage req, + final GridTuple3<GridRestCommand, Boolean, Boolean> cmd + ) { + if (cmd.get1() == NOOP) { + GridMemcachedMessage res0 = new GridMemcachedMessage(req); + + res0.status(SUCCESS); + + sendResponse(ses, res0); + + return null; + } + + IgniteFuture<GridRestResponse> f = hnd.handleAsync(createRestRequest(req, cmd.get1())); + + f.listenAsync(new CIX1<IgniteFuture<GridRestResponse>>() { + @Override public void applyx(IgniteFuture<GridRestResponse> f) throws IgniteCheckedException { + GridRestResponse restRes = f.get(); + + // Handle 'Stat' command (special case because several packets are included in response). + if (cmd.get1() == CACHE_METRICS) { + assert restRes.getResponse() instanceof GridCacheRestMetrics; + + Map<String, Long> metrics = ((GridCacheRestMetrics)restRes.getResponse()).map(); + + for (Map.Entry<String, Long> e : metrics.entrySet()) { + GridMemcachedMessage res = new GridMemcachedMessage(req); + + res.key(e.getKey()); + + res.value(String.valueOf(e.getValue())); + + sendResponse(ses, res); + } + + sendResponse(ses, new GridMemcachedMessage(req)); + } + else { + GridMemcachedMessage res = new GridMemcachedMessage(req); + + if (restRes.getSuccessStatus() == GridRestResponse.STATUS_SUCCESS) { + switch (cmd.get1()) { + case CACHE_GET: { + res.status(restRes.getResponse() == null ? KEY_NOT_FOUND : SUCCESS); + + break; + } + + case CACHE_PUT: + case CACHE_ADD: + case CACHE_REMOVE: + case CACHE_REPLACE: + case CACHE_CAS: + case CACHE_APPEND: + case CACHE_PREPEND: { + boolean res0 = restRes.getResponse().equals(Boolean.TRUE); + + res.status(res0 ? SUCCESS : FAILURE); + + break; + } + + default: { + res.status(SUCCESS); + + break; + } + } + } + else + res.status(FAILURE); + + if (cmd.get3()) + res.key(req.key()); + + if (restRes.getSuccessStatus() == GridRestResponse.STATUS_SUCCESS && res.addData() && + restRes.getResponse() != null) + res.value(restRes.getResponse()); + + sendResponse(ses, res); + } + } + }); + + return f; + } + + /** + * @param ses NIO session. + * @param res Response. + * @return NIO send future. + */ + private GridNioFuture<?> sendResponse(GridNioSession ses, GridMemcachedMessage res) { + try { + GridMemcachedMessageWrapper wrapper = new GridMemcachedMessageWrapper(res, jdkMarshaller); + + return ses.send(wrapper); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to marshal response: " + res, e); + + ses.close(); + + return new GridNioFinishedFuture<>(e); + } + } + + /** + * Creates REST request from the protocol request. + * + * @param req Request. + * @param cmd Command. + * @return REST request. + */ + @SuppressWarnings("unchecked") + private GridRestCacheRequest createRestRequest(GridMemcachedMessage req, GridRestCommand cmd) { + assert req != null; + + GridRestCacheRequest restReq = new GridRestCacheRequest(); + + restReq.command(cmd); + restReq.clientId(req.clientId()); + restReq.ttl(req.expiration()); + restReq.delta(req.delta()); + restReq.initial(req.initial()); + restReq.cacheName(req.cacheName()); + restReq.key(req.key()); + + if (cmd == CACHE_REMOVE_ALL) { + Object[] keys = (Object[]) req.value(); + + if (keys != null) { + Map<Object, Object> map = new HashMap<>(); + + for (Object key : keys) { + map.put(key, null); + } + + restReq.values(map); + } + } + else { + if (req.value() != null) + restReq.value(req.value()); + } + + return restReq; + } + + /** + * Gets command and command attributes from operation code. + * + * @param opCode Operation code. + * @return Command. + */ + @Nullable private GridTuple3<GridRestCommand, Boolean, Boolean> command(int opCode) { + GridRestCommand cmd; + boolean quiet = false; + boolean retKey = false; + + switch (opCode) { + case 0x00: + cmd = CACHE_GET; + + break; + case 0x01: + cmd = CACHE_PUT; + + break; + case 0x02: + cmd = CACHE_ADD; + + break; + case 0x03: + cmd = CACHE_REPLACE; + + break; + case 0x04: + cmd = CACHE_REMOVE; + + break; + case 0x05: + cmd = CACHE_INCREMENT; + + break; + case 0x06: + cmd = CACHE_DECREMENT; + + break; + case 0x07: + cmd = QUIT; + + break; + case 0x08: + cmd = CACHE_REMOVE_ALL; + + break; + case 0x09: + cmd = CACHE_GET; + + break; + case 0x0A: + cmd = NOOP; + + break; + case 0x0B: + cmd = VERSION; + + break; + case 0x0C: + cmd = CACHE_GET; + retKey = true; + + break; + case 0x0D: + cmd = CACHE_GET; + retKey = true; + + break; + case 0x0E: + cmd = CACHE_APPEND; + + break; + case 0x0F: + cmd = CACHE_PREPEND; + + break; + case 0x10: + cmd = CACHE_METRICS; + + break; + case 0x11: + cmd = CACHE_PUT; + quiet = true; + + break; + case 0x12: + cmd = CACHE_ADD; + quiet = true; + + break; + case 0x13: + cmd = CACHE_REPLACE; + quiet = true; + + break; + case 0x14: + cmd = CACHE_REMOVE; + quiet = true; + + break; + case 0x15: + cmd = CACHE_INCREMENT; + quiet = true; + + break; + case 0x16: + cmd = CACHE_DECREMENT; + quiet = true; + + break; + case 0x17: + cmd = QUIT; + quiet = true; + + break; + case 0x18: + cmd = CACHE_REMOVE_ALL; + quiet = true; + + break; + case 0x19: + cmd = CACHE_APPEND; + quiet = true; + + break; + case 0x1A: + cmd = CACHE_PREPEND; + quiet = true; + + break; + default: + return null; + } + + return new GridTuple3<>(cmd, quiet, retKey); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestDirectParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestDirectParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestDirectParser.java new file mode 100644 index 0000000..55d7c95 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestDirectParser.java @@ -0,0 +1,519 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest.protocols.tcp; + +import org.apache.ignite.*; +import org.apache.ignite.client.marshaller.*; +import org.apache.ignite.internal.processors.rest.client.message.*; +import org.apache.ignite.internal.util.direct.*; +import org.apache.ignite.internal.util.nio.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.nio.*; +import java.nio.charset.*; +import java.util.*; + +import static org.apache.ignite.internal.processors.rest.protocols.tcp.GridMemcachedMessage.*; +import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.*; + +/** + * + */ +public class GridTcpRestDirectParser implements GridNioParser { + /** UTF-8 charset. */ + private static final Charset UTF_8 = Charset.forName("UTF-8"); + + /** Message metadata key. */ + private static final int MSG_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); + + /** Protocol handler. */ + private final GridTcpRestProtocol proto; + + /** Message reader. */ + private final GridNioMessageReader msgReader; + + /** + * @param proto Protocol handler. + * @param msgReader Message reader. + */ + public GridTcpRestDirectParser(GridTcpRestProtocol proto, GridNioMessageReader msgReader) { + this.proto = proto; + this.msgReader = msgReader; + } + + /** {@inheritDoc} */ + @Nullable @Override public Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException { + ParserState state = ses.removeMeta(PARSER_STATE.ordinal()); + + if (state != null) { + assert state.packetType() == GridClientPacketType.MEMCACHE; + + Object memcacheMsg = parseMemcachePacket(ses, buf, state); + + if (memcacheMsg == null) + ses.addMeta(PARSER_STATE.ordinal(), state); + + return memcacheMsg; + } + + GridTcpCommunicationMessageAdapter msg = ses.removeMeta(MSG_META_KEY); + + if (msg == null && buf.hasRemaining()) { + byte type = buf.get(buf.position()); + + if (type == GridClientMessageWrapper.REQ_HEADER) { + buf.get(); + + msg = new GridClientMessageWrapper(); + } + else if (type == GridClientHandshakeRequestWrapper.HANDSHAKE_HEADER) { + buf.get(); + + msg = new GridClientHandshakeRequestWrapper(); + } + else if (type == MEMCACHE_REQ_FLAG) { + state = new ParserState(); + + state.packet(new GridMemcachedMessage()); + state.packetType(GridClientPacketType.MEMCACHE); + + Object memcacheMsg = parseMemcachePacket(ses, buf, state); + + if (memcacheMsg == null) + ses.addMeta(PARSER_STATE.ordinal(), state); + + return memcacheMsg; + } + else + throw new IOException("Invalid message type: " + type); + } + + boolean finished = false; + + if (buf.hasRemaining()) + finished = msgReader.read(null, msg, buf); + + if (finished) { + if (msg instanceof GridClientMessageWrapper) { + GridClientMessageWrapper clientMsg = (GridClientMessageWrapper)msg; + + if (clientMsg.messageSize() == 0) + return GridClientPingPacket.PING_MESSAGE; + + GridClientMarshaller marsh = proto.marshaller(ses); + + GridClientMessage ret = marsh.unmarshal(clientMsg.messageArray()); + + ret.requestId(clientMsg.requestId()); + ret.clientId(clientMsg.clientId()); + ret.destinationId(clientMsg.destinationId()); + + return ret; + } + else { + assert msg instanceof GridClientHandshakeRequestWrapper; + + GridClientHandshakeRequestWrapper req = (GridClientHandshakeRequestWrapper)msg; + + GridClientHandshakeRequest ret = new GridClientHandshakeRequest(); + + ret.putBytes(req.bytes(), 0, 4); + + return ret; + } + } + else { + ses.addMeta(MSG_META_KEY, msg); + + return null; + } + } + + /** {@inheritDoc} */ + @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException { + // No encoding needed for direct messages. + throw new UnsupportedEncodingException(); + } + + /** + * Parses memcache protocol message. + * + * @param ses Session. + * @param buf Buffer containing not parsed bytes. + * @param state Current parser state. + * @return Parsed packet.s + * @throws IOException If packet cannot be parsed. + * @throws IgniteCheckedException If deserialization error occurred. + */ + @Nullable private GridClientMessage parseMemcachePacket(GridNioSession ses, ByteBuffer buf, ParserState state) + throws IOException, IgniteCheckedException { + assert state.packetType() == GridClientPacketType.MEMCACHE; + assert state.packet() != null; + assert state.packet() instanceof GridMemcachedMessage; + + GridMemcachedMessage req = (GridMemcachedMessage)state.packet(); + ByteArrayOutputStream tmp = state.buffer(); + int i = state.index(); + + while (buf.remaining() > 0) { + byte b = buf.get(); + + if (i == 0) + req.requestFlag(b); + else if (i == 1) + req.operationCode(b); + else if (i == 2 || i == 3) { + tmp.write(b); + + if (i == 3) { + req.keyLength(U.bytesToShort(tmp.toByteArray(), 0)); + + tmp.reset(); + } + } + else if (i == 4) + req.extrasLength(b); + else if (i >= 8 && i <= 11) { + tmp.write(b); + + if (i == 11) { + req.totalLength(U.bytesToInt(tmp.toByteArray(), 0)); + + tmp.reset(); + } + } + else if (i >= 12 && i <= 15) { + tmp.write(b); + + if (i == 15) { + req.opaque(tmp.toByteArray()); + + tmp.reset(); + } + } + else if (i >= HDR_LEN && i < HDR_LEN + req.extrasLength()) { + tmp.write(b); + + if (i == HDR_LEN + req.extrasLength() - 1) { + req.extras(tmp.toByteArray()); + + tmp.reset(); + } + } + else if (i >= HDR_LEN + req.extrasLength() && + i < HDR_LEN + req.extrasLength() + req.keyLength()) { + tmp.write(b); + + if (i == HDR_LEN + req.extrasLength() + req.keyLength() - 1) { + req.key(tmp.toByteArray()); + + tmp.reset(); + } + } + else if (i >= HDR_LEN + req.extrasLength() + req.keyLength() && + i < HDR_LEN + req.totalLength()) { + tmp.write(b); + + if (i == HDR_LEN + req.totalLength() - 1) { + req.value(tmp.toByteArray()); + + tmp.reset(); + } + } + + if (i == HDR_LEN + req.totalLength() - 1) + // Assembled the packet. + return assemble(ses, req); + + i++; + } + + state.index(i); + + return null; + } + + /** + * Validates incoming packet and deserializes all fields that need to be deserialized. + * + * @param ses Session on which packet is being parsed. + * @param req Raw packet. + * @return Same packet with fields deserialized. + * @throws IOException If parsing failed. + * @throws IgniteCheckedException If deserialization failed. + */ + private GridClientMessage assemble(GridNioSession ses, GridMemcachedMessage req) throws IOException, IgniteCheckedException { + byte[] extras = req.extras(); + + // First, decode key and value, if any + if (req.key() != null || req.value() != null) { + short keyFlags = 0; + short valFlags = 0; + + if (req.hasFlags()) { + if (extras == null || extras.length < FLAGS_LENGTH) + throw new IOException("Failed to parse incoming packet (flags required for command) [ses=" + + ses + ", opCode=" + Integer.toHexString(req.operationCode() & 0xFF) + ']'); + + keyFlags = U.bytesToShort(extras, 0); + valFlags = U.bytesToShort(extras, 2); + } + + if (req.key() != null) { + assert req.key() instanceof byte[]; + + byte[] rawKey = (byte[])req.key(); + + // Only values can be hessian-encoded. + req.key(decodeObj(keyFlags, rawKey)); + } + + if (req.value() != null) { + assert req.value() instanceof byte[]; + + byte[] rawVal = (byte[])req.value(); + + req.value(decodeObj(valFlags, rawVal)); + } + } + + if (req.hasExpiration()) { + if (extras == null || extras.length < 8) + throw new IOException("Failed to parse incoming packet (expiration value required for command) [ses=" + + ses + ", opCode=" + Integer.toHexString(req.operationCode() & 0xFF) + ']'); + + req.expiration(U.bytesToInt(extras, 4) & 0xFFFFFFFFL); + } + + if (req.hasInitial()) { + if (extras == null || extras.length < 16) + throw new IOException("Failed to parse incoming packet (initial value required for command) [ses=" + + ses + ", opCode=" + Integer.toHexString(req.operationCode() & 0xFF) + ']'); + + req.initial(U.bytesToLong(extras, 8)); + } + + if (req.hasDelta()) { + if (extras == null || extras.length < 8) + throw new IOException("Failed to parse incoming packet (delta value required for command) [ses=" + + ses + ", opCode=" + Integer.toHexString(req.operationCode() & 0xFF) + ']'); + + req.delta(U.bytesToLong(extras, 0)); + } + + if (extras != null) { + // Clients that include cache name must always include flags. + int len = 4; + + if (req.hasExpiration()) + len += 4; + + if (req.hasDelta()) + len += 8; + + if (req.hasInitial()) + len += 8; + + if (extras.length - len > 0) { + byte[] cacheName = new byte[extras.length - len]; + + U.arrayCopy(extras, len, cacheName, 0, extras.length - len); + + req.cacheName(new String(cacheName, UTF_8)); + } + } + + return req; + } + + /** + * Decodes value from a given byte array to the object according to the flags given. + * + * @param flags Flags. + * @param bytes Byte array to decode. + * @return Decoded value. + * @throws IgniteCheckedException If deserialization failed. + */ + private Object decodeObj(short flags, byte[] bytes) throws IgniteCheckedException { + assert bytes != null; + + if ((flags & SERIALIZED_FLAG) != 0) + return proto.jdkMarshaller().unmarshal(bytes, null); + + int masked = flags & 0xff00; + + switch (masked) { + case BOOLEAN_FLAG: + return bytes[0] == '1'; + case INT_FLAG: + return U.bytesToInt(bytes, 0); + case LONG_FLAG: + return U.bytesToLong(bytes, 0); + case DATE_FLAG: + return new Date(U.bytesToLong(bytes, 0)); + case BYTE_FLAG: + return bytes[0]; + case FLOAT_FLAG: + return Float.intBitsToFloat(U.bytesToInt(bytes, 0)); + case DOUBLE_FLAG: + return Double.longBitsToDouble(U.bytesToLong(bytes, 0)); + case BYTE_ARR_FLAG: + return bytes; + default: + return new String(bytes, UTF_8); + } + } + + /** + * Holder for parser state and temporary buffer. + */ + protected static class ParserState { + /** Parser index. */ + private int idx; + + /** Temporary data buffer. */ + private ByteArrayOutputStream buf = new ByteArrayOutputStream(); + + /** Packet being assembled. */ + private GridClientMessage packet; + + /** Packet type. */ + private GridClientPacketType packetType; + + /** Header data. */ + private HeaderData hdr; + + /** + * @return Stored parser index. + */ + public int index() { + return idx; + } + + /** + * @param idx Index to store. + */ + public void index(int idx) { + this.idx = idx; + } + + /** + * @return Temporary data buffer. + */ + public ByteArrayOutputStream buffer() { + return buf; + } + + /** + * @return Pending packet. + */ + @Nullable public GridClientMessage packet() { + return packet; + } + + /** + * @param packet Pending packet. + */ + public void packet(GridClientMessage packet) { + assert this.packet == null; + + this.packet = packet; + } + + /** + * @return Pending packet type. + */ + public GridClientPacketType packetType() { + return packetType; + } + + /** + * @param packetType Pending packet type. + */ + public void packetType(GridClientPacketType packetType) { + this.packetType = packetType; + } + + /** + * @return Header. + */ + public HeaderData header() { + return hdr; + } + + /** + * @param hdr Header. + */ + public void header(HeaderData hdr) { + this.hdr = hdr; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ParserState.class, this); + } + } + + /** + * Header. + */ + protected static class HeaderData { + /** Request Id. */ + private final long reqId; + + /** Request Id. */ + private final UUID clientId; + + /** Request Id. */ + private final UUID destId; + + /** + * @param reqId Request Id. + * @param clientId Client Id. + * @param destId Destination Id. + */ + private HeaderData(long reqId, UUID clientId, UUID destId) { + this.reqId = reqId; + this.clientId = clientId; + this.destId = destId; + } + + /** + * @return Request Id. + */ + public long reqId() { + return reqId; + } + + /** + * @return Client Id. + */ + public UUID clientId() { + return clientId; + } + + /** + * @return Destination Id. + */ + public UUID destinationId() { + return destId; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java new file mode 100644 index 0000000..64a3a68 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestNioListener.java @@ -0,0 +1,386 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.rest.protocols.tcp; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.client.marshaller.*; +import org.apache.ignite.internal.processors.rest.*; +import org.apache.ignite.internal.processors.rest.client.message.*; +import org.apache.ignite.internal.processors.rest.handlers.cache.*; +import org.apache.ignite.internal.processors.rest.request.*; +import org.apache.ignite.internal.util.nio.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.nio.*; +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.internal.processors.rest.GridRestCommand.*; +import static org.apache.ignite.internal.processors.rest.client.message.GridClientCacheRequest.GridCacheOperation.*; +import static org.apache.ignite.internal.processors.rest.client.message.GridClientHandshakeResponse.*; +import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.*; + +/** + * Listener for nio server that handles incoming tcp rest packets. + */ +public class GridTcpRestNioListener extends GridNioServerListenerAdapter<GridClientMessage> { + /** Mapping of {@code GridCacheOperation} to {@code GridRestCommand}. */ + private static final Map<GridClientCacheRequest.GridCacheOperation, GridRestCommand> cacheCmdMap = + new EnumMap<>(GridClientCacheRequest.GridCacheOperation.class); + + /** Supported protocol versions. */ + private static final Collection<Short> SUPP_VERS = new HashSet<>(); + + /** + * Fills {@code cacheCmdMap}. + */ + static { + cacheCmdMap.put(PUT, CACHE_PUT); + cacheCmdMap.put(PUT_ALL, CACHE_PUT_ALL); + cacheCmdMap.put(GET, CACHE_GET); + cacheCmdMap.put(GET_ALL, CACHE_GET_ALL); + cacheCmdMap.put(RMV, CACHE_REMOVE); + cacheCmdMap.put(RMV_ALL, CACHE_REMOVE_ALL); + cacheCmdMap.put(REPLACE, CACHE_REPLACE); + cacheCmdMap.put(CAS, CACHE_CAS); + cacheCmdMap.put(METRICS, CACHE_METRICS); + cacheCmdMap.put(APPEND, CACHE_APPEND); + cacheCmdMap.put(PREPEND, CACHE_PREPEND); + + SUPP_VERS.add((short)1); + } + + /** */ + private final CountDownLatch marshMapLatch = new CountDownLatch(1); + + /** Marshallers map. */ + private Map<Byte, GridClientMarshaller> marshMap; + + /** Logger. */ + private IgniteLogger log; + + /** Protocol. */ + private GridTcpRestProtocol proto; + + /** Protocol handler. */ + private GridRestProtocolHandler hnd; + + /** Handler for all memcache requests */ + private GridTcpMemcachedNioListener memcachedLsnr; + + /** + * Creates listener which will convert incoming tcp packets to rest requests and forward them to + * a given rest handler. + * + * @param log Logger to use. + * @param proto Protocol. + * @param hnd Rest handler. + * @param ctx Context. + */ + public GridTcpRestNioListener(IgniteLogger log, GridTcpRestProtocol proto, GridRestProtocolHandler hnd, + GridKernalContext ctx) { + memcachedLsnr = new GridTcpMemcachedNioListener(log, hnd, ctx); + + this.log = log; + this.proto = proto; + this.hnd = hnd; + } + + /** + * @param marshMap Marshallers. + */ + void marshallers(Map<Byte, GridClientMarshaller> marshMap) { + assert marshMap != null; + + this.marshMap = marshMap; + + marshMapLatch.countDown(); + } + + /** {@inheritDoc} */ + @Override public void onConnected(GridNioSession ses) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { + if (e != null) { + if (e instanceof RuntimeException) + U.error(log, "Failed to process request from remote client: " + ses, e); + else + U.warn(log, "Closed client session due to exception [ses=" + ses + ", msg=" + e.getMessage() + ']'); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("ConstantConditions") + @Override public void onMessage(final GridNioSession ses, final GridClientMessage msg) { + if (msg instanceof GridMemcachedMessage) + memcachedLsnr.onMessage(ses, (GridMemcachedMessage)msg); + else { + if (msg == GridClientPingPacket.PING_MESSAGE) + ses.send(new GridClientPingPacketWrapper()); + else if (msg instanceof GridClientHandshakeRequest) { + GridClientHandshakeRequest hs = (GridClientHandshakeRequest)msg; + + short ver = hs.version(); + + if (!SUPP_VERS.contains(ver)) { + U.error(log, "Client protocol version is not supported [ses=" + ses + + ", ver=" + ver + + ", supported=" + SUPP_VERS + ']'); + + ses.close(); + } + else { + byte marshId = hs.marshallerId(); + + if (marshMapLatch.getCount() > 0) + U.awaitQuiet(marshMapLatch); + + GridClientMarshaller marsh = marshMap.get(marshId); + + if (marsh == null) { + U.error(log, "Client marshaller ID is invalid. Note that .NET and C++ clients " + + "are supported only in enterprise edition [ses=" + ses + ", marshId=" + marshId + ']'); + + ses.close(); + } + else { + ses.addMeta(MARSHALLER.ordinal(), marsh); + + ses.send(new GridClientHandshakeResponseWrapper(CODE_OK)); + } + } + } + else { + final GridRestRequest req = createRestRequest(ses, msg); + + if (req != null) + hnd.handleAsync(req).listenAsync(new CI1<IgniteFuture<GridRestResponse>>() { + @Override public void apply(IgniteFuture<GridRestResponse> fut) { + GridClientResponse res = new GridClientResponse(); + + res.requestId(msg.requestId()); + res.clientId(msg.clientId()); + + try { + GridRestResponse restRes = fut.get(); + + res.sessionToken(restRes.sessionTokenBytes()); + res.successStatus(restRes.getSuccessStatus()); + res.errorMessage(restRes.getError()); + + Object o = restRes.getResponse(); + + // In case of metrics a little adjustment is needed. + if (o instanceof GridCacheRestMetrics) + o = ((GridCacheRestMetrics)o).map(); + + res.result(o); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to process client request: " + msg, e); + + res.successStatus(GridClientResponse.STATUS_FAILED); + res.errorMessage("Failed to process client request: " + e.getMessage()); + } + + GridClientMessageWrapper wrapper = new GridClientMessageWrapper(); + + wrapper.requestId(msg.requestId()); + wrapper.clientId(msg.clientId()); + + try { + ByteBuffer bytes = proto.marshaller(ses).marshal(res, 0); + + wrapper.message(bytes); + + wrapper.messageSize(bytes.remaining() + 40); + } + catch (IOException e) { + U.error(log, "Failed to marshal response: " + res, e); + + ses.close(); + + return; + } + + ses.send(wrapper); + } + }); + else + U.error(log, "Failed to process client request (unknown packet type) [ses=" + ses + + ", msg=" + msg + ']'); + } + } + } + + /** + * Creates a REST request object from client TCP binary packet. + * + * @param ses NIO session. + * @param msg Request message. + * @return REST request object. + */ + @Nullable private GridRestRequest createRestRequest(GridNioSession ses, GridClientMessage msg) { + GridRestRequest restReq = null; + + if (msg instanceof GridClientAuthenticationRequest) { + GridClientAuthenticationRequest req = (GridClientAuthenticationRequest)msg; + + restReq = new GridRestTaskRequest(); + + restReq.command(NOOP); + + restReq.credentials(req.credentials()); + } + else if (msg instanceof GridClientCacheRequest) { + GridClientCacheRequest req = (GridClientCacheRequest)msg; + + GridRestCacheRequest restCacheReq = new GridRestCacheRequest(); + + restCacheReq.cacheName(req.cacheName()); + restCacheReq.cacheFlags(req.cacheFlagsOn()); + + restCacheReq.key(req.key()); + restCacheReq.value(req.value()); + restCacheReq.value2(req.value2()); + restCacheReq.portableMode(proto.portableMode(ses)); + + Map vals = req.values(); + if (vals != null) + restCacheReq.values(new HashMap<Object, Object>(vals)); + + restCacheReq.command(cacheCmdMap.get(req.operation())); + + restReq = restCacheReq; + } + else if (msg instanceof GridClientCacheQueryRequest) { + GridClientCacheQueryRequest req = (GridClientCacheQueryRequest) msg; + + restReq = new GridRestCacheQueryRequest(req); + + switch (req.operation()) { + case EXECUTE: + restReq.command(CACHE_QUERY_EXECUTE); + + break; + + case FETCH: + restReq.command(CACHE_QUERY_FETCH); + break; + + case REBUILD_INDEXES: + restReq.command(CACHE_QUERY_REBUILD_INDEXES); + + break; + + default: + throw new IllegalArgumentException("Unknown query operation: " + req.operation()); + } + } + else if (msg instanceof GridClientTaskRequest) { + GridClientTaskRequest req = (GridClientTaskRequest) msg; + + GridRestTaskRequest restTaskReq = new GridRestTaskRequest(); + + restTaskReq.command(EXE); + + restTaskReq.taskName(req.taskName()); + restTaskReq.params(Arrays.asList(req.argument())); + restTaskReq.keepPortables(req.keepPortables()); + restTaskReq.portableMode(proto.portableMode(ses)); + + restReq = restTaskReq; + } + else if (msg instanceof GridClientGetMetaDataRequest) { + GridClientGetMetaDataRequest req = (GridClientGetMetaDataRequest)msg; + + restReq = new GridRestPortableGetMetaDataRequest(req); + + restReq.command(GET_PORTABLE_METADATA); + } + else if (msg instanceof GridClientPutMetaDataRequest) { + GridClientPutMetaDataRequest req = (GridClientPutMetaDataRequest)msg; + + restReq = new GridRestPortablePutMetaDataRequest(req); + + restReq.command(PUT_PORTABLE_METADATA); + } + else if (msg instanceof GridClientTopologyRequest) { + GridClientTopologyRequest req = (GridClientTopologyRequest) msg; + + GridRestTopologyRequest restTopReq = new GridRestTopologyRequest(); + + restTopReq.includeMetrics(req.includeMetrics()); + restTopReq.includeAttributes(req.includeAttributes()); + + if (req.nodeId() != null) { + restTopReq.command(NODE); + + restTopReq.nodeId(req.nodeId()); + } + else if (req.nodeIp() != null) { + restTopReq.command(NODE); + + restTopReq.nodeIp(req.nodeIp()); + } + else + restTopReq.command(TOPOLOGY); + + restReq = restTopReq; + } + else if (msg instanceof GridClientLogRequest) { + GridClientLogRequest req = (GridClientLogRequest) msg; + + GridRestLogRequest restLogReq = new GridRestLogRequest(); + + restLogReq.command(LOG); + + restLogReq.path(req.path()); + restLogReq.from(req.from()); + restLogReq.to(req.to()); + + restReq = restLogReq; + } + + if (restReq != null) { + restReq.destinationId(msg.destinationId()); + restReq.clientId(msg.clientId()); + restReq.sessionToken(msg.sessionToken()); + restReq.address(ses.remoteAddress()); + } + + return restReq; + } + + /** + * Closes the session by timeout (i.e. inactivity within the configured period of time). + * + * @param ses Session, that was inactive. + */ + @Override public void onSessionIdleTimeout(GridNioSession ses) { + ses.close(); + } +}
