[ https://issues.apache.org/jira/browse/SPARK-23766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apeksha Agnihotri updated SPARK-23766: -------------------------------------- Component/s: (was: Spark Core) Structured Streaming > Not able to execute multiple queries in spark structured streaming > ------------------------------------------------------------------ > > Key: SPARK-23766 > URL: https://issues.apache.org/jira/browse/SPARK-23766 > Project: Spark > Issue Type: Bug > Components: Structured Streaming > Affects Versions: 2.2.0 > Reporter: Apeksha Agnihotri > Priority: Major > > I am able to receive output of the first query (i.e. reader) only, although all the > queries are running in the logs. No data is stored in HDFS either. > > {code:java} > public class A extends D implements Serializable { > public Dataset<Row> getDataSet(SparkSession session) { > Dataset<Row> dfs = > session.readStream().format("socket").option("host", hostname).option("port", > port).load(); > publish(dfs.toDF(), "reader"); > return dfs; > } > } > public class B extends D implements Serializable { > public Dataset<Row> execute(Dataset<Row> ds) { > Dataset<Row> d = > ds.select(functions.explode(functions.split(ds.col("value"), "\\s+"))); > publish(d.toDF(), "component"); > return d; > } > } > public class C extends D implements Serializable { > public Dataset<Row> execute(Dataset<Row> ds) { > publish(ds.toDF(), "console"); > ds.writeStream().format("csv").option("path", > "hdfs://hostname:9000/user/abc/data1/") > .option("checkpointLocation", > "hdfs://hostname:9000/user/abc/cp").outputMode("append").start(); > return ds; > } > } > public class D { > public void publish(Dataset<Row> dataset, String name) { > dataset.writeStream().format("csv").queryName(name).option("path", > "hdfs://hostname:9000/user/abc/" + name) > .option("checkpointLocation", > "hdfs://hostname:9000/user/abc/checkpoint/" + name).outputMode("append") > .start(); > } > } > public static void main(String[] args) { > SparkSession session = createSession(); > try { > A a = new A(); > Dataset<Row> records = 
a.getDataSet(session); > B b = new B(); > Dataset<Row> ds = b.execute(records); > C c = new C(); > c.execute(ds); > session.streams().awaitAnyTermination(); > } catch (StreamingQueryException e) { > e.printStackTrace(); > } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org