One quick observation is that you allocate all your local CPUs to Spark
then execute that app with 10 Threads i.e 10 spark apps and so you will
need 160cores in total as each will need 16CPUs IMHO. Wouldn't that create
CPU bottleneck?

Also on the side note, why you need Spark if you use that on local only?
Sparks power can only be (mainly) observed in a cluster env.
I have achieved great parallelism using pandas and pools on a local machine
in the past.


On Wed, 20 Jul 2022, 21:39 Bjørn Jørgensen, <bjornjorgen...@gmail.com>
wrote:

> I have 400k of JSON files. Which is between 10 kb and 500 kb in size.
> They don`t have the same schema, so I have to loop over them one at a
> time.
>
> This works, but is`s very slow. This process takes 5 days!
>
> So now I have tried to run this functions in a ThreadPool. But it don`t
> seems to work.
>
>
> *Start local spark. The system have 16 cores and 64 GB.*
>
> number_cores = int(multiprocessing.cpu_count())
>
> mem_bytes = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES')  #
> e.g. 4015976448
> memory_gb = int(mem_bytes/(1024.**3))  # e.g. 3.74
>
>
> def get_spark_session(app_name: str, conf: SparkConf):
>     conf.setMaster('local[{}]'.format(number_cores))
>     conf \
>       .set('spark.driver.memory', '{}g'.format(memory_gb)) \
>       .set("spark.sql.repl.eagerEval.enabled", "True") \
>       .set("spark.sql.adaptive.enabled", "True") \
>       .set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer") \
>       .set("spark.sql.repl.eagerEval.maxNumRows", "10000")
>
>     return
> SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
>
> spark = get_spark_session("Falk", SparkConf())
>
>
> *Function to rename columns with \\ *
>
> # We take a dataframe and return a new one with required changes
> def cleanDataFrame(df: DataFrame) -> DataFrame:
>     # Returns a new sanitized field name (this function can be anything
> really)
>     def sanitizeFieldName(s: str) -> str:
>         return s.replace("-", "_").replace("&", "_").replace("\"", "_")\
>             .replace("[", "_").replace("]", "_").replace(".", "_")
>
>     # We call this on all fields to create a copy and to perform any
> changes we might
>     # want to do to the field.
>     def sanitizeField(field: StructField) -> StructField:
>         field = copy(field)
>         field.name = sanitizeFieldName(field.name)
>         # We recursively call cleanSchema on all types
>         field.dataType = cleanSchema(field.dataType)
>         return field
>
>     def cleanSchema(dataType: [DataType]) -> [DateType]:
>         dataType = copy(dataType)
>         # If the type is a StructType we need to recurse otherwise we can
> return since
>         # we've reached the leaf node
>         if isinstance(dataType, StructType):
>             # We call our sanitizer for all top level fields
>             dataType.fields = [sanitizeField(f) for f in dataType.fields]
>         elif isinstance(dataType, ArrayType):
>             dataType.elementType = cleanSchema(dataType.elementType)
>         return dataType
>
>     # Now since we have the new schema we can create a new DataFrame by
> using the old Frame's RDD as data and the new schema as the schema for the
> data
>     return spark.createDataFrame(df.rdd, cleanSchema(df.schema))
>
>
>
> *Function to flatten out a nested dataframe.*
>
>
> from pyspark.sql.types import *
> from pyspark.sql.functions import *
>
>
> def flatten_test(df, sep="_"):
>     """Returns a flattened dataframe.
>         .. versionadded:: x.X.X
>
>         Parameters
>         ----------
>         sep : str
>             Delimiter for flatted columns. Default `_`
>
>         Notes
>         -----
>         Don`t use `.` as `sep`
>         It won't work on nested data frames with more than one level.
>         And you will have to use `columns.name`.
>
>         Flattening Map Types will have to find every key in the column.
>         This can be slow.
>
>         Examples
>         --------
>
>         data_mixed = [
>             {
>                 "state": "Florida",
>                 "shortname": "FL",
>                 "info": {"governor": "Rick Scott"},
>                 "counties": [
>                     {"name": "Dade", "population": 12345},
>                     {"name": "Broward", "population": 40000},
>                     {"name": "Palm Beach", "population": 60000},
>                 ],
>             },
>             {
>                 "state": "Ohio",
>                 "shortname": "OH",
>                 "info": {"governor": "John Kasich"},
>                 "counties": [
>                     {"name": "Summit", "population": 1234},
>                     {"name": "Cuyahoga", "population": 1337},
>                 ],
>             },
>         ]
>
>         data_mixed = spark.createDataFrame(data=data_mixed)
>
>         data_mixed.printSchema()
>
>         root
>         |-- counties: array (nullable = true)
>         |    |-- element: map (containsNull = true)
>         |    |    |-- key: string
>         |    |    |-- value: string (valueContainsNull = true)
>         |-- info: map (nullable = true)
>         |    |-- key: string
>         |    |-- value: string (valueContainsNull = true)
>         |-- shortname: string (nullable = true)
>         |-- state: string (nullable = true)
>
>
>         data_mixed_flat = flatten_test(df, sep=":")
>         data_mixed_flat.printSchema()
>         root
>         |-- shortname: string (nullable = true)
>         |-- state: string (nullable = true)
>         |-- counties:name: string (nullable = true)
>         |-- counties:population: string (nullable = true)
>         |-- info:governor: string (nullable = true)
>
>
>
>
>         data = [
>             {
>                 "id": 1,
>                 "name": "Cole Volk",
>                 "fitness": {"height": 130, "weight": 60},
>             },
>             {"name": "Mark Reg", "fitness": {"height": 130, "weight": 60}},
>             {
>                 "id": 2,
>                 "name": "Faye Raker",
>                 "fitness": {"height": 130, "weight": 60},
>             },
>         ]
>
>
>         df = spark.createDataFrame(data=data)
>
>         df.printSchema()
>
>         root
>         |-- fitness: map (nullable = true)
>         |    |-- key: string
>         |    |-- value: long (valueContainsNull = true)
>         |-- id: long (nullable = true)
>         |-- name: string (nullable = true)
>
>         df_flat = flatten_test(df, sep=":")
>
>         df_flat.printSchema()
>
>         root
>         |-- id: long (nullable = true)
>         |-- name: string (nullable = true)
>         |-- fitness:height: long (nullable = true)
>         |-- fitness:weight: long (nullable = true)
>
>         data_struct = [
>                 (("James",None,"Smith"),"OH","M"),
>                 (("Anna","Rose",""),"NY","F"),
>                 (("Julia","","Williams"),"OH","F"),
>                 (("Maria","Anne","Jones"),"NY","M"),
>                 (("Jen","Mary","Brown"),"NY","M"),
>                 (("Mike","Mary","Williams"),"OH","M")
>                 ]
>
>
>         schema = StructType([
>             StructField('name', StructType([
>                 StructField('firstname', StringType(), True),
>                 StructField('middlename', StringType(), True),
>                 StructField('lastname', StringType(), True)
>                 ])),
>             StructField('state', StringType(), True),
>             StructField('gender', StringType(), True)
>             ])
>
>         df_struct = spark.createDataFrame(data = data_struct, schema =
> schema)
>
>         df_struct.printSchema()
>
>         root
>         |-- name: struct (nullable = true)
>         |    |-- firstname: string (nullable = true)
>         |    |-- middlename: string (nullable = true)
>         |    |-- lastname: string (nullable = true)
>         |-- state: string (nullable = true)
>         |-- gender: string (nullable = true)
>
>         df_struct_flat = flatten_test(df_struct, sep=":")
>
>         df_struct_flat.printSchema()
>
>         root
>         |-- state: string (nullable = true)
>         |-- gender: string (nullable = true)
>         |-- name:firstname: string (nullable = true)
>         |-- name:middlename: string (nullable = true)
>         |-- name:lastname: string (nullable = true)
>         """
>     # compute Complex Fields (Arrays, Structs and Maptypes) in Schema
>     complex_fields = dict([(field.name, field.dataType)
>                             for field in df.schema.fields
>                             if type(field.dataType) == ArrayType
>                             or type(field.dataType) == StructType
>                             or type(field.dataType) == MapType])
>
>     while len(complex_fields) !=0:
>         col_name = list(complex_fields.keys())[0]
>         #print ("Processing :"+col_name+" Type :
> "+str(type(complex_fields[col_name])))
>
>         # if StructType then convert all sub element to columns.
>         # i.e. flatten structs
>         if (type(complex_fields[col_name]) == StructType):
>             expanded = [col(col_name + '.' + k).alias(col_name + sep + k)
>             for k in [n.name for n in complex_fields[col_name]]]
>             df = df.select("*", *expanded).drop(col_name)
>
>         # if ArrayType then add the Array Elements as Rows using the
> explode function
>         # i.e. explode Arrays
>         elif (type(complex_fields[col_name]) == ArrayType):
>             df = df.withColumn(col_name, explode_outer(col_name))
>
>         # if MapType then convert all sub element to columns.
>         # i.e. flatten
>         elif (type(complex_fields[col_name]) == MapType):
>             keys_df =
> df.select(explode_outer(map_keys(col(col_name)))).distinct()
>             keys = list(map(lambda row: row[0], keys_df.collect()))
>             key_cols = list(map(lambda f: col(col_name).getItem(f)
>             .alias(str(col_name + sep + f)), keys))
>             drop_column_list = [col_name]
>             df = df.select([col_name for col_name in df.columns
>             if col_name not in drop_column_list] + key_cols)
>
>         # recompute remaining Complex Fields in Schema
>         complex_fields = dict([(field.name, field.dataType)
>                             for field in df.schema.fields
>                             if type(field.dataType) == ArrayType
>                             or type(field.dataType) == StructType
>                             or type(field.dataType) == MapType])
>
>     return df
>
>
> *Function to read each file, and apply the functions and save each file as
> JSON.*
>
> def json_to_norm_with_null(dir_path, path_to_save):
>     path = dir_path
>
>     for filename in os.listdir(path):
>         if not filename.endswith('._stript_list.json'):
>             continue
>
>
>         fullname = os.path.join(path, filename)
>         with open(fullname) as json_file:
>             jsonstr = json.load(json_file)
>
>         df = spark.read.json(fullname)
>         df = cleanDataFrame(df)
>         df = flatten_test(df, sep=":")
>         df.write.mode('append').option('compression',
> 'snappy').option("ignoreNullFields", "false").json(path_to_save)
>
>
> *Function to start everything of. With hopefully 10 processes.*
>
> from multiprocessing.pool import ThreadPool
> tpool = ThreadPool(processes=10)
>
> tpool.map(json_to_norm_with_null("/home/jovyan/notebooks/falk/data/form_version/F02",
> '/home/jovyan/notebooks/falk/F02.json'))
>
>
> --
> Bjørn Jørgensen
> Vestre Aspehaug 4, 6010 Ålesund
> Norge
>
> +47 480 94 297
>

Reply via email to