http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridTcpRestParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridTcpRestParser.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridTcpRestParser.java deleted file mode 100644 index 736339a..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridTcpRestParser.java +++ /dev/null @@ -1,878 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.gridgain.grid.kernal.processors.rest.protocols.tcp; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.marshaller.*; -import org.apache.ignite.marshaller.jdk.*; -import org.apache.ignite.client.marshaller.*; -import org.gridgain.grid.kernal.processors.rest.client.message.*; -import org.apache.ignite.internal.util.nio.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.nio.*; -import java.nio.charset.*; -import java.util.*; - -import static org.gridgain.grid.kernal.processors.rest.protocols.tcp.GridMemcachedMessage.*; -import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.*; - -/** - * Parser for extended memcache protocol. Handles parsing and encoding activity. - */ -public class GridTcpRestParser implements GridNioParser { - /** UTF-8 charset. */ - private static final Charset UTF_8 = Charset.forName("UTF-8"); - - /** JDK marshaller. */ - private final IgniteMarshaller jdkMarshaller = new IgniteJdkMarshaller(); - - /** {@inheritDoc} */ - @Nullable @Override public GridClientMessage decode(GridNioSession ses, ByteBuffer buf) throws IOException, - IgniteCheckedException { - ParserState state = ses.removeMeta(PARSER_STATE.ordinal()); - - if (state == null) - state = new ParserState(); - - GridClientPacketType type = state.packetType(); - - if (type == null) { - byte hdr = buf.get(buf.position()); - - switch (hdr) { - case MEMCACHE_REQ_FLAG: - state.packet(new GridMemcachedMessage()); - state.packetType(GridClientPacketType.MEMCACHE); - - break; - - case GRIDGAIN_REQ_FLAG: - // Skip header. - buf.get(); - - state.packetType(GridClientPacketType.GRIDGAIN); - - break; - - case GRIDGAIN_HANDSHAKE_FLAG: - // Skip header. 
- buf.get(); - - state.packetType(GridClientPacketType.GRIDGAIN_HANDSHAKE); - - break; - - default: - throw new IOException("Failed to parse incoming packet (invalid packet start) [ses=" + ses + - ", b=" + Integer.toHexString(hdr & 0xFF) + ']'); - } - } - - GridClientMessage res = null; - - switch (state.packetType()) { - case MEMCACHE: - res = parseMemcachePacket(ses, buf, state); - - break; - - case GRIDGAIN_HANDSHAKE: - res = parseHandshake(buf, state); - - break; - - case GRIDGAIN: - res = parseCustomPacket(ses, buf, state); - - break; - } - - if (res == null) - // Packet was not fully parsed yet. - ses.addMeta(PARSER_STATE.ordinal(), state); - - return res; - } - - /** {@inheritDoc} */ - @Override public ByteBuffer encode(GridNioSession ses, Object msg0) throws IOException, IgniteCheckedException { - assert msg0 != null; - - GridClientMessage msg = (GridClientMessage)msg0; - - if (msg instanceof GridMemcachedMessage) - return encodeMemcache((GridMemcachedMessage)msg); - else if (msg == GridClientPingPacket.PING_MESSAGE) - return ByteBuffer.wrap(GridClientPingPacket.PING_PACKET); - else if (msg instanceof GridClientHandshakeResponse) - return ByteBuffer.wrap(new byte[] { - ((GridClientHandshakeResponse)msg).resultCode() - }); - else { - GridClientMarshaller marsh = marshaller(ses); - - ByteBuffer res = marsh.marshal(msg, 45); - - ByteBuffer slice = res.slice(); - - slice.put(GRIDGAIN_REQ_FLAG); - slice.putInt(res.remaining() - 5); - slice.putLong(msg.requestId()); - slice.put(U.uuidToBytes(msg.clientId())); - slice.put(U.uuidToBytes(msg.destinationId())); - - return res; - } - } - - /** - * Parses memcache protocol message. - * - * @param ses Session. - * @param buf Buffer containing not parsed bytes. - * @param state Current parser state. - * @return Parsed packet.s - * @throws IOException If packet cannot be parsed. - * @throws IgniteCheckedException If deserialization error occurred. 
- */ - @Nullable private GridClientMessage parseMemcachePacket(GridNioSession ses, ByteBuffer buf, ParserState state) - throws IOException, IgniteCheckedException { - assert state.packetType() == GridClientPacketType.MEMCACHE; - assert state.packet() != null; - assert state.packet() instanceof GridMemcachedMessage; - - GridMemcachedMessage req = (GridMemcachedMessage)state.packet(); - ByteArrayOutputStream tmp = state.buffer(); - int i = state.index(); - - while (buf.remaining() > 0) { - byte b = buf.get(); - - if (i == 0) - req.requestFlag(b); - else if (i == 1) - req.operationCode(b); - else if (i == 2 || i == 3) { - tmp.write(b); - - if (i == 3) { - req.keyLength(U.bytesToShort(tmp.toByteArray(), 0)); - - tmp.reset(); - } - } - else if (i == 4) - req.extrasLength(b); - else if (i >= 8 && i <= 11) { - tmp.write(b); - - if (i == 11) { - req.totalLength(U.bytesToInt(tmp.toByteArray(), 0)); - - tmp.reset(); - } - } - else if (i >= 12 && i <= 15) { - tmp.write(b); - - if (i == 15) { - req.opaque(tmp.toByteArray()); - - tmp.reset(); - } - } - else if (i >= HDR_LEN && i < HDR_LEN + req.extrasLength()) { - tmp.write(b); - - if (i == HDR_LEN + req.extrasLength() - 1) { - req.extras(tmp.toByteArray()); - - tmp.reset(); - } - } - else if (i >= HDR_LEN + req.extrasLength() && - i < HDR_LEN + req.extrasLength() + req.keyLength()) { - tmp.write(b); - - if (i == HDR_LEN + req.extrasLength() + req.keyLength() - 1) { - req.key(tmp.toByteArray()); - - tmp.reset(); - } - } - else if (i >= HDR_LEN + req.extrasLength() + req.keyLength() && - i < HDR_LEN + req.totalLength()) { - tmp.write(b); - - if (i == HDR_LEN + req.totalLength() - 1) { - req.value(tmp.toByteArray()); - - tmp.reset(); - } - } - - if (i == HDR_LEN + req.totalLength() - 1) - // Assembled the packet. - return assemble(ses, req); - - i++; - } - - state.index(i); - - return null; - } - - /** - * Parses a client handshake, checking a client version and - * reading the marshaller protocol ID. 
- * - * @param buf Message bytes. - * @param state Parser state. - * @return True if a hint was parsed, false if still need more bytes to parse. - */ - @Nullable private GridClientMessage parseHandshake(ByteBuffer buf, ParserState state) { - assert state.packetType() == GridClientPacketType.GRIDGAIN_HANDSHAKE; - - int idx = state.index(); - - GridClientHandshakeRequest packet = (GridClientHandshakeRequest)state.packet(); - - if (packet == null) { - packet = new GridClientHandshakeRequest(); - - state.packet(packet); - } - - int rem = buf.remaining(); - - if (rem > 0) { - byte[] bbuf = new byte[5]; // Buffer to read data to. - - int nRead = Math.min(rem, bbuf.length); // Number of bytes to read. - - buf.get(bbuf, 0, nRead); // Batch read from buffer. - - int nAvailable = nRead; // Number of available bytes. - - if (idx < 4) { // Need to read version bytes. - int len = Math.min(nRead, 4 - idx); // Number of version bytes available in buffer. - - packet.putBytes(bbuf, idx, len); - - idx += len; - state.index(idx); - nAvailable -= len; - } - - assert idx <= 4 : "Wrong idx: " + idx; - assert nAvailable == 0 || nAvailable == 1 : "Wrong nav: " + nAvailable; - - if (idx == 4 && nAvailable > 0) - return packet; - } - - return null; // Wait for more data. - } - - /** - * Parses custom packet serialized by GridGain marshaller. - * - * @param ses Session. - * @param buf Buffer containing not parsed bytes. - * @param state Parser state. - * @return Parsed message. - * @throws IOException If packet parsing or deserialization failed. - * @throws IgniteCheckedException If failed. 
- */ - @Nullable private GridClientMessage parseCustomPacket(GridNioSession ses, ByteBuffer buf, ParserState state) - throws IOException, IgniteCheckedException { - assert state.packetType() == GridClientPacketType.GRIDGAIN; - assert state.packet() == null; - - ByteArrayOutputStream tmp = state.buffer(); - - int len = state.index(); - - if (buf.remaining() > 0) { - if (len == 0) { // Don't know the size yet. - byte[] lenBytes = statefulRead(buf, tmp, 4); - - if (lenBytes != null) { - len = U.bytesToInt(lenBytes, 0); - - if (len == 0) - return GridClientPingPacket.PING_MESSAGE; - else if (len < 0) - throw new IOException("Failed to parse incoming packet (invalid packet length) [ses=" + ses + - ", len=" + len + ']'); - - state.index(len); - } - } - - if (len > 0 && state.header() == null) { - byte[] hdrBytes = statefulRead(buf, tmp, 40); - - if (hdrBytes != null) { - long reqId = GridClientByteUtils.bytesToLong(hdrBytes, 0); - UUID clientId = GridClientByteUtils.bytesToUuid(hdrBytes, 8); - UUID destId = GridClientByteUtils.bytesToUuid(hdrBytes, 24); - - state.header(new HeaderData(reqId, clientId, destId)); - } - } - - if (len > 0 && state.header() != null) { - final int packetSize = len - 40; - - if (tmp.size() + buf.remaining() >= packetSize) { - if (buf.remaining() > 0) { - byte[] bodyBytes = new byte[packetSize - tmp.size()]; - - buf.get(bodyBytes); - - tmp.write(bodyBytes); - } - - return parseClientMessage(ses, state); - } - else - copyRemaining(buf, tmp); - } - } - - return null; - } - - /** - * Tries to read the specified amount of bytes using intermediate buffer. Stores - * the bytes to intermediate buffer, if size requirement is not met. - * - * @param buf Byte buffer to read from. - * @param intBuf Intermediate buffer to read bytes from and to save remaining bytes to. - * @param size Number of bytes to read. - * @return Resulting byte array or {@code null}, if both buffers contain less bytes - * than required. 
In case of non-null result, the intermediate buffer is empty. - * In case of {@code null} result, the input buffer is empty (read fully). - * @throws IOException If IO error occurs. - */ - @Nullable private byte[] statefulRead(ByteBuffer buf, ByteArrayOutputStream intBuf, int size) throws IOException { - if (intBuf.size() + buf.remaining() >= size) { - int off = 0; - byte[] bytes = new byte[size]; - - if (intBuf.size() > 0) { - assert intBuf.size() < size; - - byte[] tmpBytes = intBuf.toByteArray(); - - System.arraycopy(tmpBytes, 0, bytes, 0, tmpBytes.length); - - off = intBuf.size(); - - intBuf.reset(); - } - - buf.get(bytes, off, size - off); - - return bytes; - } - else { - copyRemaining(buf, intBuf); - - return null; - } - } - - /** - * Copies remaining bytes from byte buffer to output stream. - * - * @param src Source buffer. - * @param dest Destination stream. - * @throws IOException If IO error occurs. - */ - private void copyRemaining(ByteBuffer src, OutputStream dest) throws IOException { - byte[] b = new byte[src.remaining()]; - - src.get(b); - - dest.write(b); - } - - /** - * Parses {@link GridClientMessage} from raw bytes. - * - * @param ses Session. - * @param state Parser state. - * @return A parsed client message. - * @throws IOException On marshaller error. - * @throws IgniteCheckedException If no marshaller was defined for the session. - */ - protected GridClientMessage parseClientMessage(GridNioSession ses, ParserState state) throws IOException, IgniteCheckedException { - GridClientMarshaller marsh = marshaller(ses); - - GridClientMessage msg = marsh.unmarshal(state.buffer().toByteArray()); - - msg.requestId(state.header().reqId()); - msg.clientId(state.header().clientId()); - msg.destinationId(state.header().destinationId()); - - return msg; - } - - /** - * Encodes memcache message to a raw byte array. - * - * @param msg Message being serialized. - * @return Serialized message. - * @throws IgniteCheckedException If serialization failed. 
- */ - private ByteBuffer encodeMemcache(GridMemcachedMessage msg) throws IgniteCheckedException { - GridByteArrayList res = new GridByteArrayList(HDR_LEN); - - int keyLen = 0; - - int keyFlags = 0; - - if (msg.key() != null) { - ByteArrayOutputStream rawKey = new ByteArrayOutputStream(); - - keyFlags = encodeObj(msg.key(), rawKey); - - msg.key(rawKey.toByteArray()); - - keyLen = rawKey.size(); - } - - int dataLen = 0; - - int valFlags = 0; - - if (msg.value() != null) { - ByteArrayOutputStream rawVal = new ByteArrayOutputStream(); - - valFlags = encodeObj(msg.value(), rawVal); - - msg.value(rawVal.toByteArray()); - - dataLen = rawVal.size(); - } - - int flagsLen = 0; - - if (msg.addFlags())// || keyFlags > 0 || valFlags > 0) - flagsLen = FLAGS_LENGTH; - - res.add(MEMCACHE_RES_FLAG); - - res.add(msg.operationCode()); - - // Cast is required due to packet layout. - res.add((short)keyLen); - - // Cast is required due to packet layout. - res.add((byte)flagsLen); - - // Data type is always 0x00. - res.add((byte)0x00); - - res.add((short)msg.status()); - - res.add(keyLen + flagsLen + dataLen); - - res.add(msg.opaque(), 0, msg.opaque().length); - - // CAS, unused. - res.add(0L); - - assert res.size() == HDR_LEN; - - if (flagsLen > 0) { - res.add((short) keyFlags); - res.add((short) valFlags); - } - - assert msg.key() == null || msg.key() instanceof byte[]; - assert msg.value() == null || msg.value() instanceof byte[]; - - if (keyLen > 0) - res.add((byte[])msg.key(), 0, ((byte[])msg.key()).length); - - if (dataLen > 0) - res.add((byte[])msg.value(), 0, ((byte[])msg.value()).length); - - return ByteBuffer.wrap(res.entireArray()); - } - - /** - * Validates incoming packet and deserializes all fields that need to be deserialized. - * - * @param ses Session on which packet is being parsed. - * @param req Raw packet. - * @return Same packet with fields deserialized. - * @throws IOException If parsing failed. - * @throws IgniteCheckedException If deserialization failed. 
- */ - private GridClientMessage assemble(GridNioSession ses, GridMemcachedMessage req) throws IOException, IgniteCheckedException { - byte[] extras = req.extras(); - - // First, decode key and value, if any - if (req.key() != null || req.value() != null) { - short keyFlags = 0; - short valFlags = 0; - - if (req.hasFlags()) { - if (extras == null || extras.length < FLAGS_LENGTH) - throw new IOException("Failed to parse incoming packet (flags required for command) [ses=" + - ses + ", opCode=" + Integer.toHexString(req.operationCode() & 0xFF) + ']'); - - keyFlags = U.bytesToShort(extras, 0); - valFlags = U.bytesToShort(extras, 2); - } - - if (req.key() != null) { - assert req.key() instanceof byte[]; - - byte[] rawKey = (byte[])req.key(); - - // Only values can be hessian-encoded. - req.key(decodeObj(keyFlags, rawKey)); - } - - if (req.value() != null) { - assert req.value() instanceof byte[]; - - byte[] rawVal = (byte[])req.value(); - - req.value(decodeObj(valFlags, rawVal)); - } - } - - if (req.hasExpiration()) { - if (extras == null || extras.length < 8) - throw new IOException("Failed to parse incoming packet (expiration value required for command) [ses=" + - ses + ", opCode=" + Integer.toHexString(req.operationCode() & 0xFF) + ']'); - - req.expiration(U.bytesToInt(extras, 4) & 0xFFFFFFFFL); - } - - if (req.hasInitial()) { - if (extras == null || extras.length < 16) - throw new IOException("Failed to parse incoming packet (initial value required for command) [ses=" + - ses + ", opCode=" + Integer.toHexString(req.operationCode() & 0xFF) + ']'); - - req.initial(U.bytesToLong(extras, 8)); - } - - if (req.hasDelta()) { - if (extras == null || extras.length < 8) - throw new IOException("Failed to parse incoming packet (delta value required for command) [ses=" + - ses + ", opCode=" + Integer.toHexString(req.operationCode() & 0xFF) + ']'); - - req.delta(U.bytesToLong(extras, 0)); - } - - if (extras != null) { - // Clients that include cache name must always include 
flags. - int len = 4; - - if (req.hasExpiration()) - len += 4; - - if (req.hasDelta()) - len += 8; - - if (req.hasInitial()) - len += 8; - - if (extras.length - len > 0) { - byte[] cacheName = new byte[extras.length - len]; - - U.arrayCopy(extras, len, cacheName, 0, extras.length - len); - - req.cacheName(new String(cacheName, UTF_8)); - } - } - - return req; - } - - /** - * Decodes value from a given byte array to the object according to the flags given. - * - * @param flags Flags. - * @param bytes Byte array to decode. - * @return Decoded value. - * @throws IgniteCheckedException If deserialization failed. - */ - private Object decodeObj(short flags, byte[] bytes) throws IgniteCheckedException { - assert bytes != null; - - if ((flags & SERIALIZED_FLAG) != 0) - return jdkMarshaller.unmarshal(bytes, null); - - int masked = flags & 0xff00; - - switch (masked) { - case BOOLEAN_FLAG: - return bytes[0] == '1'; - case INT_FLAG: - return U.bytesToInt(bytes, 0); - case LONG_FLAG: - return U.bytesToLong(bytes, 0); - case DATE_FLAG: - return new Date(U.bytesToLong(bytes, 0)); - case BYTE_FLAG: - return bytes[0]; - case FLOAT_FLAG: - return Float.intBitsToFloat(U.bytesToInt(bytes, 0)); - case DOUBLE_FLAG: - return Double.longBitsToDouble(U.bytesToLong(bytes, 0)); - case BYTE_ARR_FLAG: - return bytes; - default: - return new String(bytes, UTF_8); - } - } - - /** - * Encodes given object to a byte array and returns flags that describe the type of serialized object. - * - * @param obj Object to serialize. - * @param out Output stream to which object should be written. - * @return Serialization flags. - * @throws IgniteCheckedException If JDK serialization failed. - */ - private int encodeObj(Object obj, ByteArrayOutputStream out) throws IgniteCheckedException { - int flags = 0; - - byte[] data = null; - - if (obj instanceof String) - data = ((String)obj).getBytes(UTF_8); - else if (obj instanceof Boolean) { - data = new byte[] {(byte)((Boolean)obj ? 
'1' : '0')}; - - flags |= BOOLEAN_FLAG; - } - else if (obj instanceof Integer) { - data = U.intToBytes((Integer)obj); - - flags |= INT_FLAG; - } - else if (obj instanceof Long) { - data = U.longToBytes((Long)obj); - - flags |= LONG_FLAG; - } - else if (obj instanceof Date) { - data = U.longToBytes(((Date)obj).getTime()); - - flags |= DATE_FLAG; - } - else if (obj instanceof Byte) { - data = new byte[] {(Byte)obj}; - - flags |= BYTE_FLAG; - } - else if (obj instanceof Float) { - data = U.intToBytes(Float.floatToIntBits((Float)obj)); - - flags |= FLOAT_FLAG; - } - else if (obj instanceof Double) { - data = U.longToBytes(Double.doubleToLongBits((Double)obj)); - - flags |= DOUBLE_FLAG; - } - else if (obj instanceof byte[]) { - data = (byte[])obj; - - flags |= BYTE_ARR_FLAG; - } - else { - jdkMarshaller.marshal(obj, out); - - flags |= SERIALIZED_FLAG; - } - - if (data != null) - out.write(data, 0, data.length); - - return flags; - } - - /** - * Returns marshaller. - * - * @return Marshaller. - */ - protected GridClientMarshaller marshaller(GridNioSession ses) { - GridClientMarshaller marsh = ses.meta(MARSHALLER.ordinal()); - - assert marsh != null; - - return marsh; - } - - /** {@inheritDoc} */ - public String toString() { - return S.toString(GridTcpRestParser.class, this); - } - - /** - * Holder for parser state and temporary buffer. - */ - protected static class ParserState { - /** Parser index. */ - private int idx; - - /** Temporary data buffer. */ - private ByteArrayOutputStream buf = new ByteArrayOutputStream(); - - /** Packet being assembled. */ - private GridClientMessage packet; - - /** Packet type. */ - private GridClientPacketType packetType; - - /** Header data. */ - private HeaderData hdr; - - /** - * @return Stored parser index. - */ - public int index() { - return idx; - } - - /** - * @param idx Index to store. - */ - public void index(int idx) { - this.idx = idx; - } - - /** - * @return Temporary data buffer. 
- */ - public ByteArrayOutputStream buffer() { - return buf; - } - - /** - * @return Pending packet. - */ - @Nullable public GridClientMessage packet() { - return packet; - } - - /** - * @param packet Pending packet. - */ - public void packet(GridClientMessage packet) { - assert this.packet == null; - - this.packet = packet; - } - - /** - * @return Pending packet type. - */ - public GridClientPacketType packetType() { - return packetType; - } - - /** - * @param packetType Pending packet type. - */ - public void packetType(GridClientPacketType packetType) { - this.packetType = packetType; - } - - /** - * @return Header. - */ - public HeaderData header() { - return hdr; - } - - /** - * @param hdr Header. - */ - public void header(HeaderData hdr) { - this.hdr = hdr; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(ParserState.class, this); - } - } - - /** - * Header. - */ - protected static class HeaderData { - /** Request Id. */ - private final long reqId; - - /** Request Id. */ - private final UUID clientId; - - /** Request Id. */ - private final UUID destId; - - /** - * @param reqId Request Id. - * @param clientId Client Id. - * @param destId Destination Id. - */ - private HeaderData(long reqId, UUID clientId, UUID destId) { - this.reqId = reqId; - this.clientId = clientId; - this.destId = destId; - } - - /** - * @return Request Id. - */ - public long reqId() { - return reqId; - } - - /** - * @return Client Id. - */ - public UUID clientId() { - return clientId; - } - - /** - * @return Destination Id. - */ - public UUID destinationId() { - return destId; - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridTcpRestProtocol.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridTcpRestProtocol.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridTcpRestProtocol.java deleted file mode 100644 index 9a470f7..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridTcpRestProtocol.java +++ /dev/null @@ -1,335 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.gridgain.grid.kernal.processors.rest.protocols.tcp; - -import org.apache.ignite.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.marshaller.*; -import org.apache.ignite.marshaller.jdk.*; -import org.apache.ignite.spi.*; -import org.apache.ignite.client.marshaller.*; -import org.apache.ignite.client.marshaller.jdk.*; -import org.apache.ignite.client.marshaller.optimized.*; -import org.apache.ignite.client.ssl.*; -import org.gridgain.grid.kernal.processors.rest.*; -import org.gridgain.grid.kernal.processors.rest.client.message.*; -import org.gridgain.grid.kernal.processors.rest.protocols.*; -import org.apache.ignite.internal.util.direct.*; -import org.apache.ignite.internal.util.nio.*; -import org.apache.ignite.internal.util.nio.ssl.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.jetbrains.annotations.*; - -import javax.net.ssl.*; -import java.io.*; -import java.net.*; -import java.nio.*; -import java.util.*; - -import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.*; - -/** - * TCP binary protocol implementation. - */ -public class GridTcpRestProtocol extends GridRestProtocolAdapter { - /** Server. */ - private GridNioServer<GridClientMessage> srv; - - /** JDK marshaller. */ - private final IgniteMarshaller jdkMarshaller = new IgniteJdkMarshaller(); - - /** NIO server listener. */ - private GridTcpRestNioListener lsnr; - - /** Message reader. */ - private final GridNioMessageReader msgReader = new GridNioMessageReader() { - @Override public boolean read(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg, ByteBuffer buf) { - assert msg != null; - assert buf != null; - - msg.messageReader(this, nodeId); - - return msg.readFrom(buf); - } - - @Nullable @Override public GridTcpMessageFactory messageFactory() { - return null; - } - }; - - /** Message writer. 
*/ - private final GridNioMessageWriter msgWriter = new GridNioMessageWriter() { - @Override public boolean write(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg, ByteBuffer buf) { - assert msg != null; - assert buf != null; - - msg.messageWriter(this, nodeId); - - return msg.writeTo(buf); - } - - @Override public int writeFully(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg, OutputStream out, - ByteBuffer buf) throws IOException { - assert msg != null; - assert out != null; - assert buf != null; - assert buf.hasArray(); - - msg.messageWriter(this, nodeId); - - boolean finished = false; - int cnt = 0; - - while (!finished) { - finished = msg.writeTo(buf); - - out.write(buf.array(), 0, buf.position()); - - cnt += buf.position(); - - buf.clear(); - } - - return cnt; - } - }; - - /** @param ctx Context. */ - public GridTcpRestProtocol(GridKernalContext ctx) { - super(ctx); - } - - /** - * @return JDK marshaller. - */ - IgniteMarshaller jdkMarshaller() { - return jdkMarshaller; - } - - /** - * Returns marshaller. - * - * @param ses Session. - * @return Marshaller. - */ - GridClientMarshaller marshaller(GridNioSession ses) { - GridClientMarshaller marsh = ses.meta(MARSHALLER.ordinal()); - - assert marsh != null; - - return marsh; - } - - /** - * @param ses Session. - * @return Whether portable marshaller is used. 
- */ - boolean portableMode(GridNioSession ses) { - return ctx.portable().isPortable(marshaller(ses)); - } - - /** {@inheritDoc} */ - @Override public String name() { - return "TCP binary"; - } - - /** {@inheritDoc} */ - @SuppressWarnings("BusyWait") - @Override public void start(final GridRestProtocolHandler hnd) throws IgniteCheckedException { - assert hnd != null; - - ClientConnectionConfiguration cfg = ctx.config().getClientConnectionConfiguration(); - - assert cfg != null; - - lsnr = new GridTcpRestNioListener(log, this, hnd, ctx); - - GridNioParser parser = new GridTcpRestDirectParser(this, msgReader); - - try { - host = resolveRestTcpHost(ctx.config()); - - SSLContext sslCtx = null; - - if (cfg.isRestTcpSslEnabled()) { - GridSslContextFactory factory = cfg.getRestTcpSslContextFactory(); - - if (factory == null) - // Thrown SSL exception instead of IgniteCheckedException for writing correct warning message into log. - throw new SSLException("SSL is enabled, but SSL context factory is not specified."); - - sslCtx = factory.createSslContext(); - } - - int lastPort = cfg.getRestTcpPort() + cfg.getRestPortRange() - 1; - - for (int port0 = cfg.getRestTcpPort(); port0 <= lastPort; port0++) { - if (startTcpServer(host, port0, lsnr, parser, sslCtx, cfg)) { - port = port0; - - if (log.isInfoEnabled()) - log.info(startInfo()); - - return; - } - } - - U.warn(log, "Failed to start TCP binary REST server (possibly all ports in range are in use) " + - "[firstPort=" + cfg.getRestTcpPort() + ", lastPort=" + lastPort + ", host=" + host + ']'); - } - catch (SSLException e) { - U.warn(log, "Failed to start " + name() + " protocol on port " + port + ": " + e.getMessage(), - "Failed to start " + name() + " protocol on port " + port + ". 
Check if SSL context factory is " + - "properly configured."); - } - catch (IOException e) { - U.warn(log, "Failed to start " + name() + " protocol on port " + port + ": " + e.getMessage(), - "Failed to start " + name() + " protocol on port " + port + ". " + - "Check restTcpHost configuration property."); - } - } - - /** {@inheritDoc} */ - @Override public void onKernalStart() { - super.onKernalStart(); - - Map<Byte, GridClientMarshaller> marshMap = new HashMap<>(); - - marshMap.put(GridClientOptimizedMarshaller.ID, new GridClientOptimizedMarshaller()); - marshMap.put(GridClientJdkMarshaller.ID, new GridClientJdkMarshaller()); - marshMap.put((byte)0, ctx.portable().portableMarshaller()); - - lsnr.marshallers(marshMap); - } - - /** {@inheritDoc} */ - @Override public void stop() { - if (srv != null) { - ctx.ports().deregisterPorts(getClass()); - - srv.stop(); - } - - if (log.isInfoEnabled()) - log.info(stopInfo()); - } - - /** - * Resolves host for REST TCP server using grid configuration. - * - * @param cfg Grid configuration. - * @return REST host. - * @throws IOException If failed to resolve REST host. - */ - private InetAddress resolveRestTcpHost(IgniteConfiguration cfg) throws IOException { - String host = cfg.getClientConnectionConfiguration().getRestTcpHost(); - - if (host == null) - host = cfg.getLocalHost(); - - return U.resolveLocalHost(host); - } - - /** - * Tries to start server with given parameters. - * - * @param hostAddr Host on which server should be bound. - * @param port Port on which server should be bound. - * @param lsnr Server message listener. - * @param parser Server message parser. - * @param sslCtx SSL context in case if SSL is enabled. - * @param cfg Configuration for other parameters. - * @return {@code True} if server successfully started, {@code false} if port is used and - * server was unable to start. 
- */ - private boolean startTcpServer(InetAddress hostAddr, int port, GridNioServerListener<GridClientMessage> lsnr, - GridNioParser parser, @Nullable SSLContext sslCtx, ClientConnectionConfiguration cfg) { - try { - GridNioFilter codec = new GridNioCodecFilter(parser, log, true); - - GridNioFilter[] filters; - - if (sslCtx != null) { - GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, log); - - sslFilter.directMode(true); - - boolean auth = cfg.isRestTcpSslClientAuth(); - - sslFilter.wantClientAuth(auth); - - sslFilter.needClientAuth(auth); - - filters = new GridNioFilter[] { - codec, - sslFilter - }; - } - else - filters = new GridNioFilter[] { codec }; - - srv = GridNioServer.<GridClientMessage>builder() - .address(hostAddr) - .port(port) - .listener(lsnr) - .logger(log) - .selectorCount(cfg.getRestTcpSelectorCount()) - .gridName(ctx.gridName()) - .tcpNoDelay(cfg.isRestTcpNoDelay()) - .directBuffer(cfg.isRestTcpDirectBuffer()) - .byteOrder(ByteOrder.nativeOrder()) - .socketSendBufferSize(cfg.getRestTcpSendBufferSize()) - .socketReceiveBufferSize(cfg.getRestTcpReceiveBufferSize()) - .sendQueueLimit(cfg.getRestTcpSendQueueLimit()) - .filters(filters) - .directMode(true) - .messageWriter(msgWriter) - .build(); - - srv.idleTimeout(cfg.getRestIdleTimeout()); - - srv.start(); - - ctx.ports().registerPort(port, IgnitePortProtocol.TCP, getClass()); - - return true; - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to start " + name() + " protocol on port " + port + ": " + e.getMessage()); - - return false; - } - } - - /** {@inheritDoc} */ - @Override protected String getAddressPropertyName() { - return GridNodeAttributes.ATTR_REST_TCP_ADDRS; - } - - /** {@inheritDoc} */ - @Override protected String getHostNamePropertyName() { - return GridNodeAttributes.ATTR_REST_TCP_HOST_NAMES; - } - - /** {@inheritDoc} */ - @Override protected String getPortPropertyName() { - return GridNodeAttributes.ATTR_REST_TCP_PORT; - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/package.html b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/package.html deleted file mode 100644 index 31d10f0..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/package.html +++ /dev/null @@ -1,23 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<html> -<body> - <!-- Package description. --> - Adapters for TCP-based REST protocols. 
-</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestCacheQueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestCacheQueryRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestCacheQueryRequest.java deleted file mode 100644 index 791afc8..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestCacheQueryRequest.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.rest.request; - -import org.gridgain.grid.kernal.processors.rest.client.message.*; - -import java.io.Serializable; - -/** - * Cache query request. - */ -public class GridRestCacheQueryRequest extends GridRestRequest implements Serializable { - /** */ - private static final long serialVersionUID = 0L; - - /** Request message. */ - private GridClientCacheQueryRequest msg; - - /** - * @param msg Client request message. 
- */ - public GridRestCacheQueryRequest(GridClientCacheQueryRequest msg) { - this.msg = msg; - } - - /** - * @return Query ID. - */ - public long queryId() { - return msg.queryId(); - } - - /** - * @return Operation. - */ - public GridClientCacheQueryRequest.GridQueryOperation operation() { - return msg.operation(); - } - - /** - * @return Cache name. - */ - public String cacheName() { - return msg.cacheName(); - } - - /** - * @return Query clause. - */ - public String clause() { - return msg.clause(); - } - - /** - * @return Query type. - */ - public GridClientCacheQueryRequest.GridQueryType type() { - return msg.type(); - } - - /** - * @return Page size. - */ - public int pageSize() { - return msg.pageSize(); - } - - /** - * @return Timeout. - */ - public long timeout() { - return msg.timeout(); - } - - /** - * @return Include backups. - */ - public boolean includeBackups() { - return msg.includeBackups(); - } - - /** - * @return Enable dedup. - */ - public boolean enableDedup() { - return msg.enableDedup(); - } - - /** - * @return Keep portable flag. - */ - public boolean keepPortable() { - return msg.keepPortable(); - } - - /** - * @return Class name. - */ - public String className() { - return msg.className(); - } - - /** - * @return Remote reducer class name. - */ - public String remoteReducerClassName() { - return msg.remoteReducerClassName(); - } - - /** - * @return Remote transformer class name. - */ - public String remoteTransformerClassName() { - return msg.remoteTransformerClassName(); - } - - /** - * @return Query arguments. 
- */ - public Object[] queryArguments() { - return msg.queryArguments(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return msg.toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestCacheRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestCacheRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestCacheRequest.java deleted file mode 100644 index db6e294..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestCacheRequest.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.rest.request; - -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.util.*; - -public class GridRestCacheRequest extends GridRestRequest { - /** Cache name. */ - private String cacheName; - - /** Key. */ - private Object key; - - /** Value (expected value for CAS). 
*/ - private Object val; - - /** New value for CAS. */ - private Object val2; - - /** Keys and values for put all, get all, remove all operations. */ - private Map<Object, Object> vals; - - /** Bit map of cache flags to be enabled on cache projection. */ - private int cacheFlags; - - /** Expiration time. */ - private Long ttl; - - /** Value to add/subtract. */ - private Long delta; - - /** Initial value for increment and decrement commands. */ - private Long init; - - /** - * @return Cache name, or {@code null} if not set. - */ - public String cacheName() { - return cacheName; - } - - /** - * @param cacheName Cache name. - */ - public void cacheName(String cacheName) { - this.cacheName = cacheName; - } - - /** - * @return Key. - */ - public Object key() { - return key; - } - - /** - * @param key Key. - */ - public void key(Object key) { - this.key = key; - } - - /** - * @return Value 1. - */ - public Object value() { - return val; - } - - /** - * @param val Value 1. - */ - public void value(Object val) { - this.val = val; - } - - /** - * @return Value 2. - */ - public Object value2() { - return val2; - } - - /** - * @param val2 Value 2. - */ - public void value2(Object val2) { - this.val2 = val2; - } - - /** - * @return Keys and values for put all, get all, remove all operations. - */ - public Map<Object, Object> values() { - return vals; - } - - /** - * @param vals Keys and values for put all, get all, remove all operations. - */ - public void values(Map<Object, Object> vals) { - this.vals = vals; - } - - /** - * @param cacheFlags Bit representation of cache flags. - */ - public void cacheFlags(int cacheFlags) { - this.cacheFlags = cacheFlags; - } - - /** - * @return Bit representation of cache flags. - */ - public int cacheFlags() { - return cacheFlags; - } - - /** - * @return Expiration time. - */ - public Long ttl() { - return ttl; - } - - /** - * @param ttl Expiration time. 
- */ - public void ttl(Long ttl) { - this.ttl = ttl; - } - - /** - * @return Delta for increment and decrement commands. - */ - public Long delta() { - return delta; - } - - /** - * @param delta Delta for increment and decrement commands. - */ - public void delta(Long delta) { - this.delta = delta; - } - - /** - * @return Initial value for increment and decrement commands. - */ - public Long initial() { - return init; - } - - /** - * @param init Initial value for increment and decrement commands. - */ - public void initial(Long init) { - this.init = init; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridRestCacheRequest.class, this, super.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestLogRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestLogRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestLogRequest.java deleted file mode 100644 index a49a64c..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestLogRequest.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.rest.request; - -import org.apache.ignite.internal.util.typedef.internal.*; - -/** - * Grid command request of log file. - */ -public class GridRestLogRequest extends GridRestRequest { - /** Path to log file. */ - private String path; - - /** From line, inclusive, indexing from 0. */ - private int from = -1; - - /** To line, inclusive, indexing from 0, can exceed count of lines in log. */ - private int to = -1; - - /** - * @return Path to log file. - */ - public String path() { - return path; - } - - /** - * @param path Path to log file. - */ - public void path(String path) { - this.path = path; - } - - /** - * @return From line, inclusive, indexing from 0. - */ - public int from() { - return from; - } - - /** - * @param from From line, inclusive, indexing from 0. - */ - public void from(int from) { - this.from = from; - } - - /** - * @return To line, inclusive, indexing from 0. - */ - public int to() { - return to; - } - - /** - * @param to To line, inclusive, indexing from 0. 
- */ - public void to(int to) { - this.to = to; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridRestLogRequest.class, this, super.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestPortableGetMetaDataRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestPortableGetMetaDataRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestPortableGetMetaDataRequest.java deleted file mode 100644 index fe04048..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestPortableGetMetaDataRequest.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.rest.request; - -import org.gridgain.grid.kernal.processors.rest.client.message.*; - -import java.util.*; - -/** - * Portable get metadata request. 
- */ -public class GridRestPortableGetMetaDataRequest extends GridRestRequest { - /** */ - private final GridClientGetMetaDataRequest msg; - - /** - * @param msg Client message. - */ - public GridRestPortableGetMetaDataRequest(GridClientGetMetaDataRequest msg) { - this.msg = msg; - } - - /** - * @return Type IDs. - */ - public Collection<Integer> typeIds() { - return msg.typeIds(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestPortablePutMetaDataRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestPortablePutMetaDataRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestPortablePutMetaDataRequest.java deleted file mode 100644 index 4b190ee..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestPortablePutMetaDataRequest.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.gridgain.grid.kernal.processors.rest.request; - -import org.gridgain.grid.kernal.processors.rest.client.message.*; - -import java.util.*; - -/** - * Portable put metadata request. - */ -public class GridRestPortablePutMetaDataRequest extends GridRestRequest { - /** */ - private final GridClientPutMetaDataRequest msg; - - /** - * @param msg Client message. - */ - public GridRestPortablePutMetaDataRequest(GridClientPutMetaDataRequest msg) { - this.msg = msg; - } - - /** - * @return Metadata. - */ - public Collection<GridClientPortableMetaData> metaData() { - return msg.metaData(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestRequest.java deleted file mode 100644 index 22a69ac..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestRequest.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.rest.request; - -import org.gridgain.grid.kernal.processors.rest.*; -import org.apache.ignite.internal.util.tostring.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.net.*; -import java.util.*; - -/** - * Grid command request. - */ -public class GridRestRequest { - /** Destination ID. */ - private UUID destId; - - /** Client ID. */ - private UUID clientId; - - /** Client network address. */ - private InetSocketAddress addr; - - /** Client credentials. */ - @GridToStringExclude - private Object cred; - - /** Client session token. */ - private byte[] sesTok; - - /** Command. */ - private GridRestCommand cmd; - - /** Portable mode flag. */ - private boolean portableMode; - - /** - * @return Destination ID. - */ - public UUID destinationId() { - return destId; - } - - /** - * @param destId Destination ID. - */ - public void destinationId(UUID destId) { - this.destId = destId; - } - - /** - * @return Command. - */ - public GridRestCommand command() { - return cmd; - } - - /** - * @param cmd Command. - */ - public void command(GridRestCommand cmd) { - this.cmd = cmd; - } - - /** - * Gets client ID that performed request. - * - * @return Client ID. - */ - public UUID clientId() { - return clientId; - } - - /** - * Sets client ID that performed request. - * - * @param clientId Client ID. - */ - public void clientId(UUID clientId) { - this.clientId = clientId; - } - - /** - * Gets client credentials for authentication process. - * - * @return Credentials. - */ - public Object credentials() { - return cred; - } - - /** - * Sets client credentials for authentication. - * - * @param cred Credentials. - */ - public void credentials(Object cred) { - this.cred = cred; - } - - /** - * Gets session token for already authenticated client. - * - * @return Session token. 
- */ - public byte[] sessionToken() { - return sesTok; - } - - /** - * Sets session token for already authenticated client. - * - * @param sesTok Session token. - */ - public void sessionToken(byte[] sesTok) { - this.sesTok = sesTok; - } - - /** - * @return Client address. - */ - public InetSocketAddress address() { - return addr; - } - - /** - * @param addr Client address. - */ - public void address(InetSocketAddress addr) { - this.addr = addr; - } - - /** - * @return Portable mode flag. - */ - public boolean portableMode() { - return portableMode; - } - - /** - * @param portableMode Portable mode flag. - */ - public void portableMode(boolean portableMode) { - this.portableMode = portableMode; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridRestRequest.class, this); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestTaskRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestTaskRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestTaskRequest.java deleted file mode 100644 index 4baa576..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestTaskRequest.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.rest.request; - -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.util.*; - -/** - * Grid task command request. - */ -public class GridRestTaskRequest extends GridRestRequest { - /** Task name. */ - private String taskName; - - /** Task Id. */ - private String taskId; - - /** Parameters. */ - private List<Object> params; - - /** Asynchronous execution flag. */ - private boolean async; - - /** Timeout. */ - private long timeout; - - /** Keep portables flag. */ - private boolean keepPortables; - - /** - * @return Task name, if specified, {@code null} otherwise. - */ - public String taskName() { - return taskName; - } - - /** - * @param taskName Name of task for execution. - */ - public void taskName(String taskName) { - this.taskName = taskName; - } - - /** - * @return Task identifier, if specified, {@code null} otherwise. - */ - public String taskId() { - return taskId; - } - - /** - * @param taskId Task identifier. - */ - public void taskId(String taskId) { - this.taskId = taskId; - } - - /** - * @return Asynchronous execution flag. - */ - public boolean async() { - return async; - } - - /** - * @param async Asynchronous execution flag. - */ - public void async(boolean async) { - this.async = async; - } - - /** - * @return Task execution parameters. - */ - public List<Object> params() { - return params; - } - - /** - * @param params Task execution parameters. - */ - public void params(List<Object> params) { - this.params = params; - } - - /** - * @return Timeout. 
- */ - public long timeout() { - return timeout; - } - - /** - * @param timeout Timeout. - */ - public void timeout(long timeout) { - this.timeout = timeout; - } - - /** - * @return Keep portables flag. - */ - public boolean keepPortables() { - return keepPortables; - } - - /** - * @param keepPortables Keep portables flag. - */ - public void keepPortables(boolean keepPortables) { - this.keepPortables = keepPortables; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridRestTaskRequest.class, this, super.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestTopologyRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestTopologyRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestTopologyRequest.java deleted file mode 100644 index 1dca91d..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/GridRestTopologyRequest.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.gridgain.grid.kernal.processors.rest.request; - -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.util.*; - -/** - * Grid command topology request. - */ -public class GridRestTopologyRequest extends GridRestRequest { - /** Id of requested node. */ - private UUID nodeId; - - /** IP address of requested node. */ - private String nodeIp; - - /** Include metrics flag. */ - private boolean includeMetrics; - - /** Include node attributes flag. */ - private boolean includeAttrs; - - /** - * @return Include metrics flag. - */ - public boolean includeMetrics() { - return includeMetrics; - } - - /** - * @param includeMetrics Include metrics flag. - */ - public void includeMetrics(boolean includeMetrics) { - this.includeMetrics = includeMetrics; - } - - /** - * @return Include node attributes flag. - */ - public boolean includeAttributes() { - return includeAttrs; - } - - /** - * @param includeAttrs Include node attributes flag. - */ - public void includeAttributes(boolean includeAttrs) { - this.includeAttrs = includeAttrs; - } - - /** - * @return Node identifier, if specified, {@code null} otherwise. - */ - public UUID nodeId() { - return nodeId; - } - - /** - * @param nodeId Node identifier to lookup. - */ - public void nodeId(UUID nodeId) { - this.nodeId = nodeId; - } - - /** - * @return Node ip address if specified, {@code null} otherwise. - */ - public String nodeIp() { - return nodeIp; - } - - /** - * @param nodeIp Node ip address to lookup. 
- */ - public void nodeIp(String nodeIp) { - this.nodeIp = nodeIp; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridRestTopologyRequest.class, this, super.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/package.html ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/package.html b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/package.html deleted file mode 100644 index 3e414be..0000000 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/request/package.html +++ /dev/null @@ -1,23 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> -<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> -<html> -<body> - <!-- Package description. --> - REST requests. 
-</body> -</html> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1b0e45a2/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandlerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandlerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandlerSelfTest.java new file mode 100644 index 0000000..ddbbe7f --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandlerSelfTest.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.rest.handlers.cache; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.gridgain.grid.kernal.processors.cache.*; +import org.apache.ignite.internal.processors.rest.*; +import org.apache.ignite.internal.processors.rest.handlers.*; +import org.apache.ignite.internal.processors.rest.request.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.typedef.*; +import org.gridgain.testframework.junits.common.*; + +import java.lang.reflect.*; +import java.util.*; + +/** + * Tests command handler directly. + */ +public class GridCacheCommandHandlerSelfTest extends GridCommonAbstractTest { + /** + * Constructor. + */ + public GridCacheCommandHandlerSelfTest() { + super(true); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration() throws Exception { + // Discovery config. + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + // Cache config. + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(GridCacheMode.LOCAL); + cacheCfg.setQueryIndexEnabled(false); + + // Grid config. + IgniteConfiguration cfg = super.getConfiguration(); + + cfg.setLocalHost("localhost"); + cfg.setRestEnabled(true); + cfg.setDiscoverySpi(disco); + cfg.setCacheConfiguration(cacheCfg); // Add 'null' cache configuration. + + return cfg; + } + + /** + * Tests the cache failure during the execution of the CACHE_GET command. + * + * @throws Exception If failed. 
+ */ + public void testCacheGetFailsSyncNotify() throws Exception { + GridRestCommandHandler hnd = new TestableGridCacheCommandHandler(((GridKernal)grid()).context(), "getAsync", + true); + + GridRestCacheRequest req = new GridRestCacheRequest(); + + req.command(GridRestCommand.CACHE_GET); + + req.key("k1"); + + try { + hnd.handleAsync(req).get(); + + fail("Expected exception not thrown."); + } + catch (IgniteCheckedException e) { + info("Got expected exception: " + e); + } + } + + /** + * Tests the cache failure during the execution of the CACHE_GET command. + * + * @throws Exception If failed. + */ + public void testCacheGetFailsAsyncNotify() throws Exception { + GridRestCommandHandler hnd = new TestableGridCacheCommandHandler(((GridKernal)grid()).context(), "getAsync", + false); + + GridRestCacheRequest req = new GridRestCacheRequest(); + + req.command(GridRestCommand.CACHE_GET); + + req.key("k1"); + + try { + hnd.handleAsync(req).get(); + + fail("Expected exception not thrown."); + } + catch (IgniteCheckedException e) { + info("Got expected exception: " + e); + } + } + + /** + * Test cache handler append/prepend commands. + * + * @throws Exception In case of any exception. 
+ */ + @SuppressWarnings("NullableProblems") + public void testAppendPrepend() throws Exception { + assertEquals("as" + "df", testAppend("as", "df", true)); + assertEquals("df" + "as", testAppend("as", "df", false)); + + List<String> curList = new ArrayList<>(Arrays.asList("a", "b")); + List<String> newList = new ArrayList<>(Arrays.asList("b", "c")); + + assertEquals(Arrays.asList("a", "b", "b", "c"), testAppend(curList, newList, true)); + assertEquals(Arrays.asList("b", "c", "a", "b"), testAppend(curList, newList, false)); + + Set<String> curSet = new HashSet<>(Arrays.asList("a", "b")); + Set<String> newSet = new HashSet<>(Arrays.asList("b", "c")); + Set<String> resSet = new HashSet<>(Arrays.asList("a", "b", "c")); + + assertEquals(resSet, testAppend(curSet, newSet, true)); + assertEquals(resSet, testAppend(curSet, newSet, false)); + assertEquals(resSet, testAppend(newSet, curList, true)); + assertEquals(resSet, testAppend(newSet, curList, false)); + assertEquals(resSet, testAppend(curSet, newList, true)); + assertEquals(resSet, testAppend(curSet, newList, false)); + + Map<String, String> curMap = F.asMap("a", "1", "b", "2", "c", "3"); + Map<String, String> newMap = F.asMap("a", "#", "b", null, "c", "%", "d", "4"); + + assertEquals(F.asMap("a", "#", "c", "%", "d", "4"), testAppend(curMap, newMap, true)); + assertEquals(F.asMap("a", "1", "b", "2", "c", "3", "d", "4"), testAppend(curMap, newMap, false)); + + try { + testAppend("as", Arrays.asList("df"), true); + + fail("Expects failed with incompatible types message."); + } + catch (IgniteCheckedException e) { + info("Got expected exception: " + e); + + assertTrue(e.getMessage().startsWith("Incompatible types")); + } + } + + /** + * Test cache handler append/prepend commands with specified environment. + * + * @param curVal Current value in cache. + * @param newVal New value to append/prepend. + * @param append Append or prepend flag. + * @param <T> Cache value type. + * @return Resulting value in cache. 
+ * @throws IgniteCheckedException In case of any grid exception. + */ + private <T> T testAppend(T curVal, T newVal, boolean append) throws IgniteCheckedException { + GridRestCommandHandler hnd = new GridCacheCommandHandler(((GridKernal)grid()).context()); + + String key = UUID.randomUUID().toString(); + + GridRestCacheRequest req = new GridRestCacheRequest(); + + req.command(append ? GridRestCommand.CACHE_APPEND : GridRestCommand.CACHE_PREPEND); + + req.key(key); + req.value(newVal); + + assertFalse("Expects failure due to no value in cache.", (Boolean)hnd.handleAsync(req).get().getResponse()); + + T res; + + try { + // Change cache state. + cache().putx(key, curVal); + + // Validate behavior for initialized cache (has current value). + assertTrue("Expects succeed.", (Boolean)hnd.handleAsync(req).get().getResponse()); + } + finally { + res = (T)cache().remove(key); + } + + return res; + } + + /** + * Test command handler. + */ + private static class TestableGridCacheCommandHandler extends GridCacheCommandHandler { + /** */ + private final String failMtd; + + /** */ + private final boolean sync; + + /** + * Constructor. + * + * @param ctx Context. + * @param failMtd Method to fail. + * @param sync Sync notification flag. + */ + TestableGridCacheCommandHandler(final GridKernalContext ctx, final String failMtd, final boolean sync) { + super(ctx); + + this.failMtd = failMtd; + this.sync = sync; + } + + /** + * @param cacheName Name of the cache. + * + * @return Instance of a GridCache proxy. 
+ */ + @Override protected GridCacheProjectionEx<Object, Object> localCache(String cacheName) throws IgniteCheckedException { + final GridCacheProjectionEx<Object, Object> cache = super.localCache(cacheName); + + return (GridCacheProjectionEx<Object, Object>)Proxy.newProxyInstance(getClass().getClassLoader(), + new Class[] {GridCacheProjectionEx.class}, + new InvocationHandler() { + @Override public Object invoke(Object proxy, Method mtd, Object[] args) throws Throwable { + if (failMtd.equals(mtd.getName())) { + IgniteFuture<Object> fut = new GridFinishedFuture<>(ctx, + new IgniteCheckedException("Operation failed")); + + fut.syncNotify(sync); + + return fut; + } + // Rewriting flagsOn result to keep intercepting invocations after it. + else if ("flagsOn".equals(mtd.getName())) + return proxy; + else if ("forSubjectId".equals(mtd.getName())) + return proxy; + + return mtd.invoke(cache, args); + } + }); + } + } +}
