spark git commit: [SPARK-18838][CORE] Add separate listener queues to LiveListenerBus.
Repository: spark Updated Branches: refs/heads/master 718bbc939 -> c6ff59a23 [SPARK-18838][CORE] Add separate listener queues to LiveListenerBus. This change modifies the live listener bus so that all listeners are added to queues; each queue has its own thread to dispatch events, making it possible to separate slow listeners from other more performance-sensitive ones. The public API has not changed - all listeners added with the existing "addListener" method, which after this change mostly means all user-defined listeners, end up in a default queue. Internally, there's an API allowing listeners to be added to specific queues, and that API is used to separate the internal Spark listeners into 3 categories: application status listeners (e.g. UI), executor management (e.g. dynamic allocation), and the event log. The queueing logic, while abstracted away in a separate class, is kept as much as possible hidden away from consumers. Aside from choosing their queue, there's no code change needed to take advantage of queues. Test coverage relies on existing tests; a few tests had to be tweaked because they relied on `LiveListenerBus.postToAll` being synchronous, and the change makes that method asynchronous. Other tests were simplified not to use the asynchronous LiveListenerBus. Author: Marcelo VanzinCloses #19211 from vanzin/SPARK-18838. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c6ff59a2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c6ff59a2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c6ff59a2 Branch: refs/heads/master Commit: c6ff59a230758b409fa9cc548b7d283eeb7ebe5d Parents: 718bbc9 Author: Marcelo Vanzin Authored: Wed Sep 20 13:41:29 2017 +0800 Committer: Wenchen Fan Committed: Wed Sep 20 13:41:29 2017 +0800 -- .../spark/ExecutorAllocationManager.scala | 2 +- .../org/apache/spark/HeartbeatReceiver.scala| 2 +- .../scala/org/apache/spark/SparkContext.scala | 13 +- .../spark/scheduler/AsyncEventQueue.scala | 196 + .../spark/scheduler/LiveListenerBus.scala | 277 --- .../scala/org/apache/spark/ui/SparkUI.scala | 24 +- .../spark/ExecutorAllocationManagerSuite.scala | 128 + .../spark/scheduler/DAGSchedulerSuite.scala | 2 +- .../scheduler/EventLoggingListenerSuite.scala | 6 +- .../spark/scheduler/SparkListenerSuite.scala| 95 --- .../spark/ui/storage/StorageTabSuite.scala | 4 +- .../streaming/StreamingQueryListenerBus.scala | 2 +- .../apache/spark/sql/internal/SharedState.scala | 3 +- .../spark/streaming/StreamingContext.scala | 3 +- .../scheduler/StreamingListenerBus.scala| 2 +- .../spark/streaming/StreamingContextSuite.scala | 4 +- 16 files changed, 473 insertions(+), 290 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 7a5fb9a..119b426 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -217,7 +217,7 @@ private[spark] class ExecutorAllocationManager( * the scheduling task. 
   */
  def start(): Unit = {
-    listenerBus.addListener(listener)
+    listenerBus.addToManagementQueue(listener)
     val scheduleTask = new Runnable() {
       override def run(): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
--
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 5242ab6..ff960b3 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -63,7 +63,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
     this(sc, new SystemClock)
   }

-  sc.addSparkListener(this)
+  sc.listenerBus.addToManagementQueue(this)

   override val rpcEnv: RpcEnv = sc.env.rpcEnv

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 136f0af..1821bc8
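To make the dispatching model concrete, here is a minimal standalone sketch of the idea behind this change: one queue per listener group, each drained by its own thread, so a slow listener only delays the listeners sharing its queue. This is an illustration in plain Scala with hypothetical names, not the actual `AsyncEventQueue`/`LiveListenerBus` implementation.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical, simplified event/listener types for illustration only.
case class Event(name: String)
trait Listener { def onEvent(e: Event): Unit }

// One queue per listener group, drained by a dedicated daemon thread.
class SimpleAsyncQueue(name: String) {
  private val queue = new LinkedBlockingQueue[Event]()
  @volatile private var listeners = List.empty[Listener]

  def addListener(l: Listener): Unit = synchronized { listeners ::= l }
  def post(e: Event): Unit = queue.put(e)

  private val dispatcher = new Thread(s"listener-queue-$name") {
    override def run(): Unit = while (true) {
      val e = queue.take()              // blocks until an event is posted
      listeners.foreach(_.onEvent(e))   // a slow listener only delays this queue
    }
  }
  dispatcher.setDaemon(true)
  dispatcher.start()
}

object QueueDemo extends App {
  // Internal listeners (UI, dynamic allocation, event log) and user listeners
  // would each get their own queue; posting fans the event out to every queue.
  val statusQueue  = new SimpleAsyncQueue("appStatus")
  val defaultQueue = new SimpleAsyncQueue("shared")
  statusQueue.addListener(new Listener { def onEvent(e: Event) = println(s"status listener saw $e") })
  defaultQueue.addListener(new Listener { def onEvent(e: Event) = println(s"user listener saw $e") })
  Seq(statusQueue, defaultQueue).foreach(_.post(Event("SparkListenerJobStart")))
  Thread.sleep(200)   // give the daemon dispatcher threads time to drain
}
```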
spark git commit: [SPARK-22067][SQL] ArrowWriter should use position when setting UTF8String ByteBuffer
Repository: spark Updated Branches: refs/heads/master ee13f3e3d -> 718bbc939 [SPARK-22067][SQL] ArrowWriter should use position when setting UTF8String ByteBuffer ## What changes were proposed in this pull request? The ArrowWriter StringWriter was setting Arrow data using a position of 0 instead of the actual position in the ByteBuffer. This was currently working because of a bug ARROW-1443, and has been fixed as of Arrow 0.7.0. Testing with this version revealed the error in ArrowConvertersSuite test string conversion. ## How was this patch tested? Existing tests, manually verified working with Arrow 0.7.0 Author: Bryan CutlerCloses #19284 from BryanCutler/arrow-ArrowWriter-StringWriter-position-SPARK-22067. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/718bbc93 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/718bbc93 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/718bbc93 Branch: refs/heads/master Commit: 718bbc939037929ef5b8f4b4fe10aadfbab4408e Parents: ee13f3e Author: Bryan Cutler Authored: Wed Sep 20 10:51:00 2017 +0900 Committer: Takuya UESHIN Committed: Wed Sep 20 10:51:00 2017 +0900 -- .../scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/718bbc93/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala index 11ba04d..0b74073 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala @@ -234,8 +234,9 @@ private[arrow] class StringWriter(val valueVector: NullableVarCharVector) extend override def setValue(input: SpecializedGetters, ordinal: Int): Unit = { val utf8 = input.getUTF8String(ordinal) +val utf8ByteBuffer = utf8.getByteBuffer // todo: for off-heap UTF8String, how to pass in to arrow without copy? -valueMutator.setSafe(count, utf8.getByteBuffer, 0, utf8.numBytes()) +valueMutator.setSafe(count, utf8ByteBuffer, utf8ByteBuffer.position(), utf8.numBytes()) } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
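The subtlety here is that the `UTF8String` byte buffer can start at a non-zero position inside a larger backing array, so hard-coding offset 0 reads the wrong bytes once Arrow stops masking the error. A small standalone `java.nio` illustration of why the position matters (plain Scala, not the Arrow writer itself):

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object ByteBufferPositionDemo extends App {
  // A backing array that holds more than just the value; "spark" starts at offset 6.
  val backing = "prefixspark".getBytes(StandardCharsets.UTF_8)
  val buf = ByteBuffer.wrap(backing)
  buf.position(6)          // the value's bytes begin at the buffer's position
  val numBytes = 5

  // Old behaviour: start at index 0 and read numBytes -> wrong bytes.
  println(new String(backing, 0, numBytes, StandardCharsets.UTF_8))                // prefi
  // Fixed behaviour: start at buf.position() -> the intended value.
  println(new String(backing, buf.position(), numBytes, StandardCharsets.UTF_8))   // spark
}
```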
spark git commit: [SPARK-19318][SPARK-22041][SPARK-16625][BACKPORT-2.1][SQL] Docker test case failure: `: General data types to be mapped to Oracle`
Repository: spark Updated Branches: refs/heads/branch-2.1 30ce056d8 -> 56865a1e9 [SPARK-19318][SPARK-22041][SPARK-16625][BACKPORT-2.1][SQL] Docker test case failure: `: General data types to be mapped to Oracle` ## What changes were proposed in this pull request? This PR is backport of https://github.com/apache/spark/pull/16891 to Spark 2.1. ## How was this patch tested? unit tests Author: Yuming WangCloses #19259 from wangyum/SPARK-22041-BACKPORT-2.1. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/56865a1e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/56865a1e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/56865a1e Branch: refs/heads/branch-2.1 Commit: 56865a1e9319f18b83c7b7a10738f270d5b1dc50 Parents: 30ce056 Author: Yuming Wang Authored: Tue Sep 19 16:55:13 2017 -0700 Committer: gatorsmile Committed: Tue Sep 19 16:55:13 2017 -0700 -- .../spark/sql/jdbc/OracleIntegrationSuite.scala | 40 +++- .../spark/sql/catalyst/json/JSONOptions.scala | 4 +- .../sql/catalyst/util/CaseInsensitiveMap.scala | 31 .../sql/execution/datasources/DataSource.scala | 4 +- .../execution/datasources/csv/CSVOptions.scala | 6 +-- .../datasources/jdbc/JDBCOptions.scala | 10 ++-- .../datasources/parquet/ParquetOptions.scala| 4 +- .../execution/streaming/FileStreamOptions.scala | 4 +- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 50 +++- .../apache/spark/sql/jdbc/JDBCWriteSuite.scala | 13 + .../spark/sql/hive/HiveExternalCatalog.scala| 4 +- .../apache/spark/sql/hive/orc/OrcOptions.scala | 4 +- 12 files changed, 143 insertions(+), 31 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/56865a1e/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala -- diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala index e111e17..3b773f9 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala @@ -67,10 +67,35 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo conn.prepareStatement( "INSERT INTO numerics VALUES (4, 1.23, 99)").executeUpdate(); conn.commit(); - } +conn.prepareStatement("CREATE TABLE datetime (id NUMBER(10), d DATE, t TIMESTAMP)") + .executeUpdate() +conn.prepareStatement( + """INSERT INTO datetime VALUES +|(1, {d '1991-11-09'}, {ts '1996-01-01 01:23:45'}) + """.stripMargin.replaceAll("\n", " ")).executeUpdate() +conn.commit() + +sql( + s""" + |CREATE TEMPORARY VIEW datetime + |USING org.apache.spark.sql.jdbc + |OPTIONS (url '$jdbcUrl', dbTable 'datetime', oracle.jdbc.mapDateToTimestamp 'false') + """.stripMargin.replaceAll("\n", " ")) + +conn.prepareStatement("CREATE TABLE datetime1 (id NUMBER(10), d DATE, t TIMESTAMP)") + .executeUpdate() +conn.commit() + +sql( + s""" + |CREATE TEMPORARY VIEW datetime1 + |USING org.apache.spark.sql.jdbc + |OPTIONS (url '$jdbcUrl', dbTable 'datetime1', oracle.jdbc.mapDateToTimestamp 'false') + """.stripMargin.replaceAll("\n", " ")) + } - test("SPARK-16625 : Importing Oracle numeric types") { + test("SPARK-16625 : Importing Oracle numeric types") { val df = sqlContext.read.jdbc(jdbcUrl, "numerics", new Properties); val rows = df.collect() assert(rows.size == 1) @@ 
-172,4 +197,15 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSQLCo
     assert(values.getDate(9).equals(dateVal))
     assert(values.getTimestamp(10).equals(timestampVal))
   }
+
+  test("SPARK-19318: connection property keys should be case-sensitive") {
+    def checkRow(row: Row): Unit = {
+      assert(row.getDecimal(0).compareTo(BigDecimal.valueOf(1)) == 0)
+      assert(row.getDate(1).equals(Date.valueOf("1991-11-09")))
+      assert(row.getTimestamp(2).equals(Timestamp.valueOf("1996-01-01 01:23:45")))
+    }
+    checkRow(sql("SELECT * FROM datetime where id = 1").head())
+    sql("INSERT INTO TABLE datetime1 SELECT * FROM datetime where id = 1")
+    checkRow(sql("SELECT * FROM datetime1 where id = 1").head())
+  }
 }
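The core of the backported change is a case-insensitive view over data source options: lookups such as `dbTable` ignore case, while the originally-cased keys (for example `oracle.jdbc.mapDateToTimestamp`) are preserved so they can be handed to the JDBC driver unchanged. A simplified sketch of that idea follows; it is not the actual `CaseInsensitiveMap` class from `sql/catalyst`.

```scala
// Simplified sketch: lookups ignore case, but the original keys are preserved
// so they can be passed unchanged to the underlying JDBC driver.
class SimpleCaseInsensitiveMap(val originalMap: Map[String, String]) {
  private val lowerCased: Map[String, String] =
    originalMap.map { case (k, v) => k.toLowerCase -> v }

  def get(key: String): Option[String] = lowerCased.get(key.toLowerCase)
  def contains(key: String): Boolean = lowerCased.contains(key.toLowerCase)
}

object CaseInsensitiveMapDemo extends App {
  val options = new SimpleCaseInsensitiveMap(Map(
    "url" -> "jdbc:oracle:thin:@//host:1521/xe",          // example values only
    "oracle.jdbc.mapDateToTimestamp" -> "false"))

  println(options.get("DBTABLE"))                          // None
  println(options.get("ORACLE.JDBC.MAPDATETOTIMESTAMP"))   // Some(false)
  println(options.originalMap.keys)                        // original casing kept for the driver
}
```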
spark git commit: [SPARK-21969][SQL] CommandUtils.updateTableStats should call refreshTable
Repository: spark Updated Branches: refs/heads/master d5aefa83a -> ee13f3e3d [SPARK-21969][SQL] CommandUtils.updateTableStats should call refreshTable ## What changes were proposed in this pull request? Tables in the catalog cache are not invalidated once their statistics are updated. As a consequence, existing sessions will use the cached information even though it is not valid anymore. Consider and an example below. ``` // step 1 spark.range(100).write.saveAsTable("tab1") // step 2 spark.sql("analyze table tab1 compute statistics") // step 3 spark.sql("explain cost select distinct * from tab1").show(false) // step 4 spark.range(100).write.mode("append").saveAsTable("tab1") // step 5 spark.sql("explain cost select distinct * from tab1").show(false) ``` After step 3, the table will be present in the catalog relation cache. Step 4 will correctly update the metadata inside the catalog but will NOT invalidate the cache. By the way, ``spark.sql("analyze table tab1 compute statistics")`` between step 3 and step 4 would also solve the problem. ## How was this patch tested? Current and additional unit tests. Author: aokolnychyiCloses #19252 from aokolnychyi/spark-21969. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee13f3e3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee13f3e3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee13f3e3 Branch: refs/heads/master Commit: ee13f3e3dc3faa5152cefa91c22f8aaa8e425bb4 Parents: d5aefa8 Author: aokolnychyi Authored: Tue Sep 19 14:19:13 2017 -0700 Committer: gatorsmile Committed: Tue Sep 19 14:19:13 2017 -0700 -- .../sql/catalyst/catalog/SessionCatalog.scala | 2 + .../command/AnalyzeColumnCommand.scala | 3 - .../execution/command/AnalyzeTableCommand.scala | 2 - .../spark/sql/StatisticsCollectionSuite.scala | 73 .../sql/StatisticsCollectionTestBase.scala | 14 +++- 5 files changed, 87 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ee13f3e3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 0908d68..9407b72 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -377,6 +377,8 @@ class SessionCatalog( requireDbExists(db) requireTableExists(tableIdentifier) externalCatalog.alterTableStats(db, table, newStats) +// Invalidate the table relation cache +refreshTable(identifier) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/ee13f3e3/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala index 6588993..caf12ad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala @@ -56,9 +56,6 @@ case class AnalyzeColumnCommand( sessionState.catalog.alterTableStats(tableIdentWithDB, Some(statistics)) -// Refresh the cached data source table in the catalog. 
-    sessionState.catalog.refreshTable(tableIdentWithDB)
-
     Seq.empty[Row]
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/ee13f3e3/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 04715bd..58b53e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -48,8 +48,6 @@ case class AnalyzeTableCommand(
     val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount)
     if (newStats.isDefined) {
       sessionState.catalog.alterTableStats(tableIdentWithDB, newStats)
-      // Refresh the cached data source table in
spark git commit: [SPARK-21338][SQL] implement isCascadingTruncateTable() method in AggregatedDialect
Repository: spark Updated Branches: refs/heads/master 2f962422a -> d5aefa83a [SPARK-21338][SQL] implement isCascadingTruncateTable() method in AggregatedDialect ## What changes were proposed in this pull request? org.apache.spark.sql.jdbc.JdbcDialect's method: def isCascadingTruncateTable(): Option[Boolean] = None is not overriden in org.apache.spark.sql.jdbc.AggregatedDialect class. Because of this issue, when you add more than one dialect Spark doesn't truncate table because isCascadingTruncateTable always returns default None for Aggregated Dialect. Will implement isCascadingTruncateTable in AggregatedDialect class in this PR. ## How was this patch tested? In JDBCSuite, inside test("Aggregated dialects"), will add one line to test AggregatedDialect.isCascadingTruncateTable Author: Huaxin GaoCloses #19256 from huaxingao/spark-21338. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d5aefa83 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d5aefa83 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d5aefa83 Branch: refs/heads/master Commit: d5aefa83ad8608fbea7c08e8d9164f8bee00863d Parents: 2f96242 Author: Huaxin Gao Authored: Tue Sep 19 09:27:05 2017 -0700 Committer: gatorsmile Committed: Tue Sep 19 09:27:05 2017 -0700 -- .../main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala | 4 .../src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 2 ++ 2 files changed, 6 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d5aefa83/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala index 467d8d6..7432a15 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala @@ -41,4 +41,8 @@ private class AggregatedDialect(dialects: List[JdbcDialect]) extends JdbcDialect override def getJDBCType(dt: DataType): Option[JdbcType] = { dialects.flatMap(_.getJDBCType(dt)).headOption } + + override def isCascadingTruncateTable(): Option[Boolean] = { +dialects.flatMap(_.isCascadingTruncateTable()).reduceOption(_ || _) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/d5aefa83/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 689f410..fd12bb9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -740,11 +740,13 @@ class JDBCSuite extends SparkFunSuite } else { None } + override def isCascadingTruncateTable(): Option[Boolean] = Some(true) }, testH2Dialect)) assert(agg.canHandle("jdbc:h2:xxx")) assert(!agg.canHandle("jdbc:h2")) assert(agg.getCatalystType(0, "", 1, null) === Some(LongType)) assert(agg.getCatalystType(1, "", 1, null) === Some(StringType)) +assert(agg.isCascadingTruncateTable() === Some(true)) } test("DB2Dialect type mapping") { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
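The merge rule in the new override is worth spelling out: the aggregated dialect stays undecided (`None`) when none of its constituent dialects has an opinion, and reports `Some(true)` as soon as any of them does. A tiny standalone sketch of the `flatMap`/`reduceOption` combination used above:

```scala
object ReduceOptionDemo extends App {
  // Stand-ins for each dialect's isCascadingTruncateTable() result.
  def combine(opinions: List[Option[Boolean]]): Option[Boolean] =
    opinions.flatten.reduceOption(_ || _)

  println(combine(List(None, None)))               // None: no dialect knows
  println(combine(List(Some(false), None)))        // Some(false)
  println(combine(List(Some(false), Some(true))))  // Some(true): any true wins
}
```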
spark git commit: [MINOR][ML] Remove unnecessary default value setting for evaluators.
Repository: spark Updated Branches: refs/heads/master 8319432af -> 2f962422a [MINOR][ML] Remove unnecessary default value setting for evaluators. ## What changes were proposed in this pull request? Remove unnecessary default value setting for all evaluators, as we have set them in corresponding _HasXXX_ base classes. ## How was this patch tested? Existing tests. Author: Yanbo LiangCloses #19262 from yanboliang/evaluation. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2f962422 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2f962422 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2f962422 Branch: refs/heads/master Commit: 2f962422a25020582c915e15819f91f43c0b9d68 Parents: 8319432 Author: Yanbo Liang Authored: Tue Sep 19 22:22:35 2017 +0800 Committer: Yanbo Liang Committed: Tue Sep 19 22:22:35 2017 +0800 -- python/pyspark/ml/evaluation.py | 9 +++-- 1 file changed, 3 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2f962422/python/pyspark/ml/evaluation.py -- diff --git a/python/pyspark/ml/evaluation.py b/python/pyspark/ml/evaluation.py index 7cb8d62..09cdf9b 100644 --- a/python/pyspark/ml/evaluation.py +++ b/python/pyspark/ml/evaluation.py @@ -146,8 +146,7 @@ class BinaryClassificationEvaluator(JavaEvaluator, HasLabelCol, HasRawPrediction super(BinaryClassificationEvaluator, self).__init__() self._java_obj = self._new_java_obj( "org.apache.spark.ml.evaluation.BinaryClassificationEvaluator", self.uid) -self._setDefault(rawPredictionCol="rawPrediction", labelCol="label", - metricName="areaUnderROC") +self._setDefault(metricName="areaUnderROC") kwargs = self._input_kwargs self._set(**kwargs) @@ -224,8 +223,7 @@ class RegressionEvaluator(JavaEvaluator, HasLabelCol, HasPredictionCol, super(RegressionEvaluator, self).__init__() self._java_obj = self._new_java_obj( "org.apache.spark.ml.evaluation.RegressionEvaluator", self.uid) -self._setDefault(predictionCol="prediction", labelCol="label", - metricName="rmse") +self._setDefault(metricName="rmse") kwargs = self._input_kwargs self._set(**kwargs) @@ -297,8 +295,7 @@ class MulticlassClassificationEvaluator(JavaEvaluator, HasLabelCol, HasPredictio super(MulticlassClassificationEvaluator, self).__init__() self._java_obj = self._new_java_obj( "org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator", self.uid) -self._setDefault(predictionCol="prediction", labelCol="label", - metricName="f1") +self._setDefault(metricName="f1") kwargs = self._input_kwargs self._set(**kwargs) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-21917][CORE][YARN] Supporting adding http(s) resources in yarn mode
Repository: spark Updated Branches: refs/heads/master 581200af7 -> 8319432af [SPARK-21917][CORE][YARN] Supporting adding http(s) resources in yarn mode ## What changes were proposed in this pull request? In the current Spark, when submitting application on YARN with remote resources `./bin/spark-shell --jars http://central.maven.org/maven2/com/github/swagger-akka-http/swagger-akka-http_2.11/0.10.1/swagger-akka-http_2.11-0.10.1.jar --master yarn-client -v`, Spark will be failed with: ``` java.io.IOException: No FileSystem for scheme: http at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2586) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2593) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370) at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296) at org.apache.spark.deploy.yarn.Client.copyFileToRemote(Client.scala:354) at org.apache.spark.deploy.yarn.Client.org$apache$spark$deploy$yarn$Client$$distribute$1(Client.scala:478) at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11$$anonfun$apply$6.apply(Client.scala:600) at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11$$anonfun$apply$6.apply(Client.scala:599) at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:74) at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11.apply(Client.scala:599) at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11.apply(Client.scala:598) at scala.collection.immutable.List.foreach(List.scala:381) at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:598) at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:848) at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:173) ``` This is because `YARN#client` assumes resources are on the Hadoop compatible FS. To fix this problem, here propose to download remote http(s) resources to local and add this local downloaded resources to dist cache. This solution has one downside: remote resources are downloaded and uploaded again, but it only restricted to only remote http(s) resources, also the overhead is not so big. The advantages of this solution is that it is simple and the code changes restricts to only `SparkSubmit`. ## How was this patch tested? Unit test added, also verified in local cluster. Author: jerryshaoCloses #19130 from jerryshao/SPARK-21917. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8319432a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8319432a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8319432a Branch: refs/heads/master Commit: 8319432af60b8e1dc00f08d794f7d80591e24d0c Parents: 581200a Author: jerryshao Authored: Tue Sep 19 22:20:05 2017 +0800 Committer: Wenchen Fan Committed: Tue Sep 19 22:20:05 2017 +0800 -- .../apache/spark/deploy/DependencyUtils.scala | 9 ++- .../org/apache/spark/deploy/SparkSubmit.scala | 51 ++- .../apache/spark/internal/config/package.scala | 10 +++ .../scala/org/apache/spark/util/Utils.scala | 3 + .../apache/spark/deploy/SparkSubmitSuite.scala | 65 docs/running-on-yarn.md | 9 +++ 6 files changed, 143 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8319432a/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala b/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala index 51c3d9b..ecc82d7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala @@ -94,7 +94,7 @@ private[deploy] object DependencyUtils { hadoopConf: Configuration, secMgr: SecurityManager): String = { require(fileList != null, "fileList cannot be null.") -fileList.split(",") +Utils.stringToSeq(fileList) .map(downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr)) .mkString(",") } @@ -121,6 +121,11 @@ private[deploy] object DependencyUtils { uri.getScheme match { case "file" | "local" => path + case "http" | "https" | "ftp" if Utils.isTesting => +// This
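The approach is straightforward: when a `--jars`/`--files` URI uses an `http(s)` or `ftp` scheme, download it to a local directory first and substitute the local path, so the later Hadoop-filesystem handling only ever sees schemes it understands. A hedged standalone sketch of that step follows; the helper names are illustrative, not the actual `SparkSubmit`/`DependencyUtils` code.

```scala
import java.net.{URI, URL}
import java.nio.file.{Files, Path, Paths, StandardCopyOption}

object RemoteResourceDemo {
  // Illustrative helper: replace http(s)/ftp URIs with locally downloaded copies,
  // and leave everything else (hdfs://, file://, plain paths) untouched.
  def localizeIfNeeded(resource: String, targetDir: Path): String = {
    val uri = new URI(resource)
    uri.getScheme match {
      case "http" | "https" | "ftp" =>
        val fileName = Paths.get(uri.getPath).getFileName.toString
        val target = targetDir.resolve(fileName)
        val in = new URL(resource).openStream()
        try Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING)
        finally in.close()
        target.toUri.toString
      case _ => resource
    }
  }

  def main(args: Array[String]): Unit = {
    val tmp = Files.createTempDirectory("downloaded-deps")
    val jars = Seq(
      "hdfs:///libs/already-remote.jar",                       // passed through as-is
      "https://repo.example.org/some-dependency.jar")          // placeholder URL, would be downloaded
    jars.map(localizeIfNeeded(_, tmp)).foreach(println)
  }
}
```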
spark git commit: [SPARK-21428][SQL][FOLLOWUP] CliSessionState should point to the actual metastore not a dummy one
Repository: spark Updated Branches: refs/heads/master 1bc17a6b8 -> 581200af7 [SPARK-21428][SQL][FOLLOWUP] CliSessionState should point to the actual metastore not a dummy one ## What changes were proposed in this pull request? While running bin/spark-sql, we will reuse cliSessionState, but the Hive configurations generated here just points to a dummy meta store which actually should be the real one. And the warehouse is determined later in SharedState, HiveClient should respect this config changing in this case too. ## How was this patch tested? existing ut cc cloud-fan jiangxb1987 Author: Kent YaoCloses #19068 from yaooqinn/SPARK-21428-FOLLOWUP. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/581200af Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/581200af Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/581200af Branch: refs/heads/master Commit: 581200af717bcefd11c9930ac063fe53c6fd2fde Parents: 1bc17a6 Author: Kent Yao Authored: Tue Sep 19 19:35:36 2017 +0800 Committer: Wenchen Fan Committed: Tue Sep 19 19:35:36 2017 +0800 -- .../sql/hive/thriftserver/SparkSQLCLIDriver.scala| 14 +++--- .../scala/org/apache/spark/sql/hive/HiveUtils.scala | 6 +++--- .../spark/sql/hive/client/HiveClientImpl.scala | 15 +-- .../spark/sql/hive/client/HiveVersionSuite.scala | 2 +- .../apache/spark/sql/hive/client/VersionsSuite.scala | 2 +- 5 files changed, 29 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/581200af/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala -- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 761e832..832a15d 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -37,6 +37,8 @@ import org.apache.hadoop.hive.ql.session.SessionState import org.apache.log4j.{Level, Logger} import org.apache.thrift.transport.TSocket +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.hive.HiveUtils @@ -81,11 +83,17 @@ private[hive] object SparkSQLCLIDriver extends Logging { System.exit(1) } +val sparkConf = new SparkConf(loadDefaults = true) +val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf) +val extraConfigs = HiveUtils.formatTimeVarsForHiveClient(hadoopConf) + val cliConf = new HiveConf(classOf[SessionState]) -// Override the location of the metastore since this is only used for local execution. 
-HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false).foreach { - case (key, value) => cliConf.set(key, value) +(hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue) + ++ sparkConf.getAll.toMap ++ extraConfigs).foreach { + case (k, v) => +cliConf.set(k, v) } + val sessionState = new CliSessionState(cliConf) sessionState.in = System.in http://git-wip-us.apache.org/repos/asf/spark/blob/581200af/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala index 561c127..80b9a3d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala @@ -176,9 +176,9 @@ private[spark] object HiveUtils extends Logging { } /** - * Configurations needed to create a [[HiveClient]]. + * Change time configurations needed to create a [[HiveClient]] into unified [[Long]] format. */ - private[hive] def hiveClientConfigurations(hadoopConf: Configuration): Map[String, String] = { + private[hive] def formatTimeVarsForHiveClient(hadoopConf: Configuration): Map[String, String] = { // Hive 0.14.0 introduces timeout operations in HiveConf, and changes default values of a bunch // of time `ConfVar`s by adding time suffixes (`s`, `ms`, and `d` etc.). This breaks backwards- // compatibility when users are trying to connecting to a Hive metastore of lower version, @@ -280,7 +280,7 @@ private[spark] object
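One detail worth noting in the new `SparkSQLCLIDriver` code is the precedence of the merged settings: Hadoop configuration first, then Spark configuration, then the Hive time-variable overrides, with later maps winning on key collisions. A tiny sketch of that left-to-right override behaviour, with example keys and values only:

```scala
object ConfPrecedenceDemo extends App {
  // Later maps override earlier ones on duplicate keys, mirroring
  // (hadoopConf ++ sparkConf ++ extraConfigs) in the CLI driver change.
  val hadoopConf   = Map("hive.metastore.uris" -> "thrift://old-host:9083",
                         "io.file.buffer.size" -> "4096")
  val sparkConf    = Map("hive.metastore.uris" -> "thrift://real-host:9083")
  val extraConfigs = Map("hive.metastore.client.socket.timeout" -> "600000")

  val merged = hadoopConf ++ sparkConf ++ extraConfigs
  println(merged("hive.metastore.uris"))   // thrift://real-host:9083 (sparkConf wins)
}
```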
spark git commit: [SPARK-22052] Incorrect Metric assigned in MetricsReporter.scala
Repository: spark Updated Branches: refs/heads/branch-2.2 d0234ebcf -> 6764408f6 [SPARK-22052] Incorrect Metric assigned in MetricsReporter.scala Current implementation for processingRate-total uses wrong metric: mistakenly uses inputRowsPerSecond instead of processedRowsPerSecond ## What changes were proposed in this pull request? Adjust processingRate-total from using inputRowsPerSecond to processedRowsPerSecond ## How was this patch tested? Built spark from source with proposed change and tested output with correct parameter. Before change the csv metrics file for inputRate-total and processingRate-total displayed the same values due to the error. After changing MetricsReporter.scala the processingRate-total csv file displayed the correct metric. https://user-images.githubusercontent.com/32072374/30554340-82eea12c-9ca4-11e7-8370-8168526ff9a2.png;> Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Taaffy <32072374+taa...@users.noreply.github.com> Closes #19268 from Taaffy/patch-1. (cherry picked from commit 1bc17a6b8add02772a8a0a1048ac6a01d045baf4) Signed-off-by: Sean OwenProject: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6764408f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6764408f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6764408f Branch: refs/heads/branch-2.2 Commit: 6764408f68495e2ca7c1b9959db53ee12cabb197 Parents: d0234eb Author: Taaffy <32072374+taa...@users.noreply.github.com> Authored: Tue Sep 19 10:20:04 2017 +0100 Committer: Sean Owen Committed: Tue Sep 19 10:20:14 2017 +0100 -- .../org/apache/spark/sql/execution/streaming/MetricsReporter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6764408f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala index 5551d12..b84e6ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala @@ -40,7 +40,7 @@ class MetricsReporter( // Metric names should not have . in them, so that all the metrics of a query are identified // together in Ganglia as a single metric group registerGauge("inputRate-total", () => stream.lastProgress.inputRowsPerSecond) - registerGauge("processingRate-total", () => stream.lastProgress.inputRowsPerSecond) + registerGauge("processingRate-total", () => stream.lastProgress.processedRowsPerSecond) registerGauge("latency", () => stream.lastProgress.durationMs.get("triggerExecution").longValue()) private def registerGauge[T](name: String, f: () => T)(implicit num: Numeric[T]): Unit = { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-22052] Incorrect Metric assigned in MetricsReporter.scala
Repository: spark Updated Branches: refs/heads/master 7c92351f4 -> 1bc17a6b8 [SPARK-22052] Incorrect Metric assigned in MetricsReporter.scala Current implementation for processingRate-total uses wrong metric: mistakenly uses inputRowsPerSecond instead of processedRowsPerSecond ## What changes were proposed in this pull request? Adjust processingRate-total from using inputRowsPerSecond to processedRowsPerSecond ## How was this patch tested? Built spark from source with proposed change and tested output with correct parameter. Before change the csv metrics file for inputRate-total and processingRate-total displayed the same values due to the error. After changing MetricsReporter.scala the processingRate-total csv file displayed the correct metric. https://user-images.githubusercontent.com/32072374/30554340-82eea12c-9ca4-11e7-8370-8168526ff9a2.png;> Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Taaffy <32072374+taa...@users.noreply.github.com> Closes #19268 from Taaffy/patch-1. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1bc17a6b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1bc17a6b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1bc17a6b Branch: refs/heads/master Commit: 1bc17a6b8add02772a8a0a1048ac6a01d045baf4 Parents: 7c92351 Author: Taaffy <32072374+taa...@users.noreply.github.com> Authored: Tue Sep 19 10:20:04 2017 +0100 Committer: Sean OwenCommitted: Tue Sep 19 10:20:04 2017 +0100 -- .../org/apache/spark/sql/execution/streaming/MetricsReporter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1bc17a6b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala index 5551d12..b84e6ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala @@ -40,7 +40,7 @@ class MetricsReporter( // Metric names should not have . in them, so that all the metrics of a query are identified // together in Ganglia as a single metric group registerGauge("inputRate-total", () => stream.lastProgress.inputRowsPerSecond) - registerGauge("processingRate-total", () => stream.lastProgress.inputRowsPerSecond) + registerGauge("processingRate-total", () => stream.lastProgress.processedRowsPerSecond) registerGauge("latency", () => stream.lastProgress.durationMs.get("triggerExecution").longValue()) private def registerGauge[T](name: String, f: () => T)(implicit num: Numeric[T]): Unit = { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
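The two rates are easy to conflate: input rate relates rows to the interval since the previous trigger, while processing rate relates them to how long the trigger actually took to execute, which is why the old code made the two gauges report identical values. A rough standalone illustration with approximate definitions, not the exact Structured Streaming formulas:

```scala
object RateDemo extends App {
  val numInputRows = 10000L
  val secondsSinceLastTrigger = 10.0   // observed trigger interval
  val triggerExecutionSeconds = 2.5    // time spent processing the batch

  val inputRowsPerSecond     = numInputRows / secondsSinceLastTrigger   // 1000.0
  val processedRowsPerSecond = numInputRows / triggerExecutionSeconds   // 4000.0

  println(s"inputRate-total      ~ $inputRowsPerSecond rows/s")
  println(s"processingRate-total ~ $processedRowsPerSecond rows/s")
}
```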
spark git commit: [MINOR][CORE] Cleanup dead code and duplication in Mem. Management
Repository: spark Updated Branches: refs/heads/master a11db942a -> 7c92351f4 [MINOR][CORE] Cleanup dead code and duplication in Mem. Management ## What changes were proposed in this pull request? * Removed the method `org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter#alignToWords`. It became unused as a result of 85b0a157543201895557d66306b38b3ca52f2151 (SPARK-15962) introducing word alignment for unsafe arrays. * Cleaned up duplicate code in memory management and unsafe sorters * The change extracting the exception paths is more than just cosmetics since it def. reduces the size the affected methods compile to ## How was this patch tested? * Build still passes after removing the method, grepping the codebase for `alignToWords` shows no reference to it anywhere either. * Dried up code is covered by existing tests. Author: ArminCloses #19254 from original-brownbear/cleanup-mem-consumer. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c92351f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c92351f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c92351f Branch: refs/heads/master Commit: 7c92351f43ac4b1710e3c80c78f7978dad491ed2 Parents: a11db94 Author: Armin Authored: Tue Sep 19 10:06:32 2017 +0100 Committer: Sean Owen Committed: Tue Sep 19 10:06:32 2017 +0100 -- .../org/apache/spark/memory/MemoryConsumer.java | 26 .../spark/unsafe/map/BytesToBytesMap.java | 24 ++- .../unsafe/sort/UnsafeExternalSorter.java | 32 +--- .../expressions/codegen/UnsafeRowWriter.java| 16 -- 4 files changed, 37 insertions(+), 61 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7c92351f/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java -- diff --git a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java index 4099fb0..0efae16 100644 --- a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java +++ b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java @@ -89,13 +89,7 @@ public abstract class MemoryConsumer { long required = size * 8L; MemoryBlock page = taskMemoryManager.allocatePage(required, this); if (page == null || page.size() < required) { - long got = 0; - if (page != null) { -got = page.size(); -taskMemoryManager.freePage(page, this); - } - taskMemoryManager.showMemoryUsage(); - throw new OutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got); + throwOom(page, required); } used += required; return new LongArray(page); @@ -116,13 +110,7 @@ public abstract class MemoryConsumer { protected MemoryBlock allocatePage(long required) { MemoryBlock page = taskMemoryManager.allocatePage(Math.max(pageSize, required), this); if (page == null || page.size() < required) { - long got = 0; - if (page != null) { -got = page.size(); -taskMemoryManager.freePage(page, this); - } - taskMemoryManager.showMemoryUsage(); - throw new OutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got); + throwOom(page, required); } used += page.size(); return page; @@ -152,4 +140,14 @@ public abstract class MemoryConsumer { taskMemoryManager.releaseExecutionMemory(size, this); used -= size; } + + private void throwOom(final MemoryBlock page, final long required) { +long got = 0; +if (page != null) { + got = page.size(); + taskMemoryManager.freePage(page, this); +} +taskMemoryManager.showMemoryUsage(); +throw new OutOfMemoryError("Unable to acquire " + required + " bytes 
of memory, got " + got); + } } http://git-wip-us.apache.org/repos/asf/spark/blob/7c92351f/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java -- diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java index 610ace3..4fadfe3 100644 --- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java +++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java @@ -283,13 +283,7 @@ public final class BytesToBytesMap extends MemoryConsumer { } else { currentPage = null; if (reader != null) { -// remove the spill file from disk -File file = spillWriters.removeFirst().getFile(); -if (file != null && file.exists()) { -
spark git commit: [SPARK-21923][CORE] Avoid calling reserveUnrollMemoryForThisTask for every record
Repository: spark Updated Branches: refs/heads/master 10f45b3c8 -> a11db942a [SPARK-21923][CORE] Avoid calling reserveUnrollMemoryForThisTask for every record ## What changes were proposed in this pull request? When Spark persist data to Unsafe memory, we call the method `MemoryStore.putIteratorAsBytes`, which need synchronize the `memoryManager` for every record write. This implementation is not necessary, we can apply for more memory at a time to reduce unnecessary synchronization. ## How was this patch tested? Test case (with 1 executor 20 core): ```scala val start = System.currentTimeMillis() val data = sc.parallelize(0 until Integer.MAX_VALUE, 100) .persist(StorageLevel.OFF_HEAP) .count() println(System.currentTimeMillis() - start) ``` Test result: before | 27647 | 29108 | 28591 | 28264 | 27232 | after | 26868 | 26358 | 27767 | 26653 | 26693 | Author: Xianyang LiuCloses #19135 from ConeyLiu/memorystore. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a11db942 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a11db942 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a11db942 Branch: refs/heads/master Commit: a11db942aaf4c470a85f8a1b180f034f7a584254 Parents: 10f45b3 Author: Xianyang Liu Authored: Tue Sep 19 14:51:27 2017 +0800 Committer: Wenchen Fan Committed: Tue Sep 19 14:51:27 2017 +0800 -- .../apache/spark/internal/config/package.scala| 15 +++ .../apache/spark/storage/memory/MemoryStore.scala | 18 ++ 2 files changed, 29 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a11db942/core/src/main/scala/org/apache/spark/internal/config/package.scala -- diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 0d3769a..e0f6960 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -385,4 +385,19 @@ package object config { .checkValue(v => v > 0 && v <= Int.MaxValue, s"The buffer size must be greater than 0 and less than ${Int.MaxValue}.") .createWithDefault(1024 * 1024) + + private[spark] val UNROLL_MEMORY_CHECK_PERIOD = +ConfigBuilder("spark.storage.unrollMemoryCheckPeriod") + .internal() + .doc("The memory check period is used to determine how often we should check whether " ++ "there is a need to request more memory when we try to unroll the given block in memory.") + .longConf + .createWithDefault(16) + + private[spark] val UNROLL_MEMORY_GROWTH_FACTOR = +ConfigBuilder("spark.storage.unrollMemoryGrowthFactor") + .internal() + .doc("Memory to request as a multiple of the size that used to unroll the block.") + .doubleConf + .createWithDefault(1.5) } http://git-wip-us.apache.org/repos/asf/spark/blob/a11db942/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 90e3af2..eb2201d 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -29,6 +29,7 @@ import com.google.common.io.ByteStreams import org.apache.spark.{SparkConf, TaskContext} import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.{UNROLL_MEMORY_CHECK_PERIOD, UNROLL_MEMORY_GROWTH_FACTOR} import 
org.apache.spark.memory.{MemoryManager, MemoryMode} import org.apache.spark.serializer.{SerializationStream, SerializerManager} import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel, StreamBlockId} @@ -190,11 +191,11 @@ private[spark] class MemoryStore( // Initial per-task memory to request for unrolling blocks (bytes). val initialMemoryThreshold = unrollMemoryThreshold // How often to check whether we need to request more memory -val memoryCheckPeriod = 16 +val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD) // Memory currently reserved by this task for this particular unrolling operation var memoryThreshold = initialMemoryThreshold // Memory to request as a multiple of current vector size -val memoryGrowthFactor = 1.5 +val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR) // Keep track of unroll memory used by this
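The pattern being made configurable here, checking memory only every `memoryCheckPeriod` records and then over-reserving by `memoryGrowthFactor`, amortises the synchronized calls into the memory manager across many records. A standalone sketch of that amortisation, using a hypothetical `reserve` function rather than the real `MemoryStore` accounting:

```scala
object UnrollReservationDemo extends App {
  val memoryCheckPeriod = 16L       // check every 16 records instead of every record
  val memoryGrowthFactor = 1.5      // over-reserve to avoid asking again immediately

  var reserved = 1024L              // initial unroll reservation, in bytes
  var elementsUnrolled = 0L

  // Hypothetical stand-ins for the real size estimation and memory manager call.
  def estimatedSizeSoFar: Long = elementsUnrolled * 100L
  def reserve(extra: Long): Unit = { reserved += extra; println(s"reserved $extra more bytes") }

  for (_ <- 0 until 1000) {
    // ... unroll one record here ...
    elementsUnrolled += 1
    if (elementsUnrolled % memoryCheckPeriod == 0 && estimatedSizeSoFar >= reserved) {
      // Ask for enough to reach size * growthFactor, not just the current shortfall.
      reserve((estimatedSizeSoFar * memoryGrowthFactor).toLong - reserved)
    }
  }
  println(s"total reserved: $reserved bytes after $elementsUnrolled records")
}
```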