Streaming.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d57d406f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d57d406f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d57d406f

Branch: refs/heads/ignite-6022-proto
Commit: d57d406f13db2457dd0e315dcf3d992a6af1f9b0
Parents: 03f9fe7
Author: devozerov <[email protected]>
Authored: Mon Dec 18 14:34:14 2017 +0300
Committer: devozerov <[email protected]>
Committed: Mon Dec 18 14:34:14 2017 +0300

----------------------------------------------------------------------
 .../cache/query/SqlFieldsQueryEx.java           | 13 +++
 .../odbc/jdbc/JdbcRequestHandler.java           |  5 ++
 .../query/h2/DmlStatementsProcessor.java        | 85 ++++++++++++++------
 .../processors/query/h2/IgniteH2Indexing.java   | 15 ++++
 4 files changed, 93 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d57d406f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java
index fb098a7..2d46d28 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/SqlFieldsQueryEx.java
@@ -38,6 +38,8 @@ public final class SqlFieldsQueryEx extends SqlFieldsQuery {
     /** Batched arguments. */
     private List<Object[]> batchedArgs;
 
+    private boolean streaming;
+
     public SqlFieldsQueryEx(String sql, Boolean isQry) {
         super(sql);
 
@@ -63,6 +65,7 @@ public final class SqlFieldsQueryEx extends SqlFieldsQuery {
         this.isQry = qry.isQry;
         this.skipReducerOnUpdate = qry.skipReducerOnUpdate;
         this.batchedArgs = qry.batchedArgs;
+        this.streaming = qry.streaming;
     }
 
     /**
@@ -164,6 +167,16 @@ public final class SqlFieldsQueryEx extends SqlFieldsQuery {
         return skipReducerOnUpdate;
     }
 
+    public boolean isStreaming() {
+        return streaming;
+    }
+
+    public SqlFieldsQuery setStreaming(boolean streaming) {
+        this.streaming = streaming;
+
+        return this;
+    }
+
     /**
      * Add batched arguments.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/d57d406f/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
index 2530360..c28c831 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
@@ -465,6 +465,8 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
         }
     }
 
+    public static volatile boolean STREAMER = false;
+
     /**
      * @param req Request.
      * @return Response.
@@ -506,6 +508,9 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
                     qry.setLazy(lazy);
 
                     qry.setSchema(schemaName);
+
+                    if (STREAMER)
+                        qry.setStreaming(true);
                 }
 
                 assert qry != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d57d406f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index df14c85..60c67d8 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
+import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl;
 import org.apache.ignite.internal.processors.odbc.SqlStateCode;
 import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
@@ -233,7 +234,8 @@ public class DmlStatementsProcessor {
             try {
                 List<List<?>> cur = plan.createRows(argss);
 
-                UpdateResult res = processDmlSelectResultBatched(cctx, plan, cur, fieldsQry.getPageSize());
+                UpdateResult res =
+                    processDmlSelectResultBatched(cctx, plan, cur, fieldsQry.getPageSize(), fieldsQry.isStreaming());
 
                 Collection<UpdateResult> ress = new ArrayList<>(1);
 
@@ -503,14 +505,14 @@ public class DmlStatementsProcessor {
     }
 
     private UpdateResult processDmlSelectResultBatched(GridCacheContext cctx, UpdatePlan plan, Collection<List<?>> rows,
-        int pageSize) throws IgniteCheckedException {
+        int pageSize, boolean streaming) throws IgniteCheckedException {
         switch (plan.mode()) {
             case MERGE:
                 // TODO
                 throw new IgniteCheckedException("Unsupported, fix");
 
             case INSERT:
-                return new UpdateResult(doInsertBatched(plan, rows, pageSize), X.EMPTY_OBJECT_ARRAY);
+                return new UpdateResult(doInsertBatched(plan, rows, pageSize, streaming), X.EMPTY_OBJECT_ARRAY);
 
             default:
                 throw new IgniteSQLException("Unexpected DML operation [mode=" + plan.mode() + ']',
@@ -791,48 +793,81 @@ public class DmlStatementsProcessor {
         }
     }
 
+    public static volatile DataStreamerImpl streamer;
+
+    public void flushStreamer() {
+        DataStreamerImpl streamer0 = streamer;
+
+        if (streamer0 != null)
+            streamer0.flush();
+    }
+
     /**
      * Execute INSERT statement plan.
-     * @param cursor Cursor to take inserted data from.
+     * @param rows Cursor to take inserted data from.
      * @param pageSize Batch size for streaming, anything <= 0 for single page operations.
      * @return Number of items affected.
      * @throws IgniteCheckedException if failed, particularly in case of duplicate keys.
      */
     @SuppressWarnings({"unchecked", "ConstantConditions"})
-    private long doInsertBatched(UpdatePlan plan, Collection<List<?>> cursor, int pageSize)
+    private long doInsertBatched(UpdatePlan plan, Collection<List<?>> rows, int pageSize, boolean streaming)
         throws IgniteCheckedException {
         GridCacheContext cctx = plan.cacheContext();
 
-        // Keys that failed to INSERT due to duplication.
-        DmlBatchSender sender = new DmlBatchSender(cctx, pageSize);
+        if (streaming) {
+            DataStreamerImpl streamer0 = streamer;
 
-        for (List<?> row : cursor) {
-            final IgniteBiTuple keyValPair = plan.processRow(row);
+            if (streamer0 == null) {
+                streamer0 = cctx.kernalContext().dataStream().dataStreamer(cctx.name());
 
-            sender.add(keyValPair.getKey(), new InsertEntryProcessor(keyValPair.getValue()));
-        }
+                streamer = streamer0;
+            }
 
-        // TODO: Tale page size in count?
-        sender.flush();
+            List<IgniteBiTuple> keyValPairs = new ArrayList<>(rows.size());
 
-        SQLException resEx = sender.error();
+            for (List<?> row : rows) {
+                IgniteBiTuple keyValPair = plan.processRow(row);
 
-        if (!F.isEmpty(sender.failedKeys())) {
-            String msg = "Failed to INSERT some keys because they are already in cache " +
-                "[keys=" + sender.failedKeys() + ']';
+                keyValPairs.add(keyValPair);
+            }
 
-            SQLException dupEx = new SQLException(msg, SqlStateCode.CONSTRAINT_VIOLATION);
+            streamer.addData(keyValPairs);
 
-            if (resEx == null)
-                resEx = dupEx;
-            else
-                resEx.setNextException(dupEx);
+            return rows.size();
         }
+        else {
 
-        if (resEx != null)
-            throw new IgniteSQLException(resEx);
+            // Keys that failed to INSERT due to duplication.
+            DmlBatchSender sender = new DmlBatchSender(cctx, pageSize);
+
+            for (List<?> row : rows) {
+                final IgniteBiTuple keyValPair = plan.processRow(row);
+
+                sender.add(keyValPair.getKey(), new InsertEntryProcessor(keyValPair.getValue()));
+            }
+
+            // TODO: Tale page size in count?
+            sender.flush();
+
+            SQLException resEx = sender.error();
+
+            if (!F.isEmpty(sender.failedKeys())) {
+                String msg = "Failed to INSERT some keys because they are already in cache " +
+                    "[keys=" + sender.failedKeys() + ']';
+
+                SQLException dupEx = new SQLException(msg, SqlStateCode.CONSTRAINT_VIOLATION);
 
-        return sender.updateCount();
+                if (resEx == null)
+                    resEx = dupEx;
+                else
+                    resEx.setNextException(dupEx);
+            }
+
+            if (resEx != null)
+                throw new IgniteSQLException(resEx);
+
+            return sender.updateCount();
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/d57d406f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 31a6645..f5124a4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1405,6 +1405,21 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /** {@inheritDoc} */
     @Override public List<FieldsQueryCursor<List<?>>> queryDistributedSqlFields(String schemaName, SqlFieldsQuery qry,
         boolean keepBinary, GridQueryCancel cancel, @Nullable Integer mainCacheId, boolean failOnMultipleStmts) {
+        if ("FLUSH".equalsIgnoreCase(qry.getSql())) {
+            dmlProc.flushStreamer();
+
+            QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList
+                (Collections.singletonList(0L)), null, false);
+
+            resCur.fieldsMeta(UPDATE_RESULT_META);
+
+            List<FieldsQueryCursor<List<?>>> res = new ArrayList<>(1);
+
+            res.add(resCur);
+
+            return res;
+        }
+
         List<FieldsQueryCursor<List<?>>> res = tryQueryDistributedSqlFieldsNative(schemaName, qry);
 
         if (res != null)

Reply via email to