IGNITE-6917: Implemented SQL COPY command

This closes #3419


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/25d38cc9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/25d38cc9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/25d38cc9

Branch: refs/heads/ignite-7485-2
Commit: 25d38cc98c4ca098679d69ad90fc8bed66e6916d
Parents: faf50f1
Author: gg-shq <kshiro...@gridgain.com>
Authored: Wed Feb 7 14:28:04 2018 +0300
Committer: Igor Sapego <isap...@gridgain.com>
Committed: Wed Feb 7 14:30:39 2018 +0300

----------------------------------------------------------------------
 .../internal/jdbc2/JdbcBulkLoadSelfTest.java    | 185 ++++++
 .../ignite/jdbc/JdbcErrorsAbstractSelfTest.java |   2 +-
 .../jdbc/suite/IgniteJdbcDriverTestSuite.java   |  14 +
 .../thin/JdbcThinBulkLoadAbstractSelfTest.java  | 601 +++++++++++++++++++
 ...inBulkLoadAtomicPartitionedNearSelfTest.java |  39 ++
 ...bcThinBulkLoadAtomicPartitionedSelfTest.java |  39 ++
 ...dbcThinBulkLoadAtomicReplicatedSelfTest.java |  39 ++
 ...oadTransactionalPartitionedNearSelfTest.java |  39 ++
 ...ulkLoadTransactionalPartitionedSelfTest.java |  39 ++
 ...BulkLoadTransactionalReplicatedSelfTest.java |  39 ++
 .../JdbcThinDynamicIndexAbstractSelfTest.java   |   1 -
 .../clients/src/test/resources/bulkload0.csv    |   0
 .../clients/src/test/resources/bulkload1.csv    |   1 +
 .../clients/src/test/resources/bulkload2.csv    |   2 +
 .../src/test/resources/bulkload2_utf.csv        |   2 +
 .../cache/query/BulkLoadContextCursor.java      |  97 +++
 .../internal/jdbc/thin/JdbcThinStatement.java   |  68 ++-
 .../ignite/internal/jdbc2/JdbcQueryTask.java    |  12 +-
 .../bulkload/BulkLoadAckClientParameters.java   |  92 +++
 .../bulkload/BulkLoadCacheWriter.java           |  31 +
 .../processors/bulkload/BulkLoadCsvFormat.java  | 159 +++++
 .../processors/bulkload/BulkLoadCsvParser.java  |  65 ++
 .../processors/bulkload/BulkLoadFormat.java     |  33 +
 .../processors/bulkload/BulkLoadParser.java     |  61 ++
 .../processors/bulkload/BulkLoadProcessor.java  | 104 ++++
 .../bulkload/BulkLoadStreamerWriter.java        |  65 ++
 .../bulkload/pipeline/CharsetDecoderBlock.java  | 132 ++++
 .../pipeline/CsvLineProcessorBlock.java         |  70 +++
 .../bulkload/pipeline/LineSplitterBlock.java    |  72 +++
 .../bulkload/pipeline/PipelineBlock.java        |  66 ++
 .../bulkload/pipeline/StrListAppenderBlock.java |  52 ++
 .../odbc/jdbc/JdbcBulkLoadAckResult.java        | 111 ++++
 .../odbc/jdbc/JdbcBulkLoadBatchRequest.java     | 183 ++++++
 .../odbc/jdbc/JdbcBulkLoadProcessor.java        | 144 +++++
 .../processors/odbc/jdbc/JdbcRequest.java       |   7 +
 .../odbc/jdbc/JdbcRequestHandler.java           |  90 ++-
 .../processors/odbc/jdbc/JdbcResult.java        |   8 +
 .../apache/ignite/internal/sql/SqlKeyword.java  |  15 +
 .../apache/ignite/internal/sql/SqlParser.java   |  18 +-
 .../sql/command/SqlBulkLoadCommand.java         | 273 +++++++++
 .../internal/sql/SqlParserBulkLoadSelfTest.java |  70 +++
 .../query/h2/DmlStatementsProcessor.java        |  99 +++
 .../processors/query/h2/IgniteH2Indexing.java   |  35 +-
 .../query/h2/ddl/DdlStatementsProcessor.java    |   2 +
 .../processors/query/h2/dml/UpdateMode.java     |  11 +-
 .../processors/query/h2/dml/UpdatePlan.java     |  20 +-
 .../query/h2/dml/UpdatePlanBuilder.java         |  86 +++
 .../IgniteCacheQuerySelfTestSuite.java          |   2 +
 parent/pom.xml                                  |   3 +-
 49 files changed, 3361 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
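
For reviewers, a minimal usage sketch of the new command, mirroring the statements exercised by the
tests in this patch (the JDBC URL, file path, table and batch size below are illustrative; as the
jdbc2 test notes, COPY is currently supported only by the thin JDBC driver):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class CopyCommandSketch {
        public static void main(String[] args) throws Exception {
            // Connect through the thin JDBC driver and stream a CSV file into the Person table.
            try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1/");
                 Statement stmt = conn.createStatement()) {
                int rows = stmt.executeUpdate(
                    "copy from \"/path/to/bulkload2.csv\" into Person" +
                        " (_key, age, firstName, lastName)" +
                        " format csv batch_size 1024"); // batch_size is optional

                System.out.println("Loaded " + rows + " rows");
            }
        }
    }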


http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBulkLoadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBulkLoadSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBulkLoadSelfTest.java
new file mode 100644
index 0000000..d9506cf
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/jdbc2/JdbcBulkLoadSelfTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.ConnectorConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+
+import static org.apache.ignite.IgniteJdbcDriver.CFG_URL_PREFIX;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/** COPY command test for the regular JDBC driver. */
+public class JdbcBulkLoadSelfTest extends GridCommonAbstractTest {
+    /** JDBC URL. */
+    private static final String BASE_URL = CFG_URL_PREFIX +
+        "cache=default@modules/clients/src/test/config/jdbc-config.xml";
+
+    /** Connection. */
+    protected Connection conn;
+
+    /** The logger. */
+    protected transient IgniteLogger log;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        return getConfiguration0(gridName);
+    }
+
+    /**
+     * @param gridName Grid name.
+     * @return Grid configuration used for starting the grid.
+     * @throws Exception If failed.
+     */
+    private IgniteConfiguration getConfiguration0(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration<?,?> cache = defaultCacheConfiguration();
+
+        cache.setCacheMode(PARTITIONED);
+        cache.setBackups(1);
+        cache.setWriteSynchronizationMode(FULL_SYNC);
+        cache.setIndexedTypes(
+            Integer.class, Person.class
+        );
+
+        cfg.setCacheConfiguration(cache);
+        cfg.setLocalHost("127.0.0.1");
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+        ipFinder.setAddresses(Collections.singleton("127.0.0.1:47500..47501"));
+
+        disco.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(disco);
+
+        cfg.setConnectorConfiguration(new ConnectorConfiguration());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrids(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * Establishes the JDBC connection.
+     *
+     * @return Connection to use for the test.
+     * @throws Exception if failed.
+     */
+    private Connection createConnection() throws Exception {
+        Properties props = new Properties();
+
+        return DriverManager.getConnection(BASE_URL, props);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        U.closeQuiet(conn);
+
+        ignite(0).cache(DEFAULT_CACHE_NAME).clear();
+
+        super.afterTest();
+    }
+
+    /**
+     * This is mostly a placeholder for the implementation of IGNITE-7553.
+     *
+     * @throws Exception if failed.
+     */
+    public void testBulkLoadThrows() throws Exception {
+        GridTestUtils.assertThrows(null, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                conn = createConnection();
+
+                try (Statement stmt = conn.createStatement()) {
+                    stmt.executeUpdate("copy from \"dummy.csv\" into Person" +
+                        " (_key, id, firstName, lastName) format csv");
+
+                    return null;
+                }
+            }
+        }, SQLException.class, "COPY command is currently supported only in thin JDBC driver.");
+    }
+
+    /**
+     * A test class for creating a query entity.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    private static class Person implements Serializable {
+        /** ID. */
+        @QuerySqlField
+        private final int id;
+
+        /** First name. */
+        @QuerySqlField(index = false)
+        private final String firstName;
+
+        /** Last name. */
+        @QuerySqlField(index = false)
+        private final String lastName;
+
+        /** Age. */
+        @QuerySqlField
+        private final int age;
+
+        /**
+         * @param id ID.
+         * @param firstName First name.
+         * @param lastName Last name.
+         * @param age Age.
+         */
+        private Person(int id, String firstName, String lastName, int age) {
+            assert !F.isEmpty(firstName);
+            assert !F.isEmpty(lastName);
+            assert age > 0;
+
+            this.id = id;
+            this.firstName = firstName;
+            this.lastName = lastName;
+            this.age = age;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcErrorsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcErrorsAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcErrorsAbstractSelfTest.java
index 49746b6..2059408 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcErrorsAbstractSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/JdbcErrorsAbstractSelfTest.java
@@ -107,7 +107,7 @@ public abstract class JdbcErrorsAbstractSelfTest extends GridCommonAbstractTest
      */
     public void testDmlErrors() throws SQLException {
         checkErrorState("INSERT INTO \"test\".INTEGER(_key, _val) values(1, null)", "22004",
-            "Value for INSERT, MERGE, or UPDATE must not be null");
+            "Value for INSERT, COPY, MERGE, or UPDATE must not be null");
 
         checkErrorState("INSERT INTO \"test\".INTEGER(_key, _val) values(1, 'zzz')", "0700B",
             "Value conversion failed [from=java.lang.String, to=java.lang.Integer]");

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
index ff4d69f..656e218 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
@@ -35,6 +35,12 @@ import org.apache.ignite.jdbc.JdbcResultSetSelfTest;
 import org.apache.ignite.jdbc.JdbcStatementSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinAutoCloseServerCursorTest;
 import org.apache.ignite.jdbc.thin.JdbcThinBatchSelfTest;
+import org.apache.ignite.jdbc.thin.JdbcThinBulkLoadAtomicPartitionedNearSelfTest;
+import org.apache.ignite.jdbc.thin.JdbcThinBulkLoadAtomicPartitionedSelfTest;
+import org.apache.ignite.jdbc.thin.JdbcThinBulkLoadAtomicReplicatedSelfTest;
+import org.apache.ignite.jdbc.thin.JdbcThinBulkLoadTransactionalPartitionedNearSelfTest;
+import org.apache.ignite.jdbc.thin.JdbcThinBulkLoadTransactionalPartitionedSelfTest;
+import org.apache.ignite.jdbc.thin.JdbcThinBulkLoadTransactionalReplicatedSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinComplexDmlDdlSelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinComplexQuerySelfTest;
 import org.apache.ignite.jdbc.thin.JdbcThinConnectionSelfTest;
@@ -154,6 +160,14 @@ public class IgniteJdbcDriverTestSuite extends TestSuite {
         suite.addTest(new TestSuite(JdbcThinDynamicIndexTransactionalPartitionedSelfTest.class));
         suite.addTest(new TestSuite(JdbcThinDynamicIndexTransactionalReplicatedSelfTest.class));
 
+        // New thin JDBC driver, DML tests
+        suite.addTest(new TestSuite(JdbcThinBulkLoadAtomicPartitionedNearSelfTest.class));
+        suite.addTest(new TestSuite(JdbcThinBulkLoadAtomicPartitionedSelfTest.class));
+        suite.addTest(new TestSuite(JdbcThinBulkLoadAtomicReplicatedSelfTest.class));
+        suite.addTest(new TestSuite(JdbcThinBulkLoadTransactionalPartitionedNearSelfTest.class));
+        suite.addTest(new TestSuite(JdbcThinBulkLoadTransactionalPartitionedSelfTest.class));
+        suite.addTest(new TestSuite(JdbcThinBulkLoadTransactionalReplicatedSelfTest.class));
+
         // New thin JDBC driver, full SQL tests
         suite.addTest(new TestSuite(JdbcThinComplexDmlDdlSelfTest.class));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAbstractSelfTest.java
new file mode 100644
index 0000000..761f700
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAbstractSelfTest.java
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.testframework.GridTestUtils;
+
+import java.sql.BatchUpdateException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_PARSER_DISABLE_H2_FALLBACK;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.util.IgniteUtils.resolveIgnitePath;
+
+/**
+ * COPY statement tests.
+ */
+public abstract class JdbcThinBulkLoadAbstractSelfTest extends JdbcThinAbstractDmlStatementSelfTest {
+    /** Default table name. */
+    private static final String TBL_NAME = "Person";
+
+    /** JDBC statement. */
+    private Statement stmt;
+
+    /** A CSV file with zero records. */
+    private static final String BULKLOAD_EMPTY_CSV_FILE =
+        Objects.requireNonNull(resolveIgnitePath("/modules/clients/src/test/resources/bulkload0.csv"))
+            .getAbsolutePath();
+
+    /** A CSV file with one record. */
+    private static final String BULKLOAD_ONE_LINE_CSV_FILE =
+        Objects.requireNonNull(resolveIgnitePath("/modules/clients/src/test/resources/bulkload1.csv"))
+            .getAbsolutePath();
+
+    /** A CSV file with two records. */
+    private static final String BULKLOAD_TWO_LINES_CSV_FILE =
+        Objects.requireNonNull(resolveIgnitePath("/modules/clients/src/test/resources/bulkload2.csv"))
+            .getAbsolutePath();
+
+    /** A CSV file with two UTF-8 records. */
+    private static final String BULKLOAD_UTF_CSV_FILE =
+        Objects.requireNonNull(resolveIgnitePath("/modules/clients/src/test/resources/bulkload2_utf.csv"))
+            .getAbsolutePath();
+
+    /** Basic COPY statement used in majority of the tests. */
+    public static final String BASIC_SQL_COPY_STMT =
+        "copy from \"" + BULKLOAD_TWO_LINES_CSV_FILE + "\"" +
+            " into " + TBL_NAME +
+            " (_key, age, firstName, lastName)" +
+            " format csv";
+
+    /** {@inheritDoc} */
+    @Override protected CacheConfiguration cacheConfig() {
+        return cacheConfigWithIndexedTypes();
+    }
+
+    /**
+     * Creates cache configuration with {@link QueryEntity} created
+     * using {@link CacheConfiguration#setIndexedTypes(Class[])} call.
+     *
+     * @return The cache configuration.
+     */
+    @SuppressWarnings("unchecked")
+    private CacheConfiguration cacheConfigWithIndexedTypes() {
+        CacheConfiguration<?,?> cache = defaultCacheConfiguration();
+
+        cache.setCacheMode(cacheMode());
+        cache.setAtomicityMode(atomicityMode());
+        cache.setWriteSynchronizationMode(FULL_SYNC);
+
+        if (cacheMode() == PARTITIONED)
+            cache.setBackups(1);
+
+        if (nearCache())
+            cache.setNearConfiguration(new NearCacheConfiguration());
+
+        cache.setIndexedTypes(
+            String.class, Person.class
+        );
+
+        return cache;
+    }
+
+    /**
+     * Returns true if we are testing near cache.
+     *
+     * @return true if we are testing near cache.
+     */
+    protected abstract boolean nearCache();
+
+    /**
+     * Returns cache atomicity mode we are testing.
+     *
+     * @return The cache atomicity mode we are testing.
+     */
+    protected abstract CacheAtomicityMode atomicityMode();
+
+    /**
+     * Returns cache mode we are testing.
+     *
+     * @return The cache mode we are testing.
+     */
+    protected abstract CacheMode cacheMode();
+
+    /**
+     * Creates cache configuration with {@link QueryEntity} created
+     * using {@link CacheConfiguration#setQueryEntities(Collection)} call.
+     *
+     * @return The cache configuration.
+     */
+    private CacheConfiguration cacheConfigWithQueryEntity() {
+        CacheConfiguration<?,?> cache = defaultCacheConfiguration();
+
+        cache.setCacheMode(PARTITIONED);
+        cache.setBackups(1);
+        cache.setWriteSynchronizationMode(FULL_SYNC);
+
+        QueryEntity e = new QueryEntity();
+
+        e.setKeyType(String.class.getName());
+        e.setValueType("Person");
+
+        e.addQueryField("id", Integer.class.getName(), null);
+        e.addQueryField("age", Integer.class.getName(), null);
+        e.addQueryField("firstName", String.class.getName(), null);
+        e.addQueryField("lastName", String.class.getName(), null);
+
+        cache.setQueryEntities(Collections.singletonList(e));
+
+        return cache;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        System.setProperty(IGNITE_SQL_PARSER_DISABLE_H2_FALLBACK, "TRUE");
+
+        stmt = conn.createStatement();
+
+        assertNotNull(stmt);
+        assertFalse(stmt.isClosed());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        if (stmt != null && !stmt.isClosed())
+            stmt.close();
+
+        assertTrue(stmt.isClosed());
+
+        System.clearProperty(IGNITE_SQL_PARSER_DISABLE_H2_FALLBACK);
+
+        super.afterTest();
+    }
+
+    /**
+     * Dead-on-arrival test. Imports two-entry CSV file into a table and checks
+     * the created entries using SELECT statement.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testBasicStatement() throws SQLException {
+        int updatesCnt = stmt.executeUpdate(BASIC_SQL_COPY_STMT);
+
+        assertEquals(2, updatesCnt);
+
+        checkCacheContents(TBL_NAME, true, 2);
+    }
+
+    /**
+     * Imports two-entry CSV file with UTF-8 characters into a table and checks
+     * the created entries using SELECT statement.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testUtf() throws SQLException {
+        int updatesCnt = stmt.executeUpdate(
+            "copy from \"" + BULKLOAD_UTF_CSV_FILE + "\" into " + TBL_NAME +
+                " (_key, age, firstName, lastName)" +
+                " format csv");
+
+        assertEquals(2, updatesCnt);
+
+        checkUtfCacheContents(TBL_NAME, true, 2);
+    }
+
+    /**
+     * Imports two-entry CSV file with UTF-8 characters into a table using batch size of one byte
+     * (thus splitting each two-byte UTF-8 character into two batches)
+     * and checks the created entries using SELECT statement.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testUtfBatchSize_1() throws SQLException {
+        int updatesCnt = stmt.executeUpdate(
+            "copy from \"" + BULKLOAD_UTF_CSV_FILE + "\" into " + TBL_NAME +
+                " (_key, age, firstName, lastName)" +
+                " format csv batch_size 1");
+
+        assertEquals(2, updatesCnt);
+
+        checkUtfCacheContents(TBL_NAME, true, 2);
+    }
+
+    /**
+     * Imports one-entry CSV file into a table and checks the entry created using SELECT statement.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testOneLineFile() throws SQLException {
+        int updatesCnt = stmt.executeUpdate(
+            "copy from \"" + BULKLOAD_ONE_LINE_CSV_FILE + "\" into " + TBL_NAME +
+                " (_key, age, firstName, lastName)" +
+                " format csv");
+
+        assertEquals(1, updatesCnt);
+
+        checkCacheContents(TBL_NAME, true, 1);
+    }
+
+    /**
+     * Imports zero-entry CSV file into a table and checks that no entries are created
+     * using SELECT statement.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testEmptyFile() throws SQLException {
+        int updatesCnt = stmt.executeUpdate(
+            "copy from \"" + BULKLOAD_EMPTY_CSV_FILE + "\" into " + TBL_NAME +
+                " (_key, age, firstName, lastName)" +
+                " format csv");
+
+        assertEquals(0, updatesCnt);
+
+        checkCacheContents(TBL_NAME, true, 0);
+    }
+
+    /**
+     * Checks that error is reported for a non-existent file.
+     */
+    public void testWrongFileName() {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                stmt.executeUpdate(
+                    "copy from \"nonexistent\" into Person" +
+                        " (_key, age, firstName, lastName)" +
+                        " format csv");
+
+                return null;
+            }
+        }, SQLException.class, "Failed to read file: 'nonexistent'");
+    }
+
+    /**
+     * Checks that error is reported if the destination table is missing.
+     */
+    public void testMissingTable() {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                stmt.executeUpdate(
+                    "copy from \"" + BULKLOAD_TWO_LINES_CSV_FILE + "\" into Peterson" +
+                        " (_key, age, firstName, lastName)" +
+                        " format csv");
+
+                return null;
+            }
+        }, SQLException.class, "Table does not exist: PETERSON");
+    }
+
+    /**
+     * Checks that error is reported when a non-existing column is specified in the SQL command.
+     */
+    public void testWrongColumnName() {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                stmt.executeUpdate(
+                    "copy from \"" + BULKLOAD_TWO_LINES_CSV_FILE + "\" into Person" +
+                        " (_key, age, firstName, lostName)" +
+                        " format csv");
+
+                return null;
+            }
+        }, SQLException.class, "Column \"LOSTNAME\" not found");
+    }
+
+    /**
+     * Checks that error is reported if a field read from the CSV file cannot be converted to the type of the column.
+     */
+    public void testWrongColumnType() {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                stmt.executeUpdate(
+                    "copy from \"" + BULKLOAD_TWO_LINES_CSV_FILE + "\" into Person" +
+                        " (_key, firstName, age, lastName)" +
+                        " format csv");
+
+                return null;
+            }
+        }, SQLException.class, "Value conversion failed [from=java.lang.String, to=java.lang.Integer]");
+    }
+
+    /**
+     * Checks that even if only a subset of fields is imported, the imported fields are set correctly.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testFieldsSubset() throws SQLException {
+        int updatesCnt = stmt.executeUpdate(
+            "copy from \"" + BULKLOAD_TWO_LINES_CSV_FILE + "\" into " + TBL_NAME +
+                " (_key, age, firstName)" +
+                " format csv");
+
+        assertEquals(2, updatesCnt);
+
+        checkCacheContents(TBL_NAME, false, 2);
+    }
+
+    /**
+     * Checks that bulk load works when we create a table using the 'CREATE TABLE' command.
+     *
+     * The majority of the tests in this class use {@link CacheConfiguration#setIndexedTypes(Class[])}
+     * to create the table.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testCreateAndBulkLoadTable() throws SQLException {
+        String tblName = QueryUtils.DFLT_SCHEMA + ".\"PersonTbl\"";
+
+        execute(conn, "create table " + tblName +
+            " (id int primary key, age int, firstName varchar(30), lastName varchar(30))");
+
+        int updatesCnt = stmt.executeUpdate(
+            "copy from \"" + BULKLOAD_TWO_LINES_CSV_FILE + "\" into " + tblName +
+                "(_key, age, firstName, lastName)" +
+                " format csv");
+
+        assertEquals(2, updatesCnt);
+
+        checkCacheContents(tblName, true, 2);
+    }
+
+    /**
+     * Checks that bulk load works when we create a table with {@link CacheConfiguration#setQueryEntities(Collection)}.
+     *
+     * The majority of the tests in this class use {@link CacheConfiguration#setIndexedTypes(Class[])}
+     * to create a table.
+     *
+     * @throws SQLException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    public void testConfigureQueryEntityAndBulkLoad() throws SQLException {
+        ignite(0).getOrCreateCache(cacheConfigWithQueryEntity());
+
+        int updatesCnt = stmt.executeUpdate(BASIC_SQL_COPY_STMT);
+
+        assertEquals(2, updatesCnt);
+
+        checkCacheContents(TBL_NAME, true, 2);
+    }
+
+    /**
+     * Checks that bulk load works when we use batch size of 1 byte and thus
+     * create multiple batches per COPY.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testBatchSize_1() throws SQLException {
+        int updatesCnt = stmt.executeUpdate(BASIC_SQL_COPY_STMT + " batch_size 1");
+
+        assertEquals(2, updatesCnt);
+
+        checkCacheContents(TBL_NAME, true, 2);
+    }
+
+    /**
+     * Verifies exception thrown if COPY is added into a batch.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testMultipleStatement() throws SQLException {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                stmt.addBatch(BASIC_SQL_COPY_STMT);
+
+                stmt.addBatch("copy from \"" + BULKLOAD_ONE_LINE_CSV_FILE + "\" into " + TBL_NAME +
+                    " (_key, age, firstName, lastName)" +
+                    " format csv");
+
+                stmt.addBatch("copy from \"" + BULKLOAD_UTF_CSV_FILE + "\" into " + TBL_NAME +
+                    " (_key, age, firstName, lastName)" +
+                    " format csv");
+
+                stmt.executeBatch();
+
+                return null;
+            }
+        }, BatchUpdateException.class, "COPY command cannot be executed in batch mode.");
+    }
+
+    /**
+     * Verifies that COPY command is rejected by Statement.executeQuery().
+     *
+     * @throws SQLException If failed.
+     */
+    public void testExecuteQuery() throws SQLException {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                stmt.executeQuery(BASIC_SQL_COPY_STMT);
+
+                return null;
+            }
+        }, SQLException.class, "The query isn't SELECT query");
+    }
+
+    /**
+     * Verifies that COPY command works in Statement.execute().
+     *
+     * @throws SQLException If failed.
+     */
+    public void testExecute() throws SQLException {
+        boolean isRowSet = stmt.execute(BASIC_SQL_COPY_STMT);
+
+        assertFalse(isRowSet);
+
+        checkCacheContents(TBL_NAME, true, 2);
+    }
+
+    /**
+     * Verifies that COPY command can be called with PreparedStatement.executeUpdate().
+     *
+     * @throws SQLException If failed.
+     */
+    public void testPreparedStatementWithExecuteUpdate() throws SQLException {
+        PreparedStatement pstmt = conn.prepareStatement(BASIC_SQL_COPY_STMT);
+
+        int updatesCnt = pstmt.executeUpdate();
+
+        assertEquals(2, updatesCnt);
+
+        checkCacheContents(TBL_NAME, true, 2);
+    }
+
+    /**
+     * Verifies that COPY command reports an error when used with a PreparedStatement parameter.
+     *
+     * @throws SQLException If failed.
+     */
+    public void testPreparedStatementWithParameter() throws SQLException {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    PreparedStatement pstmt = conn.prepareStatement(
+                        "copy from \"" + BULKLOAD_TWO_LINES_CSV_FILE + "\" into " + TBL_NAME +
+                            " (_key, age, firstName, lastName)" +
+                            " format ?");
+
+                    pstmt.setString(1, "csv");
+
+                    pstmt.executeUpdate();
+
+                    return null;
+                }
+            }, SQLException.class, "Unexpected token: \"?\" (expected: \"[identifier]\"");
+    }
+
+    /**
+     * Verifies that COPY command can be called with PreparedStatement.execute().
+     *
+     * @throws SQLException If failed.
+     */
+    public void testPreparedStatementWithExecute() throws SQLException {
+        PreparedStatement pstmt = conn.prepareStatement(BASIC_SQL_COPY_STMT);
+
+        boolean isRowSet = pstmt.execute();
+
+        assertFalse(isRowSet);
+
+        checkCacheContents(TBL_NAME, true, 2);
+    }
+
+    /**
+     * Verifies that COPY command is rejected by PreparedStatement.executeQuery().
+     */
+    public void testPreparedStatementWithExecuteQuery() {
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                PreparedStatement pstmt = conn.prepareStatement(BASIC_SQL_COPY_STMT);
+
+                pstmt.executeQuery();
+
+                return null;
+            }
+        }, SQLException.class, "The query isn't SELECT query");
+    }
+
+    /**
+     * Checks cache contents for a typical test using SQL SELECT command.
+     *
+     * @param tblName Table name to query.
+     * @param checkLastName Check 'lastName' column (not imported in some tests).
+     * @param recCnt Number of records to expect.
+     * @throws SQLException When one of checks has failed.
+     */
+    private void checkCacheContents(String tblName, boolean checkLastName, int recCnt) throws SQLException {
+        ResultSet rs = stmt.executeQuery("select _key, age, firstName, lastName from " + tblName);
+
+        assert rs != null;
+
+        int cnt = 0;
+
+        while (rs.next()) {
+            int id = rs.getInt("_key");
+
+            if (id == 123) {
+                assertEquals(12, rs.getInt("age"));
+                assertEquals("FirstName123 MiddleName123", rs.getString("firstName"));
+                if (checkLastName)
+                    assertEquals("LastName123", rs.getString("lastName"));
+            }
+            else if (id == 456) {
+                assertEquals(45, rs.getInt("age"));
+                assertEquals("FirstName456", rs.getString("firstName"));
+                if (checkLastName)
+                    assertEquals("LastName456", rs.getString("lastName"));
+            }
+            else
+                fail("Wrong ID: " + id);
+
+            cnt++;
+        }
+
+        assertEquals(recCnt, cnt);
+    }
+
+    /**
+     * Checks cache contents for the UTF-8 bulk load tests using the SQL SELECT command.
+     *
+     * @param tblName Table name to query.
+     * @param checkLastName Check 'lastName' column (not imported in some tests).
+     * @param recCnt Number of records to expect.
+     * @throws SQLException When one of checks has failed.
+     */
+    private void checkUtfCacheContents(String tblName, boolean checkLastName, int recCnt) throws SQLException {
+        ResultSet rs = stmt.executeQuery("select _key, age, firstName, lastName from " + tblName);
+
+        assert rs != null;
+
+        int cnt = 0;
+
+        while (rs.next()) {
+            int id = rs.getInt("_key");
+
+            if (id == 123) {
+                assertEquals(12, rs.getInt("age"));
+                assertEquals("Имя123 Отчество123", rs.getString("firstName"));
+                if (checkLastName)
+                    assertEquals("Фамилия123", rs.getString("lastName"));
+            }
+            else if (id == 456) {
+                assertEquals(45, rs.getInt("age"));
+                assertEquals("Имя456", rs.getString("firstName"));
+                if (checkLastName)
+                    assertEquals("Фамилия456", rs.getString("lastName"));
+            }
+            else
+                fail("Wrong ID: " + id);
+
+            cnt++;
+        }
+
+        assertEquals(recCnt, cnt);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicPartitionedNearSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicPartitionedNearSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicPartitionedNearSelfTest.java
new file mode 100644
index 0000000..887b1d9
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicPartitionedNearSelfTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/** A {@link JdbcThinBulkLoadAbstractSelfTest} for partitioned atomic near-cache mode. */
+public class JdbcThinBulkLoadAtomicPartitionedNearSelfTest extends JdbcThinBulkLoadAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean nearCache() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicPartitionedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicPartitionedSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicPartitionedSelfTest.java
new file mode 100644
index 0000000..5581333
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicPartitionedSelfTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/** A {@link JdbcThinBulkLoadAbstractSelfTest} for partitioned atomic mode. */
+public class JdbcThinBulkLoadAtomicPartitionedSelfTest extends JdbcThinBulkLoadAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean nearCache() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicReplicatedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicReplicatedSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicReplicatedSelfTest.java
new file mode 100644
index 0000000..c3d69af
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadAtomicReplicatedSelfTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/** A {@link JdbcThinBulkLoadAbstractSelfTest} for replicated atomic mode. */
+public class JdbcThinBulkLoadAtomicReplicatedSelfTest extends JdbcThinBulkLoadAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.REPLICATED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean nearCache() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalPartitionedNearSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalPartitionedNearSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalPartitionedNearSelfTest.java
new file mode 100644
index 0000000..9336dd1
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalPartitionedNearSelfTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/** A {@link JdbcThinBulkLoadAbstractSelfTest} for partitioned transactional near-cache mode. */
+public class JdbcThinBulkLoadTransactionalPartitionedNearSelfTest extends JdbcThinBulkLoadAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean nearCache() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalPartitionedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalPartitionedSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalPartitionedSelfTest.java
new file mode 100644
index 0000000..d1dea2a
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalPartitionedSelfTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/** A {@link JdbcThinBulkLoadAbstractSelfTest} for partitioned transactional mode. */
+public class JdbcThinBulkLoadTransactionalPartitionedSelfTest extends JdbcThinBulkLoadAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean nearCache() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalReplicatedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalReplicatedSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalReplicatedSelfTest.java
new file mode 100644
index 0000000..1c377fa
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinBulkLoadTransactionalReplicatedSelfTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/** A {@link JdbcThinBulkLoadAbstractSelfTest} for replicated transactional mode. */
+public class JdbcThinBulkLoadTransactionalReplicatedSelfTest extends JdbcThinBulkLoadAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.REPLICATED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean nearCache() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDynamicIndexAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDynamicIndexAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDynamicIndexAbstractSelfTest.java
index dbe93a4..539713a 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDynamicIndexAbstractSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinDynamicIndexAbstractSelfTest.java
@@ -25,7 +25,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Callable;
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/clients/src/test/resources/bulkload0.csv
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/resources/bulkload0.csv b/modules/clients/src/test/resources/bulkload0.csv
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/clients/src/test/resources/bulkload1.csv
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/resources/bulkload1.csv b/modules/clients/src/test/resources/bulkload1.csv
new file mode 100644
index 0000000..596ac32
--- /dev/null
+++ b/modules/clients/src/test/resources/bulkload1.csv
@@ -0,0 +1 @@
+123,12,"FirstName123 MiddleName123",LastName123
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/clients/src/test/resources/bulkload2.csv
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/resources/bulkload2.csv b/modules/clients/src/test/resources/bulkload2.csv
new file mode 100644
index 0000000..d398c19
--- /dev/null
+++ b/modules/clients/src/test/resources/bulkload2.csv
@@ -0,0 +1,2 @@
+123,12,"FirstName123 MiddleName123",LastName123
+456,45,"FirstName456","LastName456"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/clients/src/test/resources/bulkload2_utf.csv
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/resources/bulkload2_utf.csv b/modules/clients/src/test/resources/bulkload2_utf.csv
new file mode 100644
index 0000000..bdb6489
--- /dev/null
+++ b/modules/clients/src/test/resources/bulkload2_utf.csv
@@ -0,0 +1,2 @@
+123,12,"Имя123 Отчество123",Фамилия123
+456,45,"Имя456","Фамилия456"
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/cache/query/BulkLoadContextCursor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/BulkLoadContextCursor.java b/modules/core/src/main/java/org/apache/ignite/cache/query/BulkLoadContextCursor.java
new file mode 100644
index 0000000..b7fdec3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/BulkLoadContextCursor.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.query;
+
+import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters;
+import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A special FieldsQueryCursor subclass that is used as a sentinel to transfer data from the bulk load
+ * (COPY) command to the JDBC or other client-facing driver: it carries the bulk load batch processor
+ * and the parameters to send to the client.
+ */
+public class BulkLoadContextCursor implements FieldsQueryCursor<List<?>> {
+    /** Bulk load context from SQL command. */
+    private final BulkLoadProcessor processor;
+
+    /** Bulk load parameters to send to the client. */
+    private final BulkLoadAckClientParameters clientParams;
+
+    /**
+     * Creates a cursor.
+     *
+     * @param processor Bulk load context object to store.
+     * @param clientParams Parameters to send to client.
+     */
+    public BulkLoadContextCursor(BulkLoadProcessor processor, BulkLoadAckClientParameters clientParams) {
+        this.processor = processor;
+        this.clientParams = clientParams;
+    }
+
+    /**
+     * Returns a bulk load context.
+     *
+     * @return a bulk load context.
+     */
+    public BulkLoadProcessor bulkLoadProcessor() {
+        return processor;
+    }
+
+    /**
+     * Returns the bulk load parameters to send to the client.
+     *
+     * @return The bulk load parameters to send to the client.
+     */
+    public BulkLoadAckClientParameters clientParams() {
+        return clientParams;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<List<?>> getAll() {
+        return Collections.singletonList(Arrays.asList(processor, clientParams));
+    }
+
+    /** {@inheritDoc} */
+    @NotNull @Override public Iterator<List<?>> iterator() {
+        return getAll().iterator();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        // no-op
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getFieldName(int idx) {
+        if (idx < 0 || idx > 1)
+            throw new IndexOutOfBoundsException();
+
+        return idx == 0 ? "processor" : "clientParams";
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getColumnsCount() {
+        return 2;
+    }
+}
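
For context, a hedged sketch of how server-side code might recognize and unpack this sentinel cursor,
using only the accessors defined above (the consumer class and method names below are hypothetical):

    import java.util.List;
    import org.apache.ignite.cache.query.BulkLoadContextCursor;
    import org.apache.ignite.cache.query.FieldsQueryCursor;
    import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor;

    class BulkLoadCursorConsumer {
        /** Returns the batch processor if {@code cur} is the COPY sentinel cursor, or {@code null} otherwise. */
        static BulkLoadProcessor processorOf(FieldsQueryCursor<List<?>> cur) {
            if (!(cur instanceof BulkLoadContextCursor))
                return null;

            BulkLoadContextCursor bulkCur = (BulkLoadContextCursor)cur;

            // bulkCur.clientParams() carries the server-parsed parameters to acknowledge to the client.
            return bulkCur.bulkLoadProcessor();
        }
    }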

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
index d29df93..2020011 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.internal.jdbc.thin;
 
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.InputStream;
 import java.sql.BatchUpdateException;
 import java.sql.Connection;
 import java.sql.ResultSet;
@@ -25,21 +28,24 @@ import java.sql.SQLFeatureNotSupportedException;
 import java.sql.SQLWarning;
 import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import org.apache.ignite.cache.query.SqlQuery;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
-import org.apache.ignite.internal.processors.odbc.SqlStateCode;
 import org.apache.ignite.internal.processors.odbc.ClientListenerResponse;
+import org.apache.ignite.internal.processors.odbc.SqlStateCode;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteRequest;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteResult;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadAckResult;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteMultipleStatementsResult;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteRequest;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteResult;
-import org.apache.ignite.internal.processors.odbc.jdbc.JdbcStatementType;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResult;
 import org.apache.ignite.internal.processors.odbc.jdbc.JdbcResultInfo;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBulkLoadBatchRequest;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcStatementType;
 
 import static java.sql.ResultSet.CONCUR_READ_ONLY;
 import static java.sql.ResultSet.FETCH_FORWARD;
@@ -132,6 +138,9 @@ public class JdbcThinStatement implements Statement {
 
         assert res0 != null;
 
+        if (res0 instanceof JdbcBulkLoadAckResult)
+            res0 = sendFile((JdbcBulkLoadAckResult)res0);
+
         if (res0 instanceof JdbcQueryExecuteResult) {
             JdbcQueryExecuteResult res = (JdbcQueryExecuteResult)res0;
 
@@ -176,6 +185,61 @@ public class JdbcThinStatement implements Statement {
         assert resultSets.size() > 0 : "At least one results set is expected";
     }
 
+    /**
+     * Sends a file to the server in batches via multiple {@link JdbcBulkLoadBatchRequest}s.
+     *
+     * @param cmdRes Result of invoking the COPY command: contains server-parsed bulk load parameters,
+     *    such as the file name and batch size.
+     * @return Bulk load result received from the server.
+     * @throws SQLException If the file cannot be read or a communication error occurs.
+     */
+    private JdbcResult sendFile(JdbcBulkLoadAckResult cmdRes) throws SQLException {
+        String fileName = cmdRes.params().localFileName();
+        int batchSize = cmdRes.params().batchSize();
+
+        int batchNum = 0;
+
+        try {
+            try (InputStream input = new BufferedInputStream(new FileInputStream(fileName))) {
+                byte[] buf = new byte[batchSize];
+
+                int readBytes;
+                while ((readBytes = input.read(buf)) != -1) {
+                    if (readBytes == 0)
+                        continue;
+
+                    JdbcResult res = conn.sendRequest(new JdbcBulkLoadBatchRequest(
+                        cmdRes.queryId(),
+                        batchNum++,
+                        JdbcBulkLoadBatchRequest.CMD_CONTINUE,
+                        readBytes == buf.length ? buf : Arrays.copyOf(buf, readBytes)));
+
+                    if (!(res instanceof JdbcQueryExecuteResult))
+                        throw new SQLException("Unknown response sent by the 
server: " + res);
+                }
+
+                return conn.sendRequest(new JdbcBulkLoadBatchRequest(
+                    cmdRes.queryId(),
+                    batchNum++,
+                    JdbcBulkLoadBatchRequest.CMD_FINISHED_EOF));
+            }
+        }
+        catch (Exception e) {
+            try {
+                conn.sendRequest(new JdbcBulkLoadBatchRequest(
+                    cmdRes.queryId(),
+                    batchNum,
+                    JdbcBulkLoadBatchRequest.CMD_FINISHED_ERROR));
+            }
+            catch (SQLException e1) {
+                throw new SQLException("Cannot send finalization request: " + 
e1.getMessage(), e);
+            }
+
+            if (e instanceof SQLException)
+                throw (SQLException) e;
+            else
+                throw new SQLException("Failed to read file: '" + fileName + 
"'", SqlStateCode.INTERNAL_ERROR, e);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public int executeUpdate(String sql) throws SQLException {
         execute0(JdbcStatementType.UPDATE_STMT_TYPE, sql, null);
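
For context, a hedged application-side sketch of what triggers the sendFile() path above. The JDBC URL, table name, column list and CSV file name are illustrative assumptions, not part of the patch.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class CopyViaThinDriverSketch {
        public static void main(String[] args) throws Exception {
            // Register the thin JDBC driver.
            Class.forName("org.apache.ignite.IgniteJdbcThinDriver");

            try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1/");
                 Statement stmt = conn.createStatement()) {
                // Executing COPY makes JdbcThinStatement stream the local file to the
                // server in batches via sendFile().
                int rows = stmt.executeUpdate(
                    "COPY FROM 'people.csv' INTO Person (id, name) FORMAT CSV");

                System.out.println("Rows loaded: " + rows);
            }
        }
    }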

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java
index aa9f009..07034f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java
@@ -32,6 +32,8 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteJdbcDriver;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.query.BulkLoadContextCursor;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.internal.IgniteKernal;
@@ -168,7 +170,15 @@ class JdbcQueryTask implements IgniteCallable<JdbcQueryTaskResult> {
             qry.setLazy(lazy());
             qry.setSchema(schemaName);
 
-            QueryCursorImpl<List<?>> qryCursor = (QueryCursorImpl<List<?>>)cache.withKeepBinary().query(qry);
+            FieldsQueryCursor<List<?>> fldQryCursor = cache.withKeepBinary().query(qry);
+
+            if (fldQryCursor instanceof BulkLoadContextCursor) {
+                fldQryCursor.close();
+
+                throw new SQLException("COPY command is currently supported only in the thin JDBC driver.");
+            }
+
+            QueryCursorImpl<List<?>> qryCursor = (QueryCursorImpl<List<?>>)fldQryCursor;
 
             if (isQry == null)
                 isQry = qryCursor.isQuery();

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadAckClientParameters.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadAckClientParameters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadAckClientParameters.java
new file mode 100644
index 0000000..119d9f9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadAckClientParameters.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.bulkload;
+
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Bulk load parameters, which are parsed from the SQL command and sent from the server to the client.
+ */
+public class BulkLoadAckClientParameters {
+    /** Minimum batch size. */
+    public static final int MIN_BATCH_SIZE = 1;
+
+    /**
+     * Maximum batch size. Note that the batch is wrapped into transport objects and the overall packet should
+     * fit into a Java array. The 512-byte margin has been chosen arbitrarily.
+     */
+    public static final int MAX_BATCH_SIZE = Integer.MAX_VALUE - 512;
+
+    /** Default size of a file batch for the COPY command. */
+    public static final int DEFAULT_BATCH_SIZE = 4 * 1024 * 1024;
+
+    /** Local name of the file to send to the server. */
+    @NotNull private final String locFileName;
+
+    /** File batch size in bytes. */
+    private final int batchSize;
+
+    /**
+     * Creates bulk load parameters.
+     *
+     * @param locFileName File name to send from the client to the server.
+     * @param batchSize Batch size (number of bytes in a portion of a file to send in one JDBC request/response).
+     */
+    public BulkLoadAckClientParameters(@NotNull String locFileName, int batchSize) {
+        this.locFileName = locFileName;
+        this.batchSize = batchSize;
+    }
+
+    /**
+     * Returns the local name of the file to send.
+     *
+     * @return The local name of the file to send.
+     */
+    @NotNull public String localFileName() {
+        return locFileName;
+    }
+
+    /**
+     * Returns the batch size.
+     *
+     * @return The batch size.
+     */
+    public int batchSize() {
+        return batchSize;
+    }
+
+    /**
+     * Checks if the batch size value is valid.
+     *
+     * @param sz The batch size to check.
+     * @return {@code true} if the batch size is within the allowed range, {@code false} otherwise.
+     */
+    public static boolean isValidBatchSize(int sz) {
+        return sz >= MIN_BATCH_SIZE && sz <= MAX_BATCH_SIZE;
+    }
+
+    /**
+     * Creates a proper batch size error message if the {@link #isValidBatchSize(int)} check has failed.
+     *
+     * @param sz The batch size.
+     * @return The string with the error message.
+     */
+    public static String batchSizeErrorMsg(int sz) {
+        return "Batch size should be within [" + MIN_BATCH_SIZE + ".." + MAX_BATCH_SIZE + "]: " + sz;
+    }
+}
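
A short sketch of the validation pattern these helpers enable; the ackParams() wrapper and its arguments are hypothetical, only the static helpers and the constructor come from the class above.

    import org.apache.ignite.internal.processors.bulkload.BulkLoadAckClientParameters;

    class BatchSizeValidationSketch {
        /** Hypothetical helper: validates the requested batch size before building the parameters. */
        static BulkLoadAckClientParameters ackParams(String locFileName, int requestedSize) {
            if (!BulkLoadAckClientParameters.isValidBatchSize(requestedSize))
                throw new IllegalArgumentException(BulkLoadAckClientParameters.batchSizeErrorMsg(requestedSize));

            return new BulkLoadAckClientParameters(locFileName, requestedSize);
        }
    }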

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCacheWriter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCacheWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCacheWriter.java
new file mode 100644
index 0000000..90714c8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCacheWriter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.bulkload;
+
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInClosure;
+
+/** A writer closure that stores a given key/value pair into a cache. */
+public abstract class BulkLoadCacheWriter implements IgniteInClosure<IgniteBiTuple<?, ?>>, AutoCloseable {
+    /**
+     * Returns the number of entry updates made by the writer.
+     *
+     * @return The number of cache entry updates.
+     */
+    public abstract long updateCnt();
+}
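
To illustrate the contract, a hypothetical, simplified writer backed by a plain map. The actual patch writes through an IgniteDataStreamer, so treat this only as a sketch of the interface.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.ignite.internal.processors.bulkload.BulkLoadCacheWriter;
    import org.apache.ignite.lang.IgniteBiTuple;

    class MapBackedCacheWriter extends BulkLoadCacheWriter {
        /** In-memory sink standing in for a cache. */
        private final Map<Object, Object> sink = new HashMap<>();

        /** Number of entries written so far. */
        private long updateCnt;

        /** {@inheritDoc} */
        @Override public void apply(IgniteBiTuple<?, ?> pair) {
            sink.put(pair.getKey(), pair.getValue());

            updateCnt++;
        }

        /** {@inheritDoc} */
        @Override public long updateCnt() {
            return updateCnt;
        }

        /** {@inheritDoc} */
        @Override public void close() {
            // Nothing to flush for an in-memory map.
        }
    }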

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCsvFormat.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCsvFormat.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCsvFormat.java
new file mode 100644
index 0000000..6f5e91e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCsvFormat.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.bulkload;
+
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.regex.Pattern;
+
+/** A holder of bulk load CSV format parser options. */
+public class BulkLoadCsvFormat extends BulkLoadFormat {
+
+    /** Default line separator pattern. */
+    @NotNull public static final Pattern DEFAULT_LINE_SEPARATOR = Pattern.compile("[\r\n]+");
+
+    /** Default field separator pattern. */
+    @NotNull public static final Pattern DEFAULT_FIELD_SEPARATOR = Pattern.compile(",");
+
+    /** Default quote characters. */
+    @NotNull public static final String DEFAULT_QUOTE_CHARS = "\"";
+
+    /** Default escape sequence start characters. */
+    @Nullable public static final String DEFAULT_ESCAPE_CHARS = null;
+
+    /** Default line comment start pattern. */
+    @Nullable public static final Pattern DEFAULT_COMMENT_CHARS = null;
+
+    /** Format name. */
+    public static final String NAME = "CSV";
+
+    /** Line separator pattern. */
+    @Nullable private Pattern lineSeparator;
+
+    /** Field separator pattern. */
+    @Nullable private Pattern fieldSeparator;
+
+    /** Set of quote characters. */
+    @Nullable private String quoteChars;
+
+    /** Line comment start pattern. */
+    @Nullable private Pattern commentChars;
+
+    /** Set of escape start characters. */
+    @Nullable private String escapeChars;
+
+    /**
+     * Returns the name of the format.
+     *
+     * @return The name of the format.
+     */
+    @Override public String name() {
+        return NAME;
+    }
+
+    /**
+     * Returns the line separator pattern.
+     *
+     * @return The line separator pattern.
+     */
+    @Nullable public Pattern lineSeparator() {
+        return lineSeparator;
+    }
+
+    /**
+     * Sets the line separator pattern.
+     *
+     * @param lineSeparator The line separator pattern.
+     */
+    public void lineSeparator(@Nullable Pattern lineSeparator) {
+        this.lineSeparator = lineSeparator;
+    }
+
+    /**
+     * Returns the field separator pattern.
+     *
+     * @return The field separator pattern.
+     */
+    @Nullable public Pattern fieldSeparator() {
+        return fieldSeparator;
+    }
+
+    /**
+     * Sets the field separator pattern.
+     *
+     * @param fieldSeparator The field separator pattern.
+     */
+    public void fieldSeparator(@Nullable Pattern fieldSeparator) {
+        this.fieldSeparator = fieldSeparator;
+    }
+
+    /**
+     * Returns the quote characters.
+     *
+     * @return The quote characters.
+     */
+    @Nullable public String quoteChars() {
+        return quoteChars;
+    }
+
+    /**
+     * Sets the quote characters.
+     *
+     * @param quoteChars The quote characters.
+     */
+    public void quoteChars(@Nullable String quoteChars) {
+        this.quoteChars = quoteChars;
+    }
+
+    /**
+     * Returns the line comment start pattern.
+     *
+     * @return The line comment start pattern.
+     */
+    @Nullable public Pattern commentChars() {
+        return commentChars;
+    }
+
+    /**
+     * Sets the line comment start pattern.
+     *
+     * @param commentChars The line comment start pattern.
+     */
+    public void commentChars(@Nullable Pattern commentChars) {
+        this.commentChars = commentChars;
+    }
+
+    /**
+     * Returns the escape characters.
+     *
+     * @return The escape characters.
+     */
+    @Nullable public String escapeChars() {
+        return escapeChars;
+    }
+
+    /**
+     * Sets the escape characters.
+     *
+     * @param escapeChars The escape characters.
+     */
+    public void escapeChars(@Nullable String escapeChars) {
+        this.escapeChars = escapeChars;
+    }
+}
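
A brief sketch of wiring the defaults above into a format object; the helper method and class names are assumptions, the setters and DEFAULT_* constants come from the class itself.

    import org.apache.ignite.internal.processors.bulkload.BulkLoadCsvFormat;

    class CsvFormatDefaultsSketch {
        /** Hypothetical factory method: builds a CSV format object from the default options. */
        static BulkLoadCsvFormat defaultCsvFormat() {
            BulkLoadCsvFormat fmt = new BulkLoadCsvFormat();

            fmt.lineSeparator(BulkLoadCsvFormat.DEFAULT_LINE_SEPARATOR);
            fmt.fieldSeparator(BulkLoadCsvFormat.DEFAULT_FIELD_SEPARATOR);
            fmt.quoteChars(BulkLoadCsvFormat.DEFAULT_QUOTE_CHARS);
            fmt.commentChars(BulkLoadCsvFormat.DEFAULT_COMMENT_CHARS);
            fmt.escapeChars(BulkLoadCsvFormat.DEFAULT_ESCAPE_CHARS);

            return fmt;
        }
    }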

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCsvParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCsvParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCsvParser.java
new file mode 100644
index 0000000..0511596
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadCsvParser.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.bulkload;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.bulkload.pipeline.CharsetDecoderBlock;
+import org.apache.ignite.internal.processors.bulkload.pipeline.CsvLineProcessorBlock;
+import org.apache.ignite.internal.processors.bulkload.pipeline.PipelineBlock;
+import org.apache.ignite.internal.processors.bulkload.pipeline.StrListAppenderBlock;
+import org.apache.ignite.internal.processors.bulkload.pipeline.LineSplitterBlock;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/** CSV parser for COPY command. */
+public class BulkLoadCsvParser extends BulkLoadParser {
+    /** Processing pipeline input block: a decoder for the input stream of bytes. */
+    private final PipelineBlock<byte[], char[]> inputBlock;
+
+    /** A record collecting block that appends its input to {@code List<String>}. */
+    private final StrListAppenderBlock collectorBlock;
+
+    /**
+     * Creates a bulk load CSV parser.
+     *
+     * @param format Format options (parsed from the COPY command).
+     */
+    public BulkLoadCsvParser(BulkLoadCsvFormat format) {
+        inputBlock = new CharsetDecoderBlock(BulkLoadFormat.DEFAULT_INPUT_CHARSET);
+
+        collectorBlock = new StrListAppenderBlock();
+
+        // Handling of the other options is to be implemented in IGNITE-7537.
+        inputBlock.append(new LineSplitterBlock(format.lineSeparator()))
+               .append(new CsvLineProcessorBlock(format.fieldSeparator(), format.quoteChars()))
+               .append(collectorBlock);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Iterable<List<Object>> parseBatch(byte[] batchData, boolean isLastBatch)
+        throws IgniteCheckedException {
+        List<List<Object>> res = new LinkedList<>();
+
+        collectorBlock.output(res);
+
+        inputBlock.accept(batchData, isLastBatch);
+
+        return res;
+    }
+}
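
A hedged sketch of feeding a single batch through the parser in isolation. The sample bytes and expected output are made up, and the snippet assumes it lives in the org.apache.ignite.internal.processors.bulkload package because parseBatch() is protected.

    package org.apache.ignite.internal.processors.bulkload;

    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import org.apache.ignite.IgniteCheckedException;

    class CsvParserSketch {
        /** Parses a small in-memory CSV batch; each record comes back as a list of field values. */
        static Iterable<List<Object>> parseSample() throws IgniteCheckedException {
            BulkLoadCsvFormat fmt = new BulkLoadCsvFormat();

            fmt.lineSeparator(BulkLoadCsvFormat.DEFAULT_LINE_SEPARATOR);
            fmt.fieldSeparator(BulkLoadCsvFormat.DEFAULT_FIELD_SEPARATOR);
            fmt.quoteChars(BulkLoadCsvFormat.DEFAULT_QUOTE_CHARS);

            BulkLoadCsvParser parser = new BulkLoadCsvParser(fmt);

            byte[] batch = "1,John\n2,Maria\n".getBytes(StandardCharsets.UTF_8);

            // With the whole input in one batch, this is expected to yield two records:
            // ["1", "John"] and ["2", "Maria"].
            return parser.parseBatch(batch, true);
        }
    }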

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadFormat.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadFormat.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadFormat.java
new file mode 100644
index 0000000..cff93c5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadFormat.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.bulkload;
+
+import java.nio.charset.Charset;
+
+/** A superclass for bulk load format options. */
+public abstract class BulkLoadFormat {
+    /** The default input charset. */
+    public static final Charset DEFAULT_INPUT_CHARSET = Charset.forName("UTF-8");
+
+    /**
+     * Returns the format name.
+     *
+     * @return The format name.
+     */
+    public abstract String name();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadParser.java
new file mode 100644
index 0000000..252e87b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadParser.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.bulkload;
+
+import org.apache.ignite.IgniteCheckedException;
+
+import java.util.List;
+
+/**
+ * Bulk load file format parser superclass and a factory of known formats.
+ *
+ * <p>The parser processes a batch of input data and returns a list of records.
+ *
+ * <p>The parser uses the corresponding options from a {@link BulkLoadFormat} subclass.
+ */
+public abstract class BulkLoadParser {
+    /**
+     * Parses a batch of input data and returns the list of parsed records
+     * (in most cases this is a list of strings).
+     *
+     * <p>Note that the conversion between the parsed values and the database table types is done by a separate
+     * object (see {@link BulkLoadProcessor#dataConverter}) in the request processing code.
+     * This method is not obliged to do this conversion.
+     *
+     * @param batchData Data from the current batch.
+     * @param isLastBatch {@code true} if this is the last batch.
+     * @return The list of records.
+     * @throws IgniteCheckedException If any processing error occurs.
+     */
+    protected abstract Iterable<List<Object>> parseBatch(byte[] batchData, boolean isLastBatch)
+        throws IgniteCheckedException;
+
+    /**
+     * Creates a parser for the given format options.
+     *
+     * @param format The input format object.
+     * @return The parser.
+     * @throws IllegalArgumentException If the format is not known to the factory.
+     */
+    public static BulkLoadParser createParser(BulkLoadFormat format) {
+        if (format instanceof BulkLoadCsvFormat)
+            return new BulkLoadCsvParser((BulkLoadCsvFormat)format);
+
+        throw new IllegalArgumentException("Internal error: format is not defined");
+    }
+}
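
A minimal sketch of the factory contract: CSV is the only format recognized here, and any other BulkLoadFormat subclass ends in an IllegalArgumentException. Class and method names in the sketch are illustrative.

    import org.apache.ignite.internal.processors.bulkload.BulkLoadCsvFormat;
    import org.apache.ignite.internal.processors.bulkload.BulkLoadFormat;
    import org.apache.ignite.internal.processors.bulkload.BulkLoadParser;

    class ParserFactorySketch {
        static BulkLoadParser csvParser() {
            BulkLoadFormat fmt = new BulkLoadCsvFormat();

            // Dispatches on the concrete format type and returns a BulkLoadCsvParser.
            return BulkLoadParser.createParser(fmt);
        }
    }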

http://git-wip-us.apache.org/repos/asf/ignite/blob/25d38cc9/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java
new file mode 100644
index 0000000..ccf3e25
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.bulkload;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteIllegalStateException;
+import org.apache.ignite.internal.util.lang.IgniteClosureX;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+import java.util.List;
+
+/**
+ * Bulk load (COPY) command processor used on the server side to keep various context data and to process portions
+ * of input received from the client side.
+ */
+public class BulkLoadProcessor implements AutoCloseable {
+    /** Parser of the input bytes. */
+    private final BulkLoadParser inputParser;
+
+    /**
+     * Converter that transforms the list of strings parsed from the input stream into the key/value entry
+     * to add to the cache.
+     */
+    private final IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> dataConverter;
+
+    /** Streamer that puts the actual key/value pairs into the cache. */
+    private final BulkLoadCacheWriter outputStreamer;
+
+    /** Becomes true after {@link #close()} method is called. */
+    private boolean isClosed;
+
+    /**
+     * Creates a bulk load processor.
+     *
+     * @param inputParser Parser of the input bytes.
+     * @param dataConverter Converter that transforms the list of strings parsed from the input stream into
+     *     the key/value entry to add to the cache.
+     * @param outputStreamer Streamer that puts the actual key/value pairs into the cache.
+     */
+    public BulkLoadProcessor(BulkLoadParser inputParser, IgniteClosureX<List<?>, IgniteBiTuple<?, ?>> dataConverter,
+        BulkLoadCacheWriter outputStreamer) {
+        this.inputParser = inputParser;
+        this.dataConverter = dataConverter;
+        this.outputStreamer = outputStreamer;
+        isClosed = false;
+    }
+
+    /**
+     * Returns the streamer that puts the actual key/value pairs into the cache.
+     *
+     * @return The streamer that puts the actual key/value pairs into the cache.
+     */
+    public BulkLoadCacheWriter outputStreamer() {
+        return outputStreamer;
+    }
+
+    /**
+     * Processes the incoming batch and writes data to the cache by calling the data converter and output streamer.
+     *
+     * @param batchData Data from the current batch.
+     * @param isLastBatch {@code true} if this is the last batch.
+     * @throws IgniteCheckedException If parsing or writing to the cache fails.
+     * @throws IgniteIllegalStateException When called after {@link #close()}.
+     */
+    public void processBatch(byte[] batchData, boolean isLastBatch) throws IgniteCheckedException {
+        if (isClosed)
+            throw new IgniteIllegalStateException("Attempt to process a batch on a closed BulkLoadProcessor");
+
+        Iterable<List<Object>> inputRecords = inputParser.parseBatch(batchData, isLastBatch);
+
+        for (List<Object> record : inputRecords) {
+            IgniteBiTuple<?, ?> kv = dataConverter.apply(record);
+
+            outputStreamer.apply(kv);
+        }
+    }
+
+    /**
+     * Aborts processing and closes the underlying objects ({@link IgniteDataStreamer}).
+     */
+    @Override public void close() throws Exception {
+        if (isClosed)
+            return;
+
+        isClosed = true;
+
+        outputStreamer.close();
+    }
+}
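
A hedged sketch of the server-side flow this class supports; the loadAll() helper and the pre-collected list of batches are assumptions standing in for the incoming batch requests.

    import java.util.List;
    import org.apache.ignite.internal.processors.bulkload.BulkLoadProcessor;

    class BulkLoadDriverSketch {
        /** Hypothetical driver: feeds already-received batches into the processor, then closes it. */
        static void loadAll(BulkLoadProcessor proc, List<byte[]> batches) throws Exception {
            try {
                for (int i = 0; i < batches.size(); i++)
                    proc.processBatch(batches.get(i), i == batches.size() - 1);
            }
            finally {
                // Closes the underlying writer/streamer.
                proc.close();
            }

            System.out.println("Entries updated: " + proc.outputStreamer().updateCnt());
        }
    }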
