[ https://issues.apache.org/jira/browse/SPARK-33110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17211769#comment-17211769 ]
Andrea Viano commented on SPARK-33110: -------------------------------------- import findspark findspark.init() import pyspark from pyspark.sql import SparkSession spark=SparkSession.builder.appName('Apriori').getOrCreate() #https://stackoverflow.com/questions/21971449/how-do-i-increase-the-cell-width-of-the-jupyter-ipython-notebook-in-my-browser from IPython.core.display import display, HTML display(HTML("<style>.container { width:105% !important; }</style>")) from pyspark.sql.types import StringType from pyspark.sql.types import IntegerType from pyspark.sql.types import StructType from pyspark.sql.types import StructField from pyspark.sql.types import ArrayType from pyspark.sql.types import BooleanType from pyspark.sql.functions import array_contains from pyspark.sql.functions import col from pyspark.sql.functions import concat from pyspark.sql.functions import collect_list import pandas as pd #https://community.cloudera.com/t5/Support-Questions/Pyspark-can-t-show-a-CSV-with-an-array/td-p/229618 string = "Coals" from pyspark.sql.functions import udf def str_to_arr(my_list): my_list = my_list.split(",") new_list=[] for elem in my_list: new_list.append(elem) new_list.sort() return new_list def transaction_size(my_list): return len(my_list) '''def transaction_check(my_list): for e in my_list: if e != "Beer": return True break else: return False''' def transaction_check(array): return (any("Coals" in word for word in array)) # https://stackoverflow.com/questions/37284077/combine-pyspark-dataframe-arraytype-fields-into-single-arraytype-field def concat(type): def concat_(*args): return list(chain.from_iterable((arg if arg else [] for arg in args))) return udf(concat_, ArrayType(type)) concat_string_arrays = concat(StringType()) ##def make_list(my_list): schema = StructType([StructField('TRANSACTIONS', StringType(),True)]) transaction_set = spark.read.option("delimiter", ";").csv('testApriori2.csv',schema=schema) transaction_set.show() 
#https://stackoverflow.com/questions/43406887/spark-dataframe-how-to-add-a-index-column-aka-distributed-data-index from pyspark.sql.functions import monotonically_increasing_id transaction_set = transaction_set.select("*").withColumn("id", monotonically_increasing_id()+1) transaction_set.show() str_to_arr_udf = udf(str_to_arr,ArrayType(StringType())) transaction_set= transaction_set.withColumn('TRANSACTIONS_LIST',str_to_arr_udf(transaction_set["TRANSACTIONS"])) transaction_set = transaction_set.drop("TRANSACTIONS") transaction_set.show() transaction_set.printSchema() rep_trans_count=transaction_set.groupBy("TRANSACTIONS_LIST").count() rep_trans_count.show() rep_trans_count.printSchema() from pyspark.sql.functions import size #https://stackoverflow.com/questions/44541605/how-to-get-the-lists-length-in-one-column-in-dataframe-spark count_for_transact = rep_trans_count.select('*',size("TRANSACTIONS_LIST").alias("TRANSACTIONS_LIST_SIZE")) count_for_transact.show() #transaction_size.show() transaction_check_udf = udf(transaction_check,BooleanType()) object_transaction= count_for_transact.withColumn('TRANSACTIONS_LIST_BOOL',transaction_check_udf(count_for_transact["TRANSACTIONS_LIST"])) object_transaction.show(80,False) object_transaction_sel=object_transaction.filter(col("TRANSACTIONS_LIST_BOOL")==True) object_transaction_sel.show(80,False) element_to_compare = object_transaction_sel.groupby("TRANSACTIONS_LIST_BOOL").agg(collect_list("TRANSACTIONS_LIST")).show(80,False) element_to_compare = element_to_compare.select(concat(col("TRANSACTIONS_LIST"))).show(80,False) #element_to_compare.select(concat_string_arrays("collect_list(TRANSACTIONS_LIST)")).show(truncate=False) beer=rep_trans_count.select('*').where(array_contains(rep_trans_count["TRANSACTIONS_LIST"],"Beer")) beer.show(80,False)[^testApriori3.csv] > array_contains doesn't pick element in the array but just at the end > -------------------------------------------------------------------- > > Key: SPARK-33110 > URL: 
https://issues.apache.org/jira/browse/SPARK-33110 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 2.4.7, 3.0.1 > Environment: Windows 10 > Python 3.7.4 > Java JDK 8 > hadoop 3.0.1 > Reporter: Andrea Viano > Priority: Major > Attachments: beer0.PNG, beer1.PNG, beer2.PNG, testApriori3.csv > > > Hello, > given a column of arrays of strings, I used array_contains to find all the > arrays containing the string "Beer". array_contains recognises the string > "Beer" only if it is at the end of the array, but not if it is in the middle of the > array. > beer=rep_trans_count.withColumn("keep", > array_contains(rep_trans_count.TRANSACTIONSLIST,"Beer")).filter(col("keep")==True) > > -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org