IGNITE-7337: Implementation of saving DataFrame to Ignite SQL tables - Fixes 
#3438.

Signed-off-by: Nikolay Izhikov <nizhi...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7c014529
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7c014529
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7c014529

Branch: refs/heads/ignite-7485-2
Commit: 7c01452990ad0de0fb84ab4c0424a6d71e5bccba
Parents: 4abcb31
Author: Nikolay Izhikov <nizhi...@apache.org>
Authored: Fri Feb 9 07:00:54 2018 +0300
Committer: Nikolay Izhikov <nizhi...@apache.org>
Committed: Fri Feb 9 07:00:54 2018 +0300

----------------------------------------------------------------------
 examples/src/main/resources/person.json         |  10 +
 .../spark/IgniteDataFrameWriteExample.scala     | 179 +++++++++
 .../spark/examples/IgniteDataFrameSelfTest.java |   9 +
 .../ignite/spark/IgniteDataFrameSettings.scala  | 100 +++++
 .../spark/impl/IgniteRelationProvider.scala     | 175 ++++++++-
 .../ignite/spark/impl/IgniteSQLRelation.scala   |  93 ++---
 .../apache/ignite/spark/impl/QueryHelper.scala  | 186 ++++++++++
 .../apache/ignite/spark/impl/QueryUtils.scala   | 225 +++++++++++
 .../org/apache/ignite/spark/impl/package.scala  |   2 +-
 .../sql/ignite/IgniteExternalCatalog.scala      |  61 ++-
 .../src/test/resources/cities_non_unique.json   |   6 +
 .../ignite/spark/AbstractDataFrameSpec.scala    |  41 +-
 .../apache/ignite/spark/IgniteCatalogSpec.scala |  10 +-
 .../ignite/spark/IgniteDataFrameSuite.scala     |   2 +
 .../spark/IgniteDataFrameWrongConfigSpec.scala  |   4 +-
 ...niteSQLDataFrameIgniteSessionWriteSpec.scala | 106 ++++++
 .../spark/IgniteSQLDataFrameWriteSpec.scala     | 371 +++++++++++++++++++
 17 files changed, 1462 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/examples/src/main/resources/person.json
----------------------------------------------------------------------
diff --git a/examples/src/main/resources/person.json 
b/examples/src/main/resources/person.json
new file mode 100644
index 0000000..d651b0d
--- /dev/null
+++ b/examples/src/main/resources/person.json
@@ -0,0 +1,10 @@
+{ "id": 1, "name": "Ivan Ivanov", "department": "Executive committee" }
+{ "id": 2, "name": "Petr Petrov", "department": "Executive committee" }
+{ "id": 3, "name": "John Doe", "department": "Production" }
+{ "id": 4, "name": "Smith Ann", "department": "Production" }
+{ "id": 5, "name": "Sergey Smirnov", "department": "Accounting" }
+{ "id": 6, "name": "Alexandra Sergeeva", "department": "Accounting" }
+{ "id": 7, "name": "Adam West", "department": "IT" }
+{ "id": 8, "name": "Beverley Chase", "department": "Head Office" }
+{ "id": 9, "name": "Igor Rozhkov", "department": "Head Office" }
+{ "id": 10, "name": "Anastasia Borisova", "department": "IT" }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/examples/src/main/spark/org/apache/ignite/examples/spark/IgniteDataFrameWriteExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/spark/org/apache/ignite/examples/spark/IgniteDataFrameWriteExample.scala
 
b/examples/src/main/spark/org/apache/ignite/examples/spark/IgniteDataFrameWriteExample.scala
new file mode 100644
index 0000000..7662fb4
--- /dev/null
+++ 
b/examples/src/main/spark/org/apache/ignite/examples/spark/IgniteDataFrameWriteExample.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.spark
+
+import java.lang.{Long ⇒ JLong, String ⇒ JString}
+
+import org.apache.ignite.cache.query.SqlFieldsQuery
+import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.{Ignite, Ignition}
+import org.apache.log4j.{Level, Logger}
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.apache.ignite.spark.IgniteDataFrameSettings._
+import org.apache.spark.sql.functions._
+
+import scala.collection.JavaConversions._
+
+/**
+  * Example application showing how to write a Spark DataFrame to Ignite.
+  */
+object IgniteDataFrameWriteExample extends App {
+    /**
+      * Ignite config file.
+      */
+    private val CONFIG = "examples/config/example-ignite.xml"
+
+    /**
+      * Test cache name.
+      */
+    private val CACHE_NAME = "testCache"
+
+    //Starting Ignite server node.
+    val ignite = setupServerAndData
+
+    // `closeAfter` is defined outside this file; presumably it closes `ignite` once the block finishes - confirm.
+    closeAfter(ignite) { _ ⇒
+        //Creating spark session. It is implicit so the example methods below receive it automatically.
+        implicit val spark: SparkSession = SparkSession.builder()
+            .appName("Spark Ignite data sources write example")
+            .master("local")
+            .config("spark.executor.instances", "2")
+            .getOrCreate()
+
+        // Adjust the logger to exclude the logs of no interest.
+        Logger.getRootLogger.setLevel(Level.INFO)
+        Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)
+
+        // Executing examples.
+        println("Example of writing json file to Ignite:")
+
+        writeJSonToIgnite
+
+        println("Example of modifying existing Ignite table data through Data Frame API:")
+
+        editDataAndSaveToNewTable
+    }
+
+    /**
+      * Loads `person.json` into a data frame, writes it to the Ignite table `json_person`
+      * and prints the rows read back from that table with an SQL query.
+      *
+      * @param spark Spark session used to read the json file.
+      */
+    def writeJSonToIgnite(implicit spark: SparkSession): Unit = {
+        //Json file content as a data frame.
+        val jsonDf = spark.read.json("examples/src/main/resources/person.json")
+
+        println()
+        println("Json file content:")
+        println()
+
+        //Console output of the loaded rows.
+        jsonDf.show()
+
+        println()
+        println("Writing Data Frame to Ignite:")
+        println()
+
+        //Target table, its primary key field and its create-table parameters are passed as writer options.
+        val igniteWriter = jsonDf.write
+            .format(FORMAT_IGNITE)
+            .option(OPTION_CONFIG_FILE, CONFIG)
+            .option(OPTION_TABLE, "json_person")
+            .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
+            .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
+
+        igniteWriter.save()
+
+        println("Done!")
+
+        println()
+        println("Reading data from Ignite table:")
+        println()
+
+        //Query over the table that has just been written.
+        val selectQry = new SqlFieldsQuery("SELECT id, name, department FROM json_person")
+
+        val rows = ignite.cache[Any, Any](CACHE_NAME).query(selectQry).getAll
+
+        rows.foreach(row ⇒ println(row.mkString("[", ", ", "]")))
+    }
+
+    /**
+      * Reads the Ignite table `person` into a data frame, changes its `id` and `name` columns,
+      * writes the result to the table `new_persons` in `Overwrite` mode and prints the rows read back.
+      *
+      * @param spark Spark session used to read the Ignite table.
+      */
+    def editDataAndSaveToNewTable(implicit spark: SparkSession): Unit = {
+        //Content of the Ignite table as a data frame.
+        val sourceDf = spark.read
+            .format(FORMAT_IGNITE)
+            .option(OPTION_CONFIG_FILE, CONFIG)
+            .option(OPTION_TABLE, "person")
+            .load()
+
+        println()
+        println("Data frame content:")
+        println()
+
+        //Console output of the loaded rows.
+        sourceDf.show()
+
+        println()
+        println("Modifying Data Frame and write it to Ignite:")
+        println()
+
+        //Every id is shifted by 42 and every name is reversed.
+        val editedDf = sourceDf
+            .withColumn("id", col("id") + 42)
+            .withColumn("name", reverse(col("name")))
+
+        editedDf.write
+            .format(FORMAT_IGNITE)
+            .option(OPTION_CONFIG_FILE, CONFIG)
+            .option(OPTION_TABLE, "new_persons")
+            .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id, city_id")
+            .option(OPTION_CREATE_TABLE_PARAMETERS, "backups=1")
+            .mode(SaveMode.Overwrite) //Overwriting entire table.
+            .save()
+
+        println("Done!")
+
+        println()
+        println("Reading data from Ignite table:")
+        println()
+
+        //Query over the table that has just been written.
+        val selectQry = new SqlFieldsQuery("SELECT id, name, city_id FROM new_persons")
+
+        val rows = ignite.cache[Any, Any](CACHE_NAME).query(selectQry).getAll
+
+        rows.foreach(row ⇒ println(row.mkString("[", ", ", "]")))
+    }
+
+    /**
+      * Starts an Ignite node from `CONFIG`, creates the test cache and the `person` SQL table with
+      * an index on `city_id`, and inserts four rows into the table.
+      *
+      * @return Started Ignite instance.
+      */
+    def setupServerAndData: Ignite = {
+        val node = Ignition.start(CONFIG)
+
+        //Cache used to execute SQL statements; its SQL schema is set to PUBLIC.
+        val cacheCfg = new CacheConfiguration[JLong, JString](CACHE_NAME).setSqlSchema("PUBLIC")
+
+        val testCache = node.getOrCreateCache(cacheCfg)
+
+        //Table and index definition.
+        val createTableQry = new SqlFieldsQuery(
+            "CREATE TABLE person (id LONG, name VARCHAR, city_id LONG, PRIMARY KEY (id)) " +
+                "WITH \"backups=1\"")
+
+        testCache.query(createTableQry).getAll
+
+        testCache.query(new SqlFieldsQuery("CREATE INDEX on Person (city_id)")).getAll
+
+        //The same query object is reused for every row, only its arguments change.
+        val insertQry = new SqlFieldsQuery("INSERT INTO person (id, name, city_id) values (?, ?, ?)")
+
+        val persons = Seq(
+            (1L, "John Doe", 3L),
+            (2L, "Jane Roe", 2L),
+            (3L, "Mary Major", 1L),
+            (4L, "Richard Miles", 2L))
+
+        persons.foreach { case (id, name, cityId) ⇒
+            testCache.query(insertQry.setArgs(id.asInstanceOf[JLong], name, cityId.asInstanceOf[JLong])).getAll
+        }
+
+        node
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/examples/src/test/spark/org/apache/ignite/spark/examples/IgniteDataFrameSelfTest.java
----------------------------------------------------------------------
diff --git 
a/examples/src/test/spark/org/apache/ignite/spark/examples/IgniteDataFrameSelfTest.java
 
b/examples/src/test/spark/org/apache/ignite/spark/examples/IgniteDataFrameSelfTest.java
index b18d870..f9f1d64 100644
--- 
a/examples/src/test/spark/org/apache/ignite/spark/examples/IgniteDataFrameSelfTest.java
+++ 
b/examples/src/test/spark/org/apache/ignite/spark/examples/IgniteDataFrameSelfTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.spark.examples;
 
 import org.apache.ignite.examples.spark.IgniteCatalogExample;
 import org.apache.ignite.examples.spark.IgniteDataFrameExample;
+import org.apache.ignite.examples.spark.IgniteDataFrameWriteExample;
 import org.junit.Test;
 
 /**
@@ -41,4 +42,12 @@ public class IgniteDataFrameSelfTest {
     public void testDataFrameExample() throws Exception {
         IgniteDataFrameExample.main(EMPTY_ARGS);
     }
+
+    /**
+     * Runs {@link IgniteDataFrameWriteExample} with empty arguments.
+     * No result is asserted: the test fails only if the example throws.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testDataFrameWriteExample() throws Exception {
+        IgniteDataFrameWriteExample.main(EMPTY_ARGS);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
----------------------------------------------------------------------
diff --git 
a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
 
b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
index 4261032..6bff476 100644
--- 
a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
+++ 
b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
@@ -53,4 +53,104 @@ object IgniteDataFrameSettings {
       * @see [[org.apache.ignite.cache.QueryEntity#tableName]]
       */
     val OPTION_TABLE = "table"
+
+    /**
+      * Config option to specify newly created Ignite SQL table parameters.
+      * Value of this option will be used in `CREATE TABLE ... WITH "option value goes here"`
+      *
+      * @example {{{
+      * val igniteDF = spark.write.format(IGNITE)
+      *     // other options ...
+      *     .option( OPTION_CREATE_TABLE_PARAMETERS, "backups=1, 
template=replicated")
+      *     .save()
+      * }}}
+      *
+      * @see [[https://apacheignite-sql.readme.io/docs/create-table]]
+      */
+    val OPTION_CREATE_TABLE_PARAMETERS = "createTableParameters"
+
+    /**
+      * Config option to specify comma separated list of primary key fields 
for a newly created Ignite SQL table.
+      *
+      * @example {{{
+      * val igniteDF = spark.write.format(IGNITE)
+      *     // other options ...
+      *     .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
+      *     .save()
+      * }}}
+      *
+      * @see [[https://apacheignite-sql.readme.io/docs/create-table]]
+      */
+    val OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS = "primaryKeyFields"
+
+    /**
+      * Config option for saving data frame.
+      * Internally all SQL inserts are done through `IgniteDataStreamer`.
+      * This option sets `allowOverwrite` property of streamer.
+      * If `true` then a row with the same primary key value will be written to the table.
+      * If `false` then a row with the same primary key value will be skipped. Existing row will be left in the table.
+      * Default value is `false`.
+      *
+      * @example {{{
+      * val igniteDF = spark.write.format(IGNITE)
+      *     // other options ...
+      *     .option(OPTION_STREAMER_ALLOW_OVERWRITE, true)
+      *     .save()
+      * }}}
+      *
+      * @see [[org.apache.ignite.IgniteDataStreamer]]
+      * @see [[org.apache.ignite.IgniteDataStreamer#allowOverwrite(boolean)]]
+      */
+    val OPTION_STREAMER_ALLOW_OVERWRITE = "streamerAllowOverwrite"
+
+    /**
+      * Config option for saving data frame.
+      * Internally all SQL inserts are done through `IgniteDataStreamer`.
+      * This option sets `autoFlushFrequency` property of streamer.
+      *
+      * @example {{{
+      * val igniteDF = spark.write.format(IGNITE)
+      *     // other options ...
+      *     .option(OPTION_STREAMER_FLUSH_FREQUENCY, 10000)
+      *     .save()
+      * }}}
+      *
+      * @see [[org.apache.ignite.IgniteDataStreamer]]
+      * @see [[org.apache.ignite.IgniteDataStreamer#autoFlushFrequency(long)]]
+      */
+    val OPTION_STREAMER_FLUSH_FREQUENCY = "streamerFlushFrequency"
+
+    /**
+      * Config option for saving data frame.
+      * Internally all SQL inserts are done through `IgniteDataStreamer`.
+      * This option sets `perNodeBufferSize` property of streamer.
+      *
+      * @example {{{
+      * val igniteDF = spark.write.format(IGNITE)
+      *     // other options ...
+      *     .option(OPTION_STREAMER_PER_NODE_BUFFER_SIZE, 1024)
+      *     .save()
+      * }}}
+      *
+      * @see [[org.apache.ignite.IgniteDataStreamer]]
+      * @see [[org.apache.ignite.IgniteDataStreamer#perNodeBufferSize(int)]]
+      */
+    val OPTION_STREAMER_PER_NODE_BUFFER_SIZE = "streamerPerNodeBufferSize"
+
+    /**
+      * Config option for saving data frame.
+      * Internally all SQL inserts are done through `IgniteDataStreamer`.
+      * This option sets `perNodeParallelOperations` property of streamer.
+      *
+      * @example {{{
+      * val igniteDF = spark.write.format(IGNITE)
+      *     // other options ...
+      *     .option(OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS, 42)
+      *     .save()
+      * }}}
+      *
+      * @see [[org.apache.ignite.IgniteDataStreamer]]
+      * @see 
[[org.apache.ignite.IgniteDataStreamer#perNodeParallelOperations(int)]]
+      */
+    val OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS = 
"streamerPerNodeParallelOperations"
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
----------------------------------------------------------------------
diff --git 
a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
 
b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
index 4762d8d..a9f9f89 100644
--- 
a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
+++ 
b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
@@ -17,21 +17,24 @@
 
 package org.apache.ignite.spark.impl
 
+import org.apache.ignite.IgniteException
 import org.apache.ignite.configuration.IgniteConfiguration
 import org.apache.ignite.internal.IgnitionEx
 import org.apache.ignite.internal.util.IgniteUtils
-import org.apache.ignite.spark.IgniteDataFrameSettings.OPTION_TABLE
 import org.apache.ignite.spark.IgniteContext
 import org.apache.ignite.spark.IgniteDataFrameSettings._
-import org.apache.ignite.IgniteException
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.ignite.IgniteExternalCatalog.OPTION_GRID
+import org.apache.ignite.spark.impl.QueryHelper.{createTable, dropTable, 
ensureCreateTableOptions, saveTable}
+import org.apache.spark.sql.SaveMode.{Append, Overwrite}
+import org.apache.spark.sql.ignite.IgniteExternalCatalog.{IGNITE_PROTOCOL, 
OPTION_GRID}
 import org.apache.spark.sql.sources._
+import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
 
 /**
   * Apache Ignite relation provider.
   */
-class IgniteRelationProvider extends RelationProvider with DataSourceRegister {
+class IgniteRelationProvider extends RelationProvider
+    with CreatableRelationProvider
+    with DataSourceRegister {
     /**
       * @return "ignite" - name of relation provider.
       */
@@ -42,8 +45,7 @@ class IgniteRelationProvider extends RelationProvider with 
DataSourceRegister {
       * To refer cluster user have to specify one of config parameter:
       * <ul>
       *     <li><code>config</code> - path to ignite configuration file.
-      *     <li><code>grid</code> - grid name. Note that grid has to be 
started in the same jvm.
-      * <ul>
+      * </ul>
       * Existing table inside Apache Ignite should be referred via 
<code>table</code> parameter.
       *
       * @param sqlCtx SQLContext.
@@ -51,8 +53,141 @@ class IgniteRelationProvider extends RelationProvider with 
DataSourceRegister {
       * @return IgniteRelation.
       * @see IgniteRelation
       * @see IgnitionEx#grid(String)
+      * @see org.apache.ignite.spark.IgniteDataFrameSettings.OPTION_TABLE
+      * @see org.apache.ignite.spark.IgniteDataFrameSettings.OPTION_CONFIG_FILE
+      */
+    override def createRelation(sqlCtx: SQLContext, params: Map[String, String]): BaseRelation = {
+        // Context is resolved before the table name, so a missing `config` is reported ahead of a missing `table`.
+        val igniteCtx = igniteContext(params, sqlCtx)
+
+        val tblName = params.getOrElse(OPTION_TABLE, throw new IgniteException("'table' must be specified."))
+
+        createRelation(igniteCtx, tblName, sqlCtx)
+    }
+
+    /**
+      * Saves `data` to the corresponding Ignite table and returns a relation for the saved data.
+      *
+      * To save data or create IgniteRelation we need a link to an Ignite cluster and a table name.
+      * To refer to the cluster the user has to specify the config parameter:
+      * <ul>
+      *     <li><code>config</code> - path to ignite configuration file.</li>
+      * </ul>
+      * The table is referred to via the <code>table</code> or <code>path</code> parameter.
+      *
+      * What happens depends on whether the table already exists and on `mode`:
+      * <ul>
+      *     <li>Table doesn't exist - it is created and `data` is written to it.</li>
+      *     <li>`Overwrite` - the table is recreated (DROP TABLE, CREATE TABLE) and `data` is written to it.</li>
+      *     <li>`Append` - `data` is written to the existing table.</li>
+      *     <li>`ErrorIfExists` - `IgniteException` is thrown.</li>
+      *     <li>`Ignore` - nothing is written, the existing data is left as is.</li>
+      * </ul>
+      *
+      * If the table has to be created the following options are read:
+      * <ul>
+      *     <li>`OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS` - required. Comma separated list of fields for primary key.</li>
+      *     <li>`OPTION_CREATE_TABLE_PARAMETERS` - optional. Ignite specific parameters for a new table.
+      *         See WITH [https://apacheignite-sql.readme.io/docs/create-table].</li>
+      * </ul>
+      *
+      * The write itself can be tuned with the optional `OPTION_STREAMER_ALLOW_OVERWRITE`,
+      * `OPTION_STREAMER_FLUSH_FREQUENCY`, `OPTION_STREAMER_PER_NODE_BUFFER_SIZE` and
+      * `OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS` options.
+      *
+      * @param sqlCtx SQLContext.
+      * @param mode Save mode.
+      * @param params Additional parameters.
+      * @param data Data to save.
+      * @return IgniteRelation.
+      */
+    override def createRelation(sqlCtx: SQLContext,
+        mode: SaveMode,
+        params: Map[String, String],
+        data: DataFrame): BaseRelation = {
+
+        val ctx = igniteContext(params, sqlCtx)
+
+        val tblName = tableName(params)
+
+        // Writes `data` to the table. Every streamer setting is optional and parsed from its string value.
+        def save(): Unit =
+            saveTable(data,
+                tblName,
+                ctx,
+                params.get(OPTION_STREAMER_ALLOW_OVERWRITE).map(_.toBoolean),
+                params.get(OPTION_STREAMER_FLUSH_FREQUENCY).map(_.toLong),
+                params.get(OPTION_STREAMER_PER_NODE_BUFFER_SIZE).map(_.toInt),
+                params.get(OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS).map(_.toInt))
+
+        // Creates the table from the data frame schema and then fills it.
+        def createAndSave(): Unit = {
+            createTable(data.schema,
+                tblName,
+                primaryKeyFields(params),
+                params.get(OPTION_CREATE_TABLE_PARAMETERS),
+                ctx.ignite())
+
+            save()
+        }
+
+        val tblExists = sqlTableInfo[Any, Any](ctx.ignite(), tblName).isDefined
+
+        if (tblExists) {
+            mode match {
+                case Overwrite ⇒
+                    // Options are checked before the drop, so invalid options do not cost the existing table.
+                    ensureCreateTableOptions(data.schema, params, ctx)
+
+                    dropTable(tblName, ctx.ignite())
+
+                    createAndSave()
+
+                case Append ⇒
+                    save()
+
+                case SaveMode.ErrorIfExists ⇒
+                    throw new IgniteException(s"Table or view '$tblName' already exists. SaveMode: ErrorIfExists.")
+
+                case SaveMode.Ignore ⇒
+                    // With `SaveMode.Ignore` mode, if table already exists, the save operation is expected
+                    // to not save the contents of the DataFrame and to not change the existing data.
+                    // Therefore, it is okay to do nothing here and then just return the relation below.
+            }
+        }
+        else {
+            ensureCreateTableOptions(data.schema, params, ctx)
+
+            createAndSave()
+        }
+
+        createRelation(ctx,
+            tblName,
+            sqlCtx)
+    }
+
+    /**
+      * Builds the relation for a table; shared by the read and the write entry points of this provider.
+      *
+      * @param igniteCtx Ignite context.
+      * @param tblName Table name.
+      * @param sqlCtx SQL context.
+      * @return Ignite SQL relation.
+      */
+    private def createRelation(igniteCtx: IgniteContext, tblName: String, sqlCtx: SQLContext): BaseRelation =
+        IgniteSQLRelation(igniteCtx, tblName, sqlCtx)
+
+    /**
+      * @param params Params.
+      * @param sqlCtx SQL Context.
+      * @return IgniteContext.
       */
-    override def createRelation(sqlCtx: SQLContext, params: Map[String, 
String]): BaseRelation = {
+    private def igniteContext(params: Map[String, String], sqlCtx: 
SQLContext): IgniteContext = {
         val igniteHome = IgniteUtils.getIgniteHome
 
         def configProvider: () ⇒ IgniteConfiguration = {
@@ -80,11 +215,27 @@ class IgniteRelationProvider extends RelationProvider with 
DataSourceRegister {
                 throw new IgniteException("'config' must be specified to 
connect to ignite cluster.")
         }
 
-        val ic = IgniteContext(sqlCtx.sparkContext, configProvider)
+        IgniteContext(sqlCtx.sparkContext, configProvider)
+    }
+
+    /**
+      * @param params Params.
+      * @return Table name.
+      */
+    private def tableName(params: Map[String, String]): String = {
+        val tblName = params.getOrElse(OPTION_TABLE,
+            params.getOrElse("path", throw new IgniteException("'table' or 
'path' must be specified.")))
 
-        if (params.contains(OPTION_TABLE))
-            IgniteSQLRelation(ic, params(OPTION_TABLE).toUpperCase, sqlCtx)
+        if (tblName.startsWith(IGNITE_PROTOCOL))
+            tblName.replace(IGNITE_PROTOCOL, "").toUpperCase()
         else
-            throw new IgniteException("'table' must be specified for loading 
ignite data.")
+            tblName.toUpperCase
     }
+
+    /**
+      * Reads the comma separated `OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS` option.
+      * Whitespace around each name is dropped, so `"id, city_id"` yields `id` and `city_id`.
+      * Fails with `NoSuchElementException` if the option is absent.
+      *
+      * @param params Params.
+      * @return Sequence of primary key fields.
+      */
+    private def primaryKeyFields(params: Map[String, String]): Seq[String] =
+        params(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS).split(",").map(_.trim)
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
----------------------------------------------------------------------
diff --git 
a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
 
b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
index bdcf57b..1fb8de7 100644
--- 
a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
+++ 
b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
@@ -17,12 +17,13 @@
 
 package org.apache.ignite.spark.impl
 
-import org.apache.ignite.{Ignite, IgniteException}
+import org.apache.ignite.IgniteException
 import org.apache.ignite.cache.{CacheMode, QueryEntity}
 import org.apache.ignite.cluster.ClusterNode
 import org.apache.ignite.configuration.CacheConfiguration
 import org.apache.ignite.spark.{IgniteContext, IgniteRDD}
 import org.apache.spark.Partition
+import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -37,7 +38,7 @@ import scala.collection.mutable.ArrayBuffer
 class IgniteSQLRelation[K, V](
     private[spark] val ic: IgniteContext,
     private[spark] val tableName: String)
-    (@transient val sqlContext: SQLContext) extends BaseRelation with 
PrunedFilteredScan {
+    (@transient val sqlContext: SQLContext) extends BaseRelation with 
PrunedFilteredScan with Logging {
 
     /**
       * @return Schema of Ignite SQL table.
@@ -55,6 +56,19 @@ class IgniteSQLRelation[K, V](
       * @return Apache Ignite RDD implementation.
       */
     override def buildScan(columns: Array[String], filters: Array[Filter]): 
RDD[Row] = {
+        val qryAndArgs = queryAndArgs(columns, filters)
+
+        IgniteSQLDataFrameRDD[K, V](ic, cacheName, schema, qryAndArgs._1, 
qryAndArgs._2, calcPartitions(filters))
+    }
+
+    override def toString = s"IgniteSQLRelation[table=$tableName]"
+
+    /**
+      * @param columns Columns to select.
+      * @param filters Filters to apply.
+      * @return SQL query string and arguments for it.
+      */
+    private def queryAndArgs(columns: Array[String], filters: Array[Filter]): 
(String, List[Any]) = {
         val columnsStr =
             if (columns.isEmpty)
                 "*"
@@ -65,82 +79,17 @@ class IgniteSQLRelation[K, V](
         //Query will be executed by Ignite SQL Engine.
         val qryAndArgs = filters match {
             case Array(_, _*) ⇒
-                val where = compileWhere(filters)
+                val where = QueryUtils.compileWhere(filters)
+
                 (s"SELECT $columnsStr FROM $tableName WHERE ${where._1}", 
where._2)
+
             case _ ⇒
                 (s"SELECT $columnsStr FROM $tableName", List.empty)
         }
 
-        IgniteSQLDataFrameRDD[K, V](ic, cacheName, schema, qryAndArgs._1, 
qryAndArgs._2, calcPartitions(filters))
-    }
-
-    override def toString = s"IgniteSQLRelation[table=$tableName]"
+        logInfo(qryAndArgs._1)
 
-    /**
-      * Builds `where` part of SQL query.
-      *
-      * @param filters Filter to apply.
-      * @return Tuple contains `where` string and `List[Any]` of query 
parameters.
-      */
-    private def compileWhere(filters: Array[Filter]): (String, List[Any]) =
-        filters.foldLeft(("", List[Any]()))(buildSingleClause)
-
-    /**
-      * Adds single where clause to `state` and returns new state.
-      *
-      * @param state Current `where` state.
-      * @param clause Clause to add.
-      * @return `where` with given clause.
-      */
-    private def buildSingleClause(state: (String, List[Any]), clause: Filter): 
(String, List[Any]) = {
-        val filterStr = state._1
-        val params = state._2
-
-        clause match {
-            case EqualTo(attr, value) ⇒ (addStrClause(filterStr, s"$attr = 
?"), params :+ value)
-
-            case EqualNullSafe(attr, value) ⇒ (addStrClause(filterStr, 
s"($attr IS NULL OR $attr = ?)"), params :+ value)
-
-            case GreaterThan(attr, value) ⇒ (addStrClause(filterStr, s"$attr 
> ?"), params :+ value)
-
-            case GreaterThanOrEqual(attr, value) ⇒ (addStrClause(filterStr, 
s"$attr >= ?"), params :+ value)
-
-            case LessThan(attr, value) ⇒ (addStrClause(filterStr, s"$attr < 
?"), params :+ value)
-
-            case LessThanOrEqual(attr, value) ⇒ (addStrClause(filterStr, 
s"$attr <= ?"), params :+ value)
-
-            case In(attr, values) ⇒ (addStrClause(filterStr, s"$attr IN 
(${values.map(_ ⇒ "?").mkString(",")})"), params ++ values)
-
-            case IsNull(attr) ⇒ (addStrClause(filterStr, s"$attr IS NULL"), 
params)
-
-            case IsNotNull(attr) ⇒ (addStrClause(filterStr, s"$attr IS NOT 
NULL"), params)
-
-            case And(left, right) ⇒
-                val leftClause = buildSingleClause(("", params), left)
-                val rightClause = buildSingleClause(("", leftClause._2), right)
-
-                (addStrClause(filterStr, s"${leftClause._1} AND 
${rightClause._1}"), rightClause._2)
-
-            case Or(left, right) ⇒
-                val leftClause = buildSingleClause(("", params), left)
-                val rightClause = buildSingleClause(("", leftClause._2), right)
-
-                (addStrClause(filterStr, s"${leftClause._1} OR 
${rightClause._1}"), rightClause._2)
-
-            case Not(child) ⇒
-                val innerClause = buildSingleClause(("", params), child)
-
-                (addStrClause(filterStr, s"NOT ${innerClause._1}"), 
innerClause._2)
-
-            case StringStartsWith(attr, value) ⇒
-                (addStrClause(filterStr, s"$attr LIKE ?"), params :+ (value + 
"%"))
-
-            case StringEndsWith(attr, value) ⇒
-                (addStrClause(filterStr, s"$attr LIKE ?"), params :+ ("%" + 
value))
-
-            case StringContains(attr, value) ⇒
-                (addStrClause(filterStr, s"$attr LIKE ?"), params :+ ("%" + 
value + "%"))
-        }
+        qryAndArgs
     }
 
     private def calcPartitions(filters: Array[Filter]): Array[Partition] = {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
----------------------------------------------------------------------
diff --git 
a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala 
b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
new file mode 100644
index 0000000..4342265
--- /dev/null
+++ 
b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl
+
+import org.apache.ignite.cache.query.SqlFieldsQuery
+import org.apache.ignite.spark.IgniteDataFrameSettings._
+import QueryUtils.{compileCreateTable, compileDropTable, compileInsert}
+import org.apache.ignite.internal.IgniteEx
+import org.apache.ignite.spark.IgniteContext
+import org.apache.ignite.{Ignite, IgniteException}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, Row}
+
+/**
+  * Helper class for executing DDL queries.
+  */
+private[apache] object QueryHelper {
+    /**
+      * Executes the `DROP TABLE` statement produced by `compileDropTable` for the given table.
+      *
+      * @param tableName Table name.
+      * @param ignite Ignite instance. It is cast to `IgniteEx` to reach the internal query processor.
+      */
+    def dropTable(tableName: String, ignite: Ignite): Unit = {
+        val sqlProc = ignite.asInstanceOf[IgniteEx].context().query()
+
+        val dropQry = new SqlFieldsQuery(compileDropTable(tableName))
+
+        sqlProc.querySqlFields(dropQry, true).getAll
+    }
+
+    /**
+      * Executes the `CREATE TABLE` statement produced by `compileCreateTable`.
+      *
+      * @param schema Schema.
+      * @param tblName Table name.
+      * @param primaryKeyFields Primary key fields.
+      * @param createTblOpts Ignite specific options.
+      * @param ignite Ignite instance. It is cast to `IgniteEx` to reach the internal query processor.
+      */
+    def createTable(schema: StructType, tblName: String, primaryKeyFields: Seq[String], createTblOpts: Option[String],
+        ignite: Ignite): Unit = {
+        val sqlProc = ignite.asInstanceOf[IgniteEx].context().query()
+
+        val createQry = new SqlFieldsQuery(compileCreateTable(schema, tblName, primaryKeyFields, createTblOpts))
+
+        sqlProc.querySqlFields(createQry, true).getAll
+    }
+
+    /**
+      * Ensures all options are specified correctly to create table based on provided `schema`.
+      *
+      * An `IgniteException` is thrown when neither the table option nor `path` is present, when the primary key
+      * option is absent, when one of the comma separated primary key entries is blank, or when an entry doesn't
+      * match any column of `schema` (names are compared ignoring case).
+      *
+      * @param schema Schema of new table.
+      * @param params Parameters.
+      * @param ctx Ignite context. The checks performed here don't read it.
+      */
+    def ensureCreateTableOptions(schema: StructType, params: Map[String, String], ctx: IgniteContext): Unit = {
+        val tblSpecified = params.contains(OPTION_TABLE) || params.contains("path")
+
+        if (!tblSpecified)
+            throw new IgniteException("'table' must be specified.")
+
+        val pkFields = params.get(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS) match {
+            case Some(raw) ⇒
+                raw.split(',').map(_.trim)
+
+            case None ⇒
+                throw new IgniteException("Can't create table! Primary key fields has to be specified.")
+        }
+
+        for (pkField ← pkFields) {
+            if (pkField == "")
+                throw new IgniteException("PK field can't be empty.")
+
+            val knownColumn = schema.exists(_.name.equalsIgnoreCase(pkField))
+
+            if (!knownColumn)
+                throw new IgniteException(s"'$pkField' doesn't exists in DataFrame schema.")
+        }
+    }
+
+    /**
+      * Writes every partition of `data` into the table, using one insert statement compiled
+      * from the schema of `data`.
+      *
+      * @param data Data.
+      * @param tblName Table name.
+      * @param ctx Ignite context.
+      * @param streamerAllowOverwrite Flag enabling overwriting existing values in cache.
+      * @param streamerFlushFrequency Insert query streamer automatic flush frequency.
+      * @param streamerPerNodeBufferSize Insert query streamer size of per node query buffer.
+      * @param streamerPerNodeParallelOperations Insert query streamer maximum number of parallel operations
+      *     for a single node.
+      *
+      * @see [[org.apache.ignite.IgniteDataStreamer]]
+      * @see [[org.apache.ignite.IgniteDataStreamer#allowOverwrite(boolean)]]
+      * @see [[org.apache.ignite.IgniteDataStreamer#autoFlushFrequency(long)]]
+      * @see [[org.apache.ignite.IgniteDataStreamer#perNodeBufferSize(int)]]
+      * @see [[org.apache.ignite.IgniteDataStreamer#perNodeParallelOperations(int)]]
+      */
+    def saveTable(data: DataFrame,
+        tblName: String,
+        ctx: IgniteContext,
+        streamerAllowOverwrite: Option[Boolean],
+        streamerFlushFrequency: Option[Long],
+        streamerPerNodeBufferSize: Option[Int],
+        streamerPerNodeParallelOperations: Option[Int]
+    ): Unit = {
+        // Compiled once here and captured by the per-partition closure below.
+        val insertQry = compileInsert(tblName, data.schema)
+
+        data.rdd.foreachPartition { rows ⇒
+            savePartition(rows,
+                insertQry,
+                tblName,
+                ctx,
+                streamerAllowOverwrite,
+                streamerFlushFrequency,
+                streamerPerNodeBufferSize,
+                streamerPerNodeParallelOperations)
+        }
+    }
+
+    /**
+      * Saves partition data to the Ignite table.
+      *
+      * A data streamer is opened for the cache that backs `tblName`, tuned with the provided options and
+      * fed with one insert statement per row. The streamer is always closed, even when tuning it or
+      * streaming a row fails.
+      *
+      * @param iterator Data iterator.
+      * @param insertQry Insert query.
+      * @param tblName Table name.
+      * @param ctx Ignite context.
+      * @param streamerAllowOverwrite Flag enabling overwriting existing values in cache.
+      * @param streamerFlushFrequency Insert query streamer automatic flush frequency.
+      * @param streamerPerNodeBufferSize Insert query streamer size of per node query buffer.
+      * @param streamerPerNodeParallelOperations Insert query streamer maximum number of parallel operations
+      *     for a single node.
+      * @throws IgniteException If no SQL table with the given name is found.
+      *
+      * @see [[org.apache.ignite.IgniteDataStreamer]]
+      * @see [[org.apache.ignite.IgniteDataStreamer#allowOverwrite(boolean)]]
+      * @see [[org.apache.ignite.IgniteDataStreamer#autoFlushFrequency(long)]]
+      * @see [[org.apache.ignite.IgniteDataStreamer#perNodeBufferSize(int)]]
+      * @see [[org.apache.ignite.IgniteDataStreamer#perNodeParallelOperations(int)]]
+      */
+    private def savePartition(iterator: Iterator[Row],
+        insertQry: String,
+        tblName: String,
+        ctx: IgniteContext,
+        streamerAllowOverwrite: Option[Boolean],
+        streamerFlushFrequency: Option[Long],
+        streamerPerNodeBufferSize: Option[Int],
+        streamerPerNodeParallelOperations: Option[Int]
+    ): Unit = {
+        val ignite = ctx.ignite()
+
+        // Report a missing table explicitly instead of failing with a bare NoSuchElementException.
+        val ccfg = sqlTableInfo[Any, Any](ignite, tblName)
+            .map(_._1)
+            .getOrElse(throw new IgniteException(s"Unknown table $tblName"))
+
+        // These values are the same for every row, so they are resolved once.
+        val cacheName = ccfg.getName
+
+        val sqlSchema = ccfg.getSqlSchema
+
+        val streamer = ignite.dataStreamer(cacheName)
+
+        try {
+            // The streamer is tuned inside `try` so that it gets closed if one of the setters throws.
+            streamerAllowOverwrite.foreach(v ⇒ streamer.allowOverwrite(v))
+
+            streamerFlushFrequency.foreach(v ⇒ streamer.autoFlushFrequency(v))
+
+            streamerPerNodeBufferSize.foreach(v ⇒ streamer.perNodeBufferSize(v))
+
+            streamerPerNodeParallelOperations.foreach(v ⇒ streamer.perNodeParallelOperations(v))
+
+            val qryProcessor = ignite.asInstanceOf[IgniteEx].context().query()
+
+            iterator.foreach { row ⇒
+                // Values are taken in column order, which is the order of the placeholders in `insertQry`.
+                val args = Array.tabulate[Object](row.length)(i ⇒ row.get(i).asInstanceOf[Object])
+
+                qryProcessor.streamUpdateQuery(cacheName, sqlSchema, streamer, insertQry, args)
+            }
+        }
+        finally {
+            streamer.close()
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryUtils.scala
----------------------------------------------------------------------
diff --git 
a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryUtils.scala 
b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryUtils.scala
new file mode 100644
index 0000000..79aa523
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryUtils.scala
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl
+
+import org.apache.ignite.IgniteException
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+
+/**
+  * Utility class for building SQL queries.
+  */
+private[impl] object QueryUtils extends Logging {
+    /**
+      * Builds `where` part of SQL query by folding the filters, in order, into a single condition.
+      *
+      * @param filters Filter to apply.
+      * @return Tuple contains `where` string and `List[Any]` of query parameters.
+      *         An empty string and an empty list are returned when there are no filters.
+      */
+    def compileWhere(filters: Seq[Filter]): (String, List[Any]) = {
+        val emptyState: (String, List[Any]) = ("", Nil)
+
+        filters.foldLeft(emptyState) { (state, filter) ⇒
+            buildSingleClause(state, filter)
+        }
+    }
+
+    /**
+      * Builds `insert` query for provided table and schema.
+      * Every schema field becomes a column of the statement and gets its own `?` placeholder.
+      * The resulting query is also written to the log.
+      *
+      * @param tblName Table name.
+      * @param tblSchema Schema.
+      * @return SQL query to insert data into table.
+      */
+    def compileInsert(tblName: String, tblSchema: StructType): String = {
+        val fieldNames = tblSchema.fields.map(_.name)
+
+        val columns = fieldNames.mkString(",")
+
+        val placeholder = Array.fill(fieldNames.length)("?").mkString(",")
+
+        val insertSql = s"INSERT INTO $tblName($columns) VALUES($placeholder)"
+
+        logInfo(insertSql)
+
+        insertSql
+    }
+
+    /**
+      * Builds `drop table` query and writes it to the log.
+      *
+      * @param tblName Table name. It is put into the statement as is.
+      * @return SQL query to drop table.
+      */
+    def compileDropTable(tblName: String): String = {
+        val dropSql = s"DROP TABLE $tblName"
+
+        logInfo(dropSql)
+
+        dropSql
+    }
+
+    /**
+      * Builds `create table` query and writes it to the log.
+      *
+      * The statement lists one column per schema field, followed by the `PRIMARY KEY` constraint and,
+      * when `createTblOpts` is defined, a `WITH "..."` clause holding the options.
+      *
+      * @param schema Schema.
+      * @param tblName Table name.
+      * @param primaryKeyFields Primary key fields.
+      * @param createTblOpts Ignite specific options for table.
+      * @return SQL query to create table.
+      */
+    def compileCreateTable(schema: StructType, tblName: String, primaryKeyFields: Seq[String], createTblOpts: Option[String]): String = {
+        val pk = s", PRIMARY KEY (${primaryKeyFields.mkString(",")})"
+
+        val withParams = createTblOpts.fold("")(w ⇒ s"""WITH \"$w\"""")
+
+        val columnDefs = schema.map(compileColumn).mkString(", ")
+
+        val createSql = s"CREATE TABLE $tblName ($columnDefs $pk) $withParams"
+
+        logInfo(createSql)
+
+        createSql
+    }
+
+    /**
+      * Builds the definition of a single column: name, SQL type and, for non-nullable fields, `NOT NULL`.
+      *
+      * @param field Column.
+      * @return SQL query part for column.
+      */
+    private def compileColumn(field: StructField): String = {
+        val nullability = if (field.nullable) "" else " NOT NULL"
+
+        s"${field.name} ${dataType(field)}$nullability"
+    }
+
+    /**
+      * Maps the Spark data type of the field to the SQL type used in `create table` statement.
+      *
+      * @param field Field.
+      * @return SQL data type.
+      * @throws IgniteException If the type of the field has no mapping.
+      */
+    private def dataType(field: StructField): String = {
+        val sparkType = field.dataType
+
+        sparkType match {
+            case BooleanType ⇒ "BOOLEAN"
+
+            case ByteType ⇒ "TINYINT"
+
+            case ShortType ⇒ "SMALLINT"
+
+            case IntegerType ⇒ "INT"
+
+            case LongType ⇒ "BIGINT"
+
+            case FloatType ⇒ "FLOAT"
+
+            case DoubleType ⇒ "DOUBLE"
+
+            case decimal: DecimalType ⇒
+                //For now Ignite doesn't provide correct information about DECIMAL column precision and scale.
+                //All we have is default scale and precision.
+                //Just replace it with some "common sense" values.
+                if (decimal.precision == 10 && decimal.scale == 0)
+                    "DECIMAL(10, 5)"
+                else
+                    s"DECIMAL(${decimal.precision}, ${decimal.scale})"
+
+            case StringType ⇒ "VARCHAR"
+
+            case DateType ⇒ "DATE"
+
+            case TimestampType ⇒ "TIMESTAMP"
+
+            case _ ⇒
+                throw new IgniteException(s"Unsupported data type ${field.dataType}")
+        }
+    }
+
+    /**
+      * Adds single where clause to `state` and returns new state.
+      *
+      * Compound filters (`And`, `Or`, `Not`) are wrapped in parentheses, so they keep their meaning when they
+      * are nested in each other or joined with the clauses collected so far.
+      * A filter type that is not listed below results in a `MatchError`.
+      *
+      * @param state Current `where` state: condition string and parameters collected so far.
+      * @param clause Clause to add.
+      * @return `where` with given clause.
+      */
+    private def buildSingleClause(state: (String, List[Any]), clause: Filter): (String, List[Any]) = {
+        val filterStr = state._1
+
+        val params = state._2
+
+        clause match {
+            case EqualTo(attr, value) ⇒ (addStrClause(filterStr, s"$attr = ?"), params :+ value)
+
+            // Null-safe equality with `null` is a plain null check, no parameter is needed.
+            case EqualNullSafe(attr, null) ⇒ (addStrClause(filterStr, s"$attr IS NULL"), params)
+
+            // NULL column never equals a non-null value. The explicit IS NOT NULL check makes the clause
+            // evaluate to FALSE (not UNKNOWN) for NULL columns, so it stays correct under NOT.
+            case EqualNullSafe(attr, value) ⇒
+                (addStrClause(filterStr, s"($attr IS NOT NULL AND $attr = ?)"), params :+ value)
+
+            case GreaterThan(attr, value) ⇒ (addStrClause(filterStr, s"$attr > ?"), params :+ value)
+
+            case GreaterThanOrEqual(attr, value) ⇒ (addStrClause(filterStr, s"$attr >= ?"), params :+ value)
+
+            case LessThan(attr, value) ⇒ (addStrClause(filterStr, s"$attr < ?"), params :+ value)
+
+            case LessThanOrEqual(attr, value) ⇒ (addStrClause(filterStr, s"$attr <= ?"), params :+ value)
+
+            // `attr IN ()` is not valid SQL. Nothing is a member of an empty list:
+            // the result is NULL for NULL column and FALSE otherwise.
+            case In(attr, values) if values.isEmpty ⇒
+                (addStrClause(filterStr, s"(CASE WHEN $attr IS NULL THEN NULL ELSE FALSE END)"), params)
+
+            case In(attr, values) ⇒
+                (addStrClause(filterStr, s"$attr IN (${values.map(_ ⇒ "?").mkString(",")})"), params ++ values)
+
+            case IsNull(attr) ⇒ (addStrClause(filterStr, s"$attr IS NULL"), params)
+
+            case IsNotNull(attr) ⇒ (addStrClause(filterStr, s"$attr IS NOT NULL"), params)
+
+            case And(left, right) ⇒
+                // Nested filters are compiled from an empty string while the parameters are threaded through.
+                val leftClause = buildSingleClause(("", params), left)
+                val rightClause = buildSingleClause(("", leftClause._2), right)
+
+                (addStrClause(filterStr, s"(${leftClause._1} AND ${rightClause._1})"), rightClause._2)
+
+            case Or(left, right) ⇒
+                val leftClause = buildSingleClause(("", params), left)
+                val rightClause = buildSingleClause(("", leftClause._2), right)
+
+                // Without parentheses `a AND b OR c` would be produced when joined with previous clauses.
+                (addStrClause(filterStr, s"(${leftClause._1} OR ${rightClause._1})"), rightClause._2)
+
+            case Not(child) ⇒
+                val innerClause = buildSingleClause(("", params), child)
+
+                // NOT has to cover the whole child condition, not only its first operand.
+                (addStrClause(filterStr, s"NOT (${innerClause._1})"), innerClause._2)
+
+            // NOTE(review): `%` and `_` inside `value` are not escaped in the LIKE patterns below,
+            // so they act as wildcards - confirm whether escaping is required.
+            case StringStartsWith(attr, value) ⇒
+                (addStrClause(filterStr, s"$attr LIKE ?"), params :+ (value + "%"))
+
+            case StringEndsWith(attr, value) ⇒
+                (addStrClause(filterStr, s"$attr LIKE ?"), params :+ ("%" + value))
+
+            case StringContains(attr, value) ⇒
+                (addStrClause(filterStr, s"$attr LIKE ?"), params :+ ("%" + value + "%"))
+        }
+    }
+
+    /**
+      * Appends a clause to the sql WHERE string, using `AND` as a separator.
+      *
+      * @param filterStr Current filter string
+      * @param clause Clause to add.
+      * @return `clause` alone if `filterStr` is empty, `filterStr AND clause` otherwise.
+      */
+    private def addStrClause(filterStr: String, clause: String) =
+        if (!filterStr.isEmpty)
+            filterStr + " AND " + clause
+        else
+            clause
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
----------------------------------------------------------------------
diff --git 
a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala 
b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
index 815854c..4634a97 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
@@ -113,7 +113,7 @@ package object impl {
       * @tparam V Value class.
       * @return CacheConfiguration and QueryEntity for a given table.
       */
-    private def sqlTableInfo[K, V](ignite: Ignite, tabName: String): 
Option[(CacheConfiguration[K, V], QueryEntity)] =
+    def sqlTableInfo[K, V](ignite: Ignite, tabName: String): 
Option[(CacheConfiguration[K, V], QueryEntity)] =
         ignite.cacheNames().map { cacheName ⇒
             val ccfg = ignite.cache[K, 
V](cacheName).getConfiguration(classOf[CacheConfiguration[K, V]])
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala
----------------------------------------------------------------------
diff --git 
a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala
 
b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala
index 9c1fe0c..6876a3e 100644
--- 
a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala
+++ 
b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala
@@ -17,12 +17,14 @@
 
 package org.apache.spark.sql.ignite
 
+import java.net.URI
+
 import org.apache.ignite.configuration.CacheConfiguration
 import org.apache.ignite.spark.IgniteDataFrameSettings.OPTION_TABLE
 import org.apache.ignite.spark.IgniteContext
 import org.apache.ignite.spark.IgniteDataFrameSettings._
 import org.apache.ignite.spark.impl.IgniteSQLRelation.schema
-import org.apache.ignite.{Ignite, Ignition}
+import org.apache.ignite.{Ignite, IgniteException, Ignition}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -31,7 +33,8 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.types.StructType
 import org.apache.ignite.spark.impl._
-import org.apache.spark.sql.ignite.IgniteExternalCatalog.OPTION_GRID
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog.DEFAULT_DATABASE
+import org.apache.spark.sql.ignite.IgniteExternalCatalog.{IGNITE_PROTOCOL, 
IGNITE_URI, OPTION_GRID}
 
 import scala.collection.JavaConversions._
 
@@ -40,7 +43,8 @@ import scala.collection.JavaConversions._
   *
   * @param defaultIgniteContext Ignite context to provide access to Ignite 
instance. If <code>None</code> passed then no-name instance of Ignite used.
   */
-private[ignite] class IgniteExternalCatalog(defaultIgniteContext: 
IgniteContext) extends ExternalCatalog {
+private[ignite] class IgniteExternalCatalog(defaultIgniteContext: 
IgniteContext)
+    extends ExternalCatalog {
     /**
       * Default Ignite instance.
       */
@@ -51,7 +55,7 @@ private[ignite] class 
IgniteExternalCatalog(defaultIgniteContext: IgniteContext)
       * @return Description of Ignite instance.
       */
     override def getDatabase(db: String): CatalogDatabase =
-        CatalogDatabase(db, db, null, Map.empty)
+        CatalogDatabase(db, db, IGNITE_URI, Map.empty)
 
     /**
       * Checks Ignite instance with provided name exists.
@@ -61,7 +65,7 @@ private[ignite] class 
IgniteExternalCatalog(defaultIgniteContext: IgniteContext)
       * @return True is Ignite instance exists.
       */
     override def databaseExists(db: String): Boolean =
-        db == SessionCatalog.DEFAULT_DATABASE || igniteExists(db)
+        db == DEFAULT_DATABASE || igniteExists(db)
 
     /**
       * @return List of all known Ignite instances names.
@@ -104,7 +108,7 @@ private[ignite] class 
IgniteExternalCatalog(defaultIgniteContext: IgniteContext)
                     identifier = new TableIdentifier(tableName, 
Some(gridName)),
                     tableType = CatalogTableType.EXTERNAL,
                     storage = CatalogStorageFormat(
-                        locationUri = None,
+                        locationUri = Some(URI.create(IGNITE_PROTOCOL + 
tableName)),
                         inputFormat = Some(FORMAT_IGNITE),
                         outputFormat = Some(FORMAT_IGNITE),
                         serde = None,
@@ -259,12 +263,39 @@ private[ignite] class 
IgniteExternalCatalog(defaultIgniteContext: IgniteContext)
         throw new UnsupportedOperationException("unsupported")
 
     /** @inheritdoc */
-    override protected def doCreateTable(tableDefinition: CatalogTable, 
ignoreIfExists: Boolean): Unit =
-        throw new UnsupportedOperationException("unsupported")
+    override protected def doCreateTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
+        val gridName = tableDefinition.identifier.database.getOrElse(DEFAULT_DATABASE)
+
+        val ignite = igniteOrDefault(gridName, default)
+
+        // Nothing is done for a table that already exists, whatever the value of `ignoreIfExists` is.
+        if (igniteSQLTable(ignite, tableDefinition.identifier.table).isEmpty) {
+            val props = tableDefinition.storage.properties
+
+            QueryHelper.createTable(tableDefinition.schema,
+                tableDefinition.identifier.table,
+                props(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS).split(","),
+                props.get(OPTION_CREATE_TABLE_PARAMETERS),
+                ignite)
+        }
+    }
 
     /** @inheritdoc */
-    override protected def doDropTable(db: String, table: String, 
ignoreIfNotExists: Boolean, purge: Boolean): Unit =
-        throw new UnsupportedOperationException("unsupported")
+    override protected def doDropTable(db: String, tabName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit = {
+        val ignite = igniteOrDefault(db, default)
+
+        igniteSQLTable(ignite, tabName) match {
+            case Some(table) ⇒
+                QueryHelper.dropTable(table.getTableName, ignite)
+
+            // A missing table is an error unless the caller asked to ignore it.
+            case None if !ignoreIfNotExists ⇒
+                throw new IgniteException(s"Table $tabName doesn't exists.")
+
+            case None ⇒
+                /* no-op */
+        }
+    }
 
     /** @inheritdoc */
     override protected def doRenameTable(db: String, oldName: String, newName: 
String): Unit =
@@ -302,4 +333,14 @@ object IgniteExternalCatalog {
       * @see [[org.apache.ignite.Ignite#name()]]
       */
     private[apache] val OPTION_GRID = "grid"
+
+    /**
+      * Location of ignite tables.
+      */
+    private[apache] val IGNITE_PROTOCOL = "ignite:/"
+
+    /**
+      * URI location of ignite tables.
+      */
+    private val IGNITE_URI = new URI(IGNITE_PROTOCOL)
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/modules/spark/src/test/resources/cities_non_unique.json
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/resources/cities_non_unique.json 
b/modules/spark/src/test/resources/cities_non_unique.json
new file mode 100644
index 0000000..f971c86
--- /dev/null
+++ b/modules/spark/src/test/resources/cities_non_unique.json
@@ -0,0 +1,6 @@
+{ "id": 1, "name": "Forest Hill" }
+{ "id": 2, "name": "Denver" }
+{ "id": 3, "name": "St. Petersburg" }
+{ "id": 1, "name": "Paris" }
+{ "id": 2, "name": "New York" }
+{ "id": 3, "name": "Moscow" }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala
----------------------------------------------------------------------
diff --git 
a/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala
 
b/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala
index 4b0b2de..263f235 100644
--- 
a/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala
+++ 
b/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala
@@ -26,7 +26,7 @@ import java.lang.{Long ⇒ JLong}
 import org.apache.ignite.cache.query.SqlFieldsQuery
 import org.apache.ignite.cache.query.annotations.QuerySqlField
 import org.apache.ignite.internal.IgnitionEx
-import org.apache.ignite.spark.AbstractDataFrameSpec.configuration
+import org.apache.ignite.spark.AbstractDataFrameSpec._
 
 import scala.annotation.meta.field
 import scala.reflect.ClassTag
@@ -39,16 +39,12 @@ abstract class AbstractDataFrameSpec extends FunSpec with 
Matchers with BeforeAn
     var client: Ignite = _
 
     override protected def beforeAll() = {
-        spark = SparkSession.builder()
-            .appName("DataFrameSpec")
-            .master("local")
-            .config("spark.executor.instances", "2")
-            .getOrCreate()
-
         for (i ← 0 to 3)
             Ignition.start(configuration("grid-" + i, client = false))
 
         client = Ignition.getOrStart(configuration("client", client = true))
+
+        createSparkSession()
     }
 
     override protected def afterAll() = {
@@ -60,12 +56,26 @@ abstract class AbstractDataFrameSpec extends FunSpec with 
Matchers with BeforeAn
         spark.close()
     }
 
-    def createPersonTable(client: Ignite, cacheName: String): Unit = {
+    /**
+      * Assigns `spark` a session obtained from a builder configured with the "DataFrameSpec" application
+      * name, the "local" master and two executor instances.
+      */
+    protected def createSparkSession(): Unit = {
+        val builder = SparkSession.builder()
+            .appName("DataFrameSpec")
+            .master("local")
+            .config("spark.executor.instances", "2")
+
+        spark = builder.getOrCreate()
+    }
+
+    /** Creates the table named `PERSON_TBL_NAME_2` by delegating to `createPersonTable0`. */
+    def createPersonTable2(client: Ignite, cacheName: String): Unit = {
+        createPersonTable0(client, cacheName, PERSON_TBL_NAME_2)
+    }
+
+    /** Creates the table named `PERSON_TBL_NAME` by delegating to `createPersonTable0`. */
+    def createPersonTable(client: Ignite, cacheName: String): Unit = {
+        createPersonTable0(client, cacheName, PERSON_TBL_NAME)
+    }
+
+    private def createPersonTable0(client: Ignite, cacheName: String, tblName: 
String): Unit = {
         val cache = client.cache(cacheName)
 
         cache.query(new SqlFieldsQuery(
-            """
-              | CREATE TABLE person (
+            s"""
+              | CREATE TABLE $tblName (
               |    id LONG,
               |    name VARCHAR,
               |    birth_date DATE,
@@ -78,7 +88,7 @@ abstract class AbstractDataFrameSpec extends FunSpec with 
Matchers with BeforeAn
               |    PRIMARY KEY (id, city_id)) WITH "backups=1, 
affinityKey=city_id"
             """.stripMargin)).getAll
 
-        val qry = new SqlFieldsQuery("INSERT INTO person (id, name, city_id) 
values (?, ?, ?)")
+        val qry = new SqlFieldsQuery(s"INSERT INTO $tblName (id, name, 
city_id) values (?, ?, ?)")
 
         cache.query(qry.setArgs(1L.asInstanceOf[JLong], "John Doe", 
3L.asInstanceOf[JLong])).getAll
         cache.query(qry.setArgs(2L.asInstanceOf[JLong], "Jane Roe", 
2L.asInstanceOf[JLong])).getAll
@@ -120,6 +130,10 @@ object AbstractDataFrameSpec {
 
     val EMPLOYEE_CACHE_NAME = "cache3"
 
+    val PERSON_TBL_NAME = "person"
+
+    val PERSON_TBL_NAME_2 = "person2"
+
     def configuration(igniteInstanceName: String, client: Boolean): 
IgniteConfiguration = {
         val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1()
 
@@ -151,6 +165,11 @@ object AbstractDataFrameSpec {
 
         ccfg
     }
+
+    /**
+      * Applies `func` to `enclosed`. Passing a value through this method lets a closure work with the value
+      * directly, so it doesn't depend on the outer object (default Scala behaviour) while serializing.
+      */
+    def enclose[E, R](enclosed: E)(func: E => R): R = func(enclosed)
 }
 
 case class Employee (

http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala
----------------------------------------------------------------------
diff --git 
a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala 
b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala
index 7a51198..23cfffd 100644
--- 
a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala
+++ 
b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala
@@ -21,14 +21,11 @@ import java.lang.{Long ⇒ JLong}
 
 import org.apache.ignite.cache.query.SqlFieldsQuery
 import org.apache.ignite.internal.IgnitionEx
-import org.apache.ignite.spark.AbstractDataFrameSpec.{EMPLOYEE_CACHE_NAME, 
DEFAULT_CACHE, TEST_CONFIG_FILE}
+import org.apache.ignite.spark.AbstractDataFrameSpec.{DEFAULT_CACHE, 
EMPLOYEE_CACHE_NAME, TEST_CONFIG_FILE, enclose}
 import org.apache.spark.sql.ignite.IgniteSparkSession
 import org.apache.spark.sql.types.{LongType, StringType}
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
-import org.apache.ignite.spark.impl._
-
-import scala.collection.JavaConversions._
 
 /**
   * Tests to check Spark Catalog implementation.
@@ -152,9 +149,4 @@ class IgniteCatalogSpec extends AbstractDataFrameSpec {
             .igniteConfigProvider(configProvider)
             .getOrCreate()
     }
-
-    /**
-      * Enclose some closure, so it doesn't on outer object(default scala 
behaviour) while serializing.
-      */
-    def enclose[E, R](enclosed: E)(func: E => R): R = func(enclosed)
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala
----------------------------------------------------------------------
diff --git 
a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala
 
b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala
index 67ebf3b..2ceb44a 100644
--- 
a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala
+++ 
b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala
@@ -25,6 +25,8 @@ import org.scalatest.Suites
 class IgniteDataFrameSuite extends Suites (
     new IgniteDataFrameSchemaSpec,
     new IgniteSQLDataFrameSpec,
+    new IgniteSQLDataFrameWriteSpec,
+    new IgniteSQLDataFrameIgniteSessionWriteSpec,
     new IgniteDataFrameWrongConfigSpec,
     new IgniteCatalogSpec
 )

http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameWrongConfigSpec.scala
----------------------------------------------------------------------
diff --git 
a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameWrongConfigSpec.scala
 
b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameWrongConfigSpec.scala
index ce9f485..c32cb18 100644
--- 
a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameWrongConfigSpec.scala
+++ 
b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameWrongConfigSpec.scala
@@ -17,11 +17,9 @@
 
 package org.apache.ignite.spark
 
-import java.lang.{Integer ⇒ JInteger, String ⇒ JString}
-
 import org.apache.ignite.spark.AbstractDataFrameSpec.TEST_CONFIG_FILE
 import org.apache.ignite.spark.IgniteDataFrameSettings._
-import org.apache.ignite.{IgniteException, IgniteIllegalStateException}
+import org.apache.ignite.IgniteException
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameIgniteSessionWriteSpec.scala
----------------------------------------------------------------------
diff --git 
a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameIgniteSessionWriteSpec.scala
 
b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameIgniteSessionWriteSpec.scala
new file mode 100644
index 0000000..eb29cc8
--- /dev/null
+++ 
b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameIgniteSessionWriteSpec.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.internal.IgnitionEx
+import org.apache.ignite.spark.AbstractDataFrameSpec.{TEST_CONFIG_FILE, 
enclose}
+import org.apache.ignite.spark.IgniteDataFrameSettings._
+import org.apache.ignite.testframework.GridTestUtils.resolveIgnitePath
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.ignite.IgniteSparkSession
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.apache.spark.sql.functions._
+
+/** Re-runs the DataFrame write tests on an IgniteSparkSession and adds `saveAsTable` scenarios.
+  */
+@RunWith(classOf[JUnitRunner])
+class IgniteSQLDataFrameIgniteSessionWriteSpec extends 
IgniteSQLDataFrameWriteSpec {
+    describe("Additional features for IgniteSparkSession") {
+        it("Save data frame as a existing table with saveAsTable('table_name') - Overwrite") {
+            val citiesDataFrame = spark.read.json(
+                resolveIgnitePath("modules/spark/src/test/resources/cities.json").getAbsolutePath)
+
+            citiesDataFrame.write
+                .format(FORMAT_IGNITE)
+                .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+                .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
+                .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
+                .mode(SaveMode.Overwrite)
+                .saveAsTable("city")
+
+            assert(rowsCount("city") == citiesDataFrame.count(),
+                "Table city should contain data from json file.")
+        }
+
+        it("Save data frame as a existing table with saveAsTable('table_name') - Append") {
+            val citiesDataFrame = spark.read.json(
+                resolveIgnitePath("modules/spark/src/test/resources/cities.json").getAbsolutePath)
+
+            val rowCnt = citiesDataFrame.count()
+
+            citiesDataFrame
+                .withColumn("id", col("id") + rowCnt) // Shift every id by rowCnt to prevent duplication of existing keys
+                .write
+                .format(FORMAT_IGNITE)
+                .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+                .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
+                .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
+                .mode(SaveMode.Append)
+                .partitionBy("id") // NOTE(review): only this test passes a partitioning column — confirm it is intentional
+                .saveAsTable("city")
+
+            assert(rowsCount("city") == rowCnt * 2, // the Overwrite test above is expected to have left rowCnt rows
+                "Table city should contain both the previously saved and the appended json rows.")
+        }
+
+        it("Save data frame as a new table with saveAsTable('table_name')") {
+            val citiesDataFrame = spark.read.json(
+                resolveIgnitePath("modules/spark/src/test/resources/cities.json").getAbsolutePath)
+
+            citiesDataFrame.write
+                .format(FORMAT_IGNITE)
+                .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+                .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
+                .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
+                .saveAsTable("new_cities")
+
+            assert(rowsCount("new_cities") == citiesDataFrame.count(),
+                "Table new_cities should contain data from json file.")
+        }
+    }
+
+    override protected def createSparkSession(): Unit = {
+        // Supplies the Ignite configuration: the test config switched to client mode and named "client-2".
+        val clientCfgFactory = enclose(null) (_ ⇒ () ⇒ {
+            val igniteCfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1()
+            igniteCfg.setClientMode(true)
+            igniteCfg.setIgniteInstanceName("client-2")
+
+            igniteCfg
+        })
+
+        val sessionBuilder = IgniteSparkSession.builder()
+            .appName("DataFrameSpec")
+            .master("local")
+            .config("spark.executor.instances", "2")
+            .igniteConfigProvider(clientCfgFactory)
+
+        spark = sessionBuilder.getOrCreate()
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameWriteSpec.scala
----------------------------------------------------------------------
diff --git 
a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameWriteSpec.scala
 
b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameWriteSpec.scala
new file mode 100644
index 0000000..30df27e
--- /dev/null
+++ 
b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameWriteSpec.scala
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark
+
+import org.apache.ignite.IgniteException
+import org.apache.ignite.spark.AbstractDataFrameSpec.{PERSON_TBL_NAME, 
PERSON_TBL_NAME_2, TEST_CONFIG_FILE}
+import org.apache.ignite.spark.IgniteDataFrameSettings._
+import org.apache.ignite.spark.impl.sqlTableInfo
+import org.apache.ignite.testframework.GridTestUtils.resolveIgnitePath
+import org.apache.spark.sql.SaveMode.{Append, Ignore, Overwrite}
+import org.apache.spark.sql.{DataFrame, SaveMode}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.apache.spark.sql.functions._
+
+/** Tests saving a DataFrame into Ignite SQL tables, including invalid write options.
+  */
+@RunWith(classOf[JUnitRunner])
+class IgniteSQLDataFrameWriteSpec extends AbstractDataFrameSpec {
+    var personDataFrame: DataFrame = _
+
+    describe("Write DataFrame into a Ignite SQL table") { // Scenarios in which the save is expected to succeed.
+        it("Save data frame as a new table") {
+            val rowsCnt = personDataFrame.count()
+            // No mode() call here, so the writer's default save mode applies.
+            personDataFrame.write
+                .format(FORMAT_IGNITE)
+                .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+                .option(OPTION_TABLE, "new_persons")
+                .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
+                .save()
+
+            assert(rowsCnt == rowsCount("new_persons"), "Data should be saved 
into 'new_persons' table")
+        }
+
+        it("Save data frame to existing table") {
+            val rowsCnt = personDataFrame.count()
+
+            personDataFrame.write
+                .format(FORMAT_IGNITE)
+                .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+                .option(OPTION_TABLE, PERSON_TBL_NAME_2)
+                .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id, city_id")
+                .option(OPTION_CREATE_TABLE_PARAMETERS, "backups=1, 
affinityKey=city_id")
+                .mode(Overwrite)
+                .save()
+
+            assert(rowsCnt == rowsCount(PERSON_TBL_NAME_2), s"Data should be 
saved into $PERSON_TBL_NAME_2 table")
+        }
+
+        it("Save data frame to existing table with streamer options") {
+            val rowsCnt = personDataFrame.count()
+            // Same write as the previous test, plus explicit data streamer options.
+            personDataFrame.write
+                .format(FORMAT_IGNITE)
+                .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+                .option(OPTION_TABLE, PERSON_TBL_NAME_2)
+                .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id, city_id")
+                .option(OPTION_CREATE_TABLE_PARAMETERS, "backups=1, 
affinityKey=city_id")
+                .option(OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS, 3)
+                .option(OPTION_STREAMER_PER_NODE_BUFFER_SIZE, 1)
+                .option(OPTION_STREAMER_FLUSH_FREQUENCY, 10000)
+                .mode(Overwrite)
+                .save()
+
+            assert(rowsCnt == rowsCount(PERSON_TBL_NAME_2), s"Data should be 
saved into $PERSON_TBL_NAME_2 table")
+        }
+
+        it("Ignore save operation if table exists") {
+            //Count of records before saving
+            val person2RowsCntBeforeSave = rowsCount(PERSON_TBL_NAME_2)
+
+            personDataFrame.write
+                .format(FORMAT_IGNITE)
+                .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+                .option(OPTION_TABLE, PERSON_TBL_NAME_2)
+                .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id, city_id")
+                .option(OPTION_CREATE_TABLE_PARAMETERS, "backups=1, 
affinityKey=city_id")
+                .mode(Ignore)
+                .save()
+
+            assert(rowsCount(PERSON_TBL_NAME_2) == person2RowsCntBeforeSave, 
"Save operation should be ignored.")
+        }
+
+        it("Append data frame data to existing table") {
+            //Count of records before appending
+            val person2RowsCnt = rowsCount(PERSON_TBL_NAME_2)
+
+            //Count of appended records
+            val personRowsCnt = personDataFrame.count()
+
+            personDataFrame
+                .withColumn("id", col("id") + person2RowsCnt) //Edit id column 
to prevent duplication
+                .write.format(FORMAT_IGNITE)
+                .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+                .option(OPTION_TABLE, PERSON_TBL_NAME_2)
+                .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id, city_id")
+                .option(OPTION_CREATE_TABLE_PARAMETERS, "backups=1, 
affinityKey=city_id")
+                .mode(Append)
+                .save()
+
+            assert(rowsCount(PERSON_TBL_NAME_2) == person2RowsCnt + 
personRowsCnt,
+                s"Table $PERSON_TBL_NAME_2 should contain data from 
$PERSON_TBL_NAME")
+        }
+
+        it("Save another data source data as a Ignite table") {
+            val citiesDataFrame = spark.read.json(
+                
resolveIgnitePath("modules/spark/src/test/resources/cities.json").getAbsolutePath)
+
+            citiesDataFrame.write
+                .format(FORMAT_IGNITE)
+                .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+                .option(OPTION_TABLE, "json_city")
+                .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
+                .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
+                .save()
+
+            assert(rowsCount("json_city") == citiesDataFrame.count(),
+                "Table json_city should contain data from json file.")
+        }
+
+        it("Save data frame as a new table with save('table_name')") {
+            val rowsCnt = personDataFrame.count()
+            // The table name is passed to save() instead of through OPTION_TABLE.
+            personDataFrame.write
+                .format(FORMAT_IGNITE)
+                .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+                .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
+                .save("saved_persons")
+
+            assert(rowsCnt == rowsCount("saved_persons"), "Data should be 
saved into 'saved_persons' table")
+        }
+
+        it("Should keep first row if allowOverwrite is false") {
+            val nonUniqueCitiesDataFrame = spark.read.json(
+                
resolveIgnitePath("modules/spark/src/test/resources/cities_non_unique.json").getAbsolutePath)
+
+            nonUniqueCitiesDataFrame.write
+                .format(FORMAT_IGNITE)
+                .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+                .option(OPTION_TABLE, "first_row_json_city")
+                .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
+                .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
+                .option(OPTION_STREAMER_ALLOW_OVERWRITE, false)
+                .save()
+            // Rows are sorted by ID so that the positional assertions below are deterministic.
+            val cities = 
readTable("first_row_json_city").collect().sortBy(_.getAs[Long]("ID"))
+
+            assert(cities(0).getAs[String]("NAME") == "Forest Hill")
+            assert(cities(1).getAs[String]("NAME") == "Denver")
+            assert(cities(2).getAs[String]("NAME") == "St. Petersburg")
+        }
+
+        it("Should keep last row if allowOverwrite is true") {
+            val nonUniqueCitiesDataFrame = spark.read.json(
+                
resolveIgnitePath("modules/spark/src/test/resources/cities_non_unique.json").getAbsolutePath)
+
+            nonUniqueCitiesDataFrame.write
+                .format(FORMAT_IGNITE)
+                .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+                .option(OPTION_TABLE, "last_row_json_city")
+                .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
+                .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
+                .option(OPTION_STREAMER_ALLOW_OVERWRITE, true)
+                .save()
+            // Rows are sorted by ID so that the positional assertions below are deterministic.
+            val cities = 
readTable("last_row_json_city").collect().sortBy(_.getAs[Long]("ID"))
+
+            assert(cities(0).getAs[String]("NAME") == "Paris")
+            assert(cities(1).getAs[String]("NAME") == "New York")
+            assert(cities(2).getAs[String]("NAME") == "Moscow")
+        }
+    }
+
+    describe("Wrong DataFrame Write Options") { // Scenarios in which the save is expected to throw.
+        it("Should throw exception with ErrorIfExists for a existing table") {
+            intercept[IgniteException] {
+                personDataFrame.write
+                    .format(FORMAT_IGNITE)
+                    .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+                    .option(OPTION_TABLE, PERSON_TBL_NAME)
+                    .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
+                    .mode(SaveMode.ErrorIfExists)
+                    .save()
+            }
+        }
+
+        it("Should throw exception if primary key fields not specified") {
+            intercept[IgniteException] {
+                personDataFrame.write
+                    .format(FORMAT_IGNITE)
+                    .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+                    .option(OPTION_TABLE, "persons_no_pk")
+                    .save()
+            }
+        }
+
+        it("Should throw exception if primary key fields not specified for 
existing table") {
+            intercept[IgniteException] {
+                personDataFrame.write
+                    .format(FORMAT_IGNITE)
+                    .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+                    .option(OPTION_TABLE, PERSON_TBL_NAME)
+                    .mode(Overwrite)
+                    .save()
+            }
+            // The rejected overwrite must not have removed the existing table.
+            val tblInfo = sqlTableInfo[Any, Any](client, PERSON_TBL_NAME)
+
+            assert(tblInfo.isDefined, s"Table $PERSON_TBL_NAME should exists.")
+        }
+
+        it("Should throw exception for wrong pk field") {
+            intercept[IgniteException] {
+                personDataFrame.write
+                    .format(FORMAT_IGNITE)
+                    .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+                    .option(OPTION_TABLE, PERSON_TBL_NAME)
+                    .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, 
"unknown_field")
+                    .mode(Overwrite)
+                    .save()
+            }
+            // The rejected overwrite must not have removed the existing table.
+            val tblInfo = sqlTableInfo[Any, Any](client, PERSON_TBL_NAME)
+
+            assert(tblInfo.isDefined, s"Table $PERSON_TBL_NAME should exists.")
+        }
+
+        it("Should throw exception for wrong pk field - 2") {
+            intercept[IgniteException] {
+                personDataFrame.write
+                    .format(FORMAT_IGNITE)
+                    .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+                    .option(OPTION_TABLE, PERSON_TBL_NAME)
+                    .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, 
"id,unknown_field")
+                    .mode(Overwrite)
+                    .save()
+            }
+            // The rejected overwrite must not have removed the existing table.
+            val tblInfo = sqlTableInfo[Any, Any](client, PERSON_TBL_NAME)
+
+            assert(tblInfo.isDefined, s"Table $PERSON_TBL_NAME should exists.")
+        }
+
+        it("Should throw exception for wrong WITH clause") {
+            intercept[IgniteException] {
+                personDataFrame.write
+                    .format(FORMAT_IGNITE)
+                    .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+                    .option(OPTION_TABLE, "person_unsupported_with") // NOTE(review): no primary key fields are set, so the missing-key check may be what throws here — confirm
+                    .option(OPTION_CREATE_TABLE_PARAMETERS, 
"unsupported_with_clause")
+                    .mode(Overwrite)
+                    .save()
+            }
+        }
+
+        it("Should throw exception for wrong table name") {
+            intercept[IgniteException] {
+                personDataFrame.write
+                    .format(FORMAT_IGNITE)
+                    .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+                    .option(OPTION_TABLE, "wrong-table-name") // NOTE(review): the primary key is missing and the WITH clause is invalid too, so the table name may not be what throws — confirm
+                    .option(OPTION_CREATE_TABLE_PARAMETERS, 
"unsupported_with_clause")
+                    .mode(Overwrite)
+                    .save()
+            }
+        }
+
+        it("Should throw exception if streamingFlushFrequency is not a 
number") {
+            intercept[NumberFormatException] {
+                personDataFrame.write
+                    .format(FORMAT_IGNITE)
+                    .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+                    .option(OPTION_TABLE, PERSON_TBL_NAME)
+                    .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
+                    .option(OPTION_STREAMER_FLUSH_FREQUENCY, "not_a_number")
+                    .mode(Overwrite)
+                    .save()
+            }
+        }
+
+        it("Should throw exception if streamingPerNodeBufferSize is not a 
number") {
+            intercept[NumberFormatException] {
+                personDataFrame.write
+                    .format(FORMAT_IGNITE)
+                    .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+                    .option(OPTION_TABLE, PERSON_TBL_NAME)
+                    .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
+                    .option(OPTION_STREAMER_PER_NODE_BUFFER_SIZE, 
"not_a_number")
+                    .mode(Overwrite)
+                    .save()
+            }
+        }
+
+        it("Should throw exception if streamingPerNodeParallelOperations is 
not a number") {
+            intercept[NumberFormatException] {
+                personDataFrame.write
+                    .format(FORMAT_IGNITE)
+                    .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+                    .option(OPTION_TABLE, PERSON_TBL_NAME)
+                    .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
+                    .option(OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS, 
"not_a_number")
+                    .mode(Overwrite)
+                    .save()
+            }
+        }
+
+        it("Should throw exception if streamerAllowOverwrite is not a 
boolean") {
+            intercept[IllegalArgumentException] {
+                personDataFrame.write
+                    .format(FORMAT_IGNITE)
+                    .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+                    .option(OPTION_TABLE, PERSON_TBL_NAME)
+                    .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
+                    .option(OPTION_STREAMER_ALLOW_OVERWRITE, "not_a_boolean")
+                    .mode(Overwrite)
+                    .save()
+            }
+        }
+    }
+
+    override protected def beforeAll(): Unit = {
+        super.beforeAll()
+        // Create the person, second person and city tables (second argument is presumably the cache name — confirm).
+        createPersonTable(client, "cache1")
+
+        createPersonTable2(client, "cache1")
+
+        createCityTable(client, "cache1")
+        // Load the person table once; the tests use this DataFrame as the data to write.
+        personDataFrame = spark.read
+            .format(FORMAT_IGNITE)
+            .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+            .option(OPTION_TABLE, PERSON_TBL_NAME)
+            .load()
+        // Also register it with Spark SQL as the temporary view "person".
+        personDataFrame.createOrReplaceTempView("person")
+    }
+
+    /** Counts the rows of an Ignite SQL table, loading it through `readTable`.
+      * @param tbl Table name.
+      * @return Count of rows in table. */
+    protected def rowsCount(tbl: String): Long =
+        readTable(tbl).count()
+
+    /** Loads an Ignite SQL table as a DataFrame using the test configuration file.
+      * @param tbl Table name.
+      * @return Ignite Table DataFrame.
+      */
+    protected def readTable(tbl: String): DataFrame = {
+        val igniteReader = spark.read.format(FORMAT_IGNITE)
+            .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE)
+
+        igniteReader.option(OPTION_TABLE, tbl).load()
+    }
+}

Reply via email to