[ https://issues.apache.org/jira/browse/SPARK-22504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16249502#comment-16249502 ]
Sean Owen commented on SPARK-22504:
-----------------------------------
Removing the original table isn't a problem; the user asked for that.
Your suggestion just swaps it out for a new problem: how do you ensure the new
table is cleaned up in case of failure? How do you make sure the old table is
deleted? What about the implications of holding both tables' storage at
once?
I think the current semantics are correct and as expected in case of a failure.
> Optimization in overwrite table in case of failure
> --------------------------------------------------
>
> Key: SPARK-22504
> URL: https://issues.apache.org/jira/browse/SPARK-22504
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.1.0
> Reporter: xuchuanyin
>
> Optimization in overwrite table in case of failure
> # SCENARIO
> Currently, the `Overwrite` operation in Spark is performed in the following steps:
> 1. DROP : drop old table
> 2. WRITE: create and write data into new table
> If a runtime error occurs in Step 2, the original table is lost along with
> its data. I think this is a serious problem for anyone who performs
> `read-update-flushback` actions. The problem can be reproduced by the
> following code:
> ```scala
> 01: test("test spark df overwrite failed") {
> 02:   // prepare table
> 03:   val tableName = "test_spark_overwrite_failed"
> 04:   sql(s"DROP TABLE IF EXISTS $tableName")
> 05:   sql(s"CREATE TABLE IF NOT EXISTS $tableName ( field_int int, field_string String)" +
> 06:     s" STORED AS parquet").collect()
> 07:
> 08:   // load data first
> 09:   val schema = StructType(
> 10:     Seq(StructField("field_int", DataTypes.IntegerType, nullable = false),
> 11:       StructField("field_string", DataTypes.StringType, nullable = false)))
> 12:   val rdd1 = sqlContext.sparkContext.parallelize(
> 13:     Row(20, "q") ::
> 14:       Row(21, "qw") ::
> 15:       Row(23, "qwe") :: Nil)
> 16:   val dataFrame = sqlContext.createDataFrame(rdd1, schema)
> 17:   dataFrame.write.format("parquet").mode(SaveMode.Overwrite).saveAsTable(tableName)
> 18:   sql(s"SELECT * FROM $tableName").show()
> 19:
> 20:   // load data again, the following data will cause failure in data loading
> 21:   try {
> 22:     val rdd2 = sqlContext.sparkContext.parallelize(
> 23:       Row(31, "qwer") ::
> 24:         Row(null, "qwer") ::
> 25:         Row(32, "long_than_5") :: Nil)
> 26:     val dataFrame2 = sqlContext.createDataFrame(rdd2, schema)
> 27:
> 28:     dataFrame2.write.format("parquet").mode(SaveMode.Overwrite).saveAsTable(tableName)
> 29:   } catch {
> 30:     case e: Exception => LOGGER.error(e, "write overwrite failure")
> 31:   }
> 32:   // table `test_spark_overwrite_failed` has been dropped
> 33:   sql(s"show tables").show(20, truncate = false)
> 34:   // the content is empty even if table exists. We want it to be the same as
> 35:   sql(s"SELECT * FROM $tableName").show()
> 36: }
> ```
> In Line24, we create a `null` element while the schema is declared
> non-nullable. This causes a runtime error when loading the data.
> In Line33, table `test_spark_overwrite_failed` has already been dropped and
> no longer exists in the current database. And of course Line35 will fail.
> Instead, we want Line35 to show the original data, just as Line18 does.
> # ANALYZE
> I am thinking of optimizing `overwrite` in Spark. The goal is to keep the
> old data until the load has finished successfully. The old data should be
> cleaned up only after the load succeeds.
> Since Spark SQL already supports the `rename` operation, we can optimize
> `overwrite` with the following steps:
> 1. WRITE: create and write data to tempTable
> 2. SWAP: swap tempTable with targetTable by using the rename operation
> 3. CLEAN: clean up old data
> If step 1 works fine, then swap tempTable with targetTable and clean up the
> old data; otherwise, leave the target table unchanged.
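
As a rough illustration of the WRITE / SWAP / CLEAN steps quoted above, here is
a minimal Scala sketch. It deliberately avoids Spark: `Catalog`,
`InMemoryCatalog` and `SafeOverwrite` are hypothetical names made up for this
example, not Spark APIs. Against a real session the three catalog operations
might map onto `ALTER TABLE ... RENAME TO` and `DROP TABLE IF EXISTS`, but that
mapping is an assumption and is not shown here. The sketch is not atomic (the
target is briefly missing between the two renames), and it does not handle a
driver crash midway or the cost of holding both copies at once, which are the
concerns raised in the comment.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical minimal catalog interface (an assumption, not a Spark API).
trait Catalog {
  def exists(name: String): Boolean
  def rename(from: String, to: String): Unit
  def drop(name: String): Unit // expected to be a no-op if the table is absent
}

// Toy catalog for demonstration: a "table" is just a sequence of ints.
final class InMemoryCatalog extends Catalog {
  val tables: mutable.Map[String, Seq[Int]] = mutable.Map.empty

  def exists(name: String): Boolean = tables.contains(name)

  def rename(from: String, to: String): Unit = {
    require(tables.contains(from) && !tables.contains(to),
      s"cannot rename $from to $to")
    tables.update(to, tables.remove(from).get)
  }

  def drop(name: String): Unit = {
    tables.remove(name)
    ()
  }
}

object SafeOverwrite {
  // Replaces `target` with whatever `writeTo` writes into a temporary table.
  // If the write or the swap fails, the original `target` is kept or restored
  // and the exception is rethrown.
  def overwrite(catalog: Catalog, target: String)(writeTo: String => Unit): Unit = {
    val suffix = System.nanoTime()
    val temp = s"${target}_tmp_$suffix"
    val backup = s"${target}_old_$suffix"

    // 1. WRITE: the target is not touched while the new data is produced.
    try writeTo(temp)
    catch {
      case NonFatal(e) =>
        catalog.drop(temp) // best-effort cleanup of a partial write
        throw e
    }

    // 2. SWAP: move the old table aside, then move the new one into place.
    val hadTarget = catalog.exists(target)
    if (hadTarget) catalog.rename(target, backup)
    try catalog.rename(temp, target)
    catch {
      case NonFatal(e) =>
        if (hadTarget) catalog.rename(backup, target) // roll back
        catalog.drop(temp)
        throw e
    }

    // 3. CLEAN: the old data is removed only after the swap has succeeded.
    if (hadTarget) catalog.drop(backup)
  }
}
```

For example, with an `InMemoryCatalog` that already holds a table `t`, a call
such as `SafeOverwrite.overwrite(cat, "t")(_ => throw new RuntimeException("boom"))`
rethrows the exception and leaves `t` with its original rows, which is the
behaviour the reporter asks for in Line35.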