[
https://issues.apache.org/jira/browse/SPARK-22504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16255353#comment-16255353
]
Sean Owen commented on SPARK-22504:
-----------------------------------
You're overlooking the new problem by just saying 'we can drop the temp table'.
What if that fails? after all, that is the premise of your problem here in the
first place, that part of a multi-step process outside a transaction fails.
Your change does not make the operation any more atomic, just changes the
failure mode.
You are right that this is not like an RDBMS because there is no notion of
"update". So "insert overwrite" becomes equivalent to "drop" and "insert", or
else, the implementation would be to "delete" and "insert", perhaps.
> Optimization in overwrite table in case of failure
> --------------------------------------------------
>
> Key: SPARK-22504
> URL: https://issues.apache.org/jira/browse/SPARK-22504
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.1.0
> Reporter: xuchuanyin
>
> Optimization in overwrite table in case of failure
> # SCENARIO
> Currently, `Overwrite` operation in spark is performed by following steps:
> 1. DROP : drop old table
> 2. WRITE: create and write data into new table
> If some runtime error occurs in Step2, then the origin table will be lost
> along with its data -- I think this will be a serious problem if someone
> perform `read-update-flushback` actions. The problem can be reproduced by the
> following code:
> ```scala
> 01: test("test spark df overwrite failed") {
> 02: // prepare table
> 03: val tableName = "test_spark_overwrite_failed"
> 04: sql(s"DROP TABLE IF EXISTS $tableName")
> 05: sql(s"CREATE TABLE IF NOT EXISTS $tableName ( field_int int,
> field_string String)" +
> 06: s" STORED AS parquet").collect()
> 07:
> 08: // load data first
> 09: val schema = StructType(
> 10: Seq(StructField("field_int", DataTypes.IntegerType, nullable =
> false),
> 11: StructField("field_string", DataTypes.StringType, nullable =
> false)))
> 12: val rdd1 = sqlContext.sparkContext.parallelize(
> 13: Row(20, "q") ::
> 14: Row(21, "qw") ::
> 15: Row(23, "qwe") :: Nil)
> 16: val dataFrame = sqlContext.createDataFrame(rdd1, schema)
> 17:
> dataFrame.write.format("parquet").mode(SaveMode.Overwrite).saveAsTable(tableName)
> 18: sql(s"SELECT * FROM $tableName").show()
> 19:
> 20: // load data again, the following data will cause failure in data
> loading
> 21: try {
> 22: val rdd2 = sqlContext.sparkContext.parallelize(
> 23: Row(31, "qwer") ::
> 24: Row(null, "qwer") ::
> 25: Row(32, "long_than_5") :: Nil)
> 26: val dataFrame2 = sqlContext.createDataFrame(rdd2, schema)
> 27:
> 28:
> dataFrame2.write.format("parquet").mode(SaveMode.Overwrite).saveAsTable(tableName)
> 29: } catch {
> 30: case e: Exception => LOGGER.error(e, "write overwrite failure")
> 31: }
> 32: // table `test_spark_overwrite_failed` has been dropped
> 33: sql(s"show tables").show(20, truncate = false)
> 34: // the content is empty even if table exists. We want it to be the
> same as
> 35: sql(s"SELECT * FROM $tableName").show()
> 36: }
> ```
> In Line24, we creata a `null` element while the schema is `notnull` -- This
> will cause runtime error in loading data.
> In Line33, table `test_spark_overwrite_failed` has already been dropped and
> no longger exists in the current table. And of course Line35 will fail.
> Instead, we want Line35 to show the origin data just as Line18.
> # ANALYZE
> I am thinking of optimizing `overwrite` in spark -- The goal is to keep the
> old data until the load has finished successfully. The old data can only be
> cleaned when the load is successful.
> Since sparksql already support `rename` operation, we can optimize
> `overwrite` in the following steps:
> 1. WRITE: create and write data to tempTable
> 2. SWAP: swap temptable1 with targetTable by using rename operation
> 3. CLEAN: clean up old data
> If step1 works fine, then swap tempTable with targetTable and clean up old
> data; otherwise, keep the target table not changed.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]