IGNITE-7253: Streaming mode for JDBC thin driver. This closes #3499.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/692e4888
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/692e4888
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/692e4888

Branch: refs/heads/master
Commit: 692e48880dfaae23c415047656de632fbb5e23d1
Parents: 625ad41
Author: Alexander Paschenko <alexander.a.pasche...@gmail.com>
Authored: Mon Feb 19 15:52:49 2018 +0300
Committer: devozerov <voze...@gridgain.com>
Committed: Mon Feb 19 15:52:49 2018 +0300

----------------------------------------------------------------------
 .../jdbc2/JdbcNoCacheStreamingSelfTest.java     | 182 ++++++++++++
 .../internal/jdbc2/JdbcStreamingSelfTest.java   | 218 +++++++++++---
 .../jdbc/suite/IgniteJdbcDriverTestSuite.java   |   3 +
 .../jdbc/thin/JdbcThinStreamingSelfTest.java    | 285 +++++++++++++++++++
 .../jdbc/thin/ConnectionProperties.java         |  66 +++++
 .../jdbc/thin/ConnectionPropertiesImpl.java     | 136 ++++++++-
 .../internal/jdbc/thin/JdbcThinConnection.java  |  69 +++++
 .../jdbc/thin/JdbcThinPreparedStatement.java    |  18 +-
 .../internal/jdbc/thin/JdbcThinStatement.java   |  53 +++-
 .../internal/jdbc/thin/JdbcThinTcpIo.java       |  13 +-
 .../ignite/internal/jdbc2/JdbcConnection.java   |  10 +-
 .../jdbc2/JdbcStreamedPreparedStatement.java    |   2 +-
 .../processors/cache/IgniteCacheProxyImpl.java  |   4 +-
 .../odbc/jdbc/JdbcConnectionContext.java        |  29 +-
 .../odbc/jdbc/JdbcRequestHandler.java           |  82 ++++--
 .../processors/query/GridQueryIndexing.java     |  32 ++-
 .../processors/query/GridQueryProcessor.java    |  85 +++++-
 .../processors/query/SqlClientContext.java      | 195 +++++++++++++
 .../apache/ignite/internal/sql/SqlParser.java   |   2 +-
 ...IgniteClientCacheInitializationFailTest.java |  15 +-
 .../ignite/testframework/GridTestUtils.java     |   2 +-
 .../query/h2/DmlStatementsProcessor.java        | 100 +++----
 .../processors/query/h2/IgniteH2Indexing.java   | 119 ++++++--
 .../query/h2/ddl/DdlStatementsProcessor.java    |  22 +-
 .../processors/query/h2/dml/UpdatePlan.java     |   2 +-
 .../query/h2/dml/UpdatePlanBuilder.java         |  13 +-
 .../query/h2/sql/GridSqlQueryParser.java        |  12 +
 .../cache/GridCacheCrossCacheQuerySelfTest.java |  18 +-
 28 files changed, 1552 insertions(+), 235 deletions(-)
----------------------------------------------------------------------
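
For orientation, a minimal sketch of how the new streaming mode is used from
application code (host, port and table are illustrative; the URL parameter
names are the ones introduced in ConnectionPropertiesImpl below):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;

    public class ThinStreamingSketch {
        public static void main(String[] args) throws Exception {
            String url = "jdbc:ignite:thin://127.0.0.1?streaming=true"
                + "&streamingFlushFrequency=500&streamingPerNodeBufferSize=1000";

            try (Connection conn = DriverManager.getConnection(url);
                 PreparedStatement stmt = conn.prepareStatement(
                     "insert into Person(id, name) values (?, ?)")) {
                for (int i = 1; i <= 100; i++) {
                    stmt.setInt(1, i);
                    stmt.setString(2, "Person" + i);

                    // Buffered client-side and fed to a server-side data
                    // streamer rather than executed as individual DML.
                    stmt.executeUpdate();
                }
            } // close() sends whatever batch is still buffered on the client.
        }
    }

In this mode only tuple-based INSERT statements are accepted; SELECT, UPDATE,
DELETE and DDL fail with an SQLException (see JdbcThinStatement below).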


http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcNoCacheStreamingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcNoCacheStreamingSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcNoCacheStreamingSelfTest.java
new file mode 100644
index 0000000..74c2820
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcNoCacheStreamingSelfTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.util.Collections;
+import java.util.Properties;
+import org.apache.ignite.IgniteJdbcDriver;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.ConnectorConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.IgniteJdbcDriver.CFG_URL_PREFIX;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Data streaming test for the thick driver with no explicit caches.
+ */
+public class JdbcNoCacheStreamingSelfTest extends GridCommonAbstractTest {
+    /** JDBC URL. */
+    private static final String BASE_URL = CFG_URL_PREFIX +
+        "cache=default@modules/clients/src/test/config/jdbc-config.xml";
+
+    /** Connection. */
+    protected Connection conn;
+
+    /** */
+    protected transient IgniteLogger log;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        return getConfiguration0(gridName);
+    }
+
+    /**
+     * @param gridName Grid name.
+     * @return Grid configuration used for starting the grid.
+     * @throws Exception If failed.
+     */
+    private IgniteConfiguration getConfiguration0(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration<?,?> cache = defaultCacheConfiguration();
+
+        cache.setCacheMode(PARTITIONED);
+        cache.setBackups(1);
+        cache.setWriteSynchronizationMode(FULL_SYNC);
+        cache.setIndexedTypes(
+            Integer.class, Integer.class
+        );
+
+        cfg.setCacheConfiguration(cache);
+        cfg.setLocalHost("127.0.0.1");
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+        ipFinder.setAddresses(Collections.singleton("127.0.0.1:47500..47501"));
+
+        disco.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(disco);
+
+        cfg.setConnectorConfiguration(new ConnectorConfiguration());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrids(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @param allowOverwrite Allow overwriting of existing keys.
+     * @return Connection to use for the test.
+     * @throws Exception if failed.
+     */
+    protected Connection createConnection(boolean allowOverwrite) throws Exception {
+        Properties props = new Properties();
+
+        props.setProperty(IgniteJdbcDriver.PROP_STREAMING, "true");
+        props.setProperty(IgniteJdbcDriver.PROP_STREAMING_FLUSH_FREQ, "500");
+
+        if (allowOverwrite)
+            props.setProperty(IgniteJdbcDriver.PROP_STREAMING_ALLOW_OVERWRITE, "true");
+
+        return DriverManager.getConnection(BASE_URL, props);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        U.closeQuiet(conn);
+
+        ignite(0).cache(DEFAULT_CACHE_NAME).clear();
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testStreamedInsert() throws Exception {
+        for (int i = 10; i <= 100; i += 10)
+            ignite(0).cache(DEFAULT_CACHE_NAME).put(i, i * 100);
+
+        try (Connection conn = createConnection(false)) {
+            try (PreparedStatement stmt = conn.prepareStatement("insert into Integer(_key, _val) values (?, ?)")) {
+                for (int i = 1; i <= 100; i++) {
+                    stmt.setInt(1, i);
+                    stmt.setInt(2, i);
+
+                    stmt.executeUpdate();
+                }
+            }
+        }
+
+        U.sleep(500);
+
+        // Now let's check it's all there.
+        for (int i = 1; i <= 100; i++) {
+            if (i % 10 != 0)
+                assertEquals(i, grid(0).cache(DEFAULT_CACHE_NAME).get(i));
+            else // All that divides by 10 evenly should point to numbers 100 times greater - see above
+                assertEquals(i * 100, grid(0).cache(DEFAULT_CACHE_NAME).get(i));
+        }
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testStreamedInsertWithOverwritesAllowed() throws Exception {
+        for (int i = 10; i <= 100; i += 10)
+            ignite(0).cache(DEFAULT_CACHE_NAME).put(i, i * 100);
+
+        try (Connection conn = createConnection(true)) {
+            try (PreparedStatement stmt = conn.prepareStatement("insert into Integer(_key, _val) values (?, ?)")) {
+                for (int i = 1; i <= 100; i++) {
+                    stmt.setInt(1, i);
+                    stmt.setInt(2, i);
+
+                    stmt.executeUpdate();
+                }
+            }
+        }
+
+        U.sleep(500);
+
+        // Now let's check it's all there.
+        // i should point to i at all times as we've turned overwrites on above.
+        for (int i = 1; i <= 100; i++)
+            assertEquals(i, grid(0).cache(DEFAULT_CACHE_NAME).get(i));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java
index 5418ca0..ebb6bc9 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcStreamingSelfTest.java
@@ -20,16 +20,24 @@ package org.apache.ignite.internal.jdbc2;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.Collections;
 import java.util.Properties;
+import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteJdbcDriver;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.binary.BinaryObjectBuilder;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.ConnectorConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 import static org.apache.ignite.IgniteJdbcDriver.CFG_URL_PREFIX;
@@ -41,10 +49,12 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
  */
 public class JdbcStreamingSelfTest extends GridCommonAbstractTest {
     /** JDBC URL. */
-    private static final String BASE_URL = CFG_URL_PREFIX + "cache=default@modules/clients/src/test/config/jdbc-config.xml";
+    private static final String BASE_URL = CFG_URL_PREFIX +
+        "cache=default@modules/clients/src/test/config/jdbc-config.xml";
 
-    /** Connection. */
-    protected Connection conn;
+    /** Streaming URL. */
+    private static final String STREAMING_URL = CFG_URL_PREFIX +
+        "cache=person@modules/clients/src/test/config/jdbc-config.xml";
 
     /** */
     protected transient IgniteLogger log;
@@ -90,7 +100,18 @@ public class JdbcStreamingSelfTest extends GridCommonAbstractTest {
 
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
         startGrids(2);
+
+        try (Connection c = createOrdinaryConnection()) {
+            try (Statement s = c.createStatement()) {
+                s.execute("CREATE TABLE PUBLIC.Person(\"id\" int primary key, \"name\" varchar) WITH " +
+                    "\"cache_name=person,value_type=Person\"");
+            }
+        }
+
+        U.sleep(1000);
     }
 
     /** {@inheritDoc} */
@@ -99,27 +120,51 @@ public class JdbcStreamingSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @return Connection without streaming initially turned on.
+     * @throws SQLException if failed.
+     */
+    protected Connection createOrdinaryConnection() throws SQLException {
+        Connection res = DriverManager.getConnection(BASE_URL, new Properties());
+
+        res.setSchema(QueryUtils.DFLT_SCHEMA);
+
+        return res;
+    }
+
+    /**
+     * @param allowOverwrite Allow overwriting of existing keys.
+     * @return Connection to use for the test.
+     * @throws Exception if failed.
+     */
+    protected Connection createStreamedConnection(boolean allowOverwrite) throws Exception {
+        return createStreamedConnection(allowOverwrite, 500);
+    }
+
+    /**
      * @param allowOverwrite Allow overwriting of existing keys.
+     * @param flushTimeout Stream flush timeout.
      * @return Connection to use for the test.
      * @throws Exception if failed.
      */
-    private Connection createConnection(boolean allowOverwrite) throws Exception {
+    protected Connection createStreamedConnection(boolean allowOverwrite, long flushTimeout) throws Exception {
         Properties props = new Properties();
 
         props.setProperty(IgniteJdbcDriver.PROP_STREAMING, "true");
-        props.setProperty(IgniteJdbcDriver.PROP_STREAMING_FLUSH_FREQ, "500");
+        props.setProperty(IgniteJdbcDriver.PROP_STREAMING_FLUSH_FREQ, String.valueOf(flushTimeout));
 
         if (allowOverwrite)
             props.setProperty(IgniteJdbcDriver.PROP_STREAMING_ALLOW_OVERWRITE, "true");
 
-        return DriverManager.getConnection(BASE_URL, props);
+        Connection res = DriverManager.getConnection(STREAMING_URL, props);
+
+        res.setSchema(QueryUtils.DFLT_SCHEMA);
+
+        return res;
     }
 
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
-        U.closeQuiet(conn);
-
-        ignite(0).cache(DEFAULT_CACHE_NAME).clear();
+        cache().clear();
 
         super.afterTest();
     }
@@ -128,30 +173,59 @@ public class JdbcStreamingSelfTest extends GridCommonAbstractTest {
      * @throws Exception if failed.
      */
     public void testStreamedInsert() throws Exception {
-        conn = createConnection(false);
-
         for (int i = 10; i <= 100; i += 10)
-            ignite(0).cache(DEFAULT_CACHE_NAME).put(i, i * 100);
+            put(i, nameForId(i * 100));
+
+        try (Connection conn = createStreamedConnection(false)) {
+            try (PreparedStatement stmt = conn.prepareStatement("insert into PUBLIC.Person(\"id\", \"name\") " +
+                "values (?, ?)")) {
+                for (int i = 1; i <= 100; i++) {
+                    stmt.setInt(1, i);
+                    stmt.setString(2, nameForId(i));
+
+                    stmt.executeUpdate();
+                }
+            }
+        }
 
-        PreparedStatement stmt = conn.prepareStatement("insert into Integer(_key, _val) values (?, ?)");
+        U.sleep(500);
 
+        // Now let's check it's all there.
         for (int i = 1; i <= 100; i++) {
-            stmt.setInt(1, i);
-            stmt.setInt(2, i);
+            if (i % 10 != 0)
+                assertEquals(nameForId(i), nameForIdInCache(i));
+            else // All that divides by 10 evenly should point to numbers 100 times greater - see above
+                assertEquals(nameForId(i * 100), nameForIdInCache(i));
+        }
+    }
 
-            stmt.executeUpdate();
+    /**
+     * @throws Exception if failed.
+     */
+    public void testStreamedInsertWithoutColumnsList() throws Exception {
+        for (int i = 10; i <= 100; i += 10)
+            put(i, nameForId(i * 100));
+
+        try (Connection conn = createStreamedConnection(false)) {
+            try (PreparedStatement stmt = conn.prepareStatement("insert into PUBLIC.Person(\"id\", \"name\") " +
+                "values (?, ?)")) {
+                for (int i = 1; i <= 100; i++) {
+                    stmt.setInt(1, i);
+                    stmt.setString(2, nameForId(i));
+
+                    stmt.executeUpdate();
+                }
+            }
         }
 
-        // Closing connection makes it wait for streamer close
-        // and thus for data load completion as well
-        conn.close();
+        U.sleep(500);
 
         // Now let's check it's all there.
         for (int i = 1; i <= 100; i++) {
             if (i % 10 != 0)
-                assertEquals(i, grid(0).cache(DEFAULT_CACHE_NAME).get(i));
+                assertEquals(nameForId(i), nameForIdInCache(i));
             else // All that divides by 10 evenly should point to numbers 100 times greater - see above
-                assertEquals(i * 100, grid(0).cache(DEFAULT_CACHE_NAME).get(i));
+                assertEquals(nameForId(i * 100), nameForIdInCache(i));
         }
     }
 
@@ -159,27 +233,99 @@ public class JdbcStreamingSelfTest extends GridCommonAbstractTest {
      * @throws Exception if failed.
      */
     public void testStreamedInsertWithOverwritesAllowed() throws Exception {
-        conn = createConnection(true);
-
         for (int i = 10; i <= 100; i += 10)
-            ignite(0).cache(DEFAULT_CACHE_NAME).put(i, i * 100);
-
-        PreparedStatement stmt = conn.prepareStatement("insert into Integer(_key, _val) values (?, ?)");
-
-        for (int i = 1; i <= 100; i++) {
-            stmt.setInt(1, i);
-            stmt.setInt(2, i);
-
-            stmt.executeUpdate();
+            put(i, nameForId(i * 100));
+
+        try (Connection conn = createStreamedConnection(true)) {
+            try (PreparedStatement stmt = conn.prepareStatement("insert into PUBLIC.Person(\"id\", \"name\") " +
+                "values (?, ?)")) {
+                for (int i = 1; i <= 100; i++) {
+                    stmt.setInt(1, i);
+                    stmt.setString(2, nameForId(i));
+
+                    stmt.executeUpdate();
+                }
+            }
         }
 
-        // Closing connection makes it wait for streamer close
-        // and thus for data load completion as well
-        conn.close();
+        U.sleep(500);
 
         // Now let's check it's all there.
         // i should point to i at all times as we've turned overwrites on above.
         for (int i = 1; i <= 100; i++)
-            assertEquals(i, grid(0).cache(DEFAULT_CACHE_NAME).get(i));
+            assertEquals(nameForId(i), nameForIdInCache(i));
+    }
+
+    /** */
+    public void testOnlyInsertsAllowed() {
+        assertStatementForbidden("CREATE TABLE PUBLIC.X (x int primary key, y int)");
+
+        assertStatementForbidden("SELECT * from Person");
+
+        assertStatementForbidden("insert into PUBLIC.Person(\"id\", \"name\") " +
+            "(select \"id\" + 1, CONCAT(\"name\", '1') from Person)");
+
+        assertStatementForbidden("DELETE from Person");
+
+        assertStatementForbidden("UPDATE Person SET \"name\" = 'name0'");
+
+        assertStatementForbidden("alter table Person add column y int");
+    }
+
+    /**
+     * @param sql Statement to check.
+     */
+    @SuppressWarnings("ThrowableNotThrown")
+    protected void assertStatementForbidden(String sql) {
+        GridTestUtils.assertThrows(null, new IgniteCallable<Object>() {
+            @Override public Object call() throws Exception {
+                try (Connection c = createStreamedConnection(false)) {
+                    try (PreparedStatement s = c.prepareStatement(sql)) {
+                        s.execute();
+                    }
+                }
+
+                return null;
+            }
+        }, SQLException.class,"Only tuple based INSERT statements are supported in streaming mode");
+    }
+
+    /**
+     * @return Person cache.
+     */
+    protected IgniteCache<Integer, Object> cache() {
+        return grid(0).cache("person");
+    }
+
+    /**
+     * @param id id of person to put.
+     * @param name name of person to put.
+     */
+    protected void put(int id, String name) {
+        BinaryObjectBuilder bldr = grid(0).binary().builder("Person");
+
+        bldr.setField("name", name);
+
+        cache().put(id, bldr.build());
+    }
+
+    /**
+     * @param id Person id.
+     * @return Default name for person w/given id.
+     */
+    protected String nameForId(int id) {
+        return "Person" + id;
+    }
+
+    /**
+     * @param id person id.
+     * @return Name for person with given id currently stored in cache.
+     */
+    protected String nameForIdInCache(int id) {
+        Object o = cache().withKeepBinary().get(id);
+
+        assertTrue(String.valueOf(o), o instanceof BinaryObject);
+
+        return ((BinaryObject)o).field("name");
     }
 }
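
For the thick (jdbc2) driver exercised above, the same switches are ordinary
connection properties. A minimal sketch, with an illustrative Spring config
path:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Properties;
    import org.apache.ignite.IgniteJdbcDriver;

    public class ThickStreamingSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();

            // Same switches the tests above use; values are strings.
            props.setProperty(IgniteJdbcDriver.PROP_STREAMING, "true");
            props.setProperty(IgniteJdbcDriver.PROP_STREAMING_FLUSH_FREQ, "500");

            String url = IgniteJdbcDriver.CFG_URL_PREFIX +
                "cache=default@config/jdbc-config.xml"; // illustrative path

            try (Connection conn = DriverManager.getConnection(url, props)) {
                // INSERTs issued here are routed through a data streamer.
            }
        }
    }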

http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
index 4530ae7..3e31d78 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
@@ -65,6 +65,7 @@ import org.apache.ignite.jdbc.thin.JdbcThinResultSetSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinSchemaCaseTest;
 import org.apache.ignite.jdbc.thin.JdbcThinSelectAfterAlterTable;
 import org.apache.ignite.jdbc.thin.JdbcThinStatementSelfTest;
+import org.apache.ignite.jdbc.thin.JdbcThinStreamingSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinUpdateStatementSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinComplexDmlDdlSkipReducerOnUpdateSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinInsertStatementSkipReducerOnUpdateSelfTest;
@@ -121,9 +122,11 @@ public class IgniteJdbcDriverTestSuite extends TestSuite {
         suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcStatementBatchingSelfTest.class));
         suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcErrorsSelfTest.class));
         suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcStreamingToPublicCacheTest.class));
+        suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcNoCacheStreamingSelfTest.class));
 
         suite.addTest(new TestSuite(JdbcBlobTest.class));
         suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcStreamingSelfTest.class));
+        suite.addTest(new TestSuite(JdbcThinStreamingSelfTest.class));
 
         // DDL tests.
         suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcDynamicIndexAtomicPartitionedNearSelfTest.class));

http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingSelfTest.java
new file mode 100644
index 0000000..9eba4da
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinStreamingSelfTest.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.internal.jdbc2.JdbcStreamingSelfTest;
+import org.apache.ignite.internal.processors.query.SqlClientContext;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Tests for streaming via thin driver.
+ */
+public class JdbcThinStreamingSelfTest extends JdbcStreamingSelfTest {
+    /** */
+    private int batchSize = 17;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        batchSize = 17;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        try (Connection c = createOrdinaryConnection()) {
+            execute(c, "DROP TABLE PUBLIC.T IF EXISTS");
+        }
+
+        super.afterTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Connection createStreamedConnection(boolean allowOverwrite, long flushFreq) throws Exception {
+        return JdbcThinAbstractSelfTest.connect(grid(0), "streaming=true&streamingFlushFrequency="
+            + flushFreq + "&" + "streamingAllowOverwrite=" + allowOverwrite + "&streamingPerNodeBufferSize=1000&"
+            + "streamingBatchSize=" + batchSize);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Connection createOrdinaryConnection() throws SQLException {
+        return JdbcThinAbstractSelfTest.connect(grid(0), null);
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testStreamedBatchedInsert() throws Exception {
+        for (int i = 10; i <= 100; i += 10)
+            put(i, nameForId(i * 100));
+
+        try (Connection conn = createStreamedConnection(false)) {
+            assertStreamingOn();
+
+            try (PreparedStatement stmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?), " +
+                "(?, ?)")) {
+                for (int i = 1; i <= 100; i+=2) {
+                    stmt.setInt(1, i);
+                    stmt.setString(2, nameForId(i));
+                    stmt.setInt(3, i + 1);
+                    stmt.setString(4, nameForId(i + 1));
+
+                    stmt.addBatch();
+                }
+
+                stmt.executeBatch();
+            }
+        }
+
+        U.sleep(500);
+
+        // Now let's check it's all there.
+        for (int i = 1; i <= 100; i++) {
+            if (i % 10 != 0)
+                assertEquals(nameForId(i), nameForIdInCache(i));
+            else // All that divides by 10 evenly should point to numbers 100 times greater - see above
+                assertEquals(nameForId(i * 100), nameForIdInCache(i));
+        }
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testSimultaneousStreaming() throws Exception {
+        try (Connection anotherConn = createOrdinaryConnection()) {
+            execute(anotherConn, "CREATE TABLE PUBLIC.T(x int primary key, y int) WITH " +
+                "\"cache_name=T,wrap_value=false\"");
+        }
+
+        // Timeout to let connection close be handled on server side.
+        U.sleep(500);
+
+        try (Connection conn = createStreamedConnection(false, 10000)) {
+            assertStreamingOn();
+
+            PreparedStatement firstStmt = conn.prepareStatement("insert into Person(\"id\", \"name\") values (?, ?)");
+
+            PreparedStatement secondStmt = conn.prepareStatement("insert into PUBLIC.T(x, y) values (?, ?)");
+
+            try {
+                for (int i = 1; i <= 10; i++) {
+                    firstStmt.setInt(1, i);
+                    firstStmt.setString(2, nameForId(i));
+
+                    firstStmt.executeUpdate();
+                }
+
+                for (int i = 51; i <= 67; i++) {
+                    secondStmt.setInt(1, i);
+                    secondStmt.setInt(2, i);
+
+                    secondStmt.executeUpdate();
+                }
+
+                for (int i = 11; i <= 50; i++) {
+                    firstStmt.setInt(1, i);
+                    firstStmt.setString(2, nameForId(i));
+
+                    firstStmt.executeUpdate();
+                }
+
+                for (int i = 68; i <= 100; i++) {
+                    secondStmt.setInt(1, i);
+                    secondStmt.setInt(2, i);
+
+                    secondStmt.executeUpdate();
+                }
+
+                assertCacheEmpty();
+
+                SqlClientContext cliCtx = sqlClientContext();
+
+                HashMap<String, IgniteDataStreamer<?, ?>> streamers = U.field(cliCtx, "streamers");
+
+                assertEquals(2, streamers.size());
+
+                assertEqualsCollections(new HashSet<>(Arrays.asList("person", "T")), streamers.keySet());
+            }
+            finally {
+                U.closeQuiet(firstStmt);
+
+                U.closeQuiet(secondStmt);
+            }
+        }
+
+        // Let's wait a little so that all data arrives to destination - we can't intercept streamers' flush
+        // on connection close in any way.
+        U.sleep(1000);
+
+        // Now let's check it's all there.
+        for (int i = 1; i <= 50; i++)
+            assertEquals(nameForId(i), nameForIdInCache(i));
+
+        for (int i = 51; i <= 100; i++)
+            assertEquals(i, grid(0).cache("T").get(i));
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testStreamingWithMixedStatementTypes() throws Exception {
+        String prepStmtStr = "insert into Person(\"id\", \"name\") values (?, ?)";
+
+        String stmtStr = "insert into Person(\"id\", \"name\") values (%d, '%s')";
+
+        try (Connection conn = createStreamedConnection(false, 10000)) {
+            assertStreamingOn();
+
+            PreparedStatement firstStmt = conn.prepareStatement(prepStmtStr);
+
+            Statement secondStmt = conn.createStatement();
+
+            try {
+                for (int i = 1; i <= 100; i++) {
+                    boolean usePrep = Math.random() > 0.5;
+
+                    boolean useBatch = Math.random() > 0.5;
+
+                    if (usePrep) {
+                        firstStmt.setInt(1, i);
+                        firstStmt.setString(2, nameForId(i));
+
+                        if (useBatch)
+                            firstStmt.addBatch();
+                        else
+                            firstStmt.execute();
+                    }
+                    else {
+                        String sql = String.format(stmtStr, i, nameForId(i));
+
+                        if (useBatch)
+                            secondStmt.addBatch(sql);
+                        else
+                            secondStmt.execute(sql);
+                    }
+                }
+            }
+            finally {
+                U.closeQuiet(firstStmt);
+
+                U.closeQuiet(secondStmt);
+            }
+        }
+
+        // Let's wait a little so that all data arrives to destination - we can't intercept streamers' flush
+        // on connection close in any way.
+        U.sleep(1000);
+
+        // Now let's check it's all there.
+        for (int i = 1; i <= 100; i++)
+            assertEquals(nameForId(i), nameForIdInCache(i));
+    }
+
+    /**
+     * Check that there's nothing in cache.
+     */
+    private void assertCacheEmpty() {
+        assertEquals(0, grid(0).cache(DEFAULT_CACHE_NAME).size(CachePeekMode.ALL));
+    }
+
+    /**
+     * @param conn Connection.
+     * @param sql Statement.
+     * @throws SQLException if failed.
+     */
+    private static void execute(Connection conn, String sql) throws SQLException {
+        try (Statement s = conn.createStatement()) {
+            s.execute(sql);
+        }
+    }
+
+    /**
+     * @return Active SQL client context.
+     */
+    private SqlClientContext sqlClientContext() {
+        Set<SqlClientContext> ctxs = U.field(grid(0).context().query(), "cliCtxs");
+
+        assertFalse(F.isEmpty(ctxs));
+
+        assertEquals(1, ctxs.size());
+
+        return ctxs.iterator().next();
+    }
+
+    /**
+     * Check that streaming state on target node is as expected.
+     */
+    private void assertStreamingOn() {
+        SqlClientContext cliCtx = sqlClientContext();
+
+        assertTrue(cliCtx.isStream());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void assertStatementForbidden(String sql) {
+        batchSize = 1;
+
+        super.assertStatementForbidden(sql);
+    }
+}
\ No newline at end of file
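
The batched path exercised by testStreamedBatchedInsert above looks as follows
from application code. A sketch with illustrative URL and table; note that in
streaming mode executeBatch() returns placeholder counters (an array of zeros
sized to the batch), since real per-row counts never come back from the
streamer:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;

    public class ThinStreamingBatchSketch {
        public static void main(String[] args) throws Exception {
            String url = "jdbc:ignite:thin://127.0.0.1?streaming=true&streamingBatchSize=17";

            try (Connection conn = DriverManager.getConnection(url);
                 PreparedStatement stmt = conn.prepareStatement(
                     "insert into Person(id, name) values (?, ?)")) {
                for (int i = 1; i <= 100; i++) {
                    stmt.setInt(1, i);
                    stmt.setString(2, "Person" + i);

                    // Rows accumulate client-side; every streamingBatchSize
                    // additions the driver ships one JdbcBatchExecuteRequest.
                    stmt.addBatch();
                }

                // Placeholder counters in streaming mode (see JdbcThinStatement).
                int[] counts = stmt.executeBatch();
            }
        }
    }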

http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java
index 169f85b..e7a25a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionProperties.java
@@ -351,4 +351,70 @@ public interface ConnectionProperties {
      * @param sslFactory Custom class name that implements Factory&lt;SSLSocketFactory&gt;.
      */
     public void setSslFactory(String sslFactory);
+
+    /**
+     * @return Streamed connection flag.
+     */
+    public boolean isStream();
+
+    /**
+     * @param stream Streamed connection flag.
+     */
+    public void setStream(boolean stream);
+
+    /**
+     * @return Allow overwrites during streaming connection flag.
+     */
+    public boolean isStreamAllowOverwrite();
+
+    /**
+     * @param streamAllowOverwrite Allow overwrites during streaming 
connection flag.
+     */
+    public void setStreamAllowOverwrite(boolean streamAllowOverwrite);
+
+    /**
+     * @return Number of parallel operations per node during streaming connection param.
+     */
+    public int getStreamParallelOperations();
+
+    /**
+     * @param streamParallelOperations Number of parallel operations per node during streaming connection param.
+     * @throws SQLException if value check failed.
+     */
+    public void setStreamParallelOperations(int streamParallelOperations) throws SQLException;
+
+    /**
+     * @return Buffer size during streaming connection param.
+     */
+    public int getStreamBufferSize();
+
+    /**
+     * @param streamBufSize Buffer size during streaming connection param.
+     * @throws SQLException if value check failed.
+     */
+    public void setStreamBufferSize(int streamBufSize) throws SQLException;
+
+    /**
+     * @return Flush timeout during streaming connection param.
+     */
+    public long getStreamFlushFrequency();
+
+    /**
+     * @param streamFlushFreq Flush timeout during streaming connection param.
+     * @throws SQLException if value check failed.
+     */
+    public void setStreamFlushFrequency(long streamFlushFreq) throws SQLException;
+
+    /**
+     * @return Batch size for streaming (number of commands to accumulate internally before actually
+     * sending over the wire).
+     */
+    public int getStreamBatchSize();
+
+    /**
+     * @param streamBatchSize Batch size for streaming (number of commands to accumulate internally before actually
+     * sending over the wire).
+     * @throws SQLException if value check failed.
+     */
+    public void setStreamBatchSize(int streamBatchSize) throws SQLException;
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java
index 471381b..3f1819d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java
@@ -146,15 +146,44 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa
     private StringProperty sslFactory = new StringProperty("sslFactory",
         "Custom class name that implements Factory<SSLSocketFactory>", null, null, false, null);
 
+    /** Turn on streaming mode on this connection. */
+    private BooleanProperty stream = new BooleanProperty(
+        "streaming", "Turn on streaming mode on this connection", false, false);
+
+    /** Turn on overwrite during streaming on this connection. */
+    private BooleanProperty streamAllowOverwrite = new BooleanProperty(
+        "streamingAllowOverwrite", "Turn on overwrite during streaming on this connection", false, false);
+
+    /** Number of parallel operations per cluster node during streaming. */
+    private IntegerProperty streamParOps = new IntegerProperty(
+        "streamingPerNodeParallelOperations", "Number of parallel operations per cluster node during streaming",
+        0, false, 0, Integer.MAX_VALUE);
+
+    /** Buffer size per cluster node during streaming. */
+    private IntegerProperty streamBufSize = new IntegerProperty(
+        "streamingPerNodeBufferSize", "Buffer size per cluster node during streaming",
+        0, false, 0, Integer.MAX_VALUE);
+
+    /** Flush frequency during streaming. */
+    private LongProperty streamFlushFreq = new LongProperty(
+        "streamingFlushFrequency", "Flush frequency during streaming",
+        0, false, 0, Long.MAX_VALUE);
+
+    /** Batch size for streaming. */
+    private IntegerProperty streamBatchSize = new IntegerProperty(
+        "streamingBatchSize", "Batch size for streaming (number of commands to accumulate internally " +
+        "before actually sending over the wire)", 10, false, 1, Integer.MAX_VALUE);
+
     /** Properties array. */
-    private final ConnectionProperty [] propsArray = {
+    private final ConnectionProperty [] props = {
         host, port,
         distributedJoins, enforceJoinOrder, collocated, replicatedOnly, autoCloseServerCursor,
         tcpNoDelay, lazy, socketSendBuffer, socketReceiveBuffer, skipReducerOnUpdate,
         sslMode, sslProtocol, sslKeyAlgorithm,
         sslClientCertificateKeyStoreUrl, sslClientCertificateKeyStorePassword, sslClientCertificateKeyStoreType,
         sslTrustCertificateKeyStoreUrl, sslTrustCertificateKeyStorePassword, sslTrustCertificateKeyStoreType,
-        sslTrustAll, sslFactory
+        sslTrustAll, sslFactory,
+        stream, streamAllowOverwrite, streamParOps, streamBufSize, streamFlushFreq, streamBatchSize
     };
 
     /** {@inheritDoc} */
@@ -387,6 +416,66 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa
         this.sslFactory.setValue(sslFactory);
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean isStream() {
+        return stream.value();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setStream(boolean val) {
+        stream.setValue(val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isStreamAllowOverwrite() {
+        return streamAllowOverwrite.value();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setStreamAllowOverwrite(boolean val) {
+        streamAllowOverwrite.setValue(val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getStreamParallelOperations() {
+        return streamParOps.value();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setStreamParallelOperations(int val) throws SQLException {
+        streamParOps.setValue(val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getStreamBufferSize() {
+        return streamBufSize.value();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setStreamBufferSize(int val) throws SQLException {
+        streamBufSize.setValue(val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getStreamFlushFrequency() {
+        return streamFlushFreq.value();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setStreamFlushFrequency(long val) throws SQLException {
+        streamFlushFreq.setValue(val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getStreamBatchSize() {
+        return streamBatchSize.value();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setStreamBatchSize(int val) throws SQLException {
+        streamBatchSize.setValue(val);
+    }
+
     /**
      * @param props Environment properties.
      * @throws SQLException On error.
@@ -394,7 +483,7 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa
     void init(Properties props) throws SQLException {
         Properties props0 = (Properties)props.clone();
 
-        for (ConnectionProperty aPropsArray : propsArray)
+        for (ConnectionProperty aPropsArray : this.props)
             aPropsArray.init(props0);
     }
 
@@ -402,10 +491,10 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa
      * @return Driver's properties info array.
      */
     private DriverPropertyInfo[] getDriverPropertyInfo() {
-        DriverPropertyInfo[] dpis = new DriverPropertyInfo[propsArray.length];
+        DriverPropertyInfo[] dpis = new DriverPropertyInfo[props.length];
 
-        for (int i = 0; i < propsArray.length; ++i)
-            dpis[i] = propsArray[i].getDriverPropertyInfo();
+        for (int i = 0; i < props.length; ++i)
+            dpis[i] = props[i].getDriverPropertyInfo();
 
         return dpis;
     }
@@ -722,7 +811,8 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa
             else {
                 try {
                     setValue(parse(str));
-                } catch (NumberFormatException e) {
+                }
+                catch (NumberFormatException e) {
+                    throw new SQLException("Failed to parse int property [name=" + name +
+                        ", value=" + str + ']', SqlStateCode.CLIENT_CONNECTION_FAILED);
                 }
@@ -797,6 +887,38 @@ public class ConnectionPropertiesImpl implements ConnectionProperties, Serializa
     /**
      *
      */
+    private static class LongProperty extends NumberProperty {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * @param name Name.
+         * @param desc Description.
+         * @param dfltVal Default value.
+         * @param required {@code true} if the property is required.
+         * @param min Lower bound of allowed range.
+         * @param max Upper bound of allowed range.
+         */
+        LongProperty(String name, String desc, Number dfltVal, boolean required, long min, long max) {
+            super(name, desc, dfltVal, required, min, max);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected Number parse(String str) throws NumberFormatException {
+            return Long.parseLong(str);
+        }
+
+        /**
+         * @return Property value.
+         */
+        long value() {
+            return val.longValue();
+        }
+    }
+
+    /**
+     *
+     */
     private static class StringProperty extends ConnectionProperty {
         /** */
         private static final long serialVersionUID = 0L;

http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
index ac9925d..b223f54 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.jdbc.thin;
 
 import java.sql.Array;
+import java.sql.BatchUpdateException;
 import java.sql.Blob;
 import java.sql.CallableStatement;
 import java.sql.Clob;
@@ -33,13 +34,19 @@ import java.sql.SQLXML;
 import java.sql.Savepoint;
 import java.sql.Statement;
 import java.sql.Struct;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Executor;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
 import org.apache.ignite.internal.processors.odbc.SqlStateCode;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteRequest;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteResult;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResponse;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResult;
@@ -94,6 +101,12 @@ public class JdbcThinConnection implements Connection {
     /** Connection properties. */
     private ConnectionProperties connProps;
 
+    /** Batch for streaming. */
+    private List<JdbcQuery> streamBatch;
+
+    /** Last added query to recognize batches. */
+    private String lastStreamQry;
+
     /**
      * Creates new connection.
      *
@@ -135,6 +148,53 @@ public class JdbcThinConnection implements Connection {
         }
     }
 
+    /**
+     * @return Whether this connection is streamed or not.
+     */
+    public boolean isStream() {
+        return connProps.isStream();
+    }
+
+    /**
+     * Add another query for batched execution.
+     * @param sql Query.
+     * @param args Arguments.
+     */
+    synchronized void addBatch(String sql, List<Object> args) throws SQLException {
+        boolean newQry = (args == null || !F.eq(lastStreamQry, sql));
+
+        // Providing null as SQL here allows for recognizing subbatches on server and handling them more efficiently.
+        JdbcQuery q = new JdbcQuery(newQry ? sql : null, args != null ? args.toArray() : null);
+
+        if (streamBatch == null)
+            streamBatch = new ArrayList<>(connProps.getStreamBatchSize());
+
+        streamBatch.add(q);
+
+        // Null args means "addBatch(String)" was called on non-prepared Statement,
+        // we don't want to remember its query string.
+        lastStreamQry = (args != null ? sql : null);
+
+        if (streamBatch.size() == connProps.getStreamBatchSize())
+            executeBatch();
+    }
+
+    /**
+     * @throws SQLException if failed.
+     */
+    private void executeBatch() throws SQLException {
+        JdbcBatchExecuteResult res = sendRequest(new JdbcBatchExecuteRequest(schema, streamBatch));
+
+        streamBatch = null;
+
+        lastStreamQry = null;
+
+        if (res.errorCode() != ClientListenerResponse.STATUS_SUCCESS) {
+            throw new BatchUpdateException(res.errorMessage(), IgniteQueryErrorCode.codeToSqlState(res.errorCode()),
+                res.errorCode(), res.updateCounts());
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public Statement createStatement() throws SQLException {
         return createStatement(TYPE_FORWARD_ONLY, CONCUR_READ_ONLY, HOLD_CURSORS_OVER_COMMIT);
@@ -277,6 +337,15 @@ public class JdbcThinConnection implements Connection {
         if (isClosed())
             return;
 
+        if (!F.isEmpty(streamBatch)) {
+            try {
+                executeBatch();
+            }
+            catch (SQLException e) {
+                LOG.log(Level.WARNING, "Exception during batch send on streamed connection close", e);
+            }
+        }
+
         closed = true;
 
         cliIo.close();
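
The sub-batch trick used by addBatch() above can be shown in isolation. A
simplified sketch of just the bookkeeping: Query stands in for JdbcQuery, and
the size-triggered flush is omitted:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Objects;

    public class SubBatchSketch {
        /** Simplified JdbcQuery: null SQL marks "same statement as the previous entry". */
        static class Query {
            final String sql;
            final Object[] args;

            Query(String sql, Object[] args) {
                this.sql = sql;
                this.args = args;
            }
        }

        private final List<Query> batch = new ArrayList<>();

        /** SQL of the last prepared-statement entry, used to detect repeats. */
        private String lastSql;

        void addBatch(String sql, Object[] args) {
            boolean newQry = (args == null || !Objects.equals(lastSql, sql));

            // Repeats of the same prepared statement carry null SQL so the
            // server can recognize the sub-batch and reuse the parsed query.
            batch.add(new Query(newQry ? sql : null, args));

            // A plain Statement batch (args == null) never starts a sub-batch.
            lastSql = (args != null ? sql : null);
        }
    }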

http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java
index 23d3bbe..dfd1e77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinPreparedStatement.java
@@ -262,13 +262,19 @@ public class JdbcThinPreparedStatement extends JdbcThinStatement implements Prep
     @Override public void addBatch() throws SQLException {
         ensureNotClosed();
 
-        if (batch == null) {
-            batch = new ArrayList<>();
-
-            batch.add(new JdbcQuery(sql, args.toArray(new Object[args.size()])));
+        batchSize++;
+
+        if (conn.isStream())
+            conn.addBatch(sql, args);
+        else {
+            if (batch == null) {
+                batch = new ArrayList<>();
+
+                batch.add(new JdbcQuery(sql, args.toArray(new Object[args.size()])));
+            }
+            else
+                batch.add(new JdbcQuery(null, args.toArray(new Object[args.size()])));
         }
-        else
-            batch.add(new JdbcQuery(null, args.toArray(new Object[args.size()])));
 
         args = null;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
index 9c41804..a71812f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
@@ -79,6 +79,9 @@ public class JdbcThinStatement implements Statement {
     /** Result set  holdability*/
     private final int resHoldability;
 
+    /** Batch size to keep track of number of items to return as fake update counters for executeBatch. */
+    protected int batchSize;
+
     /** Batch. */
     protected List<JdbcQuery> batch;
 
@@ -133,6 +136,19 @@ public class JdbcThinStatement implements Statement {
         if (sql == null || sql.isEmpty())
             throw new SQLException("SQL query is empty.");
 
+        if (conn.isStream()) {
+            if (stmtType == JdbcStatementType.SELECT_STATEMENT_TYPE)
+                throw new SQLException("Only tuple based INSERT statements are supported in streaming mode.",
+                    SqlStateCode.INTERNAL_ERROR,
+                    IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+
+            conn.addBatch(sql, args);
+
+            resultSets = Collections.singletonList(resultSetForUpdate(0));
+
+            return;
+        }
+
         JdbcResult res0 = conn.sendRequest(new JdbcQueryExecuteRequest(stmtType, schema, pageSize,
             maxRows, sql, args == null ? null : args.toArray(new Object[args.size()])));
 
@@ -158,11 +174,8 @@ public class JdbcThinStatement implements Statement {
             boolean firstRes = true;
 
             for(JdbcResultInfo rsInfo : resInfos) {
-                if (!rsInfo.isQuery()) {
-                    resultSets.add(new JdbcThinResultSet(this, -1, pageSize,
-                        true, Collections.<List<Object>>emptyList(), false,
-                        conn.autoCloseServerCursor(), rsInfo.updateCount(), closeOnCompletion));
-                }
+                if (!rsInfo.isQuery())
+                    resultSets.add(resultSetForUpdate(rsInfo.updateCount()));
                 else {
                     if (firstRes) {
                         firstRes = false;
@@ -186,6 +199,16 @@ public class JdbcThinStatement implements Statement {
     }
 
     /**
+     * @param cnt Update counter.
+     * @return Result set for given update counter.
+     */
+    private JdbcThinResultSet resultSetForUpdate(long cnt) {
+        return new JdbcThinResultSet(this, -1, pageSize,
+            true, Collections.<List<Object>>emptyList(), false,
+            conn.autoCloseServerCursor(), cnt, closeOnCompletion);
+    }
+
+    /**
      * Sends a file to server in batches via multiple {@link JdbcBulkLoadBatchRequest}s.
      *
      * @param cmdRes Result of invoking COPY command: contains server-parsed
@@ -469,6 +492,14 @@ public class JdbcThinStatement implements Statement {
     @Override public void addBatch(String sql) throws SQLException {
         ensureNotClosed();
 
+        batchSize++;
+
+        if (conn.isStream()) {
+            conn.addBatch(sql, null);
+
+            return;
+        }
+
         if (batch == null)
             batch = new ArrayList<>();
 
@@ -479,6 +510,8 @@ public class JdbcThinStatement implements Statement {
     @Override public void clearBatch() throws SQLException {
         ensureNotClosed();
 
+        batchSize = 0;
+
         batch = null;
     }
 
@@ -488,6 +521,14 @@ public class JdbcThinStatement implements Statement {
 
         closeResults();
 
+        if (conn.isStream()) {
+            int[] res = new int[batchSize];
+
+            batchSize = 0;
+
+            return res;
+        }
+
         if (batch == null || batch.isEmpty())
             throw new SQLException("Batch is empty.");
 
@@ -502,6 +543,8 @@ public class JdbcThinStatement implements Statement {
             return res.updateCounts();
         }
         finally {
+            batchSize = 0;
+
             batch = null;
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
index 7aa6c33..efd7df0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java
@@ -62,8 +62,11 @@ public class JdbcThinTcpIo {
     /** Version 2.4.0. */
     private static final ClientListenerProtocolVersion VER_2_4_0 = ClientListenerProtocolVersion.create(2, 4, 0);
 
+    /** Version 2.5.0. */
+    private static final ClientListenerProtocolVersion VER_2_5_0 = ClientListenerProtocolVersion.create(2, 5, 0);
+
     /** Current version. */
-    private static final ClientListenerProtocolVersion CURRENT_VER = VER_2_4_0;
+    private static final ClientListenerProtocolVersion CURRENT_VER = VER_2_5_0;
 
     /** Initial output stream capacity for handshake. */
     private static final int HANDSHAKE_MSG_SIZE = 13;
@@ -265,6 +268,11 @@ public class JdbcThinTcpIo {
         writer.writeBoolean(connProps.isAutoCloseServerCursor());
         writer.writeBoolean(connProps.isLazy());
         writer.writeBoolean(connProps.isSkipReducerOnUpdate());
+        writer.writeBoolean(connProps.isStream());
+        writer.writeBoolean(connProps.isStreamAllowOverwrite());
+        writer.writeInt(connProps.getStreamParallelOperations());
+        writer.writeInt(connProps.getStreamBufferSize());
+        writer.writeLong(connProps.getStreamFlushFrequency());
 
         send(writer.array());
 
@@ -298,7 +306,8 @@ public class JdbcThinTcpIo {
 
             ClientListenerProtocolVersion srvProtocolVer = ClientListenerProtocolVersion.create(maj, min, maintenance);
 
-            if (VER_2_3_0.equals(srvProtocolVer) || VER_2_1_5.equals(srvProtocolVer))
+            if (VER_2_4_0.equals(srvProtocolVer) || VER_2_3_0.equals(srvProtocolVer) ||
+                VER_2_1_5.equals(srvProtocolVer))
                 handshake(srvProtocolVer);
             else if (VER_2_1_0.equals(srvProtocolVer))
                 handshake_2_1_0();
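
A compatibility note on this hunk: the five streaming fields are appended when the
client proposes 2.5.0, but the fallback branch re-handshakes with whatever older
version the server reported, and on that retry the new fields must not be written.
The gate is not visible in this excerpt; a sketch of the shape it presumably takes,
mirroring the server-side read gate in JdbcConnectionContext further below:

    // Assumed writer-side gate (mirrors the VER_2_5_0 check on the server):
    // only append the streaming fields when the peer speaks 2.5.0 or later.
    if (ver.compareTo(VER_2_5_0) >= 0) {
        writer.writeBoolean(connProps.isStream());
        writer.writeBoolean(connProps.isStreamAllowOverwrite());
        writer.writeInt(connProps.getStreamParallelOperations());
        writer.writeInt(connProps.getStreamBufferSize());
        writer.writeLong(connProps.getStreamFlushFrequency());
    }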

http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
index b51e0b9..f61ccf9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.odbc.SqlStateCode;
 import org.apache.ignite.internal.processors.query.GridQueryIndexing;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -612,10 +613,11 @@ public class JdbcConnection implements Connection {
 
             PreparedStatement nativeStmt = prepareNativeStatement(sql);
 
-            if (!idx.isInsertStatement(nativeStmt)) {
-                throw new SQLException("Only INSERT operations are supported 
in streaming mode",
-                    SqlStateCode.INTERNAL_ERROR,
-                    IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+            try {
+                idx.checkStatementStreamable(nativeStmt);
+            }
+            catch (IgniteSQLException e) {
+                throw e.toJdbcException();
             }
 
             IgniteDataStreamer streamer = ignite().dataStreamer(cacheName);
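
The boolean isInsertStatement() check becomes a void contract here: the indexing
module throws IgniteSQLException with a precise error code, and the JDBC layer
converts it via toJdbcException() so the SQLSTATE survives. A hypothetical sketch of
an implementation on the indexing side; the helper name and message are illustrative,
not taken from this patch:

    // Hypothetical implementation of GridQueryIndexing.checkStatementStreamable():
    // streaming can only drive INSERTs, so anything else is rejected with a
    // typed error code rather than a generic SQLException.
    @Override public void checkStatementStreamable(PreparedStatement nativeStmt) {
        if (!isStreamableInsert(nativeStmt)) // Illustrative helper, not in this excerpt.
            throw new IgniteSQLException("Streaming mode supports only INSERT commands.",
                IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
    }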

http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java
index 408f089..25f55f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStreamedPreparedStatement.java
@@ -55,7 +55,7 @@ class JdbcStreamedPreparedStatement extends JdbcPreparedStatement {
 
     /** {@inheritDoc} */
     @Override protected void execute0(String sql, Boolean isQuery) throws SQLException {
-        assert isQuery != null && !isQuery;
+        assert isQuery == null || !isQuery;
 
         long updCnt = conn.ignite().context().query().streamUpdateQuery(conn.cacheName(), conn.schemaName(),
             streamer, sql, getArgs());

http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
index a834022..c5d68b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
@@ -629,7 +629,7 @@ public class IgniteCacheProxyImpl<K, V> extends 
AsyncSupportAdapter<IgniteCache<
 
             boolean keepBinary = opCtxCall != null && opCtxCall.isKeepBinary();
 
-            return ctx.kernalContext().query().querySqlFields(ctx, qry, keepBinary, false);
+            return ctx.kernalContext().query().querySqlFields(ctx, qry, null, keepBinary, false);
         }
         catch (Exception e) {
             if (e instanceof CacheException)
@@ -662,7 +662,7 @@ public class IgniteCacheProxyImpl<K, V> extends 
AsyncSupportAdapter<IgniteCache<
 
             if (qry instanceof SqlFieldsQuery)
                 return (FieldsQueryCursor<R>)ctx.kernalContext().query().querySqlFields(ctx, (SqlFieldsQuery)qry,
-                    keepBinary, true).get(0);
+                    null, keepBinary, true).get(0);
 
             if (qry instanceof ScanQuery)
                 return query((ScanQuery)qry, null, projection(qry.isLocal()));

http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
index 5841a4d..214d006 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java
@@ -28,7 +28,7 @@ import org.apache.ignite.internal.processors.odbc.ClientListenerRequestHandler;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 
 /**
- * ODBC Connection Context.
+ * JDBC Connection Context.
  */
 public class JdbcConnectionContext implements ClientListenerConnectionContext {
     /** Version 2.1.0. */
@@ -38,13 +38,16 @@ public class JdbcConnectionContext implements ClientListenerConnectionContext {
     private static final ClientListenerProtocolVersion VER_2_1_5 = ClientListenerProtocolVersion.create(2, 1, 5);
 
     /** Version 2.3.1: added "multiple statements query" feature. */
-    public static final ClientListenerProtocolVersion VER_2_3_0 = ClientListenerProtocolVersion.create(2, 3, 0);
+    static final ClientListenerProtocolVersion VER_2_3_0 = ClientListenerProtocolVersion.create(2, 3, 0);
 
     /** Version 2.4.0: adds default values for columns feature. */
-    public static final ClientListenerProtocolVersion VER_2_4_0 = ClientListenerProtocolVersion.create(2, 4, 0);
+    static final ClientListenerProtocolVersion VER_2_4_0 = ClientListenerProtocolVersion.create(2, 4, 0);
+
+    /** Version 2.5.0: adds streaming via thin connection. */
+    static final ClientListenerProtocolVersion VER_2_5_0 = ClientListenerProtocolVersion.create(2, 5, 0);
 
     /** Current version. */
-    private static final ClientListenerProtocolVersion CURRENT_VER = VER_2_4_0;
+    private static final ClientListenerProtocolVersion CURRENT_VER = VER_2_5_0;
 
     /** Supported versions. */
     private static final Set<ClientListenerProtocolVersion> SUPPORTED_VERS = 
new HashSet<>();
@@ -66,6 +69,7 @@ public class JdbcConnectionContext implements ClientListenerConnectionContext {
 
     static {
         SUPPORTED_VERS.add(CURRENT_VER);
+        SUPPORTED_VERS.add(VER_2_4_0);
         SUPPORTED_VERS.add(VER_2_3_0);
         SUPPORTED_VERS.add(VER_2_1_5);
         SUPPORTED_VERS.add(VER_2_1_0);
@@ -113,8 +117,23 @@ public class JdbcConnectionContext implements ClientListenerConnectionContext {
         if (ver.compareTo(VER_2_3_0) >= 0)
             skipReducerOnUpdate = reader.readBoolean();
 
+        boolean stream = false;
+        boolean streamAllowOverwrites = false;
+        int streamParOps = 0;
+        int streamBufSize = 0;
+        long streamFlushFreq = 0;
+
+        if (ver.compareTo(VER_2_5_0) >= 0) {
+            stream = reader.readBoolean();
+            streamAllowOverwrites = reader.readBoolean();
+            streamParOps = reader.readInt();
+            streamBufSize = reader.readInt();
+            streamFlushFreq = reader.readLong();
+        }
+
         handler = new JdbcRequestHandler(ctx, busyLock, maxCursors, distributedJoins, enforceJoinOrder,
-            collocated, replicatedOnly, autoCloseCursors, lazyExec, skipReducerOnUpdate, ver);
+            collocated, replicatedOnly, autoCloseCursors, lazyExec, skipReducerOnUpdate, stream, streamAllowOverwrites,
+            streamParOps, streamBufSize, streamFlushFreq, ver);
 
         parser = new JdbcMessageParser(ctx);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
index 59fc06b..8786f26 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
@@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.query.GridQueryProperty;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.processors.query.SqlClientContext;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
@@ -88,6 +89,9 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
     /** Kernel context. */
     private final GridKernalContext ctx;
 
+    /** Client context. */
+    private final SqlClientContext cliCtx;
+
     /** Logger. */
     private final IgniteLogger log;
 
@@ -103,24 +107,9 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
     /** Current bulk load processors. */
     private final ConcurrentHashMap<Long, JdbcBulkLoadProcessor> bulkLoadRequests = new ConcurrentHashMap<>();
 
-    /** Distributed joins flag. */
-    private final boolean distributedJoins;
-
-    /** Enforce join order flag. */
-    private final boolean enforceJoinOrder;
-
-    /** Collocated flag. */
-    private final boolean collocated;
-
     /** Replicated only flag. */
     private final boolean replicatedOnly;
 
-    /** Lazy query execution flag. */
-    private final boolean lazy;
-
-    /** Skip reducer on update flag. */
-    private final boolean skipReducerOnUpdate;
-
     /** Automatic close of cursors. */
     private final boolean autoCloseCursors;
 
@@ -140,22 +129,38 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
      * @param autoCloseCursors Flag to automatically close server cursors.
      * @param lazy Lazy query execution flag.
      * @param skipReducerOnUpdate Skip reducer on update flag.
+     * @param stream Streaming flag.
+     * @param streamAllowOverwrites Streaming overwrites flag.
+     * @param streamParOps Number of parallel ops per cluster node during streaming.
+     * @param streamBufSize Buffer size per cluster node during streaming.
+     * @param streamFlushFreq Data streamers' flush timeout.
      * @param protocolVer Protocol version.
      */
     public JdbcRequestHandler(GridKernalContext ctx, GridSpinBusyLock busyLock, int maxCursors,
         boolean distributedJoins, boolean enforceJoinOrder, boolean collocated, boolean replicatedOnly,
         boolean autoCloseCursors, boolean lazy, boolean skipReducerOnUpdate,
+        boolean stream, boolean streamAllowOverwrites, int streamParOps, int streamBufSize, long streamFlushFreq,
         ClientListenerProtocolVersion protocolVer) {
         this.ctx = ctx;
+
+        this.cliCtx = new SqlClientContext(
+            ctx,
+            distributedJoins,
+            enforceJoinOrder,
+            collocated,
+            lazy,
+            skipReducerOnUpdate,
+            stream,
+            streamAllowOverwrites,
+            streamParOps,
+            streamBufSize,
+            streamFlushFreq
+        );
+
         this.busyLock = busyLock;
         this.maxCursors = maxCursors;
-        this.distributedJoins = distributedJoins;
-        this.enforceJoinOrder = enforceJoinOrder;
-        this.collocated = collocated;
         this.replicatedOnly = replicatedOnly;
         this.autoCloseCursors = autoCloseCursors;
-        this.lazy = lazy;
-        this.skipReducerOnUpdate = skipReducerOnUpdate;
         this.protocolVer = protocolVer;
 
         log = ctx.log(getClass());
@@ -301,6 +306,8 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
                 }
 
                 bulkLoadRequests.clear();
+
+                U.close(cliCtx, log);
             }
             finally {
                 busyLock.leaveBusy();
@@ -326,6 +333,8 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
 
         long qryId = QRY_ID_GEN.getAndIncrement();
 
+        assert !cliCtx.isStream();
+
         try {
             String sql = req.sqlQuery();
 
@@ -347,17 +356,17 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
 
                     qry = new SqlFieldsQueryEx(sql, false);
 
-                    if (skipReducerOnUpdate)
+                    if (cliCtx.isSkipReducerOnUpdate())
                         ((SqlFieldsQueryEx)qry).setSkipReducerOnUpdate(true);
             }
 
             qry.setArgs(req.arguments());
 
-            qry.setDistributedJoins(distributedJoins);
-            qry.setEnforceJoinOrder(enforceJoinOrder);
-            qry.setCollocated(collocated);
+            qry.setDistributedJoins(cliCtx.isDistributedJoins());
+            qry.setEnforceJoinOrder(cliCtx.isEnforceJoinOrder());
+            qry.setCollocated(cliCtx.isCollocated());
             qry.setReplicatedOnly(replicatedOnly);
-            qry.setLazy(lazy);
+            qry.setLazy(cliCtx.isLazy());
 
             if (req.pageSize() <= 0)
                 return new JdbcResponse(IgniteQueryErrorCode.UNKNOWN, "Invalid 
fetch size: " + req.pageSize());
@@ -371,7 +380,7 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
 
             qry.setSchema(schemaName);
 
-            List<FieldsQueryCursor<List<?>>> results = 
ctx.query().querySqlFields(qry, true,
+            List<FieldsQueryCursor<List<?>>> results = 
ctx.query().querySqlFields(null, qry, cliCtx, true,
                 protocolVer.compareTo(VER_2_3_0) < 0);
 
             FieldsQueryCursor<List<?>> fieldsCur = results.get(0);
@@ -569,11 +578,11 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
 
                 qry = new SqlFieldsQueryEx(q.sql(), false);
 
-                qry.setDistributedJoins(distributedJoins);
-                qry.setEnforceJoinOrder(enforceJoinOrder);
-                qry.setCollocated(collocated);
+                qry.setDistributedJoins(cliCtx.isDistributedJoins());
+                qry.setEnforceJoinOrder(cliCtx.isEnforceJoinOrder());
+                qry.setCollocated(cliCtx.isCollocated());
                 qry.setReplicatedOnly(replicatedOnly);
-                qry.setLazy(lazy);
+                qry.setLazy(cliCtx.isLazy());
 
                 qry.setSchema(schemaName);
             }
@@ -601,10 +610,21 @@ public class JdbcRequestHandler implements ClientListenerRequestHandler {
      * @param updCntsAcc Per query rows updates counter.
      * @param firstErr First error data - code and message.
      */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
     private void executeBatchedQuery(SqlFieldsQueryEx qry, List<Integer> 
updCntsAcc,
         IgniteBiTuple<Integer, String> firstErr) {
         try {
-            List<FieldsQueryCursor<List<?>>> qryRes = 
ctx.query().querySqlFields(qry, true, true);
+            if (cliCtx.isStream()) {
+                List<Long> cnt = 
ctx.query().streamBatchedUpdateQuery(qry.getSchema(), cliCtx, qry.getSql(),
+                    qry.batchedArguments());
+
+                for (int i = 0; i < cnt.size(); i++)
+                    updCntsAcc.add(cnt.get(i).intValue());
+
+                return;
+            }
+
+            List<FieldsQueryCursor<List<?>>> qryRes = 
ctx.query().querySqlFields(null, qry, cliCtx, true, true);
 
             for (FieldsQueryCursor<List<?>> cur : qryRes) {
                 if (cur instanceof BulkLoadContextCursor)
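
In short, a streamed batch bypasses the regular SQL pipeline: the batched arguments go
straight to streamBatchedUpdateQuery(), one Long counter per argument row comes back,
and each is narrowed to the int a JDBC batch response expects. The non-streaming path
below it is unchanged. The flow, restated:

    // Server-side flow of a streamed batch (names from this patch):
    //   1. A batch request arrives and cliCtx.isStream() is true.
    //   2. executeBatchedQuery() calls
    //      ctx.query().streamBatchedUpdateQuery(schema, cliCtx, sql, batchedArgs).
    //   3. The indexing module feeds every argument row to a data streamer and
    //      returns one Long count per row, accumulated into updCntsAcc as ints.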

http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 6b425a1..dedd075 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -76,33 +76,46 @@ public interface GridQueryIndexing {
      * Detect whether SQL query should be executed in distributed or local manner and execute it.
      * @param schemaName Schema name.
      * @param qry Query.
+     * @param cliCtx Client context.
      * @param keepBinary Keep binary flag.
      * @param failOnMultipleStmts Whether an exception should be thrown for a multiple-statement query.
      * @param cancel Query cancel state handler.
      * @return Cursor.
      */
-    public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry, boolean keepBinary,
-        boolean failOnMultipleStmts, GridQueryCancel cancel);
+    public List<FieldsQueryCursor<List<?>>> querySqlFields(String schemaName, SqlFieldsQuery qry,
+        SqlClientContext cliCtx, boolean keepBinary, boolean failOnMultipleStmts, GridQueryCancel cancel);
 
     /**
-     * Perform a MERGE statement using data streamer as receiver.
+     * Execute an INSERT statement using data streamer as receiver.
      *
      * @param schemaName Schema name.
      * @param qry Query.
      * @param params Query parameters.
      * @param streamer Data streamer to feed data to.
-     * @return Query result.
+     * @return Update counter.
      * @throws IgniteCheckedException If failed.
      */
     public long streamUpdateQuery(String schemaName, String qry, @Nullable Object[] params,
         IgniteDataStreamer<?, ?> streamer) throws IgniteCheckedException;
 
     /**
+     * Execute a batched INSERT statement using data streamer as receiver.
+     *
+     * @param schemaName Schema name.
+     * @param qry Query.
+     * @param params Query parameters.
+     * @param cliCtx Client connection context.
+     * @return Update counters.
+     * @throws IgniteCheckedException If failed.
+     */
+    public List<Long> streamBatchedUpdateQuery(String schemaName, String qry, 
List<Object[]> params,
+        SqlClientContext cliCtx) throws IgniteCheckedException;
+
+    /**
      * Executes regular query.
      *
      * @param schemaName Schema name.
      * @param cacheName Cache name.
-     *@param qry Query.
+     * @param qry Query.
      * @param filter Cache name and key filter.
      * @param keepBinary Keep binary flag.
      * @return Cursor.
      */
@@ -313,12 +326,11 @@ public interface GridQueryIndexing {
     public String schema(String cacheName);
 
     /**
-     * Check if passed statement is insert statemtn.
+     * Check if passed statement is insert statement eligible for streaming, throw an {@link IgniteSQLException} if not.
      *
      * @param nativeStmt Native statement.
-     * @return {@code True} if insert.
      */
-    public boolean isInsertStatement(PreparedStatement nativeStmt);
+    public void checkStatementStreamable(PreparedStatement nativeStmt);
 
     /**
      * Return row cache cleaner.
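
An implementation of the new batched contract can be a thin loop over the existing
single-row path; a hypothetical sketch, in which the way the streamer is obtained from
the client context (streamerFor) is an assumption of the sketch, not part of this
interface:

    // Hypothetical sketch of streamBatchedUpdateQuery(): reuse the single-row
    // streaming path once per argument row, collecting the counters.
    @Override public List<Long> streamBatchedUpdateQuery(String schemaName, String qry,
        List<Object[]> params, SqlClientContext cliCtx) throws IgniteCheckedException {
        List<Long> res = new ArrayList<>(params.size());

        for (Object[] args : params)
            res.add(streamUpdateQuery(schemaName, qry, args, cliCtx.streamerFor(qry)));

        return res;
    }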

http://git-wip-us.apache.org/repos/asf/ignite/blob/692e4888/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index f666cdd..fcea905 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -100,6 +100,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -189,6 +190,9 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     /** Pending status messages. */
     private final LinkedList<SchemaOperationStatusMessage> pendingMsgs = new 
LinkedList<>();
 
+    /** All currently open client contexts. */
+    private final Set<SqlClientContext> cliCtxs = 
Collections.newSetFromMap(new ConcurrentHashMap<>());
+
     /** Current cache that has a query running on it. */
     private final ThreadLocal<GridCacheContext> curCache = new ThreadLocal<>();
 
@@ -259,11 +263,15 @@ public class GridQueryProcessor extends GridProcessorAdapter {
 
         if (cancel && idx != null) {
             try {
-                while (!busyLock.tryBlock(500))
+                while (!busyLock.tryBlock(500)) {
                     idx.cancelAllQueries();
 
+                    closeAllSqlStreams();
+                }
+
                 return;
-            } catch (InterruptedException ignored) {
+            }
+            catch (InterruptedException ignored) {
                 U.warn(log, "Interrupted while waiting for active queries 
cancellation.");
 
                 Thread.currentThread().interrupt();
@@ -347,6 +355,32 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param cliCtx Client context to register.
+     */
+    void registerClientContext(SqlClientContext cliCtx) {
+        A.notNull(cliCtx, "cliCtx");
+
+        cliCtxs.add(cliCtx);
+    }
+
+    /**
+     * @param cliCtx Client context to unregister.
+     */
+    void unregisterClientContext(SqlClientContext cliCtx) {
+        A.notNull(cliCtx, "cliCtx");
+
+        cliCtxs.remove(cliCtx);
+    }
+
+    /**
+     * Flush streamers on all currently open client contexts.
+     */
+    private void closeAllSqlStreams() {
+        for (SqlClientContext cliCtx : cliCtxs)
+            U.close(cliCtx, log);
+    }
+
+    /**
      * Process schema propose message from discovery thread.
      *
      * @param msg Message.
@@ -1974,13 +2008,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      */
     public List<FieldsQueryCursor<List<?>>> querySqlFields(final 
SqlFieldsQuery qry, final boolean keepBinary,
         final boolean failOnMultipleStmts) {
-        return querySqlFields(null, qry, keepBinary, failOnMultipleStmts);
-    }
-
-    @SuppressWarnings("unchecked")
-    public FieldsQueryCursor<List<?>> querySqlFields(final 
GridCacheContext<?,?> cctx, final SqlFieldsQuery qry,
-        final boolean keepBinary) {
-        return querySqlFields(cctx, qry, keepBinary, true).get(0);
+        return querySqlFields(null, qry, null, keepBinary, 
failOnMultipleStmts);
     }
 
     /**
@@ -1991,7 +2019,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @return Cursor.
      */
     public FieldsQueryCursor<List<?>> querySqlFields(final SqlFieldsQuery qry, 
final boolean keepBinary) {
-        return querySqlFields(null, qry, keepBinary, true).get(0);
+        return querySqlFields(null, qry, null, keepBinary, true).get(0);
     }
 
     /**
@@ -1999,14 +2027,16 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      *
      * @param cctx Cache context.
      * @param qry Query.
+     * @param cliCtx Client context.
      * @param keepBinary Keep binary flag.
      * @param failOnMultipleStmts If {@code true} the method must throw an exception when the query contains
      *      more than one SQL statement.
      * @return Cursor.
      */
     @SuppressWarnings("unchecked")
-    public List<FieldsQueryCursor<List<?>>> querySqlFields(@Nullable final 
GridCacheContext<?,?> cctx,
-        final SqlFieldsQuery qry, final boolean keepBinary, final boolean 
failOnMultipleStmts) {
+    public List<FieldsQueryCursor<List<?>>> querySqlFields(@Nullable final 
GridCacheContext<?, ?> cctx,
+        final SqlFieldsQuery qry, final SqlClientContext cliCtx, final boolean 
keepBinary,
+        final boolean failOnMultipleStmts) {
         checkxEnabled();
 
         validateSqlFieldsQuery(qry);
@@ -2034,7 +2064,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                     GridQueryCancel cancel = new GridQueryCancel();
 
                     List<FieldsQueryCursor<List<?>>> res =
-                        idx.querySqlFields(schemaName, qry, keepBinary, failOnMultipleStmts, cancel);
+                        idx.querySqlFields(schemaName, qry, cliCtx, keepBinary, failOnMultipleStmts, cancel);
 
                     if (cctx != null)
                         sendQueryExecutedEvent(qry.getSql(), qry.getArgs(), cctx);
@@ -2073,7 +2103,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @param schemaName Schema name.
      * @param streamer Data streamer.
      * @param qry Query.
-     * @return Iterator.
+     * @return Update counter.
      */
     public long streamUpdateQuery(@Nullable final String cacheName, final String schemaName,
         final IgniteDataStreamer<?, ?> streamer, final String qry, final Object[] args) {
@@ -2100,6 +2130,33 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param schemaName Schema name.
+     * @param cliCtx Client context.
+     * @param qry Query.
+     * @param args Query arguments.
+     * @return Update counters.
+     */
+    public List<Long> streamBatchedUpdateQuery(final String schemaName, final 
SqlClientContext cliCtx,
+        final String qry, final List<Object[]> args) {
+        if (!busyLock.enterBusy())
+            throw new IllegalStateException("Failed to execute query (grid is 
stopping).");
+
+        try {
+            return executeQuery(GridCacheQueryType.SQL_FIELDS, qry, null, new 
IgniteOutClosureX<List<Long>>() {
+                @Override public List<Long> applyx() throws 
IgniteCheckedException {
+                    return idx.streamBatchedUpdateQuery(schemaName, qry, args, 
cliCtx);
+                }
+            }, true);
+        }
+        catch (IgniteCheckedException e) {
+            throw new CacheException(e);
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
      * Execute distributed SQL query.
      *
      * @param cctx Cache context.
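
The registry above ties SqlClientContext lifetimes to node shutdown: every open
context is tracked, and stop(cancel) closes them all, flushing any streamers a client
connection left open. The expected lifecycle, assuming SqlClientContext implements
AutoCloseable (its use with U.close() in this patch suggests it does):

    // Lifecycle sketch; constructor arguments mirror the JdbcRequestHandler
    // call site earlier in this diff.
    SqlClientContext cliCtx = new SqlClientContext(ctx, distributedJoins, enforceJoinOrder,
        collocated, lazy, skipReducerOnUpdate, stream, streamAllowOverwrites,
        streamParOps, streamBufSize, streamFlushFreq);

    // ... execute queries and streamed updates against the context ...

    // On connection close, or via closeAllSqlStreams() at node stop:
    // flushes and closes the context's data streamers.
    U.close(cliCtx, log);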
