IGNITE-2627: Implemented OdbcNioServerBuffer and OdbcBufferedParser.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bb0788c1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bb0788c1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bb0788c1 Branch: refs/heads/ignite-1786 Commit: bb0788c1128e7239c294fbc9a0ad17addd2026e7 Parents: a7ffce0 Author: isapego <[email protected]> Authored: Tue Feb 16 17:37:26 2016 +0300 Committer: isapego <[email protected]> Committed: Tue Feb 16 17:37:26 2016 +0300 ---------------------------------------------------------------------- .../processors/odbc/OdbcBufferedParser.java | 96 ++++++++++++++ .../processors/odbc/OdbcNioServerBuffer.java | 129 +++++++++++++++++++ .../internal/processors/odbc/OdbcProcessor.java | 2 +- modules/platforms/cpp/odbc/src/connection.cpp | 6 +- 4 files changed, 227 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/bb0788c1/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcBufferedParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcBufferedParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcBufferedParser.java new file mode 100644 index 0000000..4ce8a8a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcBufferedParser.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.util.nio.GridNioParser; +import org.apache.ignite.internal.util.nio.GridNioSession; +import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +/** + * This class implements stream parser based on {@link OdbcNioServerBuffer}. + * <p> + * The rule for this parser is that every message sent over the stream is prepended with + * 4-byte integer header containing message size. So, the stream structure is as follows: + * <pre> + * +--+--+--+--+--+--+...+--+--+--+--+--+--+--+...+--+ + * | MSG_SIZE | MESSAGE | MSG_SIZE | MESSAGE | + * +--+--+--+--+--+--+...+--+--+--+--+--+--+--+...+--+ + * </pre> + */ +public class OdbcBufferedParser implements GridNioParser { + /** Buffer metadata key. */ + private static final int BUF_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); + + /** Direct buffer alocation flag. */ + private final boolean directBuf; + + /** Message size byte order. */ + private final ByteOrder order; + + /** + * @param directBuf Direct buffer. + * @param order Message size byte order. + */ + public OdbcBufferedParser(boolean directBuf, ByteOrder order) { + this.directBuf = directBuf; + this.order = order; + } + + /** {@inheritDoc} */ + @Override public byte[] decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException { + OdbcNioServerBuffer nioBuf = ses.meta(BUF_META_KEY); + + // Decode for a given session is called per one thread, so there should not be any concurrency issues. + // However, we make some additional checks. + if (nioBuf == null) { + nioBuf = new OdbcNioServerBuffer(order); + + OdbcNioServerBuffer old = ses.addMeta(BUF_META_KEY, nioBuf); + + assert old == null; + } + + return nioBuf.read(buf); + } + + /** {@inheritDoc} */ + @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException { + byte[] msg0 = (byte[])msg; + + ByteBuffer res = directBuf ? ByteBuffer.allocateDirect(msg0.length + 4) : ByteBuffer.allocate(msg0.length + 4); + + res.order(order); + + res.putInt(msg0.length); + res.put(msg0); + + res.flip(); + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return OdbcBufferedParser.class.getSimpleName(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/bb0788c1/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioServerBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioServerBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioServerBuffer.java new file mode 100644 index 0000000..20cf4c5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioServerBuffer.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc; + +import org.apache.ignite.IgniteCheckedException; +import org.jetbrains.annotations.Nullable; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +/** + * NIO server buffer. + */ +public class OdbcNioServerBuffer { + /** Current message data. */ + private byte[] data; + + /** Count of received bytes of the current message. */ + private int cnt = -4; + + /** Current message size. */ + private int msgSize; + + /** Message size byte order. */ + private final ByteOrder order; + + /** + * @param order Byte order. + */ + public OdbcNioServerBuffer(ByteOrder order) { + this.order = order; + } + + /** + * Reset buffer state. + */ + public void reset() { + msgSize = 0; + cnt = -4; + data = null; + } + + /** + * Checks whether the byte array is filled. + * + * @return Flag indicating whether byte array is filled or not. + */ + public boolean isFilled() { + return cnt > 0 && cnt == msgSize; + } + + /** + * Get data withing the buffer. + * + * @return Data. + */ + public byte[] data() { + return data; + } + + /** + * @param buf Buffer. + * @return Message bytes or {@code null} if message is not fully read yet. + * @throws IgniteCheckedException If failed to parse message. + */ + @Nullable public byte[] read(ByteBuffer buf) throws IgniteCheckedException { + if (cnt < 0) { + for (; cnt < 0 && buf.hasRemaining(); cnt++) { + if (order == ByteOrder.BIG_ENDIAN) + msgSize = (msgSize << 8) | buf.get() & 0xFF; + else + msgSize |= (buf.get() & 0xFF) << (8*(4 + cnt)); + } + + if (cnt < 0) + return null; + + // If count is 0 then message size should be inited. + if (msgSize <= 0) + throw new IgniteCheckedException("Invalid message size: " + msgSize); + + data = new byte[msgSize]; + } + + assert msgSize > 0; + assert cnt >= 0; + + int remaining = buf.remaining(); + + // If there are more bytes in buffer. + if (remaining > 0) { + int missing = msgSize - cnt; + + // Read only up to message size. + if (missing > 0) { + int len = missing < remaining ? missing : remaining; + + buf.get(data, cnt, len); + + cnt += len; + } + } + + if (cnt == msgSize) { + byte[] data0 = data; + + reset(); + + return data0; + } + else + return null; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/bb0788c1/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java index a3ed422..831ef02 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java @@ -84,7 +84,7 @@ public class OdbcProcessor extends GridProcessorAdapter { .socketSendBufferSize(odbcCfg.getSendBufferSize()) .socketReceiveBufferSize(odbcCfg.getReceiveBufferSize()) .sendQueueLimit(odbcCfg.getSendQueueLimit()) - .filters(new GridNioCodecFilter(new GridBufferedParser(false, ByteOrder.BIG_ENDIAN), log, false)) + .filters(new GridNioCodecFilter(new OdbcBufferedParser(false, ByteOrder.LITTLE_ENDIAN), log, false)) .directMode(false) .idleTimeout(odbcCfg.getIdleTimeout()) .build(); http://git-wip-us.apache.org/repos/asf/ignite/blob/bb0788c1/modules/platforms/cpp/odbc/src/connection.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/connection.cpp b/modules/platforms/cpp/odbc/src/connection.cpp index 2905ed7..37e81d0 100644 --- a/modules/platforms/cpp/odbc/src/connection.cpp +++ b/modules/platforms/cpp/odbc/src/connection.cpp @@ -172,8 +172,7 @@ namespace ignite OdbcProtocolHeader hdr; - // Lenght should has Big Endian byte order. - hdr.len = htonl(static_cast<unsigned long>(len)); + hdr.len = len; int sent = socket.Send(reinterpret_cast<int8_t*>(&hdr), sizeof(hdr)); @@ -210,9 +209,6 @@ namespace ignite int received = socket.Receive(reinterpret_cast<int8_t*>(&hdr), sizeof(hdr)); - // Lenght has Big Endian byte order. - hdr.len = ntohl(hdr.len); - LOG_MSG("Received: %d\n", received); if (received != sizeof(hdr))
