Repository: ignite Updated Branches: refs/heads/ignite-1786 0d88f82df -> d3763304c
IGNITE-2627: Removed OdbcNioParser. Using GridBufferedParser for message separation. Message parsing moved to OdbcNioListener. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/feb8350d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/feb8350d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/feb8350d Branch: refs/heads/ignite-1786 Commit: feb8350d0ae40a6e98cf4b911ab16c85ed0f659a Parents: 344a38e Author: isapego <[email protected]> Authored: Mon Feb 15 21:23:31 2016 +0300 Committer: isapego <[email protected]> Committed: Mon Feb 15 21:23:31 2016 +0300 ---------------------------------------------------------------------- .../processors/odbc/OdbcNioListener.java | 447 +++++++++---------- .../internal/processors/odbc/OdbcNioParser.java | 383 ---------------- .../internal/processors/odbc/OdbcProcessor.java | 9 +- .../processors/odbc/OdbcRequestHandler.java | 335 ++++++++++++++ .../cpp/odbc/include/ignite/odbc/parser.h | 4 - modules/platforms/cpp/odbc/src/connection.cpp | 22 +- 6 files changed, 571 insertions(+), 629 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/feb8350d/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java index 0a5ae0e..1a7ece3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java @@ -17,42 +17,31 @@ package org.apache.ignite.internal.processors.odbc; -import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteLogger; -import 
org.apache.ignite.cache.query.QueryCursor; -import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.processors.cache.QueryCursorImpl; -import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; -import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; +import org.apache.ignite.internal.binary.BinaryRawWriterEx; +import org.apache.ignite.internal.binary.BinaryReaderExImpl; +import org.apache.ignite.internal.binary.GridBinaryMarshaller; +import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream; +import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream; +import org.apache.ignite.internal.binary.streams.BinaryInputStream; +import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter; import org.apache.ignite.internal.util.nio.GridNioSession; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiTuple; import org.jetbrains.annotations.Nullable; -import java.util.ArrayList; +import java.io.IOException; +import java.util.Arrays; import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; - -import static org.apache.ignite.internal.processors.odbc.OdbcRequest.CLOSE_SQL_QUERY; -import static org.apache.ignite.internal.processors.odbc.OdbcRequest.EXECUTE_SQL_QUERY; -import static org.apache.ignite.internal.processors.odbc.OdbcRequest.FETCH_SQL_QUERY; -import static org.apache.ignite.internal.processors.odbc.OdbcRequest.GET_COLUMNS_META; -import static org.apache.ignite.internal.processors.odbc.OdbcRequest.GET_TABLES_META; /** * SQL query 
handler. */ -public class OdbcNioListener extends GridNioServerListenerAdapter<OdbcRequest> { - /** Query ID sequence. */ - private static final AtomicLong qryIdGen = new AtomicLong(); +public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> { + /** Initial output stream capacity. */ + private static final int INIT_CAP = 1024; /** Kernel context. */ private final GridKernalContext ctx; @@ -63,17 +52,24 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<OdbcRequest> { /** Logger. */ private final IgniteLogger log; - /** Current queries cursors. */ - private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs = new ConcurrentHashMap<>(); + /** Request handler. */ + private final OdbcRequestHandler handler; + + /** Marshaller. */ + private final GridBinaryMarshaller marsh; /** * Constructor. * * @param ctx Context. */ - public OdbcNioListener(final GridKernalContext ctx, final GridSpinBusyLock busyLock) { + public OdbcNioListener(final GridKernalContext ctx, final GridSpinBusyLock busyLock, + final OdbcRequestHandler handler) { this.ctx = ctx; this.busyLock = busyLock; + this.handler = handler; + + this.marsh = ((CacheObjectBinaryProcessorImpl)ctx.cacheObjects()).marshaller(); this.log = ctx.log(getClass()); } @@ -98,303 +94,282 @@ public class OdbcNioListener extends GridNioServerListenerAdapter<OdbcRequest> { } /** {@inheritDoc} */ - @Override public void onMessage(GridNioSession ses, OdbcRequest msg) { + @Override public void onMessage(GridNioSession ses, byte[] msg) { assert msg != null; - if (log.isDebugEnabled()) - log.debug("Received request from client: [msg=" + msg + ']'); + try { + OdbcRequest req = decode(msg); - OdbcResponse res = handle(msg); + OdbcResponse rsp = handle(req); - if (log.isDebugEnabled()) - log.debug("Handling result: [res=" + res.status() + ']'); + if (log.isDebugEnabled()) + log.debug("Handling result: [res=" + rsp.status() + ']'); + + byte[] outMsg = encode(rsp); - 
ses.send(res); + ses.send(outMsg); + } + catch (Exception e) { + trySendErrorMessage(ses, e.getMessage()); + } } /** - * Handle request. - * - * @param req Request. - * @return Response. + * Try to send simple response message to ODBC driver. + * @param ses Session. + * @param err Error message. */ - public OdbcResponse handle(OdbcRequest req) { - assert req != null; - - if (!busyLock.enterBusy()) { - String errMsg = "Failed to handle request [req=" + req + - ", err=Received request while stopping grid]"; - - U.error(log, errMsg); - - return new OdbcResponse(OdbcResponse.STATUS_FAILED, errMsg); - } + private void trySendErrorMessage(GridNioSession ses, String err) { + log.error(err); try { - switch (req.command()) { - case EXECUTE_SQL_QUERY: - return executeQuery((OdbcQueryExecuteRequest)req); - - case FETCH_SQL_QUERY: - return fetchQuery((OdbcQueryFetchRequest)req); - - case CLOSE_SQL_QUERY: - return closeQuery((OdbcQueryCloseRequest)req); - - case GET_COLUMNS_META: - return getColumnsMeta((OdbcQueryGetColumnsMetaRequest) req); - - case GET_TABLES_META: - return getTablesMeta((OdbcQueryGetTablesMetaRequest) req); - } - - return new OdbcResponse(OdbcResponse.STATUS_FAILED, - "Failed to find registered handler for command: " + req.command()); + ses.send(encode(new OdbcResponse(OdbcResponse.STATUS_FAILED, err))); } - finally { - busyLock.leaveBusy(); + catch (Exception e) { + log.error("Can not send error response message: [err=" + e.getMessage() + ']'); } } /** - * {@link OdbcQueryExecuteRequest} command handler. + * Decode OdbcRequest from byte array. * - * @param req Execute query request. - * @return Response. + * @param msg Message. + * @return Assembled ODBC request. 
*/ - private OdbcResponse executeQuery(OdbcQueryExecuteRequest req) { - long qryId = qryIdGen.getAndIncrement(); + private OdbcRequest decode(byte[] msg) throws IOException { + assert msg != null; - try { - SqlFieldsQuery qry = new SqlFieldsQuery(req.sqlQuery()); + BinaryInputStream stream = new BinaryHeapInputStream(msg); - qry.setArgs(req.arguments()); + BinaryReaderExImpl reader = new BinaryReaderExImpl(null, stream, null); - IgniteCache<Object, Object> cache = ctx.grid().cache(req.cacheName()); + OdbcRequest res; - if (cache == null) - return new OdbcResponse(OdbcResponse.STATUS_FAILED, - "Failed to find cache with name: " + req.cacheName()); + byte cmd = reader.readByte(); - QueryCursor qryCur = cache.query(qry); + switch (cmd) { + case OdbcRequest.EXECUTE_SQL_QUERY: { - Iterator cur = qryCur.iterator(); + String cache = reader.readString(); + String sql = reader.readString(); + int argsNum = reader.readInt(); - qryCurs.put(qryId, new IgniteBiTuple<>(qryCur, cur)); + if (log.isDebugEnabled()) { + log.debug("Message: [cmd=EXECUTE_SQL_QUERY, cache=" + cache + + ", query=" + sql + ", argsNum=" + argsNum + ']'); + } - List<?> fieldsMeta = ((QueryCursorImpl) qryCur).fieldsMeta(); + Object[] params = new Object[argsNum]; - if (log.isDebugEnabled()) - log.debug("Field meta: " + fieldsMeta); + for (int i = 0; i < argsNum; ++i) + params[i] = reader.readObjectDetached(); - OdbcQueryExecuteResult res = new OdbcQueryExecuteResult(qryId, convertMetadata(fieldsMeta)); + res = new OdbcQueryExecuteRequest(cache, sql, params); - return new OdbcResponse(res); - } - catch (Exception e) { - qryCurs.remove(qryId); + break; + } - return new OdbcResponse(OdbcResponse.STATUS_FAILED, e.getMessage()); - } - } + case OdbcRequest.FETCH_SQL_QUERY: { - /** - * {@link OdbcQueryCloseRequest} command handler. - * - * @param req Execute query request. - * @return Response. 
- */ - private OdbcResponse closeQuery(OdbcQueryCloseRequest req) { - try { - QueryCursor cur = qryCurs.get(req.queryId()).get1(); + long queryId = reader.readLong(); + int pageSize = reader.readInt(); - if (cur == null) - return new OdbcResponse(OdbcResponse.STATUS_FAILED, "Failed to find query with ID: " + req.queryId()); + if (log.isDebugEnabled()) + log.debug("Message: [cmd=FETCH_SQL_QUERY, queryId=" + queryId + ", pageSize=" + pageSize + ']'); - cur.close(); + res = new OdbcQueryFetchRequest(queryId, pageSize); - qryCurs.remove(req.queryId()); + break; + } - OdbcQueryCloseResult res = new OdbcQueryCloseResult(req.queryId()); + case OdbcRequest.CLOSE_SQL_QUERY: { - return new OdbcResponse(res); - } - catch (Exception e) { - qryCurs.remove(req.queryId()); + long queryId = reader.readLong(); - return new OdbcResponse(OdbcResponse.STATUS_FAILED, e.getMessage()); - } - } + if (log.isDebugEnabled()) { + log.debug("Message: [cmd=CLOSE_SQL_QUERY, queryId=" + queryId + ']'); + } - /** - * {@link OdbcQueryFetchRequest} command handler. - * - * @param req Execute query request. - * @return Response. 
- */ - private OdbcResponse fetchQuery(OdbcQueryFetchRequest req) { - try { - Iterator cur = qryCurs.get(req.queryId()).get2(); + res = new OdbcQueryCloseRequest(queryId); - if (cur == null) - return new OdbcResponse(OdbcResponse.STATUS_FAILED, - "Failed to find query with ID: " + req.queryId()); + break; + } - List<Object> items = new ArrayList<>(); + case OdbcRequest.GET_COLUMNS_META: { - for (int i = 0; i < req.pageSize() && cur.hasNext(); ++i) - items.add(cur.next()); + String cache = reader.readString(); + String table = reader.readString(); + String column = reader.readString(); - OdbcQueryFetchResult res = new OdbcQueryFetchResult(req.queryId(), items, !cur.hasNext()); + if (log.isDebugEnabled()) { + log.debug("Message: [cmd=GET_COLUMNS_META, cache=" + cache + + ", table=" + table + ", column: " + column + ']'); + } - return new OdbcResponse(res); - } - catch (Exception e) { - qryCurs.remove(req.queryId()); + res = new OdbcQueryGetColumnsMetaRequest(cache, table, column); + + break; + } + + case OdbcRequest.GET_TABLES_META: { + + String catalog = reader.readString(); + String schema = reader.readString(); + String table = reader.readString(); + String tableType = reader.readString(); - return new OdbcResponse(OdbcResponse.STATUS_FAILED, e.getMessage()); + if (log.isDebugEnabled()) { + log.debug("Message: [cmd=GET_COLUMNS_META, catalog=" + catalog + + ", schema=" + schema + ", table=" + table + ", tableType=" + tableType + ']'); + } + + res = new OdbcQueryGetTablesMetaRequest(catalog, schema, table, tableType); + + break; + } + + default: + throw new IOException("Failed to parse incoming packet (unknown command type) " + + "[cmd=[" + Byte.toString(cmd) + ']'); } + + return res; } /** - * {@link OdbcQueryGetColumnsMetaRequest} command handler. + * Encode OdbcResponse to byte array. * - * @param req Get columns metadata request. - * @return Response. + * @param msg Message. + * @return Byte array. 
*/ - private OdbcResponse getColumnsMeta(OdbcQueryGetColumnsMetaRequest req) { - try { - List<OdbcColumnMeta> meta = new ArrayList<>(); + private byte[] encode(OdbcResponse msg) throws IOException { + assert msg != null; - String cacheName; - String tableName; + // Creating new binary writer + BinaryRawWriterEx writer = marsh.writer(new BinaryHeapOutputStream(INIT_CAP)); - if (req.tableName().contains(".")) { - // Parsing two-part table name. - String[] parts = req.tableName().split("\\."); + // Writing status + writer.writeByte((byte) msg.status()); - cacheName = removeQuotationMarksIfNeeded(parts[0]); + if (msg.status() != OdbcResponse.STATUS_SUCCESS) { + writer.writeString(msg.error()); - tableName = parts[1]; - } - else { - cacheName = removeQuotationMarksIfNeeded(req.cacheName()); + return Arrays.copyOf(writer.out().array(), writer.out().position()); + } - tableName = req.tableName(); - } + Object res0 = msg.response(); - Collection<GridQueryTypeDescriptor> tablesMeta = ctx.query().types(cacheName); + if (res0 instanceof OdbcQueryExecuteResult) { + OdbcQueryExecuteResult res = (OdbcQueryExecuteResult) res0; - for (GridQueryTypeDescriptor table : tablesMeta) { - if (!matches(table.name(), tableName)) - continue; + if (log.isDebugEnabled()) + log.debug("Resulting query ID: " + res.getQueryId()); - for (Map.Entry<String, Class<?>> field : table.fields().entrySet()) { - if (!matches(field.getKey(), req.columnName())) - continue; + writer.writeLong(res.getQueryId()); - OdbcColumnMeta columnMeta = new OdbcColumnMeta(req.cacheName(), - table.name(), field.getKey(), field.getValue()); + Collection<OdbcColumnMeta> metas = res.getColumnsMetadata(); - if (!meta.contains(columnMeta)) - meta.add(columnMeta); - } - } - OdbcQueryGetColumnsMetaResult res = new OdbcQueryGetColumnsMetaResult(meta); + assert metas != null; + + writer.writeInt(metas.size()); + + for (OdbcColumnMeta meta : metas) + meta.writeBinary(writer, marsh.context()); - return new OdbcResponse(res); - } - 
catch (Exception e) { - return new OdbcResponse(OdbcResponse.STATUS_FAILED, e.getMessage()); } - } + else if (res0 instanceof OdbcQueryFetchResult) { + OdbcQueryFetchResult res = (OdbcQueryFetchResult) res0; - /** - * {@link OdbcQueryGetTablesMetaRequest} command handler. - * - * @param req Get tables metadata request. - * @return Response. - */ - private OdbcResponse getTablesMeta(OdbcQueryGetTablesMetaRequest req) { - try { - List<OdbcTableMeta> meta = new ArrayList<>(); + if (log.isDebugEnabled()) + log.debug("Resulting query ID: " + res.queryId()); + + writer.writeLong(res.queryId()); - String realSchema = removeQuotationMarksIfNeeded(req.schema()); + Collection<?> items0 = res.items(); - for (String cacheName : ctx.cache().cacheNames()) - { - if (!matches(cacheName, realSchema)) - continue; + assert items0 != null; - Collection<GridQueryTypeDescriptor> tablesMeta = ctx.query().types(cacheName); + writer.writeBoolean(res.last()); - for (GridQueryTypeDescriptor table : tablesMeta) { - if (!matches(table.name(), req.table())) - continue; + writer.writeInt(items0.size()); - if (!matches("TABLE", req.tableType())) - continue; + for (Object row0 : items0) { + if (row0 != null) { + Collection<?> row = (Collection<?>)row0; - OdbcTableMeta tableMeta = new OdbcTableMeta(req.catalog(), cacheName, - table.name(), "TABLE"); + writer.writeInt(row.size()); - if (!meta.contains(tableMeta)) - meta.add(tableMeta); + for (Object obj : row) + writer.writeObjectDetached(obj); } } + } + else if (res0 instanceof OdbcQueryCloseResult) { + OdbcQueryCloseResult res = (OdbcQueryCloseResult) res0; + + if (log.isDebugEnabled()) + log.debug("Resulting query ID: " + res.getQueryId()); - OdbcQueryGetTablesMetaResult res = new OdbcQueryGetTablesMetaResult(meta); + writer.writeLong(res.getQueryId()); - return new OdbcResponse(res); } - catch (Exception e) { - return new OdbcResponse(OdbcResponse.STATUS_FAILED, e.getMessage()); + else if (res0 instanceof OdbcQueryGetColumnsMetaResult) { + 
OdbcQueryGetColumnsMetaResult res = (OdbcQueryGetColumnsMetaResult) res0; + + Collection<OdbcColumnMeta> columnsMeta = res.meta(); + + assert columnsMeta != null; + + writer.writeInt(columnsMeta.size()); + + for (OdbcColumnMeta columnMeta : columnsMeta) + columnMeta.writeBinary(writer, marsh.context()); + } - } + else if (res0 instanceof OdbcQueryGetTablesMetaResult) { + OdbcQueryGetTablesMetaResult res = (OdbcQueryGetTablesMetaResult) res0; - /** - * Convert metadata in collection from {@link GridQueryFieldMetadata} to - * {@link OdbcColumnMeta}. - * - * @param meta Internal query field metadata. - * @return Odbc query field metadata. - */ - private static Collection<OdbcColumnMeta> convertMetadata(Collection<?> meta) { - List<OdbcColumnMeta> res = new ArrayList<>(); + Collection<OdbcTableMeta> tablesMeta = res.meta(); - if (meta != null) { - for (Object info : meta) { - assert info instanceof GridQueryFieldMetadata; + assert tablesMeta != null; - res.add(new OdbcColumnMeta((GridQueryFieldMetadata)info)); - } + writer.writeInt(tablesMeta.size()); + + for (OdbcTableMeta tableMeta : tablesMeta) + tableMeta.writeBinary(writer); } + else + throw new IOException("Failed to serialize response packet (unknown response type)"); - return res; + return Arrays.copyOf(writer.out().array(), writer.out().position()); } /** - * Checks whether string matches SQL pattern. + * Handle request. * - * @param str String. - * @param ptrn Pattern. - * @return Whether string matches pattern. + * @param req Request. + * @return Response. */ - private static boolean matches(String str, String ptrn) { - return str != null && (F.isEmpty(ptrn) || - str.toUpperCase().matches(ptrn.toUpperCase().replace("%", ".*").replace("_", "."))); - } + private OdbcResponse handle(OdbcRequest req) { + assert req != null; - /** - * Remove quotation marks at the beginning and end of the string if present. - * - * @param str Input string. - * @return String without leading and trailing quotation marks. 
- */ - private static String removeQuotationMarksIfNeeded(String str) { - if (str.startsWith("\"") && str.endsWith("\"")) - return str.substring(1, str.length() - 1); + if (log.isDebugEnabled()) + log.debug("Received request from client: [req=" + req + ']'); + + if (!busyLock.enterBusy()) { + String errMsg = "Failed to handle request [req=" + req + + ", err=Received request while stopping grid]"; + + U.error(log, errMsg); - return str; + return new OdbcResponse(OdbcResponse.STATUS_FAILED, errMsg); + } + + try { + return handler.handle(req); + } + finally { + busyLock.leaveBusy(); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/feb8350d/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioParser.java deleted file mode 100644 index 540f8b6..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioParser.java +++ /dev/null @@ -1,383 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.odbc; - -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.binary.BinaryRawReaderEx; -import org.apache.ignite.internal.binary.BinaryRawWriterEx; -import org.apache.ignite.internal.binary.BinaryReaderExImpl; -import org.apache.ignite.internal.binary.GridBinaryMarshaller; -import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream; -import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream; -import org.apache.ignite.internal.binary.streams.BinaryInputStream; -import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; -import org.apache.ignite.internal.util.nio.GridNioParser; -import org.apache.ignite.internal.util.nio.GridNioSession; -import org.jetbrains.annotations.Nullable; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Collection; - -/** - * ODBC protocol parser. - */ -public class OdbcNioParser implements GridNioParser { - /** Initial output stream capacity. */ - private static final int INIT_CAP = 1024; - - /** Length in bytes of the remaining message part. */ - private int leftToReceive = 0; - - /** Already received bytes of current message. */ - private ByteBuffer curMsg = null; - - /** Marshaller. */ - private final GridBinaryMarshaller marsh; - - /** Logger. */ - private final IgniteLogger log; - - /** - * Constructor. - * - * @param ctx Kernel context. - */ - public OdbcNioParser(GridKernalContext ctx) { - marsh = ((CacheObjectBinaryProcessorImpl)ctx.cacheObjects()).marshaller(); - - log = ctx.log(getClass()); - } - - /** - * Process data chunk and try to construct new message using stored and - * freshly received data. - * - * @param buf Fresh data buffer. 
- * @return Instance of the {@link BinaryReaderExImpl} positioned to read - * from the beginning of the message on success and null otherwise. - */ - private BinaryRawReaderEx tryConstructMessage(ByteBuffer buf) { - if (leftToReceive != 0) { - // Still receiving message - int toConsume = Math.min(leftToReceive, buf.remaining()); - - curMsg.put(buf.array(), buf.arrayOffset(), toConsume); - - leftToReceive -= toConsume; - - buf.position(buf.position() + toConsume); - - if (leftToReceive != 0) - return null; - - BinaryInputStream stream = new BinaryHeapInputStream(curMsg.array()); - - BinaryReaderExImpl reader = new BinaryReaderExImpl(null, stream, null); - - curMsg = null; - - return reader; - } - - // Receiving new message - BinaryInputStream stream = new BinaryHeapInputStream(buf.array()); - - BinaryReaderExImpl reader = new BinaryReaderExImpl(null, stream, null); - - // Getting message length. It's in the first four bytes of the message. - int msgLen = reader.readInt(); - - // Just skipping int here to sync position. - buf.getInt(); - - int remaining = buf.remaining(); - - // Checking if we have not entire message in buffer. - if (msgLen > remaining) { - leftToReceive = msgLen - remaining; - - curMsg = ByteBuffer.allocate(msgLen); - - curMsg.put(buf); - - return null; - } - - buf.position(buf.position() + msgLen); - - return reader; - } - - /** {@inheritDoc} */ - @Nullable @Override public OdbcRequest decode(GridNioSession ses, ByteBuffer buf) - throws IOException, IgniteCheckedException { - BinaryRawReaderEx messageReader = tryConstructMessage(buf); - - return messageReader == null ? 
null : readRequest(ses, messageReader); - } - - /** {@inheritDoc} */ - @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException { - assert msg != null; - assert msg instanceof OdbcResponse; - - if (log.isDebugEnabled()) - log.debug("Encoding query processing result"); - - BinaryRawWriterEx writer = marsh.writer(new BinaryHeapOutputStream(INIT_CAP)); - - // Reserving space for the message length. - int msgLenPos = writer.reserveInt(); - - writeResponse(ses, writer, (OdbcResponse)msg); - - int msgLenWithHdr = writer.out().position() - msgLenPos; - - int msgLen = msgLenWithHdr - 4; - - writer.writeInt(msgLenPos, msgLen); - - ByteBuffer buf = ByteBuffer.allocate(msgLenWithHdr); - - buf.put(writer.out().array(), msgLenPos, msgLenWithHdr); - - buf.flip(); - - return buf; - } - - /** - * Read ODBC request from the raw data using provided {@link BinaryReaderExImpl} instance. - * - * @param ses Current session. - * @param reader Reader positioned to read the request. - * @return Instance of the {@link OdbcRequest}. - * @throws IOException if the type of the request is unknown to the parser. 
- */ - private OdbcRequest readRequest(GridNioSession ses, BinaryRawReaderEx reader) throws IOException { - OdbcRequest res; - - byte cmd = reader.readByte(); - - switch (cmd) { - case OdbcRequest.EXECUTE_SQL_QUERY: { - - String cache = reader.readString(); - String sql = reader.readString(); - int argsNum = reader.readInt(); - - if (log.isDebugEnabled()) { - log.debug("Message EXECUTE_SQL_QUERY:"); - log.debug("cache: " + cache); - log.debug("query: " + sql); - log.debug("argsNum: " + argsNum); - } - - Object[] params = new Object[argsNum]; - - for (int i = 0; i < argsNum; ++i) - params[i] = reader.readObjectDetached(); - - res = new OdbcQueryExecuteRequest(cache, sql, params); - - break; - } - - case OdbcRequest.FETCH_SQL_QUERY: { - - long queryId = reader.readLong(); - int pageSize = reader.readInt(); - - if (log.isDebugEnabled()) { - log.debug("Message FETCH_SQL_QUERY:"); - log.debug("queryId: " + queryId); - log.debug("pageSize: " + pageSize); - } - - res = new OdbcQueryFetchRequest(queryId, pageSize); - - break; - } - - case OdbcRequest.CLOSE_SQL_QUERY: { - - long queryId = reader.readLong(); - - if (log.isDebugEnabled()) { - log.debug("Message CLOSE_SQL_QUERY:"); - log.debug("queryId: " + queryId); - } - - res = new OdbcQueryCloseRequest(queryId); - - break; - } - - case OdbcRequest.GET_COLUMNS_META: { - - String cache = reader.readString(); - String table = reader.readString(); - String column = reader.readString(); - - if (log.isDebugEnabled()) { - log.debug("Message GET_COLUMNS_META:"); - log.debug("cache: " + cache); - log.debug("table: " + table); - log.debug("column: " + column); - } - - res = new OdbcQueryGetColumnsMetaRequest(cache, table, column); - - break; - } - - case OdbcRequest.GET_TABLES_META: { - - String catalog = reader.readString(); - String schema = reader.readString(); - String table = reader.readString(); - String tableType = reader.readString(); - - if (log.isDebugEnabled()) { - log.debug("Message GET_COLUMNS_META:"); - 
log.debug("catalog: " + catalog); - log.debug("schema: " + schema); - log.debug("table: " + table); - log.debug("tableType: " + tableType); - } - - res = new OdbcQueryGetTablesMetaRequest(catalog, schema, table, tableType); - - break; - } - - default: - throw new IOException("Failed to parse incoming packet (unknown command type) [ses=" + ses + - ", cmd=[" + Byte.toString(cmd) + ']'); - } - - return res; - } - - /** - * Write ODBC response using provided {@link BinaryRawWriterEx} instance. - * - * @param ses Current session. - * @param writer Writer. - * @param rsp ODBC response that should be written. - * @throws IOException if the type of the response is unknown to the parser. - */ - private void writeResponse(GridNioSession ses, BinaryRawWriterEx writer, OdbcResponse rsp) throws IOException { - // Writing status - writer.writeByte((byte)rsp.status()); - - if (rsp.status() != OdbcResponse.STATUS_SUCCESS) { - writer.writeString(rsp.error()); - - return; - } - - Object res0 = rsp.response(); - - if (res0 instanceof OdbcQueryExecuteResult) { - OdbcQueryExecuteResult res = (OdbcQueryExecuteResult) res0; - - if (log.isDebugEnabled()) - log.debug("Resulting query ID: " + res.getQueryId()); - - writer.writeLong(res.getQueryId()); - - Collection<OdbcColumnMeta> metas = res.getColumnsMetadata(); - - assert metas != null; - - writer.writeInt(metas.size()); - - for (OdbcColumnMeta meta : metas) - meta.writeBinary(writer, marsh.context()); - - } - else if (res0 instanceof OdbcQueryFetchResult) { - OdbcQueryFetchResult res = (OdbcQueryFetchResult) res0; - - if (log.isDebugEnabled()) - log.debug("Resulting query ID: " + res.queryId()); - - writer.writeLong(res.queryId()); - - Collection<?> items0 = res.items(); - - assert items0 != null; - - writer.writeBoolean(res.last()); - - writer.writeInt(items0.size()); - - for (Object row0 : items0) { - if (row0 != null) { - Collection<?> row = (Collection<?>)row0; - - writer.writeInt(row.size()); - - for (Object obj : row) - 
writer.writeObjectDetached(obj); - } - } - } - else if (res0 instanceof OdbcQueryCloseResult) { - OdbcQueryCloseResult res = (OdbcQueryCloseResult) res0; - - if (log.isDebugEnabled()) - log.debug("Resulting query ID: " + res.getQueryId()); - - writer.writeLong(res.getQueryId()); - - } - else if (res0 instanceof OdbcQueryGetColumnsMetaResult) { - OdbcQueryGetColumnsMetaResult res = (OdbcQueryGetColumnsMetaResult) res0; - - Collection<OdbcColumnMeta> columnsMeta = res.meta(); - - assert columnsMeta != null; - - writer.writeInt(columnsMeta.size()); - - for (OdbcColumnMeta columnMeta : columnsMeta) - columnMeta.writeBinary(writer, marsh.context()); - - } - else if (res0 instanceof OdbcQueryGetTablesMetaResult) { - OdbcQueryGetTablesMetaResult res = (OdbcQueryGetTablesMetaResult) res0; - - Collection<OdbcTableMeta> tablesMeta = res.meta(); - - assert tablesMeta != null; - - writer.writeInt(tablesMeta.size()); - - for (OdbcTableMeta tableMeta : tablesMeta) - tableMeta.writeBinary(writer); - - } - else - throw new IOException("Failed to serialize response packet (unknown response type) [ses=" + ses + "]"); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/feb8350d/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java index 5d9e01f..a3ed422 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java @@ -23,6 +23,7 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.processors.GridProcessorAdapter; import 
org.apache.ignite.internal.util.GridSpinBusyLock; +import org.apache.ignite.internal.util.nio.GridBufferedParser; import org.apache.ignite.internal.util.nio.GridNioCodecFilter; import org.apache.ignite.internal.util.nio.GridNioServer; import org.apache.ignite.internal.util.typedef.internal.U; @@ -40,7 +41,7 @@ public class OdbcProcessor extends GridProcessorAdapter { private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); /** OBCD TCP Server. */ - private GridNioServer<OdbcRequest> srv; + private GridNioServer<byte[]> srv; /** * @param ctx Kernal context. @@ -70,10 +71,10 @@ public class OdbcProcessor extends GridProcessorAdapter { int port = odbcCfg.getPort(); - srv = GridNioServer.<OdbcRequest>builder() + srv = GridNioServer.<byte[]>builder() .address(host) .port(port) - .listener(new OdbcNioListener(ctx, busyLock)) + .listener(new OdbcNioListener(ctx, busyLock, new OdbcRequestHandler(ctx))) .logger(log) .selectorCount(odbcCfg.getSelectorCount()) .gridName(ctx.gridName()) @@ -83,7 +84,7 @@ public class OdbcProcessor extends GridProcessorAdapter { .socketSendBufferSize(odbcCfg.getSendBufferSize()) .socketReceiveBufferSize(odbcCfg.getReceiveBufferSize()) .sendQueueLimit(odbcCfg.getSendQueueLimit()) - .filters(new GridNioCodecFilter(new OdbcNioParser(ctx), log, false)) + .filters(new GridNioCodecFilter(new GridBufferedParser(false, ByteOrder.BIG_ENDIAN), log, false)) .directMode(false) .idleTimeout(odbcCfg.getIdleTimeout()) .build(); http://git-wip-us.apache.org/repos/asf/ignite/blob/feb8350d/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java new file mode 100644 index 0000000..a6e4493 --- /dev/null +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc; + +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.cache.QueryCursorImpl; +import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; +import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgniteBiTuple; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.ignite.internal.processors.odbc.OdbcRequest.*; + +/** + * ODBC request handler. + */ +public class OdbcRequestHandler { + /** Query ID sequence. */ + private static final AtomicLong qryIdGen = new AtomicLong(); + + /** Kernel context. */ + private final GridKernalContext ctx; + + /** Logger. 
*/ + private final IgniteLogger log; + + /** Current queries cursors. */ + private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCurs = new ConcurrentHashMap<>(); + + /** + * Constructor. + * + * @param ctx Context. + */ + public OdbcRequestHandler(final GridKernalContext ctx) { + this.ctx = ctx; + + this.log = ctx.log(getClass()); + } + + /** + * Handle request. + * + * @param req Request. + * @return Response. + */ + public OdbcResponse handle(OdbcRequest req) { + assert req != null; + + switch (req.command()) { + case EXECUTE_SQL_QUERY: + return executeQuery((OdbcQueryExecuteRequest)req); + + case FETCH_SQL_QUERY: + return fetchQuery((OdbcQueryFetchRequest)req); + + case CLOSE_SQL_QUERY: + return closeQuery((OdbcQueryCloseRequest)req); + + case GET_COLUMNS_META: + return getColumnsMeta((OdbcQueryGetColumnsMetaRequest) req); + + case GET_TABLES_META: + return getTablesMeta((OdbcQueryGetTablesMetaRequest) req); + } + + return new OdbcResponse(OdbcResponse.STATUS_FAILED, + "Failed to find registered handler for command: " + req.command()); + } + + /** + * {@link OdbcQueryExecuteRequest} command handler. + * + * @param req Execute query request. + * @return Response. 
+ */ + private OdbcResponse executeQuery(OdbcQueryExecuteRequest req) { + long qryId = qryIdGen.getAndIncrement(); + + try { + SqlFieldsQuery qry = new SqlFieldsQuery(req.sqlQuery()); + + qry.setArgs(req.arguments()); + + IgniteCache<Object, Object> cache = ctx.grid().cache(req.cacheName()); + + if (cache == null) + return new OdbcResponse(OdbcResponse.STATUS_FAILED, + "Failed to find cache with name: " + req.cacheName()); + + QueryCursor qryCur = cache.query(qry); + + Iterator cur = qryCur.iterator(); + + qryCurs.put(qryId, new IgniteBiTuple<>(qryCur, cur)); + + List<?> fieldsMeta = ((QueryCursorImpl) qryCur).fieldsMeta(); + + if (log.isDebugEnabled()) + log.debug("Field meta: [meta: " + fieldsMeta + ']'); + + OdbcQueryExecuteResult res = new OdbcQueryExecuteResult(qryId, convertMetadata(fieldsMeta)); + + return new OdbcResponse(res); + } + catch (Exception e) { + qryCurs.remove(qryId); + + return new OdbcResponse(OdbcResponse.STATUS_FAILED, e.getMessage()); + } + } + + /** + * {@link OdbcQueryCloseRequest} command handler. + * + * @param req Execute query request. + * @return Response. + */ + private OdbcResponse closeQuery(OdbcQueryCloseRequest req) { + try { + QueryCursor cur = qryCurs.get(req.queryId()).get1(); + + if (cur == null) + return new OdbcResponse(OdbcResponse.STATUS_FAILED, "Failed to find query with ID: " + req.queryId()); + + cur.close(); + + qryCurs.remove(req.queryId()); + + OdbcQueryCloseResult res = new OdbcQueryCloseResult(req.queryId()); + + return new OdbcResponse(res); + } + catch (Exception e) { + qryCurs.remove(req.queryId()); + + return new OdbcResponse(OdbcResponse.STATUS_FAILED, e.getMessage()); + } + } + + /** + * {@link OdbcQueryFetchRequest} command handler. + * + * @param req Execute query request. + * @return Response. 
+ */ + private OdbcResponse fetchQuery(OdbcQueryFetchRequest req) { + try { + Iterator cur = qryCurs.get(req.queryId()).get2(); + + if (cur == null) + return new OdbcResponse(OdbcResponse.STATUS_FAILED, + "Failed to find query with ID: " + req.queryId()); + + List<Object> items = new ArrayList<>(); + + for (int i = 0; i < req.pageSize() && cur.hasNext(); ++i) + items.add(cur.next()); + + OdbcQueryFetchResult res = new OdbcQueryFetchResult(req.queryId(), items, !cur.hasNext()); + + return new OdbcResponse(res); + } + catch (Exception e) { + qryCurs.remove(req.queryId()); + + return new OdbcResponse(OdbcResponse.STATUS_FAILED, e.getMessage()); + } + } + + /** + * {@link OdbcQueryGetColumnsMetaRequest} command handler. + * + * @param req Get columns metadata request. + * @return Response. + */ + private OdbcResponse getColumnsMeta(OdbcQueryGetColumnsMetaRequest req) { + try { + List<OdbcColumnMeta> meta = new ArrayList<>(); + + String cacheName; + String tableName; + + if (req.tableName().contains(".")) { + // Parsing two-part table name. 
+ String[] parts = req.tableName().split("\\."); + + cacheName = removeQuotationMarksIfNeeded(parts[0]); + + tableName = parts[1]; + } + else { + cacheName = removeQuotationMarksIfNeeded(req.cacheName()); + + tableName = req.tableName(); + } + + Collection<GridQueryTypeDescriptor> tablesMeta = ctx.query().types(cacheName); + + for (GridQueryTypeDescriptor table : tablesMeta) { + if (!matches(table.name(), tableName)) + continue; + + for (Map.Entry<String, Class<?>> field : table.fields().entrySet()) { + if (!matches(field.getKey(), req.columnName())) + continue; + + OdbcColumnMeta columnMeta = new OdbcColumnMeta(req.cacheName(), + table.name(), field.getKey(), field.getValue()); + + if (!meta.contains(columnMeta)) + meta.add(columnMeta); + } + } + OdbcQueryGetColumnsMetaResult res = new OdbcQueryGetColumnsMetaResult(meta); + + return new OdbcResponse(res); + } + catch (Exception e) { + return new OdbcResponse(OdbcResponse.STATUS_FAILED, e.getMessage()); + } + } + + /** + * {@link OdbcQueryGetTablesMetaRequest} command handler. + * + * @param req Get tables metadata request. + * @return Response. 
+ */ + private OdbcResponse getTablesMeta(OdbcQueryGetTablesMetaRequest req) { + try { + List<OdbcTableMeta> meta = new ArrayList<>(); + + String realSchema = removeQuotationMarksIfNeeded(req.schema()); + + for (String cacheName : ctx.cache().cacheNames()) + { + if (!matches(cacheName, realSchema)) + continue; + + Collection<GridQueryTypeDescriptor> tablesMeta = ctx.query().types(cacheName); + + for (GridQueryTypeDescriptor table : tablesMeta) { + if (!matches(table.name(), req.table())) + continue; + + if (!matches("TABLE", req.tableType())) + continue; + + OdbcTableMeta tableMeta = new OdbcTableMeta(req.catalog(), cacheName, + table.name(), "TABLE"); + + if (!meta.contains(tableMeta)) + meta.add(tableMeta); + } + } + + OdbcQueryGetTablesMetaResult res = new OdbcQueryGetTablesMetaResult(meta); + + return new OdbcResponse(res); + } + catch (Exception e) { + return new OdbcResponse(OdbcResponse.STATUS_FAILED, e.getMessage()); + } + } + + /** + * Convert metadata in collection from {@link GridQueryFieldMetadata} to + * {@link OdbcColumnMeta}. + * + * @param meta Internal query field metadata. + * @return Odbc query field metadata. + */ + private static Collection<OdbcColumnMeta> convertMetadata(Collection<?> meta) { + List<OdbcColumnMeta> res = new ArrayList<>(); + + if (meta != null) { + for (Object info : meta) { + assert info instanceof GridQueryFieldMetadata; + + res.add(new OdbcColumnMeta((GridQueryFieldMetadata)info)); + } + } + + return res; + } + + /** + * Checks whether string matches SQL pattern. + * + * @param str String. + * @param ptrn Pattern. + * @return Whether string matches pattern. + */ + private static boolean matches(String str, String ptrn) { + return str != null && (F.isEmpty(ptrn) || + str.toUpperCase().matches(ptrn.toUpperCase().replace("%", ".*").replace("_", "."))); + } + + /** + * Remove quotation marks at the beginning and end of the string if present. + * + * @param str Input string. 
+ * @return String without leading and trailing quotation marks. + */ + private static String removeQuotationMarksIfNeeded(String str) { + if (str.startsWith("\"") && str.endsWith("\"")) + return str.substring(1, str.length() - 1); + + return str; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/feb8350d/modules/platforms/cpp/odbc/include/ignite/odbc/parser.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/parser.h b/modules/platforms/cpp/odbc/include/ignite/odbc/parser.h index 8b8f277..f589531 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/parser.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/parser.h @@ -73,12 +73,8 @@ namespace ignite BinaryWriterImpl writer(&outStream, 0); - int32_t msgLenPos = outStream.Reserve(4); - msg.Write(writer); - outStream.WriteInt32(msgLenPos, outStream.Position() - 4); - buf.resize(outStream.Position()); memcpy(&buf[0], outMem.Data(), outStream.Position()); http://git-wip-us.apache.org/repos/asf/ignite/blob/feb8350d/modules/platforms/cpp/odbc/src/connection.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/connection.cpp b/modules/platforms/cpp/odbc/src/connection.cpp index 5277038..2905ed7 100644 --- a/modules/platforms/cpp/odbc/src/connection.cpp +++ b/modules/platforms/cpp/odbc/src/connection.cpp @@ -170,11 +170,25 @@ namespace ignite if (!connected) return false; - size_t sent = 0; + OdbcProtocolHeader hdr; + + // Length should have Big Endian byte order. 
+ hdr.len = htonl(static_cast<unsigned long>(len)); + + int sent = socket.Send(reinterpret_cast<int8_t*>(&hdr), sizeof(hdr)); + + LOG_MSG("Sent: %d\n", sent); + + if (sent != sizeof(hdr)) + return false; + + sent = 0; while (sent != len) { - size_t res = socket.Send(data + sent, len - sent); + int res = socket.Send(data + sent, len - sent); + + LOG_MSG("Sent: %d\n", res); if (res <= 0) return false; @@ -195,6 +209,10 @@ namespace ignite OdbcProtocolHeader hdr; int received = socket.Receive(reinterpret_cast<int8_t*>(&hdr), sizeof(hdr)); + + // Length has Big Endian byte order. + hdr.len = ntohl(hdr.len); + LOG_MSG("Received: %d\n", received); if (received != sizeof(hdr))
