Shaanan Cohney created SPARK-8553:
-------------------------------------

             Summary: Resuming Checkpointed QueueStream Fails
                 Key: SPARK-8553
                 URL: https://issues.apache.org/jira/browse/SPARK-8553
             Project: Spark
          Issue Type: Bug
          Components: PySpark, Streaming
    Affects Versions: 1.4.0
            Reporter: Shaanan Cohney


After a QueueStream has been used in a checkpointed StreamingContext, resuming
the context from the checkpoint triggers the following error:

{code}
ERROR StreamingContext: Error starting the context, marking it as stopped
org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations;... see SPARK-5063.
        at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
{code}


The code that triggers the error is Python 3, running on Spark Standalone:

{code}
ssc = StreamingContext.getOrCreate(s3n_path, make_ssc)
....
p_batches = [ssc.sparkContext.parallelize(batch) for batch in task_batches]
sieving_tasks = ssc.queueStream(p_batches)
relations = sieving_tasks.map(lambda s: run_sieving_command(s, poly, poly_path, fb_paths))
countsState = relations.updateStateByKey(update_state)
countsState.foreachRDD(gen_finals)
ssc.checkpoint(s3n_path)
....
ssc.start()
....

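def update_state(count, counts):
    # Called by updateStateByKey once per key per batch: `count` is the list of
    # new values for the key in this batch, `counts` is the previous state
    # (None the first time the key is seen).
    if counts is None:
        counts = []
    print(count)
    counts.append(count)
    return counts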



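def gen_finals(rdd):
    # Runs on the driver via foreachRDD: collect the per-key state and total
    # every value recorded for each key so far.
    for (link, rank) in rdd.collect():
        acc = 0
        for l in rank:
            acc += sum(l)
        run_sieving.counts.append(acc)
        run_sieving.out_files.add(link)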
{code}
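To make the state that update_state builds up (and that gen_finals later sums) easier to follow, here is a pure-Python sketch that does not use Spark. It assumes the semantics of PySpark's updateStateByKey: in each batch, the update function is called once for every key that has new values or existing state, with the list of that batch's new values (empty if there are none) and the previous state (None for a new key), and keys whose new state is None are dropped. simulate_update_state_by_key and final_totals are hypothetical helpers for illustration only, not Spark APIs.

{code:python}
from collections import defaultdict


def update_state(count, counts):
    # The reporter's update function, with the print() omitted.
    if counts is None:
        counts = []
    counts.append(count)
    return counts


def simulate_update_state_by_key(batches, update_func):
    """Hypothetical stand-in for updateStateByKey over a list of batches,
    where each batch is a list of (key, value) pairs."""
    state = {}
    for batch in batches:
        # Group this batch's values by key.
        grouped = defaultdict(list)
        for key, value in batch:
            grouped[key].append(value)
        # Update every key that has new values OR existing state; keys with
        # state but no new values receive an empty list.
        for key in set(state) | set(grouped):
            new_state = update_func(grouped.get(key, []), state.get(key))
            if new_state is None:
                state.pop(key, None)
            else:
                state[key] = new_state
    return state


def final_totals(state):
    # Mirrors gen_finals: for each key, sum every per-batch list of values.
    return {key: sum(sum(values) for values in per_batch)
            for key, per_batch in state.items()}


batches = [[("a", 1), ("a", 2), ("b", 5)], [("a", 3)], []]
state = simulate_update_state_by_key(batches, update_state)
print(state)                # {'a': [[1, 2], [3], []], 'b': [[5], [], []]} (key order may vary)
print(final_totals(state))  # {'a': 6, 'b': 5} (key order may vary)
{code}

Under these assumptions, key "b" still gains an empty entry in every later batch, so each key's state list grows by one element per batch interval whether or not new data arrives.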




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
