[ https://issues.apache.org/jira/browse/SPARK-28336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon updated SPARK-28336:
---------------------------------
    Labels: kafka  (was: beginner kafka newbie)

> Tried running same code in local machine in IDE pycharm it running fine but issue arises when i setup all on EC2 my RDD has Json Value and convert it to data frame and show dataframe by Show method it fails to show my data frame.
> --------------------------------------------------------------------------------
>
> Key: SPARK-28336
> URL: https://issues.apache.org/jira/browse/SPARK-28336
> Project: Spark
> Issue Type: Bug
> Components: Deploy, DStreams, EC2, PySpark, Spark Submit
> Affects Versions: 2.4.3
> Environment: Using EC2 Ubuntu 18.04.2 LTS
> Spark version : Spark 2.4.3 built for Hadoop 2.7.3
> Kafka version : kafka_2.12-2.2.1
> Reporter: Aditya
> Priority: Minor
> Labels: kafka
>
> I am a beginner with PySpark and I am building a pilot project in Spark. I developed it in the PyCharm IDE, where it runs fine.
> To explain the project: I produce JSON to a Kafka topic, consume that topic in Spark, and convert each RDD value (which is JSON) to a DataFrame using productInfo = sqlContext.read.json(rdd).
> On my local machine this works perfectly: after converting the RDD to a DataFrame, I display the DataFrame on my console with the show() method.
> But the problem arises when I set all of this up (Kafka, Apache Spark) on EC2 (Ubuntu 18.04.2 LTS) and run it with spark-submit. The console stops when it reaches my show() method and displays nothing; then the next batch starts and stops at show() again.
> I can't figure out what the error is, because no error is shown in the console. I also checked whether my data is arriving in the RDD, and it is.
> {color:#ff0000}My Code: {color}
> {code:java}
> # coding: utf-8
> from pyspark import SparkContext
> from pyspark import SparkConf
> from pyspark.streaming import StreamingContext
> from pyspark.streaming.kafka import KafkaUtils
> from pyspark.sql import Row, DataFrame, SQLContext
> import pandas as pd
> def getSqlContextInstance(sparkContext):
>     if ('sqlContextSingletonInstance' not in globals()):
>         globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
>     return globals()['sqlContextSingletonInstance']
> def process(time, rdd):
>     print("========= %s =========" % str(time))
>     try:
>         #print("--------------Also cross check my data is present in rdd I checked by printing ----------------")
>         #results = rdd.collect()
>         #for result in results:
>             #print(result)
>         # Get the singleton instance of SQLContext
>         sqlContext = getSqlContextInstance(rdd.context)
>         productInfo = sqlContext.read.json(rdd)
>         # problem comes here when i try to show it
>         productInfo.show()
>     except:
>         pass
> if __name__ == '__main__':
>     conf = SparkConf().set("spark.cassandra.connection.host", "127.0.0.1")
>     sc = SparkContext(conf = conf)
>     sc.setLogLevel("WARN")
>     sqlContext = SQLContext(sc)
>     ssc = StreamingContext(sc,10)
>     kafkaStream = KafkaUtils.createStream(ssc, 'localhost:2181', 'spark-streaming', {'new_topic':1})
>     lines = kafkaStream.map(lambda x: x[1])
>     lines.foreachRDD(process)
>     #lines.pprint()
>     ssc.start()
>     ssc.awaitTermination()
> {code}
>
> {color:#ff0000}My console:{color}
> {code:java}
> ./spark-submit ReadingJsonFromKafkaAndWritingToScylla_CSV_Example.py
> 19/07/10 11:13:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
> 19/07/10 11:13:15 INFO SparkContext: Running Spark version 2.4.3
> 19/07/10 11:13:15 INFO SparkContext: Submitted application: ReadingJsonFromKafkaAndWritingToScylla_CSV_Example.py
> 19/07/10 11:13:15 INFO SecurityManager: Changing view acls to: kafka
> 19/07/10 11:13:15 INFO SecurityManager: Changing modify acls to: kafka
> 19/07/10 11:13:15 INFO SecurityManager: Changing view acls groups to:
> 19/07/10 11:13:15 INFO SecurityManager: Changing modify acls groups to:
> 19/07/10 11:13:15 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(kafka); groups with view permissions: Set(); users with modify permissions: Set(kafka); groups with modify permissions: Set()
> 19/07/10 11:13:16 INFO Utils: Successfully started service 'sparkDriver' on port 41655.
> 19/07/10 11:13:16 INFO SparkEnv: Registering MapOutputTracker
> 19/07/10 11:13:16 INFO SparkEnv: Registering BlockManagerMaster
> 19/07/10 11:13:16 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
> 19/07/10 11:13:16 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
> 19/07/10 11:13:16 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-33f848fe-88d7-4c8f-8440-8384e094c59c
> 19/07/10 11:13:16 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
> 19/07/10 11:13:16 INFO SparkEnv: Registering OutputCommitCoordinator
> 19/07/10 11:13:16 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
> 19/07/10 11:13:16 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
> 19/07/10 11:13:16 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
> 19/07/10 11:13:16 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
> 19/07/10 11:13:16 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port 4045.
> 19/07/10 11:13:16 WARN Utils: Service 'SparkUI' could not bind on port 4045. Attempting port 4046.
> 19/07/10 11:13:16 INFO Utils: Successfully started service 'SparkUI' on port 4046.
> 19/07/10 11:13:16 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at [http://ip-172-31-92-134.ec2.internal:4046|http://ip-172-31-92-134.ec2.internal:4046/]
> 19/07/10 11:13:16 INFO Executor: Starting executor ID driver on host localhost
> 19/07/10 11:13:16 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 34719.
> 19/07/10 11:13:16 INFO NettyBlockTransferService: Server created on ip-172-31-92-134.ec2.internal:34719
> 19/07/10 11:13:16 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
> 19/07/10 11:13:16 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, ip-172-31-92-134.ec2.internal, 34719, None)
> 19/07/10 11:13:16 INFO BlockManagerMasterEndpoint: Registering block manager ip-172-31-92-134.ec2.internal:34719 with 366.3 MB RAM, BlockManagerId(driver, ip-172-31-92-134.ec2.internal, 34719, None)
> 19/07/10 11:13:16 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, ip-172-31-92-134.ec2.internal, 34719, None)
> 19/07/10 11:13:16 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, ip-172-31-92-134.ec2.internal, 34719, None)
> 19/07/10 11:13:17 WARN AppInfo$: Can't read Kafka version from MANIFEST.MF. Possible cause: java.lang.NullPointerException
> 19/07/10 11:13:18 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
> 19/07/10 11:13:18 WARN BlockManager: Block input-0-1562757198000 replicated to only 0 peer(s) instead of 1 peers
> {code}
> {color:#ff0000}This is when I am not producing data in my Kafka topic{color}
> {code:java}
> ========= 2019-07-10 11:13:20 =========
> ---------------------in function procces----------------------
> -----------------------before printing----------------------
> ========= 2019-07-10 11:13:30 =========
> ---------------------in function procces----------------------
> -----------------------before printing----------------------
> ++
>
> ++
> ++
> ------------------------after printing-----------------------
> ========= 2019-07-10 11:13:40 =========
> ---------------------in function procces----------------------
> -----------------------before printing----------------------
> ++
>
> ++
> ++
> ------------------------after printing-----------------------
> ========= 2019-07-10 11:15:40 =========
> ---------------------in function procces----------------------
> -----------------------before printing----------------------
> ++
>
> ++
> ++
> ------------------------after printing-----------------------
> 19/07/10 11:15:47 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
> 19/07/10 11:15:47 WARN BlockManager: Block input-0-1562757347200 replicated to only 0 peer(s) instead of 1 peers
> {code}
> {color:#ff0000}This is when I start producing data to my Kafka topic{color}
> {code:java}
> ========= 2019-07-10 11:15:50 =========
> ---------------------in function procces----------------------
> -----------------------before printing----------------------
> 19/07/10 11:15:52 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
> 19/07/10 11:15:52 WARN BlockManager: Block input-0-1562757352200 replicated to only 0 peer(s) instead of 1 peers
> 19/07/10 11:15:57 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
> 19/07/10 11:15:57 WARN BlockManager: Block input-0-1562757357200 replicated to only 0 peer(s) instead of 1 peers
> ========= 2019-07-10 11:16:00 =========
> ---------------------in function procces----------------------
> -----------------------before printing----------------------
> 19/07/10 11:16:02 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
> 19/07/10 11:16:02 WARN BlockManager: Block input-0-1562757362200 replicated to only 0 peer(s) instead of 1 peers
> 19/07/10 11:16:07 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
> 19/07/10 11:16:07 WARN BlockManager: Block input-0-1562757367400 replicated to only 0 peer(s) instead of 1 peers
> ========= 2019-07-10 11:16:10 =========
> ---------------------in function procces----------------------
> -----------------------before printing----------------------
> 19/07/10 11:16:12 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
> 19/07/10 11:16:12 WARN BlockManager: Block input-0-1562757372400 replicated to only 0 peer(s) instead of 1 peers
> 19/07/10 11:16:17 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
> 19/07/10 11:16:17 WARN BlockManager: Block input-0-1562757377400 replicated to only 0 peer(s) instead of 1 peers
> {code}
> I don't know how to figure this out. Any help would be really appreciated.
> Thank you
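
The process function in the report wraps show() in a bare "except: pass", so any exception raised while building or showing the DataFrame would never reach the console. The following is a minimal diagnostic sketch, not a confirmed fix for this issue: it keeps the same flow but prints the traceback instead of discarding it. The names make_process and get_sql_context are hypothetical; get_sql_context stands in for a helper such as getSqlContextInstance from the report, and the rdd argument is assumed to be the RDD of JSON strings that foreachRDD passes in.

```python
import traceback


def make_process(get_sql_context):
    """Build a foreachRDD callback that reports failures instead of hiding them.

    get_sql_context is a hypothetical factory (for example the
    getSqlContextInstance helper from the report) that maps a
    SparkContext to an SQLContext.
    """
    def process(time, rdd):
        print("========= %s =========" % str(time))
        try:
            # Skip empty batches so schema inference is not run on no data.
            if rdd.isEmpty():
                print("empty batch, nothing to show")
                return
            sql_context = get_sql_context(rdd.context)
            product_info = sql_context.read.json(rdd)
            product_info.show()
        except Exception:
            # Print the full traceback; a bare "except: pass" would discard it.
            traceback.print_exc()

    return process
```

Under these assumptions it would be wired in as lines.foreachRDD(make_process(getSqlContextInstance)). If show() is failing on EC2, the traceback printed to the driver's stderr should then show where.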