[ https://issues.apache.org/jira/browse/SPARK-22802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292527#comment-16292527 ]
Annamalai Venugopal commented on SPARK-22802:
---------------------------------------------

I am doing a project with Spark, with the following source code:

import sys
import os
from nltk.stem import SnowballStemmer
from nltk.corpus import wordnet as wn
from os import listdir
from os.path import isfile, join
from pyspark.sql.types import *
from pyspark.ml.feature import RegexTokenizer
from pyspark.sql.functions import udf
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.linalg import SparseVector, VectorUDT

lmtzr = SnowballStemmer("english")

# Set the path for the Spark installation.
# This is the path where you downloaded and uncompressed the Spark download.
# Using forward slashes on Windows; \\ should work too.
os.environ['SPARK_HOME'] = "C:/Users/avenugopal/Downloads/spark-2.2.0-bin-hadoop2.7/"
# Append the python dir to PYTHONPATH so that pyspark can be found
sys.path.append("C:/Users/avenugopal/Downloads/spark-2.2.0-bin-hadoop2.7/python/")
# Append python/build to PYTHONPATH so that py4j can be found
sys.path.append('C:/Users/avenugopal/Downloads/spark-2.2.0-bin-hadoop2.7/python/build')

# Try to import the Spark modules
try:
    from pyspark import SparkContext
    from pyspark import SparkConf
    from pyspark import SQLContext
    from pyspark.sql import SparkSession
    import pyspark.sql.functions as fun
except ImportError as e:
    print("Error importing Spark Modules", e)
    sys.exit(1)

# Run a word count example on the local machine:
# if len(sys.argv) != 2:
#     print(sys.stderr, "Usage: wordcount <file>")
#     exit(-1)

conf = SparkConf()
conf.setMaster("local")
conf.setAppName("spark_wc").set("spark.driver.maxResultSize", "3g").set("spark.executor.memory", "3g")
sc = SparkContext(conf=conf)
sc.range(100)

my_path = "venv//reuters-extracted"
only_files = [f for f in listdir(my_path) if isfile(join(my_path, f))]

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("sentence", StringType(), True),
    StructField("file_name", StringType(), True)
])

rowList = []
index = 0
for file in enumerate(only_files):
    with open(os.path.join(my_path, file[1]), 'r') as my_file:
        data = my_file.read().replace('\n', '')
    rowList.append((index, data, file))
    index = index + 1

spark = SparkSession.builder.appName("TokenizerExample").getOrCreate()

# $example on$
# sentenceDataFrame = spark.createDataFrame([
#     (0, "Hi I heard about Spark"),
#     (1, "I wish Java could use case classes"),
#     (2, "Logistic,regression,models,are,neat")
# ], ["id", "sentence"])
sentenceDataFrame = spark.createDataFrame(sc.parallelize(rowList), schema)

regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="[^a-zA-Z]")
# alternatively, pattern="\\w+", gaps(False)
countTokens = udf(lambda words: len(words), IntegerType())
regexTokenized = regexTokenizer.transform(sentenceDataFrame)

remover = StopWordsRemover(inputCol="words", outputCol="filtered")
filtered = remover.transform(regexTokenized)
filtered = filtered.drop("sentence").drop("words")

dataType = ArrayType(StringType(), False)
stemmingFn = udf(lambda words: stemmer(words), dataType)


def stemmer(words):
    stemmed_words = [lmtzr.stem(word) for word in words]
    return stemmed_words


stemmed = filtered.withColumn("stemmedWords", stemmingFn(fun.column("filtered")))
print(stemmed.rdd.getNumPartitions())
stemmed.select("stemmedWords").show(truncate=False)

cv = CountVectorizer(inputCol="stemmedWords", outputCol="features")
model = cv.fit(stemmed)
result = model.transform(stemmed)
result.select("features").show(truncate=False)
vocabulary = model.vocabulary
print(len(vocabulary))

idf = IDF(inputCol="features", outputCol="IDF_features")
idfModel = idf.fit(result)
rescaledData = idfModel.transform(result)
rescaledData.select("IDF_features").show(truncate=False)
print(rescaledData.rdd.getNumPartitions())


def filtering(vector):
    size = vector.size
    indices = vector.indices
    values = vector.values
    new_indices = []
    new_values = []
    for iterator, value in zip(indices, values):
        if value >= 0.8:
            new_indices.append(iterator)
            new_values.append(value)
    sparse_vector = SparseVector(size, new_indices, new_values)
    return sparse_vector


filterFn = udf(lambda vector: filtering(vector), VectorUDT())
filteredData = rescaledData.withColumn("filteredData", filterFn("IDF_features"))
filteredData = filteredData.drop("filtered")
filteredData.select("filteredData").show(truncate=False)


def token_extract(vector):
    indices = vector.indices
    tokens = []
    for iterator in indices:
        tokens.append(vocabulary[iterator])
    return tokens


token_extract_fn = udf(lambda vector: token_extract(vector), ArrayType(StringType()))
token_extracted_data = filteredData.withColumn("token_extracted_data", token_extract_fn("filteredData"))
token_extracted_data.select("token_extracted_data").show(truncate=False)

cv = CountVectorizer(inputCol="token_extracted_data", outputCol="newFeatures")
model = cv.fit(token_extracted_data)
result = model.transform(token_extracted_data)
result.select("newFeatures").show(truncate=False)
result = result.drop("filteredData")
vocabulary = model.vocabulary
print(len(vocabulary))


def hypernym_generation(token_array_a):
    token_array = token_array_a
    print(token_array)
    hypernym_dictionary = {}
    for token in token_array:
        synsets = wn.synsets(token)
        hypernym_set = []
        for synset in synsets:
            hypernyms = synset.hypernyms()
            if len(hypernyms) > 0:
                for hypernym in hypernyms:
                    hypernym_set.append(hypernym.lemma_names()[0])
        hypernym_dictionary[token] = hypernym_set
    return hypernym_dictionary


def convert(token_array):
    print(token_array)
    return hypernym_generation(token_array)


result.select("token_extracted_data").show(truncate=False)
hypernym_fn = udf(convert, MapType(StringType(), ArrayType(StringType(), True), True))
hypernym_extracted_data = result.withColumn("hypernym_extracted_data", hypernym_fn(fun.column("token_extracted_data")))
hypernym_extracted_data.select("hypernym_extracted_data").show(truncate=False)

sc.stop()

At each stage after stage 8 it produces:

Traceback (most recent call last):
  File "C:\Users\avenugopal\AppData\Local\Programs\Python\Python36\lib\runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "C:\Users\avenugopal\AppData\Local\Programs\Python\Python36\lib\runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "C:\Users\avenugopal\Downloads\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 216, in <module>
  File "C:\Users\avenugopal\Downloads\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 202, in main
  File "C:\Users\avenugopal\Downloads\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 577, in read_int
EOFError

I have no idea what is causing it.
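For reference, the "task of very large size" warning quoted in the description comes from the data that travels with each task: rowList is built on the driver and handed to sc.parallelize(rowList), so every file's contents are serialized into the tasks themselves. A minimal sketch of letting Spark read the directory instead (an assumption on my part: each file under venv/reuters-extracted is a single plain-text document; it reuses sc, spark and schema from the script above, and is not part of the reported code):

# Sketch only: read the documents with Spark rather than parallelizing
# driver-side file contents.
docs = sc.wholeTextFiles("venv/reuters-extracted")        # RDD of (path, content), one pair per file
rows = docs.zipWithIndex().map(
    lambda pc_i: (pc_i[1], pc_i[0][1].replace("\n", ""), pc_i[0][0]))
sentenceDataFrame = spark.createDataFrame(rows, schema)   # same (id, sentence, file_name) schema

The rest of the pipeline could then run unchanged against this sentenceDataFrame, without the large-task warning from parallelize.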
> Regarding max task size
> -----------------------
>
>                 Key: SPARK-22802
>                 URL: https://issues.apache.org/jira/browse/SPARK-22802
>             Project: Spark
>          Issue Type: Question
>          Components: PySpark
>    Affects Versions: 2.2.1
>         Environment: Windows, PyCharm
>            Reporter: Annamalai Venugopal
>              Labels: windows
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> 17/12/15 17:15:55 WARN TaskSetManager: Stage 15 contains a task of very large size (895 KB). The maximum recommended task size is 100 KB.
> Traceback (most recent call last):
>   File "C:\Users\avenugopal\AppData\Local\Programs\Python\Python36\lib\runpy.py", line 193, in _run_module_as_main
>     "__main__", mod_spec)
>   File "C:\Users\avenugopal\AppData\Local\Programs\Python\Python36\lib\runpy.py", line 85, in _run_code
>     exec(code, run_globals)
>   File "C:\Users\avenugopal\Downloads\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 216, in <module>
>   File "C:\Users\avenugopal\Downloads\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 202, in main
>   File "C:\Users\avenugopal\Downloads\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 577, in read_int
> EOFError
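As a side note, the EOFError raised in serializers.py's read_int usually just means the Python worker process exited before sending a result back, so the underlying failure is on the worker side rather than in the traceback shown. A hypothetical diagnostic sketch (not part of the reported script; it reuses the script's sc) to confirm the executor Python can load NLTK and the wordnet corpus that the stemming and hypernym UDFs depend on:

# Sketch only: run one tiny task on the executor side and try to load nltk
# and the wordnet corpus there, mirroring what the UDFs need.
def probe(_):
    import nltk
    from nltk.corpus import wordnet as wn
    return [(nltk.__version__, len(wn.synsets("bank")))]

print(sc.parallelize([0], numSlices=1).flatMap(probe).collect())

If this small task fails or the worker dies here, the UDF stages would be expected to fail with the same EOFError.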