[
https://issues.apache.org/jira/browse/SPARK-23766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17349428#comment-17349428
]
Jian Xu commented on SPARK-23766:
---------------------------------
Hi Hyukjin;
I see you closed this issue. I have a similar problem as described. I wonder how
you resolved the issue.
> Not able to execute multiple queries in spark structured streaming
> ------------------------------------------------------------------
>
> Key: SPARK-23766
> URL: https://issues.apache.org/jira/browse/SPARK-23766
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.2.0
> Reporter: Apeksha Agnihotri
> Priority: Major
> Labels: bulk-closed
>
> I am able to receive output of the first query (i.e. reader) only, although all the
> queries are running in the logs. No data is stored in HDFS either.
>
> {code:java}
> public class A extends D implements Serializable {
> public Dataset<Row> getDataSet(SparkSession session) {
> Dataset<Row> dfs =
> session.readStream().format("socket").option("host", hostname).option("port",
> port).load();
> publish(dfs.toDF(), "reader");
> return dfs;
> }
> }
> public class B extends D implements Serializable {
> public Dataset<Row> execute(Dataset<Row> ds) {
> Dataset<Row> d =
> ds.select(functions.explode(functions.split(ds.col("value"), "\\s+")));
> publish(d.toDF(), "component");
> return d;
> }
> }
> public class C extends D implements Serializable {
> public Dataset<Row> execute(Dataset<Row> ds) {
> publish(inputDataSet.toDF(), "console");
> ds.writeStream().format("csv").option("path",
> "hdfs://hostname:9000/user/abc/data1/")
> .option("checkpointLocation",
> "hdfs://hostname:9000/user/abc/cp").outputMode("append").start();
> return ds;
> }
> }
> public class D {
> public void publish(Dataset<Row> dataset, String name) {
> dataset.writeStream().format("csv").queryName(name).option("path",
> "hdfs://hostname:9000/user/abc/" + name)
> .option("checkpointLocation",
> "hdfs://hostname:9000/user/abc/checkpoint/" + directory).outputMode("append")
> .start();
> }
> }
> public static void main(String[] args) {
> SparkSession session = createSession();
> try {
> A a = new A();
> Dataset<Row> records = a.getDataSet(session);
> B b = new B();
> Dataset<Row> ds = b.execute(records);
> C c = new C();
> c.execute(ds);
> session.streams().awaitAnyTermination();
> } catch (StreamingQueryException e) {
> e.printStackTrace();
> }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]