stevenlii commented on issue #7226:
URL: https://github.com/apache/iceberg/issues/7226#issuecomment-2047156687

   > We've found a workaround in our use case (Iceberg 1.4.3, Spark 3.3.0 on Glue 4.0).
   > 
   > Our previous flow was:
   > 
   > ```
   > TABLE = "glue_catalog.<database>.<table>"
   > 
   > # set up readStream
   > read_stream = (
   >     spark.readStream
   >     <setup read stream>
   >     .load()
   > )
   > 
   > # dataframe operations
   > df = read_stream.select(
   > <various dataframe operations>
   > )
   > 
   > # setup write stream
   > write_stream = (
   >     df.writeStream.format("iceberg")
   >     .outputMode("append")
   >     .trigger(processingTime=job_args["TRIGGER_PROCESSING_TIME"])
   >     .options(**{
   >         "fanout-enabled": job_args["FANOUT_ENABLED"],
   >         "checkpointLocation": job_args["CHECKPOINT_LOCATION"],
   >     })
   >     .toTable(TABLE)
   > )
   > ```
   > 
   > which always failed on the insert with the error described above.
   > 
   > Our new flow is to use processBatch:
   > 
   > ```
   > def process_batch(df, batch_id):
   >     df = df.select(
   >     <various dataframe operations>
   >     )
   > 
   >     df.writeTo(TABLE).append()
   > 
   > 
   > read_stream.writeStream.foreachBatch(process_batch).start()
   > ```
   > 
   > The above is for completeness, as we're actually using Glue's inbuilt [`GlueContext.forEachBatch`](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-forEachBatch), but it [does exactly the same thing](https://github.com/awslabs/aws-glue-libs/blob/master/awsglue/context.py#L602).
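   > 
   > Roughly, that Glue call looks like the following (the window size and the frame/function names are illustrative, not our actual config):
   > 
   > ```
   > from awsglue.context import GlueContext
   > from pyspark.context import SparkContext
   > 
   > glue_context = GlueContext(SparkContext.getOrCreate())
   > 
   > # process_batch is the same batch function as above; windowSize plays the
   > # role of the processing-time trigger from the original flow
   > glue_context.forEachBatch(
   >     frame=read_stream,
   >     batch_function=process_batch,
   >     options={"windowSize": "100 seconds"},
   > )
   > ```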
   > 
   > This is no longer failing, and we're able to write to the table with partition transforms (we're using `hour()` to partition our data).
   > 
   > Interestingly, the data is now being written to S3 as you'd expect for the S3FileIO implementation (i.e. writes are prefixed with a [random string](https://iceberg.apache.org/docs/latest/aws/#object-store-file-layout)), whereas previously this wasn't happening.
   > 
   > It would be nice to use the inbuilt write triggers as described [in the docs](https://iceberg.apache.org/docs/latest/spark-structured-streaming/#streaming-writes), but we are happy with a working solution, and this approach also lets us add MERGE behaviour with SQL.
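   > 
   > For illustration, the MERGE variant could look something like this inside the batch function (the `id` join key is a placeholder for whatever your actual merge key is):
   > 
   > ```
   > def process_batch(df, batch_id):
   >     # Expose the micro-batch as a temp view so it can be referenced from SQL
   >     df.createOrReplaceTempView("updates")
   > 
   >     df.sparkSession.sql(f"""
   >         MERGE INTO {TABLE} t
   >         USING updates s
   >         ON t.id = s.id
   >         WHEN MATCHED THEN UPDATE SET *
   >         WHEN NOT MATCHED THEN INSERT *
   >     """)
   > ```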
   > 
   > Hope someone else finds this useful!
   
   But with this approach there is no `checkpointLocation`. How do you manage the offsets?
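
   For example, would something along these lines still checkpoint correctly, or do you track the offsets yourself? (Just a sketch; `read_stream`, `process_batch`, and `job_args` are carried over from your snippet above, nothing here is tested.)

   ```
   # Hypothetical: keep Structured Streaming's own checkpointing while using foreachBatch
   query = (
       read_stream.writeStream
       .option("checkpointLocation", job_args["CHECKPOINT_LOCATION"])
       .foreachBatch(process_batch)
       .start()
   )
   query.awaitTermination()
   ```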

