Repository: spark
Updated Branches:
refs/heads/master f4346f612 -> 0370abdfd
[MINOR] Hide the error logs for 'SQLListenerMemoryLeakSuite'
Hide the error logs for 'SQLListenerMemoryLeakSuite' to avoid noise. Most of
the changes are whitespace-only changes.
Author: Shixiong Zhu <shixi...@databricks.com>
Closes #10363 from zsxwing/hide-log.
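For reference, this is the usual log4j save/raise/restore pattern, which the patch inlines with try/finally; the `withQuietLogs` helper below is an illustrative sketch, not part of the patch:

```scala
import org.apache.log4j.{Level, Logger}

// Raise the root log level to FATAL while a noisy block runs, then restore
// the previous level so later tests keep their normal logging.
def withQuietLogs[T](body: => T): T = {
  val rootLogger = Logger.getRootLogger
  val oldLevel = rootLogger.getLevel
  rootLogger.setLevel(Level.FATAL)
  try body finally rootLogger.setLevel(oldLevel)
}
```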
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0370abdf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0370abdf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0370abdf
Branch: refs/heads/master
Commit: 0370abdfd636566cd8df954c6f9ea5a794d275ef
Parents: f4346f6
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Thu Dec 17 18:18:12 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Thu Dec 17 18:18:12 2015 -0800
--
.../sql/execution/ui/SQLListenerSuite.scala | 64 +++-
1 file changed, 35 insertions(+), 29 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/0370abdf/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
--
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 12a4e13..11a6ce9 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -336,39 +336,45 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {

 class SQLListenerMemoryLeakSuite extends SparkFunSuite {

   test("no memory leak") {
-    val conf = new SparkConf()
-      .setMaster("local")
-      .setAppName("test")
-      .set("spark.task.maxFailures", "1") // Don't retry the tasks to run this test quickly
-      .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly
-    val sc = new SparkContext(conf)
+    val oldLogLevel = org.apache.log4j.Logger.getRootLogger().getLevel()
     try {
-      SQLContext.clearSqlListener()
-      val sqlContext = new SQLContext(sc)
-      import sqlContext.implicits._
-      // Run 100 successful executions and 100 failed executions.
-      // Each execution only has one job and one stage.
-      for (i <- 0 until 100) {
-        val df = Seq(
-          (1, 1),
-          (2, 2)
-        ).toDF()
-        df.collect()
-        try {
-          df.foreach(_ => throw new RuntimeException("Oops"))
-        } catch {
-          case e: SparkException => // This is expected for a failed job
+      org.apache.log4j.Logger.getRootLogger().setLevel(org.apache.log4j.Level.FATAL)
+      val conf = new SparkConf()
+        .setMaster("local")
+        .setAppName("test")
+        .set("spark.task.maxFailures", "1") // Don't retry the tasks to run this test quickly
+        .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly
+      val sc = new SparkContext(conf)
+      try {
+        SQLContext.clearSqlListener()
+        val sqlContext = new SQLContext(sc)
+        import sqlContext.implicits._
+        // Run 100 successful executions and 100 failed executions.
+        // Each execution only has one job and one stage.
+        for (i <- 0 until 100) {
+          val df = Seq(
+            (1, 1),
+            (2, 2)
+          ).toDF()
+          df.collect()
+          try {
+            df.foreach(_ => throw new RuntimeException("Oops"))
+          } catch {
+            case e: SparkException => // This is expected for a failed job
+          }
         }
+        sc.listenerBus.waitUntilEmpty(1)
+        assert(sqlContext.listener.getCompletedExecutions.size <= 50)
+        assert(sqlContext.listener.getFailedExecutions.size <= 50)
+        // 50 for successful executions and 50 for failed executions
+        assert(sqlContext.listener.executionIdToData.size <= 100)
+        assert(sqlContext.listener.jobIdToExecutionId.size <= 100)
+        assert(sqlContext.listener.stageIdToStageMetrics.size <= 100)
+      } finally {
+        sc.stop()
       }
-      sc.listenerBus.waitUntilEmpty(1)
-      assert(sqlContext.listener.getCompletedExecutions.size <= 50)
-      assert(sqlContext.listener.getFailedExecutions.size <= 50)
-      // 50 for successful executions and 50 for failed executions
-      assert(sqlContext.listener.executionIdToData.size <= 100)
-      assert(sqlContex
Repository: spark
Updated Branches:
refs/heads/branch-1.6 d7e3bfd7d -> fbf16da2e
[SPARK-12281][CORE] Fix a race condition when reporting ExecutorState in the
shutdown hook
1. Make sure workers and masters exit, so that no worker or master is still
running when the shutdown hook is triggered.
2. Set ExecutorState to FAILED if it is still RUNNING when the shutdown hook
executes.
This should fix the potential exceptions when exiting a local cluster:
```
java.lang.AssertionError: assertion failed: executor 4 state transfer from
RUNNING to RUNNING is illegal
at scala.Predef$.assert(Predef.scala:179)
at
org.apache.spark.deploy.master.Master$$anonfun$receive$1.applyOrElse(Master.scala:260)
at
org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
at
org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
java.lang.IllegalStateException: Shutdown hooks cannot be modified during
shutdown.
at
org.apache.spark.util.SparkShutdownHookManager.add(ShutdownHookManager.scala:246)
at
org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:191)
at
org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:180)
at
org.apache.spark.deploy.worker.ExecutorRunner.start(ExecutorRunner.scala:73)
at
org.apache.spark.deploy.worker.Worker$$anonfun$receive$1.applyOrElse(Worker.scala:474)
at
org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
at
org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
```
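As background for fix (1), a self-contained sketch of the idea, with plain executor services standing in for the worker/master RpcEnvs (the names here are illustrative, not Spark's):

```scala
import java.util.concurrent.{Executors, TimeUnit}

object AwaitBeforeExit {
  def main(args: Array[String]): Unit = {
    val workers = Seq.fill(2)(Executors.newSingleThreadExecutor())
    val masters = Seq(Executors.newSingleThreadExecutor())

    // Initiating shutdown alone lets JVM shutdown hooks race with
    // still-running components; stop the workers before the master...
    workers.foreach(_.shutdown())
    masters.foreach(_.shutdown())

    // ...and then block until everything has actually exited, so a shutdown
    // hook never observes a component that is still RUNNING.
    (workers ++ masters).foreach(_.awaitTermination(10, TimeUnit.SECONDS))
  }
}
```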
Author: Shixiong Zhu <shixi...@databricks.com>
Closes #10269 from zsxwing/executor-state.
(cherry picked from commit 2aecda284e22ec608992b6221e2f5ffbd51fcd24)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fbf16da2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fbf16da2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fbf16da2
Branch: refs/heads/branch-1.6
Commit: fbf16da2e53acc8678bd1454b0749d1923d4eddf
Parents: d7e3bfd
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Sun Dec 13 22:06:39 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Sun Dec 13 22:06:56 2015 -0800
--
.../main/scala/org/apache/spark/deploy/LocalSparkCluster.scala | 2 ++
core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 5 ++---
.../scala/org/apache/spark/deploy/worker/ExecutorRunner.scala | 5 +
3 files changed, 9 insertions(+), 3 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/fbf16da2/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
--
diff --git
a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index 83ccaad..5bb62d3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -75,6 +75,8 @@ class LocalSparkCluster(
     // Stop the workers before the master so they don't get upset that it disconnected
     workerRpcEnvs.foreach(_.shutdown())
     masterRpcEnvs.foreach(_.shutdown())
+    workerRpcEnvs.foreach(_.awaitTermination())
+    masterRpcEnvs.foreach(_.awaitTermination())
     masterRpcEnvs.clear()
     workerRpcEnvs.clear()
   }
http://git-wip-us.apache.org/repos/asf/spark/blob/fbf16da2/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 04b20e0..1355e1a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ma
Repository: spark
Updated Branches:
refs/heads/master 8af2f8c61 -> 2aecda284
[SPARK-12281][CORE] Fix a race condition when reporting ExecutorState in the
shutdown hook
1. Make sure workers and masters exit, so that no worker or master is still
running when the shutdown hook is triggered.
2. Set ExecutorState to FAILED if it is still RUNNING when the shutdown hook
executes.
This should fix the potential exceptions when exiting a local cluster:
```
java.lang.AssertionError: assertion failed: executor 4 state transfer from
RUNNING to RUNNING is illegal
at scala.Predef$.assert(Predef.scala:179)
at
org.apache.spark.deploy.master.Master$$anonfun$receive$1.applyOrElse(Master.scala:260)
at
org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
at
org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
java.lang.IllegalStateException: Shutdown hooks cannot be modified during
shutdown.
at
org.apache.spark.util.SparkShutdownHookManager.add(ShutdownHookManager.scala:246)
at
org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:191)
at
org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:180)
at
org.apache.spark.deploy.worker.ExecutorRunner.start(ExecutorRunner.scala:73)
at
org.apache.spark.deploy.worker.Worker$$anonfun$receive$1.applyOrElse(Worker.scala:474)
at
org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
at
org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
```
Author: Shixiong Zhu <shixi...@databricks.com>
Closes #10269 from zsxwing/executor-state.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2aecda28
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2aecda28
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2aecda28
Branch: refs/heads/master
Commit: 2aecda284e22ec608992b6221e2f5ffbd51fcd24
Parents: 8af2f8c
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Sun Dec 13 22:06:39 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Sun Dec 13 22:06:39 2015 -0800
--
.../main/scala/org/apache/spark/deploy/LocalSparkCluster.scala | 2 ++
core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 5 ++---
.../scala/org/apache/spark/deploy/worker/ExecutorRunner.scala | 5 +
3 files changed, 9 insertions(+), 3 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/2aecda28/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
--
diff --git
a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index 83ccaad..5bb62d3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -75,6 +75,8 @@ class LocalSparkCluster(
     // Stop the workers before the master so they don't get upset that it disconnected
     workerRpcEnvs.foreach(_.shutdown())
     masterRpcEnvs.foreach(_.shutdown())
+    workerRpcEnvs.foreach(_.awaitTermination())
+    masterRpcEnvs.foreach(_.awaitTermination())
     masterRpcEnvs.clear()
     workerRpcEnvs.clear()
   }
http://git-wip-us.apache.org/repos/asf/spark/blob/2aecda28/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 04b20e0..1355e1a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -257,9 +257,8 @@ private[deploy] class Master(
             exec.state = state
             if (state ==
Repository: spark
Updated Branches:
refs/heads/branch-1.4 c7c99857d -> 43f02e41e
[STREAMING][DOC][MINOR] Update the description of direct Kafka stream doc
With the merge of
[SPARK-8337](https://issues.apache.org/jira/browse/SPARK-8337), the Python
API now has the same functionality as the Scala/Java APIs, so this changes the
description to make it more precise.
zsxwing tdas, please review, thanks a lot.
Author: jerryshao <ss...@hortonworks.com>
Closes #10246 from jerryshao/direct-kafka-doc-update.
(cherry picked from commit 24d3357d66e14388faf8709b368edca70ea96432)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/43f02e41
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/43f02e41
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/43f02e41
Branch: refs/heads/branch-1.4
Commit: 43f02e41e36256c2b90ee06d8b380ff2dfc7cabf
Parents: c7c9985
Author: jerryshao <ss...@hortonworks.com>
Authored: Thu Dec 10 15:31:46 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Thu Dec 10 15:32:16 2015 -0800
--
docs/streaming-kafka-integration.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/43f02e41/docs/streaming-kafka-integration.md
--
diff --git a/docs/streaming-kafka-integration.md
b/docs/streaming-kafka-integration.md
index 79b811c..100f637 100644
--- a/docs/streaming-kafka-integration.md
+++ b/docs/streaming-kafka-integration.md
@@ -74,7 +74,7 @@ Next, we discuss how to use this approach in your streaming
application.
[Maven
repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-assembly_2.10%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
and add it to `spark-submit` with `--jars`.
## Approach 2: Direct Approach (No Receivers)
-This new receiver-less "direct" approach has been introduced in Spark 1.3 to
ensure stronger end-to-end guarantees. Instead of using receivers to receive
data, this approach periodically queries Kafka for the latest offsets in each
topic+partition, and accordingly defines the offset ranges to process in each
batch. When the jobs to process the data are launched, Kafka's simple consumer
API is used to read the defined ranges of offsets from Kafka (similar to read
files from a file system). Note that this is an experimental feature introduced
in Spark 1.3 for the Scala and Java API. Spark 1.4 added a Python API, but it
is not yet at full feature parity.
+This new receiver-less "direct" approach has been introduced in Spark 1.3 to
ensure stronger end-to-end guarantees. Instead of using receivers to receive
data, this approach periodically queries Kafka for the latest offsets in each
topic+partition, and accordingly defines the offset ranges to process in each
batch. When the jobs to process the data are launched, Kafka's simple consumer
API is used to read the defined ranges of offsets from Kafka (similar to read
files from a file system). Note that this is an experimental feature introduced
in Spark 1.3 for the Scala and Java API, in Spark 1.4 for the Python API.
This approach has the following advantages over the receiver-based approach
(i.e. Approach 1).
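As an illustration of the paragraph above, a minimal Scala sketch against the Spark 1.3+ direct API (the broker address and topic name are placeholders):

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectKafkaSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DirectKafkaSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
    val topics = Set("mytopic")

    // No receiver: each batch reads an exact offset range per topic+partition.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream.map(_._2).count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```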
Repository: spark
Updated Branches:
refs/heads/branch-1.5 4b99f72f7 -> cb0246c93
[STREAMING][DOC][MINOR] Update the description of direct Kafka stream doc
With the merge of
[SPARK-8337](https://issues.apache.org/jira/browse/SPARK-8337), the Python
API now has the same functionality as the Scala/Java APIs, so this changes the
description to make it more precise.
zsxwing tdas, please review, thanks a lot.
Author: jerryshao <ss...@hortonworks.com>
Closes #10246 from jerryshao/direct-kafka-doc-update.
(cherry picked from commit 24d3357d66e14388faf8709b368edca70ea96432)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb0246c9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb0246c9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb0246c9
Branch: refs/heads/branch-1.5
Commit: cb0246c9314892dbb3403488154b2d987c90b1dd
Parents: 4b99f72
Author: jerryshao <ss...@hortonworks.com>
Authored: Thu Dec 10 15:31:46 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Thu Dec 10 15:32:05 2015 -0800
--
docs/streaming-kafka-integration.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/cb0246c9/docs/streaming-kafka-integration.md
--
diff --git a/docs/streaming-kafka-integration.md
b/docs/streaming-kafka-integration.md
index ab7f011..d58f4f6 100644
--- a/docs/streaming-kafka-integration.md
+++ b/docs/streaming-kafka-integration.md
@@ -74,7 +74,7 @@ Next, we discuss how to use this approach in your streaming
application.
[Maven
repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-assembly_2.10%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
and add it to `spark-submit` with `--jars`.
## Approach 2: Direct Approach (No Receivers)
-This new receiver-less "direct" approach has been introduced in Spark 1.3 to
ensure stronger end-to-end guarantees. Instead of using receivers to receive
data, this approach periodically queries Kafka for the latest offsets in each
topic+partition, and accordingly defines the offset ranges to process in each
batch. When the jobs to process the data are launched, Kafka's simple consumer
API is used to read the defined ranges of offsets from Kafka (similar to read
files from a file system). Note that this is an experimental feature introduced
in Spark 1.3 for the Scala and Java API. Spark 1.4 added a Python API, but it
is not yet at full feature parity.
+This new receiver-less "direct" approach has been introduced in Spark 1.3 to
ensure stronger end-to-end guarantees. Instead of using receivers to receive
data, this approach periodically queries Kafka for the latest offsets in each
topic+partition, and accordingly defines the offset ranges to process in each
batch. When the jobs to process the data are launched, Kafka's simple consumer
API is used to read the defined ranges of offsets from Kafka (similar to read
files from a file system). Note that this is an experimental feature introduced
in Spark 1.3 for the Scala and Java API, in Spark 1.4 for the Python API.
This approach has the following advantages over the receiver-based approach
(i.e. Approach 1).
Repository: spark
Updated Branches:
refs/heads/master 5030923ea -> 24d3357d6
[STREAMING][DOC][MINOR] Update the description of direct Kafka stream doc
With the merge of
[SPARK-8337](https://issues.apache.org/jira/browse/SPARK-8337), the Python
API now has the same functionality as the Scala/Java APIs, so this changes the
description to make it more precise.
zsxwing tdas, please review, thanks a lot.
Author: jerryshao <ss...@hortonworks.com>
Closes #10246 from jerryshao/direct-kafka-doc-update.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24d3357d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24d3357d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24d3357d
Branch: refs/heads/master
Commit: 24d3357d66e14388faf8709b368edca70ea96432
Parents: 5030923
Author: jerryshao <ss...@hortonworks.com>
Authored: Thu Dec 10 15:31:46 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Thu Dec 10 15:31:46 2015 -0800
--
docs/streaming-kafka-integration.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/24d3357d/docs/streaming-kafka-integration.md
--
diff --git a/docs/streaming-kafka-integration.md
b/docs/streaming-kafka-integration.md
index b00351b..5be73c4 100644
--- a/docs/streaming-kafka-integration.md
+++ b/docs/streaming-kafka-integration.md
@@ -74,7 +74,7 @@ Next, we discuss how to use this approach in your streaming
application.
[Maven
repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-assembly_2.10%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
and add it to `spark-submit` with `--jars`.
## Approach 2: Direct Approach (No Receivers)
-This new receiver-less "direct" approach has been introduced in Spark 1.3 to
ensure stronger end-to-end guarantees. Instead of using receivers to receive
data, this approach periodically queries Kafka for the latest offsets in each
topic+partition, and accordingly defines the offset ranges to process in each
batch. When the jobs to process the data are launched, Kafka's simple consumer
API is used to read the defined ranges of offsets from Kafka (similar to read
files from a file system). Note that this is an experimental feature introduced
in Spark 1.3 for the Scala and Java API. Spark 1.4 added a Python API, but it
is not yet at full feature parity.
+This new receiver-less "direct" approach has been introduced in Spark 1.3 to
ensure stronger end-to-end guarantees. Instead of using receivers to receive
data, this approach periodically queries Kafka for the latest offsets in each
topic+partition, and accordingly defines the offset ranges to process in each
batch. When the jobs to process the data are launched, Kafka's simple consumer
API is used to read the defined ranges of offsets from Kafka (similar to read
files from a file system). Note that this is an experimental feature introduced
in Spark 1.3 for the Scala and Java API, in Spark 1.4 for the Python API.
This approach has the following advantages over the receiver-based approach
(i.e. Approach 1).
Repository: spark
Updated Branches:
refs/heads/branch-1.6 699f497cf -> f6d866173
http://git-wip-us.apache.org/repos/asf/spark/blob/f6d86617/streaming/src/test/java/org/apache/spark/streaming/JavaTrackStateByKeySuite.java
--
diff --git
a/streaming/src/test/java/org/apache/spark/streaming/JavaTrackStateByKeySuite.java
b/streaming/src/test/java/org/apache/spark/streaming/JavaTrackStateByKeySuite.java
deleted file mode 100644
index eac4cdd..000
---
a/streaming/src/test/java/org/apache/spark/streaming/JavaTrackStateByKeySuite.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-import scala.Tuple2;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.util.ManualClock;
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.spark.HashPartitioner;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.Function4;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaTrackStateDStream;
-
-public class JavaTrackStateByKeySuite extends LocalJavaStreamingContext implements Serializable {
-
-  /**
-   * This test is only for testing the APIs. It's not necessary to run it.
-   */
-  public void testAPI() {
-    JavaPairRDD<String, Boolean> initialRDD = null;
-    JavaPairDStream<String, Integer> wordsDstream = null;
-
-    final Function4
[SPARK-12244][SPARK-12245][STREAMING] Rename trackStateByKey to mapWithState
and change tracking function signature
SPARK-12244:
Based on feedback from early users and personal experience attempting to
explain it, the name trackStateByKey had two problems:
- "trackState" is a completely new term which gives no intuition about what
the operation does.
- The resultant data stream of objects returned by the function is referred
to in the docs as the "emitted" data, for lack of a better term.
"mapWithState" makes sense because the API is like a mapping function
(Key, Value) => T with State as an additional parameter, and the resultant
data stream is the "mapped data". So both problems are solved.
SPARK-12245:
From initial experience, not having the key in the function makes it hard to
return mapped records, as the whole information of the record is not
available. Basically the user is restricted to doing something like
mapValues() instead of map(). So the key is added as a parameter.
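A short sketch of the renamed Scala API under these changes (`wordCounts` is assumed to be a `DStream[(String, Int)]` built elsewhere):

```scala
import org.apache.spark.streaming.{State, StateSpec}

// The mapping function now receives the key too, so it can return fully
// mapped records rather than just values.
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
  val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(sum)
  (word, sum) // the "mapped" record, key included
}

val stateDstream = wordCounts.mapWithState(StateSpec.function(mappingFunc))
```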
Author: Tathagata Das
Closes #10224 from tdas/rename.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6d86617
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6d86617
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6d86617
Branch: refs/heads/branch-1.6
Commit: f6d8661738b5a4b139c4800d5c4e9f0094068451
Parents: 699f497
Author: Tathagata Das
Authored: Wed Dec 9 20:47:15 2015 -0800
Committer: Shixiong Zhu
Committed: Wed Dec 9 20:59:21 2015 -0800
--
.../streaming/JavaStatefulNetworkWordCount.java | 16 +-
.../streaming/StatefulNetworkWordCount.scala| 12 +-
.../apache/spark/streaming/Java8APISuite.java | 18 +-
.../org/apache/spark/streaming/State.scala | 20 +-
.../org/apache/spark/streaming/StateSpec.scala | 160 ++---
.../api/java/JavaMapWithStateDStream.scala | 44 ++
.../streaming/api/java/JavaPairDStream.scala| 50 +-
.../api/java/JavaTrackStateDStream.scala| 44 --
.../streaming/dstream/MapWithStateDStream.scala | 170 ++
.../dstream/PairDStreamFunctions.scala | 41 +-
.../streaming/dstream/TrackStateDStream.scala | 171 --
.../spark/streaming/rdd/MapWithStateRDD.scala | 223 +++
.../spark/streaming/rdd/TrackStateRDD.scala | 228
.../spark/streaming/JavaMapWithStateSuite.java | 210 +++
.../streaming/JavaTrackStateByKeySuite.java | 210 ---
.../spark/streaming/MapWithStateSuite.scala | 581 +++
.../spark/streaming/TrackStateByKeySuite.scala | 581 ---
.../streaming/rdd/MapWithStateRDDSuite.scala| 389 +
.../streaming/rdd/TrackStateRDDSuite.scala | 389 -
19 files changed, 1782 insertions(+), 1775 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/f6d86617/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
--
diff --git
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
index c400e42..14997c6 100644
---
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
+++
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
@@ -65,7 +65,7 @@ public class JavaStatefulNetworkWordCount {
     JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
     ssc.checkpoint(".");

-    // Initial RDD input to trackStateByKey
+    // Initial state RDD input to mapWithState
     @SuppressWarnings("unchecked")
     List<Tuple2<String, Integer>> tuples = Arrays.asList(new Tuple2<>("hello", 1),
         new Tuple2<>("world", 1));
@@ -90,21 +90,21 @@ public class JavaStatefulNetworkWordCount {
         });

     // Update the cumulative count function
-    final Function4<Time, String, Optional<Integer>, State<Integer>, Optional<Tuple2<String, Integer>>> trackStateFunc =
-        new Function4<Time, String, Optional<Integer>, State<Integer>, Optional<Tuple2<String, Integer>>>() {
+    final Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
+        new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
          @Override
-          public Optional<Tuple2<String, Integer>> call(Time time, String word, Optional<Integer> one, State<Integer> state) {
+          public Tuple2<String, Integer> call(String word, Optional<Integer> one, State<Integer> state) {
            int sum = one.or(0) + (state.exists() ? state.get() : 0);
            Tuple2<String, Integer> output = new Tuple2<>(word,
[SPARK-12244][SPARK-12245][STREAMING] Rename trackStateByKey to mapWithState
and change tracking function signature
SPARK-12244:
Based on feedback from early users and personal experience attempting to
explain it, the name trackStateByKey had two problems:
- "trackState" is a completely new term which gives no intuition about what
the operation does.
- The resultant data stream of objects returned by the function is referred
to in the docs as the "emitted" data, for lack of a better term.
"mapWithState" makes sense because the API is like a mapping function
(Key, Value) => T with State as an additional parameter, and the resultant
data stream is the "mapped data". So both problems are solved.
SPARK-12245:
From initial experience, not having the key in the function makes it hard to
return mapped records, as the whole information of the record is not
available. Basically the user is restricted to doing something like
mapValues() instead of map(). So the key is added as a parameter.
Author: Tathagata Das
Closes #10224 from tdas/rename.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bd2cd4f5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bd2cd4f5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bd2cd4f5
Branch: refs/heads/master
Commit: bd2cd4f53d1ca10f4896bd39b0e180d4929867a2
Parents: 2166c2a
Author: Tathagata Das
Authored: Wed Dec 9 20:47:15 2015 -0800
Committer: Shixiong Zhu
Committed: Wed Dec 9 20:47:15 2015 -0800
--
.../streaming/JavaStatefulNetworkWordCount.java | 16 +-
.../streaming/StatefulNetworkWordCount.scala| 12 +-
.../apache/spark/streaming/Java8APISuite.java | 18 +-
.../org/apache/spark/streaming/State.scala | 20 +-
.../org/apache/spark/streaming/StateSpec.scala | 160 ++---
.../api/java/JavaMapWithStateDStream.scala | 44 ++
.../streaming/api/java/JavaPairDStream.scala| 50 +-
.../api/java/JavaTrackStateDStream.scala| 44 --
.../streaming/dstream/MapWithStateDStream.scala | 170 ++
.../dstream/PairDStreamFunctions.scala | 41 +-
.../streaming/dstream/TrackStateDStream.scala | 171 --
.../spark/streaming/rdd/MapWithStateRDD.scala | 223 +++
.../spark/streaming/rdd/TrackStateRDD.scala | 228
.../spark/streaming/JavaMapWithStateSuite.java | 210 +++
.../streaming/JavaTrackStateByKeySuite.java | 210 ---
.../spark/streaming/MapWithStateSuite.scala | 581 +++
.../spark/streaming/TrackStateByKeySuite.scala | 581 ---
.../streaming/rdd/MapWithStateRDDSuite.scala| 389 +
.../streaming/rdd/TrackStateRDDSuite.scala | 389 -
19 files changed, 1782 insertions(+), 1775 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/bd2cd4f5/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
--
diff --git
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
index c400e42..14997c6 100644
---
a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
+++
b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
@@ -65,7 +65,7 @@ public class JavaStatefulNetworkWordCount {
     JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
     ssc.checkpoint(".");

-    // Initial RDD input to trackStateByKey
+    // Initial state RDD input to mapWithState
     @SuppressWarnings("unchecked")
     List<Tuple2<String, Integer>> tuples = Arrays.asList(new Tuple2<>("hello", 1),
         new Tuple2<>("world", 1));
@@ -90,21 +90,21 @@ public class JavaStatefulNetworkWordCount {
         });

     // Update the cumulative count function
-    final Function4<Time, String, Optional<Integer>, State<Integer>, Optional<Tuple2<String, Integer>>> trackStateFunc =
-        new Function4<Time, String, Optional<Integer>, State<Integer>, Optional<Tuple2<String, Integer>>>() {
+    final Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
+        new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
          @Override
-          public Optional<Tuple2<String, Integer>> call(Time time, String word, Optional<Integer> one, State<Integer> state) {
+          public Tuple2<String, Integer> call(String word, Optional<Integer> one, State<Integer> state) {
            int sum = one.or(0) + (state.exists() ? state.get() : 0);
            Tuple2<String, Integer> output = new Tuple2<>(word,
Repository: spark
Updated Branches:
refs/heads/master 2166c2a75 -> bd2cd4f53
http://git-wip-us.apache.org/repos/asf/spark/blob/bd2cd4f5/streaming/src/test/java/org/apache/spark/streaming/JavaTrackStateByKeySuite.java
--
diff --git
a/streaming/src/test/java/org/apache/spark/streaming/JavaTrackStateByKeySuite.java
b/streaming/src/test/java/org/apache/spark/streaming/JavaTrackStateByKeySuite.java
deleted file mode 100644
index 89d0bb7..000
---
a/streaming/src/test/java/org/apache/spark/streaming/JavaTrackStateByKeySuite.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-import scala.Tuple2;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.util.ManualClock;
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.spark.HashPartitioner;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.Function4;
-import org.apache.spark.streaming.api.java.JavaPairDStream;
-import org.apache.spark.streaming.api.java.JavaTrackStateDStream;
-
-public class JavaTrackStateByKeySuite extends LocalJavaStreamingContext implements Serializable {
-
-  /**
-   * This test is only for testing the APIs. It's not necessary to run it.
-   */
-  public void testAPI() {
-    JavaPairRDD<String, Boolean> initialRDD = null;
-    JavaPairDStream<String, Integer> wordsDstream = null;
-
-    final Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>>
-        trackStateFunc =
-        new Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>>() {
-
-          @Override
-          public Optional<Double> call(
-              Time time, String word, Optional<Integer> one, State<Boolean> state) {
-            // Use all State's methods here
-            state.exists();
-            state.get();
-            state.isTimingOut();
-            state.remove();
-            state.update(true);
-            return Optional.of(2.0);
-          }
-        };
-
-    JavaTrackStateDStream<String, Integer, Boolean, Double> stateDstream =
-        wordsDstream.trackStateByKey(
-            StateSpec.function(trackStateFunc)
-                .initialState(initialRDD)
-                .numPartitions(10)
-                .partitioner(new HashPartitioner(10))
-                .timeout(Durations.seconds(10)));
-
-    JavaPairDStream<String, Boolean> emittedRecords = stateDstream.stateSnapshots();
-
-    final Function2<Optional<Integer>, State<Boolean>, Double> trackStateFunc2 =
-        new Function2<Optional<Integer>, State<Boolean>, Double>() {
-
-          @Override
-          public Double call(Optional<Integer> one, State<Boolean> state) {
-            // Use all State's methods here
-            state.exists();
-            state.get();
-            state.isTimingOut();
-            state.remove();
-            state.update(true);
-            return 2.0;
-          }
-        };
-
-    JavaTrackStateDStream<String, Integer, Boolean, Double> stateDstream2 =
-        wordsDstream.trackStateByKey(
-            StateSpec.function(trackStateFunc2)
-                .initialState(initialRDD)
-                .numPartitions(10)
-                .partitioner(new HashPartitioner(10))
-                .timeout(Durations.seconds(10)));
-
-    JavaPairDStream<String, Boolean> emittedRecords2 = stateDstream2.stateSnapshots();
-  }
-
-  @Test
-  public void testBasicFunction() {
-    List<List<String>> inputData = Arrays.asList(
-        Collections.<String>emptyList(),
-        Arrays.asList("a"),
-        Arrays.asList("a", "b"),
-        Arrays.asList("a", "b", "c"),
-        Arrays.asList("a", "b"),
-        Arrays.asList("a"),
-        Collections.<String>emptyList()
-    );
-
-    List<Set<Tuple2<String, Integer>>> outputData = Arrays.asList(
Repository: spark
Updated Branches:
refs/heads/master 5d80d8c6a -> 3f4efb5c2
[SPARK-12060][CORE] Avoid memory copy in JavaSerializerInstance.serialize
Merged #10051 again since #10083 is resolved.
This reverts commit 328b757d5d4486ea3c2e246780792d7a57ee85e5.
Author: Shixiong Zhu <shixi...@databricks.com>
Closes #10167 from zsxwing/merge-SPARK-12060.
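For context, a standalone sketch of why this avoids the copy: ByteArrayOutputStream exposes its internal array (`buf`) and size (`count`) to subclasses, so a subclass can hand out a ByteBuffer view of the same array instead of the allocate-and-copy that `toByteArray` performs. The class below mirrors the ByteBufferOutputStream added in the diff; `CopyVsWrap` is just a demonstration:

```scala
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer

class ByteBufferOutputStreamSketch extends ByteArrayOutputStream {
  // Zero-copy: wrap the protected internal buffer instead of copying it.
  def toByteBuffer: ByteBuffer = ByteBuffer.wrap(buf, 0, count)
}

object CopyVsWrap {
  def main(args: Array[String]): Unit = {
    val bos = new ByteBufferOutputStreamSketch
    bos.write(Array[Byte](1, 2, 3))
    val copied  = ByteBuffer.wrap(bos.toByteArray) // allocates and copies
    val wrapped = bos.toByteBuffer                 // aliases the same array
    assert(copied == wrapped)                      // identical contents
  }
}
```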
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3f4efb5c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3f4efb5c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3f4efb5c
Branch: refs/heads/master
Commit: 3f4efb5c23b029496b112760fa062ff070c20334
Parents: 5d80d8c
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Mon Dec 7 12:01:09 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Mon Dec 7 12:01:09 2015 -0800
--
.../spark/serializer/JavaSerializer.scala | 7 ++---
.../spark/util/ByteBufferOutputStream.scala | 31
2 files changed, 34 insertions(+), 4 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/3f4efb5c/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
--
diff --git
a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index b463a71..ea718a0 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -24,8 +24,7 @@ import scala.reflect.ClassTag

 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.ByteBufferInputStream
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils}

 private[spark] class JavaSerializationStream(
     out: OutputStream, counterReset: Int, extraDebugInfo: Boolean)
@@ -96,11 +95,11 @@ private[spark] class JavaSerializerInstance(
   extends SerializerInstance {

   override def serialize[T: ClassTag](t: T): ByteBuffer = {
-    val bos = new ByteArrayOutputStream()
+    val bos = new ByteBufferOutputStream()
     val out = serializeStream(bos)
     out.writeObject(t)
     out.close()
-    ByteBuffer.wrap(bos.toByteArray)
+    bos.toByteBuffer
   }

   override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
http://git-wip-us.apache.org/repos/asf/spark/blob/3f4efb5c/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
--
diff --git
a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
new file mode 100644
index 000..92e4522
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+
+/**
+ * Provide a zero-copy way to convert data in ByteArrayOutputStream to ByteBuffer
+ */
+private[spark] class ByteBufferOutputStream extends ByteArrayOutputStream {
+
+  def toByteBuffer: ByteBuffer = {
+    return ByteBuffer.wrap(buf, 0, count)
+  }
+}
Repository: spark
Updated Branches:
refs/heads/branch-1.5 93a0510a5 -> 3868ab644
[SPARK-12101][CORE] Fix thread pools that cannot cache tasks in Worker and
AppClient (backport 1.5)
backport #10108 to branch 1.5
Author: Shixiong Zhu <shixi...@databricks.com>
Closes #10135 from zsxwing/fix-threadpool-1.5.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3868ab64
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3868ab64
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3868ab64
Branch: refs/heads/branch-1.5
Commit: 3868ab644cc87ce68ba1605f6da65c5e951ce412
Parents: 93a0510
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Mon Dec 7 12:04:18 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Mon Dec 7 12:04:18 2015 -0800
--
.../apache/spark/deploy/client/AppClient.scala | 10 --
.../org/apache/spark/deploy/worker/Worker.scala | 10 --
.../apache/spark/deploy/yarn/YarnAllocator.scala | 19 +--
3 files changed, 13 insertions(+), 26 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/3868ab64/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 25ea692..bd28429 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -66,12 +66,10 @@ private[spark] class AppClient(
     // A thread pool for registering with masters. Because registering with a master is a blocking
     // action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same
     // time so that we can register with all masters.
-    private val registerMasterThreadPool = new ThreadPoolExecutor(
-      0,
-      masterRpcAddresses.size, // Make sure we can register with all masters at the same time
-      60L, TimeUnit.SECONDS,
-      new SynchronousQueue[Runnable](),
-      ThreadUtils.namedThreadFactory("appclient-register-master-threadpool"))
+    private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+      "appclient-register-master-threadpool",
+      masterRpcAddresses.length // Make sure we can register with all masters at the same time
+    )

     // A scheduled executor for scheduling the registration actions
     private val registrationRetryThread =
http://git-wip-us.apache.org/repos/asf/spark/blob/3868ab64/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 79b1536..a898bb1 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -147,12 +147,10 @@ private[deploy] class Worker(
   // A thread pool for registering with masters. Because registering with a master is a blocking
   // action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same
   // time so that we can register with all masters.
-  private val registerMasterThreadPool = new ThreadPoolExecutor(
-    0,
-    masterRpcAddresses.size, // Make sure we can register with all masters at the same time
-    60L, TimeUnit.SECONDS,
-    new SynchronousQueue[Runnable](),
-    ThreadUtils.namedThreadFactory("worker-register-master-threadpool"))
+  private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+    "worker-register-master-threadpool",
+    masterRpcAddresses.size // Make sure we can register with all masters at the same time
+  )

   var coresUsed = 0
   var memoryUsed = 0
http://git-wip-us.apache.org/repos/asf/spark/blob/3868ab64/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
--
diff --git
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 6a02848..52a3fd9 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -21,26 +21,21 @@ import java.util.Collections
import java.util.concurrent._
import java.util.regex.Pattern
-import org.apache.spark.util.Utils
-
import scala.collection.JavaConvers
Repository: spark
Updated Branches:
refs/heads/branch-1.6 8865d87f7 -> 4c84f6e91
[SPARK-12101][CORE] Fix thread pools that cannot cache tasks in Worker and
AppClient
`SynchronousQueue` cannot cache any task. This issue is similar to #9978. It's
an easy fix. Just use the fixed `ThreadUtils.newDaemonCachedThreadPool`.
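For background, a self-contained sketch of the failure mode (the pool sizes here are arbitrary): a ThreadPoolExecutor backed by a `SynchronousQueue` has nowhere to park an extra task once `maximumPoolSize` threads are busy, so submission is rejected instead of queued:

```scala
import java.util.concurrent.{RejectedExecutionException, SynchronousQueue, ThreadPoolExecutor, TimeUnit}

object SynchronousQueueDemo {
  def main(args: Array[String]): Unit = {
    val pool = new ThreadPoolExecutor(
      0, 1, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable]())
    val busy = new Runnable { def run(): Unit = Thread.sleep(1000) }

    pool.execute(busy) // occupies the single worker thread
    try {
      pool.execute(busy) // no idle thread, no queue capacity => rejected
    } catch {
      case _: RejectedExecutionException =>
        println("second task rejected instead of being cached")
    }
    pool.shutdown()
  }
}
```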
Author: Shixiong Zhu <shixi...@databricks.com>
Closes #10108 from zsxwing/fix-threadpool.
(cherry picked from commit 649be4fa4532dcd3001df8345f9f7e970a3fbc65)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c84f6e9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c84f6e9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c84f6e9
Branch: refs/heads/branch-1.6
Commit: 4c84f6e91d61a358c179b04bf6d1bc8b9559b6d0
Parents: 8865d87
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Thu Dec 3 11:06:25 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Thu Dec 3 11:06:46 2015 -0800
--
.../org/apache/spark/deploy/client/AppClient.scala| 10 --
.../scala/org/apache/spark/deploy/worker/Worker.scala | 10 --
.../org/apache/spark/deploy/yarn/YarnAllocator.scala | 14 --
3 files changed, 12 insertions(+), 22 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/4c84f6e9/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index df6ba7d..1e2f469 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -68,12 +68,10 @@ private[spark] class AppClient(
     // A thread pool for registering with masters. Because registering with a master is a blocking
     // action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same
     // time so that we can register with all masters.
-    private val registerMasterThreadPool = new ThreadPoolExecutor(
-      0,
-      masterRpcAddresses.length, // Make sure we can register with all masters at the same time
-      60L, TimeUnit.SECONDS,
-      new SynchronousQueue[Runnable](),
-      ThreadUtils.namedThreadFactory("appclient-register-master-threadpool"))
+    private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+      "appclient-register-master-threadpool",
+      masterRpcAddresses.length // Make sure we can register with all masters at the same time
+    )

     // A scheduled executor for scheduling the registration actions
     private val registrationRetryThread =
http://git-wip-us.apache.org/repos/asf/spark/blob/4c84f6e9/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 418faf8..1afc1ff 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -146,12 +146,10 @@ private[deploy] class Worker(
   // A thread pool for registering with masters. Because registering with a master is a blocking
   // action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same
   // time so that we can register with all masters.
-  private val registerMasterThreadPool = new ThreadPoolExecutor(
-    0,
-    masterRpcAddresses.size, // Make sure we can register with all masters at the same time
-    60L, TimeUnit.SECONDS,
-    new SynchronousQueue[Runnable](),
-    ThreadUtils.namedThreadFactory("worker-register-master-threadpool"))
+  private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+    "worker-register-master-threadpool",
+    masterRpcAddresses.size // Make sure we can register with all masters at the same time
+  )

   var coresUsed = 0
   var memoryUsed = 0
http://git-wip-us.apache.org/repos/asf/spark/blob/4c84f6e9/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
--
diff --git
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 73cd903..4e044aa 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/de
Repository: spark
Updated Branches:
refs/heads/master 7bc9e1db2 -> 649be4fa4
[SPARK-12101][CORE] Fix thread pools that cannot cache tasks in Worker and
AppClient
`SynchronousQueue` cannot cache any task. This issue is similar to #9978. It's
an easy fix. Just use the fixed `ThreadUtils.newDaemonCachedThreadPool`.
Author: Shixiong Zhu <shixi...@databricks.com>
Closes #10108 from zsxwing/fix-threadpool.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/649be4fa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/649be4fa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/649be4fa
Branch: refs/heads/master
Commit: 649be4fa4532dcd3001df8345f9f7e970a3fbc65
Parents: 7bc9e1d
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Thu Dec 3 11:06:25 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Thu Dec 3 11:06:25 2015 -0800
--
.../org/apache/spark/deploy/client/AppClient.scala| 10 --
.../scala/org/apache/spark/deploy/worker/Worker.scala | 10 --
.../org/apache/spark/deploy/yarn/YarnAllocator.scala | 14 --
3 files changed, 12 insertions(+), 22 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/649be4fa/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index df6ba7d..1e2f469 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -68,12 +68,10 @@ private[spark] class AppClient(
     // A thread pool for registering with masters. Because registering with a master is a blocking
     // action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same
     // time so that we can register with all masters.
-    private val registerMasterThreadPool = new ThreadPoolExecutor(
-      0,
-      masterRpcAddresses.length, // Make sure we can register with all masters at the same time
-      60L, TimeUnit.SECONDS,
-      new SynchronousQueue[Runnable](),
-      ThreadUtils.namedThreadFactory("appclient-register-master-threadpool"))
+    private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+      "appclient-register-master-threadpool",
+      masterRpcAddresses.length // Make sure we can register with all masters at the same time
+    )

     // A scheduled executor for scheduling the registration actions
     private val registrationRetryThread =
http://git-wip-us.apache.org/repos/asf/spark/blob/649be4fa/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 418faf8..1afc1ff 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -146,12 +146,10 @@ private[deploy] class Worker(
   // A thread pool for registering with masters. Because registering with a master is a blocking
   // action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same
   // time so that we can register with all masters.
-  private val registerMasterThreadPool = new ThreadPoolExecutor(
-    0,
-    masterRpcAddresses.size, // Make sure we can register with all masters at the same time
-    60L, TimeUnit.SECONDS,
-    new SynchronousQueue[Runnable](),
-    ThreadUtils.namedThreadFactory("worker-register-master-threadpool"))
+  private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+    "worker-register-master-threadpool",
+    masterRpcAddresses.size // Make sure we can register with all masters at the same time
+  )

   var coresUsed = 0
   var memoryUsed = 0
http://git-wip-us.apache.org/repos/asf/spark/blob/649be4fa/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
--
diff --git
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 73cd903..4e044aa 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -25,8 +25,6 @@ import scala.collection.mutable
import scala.collection.mutable.{A
Repository: spark
Updated Branches:
refs/heads/branch-1.6 81db8d086 -> 21909b8ac
Revert "[SPARK-12060][CORE] Avoid memory copy in
JavaSerializerInstance.serialize"
This reverts commit 9b99b2b46c452ba396e922db5fc7eec02c45b158.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21909b8a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21909b8a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21909b8a
Branch: refs/heads/branch-1.6
Commit: 21909b8ac0068658cc833f324c0f1f418c200d61
Parents: 81db8d0
Author: Shixiong Zhu
Authored: Tue Dec 1 15:16:07 2015 -0800
Committer: Shixiong Zhu
Committed: Tue Dec 1 15:16:07 2015 -0800
--
.../spark/serializer/JavaSerializer.scala | 7 +++--
.../spark/util/ByteBufferOutputStream.scala | 31
2 files changed, 4 insertions(+), 34 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/21909b8a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
--
diff --git
a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index ea718a0..b463a71 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -24,7 +24,8 @@ import scala.reflect.ClassTag

 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils}
+import org.apache.spark.util.ByteBufferInputStream
+import org.apache.spark.util.Utils

 private[spark] class JavaSerializationStream(
     out: OutputStream, counterReset: Int, extraDebugInfo: Boolean)
@@ -95,11 +96,11 @@ private[spark] class JavaSerializerInstance(
   extends SerializerInstance {

   override def serialize[T: ClassTag](t: T): ByteBuffer = {
-    val bos = new ByteBufferOutputStream()
+    val bos = new ByteArrayOutputStream()
     val out = serializeStream(bos)
     out.writeObject(t)
     out.close()
-    bos.toByteBuffer
+    ByteBuffer.wrap(bos.toByteArray)
   }

   override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
http://git-wip-us.apache.org/repos/asf/spark/blob/21909b8a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
--
diff --git
a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
deleted file mode 100644
index 92e4522..000
--- a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import java.io.ByteArrayOutputStream
-import java.nio.ByteBuffer
-
-/**
- * Provide a zero-copy way to convert data in ByteArrayOutputStream to ByteBuffer
- */
-private[spark] class ByteBufferOutputStream extends ByteArrayOutputStream {
-
- def toByteBuffer: ByteBuffer = {
-return ByteBuffer.wrap(buf, 0, count)
- }
-}
-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
Repository: spark
Updated Branches:
refs/heads/master 60b541ee1 -> 328b757d5
Revert "[SPARK-12060][CORE] Avoid memory copy in
JavaSerializerInstance.serialize"
This reverts commit 1401166576c7018c5f9c31e0a6703d5fb16ea339.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/328b757d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/328b757d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/328b757d
Branch: refs/heads/master
Commit: 328b757d5d4486ea3c2e246780792d7a57ee85e5
Parents: 60b541e
Author: Shixiong Zhu
Authored: Tue Dec 1 15:13:10 2015 -0800
Committer: Shixiong Zhu
Committed: Tue Dec 1 15:13:10 2015 -0800
--
.../spark/serializer/JavaSerializer.scala | 7 +++--
.../spark/util/ByteBufferOutputStream.scala | 31
2 files changed, 4 insertions(+), 34 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/328b757d/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
--
diff --git
a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index ea718a0..b463a71 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -24,7 +24,8 @@ import scala.reflect.ClassTag
import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils}
+import org.apache.spark.util.ByteBufferInputStream
+import org.apache.spark.util.Utils
private[spark] class JavaSerializationStream(
out: OutputStream, counterReset: Int, extraDebugInfo: Boolean)
@@ -95,11 +96,11 @@ private[spark] class JavaSerializerInstance(
extends SerializerInstance {
override def serialize[T: ClassTag](t: T): ByteBuffer = {
-val bos = new ByteBufferOutputStream()
+val bos = new ByteArrayOutputStream()
val out = serializeStream(bos)
out.writeObject(t)
out.close()
-bos.toByteBuffer
+ByteBuffer.wrap(bos.toByteArray)
}
override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
http://git-wip-us.apache.org/repos/asf/spark/blob/328b757d/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
--
diff --git
a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
deleted file mode 100644
index 92e4522..000
--- a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import java.io.ByteArrayOutputStream
-import java.nio.ByteBuffer
-
-/**
- * Provide a zero-copy way to convert data in ByteArrayOutputStream to ByteBuffer
- */
-private[spark] class ByteBufferOutputStream extends ByteArrayOutputStream {
-
- def toByteBuffer: ByteBuffer = {
-return ByteBuffer.wrap(buf, 0, count)
- }
-}
-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
Repository: spark
Updated Branches:
refs/heads/branch-1.6 add4e6311 -> 9b99b2b46
[SPARK-12060][CORE] Avoid memory copy in JavaSerializerInstance.serialize
`JavaSerializerInstance.serialize` uses `ByteArrayOutputStream.toByteArray` to
get the serialized data, and `ByteArrayOutputStream.toByteArray` has to copy the
content of the internal array into a new array. However, since the array is
converted to a `ByteBuffer` immediately, we can avoid the memory copy.
This PR added `ByteBufferOutputStream` to access the protected `buf` and
convert it to a `ByteBuffer` directly.
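For readers skimming the diff below, a minimal standalone sketch of the idea (the demo harness is illustrative, not part of the patch):
```
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer

// ByteArrayOutputStream exposes its internal `buf` and `count` to subclasses,
// so a subclass can hand out a ByteBuffer view of the existing array instead
// of copying it the way toByteArray does.
class ByteBufferOutputStream extends ByteArrayOutputStream {
  def toByteBuffer: ByteBuffer = ByteBuffer.wrap(buf, 0, count)
}

object ZeroCopyDemo {
  def main(args: Array[String]): Unit = {
    val bos = new ByteBufferOutputStream
    bos.write(Array[Byte](1, 2, 3))
    val zeroCopy = bos.toByteBuffer               // wraps the internal array
    val copied = ByteBuffer.wrap(bos.toByteArray) // copies the array first
    println(zeroCopy.remaining() == copied.remaining()) // true
  }
}
```
The trade-off is that the returned buffer aliases the stream's internal array, so the stream must not be written to after `toByteBuffer` is called.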
Author: Shixiong Zhu <shixi...@databricks.com>
Closes #10051 from zsxwing/SPARK-12060.
(cherry picked from commit 1401166576c7018c5f9c31e0a6703d5fb16ea339)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b99b2b4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b99b2b4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b99b2b4
Branch: refs/heads/branch-1.6
Commit: 9b99b2b46c452ba396e922db5fc7eec02c45b158
Parents: add4e63
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Tue Dec 1 09:45:55 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Tue Dec 1 09:46:07 2015 -0800
--
.../spark/serializer/JavaSerializer.scala | 7 ++---
.../spark/util/ByteBufferOutputStream.scala | 31
2 files changed, 34 insertions(+), 4 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/9b99b2b4/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
--
diff --git
a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index b463a71..ea718a0 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -24,8 +24,7 @@ import scala.reflect.ClassTag
import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.ByteBufferInputStream
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils}
private[spark] class JavaSerializationStream(
out: OutputStream, counterReset: Int, extraDebugInfo: Boolean)
@@ -96,11 +95,11 @@ private[spark] class JavaSerializerInstance(
extends SerializerInstance {
override def serialize[T: ClassTag](t: T): ByteBuffer = {
-val bos = new ByteArrayOutputStream()
+val bos = new ByteBufferOutputStream()
val out = serializeStream(bos)
out.writeObject(t)
out.close()
-ByteBuffer.wrap(bos.toByteArray)
+bos.toByteBuffer
}
override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
http://git-wip-us.apache.org/repos/asf/spark/blob/9b99b2b4/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
--
diff --git
a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
new file mode 100644
index 000..92e4522
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+
+/**
+ * Provide a zero-copy way to convert data in ByteArrayOutputStream to ByteBuffer
+ */
+private[spark] class ByteBufferOutputStream extends ByteArrayOutputStream {
+
+ def toByteBuffer: ByteBuffer = {
+return ByteBuffer.wrap(buf, 0, count)
+ }
+}
-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
Repository: spark
Updated Branches:
refs/heads/master c87531b76 -> 140116657
[SPARK-12060][CORE] Avoid memory copy in JavaSerializerInstance.serialize
`JavaSerializerInstance.serialize` uses `ByteArrayOutputStream.toByteArray` to
get the serialized data, and `ByteArrayOutputStream.toByteArray` has to copy the
content of the internal array into a new array. However, since the array is
converted to a `ByteBuffer` immediately, we can avoid the memory copy.
This PR added `ByteBufferOutputStream` to access the protected `buf` and
convert it to a `ByteBuffer` directly.
Author: Shixiong Zhu <shixi...@databricks.com>
Closes #10051 from zsxwing/SPARK-12060.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14011665
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14011665
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14011665
Branch: refs/heads/master
Commit: 1401166576c7018c5f9c31e0a6703d5fb16ea339
Parents: c87531b
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Tue Dec 1 09:45:55 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Tue Dec 1 09:45:55 2015 -0800
--
.../spark/serializer/JavaSerializer.scala | 7 ++---
.../spark/util/ByteBufferOutputStream.scala | 31
2 files changed, 34 insertions(+), 4 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/14011665/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
--
diff --git
a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index b463a71..ea718a0 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -24,8 +24,7 @@ import scala.reflect.ClassTag
import org.apache.spark.SparkConf
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.ByteBufferInputStream
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils}
private[spark] class JavaSerializationStream(
out: OutputStream, counterReset: Int, extraDebugInfo: Boolean)
@@ -96,11 +95,11 @@ private[spark] class JavaSerializerInstance(
extends SerializerInstance {
override def serialize[T: ClassTag](t: T): ByteBuffer = {
-val bos = new ByteArrayOutputStream()
+val bos = new ByteBufferOutputStream()
val out = serializeStream(bos)
out.writeObject(t)
out.close()
-ByteBuffer.wrap(bos.toByteArray)
+bos.toByteBuffer
}
override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
http://git-wip-us.apache.org/repos/asf/spark/blob/14011665/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
--
diff --git
a/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
new file mode 100644
index 000..92e4522
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/ByteBufferOutputStream.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+
+/**
+ * Provide a zero-copy way to convert data in ByteArrayOutputStream to ByteBuffer
+ */
+private[spark] class ByteBufferOutputStream extends ByteArrayOutputStream {
+
+ def toByteBuffer: ByteBuffer = {
+return ByteBuffer.wrap(buf, 0, count)
+ }
+}
-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
Repository: spark
Updated Branches:
refs/heads/master 96691feae -> 8a75a3049
[SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles
The JobConf object created in `DStream.saveAsHadoopFiles` is used concurrently
in multiple places:
* The JobConf is updated by `RDD.saveAsHadoopFile()` before the job is launched
* The JobConf is serialized as part of the DStream checkpoints.
These concurrent accesses (updating in one thread while another thread is
serializing it) can lead to a ConcurrentModificationException in the underlying
Java hash map used in the internal Hadoop Configuration object.
The solution is to create a new JobConf in every batch, that is updated by
`RDD.saveAsHadoopFile()`, while the checkpointing serializes the original
JobConf.
Tests to be added in #9988 will fail reliably without this patch. Keeping this
patch really small to make sure that it can be added to previous branches.
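The fix is the classic defensive-copy pattern; a rough sketch under simplified signatures (not the actual Spark internals):
```
import org.apache.hadoop.mapred.JobConf

// One shared JobConf: the checkpointing code may serialize it at any time.
val sharedConf = new JobConf()

// Each batch hands Hadoop a private copy, so the save path can mutate its
// JobConf freely without racing the serializer on the shared instance.
def saveBatch(save: JobConf => Unit): Unit =
  save(new JobConf(sharedConf)) // JobConf(Configuration) copy constructor
```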
Author: Tathagata Das
Closes #10088 from tdas/SPARK-12087.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8a75a304
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8a75a304
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8a75a304
Branch: refs/heads/master
Commit: 8a75a3049539eeef04c0db51736e97070c162b46
Parents: 96691fe
Author: Tathagata Das
Authored: Tue Dec 1 21:04:52 2015 -0800
Committer: Shixiong Zhu
Committed: Tue Dec 1 21:04:52 2015 -0800
--
.../org/apache/spark/streaming/dstream/PairDStreamFunctions.scala | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/8a75a304/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
--
diff --git
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index fb691ee..2762309 100644
---
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -730,7 +730,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
val serializableConf = new SerializableJobConf(conf)
val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
- rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, serializableConf.value)
+ rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass,
+new JobConf(serializableConf.value))
}
self.foreachRDD(saveFunc)
}
-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
Repository: spark
Updated Branches:
refs/heads/branch-1.5 4f07a590c -> 0d57a4ae1
[SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles
The JobConf object created in `DStream.saveAsHadoopFiles` is used concurrently
in multiple places:
* The JobConf is updated by `RDD.saveAsHadoopFile()` before the job is launched
* The JobConf is serialized as part of the DStream checkpoints.
These concurrent accesses (updating in one thread while another thread is
serializing it) can lead to a ConcurrentModificationException in the underlying
Java hash map used in the internal Hadoop Configuration object.
The solution is to create a new JobConf in every batch, that is updated by
`RDD.saveAsHadoopFile()`, while the checkpointing serializes the original
JobConf.
Tests to be added in #9988 will fail reliably without this patch. Keeping this
patch really small to make sure that it can be added to previous branches.
Author: Tathagata Das
Closes #10088 from tdas/SPARK-12087.
(cherry picked from commit 8a75a3049539eeef04c0db51736e97070c162b46)
Signed-off-by: Shixiong Zhu
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0d57a4ae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0d57a4ae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0d57a4ae
Branch: refs/heads/branch-1.5
Commit: 0d57a4ae10f4ec40386194bc3c8e27f32da09d4d
Parents: 4f07a59
Author: Tathagata Das
Authored: Tue Dec 1 21:04:52 2015 -0800
Committer: Shixiong Zhu
Committed: Tue Dec 1 21:05:18 2015 -0800
--
.../org/apache/spark/streaming/dstream/PairDStreamFunctions.scala | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/0d57a4ae/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
--
diff --git
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 71bec96..aa36997 100644
---
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -692,7 +692,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
val serializableConf = new SerializableJobConf(conf)
val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
- rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, serializableConf.value)
+ rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass,
+new JobConf(serializableConf.value))
}
self.foreachRDD(saveFunc)
}
-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
Repository: spark
Updated Branches:
refs/heads/branch-1.6 a5743affc -> 1f42295b5
[SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles
The JobConf object created in `DStream.saveAsHadoopFiles` is used concurrently
in multiple places:
* The JobConf is updated by `RDD.saveAsHadoopFile()` before the job is launched
* The JobConf is serialized as part of the DStream checkpoints.
These concurrent accesses (updating in one thread while another thread is
serializing it) can lead to a ConcurrentModificationException in the underlying
Java hash map used in the internal Hadoop Configuration object.
The solution is to create a new JobConf in every batch, that is updated by
`RDD.saveAsHadoopFile()`, while the checkpointing serializes the original
JobConf.
Tests to be added in #9988 will fail reliably without this patch. Keeping this
patch really small to make sure that it can be added to previous branches.
Author: Tathagata Das
Closes #10088 from tdas/SPARK-12087.
(cherry picked from commit 8a75a3049539eeef04c0db51736e97070c162b46)
Signed-off-by: Shixiong Zhu
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1f42295b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1f42295b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1f42295b
Branch: refs/heads/branch-1.6
Commit: 1f42295b5df69a6039ed2ba8ea67a8e57d77644d
Parents: a5743af
Author: Tathagata Das
Authored: Tue Dec 1 21:04:52 2015 -0800
Committer: Shixiong Zhu
Committed: Tue Dec 1 21:05:02 2015 -0800
--
.../org/apache/spark/streaming/dstream/PairDStreamFunctions.scala | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/1f42295b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
--
diff --git
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index fb691ee..2762309 100644
---
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -730,7 +730,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
val serializableConf = new SerializableJobConf(conf)
val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
- rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, serializableConf.value)
+ rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass,
+new JobConf(serializableConf.value))
}
self.foreachRDD(saveFunc)
}
-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
Repository: spark
Updated Branches:
refs/heads/branch-1.4 f5af299ab -> b6ba2dab2
[SPARK-12087][STREAMING] Create new JobConf for every batch in saveAsHadoopFiles
The JobConf object created in `DStream.saveAsHadoopFiles` is used concurrently
in multiple places:
* The JobConf is updated by `RDD.saveAsHadoopFile()` before the job is launched
* The JobConf is serialized as part of the DStream checkpoints.
These concurrent accesses (updating in one thread while another thread is
serializing it) can lead to a ConcurrentModificationException in the underlying
Java hash map used in the internal Hadoop Configuration object.
The solution is to create a new JobConf in every batch, that is updated by
`RDD.saveAsHadoopFile()`, while the checkpointing serializes the original
JobConf.
Tests to be added in #9988 will fail reliably without this patch. Keeping this
patch really small to make sure that it can be added to previous branches.
Author: Tathagata Das
Closes #10088 from tdas/SPARK-12087.
(cherry picked from commit 8a75a3049539eeef04c0db51736e97070c162b46)
Signed-off-by: Shixiong Zhu
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b6ba2dab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b6ba2dab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b6ba2dab
Branch: refs/heads/branch-1.4
Commit: b6ba2dab26092f56271114aa62f25b2fc9d6adad
Parents: f5af299
Author: Tathagata Das
Authored: Tue Dec 1 21:04:52 2015 -0800
Committer: Shixiong Zhu
Committed: Tue Dec 1 21:05:37 2015 -0800
--
.../org/apache/spark/streaming/dstream/PairDStreamFunctions.scala | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/b6ba2dab/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
--
diff --git
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 358e4c6..4e392f5 100644
---
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -691,7 +691,8 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)])
val serializableConf = new SerializableWritable(conf)
val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
val file = rddToFileName(prefix, suffix, time)
- rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, serializableConf.value)
+ rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass,
+new JobConf(serializableConf.value))
}
self.foreachRDD(saveFunc)
}
-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
Repository: spark
Updated Branches:
refs/heads/branch-1.6 dfc98fac9 -> 996635793
[SPARK-12021][STREAMING][TESTS] Fix the potential dead-lock in
StreamingListenerSuite
In StreamingListenerSuite."don't call ssc.stop in listener", after the main
thread calls `ssc.stop()`, `StreamingContextStoppingCollector` may call
`ssc.stop()` in the listener bus thread, which causes a deadlock. This PR updated
`StreamingContextStoppingCollector` to only call `ssc.stop()` in the first
batch to avoid the deadlock.
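A minimal sketch of the guard, with a hypothetical `stop` callback standing in for the real listener wiring:
```
// Only the first onBatchCompleted attempts the stop. Later callbacks no-op,
// so the listener thread can never end up blocked inside ssc.stop() while
// the main thread is already stopping the context.
class StopOnceCollector(stop: () => Unit) {
  private var isFirstBatch = true

  def onBatchCompleted(): Unit = {
    if (isFirstBatch) {
      isFirstBatch = false
      try stop() catch {
        case _: Exception => () // expected if the context is already stopping
      }
    }
  }
}
```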
Author: Shixiong Zhu <shixi...@databricks.com>
Closes #10011 from zsxwing/fix-test-deadlock.
(cherry picked from commit f57e6c9effdb9e282fc8ae66dc30fe053fed5272)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/99663579
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/99663579
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/99663579
Branch: refs/heads/branch-1.6
Commit: 9966357932a50aa22f94f39201559beb8c0c6efb
Parents: dfc98fa
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Fri Nov 27 11:50:18 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Fri Nov 27 11:50:28 2015 -0800
--
.../streaming/StreamingListenerSuite.scala | 25 +++-
1 file changed, 19 insertions(+), 6 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/99663579/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
--
diff --git
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index df4575a..04cd5bd 100644
---
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -222,7 +222,11 @@ class StreamingListenerSuite extends TestSuiteBase with
Matchers {
val batchCounter = new BatchCounter(_ssc)
_ssc.start()
// Make sure running at least one batch
-batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 1)
+if (!batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 1)) {
+ fail("The first batch cannot complete in 10 seconds")
+}
+// When reaching here, we can make sure `StreamingContextStoppingCollector` won't call
+// `ssc.stop()`, so it's safe to call `_ssc.stop()` now.
_ssc.stop()
assert(contextStoppingCollector.sparkExSeen)
}
@@ -345,12 +349,21 @@ class FailureReasonsCollector extends StreamingListener {
*/
class StreamingContextStoppingCollector(val ssc: StreamingContext) extends
StreamingListener {
@volatile var sparkExSeen = false
+
+ private var isFirstBatch = true
+
override def onBatchCompleted(batchCompleted:
StreamingListenerBatchCompleted) {
-try {
- ssc.stop()
-} catch {
- case se: SparkException =>
-sparkExSeen = true
+if (isFirstBatch) {
+ // We should only call `ssc.stop()` in the first batch. Otherwise, it's possible that the main
+ // thread is calling `ssc.stop()`, while StreamingContextStoppingCollector is also calling
+ // `ssc.stop()` in the listener thread, which becomes a dead-lock.
+ isFirstBatch = false
+ try {
+ssc.stop()
+ } catch {
+case se: SparkException =>
+ sparkExSeen = true
+ }
}
}
}
-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
Repository: spark
Updated Branches:
refs/heads/master ba02f6cb5 -> f57e6c9ef
[SPARK-12021][STREAMING][TESTS] Fix the potential dead-lock in
StreamingListenerSuite
In StreamingListenerSuite."don't call ssc.stop in listener", after the main
thread calls `ssc.stop()`, `StreamingContextStoppingCollector` may call
`ssc.stop()` in the listener bus thread, which causes a deadlock. This PR updated
`StreamingContextStoppingCollector` to only call `ssc.stop()` in the first
batch to avoid the deadlock.
Author: Shixiong Zhu <shixi...@databricks.com>
Closes #10011 from zsxwing/fix-test-deadlock.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f57e6c9e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f57e6c9e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f57e6c9e
Branch: refs/heads/master
Commit: f57e6c9effdb9e282fc8ae66dc30fe053fed5272
Parents: ba02f6c
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Fri Nov 27 11:50:18 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Fri Nov 27 11:50:18 2015 -0800
--
.../streaming/StreamingListenerSuite.scala | 25 +++-
1 file changed, 19 insertions(+), 6 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/f57e6c9e/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
--
diff --git
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index df4575a..04cd5bd 100644
---
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -222,7 +222,11 @@ class StreamingListenerSuite extends TestSuiteBase with
Matchers {
val batchCounter = new BatchCounter(_ssc)
_ssc.start()
// Make sure running at least one batch
-batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 1)
+if (!batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 1)) {
+ fail("The first batch cannot complete in 10 seconds")
+}
+// When reaching here, we can make sure `StreamingContextStoppingCollector` won't call
+// `ssc.stop()`, so it's safe to call `_ssc.stop()` now.
_ssc.stop()
assert(contextStoppingCollector.sparkExSeen)
}
@@ -345,12 +349,21 @@ class FailureReasonsCollector extends StreamingListener {
*/
class StreamingContextStoppingCollector(val ssc: StreamingContext) extends
StreamingListener {
@volatile var sparkExSeen = false
+
+ private var isFirstBatch = true
+
override def onBatchCompleted(batchCompleted:
StreamingListenerBatchCompleted) {
-try {
- ssc.stop()
-} catch {
- case se: SparkException =>
-sparkExSeen = true
+if (isFirstBatch) {
+ // We should only call `ssc.stop()` in the first batch. Otherwise, it's possible that the main
+ // thread is calling `ssc.stop()`, while StreamingContextStoppingCollector is also calling
+ // `ssc.stop()` in the listener thread, which becomes a dead-lock.
+ isFirstBatch = false
+ try {
+ssc.stop()
+ } catch {
+case se: SparkException =>
+ sparkExSeen = true
+ }
}
}
}
-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
Repository: spark
Updated Branches:
refs/heads/branch-1.5 b1fcefca6 -> 7900d192e
[SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool
doesn't cache any task
In the previous code, `newDaemonCachedThreadPool` used a `SynchronousQueue`,
which is wrong: a `SynchronousQueue` has zero capacity and cannot cache any
task. This patch switches to a `LinkedBlockingQueue`, along with other fixes,
to make sure `newDaemonCachedThreadPool` uses at most `maxThreadNumber`
threads and, after that, queues tasks in the `LinkedBlockingQueue`.
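A minimal sketch of the fixed construction, mirroring the patch below minus the named thread factory: with an unbounded queue a `ThreadPoolExecutor` never grows past its core size, so core must equal max, and core-thread timeout restores the cached-pool behavior of shrinking to zero when idle:
```
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

def newCachedThreadPool(maxThreads: Int, keepAliveSeconds: Long): ThreadPoolExecutor = {
  val pool = new ThreadPoolExecutor(
    maxThreads,          // corePoolSize: threads created before tasks are queued
    maxThreads,          // maximumPoolSize: never reached with an unbounded queue
    keepAliveSeconds,
    TimeUnit.SECONDS,
    new LinkedBlockingQueue[Runnable])
  pool.allowCoreThreadTimeOut(true) // let an idle pool shrink back to zero
  pool
}
```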
Author: Shixiong Zhu <shixi...@databricks.com>
Closes #9978 from zsxwing/cached-threadpool.
(cherry picked from commit d3ef693325f91a1ed340c9756c81244a80398eb2)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7900d192
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7900d192
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7900d192
Branch: refs/heads/branch-1.5
Commit: 7900d192e8adf501fbed0d0704d60d2c0e63a764
Parents: b1fcefc
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Wed Nov 25 23:31:21 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Wed Nov 25 23:31:53 2015 -0800
--
.../org/apache/spark/util/ThreadUtils.scala | 14 --
.../apache/spark/util/ThreadUtilsSuite.scala| 45
2 files changed, 56 insertions(+), 3 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/7900d192/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 06976f8..3159ef7 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -57,10 +57,18 @@ private[spark] object ThreadUtils {
* Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names
* are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
*/
- def newDaemonCachedThreadPool(prefix: String, maxThreadNumber: Int): ThreadPoolExecutor = {
+ def newDaemonCachedThreadPool(
+ prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
val threadFactory = namedThreadFactory(prefix)
-new ThreadPoolExecutor(
- 0, maxThreadNumber, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable], threadFactory)
+val threadPool = new ThreadPoolExecutor(
+ maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks
+ maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, this one is not used
+ keepAliveSeconds,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue[Runnable],
+ threadFactory)
+threadPool.allowCoreThreadTimeOut(true)
+threadPool
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/7900d192/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
index 620e4de..92ae038 100644
--- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -24,6 +24,8 @@ import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.Random
+import org.scalatest.concurrent.Eventually._
+
import org.apache.spark.SparkFunSuite
class ThreadUtilsSuite extends SparkFunSuite {
@@ -59,6 +61,49 @@ class ThreadUtilsSuite extends SparkFunSuite {
}
}
+ test("newDaemonCachedThreadPool") {
+val maxThreadNumber = 10
+val startThreadsLatch = new CountDownLatch(maxThreadNumber)
+val latch = new CountDownLatch(1)
+val cachedThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+ "ThreadUtilsSuite-newDaemonCachedThreadPool",
+ maxThreadNumber,
+ keepAliveSeconds = 2)
+try {
+ for (_ <- 1 to maxThreadNumber) {
+cachedThreadPool.execute(new Runnable {
+ override def run(): Unit = {
+startThreadsLatch.countDown()
+latch.await(10, TimeUnit.SECONDS)
+ }
+})
+ }
+ startThreadsLatch.await(10, TimeUnit.SECONDS)
+ assert(cachedThreadPool.getActiveCount === maxThreadNumber)
+ assert(cachedThreadPool.getQueue.size === 0)
+
+ // Submit a new task and it should be put into the queue since the thread number reaches the
+ // limitation
Repository: spark
Updated Branches:
refs/heads/branch-1.6 7e7f2627f -> 0df6beccc
[SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool
doesn't cache any task
In the previous code, `newDaemonCachedThreadPool` used a `SynchronousQueue`,
which is wrong: a `SynchronousQueue` has zero capacity and cannot cache any
task. This patch switches to a `LinkedBlockingQueue`, along with other fixes,
to make sure `newDaemonCachedThreadPool` uses at most `maxThreadNumber`
threads and, after that, queues tasks in the `LinkedBlockingQueue`.
Author: Shixiong Zhu <shixi...@databricks.com>
Closes #9978 from zsxwing/cached-threadpool.
(cherry picked from commit d3ef693325f91a1ed340c9756c81244a80398eb2)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0df6becc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0df6becc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0df6becc
Branch: refs/heads/branch-1.6
Commit: 0df6beccc84166a00c7c98929bf487d9cea68e1d
Parents: 7e7f262
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Wed Nov 25 23:31:21 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Wed Nov 25 23:31:36 2015 -0800
--
.../org/apache/spark/util/ThreadUtils.scala | 14 --
.../apache/spark/util/ThreadUtilsSuite.scala| 45
2 files changed, 56 insertions(+), 3 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/0df6becc/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 5328344..f9fbe2f 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -56,10 +56,18 @@ private[spark] object ThreadUtils {
* Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names
* are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
*/
- def newDaemonCachedThreadPool(prefix: String, maxThreadNumber: Int): ThreadPoolExecutor = {
+ def newDaemonCachedThreadPool(
+ prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
val threadFactory = namedThreadFactory(prefix)
-new ThreadPoolExecutor(
- 0, maxThreadNumber, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable], threadFactory)
+val threadPool = new ThreadPoolExecutor(
+ maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks
+ maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, this one is not used
+ keepAliveSeconds,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue[Runnable],
+ threadFactory)
+threadPool.allowCoreThreadTimeOut(true)
+threadPool
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/0df6becc/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
index 620e4de..92ae038 100644
--- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -24,6 +24,8 @@ import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.Random
+import org.scalatest.concurrent.Eventually._
+
import org.apache.spark.SparkFunSuite
class ThreadUtilsSuite extends SparkFunSuite {
@@ -59,6 +61,49 @@ class ThreadUtilsSuite extends SparkFunSuite {
}
}
+ test("newDaemonCachedThreadPool") {
+val maxThreadNumber = 10
+val startThreadsLatch = new CountDownLatch(maxThreadNumber)
+val latch = new CountDownLatch(1)
+val cachedThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+ "ThreadUtilsSuite-newDaemonCachedThreadPool",
+ maxThreadNumber,
+ keepAliveSeconds = 2)
+try {
+ for (_ <- 1 to maxThreadNumber) {
+cachedThreadPool.execute(new Runnable {
+ override def run(): Unit = {
+startThreadsLatch.countDown()
+latch.await(10, TimeUnit.SECONDS)
+ }
+})
+ }
+ startThreadsLatch.await(10, TimeUnit.SECONDS)
+ assert(cachedThreadPool.getActiveCount === maxThreadNumber)
+ assert(cachedThreadPool.getQueue.size === 0)
+
+ // Submit a new task and it should be put into the queue since the thread number reaches the
+ // limitation
Repository: spark
Updated Branches:
refs/heads/master 068b6438d -> d3ef69332
[SPARK-11999][CORE] Fix the issue that ThreadUtils.newDaemonCachedThreadPool
doesn't cache any task
In the previous code, `newDaemonCachedThreadPool` used a `SynchronousQueue`,
which is wrong: a `SynchronousQueue` has zero capacity and cannot cache any
task. This patch switches to a `LinkedBlockingQueue`, along with other fixes,
to make sure `newDaemonCachedThreadPool` uses at most `maxThreadNumber`
threads and, after that, queues tasks in the `LinkedBlockingQueue`.
Author: Shixiong Zhu <shixi...@databricks.com>
Closes #9978 from zsxwing/cached-threadpool.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3ef6933
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3ef6933
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3ef6933
Branch: refs/heads/master
Commit: d3ef693325f91a1ed340c9756c81244a80398eb2
Parents: 068b643
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Wed Nov 25 23:31:21 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Wed Nov 25 23:31:21 2015 -0800
--
.../org/apache/spark/util/ThreadUtils.scala | 14 --
.../apache/spark/util/ThreadUtilsSuite.scala| 45
2 files changed, 56 insertions(+), 3 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/d3ef6933/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 5328344..f9fbe2f 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -56,10 +56,18 @@ private[spark] object ThreadUtils {
* Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names
* are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
*/
- def newDaemonCachedThreadPool(prefix: String, maxThreadNumber: Int): ThreadPoolExecutor = {
+ def newDaemonCachedThreadPool(
+ prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
val threadFactory = namedThreadFactory(prefix)
-new ThreadPoolExecutor(
- 0, maxThreadNumber, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable], threadFactory)
+val threadPool = new ThreadPoolExecutor(
+ maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks
+ maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, this one is not used
+ keepAliveSeconds,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue[Runnable],
+ threadFactory)
+threadPool.allowCoreThreadTimeOut(true)
+threadPool
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/d3ef6933/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
index 620e4de..92ae038 100644
--- a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -24,6 +24,8 @@ import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.Random
+import org.scalatest.concurrent.Eventually._
+
import org.apache.spark.SparkFunSuite
class ThreadUtilsSuite extends SparkFunSuite {
@@ -59,6 +61,49 @@ class ThreadUtilsSuite extends SparkFunSuite {
}
}
+ test("newDaemonCachedThreadPool") {
+val maxThreadNumber = 10
+val startThreadsLatch = new CountDownLatch(maxThreadNumber)
+val latch = new CountDownLatch(1)
+val cachedThreadPool = ThreadUtils.newDaemonCachedThreadPool(
+ "ThreadUtilsSuite-newDaemonCachedThreadPool",
+ maxThreadNumber,
+ keepAliveSeconds = 2)
+try {
+ for (_ <- 1 to maxThreadNumber) {
+cachedThreadPool.execute(new Runnable {
+ override def run(): Unit = {
+startThreadsLatch.countDown()
+latch.await(10, TimeUnit.SECONDS)
+ }
+})
+ }
+ startThreadsLatch.await(10, TimeUnit.SECONDS)
+ assert(cachedThreadPool.getActiveCount === maxThreadNumber)
+ assert(cachedThreadPool.getQueue.size === 0)
+
+ // Submit a new task and it should be put into the queue since the thread number reaches the
+ // limitation
+ cachedThreadPool.execute(new Runnable {
+override def run(): Unit = {
Repository: spark
Updated Branches:
refs/heads/master 4d6bbbc03 -> a5d988763
[STREAMING][FLAKY-TEST] Catch execution context race condition in
`FileBasedWriteAheadLog.close()`
There is a race condition in `FileBasedWriteAheadLog.close()`: if deletes of
old log files are still in progress, the write ahead log may close and result
in a `RejectedExecutionException`. This is okay and should be handled
gracefully.
Example test failures:
https://amplab.cs.berkeley.edu/jenkins/job/Spark-1.6-SBT/AMPLAB_JENKINS_BUILD_PROFILE=hadoop1.0,label=spark-test/95/testReport/junit/org.apache.spark.streaming.util/BatchedWriteAheadLogWithCloseFileAfterWriteSuite/BatchedWriteAheadLog___clean_old_logs/
The reason the test fails is that `writeAheadLog.close` is called in
`afterEach` while there may still be async deletes in flight.
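The tolerated race boils down to submitting work to an executor that another thread has just shut down, which Java signals with `RejectedExecutionException`; a minimal standalone sketch:
```
import java.util.concurrent.{Executors, RejectedExecutionException}

val pool = Executors.newSingleThreadExecutor()
pool.shutdown() // simulates close() winning the race

try {
  pool.execute(new Runnable { def run(): Unit = () })
} catch {
  case _: RejectedExecutionException =>
    () // expected after shutdown; deleting old logs is best-effort anyway
}
```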
tdas zsxwing
Author: Burak Yavuz <brk...@gmail.com>
Closes #9953 from brkyvz/flaky-ss.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a5d98876
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a5d98876
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a5d98876
Branch: refs/heads/master
Commit: a5d988763319f63a8e2b58673dd4f9098f17c835
Parents: 4d6bbbc
Author: Burak Yavuz <brk...@gmail.com>
Authored: Tue Nov 24 20:58:47 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Tue Nov 24 20:58:47 2015 -0800
--
.../streaming/util/FileBasedWriteAheadLog.scala | 16 +++-
1 file changed, 11 insertions(+), 5 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/a5d98876/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
--
diff --git
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 72705f1..f5165f7 100644
---
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.util
import java.nio.ByteBuffer
-import java.util.concurrent.ThreadPoolExecutor
+import java.util.concurrent.{RejectedExecutionException, ThreadPoolExecutor}
import java.util.{Iterator => JIterator}
import scala.collection.JavaConverters._
@@ -176,10 +176,16 @@ private[streaming] class FileBasedWriteAheadLog(
}
oldLogFiles.foreach { logInfo =>
if (!executionContext.isShutdown) {
-val f = Future { deleteFile(logInfo) }(executionContext)
-if (waitForCompletion) {
- import scala.concurrent.duration._
- Await.ready(f, 1 second)
+try {
+ val f = Future { deleteFile(logInfo) }(executionContext)
+ if (waitForCompletion) {
+import scala.concurrent.duration._
+Await.ready(f, 1 second)
+ }
+} catch {
+ case e: RejectedExecutionException =>
+logWarning("Execution context shutdown before deleting old
WriteAheadLogs. " +
+ "This would not affect recovery correctness.", e)
}
}
}
-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
Repository: spark
Updated Branches:
refs/heads/branch-1.6 862d788fc -> b18112666
[STREAMING][FLAKY-TEST] Catch execution context race condition in
`FileBasedWriteAheadLog.close()`
There is a race condition in `FileBasedWriteAheadLog.close()`: if deletes of
old log files are still in progress, the write ahead log may close and result
in a `RejectedExecutionException`. This is okay and should be handled
gracefully.
Example test failures:
https://amplab.cs.berkeley.edu/jenkins/job/Spark-1.6-SBT/AMPLAB_JENKINS_BUILD_PROFILE=hadoop1.0,label=spark-test/95/testReport/junit/org.apache.spark.streaming.util/BatchedWriteAheadLogWithCloseFileAfterWriteSuite/BatchedWriteAheadLog___clean_old_logs/
The reason the test fails is that `writeAheadLog.close` is called in
`afterEach` while there may still be async deletes in flight.
tdas zsxwing
Author: Burak Yavuz <brk...@gmail.com>
Closes #9953 from brkyvz/flaky-ss.
(cherry picked from commit a5d988763319f63a8e2b58673dd4f9098f17c835)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b1811266
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b1811266
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b1811266
Branch: refs/heads/branch-1.6
Commit: b18112666adec0a942d1cfe8d6b9f1e7c7201fcd
Parents: 862d788
Author: Burak Yavuz <brk...@gmail.com>
Authored: Tue Nov 24 20:58:47 2015 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Tue Nov 24 20:59:01 2015 -0800
--
.../streaming/util/FileBasedWriteAheadLog.scala | 16 +++-
1 file changed, 11 insertions(+), 5 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/b1811266/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
--
diff --git
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 72705f1..f5165f7 100644
---
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.util
import java.nio.ByteBuffer
-import java.util.concurrent.ThreadPoolExecutor
+import java.util.concurrent.{RejectedExecutionException, ThreadPoolExecutor}
import java.util.{Iterator => JIterator}
import scala.collection.JavaConverters._
@@ -176,10 +176,16 @@ private[streaming] class FileBasedWriteAheadLog(
}
oldLogFiles.foreach { logInfo =>
if (!executionContext.isShutdown) {
-val f = Future { deleteFile(logInfo) }(executionContext)
-if (waitForCompletion) {
- import scala.concurrent.duration._
- Await.ready(f, 1 second)
+try {
+ val f = Future { deleteFile(logInfo) }(executionContext)
+ if (waitForCompletion) {
+import scala.concurrent.duration._
+Await.ready(f, 1 second)
+ }
+} catch {
+ case e: RejectedExecutionException =>
+logWarning("Execution context shutdown before deleting old
WriteAheadLogs. " +
+ "This would not affect recovery correctness.", e)
}
}
}
-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
Repository: spark
Updated Branches:
refs/heads/master 151d7c2ba -> 216988688
[SPARK-11979][STREAMING] Empty TrackStateRDD cannot be checkpointed and
recovered from checkpoint file
This solves the following exception, caused when an empty state RDD is
checkpointed and recovered. The root cause is that an empty
OpenHashMapBasedStateMap cannot be deserialized because its initialCapacity is
set to zero.
```
Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most
recent failure: Lost task 0.0 in stage 6.0 (TID 20, localhost):
java.lang.IllegalArgumentException: requirement failed: Invalid initial capacity
at scala.Predef$.require(Predef.scala:233)
at
org.apache.spark.streaming.util.OpenHashMapBasedStateMap.&lt;init&gt;(StateMap.scala:96)
at
org.apache.spark.streaming.util.OpenHashMapBasedStateMap.&lt;init&gt;(StateMap.scala:86)
at
org.apache.spark.streaming.util.OpenHashMapBasedStateMap.readObject(StateMap.scala:291)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:181)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:921)
at
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:921)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
```
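A minimal sketch of the failure mode and the guard, with names assumed from the stack trace above rather than taken from the actual StateMap code:
```
// Stand-in for OpenHashMapBasedStateMap's capacity check.
class HashMapWithCapacity(initialCapacity: Int) {
  require(initialCapacity >= 1, "Invalid initial capacity")
}

// Deserializing an empty map used to feed its recorded size (0) straight in,
// tripping the require. Clamping to at least 1 makes the round trip safe:
def safeCapacity(recordedSize: Int): Int = math.max(recordedSize, 1)

val recovered = new HashMapWithCapacity(safeCapacity(0)) // ok: capacity 1
```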
Author: Tathagata Das
Closes #9958 from tdas/SPARK-11979.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21698868
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21698868
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21698868
Branch: refs/heads/master
Commit: 2169886883d33b33acf378ac42a626576b342df1
Parents: 151d7c2
Author: Tathagata Das
Authored: Tue Nov 24 23:13:01 2015 -0800
Committer: Shixiong Zhu
Committed: Tue Nov 24 23:13:01 2015 -0800
--
.../apache/spark/streaming/util/StateMap.scala | 19 -
.../apache/spark/streaming/StateMapSuite.scala | 30 +---
.../streaming/rdd/TrackStateRDDSuite.scala | 10 +++
3 files changed, 42 insertions(+), 17 deletions(-)
Repository: spark
Updated Branches:
refs/heads/branch-1.6 68bcb9b33 -> 7f030aa42
[SPARK-11979][STREAMING] Empty TrackStateRDD cannot be checkpointed and
recovered from checkpoint file
This solves the following exception, caused when an empty state RDD is
checkpointed and recovered. The root cause is that an empty
OpenHashMapBasedStateMap cannot be deserialized because its initialCapacity is
set to zero.
```
Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most
recent failure: Lost task 0.0 in stage 6.0 (TID 20, localhost):
java.lang.IllegalArgumentException: requirement failed: Invalid initial capacity
at scala.Predef$.require(Predef.scala:233)
at
org.apache.spark.streaming.util.OpenHashMapBasedStateMap.&lt;init&gt;(StateMap.scala:96)
at
org.apache.spark.streaming.util.OpenHashMapBasedStateMap.(StateMap.scala:86)
at
org.apache.spark.streaming.util.OpenHashMapBasedStateMap.readObject(StateMap.scala:291)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:181)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:921)
at
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:921)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
```
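For illustration, here is a minimal, self-contained sketch of this failure mode and the shape of the fix, using plain Java serialization outside Spark. SketchStateMap and newBackingMap are hypothetical stand-ins, not the real org.apache.spark.streaming.util.StateMap internals; the sketch assumes the fix amounts to clamping the capacity used to rebuild the map in readObject to at least one, consistent with the require failure in the trace above. The actual change is in the diff linked below.
```
import java.io._

// Hypothetical stand-in for OpenHashMapBasedStateMap; entry payloads are
// elided so only the capacity round-trip is modeled.
class SketchStateMap(initialCapacity: Int) extends Serializable {

  @transient private var data = newBackingMap(initialCapacity)

  def put(k: String, v: Int): Unit = data.put(k, v)
  def size: Int = data.size

  // Mirrors the constructor precondition that fails in the stack trace.
  private def newBackingMap(capacity: Int): java.util.HashMap[String, Int] = {
    require(capacity >= 1, "Invalid initial capacity")
    new java.util.HashMap[String, Int](capacity)
  }

  // Persist only the entry count; the real class also writes the entries.
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    out.writeInt(data.size)
  }

  // Rebuild the backing map sized to the serialized count. Passing the raw
  // count reproduces the bug: an empty map writes 0, and newBackingMap(0)
  // throws during recovery. Clamping to at least 1 lets it round-trip.
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    val n = in.readInt()
    data = newBackingMap(math.max(n, 1))
  }
}

object SketchStateMapDemo extends App {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(new SketchStateMap(64)) // empty map, like an empty state RDD
  oos.close()
  val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
  val restored = ois.readObject().asInstanceOf[SketchStateMap]
  println(s"restored size = ${restored.size}") // prints: restored size = 0
}
```
Without the math.max clamp, deserializing the empty map would throw the same "requirement failed: Invalid initial capacity" seen in the trace.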
Author: Tathagata Das
Closes #9958 from tdas/SPARK-11979.
(cherry picked from commit 2169886883d33b33acf378ac42a626576b342df1)
Signed-off-by: Shixiong Zhu
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f030aa4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f030aa4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f030aa4
Branch: refs/heads/branch-1.6
Commit: 7f030aa422802a8e7077e1c74a59ab9a5fe54488
Parents: 68bcb9b
Author: Tathagata Das
Authored: Tue Nov 24 23:13:01 2015 -0800
Committer: Shixiong Zhu
Committed: Tue Nov 24 23:13:29 2015 -0800
--
.../apache/spark/streaming/util/StateMap.scala | 19 -