Hello,

I am running the BroadcastTest example in a standalone cluster using
spark-submit. I have 8 host machines: Host1 is the master, and Host2 through
Host8 run the 7 workers that connect to it. The connections are fine, since
all 7 workers show up in the master web UI. The BroadcastTest example with
HTTP broadcast also appears to work fine: there are no error messages, and
all the workers show "EXITED" at the end. But when I change the third
argument from "Http" to "Torrent" to use Torrent broadcast, all the workers
get a "KILLED" status once the driver reaches sc.stop().

Below is the stderr from one of the workers when running Torrent broadcast
(IP addresses masked):
==========================================================================================
14/07/02 18:20:03 INFO SecurityManager: Changing view acls to: root
14/07/02 18:20:03 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(root)
14/07/02 18:20:04 INFO Slf4jLogger: Slf4jLogger started
14/07/02 18:20:04 INFO Remoting: Starting remoting
14/07/02 18:20:04 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://driverPropsFetcher@dyn-xxx-xx-xx-xx:37771]
14/07/02 18:20:04 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://driverPropsFetcher@dyn-xxx-xx-xx-xx:37771]
14/07/02 18:20:04 INFO SecurityManager: Changing view acls to: root
14/07/02 18:20:04 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(root)
14/07/02 18:20:04 INFO RemoteActorRefProvider$RemotingTerminator: Shutting
down remote daemon.
14/07/02 18:20:04 INFO RemoteActorRefProvider$RemotingTerminator: Remote
daemon shut down; proceeding with flushing remote transports.
14/07/02 18:20:04 INFO Slf4jLogger: Slf4jLogger started
14/07/02 18:20:04 INFO Remoting: Starting remoting
14/07/02 18:20:04 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkExecutor@dyn-xxx-xx-xx-xx:53661]
14/07/02 18:20:04 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://sparkExecutor@dyn-xxx-xx-xx-xx:53661]
14/07/02 18:20:04 INFO CoarseGrainedExecutorBackend: Connecting to driver:
akka.tcp://spark@dyn-xxx-xx-xx-xx:42436/user/CoarseGrainedScheduler
14/07/02 18:20:04 INFO WorkerWatcher: Connecting to worker
akka.tcp://sparkWorker@dyn-xxx-xx-xx-xx:32818/user/Worker
14/07/02 18:20:04 INFO Remoting: Remoting shut down
14/07/02 18:20:04 INFO RemoteActorRefProvider$RemotingTerminator: Remoting
shut down.
14/07/02 18:20:04 INFO WorkerWatcher: Successfully connected to
akka.tcp://sparkWorker@dyn-xxx-xx-xx-xx:32818/user/Worker
14/07/02 18:20:04 INFO CoarseGrainedExecutorBackend: Successfully registered
with driver
14/07/02 18:20:04 INFO SecurityManager: Changing view acls to: root
14/07/02 18:20:04 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(root)
14/07/02 18:20:04 INFO Slf4jLogger: Slf4jLogger started
14/07/02 18:20:04 INFO Remoting: Starting remoting
14/07/02 18:20:05 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://spark@dyn-xxx-xx-xx-xx:57883]
14/07/02 18:20:05 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://spark@dyn-xxx-xx-xx-xx:57883]
14/07/02 18:20:05 INFO SparkEnv: Connecting to MapOutputTracker:
akka.tcp://spark@dyn-xxx-xx-xx-xx:42436/user/MapOutputTracker
14/07/02 18:20:05 INFO SparkEnv: Connecting to BlockManagerMaster:
akka.tcp://spark@dyn-xxx-xx-xx-xx:42436/user/BlockManagerMaster
14/07/02 18:20:05 INFO DiskBlockManager: Created local directory at
/tmp/spark-local-20140702182005-30bd
14/07/02 18:20:05 INFO ConnectionManager: Bound socket to port 60368 with id
= ConnectionManagerId(dyn-xxx-xx-xx-xx,60368)
14/07/02 18:20:05 INFO MemoryStore: MemoryStore started with capacity 294.6
MB
14/07/02 18:20:05 INFO BlockManagerMaster: Trying to register BlockManager
14/07/02 18:20:05 INFO BlockManagerMaster: Registered BlockManager
14/07/02 18:20:05 INFO HttpFileServer: HTTP File server directory is
/tmp/spark-35f65442-e0e8-4122-9359-ca8232ca97a6
14/07/02 18:20:05 INFO HttpServer: Starting HTTP Server
14/07/02 18:20:06 INFO CoarseGrainedExecutorBackend: Got assigned task 9
14/07/02 18:20:06 INFO Executor: Running task ID 9
14/07/02 18:20:06 INFO Executor: Fetching
http://xxx.xx.xx.xx:54292/jars/broadcast-test_2.10-1.0.jar with timestamp
1404339601903
14/07/02 18:20:06 INFO Utils: Fetching
http://xxx.xx.xx.xx:54292/jars/broadcast-test_2.10-1.0.jar to
/tmp/fetchFileTemp5382215579021312284.tmp
14/07/02 18:20:06 INFO BlockManager: Removing broadcast 0
14/07/02 18:20:07 INFO Executor: Adding
file:/home/lrl/Desktop/spark-master/work/app-20140702182002-0006/3/./broadcast-test_2.10-1.0.jar
to class loader
14/07/02 18:20:07 INFO TorrentBroadcast: Started reading broadcast variable
1
14/07/02 18:20:07 INFO SendingConnection: Initiating connection to
[dyn-xxx-xx-xx-xx:60179]
14/07/02 18:20:07 INFO SendingConnection: Connected to
[dyn-xxx-xx-xx-xx:60179], 1 messages pending
14/07/02 18:20:07 INFO ConnectionManager: Accepted connection from
[DCTB-Host1/xxx.xx.xx.xx]
14/07/02 18:20:07 INFO BlockManager: Found block broadcast_1_meta remotely
14/07/02 18:20:07 INFO SendingConnection: Initiating connection to
[dyn-xxx-xx-xx-xx:55273]
14/07/02 18:20:07 INFO SendingConnection: Connected to
[dyn-xxx-xx-xx-xx:55273], 1 messages pending
14/07/02 18:20:07 INFO ConnectionManager: Accepted connection from
[dyn-xxx-xx-xx-xxx]
14/07/02 18:20:07 INFO BlockManager: Found block broadcast_1_piece0 remotely
14/07/02 18:20:07 WARN SizeEstimator: Failed to check whether
UseCompressedOops is set; assuming yes
14/07/02 18:20:07 INFO MemoryStore: ensureFreeSpace(4000168) called with
curMem=0, maxMem=308910489
14/07/02 18:20:07 INFO MemoryStore: Block broadcast_1_piece0 stored as
values in memory (estimated size 3.8 MB, free 290.8 MB)
14/07/02 18:20:07 INFO BlockManagerMaster: Updated info of block
broadcast_1_piece0
14/07/02 18:20:08 INFO MemoryStore: ensureFreeSpace(4000120) called with
curMem=4000168, maxMem=308910489
14/07/02 18:20:08 INFO MemoryStore: Block broadcast_1 stored as values in
memory (estimated size 3.8 MB, free 287.0 MB)
14/07/02 18:20:08 INFO TorrentBroadcast: Reading broadcast variable 1 took
0.909187542 s
14/07/02 18:20:08 INFO Executor: Serialized size of result for 9 is 599
14/07/02 18:20:08 INFO Executor: Sending result for 9 directly to driver
14/07/02 18:20:08 INFO Executor: Finished task ID 9
14/07/02 18:20:08 INFO CoarseGrainedExecutorBackend: Got assigned task 13
14/07/02 18:20:08 INFO Executor: Running task ID 13
14/07/02 18:20:08 INFO TorrentBroadcast: Started reading broadcast variable
2
14/07/02 18:20:08 INFO BlockManager: Found block broadcast_2_meta remotely
14/07/02 18:20:08 INFO BlockManager: Found block broadcast_2_piece0 remotely
14/07/02 18:20:08 INFO MemoryStore: ensureFreeSpace(4000168) called with
curMem=8000288, maxMem=308910489
14/07/02 18:20:08 INFO MemoryStore: Block broadcast_2_piece0 stored as
values in memory (estimated size 3.8 MB, free 283.2 MB)
14/07/02 18:20:08 INFO BlockManagerMaster: Updated info of block
broadcast_2_piece0
14/07/02 18:20:08 INFO MemoryStore: ensureFreeSpace(4000120) called with
curMem=12000456, maxMem=308910489
14/07/02 18:20:08 INFO MemoryStore: Block broadcast_2 stored as values in
memory (estimated size 3.8 MB, free 279.3 MB)
14/07/02 18:20:08 INFO TorrentBroadcast: Reading broadcast variable 2 took
0.269456367 s
14/07/02 18:20:08 INFO Executor: Serialized size of result for 13 is 599
14/07/02 18:20:08 INFO Executor: Sending result for 13 directly to driver
14/07/02 18:20:08 INFO Executor: Finished task ID 13
14/07/02 18:20:09 INFO BlockManager: Removing broadcast 2
14/07/02 18:20:09 INFO BlockManager: Removing block broadcast_2
14/07/02 18:20:09 INFO MemoryStore: Block broadcast_2 of size 4000120
dropped from memory (free 296910033)
14/07/02 18:20:09 INFO BlockManager: Removing block broadcast_2_piece0
14/07/02 18:20:09 INFO MemoryStore: Block broadcast_2_piece0 of size 4000168
dropped from memory (free 300910201)
14/07/02 18:20:09 INFO CoarseGrainedExecutorBackend: Driver commanded a
shutdown
14/07/02 18:20:09 INFO RemoteActorRefProvider$RemotingTerminator: Shutting
down remote daemon.
14/07/02 18:20:09 INFO BlockManagerMaster: Updated info of block
broadcast_2_piece0
14/07/02 18:20:09 INFO RemoteActorRefProvider$RemotingTerminator: Remote
daemon shut down; proceeding with flushing remote transports.
14/07/02 18:20:09 INFO Remoting: Remoting shut down
14/07/02 18:20:10 INFO ConnectionManager: Key not valid ?
sun.nio.ch.SelectionKeyImpl@1973a69
14/07/02 18:20:10 INFO ConnectionManager: Removing SendingConnection to
ConnectionManagerId(dyn-xxx-xx-xx-xx,60179)
14/07/02 18:20:10 INFO ConnectionManager: Removing ReceivingConnection to
ConnectionManagerId(DCTB-Host1,60179)
14/07/02 18:20:10 ERROR ConnectionManager: Corresponding
SendingConnectionManagerId not found
14/07/02 18:20:10 INFO ConnectionManager: key already cancelled ?
sun.nio.ch.SelectionKeyImpl@1973a69
java.nio.channels.CancelledKeyException
        at
org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:363)
        at
org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:116)
==========================================================================================

For comparison, here is the corresponding output when running HTTP broadcast
(IP addresses masked):
==========================================================================================
14/07/02 18:02:04 INFO SecurityManager: Changing view acls to: root
14/07/02 18:02:04 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(root)
14/07/02 18:02:05 INFO Slf4jLogger: Slf4jLogger started
14/07/02 18:02:05 INFO Remoting: Starting remoting
14/07/02 18:02:05 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://driverPropsFetcher@dyn-xxx-xx-xx-xx:37190]
14/07/02 18:02:05 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://driverPropsFetcher@dyn-xxx-xx-xx-xx:37190]
14/07/02 18:02:05 INFO SecurityManager: Changing view acls to: root
14/07/02 18:02:05 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(root)
14/07/02 18:02:05 INFO RemoteActorRefProvider$RemotingTerminator: Shutting
down remote daemon.
14/07/02 18:02:05 INFO RemoteActorRefProvider$RemotingTerminator: Remote
daemon shut down; proceeding with flushing remote transports.
14/07/02 18:02:05 INFO Slf4jLogger: Slf4jLogger started
14/07/02 18:02:05 INFO Remoting: Starting remoting
14/07/02 18:02:05 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkExecutor@dyn-xxx-xx-xx-xx:44376]
14/07/02 18:02:05 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://sparkExecutor@dyn-xxx-xx-xx-xx:44376]
14/07/02 18:02:05 INFO CoarseGrainedExecutorBackend: Connecting to driver:
akka.tcp://spark@dyn-xxx-xx-xx-xx:51300/user/CoarseGrainedScheduler
14/07/02 18:02:05 INFO WorkerWatcher: Connecting to worker
akka.tcp://sparkWorker@dyn-xxx-xx-xx-xx:32818/user/Worker
14/07/02 18:02:05 INFO Remoting: Remoting shut down
14/07/02 18:02:05 INFO RemoteActorRefProvider$RemotingTerminator: Remoting
shut down.
14/07/02 18:02:05 INFO WorkerWatcher: Successfully connected to
akka.tcp://sparkWorker@dyn-xxx-xx-xx-xx:32818/user/Worker
14/07/02 18:02:05 INFO CoarseGrainedExecutorBackend: Successfully registered
with driver
14/07/02 18:02:05 INFO SecurityManager: Changing view acls to: root
14/07/02 18:02:05 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(root)
14/07/02 18:02:05 INFO Slf4jLogger: Slf4jLogger started
14/07/02 18:02:05 INFO Remoting: Starting remoting
14/07/02 18:02:05 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://spark@dyn-xxx-xx-xx-xx:39965]
14/07/02 18:02:05 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://spark@dyn-xxx-xx-xx-xx:39965]
14/07/02 18:02:05 INFO SparkEnv: Connecting to MapOutputTracker:
akka.tcp://spark@dyn-xxx-xx-xx-xx:51300/user/MapOutputTracker
14/07/02 18:02:05 INFO SparkEnv: Connecting to BlockManagerMaster:
akka.tcp://spark@dyn-xxx-xx-xx-xx:51300/user/BlockManagerMaster
14/07/02 18:02:05 INFO DiskBlockManager: Created local directory at
/tmp/spark-local-20140702180205-113e
14/07/02 18:02:05 INFO ConnectionManager: Bound socket to port 48270 with id
= ConnectionManagerId(dyn-xxx-xx-xx-xx,48270)
14/07/02 18:02:05 INFO MemoryStore: MemoryStore started with capacity 294.6
MB
14/07/02 18:02:05 INFO BlockManagerMaster: Trying to register BlockManager
14/07/02 18:02:05 INFO BlockManagerMaster: Registered BlockManager
14/07/02 18:02:05 INFO HttpFileServer: HTTP File server directory is
/tmp/spark-5c87e636-faaa-489a-b5a6-c9100cfe4dc5
14/07/02 18:02:05 INFO HttpServer: Starting HTTP Server
14/07/02 18:02:06 INFO CoarseGrainedExecutorBackend: Got assigned task 11
14/07/02 18:02:06 INFO Executor: Running task ID 11
14/07/02 18:02:06 INFO Executor: Fetching
http://xxx.xx.xx.xx:43505/jars/broadcast-test_2.10-1.0.jar with timestamp
1404338522319
14/07/02 18:02:06 INFO Utils: Fetching
http://xxx.xx.xx.xx:43505/jars/broadcast-test_2.10-1.0.jar to
/tmp/fetchFileTemp4777840901789178395.tmp
14/07/02 18:02:06 INFO Executor: Adding
file:/home/lrl/Desktop/spark-master/work/app-20140702180202-0001/3/./broadcast-test_2.10-1.0.jar
to class loader
14/07/02 18:02:06 INFO HttpBroadcast: Started reading broadcast variable 1
14/07/02 18:02:06 WARN SizeEstimator: Failed to check whether
UseCompressedOops is set; assuming yes
14/07/02 18:02:06 INFO MemoryStore: ensureFreeSpace(4000120) called with
curMem=0, maxMem=308910489
14/07/02 18:02:06 INFO MemoryStore: Block broadcast_1 stored as values in
memory (estimated size 3.8 MB, free 290.8 MB)
14/07/02 18:02:06 INFO HttpBroadcast: Reading broadcast variable 1 took
0.121600945 s
14/07/02 18:02:06 INFO Executor: Serialized size of result for 11 is 599
14/07/02 18:02:06 INFO Executor: Sending result for 11 directly to driver
14/07/02 18:02:06 INFO Executor: Finished task ID 11
14/07/02 18:02:07 INFO CoarseGrainedExecutorBackend: Got assigned task 18
14/07/02 18:02:07 INFO Executor: Running task ID 18
14/07/02 18:02:07 INFO HttpBroadcast: Started reading broadcast variable 2
14/07/02 18:02:07 INFO MemoryStore: ensureFreeSpace(4000120) called with
curMem=4000120, maxMem=308910489
14/07/02 18:02:07 INFO MemoryStore: Block broadcast_2 stored as values in
memory (estimated size 3.8 MB, free 287.0 MB)
14/07/02 18:02:07 INFO HttpBroadcast: Reading broadcast variable 2 took
0.234879208 s
14/07/02 18:02:07 INFO Executor: Serialized size of result for 18 is 599
14/07/02 18:02:07 INFO Executor: Sending result for 18 directly to driver
14/07/02 18:02:07 INFO Executor: Finished task ID 18
14/07/02 18:02:07 INFO CoarseGrainedExecutorBackend: Driver commanded a
shutdown
14/07/02 18:02:07 INFO RemoteActorRefProvider$RemotingTerminator: Shutting
down remote daemon.
14/07/02 18:02:07 INFO RemoteActorRefProvider$RemotingTerminator: Remote
daemon shut down; proceeding with flushing remote transports.
14/07/02 18:02:07 INFO Remoting: Remoting shut down
==========================================================================================

It seems to me that the error does not occur until broadcasting has already
finished, but I would still like to understand why such an error message
appears at sc.stop(). If it is an issue with sc.stop(), why does it happen
only with Torrent broadcast and not with HTTP broadcast? Any insight would be
very much appreciated. Thanks in advance!
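
For completeness, the driver side of the test has essentially the following
shape (my paraphrase, not the exact example source; the array size and loop
count are guesses consistent with the ~3.8 MB blocks and broadcast numbers in
the logs), which is why I say the broadcast reads are already done by the
time sc.stop() runs:

    import org.apache.spark.{SparkConf, SparkContext}

    // Paraphrased shape of the BroadcastTest driver (illustrative only).
    object BroadcastTestSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("Broadcast Test"))
        val arr = (0 until 1000000).toArray    // roughly matches the ~3.8 MB blocks in the log
        for (i <- 0 until 3) {
          val barr = sc.broadcast(arr)         // one broadcast per iteration (broadcast 0, 1, 2 in the logs)
          val sizes = sc.parallelize(1 to 10, 10).map(_ => barr.value.length)
          sizes.collect().foreach(println)     // blocks until every task has read the broadcast value
        }
        sc.stop()                              // reached only after all iterations finish; this is
                                               // where the Torrent run's workers go to KILLED
      }
    }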

Best,
Jack


