[
https://issues.apache.org/jira/browse/SPARK-22802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292527#comment-16292527
]
Annamalai Venugopal commented on SPARK-22802:
---------------------------------------------
I am doing a project with Spark; the source code is:
import sys
import os
from nltk.stem import SnowballStemmer
from nltk.corpus import wordnet as wn
from os import listdir
from os.path import isfile, join
from pyspark.sql.types import *
from pyspark.ml.feature import RegexTokenizer
from pyspark.sql.functions import udf
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.linalg import SparseVector, VectorUDT
lmtzr = SnowballStemmer("english")
# Set the path for spark installation
# this is the path where you downloaded and uncompressed the Spark download
# Using forward slashes on windows, \\ should work too.
os.environ['SPARK_HOME'] = "C:/Users/avenugopal/Downloads/spark-2.2.0-bin-hadoop2.7/"
# Append the python dir to PYTHONPATH so that pyspark could be found
sys.path.append("C:/Users/avenugopal/Downloads/spark-2.2.0-bin-hadoop2.7/python/")
# Append the python/build to PYTHONPATH so that py4j could be found
sys.path.append('C:/Users/avenugopal/Downloads/spark-2.2.0-bin-hadoop2.7/python/build')
# try the import Spark Modules
try:
    from pyspark import SparkContext
    from pyspark import SparkConf
    from pyspark.sql import SQLContext
    from pyspark.sql import SparkSession
    import pyspark.sql.functions as fun
except ImportError as e:
    print("Error importing Spark Modules", e)
    sys.exit(1)
# Run a word count example on the local machine:
# if len(sys.argv) != 2:
# print ( sys.stderr, "Usage: wordcount <file>")
# exit(-1)
conf = SparkConf()
conf.setMaster("local")
conf.setAppName("spark_wc").set("spark.driver.maxResultSize",
"3g").set("spark.executor.memory", "3g")
sc = SparkContext(conf=conf)
sc.range(100)
my_path = "venv//reuters-extracted"
only_files = [f for f in listdir(my_path) if isfile(join(my_path, f))]
schema = StructType([
StructField("id", IntegerType(), True),
StructField("sentence", StringType(), True),
StructField("file_name", StringType(), True)
])
rowList = []
for index, file_name in enumerate(only_files):
    with open(os.path.join(my_path, file_name), 'r') as my_file:
        data = my_file.read().replace('\n', '')
    rowList.append((index, data, file_name))
spark = SparkSession.builder.appName("TokenizerExample").getOrCreate()
# $example on$
# sentenceDataFrame = spark.createDataFrame([
# (0, "Hi I heard about Spark"),
# (1, "I wish Java could use case classes"),
# (2, "Logistic,regression,models,are,neat")
# ], ["id", "sentence"])
sentenceDataFrame = spark.createDataFrame(sc.parallelize(rowList), schema)
regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words",
pattern="[^a-zA-Z]")
# alternatively, pattern="\\w+", gaps(False)
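# e.g. (my understanding of the PySpark form of that alternative, not what I ran):
# regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\w+", gaps=False)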
countTokens = udf(lambda words: len(words), IntegerType())
regexTokenized = regexTokenizer.transform(sentenceDataFrame)
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
filtered = remover.transform(regexTokenized)
filtered = filtered.drop("sentence").drop("words")
dataType = ArrayType(StringType(), False)
stemmingFn = udf(lambda words: stemmer(words), dataType)
def stemmer(words):
    stemmed_words = [lmtzr.stem(word) for word in words]
    return stemmed_words
stemmed = filtered.withColumn("stemmedWords",
stemmingFn(fun.column("filtered")))
print(stemmed.rdd.getNumPartitions())
stemmed.select("stemmedWords").show(truncate=False)
cv = CountVectorizer(inputCol="stemmedWords", outputCol="features")
model = cv.fit(stemmed)
result = model.transform(stemmed)
result.select("features").show(truncate=False)
vocabulary = model.vocabulary
print(len(vocabulary))
idf = IDF(inputCol="features", outputCol="IDF_features")
idfModel = idf.fit(result)
rescaledData = idfModel.transform(result)
rescaledData.select("IDF_features").show(truncate=False)
print(rescaledData.rdd.getNumPartitions())
def filtering(vector):
    # keep only the entries of the IDF vector with a weight of at least 0.8
    size = vector.size
    indices = vector.indices
    values = vector.values
    new_indices = []
    new_values = []
    for iterator, value in zip(indices, values):
        if value >= 0.8:
            new_indices.append(iterator)
            new_values.append(value)
    sparse_vector = SparseVector(size, new_indices, new_values)
    return sparse_vector
filterFn = udf(lambda vector: filtering(vector), VectorUDT())
filteredData = rescaledData.withColumn("filteredData", filterFn("IDF_features"))
filteredData = filteredData.drop("filtered")
filteredData.select("filteredData").show(truncate=False)
def token_extract(vector):
    # map the non-zero indices of the vector back to vocabulary terms
    indices = vector.indices
    tokens = []
    for iterator in indices:
        tokens.append(vocabulary[iterator])
    return tokens
token_extract_fn = udf(lambda vector: token_extract(vector),
ArrayType(StringType()))
token_extracted_data = filteredData.withColumn("token_extracted_data",
token_extract_fn("filteredData"))
token_extracted_data.select("token_extracted_data").show(truncate=False)
cv = CountVectorizer(inputCol="token_extracted_data", outputCol="newFeatures")
model = cv.fit(token_extracted_data)
result = model.transform(token_extracted_data)
result.select("newFeatures").show(truncate=False)
result = result.drop("filteredData")
vocabulary = model.vocabulary
print(len(vocabulary))
def hypernym_generation(token_array_a):
    # build a token -> [hypernym lemma] dictionary using WordNet
    token_array = token_array_a
    print(token_array)
    hypernym_dictionary = {}
    for token in token_array:
        synsets = wn.synsets(token)
        hypernym_set = []
        for synset in synsets:
            hypernyms = synset.hypernyms()
            if len(hypernyms) > 0:
                for hypernym in hypernyms:
                    hypernym_set.append(hypernym.lemma_names()[0])
        hypernym_dictionary[token] = hypernym_set
    return hypernym_dictionary

def convert(token_array):
    print(token_array)
    return hypernym_generation(token_array)
result.select("token_extracted_data").show(truncate=False)
hypernym_fn = udf(convert, MapType(StringType(), ArrayType(StringType(), True),
True))
hypernym_extracted_data = result.withColumn("hypernym_extracted_data",
hypernym_fn(fun.column("token_extracted_data")))
hypernym_extracted_data.select("hypernym_extracted_data").show(truncate=False)
sc.stop()
At each stage after stage 8 it produces:
Traceback (most recent call last):
  File "C:\Users\avenugopal\AppData\Local\Programs\Python\Python36\lib\runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "C:\Users\avenugopal\AppData\Local\Programs\Python\Python36\lib\runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "C:\Users\avenugopal\Downloads\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 216, in <module>
  File "C:\Users\avenugopal\Downloads\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 202, in main
  File "C:\Users\avenugopal\Downloads\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 577, in read_int
EOFError
I have no idea what is causing it.
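In case it helps, here is a minimal, untested sketch of one thing I could try for the "task of very large size" warning: sharing the vocabulary through a broadcast variable instead of capturing the plain Python list in the UDF closures (I assume the captured list is what inflates the serialized tasks; the names bc_vocabulary and token_extract_bc below are just illustrative):

bc_vocabulary = sc.broadcast(vocabulary)

def token_extract_bc(vector):
    # read the terms from the broadcast copy on the executors
    return [bc_vocabulary.value[i] for i in vector.indices]

token_extract_fn = udf(token_extract_bc, ArrayType(StringType()))

I have not verified whether this removes the warning or the EOFError.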
> Regarding max tax size
> ----------------------
>
> Key: SPARK-22802
> URL: https://issues.apache.org/jira/browse/SPARK-22802
> Project: Spark
> Issue Type: Question
> Components: PySpark
> Affects Versions: 2.2.1
> Environment: Windows,Pycharm
> Reporter: Annamalai Venugopal
> Labels: windows
> Original Estimate: 72h
> Remaining Estimate: 72h
>
> 17/12/15 17:15:55 WARN TaskSetManager: Stage 15 contains a task of very large size (895 KB). The maximum recommended task size is 100 KB.
> Traceback (most recent call last):
>   File "C:\Users\avenugopal\AppData\Local\Programs\Python\Python36\lib\runpy.py", line 193, in _run_module_as_main
>     "__main__", mod_spec)
>   File "C:\Users\avenugopal\AppData\Local\Programs\Python\Python36\lib\runpy.py", line 85, in _run_code
>     exec(code, run_globals)
>   File "C:\Users\avenugopal\Downloads\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 216, in <module>
>   File "C:\Users\avenugopal\Downloads\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 202, in main
>   File "C:\Users\avenugopal\Downloads\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 577, in read_int
> EOFError