Repository: ignite Updated Branches: refs/heads/master 61621b88d -> 87e37975d
IGNITE-6357: Added support of multiple SQL statements for ODBC Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/87e37975 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/87e37975 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/87e37975 Branch: refs/heads/master Commit: 87e37975d2cd24e2435de627d3ad6d8544af5972 Parents: 61621b8 Author: Igor Sapego <[email protected]> Authored: Tue Oct 31 14:20:45 2017 +0300 Committer: Igor Sapego <[email protected]> Committed: Tue Oct 31 14:20:45 2017 +0300 ---------------------------------------------------------------------- .../processors/odbc/odbc/OdbcMessageParser.java | 60 +++++- .../odbc/odbc/OdbcQueryExecuteBatchResult.java | 20 +- .../odbc/odbc/OdbcQueryExecuteResult.java | 11 +- .../odbc/odbc/OdbcQueryMoreResultsRequest.java | 61 ++++++ .../odbc/odbc/OdbcQueryMoreResultsResult.java | 66 ++++++ .../processors/odbc/odbc/OdbcQueryResults.java | 106 ++++++++++ .../processors/odbc/odbc/OdbcRequest.java | 3 + .../odbc/odbc/OdbcRequestHandler.java | 175 +++++++--------- .../processors/odbc/odbc/OdbcResultSet.java | 101 ++++++++++ .../processors/odbc/odbc/OdbcUtils.java | 31 +++ .../examples/odbc-example/src/odbc_example.cpp | 8 +- .../cpp/odbc-test/src/queries_test.cpp | 200 ++++++++++++++++++- .../cpp/odbc/include/ignite/odbc/message.h | 97 ++++++++- .../include/ignite/odbc/query/batch_query.h | 16 +- .../ignite/odbc/query/column_metadata_query.h | 7 + .../odbc/include/ignite/odbc/query/data_query.h | 22 +- .../ignite/odbc/query/foreign_keys_query.h | 7 + .../ignite/odbc/query/primary_keys_query.h | 7 + .../cpp/odbc/include/ignite/odbc/query/query.h | 7 + .../ignite/odbc/query/special_columns_query.h | 7 + .../ignite/odbc/query/table_metadata_query.h | 7 + .../include/ignite/odbc/query/type_info_query.h | 7 + .../cpp/odbc/include/ignite/odbc/statement.h | 6 +- modules/platforms/cpp/odbc/src/cursor.cpp | 2 +- 
modules/platforms/cpp/odbc/src/message.cpp | 62 +++++- modules/platforms/cpp/odbc/src/odbc.cpp | 2 +- .../cpp/odbc/src/query/batch_query.cpp | 80 ++++---- .../odbc/src/query/column_metadata_query.cpp | 5 + .../platforms/cpp/odbc/src/query/data_query.cpp | 93 ++++++++- .../cpp/odbc/src/query/foreign_keys_query.cpp | 5 + .../cpp/odbc/src/query/primary_keys_query.cpp | 5 + .../odbc/src/query/special_columns_query.cpp | 5 + .../cpp/odbc/src/query/table_metadata_query.cpp | 5 + .../cpp/odbc/src/query/type_info_query.cpp | 5 + modules/platforms/cpp/odbc/src/statement.cpp | 14 +- 35 files changed, 1103 insertions(+), 212 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java index 04e2e25..bf74bc5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java @@ -165,6 +165,15 @@ public class OdbcMessageParser implements ClientListenerMessageParser { break; } + case OdbcRequest.MORE_RESULTS: { + long queryId = reader.readLong(); + int pageSize = reader.readInt(); + + res = new OdbcQueryMoreResultsRequest(queryId, pageSize); + + break; + } + default: throw new IgniteException("Unknown ODBC command: [cmd=" + cmd + ']'); } @@ -233,13 +242,13 @@ public class OdbcMessageParser implements ClientListenerMessageParser { for (OdbcColumnMeta meta : metas) meta.write(writer); - writer.writeLong(res.affectedRows()); + writeAffectedRows(writer, res.affectedRows()); } else if 
(res0 instanceof OdbcQueryExecuteBatchResult) { OdbcQueryExecuteBatchResult res = (OdbcQueryExecuteBatchResult) res0; writer.writeBoolean(res.errorMessage() == null); - writer.writeLong(res.rowsAffected()); + writeAffectedRows(writer, res.affectedRows()); if (res.errorMessage() != null) { writer.writeLong(res.errorSetIdx()); @@ -276,6 +285,33 @@ public class OdbcMessageParser implements ClientListenerMessageParser { } } } + else if (res0 instanceof OdbcQueryMoreResultsResult) { + OdbcQueryMoreResultsResult res = (OdbcQueryMoreResultsResult) res0; + + if (log.isDebugEnabled()) + log.debug("Resulting query ID: " + res.queryId()); + + writer.writeLong(res.queryId()); + + Collection<?> items0 = res.items(); + + assert items0 != null; + + writer.writeBoolean(res.last()); + + writer.writeInt(items0.size()); + + for (Object row0 : items0) { + if (row0 != null) { + Collection<?> row = (Collection<?>)row0; + + writer.writeInt(row.size()); + + for (Object obj : row) + SqlListenerUtils.writeObject(writer, obj, true); + } + } + } else if (res0 instanceof OdbcQueryCloseResult) { OdbcQueryCloseResult res = (OdbcQueryCloseResult) res0; @@ -320,4 +356,24 @@ public class OdbcMessageParser implements ClientListenerMessageParser { return writer.array(); } + + /** + * @param writer Writer to use. + * @param affectedRows Affected rows. + */ + private void writeAffectedRows(BinaryWriterExImpl writer, Collection<Long> affectedRows) { + if (ver.compareTo(OdbcConnectionContext.VER_2_3_0) < 0) { + long summ = 0; + + for (Long value : affectedRows) + summ += value == null ? 
0 : value; + + writer.writeLong(summ); + } + else { + writer.writeInt(affectedRows.size()); + for (Long value : affectedRows) + writer.writeLong(value); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteBatchResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteBatchResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteBatchResult.java index c8f61dc..e86c7c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteBatchResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteBatchResult.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.odbc.odbc; +import java.util.Collection; import org.apache.ignite.internal.processors.odbc.ClientListenerResponse; import org.jetbrains.annotations.Nullable; @@ -25,7 +26,7 @@ import org.jetbrains.annotations.Nullable; */ public class OdbcQueryExecuteBatchResult { /** Rows affected. */ - private final long rowsAffected; + private final Collection<Long> affectedRows; /** Index of the set which caused an error. */ private final long errorSetIdx; @@ -37,23 +38,24 @@ public class OdbcQueryExecuteBatchResult { private final String errorMessage; /** - * @param rowsAffected Number of rows affected by the query. + * @param affectedRows Number of rows affected by the query. 
*/ - public OdbcQueryExecuteBatchResult(long rowsAffected) { - this.rowsAffected = rowsAffected; + public OdbcQueryExecuteBatchResult(Collection<Long> affectedRows) { + this.affectedRows = affectedRows; this.errorSetIdx = -1; this.errorMessage = null; this.errorCode = ClientListenerResponse.STATUS_SUCCESS; } /** - * @param rowsAffected Number of rows affected by the query. + * @param affectedRows Number of rows affected by the query. * @param errorSetIdx Sets processed. * @param errorCode Error code. * @param errorMessage Error message. */ - public OdbcQueryExecuteBatchResult(long rowsAffected, long errorSetIdx, int errorCode, String errorMessage) { - this.rowsAffected = rowsAffected; + public OdbcQueryExecuteBatchResult(Collection<Long> affectedRows, long errorSetIdx, int errorCode, + String errorMessage) { + this.affectedRows = affectedRows; this.errorSetIdx = errorSetIdx; this.errorMessage = errorMessage; this.errorCode = errorCode; @@ -62,8 +64,8 @@ public class OdbcQueryExecuteBatchResult { /** * @return Number of rows affected by the query. */ - public long rowsAffected() { - return rowsAffected; + public Collection<Long> affectedRows() { + return affectedRows; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteResult.java index 38dd0b4..8182e97 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteResult.java @@ -29,15 +29,16 @@ public class OdbcQueryExecuteResult { /** Fields metadata. 
*/ private final Collection<OdbcColumnMeta> columnsMetadata; - /** Rows affected by the query. */ - private final long affectedRows; + /** Rows affected by the statements. */ + private final Collection<Long> affectedRows; /** * @param queryId Query ID. * @param columnsMetadata Columns metadata. * @param affectedRows Affected rows. */ - public OdbcQueryExecuteResult(long queryId, Collection<OdbcColumnMeta> columnsMetadata, long affectedRows) { + public OdbcQueryExecuteResult(long queryId, Collection<OdbcColumnMeta> columnsMetadata, + Collection<Long> affectedRows) { this.queryId = queryId; this.columnsMetadata = columnsMetadata; this.affectedRows = affectedRows; @@ -58,9 +59,9 @@ public class OdbcQueryExecuteResult { } /** - * @return Number of rows affected by the query. + * @return Number of rows affected by the statements. */ - public long affectedRows() { + public Collection<Long> affectedRows() { return affectedRows; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryMoreResultsRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryMoreResultsRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryMoreResultsRequest.java new file mode 100644 index 0000000..5651f6d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryMoreResultsRequest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.odbc; + +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * SQL listener query fetch request. + */ +public class OdbcQueryMoreResultsRequest extends OdbcRequest { + /** Query ID. */ + private final long queryId; + + /** Page size - maximum number of rows to return. */ + private final int pageSize; + + /** + * @param queryId Query ID. + * @param pageSize Page size. + */ + public OdbcQueryMoreResultsRequest(long queryId, int pageSize) { + super(MORE_RESULTS); + + this.queryId = queryId; + this.pageSize = pageSize; + } + + /** + * @return Page size. + */ + public int pageSize() { + return pageSize; + } + + /** + * @return Query ID. 
+ */ + public long queryId() { + return queryId; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(OdbcQueryMoreResultsRequest.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryMoreResultsResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryMoreResultsResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryMoreResultsResult.java new file mode 100644 index 0000000..faa5e27 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryMoreResultsResult.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.odbc; + +import java.util.Collection; + +/** + * SQL listener query fetch result. + */ +public class OdbcQueryMoreResultsResult { + /** Query ID. */ + private final long queryId; + + /** Query result rows. 
*/ + private final Collection<?> items; + + /** Flag indicating the query has no non-fetched results. */ + private final boolean last; + + /** + * @param queryId Query ID. + * @param items Query result rows. + * @param last Flag indicating the query has no unfetched results. + */ + public OdbcQueryMoreResultsResult(long queryId, Collection<?> items, boolean last){ + this.queryId = queryId; + this.items = items; + this.last = last; + } + + /** + * @return Query ID. + */ + public long queryId() { + return queryId; + } + + /** + * @return Query result rows. + */ + public Collection<?> items() { + return items; + } + + /** + * @return Flag indicating the query has no non-fetched results. + */ + public boolean last() { + return last; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryResults.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryResults.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryResults.java new file mode 100644 index 0000000..23788c7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryResults.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc.odbc; + +import java.util.ArrayList; +import java.util.List; +import org.apache.ignite.cache.query.FieldsQueryCursor; +import org.apache.ignite.internal.processors.cache.QueryCursorImpl; + +/** + * ODBC result set + */ +public class OdbcQueryResults { + /** Current cursor. */ + private final List<FieldsQueryCursor<List<?>>> cursors; + + /** Rows affected. */ + private final List<Long> rowsAffected; + + /** Current result set. */ + private OdbcResultSet currentResultSet; + + /** Current result set index. */ + private int currentResultSetIdx; + + /** + * @param cursors Result set cursors. + */ + OdbcQueryResults(List<FieldsQueryCursor<List<?>>> cursors) { + this.cursors = cursors; + this.currentResultSetIdx = 0; + + rowsAffected = new ArrayList<>(cursors.size()); + + for (FieldsQueryCursor<List<?>> cursor : cursors) + rowsAffected.add(OdbcUtils.rowsAffected(cursor)); + + nextResultSet(); + } + + /** + * Get affected rows for all result sets. + * @return List of numbers of table rows affected by every statement. + */ + public List<Long> rowsAffected() { + return rowsAffected; + } + + /** + * @return {@code true} if any of the result sets still has non-fetched rows. 
+ */ + public boolean hasUnfetchedRows() { + if (currentResultSet != null && currentResultSet.hasUnfetchedRows()) + return true; + + for (FieldsQueryCursor<List<?>> cursor : cursors) { + QueryCursorImpl<List<?>> cursor0 = (QueryCursorImpl<List<?>>)cursor; + + if (cursor0.isQuery()) + return true; + } + return false; + } + + /** + * Close all cursors. + */ + public void closeAll() { + for (FieldsQueryCursor<List<?>> cursor : cursors) + cursor.close(); + } + + /** + * @return Current result set. + */ + public OdbcResultSet currentResultSet() { + return currentResultSet; + } + + /** + * Move to next result set. + */ + public void nextResultSet() { + currentResultSet = null; + + if (currentResultSetIdx != cursors.size()) { + currentResultSet = new OdbcResultSet(cursors.get(currentResultSetIdx)); + ++currentResultSetIdx; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequest.java index 4b21b79..9b9aa01 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequest.java @@ -44,6 +44,9 @@ public class OdbcRequest extends ClientListenerRequestNoId { /** Execute sql query with the batch of parameters. */ public static final int QRY_EXEC_BATCH = 8; + /** Get next result set. */ + public static final int MORE_RESULTS = 9; + /** Command. 
*/ private final int cmd; http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java index 32375fd..7f6b48d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java @@ -22,13 +22,13 @@ import java.sql.PreparedStatement; import java.sql.Types; import java.util.ArrayList; import java.util.Collection; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.query.FieldsQueryCursor; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.internal.GridKernalContext; @@ -41,17 +41,16 @@ import org.apache.ignite.internal.processors.odbc.ClientListenerRequest; import org.apache.ignite.internal.processors.odbc.ClientListenerRequestHandler; import org.apache.ignite.internal.processors.odbc.ClientListenerResponse; import org.apache.ignite.internal.processors.odbc.odbc.escape.OdbcEscapeUtils; -import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.processors.query.GridQueryIndexing; import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.typedef.F; import 
org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiTuple; import static org.apache.ignite.internal.processors.odbc.odbc.OdbcRequest.META_COLS; import static org.apache.ignite.internal.processors.odbc.odbc.OdbcRequest.META_PARAMS; import static org.apache.ignite.internal.processors.odbc.odbc.OdbcRequest.META_TBLS; +import static org.apache.ignite.internal.processors.odbc.odbc.OdbcRequest.MORE_RESULTS; import static org.apache.ignite.internal.processors.odbc.odbc.OdbcRequest.QRY_CLOSE; import static org.apache.ignite.internal.processors.odbc.odbc.OdbcRequest.QRY_EXEC; import static org.apache.ignite.internal.processors.odbc.odbc.OdbcRequest.QRY_EXEC_BATCH; @@ -77,7 +76,7 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler { private final int maxCursors; /** Current queries cursors. */ - private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCursors = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<Long, OdbcQueryResults> qryResults = new ConcurrentHashMap<>(); /** Distributed joins flag. */ private final boolean distributedJoins; @@ -157,6 +156,9 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler { case META_PARAMS: return getParamsMeta((OdbcQueryGetParamsMetaRequest)req); + + case MORE_RESULTS: + return moreResults((OdbcQueryMoreResultsRequest)req); } return new OdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Unsupported ODBC request: " + req); @@ -185,8 +187,8 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler { { try { - for (IgniteBiTuple<QueryCursor, Iterator> tuple : qryCursors.values()) - tuple.get1().close(); + for (OdbcQueryResults res : qryResults.values()) + res.closeAll(); } finally { busyLock.leaveBusy(); @@ -224,7 +226,7 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler { * @return Response. 
*/ private ClientListenerResponse executeQuery(OdbcQueryExecuteRequest req) { - int cursorCnt = qryCursors.size(); + int cursorCnt = qryResults.size(); if (maxCursors > 0 && cursorCnt >= maxCursors) return new OdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Too many open cursors (either close " + @@ -243,26 +245,22 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler { SqlFieldsQuery qry = makeQuery(req.schema(), sql, req.arguments()); - QueryCursorImpl<List<?>> qryCur = (QueryCursorImpl<List<?>>)ctx.query().querySqlFieldsNoCache(qry, true); - - long rowsAffected = 0; + List<FieldsQueryCursor<List<?>>> cursors = ctx.query().querySqlFieldsNoCache(qry, true, false); - if (!qryCur.isQuery()) { - rowsAffected = getRowsAffected(qryCur); + OdbcQueryResults results = new OdbcQueryResults(cursors); - qryCur.close(); - } + if (!results.hasUnfetchedRows()) + results.closeAll(); else - qryCursors.put(qryId, new IgniteBiTuple<QueryCursor, Iterator>(qryCur, null)); + qryResults.put(qryId, results); - List<?> fieldsMeta = ((QueryCursorImpl) qryCur).fieldsMeta(); - - OdbcQueryExecuteResult res = new OdbcQueryExecuteResult(qryId, convertMetadata(fieldsMeta), rowsAffected); + OdbcQueryExecuteResult res = new OdbcQueryExecuteResult(qryId, results.currentResultSet().fieldsMeta(), + results.rowsAffected()); return new OdbcResponse(res); } catch (Exception e) { - qryCursors.remove(qryId); + qryResults.remove(qryId); U.error(log, "Failed to execute SQL query [reqId=" + req.requestId() + ", req=" + req + ']', e); @@ -277,7 +275,7 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler { * @return Response. 
*/ private ClientListenerResponse executeBatchQuery(OdbcQueryExecuteBatchRequest req) { - long rowsAffected = 0; + List<Long> rowsAffected = new ArrayList<>(req.arguments().length); int currentSet = 0; try { @@ -304,10 +302,10 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler { throw new IgniteException("Batching of parameters only supported for DML statements. [query=" + req.sqlQuery() + ']'); - rowsAffected += getRowsAffected(qryCur); + rowsAffected.add(OdbcUtils.rowsAffected(qryCur)); for (currentSet = 1; currentSet < paramSet.length; ++currentSet) - rowsAffected += executeQuery(qry, paramSet[currentSet]); + rowsAffected.add(executeQuery(qry, paramSet[currentSet])); OdbcQueryExecuteBatchResult res = new OdbcQueryExecuteBatchResult(rowsAffected); @@ -331,29 +329,7 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler { QueryCursor<List<?>> cur = ctx.query().querySqlFieldsNoCache(qry, true); - return getRowsAffected(cur); - } - - /** - * Get affected rows for DML statement. - * @param qryCur Cursor. - * @return Number of table rows affected. 
- */ - private static long getRowsAffected(QueryCursor<List<?>> qryCur) { - Iterator<List<?>> iter = qryCur.iterator(); - - if (iter.hasNext()) { - List<?> res = iter.next(); - - if (res.size() > 0) { - Long affected = (Long) res.get(0); - - if (affected != null) - return affected; - } - } - - return 0; + return OdbcUtils.rowsAffected(cur); } /** @@ -366,20 +342,20 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler { long queryId = req.queryId(); try { - IgniteBiTuple<QueryCursor, Iterator> tuple = qryCursors.get(queryId); + OdbcQueryResults results = qryResults.get(queryId); - if (tuple == null) + if (results == null) return new OdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Failed to find query with ID: " + queryId); - CloseCursor(tuple, queryId); + CloseCursor(results, queryId); OdbcQueryCloseResult res = new OdbcQueryCloseResult(queryId); return new OdbcResponse(res); } catch (Exception e) { - qryCursors.remove(queryId); + qryResults.remove(queryId); U.error(log, "Failed to close SQL query [reqId=" + req.requestId() + ", req=" + queryId + ']', e); @@ -396,34 +372,21 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler { private ClientListenerResponse fetchQuery(OdbcQueryFetchRequest req) { try { long queryId = req.queryId(); - IgniteBiTuple<QueryCursor, Iterator> tuple = qryCursors.get(queryId); + OdbcQueryResults results = qryResults.get(queryId); - if (tuple == null) + if (results == null) return new OdbcResponse(ClientListenerResponse.STATUS_FAILED, "Failed to find query with ID: " + queryId); - Iterator iter = tuple.get2(); + OdbcResultSet set = results.currentResultSet(); - if (iter == null) { - QueryCursor cur = tuple.get1(); + List<Object> items = set.fetch(req.pageSize()); - assert(cur != null); - - iter = cur.iterator(); - - tuple.put(cur, iter); - } - - List<Object> items = new ArrayList<>(); - - for (int i = 0; i < req.pageSize() && iter.hasNext(); ++i) - items.add(iter.next()); - - boolean lastPage = 
!iter.hasNext(); + boolean lastPage = !set.hasUnfetchedRows(); // Automatically closing cursor if no more data is available. - if (lastPage) - CloseCursor(tuple, queryId); + if (!results.hasUnfetchedRows()) + CloseCursor(results, queryId); OdbcQueryFetchResult res = new OdbcQueryFetchResult(queryId, items, lastPage); @@ -581,18 +544,55 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler { } /** + * {@link OdbcQueryMoreResultsRequest} command handler. + * + * @param req Execute query request. + * @return Response. + */ + private ClientListenerResponse moreResults(OdbcQueryMoreResultsRequest req) { + try { + long queryId = req.queryId(); + OdbcQueryResults results = qryResults.get(queryId); + + if (results == null) + return new OdbcResponse(ClientListenerResponse.STATUS_FAILED, + "Failed to find query with ID: " + queryId); + + results.nextResultSet(); + + OdbcResultSet set = results.currentResultSet(); + + List<Object> items = set.fetch(req.pageSize()); + + boolean lastPage = !set.hasUnfetchedRows(); + + // Automatically closing cursor if no more data is available. + if (!results.hasUnfetchedRows()) + CloseCursor(results, queryId); + + OdbcQueryMoreResultsResult res = new OdbcQueryMoreResultsResult(queryId, items, lastPage); + + return new OdbcResponse(res); + } + catch (Exception e) { + U.error(log, "Failed to get more SQL query results [reqId=" + + req.requestId() + ", req=" + req + ']', e); + + return exceptionToResult(e); + } + } + + /** * Close cursor. - * @param tuple Query map element. + * @param results Query map element. * @param queryId Query ID. 
*/ - private void CloseCursor(IgniteBiTuple<QueryCursor, Iterator> tuple, long queryId) { - QueryCursor cur = tuple.get1(); - - assert(cur != null); + private void CloseCursor(OdbcQueryResults results, long queryId) { + assert(results != null); - cur.close(); + results.closeAll(); - qryCursors.remove(queryId); + qryResults.remove(queryId); } /** @@ -655,27 +655,6 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler { } /** - * Convert metadata in collection from {@link GridQueryFieldMetadata} to - * {@link OdbcColumnMeta}. - * - * @param meta Internal query field metadata. - * @return Odbc query field metadata. - */ - private static Collection<OdbcColumnMeta> convertMetadata(Collection<?> meta) { - List<OdbcColumnMeta> res = new ArrayList<>(); - - if (meta != null) { - for (Object info : meta) { - assert info instanceof GridQueryFieldMetadata; - - res.add(new OdbcColumnMeta((GridQueryFieldMetadata)info)); - } - } - - return res; - } - - /** * Checks whether string matches SQL pattern. * * @param str String. @@ -694,7 +673,7 @@ public class OdbcRequestHandler implements ClientListenerRequestHandler { * @param e Exception to convert. * @return resulting {@link OdbcResponse}. 
*/ - private OdbcResponse exceptionToBatchResult(Exception e, long rowsAffected, long currentSet) { + private OdbcResponse exceptionToBatchResult(Exception e, Collection<Long> rowsAffected, long currentSet) { OdbcQueryExecuteBatchResult res = new OdbcQueryExecuteBatchResult(rowsAffected, currentSet, OdbcUtils.tryRetrieveSqlErrorCode(e), OdbcUtils.tryRetrieveH2ErrorMessage(e)); http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcResultSet.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcResultSet.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcResultSet.java new file mode 100644 index 0000000..66b0776 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcResultSet.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.odbc.odbc; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import org.apache.ignite.cache.query.FieldsQueryCursor; +import org.apache.ignite.internal.processors.cache.QueryCursorImpl; +import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; + +/** + * Represents single result set. + */ +public class OdbcResultSet { + /** Cursor. */ + private final QueryCursorImpl<List<?>> cursor; + + /** Current iterator. */ + private Iterator iter; + + /** + * Constructor. + * @param cursor Result set cursor. + */ + OdbcResultSet(FieldsQueryCursor<List<?>> cursor) { + assert cursor instanceof QueryCursorImpl; + + this.cursor = (QueryCursorImpl<List<?>>)cursor; + + if (this.cursor.isQuery()) + iter = this.cursor.iterator(); + else + iter = null; + } + + /** + * @return {@code true} if has non-fetched rows. + */ + public boolean hasUnfetchedRows() { + return iter != null && iter.hasNext(); + } + + /** + * @return Fields metadata of the current result set. + */ + public Collection<OdbcColumnMeta> fieldsMeta() { + return convertMetadata(cursor.fieldsMeta()); + } + + /** + * Fetch up to specified number of rows of result set. + * @param maxSize Maximum number of records to fetch. + * @return List of fetched records. + */ + public List<Object> fetch(int maxSize) { + List<Object> items = new ArrayList<>(maxSize); + + if (iter == null) + return items; + + for (int i = 0; i < maxSize && iter.hasNext(); ++i) + items.add(iter.next()); + + return items; + } + + /** + * Convert metadata in collection from {@link GridQueryFieldMetadata} to + * {@link OdbcColumnMeta}. + * + * @param meta Internal query field metadata. + * @return Odbc query field metadata. 
+ */ + private static Collection<OdbcColumnMeta> convertMetadata(Collection<GridQueryFieldMetadata> meta) { + List<OdbcColumnMeta> res = new ArrayList<>(); + + if (meta != null) { + for (GridQueryFieldMetadata info : meta) + res.add(new OdbcColumnMeta(info)); + } + + return res; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcUtils.java index 98fa045..4aa864d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcUtils.java @@ -17,7 +17,11 @@ package org.apache.ignite.internal.processors.odbc.odbc; +import java.util.Iterator; +import java.util.List; import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; import org.apache.ignite.internal.processors.odbc.SqlListenerDataTypes; import org.apache.ignite.internal.processors.query.IgniteSQLException; @@ -181,4 +185,31 @@ public class OdbcUtils { return msg; } + + /** + * Get affected rows for statement. + * @param qryCur Cursor. + * @return Number of table rows affected, if the query is DML, and -1 otherwise. 
+ */ + public static long rowsAffected(QueryCursor<List<?>> qryCur) { + QueryCursorImpl<List<?>> qryCur0 = (QueryCursorImpl<List<?>>)qryCur; + + if (qryCur0.isQuery()) + return -1; + + Iterator<List<?>> iter = qryCur0.iterator(); + + if (iter.hasNext()) { + List<?> res = iter.next(); + + if (res.size() > 0) { + Long affected = (Long) res.get(0); + + if (affected != null) + return affected; + } + } + + return 0; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/platforms/cpp/examples/odbc-example/src/odbc_example.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/examples/odbc-example/src/odbc_example.cpp b/modules/platforms/cpp/examples/odbc-example/src/odbc_example.cpp index 29c2a4a..76b5c18 100644 --- a/modules/platforms/cpp/examples/odbc-example/src/odbc_example.cpp +++ b/modules/platforms/cpp/examples/odbc-example/src/odbc_example.cpp @@ -169,7 +169,7 @@ void GetDataWithOdbc(SQLHDBC dbc, const std::string& query) /** * Populate Person cache with sample data. - * + * * @param dbc Database connection. */ void PopulatePerson(SQLHDBC dbc) @@ -387,7 +387,7 @@ void PopulatePerson(SQLHDBC dbc) /** * Populate Organization cache with sample data. - * + * * @param dbc Database connection. */ void PopulateOrganization(SQLHDBC dbc) @@ -525,7 +525,7 @@ void DeletePerson(SQLHDBC dbc, int64_t key) /** * Query tables. - * + * * @param dbc Database connection. */ void QueryData(SQLHDBC dbc) @@ -552,7 +552,7 @@ void QueryData(SQLHDBC dbc) * * @return Exit code. 
*/ -int main() +int main() { IgniteConfiguration cfg; http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/platforms/cpp/odbc-test/src/queries_test.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc-test/src/queries_test.cpp b/modules/platforms/cpp/odbc-test/src/queries_test.cpp index 707669d..c6097e0 100644 --- a/modules/platforms/cpp/odbc-test/src/queries_test.cpp +++ b/modules/platforms/cpp/odbc-test/src/queries_test.cpp @@ -557,19 +557,27 @@ struct QueriesTestSuiteFixture if (!SQL_SUCCEEDED(ret)) BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); - SQLLEN affected = 0; - ret = SQLRowCount(stmt, &affected); + SQLLEN totallyAffected = 0; - if (!SQL_SUCCEEDED(ret)) - BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + do + { + SQLLEN affected = 0; + ret = SQLRowCount(stmt, &affected); - BOOST_CHECK_EQUAL(affected, expectedToAffect); + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); - BOOST_CHECKPOINT("Getting next result set"); - ret = SQLMoreResults(stmt); + totallyAffected += affected; - if (ret != SQL_NO_DATA) - BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + BOOST_CHECKPOINT("Getting next result set"); + + ret = SQLMoreResults(stmt); + + if (ret != SQL_SUCCESS && ret != SQL_NO_DATA) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + } while (ret != SQL_NO_DATA); + + BOOST_CHECK_EQUAL(totallyAffected, expectedToAffect); BOOST_CHECKPOINT("Resetting parameters."); ret = SQLFreeStmt(stmt, SQL_RESET_PARAMS); @@ -2198,4 +2206,178 @@ BOOST_AUTO_TEST_CASE(TestAffectedRows) BOOST_CHECK_EQUAL(affected, 0); } +BOOST_AUTO_TEST_CASE(TestMultipleSelects) +{ + Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache"); + + const int stmtCnt = 10; + + std::stringstream stream; + for (int i = 0; i < stmtCnt; ++i) + stream << "select " << i << "; "; + + stream << '\0'; + + std::string query0 = stream.str(); + 
std::vector<SQLCHAR> query(query0.begin(), query0.end()); + + SQLRETURN ret = SQLExecDirect(stmt, &query[0], SQL_NTS); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + long res = 0; + + BOOST_CHECKPOINT("Binding column"); + ret = SQLBindCol(stmt, 1, SQL_C_SLONG, &res, 0, 0); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + for (long i = 0; i < stmtCnt; ++i) + { + ret = SQLFetch(stmt); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + BOOST_CHECK_EQUAL(res, i); + + ret = SQLFetch(stmt); + + BOOST_CHECK_EQUAL(ret, SQL_NO_DATA); + + ret = SQLMoreResults(stmt); + + if (i < stmtCnt - 1 && !SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + else if (i == stmtCnt - 1) + BOOST_CHECK_EQUAL(ret, SQL_NO_DATA); + } +} + +BOOST_AUTO_TEST_CASE(TestMultipleMixedStatements) +{ + Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache"); + + const int stmtCnt = 10; + + std::stringstream stream; + for (int i = 0; i < stmtCnt; ++i) + stream << "select " << i << "; insert into TestType(_key) values(" << i << "); "; + + stream << '\0'; + + std::string query0 = stream.str(); + std::vector<SQLCHAR> query(query0.begin(), query0.end()); + + SQLRETURN ret = SQLExecDirect(stmt, &query[0], SQL_NTS); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + long res = 0; + + BOOST_CHECKPOINT("Binding column"); + ret = SQLBindCol(stmt, 1, SQL_C_SLONG, &res, 0, 0); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + for (long i = 0; i < stmtCnt; ++i) + { + ret = SQLFetch(stmt); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + BOOST_CHECK_EQUAL(res, i); + + ret = SQLFetch(stmt); + + BOOST_CHECK_EQUAL(ret, SQL_NO_DATA); + + ret = SQLMoreResults(stmt); + + if (!SQL_SUCCEEDED(ret)) + 
BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + SQLLEN affected = 0; + ret = SQLRowCount(stmt, &affected); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + BOOST_CHECK_EQUAL(affected, 1); + + ret = SQLFetch(stmt); + + BOOST_CHECK_EQUAL(ret, SQL_NO_DATA); + + ret = SQLMoreResults(stmt); + + if (i < stmtCnt - 1 && !SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + else if (i == stmtCnt - 1) + BOOST_CHECK_EQUAL(ret, SQL_NO_DATA); + } +} + +BOOST_AUTO_TEST_CASE(TestMultipleMixedStatementsNoFetch) +{ + Connect("DRIVER={Apache Ignite};ADDRESS=127.0.0.1:11110;SCHEMA=cache"); + + const int stmtCnt = 10; + + std::stringstream stream; + for (int i = 0; i < stmtCnt; ++i) + stream << "select " << i << "; insert into TestType(_key) values(" << i << "); "; + + stream << '\0'; + + std::string query0 = stream.str(); + std::vector<SQLCHAR> query(query0.begin(), query0.end()); + + SQLRETURN ret = SQLExecDirect(stmt, &query[0], SQL_NTS); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + long res = 0; + + BOOST_CHECKPOINT("Binding column"); + ret = SQLBindCol(stmt, 1, SQL_C_SLONG, &res, 0, 0); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + for (long i = 0; i < stmtCnt; ++i) + { + ret = SQLMoreResults(stmt); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + SQLLEN affected = 0; + ret = SQLRowCount(stmt, &affected); + + if (!SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + + BOOST_CHECK_EQUAL(affected, 1); + + ret = SQLFetch(stmt); + + BOOST_CHECK_EQUAL(ret, SQL_NO_DATA); + + ret = SQLMoreResults(stmt); + + if (i < stmtCnt - 1 && !SQL_SUCCEEDED(ret)) + BOOST_FAIL(GetOdbcErrorMessage(SQL_HANDLE_STMT, stmt)); + else if (i == stmtCnt - 1) + BOOST_CHECK_EQUAL(ret, SQL_NO_DATA); + } +} + + BOOST_AUTO_TEST_SUITE_END() 
http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/platforms/cpp/odbc/include/ignite/odbc/message.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/message.h b/modules/platforms/cpp/odbc/include/ignite/odbc/message.h index dda0ba9..8d6c906 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/message.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/message.h @@ -60,7 +60,9 @@ namespace ignite GET_PARAMS_METADATA = 7, - EXECUTE_SQL_QUERY_BATCH = 8 + EXECUTE_SQL_QUERY_BATCH = 8, + + QUERY_MORE_RESULTS = 9 }; }; @@ -387,6 +389,47 @@ namespace ignite }; /** + * Query fetch request. + */ + class QueryMoreResultsRequest + { + public: + /** + * Constructor. + * + * @param queryId Query ID. + * @param pageSize Required page size. + */ + QueryMoreResultsRequest(int64_t queryId, int32_t pageSize) : + queryId(queryId), + pageSize(pageSize) + { + // No-op. + } + + /** + * Destructor. + */ + ~QueryMoreResultsRequest() + { + // No-op. + } + + /** + * Write request using provided writer. + * @param writer Writer. + */ + void Write(impl::binary::BinaryWriterImpl& writer) const; + + private: + /** Query ID. */ + int64_t queryId; + + /** SQL query. */ + int32_t pageSize; + }; + + /** * General response. */ class Response @@ -575,7 +618,7 @@ namespace ignite * Get affected rows number. * @return Number of rows affected by the query. */ - int64_t GetAffectedRows() + const std::vector<int64_t>& GetAffectedRows() { return affectedRows; } @@ -585,7 +628,7 @@ namespace ignite * Read response using provided reader. * @param reader Reader. */ - virtual void ReadOnSuccess(impl::binary::BinaryReaderImpl& reader, const ProtocolVersion&); + virtual void ReadOnSuccess(impl::binary::BinaryReaderImpl& reader, const ProtocolVersion& ver); /** Query ID. */ int64_t queryId; @@ -594,7 +637,7 @@ namespace ignite meta::ColumnMetaVector meta; /** Number of affected rows. 
*/ - int64_t affectedRows; + std::vector<int64_t> affectedRows; }; /** @@ -617,7 +660,7 @@ namespace ignite * Affected rows. * @return Affected rows. */ - int64_t GetAffectedRows() const + const std::vector<int64_t>& GetAffectedRows() const { return affectedRows; } @@ -628,7 +671,7 @@ namespace ignite */ int64_t GetErrorSetIdx() const { - return affectedRows; + return static_cast<int64_t>(affectedRows.size()); } /** @@ -658,7 +701,7 @@ namespace ignite virtual void ReadOnSuccess(impl::binary::BinaryReaderImpl& reader, const ProtocolVersion& ver); /** Affected rows. */ - int64_t affectedRows; + std::vector<int64_t> affectedRows; /** Index of the set which caused an error. */ int64_t errorSetIdx; @@ -817,6 +860,46 @@ namespace ignite /** Columns metadata. */ std::vector<int8_t> typeIds; }; + + /** + * Query fetch response. + */ + class QueryMoreResultsResponse : public Response + { + public: + /** + * Constructor. + * @param resultPage Result page. + */ + QueryMoreResultsResponse(ResultPage& resultPage); + + /** + * Destructor. + */ + virtual ~QueryMoreResultsResponse(); + + /** + * Get query ID. + * @return Query ID. + */ + int64_t GetQueryId() const + { + return queryId; + } + + private: + /** + * Read response using provided reader. + * @param reader Reader. + */ + virtual void ReadOnSuccess(impl::binary::BinaryReaderImpl& reader, const ProtocolVersion&); + + /** Query ID. */ + int64_t queryId; + + /** Result page. 
*/ + ResultPage& resultPage; + }; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/platforms/cpp/odbc/include/ignite/odbc/query/batch_query.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/query/batch_query.h b/modules/platforms/cpp/odbc/include/ignite/odbc/query/batch_query.h index 5e741ed..1e6c869 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/query/batch_query.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/query/batch_query.h @@ -106,6 +106,13 @@ namespace ignite virtual int64_t AffectedRows() const; /** + * Move to the next result set. + * + * @return Operation result. + */ + virtual SqlResult::Type NextResultSet(); + + /** * Get SQL query string. * * @return SQL query string. @@ -142,16 +149,13 @@ namespace ignite meta::ColumnMetaVector resultMeta; /** Number of rows affected. */ - int64_t rowsAffected; + std::vector<int64_t> rowsAffected; - /** Number of parameter sets successfully processed. */ - int64_t setsProcessed; + /** Rows affected index. */ + size_t rowsAffectedIdx; /** Query executed. */ bool executed; - - /** Data retrieved. */ - bool dataRetrieved; }; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/platforms/cpp/odbc/include/ignite/odbc/query/column_metadata_query.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/query/column_metadata_query.h b/modules/platforms/cpp/odbc/include/ignite/odbc/query/column_metadata_query.h index 875b1ce..d742490 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/query/column_metadata_query.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/query/column_metadata_query.h @@ -105,6 +105,13 @@ namespace ignite */ virtual int64_t AffectedRows() const; + /** + * Move to the next result set. + * + * @return Operation result.
+ */ + virtual SqlResult::Type NextResultSet(); + private: IGNITE_NO_COPY_ASSIGNMENT(ColumnMetadataQuery); http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/platforms/cpp/odbc/include/ignite/odbc/query/data_query.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/query/data_query.h b/modules/platforms/cpp/odbc/include/ignite/odbc/query/data_query.h index 5a4a978..c476000 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/query/data_query.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/query/data_query.h @@ -106,6 +106,13 @@ namespace ignite virtual int64_t AffectedRows() const; /** + * Move to the next result set. + * + * @return Operation result. + */ + virtual SqlResult::Type NextResultSet(); + + /** * Get SQL query string. * * @return SQL query string. @@ -139,6 +146,13 @@ namespace ignite * @return Result. */ SqlResult::Type MakeRequestFetch(); + + /** + * Make next result set request and use response to set internal state. + * + * @return Result. + */ + SqlResult::Type MakeRequestMoreResults(); /** * Close query. @@ -163,7 +177,13 @@ namespace ignite std::auto_ptr<Cursor> cursor; /** Number of rows affected. */ - int64_t rowsAffected; + std::vector<int64_t> rowsAffected; + + /** Rows affected index. */ + size_t rowsAffectedIdx; + + /** Cached next result page.
*/ + std::auto_ptr<ResultPage> cachedNextPage; }; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/platforms/cpp/odbc/include/ignite/odbc/query/foreign_keys_query.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/query/foreign_keys_query.h b/modules/platforms/cpp/odbc/include/ignite/odbc/query/foreign_keys_query.h index 7d60728..9abd8b2 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/query/foreign_keys_query.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/query/foreign_keys_query.h @@ -105,6 +105,13 @@ namespace ignite * @return Number of rows affected by the statement. */ virtual int64_t AffectedRows() const; + + /** + * Move to the next result set. + * + * @return Operation result. + */ + virtual SqlResult::Type NextResultSet(); private: IGNITE_NO_COPY_ASSIGNMENT(ForeignKeysQuery); http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/platforms/cpp/odbc/include/ignite/odbc/query/primary_keys_query.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/query/primary_keys_query.h b/modules/platforms/cpp/odbc/include/ignite/odbc/query/primary_keys_query.h index 65bac33..42f7e26 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/query/primary_keys_query.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/query/primary_keys_query.h @@ -102,6 +102,13 @@ namespace ignite * @return Number of rows affected by the statement. */ virtual int64_t AffectedRows() const; + + /** + * Move to the next result set. + * + * @return Operation result.
+ */ + virtual SqlResult::Type NextResultSet(); private: IGNITE_NO_COPY_ASSIGNMENT(PrimaryKeysQuery); http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/platforms/cpp/odbc/include/ignite/odbc/query/query.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/query/query.h b/modules/platforms/cpp/odbc/include/ignite/odbc/query/query.h index 701f5c8..9d54b90 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/query/query.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/query/query.h @@ -131,6 +131,13 @@ namespace ignite virtual int64_t AffectedRows() const = 0; /** + * Move to the next result set. + * + * @return Operation result. + */ + virtual SqlResult::Type NextResultSet() = 0; + + /** * Get query type. * * @return Query type. http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/platforms/cpp/odbc/include/ignite/odbc/query/special_columns_query.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/query/special_columns_query.h b/modules/platforms/cpp/odbc/include/ignite/odbc/query/special_columns_query.h index 0f6660f..d6a5c44 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/query/special_columns_query.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/query/special_columns_query.h @@ -105,6 +105,13 @@ namespace ignite */ virtual int64_t AffectedRows() const; + /** + * Move to the next result set. + * + * @return Operation result.
+ */ + virtual SqlResult::Type NextResultSet(); + private: IGNITE_NO_COPY_ASSIGNMENT(SpecialColumnsQuery); http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/platforms/cpp/odbc/include/ignite/odbc/query/table_metadata_query.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/query/table_metadata_query.h b/modules/platforms/cpp/odbc/include/ignite/odbc/query/table_metadata_query.h index acd3f49..759bfd6 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/query/table_metadata_query.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/query/table_metadata_query.h @@ -106,6 +106,13 @@ namespace ignite */ virtual int64_t AffectedRows() const; + /** + * Move to the next result set. + * + * @return Operation result. + */ + virtual SqlResult::Type NextResultSet(); + private: IGNITE_NO_COPY_ASSIGNMENT(TableMetadataQuery); http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/platforms/cpp/odbc/include/ignite/odbc/query/type_info_query.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/query/type_info_query.h b/modules/platforms/cpp/odbc/include/ignite/odbc/query/type_info_query.h index 00cca08..974ee01 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/query/type_info_query.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/query/type_info_query.h @@ -96,6 +96,13 @@ namespace ignite */ virtual int64_t AffectedRows() const; + /** + * Move to the next result set. + * + * @return Operation result.
+ */ + virtual SqlResult::Type NextResultSet(); + private: IGNITE_NO_COPY_ASSIGNMENT(TypeInfoQuery); http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/platforms/cpp/odbc/include/ignite/odbc/statement.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/include/ignite/odbc/statement.h b/modules/platforms/cpp/odbc/include/ignite/odbc/statement.h index 27e883d..6d4b3ab 100644 --- a/modules/platforms/cpp/odbc/include/ignite/odbc/statement.h +++ b/modules/platforms/cpp/odbc/include/ignite/odbc/statement.h @@ -264,11 +264,11 @@ namespace ignite bool DataAvailable() const; /** - * Next results. + * More results. * * Move to next result set or affected rows number. */ - void NextResults(); + void MoreResults(); /** * Get column attribute. @@ -581,7 +581,7 @@ namespace ignite * * @return Operation result. */ - SqlResult::Type InternalNextResults(); + SqlResult::Type InternalMoreResults(); /** * Get column attribute. http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/platforms/cpp/odbc/src/cursor.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/cursor.cpp b/modules/platforms/cpp/odbc/src/cursor.cpp index b41f5b1..09e96cb 100644 --- a/modules/platforms/cpp/odbc/src/cursor.cpp +++ b/modules/platforms/cpp/odbc/src/cursor.cpp @@ -66,7 +66,7 @@ namespace ignite bool Cursor::IsClosedRemotely() const { - return currentPage.get() && currentPage->IsLast(); + return !currentPage.get() || currentPage->IsLast(); } void Cursor::UpdateData(std::auto_ptr<ResultPage>& newPage) http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/platforms/cpp/odbc/src/message.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/message.cpp b/modules/platforms/cpp/odbc/src/message.cpp index 4767c74..5595ddb 100644 --- a/modules/platforms/cpp/odbc/src/message.cpp +++ 
b/modules/platforms/cpp/odbc/src/message.cpp @@ -18,6 +18,29 @@ #include "ignite/odbc/message.h" #include "ignite/odbc/utility.h" +namespace +{ + using namespace ignite; + using namespace odbc; + + void ReadAffectedRows(impl::binary::BinaryReaderImpl& reader, const ProtocolVersion& protocolVersion, + std::vector<int64_t>& affectedRows) + { + affectedRows.clear(); + + if (protocolVersion < ProtocolVersion::VERSION_2_3_0) + affectedRows.push_back(reader.ReadInt64()); + else + { + int32_t len = reader.ReadInt32(); + + affectedRows.reserve(static_cast<size_t>(len)); + for (int32_t i = 0; i < len; ++i) + affectedRows.push_back(reader.ReadInt64()); + } + } +} + namespace ignite { namespace odbc @@ -152,6 +175,7 @@ namespace ignite void QueryFetchRequest::Write(impl::binary::BinaryWriterImpl& writer) const { writer.WriteInt8(RequestType::FETCH_SQL_QUERY); + writer.WriteInt64(queryId); writer.WriteInt32(pageSize); } @@ -212,6 +236,14 @@ namespace ignite writer.WriteObject<std::string>(sqlQuery); } + void QueryMoreResultsRequest::Write(impl::binary::BinaryWriterImpl& writer) const + { + writer.WriteInt8(RequestType::QUERY_MORE_RESULTS); + + writer.WriteInt64(queryId); + writer.WriteInt32(pageSize); + } + Response::Response() : status(ResponseStatus::UNKNOWN_ERROR), error() @@ -299,13 +331,13 @@ namespace ignite // No-op. 
} - void QueryExecuteResponse::ReadOnSuccess(impl::binary::BinaryReaderImpl& reader, const ProtocolVersion&) + void QueryExecuteResponse::ReadOnSuccess(impl::binary::BinaryReaderImpl& reader, const ProtocolVersion& ver) { queryId = reader.ReadInt64(); meta::ReadColumnMetaVector(reader, meta); - affectedRows = reader.ReadInt64(); + ReadAffectedRows(reader, ver, affectedRows); } QueryExecuteBatchResponse::QueryExecuteBatchResponse(): @@ -325,7 +357,8 @@ namespace ignite void QueryExecuteBatchResponse::ReadOnSuccess(impl::binary::BinaryReaderImpl& reader, const ProtocolVersion& ver) { bool success = reader.ReadBool(); - affectedRows = reader.ReadInt64(); + + ReadAffectedRows(reader, ver, affectedRows); if (!success) { @@ -337,7 +370,9 @@ namespace ignite } } - QueryFetchResponse::QueryFetchResponse(ResultPage& resultPage): queryId(0), resultPage(resultPage) + QueryFetchResponse::QueryFetchResponse(ResultPage& resultPage) : + queryId(0), + resultPage(resultPage) { // No-op. } @@ -398,6 +433,25 @@ namespace ignite { utility::ReadByteArray(reader, typeIds); } + + QueryMoreResultsResponse::QueryMoreResultsResponse(ResultPage & resultPage) : + queryId(0), + resultPage(resultPage) + { + // No-op. + } + + QueryMoreResultsResponse::~QueryMoreResultsResponse() + { + // No-op. 
+ } + + void QueryMoreResultsResponse::ReadOnSuccess(impl::binary::BinaryReaderImpl& reader, const ProtocolVersion&) + { + queryId = reader.ReadInt64(); + + resultPage.Read(reader); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/platforms/cpp/odbc/src/odbc.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/odbc.cpp b/modules/platforms/cpp/odbc/src/odbc.cpp index 8121a3b..1480d0b 100644 --- a/modules/platforms/cpp/odbc/src/odbc.cpp +++ b/modules/platforms/cpp/odbc/src/odbc.cpp @@ -593,7 +593,7 @@ namespace ignite if (!statement) return SQL_INVALID_HANDLE; - statement->NextResults(); + statement->MoreResults(); return statement->GetDiagnosticRecords().GetReturnCode(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/platforms/cpp/odbc/src/query/batch_query.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/query/batch_query.cpp b/modules/platforms/cpp/odbc/src/query/batch_query.cpp index fc8fda4..29d11ca 100644 --- a/modules/platforms/cpp/odbc/src/query/batch_query.cpp +++ b/modules/platforms/cpp/odbc/src/query/batch_query.cpp @@ -34,10 +34,9 @@ namespace ignite sql(sql), params(params), resultMeta(), - rowsAffected(0), - setsProcessed(0), - executed(false), - dataRetrieved(false) + rowsAffected(), + rowsAffectedIdx(0), + executed(false) { // No-op. 
} @@ -62,6 +61,9 @@ namespace ignite int32_t processed = 0; + rowsAffected.clear(); + rowsAffected.reserve(static_cast<size_t>(params.GetParamSetSize())); + do { int32_t currentPageSize = std::min(maxPageSize, rowNum - processed); bool lastPage = currentPageSize == rowNum - processed; @@ -71,7 +73,7 @@ namespace ignite processed += currentPageSize; } while (res == SqlResult::AI_SUCCESS && processed < rowNum); - params.SetParamsProcessed(static_cast<SqlUlen>(setsProcessed)); + params.SetParamsProcessed(static_cast<SqlUlen>(rowsAffected.size())); return res; } @@ -90,17 +92,7 @@ namespace ignite return SqlResult::AI_ERROR; } - if (dataRetrieved) - return SqlResult::AI_NO_DATA; - - app::ColumnBindingMap::iterator it = columnBindings.find(1); - - if (it != columnBindings.end()) - it->second.PutInt64(rowsAffected); - - dataRetrieved = true; - - return SqlResult::AI_SUCCESS; + return SqlResult::AI_NO_DATA; } SqlResult::Type BatchQuery::GetColumn(uint16_t columnIdx, app::ApplicationDataBuffer& buffer) @@ -112,31 +104,18 @@ namespace ignite return SqlResult::AI_ERROR; } - if (dataRetrieved) - { - diag.AddStatusRecord(SqlState::S24000_INVALID_CURSOR_STATE, - "Cursor has reached end of the result set."); - - return SqlResult::AI_ERROR; - } - - if (columnIdx != 1) - { - std::stringstream builder; - builder << "Column with id " << columnIdx << " is not available in result set."; - - diag.AddStatusRecord(SqlState::SHY000_GENERAL_ERROR, builder.str()); + diag.AddStatusRecord(SqlState::S24000_INVALID_CURSOR_STATE, + "Cursor has reached end of the result set."); - return SqlResult::AI_ERROR; - } - - buffer.PutInt64(rowsAffected); - - return SqlResult::AI_SUCCESS; + return SqlResult::AI_ERROR; } SqlResult::Type BatchQuery::Close() { + executed = false; + rowsAffected.clear(); + rowsAffectedIdx = 0; + return SqlResult::AI_SUCCESS; } @@ -147,7 +126,21 @@ namespace ignite int64_t BatchQuery::AffectedRows() const { - return rowsAffected; + int64_t affected = rowsAffectedIdx < 
rowsAffected.size() ? rowsAffected[rowsAffectedIdx] : 0; + return affected < 0 ? 0 : affected; + } + + SqlResult::Type BatchQuery::NextResultSet() + { + if (rowsAffectedIdx + 1 >= rowsAffected.size()) + { + Close(); + return SqlResult::AI_NO_DATA; + } + + ++rowsAffectedIdx; + + return SqlResult::AI_SUCCESS; } SqlResult::Type BatchQuery::MakeRequestExecuteBatch(SqlUlen begin, SqlUlen end, bool last) @@ -183,25 +176,20 @@ namespace ignite return SqlResult::AI_ERROR; } - rowsAffected += rsp.GetAffectedRows(); - LOG_MSG("rowsAffected: " << rowsAffected); + rowsAffected.insert(rowsAffected.end(), rsp.GetAffectedRows().begin(), rsp.GetAffectedRows().end()); + LOG_MSG("Affected rows list size: " << rowsAffected.size()); if (!rsp.GetErrorMessage().empty()) { LOG_MSG("Error: " << rsp.GetErrorMessage()); - - setsProcessed += rsp.GetErrorSetIdx(); - LOG_MSG("setsProcessed: " << setsProcessed); + LOG_MSG("Sets Processed: " << rowsAffected.size()); diag.AddStatusRecord(ResponseStatusToSqlState(rsp.GetErrorCode()), rsp.GetErrorMessage(), - static_cast<int32_t>(setsProcessed), 0); + static_cast<int32_t>(rowsAffected.size()), 0); return SqlResult::AI_SUCCESS_WITH_INFO; } - setsProcessed += end - begin; - LOG_MSG("setsProcessed: " << setsProcessed); - return SqlResult::AI_SUCCESS; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/platforms/cpp/odbc/src/query/column_metadata_query.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/query/column_metadata_query.cpp b/modules/platforms/cpp/odbc/src/query/column_metadata_query.cpp index 0910612..0a09159 100644 --- a/modules/platforms/cpp/odbc/src/query/column_metadata_query.cpp +++ b/modules/platforms/cpp/odbc/src/query/column_metadata_query.cpp @@ -289,6 +289,11 @@ namespace ignite return 0; } + SqlResult::Type ColumnMetadataQuery::NextResultSet() + { + return SqlResult::AI_NO_DATA; + } + SqlResult::Type 
ColumnMetadataQuery::MakeRequestGetColumnsMeta() { QueryGetColumnsMetaRequest req(schema, table, column); http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/platforms/cpp/odbc/src/query/data_query.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/query/data_query.cpp b/modules/platforms/cpp/odbc/src/query/data_query.cpp index 80fcc69..012b026 100644 --- a/modules/platforms/cpp/odbc/src/query/data_query.cpp +++ b/modules/platforms/cpp/odbc/src/query/data_query.cpp @@ -36,7 +36,9 @@ namespace ignite params(params), resultMeta(), cursor(), - rowsAffected(0) + rowsAffected(), + rowsAffectedIdx(0), + cachedNextPage() { // No-op. } @@ -79,10 +81,15 @@ namespace ignite if (cursor->NeedDataUpdate()) { - SqlResult::Type result = MakeRequestFetch(); + if (cachedNextPage.get()) + cursor->UpdateData(cachedNextPage); + else + { + SqlResult::Type result = MakeRequestFetch(); - if (result != SqlResult::AI_SUCCESS) - return result; + if (result != SqlResult::AI_SUCCESS) + return result; + } } if (!cursor->HasData()) @@ -168,6 +175,10 @@ namespace ignite cursor.reset(); resultMeta.clear(); + + rowsAffectedIdx = 0; + + rowsAffected.clear(); } return result; @@ -180,7 +191,25 @@ namespace ignite int64_t DataQuery::AffectedRows() const { - return rowsAffected; + int64_t affected = rowsAffectedIdx < rowsAffected.size() ? rowsAffected[rowsAffectedIdx] : 0; + return affected < 0 ? 
0 : affected; + } + + SqlResult::Type DataQuery::NextResultSet() + { + if (rowsAffectedIdx + 1 >= rowsAffected.size()) + { + InternalClose(); + + return SqlResult::AI_NO_DATA; + } + + SqlResult::Type res = MakeRequestMoreResults(); + + if (res == SqlResult::AI_SUCCESS) + ++rowsAffectedIdx; + + return res; } SqlResult::Type DataQuery::MakeRequestExecute() @@ -221,7 +250,7 @@ namespace ignite rowsAffected = rsp.GetAffectedRows(); LOG_MSG("Query id: " << rsp.GetQueryId()); - LOG_MSG("Affected Rows: " << rowsAffected); + LOG_MSG("Affected Rows list size: " << rowsAffected.size()); for (size_t i = 0; i < resultMeta.size(); ++i) { @@ -231,10 +260,9 @@ namespace ignite << "\n[" << i << "] ColumnType: " << static_cast<int32_t>(resultMeta[i].GetDataType())); } - if (rowsAffected > 0) - cursor.reset(); - else - cursor.reset(new Cursor(rsp.GetQueryId())); + cursor.reset(new Cursor(rsp.GetQueryId())); + + rowsAffectedIdx = 0; return SqlResult::AI_SUCCESS; } @@ -308,10 +336,55 @@ namespace ignite return SqlResult::AI_ERROR; } + LOG_MSG("Page size: " << resultPage->GetSize()); + LOG_MSG("Page is last: " << resultPage->IsLast()); + cursor->UpdateData(resultPage); return SqlResult::AI_SUCCESS; } + + SqlResult::Type DataQuery::MakeRequestMoreResults() + { + std::auto_ptr<ResultPage> resultPage(new ResultPage()); + + QueryMoreResultsRequest req(cursor->GetQueryId(), connection.GetConfiguration().GetPageSize()); + QueryMoreResultsResponse rsp(*resultPage); + + try + { + connection.SyncMessage(req, rsp); + } + catch (const OdbcError& err) + { + diag.AddStatusRecord(err); + + return SqlResult::AI_ERROR; + } + catch (const IgniteError& err) + { + diag.AddStatusRecord(SqlState::SHY000_GENERAL_ERROR, err.GetText()); + + return SqlResult::AI_ERROR; + } + + if (rsp.GetStatus() != ResponseStatus::SUCCESS) + { + LOG_MSG("Error: " << rsp.GetError()); + + diag.AddStatusRecord(ResponseStatusToSqlState(rsp.GetStatus()), rsp.GetError()); + + return SqlResult::AI_ERROR; + } + + LOG_MSG("Page size: 
" << resultPage->GetSize()); + LOG_MSG("Page is last: " << resultPage->IsLast()); + + cachedNextPage = resultPage; + cursor.reset(new Cursor(rsp.GetQueryId())); + + return SqlResult::AI_SUCCESS; + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/platforms/cpp/odbc/src/query/foreign_keys_query.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/query/foreign_keys_query.cpp b/modules/platforms/cpp/odbc/src/query/foreign_keys_query.cpp index 4ca7709..c22a3aa 100644 --- a/modules/platforms/cpp/odbc/src/query/foreign_keys_query.cpp +++ b/modules/platforms/cpp/odbc/src/query/foreign_keys_query.cpp @@ -125,6 +125,11 @@ namespace ignite { return 0; } + + SqlResult::Type ForeignKeysQuery::NextResultSet() + { + return SqlResult::AI_NO_DATA; + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/platforms/cpp/odbc/src/query/primary_keys_query.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/query/primary_keys_query.cpp b/modules/platforms/cpp/odbc/src/query/primary_keys_query.cpp index ef99db3..bb6f908 100644 --- a/modules/platforms/cpp/odbc/src/query/primary_keys_query.cpp +++ b/modules/platforms/cpp/odbc/src/query/primary_keys_query.cpp @@ -207,6 +207,11 @@ namespace ignite { return 0; } + + SqlResult::Type PrimaryKeysQuery::NextResultSet() + { + return SqlResult::AI_NO_DATA; + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/platforms/cpp/odbc/src/query/special_columns_query.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/query/special_columns_query.cpp b/modules/platforms/cpp/odbc/src/query/special_columns_query.cpp index b0f534c..01c7b96 100644 --- a/modules/platforms/cpp/odbc/src/query/special_columns_query.cpp +++ b/modules/platforms/cpp/odbc/src/query/special_columns_query.cpp @@ -116,6 
+116,11 @@ namespace ignite { return 0; } + + SqlResult::Type SpecialColumnsQuery::NextResultSet() + { + return SqlResult::AI_NO_DATA; + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/platforms/cpp/odbc/src/query/table_metadata_query.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/query/table_metadata_query.cpp b/modules/platforms/cpp/odbc/src/query/table_metadata_query.cpp index 93f1f79..53fe49d 100644 --- a/modules/platforms/cpp/odbc/src/query/table_metadata_query.cpp +++ b/modules/platforms/cpp/odbc/src/query/table_metadata_query.cpp @@ -215,6 +215,11 @@ namespace ignite return 0; } + SqlResult::Type TableMetadataQuery::NextResultSet() + { + return SqlResult::AI_NO_DATA; + } + SqlResult::Type TableMetadataQuery::MakeRequestGetTablesMeta() { QueryGetTablesMetaRequest req(catalog, schema, table, tableType); http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/platforms/cpp/odbc/src/query/type_info_query.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/query/type_info_query.cpp b/modules/platforms/cpp/odbc/src/query/type_info_query.cpp index b4efca0..939458a 100644 --- a/modules/platforms/cpp/odbc/src/query/type_info_query.cpp +++ b/modules/platforms/cpp/odbc/src/query/type_info_query.cpp @@ -401,6 +401,11 @@ namespace ignite { return 0; } + + SqlResult::Type TypeInfoQuery::NextResultSet() + { + return SqlResult::AI_NO_DATA; + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/87e37975/modules/platforms/cpp/odbc/src/statement.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/odbc/src/statement.cpp b/modules/platforms/cpp/odbc/src/statement.cpp index 36c1a0b..b167d44 100644 --- a/modules/platforms/cpp/odbc/src/statement.cpp +++ b/modules/platforms/cpp/odbc/src/statement.cpp @@ -824,19 +824,21 @@ namespace ignite 
return currentQuery.get() && currentQuery->DataAvailable(); } - void Statement::NextResults() + void Statement::MoreResults() { - IGNITE_ODBC_API_CALL(InternalNextResults()); + IGNITE_ODBC_API_CALL(InternalMoreResults()); } - SqlResult::Type Statement::InternalNextResults() + SqlResult::Type Statement::InternalMoreResults() { if (!currentQuery.get()) - return SqlResult::AI_NO_DATA; + { + AddStatusRecord(SqlState::SHY010_SEQUENCE_ERROR, "Query is not executed."); - SqlResult::Type result = currentQuery->Close(); + return SqlResult::AI_ERROR; + } - return result == SqlResult::AI_SUCCESS ? SqlResult::AI_NO_DATA : result; + return currentQuery->NextResultSet(); } void Statement::GetColumnAttribute(uint16_t colIdx, uint16_t attrId,
