RussellSpitzer commented on issue #1894:
URL: https://github.com/apache/iceberg/issues/1894#issuecomment-774303754


   All Iceberg tables exist in the context of a Catalog. If you haven't specified one and have just provided a path, the implied "HadoopCatalog" is used. The table has its own Metadata.json file, which specifies the schema of the table and its partition spec. Once created, that is the schema and partition spec until it is explicitly altered.
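   
   As a quick way to see this, you can load a path-based table and inspect its schema and spec directly. This is a minimal sketch, assuming a spark-shell with the Iceberg runtime jar on the classpath; `HadoopTables` is the API behind path-based tables:
   
   ```
   import org.apache.iceberg.hadoop.HadoopTables
   
   // Load the table by path, roughly the same way a path-based save() resolves it
   val tables = new HadoopTables(spark.sparkContext.hadoopConfiguration)
   val table = tables.load("./data-iceberg")
   
   // These come straight from the current Metadata.json
   println(table.schema)
   println(table.spec)
   ```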
   
   So in your example "./data-iceberg" will be treated as a HadoopTable, and Iceberg will look in that directory for a Metadata.json with a specified schema and partitioning. If such a file did not exist you would get an error like:
   
   ```
   scala> spark.read.parquet("/Users/russellspitzer/Temp/parquet_data").write.format("iceberg").save("/Users/russellspitzer/Temp/iceberg_table")
   org.apache.iceberg.exceptions.NoSuchTableException: Table does not exist at location: /Users/russellspitzer/Temp/iceberg_table
   ```
   
   Since you didn't get such an error, a table must already exist there and it has a schema.
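   
   As a sketch of how such a table gets its fixed schema and spec in the first place, it could have been created along these lines; the column names and types here are just assumptions based on the example rows below:
   
   ```
   import org.apache.iceberg.hadoop.HadoopTables
   import org.apache.iceberg.{PartitionSpec, Schema}
   import org.apache.iceberg.types.Types
   
   // Assumed four integer columns a, b, c, d
   val schema = new Schema(
     Types.NestedField.required(1, "a", Types.IntegerType.get()),
     Types.NestedField.required(2, "b", Types.IntegerType.get()),
     Types.NestedField.required(3, "c", Types.IntegerType.get()),
     Types.NestedField.required(4, "d", Types.IntegerType.get()))
   
   // Partitioned by "c", which is what the rest of this example assumes
   val spec = PartitionSpec.builderFor(schema).identity("c").build()
   
   new HadoopTables(spark.sparkContext.hadoopConfiguration)
     .create(schema, spec, "./data-iceberg")
   ```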
   
   When attempting to write, Spark saw `partitionBy("a", "b")` and performed a shuffle on the input data, so the Spark partitions now look like this:
   
   ```
   SparkPartition 1 -> (1,1,1,1), (1,1,2,1), (1,1,1,3) ...
   SparkPartition 2 -> (1,2,1,1), (1,2,2,1), (1,2,1,2) ...
   SparkPartition 3 -> (2,4,1,1), (2,4,2,1), (2,4,1,2) ...
   ```
   
   The first two values are always the same within a Spark partition, but the last two are in an arbitrary order.
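   
   You can see this clustering for yourself with something like the following sketch (assuming a spark-shell; the column names and rows are just the ones from the example above):
   
   ```
   import org.apache.spark.sql.functions.{col, spark_partition_id}
   import spark.implicits._
   
   val df = Seq((1,1,1,1), (1,1,2,1), (1,1,1,3), (1,2,1,1), (2,4,1,1)).toDF("a", "b", "c", "d")
   
   // Roughly the clustering the partitionBy shuffle produces: rows grouped by (a, b),
   // with no ordering guarantee for c or d inside each Spark partition
   df.repartition(col("a"), col("b"))
     .withColumn("spark_partition", spark_partition_id())
     .orderBy("spark_partition")
     .show()
   ```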
   
   Then the Iceberg writer gets one of these partitions and creates a writer based on its Metadata.json, which let's say does not match the `partitionBy` argument. For this example, let's say the table is actually partitioned by `c`.
   
   By default, Iceberg's Spark writer opens a file for each partition, assumes it will write all the values for that partition, and then closes the file. If it processes SparkPartition 1, it ends up doing the following:
   
   ```
   (1,1,1,1) -> Open partition c = 1 and start a new file
   (1,1,2,1) -> Close the partition c = 1 file; open partition c = 2 and start a new file
   (1,1,1,3) -> Open partition c = 1 /// Error: the file for partition c = 1 has already been closed
   ```
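   
   Here is a toy sketch of that clustered-writer behaviour (this is not Iceberg's actual writer code, just the same idea: rows must arrive grouped by partition value or the write fails):
   
   ```
   def writeClustered(rows: Seq[(Int, Int, Int, Int)]): Unit = {
     var current: Option[Int] = None                       // partition value of c with an open file
     val closed = scala.collection.mutable.Set.empty[Int]  // partition values whose files are closed
   
     rows.foreach { case (_, _, c, _) =>
       if (!current.contains(c)) {
         current.foreach(closed += _)                      // close the file for the previous value
         if (closed.contains(c)) {
           throw new IllegalStateException(s"Already closed the file for partition c=$c")
         }
         current = Some(c)                                 // open a new file for this value
       }
       // same partition value as the open file: keep appending
     }
   }
   
   writeClustered(Seq((1,1,1,1), (1,1,2,1), (1,1,1,3)))    // fails on the third row
   ```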
   
   Since this is a runtime check, you won't see the error with a single row, because a single row can always open and close its file without incident, but a dataset with multiple rows can trigger it. Most of the time you want the Spark partitioning to match the Iceberg partitioning, because this minimizes the number of files you actually need to write.
   
   If you really don't want the Spark partitioning to match for some reason, you can use `sortWithinPartitions` to order the data within each Spark partition by its Iceberg partition values.
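   
   For example (a minimal sketch, with `df` being the DataFrame you are writing and the table at "./data-iceberg" partitioned by `c` as above):
   
   ```
   // Each value of c now arrives as one contiguous run within every Spark partition,
   // so the default writer can close a file and never needs to reopen it
   df.sortWithinPartitions("c")
     .write
     .format("iceberg")
     .mode("append")
     .save("./data-iceberg")
   ```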
   
   The other possibility is to use the new Spark fanout writer, which keeps all of the files open until the entire task has completed.
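   
   That would look something like the following sketch. I'm assuming a recent enough Iceberg version here; the Spark write option is `fanout-enabled` (there is also a table property, `write.spark.fanout.enabled`), so check that your build supports it:
   
   ```
   // Fanout keeps one open file per partition value until the task finishes, so
   // unsorted input no longer fails, at the cost of more memory and open files per task
   df.write
     .format("iceberg")
     .option("fanout-enabled", "true")
     .mode("append")
     .save("./data-iceberg")
   ```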
   
   Neither of these solutions is really as good as having your data partitioned the way the table expects, since that minimizes the number of files being written: all the files for a given Iceberg partition are produced by the same Spark task.

