Thanks Ha for the quick response.
I've installed 3.2.1 locally and see the same issue you're having. I went
through the PySpark codebase and this appears to be a bug. I don't see any
test cases that actually supply a predicate. A possible workaround might be
overwritePartitions
<https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriterV2.overwritePartitions.html>.
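To illustrate where the error seems to come from: judging by the stack trace, the Python Column is handed to py4j as-is, py4j's list converter appears to accept anything that exposes `__iter__`, and Column defines `__iter__` only to raise. The sketch below mimics that with plain Python stand-ins, so it runs without Spark. FakeColumn, FakeListConverter, build_args and try_overwrite are hypothetical names for illustration, not PySpark or py4j classes.

```python
class FakeColumn:
    """Stand-in for pyspark.sql.Column: defines __iter__ only to reject iteration."""

    def __init__(self, name):
        self.name = name

    def __iter__(self):
        raise TypeError("Column is not iterable")


class FakeListConverter:
    """Stand-in for py4j's list converter (assumption: it accepts anything with __iter__)."""

    def can_convert(self, obj):
        return hasattr(obj, "__iter__") and not isinstance(obj, (str, bytes))

    def convert(self, obj):
        # Iterating a FakeColumn triggers its __iter__, which raises.
        return [element for element in obj]


def build_args(args, converters):
    """Mimic the argument conversion loop seen in the stack trace."""
    new_args = []
    for arg in args:
        for converter in converters:
            if converter.can_convert(arg):
                new_args.append(converter.convert(arg))
                break
        else:
            # No converter claimed the argument, so pass it through unchanged.
            new_args.append(arg)
    return new_args


def try_overwrite(condition):
    """Return the error message raised while converting the condition, or None."""
    try:
        build_args([condition], [FakeListConverter()])
    except TypeError as exc:
        return str(exc)
    return None


print(try_overwrite(FakeColumn("tid")))  # Column is not iterable
```

If that reading is right, the condition would need to reach py4j as the underlying Java column rather than as the Python wrapper. The same call works for me on 3.5.1, so later releases seem to handle this.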
>>> table_name = "local.test.persons_with_age"
>>> spark.sql(f"""DROP TABLE IF EXISTS {table_name}""").show()
>>>
>>> spark.sql(f"""
... CREATE TABLE {table_name} (
... name string,
... age int
... )
... USING iceberg
... PARTITIONED BY (age);
... """).show()
>>> spark.table(table_name).show()
+----+---+
|name|age|
+----+---+
+----+---+
>>> persons = [('Fokko', 1), ('Gurbe', 2), ('Pieter', 2)]
>>> df = spark.createDataFrame(persons, ['name', 'age'])
>>>
>>> df.writeTo(table_name).append()
>>> spark.table(table_name).show()
+------+---+
|  name|age|
+------+---+
| Fokko|  1|
| Gurbe|  2|
|Pieter|  2|
+------+---+
>>>
>>> new_person = [('Joe', 2)]
>>> df_overwrite = spark.createDataFrame(new_person, ['name', 'age'])
>>>
>>> from pyspark.sql.functions import col
>>>
>>> df_overwrite.writeTo(table_name).overwritePartitions()
>>>
>>> spark.table(table_name).show()
+-----+---+
| name|age|
+-----+---+
|Fokko|  1|
|  Joe|  2|
+-----+---+
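Note that overwritePartitions replaces every partition that appears in the incoming data, which is why both Gurbe and Pieter are gone above while Fokko's partition is untouched. A plain-Python sketch of that behaviour as I understand it follows. overwrite_partitions is a hypothetical helper, not a Spark API, and rows are (name, age) tuples partitioned on age.

```python
def overwrite_partitions(existing, incoming, key_index=1):
    """Drop existing rows whose partition value appears in `incoming`, then add `incoming`."""
    # Partition values touched by the new data (here: the distinct ages).
    touched = {row[key_index] for row in incoming}
    # Rows in untouched partitions survive; touched partitions are replaced wholesale.
    kept = [row for row in existing if row[key_index] not in touched]
    return kept + list(incoming)


existing = [("Fokko", 1), ("Gurbe", 2), ("Pieter", 2)]
incoming = [("Joe", 2)]
print(overwrite_partitions(existing, incoming))  # [('Fokko', 1), ('Joe', 2)]
```

So this only helps if replacing the whole partition is acceptable; it does not overwrite individual rows by condition.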
Let me know if this could solve the issues on your end.
Kind regards,
Fokko Driesprong
On Fri, 28 Jun 2024 at 21:26, Ha Cao <[email protected]> wrote:
> Hi Fokko,
>
>
>
> Thanks so much for sharing. I am using version 3.2.1. Is this not
> supported in 3.2.1?
>
>
>
> I do get the error with the `col` syntax:
> df2.writeTo(spark_table_path).using("iceberg").overwrite(col("tid") >= 2)
>
>
>
> The stack trace would look like this:
>
>
>
> ---------------------------------------------------------------------------
> TypeError                                 Traceback (most recent call last)
>
>       1 from pyspark.sql.functions import col
> ----> 2 df2.writeTo(spark_table_path).using("iceberg").overwrite(col("tid") >= 2)
>
> ...pyspark/sql/readwriter.py in overwrite(self, condition)
>    1164         the output table.
>    1165         """
> -> 1166         self._jwriter.overwrite(condition)
>    1167
>    1168     @since(3.1)
>
> ...py4j/java_gateway.py in __call__(self, *args)
>    1311
>    1312     def __call__(self, *args):
> -> 1313         args_command, temp_args = self._build_args(*args)
>    1314
>    1315         command = proto.CALL_COMMAND_NAME +\
>
> ...py4j/java_gateway.py in _build_args(self, *args)
>    1275     def _build_args(self, *args):
>    1276         if self.converters is not None and len(self.converters) > 0:
> -> 1277             (new_args, temp_args) = self._get_args(args)
>    1278         else:
>    1279             new_args = args
>
> ...py4j/java_gateway.py in _get_args(self, args)
>    1262                 for converter in self.gateway_client.converters:
>    1263                     if converter.can_convert(arg):
> -> 1264                         temp_arg = converter.convert(arg, self.gateway_client)
>    1265                         temp_args.append(temp_arg)
>    1266                         new_args.append(temp_arg)
>
> ...py4j/java_collections.py in convert(self, object, gateway_client)
>     508         ArrayList = JavaClass("java.util.ArrayList", gateway_client)
>     509         java_list = ArrayList()
> --> 510         for element in object:
>     511             java_list.add(element)
>     512         return java_list
>
> ...pyspark/sql/column.py in __iter__(self)
>     461
>     462     def __iter__(self):
> --> 463         raise TypeError("Column is not iterable")
>     464
>     465     # string methods
>
> TypeError: Column is not iterable
>
>
>
> Thanks!
>
> Best,
>
> Ha
>
>
>
> *From:* Fokko Driesprong <[email protected]>
> *Sent:* Friday, June 28, 2024 3:00 PM
> *To:* [email protected]
> *Subject:* Re: Iceberg - PySpark overwrite with a condition
>
>
>
> Hey Ha,
>
>
>
> What version of Spark are you using? Can you share the whole stack trace?
> I tried to reproduce it locally and it worked fine:
>
>
>
> pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2 \
>     --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
>     --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
>     --conf spark.sql.catalog.spark_catalog.type=hive \
>     --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
>     --conf spark.sql.catalog.local.type=hadoop \
>     --conf spark.sql.catalog.local.warehouse=$PWD/warehouse \
>     --conf spark.sql.defaultCatalog=local
> Python 3.9.6 (default, Feb 3 2024, 15:58:27)
> [Clang 15.0.0 (clang-1500.3.9.4)] on darwin
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /__ / .__/\_,_/_/ /_/\_\   version 3.5.1
>       /_/
>
> Using Python version 3.9.6 (default, Feb 3 2024 15:58:27)
> Spark context Web UI available at http://192.168.1.10:4040
> Spark context available as 'sc' (master = local[*], app id =
> local-1719599873923).
> SparkSession available as 'spark'.
>
>
>
> >>> table_name = "local.test.person_with_age"
> >>>
> >>> spark.sql(f"""
> ... CREATE TABLE {table_name} (
> ... name string,
> ... age int
> ... )
> ... USING iceberg
> ... PARTITIONED BY (age);
> ... """).show()
> ++
> ||
> ++
> ++
>
> >>> spark.table(table_name).show()
> +----+---+
> |name|age|
> +----+---+
> +----+---+
>
> >>> persons = [('Fokko', 1), ('Gurbe', 2), ('Pieter', 2)]
> >>> df = spark.createDataFrame(persons, ['name', 'age'])
> >>> df.writeTo(table_name).append()
> >>> spark.table(table_name).show()
> +------+---+
> |  name|age|
> +------+---+
> | Fokko|  1|
> | Gurbe|  2|
> |Pieter|  2|
> +------+---+
>
> >>> new_person = [('Rho', 2)]
> >>> df_overwrite = spark.createDataFrame(new_person, ['name', 'age'])
> >>> from pyspark.sql.functions import col
> >>> df_overwrite.writeTo(table_name).overwrite(col("age") >= 2)
> >>> spark.table(table_name).show()
> +-----+---+
> | name|age|
> +-----+---+
> |  Rho|  2|
> |Fokko|  1|
> +-----+---+
>
>
>
> The syntax with the col is the way to go. I hope this helps and let me
> know if this doesn't work for you.
>
>
>
> Kind regards,
>
> Fokko
>
>
>
> On Fri, 28 Jun 2024 at 18:09, Ha Cao <[email protected]> wrote:
>
> Hi Ajantha,
>
>
>
> Thanks for replying! The example, however, is in Java. I figure that that
> syntax probably only works for Java and Scala. I have tried similarly for
> PySpark but still got `Column is not iterable` with:
>
> df.writeTo(spark_table_path).using("iceberg").overwrite(col("time") > target_timestamp)
>
>
>
> For this, I get `Column object is not callable`:
>
>
> df.writeTo(spark_table_path).using("iceberg").overwrite(col("time").less(target_timestamp))
>
>
>
> The only example I can find in the PySpark codebase is
> https://github.com/apache/spark/blob/master/python/pyspark/sql/tests/test_readwriter.py#L251
> but even with this, it throws `Column is not iterable`. I cannot find any
> other test case that tests `overwrite()` as a method.
>
>
>
> Thank you!
>
> Best,
>
> Ha
>
>
>
> *From:* Ajantha Bhat <[email protected]>
> *Sent:* Friday, June 28, 2024 3:52 AM
> *To:* [email protected]
> *Subject:* Re: Iceberg - PySpark overwrite with a condition
>
>
>
> Hi,
>
> Please refer to this doc:
> https://iceberg.apache.org/docs/nightly/spark-writes/#overwriting-data
>
> We do have some test cases for the same:
> https://github.com/apache/iceberg/blob/91fbcaa62c25308aa815557dd2c0041f75530705/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/PartitionedWritesTestBase.java#L153
>
> - Ajantha
>
>
>
> On Fri, Jun 28, 2024 at 1:00 AM Ha Cao <[email protected]> wrote:
>
> Hello,
>
>
>
> I am experimenting with PySpark’s DataFrameWriterV2 overwrite()
> <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriterV2.overwrite.html>
> to an Iceberg table with existing data in a target partition. My goal is
> that instead of overwriting the entire partition, it will only overwrite
> specific rows that match the condition. However, I can’t get it to work
> with any syntax and I keep getting “Column is not iterable”. I have tried:
>
>
>
> df.writeTo(spark_table_path).using("iceberg").overwrite(df.tid)
>
> df.writeTo(spark_table_path).using("iceberg").overwrite(df.tid.isin(1))
>
> df.writeTo(spark_table_path).using("iceberg").overwrite(df.tid >= 1)
>
>
>
> and all of these syntaxes fail with “Column is not iterable”.
>
>
>
> What is the correct syntax for this? I also think that there is a
> possibility that Iceberg-PySpark integration doesn’t support overwrite, but
> I don’t know how to confirm this.
>
>
>
> Thank you so much!
>
> Best,
> Ha
>
>