http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java new file mode 100644 index 0000000..b319293 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java @@ -0,0 +1,285 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.odbc.jdbc;

import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.odbc.SqlListenerRequest;
import org.apache.ignite.internal.processors.odbc.SqlListenerRequestHandler;
import org.apache.ignite.internal.processors.odbc.SqlListenerResponse;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_CLOSE;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_EXEC;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_FETCH;
import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_META;

/**
 * JDBC request handler. Dispatches execute, fetch, close and metadata requests and keeps
 * the cursors of executed queries in a registry keyed by query ID.
 */
public class JdbcRequestHandler implements SqlListenerRequestHandler {
    /** Query ID sequence (static, i.e. shared by all handler instances). */
    private static final AtomicLong QRY_ID_GEN = new AtomicLong();

    /** Kernal context. */
    private final GridKernalContext ctx;

    /** Logger. */
    private final IgniteLogger log;

    /** Busy lock; a request is rejected when it cannot be entered. */
    private final GridSpinBusyLock busyLock;

    /** Maximum allowed cursors; the limit is enforced only when the value is positive. */
    private final int maxCursors;

    /** Cursors of executed queries, keyed by query ID. */
    private final ConcurrentHashMap<Long, JdbcQueryCursor> qryCursors = new ConcurrentHashMap<>();

    /** Distributed joins flag. */
    private final boolean distributedJoins;

    /** Enforce join order flag. */
    private final boolean enforceJoinOrder;

    /**
     * Constructor.
     *
     * @param ctx Context.
     * @param busyLock Shutdown latch.
     * @param maxCursors Maximum allowed cursors.
     * @param distributedJoins Distributed joins flag.
     * @param enforceJoinOrder Enforce join order flag.
     */
    public JdbcRequestHandler(GridKernalContext ctx, GridSpinBusyLock busyLock, int maxCursors,
        boolean distributedJoins, boolean enforceJoinOrder) {
        this.ctx = ctx;
        this.busyLock = busyLock;
        this.maxCursors = maxCursors;
        this.distributedJoins = distributedJoins;
        this.enforceJoinOrder = enforceJoinOrder;

        log = ctx.log(getClass());
    }

    /** {@inheritDoc} */
    @Override public SqlListenerResponse handle(SqlListenerRequest req0) {
        assert req0 != null;

        assert req0 instanceof JdbcRequest;

        JdbcRequest req = (JdbcRequest)req0;

        if (!busyLock.enterBusy())
            return new JdbcResponse(SqlListenerResponse.STATUS_FAILED,
                "Failed to handle JDBC request because node is stopping.");

        try {
            switch (req.type()) {
                case QRY_EXEC:
                    return executeQuery((JdbcQueryExecuteRequest)req);

                case QRY_FETCH:
                    return fetchQuery((JdbcQueryFetchRequest)req);

                case QRY_CLOSE:
                    return closeQuery((JdbcQueryCloseRequest)req);

                case QRY_META:
                    return getQueryMeta((JdbcQueryMetadataRequest)req);
            }

            // Any request type not matched above is reported to the client as a failure.
            return new JdbcResponse(SqlListenerResponse.STATUS_FAILED, "Unsupported JDBC request [req=" + req + ']');
        }
        finally {
            busyLock.leaveBusy();
        }
    }

    /** {@inheritDoc} */
    @Override public SqlListenerResponse handleException(Exception e) {
        return new JdbcResponse(SqlListenerResponse.STATUS_FAILED, e.toString());
    }

    /**
     * {@link JdbcQueryExecuteRequest} command handler. Runs the query against the cache named by
     * the request schema, registers the resulting cursor and returns either the first page of
     * rows or, for a non-SELECT statement, the single {@code Long} value the cursor produced.
     *
     * @param req Execute query request.
     * @return Response.
     */
    @SuppressWarnings("unchecked")
    private JdbcResponse executeQuery(JdbcQueryExecuteRequest req) {
        int cursorCnt = qryCursors.size();

        if (maxCursors > 0 && cursorCnt >= maxCursors)
            return new JdbcResponse(SqlListenerResponse.STATUS_FAILED, "Too many opened cursors (either close other " +
                "opened cursors or increase the limit through OdbcConfiguration.setMaxOpenCursors()) " +
                "[maximum=" + maxCursors + ", current=" + cursorCnt + ']');

        // Reject an invalid page size up front, before a query ID is consumed or any query object is built.
        if (req.pageSize() <= 0)
            return new JdbcResponse(SqlListenerResponse.STATUS_FAILED,
                "Invalid fetch size : [fetchSize=" + req.pageSize() + ']');

        long qryId = QRY_ID_GEN.getAndIncrement();

        try {
            String sql = req.sqlQuery();

            SqlFieldsQuery qry = new SqlFieldsQuery(sql);

            qry.setArgs(req.arguments());

            qry.setDistributedJoins(distributedJoins);
            qry.setEnforceJoinOrder(enforceJoinOrder);

            qry.setPageSize(req.pageSize());

            IgniteCache<Object, Object> cache0 = ctx.grid().cache(req.schemaName());

            if (cache0 == null)
                return new JdbcResponse(SqlListenerResponse.STATUS_FAILED,
                    "Cache doesn't exist (did you configure it?): " + req.schemaName());

            IgniteCache<Object, Object> cache = cache0.withKeepBinary();

            if (cache == null)
                return new JdbcResponse(SqlListenerResponse.STATUS_FAILED,
                    "Can not get cache with keep binary: " + req.schemaName());

            JdbcQueryCursor cur = new JdbcQueryCursor(
                qryId, req.pageSize(), req.maxRows(), (QueryCursorImpl)cache.query(qry));

            // Register before fetching so that the failure path below can find and release the cursor.
            qryCursors.put(qryId, cur);

            JdbcQueryExecuteResult res;

            if (cur.isQuery())
                res = new JdbcQueryExecuteResult(qryId, cur.fetchRows(), !cur.hasNext());
            else {
                List<List<Object>> items = cur.fetchRows();

                assert items != null && items.size() == 1 && items.get(0).size() == 1
                    && items.get(0).get(0) instanceof Long :
                    "Invalid result set for not-SELECT query. [qry=" + sql +
                        ", res=" + S.toString(List.class, items) + ']';

                res = new JdbcQueryExecuteResult(qryId, (Long)items.get(0).get(0));
            }

            return new JdbcResponse(res);
        }
        catch (Exception e) {
            // Unregistering alone would leave the cursor open, so it is closed as well.
            closeQuietly(qryCursors.remove(qryId), e);

            U.error(log, "Failed to execute SQL query [reqId=" + req.requestId() + ", req=" + req + ']', e);

            return new JdbcResponse(SqlListenerResponse.STATUS_FAILED, e.toString());
        }
    }

    /**
     * {@link JdbcQueryCloseRequest} command handler. Unregisters the cursor and closes it.
     *
     * @param req Close query request.
     * @return Response.
     */
    private JdbcResponse closeQuery(JdbcQueryCloseRequest req) {
        try {
            JdbcQueryCursor cur = qryCursors.remove(req.queryId());

            if (cur == null)
                return new JdbcResponse(SqlListenerResponse.STATUS_FAILED,
                    "Failed to find query cursor with ID: " + req.queryId());

            cur.close();

            return new JdbcResponse(null);
        }
        catch (Exception e) {
            // The cursor is already out of the registry at this point; only report the failure.
            U.error(log, "Failed to close SQL query [reqId=" + req.requestId() + ", qryId=" + req.queryId() + ']', e);

            return new JdbcResponse(SqlListenerResponse.STATUS_FAILED, e.toString());
        }
    }

    /**
     * {@link JdbcQueryFetchRequest} command handler. Fetches the next page from a registered cursor.
     *
     * @param req Fetch query request.
     * @return Response.
     */
    private JdbcResponse fetchQuery(JdbcQueryFetchRequest req) {
        try {
            JdbcQueryCursor cur = qryCursors.get(req.queryId());

            if (cur == null)
                return new JdbcResponse(SqlListenerResponse.STATUS_FAILED,
                    "Failed to find query cursor with ID: " + req.queryId());

            if (req.pageSize() <= 0)
                return new JdbcResponse(SqlListenerResponse.STATUS_FAILED,
                    "Invalid fetch size : [fetchSize=" + req.pageSize() + ']');

            cur.pageSize(req.pageSize());

            JdbcQueryFetchResult res = new JdbcQueryFetchResult(cur.fetchRows(), !cur.hasNext());

            return new JdbcResponse(res);
        }
        catch (Exception e) {
            U.error(log, "Failed to fetch SQL query result [reqId=" + req.requestId() + ", req=" + req + ']', e);

            return new JdbcResponse(SqlListenerResponse.STATUS_FAILED, e.toString());
        }
    }

    /**
     * {@link JdbcQueryMetadataRequest} command handler. Returns the metadata of a registered cursor.
     *
     * @param req Request.
     * @return Response.
     */
    private JdbcResponse getQueryMeta(JdbcQueryMetadataRequest req) {
        try {
            JdbcQueryCursor cur = qryCursors.get(req.queryId());

            if (cur == null)
                return new JdbcResponse(SqlListenerResponse.STATUS_FAILED,
                    "Failed to find query with ID: " + req.queryId());

            JdbcQueryMetadataResult res = new JdbcQueryMetadataResult(req.queryId(),
                cur.meta());

            return new JdbcResponse(res);
        }
        catch (Exception e) {
            U.error(log, "Failed to fetch SQL query metadata [reqId=" + req.requestId() + ", req=" + req + ']', e);

            return new JdbcResponse(SqlListenerResponse.STATUS_FAILED, e.toString());
        }
    }

    /**
     * Closes a cursor on a failure path. An error raised by the close itself is attached to
     * {@code cause} as a suppressed exception, so the original failure stays the one reported.
     *
     * @param cur Cursor to close, possibly {@code null}.
     * @param cause Failure that triggered the cleanup.
     */
    private static void closeQuietly(JdbcQueryCursor cur, Exception cause) {
        if (cur == null)
            return;

        try {
            cur.close();
        }
        catch (Exception closeErr) {
            cause.addSuppressed(closeErr);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResponse.java new file mode 100644 index 0000000..f039db7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResponse.java @@ -0,0 +1,104 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.odbc.jdbc;

import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.internal.binary.BinaryReaderExImpl;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.processors.odbc.SqlListenerResponse;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;

/**
 * JDBC response: a status together with either a result payload (on success) or an error
 * message (on failure).
 */
public class JdbcResponse extends SqlListenerResponse implements JdbcRawBinarylizable {
    /** Result payload; may be {@code null} even for a successful response. */
    @GridToStringInclude
    private JdbcResult res;

    /**
     * Default constructor, used for deserialization.
     */
    public JdbcResponse() {
        super(-1, null);
    }

    /**
     * Constructs a successful response.
     *
     * @param res Response result.
     */
    public JdbcResponse(JdbcResult res) {
        super(STATUS_SUCCESS, null);

        this.res = res;
    }

    /**
     * Constructs a failed response.
     *
     * @param status Response status; must differ from {@code STATUS_SUCCESS}.
     * @param err Error message.
     */
    public JdbcResponse(int status, @Nullable String err) {
        super(status, err);

        assert status != STATUS_SUCCESS;
    }

    /**
     * @return Result payload.
     */
    public JdbcResult response() {
        return res;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(JdbcResponse.class, this, "status", status(),"err", error());
    }

    /**
     * Writes the status, followed by the error message for a failed response, or by a
     * presence flag and the result itself (when present) for a successful one.
     *
     * @param writer Binary writer.
     * @throws BinaryObjectException On error.
     */
    @Override public void writeBinary(BinaryWriterExImpl writer) throws BinaryObjectException {
        writer.writeInt(status());

        if (status() != STATUS_SUCCESS) {
            writer.writeString(error());

            return;
        }

        boolean hasRes = res != null;

        writer.writeBoolean(hasRes);

        if (hasRes)
            res.writeBinary(writer);
    }

    /**
     * Reads the layout produced by {@link #writeBinary(BinaryWriterExImpl)}.
     *
     * @param reader Binary reader.
     * @throws BinaryObjectException On error.
     */
    @Override public void readBinary(BinaryReaderExImpl reader) throws BinaryObjectException {
        status(reader.readInt());

        if (status() != STATUS_SUCCESS) {
            error(reader.readString());

            return;
        }

        boolean hasRes = reader.readBoolean();

        if (hasRes)
            res = JdbcResult.readResult(reader);
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java new file mode 100644 index 0000000..2d7666e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java @@ -0,0 +1,91 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.odbc.jdbc;

import org.apache.ignite.IgniteException;
import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.internal.binary.BinaryReaderExImpl;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;

/**
 * Base class of JDBC results. On the wire a result starts with a one-byte type ID that
 * {@link #readResult(BinaryReaderExImpl)} uses to choose the concrete result class.
 */
public class JdbcResult implements JdbcRawBinarylizable {
    /** Execute sql result. */
    public static final byte QRY_EXEC = 2;

    /** Fetch query results. */
    public static final byte QRY_FETCH = 3;

    /** Get columns meta query result. */
    public static final byte QRY_META = 4;

    /** Result type ID (one of the {@code QRY_*} constants). */
    private final byte type;

    /**
     * Constructs result.
     *
     * @param type Type of results.
     */
    public JdbcResult(byte type) {
        this.type = type;
    }

    /**
     * Writes the result type ID.
     *
     * @param writer Binary writer.
     * @throws BinaryObjectException On error.
     */
    @Override public void writeBinary(BinaryWriterExImpl writer) throws BinaryObjectException {
        writer.writeByte(type);
    }

    /**
     * No-op in the base class: the type ID written by {@link #writeBinary(BinaryWriterExImpl)}
     * is consumed by {@link #readResult(BinaryReaderExImpl)} before this method is invoked.
     *
     * @param reader Binary reader.
     * @throws BinaryObjectException On error.
     */
    @Override public void readBinary(BinaryReaderExImpl reader) throws BinaryObjectException {
        // No-op.
    }

    /**
     * Reads the type ID, instantiates the matching result class and lets it read its own state.
     *
     * @param reader Binary reader.
     * @return Result object.
     * @throws BinaryObjectException On error.
     * @throws IgniteException If the type ID is not one of the {@code QRY_*} constants.
     */
    public static JdbcResult readResult(BinaryReaderExImpl reader) throws BinaryObjectException {
        int resId = reader.readByte();

        JdbcResult res;

        switch(resId) {
            case QRY_EXEC:
                res = new JdbcQueryExecuteResult();
                break;

            case QRY_FETCH:
                res = new JdbcQueryFetchResult();
                break;

            case QRY_META:
                res = new JdbcQueryMetadataResult();
                break;

            default:
                // The value read here identifies a result type, not a request.
                throw new IgniteException("Unknown SQL listener result ID: [result ID=" + resId + ']');
        }

        res.readBinary(reader);

        return res;
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java new file mode 100644 index 0000000..65efbf5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.ignite.internal.processors.odbc.jdbc;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.ignite.internal.binary.BinaryReaderExImpl;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.processors.odbc.SqlListenerUtils;

/**
 * Various JDBC utility methods.
 */
public class JdbcUtils {
    /**
     * Writes query result rows: the row count, then for every row its column count followed
     * by the column values.
     *
     * @param writer Binary writer.
     * @param items Query results items. Must not be {@code null}; a {@code null} row is
     *      written as a row with zero columns.
     */
    public static void writeItems(BinaryWriterExImpl writer, List<List<Object>> items) {
        writer.writeInt(items.size());

        for (List<Object> row : items) {
            if (row == null) {
                // The row count above already includes this row, and readItems() expects a column
                // count for every counted row. Writing nothing here would misalign the stream.
                writer.writeInt(0);

                continue;
            }

            writer.writeInt(row.size());

            for (Object obj : row)
                SqlListenerUtils.writeObject(writer, obj, false);
        }
    }

    /**
     * Reads rows in the layout produced by {@link #writeItems(BinaryWriterExImpl, List)}.
     *
     * @param reader Binary reader.
     * @return Query results items; an empty list when the row count read is not positive.
     */
    public static List<List<Object>> readItems(BinaryReaderExImpl reader) {
        int rowsSize = reader.readInt();

        if (rowsSize <= 0)
            return Collections.emptyList();

        List<List<Object>> items = new ArrayList<>(rowsSize);

        for (int i = 0; i < rowsSize; ++i) {
            int colsSize = reader.readInt();

            List<Object> row = new ArrayList<>(colsSize);

            for (int colCnt = 0; colCnt < colsSize; ++colCnt)
                row.add(SqlListenerUtils.readObject(reader, false));

            items.add(row);
        }

        return items;
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcColumnMeta.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcColumnMeta.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcColumnMeta.java new file mode 100644 index 0000000..d9d39de --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcColumnMeta.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.ignite.internal.processors.odbc.odbc;

import org.apache.ignite.binary.BinaryRawWriter;
import org.apache.ignite.internal.binary.BinaryUtils;
import org.apache.ignite.internal.processors.odbc.OdbcUtils;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;

/**
 * Column metadata sent to ODBC clients: schema, table and column names plus the Java type
 * of the column.
 */
public class OdbcColumnMeta {
    /** Cache name. */
    private final String schemaName;

    /** Table name. */
    private final String tableName;

    /** Column name. */
    private final String columnName;

    /** Data type. */
    private final Class<?> dataType;

    /**
     * @param schemaName Cache name.
     * @param tableName Table name.
     * @param columnName Column name.
     * @param dataType Data type.
     */
    public OdbcColumnMeta(String schemaName, String tableName, String columnName, Class<?> dataType) {
        this.schemaName = OdbcUtils.addQuotationMarksIfNeeded(schemaName);
        this.tableName = tableName;
        this.columnName = columnName;
        this.dataType = dataType;
    }

    /**
     * Builds column metadata from query field metadata.
     *
     * @param info Field metadata.
     */
    public OdbcColumnMeta(GridQueryFieldMetadata info) {
        this.schemaName = OdbcUtils.addQuotationMarksIfNeeded(info.schemaName());
        this.tableName = info.typeName();
        this.columnName = info.fieldName();
        this.dataType = typeOrObject(info.fieldTypeName());
    }

    /**
     * Loads a class by name.
     *
     * @param typeName Class name.
     * @return Loaded class, or {@code Object.class} when loading fails with an exception.
     */
    private static Class<?> typeOrObject(String typeName) {
        try {
            return Class.forName(typeName);
        }
        catch (Exception ignored) {
            // A type that cannot be loaded is reported as a plain object.
            return Object.class;
        }
    }

    /** {@inheritDoc} */
    @Override public int hashCode() {
        return 31 * (31 * (31 * schemaName.hashCode() + tableName.hashCode()) + columnName.hashCode())
            + dataType.hashCode();
    }

    /** {@inheritDoc} */
    @Override public boolean equals(Object o) {
        if (this == o)
            return true;

        if (!(o instanceof OdbcColumnMeta))
            return false;

        OdbcColumnMeta that = (OdbcColumnMeta)o;

        return schemaName.equals(that.schemaName)
            && tableName.equals(that.tableName)
            && columnName.equals(that.columnName)
            && dataType.equals(that.dataType);
    }

    /**
     * Writes the metadata in binary format: schema name, table name, column name and the
     * type ID that {@code BinaryUtils.typeByClass} returns for the data type.
     *
     * @param writer Binary writer.
     */
    public void write(BinaryRawWriter writer) {
        writer.writeString(schemaName);
        writer.writeString(tableName);
        writer.writeString(columnName);

        writer.writeByte(BinaryUtils.typeByClass(dataType));
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java index 300385f..02e8676 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java @@ -17,7 +17,9 @@ package org.apache.ignite.internal.processors.odbc.odbc; +import java.util.Collection; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.binary.BinaryReaderExImpl; import org.apache.ignite.internal.binary.BinaryThreadLocalContext; @@ -27,20 +29,34 @@ import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream; import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream; import org.apache.ignite.internal.binary.streams.BinaryInputStream; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; -import org.apache.ignite.internal.processors.odbc.SqlListenerAbstractMessageParser; +import org.apache.ignite.internal.processors.odbc.SqlListenerMessageParser; +import org.apache.ignite.internal.processors.odbc.SqlListenerRequest; +import org.apache.ignite.internal.processors.odbc.SqlListenerResponse; +import org.apache.ignite.internal.processors.odbc.SqlListenerUtils; /** * JDBC 
message parser. */ -public class OdbcMessageParser extends SqlListenerAbstractMessageParser { +public class OdbcMessageParser implements SqlListenerMessageParser { /** Marshaller. */ private final GridBinaryMarshaller marsh; + /** Initial output stream capacity. */ + protected static final int INIT_CAP = 1024; + + /** Kernal context. */ + protected GridKernalContext ctx; + + /** Logger. */ + private final IgniteLogger log; + /** * @param ctx Context. */ public OdbcMessageParser(GridKernalContext ctx) { - super(ctx, new OdbcObjectReader(), new OdbcObjectWriter()); + this.ctx = ctx; + + log = ctx.log(getClass()); if (ctx.cacheObjects() instanceof CacheObjectBinaryProcessorImpl) { CacheObjectBinaryProcessorImpl cacheObjProc = (CacheObjectBinaryProcessorImpl)ctx.cacheObjects(); @@ -54,15 +70,198 @@ public class OdbcMessageParser extends SqlListenerAbstractMessageParser { } /** {@inheritDoc} */ - @Override protected BinaryReaderExImpl createReader(byte[] msg) { + @Override public SqlListenerRequest decode(byte[] msg) { + assert msg != null; + BinaryInputStream stream = new BinaryHeapInputStream(msg); - return new BinaryReaderExImpl(marsh.context(), stream, ctx.config().getClassLoader(), true); + BinaryReaderExImpl reader = new BinaryReaderExImpl(marsh.context(), stream, ctx.config().getClassLoader(), true); + + byte cmd = reader.readByte(); + + SqlListenerRequest res; + + switch (cmd) { + case OdbcRequest.QRY_EXEC: { + String cache = reader.readString(); + String sql = reader.readString(); + int argsNum = reader.readInt(); + + Object[] params = new Object[argsNum]; + + for (int i = 0; i < argsNum; ++i) + params[i] = SqlListenerUtils.readObject(reader, true); + + res = new OdbcQueryExecuteRequest(cache, sql, params); + + break; + } + + case OdbcRequest.QRY_FETCH: { + long queryId = reader.readLong(); + int pageSize = reader.readInt(); + + res = new OdbcQueryFetchRequest(queryId, pageSize); + + break; + } + + case OdbcRequest.QRY_CLOSE: { + long queryId = 
reader.readLong(); + + res = new OdbcQueryCloseRequest(queryId); + + break; + } + + case OdbcRequest.META_COLS: { + String cache = reader.readString(); + String table = reader.readString(); + String column = reader.readString(); + + res = new OdbcQueryGetColumnsMetaRequest(cache, table, column); + + break; + } + + case OdbcRequest.META_TBLS: { + String catalog = reader.readString(); + String schema = reader.readString(); + String table = reader.readString(); + String tableType = reader.readString(); + + res = new OdbcQueryGetTablesMetaRequest(catalog, schema, table, tableType); + + break; + } + + case OdbcRequest.META_PARAMS: { + String cacheName = reader.readString(); + String sqlQuery = reader.readString(); + + res = new OdbcQueryGetParamsMetaRequest(cacheName, sqlQuery); + + break; + } + + default: + throw new IgniteException("Unknown ODBC command: [cmd=" + cmd + ']'); + } + + return res; } /** {@inheritDoc} */ - @Override protected BinaryWriterExImpl createWriter(int cap) { - return new BinaryWriterExImpl(marsh.context(), new BinaryHeapOutputStream(cap), + @Override public byte[] encode(SqlListenerResponse msg0) { + assert msg0 != null; + + assert msg0 instanceof OdbcResponse; + + OdbcResponse msg = (OdbcResponse)msg0; + + // Creating new binary writer + BinaryWriterExImpl writer = new BinaryWriterExImpl(marsh.context(), new BinaryHeapOutputStream(INIT_CAP), BinaryThreadLocalContext.get().schemaHolder(), null); + + // Writing status. 
+ writer.writeByte((byte) msg.status()); + + if (msg.status() != SqlListenerResponse.STATUS_SUCCESS) { + writer.writeString(msg.error()); + + return writer.array(); + } + + Object res0 = msg.response(); + + if (res0 == null) + return writer.array(); + else if (res0 instanceof OdbcQueryExecuteResult) { + OdbcQueryExecuteResult res = (OdbcQueryExecuteResult) res0; + + if (log.isDebugEnabled()) + log.debug("Resulting query ID: " + res.getQueryId()); + + writer.writeLong(res.getQueryId()); + + Collection<OdbcColumnMeta> metas = res.getColumnsMetadata(); + + assert metas != null; + + writer.writeInt(metas.size()); + + for (OdbcColumnMeta meta : metas) + meta.write(writer); + } + else if (res0 instanceof OdbcQueryFetchResult) { + OdbcQueryFetchResult res = (OdbcQueryFetchResult) res0; + + if (log.isDebugEnabled()) + log.debug("Resulting query ID: " + res.queryId()); + + writer.writeLong(res.queryId()); + + Collection<?> items0 = res.items(); + + assert items0 != null; + + writer.writeBoolean(res.last()); + + writer.writeInt(items0.size()); + + for (Object row0 : items0) { + if (row0 != null) { + Collection<?> row = (Collection<?>)row0; + + writer.writeInt(row.size()); + + for (Object obj : row) + SqlListenerUtils.writeObject(writer, obj, true); + } + } + } + else if (res0 instanceof OdbcQueryCloseResult) { + OdbcQueryCloseResult res = (OdbcQueryCloseResult) res0; + + if (log.isDebugEnabled()) + log.debug("Resulting query ID: " + res.getQueryId()); + + writer.writeLong(res.getQueryId()); + } + else if (res0 instanceof OdbcQueryGetColumnsMetaResult) { + OdbcQueryGetColumnsMetaResult res = (OdbcQueryGetColumnsMetaResult) res0; + + Collection<OdbcColumnMeta> columnsMeta = res.meta(); + + assert columnsMeta != null; + + writer.writeInt(columnsMeta.size()); + + for (OdbcColumnMeta columnMeta : columnsMeta) + columnMeta.write(writer); + } + else if (res0 instanceof OdbcQueryGetTablesMetaResult) { + OdbcQueryGetTablesMetaResult res = (OdbcQueryGetTablesMetaResult) res0; + + 
Collection<OdbcTableMeta> tablesMeta = res.meta(); + + assert tablesMeta != null; + + writer.writeInt(tablesMeta.size()); + + for (OdbcTableMeta tableMeta : tablesMeta) + tableMeta.writeBinary(writer); + } + else if (res0 instanceof OdbcQueryGetParamsMetaResult) { + OdbcQueryGetParamsMetaResult res = (OdbcQueryGetParamsMetaResult) res0; + + byte[] typeIds = res.typeIds(); + + SqlListenerUtils.writeObject(writer, typeIds, true); + } + else + assert false : "Should not reach here."; + + return writer.array(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectReader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectReader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectReader.java deleted file mode 100644 index 586fbc5..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectReader.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.odbc.odbc; - -import org.apache.ignite.binary.BinaryObjectException; -import org.apache.ignite.internal.binary.BinaryReaderExImpl; -import org.apache.ignite.internal.processors.odbc.SqlListenerAbstractObjectReader; - -/** - * Binary reader with marshaling non-primitive and non-embedded objects with JDK marshaller. - */ -@SuppressWarnings("unchecked") -public class OdbcObjectReader extends SqlListenerAbstractObjectReader { - /** {@inheritDoc} */ - @Override protected Object readCustomObject(BinaryReaderExImpl reader) throws BinaryObjectException { - return reader.readObjectDetached(); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectWriter.java deleted file mode 100644 index c2f3aba..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectWriter.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.odbc.odbc; - -import org.apache.ignite.binary.BinaryObjectException; -import org.apache.ignite.internal.binary.BinaryWriterExImpl; -import org.apache.ignite.internal.processors.odbc.SqlListenerAbstractObjectWriter; - -/** - * Binary writer with marshaling non-primitive and non-embedded objects with JDK marshaller.. - */ -public class OdbcObjectWriter extends SqlListenerAbstractObjectWriter { - /** {@inheritDoc} */ - @Override protected void writeCustomObject(BinaryWriterExImpl writer, Object obj) throws BinaryObjectException { - writer.writeObjectDetached(obj); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryCloseRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryCloseRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryCloseRequest.java new file mode 100644 index 0000000..a9decb7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryCloseRequest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.odbc; + +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * SQL listener query close request. + */ +public class OdbcQueryCloseRequest extends OdbcRequest { + /** Query ID. */ + private final long queryId; + + /** + * @param queryId Query ID. + */ + public OdbcQueryCloseRequest(long queryId) { + super(QRY_CLOSE); + + this.queryId = queryId; + } + + /** + * @return Query ID. + */ + public long queryId() { + return queryId; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(OdbcQueryCloseRequest.class, this); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryCloseResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryCloseResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryCloseResult.java new file mode 100644 index 0000000..eb156fa --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryCloseResult.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.odbc; + +/** + * SQL listener query close result. + */ +public class OdbcQueryCloseResult { + /** Query ID. */ + private final long queryId; + + /** + * @param queryId Query ID. + */ + public OdbcQueryCloseResult(long queryId){ + this.queryId = queryId; + } + + /** + * @return Query ID. + */ + public long getQueryId() { + return queryId; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteRequest.java new file mode 100644 index 0000000..dd674d2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteRequest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.odbc; + +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +/** + * SQL listener query execute request. + */ +public class OdbcQueryExecuteRequest extends OdbcRequest { + /** Cache name. */ + private final String cacheName; + + /** Sql query. */ + @GridToStringInclude(sensitive = true) + private final String sqlQry; + + /** Sql query arguments. */ + @GridToStringExclude + private final Object[] args; + + /** + * @param cacheName Cache name. + * @param sqlQry SQL query. + * @param args Arguments list. + */ + public OdbcQueryExecuteRequest(String cacheName, String sqlQry, Object[] args) { + super(QRY_EXEC); + + this.cacheName = cacheName.isEmpty() ? null : cacheName; + this.sqlQry = sqlQry; + this.args = args; + } + + /** + * @return Sql query. + */ + public String sqlQuery() { + return sqlQry; + } + + /** + * @return Sql query arguments. + */ + public Object[] arguments() { + return args; + } + + /** + * @return Cache name. 
+ */ + @Nullable public String cacheName() { + return cacheName; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(OdbcQueryExecuteRequest.class, this, "args", args, true); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteResult.java new file mode 100644 index 0000000..de5a8fd --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteResult.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.odbc; + +import java.util.Collection; + +/** + * SQL listener query execute result. + */ +public class OdbcQueryExecuteResult { + /** Query ID. */ + private final long queryId; + + /** Fields metadata. 
*/ + private final Collection<OdbcColumnMeta> columnsMeta; + + /** + * @param queryId Query ID. + * @param columnsMeta Columns metadata. + */ + public OdbcQueryExecuteResult(long queryId, Collection<OdbcColumnMeta> columnsMeta) { + this.queryId = queryId; + this.columnsMeta = columnsMeta; + } + + /** + * @return Query ID. + */ + public long getQueryId() { + return queryId; + } + + /** + * @return Columns metadata. + */ + public Collection<OdbcColumnMeta> getColumnsMetadata() { + return columnsMeta; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryFetchRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryFetchRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryFetchRequest.java new file mode 100644 index 0000000..190fffd --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryFetchRequest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.odbc.odbc; + +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * SQL listener query fetch request. + */ +public class OdbcQueryFetchRequest extends OdbcRequest { + /** Query ID. */ + private final long queryId; + + /** Page size - maximum number of rows to return. */ + private final int pageSize; + + /** + * @param queryId Query ID. + * @param pageSize Page size. + */ + public OdbcQueryFetchRequest(long queryId, int pageSize) { + super(QRY_FETCH); + + this.queryId = queryId; + this.pageSize = pageSize; + } + + /** + * @return Page size. + */ + public int pageSize() { + return pageSize; + } + + /** + * @return Query ID. + */ + public long queryId() { + return queryId; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(OdbcQueryFetchRequest.class, this); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryFetchResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryFetchResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryFetchResult.java new file mode 100644 index 0000000..f8075f3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryFetchResult.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.odbc; + +import java.util.Collection; + +/** + * SQL listener query fetch result. + */ +public class OdbcQueryFetchResult { + /** Query ID. */ + private final long queryId; + + /** Query result rows. */ + private final Collection<?> items; + + /** Flag indicating the query has no unfetched results. */ + private final boolean last; + + /** + * @param queryId Query ID. + * @param items Query result rows. + * @param last Flag indicating the query has no unfetched results. + */ + public OdbcQueryFetchResult(long queryId, Collection<?> items, boolean last){ + this.queryId = queryId; + this.items = items; + this.last = last; + } + + /** + * @return Query ID. + */ + public long queryId() { + return queryId; + } + + /** + * @return Query result rows. + */ + public Collection<?> items() { + return items; + } + + /** + * @return Flag indicating the query has no unfetched results. 
+ */ + public boolean last() { + return last; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetColumnsMetaRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetColumnsMetaRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetColumnsMetaRequest.java new file mode 100644 index 0000000..b60a9e3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetColumnsMetaRequest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.odbc; + +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +/** + * ODBC query get columns meta request. + */ +public class OdbcQueryGetColumnsMetaRequest extends OdbcRequest { + /** Cache name. */ + private final String cacheName; + + /** Table name. */ + private final String tableName; + + /** Column name. 
*/ + private final String columnName; + + /** + * @param cacheName Cache name. + * @param tableName Table name. + * @param columnName Column name. + */ + public OdbcQueryGetColumnsMetaRequest(String cacheName, String tableName, String columnName) { + super(META_COLS); + + this.cacheName = cacheName; + this.tableName = tableName; + this.columnName = columnName; + } + + /** + * @return Cache name. + */ + @Nullable public String cacheName() { + return cacheName; + } + + /** + * @return Table name. + */ + public String tableName() { + return tableName; + } + + /** + * @return Column name. + */ + public String columnName() { + return columnName; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(OdbcQueryGetColumnsMetaRequest.class, this); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetColumnsMetaResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetColumnsMetaResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetColumnsMetaResult.java new file mode 100644 index 0000000..7dbf7d8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetColumnsMetaResult.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.odbc; + +import java.util.Collection; + +/** + * Query get columns meta result. + */ +public class OdbcQueryGetColumnsMetaResult { + /** Query result rows. */ + private final Collection<OdbcColumnMeta> meta; + + /** + * @param meta Column metadata. + */ + public OdbcQueryGetColumnsMetaResult(Collection<OdbcColumnMeta> meta) { + this.meta = meta; + } + + /** + * @return Query result rows. + */ + public Collection<OdbcColumnMeta> meta() { + return meta; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetParamsMetaRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetParamsMetaRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetParamsMetaRequest.java new file mode 100644 index 0000000..1d468b1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetParamsMetaRequest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.odbc; + +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * ODBC query get params meta request. + */ +public class OdbcQueryGetParamsMetaRequest extends OdbcRequest { + /** Cache. */ + private final String cacheName; + + /** Query. */ + private final String query; + + /** + * @param query SQL Query. + */ + public OdbcQueryGetParamsMetaRequest(String cacheName, String query) { + super(META_PARAMS); + + this.cacheName = cacheName; + this.query = query; + } + + /** + * @return SQL Query. + */ + public String query() { + return query; + } + + /** + * @return Cache name. 
+ */ + public String cacheName() { + return cacheName; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(OdbcQueryGetParamsMetaRequest.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetParamsMetaResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetParamsMetaResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetParamsMetaResult.java new file mode 100644 index 0000000..d29b760 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetParamsMetaResult.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.odbc; + +/** + * ODBC query get params meta result. + */ +public class OdbcQueryGetParamsMetaResult { + /** List of parameter type IDs. */ + private final byte[] typeIds; + + /** + * @param typeIds List of parameter type IDs. 
+ */ + public OdbcQueryGetParamsMetaResult(byte[] typeIds) { + this.typeIds = typeIds; + } + + /** + * @return List of parameter type IDs. + */ + public byte[] typeIds() { + return typeIds; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetTablesMetaRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetTablesMetaRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetTablesMetaRequest.java new file mode 100644 index 0000000..0ebb462 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetTablesMetaRequest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.odbc; + +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * ODBC query get tables meta request. + */ +public class OdbcQueryGetTablesMetaRequest extends OdbcRequest { + /** Catalog search pattern. 
*/ + private final String catalog; + + /** Schema search pattern. */ + private final String schema; + + /** Table search pattern. */ + private final String table; + + /** Table type search pattern. */ + private final String tableType; + + /** + * @param catalog Catalog search pattern. + * @param schema Schema search pattern. + * @param table Table search pattern. + * @param tableType Table type search pattern. + */ + public OdbcQueryGetTablesMetaRequest(String catalog, String schema, String table, String tableType) { + super(META_TBLS); + + this.catalog = catalog; + this.schema = schema; + this.table = table; + this.tableType = tableType; + } + + /** + * @return catalog search pattern. + */ + public String catalog() { + return catalog; + } + + /** + * @return Schema search pattern. + */ + public String schema() { + return schema; + } + + /** + * @return Table search pattern. + */ + public String table() { + return table; + } + + /** + * @return Table type search pattern. + */ + public String tableType() { + return tableType; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(OdbcQueryGetTablesMetaRequest.class, this); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetTablesMetaResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetTablesMetaResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetTablesMetaResult.java new file mode 100644 index 0000000..6316b2d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetTablesMetaResult.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.odbc; + +import java.util.Collection; + +/** + * Query get tables meta result. + */ +public class OdbcQueryGetTablesMetaResult { + /** Query result rows. */ + private final Collection<OdbcTableMeta> meta; + + /** + * @param meta Tables metadata. + */ + public OdbcQueryGetTablesMetaResult(Collection<OdbcTableMeta> meta) { + this.meta = meta; + } + + /** + * @return Query result rows. + */ + public Collection<OdbcTableMeta> meta() { + return meta; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequest.java new file mode 100644 index 0000000..825e770 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.odbc; + +import org.apache.ignite.internal.processors.odbc.SqlListenerRequest; + +/** + * SQL listener command request. + */ +public class OdbcRequest extends SqlListenerRequest { + /** Execute sql query. */ + public static final int QRY_EXEC = 2; + + /** Fetch query results. */ + public static final int QRY_FETCH = 3; + + /** Close query. */ + public static final int QRY_CLOSE = 4; + + /** Get columns meta query. */ + public static final int META_COLS = 5; + + /** Get tables meta query. */ + public static final int META_TBLS = 6; + + /** Get parameters meta. */ + public static final int META_PARAMS = 7; + + /** Get parameters meta. */ + public static final int JDBC_REQ = 8; + + /** Command. */ + private final int cmd; + + /** + * @param cmd Command type. + */ + public OdbcRequest(int cmd) { + this.cmd = cmd; + } + + /** + * @return Command.
+ */ + public int command() { + return cmd; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java new file mode 100644 index 0000000..e1b421b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java @@ -0,0 +1,505 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.odbc.odbc; + +import java.sql.ParameterMetaData; +import java.sql.PreparedStatement; +import java.sql.Types; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.binary.GridBinaryMarshaller; +import org.apache.ignite.internal.processors.cache.QueryCursorImpl; +import org.apache.ignite.internal.processors.odbc.OdbcUtils; +import org.apache.ignite.internal.processors.odbc.SqlListenerRequest; +import org.apache.ignite.internal.processors.odbc.SqlListenerRequestHandler; +import org.apache.ignite.internal.processors.odbc.SqlListenerResponse; +import org.apache.ignite.internal.processors.odbc.odbc.escape.OdbcEscapeUtils; +import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; +import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; +import org.apache.ignite.internal.util.GridSpinBusyLock; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; + +import static org.apache.ignite.internal.processors.odbc.odbc.OdbcRequest.META_COLS; +import static org.apache.ignite.internal.processors.odbc.odbc.OdbcRequest.META_PARAMS; +import static org.apache.ignite.internal.processors.odbc.odbc.OdbcRequest.META_TBLS; +import static org.apache.ignite.internal.processors.odbc.odbc.OdbcRequest.QRY_CLOSE; +import static org.apache.ignite.internal.processors.odbc.odbc.OdbcRequest.QRY_EXEC; +import static 
org.apache.ignite.internal.processors.odbc.odbc.OdbcRequest.QRY_FETCH; + +/** + * ODBC request handler: executes, fetches and closes SQL queries and serves columns, tables and + * parameters metadata requests. + */ +public class OdbcRequestHandler implements SqlListenerRequestHandler { + /** Query ID sequence (static, i.e. shared by all handler instances). */ + private static final AtomicLong QRY_ID_GEN = new AtomicLong(); + + /** Kernel context. */ + private final GridKernalContext ctx; + + /** Logger. */ + private final IgniteLogger log; + + /** Busy lock; a request is rejected when it cannot be entered. */ + private final GridSpinBusyLock busyLock; + + /** Maximum allowed cursors (the limit is enforced only when the value is positive). */ + private final int maxCursors; + + /** Open query cursors by query ID; the iterator part of a tuple is created lazily on the first fetch. */ + private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCursors = new ConcurrentHashMap<>(); + + /** Distributed joins flag (applied to every executed query). */ + private final boolean distributedJoins; + + /** Enforce join order flag (applied to every executed query). */ + private final boolean enforceJoinOrder; + + /** + * Constructor. + * + * @param ctx Context. + * @param busyLock Shutdown latch. + * @param maxCursors Maximum allowed cursors. + * @param distributedJoins Distributed joins flag. + * @param enforceJoinOrder Enforce join order flag. 
+ */ + public OdbcRequestHandler(GridKernalContext ctx, GridSpinBusyLock busyLock, int maxCursors, + boolean distributedJoins, boolean enforceJoinOrder) { + this.ctx = ctx; + this.busyLock = busyLock; + this.maxCursors = maxCursors; + this.distributedJoins = distributedJoins; + this.enforceJoinOrder = enforceJoinOrder; + + log = ctx.log(getClass()); + } + + /** {@inheritDoc} */ + @Override public SqlListenerResponse handle(SqlListenerRequest req0) { + assert req0 != null; + + OdbcRequest req = (OdbcRequest)req0; + + if (!busyLock.enterBusy()) + return new OdbcResponse(OdbcResponse.STATUS_FAILED, + "Failed to handle ODBC request because node is stopping: " + req); + + try { + switch (req.command()) { + case QRY_EXEC: + return executeQuery((OdbcQueryExecuteRequest)req); + + case QRY_FETCH: + return fetchQuery((OdbcQueryFetchRequest)req); + + case QRY_CLOSE: + return closeQuery((OdbcQueryCloseRequest)req); + + case META_COLS: + return getColumnsMeta((OdbcQueryGetColumnsMetaRequest)req); + + case META_TBLS: + return getTablesMeta((OdbcQueryGetTablesMetaRequest)req); + + case META_PARAMS: + return getParamsMeta((OdbcQueryGetParamsMetaRequest)req); + } + + return new OdbcResponse(OdbcResponse.STATUS_FAILED, "Unsupported ODBC request: " + req); + } + finally { + busyLock.leaveBusy(); + } + } + + /** {@inheritDoc} */ + @Override public SqlListenerResponse handleException(Exception e) { + return new OdbcResponse(SqlListenerResponse.STATUS_FAILED, e.toString()); + } + + /** + * {@link OdbcQueryExecuteRequest} command handler. + * + * @param req Execute query request. + * @return Response. 
+ */ + private SqlListenerResponse executeQuery(OdbcQueryExecuteRequest req) { + int cursorCnt = qryCursors.size(); + + if (maxCursors > 0 && cursorCnt >= maxCursors) + return new OdbcResponse(SqlListenerResponse.STATUS_FAILED, "Too many opened cursors (either close other " + + "opened cursors or increase the limit through OdbcConfiguration.setMaxOpenCursors()) " + + "[maximum=" + maxCursors + ", current=" + cursorCnt + ']'); + + long qryId = QRY_ID_GEN.getAndIncrement(); + + try { + String sql = OdbcEscapeUtils.parse(req.sqlQuery()); + + if (log.isDebugEnabled()) + log.debug("ODBC query parsed [reqId=" + req.requestId() + ", original=" + req.sqlQuery() + + ", parsed=" + sql + ']'); + + SqlFieldsQuery qry = new SqlFieldsQuery(sql); + + qry.setArgs(req.arguments()); + + qry.setDistributedJoins(distributedJoins); + qry.setEnforceJoinOrder(enforceJoinOrder); + + IgniteCache<Object, Object> cache0 = ctx.grid().cache(req.cacheName()); + + if (cache0 == null) + return new OdbcResponse(SqlListenerResponse.STATUS_FAILED, + "Cache doesn't exist (did you configure it?): " + req.cacheName()); + + IgniteCache<Object, Object> cache = cache0.withKeepBinary(); + + if (cache == null) + return new OdbcResponse(SqlListenerResponse.STATUS_FAILED, + "Can not get cache with keep binary: " + req.cacheName()); + + QueryCursor qryCur = cache.query(qry); + + qryCursors.put(qryId, new IgniteBiTuple<QueryCursor, Iterator>(qryCur, null)); + + List<?> fieldsMeta = ((QueryCursorImpl) qryCur).fieldsMeta(); + + OdbcQueryExecuteResult res = new OdbcQueryExecuteResult(qryId, convertMetadata(fieldsMeta)); + + return new OdbcResponse(res); + } + catch (Exception e) { + qryCursors.remove(qryId); + + U.error(log, "Failed to execute SQL query [reqId=" + req.requestId() + ", req=" + req + ']', e); + + return new OdbcResponse(SqlListenerResponse.STATUS_FAILED, e.toString()); + } + } + + /** + * {@link OdbcQueryCloseRequest} command handler. + * + * @param req Execute query request. + * @return Response. 
+ */ + private SqlListenerResponse closeQuery(OdbcQueryCloseRequest req) { + try { + IgniteBiTuple<QueryCursor, Iterator> tuple = qryCursors.get(req.queryId()); + + if (tuple == null) + return new OdbcResponse(SqlListenerResponse.STATUS_FAILED, + "Failed to find query with ID: " + req.queryId()); + + QueryCursor cur = tuple.get1(); + + assert(cur != null); + + cur.close(); + + qryCursors.remove(req.queryId()); + + OdbcQueryCloseResult res = new OdbcQueryCloseResult(req.queryId()); + + return new OdbcResponse(res); + } + catch (Exception e) { + qryCursors.remove(req.queryId()); + + U.error(log, "Failed to close SQL query [reqId=" + req.requestId() + ", req=" + req.queryId() + ']', e); + + return new OdbcResponse(SqlListenerResponse.STATUS_FAILED, e.toString()); + } + } + + /** + * {@link OdbcQueryFetchRequest} command handler. + * + * @param req Execute query request. + * @return Response. + */ + private SqlListenerResponse fetchQuery(OdbcQueryFetchRequest req) { + try { + IgniteBiTuple<QueryCursor, Iterator> tuple = qryCursors.get(req.queryId()); + + if (tuple == null) + return new OdbcResponse(SqlListenerResponse.STATUS_FAILED, + "Failed to find query with ID: " + req.queryId()); + + Iterator iter = tuple.get2(); + + if (iter == null) { + QueryCursor cur = tuple.get1(); + + iter = cur.iterator(); + + tuple.put(cur, iter); + } + + List<Object> items = new ArrayList<>(); + + for (int i = 0; i < req.pageSize() && iter.hasNext(); ++i) + items.add(iter.next()); + + OdbcQueryFetchResult res = new OdbcQueryFetchResult(req.queryId(), items, !iter.hasNext()); + + return new OdbcResponse(res); + } + catch (Exception e) { + U.error(log, "Failed to fetch SQL query result [reqId=" + req.requestId() + ", req=" + req + ']', e); + + return new OdbcResponse(SqlListenerResponse.STATUS_FAILED, e.toString()); + } + } + + /** + * {@link OdbcQueryGetColumnsMetaRequest} command handler. + * + * @param req Get columns metadata request. + * @return Response. 
+ */ + private SqlListenerResponse getColumnsMeta(OdbcQueryGetColumnsMetaRequest req) { + try { + List<OdbcColumnMeta> meta = new ArrayList<>(); + + String cacheName; + String tableName; + + if (req.tableName().contains(".")) { + // Parsing two-part table name. + String[] parts = req.tableName().split("\\."); + + cacheName = OdbcUtils.removeQuotationMarksIfNeeded(parts[0]); + + tableName = parts[1]; + } + else { + cacheName = OdbcUtils.removeQuotationMarksIfNeeded(req.cacheName()); + + tableName = req.tableName(); + } + + Collection<GridQueryTypeDescriptor> tablesMeta = ctx.query().types(cacheName); + + for (GridQueryTypeDescriptor table : tablesMeta) { + if (!matches(table.name(), tableName)) + continue; + + for (Map.Entry<String, Class<?>> field : table.fields().entrySet()) { + if (!matches(field.getKey(), req.columnName())) + continue; + + OdbcColumnMeta columnMeta = new OdbcColumnMeta(req.cacheName(), table.name(), + field.getKey(), field.getValue()); + + if (!meta.contains(columnMeta)) + meta.add(columnMeta); + } + } + + OdbcQueryGetColumnsMetaResult res = new OdbcQueryGetColumnsMetaResult(meta); + + return new OdbcResponse(res); + } + catch (Exception e) { + U.error(log, "Failed to get columns metadata [reqId=" + req.requestId() + ", req=" + req + ']', e); + + return new OdbcResponse(SqlListenerResponse.STATUS_FAILED, e.toString()); + } + } + + /** + * {@link OdbcQueryGetTablesMetaRequest} command handler. + * + * @param req Get tables metadata request. + * @return Response. 
+ */ + private SqlListenerResponse getTablesMeta(OdbcQueryGetTablesMetaRequest req) { + try { + List<OdbcTableMeta> meta = new ArrayList<>(); + + String realSchema = OdbcUtils.removeQuotationMarksIfNeeded(req.schema()); + + for (String cacheName : ctx.cache().cacheNames()) + { + if (!matches(cacheName, realSchema)) + continue; + + Collection<GridQueryTypeDescriptor> tablesMeta = ctx.query().types(cacheName); + + for (GridQueryTypeDescriptor table : tablesMeta) { + if (!matches(table.name(), req.table())) + continue; + + if (!matches("TABLE", req.tableType())) + continue; + + OdbcTableMeta tableMeta = new OdbcTableMeta(null, cacheName, table.name(), "TABLE"); + + if (!meta.contains(tableMeta)) + meta.add(tableMeta); + } + } + + OdbcQueryGetTablesMetaResult res = new OdbcQueryGetTablesMetaResult(meta); + + return new OdbcResponse(res); + } + catch (Exception e) { + U.error(log, "Failed to get tables metadata [reqId=" + req.requestId() + ", req=" + req + ']', e); + + return new OdbcResponse(SqlListenerResponse.STATUS_FAILED, e.toString()); + } + } + + /** + * {@link OdbcQueryGetParamsMetaRequest} command handler. + * + * @param req Get params metadata request. + * @return Response. 
+ */ + private SqlListenerResponse getParamsMeta(OdbcQueryGetParamsMetaRequest req) { + try { + PreparedStatement stmt = ctx.query().prepareNativeStatement(req.cacheName(), req.query()); + + ParameterMetaData pmd = stmt.getParameterMetaData(); + + byte[] typeIds = new byte[pmd.getParameterCount()]; + + for (int i = 1; i <= pmd.getParameterCount(); ++i) { + int sqlType = pmd.getParameterType(i); + + typeIds[i - 1] = sqlTypeToBinary(sqlType); + } + + OdbcQueryGetParamsMetaResult res = new OdbcQueryGetParamsMetaResult(typeIds); + + return new OdbcResponse(res); + } + catch (Exception e) { + U.error(log, "Failed to get params metadata [reqId=" + req.requestId() + ", req=" + req + ']', e); + + return new OdbcResponse(SqlListenerResponse.STATUS_FAILED, e.toString()); + } + } + + /** + * Convert {@link java.sql.Types} to binary type constant (See {@link GridBinaryMarshaller} constants). + * + * @param sqlType SQL type. + * @return Binary type. + */ + private static byte sqlTypeToBinary(int sqlType) { + switch (sqlType) { + case Types.BIGINT: + return GridBinaryMarshaller.LONG; + + case Types.BOOLEAN: + return GridBinaryMarshaller.BOOLEAN; + + case Types.DATE: + return GridBinaryMarshaller.DATE; + + case Types.DOUBLE: + return GridBinaryMarshaller.DOUBLE; + + case Types.FLOAT: + case Types.REAL: + return GridBinaryMarshaller.FLOAT; + + case Types.NUMERIC: + case Types.DECIMAL: + return GridBinaryMarshaller.DECIMAL; + + case Types.INTEGER: + return GridBinaryMarshaller.INT; + + case Types.SMALLINT: + return GridBinaryMarshaller.SHORT; + + case Types.TIME: + return GridBinaryMarshaller.TIME; + + case Types.TIMESTAMP: + return GridBinaryMarshaller.TIMESTAMP; + + case Types.TINYINT: + return GridBinaryMarshaller.BYTE; + + case Types.CHAR: + case Types.VARCHAR: + case Types.LONGNVARCHAR: + return GridBinaryMarshaller.STRING; + + case Types.NULL: + return GridBinaryMarshaller.NULL; + + case Types.BINARY: + case Types.VARBINARY: + case Types.LONGVARBINARY: + default: + return 
GridBinaryMarshaller.BYTE_ARR; + } + } + + /** + * Convert metadata in collection from {@link GridQueryFieldMetadata} to + * {@link OdbcColumnMeta}. + * + * @param meta Internal query field metadata. + * @return Odbc query field metadata. + */ + private static Collection<OdbcColumnMeta> convertMetadata(Collection<?> meta) { + List<OdbcColumnMeta> res = new ArrayList<>(); + + if (meta != null) { + for (Object info : meta) { + assert info instanceof GridQueryFieldMetadata; + + res.add(new OdbcColumnMeta((GridQueryFieldMetadata)info)); + } + } + + return res; + } + + /** + * Checks whether string matches SQL pattern. + * + * @param str String. + * @param ptrn Pattern. + * @return Whether string matches pattern. + */ + private static boolean matches(String str, String ptrn) { + return str != null && (F.isEmpty(ptrn) || + str.toUpperCase().matches(ptrn.toUpperCase().replace("%", ".*").replace("_", "."))); + } +}
