spark git commit: [SPARK-18838][CORE] Add separate listener queues to LiveListenerBus.

2017-09-19 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 718bbc939 -> c6ff59a23


[SPARK-18838][CORE] Add separate listener queues to LiveListenerBus.

This change modifies the live listener bus so that all listeners are
added to queues; each queue has its own thread to dispatch events,
making it possible to separate slow listeners from other more
performance-sensitive ones.

The public API has not changed - all listeners added with the existing
"addListener" method, which after this change mostly means all
user-defined listeners, end up in a default queue. Internally, there's
an API allowing listeners to be added to specific queues, and that API
is used to separate the internal Spark listeners into 3 categories:
application status listeners (e.g. UI), executor management (e.g. dynamic
allocation), and the event log.

The queueing logic, while abstracted away in a separate class, is kept
as much as possible hidden away from consumers. Aside from choosing their
queue, there's no code change needed to take advantage of queues.
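
For illustration, a minimal sketch of the two registration paths (the example listener and its body are made up; `addSparkListener` and `addToManagementQueue` are taken from the diffs below):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

// A user-defined listener: registered through the unchanged public API,
// so after this change it ends up in the shared "default" queue.
class MyJobListener extends SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"Job ${jobEnd.jobId} ended: ${jobEnd.jobResult}")
}

val sc: SparkContext = ???               // an existing SparkContext
sc.addSparkListener(new MyJobListener)   // public path -> default queue

// Internal components instead pick a dedicated queue via the private[spark] API,
// e.g. ExecutorAllocationManager in the diff below:
//   listenerBus.addToManagementQueue(listener)
// so a slow listener in the default queue cannot stall dynamic allocation.
```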

Test coverage relies on existing tests; a few tests had to be tweaked
because they relied on `LiveListenerBus.postToAll` being synchronous,
and the change makes that method asynchronous. Other tests were simplified
not to use the asynchronous LiveListenerBus.

Author: Marcelo Vanzin 

Closes #19211 from vanzin/SPARK-18838.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c6ff59a2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c6ff59a2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c6ff59a2

Branch: refs/heads/master
Commit: c6ff59a230758b409fa9cc548b7d283eeb7ebe5d
Parents: 718bbc9
Author: Marcelo Vanzin 
Authored: Wed Sep 20 13:41:29 2017 +0800
Committer: Wenchen Fan 
Committed: Wed Sep 20 13:41:29 2017 +0800

--
 .../spark/ExecutorAllocationManager.scala   |   2 +-
 .../org/apache/spark/HeartbeatReceiver.scala|   2 +-
 .../scala/org/apache/spark/SparkContext.scala   |  13 +-
 .../spark/scheduler/AsyncEventQueue.scala   | 196 +
 .../spark/scheduler/LiveListenerBus.scala   | 277 ---
 .../scala/org/apache/spark/ui/SparkUI.scala |  24 +-
 .../spark/ExecutorAllocationManagerSuite.scala  | 128 +
 .../spark/scheduler/DAGSchedulerSuite.scala |   2 +-
 .../scheduler/EventLoggingListenerSuite.scala   |   6 +-
 .../spark/scheduler/SparkListenerSuite.scala|  95 ---
 .../spark/ui/storage/StorageTabSuite.scala  |   4 +-
 .../streaming/StreamingQueryListenerBus.scala   |   2 +-
 .../apache/spark/sql/internal/SharedState.scala |   3 +-
 .../spark/streaming/StreamingContext.scala  |   3 +-
 .../scheduler/StreamingListenerBus.scala|   2 +-
 .../spark/streaming/StreamingContextSuite.scala |   4 +-
 16 files changed, 473 insertions(+), 290 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 7a5fb9a..119b426 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -217,7 +217,7 @@ private[spark] class ExecutorAllocationManager(
* the scheduling task.
*/
   def start(): Unit = {
-listenerBus.addListener(listener)
+listenerBus.addToManagementQueue(listener)
 
 val scheduleTask = new Runnable() {
   override def run(): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
--
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala 
b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 5242ab6..ff960b3 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -63,7 +63,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, 
clock: Clock)
 this(sc, new SystemClock)
   }
 
-  sc.addSparkListener(this)
+  sc.listenerBus.addToManagementQueue(this)
 
   override val rpcEnv: RpcEnv = sc.env.rpcEnv
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 136f0af..1821bc8 

spark git commit: [SPARK-22067][SQL] ArrowWriter should use position when setting UTF8String ByteBuffer

2017-09-19 Thread ueshin
Repository: spark
Updated Branches:
  refs/heads/master ee13f3e3d -> 718bbc939


[SPARK-22067][SQL] ArrowWriter should use position when setting UTF8String 
ByteBuffer

## What changes were proposed in this pull request?

The ArrowWriter StringWriter was setting Arrow data using a position of 0 
instead of the actual position in the ByteBuffer. This happened to work only 
because of an Arrow bug, ARROW-1443, which has been fixed as of Arrow 0.7.0. 
Testing with that version revealed the error in the ArrowConvertersSuite 
string conversion test.
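
For context, a tiny standalone illustration (not from this patch) of why the position matters: the ByteBuffer for a UTF8String can be a view into a larger backing array that starts at a non-zero position, so copying from offset 0 reads the wrong bytes.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

// The payload "hello" lives at offset 5 of a larger backing array.
val backing = "XXXXXhello".getBytes(UTF_8)
val buf = ByteBuffer.wrap(backing, 5, 5)   // position = 5, limit = 10

// Copying 5 bytes starting at offset 0 (what the old code effectively did) is wrong:
new String(buf.array(), 0, 5, UTF_8)                // "XXXXX"
// Copying from buf.position() yields the intended payload:
new String(buf.array(), buf.position(), 5, UTF_8)   // "hello"
```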

## How was this patch tested?

Existing tests, manually verified working with Arrow 0.7.0

Author: Bryan Cutler 

Closes #19284 from 
BryanCutler/arrow-ArrowWriter-StringWriter-position-SPARK-22067.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/718bbc93
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/718bbc93
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/718bbc93

Branch: refs/heads/master
Commit: 718bbc939037929ef5b8f4b4fe10aadfbab4408e
Parents: ee13f3e
Author: Bryan Cutler 
Authored: Wed Sep 20 10:51:00 2017 +0900
Committer: Takuya UESHIN 
Committed: Wed Sep 20 10:51:00 2017 +0900

--
 .../scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala  | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/718bbc93/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
index 11ba04d..0b74073 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
@@ -234,8 +234,9 @@ private[arrow] class StringWriter(val valueVector: 
NullableVarCharVector) extend
 
   override def setValue(input: SpecializedGetters, ordinal: Int): Unit = {
 val utf8 = input.getUTF8String(ordinal)
+val utf8ByteBuffer = utf8.getByteBuffer
 // todo: for off-heap UTF8String, how to pass in to arrow without copy?
-valueMutator.setSafe(count, utf8.getByteBuffer, 0, utf8.numBytes())
+valueMutator.setSafe(count, utf8ByteBuffer, utf8ByteBuffer.position(), utf8.numBytes())
   }
 }
 





spark git commit: [SPARK-19318][SPARK-22041][SPARK-16625][BACKPORT-2.1][SQL] Docker test case failure: `: General data types to be mapped to Oracle`

2017-09-19 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 30ce056d8 -> 56865a1e9


[SPARK-19318][SPARK-22041][SPARK-16625][BACKPORT-2.1][SQL] Docker test case 
failure: `: General data types to be mapped to Oracle`

## What changes were proposed in this pull request?

This PR is a backport of https://github.com/apache/spark/pull/16891 to Spark 2.1.

## How was this patch tested?

unit tests

Author: Yuming Wang 

Closes #19259 from wangyum/SPARK-22041-BACKPORT-2.1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/56865a1e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/56865a1e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/56865a1e

Branch: refs/heads/branch-2.1
Commit: 56865a1e9319f18b83c7b7a10738f270d5b1dc50
Parents: 30ce056
Author: Yuming Wang 
Authored: Tue Sep 19 16:55:13 2017 -0700
Committer: gatorsmile 
Committed: Tue Sep 19 16:55:13 2017 -0700

--
 .../spark/sql/jdbc/OracleIntegrationSuite.scala | 40 +++-
 .../spark/sql/catalyst/json/JSONOptions.scala   |  4 +-
 .../sql/catalyst/util/CaseInsensitiveMap.scala  | 31 
 .../sql/execution/datasources/DataSource.scala  |  4 +-
 .../execution/datasources/csv/CSVOptions.scala  |  6 +--
 .../datasources/jdbc/JDBCOptions.scala  | 10 ++--
 .../datasources/parquet/ParquetOptions.scala|  4 +-
 .../execution/streaming/FileStreamOptions.scala |  4 +-
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   | 50 +++-
 .../apache/spark/sql/jdbc/JDBCWriteSuite.scala  | 13 +
 .../spark/sql/hive/HiveExternalCatalog.scala|  4 +-
 .../apache/spark/sql/hive/orc/OrcOptions.scala  |  4 +-
 12 files changed, 143 insertions(+), 31 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/56865a1e/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
--
diff --git 
a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
 
b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
index e111e17..3b773f9 100644
--- 
a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
+++ 
b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
@@ -67,10 +67,35 @@ class OracleIntegrationSuite extends 
DockerJDBCIntegrationSuite with SharedSQLCo
 conn.prepareStatement(
   "INSERT INTO numerics VALUES (4, 1.23, 99)").executeUpdate();
 conn.commit();
-  }
 
+conn.prepareStatement("CREATE TABLE datetime (id NUMBER(10), d DATE, t 
TIMESTAMP)")
+  .executeUpdate()
+conn.prepareStatement(
+  """INSERT INTO datetime VALUES
+|(1, {d '1991-11-09'}, {ts '1996-01-01 01:23:45'})
+  """.stripMargin.replaceAll("\n", " ")).executeUpdate()
+conn.commit()
+
+sql(
+  s"""
+ |CREATE TEMPORARY VIEW datetime
+ |USING org.apache.spark.sql.jdbc
+ |OPTIONS (url '$jdbcUrl', dbTable 'datetime', oracle.jdbc.mapDateToTimestamp 'false')
+  """.stripMargin.replaceAll("\n", " "))
+
+conn.prepareStatement("CREATE TABLE datetime1 (id NUMBER(10), d DATE, t 
TIMESTAMP)")
+  .executeUpdate()
+conn.commit()
+
+sql(
+  s"""
+ |CREATE TEMPORARY VIEW datetime1
+ |USING org.apache.spark.sql.jdbc
+ |OPTIONS (url '$jdbcUrl', dbTable 'datetime1', oracle.jdbc.mapDateToTimestamp 'false')
+  """.stripMargin.replaceAll("\n", " "))
+  }
 
-  test("SPARK-16625 : Importing Oracle numeric types") { 
+  test("SPARK-16625 : Importing Oracle numeric types") {
 val df = sqlContext.read.jdbc(jdbcUrl, "numerics", new Properties);
 val rows = df.collect()
 assert(rows.size == 1)
@@ -172,4 +197,15 @@ class OracleIntegrationSuite extends 
DockerJDBCIntegrationSuite with SharedSQLCo
 assert(values.getDate(9).equals(dateVal))
 assert(values.getTimestamp(10).equals(timestampVal))
   }
+
+  test("SPARK-19318: connection property keys should be case-sensitive") {
+def checkRow(row: Row): Unit = {
+  assert(row.getDecimal(0).compareTo(BigDecimal.valueOf(1)) == 0)
+  assert(row.getDate(1).equals(Date.valueOf("1991-11-09")))
+  assert(row.getTimestamp(2).equals(Timestamp.valueOf("1996-01-01 01:23:45")))
+}
+checkRow(sql("SELECT * FROM datetime where id = 1").head())
+sql("INSERT INTO TABLE datetime1 SELECT * FROM datetime where id = 1")
+checkRow(sql("SELECT * FROM datetime1 where id = 1").head())
+  }
 }


spark git commit: [SPARK-21969][SQL] CommandUtils.updateTableStats should call refreshTable

2017-09-19 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master d5aefa83a -> ee13f3e3d


[SPARK-21969][SQL] CommandUtils.updateTableStats should call refreshTable

## What changes were proposed in this pull request?

Tables in the catalog cache are not invalidated once their statistics are 
updated. As a consequence, existing sessions will use the cached information 
even though it is not valid anymore. Consider the example below.

```
// step 1
spark.range(100).write.saveAsTable("tab1")
// step 2
spark.sql("analyze table tab1 compute statistics")
// step 3
spark.sql("explain cost select distinct * from tab1").show(false)
// step 4
spark.range(100).write.mode("append").saveAsTable("tab1")
// step 5
spark.sql("explain cost select distinct * from tab1").show(false)
```

After step 3, the table will be present in the catalog relation cache. Step 4 
will correctly update the metadata inside the catalog but will NOT invalidate 
the cache.

By the way, ``spark.sql("analyze table tab1 compute statistics")`` between step 
3 and step 4 would also solve the problem.

## How was this patch tested?

Current and additional unit tests.

Author: aokolnychyi 

Closes #19252 from aokolnychyi/spark-21969.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee13f3e3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee13f3e3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee13f3e3

Branch: refs/heads/master
Commit: ee13f3e3dc3faa5152cefa91c22f8aaa8e425bb4
Parents: d5aefa8
Author: aokolnychyi 
Authored: Tue Sep 19 14:19:13 2017 -0700
Committer: gatorsmile 
Committed: Tue Sep 19 14:19:13 2017 -0700

--
 .../sql/catalyst/catalog/SessionCatalog.scala   |  2 +
 .../command/AnalyzeColumnCommand.scala  |  3 -
 .../execution/command/AnalyzeTableCommand.scala |  2 -
 .../spark/sql/StatisticsCollectionSuite.scala   | 73 
 .../sql/StatisticsCollectionTestBase.scala  | 14 +++-
 5 files changed, 87 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ee13f3e3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 0908d68..9407b72 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -377,6 +377,8 @@ class SessionCatalog(
 requireDbExists(db)
 requireTableExists(tableIdentifier)
 externalCatalog.alterTableStats(db, table, newStats)
+// Invalidate the table relation cache
+refreshTable(identifier)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/ee13f3e3/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index 6588993..caf12ad 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -56,9 +56,6 @@ case class AnalyzeColumnCommand(
 
 sessionState.catalog.alterTableStats(tableIdentWithDB, Some(statistics))
 
-// Refresh the cached data source table in the catalog.
-sessionState.catalog.refreshTable(tableIdentWithDB)
-
 Seq.empty[Row]
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ee13f3e3/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 04715bd..58b53e8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -48,8 +48,6 @@ case class AnalyzeTableCommand(
 val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, 
newTotalSize, newRowCount)
 if (newStats.isDefined) {
   sessionState.catalog.alterTableStats(tableIdentWithDB, newStats)
-  // Refresh the cached data source table in 

spark git commit: [SPARK-21338][SQL] implement isCascadingTruncateTable() method in AggregatedDialect

2017-09-19 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 2f962422a -> d5aefa83a


[SPARK-21338][SQL] implement isCascadingTruncateTable() method in 
AggregatedDialect

## What changes were proposed in this pull request?

org.apache.spark.sql.jdbc.JdbcDialect's method
`def isCascadingTruncateTable(): Option[Boolean] = None`
is not overridden in the org.apache.spark.sql.jdbc.AggregatedDialect class.
Because of this, when more than one dialect is registered, Spark does not truncate 
the table, since isCascadingTruncateTable always returns the default None for the 
aggregated dialect.
This PR implements isCascadingTruncateTable in the AggregatedDialect class.
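
For reference, a small standalone sketch (not part of the patch) of how the `flatMap` plus `reduceOption(_ || _)` combination used below behaves: dialects that answer None are ignored, the remaining booleans are OR-ed together, and the aggregate is None only if no dialect had an opinion.

```scala
// Hypothetical answers from three stacked dialects.
val answers: List[Option[Boolean]] = List(None, Some(false), Some(true))

// Drop the Nones, then OR what remains.
val combined: Option[Boolean] = answers.flatten.reduceOption(_ || _)
// combined == Some(true)

// If every dialect returned None, the aggregate stays None.
val noOpinion: Option[Boolean] = List.empty[Option[Boolean]].flatten.reduceOption(_ || _)
// noOpinion == None
```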

## How was this patch tested?

In JDBCSuite, inside test("Aggregated dialects"), one line is added to test 
AggregatedDialect.isCascadingTruncateTable.

Author: Huaxin Gao 

Closes #19256 from huaxingao/spark-21338.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d5aefa83
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d5aefa83
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d5aefa83

Branch: refs/heads/master
Commit: d5aefa83ad8608fbea7c08e8d9164f8bee00863d
Parents: 2f96242
Author: Huaxin Gao 
Authored: Tue Sep 19 09:27:05 2017 -0700
Committer: gatorsmile 
Committed: Tue Sep 19 09:27:05 2017 -0700

--
 .../main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala | 4 
 .../src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 2 ++
 2 files changed, 6 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d5aefa83/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala
index 467d8d6..7432a15 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala
@@ -41,4 +41,8 @@ private class AggregatedDialect(dialects: List[JdbcDialect]) 
extends JdbcDialect
   override def getJDBCType(dt: DataType): Option[JdbcType] = {
 dialects.flatMap(_.getJDBCType(dt)).headOption
   }
+
+  override def isCascadingTruncateTable(): Option[Boolean] = {
+dialects.flatMap(_.isCascadingTruncateTable()).reduceOption(_ || _)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d5aefa83/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 689f410..fd12bb9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -740,11 +740,13 @@ class JDBCSuite extends SparkFunSuite
 } else {
   None
 }
+  override def isCascadingTruncateTable(): Option[Boolean] = Some(true)
 }, testH2Dialect))
 assert(agg.canHandle("jdbc:h2:xxx"))
 assert(!agg.canHandle("jdbc:h2"))
 assert(agg.getCatalystType(0, "", 1, null) === Some(LongType))
 assert(agg.getCatalystType(1, "", 1, null) === Some(StringType))
+assert(agg.isCascadingTruncateTable() === Some(true))
   }
 
   test("DB2Dialect type mapping") {





spark git commit: [MINOR][ML] Remove unnecessary default value setting for evaluators.

2017-09-19 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/master 8319432af -> 2f962422a


[MINOR][ML] Remove unnecessary default value setting for evaluators.

## What changes were proposed in this pull request?
Remove unnecessary default value setting for all evaluators, as we have set 
them in corresponding _HasXXX_ base classes.

## How was this patch tested?
Existing tests.

Author: Yanbo Liang 

Closes #19262 from yanboliang/evaluation.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2f962422
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2f962422
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2f962422

Branch: refs/heads/master
Commit: 2f962422a25020582c915e15819f91f43c0b9d68
Parents: 8319432
Author: Yanbo Liang 
Authored: Tue Sep 19 22:22:35 2017 +0800
Committer: Yanbo Liang 
Committed: Tue Sep 19 22:22:35 2017 +0800

--
 python/pyspark/ml/evaluation.py | 9 +++--
 1 file changed, 3 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2f962422/python/pyspark/ml/evaluation.py
--
diff --git a/python/pyspark/ml/evaluation.py b/python/pyspark/ml/evaluation.py
index 7cb8d62..09cdf9b 100644
--- a/python/pyspark/ml/evaluation.py
+++ b/python/pyspark/ml/evaluation.py
@@ -146,8 +146,7 @@ class BinaryClassificationEvaluator(JavaEvaluator, 
HasLabelCol, HasRawPrediction
 super(BinaryClassificationEvaluator, self).__init__()
 self._java_obj = self._new_java_obj(
 "org.apache.spark.ml.evaluation.BinaryClassificationEvaluator", 
self.uid)
-self._setDefault(rawPredictionCol="rawPrediction", labelCol="label",
- metricName="areaUnderROC")
+self._setDefault(metricName="areaUnderROC")
 kwargs = self._input_kwargs
 self._set(**kwargs)
 
@@ -224,8 +223,7 @@ class RegressionEvaluator(JavaEvaluator, HasLabelCol, 
HasPredictionCol,
 super(RegressionEvaluator, self).__init__()
 self._java_obj = self._new_java_obj(
 "org.apache.spark.ml.evaluation.RegressionEvaluator", self.uid)
-self._setDefault(predictionCol="prediction", labelCol="label",
- metricName="rmse")
+self._setDefault(metricName="rmse")
 kwargs = self._input_kwargs
 self._set(**kwargs)
 
@@ -297,8 +295,7 @@ class MulticlassClassificationEvaluator(JavaEvaluator, 
HasLabelCol, HasPredictio
 super(MulticlassClassificationEvaluator, self).__init__()
 self._java_obj = self._new_java_obj(
 
"org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator", self.uid)
-self._setDefault(predictionCol="prediction", labelCol="label",
- metricName="f1")
+self._setDefault(metricName="f1")
 kwargs = self._input_kwargs
 self._set(**kwargs)
 





spark git commit: [SPARK-21917][CORE][YARN] Supporting adding http(s) resources in yarn mode

2017-09-19 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 581200af7 -> 8319432af


[SPARK-21917][CORE][YARN] Supporting adding http(s) resources in yarn mode

## What changes were proposed in this pull request?
In the current Spark, when submitting an application on YARN with remote resources, e.g.
`./bin/spark-shell --jars http://central.maven.org/maven2/com/github/swagger-akka-http/swagger-akka-http_2.11/0.10.1/swagger-akka-http_2.11-0.10.1.jar --master yarn-client -v`,
Spark fails with:

```
java.io.IOException: No FileSystem for scheme: http
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2586)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2593)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
    at org.apache.spark.deploy.yarn.Client.copyFileToRemote(Client.scala:354)
    at org.apache.spark.deploy.yarn.Client.org$apache$spark$deploy$yarn$Client$$distribute$1(Client.scala:478)
    at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11$$anonfun$apply$6.apply(Client.scala:600)
    at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11$$anonfun$apply$6.apply(Client.scala:599)
    at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:74)
    at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11.apply(Client.scala:599)
    at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11.apply(Client.scala:598)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:598)
    at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:848)
    at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:173)
```

This is because `YARN#client` assumes resources are on a Hadoop-compatible 
filesystem. To fix this, this change downloads remote http(s) resources to the 
local disk and adds the downloaded copies to the distributed cache. The downside 
is that such resources are downloaded and then uploaded again, but this applies 
only to remote http(s) resources and the overhead is small. The advantage is that 
the solution is simple and the code change is restricted to `SparkSubmit`.
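
A rough sketch of the download-then-localize idea (this is not the actual `SparkSubmit` code; the helper name and the use of plain `java.net.URL` streaming are assumptions):

```scala
import java.io.File
import java.net.{URI, URL}
import java.nio.file.{Files, StandardCopyOption}

/** Download an http(s)/ftp resource into targetDir and return a local file: URI. */
def downloadToLocal(resource: String, targetDir: File): String = {
  val uri = new URI(resource)
  uri.getScheme match {
    case "http" | "https" | "ftp" =>
      val target = new File(targetDir, new File(uri.getPath).getName)
      val in = new URL(resource).openStream()
      try Files.copy(in, target.toPath, StandardCopyOption.REPLACE_EXISTING)
      finally in.close()
      // The local copy can now be shipped through YARN's distributed cache
      // like any other local file.
      target.toURI.toString
    case _ =>
      resource   // already local or on a Hadoop-compatible filesystem
  }
}
```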

## How was this patch tested?

Unit test added, also verified in local cluster.

Author: jerryshao 

Closes #19130 from jerryshao/SPARK-21917.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8319432a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8319432a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8319432a

Branch: refs/heads/master
Commit: 8319432af60b8e1dc00f08d794f7d80591e24d0c
Parents: 581200a
Author: jerryshao 
Authored: Tue Sep 19 22:20:05 2017 +0800
Committer: Wenchen Fan 
Committed: Tue Sep 19 22:20:05 2017 +0800

--
 .../apache/spark/deploy/DependencyUtils.scala   |  9 ++-
 .../org/apache/spark/deploy/SparkSubmit.scala   | 51 ++-
 .../apache/spark/internal/config/package.scala  | 10 +++
 .../scala/org/apache/spark/util/Utils.scala |  3 +
 .../apache/spark/deploy/SparkSubmitSuite.scala  | 65 
 docs/running-on-yarn.md |  9 +++
 6 files changed, 143 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8319432a/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala 
b/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala
index 51c3d9b..ecc82d7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala
@@ -94,7 +94,7 @@ private[deploy] object DependencyUtils {
   hadoopConf: Configuration,
   secMgr: SecurityManager): String = {
 require(fileList != null, "fileList cannot be null.")
-fileList.split(",")
+Utils.stringToSeq(fileList)
   .map(downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr))
   .mkString(",")
   }
@@ -121,6 +121,11 @@ private[deploy] object DependencyUtils {
 
 uri.getScheme match {
   case "file" | "local" => path
+  case "http" | "https" | "ftp" if Utils.isTesting =>
+// This 

spark git commit: [SPARK-21428][SQL][FOLLOWUP] CliSessionState should point to the actual metastore not a dummy one

2017-09-19 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 1bc17a6b8 -> 581200af7


[SPARK-21428][SQL][FOLLOWUP] CliSessionState should point to the actual 
metastore not a dummy one

## What changes were proposed in this pull request?

While running bin/spark-sql, we reuse the cliSessionState, but the Hive 
configuration generated there points to a dummy metastore when it should point 
to the real one. Also, since the warehouse location is determined later in 
SharedState, HiveClient should respect that configuration change in this case too.
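
In the merge below, later entries overwrite earlier ones as they are applied to the HiveConf, so the effective precedence (lowest to highest) is the Hadoop configuration, then the Spark configuration, then the extra Hive time-format settings. A tiny sketch of that "later wins" semantics with plain Scala maps (illustrative key and values only):

```scala
// Later maps win on key collisions, mirroring the order in which the
// concatenated entries are applied via cliConf.set in the diff below.
val merged = Map("k" -> "hadoop") ++ Map("k" -> "spark") ++ Map("k" -> "extra")
merged("k")   // "extra"
```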

## How was this patch tested?
Existing unit tests.

cc cloud-fan jiangxb1987

Author: Kent Yao 

Closes #19068 from yaooqinn/SPARK-21428-FOLLOWUP.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/581200af
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/581200af
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/581200af

Branch: refs/heads/master
Commit: 581200af717bcefd11c9930ac063fe53c6fd2fde
Parents: 1bc17a6
Author: Kent Yao 
Authored: Tue Sep 19 19:35:36 2017 +0800
Committer: Wenchen Fan 
Committed: Tue Sep 19 19:35:36 2017 +0800

--
 .../sql/hive/thriftserver/SparkSQLCLIDriver.scala| 14 +++---
 .../scala/org/apache/spark/sql/hive/HiveUtils.scala  |  6 +++---
 .../spark/sql/hive/client/HiveClientImpl.scala   | 15 +--
 .../spark/sql/hive/client/HiveVersionSuite.scala |  2 +-
 .../apache/spark/sql/hive/client/VersionsSuite.scala |  2 +-
 5 files changed, 29 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/581200af/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
--
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 761e832..832a15d 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -37,6 +37,8 @@ import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.log4j.{Level, Logger}
 import org.apache.thrift.transport.TSocket
 
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.hive.HiveUtils
@@ -81,11 +83,17 @@ private[hive] object SparkSQLCLIDriver extends Logging {
   System.exit(1)
 }
 
+val sparkConf = new SparkConf(loadDefaults = true)
+val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
+val extraConfigs = HiveUtils.formatTimeVarsForHiveClient(hadoopConf)
+
 val cliConf = new HiveConf(classOf[SessionState])
-// Override the location of the metastore since this is only used for local execution.
-HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false).foreach {
-  case (key, value) => cliConf.set(key, value)
+(hadoopConf.iterator().asScala.map(kv => kv.getKey -> kv.getValue)
+  ++ sparkConf.getAll.toMap ++ extraConfigs).foreach {
+  case (k, v) =>
+cliConf.set(k, v)
 }
+
 val sessionState = new CliSessionState(cliConf)
 
 sessionState.in = System.in

http://git-wip-us.apache.org/repos/asf/spark/blob/581200af/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
--
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 561c127..80b9a3d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -176,9 +176,9 @@ private[spark] object HiveUtils extends Logging {
   }
 
   /**
-   * Configurations needed to create a [[HiveClient]].
+   * Change time configurations needed to create a [[HiveClient]] into unified [[Long]] format.
 */
-  private[hive] def hiveClientConfigurations(hadoopConf: Configuration): Map[String, String] = {
+  private[hive] def formatTimeVarsForHiveClient(hadoopConf: Configuration): Map[String, String] = {
 // Hive 0.14.0 introduces timeout operations in HiveConf, and changes 
default values of a bunch
 // of time `ConfVar`s by adding time suffixes (`s`, `ms`, and `d` etc.).  
This breaks backwards-
 // compatibility when users are trying to connecting to a Hive metastore 
of lower version,
@@ -280,7 +280,7 @@ private[spark] object 

spark git commit: [SPARK-22052] Incorrect Metric assigned in MetricsReporter.scala

2017-09-19 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 d0234ebcf -> 6764408f6


[SPARK-22052] Incorrect Metric assigned in MetricsReporter.scala

The current implementation for processingRate-total uses the wrong metric: it 
mistakenly uses inputRowsPerSecond instead of processedRowsPerSecond.

## What changes were proposed in this pull request?
Adjust processingRate-total to use processedRowsPerSecond instead of inputRowsPerSecond.

## How was this patch tested?

Built Spark from source with the proposed change and tested the output. Before the 
change, the CSV metrics files for inputRate-total and processingRate-total displayed 
the same values due to the error. After changing MetricsReporter.scala, the 
processingRate-total CSV file displayed the correct metric.
https://user-images.githubusercontent.com/32072374/30554340-82eea12c-9ca4-11e7-8370-8168526ff9a2.png


Author: Taaffy <32072374+taa...@users.noreply.github.com>

Closes #19268 from Taaffy/patch-1.

(cherry picked from commit 1bc17a6b8add02772a8a0a1048ac6a01d045baf4)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6764408f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6764408f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6764408f

Branch: refs/heads/branch-2.2
Commit: 6764408f68495e2ca7c1b9959db53ee12cabb197
Parents: d0234eb
Author: Taaffy <32072374+taa...@users.noreply.github.com>
Authored: Tue Sep 19 10:20:04 2017 +0100
Committer: Sean Owen 
Committed: Tue Sep 19 10:20:14 2017 +0100

--
 .../org/apache/spark/sql/execution/streaming/MetricsReporter.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6764408f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
index 5551d12..b84e6ce 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
@@ -40,7 +40,7 @@ class MetricsReporter(
   // Metric names should not have . in them, so that all the metrics of a 
query are identified
   // together in Ganglia as a single metric group
   registerGauge("inputRate-total", () => 
stream.lastProgress.inputRowsPerSecond)
-  registerGauge("processingRate-total", () => 
stream.lastProgress.inputRowsPerSecond)
+  registerGauge("processingRate-total", () => 
stream.lastProgress.processedRowsPerSecond)
   registerGauge("latency", () => 
stream.lastProgress.durationMs.get("triggerExecution").longValue())
 
   private def registerGauge[T](name: String, f: () => T)(implicit num: 
Numeric[T]): Unit = {





spark git commit: [SPARK-22052] Incorrect Metric assigned in MetricsReporter.scala

2017-09-19 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 7c92351f4 -> 1bc17a6b8


[SPARK-22052] Incorrect Metric assigned in MetricsReporter.scala

The current implementation for processingRate-total uses the wrong metric: it 
mistakenly uses inputRowsPerSecond instead of processedRowsPerSecond.

## What changes were proposed in this pull request?
Adjust processingRate-total to use processedRowsPerSecond instead of inputRowsPerSecond.

## How was this patch tested?

Built Spark from source with the proposed change and tested the output. Before the 
change, the CSV metrics files for inputRate-total and processingRate-total displayed 
the same values due to the error. After changing MetricsReporter.scala, the 
processingRate-total CSV file displayed the correct metric.
https://user-images.githubusercontent.com/32072374/30554340-82eea12c-9ca4-11e7-8370-8168526ff9a2.png


Author: Taaffy <32072374+taa...@users.noreply.github.com>

Closes #19268 from Taaffy/patch-1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1bc17a6b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1bc17a6b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1bc17a6b

Branch: refs/heads/master
Commit: 1bc17a6b8add02772a8a0a1048ac6a01d045baf4
Parents: 7c92351
Author: Taaffy <32072374+taa...@users.noreply.github.com>
Authored: Tue Sep 19 10:20:04 2017 +0100
Committer: Sean Owen 
Committed: Tue Sep 19 10:20:04 2017 +0100

--
 .../org/apache/spark/sql/execution/streaming/MetricsReporter.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1bc17a6b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
index 5551d12..b84e6ce 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
@@ -40,7 +40,7 @@ class MetricsReporter(
   // Metric names should not have . in them, so that all the metrics of a 
query are identified
   // together in Ganglia as a single metric group
   registerGauge("inputRate-total", () => 
stream.lastProgress.inputRowsPerSecond)
-  registerGauge("processingRate-total", () => 
stream.lastProgress.inputRowsPerSecond)
+  registerGauge("processingRate-total", () => 
stream.lastProgress.processedRowsPerSecond)
   registerGauge("latency", () => 
stream.lastProgress.durationMs.get("triggerExecution").longValue())
 
   private def registerGauge[T](name: String, f: () => T)(implicit num: 
Numeric[T]): Unit = {





spark git commit: [MINOR][CORE] Cleanup dead code and duplication in Mem. Management

2017-09-19 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master a11db942a -> 7c92351f4


[MINOR][CORE] Cleanup dead code and duplication in Mem. Management

## What changes were proposed in this pull request?

* Removed the method 
`org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter#alignToWords`.
It became unused as a result of 85b0a157543201895557d66306b38b3ca52f2151
(SPARK-15962) introducing word alignment for unsafe arrays.
* Cleaned up duplicate code in memory management and unsafe sorters
  * Extracting the exception paths is more than just cosmetic, since it definitely 
reduces the compiled size of the affected methods

## How was this patch tested?

* The build still passes after removing the method, and grepping the codebase for 
`alignToWords` shows no remaining references to it.
* The deduplicated code is covered by existing tests.

Author: Armin 

Closes #19254 from original-brownbear/cleanup-mem-consumer.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c92351f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c92351f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c92351f

Branch: refs/heads/master
Commit: 7c92351f43ac4b1710e3c80c78f7978dad491ed2
Parents: a11db94
Author: Armin 
Authored: Tue Sep 19 10:06:32 2017 +0100
Committer: Sean Owen 
Committed: Tue Sep 19 10:06:32 2017 +0100

--
 .../org/apache/spark/memory/MemoryConsumer.java | 26 
 .../spark/unsafe/map/BytesToBytesMap.java   | 24 ++-
 .../unsafe/sort/UnsafeExternalSorter.java   | 32 +---
 .../expressions/codegen/UnsafeRowWriter.java| 16 --
 4 files changed, 37 insertions(+), 61 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7c92351f/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
--
diff --git a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java 
b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
index 4099fb0..0efae16 100644
--- a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
+++ b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
@@ -89,13 +89,7 @@ public abstract class MemoryConsumer {
 long required = size * 8L;
 MemoryBlock page = taskMemoryManager.allocatePage(required, this);
 if (page == null || page.size() < required) {
-  long got = 0;
-  if (page != null) {
-got = page.size();
-taskMemoryManager.freePage(page, this);
-  }
-  taskMemoryManager.showMemoryUsage();
-  throw new OutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got);
+  throwOom(page, required);
 }
 used += required;
 return new LongArray(page);
@@ -116,13 +110,7 @@ public abstract class MemoryConsumer {
   protected MemoryBlock allocatePage(long required) {
 MemoryBlock page = taskMemoryManager.allocatePage(Math.max(pageSize, 
required), this);
 if (page == null || page.size() < required) {
-  long got = 0;
-  if (page != null) {
-got = page.size();
-taskMemoryManager.freePage(page, this);
-  }
-  taskMemoryManager.showMemoryUsage();
-  throw new OutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got);
+  throwOom(page, required);
 }
 used += page.size();
 return page;
@@ -152,4 +140,14 @@ public abstract class MemoryConsumer {
 taskMemoryManager.releaseExecutionMemory(size, this);
 used -= size;
   }
+
+  private void throwOom(final MemoryBlock page, final long required) {
+long got = 0;
+if (page != null) {
+  got = page.size();
+  taskMemoryManager.freePage(page, this);
+}
+taskMemoryManager.showMemoryUsage();
+throw new OutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got);
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7c92351f/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
--
diff --git 
a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java 
b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 610ace3..4fadfe3 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -283,13 +283,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
 } else {
   currentPage = null;
   if (reader != null) {
-// remove the spill file from disk
-File file = spillWriters.removeFirst().getFile();
-if (file != null && file.exists()) {
-   

spark git commit: [SPARK-21923][CORE] Avoid calling reserveUnrollMemoryForThisTask for every record

2017-09-19 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 10f45b3c8 -> a11db942a


[SPARK-21923][CORE] Avoid calling reserveUnrollMemoryForThisTask for every 
record

## What changes were proposed in this pull request?
When Spark persists data to unsafe memory, we call the method 
`MemoryStore.putIteratorAsBytes`, which needs to synchronize on the `memoryManager` 
for every record written. This is not necessary: we can request more memory at a 
time to reduce the unnecessary synchronization.
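
The idea, as a simplified sketch (this is not the MemoryStore code itself; `reserve` stands in for the synchronized `reserveUnrollMemoryForThisTask` call, and the two defaults mirror the `spark.storage.unrollMemoryCheckPeriod` and `spark.storage.unrollMemoryGrowthFactor` settings added in the diff below):

```scala
// Unroll records while reserving memory in chunks instead of per record.
def unroll(
    records: Iterator[Array[Byte]],
    reserve: Long => Boolean,              // synchronized call into the memory manager
    checkPeriod: Long = 16,                // spark.storage.unrollMemoryCheckPeriod
    growthFactor: Double = 1.5): Unit = {  // spark.storage.unrollMemoryGrowthFactor
  var elementsUnrolled = 0L
  var bytesUsed = 0L
  var memoryThreshold = 1024L * 1024L      // initial per-task reservation
  var keepUnrolling = reserve(memoryThreshold)

  while (records.hasNext && keepUnrolling) {
    bytesUsed += records.next().length
    elementsUnrolled += 1
    // Touch the (synchronized) memory manager only every `checkPeriod` records,
    // and then request a growthFactor-sized chunk rather than one record's worth.
    if (elementsUnrolled % checkPeriod == 0 && bytesUsed >= memoryThreshold) {
      val amountToRequest = (bytesUsed * growthFactor - memoryThreshold).toLong
      keepUnrolling = reserve(amountToRequest)
      if (keepUnrolling) memoryThreshold += amountToRequest
    }
  }
}
```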

## How was this patch tested?

Test case (with 1 executor 20 core):
```scala
val start = System.currentTimeMillis()
val data = sc.parallelize(0 until Integer.MAX_VALUE, 100)
  .persist(StorageLevel.OFF_HEAP)
  .count()

println(System.currentTimeMillis() - start)

```

Test result (wall-clock time in milliseconds, 5 runs each):

| run    | 1     | 2     | 3     | 4     | 5     |
|--------|-------|-------|-------|-------|-------|
| before | 27647 | 29108 | 28591 | 28264 | 27232 |
| after  | 26868 | 26358 | 27767 | 26653 | 26693 |
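
For reference, averaging the five runs gives roughly 28.2 s before versus 26.9 s after, about a 4-5% improvement on this workload.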

Author: Xianyang Liu 

Closes #19135 from ConeyLiu/memorystore.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a11db942
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a11db942
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a11db942

Branch: refs/heads/master
Commit: a11db942aaf4c470a85f8a1b180f034f7a584254
Parents: 10f45b3
Author: Xianyang Liu 
Authored: Tue Sep 19 14:51:27 2017 +0800
Committer: Wenchen Fan 
Committed: Tue Sep 19 14:51:27 2017 +0800

--
 .../apache/spark/internal/config/package.scala| 15 +++
 .../apache/spark/storage/memory/MemoryStore.scala | 18 ++
 2 files changed, 29 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a11db942/core/src/main/scala/org/apache/spark/internal/config/package.scala
--
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 0d3769a..e0f6960 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -385,4 +385,19 @@ package object config {
   .checkValue(v => v > 0 && v <= Int.MaxValue,
 s"The buffer size must be greater than 0 and less than 
${Int.MaxValue}.")
   .createWithDefault(1024 * 1024)
+
+  private[spark] val UNROLL_MEMORY_CHECK_PERIOD =
+ConfigBuilder("spark.storage.unrollMemoryCheckPeriod")
+  .internal()
+  .doc("The memory check period is used to determine how often we should 
check whether "
++ "there is a need to request more memory when we try to unroll the 
given block in memory.")
+  .longConf
+  .createWithDefault(16)
+
+  private[spark] val UNROLL_MEMORY_GROWTH_FACTOR =
+ConfigBuilder("spark.storage.unrollMemoryGrowthFactor")
+  .internal()
+  .doc("Memory to request as a multiple of the size that used to unroll 
the block.")
+  .doubleConf
+  .createWithDefault(1.5)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a11db942/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala 
b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 90e3af2..eb2201d 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -29,6 +29,7 @@ import com.google.common.io.ByteStreams
 
 import org.apache.spark.{SparkConf, TaskContext}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.{UNROLL_MEMORY_CHECK_PERIOD, UNROLL_MEMORY_GROWTH_FACTOR}
 import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.serializer.{SerializationStream, SerializerManager}
 import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel, 
StreamBlockId}
@@ -190,11 +191,11 @@ private[spark] class MemoryStore(
 // Initial per-task memory to request for unrolling blocks (bytes).
 val initialMemoryThreshold = unrollMemoryThreshold
 // How often to check whether we need to request more memory
-val memoryCheckPeriod = 16
+val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
 // Memory currently reserved by this task for this particular unrolling 
operation
 var memoryThreshold = initialMemoryThreshold
 // Memory to request as a multiple of current vector size
-val memoryGrowthFactor = 1.5
+val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
 // Keep track of unroll memory used by this