[ 
https://issues.apache.org/jira/browse/SPARK-24335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16484649#comment-16484649
 ] 

Apache Spark commented on SPARK-24335:
--------------------------------------

User 'Victsm' has created a pull request for this issue:
https://github.com/apache/spark/pull/21402

> Dataset.map schema not applied in some cases
> --------------------------------------------
>
>                 Key: SPARK-24335
>                 URL: https://issues.apache.org/jira/browse/SPARK-24335
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.1.0, 2.3.0
>            Reporter: Robert Reid
>            Priority: Major
>
> In the following code an UnsupportedOperationException is thrown in the 
> filter() call just after the Dataset.map() call unless withWatermark() is 
> added between them. The error reports 
> `fieldIndex on a Row without schema is undefined`.  I expect the map() 
> method to have applied the schema and for it to be accessible in filter().  
> Without the extra withWatermark() call my debugger reports that the `row` 
> objects in the filter lambda are `GenericRow`.  With the watermark call it 
> reports that they are `GenericRowWithSchema`.
> I should add that I'm new to working with Structured Streaming.  So if I'm 
> overlooking some implied dependency please fill me in.
> I'm encountering this in new code for a new production job. The presented 
> code is distilled down to demonstrate the problem.  While the problem can be 
> worked around simply by adding withWatermark() I'm concerned that this will 
> leave the code in a fragile state.  With this simplified code if this error 
> occurs again it will be easy to identify what change led to the error.  But 
> in the code I'm writing, with this functionality delegated to other classes, 
> it is (and has been) very challenging to identify the cause.
>  
> {code:java}
> public static void main(String[] args) {
>     SparkSession sparkSession = SparkSession.builder().master("local").getOrCreate();
>     sparkSession.conf().set(
>             "spark.sql.streaming.checkpointLocation",
>             "hdfs://localhost:9000/search_relevance/checkpoint" // for spark 2.3
>             // "spark.sql.streaming.checkpointLocation", "tmp/checkpoint" // for spark 2.1
>     );
>     StructType inSchema = DataTypes.createStructType(
>             new StructField[] {
>                     DataTypes.createStructField("id", DataTypes.StringType,    false),
>                     DataTypes.createStructField("ts", DataTypes.TimestampType, false),
>                     DataTypes.createStructField("f1", DataTypes.LongType,      true)
>             }
>     );
>     Dataset<Row> rawSet = sparkSession.sqlContext().readStream()
>             .format("rate")
>             .option("rowsPerSecond", 1)
>             .load()
>             .map(   (MapFunction<Row, Row>) raw -> {
>                         Object[] fields = new Object[3];
>                         fields[0] = "id1";
>                         fields[1] = raw.getAs("timestamp");
>                         fields[2] = raw.getAs("value");
>                         return RowFactory.create(fields);
>                     },
>                     RowEncoder.apply(inSchema)
>             )
>             // If withWatermark() is included above the filter() line then this works.  Without it we get:
>             //    Caused by: java.lang.UnsupportedOperationException: fieldIndex on a Row without schema is undefined.
>             // at the row.getAs() call.
>             // .withWatermark("ts", "10 seconds")  // <-- This is required for row.getAs("f1") to work ???
>             .filter((FilterFunction<Row>) row -> !row.getAs("f1").equals(0L))
>             .withWatermark("ts", "10 seconds")
>             ;
>     StreamingQuery streamingQuery = rawSet
>             .select("*")
>             .writeStream()
>             .format("console")
>             .outputMode("append")
>             .start();
>     try {
>         streamingQuery.awaitTermination(30_000);
>     } catch (StreamingQueryException e) {
>         System.out.println("Caught exception at 'awaitTermination':");
>         e.printStackTrace();
>     }
> }{code}
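
The GenericRow versus GenericRowWithSchema difference described in the report can be illustrated without Spark. The sketch below is a simplified model, not Spark's implementation: SimpleRow, SimpleRowWithSchema and RowSchemaSketch are hypothetical stand-ins invented for this example. It shows why a by-name lookup fails on a row that carries no schema, while positional access keeps working.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a row that carries values but no schema.
class SimpleRow {
    protected final Object[] values;

    SimpleRow(Object... values) {
        this.values = values;
    }

    // Positional access needs no schema.
    Object get(int i) {
        return values[i];
    }

    // Name lookup is undefined without a schema; the message mirrors the reported error.
    int fieldIndex(String name) {
        throw new UnsupportedOperationException(
                "fieldIndex on a Row without schema is undefined.");
    }

    // By-name access is built on fieldIndex(), so it fails whenever fieldIndex() does.
    Object getAs(String name) {
        return get(fieldIndex(name));
    }
}

// Hypothetical stand-in for a row that also knows its field names.
class SimpleRowWithSchema extends SimpleRow {
    private final List<String> fieldNames;

    SimpleRowWithSchema(List<String> fieldNames, Object... values) {
        super(values);
        this.fieldNames = fieldNames;
    }

    @Override
    int fieldIndex(String name) {
        int i = fieldNames.indexOf(name);
        if (i < 0) {
            throw new IllegalArgumentException("No such field: " + name);
        }
        return i;
    }
}

class RowSchemaSketch {
    // Field order taken from inSchema in the report: id, ts, f1.
    static final List<String> FIELD_NAMES = Arrays.asList("id", "ts", "f1");

    // Predicate that does not depend on the row carrying a schema: the index is
    // resolved from the known field list, then the value is read by position.
    // Unlike the original lambda, a null f1 is kept instead of throwing.
    static boolean keep(SimpleRow row) {
        int f1 = FIELD_NAMES.indexOf("f1");
        return !Long.valueOf(0L).equals(row.get(f1));
    }

    public static void main(String[] args) {
        SimpleRow bare = new SimpleRow("id1", 1L, 5L);
        SimpleRow withSchema = new SimpleRowWithSchema(FIELD_NAMES, "id1", 1L, 5L);

        System.out.println("with schema: " + withSchema.getAs("f1"));
        try {
            bare.getAs("f1");
        } catch (UnsupportedOperationException e) {
            System.out.println("without schema: " + e.getMessage());
        }
        System.out.println("positional keep: " + keep(bare));
    }
}
```

Applied to the reported job, the same idea would be to compute `inSchema.fieldIndex("f1")` once outside the lambda and call `row.get(index)` inside filter(). That is an untested assumption about a possible workaround, not a confirmed fix for the issue.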



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
