spark.sql.shuffle.partitions=auto
May I know whether spark.sql.shuffle.partitions=auto is only available on Databricks? What about on vanilla Spark? When I set it there, I get an error saying the value must be an integer. Is there any open-source library that automatically finds the best partition count and block size for a DataFrame?
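For reference, a minimal sketch of what open-source Spark offers instead: there spark.sql.shuffle.partitions only accepts an integer, but Adaptive Query Execution can coalesce shuffle partitions at runtime. The numeric values below are placeholders to tune for your cluster.

# vanilla Spark: an integer upper bound plus AQE coalescing, instead of "auto"
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.shuffle.partitions", "2000")                      # upper bound AQE starts from
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")   # target size per coalesced partition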
auto-create event log directory if it does not exist
The Spark history server is set to use s3a, like below:

spark.eventLog.enabled true
spark.eventLog.dir s3a://bucket-test/test-directory-log

Is there any option I can set in the Spark config so that, if the directory 'test-directory-log' does not exist, it is created automatically before the Spark history server starts? Thank you
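As far as I know there is no Spark configuration that creates the event log directory for you, so one option is to create it just before starting the history server. A minimal sketch using the Hadoop FileSystem API from a PySpark driver (bucket and prefix taken from the question):

hadoop_conf = spark._jsc.hadoopConfiguration()
Path = spark._jvm.org.apache.hadoop.fs.Path
log_dir = Path("s3a://bucket-test/test-directory-log")
fs = log_dir.getFileSystem(hadoop_conf)
if not fs.exists(log_dir):
    fs.mkdirs(log_dir)   # creates the prefix so the history server finds its log directory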
randomSplit has issues?
Based on this blog post https://sergei-ivanov.medium.com/why-you-should-not-use-randomsplit-in-pyspark-to-split-data-into-train-and-test-58576d539a36 , I noticed a recommendation against using randomSplit for data splitting because of the sorting it triggers. Is the information in the blog accurate? I understand that the sorting is done so that Spark can partition the data consistently. Could anyone clarify whether we should continue using randomSplit to divide our data into training and test sets, or use filter() instead? Thank you
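For comparison, a minimal sketch of the filter-based split the blog argues for, assuming a column named id exists (hypothetical); bucketing each row on a hash of its id keeps the train/test assignment stable across recomputations:

from pyspark.sql import functions as F

# assign every row to one of 100 stable buckets based on its id
bucketed = df.withColumn("bucket", F.abs(F.hash("id")) % 100)
train = bucketed.filter(F.col("bucket") < 80).drop("bucket")   # ~80%
test  = bucketed.filter(F.col("bucket") >= 80).drop("bucket")  # ~20%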
Re: convert pandas image column to Spark dataframe
Hello Adrian, here is the snippet:

import tensorflow_datasets as tfds

(ds_train, ds_test), ds_info = tfds.load(
    dataset_name,
    data_dir='',
    split=["train", "test"],
    with_info=True,
    as_supervised=True
)

schema = StructType([
    StructField("image", ArrayType(ArrayType(ArrayType(IntegerType()))), nullable=False),
    StructField("label", IntegerType(), nullable=False)
])

pp4 = spark.createDataFrame(pd.DataFrame(tfds.as_dataframe(ds_train.take(4), ds_info)), schema)

It raised this error:

TypeError: field image: ArrayType(ArrayType(ArrayType(IntegerType(), True), True), True) can not accept object array([[[14, 14, 14], [14, 14, 14], [14, 14, 14], ..., [19, 17, 20], [19, 17, 20], [19, 17, 20]], ...

On Thursday, August 3, 2023 at 11:34:08 PM GMT+8, Adrian Pop-Tifrea wrote:

Hello, can you also please show us how you created the pandas dataframe? I mean, how you added the actual data into the dataframe. It would help us reproduce the error. Thank you, Pop-Tifrea Adrian

On Mon, Jul 31, 2023 at 5:03 AM second_co...@yahoo.com wrote:

I changed it to ArrayType(ArrayType(ArrayType(IntegerType()))) and still get the same error. Thank you for responding.

On Thursday, July 27, 2023 at 06:58:09 PM GMT+8, Adrian Pop-Tifrea wrote:

Hello, when you said your pandas dataframe has 10 rows, does that mean it contains 10 images? Because if that's the case, then you'd want to only use 3 layers of ArrayType when you define the schema. Best regards, Adrian

On Thu, Jul 27, 2023, 11:04 second_co...@yahoo.com.INVALID wrote:

I have a pandas dataframe with a column 'image' holding numpy.ndarray values; the shape is (500, 333, 3) per image. My pandas dataframe has 10 rows, so the overall shape is (10, 500, 333, 3). When calling spark.createDataFrame(pandas_dataframe, schema), I need to specify the schema:

schema = StructType([
    StructField("image", ArrayType(ArrayType(ArrayType(ArrayType(IntegerType())))), nullable=False)
])

I get this error:

TypeError: field image: ArrayType(ArrayType(ArrayType(ArrayType(IntegerType(), True), True), True), True) can not accept object array([[[14, 14, 14],...

Can you advise how to set the schema for an image stored as numpy.ndarray?
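A minimal sketch of one way to make createDataFrame accept image data, assuming one (height, width, 3) image per row: keep three levels of ArrayType and convert each numpy.ndarray to nested Python lists first (the zero-filled images below are placeholder data, not the tfds dataset):

import numpy as np
import pandas as pd
from pyspark.sql.types import StructType, StructField, ArrayType, IntegerType

schema = StructType([
    StructField("image", ArrayType(ArrayType(ArrayType(IntegerType()))), nullable=False),
    StructField("label", IntegerType(), nullable=False),
])

# placeholder images; tolist() turns each (500, 333, 3) ndarray into nested Python lists
pdf = pd.DataFrame({
    "image": [np.zeros((500, 333, 3), dtype=int).tolist() for _ in range(10)],
    "label": list(range(10)),
})

sdf = spark.createDataFrame(pdf, schema)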
Re: convert pandas image column to Spark dataframe
I changed it to ArrayType(ArrayType(ArrayType(IntegerType()))) and still get the same error. Thank you for responding.

On Thursday, July 27, 2023 at 06:58:09 PM GMT+8, Adrian Pop-Tifrea wrote:

Hello, when you said your pandas dataframe has 10 rows, does that mean it contains 10 images? Because if that's the case, then you'd want to only use 3 layers of ArrayType when you define the schema. Best regards, Adrian

On Thu, Jul 27, 2023, 11:04 second_co...@yahoo.com.INVALID wrote:

I have a pandas dataframe with a column 'image' holding numpy.ndarray values; the shape is (500, 333, 3) per image. My pandas dataframe has 10 rows, so the overall shape is (10, 500, 333, 3). When calling spark.createDataFrame(pandas_dataframe, schema), I need to specify the schema:

schema = StructType([
    StructField("image", ArrayType(ArrayType(ArrayType(ArrayType(IntegerType())))), nullable=False)
])

I get this error:

TypeError: field image: ArrayType(ArrayType(ArrayType(ArrayType(IntegerType(), True), True), True), True) can not accept object array([[[14, 14, 14],...

Can you advise how to set the schema for an image stored as numpy.ndarray?
convert pandas image column to Spark dataframe
I have a pandas dataframe with a column 'image' holding numpy.ndarray values; the shape is (500, 333, 3) per image. My pandas dataframe has 10 rows, so the overall shape is (10, 500, 333, 3). When calling spark.createDataFrame(pandas_dataframe, schema), I need to specify the schema:

schema = StructType([
    StructField("image", ArrayType(ArrayType(ArrayType(ArrayType(IntegerType())))), nullable=False)
])

I get this error:

TypeError: field image: ArrayType(ArrayType(ArrayType(ArrayType(IntegerType(), True), True), True), True) can not accept object array([[[14, 14, 14],...

Can you advise how to set the schema for an image stored as numpy.ndarray?
spark context list_packages()
I ran spark.sparkContext.list_packages() on Spark 3.4.1 and I get the error below:

An error was encountered:
AttributeError
Traceback (most recent call last):
  File "/tmp/spark-3d66c08a-08a3-4d4e-9fdf-45853f65e03d/shell_wrapper.py", line 113, in exec
    self._exec_then_eval(code)
  File "/tmp/spark-3d66c08a-08a3-4d4e-9fdf-45853f65e03d/shell_wrapper.py", line 106, in _exec_then_eval
    exec(compile(last, '', 'single'), self.globals)
  File "", line 1, in
AttributeError: 'SparkContext' object has no attribute 'list_packages'

Are list_packages and install_pypi_package available in vanilla Spark, or only on AWS services? Thank you
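As far as I can tell, list_packages() and install_pypi_package() come from the AWS EMR notebook integration rather than vanilla Spark, so treat that as an assumption. A hedged sketch of one vanilla way to see which Python packages an executor can import, using importlib.metadata inside a task:

import importlib.metadata

def installed_packages(_):
    # runs on an executor and reports its Python environment
    yield sorted(f"{d.metadata['Name']}=={d.version}" for d in importlib.metadata.distributions())

pkgs = spark.sparkContext.parallelize([0], 1).mapPartitions(installed_packages).collect()
for name in pkgs[0][:20]:
    print(name)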
cannot load model using pyspark
spark.sparkContext.textFile("s3a://a_bucket/models/random_forest_zepp/bestModel/metadata", 1).getNumPartitions()

When I run the code above, I get the error below. Can you advise how to troubleshoot this? I'm using Spark 3.3.0, and the file path exists.

Py4JJavaError                             Traceback (most recent call last)
Cell In[16], line 1
----> 1 spark.sparkContext.textFile("s3a://a_bucket/models/random_forest_zepp/bestModel/metadata", 1).getNumPartitions()

File /spark/python/lib/pyspark.zip/pyspark/rdd.py:599, in RDD.getNumPartitions(self)
--> 599 return self._jrdd.partitions().size()

File /spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
-> 1321 return_value = get_return_value(answer, self.gateway_client, self.target_id, self.name)

File /spark/python/lib/pyspark.zip/pyspark/sql/utils.py:190, in capture_sql_exception..deco(*a, **kw)
--> 190 return f(*a, **kw)

File /spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
--> 326 raise Py4JJavaError("An error occurred while calling {0}{1}{2}.\n".format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o114.partitions.
: java.lang.NullPointerException
    at org.apache.hadoop.mapred.TextInputFormat.isSplitable(TextInputFormat.java:49)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:370)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:208)
    at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:292)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:292)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:288)
    at org.apache.spark.api.java.JavaRDDLike.partitions(JavaRDDLike.scala:61)
    at org.apache.spark.api.java.JavaRDDLike.partitions$(JavaRDDLike.scala:61)
    at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
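A small troubleshooting sketch (not a fix): confirm that the driver can reach that exact path through the same s3a filesystem Spark uses, which helps separate credential/endpoint problems from the NullPointerException itself. The path is the one from the question.

hadoop_conf = spark._jsc.hadoopConfiguration()
Path = spark._jvm.org.apache.hadoop.fs.Path
p = Path("s3a://a_bucket/models/random_forest_zepp/bestModel/metadata")
fs = p.getFileSystem(hadoop_conf)
print(fs.exists(p))                               # does the prefix resolve at all?
for status in fs.listStatus(p):
    print(status.getPath(), status.getLen())      # the part files textFile() should pick up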
Re: Tensorflow on Spark CPU
I re-tested with the cifar10 example and below are the results. Can you advise why fewer slots are faster than more slots?

num_slots=20: 231 seconds
num_slots=5: 52 seconds
num_slots=1: 34 seconds

The code is here: https://gist.github.com/cometta/240bbc549155e22f80f6ba670c9a2e32

Do you have an example of TensorFlow with a big dataset that I can test?

On Saturday, April 29, 2023 at 08:44:04 PM GMT+8, Sean Owen wrote:

You don't want to use CPUs with Tensorflow. If it's not scaling, you may have a problem that is far too small to distribute.

On Sat, Apr 29, 2023 at 7:30 AM second_co...@yahoo.com.INVALID wrote:

Has anyone successfully run native TensorFlow on Spark? I tested the example at https://github.com/tensorflow/ecosystem/tree/master/spark/spark-tensorflow-distributor on Kubernetes, running it on multiple workers' CPUs. I do not see any speed-up in training time when setting the number of slots from 1 to 10; the time taken to train stays the same. Has anyone tested TensorFlow training on Spark distributed workers with CPUs? Can you share a working example?
Tensorflow on Spark CPU
Has anyone successfully run native TensorFlow on Spark? I tested the example at https://github.com/tensorflow/ecosystem/tree/master/spark/spark-tensorflow-distributor on Kubernetes, running it on multiple workers' CPUs. I do not see any speed-up in training time when setting the number of slots from 1 to 10; the time taken to train stays the same. Has anyone tested TensorFlow training on Spark distributed workers with CPUs? Can you share a working example?
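For context, a minimal sketch of how the linked spark-tensorflow-distributor package is driven; the training body and the num_slots value are placeholders, not the code from the gist or the linked example:

from spark_tensorflow_distributor import MirroredStrategyRunner

def train():
    import tensorflow as tf
    # build and fit a tf.keras model here; each slot executes this function
    ...

# CPU-only run distributed over Spark tasks; num_slots controls how many tasks participate
MirroredStrategyRunner(num_slots=4, use_gpu=False).run(train)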
driver and executors share the same Kubernetes PVC
I was able to share the same PVC on Spark 3.3, but from Spark 3.4 onward I get the error below. I would like all the executors and the driver to mount the same PVC. Is this a bug? I don't want to use SPARK_EXECUTOR_ID or OnDemand in the claim name, because then each executor would get its own unique, separate PVC.

"should contain OnDemand or SPARK_EXECUTOR_ID when requiring multiple executors"

Can anyone comment?
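For reference, this is the shape of the configuration being discussed; the claim name my-shared-pvc and mount path /data are placeholders, and whether a static executor claimName is still accepted in 3.4+ is exactly the open question here:

spark.kubernetes.driver.volumes.persistentVolumeClaim.shared.options.claimName=my-shared-pvc
spark.kubernetes.driver.volumes.persistentVolumeClaim.shared.mount.path=/data
spark.kubernetes.executor.volumes.persistentVolumeClaim.shared.options.claimName=my-shared-pvc
spark.kubernetes.executor.volumes.persistentVolumeClaim.shared.mount.path=/data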
read a binary file and save in another location
Is there any example of how to read a binary file using PySpark and save it in another location (a copy feature)? Thank you, Teoh
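A minimal sketch under the assumption that a plain copy is all that's needed; the paths are placeholders. Spark 3's binaryFile source reads the bytes into a DataFrame, while a driver-side Hadoop FileUtil.copy does the actual copy without decoding anything:

# read the raw bytes (columns: path, modificationTime, length, content)
df = spark.read.format("binaryFile").load("s3a://bucket/input/file.bin")

# plain copy via the Hadoop FileSystem API on the driver
hadoop_conf = spark._jsc.hadoopConfiguration()
Path = spark._jvm.org.apache.hadoop.fs.Path
FileUtil = spark._jvm.org.apache.hadoop.fs.FileUtil
src = Path("s3a://bucket/input/file.bin")
dst = Path("s3a://bucket/output/file.bin")
FileUtil.copy(src.getFileSystem(hadoop_conf), src,
              dst.getFileSystem(hadoop_conf), dst,
              False,               # deleteSource
              hadoop_conf)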
pyspark.sql.dataframe.DataFrame versus pyspark.pandas.frame.DataFrame
Good day,

May I know what the difference is between pyspark.sql.dataframe.DataFrame and pyspark.pandas.frame.DataFrame? Are both stored in the Spark dataframe format? I'm looking for a way to load a huge Excel file (4-10 GB), and I wonder whether I should use the third-party library spark-excel or just native pyspark.pandas. I prefer a Spark dataframe so that the work is parallelized across the executors instead of running on the driver. Can anyone advise?

Details:

df = spark.read \
    .format("com.crealytics.spark.excel") \
    .option("header", "true") \
    .load("/path/big_excel.xls")
print(type(df))  # output: pyspark.sql.dataframe.DataFrame

import pyspark.pandas as ps
from pyspark.sql import DataFrame

path = "/path/big-excel.xls"
df = ps.read_excel(path)  # output: pyspark.pandas.frame.DataFrame

Thank you.
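A minimal sketch of the relationship between the two types, assuming the usual Excel reader dependency (xlrd/openpyxl) is installed: a pandas-on-Spark DataFrame is backed by a Spark DataFrame, and you can move between them without collecting to the driver:

import pyspark.pandas as ps

psdf = ps.read_excel("/path/big_excel.xls")   # pyspark.pandas.frame.DataFrame (pandas-on-Spark)
sdf = psdf.to_spark()                         # pyspark.sql.dataframe.DataFrame
psdf_again = sdf.pandas_api()                 # back to pandas-on-Spark (Spark 3.2+)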
cannot write spark log to s3a
When running a Spark job, I used:

"spark.eventLog.dir": "s3a://_some_bucket_on_prem/spark-history",
"spark.eventLog.enabled": true

The job log shows:

22/11/10 06:42:30 INFO SingleEventLogFileWriter: Logging events to s3a://_some_bucket_on_prem/spark-history/spark-a2befd8cb9134190982a35663b61294b.inprogress
22/11/10 06:42:30 WARN S3ABlockOutputStream: Application invoked the Syncable API against stream writing to _some_bucket_on_prem/spark-history/a2befd8cb9134190982a35663b61294b.inprogress. This is unsupported

Does Spark 3.3.0 support writing the event log to an s3a bucket? I can't get the log written. It is an on-premise s3a store. Am I missing any jar library? Does this work with other cloud blob storage providers?
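For reference, a sketch of the pieces usually needed for an on-prem, S3-compatible endpoint; the endpoint and credentials below are placeholders, and the hadoop-aws plus aws-java-sdk-bundle jars matching your Hadoop build need to be on the classpath:

# placeholders: point these at the on-prem S3-compatible endpoint and its credentials
spark.eventLog.enabled true
spark.eventLog.dir s3a://_some_bucket_on_prem/spark-history
spark.hadoop.fs.s3a.endpoint https://your-s3-endpoint
spark.hadoop.fs.s3a.path.style.access true
spark.hadoop.fs.s3a.access.key <access-key>
spark.hadoop.fs.s3a.secret.key <secret-key>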
Re: pyspark connect to spark thrift server port
Hello Artemis,

Understood. If I give the hive metastore URI to anyone to connect with pyspark, port 9083 is open to everyone without any authentication. So the only way pyspark can connect to Hive is through port 9083, and not through the Spark Thrift Server port.

On Friday, October 21, 2022 at 04:06:38 AM GMT+8, Artemis User wrote:

By default, Spark uses Apache Derby (running in embedded mode with store content defined in local files) for hosting the Hive metastore. You can externalize the metastore on a JDBC-compliant database (e.g., PostgreSQL) and use the database authentication provided by the database. The JDBC configuration shall be defined in a hive-site.xml file in the Spark conf directory. Please see the metastore admin guide for more details, including an init script for setting up your metastore (https://cwiki.apache.org/confluence/display/Hive/AdminManual+Metastore+3.0+Administration).

On 10/20/22 4:31 AM, second_co...@yahoo.com.INVALID wrote:

Currently my pyspark code is able to connect to the hive metastore at port 9083. However, with this approach I can't put any security mechanism in place, such as LDAP or SQL authorization control. Is there any way to connect from pyspark to the Spark Thrift Server port without exposing the hive metastore URL to pyspark? I would like to authenticate users before allowing them to execute Spark SQL, and a user should only be allowed to query the databases and tables they have access to.

Thank you,
comet
pyspark connect to spark thrift server port
Currently my pyspark code is able to connect to the hive metastore at port 9083. However, with this approach I can't put any security mechanism in place, such as LDAP or SQL authorization control. Is there any way to connect from pyspark to the Spark Thrift Server port without exposing the hive metastore URL to pyspark? I would like to authenticate users before allowing them to execute Spark SQL, and a user should only be allowed to query the databases and tables they have access to. Thank you, comet
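A hedged sketch of the kind of client-side connection being asked about, using the third-party PyHive package (an assumption, not something from this thread); the host, credentials, and default Thrift port 10000 are placeholders, and the Thrift server itself has to be started with LDAP authentication enabled:

from pyhive import hive   # third-party: pip install pyhive[hive]

conn = hive.Connection(
    host="thrift-server-host",   # placeholder
    port=10000,                  # default Spark Thrift Server port
    username="some_user",
    password="some_password",
    auth="LDAP",
)
cur = conn.cursor()
cur.execute("SELECT * FROM some_db.some_table LIMIT 10")
print(cur.fetchall())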