You specify the schema when loading a dataframe by calling spark.read.schema(...)...
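For the streaming read in your snippet, that would look something like the sketch below (the paths and app name are just placeholders, not your actual config). The idea is the one from the error message itself: infer the schema once with a static read, then pass it explicitly to readStream.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("SchemaExample").getOrCreate()

    // One-off static read to infer the schema from files already in the directory...
    val staticDf = spark.read.parquet("/path/to/source")

    // ...then hand that schema to the streaming reader explicitly.
    val streamingDf = spark.readStream
      .schema(staticDf.schema)
      .parquet("/path/to/source")
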
On Tue, Sep 12, 2017 at 4:50 PM, Sunita Arvind <sunitarv...@gmail.com> wrote:
> Hi Michael,
>
> I am wondering what I am doing wrong. I get an error like:
>
> Exception in thread "main" java.lang.IllegalArgumentException: Schema
> must be specified when creating a streaming source DataFrame. If some files
> already exist in the directory, then depending on the file format you may
> be able to create a static DataFrame on that directory with
> 'spark.read.load(directory)' and infer schema from it.
>     at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:223)
>     at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
>     at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
>     at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
>     at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:125)
>     at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:134)
>     at com.aol.persist.UplynkAggregates$.aggregator(UplynkAggregates.scala:23)
>     at com.aol.persist.UplynkAggregates$.main(UplynkAggregates.scala:41)
>     at com.aol.persist.UplynkAggregates.main(UplynkAggregates.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> 17/09/12 14:46:22 INFO SparkContext: Invoking stop() from shutdown hook
>
> I tried specifying the schema as well.
> Here is my code:
>
> object Aggregates {
>
>   val aggregation =
>     """select sum(col1), sum(col2), id, first(name)
>        from enrichedtb
>        group by id
>     """.stripMargin
>
>   def aggregator(conf: Config) = {
>     implicit val spark =
>       SparkSession.builder().appName(conf.getString("AppName")).getOrCreate()
>     implicit val sqlctx = spark.sqlContext
>     printf("Source path is" + conf.getString("source.path"))
>     // Added this as it was complaining about schema.
>     val schemadf = sqlctx.read.parquet(conf.getString("source.path"))
>     val df = spark.readStream.format("parquet").option("inferSchema", true)
>       .schema(schemadf.schema).load(conf.getString("source.path"))
>     df.createOrReplaceTempView("enrichedtb")
>     val res = spark.sql(aggregation)
>
>     res.writeStream.format("parquet").outputMode("append")
>       .option("checkpointLocation", conf.getString("checkpointdir"))
>       .start(conf.getString("sink.outputpath"))
>   }
>
>   def main(args: Array[String]): Unit = {
>     val mainconf = ConfigFactory.load()
>     val conf = mainconf.getConfig(mainconf.getString("pipeline"))
>     print(conf.toString)
>     aggregator(conf)
>   }
>
> }
>
> I tried to extract the schema from a static read of the input path and provided it
> to the readStream API.
> With that, I get this error:
>
>     at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
>     at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:109)
>     at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
>     at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
>     at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
>     at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:222)
>
> While running on the EMR cluster, all paths point to S3. On my laptop, they
> all point to the local filesystem.
>
> I am using Spark 2.2.0.
>
> Appreciate your help.
>
> regards
> Sunita
>
>
> On Wed, Aug 23, 2017 at 2:30 PM, Michael Armbrust <mich...@databricks.com> wrote:
>
>> If you use structured streaming and the file sink, you can have a
>> subsequent stream read using the file source. This will maintain exactly-once
>> processing even if there are hiccups or failures.
>>
>> On Mon, Aug 21, 2017 at 2:02 PM, Sunita Arvind <sunitarv...@gmail.com> wrote:
>>
>>> Hello Spark Experts,
>>>
>>> I have a design question w.r.t. Spark Streaming. I have a streaming job
>>> that consumes protocol-buffer-encoded real-time logs from a Kafka cluster
>>> on premise. My Spark application runs on EMR (AWS) and persists data onto
>>> S3. Before I persist, I need to strip the header and convert the protobuf to
>>> Parquet (I use sparksql-scalapb to convert from protobuf to
>>> spark.sql.Row). I need to persist the raw logs as is. I can continue the
>>> enrichment on the same dataframe after persisting the raw data; however, in
>>> order to modularize, I am planning to have a separate job which picks up the
>>> raw data and performs enrichment on it. Also, I am trying to avoid an
>>> all-in-one job, as the enrichments could get project specific while raw data
>>> persistence stays customer/project agnostic. The enriched data is allowed to
>>> have some latency (a few minutes).
>>>
>>> My challenge is: after persisting the raw data, how do I chain the next
>>> streaming job? The only way I can think of is that job 1 (raw data)
>>> partitions on the current date (YYYYMMDD), and within the current date, job 2
>>> (the enrichment job) filters for records within 60s of the current time and
>>> performs enrichment on them in 60s batches.
>>> Is this a good option? It seems to be error prone. When either of the
>>> jobs gets delayed due to bursts or any error/exception, this could lead to
>>> huge data losses and non-deterministic behavior. What are other
>>> alternatives to this?
>>>
>>> Appreciate any guidance in this regard.
>>>
>>> regards
>>> Sunita Koppar
>>
>>
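
For reference, here is a rough sketch of the file-sink / file-source chaining suggested earlier in the thread. The bucket, topic, broker and placeholder schema are made up for illustration, and the Kafka source assumes the spark-sql-kafka-0-10 package is on the classpath; treat it as an outline rather than a drop-in implementation.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._

    val spark = SparkSession.builder().appName("ChainedStreams").getOrCreate()

    // Job 1: persist the raw records with the parquet file sink.
    // The checkpoint location is what gives exactly-once output across failures.
    val raw = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")   // placeholder broker
      .option("subscribe", "rawlogs")                      // placeholder topic
      .load()

    raw.writeStream
      .format("parquet")
      .option("checkpointLocation", "s3://bucket/checkpoints/raw")
      .start("s3://bucket/raw")

    // Job 2: a separate streaming query reads the same directory back as a
    // file source and runs the enrichment, keeping the two jobs decoupled.
    val rawSchema = new StructType()       // placeholder; use the real raw-log schema
      .add("key", BinaryType)
      .add("value", BinaryType)

    val enriched = spark.readStream
      .schema(rawSchema)
      .parquet("s3://bucket/raw")
      // ... enrichment transformations here ...

    enriched.writeStream
      .format("parquet")
      .option("checkpointLocation", "s3://bucket/checkpoints/enriched")
      .start("s3://bucket/enriched")

    // Block until any of the running queries terminates.
    spark.streams.awaitAnyTermination()

In practice the two halves would live in separate applications (as Sunita intends); they are shown together here only to make the hand-off through the parquet directory explicit.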