Khang Pham created SPARK-33232:
----------------------------------

             Summary: ConcurrentAppendException while updating delta lake table
                 Key: SPARK-33232
                 URL: https://issues.apache.org/jira/browse/SPARK-33232
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 3.0.0
            Reporter: Khang Pham


I have two Spark Streaming jobs running concurrently:
 * Stream join job: joins a Kafka stream with another stream from Amazon SQS. 
The result is appended to Delta Lake table A.
 * Upsert job: reads from Delta Lake table B and updates table A when there is 
a matching ID.
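For context, the Databricks isolation-level doc linked in this report suggests that ConcurrentAppendException on a MERGE can often be avoided by adding explicit partition predicates to the match condition, so the upsert only conflicts with writes to the partitions it actually touches. A minimal sketch of that mitigation (table name, column names, and the dt/request_hour values are hypothetical, not taken from my jobs):

```python
# Sketch: partition-pruning the MERGE condition for the upsert job.
# Without the partition predicates, the MERGE scans all of table A, so any
# concurrent append (the stream join job, or an auto OPTIMIZE commit) can
# raise ConcurrentAppendException. Names below are made up for illustration.

def pruned_merge_condition(dt: str, request_hour: str) -> str:
    """Build a MERGE condition that pins the target scan to one partition."""
    return (
        f"target.id = source.id "
        f"AND target.dt = '{dt}' "
        f"AND target.request_hour = '{request_hour}'"
    )

# With a live Spark session and the delta-spark package, the condition would
# be used roughly like this (not runnable here):
#
#   from delta.tables import DeltaTable
#   table_a = DeltaTable.forName(spark, "table_a")
#   (table_a.alias("target")
#        .merge(updates_df.alias("source"),
#               pruned_merge_condition("2020-01-01", "2020-01-01 23:00:00"))
#        .whenMatchedUpdateAll()
#        .execute())

print(pruned_merge_condition("2020-01-01", "2020-01-01 23:00:00"))
```

Whether this applies here depends on whether each upsert batch can be scoped to known partitions before the merge runs.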

Environment:
 * Databricks cloud run time 7.2, Spark 3.0.0

 

The stream join job works fine, but the upsert job keeps failing. 

 

Stack trace:

com.databricks.sql.transaction.tahoe.ConcurrentAppendException: Files were 
added to partition [dt=2020-xx-yy, request_hour=2020-xx-yy 23:00:00] by a 
concurrent update.

Please try the operation again. Conflicting commit:

{"timestamp":1603477588946,"userId":"xxxxx","operation":"OPTIMIZE","operationParameters":{"predicate":[],"zOrderBy":[],"batchId":0,"auto":true},"job":{"jobId":"xxxxx","jobName":"Streaming join xxx","runId":"xxx","jobOwnerId":"xxxx","triggerType":"manual"},"notebook":{"notebookId":"xxxx"},"clusterId":"xxxxxx","readVersion":2222222222,"isolationLevel":"SnapshotIsolation","isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"444444","numRemovedBytes":"64455537","p25FileSize":"63341646","minFileSize":"63341646","numAddedFiles":"1","maxFileSize":"63341646","p75FileSize":"63341646","p50FileSize":"63341646","numAddedBytes":"63341646"}}

 

Table A has these settings: 

- 'delta.isolationLevel' = 'WriteSerializable'

- spark.databricks.delta.optimizeWrite.enable = True

- spark.databricks.delta.autoCompact.enabled = True

 

Other settings:

spark.databricks.io.cache.compression.enabled true

stateStore = rocksdb

spark.sql.adaptive.enabled true

spark.sql.adaptive.skewJoin.enabled true

 

I already set the isolation level to WriteSerializable to handle 
ConcurrentAppendException, as described in 
[https://docs.databricks.com/delta/optimizations/isolation-level.html]  

 

However, the conflicting commit in the error reports 
"isolationLevel":"SnapshotIsolation". 

 

What did I miss? 

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
