Repository: ignite Updated Branches: refs/heads/master 4abcb3107 -> 7c0145299
IGNITE-7337: Implementation of saving DataFrame to Ignite SQL tables - Fixes #3438.

Signed-off-by: Nikolay Izhikov <nizhi...@apache.org>

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7c014529
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7c014529
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7c014529

Branch: refs/heads/master
Commit: 7c01452990ad0de0fb84ab4c0424a6d71e5bccba
Parents: 4abcb31
Author: Nikolay Izhikov <nizhi...@apache.org>
Authored: Fri Feb 9 07:00:54 2018 +0300
Committer: Nikolay Izhikov <nizhi...@apache.org>
Committed: Fri Feb 9 07:00:54 2018 +0300

----------------------------------------------------------------------
 examples/src/main/resources/person.json         |  10 +
 .../spark/IgniteDataFrameWriteExample.scala     | 179 +++++++++
 .../spark/examples/IgniteDataFrameSelfTest.java |   9 +
 .../ignite/spark/IgniteDataFrameSettings.scala  | 100 +++++
 .../spark/impl/IgniteRelationProvider.scala     | 175 ++++++++-
 .../ignite/spark/impl/IgniteSQLRelation.scala   |  93 ++---
 .../apache/ignite/spark/impl/QueryHelper.scala  | 186 ++++++++++
 .../apache/ignite/spark/impl/QueryUtils.scala   | 225 +++++++++++
 .../org/apache/ignite/spark/impl/package.scala  |   2 +-
 .../sql/ignite/IgniteExternalCatalog.scala      |  61 ++-
 .../src/test/resources/cities_non_unique.json   |   6 +
 .../ignite/spark/AbstractDataFrameSpec.scala    |  41 +-
 .../apache/ignite/spark/IgniteCatalogSpec.scala |  10 +-
 .../ignite/spark/IgniteDataFrameSuite.scala     |   2 +
 .../spark/IgniteDataFrameWrongConfigSpec.scala  |   4 +-
 ...niteSQLDataFrameIgniteSessionWriteSpec.scala | 106 ++++++
 .../spark/IgniteSQLDataFrameWriteSpec.scala     | 371 +++++++++++++++++++
 17 files changed, 1462 insertions(+), 118 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/examples/src/main/resources/person.json
----------------------------------------------------------------------
diff --git a/examples/src/main/resources/person.json b/examples/src/main/resources/person.json
new file mode 100644
index 0000000..d651b0d
--- /dev/null
+++ b/examples/src/main/resources/person.json
@@ -0,0 +1,10 @@
+{ "id": 1, "name": "Ivan Ivanov", "department": "Executive committee" }
+{ "id": 2, "name": "Petr Petrov", "department": "Executive committee" }
+{ "id": 3, "name": "John Doe", "department": "Production" }
+{ "id": 4, "name": "Smith Ann", "department": "Production" }
+{ "id": 5, "name": "Sergey Smirnov", "department": "Accounting" }
+{ "id": 6, "name": "Alexandra Sergeeva", "department": "Accounting" }
+{ "id": 7, "name": "Adam West", "department": "IT" }
+{ "id": 8, "name": "Beverley Chase", "department": "Head Office" }
+{ "id": 9, "name": "Igor Rozhkov", "department": "Head Office" }
+{ "id": 10, "name": "Anastasia Borisova", "department": "IT" }
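
In short, the commit makes Ignite a writable Spark data source by adding a CreatableRelationProvider implementation. A minimal sketch of the new write path, using the options introduced in this patch (the config path and table name are illustrative):

    import org.apache.ignite.spark.IgniteDataFrameSettings._
    import org.apache.spark.sql.{SaveMode, SparkSession}

    val spark = SparkSession.builder().master("local").getOrCreate()
    val df = spark.read.json("examples/src/main/resources/person.json")

    df.write
        .format(FORMAT_IGNITE)                                   // "ignite" data source
        .option(OPTION_CONFIG_FILE, "config/example-ignite.xml") // path to Ignite config
        .option(OPTION_TABLE, "json_person")                     // target SQL table
        .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")    // required if the table is created
        .mode(SaveMode.Append)                                   // inserts go through IgniteDataStreamer
        .save()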
http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/examples/src/main/spark/org/apache/ignite/examples/spark/IgniteDataFrameWriteExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/spark/org/apache/ignite/examples/spark/IgniteDataFrameWriteExample.scala b/examples/src/main/spark/org/apache/ignite/examples/spark/IgniteDataFrameWriteExample.scala
new file mode 100644
index 0000000..7662fb4
--- /dev/null
+++ b/examples/src/main/spark/org/apache/ignite/examples/spark/IgniteDataFrameWriteExample.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.spark
+
+import java.lang.{Long ⇒ JLong, String ⇒ JString}
+
+import org.apache.ignite.cache.query.SqlFieldsQuery
+import org.apache.ignite.configuration.CacheConfiguration
+import org.apache.ignite.{Ignite, Ignition}
+import org.apache.log4j.{Level, Logger}
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.apache.ignite.spark.IgniteDataFrameSettings._
+import org.apache.spark.sql.functions._
+
+import scala.collection.JavaConversions._
+
+/**
+ * Example application showing a use case for writing a Spark DataFrame to Ignite.
+ */
+object IgniteDataFrameWriteExample extends App {
+    /**
+     * Ignite config file.
+     */
+    private val CONFIG = "examples/config/example-ignite.xml"
+
+    /**
+     * Test cache name.
+     */
+    private val CACHE_NAME = "testCache"
+
+    //Starting Ignite server node.
+    val ignite = setupServerAndData
+
+    closeAfter(ignite) { _ ⇒
+        //Creating spark session.
+        implicit val spark: SparkSession = SparkSession.builder()
+            .appName("Spark Ignite data sources write example")
+            .master("local")
+            .config("spark.executor.instances", "2")
+            .getOrCreate()
+
+        // Adjust the logger to exclude the logs of no interest.
+        Logger.getRootLogger.setLevel(Level.INFO)
+        Logger.getLogger("org.apache.ignite").setLevel(Level.INFO)
+
+        // Executing examples.
+        println("Example of writing json file to Ignite:")
+
+        writeJSonToIgnite
+
+        println("Example of modifying existing Ignite table data through the Data Frame API:")
+
+        editDataAndSaveToNewTable
+    }
+
+    def writeJSonToIgnite(implicit spark: SparkSession): Unit = {
+        //Load content of json file to data frame.
+        val personsDataFrame = spark.read.json("examples/src/main/resources/person.json")
+
+        println()
+        println("Json file content:")
+        println()
+
+        //Printing content of json file to console.
+        personsDataFrame.show()
+
+        println()
+        println("Writing Data Frame to Ignite:")
+        println()
+
+        //Writing content of data frame to Ignite.
+        personsDataFrame.write
+            .format(FORMAT_IGNITE)
+            .option(OPTION_CONFIG_FILE, CONFIG)
+            .option(OPTION_TABLE, "json_person")
+            .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
+            .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
+            .save()
+
+        println("Done!")
+
+        println()
+        println("Reading data from Ignite table:")
+        println()
+
+        val cache = ignite.cache[Any, Any](CACHE_NAME)
+
+        //Reading saved data from Ignite.
+        val data = cache.query(new SqlFieldsQuery("SELECT id, name, department FROM json_person")).getAll
+
+        data.foreach { row ⇒ println(row.mkString("[", ", ", "]")) }
+    }
+
+    def editDataAndSaveToNewTable(implicit spark: SparkSession): Unit = {
+        //Load content of Ignite table to data frame.
+        val personDataFrame = spark.read
+            .format(FORMAT_IGNITE)
+            .option(OPTION_CONFIG_FILE, CONFIG)
+            .option(OPTION_TABLE, "person")
+            .load()
+
+        println()
+        println("Data frame content:")
+        println()
+
+        //Printing content of data frame to console.
+        personDataFrame.show()
+
+        println()
+        println("Modifying the Data Frame and writing it to Ignite:")
+        println()
+
+        personDataFrame
+            .withColumn("id", col("id") + 42) //Edit id column
+            .withColumn("name", reverse(col("name"))) //Edit name column
+            .write.format(FORMAT_IGNITE)
+            .option(OPTION_CONFIG_FILE, CONFIG)
+            .option(OPTION_TABLE, "new_persons")
+            .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id, city_id")
+            .option(OPTION_CREATE_TABLE_PARAMETERS, "backups=1")
+            .mode(SaveMode.Overwrite) //Overwriting entire table.
+            .save()
+
+        println("Done!")
+
+        println()
+        println("Reading data from Ignite table:")
+        println()
+
+        val cache = ignite.cache[Any, Any](CACHE_NAME)
+
+        //Reading saved data from Ignite.
+        val data = cache.query(new SqlFieldsQuery("SELECT id, name, city_id FROM new_persons")).getAll
+
+        data.foreach { row ⇒ println(row.mkString("[", ", ", "]")) }
+    }
+
+    def setupServerAndData: Ignite = {
+        //Starting Ignite.
+        val ignite = Ignition.start(CONFIG)
+
+        //Creating first test cache.
+        val ccfg = new CacheConfiguration[JLong, JString](CACHE_NAME).setSqlSchema("PUBLIC")
+
+        val cache = ignite.getOrCreateCache(ccfg)
+
+        //Creating SQL table.
+        cache.query(new SqlFieldsQuery(
+            "CREATE TABLE person (id LONG, name VARCHAR, city_id LONG, PRIMARY KEY (id)) " +
+                "WITH \"backups=1\"")).getAll
+
+        cache.query(new SqlFieldsQuery("CREATE INDEX on Person (city_id)")).getAll
+
+        //Inserting some data into the table.
+        val qry = new SqlFieldsQuery("INSERT INTO person (id, name, city_id) values (?, ?, ?)")
+
+        cache.query(qry.setArgs(1L.asInstanceOf[JLong], "John Doe", 3L.asInstanceOf[JLong])).getAll
+        cache.query(qry.setArgs(2L.asInstanceOf[JLong], "Jane Roe", 2L.asInstanceOf[JLong])).getAll
+        cache.query(qry.setArgs(3L.asInstanceOf[JLong], "Mary Major", 1L.asInstanceOf[JLong])).getAll
+        cache.query(qry.setArgs(4L.asInstanceOf[JLong], "Richard Miles", 2L.asInstanceOf[JLong])).getAll
+
+        ignite
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/examples/src/test/spark/org/apache/ignite/spark/examples/IgniteDataFrameSelfTest.java
----------------------------------------------------------------------
diff --git a/examples/src/test/spark/org/apache/ignite/spark/examples/IgniteDataFrameSelfTest.java b/examples/src/test/spark/org/apache/ignite/spark/examples/IgniteDataFrameSelfTest.java
index b18d870..f9f1d64 100644
--- a/examples/src/test/spark/org/apache/ignite/spark/examples/IgniteDataFrameSelfTest.java
+++ b/examples/src/test/spark/org/apache/ignite/spark/examples/IgniteDataFrameSelfTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.spark.examples;
 
 import org.apache.ignite.examples.spark.IgniteCatalogExample;
 import org.apache.ignite.examples.spark.IgniteDataFrameExample;
+import org.apache.ignite.examples.spark.IgniteDataFrameWriteExample;
 import org.junit.Test;
 
 /**
@@ -41,4 +42,12 @@ public class IgniteDataFrameSelfTest {
     public void testDataFrameExample() throws Exception {
         IgniteDataFrameExample.main(EMPTY_ARGS);
     }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testDataFrameWriteExample() throws Exception {
+        IgniteDataFrameWriteExample.main(EMPTY_ARGS);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
index 4261032..6bff476 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
@@ -53,4 +53,104 @@ object IgniteDataFrameSettings {
      * @see [[org.apache.ignite.cache.QueryEntity#tableName]]
      */
     val OPTION_TABLE = "table"
+
+    /**
+     * Config option to specify newly created Ignite SQL table parameters.
+     * The value of this option will be used in `CREATE TABLE ... WITH "option value goes here"`
+     *
+     * @example {{{
+     * val igniteDF = spark.write.format(IGNITE)
+     *     // other options ...
+     *     .option(OPTION_CREATE_TABLE_PARAMETERS, "backups=1, template=replicated")
+     *     .save()
+     * }}}
+     *
+     * @see [[https://apacheignite-sql.readme.io/docs/create-table]]
+     */
+    val OPTION_CREATE_TABLE_PARAMETERS = "createTableParameters"
+
+    /**
+     * Config option to specify a comma-separated list of primary key fields for a newly created Ignite SQL table.
+     *
+     * @example {{{
+     * val igniteDF = spark.write.format(IGNITE)
+     *     // other options ...
+     *     .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
+     *     .save()
+     * }}}
+     *
+     * @see [[https://apacheignite-sql.readme.io/docs/create-table]]
+     */
+    val OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS = "primaryKeyFields"
+
+    /**
+     * Config option for saving data frame.
+     * Internally all SQL inserts are done through `IgniteDataStreamer`.
+     * This option sets the `allowOverwrite` property of the streamer.
+     * If `true`, then a row with the same primary key value overwrites the existing row in the table.
+     * If `false`, then a row with the same primary key value is skipped and the existing row is left in the table.
+     * The default value is `false`.
+     *
+     * @example {{{
+     * val igniteDF = spark.write.format(IGNITE)
+     *     // other options ...
+     *     .option(OPTION_STREAMER_ALLOW_OVERWRITE, true)
+     *     .save()
+     * }}}
+     *
+     * @see [[org.apache.ignite.IgniteDataStreamer]]
+     * @see [[org.apache.ignite.IgniteDataStreamer#allowOverwrite(boolean)]]
+     */
+    val OPTION_STREAMER_ALLOW_OVERWRITE = "streamerAllowOverwrite"
+
+    /**
+     * Config option for saving data frame.
+     * Internally all SQL inserts are done through `IgniteDataStreamer`.
+     * This option sets the `autoFlushFrequency` property of the streamer.
+     *
+     * @example {{{
+     * val igniteDF = spark.write.format(IGNITE)
+     *     // other options ...
+     *     .option(OPTION_STREAMER_FLUSH_FREQUENCY, 10000)
+     *     .save()
+     * }}}
+     *
+     * @see [[org.apache.ignite.IgniteDataStreamer]]
+     * @see [[org.apache.ignite.IgniteDataStreamer#autoFlushFrequency(long)]]
+     */
+    val OPTION_STREAMER_FLUSH_FREQUENCY = "streamerFlushFrequency"
+
+    /**
+     * Config option for saving data frame.
+     * Internally all SQL inserts are done through `IgniteDataStreamer`.
+     * This option sets the `perNodeBufferSize` property of the streamer.
+     *
+     * @example {{{
+     * val igniteDF = spark.write.format(IGNITE)
+     *     // other options ...
+     *     .option(OPTION_STREAMER_PER_NODE_BUFFER_SIZE, 1024)
+     *     .save()
+     * }}}
+     *
+     * @see [[org.apache.ignite.IgniteDataStreamer]]
+     * @see [[org.apache.ignite.IgniteDataStreamer#perNodeBufferSize(int)]]
+     */
+    val OPTION_STREAMER_PER_NODE_BUFFER_SIZE = "streamerPerNodeBufferSize"
+
+    /**
+     * Config option for saving data frame.
+     * Internally all SQL inserts are done through `IgniteDataStreamer`.
+     * This option sets the `perNodeParallelOperations` property of the streamer.
+     *
+     * @example {{{
+     * val igniteDF = spark.write.format(IGNITE)
+     *     // other options ...
+     *     .option(OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS, 42)
+     *     .save()
+     * }}}
+     *
+     * @see [[org.apache.ignite.IgniteDataStreamer]]
+     * @see [[org.apache.ignite.IgniteDataStreamer#perNodeParallelOperations(int)]]
+     */
+    val OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS = "streamerPerNodeParallelOperations"
 }
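
Taken together, the four streamer options tune the `IgniteDataStreamer` used for inserts and can be combined on a single write. A sketch, assuming an existing DataFrame `df` and an illustrative config path:

    import org.apache.ignite.spark.IgniteDataFrameSettings._

    df.write
        .format(FORMAT_IGNITE)
        .option(OPTION_CONFIG_FILE, "ignite-config.xml")       // placeholder path
        .option(OPTION_TABLE, "person")
        .option(OPTION_STREAMER_ALLOW_OVERWRITE, true)         // overwrite rows with the same PK
        .option(OPTION_STREAMER_FLUSH_FREQUENCY, 10000)        // auto-flush every 10 s
        .option(OPTION_STREAMER_PER_NODE_BUFFER_SIZE, 1024)
        .option(OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS, 8)
        .save()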
http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
index 4762d8d..a9f9f89 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
@@ -17,21 +17,24 @@
 
 package org.apache.ignite.spark.impl
 
+import org.apache.ignite.IgniteException
 import org.apache.ignite.configuration.IgniteConfiguration
 import org.apache.ignite.internal.IgnitionEx
 import org.apache.ignite.internal.util.IgniteUtils
-import org.apache.ignite.spark.IgniteDataFrameSettings.OPTION_TABLE
 import org.apache.ignite.spark.IgniteContext
 import org.apache.ignite.spark.IgniteDataFrameSettings._
-import org.apache.ignite.IgniteException
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.ignite.IgniteExternalCatalog.OPTION_GRID
+import org.apache.ignite.spark.impl.QueryHelper.{createTable, dropTable, ensureCreateTableOptions, saveTable}
+import org.apache.spark.sql.SaveMode.{Append, Overwrite}
+import org.apache.spark.sql.ignite.IgniteExternalCatalog.{IGNITE_PROTOCOL, OPTION_GRID}
 import org.apache.spark.sql.sources._
+import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
 
 /**
  * Apache Ignite relation provider.
  */
-class IgniteRelationProvider extends RelationProvider with DataSourceRegister {
+class IgniteRelationProvider extends RelationProvider
+    with CreatableRelationProvider
+    with DataSourceRegister {
     /**
      * @return "ignite" - name of relation provider.
      */
@@ -42,8 +45,7 @@ class IgniteRelationProvider extends RelationProvider with DataSourceRegister {
      * To refer to a cluster, the user has to specify one of the config parameters:
      * <ul>
      * <li><code>config</code> - path to ignite configuration file.
-     * <li><code>grid</code> - grid name. Note that grid has to be started in the same jvm.
-     * <ul>
+     * </ul>
      * An existing table inside Apache Ignite should be referred to via the <code>table</code> parameter.
      *
      * @param sqlCtx SQLContext.
     * @param params Parameters.
     * @return IgniteRelation.
     * @see IgniteRelation
     * @see IgnitionEx#grid(String)
+    * @see org.apache.ignite.spark.IgniteDataFrameSettings.OPTION_TABLE
+    * @see org.apache.ignite.spark.IgniteDataFrameSettings.OPTION_CONFIG_FILE
+    */
+    override def createRelation(sqlCtx: SQLContext, params: Map[String, String]): BaseRelation =
+        createRelation(
+            igniteContext(params, sqlCtx),
+            params.getOrElse(OPTION_TABLE, throw new IgniteException("'table' must be specified.")),
+            sqlCtx)
+
+    /**
+     * Saves `data` to the corresponding Ignite table and returns a relation for the saved data.
+     *
+     * To save data or create an IgniteRelation we need a link to an Ignite cluster and a table name.
+     * To refer to a cluster, the user has to specify one of the config parameters:
+     * <ul>
+     * <li><code>config</code> - path to ignite configuration file.
+     * </ul>
+     * An existing table inside Apache Ignite should be referred to via the <code>table</code> or <code>path</code> parameter.
+     *
+     * If the table doesn't exist, it will be created.
+     * If `mode` is Overwrite and `table` already exists, it will be recreated (DROP TABLE, CREATE TABLE).
+     *
+     * If table creation is required, the user can set the following options:
+     *
+     * <ul>
+     * <li>`OPTION_PRIMARY_KEY_FIELDS` - required option; comma-separated list of fields for the primary key.</li>
+     * <li>`OPTION_CACHE_FOR_DDL` - required option; existing cache name for executing SQL DDL statements.</li>
+     * <li>`OPTION_CREATE_TABLE_OPTIONS` - Ignite specific parameters for a new table. See WITH [https://apacheignite-sql.readme.io/docs/create-table].</li>
+     * </ul>
+     *
+     * Data is written partition by partition. The user can set `OPTION_WRITE_PARTITIONS_NUM` - the number of partitions for the data.
+     *
+     * @param sqlCtx SQLContext.
+     * @param mode Save mode.
+     * @param params Additional parameters.
+     * @param data Data to save.
+     * @return IgniteRelation.
+     */
+    override def createRelation(sqlCtx: SQLContext,
+        mode: SaveMode,
+        params: Map[String, String],
+        data: DataFrame): BaseRelation = {
+
+        val ctx = igniteContext(params, sqlCtx)
+
+        val tblName = tableName(params)
+
+        val tblInfoOption = sqlTableInfo[Any, Any](ctx.ignite(), tblName)
+
+        if (tblInfoOption.isDefined) {
+            mode match {
+                case Overwrite ⇒
+                    ensureCreateTableOptions(data.schema, params, ctx)
+
+                    dropTable(tblName, ctx.ignite())
+
+                    val createTblOpts = params.get(OPTION_CREATE_TABLE_PARAMETERS)
+
+                    createTable(data.schema,
+                        tblName,
+                        primaryKeyFields(params),
+                        createTblOpts,
+                        ctx.ignite())
+
+                    saveTable(data,
+                        tblName,
+                        ctx,
+                        params.get(OPTION_STREAMER_ALLOW_OVERWRITE).map(_.toBoolean),
+                        params.get(OPTION_STREAMER_FLUSH_FREQUENCY).map(_.toLong),
+                        params.get(OPTION_STREAMER_PER_NODE_BUFFER_SIZE).map(_.toInt),
+                        params.get(OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS).map(_.toInt))
+
+                case Append ⇒
+                    saveTable(data,
+                        tblName,
+                        ctx,
+                        params.get(OPTION_STREAMER_ALLOW_OVERWRITE).map(_.toBoolean),
+                        params.get(OPTION_STREAMER_FLUSH_FREQUENCY).map(_.toLong),
+                        params.get(OPTION_STREAMER_PER_NODE_BUFFER_SIZE).map(_.toInt),
+                        params.get(OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS).map(_.toInt))
+
+                case SaveMode.ErrorIfExists =>
+                    throw new IgniteException(s"Table or view '$tblName' already exists. SaveMode: ErrorIfExists.")
+
+                case SaveMode.Ignore =>
+                    // With `SaveMode.Ignore` mode, if table already exists, the save operation is expected
+                    // to not save the contents of the DataFrame and to not change the existing data.
+                    // Therefore, it is okay to do nothing here and then just return the relation below.
+            }
+        }
+        else {
+            ensureCreateTableOptions(data.schema, params, ctx)
+
+            val primaryKeyFields = params(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS).split(",")
+
+            val createTblOpts = params.get(OPTION_CREATE_TABLE_PARAMETERS)
+
+            createTable(data.schema,
+                tblName,
+                primaryKeyFields,
+                createTblOpts,
+                ctx.ignite())
+
+            saveTable(data,
+                tblName,
+                ctx,
+                params.get(OPTION_STREAMER_ALLOW_OVERWRITE).map(_.toBoolean),
+                params.get(OPTION_STREAMER_FLUSH_FREQUENCY).map(_.toLong),
+                params.get(OPTION_STREAMER_PER_NODE_BUFFER_SIZE).map(_.toInt),
+                params.get(OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS).map(_.toInt))
+        }
+
+        createRelation(ctx,
+            tblName,
+            sqlCtx)
+    }
+
+    /**
+     * @param igniteCtx Ignite context.
+     * @param tblName Table name.
+     * @param sqlCtx SQL context.
+     * @return Ignite SQL relation.
+     */
+    private def createRelation(igniteCtx: IgniteContext, tblName: String, sqlCtx: SQLContext): BaseRelation =
+        IgniteSQLRelation(
+            igniteCtx,
+            tblName,
+            sqlCtx)
+
+    /**
+     * @param params Params.
+     * @param sqlCtx SQL Context.
+     * @return IgniteContext.
      */
-    override def createRelation(sqlCtx: SQLContext, params: Map[String, String]): BaseRelation = {
+    private def igniteContext(params: Map[String, String], sqlCtx: SQLContext): IgniteContext = {
         val igniteHome = IgniteUtils.getIgniteHome
 
         def configProvider: () ⇒ IgniteConfiguration = {
@@ -80,11 +215,27 @@ class IgniteRelationProvider extends RelationProvider with DataSourceRegister {
                 throw new IgniteException("'config' must be specified to connect to ignite cluster.")
         }
 
-        val ic = IgniteContext(sqlCtx.sparkContext, configProvider)
+        IgniteContext(sqlCtx.sparkContext, configProvider)
+    }
+
+    /**
+     * @param params Params.
+     * @return Table name.
+     */
+    private def tableName(params: Map[String, String]): String = {
+        val tblName = params.getOrElse(OPTION_TABLE,
+            params.getOrElse("path", throw new IgniteException("'table' or 'path' must be specified.")))
 
-        if (params.contains(OPTION_TABLE))
-            IgniteSQLRelation(ic, params(OPTION_TABLE).toUpperCase, sqlCtx)
+        if (tblName.startsWith(IGNITE_PROTOCOL))
+            tblName.replace(IGNITE_PROTOCOL, "").toUpperCase()
         else
-            throw new IgniteException("'table' must be specified for loading ignite data.")
+            tblName.toUpperCase
     }
+
+    /**
+     * @param params Params.
+     * @return Sequence of primary key fields.
+     */
+    private def primaryKeyFields(params: Map[String, String]): Seq[String] =
+        params(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS).split(",")
 }
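
The save-mode dispatch above follows standard Spark semantics. A sketch of what each mode means against an existing table (`df` and `cfgPath` are placeholders):

    import org.apache.spark.sql.SaveMode

    // Behaviour against an existing table, per createRelation above:
    // SaveMode.Overwrite     - DROP TABLE, CREATE TABLE, then stream the rows.
    // SaveMode.Append        - stream the rows into the existing table.
    // SaveMode.ErrorIfExists - throw IgniteException.
    // SaveMode.Ignore        - leave the table untouched and return the relation.
    df.write
        .format(FORMAT_IGNITE)
        .option(OPTION_CONFIG_FILE, cfgPath) // placeholder
        .option(OPTION_TABLE, "person")
        .mode(SaveMode.Append)
        .save()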
http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
index bdcf57b..1fb8de7 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteSQLRelation.scala
@@ -17,12 +17,13 @@
 
 package org.apache.ignite.spark.impl
 
-import org.apache.ignite.{Ignite, IgniteException}
+import org.apache.ignite.IgniteException
 import org.apache.ignite.cache.{CacheMode, QueryEntity}
 import org.apache.ignite.cluster.ClusterNode
 import org.apache.ignite.configuration.CacheConfiguration
 import org.apache.ignite.spark.{IgniteContext, IgniteRDD}
 import org.apache.spark.Partition
+import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
@@ -37,7 +38,7 @@ import scala.collection.mutable.ArrayBuffer
 class IgniteSQLRelation[K, V](
     private[spark] val ic: IgniteContext,
     private[spark] val tableName: String)
-    (@transient val sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan {
+    (@transient val sqlContext: SQLContext) extends BaseRelation with PrunedFilteredScan with Logging {
 
     /**
      * @return Schema of Ignite SQL table.
@@ -55,6 +56,19 @@ class IgniteSQLRelation[K, V](
      * @return Apache Ignite RDD implementation.
      */
     override def buildScan(columns: Array[String], filters: Array[Filter]): RDD[Row] = {
+        val qryAndArgs = queryAndArgs(columns, filters)
+
+        IgniteSQLDataFrameRDD[K, V](ic, cacheName, schema, qryAndArgs._1, qryAndArgs._2, calcPartitions(filters))
+    }
+
+    override def toString = s"IgniteSQLRelation[table=$tableName]"
+
+    /**
+     * @param columns Columns to select.
+     * @param filters Filters to apply.
+     * @return SQL query string and arguments for it.
+     */
+    private def queryAndArgs(columns: Array[String], filters: Array[Filter]): (String, List[Any]) = {
         val columnsStr =
             if (columns.isEmpty)
                 "*"
@@ -65,82 +79,17 @@ class IgniteSQLRelation[K, V](
         //Query will be executed by Ignite SQL Engine.
         val qryAndArgs = filters match {
             case Array(_, _*) ⇒
-                val where = compileWhere(filters)
+                val where = QueryUtils.compileWhere(filters)
+
                 (s"SELECT $columnsStr FROM $tableName WHERE ${where._1}", where._2)
+
             case _ ⇒
                 (s"SELECT $columnsStr FROM $tableName", List.empty)
         }
 
-        IgniteSQLDataFrameRDD[K, V](ic, cacheName, schema, qryAndArgs._1, qryAndArgs._2, calcPartitions(filters))
-    }
-
-    override def toString = s"IgniteSQLRelation[table=$tableName]"
+        logInfo(qryAndArgs._1)
 
-    /**
-     * Builds `where` part of SQL query.
-     *
-     * @param filters Filter to apply.
-     * @return Tuple contains `where` string and `List[Any]` of query parameters.
-     */
-    private def compileWhere(filters: Array[Filter]): (String, List[Any]) =
-        filters.foldLeft(("", List[Any]()))(buildSingleClause)
-
-    /**
-     * Adds single where clause to `state` and returns new state.
-     *
-     * @param state Current `where` state.
-     * @param clause Clause to add.
-     * @return `where` with given clause.
-     */
-    private def buildSingleClause(state: (String, List[Any]), clause: Filter): (String, List[Any]) = {
-        val filterStr = state._1
-        val params = state._2
-
-        clause match {
-            case EqualTo(attr, value) ⇒ (addStrClause(filterStr, s"$attr = ?"), params :+ value)
-
-            case EqualNullSafe(attr, value) ⇒ (addStrClause(filterStr, s"($attr IS NULL OR $attr = ?)"), params :+ value)
-
-            case GreaterThan(attr, value) ⇒ (addStrClause(filterStr, s"$attr > ?"), params :+ value)
-
-            case GreaterThanOrEqual(attr, value) ⇒ (addStrClause(filterStr, s"$attr >= ?"), params :+ value)
-
-            case LessThan(attr, value) ⇒ (addStrClause(filterStr, s"$attr < ?"), params :+ value)
-
-            case LessThanOrEqual(attr, value) ⇒ (addStrClause(filterStr, s"$attr <= ?"), params :+ value)
-
-            case In(attr, values) ⇒ (addStrClause(filterStr, s"$attr IN (${values.map(_ ⇒ "?").mkString(",")})"), params ++ values)
-
-            case IsNull(attr) ⇒ (addStrClause(filterStr, s"$attr IS NULL"), params)
-
-            case IsNotNull(attr) ⇒ (addStrClause(filterStr, s"$attr IS NOT NULL"), params)
-
-            case And(left, right) ⇒
-                val leftClause = buildSingleClause(("", params), left)
-                val rightClause = buildSingleClause(("", leftClause._2), right)
-
-                (addStrClause(filterStr, s"${leftClause._1} AND ${rightClause._1}"), rightClause._2)
-
-            case Or(left, right) ⇒
-                val leftClause = buildSingleClause(("", params), left)
-                val rightClause = buildSingleClause(("", leftClause._2), right)
-
-                (addStrClause(filterStr, s"${leftClause._1} OR ${rightClause._1}"), rightClause._2)
-
-            case Not(child) ⇒
-                val innerClause = buildSingleClause(("", params), child)
-
-                (addStrClause(filterStr, s"NOT ${innerClause._1}"), innerClause._2)
-
-            case StringStartsWith(attr, value) ⇒
-                (addStrClause(filterStr, s"$attr LIKE ?"), params :+ (value + "%"))
-
-            case StringEndsWith(attr, value) ⇒
-                (addStrClause(filterStr, s"$attr LIKE ?"), params :+ ("%" + value))
-
-            case StringContains(attr, value) ⇒
-                (addStrClause(filterStr, s"$attr LIKE ?"), params :+ ("%" + value + "%"))
-        }
+        qryAndArgs
     }
 
     private def calcPartitions(filters: Array[Filter]): Array[Partition] = {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
new file mode 100644
index 0000000..4342265
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl
+
+import org.apache.ignite.cache.query.SqlFieldsQuery
+import org.apache.ignite.spark.IgniteDataFrameSettings._
+import QueryUtils.{compileCreateTable, compileDropTable, compileInsert}
+import org.apache.ignite.internal.IgniteEx
+import org.apache.ignite.spark.IgniteContext
+import org.apache.ignite.{Ignite, IgniteException}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, Row}
+
+/**
+ * Helper class for executing DDL queries.
+ */
+private[apache] object QueryHelper {
+    /**
+     * Drops provided table.
+     *
+     * @param tableName Table name.
+     * @param ignite Ignite.
+     */
+    def dropTable(tableName: String, ignite: Ignite): Unit = {
+        val qryProcessor = ignite.asInstanceOf[IgniteEx].context().query()
+
+        val qry = compileDropTable(tableName)
+
+        qryProcessor.querySqlFields(new SqlFieldsQuery(qry), true).getAll
+    }
+
+    /**
+     * Creates table.
+     *
+     * @param schema Schema.
+     * @param tblName Table name.
+     * @param primaryKeyFields Primary key fields.
+     * @param createTblOpts Ignite specific options.
+     * @param ignite Ignite.
+     */
+    def createTable(schema: StructType, tblName: String, primaryKeyFields: Seq[String], createTblOpts: Option[String],
+        ignite: Ignite): Unit = {
+        val qryProcessor = ignite.asInstanceOf[IgniteEx].context().query()
+
+        val qry = compileCreateTable(schema, tblName, primaryKeyFields, createTblOpts)
+
+        qryProcessor.querySqlFields(new SqlFieldsQuery(qry), true).getAll
+    }
+
+    /**
+     * Ensures all options are specified correctly to create table based on provided `schema`.
+     *
+     * @param schema Schema of new table.
+     * @param params Parameters.
+     */
+    def ensureCreateTableOptions(schema: StructType, params: Map[String, String], ctx: IgniteContext): Unit = {
+        if (!params.contains(OPTION_TABLE) && !params.contains("path"))
+            throw new IgniteException("'table' must be specified.")
+
+        params.get(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS)
+            .map(_.split(','))
+            .getOrElse(throw new IgniteException("Can't create table! Primary key fields have to be specified."))
+            .map(_.trim)
+            .foreach { pkField ⇒
+                if (pkField == "")
+                    throw new IgniteException("PK field can't be empty.")
+
+                if (!schema.exists(_.name.equalsIgnoreCase(pkField)))
+                    throw new IgniteException(s"'$pkField' doesn't exist in DataFrame schema.")
+            }
+    }
+
+    /**
+     * Saves data to the table.
+     *
+     * @param data Data.
+     * @param tblName Table name.
+     * @param ctx Ignite context.
+     * @param streamerAllowOverwrite Flag enabling overwriting existing values in cache.
+     * @param streamerFlushFrequency Insert query streamer automatic flush frequency.
+     * @param streamerPerNodeBufferSize Insert query streamer size of per node query buffer.
+     * @param streamerPerNodeParallelOperations Insert query streamer maximum number of parallel operations for a single node.
+     *
+     * @see [[org.apache.ignite.IgniteDataStreamer]]
+     * @see [[org.apache.ignite.IgniteDataStreamer#allowOverwrite(boolean)]]
+     * @see [[org.apache.ignite.IgniteDataStreamer#autoFlushFrequency(long)]]
+     * @see [[org.apache.ignite.IgniteDataStreamer#perNodeBufferSize(int)]]
+     * @see [[org.apache.ignite.IgniteDataStreamer#perNodeParallelOperations(int)]]
+     */
+    def saveTable(data: DataFrame,
+        tblName: String,
+        ctx: IgniteContext,
+        streamerAllowOverwrite: Option[Boolean],
+        streamerFlushFrequency: Option[Long],
+        streamerPerNodeBufferSize: Option[Int],
+        streamerPerNodeParallelOperations: Option[Int]
+    ): Unit = {
+        val insertQry = compileInsert(tblName, data.schema)
+
+        data.rdd.foreachPartition(iterator =>
+            savePartition(iterator,
+                insertQry,
+                tblName,
+                ctx,
+                streamerAllowOverwrite,
+                streamerFlushFrequency,
+                streamerPerNodeBufferSize,
+                streamerPerNodeParallelOperations
+            ))
+    }
+
+    /**
+     * Saves partition data to the Ignite table.
+     *
+     * @param iterator Data iterator.
+     * @param insertQry Insert query.
+     * @param tblName Table name.
+     * @param ctx Ignite context.
+     * @param streamerAllowOverwrite Flag enabling overwriting existing values in cache.
+     * @param streamerFlushFrequency Insert query streamer automatic flush frequency.
+     * @param streamerPerNodeBufferSize Insert query streamer size of per node query buffer.
+     * @param streamerPerNodeParallelOperations Insert query streamer maximum number of parallel operations for a single node.
+     *
+     * @see [[org.apache.ignite.IgniteDataStreamer]]
+     * @see [[org.apache.ignite.IgniteDataStreamer#allowOverwrite(boolean)]]
+     * @see [[org.apache.ignite.IgniteDataStreamer#autoFlushFrequency(long)]]
+     * @see [[org.apache.ignite.IgniteDataStreamer#perNodeBufferSize(int)]]
+     * @see [[org.apache.ignite.IgniteDataStreamer#perNodeParallelOperations(int)]]
+     */
+    private def savePartition(iterator: Iterator[Row],
+        insertQry: String,
+        tblName: String,
+        ctx: IgniteContext,
+        streamerAllowOverwrite: Option[Boolean],
+        streamerFlushFrequency: Option[Long],
+        streamerPerNodeBufferSize: Option[Int],
+        streamerPerNodeParallelOperations: Option[Int]
+    ): Unit = {
+        val tblInfo = sqlTableInfo[Any, Any](ctx.ignite(), tblName).get
+
+        val streamer = ctx.ignite().dataStreamer(tblInfo._1.getName)
+
+        streamerAllowOverwrite.foreach(v ⇒ streamer.allowOverwrite(v))
+
+        streamerFlushFrequency.foreach(v ⇒ streamer.autoFlushFrequency(v))
+
+        streamerPerNodeBufferSize.foreach(v ⇒ streamer.perNodeBufferSize(v))
+
+        streamerPerNodeParallelOperations.foreach(v ⇒ streamer.perNodeParallelOperations(v))
+
+        try {
+            val qryProcessor = ctx.ignite().asInstanceOf[IgniteEx].context().query()
+
+            iterator.foreach { row ⇒
+                val schema = row.schema
+
+                val args = schema.map { f ⇒
+                    row.get(row.fieldIndex(f.name)).asInstanceOf[Object]
+                }
+
+                qryProcessor.streamUpdateQuery(tblInfo._1.getName,
+                    tblInfo._1.getSqlSchema, streamer, insertQry, args.toArray)
+            }
+        }
+        finally {
+            streamer.close()
+        }
+    }
+}
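
savePartition simply forwards the four write options to the streamer. Configured directly, the equivalent IgniteDataStreamer setup would look roughly like this (the cache name is a placeholder; the setters are exactly those referenced in the @see tags above):

    // Assuming `ignite` is a started org.apache.ignite.Ignite instance and
    // "personCache" is the cache backing the SQL table.
    val streamer = ignite.dataStreamer[Any, Any]("personCache")

    streamer.allowOverwrite(true)          // OPTION_STREAMER_ALLOW_OVERWRITE
    streamer.autoFlushFrequency(10000L)    // OPTION_STREAMER_FLUSH_FREQUENCY
    streamer.perNodeBufferSize(1024)       // OPTION_STREAMER_PER_NODE_BUFFER_SIZE
    streamer.perNodeParallelOperations(8)  // OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS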
http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryUtils.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryUtils.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryUtils.scala
new file mode 100644
index 0000000..79aa523
--- /dev/null
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryUtils.scala
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spark.impl
+
+import org.apache.ignite.IgniteException
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+
+/**
+ * Utility class for building SQL queries.
+ */
+private[impl] object QueryUtils extends Logging {
+    /**
+     * Builds `where` part of SQL query.
+     *
+     * @param filters Filters to apply.
+     * @return Tuple containing the `where` string and a `List[Any]` of query parameters.
+     */
+    def compileWhere(filters: Seq[Filter]): (String, List[Any]) =
+        filters.foldLeft(("", List[Any]()))(buildSingleClause)
+
+    /**
+     * Builds `insert` query for provided table and schema.
+     *
+     * @param tblName Table name.
+     * @param tblSchema Schema.
+     * @return SQL query to insert data into table.
+     */
+    def compileInsert(tblName: String, tblSchema: StructType): String = {
+        val columns = tblSchema.fields.map(_.name).mkString(",")
+        val placeholder = tblSchema.fields.map(_ ⇒ "?").mkString(",")
+
+        val qry = s"INSERT INTO $tblName($columns) VALUES($placeholder)"
+
+        logInfo(qry)
+
+        qry
+    }
+
+    /**
+     * Builds `drop table` query.
+     *
+     * @param tblName Table name.
+     * @return SQL query to drop table.
+     */
+    def compileDropTable(tblName: String): String = {
+        val qry = s"DROP TABLE ${tblName}"
+
+        logInfo(qry)
+
+        qry
+    }
+
+    /**
+     * Builds `create table` query.
+     *
+     * @param schema Schema.
+     * @param tblName Table name.
+     * @param primaryKeyFields Primary key fields.
+     * @param createTblOpts Ignite specific options for table.
+     * @return SQL query to create table.
+     */
+    def compileCreateTable(schema: StructType, tblName: String, primaryKeyFields: Seq[String], createTblOpts: Option[String]): String = {
+        val pk = s", PRIMARY KEY (${primaryKeyFields.mkString(",")})"
+
+        val withParams = createTblOpts.map(w ⇒ s"""WITH \"$w\"""").getOrElse("")
+
+        val qry = s"CREATE TABLE $tblName (${schema.map(compileColumn).mkString(", ")} $pk) $withParams"
+
+        logInfo(qry)
+
+        qry
+    }
+
+    /**
+     * @param field Column.
+     * @return SQL query part for column.
+     */
+    private def compileColumn(field: StructField): String = {
+        val col = s"${field.name} ${dataType(field)}"
+
+        if (!field.nullable)
+            col + " NOT NULL"
+        else
+            col
+    }
+
+    /**
+     * Gets Ignite data type based on type name.
+     *
+     * @param field Field.
+     * @return SQL data type.
+     */
+    private def dataType(field: StructField): String = field.dataType match {
+        case BooleanType ⇒
+            "BOOLEAN"
+
+        case ByteType ⇒
+            "TINYINT"
+
+        case ShortType ⇒
+            "SMALLINT"
+
+        case IntegerType ⇒
+            "INT"
+
+        case LongType ⇒
+            "BIGINT"
+
+        case FloatType ⇒
+            "FLOAT"
+
+        case DoubleType ⇒
+            "DOUBLE"
+
+        //For now Ignite doesn't provide correct information about DECIMAL column precision and scale.
+        //All we have is default scale and precision.
+        //Just replace it with some "common sense" values.
+        case decimal: DecimalType if decimal.precision == 10 && decimal.scale == 0 ⇒
+            s"DECIMAL(10, 5)"
+
+        case decimal: DecimalType ⇒
+            s"DECIMAL(${decimal.precision}, ${decimal.scale})"
+
+        case StringType ⇒
+            "VARCHAR"
+
+        case DateType ⇒
+            "DATE"
+
+        case TimestampType ⇒
+            "TIMESTAMP"
+
+        case _ ⇒
+            throw new IgniteException(s"Unsupported data type ${field.dataType}")
+    }
+
+    /**
+     * Adds single where clause to `state` and returns new state.
+     *
+     * @param state Current `where` state.
+     * @param clause Clause to add.
+     * @return `where` with given clause.
+     */
+    private def buildSingleClause(state: (String, List[Any]), clause: Filter): (String, List[Any]) = {
+        val filterStr = state._1
+
+        val params = state._2
+
+        clause match {
+            case EqualTo(attr, value) ⇒ (addStrClause(filterStr, s"$attr = ?"), params :+ value)
+
+            case EqualNullSafe(attr, value) ⇒ (addStrClause(filterStr, s"($attr IS NULL OR $attr = ?)"), params :+ value)
+
+            case GreaterThan(attr, value) ⇒ (addStrClause(filterStr, s"$attr > ?"), params :+ value)
+
+            case GreaterThanOrEqual(attr, value) ⇒ (addStrClause(filterStr, s"$attr >= ?"), params :+ value)
+
+            case LessThan(attr, value) ⇒ (addStrClause(filterStr, s"$attr < ?"), params :+ value)
+
+            case LessThanOrEqual(attr, value) ⇒ (addStrClause(filterStr, s"$attr <= ?"), params :+ value)
+
+            case In(attr, values) ⇒ (addStrClause(filterStr, s"$attr IN (${values.map(_ ⇒ "?").mkString(",")})"), params ++ values)
+
+            case IsNull(attr) ⇒ (addStrClause(filterStr, s"$attr IS NULL"), params)
+
+            case IsNotNull(attr) ⇒ (addStrClause(filterStr, s"$attr IS NOT NULL"), params)
+
+            case And(left, right) ⇒
+                val leftClause = buildSingleClause(("", params), left)
+                val rightClause = buildSingleClause(("", leftClause._2), right)
+
+                (addStrClause(filterStr, s"${leftClause._1} AND ${rightClause._1}"), rightClause._2)
+
+            case Or(left, right) ⇒
+                val leftClause = buildSingleClause(("", params), left)
+                val rightClause = buildSingleClause(("", leftClause._2), right)
+
+                (addStrClause(filterStr, s"${leftClause._1} OR ${rightClause._1}"), rightClause._2)
+
+            case Not(child) ⇒
+                val innerClause = buildSingleClause(("", params), child)
+
+                (addStrClause(filterStr, s"NOT ${innerClause._1}"), innerClause._2)
+
+            case StringStartsWith(attr, value) ⇒
+                (addStrClause(filterStr, s"$attr LIKE ?"), params :+ (value + "%"))
+
+            case StringEndsWith(attr, value) ⇒
+                (addStrClause(filterStr, s"$attr LIKE ?"), params :+ ("%" + value))
+
+            case StringContains(attr, value) ⇒
+                (addStrClause(filterStr, s"$attr LIKE ?"), params :+ ("%" + value + "%"))
        }
    }
+
+    /**
+     * Utility method to add clause to sql WHERE string.
+     *
+     * @param filterStr Current filter string.
+     * @param clause Clause to add.
+     * @return Filter string.
+     */
+    private def addStrClause(filterStr: String, clause: String) =
+        if (filterStr.isEmpty)
+            clause
+        else
+            filterStr + " AND " + clause
+}
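
For a concrete sense of the SQL these builders emit, a small illustration (QueryUtils is private[impl], so this is shown as expected output rather than a public API):

    import org.apache.spark.sql.sources.{EqualTo, GreaterThan}
    import org.apache.spark.sql.types._

    val schema = StructType(Seq(
        StructField("id", LongType, nullable = false),
        StructField("name", StringType)))

    // compileInsert("person", schema)
    //   yields: INSERT INTO person(id,name) VALUES(?,?)
    // compileWhere(Seq(EqualTo("name", "John Doe"), GreaterThan("id", 1L)))
    //   yields: ("name = ? AND id > ?", List("John Doe", 1))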
 */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
index 815854c..4634a97 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/package.scala
@@ -113,7 +113,7 @@ package object impl {
      * @tparam V Value class.
      * @return CacheConfiguration and QueryEntity for a given table.
      */
-    private def sqlTableInfo[K, V](ignite: Ignite, tabName: String): Option[(CacheConfiguration[K, V], QueryEntity)] =
+    def sqlTableInfo[K, V](ignite: Ignite, tabName: String): Option[(CacheConfiguration[K, V], QueryEntity)] =
         ignite.cacheNames().map { cacheName ⇒
             val ccfg = ignite.cache[K, V](cacheName).getConfiguration(classOf[CacheConfiguration[K, V]])

http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala
index 9c1fe0c..6876a3e 100644
--- a/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala
+++ b/modules/spark/src/main/scala/org/apache/spark/sql/ignite/IgniteExternalCatalog.scala
@@ -17,12 +17,14 @@
 
 package org.apache.spark.sql.ignite
 
+import java.net.URI
+
 import org.apache.ignite.configuration.CacheConfiguration
 import org.apache.ignite.spark.IgniteDataFrameSettings.OPTION_TABLE
 import org.apache.ignite.spark.IgniteContext
 import org.apache.ignite.spark.IgniteDataFrameSettings._
 import org.apache.ignite.spark.impl.IgniteSQLRelation.schema
-import org.apache.ignite.{Ignite, Ignition}
+import org.apache.ignite.{Ignite, IgniteException, Ignition}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -31,7 +33,8 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.types.StructType
 import org.apache.ignite.spark.impl._
-import org.apache.spark.sql.ignite.IgniteExternalCatalog.OPTION_GRID
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog.DEFAULT_DATABASE
+import org.apache.spark.sql.ignite.IgniteExternalCatalog.{IGNITE_PROTOCOL, IGNITE_URI, OPTION_GRID}
 
 import scala.collection.JavaConversions._
 
@@ -40,7 +43,8 @@ import scala.collection.JavaConversions._
  *
  * @param defaultIgniteContext Ignite context to provide access to Ignite instance. If <code>None</code> passed then no-name instance of Ignite used.
  */
-private[ignite] class IgniteExternalCatalog(defaultIgniteContext: IgniteContext) extends ExternalCatalog {
+private[ignite] class IgniteExternalCatalog(defaultIgniteContext: IgniteContext)
+    extends ExternalCatalog {
     /**
      * Default Ignite instance.
      */
@@ -51,7 +55,7 @@ private[ignite] class IgniteExternalCatalog(defaultIgniteContext: IgniteContext)
     * @return Description of Ignite instance.
     */
    override def getDatabase(db: String): CatalogDatabase =
-        CatalogDatabase(db, db, null, Map.empty)
+        CatalogDatabase(db, db, IGNITE_URI, Map.empty)
 
    /**
     * Checks Ignite instance with provided name exists.
@@ -61,7 +65,7 @@ private[ignite] class IgniteExternalCatalog(defaultIgniteContext: IgniteContext)
     * @return True if Ignite instance exists.
     */
    override def databaseExists(db: String): Boolean =
-        db == SessionCatalog.DEFAULT_DATABASE || igniteExists(db)
+        db == DEFAULT_DATABASE || igniteExists(db)
 
    /**
     * @return List of all known Ignite instances names.
@@ -104,7 +108,7 @@ private[ignite] class IgniteExternalCatalog(defaultIgniteContext: IgniteContext)
             identifier = new TableIdentifier(tableName, Some(gridName)),
             tableType = CatalogTableType.EXTERNAL,
             storage = CatalogStorageFormat(
-                locationUri = None,
+                locationUri = Some(URI.create(IGNITE_PROTOCOL + tableName)),
                 inputFormat = Some(FORMAT_IGNITE),
                 outputFormat = Some(FORMAT_IGNITE),
                 serde = None,
@@ -259,12 +263,39 @@ private[ignite] class IgniteExternalCatalog(defaultIgniteContext: IgniteContext)
         throw new UnsupportedOperationException("unsupported")
 
     /** @inheritdoc */
-    override protected def doCreateTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit =
-        throw new UnsupportedOperationException("unsupported")
+    override protected def doCreateTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
+        val ignite = igniteOrDefault(tableDefinition.identifier.database.getOrElse(DEFAULT_DATABASE), default)
+
+        igniteSQLTable(ignite, tableDefinition.identifier.table) match {
+            case Some(_) ⇒
+                /* no-op */
+
+            case None ⇒
+                val props = tableDefinition.storage.properties
+
+                QueryHelper.createTable(tableDefinition.schema,
+                    tableDefinition.identifier.table,
+                    props(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS).split(","),
+                    props.get(OPTION_CREATE_TABLE_PARAMETERS),
+                    ignite)
+        }
+    }
 
     /** @inheritdoc */
-    override protected def doDropTable(db: String, table: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit =
-        throw new UnsupportedOperationException("unsupported")
+    override protected def doDropTable(db: String, tabName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit = {
+        val ignite = igniteOrDefault(db, default)
+
+        igniteSQLTable(ignite, tabName) match {
+            case Some(table) ⇒
+                val tableName = table.getTableName
+
+                QueryHelper.dropTable(tableName, ignite)
+
+            case None ⇒
+                if (!ignoreIfNotExists)
+                    throw new IgniteException(s"Table $tabName doesn't exist.")
        }
    }
 
     /** @inheritdoc */
     override protected def doRenameTable(db: String, oldName: String, newName: String): Unit =
@@ -302,4 +333,14 @@ object IgniteExternalCatalog {
     * @see [[org.apache.ignite.Ignite#name()]]
     */
    private[apache] val OPTION_GRID = "grid"
+
+    /**
+     * Location of ignite tables.
+     */
+    private[apache] val IGNITE_PROTOCOL = "ignite:/"
+
+    /**
+     * URI location of ignite tables.
+     */
+    private val IGNITE_URI = new URI(IGNITE_PROTOCOL)
 }
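
With doCreateTable and doDropTable implemented, saveAsTable now works against the catalog of an IgniteSparkSession. A sketch mirroring the specs further below (session construction as in the tests; `df` and `cfgPath` are placeholders):

    // Assuming `spark` is an IgniteSparkSession, as built in the specs below:
    df.write
        .format(FORMAT_IGNITE)
        .option(OPTION_CONFIG_FILE, cfgPath) // placeholder
        .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
        .mode(SaveMode.Overwrite)
        .saveAsTable("city") // routed through IgniteExternalCatalog.doCreateTable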
http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/modules/spark/src/test/resources/cities_non_unique.json
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/resources/cities_non_unique.json b/modules/spark/src/test/resources/cities_non_unique.json
new file mode 100644
index 0000000..f971c86
--- /dev/null
+++ b/modules/spark/src/test/resources/cities_non_unique.json
@@ -0,0 +1,6 @@
+{ "id": 1, "name": "Forest Hill" }
+{ "id": 2, "name": "Denver" }
+{ "id": 3, "name": "St. Petersburg" }
+{ "id": 1, "name": "Paris" }
+{ "id": 2, "name": "New York" }
+{ "id": 3, "name": "Moscow" }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala
index 4b0b2de..263f235 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/AbstractDataFrameSpec.scala
@@ -26,7 +26,7 @@ import java.lang.{Long ⇒ JLong}
 import org.apache.ignite.cache.query.SqlFieldsQuery
 import org.apache.ignite.cache.query.annotations.QuerySqlField
 import org.apache.ignite.internal.IgnitionEx
-import org.apache.ignite.spark.AbstractDataFrameSpec.configuration
+import org.apache.ignite.spark.AbstractDataFrameSpec._
 
 import scala.annotation.meta.field
 import scala.reflect.ClassTag
@@ -39,16 +39,12 @@ abstract class AbstractDataFrameSpec extends FunSpec with Matchers with BeforeAn
     var client: Ignite = _
 
     override protected def beforeAll() = {
-        spark = SparkSession.builder()
-            .appName("DataFrameSpec")
-            .master("local")
-            .config("spark.executor.instances", "2")
-            .getOrCreate()
-
         for (i ← 0 to 3)
             Ignition.start(configuration("grid-" + i, client = false))
 
         client = Ignition.getOrStart(configuration("client", client = true))
+
+        createSparkSession()
     }
 
     override protected def afterAll() = {
@@ -60,12 +56,26 @@ abstract class AbstractDataFrameSpec extends FunSpec with Matchers with BeforeAn
         spark.close()
     }
 
-    def createPersonTable(client: Ignite, cacheName: String): Unit = {
+    protected def createSparkSession(): Unit = {
+        spark = SparkSession.builder()
+            .appName("DataFrameSpec")
+            .master("local")
+            .config("spark.executor.instances", "2")
+            .getOrCreate()
+    }
+
+    def createPersonTable2(client: Ignite, cacheName: String): Unit =
+        createPersonTable0(client, cacheName, PERSON_TBL_NAME_2)
+
+    def createPersonTable(client: Ignite, cacheName: String): Unit =
+        createPersonTable0(client, cacheName, PERSON_TBL_NAME)
+
+    private def createPersonTable0(client: Ignite, cacheName: String, tblName: String): Unit = {
         val cache = client.cache(cacheName)
 
         cache.query(new SqlFieldsQuery(
-            """
-              | CREATE TABLE person (
+            s"""
+              | CREATE TABLE $tblName (
              |    id LONG,
              |    name VARCHAR,
              |    birth_date DATE,
@@ -78,7 +88,7 @@ abstract class AbstractDataFrameSpec extends FunSpec with Matchers with BeforeAn
              |    PRIMARY KEY (id, city_id)) WITH "backups=1, affinityKey=city_id"
            """.stripMargin)).getAll
 
-        val qry = new SqlFieldsQuery("INSERT INTO person (id, name, city_id) values (?, ?, ?)")
+        val qry = new SqlFieldsQuery(s"INSERT INTO $tblName (id, name, city_id) values (?, ?, ?)")
 
        cache.query(qry.setArgs(1L.asInstanceOf[JLong], "John Doe", 3L.asInstanceOf[JLong])).getAll
Doe", 3L.asInstanceOf[JLong])).getAll cache.query(qry.setArgs(2L.asInstanceOf[JLong], "Jane Roe", 2L.asInstanceOf[JLong])).getAll @@ -120,6 +130,10 @@ object AbstractDataFrameSpec { val EMPLOYEE_CACHE_NAME = "cache3" + val PERSON_TBL_NAME = "person" + + val PERSON_TBL_NAME_2 = "person2" + def configuration(igniteInstanceName: String, client: Boolean): IgniteConfiguration = { val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1() @@ -151,6 +165,11 @@ object AbstractDataFrameSpec { ccfg } + + /** + * Enclose some closure, so it doesn't on outer object(default scala behaviour) while serializing. + */ + def enclose[E, R](enclosed: E)(func: E => R): R = func(enclosed) } case class Employee ( http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala index 7a51198..23cfffd 100644 --- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala +++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteCatalogSpec.scala @@ -21,14 +21,11 @@ import java.lang.{Long â JLong} import org.apache.ignite.cache.query.SqlFieldsQuery import org.apache.ignite.internal.IgnitionEx -import org.apache.ignite.spark.AbstractDataFrameSpec.{EMPLOYEE_CACHE_NAME, DEFAULT_CACHE, TEST_CONFIG_FILE} +import org.apache.ignite.spark.AbstractDataFrameSpec.{DEFAULT_CACHE, EMPLOYEE_CACHE_NAME, TEST_CONFIG_FILE, enclose} import org.apache.spark.sql.ignite.IgniteSparkSession import org.apache.spark.sql.types.{LongType, StringType} import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner -import org.apache.ignite.spark.impl._ - -import scala.collection.JavaConversions._ /** * Tests to check Spark Catalog implementation. @@ -152,9 +149,4 @@ class IgniteCatalogSpec extends AbstractDataFrameSpec { .igniteConfigProvider(configProvider) .getOrCreate() } - - /** - * Enclose some closure, so it doesn't on outer object(default scala behaviour) while serializing. 
-     */
-    def enclose[E, R](enclosed: E)(func: E => R): R = func(enclosed)
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala
index 67ebf3b..2ceb44a 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameSuite.scala
@@ -25,6 +25,8 @@ import org.scalatest.Suites
 class IgniteDataFrameSuite extends Suites (
     new IgniteDataFrameSchemaSpec,
     new IgniteSQLDataFrameSpec,
+    new IgniteSQLDataFrameWriteSpec,
+    new IgniteSQLDataFrameIgniteSessionWriteSpec,
     new IgniteDataFrameWrongConfigSpec,
     new IgniteCatalogSpec
 )

http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameWrongConfigSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameWrongConfigSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameWrongConfigSpec.scala
index ce9f485..c32cb18 100644
--- a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameWrongConfigSpec.scala
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteDataFrameWrongConfigSpec.scala
@@ -17,11 +17,9 @@
 
 package org.apache.ignite.spark
 
-import java.lang.{Integer ⇒ JInteger, String ⇒ JString}
-
 import org.apache.ignite.spark.AbstractDataFrameSpec.TEST_CONFIG_FILE
 import org.apache.ignite.spark.IgniteDataFrameSettings._
-import org.apache.ignite.{IgniteException, IgniteIllegalStateException}
+import org.apache.ignite.IgniteException
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner

http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameIgniteSessionWriteSpec.scala
----------------------------------------------------------------------
diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameIgniteSessionWriteSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameIgniteSessionWriteSpec.scala
new file mode 100644
index 0000000..eb29cc8
--- /dev/null
+++ b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameIgniteSessionWriteSpec.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.ignite.spark + +import org.apache.ignite.internal.IgnitionEx +import org.apache.ignite.spark.AbstractDataFrameSpec.{TEST_CONFIG_FILE, enclose} +import org.apache.ignite.spark.IgniteDataFrameSettings._ +import org.apache.ignite.testframework.GridTestUtils.resolveIgnitePath +import org.apache.spark.sql.SaveMode +import org.apache.spark.sql.ignite.IgniteSparkSession +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import org.apache.spark.sql.functions._ + +/** + * Tests for saveAsTable() support in IgniteSparkSession. + */ +@RunWith(classOf[JUnitRunner]) +class IgniteSQLDataFrameIgniteSessionWriteSpec extends IgniteSQLDataFrameWriteSpec { + describe("Additional features for IgniteSparkSession") { + it("Save data frame as an existing table with saveAsTable('table_name') - Overwrite") { + val citiesDataFrame = spark.read.json( + resolveIgnitePath("modules/spark/src/test/resources/cities.json").getAbsolutePath) + + citiesDataFrame.write + .format(FORMAT_IGNITE) + .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE) + .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id") + .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated") + .mode(SaveMode.Overwrite) + .saveAsTable("city") + + assert(rowsCount("city") == citiesDataFrame.count(), + "Table city should contain data from the json file.") + } + + it("Save data frame as an existing table with saveAsTable('table_name') - Append") { + val citiesDataFrame = spark.read.json( + resolveIgnitePath("modules/spark/src/test/resources/cities.json").getAbsolutePath) + + val rowCnt = citiesDataFrame.count() + + citiesDataFrame + .withColumn("id", col("id") + rowCnt) //Edit id column to prevent duplication + .write + .format(FORMAT_IGNITE) + .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE) + .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id") + .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated") + .mode(SaveMode.Append) + .partitionBy("id") + .saveAsTable("city") + + assert(rowsCount("city") == rowCnt * 2, + "Table city should contain data from both writes.") + } + + it("Save data frame as a new table with saveAsTable('table_name')") { + val citiesDataFrame = spark.read.json( + resolveIgnitePath("modules/spark/src/test/resources/cities.json").getAbsolutePath) + + citiesDataFrame.write + .format(FORMAT_IGNITE) + .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE) + .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id") + .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated") + .saveAsTable("new_cities") + + assert(rowsCount("new_cities") == citiesDataFrame.count(), + "Table new_cities should contain data from the json file.") + } + } + + override protected def createSparkSession(): Unit = { + val configProvider = enclose(null) (x ⇒ () ⇒ { + val cfg = IgnitionEx.loadConfiguration(TEST_CONFIG_FILE).get1() + + cfg.setClientMode(true) + + cfg.setIgniteInstanceName("client-2") + + cfg + }) + + spark = IgniteSparkSession.builder() + .appName("DataFrameSpec") + .master("local") + .config("spark.executor.instances", "2") + .igniteConfigProvider(configProvider) + .getOrCreate() + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7c014529/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameWriteSpec.scala ---------------------------------------------------------------------- diff --git a/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameWriteSpec.scala b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameWriteSpec.scala new file mode 100644 index 0000000..30df27e --- /dev/null +++
b/modules/spark/src/test/scala/org/apache/ignite/spark/IgniteSQLDataFrameWriteSpec.scala @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spark + +import org.apache.ignite.IgniteException +import org.apache.ignite.spark.AbstractDataFrameSpec.{PERSON_TBL_NAME, PERSON_TBL_NAME_2, TEST_CONFIG_FILE} +import org.apache.ignite.spark.IgniteDataFrameSettings._ +import org.apache.ignite.spark.impl.sqlTableInfo +import org.apache.ignite.testframework.GridTestUtils.resolveIgnitePath +import org.apache.spark.sql.SaveMode.{Append, Ignore, Overwrite} +import org.apache.spark.sql.{DataFrame, SaveMode} +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import org.apache.spark.sql.functions._ + +/** + * Tests for saving a DataFrame to an Ignite SQL table. + */ +@RunWith(classOf[JUnitRunner]) +class IgniteSQLDataFrameWriteSpec extends AbstractDataFrameSpec { + var personDataFrame: DataFrame = _ + + describe("Write DataFrame into an Ignite SQL table") { + it("Save data frame as a new table") { + val rowsCnt = personDataFrame.count() + + personDataFrame.write + .format(FORMAT_IGNITE) + .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE) + .option(OPTION_TABLE, "new_persons") + .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id") + .save() + + assert(rowsCnt == rowsCount("new_persons"), "Data should be saved into 'new_persons' table") + } + + it("Save data frame to existing table") { + val rowsCnt = personDataFrame.count() + + personDataFrame.write + .format(FORMAT_IGNITE) + .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE) + .option(OPTION_TABLE, PERSON_TBL_NAME_2) + .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id, city_id") + .option(OPTION_CREATE_TABLE_PARAMETERS, "backups=1, affinityKey=city_id") + .mode(Overwrite) + .save() + + assert(rowsCnt == rowsCount(PERSON_TBL_NAME_2), s"Data should be saved into $PERSON_TBL_NAME_2 table") + } + + it("Save data frame to existing table with streamer options") { + val rowsCnt = personDataFrame.count() + + personDataFrame.write + .format(FORMAT_IGNITE) + .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE) + .option(OPTION_TABLE, PERSON_TBL_NAME_2) + .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id, city_id") + .option(OPTION_CREATE_TABLE_PARAMETERS, "backups=1, affinityKey=city_id") + .option(OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS, 3) + .option(OPTION_STREAMER_PER_NODE_BUFFER_SIZE, 1) + .option(OPTION_STREAMER_FLUSH_FREQUENCY, 10000) + .mode(Overwrite) + .save() + + assert(rowsCnt == rowsCount(PERSON_TBL_NAME_2), s"Data should be saved into $PERSON_TBL_NAME_2 table") + } + + it("Ignore save operation if table exists") { + //Count of records before saving + val person2RowsCntBeforeSave = rowsCount(PERSON_TBL_NAME_2) + + personDataFrame.write + .format(FORMAT_IGNITE) + .option(OPTION_CONFIG_FILE,
TEST_CONFIG_FILE) + .option(OPTION_TABLE, PERSON_TBL_NAME_2) + .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id, city_id") + .option(OPTION_CREATE_TABLE_PARAMETERS, "backups=1, affinityKey=city_id") + .mode(Ignore) + .save() + + assert(rowsCount(PERSON_TBL_NAME_2) == person2RowsCntBeforeSave, "Save operation should be ignored.") + } + + it("Append data frame data to existing table") { + //Count of records before appending + val person2RowsCnt = rowsCount(PERSON_TBL_NAME_2) + + //Count of appended records + val personRowsCnt = personDataFrame.count() + + personDataFrame + .withColumn("id", col("id") + person2RowsCnt) //Edit id column to prevent duplication + .write.format(FORMAT_IGNITE) + .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE) + .option(OPTION_TABLE, PERSON_TBL_NAME_2) + .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id, city_id") + .option(OPTION_CREATE_TABLE_PARAMETERS, "backups=1, affinityKey=city_id") + .mode(Append) + .save() + + assert(rowsCount(PERSON_TBL_NAME_2) == person2RowsCnt + personRowsCnt, + s"Table $PERSON_TBL_NAME_2 should contain data from $PERSON_TBL_NAME") + } + + it("Save data from another data source as an Ignite table") { + val citiesDataFrame = spark.read.json( + resolveIgnitePath("modules/spark/src/test/resources/cities.json").getAbsolutePath) + + citiesDataFrame.write + .format(FORMAT_IGNITE) + .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE) + .option(OPTION_TABLE, "json_city") + .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id") + .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated") + .save() + + assert(rowsCount("json_city") == citiesDataFrame.count(), + "Table json_city should contain data from the json file.") + } + + it("Save data frame as a new table with save('table_name')") { + val rowsCnt = personDataFrame.count() + + personDataFrame.write + .format(FORMAT_IGNITE) + .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE) + .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id") + .save("saved_persons") + + assert(rowsCnt == rowsCount("saved_persons"), "Data should be saved into 'saved_persons' table") + } + + it("Should keep first row if allowOverwrite is false") { + val nonUniqueCitiesDataFrame = spark.read.json( + resolveIgnitePath("modules/spark/src/test/resources/cities_non_unique.json").getAbsolutePath) + + nonUniqueCitiesDataFrame.write + .format(FORMAT_IGNITE) + .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE) + .option(OPTION_TABLE, "first_row_json_city") + .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id") + .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated") + .option(OPTION_STREAMER_ALLOW_OVERWRITE, false) + .save() + + val cities = readTable("first_row_json_city").collect().sortBy(_.getAs[Long]("ID")) + + assert(cities(0).getAs[String]("NAME") == "Forest Hill") + assert(cities(1).getAs[String]("NAME") == "Denver") + assert(cities(2).getAs[String]("NAME") == "St.
Petersburg") + } + + it("Should keep last row if allowOverwrite is true") { + val nonUniqueCitiesDataFrame = spark.read.json( + resolveIgnitePath("modules/spark/src/test/resources/cities_non_unique.json").getAbsolutePath) + + nonUniqueCitiesDataFrame.write + .format(FORMAT_IGNITE) + .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE) + .option(OPTION_TABLE, "last_row_json_city") + .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id") + .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated") + .option(OPTION_STREAMER_ALLOW_OVERWRITE, true) + .save() + + val cities = readTable("last_row_json_city").collect().sortBy(_.getAs[Long]("ID")) + + assert(cities(0).getAs[String]("NAME") == "Paris") + assert(cities(1).getAs[String]("NAME") == "New York") + assert(cities(2).getAs[String]("NAME") == "Moscow") + } + } + + describe("Wrong DataFrame Write Options") { + it("Should throw exception with ErrorIfExists for a existing table") { + intercept[IgniteException] { + personDataFrame.write + .format(FORMAT_IGNITE) + .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE) + .option(OPTION_TABLE, PERSON_TBL_NAME) + .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id") + .mode(SaveMode.ErrorIfExists) + .save() + } + } + + it("Should throw exception if primary key fields not specified") { + intercept[IgniteException] { + personDataFrame.write + .format(FORMAT_IGNITE) + .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE) + .option(OPTION_TABLE, "persons_no_pk") + .save() + } + } + + it("Should throw exception if primary key fields not specified for existing table") { + intercept[IgniteException] { + personDataFrame.write + .format(FORMAT_IGNITE) + .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE) + .option(OPTION_TABLE, PERSON_TBL_NAME) + .mode(Overwrite) + .save() + } + + val tblInfo = sqlTableInfo[Any, Any](client, PERSON_TBL_NAME) + + assert(tblInfo.isDefined, s"Table $PERSON_TBL_NAME should exists.") + } + + it("Should throw exception for wrong pk field") { + intercept[IgniteException] { + personDataFrame.write + .format(FORMAT_IGNITE) + .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE) + .option(OPTION_TABLE, PERSON_TBL_NAME) + .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "unknown_field") + .mode(Overwrite) + .save() + } + + val tblInfo = sqlTableInfo[Any, Any](client, PERSON_TBL_NAME) + + assert(tblInfo.isDefined, s"Table $PERSON_TBL_NAME should exists.") + } + + it("Should throw exception for wrong pk field - 2") { + intercept[IgniteException] { + personDataFrame.write + .format(FORMAT_IGNITE) + .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE) + .option(OPTION_TABLE, PERSON_TBL_NAME) + .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id,unknown_field") + .mode(Overwrite) + .save() + } + + val tblInfo = sqlTableInfo[Any, Any](client, PERSON_TBL_NAME) + + assert(tblInfo.isDefined, s"Table $PERSON_TBL_NAME should exists.") + } + + it("Should throw exception for wrong WITH clause") { + intercept[IgniteException] { + personDataFrame.write + .format(FORMAT_IGNITE) + .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE) + .option(OPTION_TABLE, "person_unsupported_with") + .option(OPTION_CREATE_TABLE_PARAMETERS, "unsupported_with_clause") + .mode(Overwrite) + .save() + } + } + + it("Should throw exception for wrong table name") { + intercept[IgniteException] { + personDataFrame.write + .format(FORMAT_IGNITE) + .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE) + .option(OPTION_TABLE, "wrong-table-name") + .option(OPTION_CREATE_TABLE_PARAMETERS, "unsupported_with_clause") + .mode(Overwrite) + .save() + } + } + + it("Should throw exception if 
streamerFlushFrequency is not a number") { + intercept[NumberFormatException] { + personDataFrame.write + .format(FORMAT_IGNITE) + .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE) + .option(OPTION_TABLE, PERSON_TBL_NAME) + .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id") + .option(OPTION_STREAMER_FLUSH_FREQUENCY, "not_a_number") + .mode(Overwrite) + .save() + } + } + + it("Should throw exception if streamerPerNodeBufferSize is not a number") { + intercept[NumberFormatException] { + personDataFrame.write + .format(FORMAT_IGNITE) + .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE) + .option(OPTION_TABLE, PERSON_TBL_NAME) + .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id") + .option(OPTION_STREAMER_PER_NODE_BUFFER_SIZE, "not_a_number") + .mode(Overwrite) + .save() + } + } + + it("Should throw exception if streamerPerNodeParallelOperations is not a number") { + intercept[NumberFormatException] { + personDataFrame.write + .format(FORMAT_IGNITE) + .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE) + .option(OPTION_TABLE, PERSON_TBL_NAME) + .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id") + .option(OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS, "not_a_number") + .mode(Overwrite) + .save() + } + } + + it("Should throw exception if streamerAllowOverwrite is not a boolean") { + intercept[IllegalArgumentException] { + personDataFrame.write + .format(FORMAT_IGNITE) + .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE) + .option(OPTION_TABLE, PERSON_TBL_NAME) + .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id") + .option(OPTION_STREAMER_ALLOW_OVERWRITE, "not_a_boolean") + .mode(Overwrite) + .save() + } + } + } + + override protected def beforeAll(): Unit = { + super.beforeAll() + + createPersonTable(client, "cache1") + + createPersonTable2(client, "cache1") + + createCityTable(client, "cache1") + + personDataFrame = spark.read + .format(FORMAT_IGNITE) + .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE) + .option(OPTION_TABLE, PERSON_TBL_NAME) + .load() + + personDataFrame.createOrReplaceTempView("person") + } + + /** + * @param tbl Table name. + * @return Count of rows in table. + */ + protected def rowsCount(tbl: String): Long = readTable(tbl).count() + + /** + * @param tbl Table name. + * @return Ignite Table DataFrame. + */ + protected def readTable(tbl: String): DataFrame = + spark.read + .format(FORMAT_IGNITE) + .option(OPTION_CONFIG_FILE, TEST_CONFIG_FILE) + .option(OPTION_TABLE, tbl) + .load() +}
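----------------------------------------------------------------------

For reference, the write path these tests exercise can be used end to end roughly as in the minimal sketch below. This sketch is illustrative, not part of the commit: the local Spark master, the "ignite-config.xml" path and the "person.json" input are hypothetical placeholders, while the format name and option constants come from IgniteDataFrameSettings exactly as used in the tests above.

    import org.apache.ignite.spark.IgniteDataFrameSettings._
    import org.apache.spark.sql.{SaveMode, SparkSession}

    object DataFrameWriteSketch extends App {
        // Hypothetical path - substitute a real Ignite XML configuration.
        val igniteCfg = "ignite-config.xml"

        val spark = SparkSession.builder()
            .appName("write-sketch")
            .master("local")
            .getOrCreate()

        // Any Spark-readable source works; the tests above read json files.
        val personDataFrame = spark.read.json("person.json")

        // Create (or overwrite) the Ignite SQL table PERSON and stream the rows into it.
        personDataFrame.write
            .format(FORMAT_IGNITE)
            .option(OPTION_CONFIG_FILE, igniteCfg)
            .option(OPTION_TABLE, "person")
            .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
            .option(OPTION_CREATE_TABLE_PARAMETERS, "template=replicated")
            .mode(SaveMode.Overwrite)
            .save()

        spark.close()
    }

With SaveMode.Append the table must already exist and key collisions follow the streamerAllowOverwrite option, as covered by the "keep first row"/"keep last row" tests.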