Streaming.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d57d406f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d57d406f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d57d406f Branch: refs/heads/ignite-6022-proto Commit: d57d406f13db2457dd0e315dcf3d992a6af1f9b0 Parents: 03f9fe7 Author: devozerov <[email protected]> Authored: Mon Dec 18 14:34:14 2017 +0300 Committer: devozerov <[email protected]> Committed: Mon Dec 18 14:34:14 2017 +0300 ---------------------------------------------------------------------- .../cache/query/SqlFieldsQueryEx.java | 13 +++ .../odbc/jdbc/JdbcRequestHandler.java | 5 ++ .../query/h2/DmlStatementsProcessor.java | 85 ++++++++++++++------ .../processors/query/h2/IgniteH2Indexing.java | 15 ++++ 4 files changed, 93 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d57d406f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java index fb098a7..2d46d28 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java @@ -38,6 +38,8 @@ public final class SqlFieldsQueryEx extends SqlFieldsQuery { /** Batched arguments. */ private List<Object[]> batchedArgs; + private boolean streaming; + public SqlFieldsQueryEx(String sql, Boolean isQry) { super(sql); @@ -63,6 +65,7 @@ public final class SqlFieldsQueryEx extends SqlFieldsQuery { this.isQry = qry.isQry; this.skipReducerOnUpdate = qry.skipReducerOnUpdate; this.batchedArgs = qry.batchedArgs; + this.streaming = qry.streaming; } /** @@ -164,6 +167,16 @@ public final class SqlFieldsQueryEx extends SqlFieldsQuery { return skipReducerOnUpdate; } + public boolean isStreaming() { + return streaming; + } + + public SqlFieldsQuery setStreaming(boolean streaming) { + this.streaming = streaming; + + return this; + } + /** * Add batched arguments. * http://git-wip-us.apache.org/repos/asf/ignite/blob/d57d406f/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java index 2530360..c28c831 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java @@ -465,6 +465,8 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { } } + public static volatile boolean STREAMER = false; + /** * @param req Request. * @return Response. @@ -506,6 +508,9 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler { qry.setLazy(lazy); qry.setSchema(schemaName); + + if (STREAMER) + qry.setStreaming(true); } assert qry != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/d57d406f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java index df14c85..60c67d8 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java @@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx; +import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl; import org.apache.ignite.internal.processors.odbc.SqlStateCode; import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator; import org.apache.ignite.internal.processors.query.GridQueryCancel; @@ -233,7 +234,8 @@ public class DmlStatementsProcessor { try { List<List<?>> cur = plan.createRows(argss); - UpdateResult res = processDmlSelectResultBatched(cctx, plan, cur, fieldsQry.getPageSize()); + UpdateResult res = + processDmlSelectResultBatched(cctx, plan, cur, fieldsQry.getPageSize(), fieldsQry.isStreaming()); Collection<UpdateResult> ress = new ArrayList<>(1); @@ -503,14 +505,14 @@ public class DmlStatementsProcessor { } private UpdateResult processDmlSelectResultBatched(GridCacheContext cctx, UpdatePlan plan, Collection<List<?>> rows, - int pageSize) throws IgniteCheckedException { + int pageSize, boolean streaming) throws IgniteCheckedException { switch (plan.mode()) { case MERGE: // TODO throw new IgniteCheckedException("Unsupported, fix"); case INSERT: - return new UpdateResult(doInsertBatched(plan, rows, pageSize), X.EMPTY_OBJECT_ARRAY); + return new UpdateResult(doInsertBatched(plan, rows, pageSize, streaming), X.EMPTY_OBJECT_ARRAY); default: throw new IgniteSQLException("Unexpected DML operation [mode=" + plan.mode() + ']', @@ -791,48 +793,81 @@ public class DmlStatementsProcessor { } } + public static volatile DataStreamerImpl streamer; + + public void flushStreamer() { + DataStreamerImpl streamer0 = streamer; + + if (streamer0 != null) + streamer0.flush(); + } + /** * Execute INSERT statement plan. - * @param cursor Cursor to take inserted data from. + * @param rows Cursor to take inserted data from. * @param pageSize Batch size for streaming, anything <= 0 for single page operations. * @return Number of items affected. * @throws IgniteCheckedException if failed, particularly in case of duplicate keys. */ @SuppressWarnings({"unchecked", "ConstantConditions"}) - private long doInsertBatched(UpdatePlan plan, Collection<List<?>> cursor, int pageSize) + private long doInsertBatched(UpdatePlan plan, Collection<List<?>> rows, int pageSize, boolean streaming) throws IgniteCheckedException { GridCacheContext cctx = plan.cacheContext(); - // Keys that failed to INSERT due to duplication. - DmlBatchSender sender = new DmlBatchSender(cctx, pageSize); + if (streaming) { + DataStreamerImpl streamer0 = streamer; - for (List<?> row : cursor) { - final IgniteBiTuple keyValPair = plan.processRow(row); + if (streamer0 == null) { + streamer0 = cctx.kernalContext().dataStream().dataStreamer(cctx.name()); - sender.add(keyValPair.getKey(), new InsertEntryProcessor(keyValPair.getValue())); - } + streamer = streamer0; + } - // TODO: Tale page size in count? - sender.flush(); + List<IgniteBiTuple> keyValPairs = new ArrayList<>(rows.size()); - SQLException resEx = sender.error(); + for (List<?> row : rows) { + IgniteBiTuple keyValPair = plan.processRow(row); - if (!F.isEmpty(sender.failedKeys())) { - String msg = "Failed to INSERT some keys because they are already in cache " + - "[keys=" + sender.failedKeys() + ']'; + keyValPairs.add(keyValPair); + } - SQLException dupEx = new SQLException(msg, SqlStateCode.CONSTRAINT_VIOLATION); + streamer.addData(keyValPairs); - if (resEx == null) - resEx = dupEx; - else - resEx.setNextException(dupEx); + return rows.size(); } + else { - if (resEx != null) - throw new IgniteSQLException(resEx); + // Keys that failed to INSERT due to duplication. + DmlBatchSender sender = new DmlBatchSender(cctx, pageSize); + + for (List<?> row : rows) { + final IgniteBiTuple keyValPair = plan.processRow(row); + + sender.add(keyValPair.getKey(), new InsertEntryProcessor(keyValPair.getValue())); + } + + // TODO: Tale page size in count? + sender.flush(); + + SQLException resEx = sender.error(); + + if (!F.isEmpty(sender.failedKeys())) { + String msg = "Failed to INSERT some keys because they are already in cache " + + "[keys=" + sender.failedKeys() + ']'; + + SQLException dupEx = new SQLException(msg, SqlStateCode.CONSTRAINT_VIOLATION); - return sender.updateCount(); + if (resEx == null) + resEx = dupEx; + else + resEx.setNextException(dupEx); + } + + if (resEx != null) + throw new IgniteSQLException(resEx); + + return sender.updateCount(); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/d57d406f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 31a6645..f5124a4 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -1405,6 +1405,21 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** {@inheritDoc} */ @Override public List<FieldsQueryCursor<List<?>>> queryDistributedSqlFields(String schemaName, SqlFieldsQuery qry, boolean keepBinary, GridQueryCancel cancel, @Nullable Integer mainCacheId, boolean failOnMultipleStmts) { + if ("FLUSH".equalsIgnoreCase(qry.getSql())) { + dmlProc.flushStreamer(); + + QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList + (Collections.singletonList(0L)), null, false); + + resCur.fieldsMeta(UPDATE_RESULT_META); + + List<FieldsQueryCursor<List<?>>> res = new ArrayList<>(1); + + res.add(resCur); + + return res; + } + List<FieldsQueryCursor<List<?>>> res = tryQueryDistributedSqlFieldsNative(schemaName, qry); if (res != null)
