[ https://issues.apache.org/jira/browse/SPARK-23337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16367933#comment-16367933 ]
Michael Armbrust commented on SPARK-23337:
------------------------------------------
This is essentially the same issue as SPARK-18084. We are taking a column name
here, not an expression. As such, you can only reference top-level columns. I
agree this is an annoying aspect of the API, but changing it might have to
happen at a major release, since it would be a change in behavior.
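
For readers puzzled by the "argument type mismatch" in the quoted trace below:
the "ctor:" line shows that EventTimeWatermark expects an Attribute, while the
"types:" line shows that an Alias was supplied, which is what the nested path
_source.createTime resolved to. The following is only a toy model of that
failure in plain Java, not Spark code. Expression, Attribute, Alias, Watermark,
resolve and canCopy are hypothetical stand-ins invented for illustration.

```java
import java.lang.reflect.Constructor;

public class WatermarkCopyDemo {
    // Hypothetical stand-ins for expression types; these are NOT Spark classes.
    interface Expression {}

    static final class Attribute implements Expression {
        final String name;
        Attribute(String name) { this.name = name; }
    }

    static final class Alias implements Expression {
        final String path;
        final String name;
        Alias(String path, String name) { this.path = path; this.name = name; }
    }

    // Stand-in for a plan node whose constructor only accepts an Attribute.
    static final class Watermark {
        final Attribute eventTime;
        public Watermark(Attribute eventTime) { this.eventTime = eventTime; }
    }

    // Toy resolver (an assumption about the shape of the problem):
    // a plain name becomes an Attribute, a dotted path becomes an Alias.
    public static Expression resolve(String columnName) {
        int dot = columnName.lastIndexOf('.');
        if (dot < 0) {
            return new Attribute(columnName);
        }
        return new Alias(columnName, columnName.substring(dot + 1));
    }

    // Rebuilds the node reflectively with the resolved expression.
    // Returns false when reflection rejects the argument type.
    public static boolean canCopy(String columnName) {
        try {
            Constructor<Watermark> ctor =
                Watermark.class.getDeclaredConstructor(Attribute.class);
            ctor.newInstance(resolve(columnName));
            return true;
        } catch (IllegalArgumentException e) {
            // Raised by newInstance when an Alias is passed for an Attribute.
            return false;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("myTime: " + canCopy("myTime"));
        System.out.println("_source.createTime: " + canCopy("_source.createTime"));
    }
}
```

In this sketch canCopy("myTime") returns true and canCopy("_source.createTime")
returns false. A workaround often used for this (an assumption, not something
stated in this thread) is to first promote the nested field to a top-level
column, for example with withColumn, and pass that column's name to
withWatermark.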
> withWatermark raises an exception on struct objects
> ---------------------------------------------------
>
> Key: SPARK-23337
> URL: https://issues.apache.org/jira/browse/SPARK-23337
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.2.1
> Environment: Linux Ubuntu, Spark on standalone mode
> Reporter: Aydin Kocas
> Priority: Major
>
> Hi,
>
> when I use a nested field (a field inside a struct, in this case
> _source.createTime) from a JSON file as the argument to the
> withWatermark method, I get an exception (see below).
> Everything else works flawlessly with the nested field.
>
> +*{color:#14892c}works:{color}*+
> {code:java}
> Dataset<Row> jsonRow = spark.readStream().schema(getJSONSchema()).json(file)
>     .dropDuplicates("_id")
>     .withWatermark("myTime", "10 seconds")
>     .toDF();{code}
>
> json structure:
> {code:java}
> root
> |-- _id: string (nullable = true)
> |-- _index: string (nullable = true)
> |-- _score: long (nullable = true)
> |-- myTime: timestamp (nullable = true)
> ..{code}
> +*{color:#d04437}does not work - nested json{color}:*+
> {code:java}
> Dataset<Row> jsonRow = spark.readStream().schema(getJSONSchema()).json(file)
>     .dropDuplicates("_id")
>     .withWatermark("_source.createTime", "10 seconds")
>     .toDF();{code}
>
> json structure:
>
> {code:java}
> root
> |-- _id: string (nullable = true)
> |-- _index: string (nullable = true)
> |-- _score: long (nullable = true)
> |-- _source: struct (nullable = true)
> | |-- createTime: timestamp (nullable = true)
> ..
>
> Exception in thread "main"
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy,
> tree:
> 'EventTimeWatermark '_source.createTime, interval 10 seconds
> +- Deduplicate [_id#0], true
> +- StreamingRelation
> DataSource(org.apache.spark.sql.SparkSession@5dbbb292,json,List(),Some(StructType(StructField(_id,StringType,true),
> StructField(_index,StringType,true), StructField(_score,LongType,true),
> StructField(_source,StructType(StructField(additionalData,StringType,true),
> StructField(client,StringType,true),
> StructField(clientDomain,BooleanType,true),
> StructField(clientVersion,StringType,true),
> StructField(country,StringType,true),
> StructField(countryName,StringType,true),
> StructField(createTime,TimestampType,true),
> StructField(externalIP,StringType,true),
> StructField(hostname,StringType,true),
> StructField(internalIP,StringType,true),
> StructField(location,StringType,true),
> StructField(locationDestination,StringType,true),
> StructField(login,StringType,true),
> StructField(originalRequestString,StringType,true),
> StructField(password,StringType,true),
> StructField(peerIdent,StringType,true),
> StructField(peerType,StringType,true),
> StructField(recievedTime,TimestampType,true),
> StructField(sessionEnd,StringType,true),
> StructField(sessionStart,StringType,true),
> StructField(sourceEntryAS,StringType,true),
> StructField(sourceEntryIp,StringType,true),
> StructField(sourceEntryPort,StringType,true),
> StructField(targetCountry,StringType,true),
> StructField(targetCountryName,StringType,true),
> StructField(targetEntryAS,StringType,true),
> StructField(targetEntryIp,StringType,true),
> StructField(targetEntryPort,StringType,true),
> StructField(targetport,StringType,true),
> StructField(username,StringType,true),
> StructField(vulnid,StringType,true)),true),
> StructField(_type,StringType,true))),List(),None,Map(path -> ./input/),None),
> FileSource[./input/], [_id#0, _index#1, _score#2L, _source#3, _type#4]
> at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
> at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:385)
> at
> org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:300)
> at
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268)
> at
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:854)
> at
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:796)
> at
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
> at
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
> at
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
> at
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:61)
> at
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:796)
> at
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:674)
> at
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
> at
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
> at
> scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
> at scala.collection.immutable.List.foldLeft(List.scala:84)
> at
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
> at
> org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at
> org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
> at
> org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:69)
> at
> org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:67)
> at
> org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:50)
> at org.apache.spark.sql.Dataset.<init>(Dataset.scala:165)
> at org.apache.spark.sql.Dataset.<init>(Dataset.scala:171)
> at org.apache.spark.sql.Dataset$.apply(Dataset.scala:62)
> at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2889)
> at org.apache.spark.sql.Dataset.withWatermark(Dataset.scala:569)
> at
> my.package.hpstatistics.Importer.importViaSparkStreaming(Importer.java:278)
> at my.package.hpstatistics.Main.main(Main.java:80)
> Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
> Failed to copy node.
> Is otherCopyArgs specified correctly for EventTimeWatermark.
> Exception message: argument type mismatch
> ctor: public
> org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark(org.apache.spark.sql.catalyst.expressions.Attribute,org.apache.spark.unsafe.types.CalendarInterval,org.apache.spark.sql.catalyst.plans.logical.LogicalPlan)?
> types: class org.apache.spark.sql.catalyst.expressions.Alias, class
> org.apache.spark.unsafe.types.CalendarInterval, class
> org.apache.spark.sql.catalyst.plans.logical.Deduplicate
> args: _source#3.createTime AS createTime#12, interval 10 seconds, Deduplicate
> [_id#0], true
> +- StreamingRelation
> DataSource(org.apache.spark.sql.SparkSession@5dbbb292,json,List(),Some(StructType(StructField(_id,StringType,true),
> StructField(_index,StringType,true), StructField(_score,LongType,true),
> StructField(_source,StructType(StructField(additionalData,StringType,true),
> StructField(client,StringType,true),
> StructField(clientDomain,BooleanType,true),
> StructField(clientVersion,StringType,true),
> StructField(country,StringType,true),
> StructField(countryName,StringType,true),
> StructField(createTime,TimestampType,true),
> StructField(externalIP,StringType,true),
> StructField(hostname,StringType,true),
> StructField(internalIP,StringType,true),
> StructField(location,StringType,true),
> StructField(locationDestination,StringType,true),
> StructField(login,StringType,true),
> StructField(originalRequestString,StringType,true),
> StructField(password,StringType,true),
> StructField(peerIdent,StringType,true),
> StructField(peerType,StringType,true),
> StructField(recievedTime,TimestampType,true),
> StructField(sessionEnd,StringType,true),
> StructField(sessionStart,StringType,true),
> StructField(sourceEntryAS,StringType,true),
> StructField(sourceEntryIp,StringType,true),
> StructField(sourceEntryPort,StringType,true),
> StructField(targetCountry,StringType,true),
> StructField(targetCountryName,StringType,true),
> StructField(targetEntryAS,StringType,true),
> StructField(targetEntryIp,StringType,true),
> StructField(targetEntryPort,StringType,true),
> StructField(targetport,StringType,true),
> StructField(username,StringType,true),
> StructField(vulnid,StringType,true)),true),
> StructField(_type,StringType,true))),List(),None,Map(path -> ./input/),None),
> FileSource[./input/], [_id#0, _index#1, _score#2L, _source#3, _type#4]
> , tree:
> 'EventTimeWatermark '_source.createTime, interval 10 seconds
> +- Deduplicate [_id#0], true
> +- StreamingRelation
> DataSource(org.apache.spark.sql.SparkSession@5dbbb292,json,List(),Some(StructType(StructField(_id,StringType,true),
> StructField(_index,StringType,true), StructField(_score,LongType,true),
> StructField(_source,StructType(StructField(additionalData,StringType,true),
> StructField(client,StringType,true),
> StructField(clientDomain,BooleanType,true),
> StructField(clientVersion,StringType,true),
> StructField(country,StringType,true),
> StructField(countryName,StringType,true),
> StructField(createTime,TimestampType,true),
> StructField(externalIP,StringType,true),
> StructField(hostname,StringType,true),
> StructField(internalIP,StringType,true),
> StructField(location,StringType,true),
> StructField(locationDestination,StringType,true),
> StructField(login,StringType,true),
> StructField(originalRequestString,StringType,true),
> StructField(password,StringType,true),
> StructField(peerIdent,StringType,true),
> StructField(peerType,StringType,true),
> StructField(recievedTime,TimestampType,true),
> StructField(sessionEnd,StringType,true),
> StructField(sessionStart,StringType,true),
> StructField(sourceEntryAS,StringType,true),
> StructField(sourceEntryIp,StringType,true),
> StructField(sourceEntryPort,StringType,true),
> StructField(targetCountry,StringType,true),
> StructField(targetCountryName,StringType,true),
> StructField(targetEntryAS,StringType,true),
> StructField(targetEntryIp,StringType,true),
> StructField(targetEntryPort,StringType,true),
> StructField(targetport,StringType,true),
> StructField(username,StringType,true),
> StructField(vulnid,StringType,true)),true),
> StructField(_type,StringType,true))),List(),None,Map(path -> ./input/),None),
> FileSource[./input/], [_id#0, _index#1, _score#2L, _source#3, _type#4]
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$makeCopy$1.apply(TreeNode.scala:415)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$makeCopy$1.apply(TreeNode.scala:385)
> at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
> ... 29 more
> {code}
>