[ https://issues.apache.org/jira/browse/SPARK-24335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16484649#comment-16484649 ]
Apache Spark commented on SPARK-24335:
--------------------------------------

User 'Victsm' has created a pull request for this issue:
https://github.com/apache/spark/pull/21402

> Dataset.map schema not applied in some cases
> --------------------------------------------
>
>                 Key: SPARK-24335
>                 URL: https://issues.apache.org/jira/browse/SPARK-24335
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.1.0, 2.3.0
>            Reporter: Robert Reid
>            Priority: Major
>
> In the following code, an {color:#808080}UnsupportedOperationException{color}
> is thrown in the filter() call just after the Dataset.map() call unless
> withWatermark() is added between them. The error reports
> `{color:#808080}fieldIndex on a Row without schema is undefined{color}`. I
> expect the map() method to have applied the schema and for it to be
> accessible in filter(). Without the extra withWatermark() call, my debugger
> reports that the `row` objects in the filter lambda are `GenericRow`. With
> the watermark call, it reports that they are `GenericRowWithSchema`.
>
> I should add that I'm new to working with Structured Streaming, so if I'm
> overlooking some implied dependency, please fill me in.
>
> I'm encountering this in new code for a new production job. The presented
> code is distilled down to demonstrate the problem. While the problem can be
> worked around simply by adding withWatermark(), I'm concerned that this will
> leave the code in a fragile state. With this simplified code, if this error
> occurs again, it will be easy to identify what change led to it. But in the
> code I'm writing, with this functionality delegated to other classes, it is
> (and has been) very challenging to identify the cause.
>
> {code:java}
> public static void main(String[] args) {
>     SparkSession sparkSession =
>         SparkSession.builder().master("local").getOrCreate();
>     sparkSession.conf().set(
>         "spark.sql.streaming.checkpointLocation",
>         "hdfs://localhost:9000/search_relevance/checkpoint" // for spark 2.3
>         // "spark.sql.streaming.checkpointLocation", "tmp/checkpoint" // for spark 2.1
>     );
>     StructType inSchema = DataTypes.createStructType(
>         new StructField[] {
>             DataTypes.createStructField("id", DataTypes.StringType,    false),
>             DataTypes.createStructField("ts", DataTypes.TimestampType, false),
>             DataTypes.createStructField("f1", DataTypes.LongType,      true)
>         }
>     );
>     Dataset<Row> rawSet = sparkSession.sqlContext().readStream()
>         .format("rate")
>         .option("rowsPerSecond", 1)
>         .load()
>         .map((MapFunction<Row, Row>) raw -> {
>                 Object[] fields = new Object[3];
>                 fields[0] = "id1";
>                 fields[1] = raw.getAs("timestamp");
>                 fields[2] = raw.getAs("value");
>                 return RowFactory.create(fields);
>             },
>             RowEncoder.apply(inSchema)
>         )
>         // If withWatermark() is included above the filter() line then this works. Without it we get:
>         // Caused by: java.lang.UnsupportedOperationException: fieldIndex on a Row without schema is undefined.
>         // at the row.getAs() call.
>         // .withWatermark("ts", "10 seconds") // <-- This is required for row.getAs("f1") to work ???
>         .filter((FilterFunction<Row>) row -> !row.getAs("f1").equals(0L))
>         .withWatermark("ts", "10 seconds")
>         ;
>     StreamingQuery streamingQuery = rawSet
>         .select("*")
>         .writeStream()
>         .format("console")
>         .outputMode("append")
>         .start();
>     try {
>         streamingQuery.awaitTermination(30_000);
>     } catch (StreamingQueryException e) {
>         System.out.println("Caught exception at 'awaitTermination':");
>         e.printStackTrace();
>     }
> }
> {code}
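The reporter's observation (`GenericRow` in the failing case, `GenericRowWithSchema` in the working one) lines up with how name-based access works on a Spark `Row`: `getAs(String)` first resolves the name through `fieldIndex`, and on a `Row` that carries no schema, `fieldIndex` throws exactly the `UnsupportedOperationException` quoted above. The map function returns `RowFactory.create(fields)`, which builds such a schemaless row, so if that object reaches the filter lambda without being re-encoded, the name lookup fails. The following is a minimal plain-Java toy model of that contract, not Spark code: `SchemalessRowDemo` and `ToyRow` are hypothetical names, and the field values are placeholders.

```java
import java.util.Arrays;
import java.util.List;

// Toy model (NOT Spark's classes) of name-based vs positional access on a row.
public class SchemalessRowDemo {

    // Hypothetical stand-in for a Row; fieldNames == null models a row built without a schema,
    // the way RowFactory.create(...) builds one.
    static final class ToyRow {
        private final Object[] values;
        private final List<String> fieldNames;

        ToyRow(Object[] values, List<String> fieldNames) {
            this.values = values;
            this.fieldNames = fieldNames;
        }

        // Positional access never needs a schema.
        Object get(int i) {
            return values[i];
        }

        // Mirrors the contract of Spark's Row.fieldIndex: without a schema, names cannot be resolved.
        int fieldIndex(String name) {
            if (fieldNames == null) {
                throw new UnsupportedOperationException(
                        "fieldIndex on a Row without schema is undefined.");
            }
            int i = fieldNames.indexOf(name);
            if (i < 0) {
                throw new IllegalArgumentException(name + " does not exist.");
            }
            return i;
        }

        // Name-based access = name lookup followed by positional access.
        Object getAs(String name) {
            return get(fieldIndex(name));
        }
    }

    // Reads "f1" by name and reports either the value or the exception message.
    static String tryGetF1(ToyRow row) {
        try {
            return "f1=" + row.getAs("f1");
        } catch (UnsupportedOperationException e) {
            return "error: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Placeholder values for (id, ts, f1).
        Object[] fields = {"id1", 0L, 5L};
        ToyRow withoutSchema = new ToyRow(fields, null);
        ToyRow withSchema = new ToyRow(fields, Arrays.asList("id", "ts", "f1"));

        System.out.println(tryGetF1(withoutSchema));                 // error: fieldIndex on a Row without schema is undefined.
        System.out.println(tryGetF1(withSchema));                    // f1=5
        System.out.println("positional f1=" + withoutSchema.get(2)); // positional f1=5
    }
}
```

A possible workaround suggested by this model, untested against the report, is positional access in the filter lambda (for example `row.get(2)` for `f1`), which does not consult a schema. It trades the name-based lookup for a dependency on column order.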