[ https://issues.apache.org/jira/browse/SPARK-27833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Raviteja updated SPARK-27833:
-----------------------------
    Docs Text:   (was: 19/05/24 17:31:58 INFO ConsumerConfig: ConsumerConfig 
values:
        metric.reporters = []
        metadata.max.age.ms = 300000
        partition.assignment.strategy = 
[org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 1048576
        bootstrap.servers = [10.20.0.10:6667]
        ssl.keystore.type = JKS
        enable.auto.commit = false
        sasl.mechanism = GSSAPI
        interceptor.classes = null
        exclude.internal.topics = true
        ssl.truststore.password = null
        client.id =
        ssl.endpoint.identification.algorithm = null
        max.poll.records = 1
        check.crcs = true
        request.timeout.ms = 40000
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 65536
        ssl.truststore.type = JKS
        ssl.truststore.location = null
        ssl.keystore.password = null
        fetch.min.bytes = 1
        send.buffer.bytes = 131072
        value.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
        group.id = 
spark-kafka-source-ec244f99-4ea8-467c-bdc5-b2489e2dbf98-1705936181-driver-0
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.trustmanager.algorithm = PKIX
        ssl.key.password = null
        fetch.max.wait.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        session.timeout.ms = 30000
        metrics.num.samples = 2
        key.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
        ssl.protocol = TLS
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        ssl.cipher.suites = null
        security.protocol = PLAINTEXT
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        auto.offset.reset = earliest

19/05/24 17:31:58 INFO ConsumerConfig: ConsumerConfig values:
        metric.reporters = []
        metadata.max.age.ms = 300000
        partition.assignment.strategy = 
[org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 1048576
        bootstrap.servers = [10.20.0.10:6667]
        ssl.keystore.type = JKS
        enable.auto.commit = false
        sasl.mechanism = GSSAPI
        interceptor.classes = null
        exclude.internal.topics = true
        ssl.truststore.password = null
        client.id = consumer-1
        ssl.endpoint.identification.algorithm = null
        max.poll.records = 1
        check.crcs = true
        request.timeout.ms = 40000
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 65536
        ssl.truststore.type = JKS
        ssl.truststore.location = null
        ssl.keystore.password = null
        fetch.min.bytes = 1
        send.buffer.bytes = 131072
        value.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
        group.id = 
spark-kafka-source-ec244f99-4ea8-467c-bdc5-b2489e2dbf98-1705936181-driver-0
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.trustmanager.algorithm = PKIX
        ssl.key.password = null
        fetch.max.wait.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        session.timeout.ms = 30000
        metrics.num.samples = 2
        key.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
        ssl.protocol = TLS
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        ssl.cipher.suites = null
        security.protocol = PLAINTEXT
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        auto.offset.reset = earliest

19/05/24 17:31:58 INFO AppInfoParser: Kafka version : 0.10.0.1
19/05/24 17:31:58 INFO AppInfoParser: Kafka commitId : a7a17cdec9eaa6c5
19/05/24 17:32:00 INFO MicroBatchExecution: Starting [id = 
9fab90c9-b1f8-469e-bc9c-71b3d1e5272c, runId = 
56b3c327-d0fb-45b5-a820-4fe16f85f60d]. Use 
hdfs://nn6.htrunk.com:8020/user/tester/agg_fun_30 to store the query checkpoint.
19/05/24 17:32:00 INFO ConsumerConfig: ConsumerConfig values:
        metric.reporters = []
        metadata.max.age.ms = 300000
        partition.assignment.strategy = 
[org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 1048576
        bootstrap.servers = [10.20.0.10:6667]
        ssl.keystore.type = JKS
        enable.auto.commit = false
        sasl.mechanism = GSSAPI
        interceptor.classes = null
        exclude.internal.topics = true
        ssl.truststore.password = null
        client.id =
        ssl.endpoint.identification.algorithm = null
        max.poll.records = 1
        check.crcs = true
        request.timeout.ms = 40000
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 65536
        ssl.truststore.type = JKS
        ssl.truststore.location = null
        ssl.keystore.password = null
        fetch.min.bytes = 1
        send.buffer.bytes = 131072
        value.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
        group.id = 
spark-kafka-source-f1621e11-d476-4604-ba35-75a02f45dbfe--1335777992-driver-0
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.trustmanager.algorithm = PKIX
        ssl.key.password = null
        fetch.max.wait.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        session.timeout.ms = 30000
        metrics.num.samples = 2
        key.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
        ssl.protocol = TLS
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        ssl.cipher.suites = null
        security.protocol = PLAINTEXT
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        auto.offset.reset = earliest

19/05/24 17:32:00 INFO ConsumerConfig: ConsumerConfig values:
        metric.reporters = []
        metadata.max.age.ms = 300000
        partition.assignment.strategy = 
[org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 1048576
        bootstrap.servers = [10.20.0.10:6667]
        ssl.keystore.type = JKS
        enable.auto.commit = false
        sasl.mechanism = GSSAPI
        interceptor.classes = null
        exclude.internal.topics = true
        ssl.truststore.password = null
        client.id = consumer-2
        ssl.endpoint.identification.algorithm = null
        max.poll.records = 1
        check.crcs = true
        request.timeout.ms = 40000
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 65536
        ssl.truststore.type = JKS
        ssl.truststore.location = null
        ssl.keystore.password = null
        fetch.min.bytes = 1
        send.buffer.bytes = 131072
        value.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
        group.id = 
spark-kafka-source-f1621e11-d476-4604-ba35-75a02f45dbfe--1335777992-driver-0
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.trustmanager.algorithm = PKIX
        ssl.key.password = null
        fetch.max.wait.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        session.timeout.ms = 30000
        metrics.num.samples = 2
        key.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
        ssl.protocol = TLS
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        ssl.cipher.suites = null
        security.protocol = PLAINTEXT
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        auto.offset.reset = earliest

19/05/24 17:32:00 INFO AppInfoParser: Kafka version : 0.10.0.1
19/05/24 17:32:00 INFO AppInfoParser: Kafka commitId : a7a17cdec9eaa6c5
19/05/24 17:32:00 INFO KafkaSource: GetBatch called with start = 
Some({"kafka-agg_1":{"0":77}}), end = {"kafka-agg_1":{"0":80}}
19/05/24 17:32:00 INFO KafkaSource: Partitions added: Map()
19/05/24 17:32:00 INFO KafkaSource: GetBatch generating RDD of offset range: 
KafkaSourceRDDOffsetRange(kafka-agg_1-0,77,80,None)
19/05/24 17:32:00 INFO AbstractCoordinator: Discovered coordinator 
nn10.htrunk.com:6667 (id: 2147482646 rack: null) for group 
spark-kafka-source-f1621e11-d476-4604-ba35-75a02f45dbfe--1335777992-driver-0.
19/05/24 17:32:00 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "9fab90c9-b1f8-469e-bc9c-71b3d1e5272c",
  "runId" : "56b3c327-d0fb-45b5-a820-4fe16f85f60d",
  "name" : null,
  "timestamp" : "2019-05-24T12:02:00.194Z",
  "batchId" : 6,
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 145,
    "triggerExecution" : 282
  },
  "eventTime" : {
    "watermark" : "2019-05-24T11:39:43.259Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Assign[kafka-agg_1-0]]",
    "startOffset" : {
      "kafka-agg_1" : {
        "0" : 80
      }
    },
    "endOffset" : {
      "kafka-agg_1" : {
        "0" : 80
      }
    },
    "numInputRows" : 0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : 
"com.htrunk.streaming.job.processor.handlers.StreamingCustomSinkHandler$CustomSink@77ddc2e6"
  }
}
19/05/24 17:32:15 INFO MicroBatchExecution: Committed offsets for batch 6. 
Metadata 
OffsetSeqMetadata(1558697983259,1558699335004,Map(spark.sql.shuffle.partitions 
-> 200, spark.sql.streaming.stateStore.providerClass -> 
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider))
19/05/24 17:32:15 INFO KafkaSource: GetBatch called with start = 
Some({"kafka-agg_1":{"0":80}}), end = {"kafka-agg_1":{"0":83}}
19/05/24 17:32:15 INFO KafkaSource: Partitions added: Map()
19/05/24 17:32:15 INFO KafkaSource: GetBatch generating RDD of offset range: 
KafkaSourceRDDOffsetRange(kafka-agg_1-0,80,83,None)
19/05/24 17:32:15 INFO MemoryStore: Block broadcast_2 stored as values in 
memory (estimated size 1080.0 B, free 912.3 MB)
19/05/24 17:32:15 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in 
memory (estimated size 350.0 B, free 912.3 MB)
19/05/24 17:32:15 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 
nn6.htrunk.com:3467 (size: 350.0 B, free: 912.3 MB)
19/05/24 17:32:15 INFO SparkContext: Created broadcast 2 from start at 
SparkTransformStreamingHandler.java:323
19/05/24 17:32:15 ERROR MicroBatchExecution: Query [id = 
9fab90c9-b1f8-469e-bc9c-71b3d1e5272c, runId = 
56b3c327-d0fb-45b5-a820-4fe16f85f60d] terminated with error
java.lang.AssertionError: assertion failed: No plan for EventTimeWatermark 
timestamp#39: timestamp, interval 2 minutes
+- Project [a#37, timestamp#39]
   +- SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
validateexternaltype(getexternalrowfield(assertnotnull(input[0, 
org.apache.spark.sql.Row, true]), 0, a), StringType), true, false) AS a#37, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, 
org.apache.spark.sql.Row, true]), 1, b), StringType), true, false) AS b#38, 
staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, 
TimestampType, fromJavaTimestamp, 
validateexternaltype(getexternalrowfield(assertnotnull(input[0, 
org.apache.spark.sql.Row, true]), 2, timestamp), TimestampType), true, false) 
AS timestamp#39]
      +- MapElements 
com.htrunk.streaming.job.processor.handlers.SparkTransformStreamingHandler$2@7072f0b7,
 interface org.apache.spark.sql.Row, [StructField(value,StringType,false), 
StructField(timestamp,TimestampType,false)], obj#36: org.apache.spark.sql.Row
         +- MapElements 
com.htrunk.streaming.job.processor.handlers.SparkTransformStreamingHandler$1@570e459e,
 interface org.apache.spark.sql.Row, [StructField(value,StringType,true), 
StructField(timestamp,TimestampType,true)], obj#28: org.apache.spark.sql.Row
            +- DeserializeToObject createexternalrow(value#21.toString, 
staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, 
ObjectType(class java.sql.Timestamp), toJavaTimestamp, timestamp#74, true, 
false), StructField(value,StringType,true), 
StructField(timestamp,TimestampType,true)), obj#27: org.apache.spark.sql.Row
               +- Project [cast(value#70 as string) AS value#21, timestamp#74]
                  +- LogicalRDD [key#69, value#70, topic#71, partition#72, 
offset#73L, timestamp#74, timestampType#75], true

        at scala.Predef$.assert(Predef.scala:170)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at 
org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
        at 
org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
        at 
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
        at 
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3248)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:723)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:682)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:691)
        at 
com.htrunk.streaming.job.processor.handlers.StreamingCustomSinkHandler$CustomSink.addBatch(StreamingCustomSinkHandler.java:91)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:475)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:473)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:472)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
        at 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
org.apache.spark.sql.streaming.StreamingQueryException: assertion failed: No 
plan for EventTimeWatermark timestamp#39: timestamp, interval 2 minutes
+- Project [a#37, timestamp#39]
   +- SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
validateexternaltype(getexternalrowfield(assertnotnull(input[0, 
org.apache.spark.sql.Row, true]), 0, a), StringType), true, false) AS a#37, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, 
org.apache.spark.sql.Row, true]), 1, b), StringType), true, false) AS b#38, 
staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, 
TimestampType, fromJavaTimestamp, 
validateexternaltype(getexternalrowfield(assertnotnull(input[0, 
org.apache.spark.sql.Row, true]), 2, timestamp), TimestampType), true, false) 
AS timestamp#39]
      +- MapElements 
com.htrunk.streaming.job.processor.handlers.SparkTransformStreamingHandler$2@7072f0b7,
 interface org.apache.spark.sql.Row, [StructField(value,StringType,false), 
StructField(timestamp,TimestampType,false)], obj#36: org.apache.spark.sql.Row
         +- MapElements 
com.htrunk.streaming.job.processor.handlers.SparkTransformStreamingHandler$1@570e459e,
 interface org.apache.spark.sql.Row, [StructField(value,StringType,true), 
StructField(timestamp,TimestampType,true)], obj#28: org.apache.spark.sql.Row
            +- DeserializeToObject createexternalrow(value#21.toString, 
staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, 
ObjectType(class java.sql.Timestamp), toJavaTimestamp, timestamp#74, true, 
false), StructField(value,StringType,true), 
StructField(timestamp,TimestampType,true)), obj#27: org.apache.spark.sql.Row
               +- Project [cast(value#70 as string) AS value#21, timestamp#74]
                  +- LogicalRDD [key#69, value#70, topic#71, partition#72, 
offset#73L, timestamp#74, timestampType#75], true

=== Streaming Query ===
Identifier: [id = 9fab90c9-b1f8-469e-bc9c-71b3d1e5272c, runId = 
56b3c327-d0fb-45b5-a820-4fe16f85f60d]
Current Committed Offsets: {KafkaSource[Assign[kafka-agg_1-0]]: 
{"kafka-agg_1":{"0":80}}}
Current Available Offsets: {KafkaSource[Assign[kafka-agg_1-0]]: 
{"kafka-agg_1":{"0":83}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Aggregate [window#48, a#37], [window#48 AS window#42, a#37, count(1) AS 
count#47L]
+- Filter isnotnull(timestamp#39)
   +- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN 
(cast(CEIL((cast((precisetimestampconversion(timestamp#39, TimestampType, 
LongType) - 0) as double) / cast(10000000 as double))) as double) = 
(cast((precisetimestampconversion(timestamp#39, TimestampType, LongType) - 0) 
as double) / cast(10000000 as double))) THEN 
(CEIL((cast((precisetimestampconversion(timestamp#39, TimestampType, LongType) 
- 0) as double) / cast(10000000 as double))) + cast(1 as bigint)) ELSE 
CEIL((cast((precisetimestampconversion(timestamp#39, TimestampType, LongType) - 
0) as double) / cast(10000000 as double))) END + cast(0 as bigint)) - cast(1 as 
bigint)) * 10000000) + 0), LongType, TimestampType), end, 
precisetimestampconversion((((((CASE WHEN 
(cast(CEIL((cast((precisetimestampconversion(timestamp#39, TimestampType, 
LongType) - 0) as double) / cast(10000000 as double))) as double) = 
(cast((precisetimestampconversion(timestamp#39, TimestampType, LongType) - 0) 
as double) / cast(10000000 as double))) THEN 
(CEIL((cast((precisetimestampconversion(timestamp#39, TimestampType, LongType) 
- 0) as double) / cast(10000000 as double))) + cast(1 as bigint)) ELSE 
CEIL((cast((precisetimestampconversion(timestamp#39, TimestampType, LongType) - 
0) as double) / cast(10000000 as double))) END + cast(0 as bigint)) - cast(1 as 
bigint)) * 10000000) + 0) + 10000000), LongType, TimestampType)) AS window#48, 
a#37, b#38, timestamp#39-T120000ms]
      +- EventTimeWatermark timestamp#39: timestamp, interval 2 minutes
         +- SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
validateexternaltype(getexternalrowfield(assertnotnull(input[0, 
org.apache.spark.sql.Row, true]), 0, a), StringType), true, false) AS a#37, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, 
org.apache.spark.sql.Row, true]), 1, b), StringType), true, false) AS b#38, 
staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, 
TimestampType, fromJavaTimestamp, 
validateexternaltype(getexternalrowfield(assertnotnull(input[0, 
org.apache.spark.sql.Row, true]), 2, timestamp), TimestampType), true, false) 
AS timestamp#39]
            +- MapElements 
com.htrunk.streaming.job.processor.handlers.SparkTransformStreamingHandler$2@7072f0b7,
 interface org.apache.spark.sql.Row, [StructField(value,StringType,false), 
StructField(timestamp,TimestampType,false)], obj#36: org.apache.spark.sql.Row
               +- DeserializeToObject createexternalrow(value#29.toString, 
staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, 
ObjectType(class java.sql.Timestamp), toJavaTimestamp, timestamp#30, true, 
false), StructField(value,StringType,false), 
StructField(timestamp,TimestampType,false)), obj#35: org.apache.spark.sql.Row
                  +- SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
validateexternaltype(getexternalrowfield(assertnotnull(input[0, 
org.apache.spark.sql.Row, true]), 0, value), StringType), true, false) AS 
value#29, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, 
TimestampType, fromJavaTimestamp, 
validateexternaltype(getexternalrowfield(assertnotnull(input[0, 
org.apache.spark.sql.Row, true]), 1, timestamp), TimestampType), true, false) 
AS timestamp#30]
                     +- MapElements 
com.htrunk.streaming.job.processor.handlers.SparkTransformStreamingHandler$1@570e459e,
 interface org.apache.spark.sql.Row, [StructField(value,StringType,true), 
StructField(timestamp,TimestampType,true)], obj#28: org.apache.spark.sql.Row
                        +- DeserializeToObject 
createexternalrow(value#21.toString, staticinvoke(class 
org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class 
java.sql.Timestamp), toJavaTimestamp, timestamp#22, true, false), 
StructField(value,StringType,true), StructField(timestamp,TimestampType,true)), 
obj#27: org.apache.spark.sql.Row
                           +- Project [cast(value#8 as string) AS value#21, 
cast(timestamp#12 as timestamp) AS timestamp#22]
                              +- StreamingExecutionRelation 
KafkaSource[Assign[kafka-agg_1-0]], [key#7, value#8, topic#9, partition#10, 
offset#11L, timestamp#12, timestampType#13]

        at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: java.lang.AssertionError: assertion failed: No plan for 
EventTimeWatermark timestamp#39: timestamp, interval 2 minutes
+- Project [a#37, timestamp#39]
   +- SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
validateexternaltype(getexternalrowfield(assertnotnull(input[0, 
org.apache.spark.sql.Row, true]), 0, a), StringType), true, false) AS a#37, 
staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, 
org.apache.spark.sql.Row, true]), 1, b), StringType), true, false) AS b#38, 
staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, 
TimestampType, fromJavaTimestamp, 
validateexternaltype(getexternalrowfield(assertnotnull(input[0, 
org.apache.spark.sql.Row, true]), 2, timestamp), TimestampType), true, false) 
AS timestamp#39]
      +- MapElements 
com.htrunk.streaming.job.processor.handlers.SparkTransformStreamingHandler$2@7072f0b7,
 interface org.apache.spark.sql.Row, [StructField(value,StringType,false), 
StructField(timestamp,TimestampType,false)], obj#36: org.apache.spark.sql.Row
         +- MapElements 
com.htrunk.streaming.job.processor.handlers.SparkTransformStreamingHandler$1@570e459e,
 interface org.apache.spark.sql.Row, [StructField(value,StringType,true), 
StructField(timestamp,TimestampType,true)], obj#28: org.apache.spark.sql.Row
            +- DeserializeToObject createexternalrow(value#21.toString, 
staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, 
ObjectType(class java.sql.Timestamp), toJavaTimestamp, timestamp#74, true, 
false), StructField(value,StringType,true), 
StructField(timestamp,TimestampType,true)), obj#27: org.apache.spark.sql.Row
               +- Project [cast(value#70 as string) AS value#21, timestamp#74]
                  +- LogicalRDD [key#69, value#70, topic#71, partition#72, 
offset#73L, timestamp#74, timestampType#75], true

        at scala.Predef$.assert(Predef.scala:170)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
        at 
org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
        at 
org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
        at 
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
        at 
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3248)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:723)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:682)
        at org.apache.spark.sql.Dataset.show(Dataset.scala:691)
        at 
com.htrunk.streaming.job.processor.handlers.StreamingCustomSinkHandler$CustomSink.addBatch(StreamingCustomSinkHandler.java:91)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:475)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:473)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:472)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
        at 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
        ... 1 more
19/05/24 17:32:16 INFO SparkTransform: Available memory before clear 287227824
19/05/24 17:32:16 INFO MapOutputTrackerMasterEndpoint: 
MapOutputTrackerMasterEndpoint stopped!
19/05/24 17:32:16 INFO MemoryStore: MemoryStore cleared
19/05/24 17:32:16 INFO BlockManager: BlockManager stopped
19/05/24 17:32:16 INFO BlockManagerMaster: BlockManagerMaster stopped
19/05/24 17:32:16 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: 
OutputCommitCoordinator stopped!
19/05/24 17:32:16 INFO SparkContext: Successfully stopped SparkContext
19/05/24 17:32:16 INFO SparkContext: SparkContext already stopped.
19/05/24 17:32:16 INFO SparkTransform: Available memory After clear 554082912
19/05/24 17:32:16 INFO ShutdownHookManager: Shutdown hook called
19/05/24 17:32:16 INFO ShutdownHookManager: Deleting directory 
/tmp/spark-a1743426-7d6b-4a53-aada-959fbd6ccb16
19/05/24 17:32:16 INFO ShutdownHookManager: Deleting directory 
/tmp/temporaryReader-2055b9d8-e278-4ccb-97ce-93cd009d3218
19/05/24 17:32:16 INFO ShutdownHookManager: Deleting directory 
/tmp/spark-2bf8f89a-617d-4ae5-8a9a-2a46dd5696c0)

> Structured Streaming Custom Sink --
> -----------------------------------
>
>                 Key: SPARK-27833
>                 URL: https://issues.apache.org/jira/browse/SPARK-27833
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.0
>         Environment: spark 2.3.0
> java 1.8
> kafka version 0.10.
>            Reporter: Raviteja
>            Priority: Blocker
>              Labels: spark-streaming-kafka
>         Attachments: kafka_consumer_code.java, kafka_custom_sink.java, 
> kafka_error_log.txt
>
>
> Hi,
> We have a requirement to read data from Kafka, apply some transformations,
> and store the data in a database. For this we are using the watermarking
> feature together with an aggregate function, and for storing we are writing
> our own Structured Streaming sink. We are using Spark 2.3.0, Java 1.8, and
> Kafka version 0.10.
> We are getting the error below:
> "*java.lang.AssertionError: assertion failed: No plan for EventTimeWatermark 
> timestamp#39: timestamp, interval 2 minutes*"
>  
> The query works perfectly when we use the Console sink instead of the custom
> sink. For debugging the issue, our custom sink only performs
> "dataframe.show()" and nothing else (a simplified sketch of the sink follows
> below).
> Please find the error log and the code attached. Please look into this
> issue; it is a blocker for us, as we cannot proceed further or find any
> alternative because we need the watermarking feature.
>  
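> For reference, a minimal sketch of the sink's addBatch (simplified, with
> illustrative names; the full code is in the attached kafka_custom_sink.java,
> which implements Spark's internal
> org.apache.spark.sql.execution.streaming.Sink interface):
>
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.execution.streaming.Sink;
>
> // Simplified stand-in for StreamingCustomSinkHandler.CustomSink; the real
> // class is registered with the query through a sink provider (not shown).
> public class CustomSink implements Sink {
>     @Override
>     public void addBatch(long batchId, Dataset<Row> data) {
>         // The only action we perform on the micro-batch. The attached stack
>         // trace shows this show() going through a fresh QueryPlanner.plan
>         // call, which is where "No plan for EventTimeWatermark" is thrown.
>         data.show();
>     }
> }
>
> (For comparison, Spark 2.3's built-in console sink collects the rows and
> rebuilds a local DataFrame before calling show(), rather than calling show()
> on the micro-batch DataFrame directly.)
>  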



