[ https://issues.apache.org/jira/browse/SPARK-27409?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16819473#comment-16819473 ]

Prabhjot Singh Bharaj commented on SPARK-27409:
-----------------------------------------------

[~gsomogyi] I haven't tried it on master. I'm seeing the problem with Spark 2.3.2.

Here is the complete log:

{code:java}
➜ ~/spark ((HEAD detached at v2.3.2)) ✗ ./bin/pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2
Python 2.7.10 (default, Feb 22 2019, 21:17:52)
[GCC 4.2.1 Compatible Apple LLVM 10.0.1 (clang-1001.0.37.14)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Ivy Default Cache set to: /<REDACTED>/.ivy2/cache
The jars for the packages stored in: /<REDACTED>/.ivy2/jars
:: loading settings :: url = jar:file:/<REDACTED>/spark/assembly/target/scala-2.11/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b75b99d4-ae39-49b0-b366-8b718542b4f8;1.0
 confs: [default]
 found org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.2 in central
 found org.apache.kafka#kafka-clients;0.10.0.1 in local-m2-cache
 found net.jpountz.lz4#lz4;1.3.0 in local-m2-cache
 found org.xerial.snappy#snappy-java;1.1.2.6 in local-m2-cache
 found org.slf4j#slf4j-api;1.7.16 in spark-list
 found org.spark-project.spark#unused;1.0.0 in local-m2-cache
:: resolution report :: resolve 1580ms :: artifacts dl 4ms
 :: modules in use:
 net.jpountz.lz4#lz4;1.3.0 from local-m2-cache in [default]
 org.apache.kafka#kafka-clients;0.10.0.1 from local-m2-cache in [default]
 org.apache.spark#spark-sql-kafka-0-10_2.11;2.3.2 from central in [default]
 org.slf4j#slf4j-api;1.7.16 from spark-list in [default]
 org.spark-project.spark#unused;1.0.0 from local-m2-cache in [default]
 org.xerial.snappy#snappy-java;1.1.2.6 from local-m2-cache in [default]
 ---------------------------------------------------------------------
 |                  |            modules            ||   artifacts   |
 |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
 ---------------------------------------------------------------------
 |      default     |   6   |   2   |   2   |   0   ||   6   |   0   |
 ---------------------------------------------------------------------

:: problems summary ::
:::: ERRORS
 unknown resolver null

 unknown resolver null


:: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
:: retrieving :: org.apache.spark#spark-submit-parent-b75b99d4-ae39-49b0-b366-8b718542b4f8
 confs: [default]
 0 artifacts copied, 6 already retrieved (0kB/6ms)
19/04/16 16:31:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.2
      /_/

Using Python version 2.7.10 (default, Feb 22 2019 21:17:52)
SparkSession available as 'spark'.
>>> df = spark.readStream.format('kafka').option('kafka.bootstrap.servers', 'localhost:9093').option("kafka.security.protocol", "SSL",).option("kafka.ssl.keystore.password", "").option("kafka.ssl.keystore.type", "PKCS12").option("kafka.ssl.keystore.location", 'non-existent').option('subscribe', 'no existing topic').load()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/<REDACTED>/spark/python/pyspark/sql/streaming.py", line 403, in load
    return self._df(self._jreader.load())
  File "/<REDACTED>/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/<REDACTED>/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/<REDACTED>/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o37.load.
: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
  at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
  at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:314)
  at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
  at py4j.Gateway.invoke(Gateway.java:282)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.GatewayConnection.run(GatewayConnection.java:238)
  at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: non-existent (No such file or directory)
  at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:44)
  at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)
  at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)
  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)
  ... 19 more
Caused by: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: non-existent (No such file or directory)
  at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:110)
  at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:41)
  ... 22 more
Caused by: java.io.FileNotFoundException: non-existent (No such file or directory)
  at java.io.FileInputStream.open0(Native Method)
  at java.io.FileInputStream.open(FileInputStream.java:195)
  at java.io.FileInputStream.<init>(FileInputStream.java:138)
  at java.io.FileInputStream.<init>(FileInputStream.java:93)
  at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:205)
  at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.access$000(SslFactory.java:190)
  at org.apache.kafka.common.security.ssl.SslFactory.createSSLContext(SslFactory.java:126)
  at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:108)
  ... 23 more
{code}
 

 

I'm running pyspark in the same way as described in [https://spark.apache.org/docs/2.3.2/structured-streaming-kafka-integration.html#deploying].

 

The problem is that when a `non-existent` keystore path is given, the load seems to go through the `createContinuousReader` path even though no continuous trigger was specified. It should go through the micro-batch code path instead, as sketched below.
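
Here is a minimal sketch of what I would expect (mine, not from the original report; the topic name and trigger intervals are placeholders): in Structured Streaming the execution mode is chosen on the write side via trigger(), so a plain readStream...load() with no continuous trigger should be planned as a micro-batch source rather than going through createContinuousReader.
{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-microbatch-sketch").getOrCreate()

# Read side: only declares the Kafka source; no execution mode is chosen here.
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9093")
      .option("subscribe", "some-topic")   # hypothetical topic name
      .load())

# Write side: micro-batch execution (the default) -- no trigger, or a processingTime trigger.
query = (df.writeStream
         .format("console")
         .trigger(processingTime="10 seconds")
         .start())

# Continuous execution would have to be requested explicitly:
# df.writeStream.format("console").trigger(continuous="1 second").start()
{code}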

 

> Micro-batch support for Kafka Source in Spark 2.3
> -------------------------------------------------
>
>                 Key: SPARK-27409
>                 URL: https://issues.apache.org/jira/browse/SPARK-27409
>             Project: Spark
>          Issue Type: Question
>          Components: Structured Streaming
>    Affects Versions: 2.3.2
>            Reporter: Prabhjot Singh Bharaj
>            Priority: Major
>
> It seems that with this change - [https://github.com/apache/spark/commit/0a441d2edb0a3f6c6c7c370db8917e1c07f211e7#diff-eeac5bdf3a1ecd7b9f8aaf10fff37f05R50] - in Spark 2.3 for the Kafka source provider, a Kafka source cannot be run in micro-batch mode but only in continuous mode. Is that understanding correct?
> {code:java}
> E Py4JJavaError: An error occurred while calling o217.load.
> E : org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
> E at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
> E at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566)
> E at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:549)
> E at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
> E at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:314)
> E at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
> E at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
> E at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
> E at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
> E at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> E at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> E at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> E at java.lang.reflect.Method.invoke(Method.java:498)
> E at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> E at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> E at py4j.Gateway.invoke(Gateway.java:282)
> E at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> E at py4j.commands.CallCommand.execute(CallCommand.java:79)
> E at py4j.GatewayConnection.run(GatewayConnection.java:238)
> E at java.lang.Thread.run(Thread.java:748)
> E Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: non-existent (No such file or directory)
> E at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:44)
> E at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:93)
> E at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:51)
> E at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:84)
> E at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:657)
> E ... 19 more
> E Caused by: org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: non-existent (No such file or directory)
> E at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:121)
> E at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:41)
> E ... 23 more
> E Caused by: java.io.FileNotFoundException: non-existent (No such file or directory)
> E at java.io.FileInputStream.open0(Native Method)
> E at java.io.FileInputStream.open(FileInputStream.java:195)
> E at java.io.FileInputStream.<init>(FileInputStream.java:138)
> E at java.io.FileInputStream.<init>(FileInputStream.java:93)
> E at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.load(SslFactory.java:216)
> E at org.apache.kafka.common.security.ssl.SslFactory$SecurityStore.access$000(SslFactory.java:201)
> E at org.apache.kafka.common.security.ssl.SslFactory.createSSLContext(SslFactory.java:137)
> E at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:119)
> E ... 24 more{code}
> When running a simple data stream loader for Kafka without an SSL cert, it goes through this code block:
>  
> {code:java}
> ...
> ...
> org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:130)
> E at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
> E at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:185)
> ...
> ...{code}
>  
> Note that I haven't selected `trigger=continuous...` when creating the dataframe, yet the code still goes through the continuous path. My understanding was that `continuous` is optional and not the default.
>  
> Please clarify.


