This is an automated email from the ASF dual-hosted git repository.

zaleslaw pushed a commit to branch ignite-11723
in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 1fa9f2cd3e3d79442dd2f3e27ed8e86291a8b9d1
Author:     Alexey Zinoviev <zaleslaw....@gmail.com>
AuthorDate: Mon Sep 30 16:53:52 2019 +0300

    IGNITE-11723: fixed property
---
 .../apache/ignite/spark/IgniteDataFrameSettings.scala | 19 +++++++++++++++++++
 .../scala/org/apache/ignite/spark/IgniteRDD.scala     | 11 +++++++----
 .../scala/org/apache/ignite/spark/JavaIgniteRDD.scala | 13 +++++++------
 .../ignite/spark/impl/IgniteRelationProvider.scala    |  3 +++
 .../org/apache/ignite/spark/impl/QueryHelper.scala    |  9 +++++++--
 .../ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java   |  6 +++---
 6 files changed, 46 insertions(+), 15 deletions(-)

diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
index e176721..4e0abf4 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteDataFrameSettings.scala
@@ -120,6 +120,25 @@ object IgniteDataFrameSettings {
     /**
       * Config option for saving data frame.
       * Internally all SQL inserts are done through `IgniteDataStreamer`.
+      * This option sets the `skipStore` property of the streamer.
+      * If `true` then write-through behavior will be disabled for data streaming.
+      * If `false` then write-through behavior will be enabled for data streaming.
+      * Default value is `false`.
+      *
+      * @example {{{
+      *   val igniteDF = spark.write.format(IGNITE)
+      *       // other options ...
+      *       .option(OPTION_STREAMER_SKIP_STORE, true)
+      *       .save()
+      * }}}
+      * @see [[org.apache.ignite.IgniteDataStreamer]]
+      * @see [[org.apache.ignite.IgniteDataStreamer#skipStore(boolean)]]
+      */
+    val OPTION_STREAMER_SKIP_STORE = "streamerSkipStore"
+
+    /**
+      * Config option for saving data frame.
+      * Internally all SQL inserts are done through `IgniteDataStreamer`.
       * This options sets `autoFlushFrequency` property of streamer.
       *
       * @example {{{
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
index 5fb81b6..0c38566 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/IgniteRDD.scala
@@ -17,7 +17,6 @@
 package org.apache.ignite.spark

 import javax.cache.Cache
-
 import org.apache.ignite.cache.query._
 import org.apache.ignite.cluster.ClusterNode
 import org.apache.ignite.configuration.CacheConfiguration
@@ -228,8 +227,9 @@ class IgniteRDD[K, V] (
      * @param rdd RDD instance to save values from.
      * @param overwrite Boolean flag indicating whether the call on this method should overwrite existing
      *     values in Ignite cache.
+     * @param skipStore Sets flag indicating that write-through behavior should be disabled for data streaming.
      */
-    def savePairs(rdd: RDD[(K, V)], overwrite: Boolean = false) = {
+    def savePairs(rdd: RDD[(K, V)], overwrite: Boolean = false, skipStore: Boolean = false) = {
         rdd.foreachPartition(it ⇒ {
             val ig = ic.ignite()

@@ -240,6 +240,7 @@ class IgniteRDD[K, V] (

             try {
                 streamer.allowOverwrite(overwrite)
+                streamer.skipStore(skipStore)

                 it.foreach(tup ⇒ {
                     streamer.addData(tup._1, tup._2)
@@ -258,8 +259,9 @@ class IgniteRDD[K, V] (
      * @param f Transformation function.
      * @param overwrite Boolean flag indicating whether the call on this method should overwrite existing
      *     values in Ignite cache.
+     * @param skipStore Sets flag indicating that write-through behavior should be disabled for data streaming.
      */
-    def savePairs[T](rdd: RDD[T], f: (T, IgniteContext) ⇒ (K, V), overwrite: Boolean) = {
+    def savePairs[T](rdd: RDD[T], f: (T, IgniteContext) ⇒ (K, V), overwrite: Boolean, skipStore: Boolean) = {
         rdd.foreachPartition(it ⇒ {
             val ig = ic.ignite()

@@ -270,6 +272,7 @@ class IgniteRDD[K, V] (

             try {
                 streamer.allowOverwrite(overwrite)
+                streamer.skipStore(skipStore)

                 it.foreach(t ⇒ {
                     val tup = f(t, ic)
@@ -290,7 +293,7 @@ class IgniteRDD[K, V] (
      * @param f Transformation function.
      */
     def savePairs[T](rdd: RDD[T], f: (T, IgniteContext) ⇒ (K, V)): Unit = {
-        savePairs(rdd, f, overwrite = false)
+        savePairs(rdd, f, overwrite = false, skipStore = false)
     }

     /**
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
index a44cb51..1937483 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/JavaIgniteRDD.scala
@@ -82,20 +82,21 @@ class JavaIgniteRDD[K, V](override val rdd: IgniteRDD[K, V])

     def saveValues[T](jrdd: JavaRDD[T], f: (T, IgniteContext) ⇒ V) = rdd.saveValues(JavaRDD.toRDD(jrdd), f)

-    def savePairs(jrdd: JavaPairRDD[K, V], overwrite: Boolean) = {
+    def savePairs(jrdd: JavaPairRDD[K, V], overwrite: Boolean, skipStore: Boolean) = {
         val rrdd: RDD[(K, V)] = JavaPairRDD.toRDD(jrdd)

-        rdd.savePairs(rrdd, overwrite)
+        rdd.savePairs(rrdd, overwrite, skipStore)
     }

-    def savePairs(jrdd: JavaPairRDD[K, V]) : Unit = savePairs(jrdd, overwrite = false)
+    def savePairs(jrdd: JavaPairRDD[K, V]): Unit = savePairs(jrdd, overwrite = false, skipStore = false)

-    def savePairs[T](jrdd: JavaRDD[T], f: (T, IgniteContext) ⇒ (K, V), overwrite: Boolean = false) = {
-        rdd.savePairs(JavaRDD.toRDD(jrdd), f, overwrite)
+    def savePairs[T](jrdd: JavaRDD[T], f: (T, IgniteContext) ⇒ (K, V), overwrite: Boolean = false,
+        skipStore: Boolean = false) = {
+        rdd.savePairs(JavaRDD.toRDD(jrdd), f, overwrite, skipStore)
     }

     def savePairs[T](jrdd: JavaRDD[T], f: (T, IgniteContext) ⇒ (K, V)): Unit =
-        savePairs(jrdd, f, overwrite = false)
+        savePairs(jrdd, f, overwrite = false, skipStore = false)

     def clear(): Unit = rdd.clear()

diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
index 039ca63..a4f6da1 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/IgniteRelationProvider.scala
@@ -125,6 +125,7 @@ class IgniteRelationProvider extends RelationProvider
                 params.get(OPTION_SCHEMA),
                 ctx,
                 params.get(OPTION_STREAMER_ALLOW_OVERWRITE).map(_.toBoolean),
+                params.get(OPTION_STREAMER_SKIP_STORE).map(_.toBoolean),
                 params.get(OPTION_STREAMER_FLUSH_FREQUENCY).map(_.toLong),
                 params.get(OPTION_STREAMER_PER_NODE_BUFFER_SIZE).map(_.toInt),
                 params.get(OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS).map(_.toInt))
@@ -135,6 +136,7 @@ class IgniteRelationProvider extends RelationProvider
                 params.get(OPTION_SCHEMA),
                 ctx,
                 params.get(OPTION_STREAMER_ALLOW_OVERWRITE).map(_.toBoolean),
+                params.get(OPTION_STREAMER_SKIP_STORE).map(_.toBoolean),
                 params.get(OPTION_STREAMER_FLUSH_FREQUENCY).map(_.toLong),
                 params.get(OPTION_STREAMER_PER_NODE_BUFFER_SIZE).map(_.toInt),
                 params.get(OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS).map(_.toInt))
@@ -166,6 +168,7 @@ class IgniteRelationProvider extends RelationProvider
                 params.get(OPTION_SCHEMA),
                 ctx,
                 params.get(OPTION_STREAMER_ALLOW_OVERWRITE).map(_.toBoolean),
+                params.get(OPTION_STREAMER_SKIP_STORE).map(_.toBoolean),
                 params.get(OPTION_STREAMER_FLUSH_FREQUENCY).map(_.toLong),
                 params.get(OPTION_STREAMER_PER_NODE_BUFFER_SIZE).map(_.toInt),
                 params.get(OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS).map(_.toInt))
diff --git a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
index f752b1a..d123b01 100644
--- a/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
+++ b/modules/spark/src/main/scala/org/apache/ignite/spark/impl/QueryHelper.scala
@@ -18,12 +18,12 @@
 package org.apache.ignite.spark.impl

 import org.apache.ignite.cache.query.SqlFieldsQuery
-import org.apache.ignite.spark.IgniteDataFrameSettings._
-import QueryUtils.{compileCreateTable, compileDropTable, compileInsert}
 import org.apache.ignite.internal.IgniteEx
 import org.apache.ignite.internal.processors.query.QueryTypeDescriptorImpl
 import org.apache.ignite.internal.processors.query.QueryUtils.DFLT_SCHEMA
 import org.apache.ignite.spark.IgniteContext
+import org.apache.ignite.spark.IgniteDataFrameSettings._
+import org.apache.ignite.spark.impl.QueryUtils.{compileCreateTable, compileDropTable, compileInsert}
 import org.apache.ignite.{Ignite, IgniteException}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, Row}
@@ -116,6 +116,7 @@ private[apache] object QueryHelper {
         schemaName: Option[String],
         ctx: IgniteContext,
         streamerAllowOverwrite: Option[Boolean],
+        streamerSkipStore: Option[Boolean],
         streamerFlushFrequency: Option[Long],
         streamerPerNodeBufferSize: Option[Int],
         streamerPerNodeParallelOperations: Option[Int]
@@ -129,6 +130,7 @@ private[apache] object QueryHelper {
             schemaName,
             ctx,
             streamerAllowOverwrite,
+            streamerSkipStore,
             streamerFlushFrequency,
             streamerPerNodeBufferSize,
             streamerPerNodeParallelOperations
@@ -160,6 +162,7 @@ private[apache] object QueryHelper {
         schemaName: Option[String],
         ctx: IgniteContext,
         streamerAllowOverwrite: Option[Boolean],
+        streamerSkipStore: Option[Boolean],
         streamerFlushFrequency: Option[Long],
         streamerPerNodeBufferSize: Option[Int],
         streamerPerNodeParallelOperations: Option[Int]
@@ -170,6 +173,8 @@ private[apache] object QueryHelper {

         streamerAllowOverwrite.foreach(v ⇒ streamer.allowOverwrite(v))

+        streamerSkipStore.foreach(v ⇒ streamer.skipStore(v))
+
         streamerFlushFrequency.foreach(v ⇒ streamer.autoFlushFrequency(v))

         streamerPerNodeBufferSize.foreach(v ⇒ streamer.perNodeBufferSize(v))
diff --git a/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java b/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
index bf256c6..fbd4363 100644
--- a/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
+++ b/modules/spark/src/test/java/org/apache/ignite/spark/JavaEmbeddedIgniteRDDSelfTest.java
@@ -129,7 +129,7 @@ public class JavaEmbeddedIgniteRDDSelfTest extends GridCommonAbstractTest {
         ic = new JavaIgniteContext<>(sc, new IgniteConfigProvider(), false);

         ic.fromCache(PARTITIONED_CACHE_NAME)
-            .savePairs(sc.parallelize(F.range(0, KEYS_CNT), GRID_CNT).mapToPair(TO_PAIR_F), true);
+            .savePairs(sc.parallelize(F.range(0, KEYS_CNT), GRID_CNT).mapToPair(TO_PAIR_F), true, false);

         Ignite ignite = ic.ignite();

@@ -200,7 +200,7 @@ public class JavaEmbeddedIgniteRDDSelfTest extends GridCommonAbstractTest {
         JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME);

         int cnt = 1001;
-        cache.savePairs(sc.parallelize(F.range(0, cnt), GRID_CNT).mapToPair(INT_TO_ENTITY_F), true);
+        cache.savePairs(sc.parallelize(F.range(0, cnt), GRID_CNT).mapToPair(INT_TO_ENTITY_F), true, false);

         List<Entity> res = cache.objectSql("Entity", "name = ? and salary = ?", "name50", 5000)
             .map(STR_ENTITY_PAIR_TO_ENTITY_F).collect();
@@ -238,7 +238,7 @@ public class JavaEmbeddedIgniteRDDSelfTest extends GridCommonAbstractTest {

         JavaIgniteRDD<String, Entity> cache = ic.fromCache(PARTITIONED_CACHE_NAME);

-        cache.savePairs(sc.parallelize(F.range(0, 1001), GRID_CNT).mapToPair(INT_TO_ENTITY_F), true);
+        cache.savePairs(sc.parallelize(F.range(0, 1001), GRID_CNT).mapToPair(INT_TO_ENTITY_F), true, false);

         Dataset<Row> df = cache.sql("select id, name, salary from Entity where name = ? and salary = ?",
             "name50", 5000);
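
For anyone trying the change out, a short usage sketch of the new skipStore flag follows. It is not part of the commit; the Spring config path, input path, table name and cache name below are placeholders.

    // DataFrame API: OPTION_STREAMER_SKIP_STORE is forwarded to IgniteDataStreamer.skipStore(boolean),
    // so write-through to the configured CacheStore is bypassed while the inserts are streamed.
    import org.apache.ignite.spark.IgniteDataFrameSettings._
    import org.apache.spark.sql.{SaveMode, SparkSession}

    val spark = SparkSession.builder().appName("skip-store-example").master("local[*]").getOrCreate()
    val configPath = "/path/to/ignite-config.xml" // placeholder Spring XML config

    spark.read.json("/path/to/people.json") // placeholder input
        .write
        .format(FORMAT_IGNITE)
        .option(OPTION_CONFIG_FILE, configPath)
        .option(OPTION_TABLE, "person")
        .option(OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "id")
        .option(OPTION_STREAMER_SKIP_STORE, true) // new option added by this commit
        .mode(SaveMode.Overwrite)
        .save()

    // RDD API: savePairs now accepts an optional skipStore flag with the same meaning.
    import org.apache.ignite.spark.IgniteContext

    val ic = new IgniteContext(spark.sparkContext, configPath)
    val pairs = spark.sparkContext.parallelize(1 to 1000).map(i => (i, "value-" + i))

    ic.fromCache[Int, String]("partitioned") // placeholder cache name
        .savePairs(pairs, overwrite = true, skipStore = true)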