http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractObjectWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractObjectWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractObjectWriter.java new file mode 100644 index 0000000..f5e9924 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractObjectWriter.java @@ -0,0 +1,111 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.odbc;

import java.math.BigDecimal;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.UUID;
import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.binary.GridBinaryMarshaller;
import org.jetbrains.annotations.Nullable;

/**
 * Base object writer for the SQL listener. Values whose class has a dedicated branch below are written
 * directly through {@link BinaryWriterExImpl}; every other object is delegated to
 * {@link #writeCustomObject(BinaryWriterExImpl, Object)}.
 */
public abstract class SqlListenerAbstractObjectWriter {
    /**
     * Writes a single value. {@code null} is written as the {@link GridBinaryMarshaller#NULL} marker byte.
     *
     * @param writer Writer.
     * @param obj Object to write.
     * @throws BinaryObjectException On error.
     */
    public void writeObject(BinaryWriterExImpl writer, @Nullable Object obj) throws BinaryObjectException {
        if (obj == null) {
            writer.writeByte(GridBinaryMarshaller.NULL);

            return;
        }

        Class<?> type = obj.getClass();

        // The class checks are exact and mutually exclusive, so at most one helper accepts the value.
        boolean written = writeScalar(writer, obj, type) || writeArray(writer, obj, type);

        if (!written)
            writeCustomObject(writer, obj);
    }

    /**
     * Writes a boxed primitive, string, decimal, UUID or date/time value.
     *
     * @param writer Writer.
     * @param obj Non-null object to write.
     * @param type Exact class of {@code obj}.
     * @return {@code True} if the value was written, {@code false} if the class is not handled here.
     * @throws BinaryObjectException On error.
     */
    private static boolean writeScalar(BinaryWriterExImpl writer, Object obj, Class<?> type)
        throws BinaryObjectException {
        if (type == Boolean.class)
            writer.writeBooleanFieldPrimitive((Boolean)obj);
        else if (type == Byte.class)
            writer.writeByteFieldPrimitive((Byte)obj);
        else if (type == Character.class)
            writer.writeCharFieldPrimitive((Character)obj);
        else if (type == Short.class)
            writer.writeShortFieldPrimitive((Short)obj);
        else if (type == Integer.class)
            writer.writeIntFieldPrimitive((Integer)obj);
        else if (type == Long.class)
            writer.writeLongFieldPrimitive((Long)obj);
        else if (type == Float.class)
            writer.writeFloatFieldPrimitive((Float)obj);
        else if (type == Double.class)
            writer.writeDoubleFieldPrimitive((Double)obj);
        else if (type == String.class)
            writer.doWriteString((String)obj);
        else if (type == BigDecimal.class)
            writer.doWriteDecimal((BigDecimal)obj);
        else if (type == UUID.class)
            writer.writeUuid((UUID)obj);
        else if (type == Time.class)
            writer.writeTime((Time)obj);
        else if (type == Timestamp.class)
            writer.writeTimestamp((Timestamp)obj);
        else if (type == java.sql.Date.class || type == java.util.Date.class)
            writer.writeDate((java.util.Date)obj);
        else
            return false;

        return true;
    }

    /**
     * Writes an array of primitives, strings, decimals, UUIDs or date/time values.
     *
     * @param writer Writer.
     * @param obj Non-null object to write.
     * @param type Exact class of {@code obj}.
     * @return {@code True} if the value was written, {@code false} if the class is not handled here.
     * @throws BinaryObjectException On error.
     */
    private static boolean writeArray(BinaryWriterExImpl writer, Object obj, Class<?> type)
        throws BinaryObjectException {
        // NOTE(review): long[] has no dedicated branch (unlike the other primitive arrays) and therefore
        // ends up in writeCustomObject() - confirm this is intentional.
        if (type == boolean[].class)
            writer.writeBooleanArray((boolean[])obj);
        else if (type == byte[].class)
            writer.writeByteArray((byte[])obj);
        else if (type == char[].class)
            writer.writeCharArray((char[])obj);
        else if (type == short[].class)
            writer.writeShortArray((short[])obj);
        else if (type == int[].class)
            writer.writeIntArray((int[])obj);
        else if (type == float[].class)
            writer.writeFloatArray((float[])obj);
        else if (type == double[].class)
            writer.writeDoubleArray((double[])obj);
        else if (type == String[].class)
            writer.writeStringArray((String[])obj);
        else if (type == BigDecimal[].class)
            writer.writeDecimalArray((BigDecimal[])obj);
        else if (type == UUID[].class)
            writer.writeUuidArray((UUID[])obj);
        else if (type == Time[].class)
            writer.writeTimeArray((Time[])obj);
        else if (type == Timestamp[].class)
            writer.writeTimestampArray((Timestamp[])obj);
        else if (type == java.util.Date[].class || type == java.sql.Date[].class)
            writer.writeDateArray((java.util.Date[])obj);
        else
            return false;

        return true;
    }

    /**
     * Writes an object that has no dedicated branch in {@link #writeObject(BinaryWriterExImpl, Object)}.
     *
     * @param writer Writer.
     * @param obj Object to marshal with marshaller and write to binary stream.
     * @throws BinaryObjectException On error.
     */
    protected abstract void writeCustomObject(BinaryWriterExImpl writer, Object obj) throws BinaryObjectException;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java new file mode 100644 index 0000000..9eaec04 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java @@ -0,0 +1,263 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.odbc;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.binary.BinaryReaderExImpl;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
import org.apache.ignite.internal.binary.streams.BinaryInputStream;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcMessageParser;
import org.apache.ignite.internal.processors.odbc.odbc.OdbcMessageParser;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;

/**
 * NIO listener for SQL (ODBC and JDBC) client connections.
 * <p>
 * A session without a connection context treats the incoming message as a handshake. Once the handshake
 * succeeds, the connection context (parser + request handler) is stored in the session metadata and every
 * following message is decoded, handled and answered through it.
 */
public class SqlListenerNioListener extends GridNioServerListenerAdapter<byte[]> {
    /** The value corresponds to ODBC driver of the parser field of the handshake request. */
    public static final byte ODBC_CLIENT = 0;

    /** The value corresponds to JDBC driver of the parser field of the handshake request. */
    public static final byte JDBC_CLIENT = 1;

    /** Current version. */
    private static final SqlListenerProtocolVersion CURRENT_VER = SqlListenerProtocolVersion.create(2, 1, 0);

    /** Supported versions. */
    private static final Set<SqlListenerProtocolVersion> SUPPORTED_VERS = new HashSet<>();

    /** Connection-related metadata key. */
    private static final int CONN_CTX_META_KEY = GridNioSessionMetaKey.nextUniqueKey();

    /** Request ID generator. */
    private static final AtomicLong REQ_ID_GEN = new AtomicLong();

    /** Busy lock. */
    private final GridSpinBusyLock busyLock;

    /** Kernal context. */
    private final GridKernalContext ctx;

    /** Maximum allowed cursors. */
    private final int maxCursors;

    /** Logger. */
    private final IgniteLogger log;

    static {
        SUPPORTED_VERS.add(CURRENT_VER);
    }

    /**
     * Constructor.
     *
     * @param ctx Context.
     * @param busyLock Shutdown busy lock.
     * @param maxCursors Maximum allowed cursors.
     */
    public SqlListenerNioListener(GridKernalContext ctx, GridSpinBusyLock busyLock, int maxCursors) {
        this.ctx = ctx;
        this.busyLock = busyLock;
        this.maxCursors = maxCursors;

        log = ctx.log(getClass());
    }

    /** {@inheritDoc} */
    @Override public void onConnected(GridNioSession ses) {
        if (log.isDebugEnabled())
            log.debug("SQL client connected: " + ses.remoteAddress());
    }

    /** {@inheritDoc} */
    @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
        if (log.isDebugEnabled()) {
            if (e == null)
                log.debug("SQL client disconnected: " + ses.remoteAddress());
            else
                log.debug("SQL client disconnected due to an error [addr=" + ses.remoteAddress() + ", err=" + e + ']');
        }
    }

    /** {@inheritDoc} */
    @Override public void onMessage(GridNioSession ses, byte[] msg) {
        assert msg != null;

        SqlListenerConnectionContext connCtx = ses.meta(CONN_CTX_META_KEY);

        // No context yet means the handshake has not been completed on this session.
        if (connCtx == null) {
            onHandshake(ses, msg);

            return;
        }

        SqlListenerMessageParser parser = connCtx.parser();

        SqlListenerRequest req;

        try {
            req = parser.decode(msg);
        }
        catch (Exception e) {
            // Pass the exception itself so that the stack trace is not lost.
            U.error(log, "Failed to parse SQL client request [err=" + e + ']', e);

            ses.close();

            return;
        }

        assert req != null;

        req.requestId(REQ_ID_GEN.incrementAndGet());

        try {
            long startTime = 0;

            if (log.isDebugEnabled()) {
                startTime = System.nanoTime();

                log.debug("SQL client request received [reqId=" + req.requestId() + ", addr=" + ses.remoteAddress() +
                    ", req=" + req + ']');
            }

            SqlListenerRequestHandler handler = connCtx.handler();

            SqlListenerResponse resp = handler.handle(req);

            if (log.isDebugEnabled()) {
                long dur = (System.nanoTime() - startTime) / 1000;

                log.debug("SQL client request processed [reqId=" + req.requestId() + ", dur(mcs)=" + dur +
                    ", resp=" + resp.status() + ']');
            }

            byte[] outMsg = parser.encode(resp);

            ses.send(outMsg);
        }
        catch (Exception e) {
            U.error(log, "Failed to process SQL client request [reqId=" + req.requestId() + ", err=" + e + ']', e);

            ses.send(parser.encode(new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, e.toString())));
        }
    }

    /**
     * Perform handshake.
     * <p>
     * A message whose first byte is not the handshake command closes the session. Otherwise a response is
     * always sent: {@code true} on success, or {@code false} followed by the current protocol version and an
     * error message when the version is not supported or the handshake payload cannot be processed.
     *
     * @param ses Session.
     * @param msg Message bytes.
     */
    private void onHandshake(GridNioSession ses, byte[] msg) {
        BinaryInputStream stream = new BinaryHeapInputStream(msg);

        BinaryReaderExImpl reader = new BinaryReaderExImpl(null, stream, null, true);

        byte cmd = reader.readByte();

        if (cmd != SqlListenerRequest.HANDSHAKE) {
            log.error("Unexpected SQL client request (will close session): " + ses.remoteAddress());

            ses.close();

            return;
        }

        short verMajor = reader.readShort();
        short verMinor = reader.readShort();
        short verMaintenance = reader.readShort();

        SqlListenerProtocolVersion ver = SqlListenerProtocolVersion.create(verMajor, verMinor, verMaintenance);

        String errMsg = null;

        if (SUPPORTED_VERS.contains(ver)) {
            try {
                // Prepare context.
                SqlListenerConnectionContext connCtx = prepareContext(ver, reader);

                ses.addMeta(CONN_CTX_META_KEY, connCtx);
            }
            catch (Exception e) {
                // E.g. unknown client type: reject the handshake explicitly instead of letting the
                // exception escape the listener without any reply to the client.
                U.error(log, "Failed to process SQL client handshake [addr=" + ses.remoteAddress() +
                    ", err=" + e + ']', e);

                errMsg = e.toString();
            }
        }
        else {
            log.warning("Unsupported version: " + ver.toString());

            errMsg = "Unsupported version.";
        }

        // Send response.
        BinaryWriterExImpl writer = new BinaryWriterExImpl(null, new BinaryHeapOutputStream(8), null, null);

        if (errMsg == null)
            writer.writeBoolean(true);
        else {
            writer.writeBoolean(false);
            writer.writeShort(CURRENT_VER.major());
            writer.writeShort(CURRENT_VER.minor());
            writer.writeShort(CURRENT_VER.maintenance());
            writer.doWriteString(errMsg);
        }

        ses.send(writer.array());
    }

    /**
     * Prepare context. Reads the client type and the two query flags from the handshake payload, in that
     * order.
     *
     * @param ver Version.
     * @param reader Reader.
     * @return Context.
     * @throws IgniteException If the client type is unknown.
     */
    private SqlListenerConnectionContext prepareContext(SqlListenerProtocolVersion ver, BinaryReaderExImpl reader) {
        byte clientType = reader.readByte();

        boolean distributedJoins = reader.readBoolean();
        boolean enforceJoinOrder = reader.readBoolean();

        // Validate the client type before creating the request handler.
        SqlListenerMessageParser parser;

        switch (clientType) {
            case ODBC_CLIENT:
                parser = new OdbcMessageParser(ctx);

                break;

            case JDBC_CLIENT:
                parser = new JdbcMessageParser(ctx);

                break;

            default:
                throw new IgniteException("Unknown client type: " + clientType);
        }

        SqlListenerRequestHandlerImpl handler = new SqlListenerRequestHandlerImpl(ctx, busyLock, maxCursors,
            distributedJoins, enforceJoinOrder);

        return new SqlListenerConnectionContext(handler, parser);
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerProcessor.java new file mode 100644 index 0000000..cbe54df --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerProcessor.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license
agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.odbc;

import java.net.InetAddress;
import java.nio.ByteOrder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.OdbcConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.HostAndPortRange;
import org.apache.ignite.internal.util.nio.GridNioAsyncNotifyFilter;
import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
import org.apache.ignite.internal.util.nio.GridNioFilter;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.IgnitePortProtocol;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;

/**
 * SQL listener processor. When an {@link OdbcConfiguration} is present it starts a NIO server on the first
 * port of the configured range that can be bound and stops it on kernal stop.
 */
public class SqlListenerProcessor extends GridProcessorAdapter {
    /** Default number of selectors. */
    private static final int DFLT_SELECTOR_CNT = Math.min(4, Runtime.getRuntime().availableProcessors());

    /** Default TCP_NODELAY flag. */
    private static final boolean DFLT_TCP_NODELAY = true;

    /** Default TCP direct buffer flag. */
    private static final boolean DFLT_TCP_DIRECT_BUF = false;

    /** Busy lock. */
    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();

    /** ODBC TCP Server. */
    private GridNioServer<byte[]> srv;

    /** ODBC executor service. */
    private ExecutorService odbcExecSvc;

    /**
     * @param ctx Kernal context.
     */
    public SqlListenerProcessor(GridKernalContext ctx) {
        super(ctx);
    }

    /** {@inheritDoc} */
    @Override public void start(boolean activeOnStart) throws IgniteCheckedException {
        IgniteConfiguration cfg = ctx.config();

        OdbcConfiguration odbcCfg = cfg.getOdbcConfiguration();

        // Nothing to start when ODBC is not configured.
        if (odbcCfg == null)
            return;

        try {
            HostAndPortRange hostPort;

            if (F.isEmpty(odbcCfg.getEndpointAddress())) {
                hostPort = new HostAndPortRange(OdbcConfiguration.DFLT_TCP_HOST,
                    OdbcConfiguration.DFLT_TCP_PORT_FROM,
                    OdbcConfiguration.DFLT_TCP_PORT_TO
                );
            }
            else {
                hostPort = HostAndPortRange.parse(odbcCfg.getEndpointAddress(),
                    OdbcConfiguration.DFLT_TCP_PORT_FROM,
                    OdbcConfiguration.DFLT_TCP_PORT_TO,
                    "Failed to parse ODBC endpoint address"
                );
            }

            assertParameter(odbcCfg.getThreadPoolSize() > 0, "threadPoolSize > 0");

            odbcExecSvc = new IgniteThreadPoolExecutor(
                "odbc",
                cfg.getIgniteInstanceName(),
                odbcCfg.getThreadPoolSize(),
                odbcCfg.getThreadPoolSize(),
                0,
                new LinkedBlockingQueue<Runnable>());

            InetAddress host;

            try {
                host = InetAddress.getByName(hostPort.host());
            }
            catch (Exception e) {
                throw new IgniteCheckedException("Failed to resolve ODBC host: " + hostPort.host(), e);
            }

            Exception lastErr = null;

            // Try every port of the range in order and stop at the first one the server starts on.
            for (int port = hostPort.portFrom(); port <= hostPort.portTo(); port++) {
                try {
                    GridNioFilter[] filters = new GridNioFilter[] {
                        new GridNioAsyncNotifyFilter(ctx.igniteInstanceName(), odbcExecSvc, log) {
                            @Override public void onSessionOpened(GridNioSession ses) throws IgniteCheckedException {
                                proceedSessionOpened(ses);
                            }
                        },
                        new GridNioCodecFilter(new SqlListenerBufferedParser(), log, false)
                    };

                    GridNioServer<byte[]> srv0 = GridNioServer.<byte[]>builder()
                        .address(host)
                        .port(port)
                        .listener(new SqlListenerNioListener(ctx, busyLock, odbcCfg.getMaxOpenCursors()))
                        .logger(log)
                        .selectorCount(DFLT_SELECTOR_CNT)
                        .igniteInstanceName(ctx.igniteInstanceName())
                        .serverName("odbc")
                        .tcpNoDelay(DFLT_TCP_NODELAY)
                        .directBuffer(DFLT_TCP_DIRECT_BUF)
                        .byteOrder(ByteOrder.nativeOrder())
                        .socketSendBufferSize(odbcCfg.getSocketSendBufferSize())
                        .socketReceiveBufferSize(odbcCfg.getSocketReceiveBufferSize())
                        .filters(filters)
                        .directMode(false)
                        .build();

                    srv0.start();

                    srv = srv0;

                    ctx.ports().registerPort(port, IgnitePortProtocol.TCP, getClass());

                    if (log.isInfoEnabled())
                        log.info("ODBC processor has started on TCP port " + port);

                    lastErr = null;

                    break;
                }
                catch (Exception e) {
                    lastErr = e;
                }
            }

            // Covers both "every port failed" (lastErr is set) and an empty port range (lastErr is null).
            // The last error is attached as the cause so that its stack trace is preserved.
            if (srv == null)
                throw new IgniteCheckedException("Failed to bind to any [host:port] from the range [" +
                    "address=" + hostPort + ", lastErr=" + lastErr + ']', lastErr);
        }
        catch (Exception e) {
            // onKernalStop() only releases the executor when the server was started, so release it here.
            stopExecutor();

            throw new IgniteCheckedException("Failed to start ODBC processor.", e);
        }
    }

    /** {@inheritDoc} */
    @Override public void onKernalStop(boolean cancel) {
        if (srv != null) {
            busyLock.block();

            srv.stop();

            ctx.ports().deregisterPorts(getClass());

            stopExecutor();

            if (log.isDebugEnabled())
                log.debug("ODBC processor stopped.");
        }
    }

    /**
     * Shuts down the executor service, if it was created, and clears the reference.
     */
    private void stopExecutor() {
        if (odbcExecSvc != null) {
            U.shutdownNow(getClass(), odbcExecSvc, log);

            odbcExecSvc = null;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerRequestHandlerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerRequestHandlerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerRequestHandlerImpl.java new file mode 100644 index 0000000..1230bb4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerRequestHandlerImpl.java @@ -0,0 +1,494 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.odbc; + +import java.sql.ParameterMetaData; +import java.sql.PreparedStatement; +import java.sql.Types; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.binary.GridBinaryMarshaller; +import org.apache.ignite.internal.processors.cache.QueryCursorImpl; +import org.apache.ignite.internal.processors.odbc.odbc.escape.OdbcEscapeUtils; +import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; +import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; +import org.apache.ignite.internal.util.GridSpinBusyLock; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; + +import static org.apache.ignite.internal.processors.odbc.SqlListenerRequest.META_COLS; +import static org.apache.ignite.internal.processors.odbc.SqlListenerRequest.META_PARAMS; +import static org.apache.ignite.internal.processors.odbc.SqlListenerRequest.META_TBLS; +import static org.apache.ignite.internal.processors.odbc.SqlListenerRequest.QRY_CLOSE; +import static org.apache.ignite.internal.processors.odbc.SqlListenerRequest.QRY_EXEC; +import static org.apache.ignite.internal.processors.odbc.SqlListenerRequest.QRY_FETCH; + +/** + * SQL query handler. + */ +public class SqlListenerRequestHandlerImpl implements SqlListenerRequestHandler { + /** Query ID sequence. */ + private static final AtomicLong QRY_ID_GEN = new AtomicLong(); + + /** Kernel context. 
*/ + private final GridKernalContext ctx; + + /** Logger. */ + private final IgniteLogger log; + + /** Busy lock. */ + private final GridSpinBusyLock busyLock; + + /** Maximum allowed cursors. */ + private final int maxCursors; + + /** Current queries cursors. */ + private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCursors = new ConcurrentHashMap<>(); + + /** Distributed joins flag. */ + private final boolean distributedJoins; + + /** Enforce join order flag. */ + private final boolean enforceJoinOrder; + + /** + * Constructor. + * + * @param ctx Context. + * @param busyLock Shutdown latch. + * @param maxCursors Maximum allowed cursors. + * @param distributedJoins Distributed joins flag. + * @param enforceJoinOrder Enforce join order flag. + */ + public SqlListenerRequestHandlerImpl(GridKernalContext ctx, GridSpinBusyLock busyLock, int maxCursors, + boolean distributedJoins, boolean enforceJoinOrder) { + this.ctx = ctx; + this.busyLock = busyLock; + this.maxCursors = maxCursors; + this.distributedJoins = distributedJoins; + this.enforceJoinOrder = enforceJoinOrder; + + log = ctx.log(getClass()); + } + + /** {@inheritDoc} */ + @Override public SqlListenerResponse handle(SqlListenerRequest req) { + assert req != null; + + if (!busyLock.enterBusy()) + return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, + "Failed to handle ODBC request because node is stopping: " + req); + + try { + switch (req.command()) { + case QRY_EXEC: + return executeQuery((SqlListenerQueryExecuteRequest)req); + + case QRY_FETCH: + return fetchQuery((SqlListenerQueryFetchRequest)req); + + case QRY_CLOSE: + return closeQuery((SqlListenerQueryCloseRequest)req); + + case META_COLS: + return getColumnsMeta((OdbcQueryGetColumnsMetaRequest)req); + + case META_TBLS: + return getTablesMeta((OdbcQueryGetTablesMetaRequest)req); + + case META_PARAMS: + return getParamsMeta((OdbcQueryGetParamsMetaRequest)req); + } + + return new 
SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, "Unsupported ODBC request: " + req); + } + finally { + busyLock.leaveBusy(); + } + } + + /** + * {@link SqlListenerQueryExecuteRequest} command handler. + * + * @param req Execute query request. + * @return Response. + */ + private SqlListenerResponse executeQuery(SqlListenerQueryExecuteRequest req) { + int cursorCnt = qryCursors.size(); + + if (maxCursors > 0 && cursorCnt >= maxCursors) + return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, "Too many opened cursors (either close other " + + "opened cursors or increase the limit through OdbcConfiguration.setMaxOpenCursors()) " + + "[maximum=" + maxCursors + ", current=" + cursorCnt + ']'); + + long qryId = QRY_ID_GEN.getAndIncrement(); + + try { + String sql = OdbcEscapeUtils.parse(req.sqlQuery()); + + if (log.isDebugEnabled()) + log.debug("ODBC query parsed [reqId=" + req.requestId() + ", original=" + req.sqlQuery() + + ", parsed=" + sql + ']'); + + SqlFieldsQuery qry = new SqlFieldsQuery(sql); + + qry.setArgs(req.arguments()); + + qry.setDistributedJoins(distributedJoins); + qry.setEnforceJoinOrder(enforceJoinOrder); + + IgniteCache<Object, Object> cache0 = ctx.grid().cache(req.cacheName()); + + if (cache0 == null) + return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, + "Cache doesn't exist (did you configure it?): " + req.cacheName()); + + IgniteCache<Object, Object> cache = cache0.withKeepBinary(); + + if (cache == null) + return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, + "Can not get cache with keep binary: " + req.cacheName()); + + QueryCursor qryCur = cache.query(qry); + + qryCursors.put(qryId, new IgniteBiTuple<QueryCursor, Iterator>(qryCur, null)); + + List<?> fieldsMeta = ((QueryCursorImpl) qryCur).fieldsMeta(); + + SqlListenerQueryExecuteResult res = new SqlListenerQueryExecuteResult(qryId, convertMetadata(fieldsMeta)); + + return new SqlListenerResponse(res); + } + catch (Exception e) { + 
qryCursors.remove(qryId); + + U.error(log, "Failed to execute SQL query [reqId=" + req.requestId() + ", req=" + req + ']', e); + + return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, e.toString()); + } + } + + /** + * {@link SqlListenerQueryCloseRequest} command handler. + * + * @param req Execute query request. + * @return Response. + */ + private SqlListenerResponse closeQuery(SqlListenerQueryCloseRequest req) { + try { + IgniteBiTuple<QueryCursor, Iterator> tuple = qryCursors.get(req.queryId()); + + if (tuple == null) + return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, + "Failed to find query with ID: " + req.queryId()); + + QueryCursor cur = tuple.get1(); + + assert(cur != null); + + cur.close(); + + qryCursors.remove(req.queryId()); + + SqlListenerQueryCloseResult res = new SqlListenerQueryCloseResult(req.queryId()); + + return new SqlListenerResponse(res); + } + catch (Exception e) { + qryCursors.remove(req.queryId()); + + U.error(log, "Failed to close SQL query [reqId=" + req.requestId() + ", req=" + req.queryId() + ']', e); + + return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, e.toString()); + } + } + + /** + * {@link SqlListenerQueryFetchRequest} command handler. + * + * @param req Execute query request. + * @return Response. 
+ */ + private SqlListenerResponse fetchQuery(SqlListenerQueryFetchRequest req) { + try { + IgniteBiTuple<QueryCursor, Iterator> tuple = qryCursors.get(req.queryId()); + + if (tuple == null) + return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, + "Failed to find query with ID: " + req.queryId()); + + Iterator iter = tuple.get2(); + + if (iter == null) { + QueryCursor cur = tuple.get1(); + + iter = cur.iterator(); + + tuple.put(cur, iter); + } + + List<Object> items = new ArrayList<>(); + + for (int i = 0; i < req.pageSize() && iter.hasNext(); ++i) + items.add(iter.next()); + + SqlListenerQueryFetchResult res = new SqlListenerQueryFetchResult(req.queryId(), items, !iter.hasNext()); + + return new SqlListenerResponse(res); + } + catch (Exception e) { + U.error(log, "Failed to fetch SQL query result [reqId=" + req.requestId() + ", req=" + req + ']', e); + + return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, e.toString()); + } + } + + /** + * {@link OdbcQueryGetColumnsMetaRequest} command handler. + * + * @param req Get columns metadata request. + * @return Response. + */ + private SqlListenerResponse getColumnsMeta(OdbcQueryGetColumnsMetaRequest req) { + try { + List<SqlListenerColumnMeta> meta = new ArrayList<>(); + + String cacheName; + String tableName; + + if (req.tableName().contains(".")) { + // Parsing two-part table name. 
+ String[] parts = req.tableName().split("\\."); + + cacheName = OdbcUtils.removeQuotationMarksIfNeeded(parts[0]); + + tableName = parts[1]; + } + else { + cacheName = OdbcUtils.removeQuotationMarksIfNeeded(req.cacheName()); + + tableName = req.tableName(); + } + + Collection<GridQueryTypeDescriptor> tablesMeta = ctx.query().types(cacheName); + + for (GridQueryTypeDescriptor table : tablesMeta) { + if (!matches(table.name(), tableName)) + continue; + + for (Map.Entry<String, Class<?>> field : table.fields().entrySet()) { + if (!matches(field.getKey(), req.columnName())) + continue; + + SqlListenerColumnMeta columnMeta = new SqlListenerColumnMeta(req.cacheName(), table.name(), + field.getKey(), field.getValue()); + + if (!meta.contains(columnMeta)) + meta.add(columnMeta); + } + } + + OdbcQueryGetColumnsMetaResult res = new OdbcQueryGetColumnsMetaResult(meta); + + return new SqlListenerResponse(res); + } + catch (Exception e) { + U.error(log, "Failed to get columns metadata [reqId=" + req.requestId() + ", req=" + req + ']', e); + + return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, e.toString()); + } + } + + /** + * {@link OdbcQueryGetTablesMetaRequest} command handler. + * + * @param req Get tables metadata request. + * @return Response. 
+ */ + private SqlListenerResponse getTablesMeta(OdbcQueryGetTablesMetaRequest req) { + try { + List<OdbcTableMeta> meta = new ArrayList<>(); + + String realSchema = OdbcUtils.removeQuotationMarksIfNeeded(req.schema()); + + for (String cacheName : ctx.cache().cacheNames()) + { + if (!matches(cacheName, realSchema)) + continue; + + Collection<GridQueryTypeDescriptor> tablesMeta = ctx.query().types(cacheName); + + for (GridQueryTypeDescriptor table : tablesMeta) { + if (!matches(table.name(), req.table())) + continue; + + if (!matches("TABLE", req.tableType())) + continue; + + OdbcTableMeta tableMeta = new OdbcTableMeta(null, cacheName, table.name(), "TABLE"); + + if (!meta.contains(tableMeta)) + meta.add(tableMeta); + } + } + + OdbcQueryGetTablesMetaResult res = new OdbcQueryGetTablesMetaResult(meta); + + return new SqlListenerResponse(res); + } + catch (Exception e) { + U.error(log, "Failed to get tables metadata [reqId=" + req.requestId() + ", req=" + req + ']', e); + + return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, e.toString()); + } + } + + /** + * {@link OdbcQueryGetParamsMetaRequest} command handler. + * + * @param req Get params metadata request. + * @return Response. 
+ */ + private SqlListenerResponse getParamsMeta(OdbcQueryGetParamsMetaRequest req) { + try { + PreparedStatement stmt = ctx.query().prepareNativeStatement(req.cacheName(), req.query()); + + ParameterMetaData pmd = stmt.getParameterMetaData(); + + byte[] typeIds = new byte[pmd.getParameterCount()]; + + for (int i = 1; i <= pmd.getParameterCount(); ++i) { + int sqlType = pmd.getParameterType(i); + + typeIds[i - 1] = sqlTypeToBinary(sqlType); + } + + OdbcQueryGetParamsMetaResult res = new OdbcQueryGetParamsMetaResult(typeIds); + + return new SqlListenerResponse(res); + } + catch (Exception e) { + U.error(log, "Failed to get params metadata [reqId=" + req.requestId() + ", req=" + req + ']', e); + + return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, e.toString()); + } + } + + /** + * Convert {@link java.sql.Types} to binary type constant (See {@link GridBinaryMarshaller} constants). + * + * @param sqlType SQL type, compared against the {@link java.sql.Types} constants. + * @return Binary type; {@code BYTE_ARR} for the listed binary types and for every SQL type that has no case of its own. NOTE(review): {@code LONGNVARCHAR} maps to {@code STRING}, but {@code LONGVARCHAR}, {@code NCHAR} and {@code NVARCHAR} have no case and so end up as {@code BYTE_ARR} - confirm this is intended. + */ + private static byte sqlTypeToBinary(int sqlType) { + switch (sqlType) { + case Types.BIGINT: + return GridBinaryMarshaller.LONG; + + case Types.BOOLEAN: + return GridBinaryMarshaller.BOOLEAN; + + case Types.DATE: + return GridBinaryMarshaller.DATE; + + case Types.DOUBLE: + return GridBinaryMarshaller.DOUBLE; + + case Types.FLOAT: + case Types.REAL: + return GridBinaryMarshaller.FLOAT; + + case Types.NUMERIC: + case Types.DECIMAL: + return GridBinaryMarshaller.DECIMAL; + + case Types.INTEGER: + return GridBinaryMarshaller.INT; + + case Types.SMALLINT: + return GridBinaryMarshaller.SHORT; + + case Types.TIME: + return GridBinaryMarshaller.TIME; + + case Types.TIMESTAMP: + return GridBinaryMarshaller.TIMESTAMP; + + case Types.TINYINT: + return GridBinaryMarshaller.BYTE; + + case Types.CHAR: + case Types.VARCHAR: + case Types.LONGNVARCHAR: + return GridBinaryMarshaller.STRING; + + case Types.NULL: + return GridBinaryMarshaller.NULL; + + case Types.BINARY: + case Types.VARBINARY: + case Types.LONGVARBINARY: + 
default: + return GridBinaryMarshaller.BYTE_ARR; + } + } + + /** + * Convert metadata in collection from {@link GridQueryFieldMetadata} to + * {@link SqlListenerColumnMeta}. + * + * @param meta Internal query field metadata; may be {@code null}, every element is asserted to be a {@link GridQueryFieldMetadata}. + * @return Odbc query field metadata, in iteration order of {@code meta}; an empty list if {@code meta} is {@code null}. + */ + private static Collection<SqlListenerColumnMeta> convertMetadata(Collection<?> meta) { + List<SqlListenerColumnMeta> res = new ArrayList<>(); + + if (meta != null) { + for (Object info : meta) { + assert info instanceof GridQueryFieldMetadata; + + res.add(new SqlListenerColumnMeta((GridQueryFieldMetadata)info)); + } + } + + return res; + } + + /** + * Checks whether string matches SQL pattern; both are upper-cased first, so case is ignored. + * + * @param str String; {@code null} never matches. + * @param ptrn Pattern where {@code %} is translated to regex {@code .*} and {@code _} to {@code .}; if {@code F.isEmpty(ptrn)} holds (presumably null or empty - confirm) every non-null string matches. NOTE(review): the rest of the pattern reaches {@link String#matches(String)} unescaped, so regex metacharacters in it are interpreted as regex - confirm callers never pass them. + * @return Whether string matches pattern. + */ + private static boolean matches(String str, String ptrn) { + return str != null && (F.isEmpty(ptrn) || + str.toUpperCase().matches(ptrn.toUpperCase().replace("%", ".*").replace("_", "."))); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java new file mode 100644 index 0000000..cf87712 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.jdbc; + +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.binary.BinaryReaderExImpl; +import org.apache.ignite.internal.binary.BinaryWriterExImpl; +import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream; +import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream; +import org.apache.ignite.internal.binary.streams.BinaryInputStream; +import org.apache.ignite.internal.processors.odbc.SqlListenerAbstractMessageParser; + +/** + * JDBC message parser. + */ +public class JdbcMessageParser extends SqlListenerAbstractMessageParser { + /** + * @param ctx Context. 
+ */ + public JdbcMessageParser(GridKernalContext ctx) { + super(ctx, new JdbcObjectReader(), new JdbcObjectWriter()); + } + + /** {@inheritDoc} */ + @Override protected BinaryReaderExImpl createReader(byte[] msg) { + BinaryInputStream stream = new BinaryHeapInputStream(msg); + + return new BinaryReaderExImpl(null, stream, ctx.config().getClassLoader(), true); + } + + /** {@inheritDoc} */ + @Override protected BinaryWriterExImpl createWriter(int cap) { + return new BinaryWriterExImpl(null, new BinaryHeapOutputStream(cap), null, null); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcObjectReader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcObjectReader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcObjectReader.java new file mode 100644 index 0000000..81c8c10 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcObjectReader.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.odbc.jdbc; + +import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.internal.binary.BinaryReaderExImpl; +import org.apache.ignite.internal.processors.odbc.SqlListenerAbstractObjectReader; + +/** + * JDBC object reader. Custom objects are not supported: {@code readCustomObject} always throws {@link BinaryObjectException}. + */ +@SuppressWarnings("unchecked") +public class JdbcObjectReader extends SqlListenerAbstractObjectReader { + /** {@inheritDoc} */ + @Override protected Object readCustomObject(BinaryReaderExImpl reader) throws BinaryObjectException { + throw new BinaryObjectException("JDBC doesn't support custom objects."); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcObjectWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcObjectWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcObjectWriter.java new file mode 100644 index 0000000..e87ef50 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcObjectWriter.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.jdbc; + +import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.internal.binary.BinaryWriterExImpl; +import org.apache.ignite.internal.processors.odbc.SqlListenerAbstractObjectWriter; + +/** + * JDBC object writer. Custom objects are not supported: {@code writeCustomObject} always throws {@link BinaryObjectException}. + */ +public class JdbcObjectWriter extends SqlListenerAbstractObjectWriter { + /** {@inheritDoc} */ + @Override protected void writeCustomObject(BinaryWriterExImpl writer, Object obj) + throws BinaryObjectException { + throw new BinaryObjectException("JDBC doesn't support custom objects."); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java index af595b9..300385f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java @@ -18,262 +18,51 @@ package org.apache.ignite.internal.processors.odbc.odbc; import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridKernalContext; import 
org.apache.ignite.internal.binary.BinaryReaderExImpl; +import org.apache.ignite.internal.binary.BinaryThreadLocalContext; import org.apache.ignite.internal.binary.BinaryWriterExImpl; import org.apache.ignite.internal.binary.GridBinaryMarshaller; import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream; import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream; import org.apache.ignite.internal.binary.streams.BinaryInputStream; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; -import org.apache.ignite.internal.processors.odbc.OdbcQueryGetColumnsMetaRequest; -import org.apache.ignite.internal.processors.odbc.OdbcQueryGetColumnsMetaResult; -import org.apache.ignite.internal.processors.odbc.OdbcQueryGetParamsMetaRequest; -import org.apache.ignite.internal.processors.odbc.OdbcQueryGetParamsMetaResult; -import org.apache.ignite.internal.processors.odbc.OdbcQueryGetTablesMetaRequest; -import org.apache.ignite.internal.processors.odbc.OdbcQueryGetTablesMetaResult; -import org.apache.ignite.internal.processors.odbc.OdbcTableMeta; -import org.apache.ignite.internal.processors.odbc.SqlListenerColumnMeta; -import org.apache.ignite.internal.processors.odbc.SqlListenerMessageParser; -import org.apache.ignite.internal.processors.odbc.SqlListenerQueryCloseRequest; -import org.apache.ignite.internal.processors.odbc.SqlListenerQueryCloseResult; -import org.apache.ignite.internal.processors.odbc.SqlListenerQueryExecuteRequest; -import org.apache.ignite.internal.processors.odbc.SqlListenerQueryExecuteResult; -import org.apache.ignite.internal.processors.odbc.SqlListenerQueryFetchRequest; -import org.apache.ignite.internal.processors.odbc.SqlListenerQueryFetchResult; -import org.apache.ignite.internal.processors.odbc.SqlListenerRequest; -import org.apache.ignite.internal.processors.odbc.SqlListenerResponse; - -import java.util.Collection; +import 
org.apache.ignite.internal.processors.odbc.SqlListenerAbstractMessageParser; /** - * ODBC message parser. + * ODBC message parser. */ -public class OdbcMessageParser implements SqlListenerMessageParser { - /** Initial output stream capacity. */ - private static final int INIT_CAP = 1024; - +public class OdbcMessageParser extends SqlListenerAbstractMessageParser { /** Marshaller. */ private final GridBinaryMarshaller marsh; - /** Logger. */ - private final IgniteLogger log; - /** * @param ctx Context. 
SqlListenerRequest.QRY_FETCH: { - long queryId = reader.readLong(); - int pageSize = reader.readInt(); - - res = new SqlListenerQueryFetchRequest(queryId, pageSize); - - break; - } - - case SqlListenerRequest.QRY_CLOSE: { - long queryId = reader.readLong(); - - res = new SqlListenerQueryCloseRequest(queryId); - - break; - } - - case SqlListenerRequest.META_COLS: { - String cache = reader.readString(); - String table = reader.readString(); - String column = reader.readString(); - - res = new OdbcQueryGetColumnsMetaRequest(cache, table, column); - - break; - } - - case SqlListenerRequest.META_TBLS: { - String catalog = reader.readString(); - String schema = reader.readString(); - String table = reader.readString(); - String tableType = reader.readString(); - - res = new OdbcQueryGetTablesMetaRequest(catalog, schema, table, tableType); - - break; - } - - case SqlListenerRequest.META_PARAMS: { - String cacheName = reader.readString(); - String sqlQuery = reader.readString(); - - res = new OdbcQueryGetParamsMetaRequest(cacheName, sqlQuery); - - break; - } - - default: - throw new IgniteException("Unknown ODBC command: [cmd=" + cmd + ']'); - } - - return res; + return new BinaryReaderExImpl(marsh.context(), stream, ctx.config().getClassLoader(), true); } /** {@inheritDoc} */ - @Override public byte[] encode(SqlListenerResponse msg) { - assert msg != null; - - // Creating new binary writer - BinaryWriterExImpl writer = marsh.writer(new BinaryHeapOutputStream(INIT_CAP)); - - // Writing status. 
- writer.writeByte((byte) msg.status()); - - if (msg.status() != SqlListenerResponse.STATUS_SUCCESS) { - writer.writeString(msg.error()); - - return writer.array(); - } - - Object res0 = msg.response(); - - if (res0 == null) - return writer.array(); - else if (res0 instanceof SqlListenerQueryExecuteResult) { - SqlListenerQueryExecuteResult res = (SqlListenerQueryExecuteResult) res0; - - if (log.isDebugEnabled()) - log.debug("Resulting query ID: " + res.getQueryId()); - - writer.writeLong(res.getQueryId()); - - Collection<SqlListenerColumnMeta> metas = res.getColumnsMetadata(); - - assert metas != null; - - writer.writeInt(metas.size()); - - for (SqlListenerColumnMeta meta : metas) - meta.write(writer); - } - else if (res0 instanceof SqlListenerQueryFetchResult) { - SqlListenerQueryFetchResult res = (SqlListenerQueryFetchResult) res0; - - if (log.isDebugEnabled()) - log.debug("Resulting query ID: " + res.queryId()); - - writer.writeLong(res.queryId()); - - Collection<?> items0 = res.items(); - - assert items0 != null; - - writer.writeBoolean(res.last()); - - writer.writeInt(items0.size()); - - for (Object row0 : items0) { - if (row0 != null) { - Collection<?> row = (Collection<?>)row0; - - writer.writeInt(row.size()); - - for (Object obj : row) { - if (obj == null) { - writer.writeObjectDetached(null); - continue; - } - - Class<?> cls = obj.getClass(); - - if (cls == java.sql.Time.class) - writer.writeTime((java.sql.Time)obj); - else if (cls == java.sql.Timestamp.class) - writer.writeTimestamp((java.sql.Timestamp)obj); - else if (cls == java.sql.Date.class) - writer.writeDate((java.util.Date)obj); - else - writer.writeObjectDetached(obj); - } - } - } - } - else if (res0 instanceof SqlListenerQueryCloseResult) { - SqlListenerQueryCloseResult res = (SqlListenerQueryCloseResult) res0; - - if (log.isDebugEnabled()) - log.debug("Resulting query ID: " + res.getQueryId()); - - writer.writeLong(res.getQueryId()); - } - else if (res0 instanceof OdbcQueryGetColumnsMetaResult) 
{ - OdbcQueryGetColumnsMetaResult res = (OdbcQueryGetColumnsMetaResult) res0; - - Collection<SqlListenerColumnMeta> columnsMeta = res.meta(); - - assert columnsMeta != null; - - writer.writeInt(columnsMeta.size()); - - for (SqlListenerColumnMeta columnMeta : columnsMeta) - columnMeta.write(writer); - } - else if (res0 instanceof OdbcQueryGetTablesMetaResult) { - OdbcQueryGetTablesMetaResult res = (OdbcQueryGetTablesMetaResult) res0; - - Collection<OdbcTableMeta> tablesMeta = res.meta(); - - assert tablesMeta != null; - - writer.writeInt(tablesMeta.size()); - - for (OdbcTableMeta tableMeta : tablesMeta) - tableMeta.writeBinary(writer); - } - else if (res0 instanceof OdbcQueryGetParamsMetaResult) { - OdbcQueryGetParamsMetaResult res = (OdbcQueryGetParamsMetaResult) res0; - - byte[] typeIds = res.typeIds(); - - writer.writeObjectDetached(typeIds); - } - else - assert false : "Should not reach here."; - - return writer.array(); + @Override protected BinaryWriterExImpl createWriter(int cap) { + return new BinaryWriterExImpl(marsh.context(), new BinaryHeapOutputStream(cap), + BinaryThreadLocalContext.get().schemaHolder(), null); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectReader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectReader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectReader.java new file mode 100644 index 0000000..586fbc5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectReader.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.odbc; + +import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.internal.binary.BinaryReaderExImpl; +import org.apache.ignite.internal.processors.odbc.SqlListenerAbstractObjectReader; + +/** + * ODBC object reader. Custom objects are read with {@code BinaryReaderExImpl.readObjectDetached()}. + */ +@SuppressWarnings("unchecked") +public class OdbcObjectReader extends SqlListenerAbstractObjectReader { + /** {@inheritDoc} */ + @Override protected Object readCustomObject(BinaryReaderExImpl reader) throws BinaryObjectException { + return reader.readObjectDetached(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectWriter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectWriter.java new file mode 100644 index 0000000..c2f3aba --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectWriter.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.odbc; + +import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.internal.binary.BinaryWriterExImpl; +import org.apache.ignite.internal.processors.odbc.SqlListenerAbstractObjectWriter; + +/** + * ODBC object writer. Custom objects are written with {@code BinaryWriterExImpl.writeObjectDetached(Object)}. 
+ */ +public class OdbcObjectWriter extends SqlListenerAbstractObjectWriter { + /** {@inheritDoc} */ + @Override protected void writeCustomObject(BinaryWriterExImpl writer, Object obj) throws BinaryObjectException { + writer.writeObjectDetached(obj); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java deleted file mode 100644 index eabc486..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java +++ /dev/null @@ -1,513 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.odbc.odbc; - -import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cache.query.QueryCursor; -import org.apache.ignite.cache.query.SqlFieldsQuery; -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.binary.GridBinaryMarshaller; -import org.apache.ignite.internal.processors.cache.QueryCursorImpl; -import org.apache.ignite.internal.processors.odbc.OdbcQueryGetColumnsMetaRequest; -import org.apache.ignite.internal.processors.odbc.OdbcQueryGetColumnsMetaResult; -import org.apache.ignite.internal.processors.odbc.OdbcQueryGetParamsMetaRequest; -import org.apache.ignite.internal.processors.odbc.OdbcQueryGetParamsMetaResult; -import org.apache.ignite.internal.processors.odbc.OdbcQueryGetTablesMetaRequest; -import org.apache.ignite.internal.processors.odbc.OdbcQueryGetTablesMetaResult; -import org.apache.ignite.internal.processors.odbc.OdbcTableMeta; -import org.apache.ignite.internal.processors.odbc.OdbcUtils; -import org.apache.ignite.internal.processors.odbc.SqlListenerColumnMeta; -import org.apache.ignite.internal.processors.odbc.SqlListenerQueryCloseRequest; -import org.apache.ignite.internal.processors.odbc.SqlListenerQueryCloseResult; -import org.apache.ignite.internal.processors.odbc.SqlListenerQueryExecuteRequest; -import org.apache.ignite.internal.processors.odbc.SqlListenerQueryExecuteResult; -import org.apache.ignite.internal.processors.odbc.SqlListenerQueryFetchRequest; -import org.apache.ignite.internal.processors.odbc.SqlListenerQueryFetchResult; -import org.apache.ignite.internal.processors.odbc.SqlListenerRequest; -import org.apache.ignite.internal.processors.odbc.SqlListenerRequestHandler; -import org.apache.ignite.internal.processors.odbc.SqlListenerResponse; -import org.apache.ignite.internal.processors.odbc.odbc.escape.OdbcEscapeUtils; -import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; 
-import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; -import org.apache.ignite.internal.util.GridSpinBusyLock; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiTuple; - -import java.sql.ParameterMetaData; -import java.sql.PreparedStatement; -import java.sql.Types; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; - -import static org.apache.ignite.internal.processors.odbc.SqlListenerRequest.META_COLS; -import static org.apache.ignite.internal.processors.odbc.SqlListenerRequest.META_PARAMS; -import static org.apache.ignite.internal.processors.odbc.SqlListenerRequest.META_TBLS; -import static org.apache.ignite.internal.processors.odbc.SqlListenerRequest.QRY_CLOSE; -import static org.apache.ignite.internal.processors.odbc.SqlListenerRequest.QRY_EXEC; -import static org.apache.ignite.internal.processors.odbc.SqlListenerRequest.QRY_FETCH; - -/** - * SQL query handler. - */ -public class OdbcRequestHandler implements SqlListenerRequestHandler { - /** Query ID sequence. */ - private static final AtomicLong QRY_ID_GEN = new AtomicLong(); - - /** Kernel context. */ - private final GridKernalContext ctx; - - /** Logger. */ - private final IgniteLogger log; - - /** Busy lock. */ - private final GridSpinBusyLock busyLock; - - /** Maximum allowed cursors. */ - private final int maxCursors; - - /** Current queries cursors. */ - private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCursors = new ConcurrentHashMap<>(); - - /** Distributed joins flag. */ - private final boolean distributedJoins; - - /** Enforce join order flag. */ - private final boolean enforceJoinOrder; - - /** - * Constructor. - * - * @param ctx Context. 
- * @param busyLock Shutdown latch. - * @param maxCursors Maximum allowed cursors. - * @param distributedJoins Distributed joins flag. - * @param enforceJoinOrder Enforce join order flag. - */ - public OdbcRequestHandler(GridKernalContext ctx, GridSpinBusyLock busyLock, int maxCursors, - boolean distributedJoins, boolean enforceJoinOrder) { - this.ctx = ctx; - this.busyLock = busyLock; - this.maxCursors = maxCursors; - this.distributedJoins = distributedJoins; - this.enforceJoinOrder = enforceJoinOrder; - - log = ctx.log(getClass()); - } - - /** {@inheritDoc} */ - @Override public SqlListenerResponse handle(SqlListenerRequest req) { - assert req != null; - - if (!busyLock.enterBusy()) - return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, - "Failed to handle ODBC request because node is stopping: " + req); - - try { - switch (req.command()) { - case QRY_EXEC: - return executeQuery((SqlListenerQueryExecuteRequest)req); - - case QRY_FETCH: - return fetchQuery((SqlListenerQueryFetchRequest)req); - - case QRY_CLOSE: - return closeQuery((SqlListenerQueryCloseRequest)req); - - case META_COLS: - return getColumnsMeta((OdbcQueryGetColumnsMetaRequest)req); - - case META_TBLS: - return getTablesMeta((OdbcQueryGetTablesMetaRequest)req); - - case META_PARAMS: - return getParamsMeta((OdbcQueryGetParamsMetaRequest)req); - } - - return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, "Unsupported ODBC request: " + req); - } - finally { - busyLock.leaveBusy(); - } - } - - /** - * {@link SqlListenerQueryExecuteRequest} command handler. - * - * @param req Execute query request. - * @return Response. 
- */ - private SqlListenerResponse executeQuery(SqlListenerQueryExecuteRequest req) { - int cursorCnt = qryCursors.size(); - - if (maxCursors > 0 && cursorCnt >= maxCursors) - return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, "Too many opened cursors (either close other " + - "opened cursors or increase the limit through OdbcConfiguration.setMaxOpenCursors()) " + - "[maximum=" + maxCursors + ", current=" + cursorCnt + ']'); - - long qryId = QRY_ID_GEN.getAndIncrement(); - - try { - String sql = OdbcEscapeUtils.parse(req.sqlQuery()); - - if (log.isDebugEnabled()) - log.debug("ODBC query parsed [reqId=" + req.requestId() + ", original=" + req.sqlQuery() + - ", parsed=" + sql + ']'); - - SqlFieldsQuery qry = new SqlFieldsQuery(sql); - - qry.setArgs(req.arguments()); - - qry.setDistributedJoins(distributedJoins); - qry.setEnforceJoinOrder(enforceJoinOrder); - - IgniteCache<Object, Object> cache0 = ctx.grid().cache(req.cacheName()); - - if (cache0 == null) - return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, - "Cache doesn't exist (did you configure it?): " + req.cacheName()); - - IgniteCache<Object, Object> cache = cache0.withKeepBinary(); - - if (cache == null) - return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, - "Can not get cache with keep binary: " + req.cacheName()); - - QueryCursor qryCur = cache.query(qry); - - qryCursors.put(qryId, new IgniteBiTuple<QueryCursor, Iterator>(qryCur, null)); - - List<?> fieldsMeta = ((QueryCursorImpl) qryCur).fieldsMeta(); - - SqlListenerQueryExecuteResult res = new SqlListenerQueryExecuteResult(qryId, convertMetadata(fieldsMeta)); - - return new SqlListenerResponse(res); - } - catch (Exception e) { - qryCursors.remove(qryId); - - U.error(log, "Failed to execute SQL query [reqId=" + req.requestId() + ", req=" + req + ']', e); - - return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, e.toString()); - } - } - - /** - * {@link SqlListenerQueryCloseRequest} command handler. 
- * - * @param req Execute query request. - * @return Response. - */ - private SqlListenerResponse closeQuery(SqlListenerQueryCloseRequest req) { - try { - IgniteBiTuple<QueryCursor, Iterator> tuple = qryCursors.get(req.queryId()); - - if (tuple == null) - return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, - "Failed to find query with ID: " + req.queryId()); - - QueryCursor cur = tuple.get1(); - - assert(cur != null); - - cur.close(); - - qryCursors.remove(req.queryId()); - - SqlListenerQueryCloseResult res = new SqlListenerQueryCloseResult(req.queryId()); - - return new SqlListenerResponse(res); - } - catch (Exception e) { - qryCursors.remove(req.queryId()); - - U.error(log, "Failed to close SQL query [reqId=" + req.requestId() + ", req=" + req.queryId() + ']', e); - - return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, e.toString()); - } - } - - /** - * {@link SqlListenerQueryFetchRequest} command handler. - * - * @param req Execute query request. - * @return Response. 
- */ - private SqlListenerResponse fetchQuery(SqlListenerQueryFetchRequest req) { - try { - IgniteBiTuple<QueryCursor, Iterator> tuple = qryCursors.get(req.queryId()); - - if (tuple == null) - return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, - "Failed to find query with ID: " + req.queryId()); - - Iterator iter = tuple.get2(); - - if (iter == null) { - QueryCursor cur = tuple.get1(); - - iter = cur.iterator(); - - tuple.put(cur, iter); - } - - List<Object> items = new ArrayList<>(); - - for (int i = 0; i < req.pageSize() && iter.hasNext(); ++i) - items.add(iter.next()); - - SqlListenerQueryFetchResult res = new SqlListenerQueryFetchResult(req.queryId(), items, !iter.hasNext()); - - return new SqlListenerResponse(res); - } - catch (Exception e) { - U.error(log, "Failed to fetch SQL query result [reqId=" + req.requestId() + ", req=" + req + ']', e); - - return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, e.toString()); - } - } - - /** - * {@link OdbcQueryGetColumnsMetaRequest} command handler. - * - * @param req Get columns metadata request. - * @return Response. - */ - private SqlListenerResponse getColumnsMeta(OdbcQueryGetColumnsMetaRequest req) { - try { - List<SqlListenerColumnMeta> meta = new ArrayList<>(); - - String cacheName; - String tableName; - - if (req.tableName().contains(".")) { - // Parsing two-part table name. 
- String[] parts = req.tableName().split("\\."); - - cacheName = OdbcUtils.removeQuotationMarksIfNeeded(parts[0]); - - tableName = parts[1]; - } - else { - cacheName = OdbcUtils.removeQuotationMarksIfNeeded(req.cacheName()); - - tableName = req.tableName(); - } - - Collection<GridQueryTypeDescriptor> tablesMeta = ctx.query().types(cacheName); - - for (GridQueryTypeDescriptor table : tablesMeta) { - if (!matches(table.name(), tableName)) - continue; - - for (Map.Entry<String, Class<?>> field : table.fields().entrySet()) { - if (!matches(field.getKey(), req.columnName())) - continue; - - SqlListenerColumnMeta columnMeta = new SqlListenerColumnMeta(req.cacheName(), table.name(), - field.getKey(), field.getValue()); - - if (!meta.contains(columnMeta)) - meta.add(columnMeta); - } - } - - OdbcQueryGetColumnsMetaResult res = new OdbcQueryGetColumnsMetaResult(meta); - - return new SqlListenerResponse(res); - } - catch (Exception e) { - U.error(log, "Failed to get columns metadata [reqId=" + req.requestId() + ", req=" + req + ']', e); - - return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, e.toString()); - } - } - - /** - * {@link OdbcQueryGetTablesMetaRequest} command handler. - * - * @param req Get tables metadata request. - * @return Response. 
- */ - private SqlListenerResponse getTablesMeta(OdbcQueryGetTablesMetaRequest req) { - try { - List<OdbcTableMeta> meta = new ArrayList<>(); - - String realSchema = OdbcUtils.removeQuotationMarksIfNeeded(req.schema()); - - for (String cacheName : ctx.cache().cacheNames()) - { - if (!matches(cacheName, realSchema)) - continue; - - Collection<GridQueryTypeDescriptor> tablesMeta = ctx.query().types(cacheName); - - for (GridQueryTypeDescriptor table : tablesMeta) { - if (!matches(table.name(), req.table())) - continue; - - if (!matches("TABLE", req.tableType())) - continue; - - OdbcTableMeta tableMeta = new OdbcTableMeta(null, cacheName, table.name(), "TABLE"); - - if (!meta.contains(tableMeta)) - meta.add(tableMeta); - } - } - - OdbcQueryGetTablesMetaResult res = new OdbcQueryGetTablesMetaResult(meta); - - return new SqlListenerResponse(res); - } - catch (Exception e) { - U.error(log, "Failed to get tables metadata [reqId=" + req.requestId() + ", req=" + req + ']', e); - - return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, e.toString()); - } - } - - /** - * {@link OdbcQueryGetParamsMetaRequest} command handler. - * - * @param req Get params metadata request. - * @return Response. 
- */ - private SqlListenerResponse getParamsMeta(OdbcQueryGetParamsMetaRequest req) { - try { - PreparedStatement stmt = ctx.query().prepareNativeStatement(req.cacheName(), req.query()); - - ParameterMetaData pmd = stmt.getParameterMetaData(); - - byte[] typeIds = new byte[pmd.getParameterCount()]; - - for (int i = 1; i <= pmd.getParameterCount(); ++i) { - int sqlType = pmd.getParameterType(i); - - typeIds[i - 1] = sqlTypeToBinary(sqlType); - } - - OdbcQueryGetParamsMetaResult res = new OdbcQueryGetParamsMetaResult(typeIds); - - return new SqlListenerResponse(res); - } - catch (Exception e) { - U.error(log, "Failed to get params metadata [reqId=" + req.requestId() + ", req=" + req + ']', e); - - return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, e.toString()); - } - } - - /** - * Convert {@link java.sql.Types} to binary type constant (See {@link GridBinaryMarshaller} constants). - * - * @param sqlType SQL type. - * @return Binary type. - */ - private static byte sqlTypeToBinary(int sqlType) { - switch (sqlType) { - case Types.BIGINT: - return GridBinaryMarshaller.LONG; - - case Types.BOOLEAN: - return GridBinaryMarshaller.BOOLEAN; - - case Types.DATE: - return GridBinaryMarshaller.DATE; - - case Types.DOUBLE: - return GridBinaryMarshaller.DOUBLE; - - case Types.FLOAT: - case Types.REAL: - return GridBinaryMarshaller.FLOAT; - - case Types.NUMERIC: - case Types.DECIMAL: - return GridBinaryMarshaller.DECIMAL; - - case Types.INTEGER: - return GridBinaryMarshaller.INT; - - case Types.SMALLINT: - return GridBinaryMarshaller.SHORT; - - case Types.TIME: - return GridBinaryMarshaller.TIME; - - case Types.TIMESTAMP: - return GridBinaryMarshaller.TIMESTAMP; - - case Types.TINYINT: - return GridBinaryMarshaller.BYTE; - - case Types.CHAR: - case Types.VARCHAR: - case Types.LONGNVARCHAR: - return GridBinaryMarshaller.STRING; - - case Types.NULL: - return GridBinaryMarshaller.NULL; - - case Types.BINARY: - case Types.VARBINARY: - case Types.LONGVARBINARY: - 
default: - return GridBinaryMarshaller.BYTE_ARR; - } - } - - /** - * Convert metadata in collection from {@link GridQueryFieldMetadata} to - * {@link SqlListenerColumnMeta}. - * - * @param meta Internal query field metadata. - * @return Odbc query field metadata. - */ - private static Collection<SqlListenerColumnMeta> convertMetadata(Collection<?> meta) { - List<SqlListenerColumnMeta> res = new ArrayList<>(); - - if (meta != null) { - for (Object info : meta) { - assert info instanceof GridQueryFieldMetadata; - - res.add(new SqlListenerColumnMeta((GridQueryFieldMetadata)info)); - } - } - - return res; - } - - /** - * Checks whether string matches SQL pattern. - * - * @param str String. - * @param ptrn Pattern. - * @return Whether string matches pattern. - */ - private static boolean matches(String str, String ptrn) { - return str != null && (F.isEmpty(ptrn) || - str.toUpperCase().matches(ptrn.toUpperCase().replace("%", ".*").replace("_", "."))); - } -}
