damccorm opened a new issue, #20555:
URL: https://github.com/apache/beam/issues/20555
I want to use side inputs to pass some configuration to my pipeline, but when running on my local Spark cluster (Spark 2.4.7, 1 master, 1 worker, both on localhost) the driver commands a shutdown right after the `PCollectionView` has been created. The same pipeline works perfectly on the DirectRunner.

I have stripped the code down to its bare essentials (see below), but the issue still persists on the Spark cluster, while the DirectRunner continues to work fine.

The Spark cluster itself does accept jobs: I have successfully run a "hello-world" pipeline on it that completed without issue.

What is happening here? Logs are pasted below.
```
// Pipeline
private static PipelineResult runPipeline(PipelineOptions options) {
  Pipeline p = Pipeline.create(options);

  PCollectionView<String> schema = p
      .apply("Dummy tabular schema builder", Create.of("This is a string"))
      .apply("Collect", View.asSingleton());

  p.apply("Hello world", Create.of("Hello world"))
      .apply("Side input test",
          ParDo.of(DummyFn.builder().setSchemaView(schema).build())
              .withSideInput("schema", schema))
      .apply(ConsoleIO.create());

  return p.run();
}
```
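One thing I am unsure about (an assumption on my side, not something the logs confirm): `p.run()` on the SparkRunner may return before the pipeline has finished, so if `main()` exits immediately afterwards, the driver JVM's shutdown hook stops the `SparkContext` and tears down the executors mid-pipeline. A minimal variant that blocks until completion, using Beam's standard `PipelineResult.waitUntilFinish()`, would look like this (same transforms as above, elided for brevity):

```java
// Hypothetical variant of runPipeline: block until the pipeline finishes
// before the driver JVM exits. If main() returns right after p.run(),
// the SparkContext shutdown hook may fire while the job is still running.
private static PipelineResult runPipeline(PipelineOptions options) {
  Pipeline p = Pipeline.create(options);
  // ... same transforms as in the snippet above ...
  PipelineResult result = p.run();
  result.waitUntilFinish();  // block until the Spark job completes
  return result;
}
```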
```
// Simple DoFn that prints the side input
@AutoValue
public abstract class DummyFn extends DoFn<String, String> {
  private static final Logger LOG = LoggerFactory.getLogger(DummyFn.class);

  public static Builder builder() {
    return new org.odp.beam.io.fn.AutoValue_DummyFn.Builder();
  }

  public abstract PCollectionView<String> getSchemaView();

  @ProcessElement
  public void processElement(@Element String element,
      OutputReceiver<String> out,
      ProcessContext context) {
    String schema = context.sideInput(getSchemaView());
    LOG.warn(schema);
    out.output(element.toUpperCase());
  }

  @AutoValue.Builder
  public abstract static class Builder {
    public abstract Builder setSchemaView(PCollectionView<String> value);

    public abstract DummyFn build();
  }
}
```
```
// Simple PTransform that prints each element via its toString method
public class ConsoleIO<T> extends PTransform<PCollection<T>, PDone> {

  public static <T> ConsoleIO<T> create() {
    return new ConsoleIO<>();
  }

  @Override
  public PDone expand(PCollection<T> input) {
    input.apply("Print elements", ParDo.of(new PrintElementFn<T>()));
    return PDone.in(input.getPipeline());
  }

  static class PrintElementFn<T> extends DoFn<T, Void> {
    @ProcessElement
    public void processElement(@Element T element, ProcessContext context) {
      System.out.println(element.toString());
    }
  }
}
```
spark-submit output
```
$ spark-submit \
  --class org.odp.beam.extractors.CsvToCdfRawExtractor \
  --verbose \
  --driver-memory 4G \
  --executor-memory 4G \
  --total-executor-cores 4 \
  --deploy-mode client \
  --supervise \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.network.timeout=420000 \
  --master spark://192.168.10.172:7077 \
  target/beam-poc-0.1-shaded.jar \
  --runner=SparkRunner
Using properties file: null
20/11/10 15:46:44 WARN Utils: Your hostname, localhost.localdomain resolves to a loopback address: 127.0.0.1; using 192.168.10.172 instead (on interface enp7s0)
20/11/10 15:46:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/tom/app/spark/spark-2.4.7-bin-hadoop2.7/jars/spark-unsafe_2.11-2.4.7.jar) to method java.nio.Bits.unaligned()
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Parsed arguments:
  master                  spark://192.168.10.172:7077
  deployMode              client
  executorMemory          4G
  executorCores           null
  totalExecutorCores      4
  propertiesFile          null
  driverMemory            4G
  driverCores             null
  driverExtraClassPath    null
  driverExtraLibraryPath  null
  driverExtraJavaOptions  null
  supervise               true
  queue                   null
  numExecutors            null
  files                   null
  pyFiles                 null
  archives                null
  mainClass               org.odp.beam.extractors.CsvToCdfRawExtractor
  primaryResource         file:/home/tom/project/odf/beam-poc/target/beam-poc-0.1-shaded.jar
  name                    org.odp.beam.extractors.CsvToCdfRawExtractor
  childArgs               [--runner=SparkRunner]
  jars                    null
  packages                null
  packagesExclusions      null
  repositories            null
  verbose                 true
Spark properties used, including those specified through --conf and those from the properties file null:
  (spark.network.timeout,420000)
  (spark.driver.memory,4G)
  (spark.dynamicAllocation.enabled,false)
20/11/10 15:46:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Main class:
org.odp.beam.extractors.CsvToCdfRawExtractor
Arguments:
--runner=SparkRunner
Spark config:
(spark.jars,file:/home/tom/project/odf/beam-poc/target/beam-poc-0.1-shaded.jar)
(spark.app.name,org.odp.beam.extractors.CsvToCdfRawExtractor)
(spark.cores.max,4)
(spark.network.timeout,420000)
(spark.driver.memory,4G)
(spark.submit.deployMode,client)
(spark.master,spark://192.168.10.172:7077)
(spark.executor.memory,4G)
(spark.dynamicAllocation.enabled,false)
Classpath elements:
file:/home/tom/project/odf/beam-poc/target/beam-poc-0.1-shaded.jar
log4j:WARN No appenders could be found for logger (org.apache.beam.sdk.options.PipelineOptionsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/11/10 15:46:46 INFO SparkContext: Running Spark version 2.4.7
20/11/10 15:46:47 INFO SparkContext: Submitted application: CsvToCdfRawExtractor
20/11/10 15:46:47 INFO SecurityManager: Changing view acls to: tom
20/11/10 15:46:47 INFO SecurityManager: Changing modify acls to: tom
20/11/10 15:46:47 INFO SecurityManager: Changing view acls groups to:
20/11/10 15:46:47 INFO SecurityManager: Changing modify acls groups to:
20/11/10 15:46:47 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(tom); groups with view permissions: Set(); users with modify permissions: Set(tom); groups with modify permissions: Set()
20/11/10 15:46:47 INFO Utils: Successfully started service 'sparkDriver' on port 35103.
20/11/10 15:46:47 INFO SparkEnv: Registering MapOutputTracker
20/11/10 15:46:47 INFO SparkEnv: Registering BlockManagerMaster
20/11/10 15:46:47 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
20/11/10 15:46:47 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
20/11/10 15:46:47 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-58419068-d0ad-45c9-b90b-92b659dee1c3
20/11/10 15:46:47 INFO MemoryStore: MemoryStore started with capacity 2.2 GB
20/11/10 15:46:47 INFO SparkEnv: Registering OutputCommitCoordinator
20/11/10 15:46:47 INFO Utils: Successfully started service 'SparkUI' on port 4040.
20/11/10 15:46:47 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://fedora:4040
20/11/10 15:46:47 INFO SparkContext: Added JAR file:/home/tom/project/odf/beam-poc/target/beam-poc-0.1-shaded.jar at spark://fedora:35103/jars/beam-poc-0.1-shaded.jar with timestamp 1605019607514
20/11/10 15:46:47 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://192.168.10.172:7077...
20/11/10 15:46:47 INFO TransportClientFactory: Successfully created connection to /192.168.10.172:7077 after 25 ms (0 ms spent in bootstraps)
20/11/10 15:46:47 INFO StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-20201110154647-0020
20/11/10 15:46:47 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20201110154647-0020/0 on worker-20201109144752-192.168.10.172-45535 (192.168.10.172:45535) with 4 core(s)
20/11/10 15:46:47 INFO StandaloneSchedulerBackend: Granted executor ID app-20201110154647-0020/0 on hostPort 192.168.10.172:45535 with 4 core(s), 4.0 GB RAM
20/11/10 15:46:47 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 33169.
20/11/10 15:46:47 INFO NettyBlockTransferService: Server created on fedora:33169
20/11/10 15:46:47 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/11/10 15:46:47 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20201110154647-0020/0 is now RUNNING
20/11/10 15:46:47 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, fedora, 33169, None)
20/11/10 15:46:47 INFO BlockManagerMasterEndpoint: Registering block manager fedora:33169 with 2.2 GB RAM, BlockManagerId(driver, fedora, 33169, None)
20/11/10 15:46:47 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, fedora, 33169, None)
20/11/10 15:46:47 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, fedora, 33169, None)
20/11/10 15:46:47 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
20/11/10 15:46:48 INFO SparkRunner$Evaluator: Entering directly-translatable composite transform: 'Collect/Combine.GloballyAsSingletonView/Combine.globally(Singleton)'
20/11/10 15:46:48 INFO MetricsAccumulator: Instantiated metrics accumulator: MetricQueryResults()
20/11/10 15:46:48 INFO AggregatorsAccumulator: Instantiated aggregators accumulator:
20/11/10 15:46:48 INFO SparkRunner$Evaluator: Evaluating Read(CreateSource)
20/11/10 15:46:48 INFO SparkRunner$Evaluator: Entering directly-translatable composite transform: 'Collect/Combine.GloballyAsSingletonView/Combine.globally(Singleton)'
20/11/10 15:46:48 INFO SparkRunner$Evaluator: Evaluating Combine.globally(Singleton)
20/11/10 15:46:48 INFO SparkContext: Starting job: aggregate at GroupCombineFunctions.java:107
20/11/10 15:46:48 INFO DAGScheduler: Got job 0 (aggregate at GroupCombineFunctions.java:107) with 1 output partitions
20/11/10 15:46:48 INFO DAGScheduler: Final stage: ResultStage 0 (aggregate at GroupCombineFunctions.java:107)
20/11/10 15:46:48 INFO DAGScheduler: Parents of final stage: List()
20/11/10 15:46:48 INFO DAGScheduler: Missing parents: List()
20/11/10 15:46:48 INFO DAGScheduler: Submitting ResultStage 0 (Dummy tabular schema builder/Read(CreateSource).out Bounded[0] at RDD at SourceRDD.java:80), which has no missing parents
20/11/10 15:46:48 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 16.2 KB, free 2.2 GB)
20/11/10 15:46:48 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 6.8 KB, free 2.2 GB)
20/11/10 15:46:48 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on fedora:33169 (size: 6.8 KB, free: 2.2 GB)
20/11/10 15:46:48 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1184
20/11/10 15:46:48 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (Dummy tabular schema builder/Read(CreateSource).out Bounded[0] at RDD at SourceRDD.java:80) (first 15 tasks are for partitions Vector(0))
20/11/10 15:46:48 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
20/11/10 15:46:49 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (192.168.10.172:48382) with ID 0
20/11/10 15:46:49 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 192.168.10.172, executor 0, partition 0, PROCESS_LOCAL, 8546 bytes)
20/11/10 15:46:49 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.10.172:43781 with 2.2 GB RAM, BlockManagerId(0, 192.168.10.172, 43781, None)
20/11/10 15:46:50 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.10.172:43781 (size: 6.8 KB, free: 2.2 GB)
20/11/10 15:46:51 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 2056 ms on 192.168.10.172 (executor 0) (1/1)
20/11/10 15:46:51 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
20/11/10 15:46:51 INFO DAGScheduler: ResultStage 0 (aggregate at GroupCombineFunctions.java:107) finished in 3.091 s
20/11/10 15:46:51 INFO DAGScheduler: Job 0 finished: aggregate at GroupCombineFunctions.java:107, took 3.132405 s
20/11/10 15:46:51 INFO SparkRunner$Evaluator: Evaluating org.apache.beam.sdk.transforms.View$VoidKeyToMultimapMaterialization$VoidKeyToMultimapMaterializationDoFn@14924f41
20/11/10 15:46:51 INFO SparkRunner$Evaluator: Evaluating View.CreatePCollectionView
20/11/10 15:46:51 INFO SparkContext: Invoking stop() from shutdown hook
20/11/10 15:46:51 INFO SparkUI: Stopped Spark web UI at http://fedora:4040
20/11/10 15:46:51 INFO StandaloneSchedulerBackend: Shutting down all executors
20/11/10 15:46:51 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor to shut down
20/11/10 15:46:51 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/11/10 15:46:51 INFO MemoryStore: MemoryStore cleared
20/11/10 15:46:51 INFO BlockManager: BlockManager stopped
20/11/10 15:46:51 INFO BlockManagerMaster: BlockManagerMaster stopped
20/11/10 15:46:51 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/11/10 15:46:51 INFO SparkContext: Successfully stopped SparkContext
20/11/10 15:46:51 INFO ShutdownHookManager: Shutdown hook called
20/11/10 15:46:51 INFO ShutdownHookManager: Deleting directory /tmp/spark-665a903f-22db-497e-989f-a5ca3e0635e2
20/11/10 15:46:51 INFO ShutdownHookManager: Deleting directory /tmp/spark-d4b5a04f-f6a3-48ff-b229-4eb966151d86
```
stderr from spark worker:
```
Spark Executor Command: "/usr/lib/jvm/java-11-openjdk-11.0.9.6-0.0.ea.fc33.x86_64/bin/java" "-cp" "/home/tom/app/spark/spark/conf/:/home/tom/app/spark/spark/jars/*" "-Xmx4096M" "-Dspark.driver.port=35103" "-Dspark.network.timeout=420000" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@fedora:35103" "--executor-id" "0" "--hostname" "192.168.10.172" "--cores" "4" "--app-id" "app-20201110154647-0020" "--worker-url" "spark://[email protected]:45535"
========================================
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/11/10 15:46:48 INFO CoarseGrainedExecutorBackend: Started daemon with process name: [email protected]
20/11/10 15:46:48 INFO SignalUtils: Registered signal handler for TERM
20/11/10 15:46:48 INFO SignalUtils: Registered signal handler for HUP
20/11/10 15:46:48 INFO SignalUtils: Registered signal handler for INT
20/11/10 15:46:48 WARN Utils: Your hostname, localhost.localdomain resolves to a loopback address: 127.0.0.1; using 192.168.10.172 instead (on interface enp7s0)
20/11/10 15:46:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/tom/app/spark/spark-2.4.7-bin-hadoop2.7/jars/spark-unsafe_2.11-2.4.7.jar) to method java.nio.Bits.unaligned()
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
20/11/10 15:46:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/11/10 15:46:48 INFO SecurityManager: Changing view acls to: tom
20/11/10 15:46:48 INFO SecurityManager: Changing modify acls to: tom
20/11/10 15:46:48 INFO SecurityManager: Changing view acls groups to:
20/11/10 15:46:48 INFO SecurityManager: Changing modify acls groups to:
20/11/10 15:46:48 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(tom); groups with view permissions: Set(); users with modify permissions: Set(tom); groups with modify permissions: Set()
20/11/10 15:46:49 INFO TransportClientFactory: Successfully created connection to fedora/192.168.10.172:35103 after 54 ms (0 ms spent in bootstraps)
20/11/10 15:46:49 INFO SecurityManager: Changing view acls to: tom
20/11/10 15:46:49 INFO SecurityManager: Changing modify acls to: tom
20/11/10 15:46:49 INFO SecurityManager: Changing view acls groups to:
20/11/10 15:46:49 INFO SecurityManager: Changing modify acls groups to:
20/11/10 15:46:49 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(tom); groups with view permissions: Set(); users with modify permissions: Set(tom); groups with modify permissions: Set()
20/11/10 15:46:49 INFO TransportClientFactory: Successfully created connection to fedora/192.168.10.172:35103 after 4 ms (0 ms spent in bootstraps)
20/11/10 15:46:49 INFO DiskBlockManager: Created local directory at /tmp/spark-0e47fa97-8714-4e8e-950e-b1032fe36995/executor-e7667d04-198d-4144-8897-ddada0bfd1de/blockmgr-019262b3-4d3e-4158-b984-ff85c0846191
20/11/10 15:46:49 INFO MemoryStore: MemoryStore started with capacity 2.2 GB
20/11/10 15:46:49 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@fedora:35103
20/11/10 15:46:49 INFO WorkerWatcher: Connecting to worker spark://[email protected]:45535
20/11/10 15:46:49 INFO TransportClientFactory: Successfully created connection to /192.168.10.172:45535 after 2 ms (0 ms spent in bootstraps)
20/11/10 15:46:49 INFO WorkerWatcher: Successfully connected to spark://[email protected]:45535
20/11/10 15:46:49 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
20/11/10 15:46:49 INFO Executor: Starting executor ID 0 on host 192.168.10.172
20/11/10 15:46:49 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 43781.
20/11/10 15:46:49 INFO NettyBlockTransferService: Server created on 192.168.10.172:43781
20/11/10 15:46:49 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/11/10 15:46:49 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(0, 192.168.10.172, 43781, None)
20/11/10 15:46:49 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(0, 192.168.10.172, 43781, None)
20/11/10 15:46:49 INFO BlockManager: Initialized BlockManager: BlockManagerId(0, 192.168.10.172, 43781, None)
20/11/10 15:46:49 INFO CoarseGrainedExecutorBackend: Got assigned task 0
20/11/10 15:46:49 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
20/11/10 15:46:49 INFO Executor: Fetching spark://fedora:35103/jars/beam-poc-0.1-shaded.jar with timestamp 1605019607514
20/11/10 15:46:49 INFO TransportClientFactory: Successfully created connection to fedora/192.168.10.172:35103 after 2 ms (0 ms spent in bootstraps)
20/11/10 15:46:49 INFO Utils: Fetching spark://fedora:35103/jars/beam-poc-0.1-shaded.jar to /tmp/spark-0e47fa97-8714-4e8e-950e-b1032fe36995/executor-e7667d04-198d-4144-8897-ddada0bfd1de/spark-62556d02-a044-4c2c-8f97-c7f25ef3e337/fetchFileTemp6325880319900581024.tmp
20/11/10 15:46:49 INFO Utils: Copying /tmp/spark-0e47fa97-8714-4e8e-950e-b1032fe36995/executor-e7667d04-198d-4144-8897-ddada0bfd1de/spark-62556d02-a044-4c2c-8f97-c7f25ef3e337/2058038551605019607514_cache to /home/tom/app/spark/spark-2.4.7-bin-hadoop2.7/work/app-20201110154647-0020/0/./beam-poc-0.1-shaded.jar
20/11/10 15:46:50 INFO Executor: Adding file:/home/tom/app/spark/spark-2.4.7-bin-hadoop2.7/work/app-20201110154647-0020/0/./beam-poc-0.1-shaded.jar to class loader
20/11/10 15:46:50 INFO TorrentBroadcast: Started reading broadcast variable 0
20/11/10 15:46:50 INFO TransportClientFactory: Successfully created connection to fedora/192.168.10.172:33169 after 2 ms (0 ms spent in bootstraps)
20/11/10 15:46:50 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 6.8 KB, free 2.2 GB)
20/11/10 15:46:50 INFO TorrentBroadcast: Reading broadcast variable 0 took 112 ms
20/11/10 15:46:50 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 16.2 KB, free 2.2 GB)
20/11/10 15:46:51 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 6312 bytes result sent to driver
20/11/10 15:46:51 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
20/11/10 15:46:51 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
tdown
```
Imported from Jira
[BEAM-11224](https://issues.apache.org/jira/browse/BEAM-11224). Original Jira
may contain additional context.
Reported by: [email protected].