[ https://issues.apache.org/jira/browse/SPARK-27833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Raviteja updated SPARK-27833: ----------------------------- Docs Text: (was: 19/05/24 17:31:58 INFO ConsumerConfig: ConsumerConfig values: metric.reporters = [] metadata.max.age.ms = 300000 partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] reconnect.backoff.ms = 50 sasl.kerberos.ticket.renew.window.factor = 0.8 max.partition.fetch.bytes = 1048576 bootstrap.servers = [10.20.0.10:6667] ssl.keystore.type = JKS enable.auto.commit = false sasl.mechanism = GSSAPI interceptor.classes = null exclude.internal.topics = true ssl.truststore.password = null client.id = ssl.endpoint.identification.algorithm = null max.poll.records = 1 check.crcs = true request.timeout.ms = 40000 heartbeat.interval.ms = 3000 auto.commit.interval.ms = 5000 receive.buffer.bytes = 65536 ssl.truststore.type = JKS ssl.truststore.location = null ssl.keystore.password = null fetch.min.bytes = 1 send.buffer.bytes = 131072 value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer group.id = spark-kafka-source-ec244f99-4ea8-467c-bdc5-b2489e2dbf98-1705936181-driver-0 retry.backoff.ms = 100 sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 ssl.trustmanager.algorithm = PKIX ssl.key.password = null fetch.max.wait.ms = 500 sasl.kerberos.min.time.before.relogin = 60000 connections.max.idle.ms = 540000 session.timeout.ms = 30000 metrics.num.samples = 2 key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer ssl.protocol = TLS ssl.provider = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.keystore.location = null ssl.cipher.suites = null security.protocol = PLAINTEXT ssl.keymanager.algorithm = SunX509 metrics.sample.window.ms = 30000 auto.offset.reset = earliest 19/05/24 17:31:58 INFO ConsumerConfig: ConsumerConfig values: metric.reporters = [] metadata.max.age.ms = 300000 partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] reconnect.backoff.ms = 50 sasl.kerberos.ticket.renew.window.factor = 0.8 max.partition.fetch.bytes = 1048576 bootstrap.servers = [10.20.0.10:6667] ssl.keystore.type = JKS enable.auto.commit = false sasl.mechanism = GSSAPI interceptor.classes = null exclude.internal.topics = true ssl.truststore.password = null client.id = consumer-1 ssl.endpoint.identification.algorithm = null max.poll.records = 1 check.crcs = true request.timeout.ms = 40000 heartbeat.interval.ms = 3000 auto.commit.interval.ms = 5000 receive.buffer.bytes = 65536 ssl.truststore.type = JKS ssl.truststore.location = null ssl.keystore.password = null fetch.min.bytes = 1 send.buffer.bytes = 131072 value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer group.id = spark-kafka-source-ec244f99-4ea8-467c-bdc5-b2489e2dbf98-1705936181-driver-0 retry.backoff.ms = 100 sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 ssl.trustmanager.algorithm = PKIX ssl.key.password = null fetch.max.wait.ms = 500 sasl.kerberos.min.time.before.relogin = 60000 connections.max.idle.ms = 540000 session.timeout.ms = 30000 metrics.num.samples = 2 key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer ssl.protocol = TLS ssl.provider = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.keystore.location = null ssl.cipher.suites = null security.protocol = PLAINTEXT ssl.keymanager.algorithm = SunX509 metrics.sample.window.ms = 30000 auto.offset.reset = earliest 
19/05/24 17:31:58 INFO AppInfoParser: Kafka version : 0.10.0.1 19/05/24 17:31:58 INFO AppInfoParser: Kafka commitId : a7a17cdec9eaa6c5 19/05/24 17:32:00 INFO MicroBatchExecution: Starting [id = 9fab90c9-b1f8-469e-bc9c-71b3d1e5272c, runId = 56b3c327-d0fb-45b5-a820-4fe16f85f60d]. Use hdfs://nn6.htrunk.com:8020/user/tester/agg_fun_30 to store the query checkpoint. 19/05/24 17:32:00 INFO ConsumerConfig: ConsumerConfig values: metric.reporters = [] metadata.max.age.ms = 300000 partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] reconnect.backoff.ms = 50 sasl.kerberos.ticket.renew.window.factor = 0.8 max.partition.fetch.bytes = 1048576 bootstrap.servers = [10.20.0.10:6667] ssl.keystore.type = JKS enable.auto.commit = false sasl.mechanism = GSSAPI interceptor.classes = null exclude.internal.topics = true ssl.truststore.password = null client.id = ssl.endpoint.identification.algorithm = null max.poll.records = 1 check.crcs = true request.timeout.ms = 40000 heartbeat.interval.ms = 3000 auto.commit.interval.ms = 5000 receive.buffer.bytes = 65536 ssl.truststore.type = JKS ssl.truststore.location = null ssl.keystore.password = null fetch.min.bytes = 1 send.buffer.bytes = 131072 value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer group.id = spark-kafka-source-f1621e11-d476-4604-ba35-75a02f45dbfe--1335777992-driver-0 retry.backoff.ms = 100 sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 ssl.trustmanager.algorithm = PKIX ssl.key.password = null fetch.max.wait.ms = 500 sasl.kerberos.min.time.before.relogin = 60000 connections.max.idle.ms = 540000 session.timeout.ms = 30000 metrics.num.samples = 2 key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer ssl.protocol = TLS ssl.provider = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.keystore.location = null ssl.cipher.suites = null security.protocol = PLAINTEXT ssl.keymanager.algorithm = SunX509 metrics.sample.window.ms = 30000 auto.offset.reset = earliest 19/05/24 17:32:00 INFO ConsumerConfig: ConsumerConfig values: metric.reporters = [] metadata.max.age.ms = 300000 partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] reconnect.backoff.ms = 50 sasl.kerberos.ticket.renew.window.factor = 0.8 max.partition.fetch.bytes = 1048576 bootstrap.servers = [10.20.0.10:6667] ssl.keystore.type = JKS enable.auto.commit = false sasl.mechanism = GSSAPI interceptor.classes = null exclude.internal.topics = true ssl.truststore.password = null client.id = consumer-2 ssl.endpoint.identification.algorithm = null max.poll.records = 1 check.crcs = true request.timeout.ms = 40000 heartbeat.interval.ms = 3000 auto.commit.interval.ms = 5000 receive.buffer.bytes = 65536 ssl.truststore.type = JKS ssl.truststore.location = null ssl.keystore.password = null fetch.min.bytes = 1 send.buffer.bytes = 131072 value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer group.id = spark-kafka-source-f1621e11-d476-4604-ba35-75a02f45dbfe--1335777992-driver-0 retry.backoff.ms = 100 sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 ssl.trustmanager.algorithm = PKIX ssl.key.password = null fetch.max.wait.ms = 500 sasl.kerberos.min.time.before.relogin = 60000 connections.max.idle.ms = 540000 session.timeout.ms = 30000 metrics.num.samples = 2 key.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer ssl.protocol = TLS ssl.provider = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.keystore.location = null ssl.cipher.suites = null security.protocol = PLAINTEXT ssl.keymanager.algorithm = SunX509 metrics.sample.window.ms = 30000 auto.offset.reset = earliest 19/05/24 17:32:00 INFO AppInfoParser: Kafka version : 0.10.0.1 19/05/24 17:32:00 INFO AppInfoParser: Kafka commitId : a7a17cdec9eaa6c5 19/05/24 17:32:00 INFO KafkaSource: GetBatch called with start = Some({"kafka-agg_1":{"0":77}}), end = {"kafka-agg_1":{"0":80}} 19/05/24 17:32:00 INFO KafkaSource: Partitions added: Map() 19/05/24 17:32:00 INFO KafkaSource: GetBatch generating RDD of offset range: KafkaSourceRDDOffsetRange(kafka-agg_1-0,77,80,None) 19/05/24 17:32:00 INFO AbstractCoordinator: Discovered coordinator nn10.htrunk.com:6667 (id: 2147482646 rack: null) for group spark-kafka-source-f1621e11-d476-4604-ba35-75a02f45dbfe--1335777992-driver-0. 19/05/24 17:32:00 INFO MicroBatchExecution: Streaming query made progress: { "id" : "9fab90c9-b1f8-469e-bc9c-71b3d1e5272c", "runId" : "56b3c327-d0fb-45b5-a820-4fe16f85f60d", "name" : null, "timestamp" : "2019-05-24T12:02:00.194Z", "batchId" : 6, "numInputRows" : 0, "processedRowsPerSecond" : 0.0, "durationMs" : { "getOffset" : 145, "triggerExecution" : 282 }, "eventTime" : { "watermark" : "2019-05-24T11:39:43.259Z" }, "stateOperators" : [ ], "sources" : [ { "description" : "KafkaSource[Assign[kafka-agg_1-0]]", "startOffset" : { "kafka-agg_1" : { "0" : 80 } }, "endOffset" : { "kafka-agg_1" : { "0" : 80 } }, "numInputRows" : 0, "processedRowsPerSecond" : 0.0 } ], "sink" : { "description" : "com.htrunk.streaming.job.processor.handlers.StreamingCustomSinkHandler$CustomSink@77ddc2e6" } } 19/05/24 17:32:15 INFO MicroBatchExecution: Committed offsets for batch 6. 
Metadata OffsetSeqMetadata(1558697983259,1558699335004,Map(spark.sql.shuffle.partitions -> 200, spark.sql.streaming.stateStore.providerClass -> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider)) 19/05/24 17:32:15 INFO KafkaSource: GetBatch called with start = Some({"kafka-agg_1":{"0":80}}), end = {"kafka-agg_1":{"0":83}} 19/05/24 17:32:15 INFO KafkaSource: Partitions added: Map() 19/05/24 17:32:15 INFO KafkaSource: GetBatch generating RDD of offset range: KafkaSourceRDDOffsetRange(kafka-agg_1-0,80,83,None) 19/05/24 17:32:15 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 1080.0 B, free 912.3 MB) 19/05/24 17:32:15 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 350.0 B, free 912.3 MB) 19/05/24 17:32:15 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on nn6.htrunk.com:3467 (size: 350.0 B, free: 912.3 MB) 19/05/24 17:32:15 INFO SparkContext: Created broadcast 2 from start at SparkTransformStreamingHandler.java:323 19/05/24 17:32:15 ERROR MicroBatchExecution: Query [id = 9fab90c9-b1f8-469e-bc9c-71b3d1e5272c, runId = 56b3c327-d0fb-45b5-a820-4fe16f85f60d] terminated with error java.lang.AssertionError: assertion failed: No plan for EventTimeWatermark timestamp#39: timestamp, interval 2 minutes +- Project [a#37, timestamp#39] +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, a), StringType), true, false) AS a#37, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, b), StringType), true, false) AS b#38, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, timestamp), TimestampType), true, false) AS timestamp#39] +- MapElements com.htrunk.streaming.job.processor.handlers.SparkTransformStreamingHandler$2@7072f0b7, interface org.apache.spark.sql.Row, [StructField(value,StringType,false), StructField(timestamp,TimestampType,false)], obj#36: org.apache.spark.sql.Row +- MapElements com.htrunk.streaming.job.processor.handlers.SparkTransformStreamingHandler$1@570e459e, interface org.apache.spark.sql.Row, [StructField(value,StringType,true), StructField(timestamp,TimestampType,true)], obj#28: org.apache.spark.sql.Row +- DeserializeToObject createexternalrow(value#21.toString, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, timestamp#74, true, false), StructField(value,StringType,true), StructField(timestamp,TimestampType,true)), obj#27: org.apache.spark.sql.Row +- Project [cast(value#70 as string) AS value#21, timestamp#74] +- LogicalRDD [key#69, value#70, topic#71, partition#72, offset#73L, timestamp#74, timestampType#75], true at scala.Predef$.assert(Predef.scala:170) at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67) at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67) at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67) at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93) at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72) at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68) at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77) at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3248) at org.apache.spark.sql.Dataset.head(Dataset.scala:2484) at org.apache.spark.sql.Dataset.take(Dataset.scala:2698) at org.apache.spark.sql.Dataset.showString(Dataset.scala:254) at 
org.apache.spark.sql.Dataset.show(Dataset.scala:723) at org.apache.spark.sql.Dataset.show(Dataset.scala:682) at org.apache.spark.sql.Dataset.show(Dataset.scala:691) at com.htrunk.streaming.job.processor.handlers.StreamingCustomSinkHandler$CustomSink.addBatch(StreamingCustomSinkHandler.java:91) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:475) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:473) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:472) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189) org.apache.spark.sql.streaming.StreamingQueryException: assertion failed: No plan for EventTimeWatermark timestamp#39: timestamp, interval 2 minutes +- Project [a#37, timestamp#39] +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, a), StringType), true, false) AS a#37, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, b), StringType), true, false) AS b#38, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, timestamp), TimestampType), true, false) AS timestamp#39] +- MapElements com.htrunk.streaming.job.processor.handlers.SparkTransformStreamingHandler$2@7072f0b7, interface org.apache.spark.sql.Row, [StructField(value,StringType,false), 
StructField(timestamp,TimestampType,false)], obj#36: org.apache.spark.sql.Row +- MapElements com.htrunk.streaming.job.processor.handlers.SparkTransformStreamingHandler$1@570e459e, interface org.apache.spark.sql.Row, [StructField(value,StringType,true), StructField(timestamp,TimestampType,true)], obj#28: org.apache.spark.sql.Row +- DeserializeToObject createexternalrow(value#21.toString, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, timestamp#74, true, false), StructField(value,StringType,true), StructField(timestamp,TimestampType,true)), obj#27: org.apache.spark.sql.Row +- Project [cast(value#70 as string) AS value#21, timestamp#74] +- LogicalRDD [key#69, value#70, topic#71, partition#72, offset#73L, timestamp#74, timestampType#75], true === Streaming Query === Identifier: [id = 9fab90c9-b1f8-469e-bc9c-71b3d1e5272c, runId = 56b3c327-d0fb-45b5-a820-4fe16f85f60d] Current Committed Offsets: {KafkaSource[Assign[kafka-agg_1-0]]: {"kafka-agg_1":{"0":80}}} Current Available Offsets: {KafkaSource[Assign[kafka-agg_1-0]]: {"kafka-agg_1":{"0":83}}} Current State: ACTIVE Thread State: RUNNABLE Logical Plan: Aggregate [window#48, a#37], [window#48 AS window#42, a#37, count(1) AS count#47L] +- Filter isnotnull(timestamp#39) +- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#39, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#39, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#39, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#39, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 10000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#39, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#39, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#39, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#39, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 10000000) + 0) + 10000000), LongType, TimestampType)) AS window#48, a#37, b#38, timestamp#39-T120000ms] +- EventTimeWatermark timestamp#39: timestamp, interval 2 minutes +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, a), StringType), true, false) AS a#37, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, b), StringType), true, false) AS b#38, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, timestamp), TimestampType), true, false) AS timestamp#39] +- 
MapElements com.htrunk.streaming.job.processor.handlers.SparkTransformStreamingHandler$2@7072f0b7, interface org.apache.spark.sql.Row, [StructField(value,StringType,false), StructField(timestamp,TimestampType,false)], obj#36: org.apache.spark.sql.Row +- DeserializeToObject createexternalrow(value#29.toString, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, timestamp#30, true, false), StructField(value,StringType,false), StructField(timestamp,TimestampType,false)), obj#35: org.apache.spark.sql.Row +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, value), StringType), true, false) AS value#29, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, timestamp), TimestampType), true, false) AS timestamp#30] +- MapElements com.htrunk.streaming.job.processor.handlers.SparkTransformStreamingHandler$1@570e459e, interface org.apache.spark.sql.Row, [StructField(value,StringType,true), StructField(timestamp,TimestampType,true)], obj#28: org.apache.spark.sql.Row +- DeserializeToObject createexternalrow(value#21.toString, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, timestamp#22, true, false), StructField(value,StringType,true), StructField(timestamp,TimestampType,true)), obj#27: org.apache.spark.sql.Row +- Project [cast(value#8 as string) AS value#21, cast(timestamp#12 as timestamp) AS timestamp#22] +- StreamingExecutionRelation KafkaSource[Assign[kafka-agg_1-0]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13] at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189) Caused by: java.lang.AssertionError: assertion failed: No plan for EventTimeWatermark timestamp#39: timestamp, interval 2 minutes +- Project [a#37, timestamp#39] +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, a), StringType), true, false) AS a#37, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, b), StringType), true, false) AS b#38, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, timestamp), TimestampType), true, false) AS timestamp#39] +- MapElements com.htrunk.streaming.job.processor.handlers.SparkTransformStreamingHandler$2@7072f0b7, interface org.apache.spark.sql.Row, [StructField(value,StringType,false), StructField(timestamp,TimestampType,false)], obj#36: org.apache.spark.sql.Row +- MapElements com.htrunk.streaming.job.processor.handlers.SparkTransformStreamingHandler$1@570e459e, interface org.apache.spark.sql.Row, [StructField(value,StringType,true), StructField(timestamp,TimestampType,true)], 
obj#28: org.apache.spark.sql.Row +- DeserializeToObject createexternalrow(value#21.toString, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, timestamp#74, true, false), StructField(value,StringType,true), StructField(timestamp,TimestampType,true)), obj#27: org.apache.spark.sql.Row +- Project [cast(value#70 as string) AS value#21, timestamp#74] +- LogicalRDD [key#69, value#70, topic#71, partition#72, offset#73L, timestamp#74, timestampType#75], true at scala.Predef$.assert(Predef.scala:170) at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67) at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67) at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336) at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:75) at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:67) at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93) at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72) at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68) at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77) at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3248) at org.apache.spark.sql.Dataset.head(Dataset.scala:2484) at org.apache.spark.sql.Dataset.take(Dataset.scala:2698) at org.apache.spark.sql.Dataset.showString(Dataset.scala:254) at org.apache.spark.sql.Dataset.show(Dataset.scala:723) at org.apache.spark.sql.Dataset.show(Dataset.scala:682) at org.apache.spark.sql.Dataset.show(Dataset.scala:691) at com.htrunk.streaming.job.processor.handlers.StreamingCustomSinkHandler$CustomSink.addBatch(StreamingCustomSinkHandler.java:91) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:475) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:473) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:472) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279) ... 1 more 19/05/24 17:32:16 INFO SparkTransform: Available memory before clear 287227824 19/05/24 17:32:16 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
19/05/24 17:32:16 INFO MemoryStore: MemoryStore cleared 19/05/24 17:32:16 INFO BlockManager: BlockManager stopped 19/05/24 17:32:16 INFO BlockManagerMaster: BlockManagerMaster stopped 19/05/24 17:32:16 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 19/05/24 17:32:16 INFO SparkContext: Successfully stopped SparkContext 19/05/24 17:32:16 INFO SparkContext: SparkContext already stopped. 19/05/24 17:32:16 INFO SparkTransform: Available memory After clear 554082912 19/05/24 17:32:16 INFO ShutdownHookManager: Shutdown hook called 19/05/24 17:32:16 INFO ShutdownHookManager: Deleting directory /tmp/spark-a1743426-7d6b-4a53-aada-959fbd6ccb16 19/05/24 17:32:16 INFO ShutdownHookManager: Deleting directory /tmp/temporaryReader-2055b9d8-e278-4ccb-97ce-93cd009d3218 19/05/24 17:32:16 INFO ShutdownHookManager: Deleting directory /tmp/spark-2bf8f89a-617d-4ae5-8a9a-2a46dd5696c0)

> Structured Streaming Custom Sink
> --------------------------------
>
>                 Key: SPARK-27833
>                 URL: https://issues.apache.org/jira/browse/SPARK-27833
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.0
>         Environment: spark 2.3.0
> java 1.8
> kafka version 0.10
>            Reporter: Raviteja
>            Priority: Blocker
>              Labels: spark-streaming-kafka
>         Attachments: kafka_consumer_code.java, kafka_custom_sink.java, kafka_error_log.txt
>
>
> Hi,
> We have a requirement to read data from Kafka, apply some transformations, and store the data in a database. For this we are using the watermarking feature along with an aggregate function, and for storing we are writing our own Structured Streaming sink. We are using Spark 2.3.0, Java 1.8 and Kafka 0.10.
> We are getting the error below:
> "*java.lang.AssertionError: assertion failed: No plan for EventTimeWatermark timestamp#39: timestamp, interval 2 minutes*"
>
> The query works perfectly fine when we use the Console sink instead of the custom sink. For debugging the issue, our custom sink only performs "dataframe.show()" and nothing else.
> Please find the error log and the code in the attachments. Please look into this issue, as it is a blocker and we are not able to proceed further or find any alternative, since we need the watermarking feature.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
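For reference, a minimal sketch of the kind of query and custom sink described above, written against the Spark 2.3 Java API. The class names (WatermarkCustomSinkSketch, ShowSink, ShowSinkProvider) are illustrative placeholders and not the reporter's attached code; the Kafka endpoint, topic/partition, window, watermark and checkpoint path are taken from the log above, and the spark-sql-kafka-0-10 artifact is assumed to be on the classpath.

{code:java}
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.streaming.Sink;
import org.apache.spark.sql.sources.StreamSinkProvider;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.window;

public class WatermarkCustomSinkSketch {

  // Minimal sink that only prints each micro-batch, like the debugging sink in the report.
  // Note that Sink lives in org.apache.spark.sql.execution.streaming, i.e. an internal API,
  // which matches the CustomSink.addBatch frame in the stack trace above.
  public static class ShowSink implements Sink {
    @Override
    public void addBatch(long batchId, Dataset<Row> data) {
      // Calling show() on the batch DataFrame is, per the report, what surfaces
      // "No plan for EventTimeWatermark" on Spark 2.3.0.
      data.show();
    }
  }

  // Provider looked up by name via format(...) on the DataStreamWriter.
  public static class ShowSinkProvider implements StreamSinkProvider {
    @Override
    public Sink createSink(SQLContext sqlContext,
                           scala.collection.immutable.Map<String, String> parameters,
                           scala.collection.Seq<String> partitionColumns,
                           OutputMode outputMode) {
      return new ShowSink();
    }
  }

  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder().appName("watermark-custom-sink").getOrCreate();

    // Kafka source, assigned to the single partition seen in the log (kafka-agg_1-0).
    Dataset<Row> source = spark.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "10.20.0.10:6667")
        .option("assign", "{\"kafka-agg_1\":[0]}")
        .option("startingOffsets", "earliest")
        .load()
        .selectExpr("CAST(value AS STRING) AS value", "CAST(timestamp AS TIMESTAMP) AS timestamp");

    // Watermark plus windowed aggregate, matching the 2-minute watermark
    // and 10-second window visible in the logical plan above.
    Dataset<Row> counts = source
        .withWatermark("timestamp", "2 minutes")
        .groupBy(window(col("timestamp"), "10 seconds"), col("value"))
        .count();

    StreamingQuery query = counts.writeStream()
        .outputMode("append")
        .format(ShowSinkProvider.class.getName())
        .option("checkpointLocation", "hdfs://nn6.htrunk.com:8020/user/tester/agg_fun_30")
        .start();

    query.awaitTermination();
  }
}
{code}

Per the report, running a query of this shape into the custom sink fails with the EventTimeWatermark assertion when addBatch calls data.show(), while the same query with .format("console") completes normally.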