This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new 699f10bf36cf [SPARK-56743][SQL] Use SQLLastAttemptMetric for DSv2 UPDATE/DELETE/MERGE metrics
699f10bf36cf is described below
commit 699f10bf36cf56230665b58eab8bd895d09221b5
Author: Juliusz Sompolski <[email protected]>
AuthorDate: Thu May 21 02:12:49 2026 +0800
[SPARK-56743][SQL] Use SQLLastAttemptMetric for DSv2 UPDATE/DELETE/MERGE metrics
### What changes were proposed in this pull request?
Switches the DSv2 row-level operation metrics that count rows produced by
executor tasks to use `SQLLastAttemptMetric` (SLAM) instead of plain `SQLMetric`, so
the values surfaced in `UpdateSummary`, `DeleteSummary`, and `MergeSummary` are
stable across stage retries.
Specifically:
- In `RowLevelWriteExec.sparkMetrics`, the UPDATE and DELETE branches now
construct their row counters (`numUpdatedRows` / `numDeletedRows` /
`numCopiedRows`) via `SQLLastAttemptMetrics.createMetric`. Increment paths are
unchanged (`SQLLastAttemptMetric extends SQLMetric`).
- In `MergeRowsExec.metrics`, all 8 row counters (`numTargetRowsCopied`,
`numTargetRowsInserted`, `numTargetRowsDeleted`, `numTargetRowsUpdated`, and
the matched / not-matched-by-source splits) are switched to SLAM. Both the
interpreted (`longMetric("...") += 1`) and codegen (`metricTerm(...).add(1)`)
increment paths work unchanged.
- `BatchScanExec` overrides `sparkMetrics` so `numOutputRows` becomes SLAM
only when the scan reads on behalf of a row-level DELETE (i.e. `table` is
`RowLevelOperationTable` with `operation.command() == DELETE`). This is needed
because group-based DELETE derives `numDeletedRows = numScannedRows -
numCopiedRows` on the driver in `ReplaceDataExec.getWriteSummary`; if either
input is overcounted the difference is overcounted in lockstep on a
writer-stage retry. Other scans (regular reads [...]
- `RowLevelWriteExec.getMetricValue` now reads via
`lastAttemptValueForHighestRDDId()` for `SQLLastAttemptMetric`, falling back to
`slam.value` if SLAM bailed out. This handles the UPDATE/DELETE summary paths,
the MERGE summary path (`MergeRowsExec.metrics`), and the scan side
`numOutputRows` referenced from `ReplaceDataExec.getWriteSummary` uniformly.
The driver-side `metrics("numDeletedRows").set(...)` in
`ReplaceDataExec.getWriteSummary` continues to work after the swap because for
the group-based DELETE path that metric is only updated on the driver, so SLAM
records it as a driver value and surfaces it via
`lastAttemptValueForHighestRDDId()` without bailing out.
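The `getMetricValue` change described above reduces to a type-directed fallback. The Scala script below sketches that control flow using two hypothetical stand-in classes, `Metric` and `LastAttemptMetric`. These are not Spark's `SQLMetric` / `SQLLastAttemptMetric`. Only the method name `lastAttemptValueForHighestRDDId()`, the fallback to the raw value, and the `-1` default are taken from the diff. A `None` last-attempt value stands in for the case where SLAM bailed out.

```scala
// Hypothetical stand-ins that mimic the shape of SQLMetric / SQLLastAttemptMetric;
// they are NOT Spark's classes.
class Metric(var value: Long = 0L)

// `lastAttempt` = None models the case where last-attempt tracking bailed out.
class LastAttemptMetric(raw: Long, lastAttempt: Option[Long]) extends Metric(raw) {
  def lastAttemptValueForHighestRDDId(): Option[Long] = lastAttempt
}

// Same control flow as the new RowLevelWriteExec.getMetricValue in the diff:
// prefer the last-attempt value, fall back to the raw value, and return -1
// when the metric name is missing.
def getMetricValue(metrics: Map[String, Metric], name: String): Long =
  metrics.get(name).map {
    case slam: LastAttemptMetric =>
      slam.lastAttemptValueForHighestRDDId().getOrElse(slam.value)
    case m => m.value
  }.getOrElse(-1L)

val metrics = Map[String, Metric](
  "numCopiedRows" -> new Metric(4L),                       // plain metric: raw sum
  "numUpdatedRows" -> new LastAttemptMetric(4L, Some(2L)), // last attempt available
  "numDeletedRows" -> new LastAttemptMetric(3L, None))     // bailed out: raw value used
```

Because the fallback is keyed on the runtime type, code paths that still construct plain metrics (for example, non-DELETE scans) keep their old behavior without any extra flag.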
### Why are the changes needed?
`SQLMetric.value` aggregates increments from every task attempt that ever
ran, so on a stage retry the row counts double up. The values flow into the
connector-visible `WriteSummary` (and downstream into operator metrics
consumers such as Delta's invariant checks), so an inflated count mis-reports
what the operation actually did. `SQLLastAttemptMetric` reports only the last
attempt's contribution and so gives the row count that matches what was
actually committed.
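To make the retry arithmetic concrete, here is a toy model written as a Scala script. It is not Spark's accumulator implementation. It rests on two illustrative assumptions:

- Each task update is tagged with a stage attempt and a partition.
- "Last attempt" means keeping, for each partition, the update from the highest attempt number.

The numbers loosely follow the DELETE test: a 4-row scan whose stage runs twice, and 2 copied rows written once.

```scala
// Toy model only: not Spark's accumulator code. Each update records which stage
// attempt and which partition produced it, plus the rows that task counted.
final case class Update(attempt: Int, partition: Int, rows: Long)

// SQLMetric-like semantics: every update from every attempt is summed.
def sumAllAttempts(updates: Seq[Update]): Long = updates.map(_.rows).sum

// Assumed last-attempt semantics: per partition, keep only the update from the
// highest attempt, then sum across partitions.
def lastAttemptPerPartition(updates: Seq[Update]): Long =
  updates.groupBy(_.partition).values.map(_.maxBy(_.attempt).rows).sum

// Scan side: 4 rows over 2 partitions, and the whole stage runs twice.
val scanned = Seq(
  Update(attempt = 0, partition = 0, rows = 2L), Update(attempt = 0, partition = 1, rows = 2L),
  Update(attempt = 1, partition = 0, rows = 2L), Update(attempt = 1, partition = 1, rows = 2L))

// Writer side: 2 copied rows, written by a single attempt.
val copied = Seq(
  Update(attempt = 0, partition = 0, rows = 1L), Update(attempt = 0, partition = 1, rows = 1L))

// Driver-side derivation numDeletedRows = numScannedRows - numCopiedRows.
val rawDeleted = sumAllAttempts(scanned) - sumAllAttempts(copied) // 8 - 2
val lastAttemptDeleted =
  lastAttemptPerPartition(scanned) - lastAttemptPerPartition(copied) // 4 - 2

println(s"raw: $rawDeleted, last-attempt: $lastAttemptDeleted")
```

In this toy model, the raw sums make the driver-side derivation report 6 deleted rows instead of 2. The last-attempt values yield 2, the same value the DELETE test expects via `checkDeleteMetrics(numDeletedRows = 2, ...)`. With a single attempt, both functions return the same total, which is why behavior without retries is unchanged.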
### Does this PR introduce _any_ user-facing change?
The values surfaced via `UpdateSummary.numUpdatedRows / numCopiedRows`,
`DeleteSummary.numDeletedRows / numCopiedRows`, and
`MergeSummary.numTargetRows*` (i.e. what the connector receives from
`BatchWrite.commit(messages, summary)`) will, in the presence of stage retries,
be the row counts from the last attempt rather than the sum across all
attempts. With no retries, behavior is unchanged. The metric names, display
strings, and presence in the SQL UI are unchanged.
Note: the SQL UI still shows the raw accumulator value (`SQLMetric.value`),
which on stage retries is the sum across all task attempts and therefore
overcounts. Only the values passed to the connector via `WriteSummary` are
SLAM-corrected. Making the SQL UI also display the last-attempt value is a
bigger follow-up that would touch the SQL UI's metric collection pipeline.
### How was this patch tested?
Existing tests in `UpdateTableSuiteBase`, `DeleteFromTableSuiteBase`, and
the merge suites exercise the metric values; they continue to pass since SLAM
and `SQLMetric` report the same value when there are no stage retries.
Three new tests cover the retry behavior directly, one in each of
`UpdateTableSuiteBase`, `DeleteFromTableSuiteBase`, and
`MergeIntoTableSuiteBase`, all named "metric values are stable across stage
retries". Each runs the operation with an `IN`-subquery (or a join, for MERGE)
to force a shuffle, sets `spark.sql.autoBroadcastJoinThreshold = -1` so the
join doesn't get broadcast away (AQE is left at its default), and then enables
`spark.test.injectShuffleFetchFailures`. The DAGScheduler [...]
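The injected retries are visible in the test logs as `FetchFailedException`
/ `MetadataFetchFailedException`. In the DELETE test, the `numDeletedRows`
assertion only passes because the new SLAM-aware reader returns the
last-attempt scan count rather than the doubled raw accumulator value. The
current fetch-failure injection does not retry the writer stage, so the UPDATE
and MERGE tests (and the writer-side `numCopiedRows` check for DELETE) pass with
plain `SQLMetric` as well. They exercise only the SLAM-aware read path until
follow-up #55738 adds infrastructure to retry the writer stage.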
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.7
Closes #55711 from juliuszsompolski/dsv2-dml-slam.
Lead-authored-by: Juliusz Sompolski <[email protected]>
Co-authored-by: Juliusz Sompolski <Juliusz Sompolski>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 34892381f057e9db089ca456bb6bdd7814518914)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../execution/datasources/v2/BatchScanExec.scala | 17 +++++++
.../execution/datasources/v2/MergeRowsExec.scala | 18 ++++----
.../datasources/v2/WriteToDataSourceV2Exec.scala | 25 +++++++---
.../sql/connector/DeleteFromTableSuiteBase.scala | 42 +++++++++++++++++
.../sql/connector/MergeIntoTableSuiteBase.scala | 53 ++++++++++++++++++++++
.../spark/sql/connector/UpdateTableSuiteBase.scala | 42 +++++++++++++++++
6 files changed, 181 insertions(+), 16 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
index e9a18833ed9a..5f18e7637521 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
@@ -28,6 +28,9 @@ import org.apache.spark.sql.catalyst.plans.physical.{KeyedPartitioning, SinglePa
import org.apache.spark.sql.catalyst.util.{truncatedString, InternalRowComparableWrapper}
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.read._
+import org.apache.spark.sql.connector.write.RowLevelOperation.Command.DELETE
+import org.apache.spark.sql.connector.write.RowLevelOperationTable
+import org.apache.spark.sql.execution.metric.{SQLLastAttemptMetrics, SQLMetric, SQLMetrics}
import org.apache.spark.util.ArrayImplicits._
/**
@@ -44,6 +47,20 @@ case class BatchScanExec(
@transient lazy val batch: Batch = if (scan == null) null else scan.toBatch
+ override protected lazy val sparkMetrics: Map[String, SQLMetric] = {
+ val name = "number of output rows"
+ val metric = table match {
+ // Use SLAM for the scan-output count when this scan reads on behalf of a row-level DELETE,
+ // so that the driver-side derivation `numDeletedRows = numScannedRows - numCopiedRows` in
+ // `ReplaceDataExec.getWriteSummary` stays correct under stage retries.
+ case rlot: RowLevelOperationTable if rlot.operation.command() == DELETE =>
+ SQLLastAttemptMetrics.createMetric(sparkContext, name)
+ case _ =>
+ SQLMetrics.createMetric(sparkContext, name)
+ }
+ Map("numOutputRows" -> metric)
+ }
+
// TODO: unify the equal/hashCode implementation for all data source v2 query plans.
override def equals(other: Any): Boolean = other match {
case other: BatchScanExec =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
index 526ff843a149..67212de165e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.plans.logical.MergeRows.{Context, Copy, Del
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.{CodegenSupport, SparkPlan, UnaryExecNode}
-import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.metric.{SQLLastAttemptMetrics, SQLMetric}
import org.apache.spark.sql.types.BooleanType
case class MergeRowsExec(
@@ -50,21 +50,21 @@ case class MergeRowsExec(
child: SparkPlan) extends UnaryExecNode with CodegenSupport {
override lazy val metrics: Map[String, SQLMetric] = Map(
- "numTargetRowsCopied" -> SQLMetrics.createMetric(sparkContext,
+ "numTargetRowsCopied" -> SQLLastAttemptMetrics.createMetric(sparkContext,
"number of target rows copied unmodified because they did not match any action"),
- "numTargetRowsInserted" -> SQLMetrics.createMetric(sparkContext,
+ "numTargetRowsInserted" -> SQLLastAttemptMetrics.createMetric(sparkContext,
"number of target rows inserted"),
- "numTargetRowsDeleted" -> SQLMetrics.createMetric(sparkContext,
+ "numTargetRowsDeleted" -> SQLLastAttemptMetrics.createMetric(sparkContext,
"number of target rows deleted"),
- "numTargetRowsUpdated" -> SQLMetrics.createMetric(sparkContext,
+ "numTargetRowsUpdated" -> SQLLastAttemptMetrics.createMetric(sparkContext,
"number of target rows updated"),
- "numTargetRowsMatchedUpdated" -> SQLMetrics.createMetric(sparkContext,
+ "numTargetRowsMatchedUpdated" -> SQLLastAttemptMetrics.createMetric(sparkContext,
"number of target rows updated by a matched clause"),
- "numTargetRowsMatchedDeleted" -> SQLMetrics.createMetric(sparkContext,
+ "numTargetRowsMatchedDeleted" -> SQLLastAttemptMetrics.createMetric(sparkContext,
"number of target rows deleted by a matched clause"),
- "numTargetRowsNotMatchedBySourceUpdated" -> SQLMetrics.createMetric(sparkContext,
+ "numTargetRowsNotMatchedBySourceUpdated" -> SQLLastAttemptMetrics.createMetric(sparkContext,
"number of target rows updated by a not matched by source clause"),
- "numTargetRowsNotMatchedBySourceDeleted" -> SQLMetrics.createMetric(sparkContext,
+ "numTargetRowsNotMatchedBySourceDeleted" -> SQLLastAttemptMetrics.createMetric(sparkContext,
"number of target rows deleted by a not matched by source clause"))
@transient override lazy val producedAttributes: AttributeSet = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 5d8b5a081c90..33709fbd5f5a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.connector.write.RowLevelOperation.Command._
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SQLExecution, UnaryExecNode}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLLastAttemptMetric, SQLLastAttemptMetrics, SQLMetric, SQLMetrics}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.SchemaValidationMode.PROHIBIT_CHANGES
import org.apache.spark.util.ArrayImplicits._
@@ -512,20 +512,31 @@ trait RowLevelWriteExec extends V2ExistingTableWriteExec {
rowLevelCommand match {
case UPDATE =>
Map(
- "numUpdatedRows" -> SQLMetrics.createMetric(sparkContext, "number of updated rows"),
- "numCopiedRows" -> SQLMetrics.createMetric(sparkContext, "number of copied rows"))
+ "numUpdatedRows" ->
+ SQLLastAttemptMetrics.createMetric(sparkContext, "number of updated rows"),
+ "numCopiedRows" ->
+ SQLLastAttemptMetrics.createMetric(sparkContext, "number of copied rows"))
case DELETE =>
Map(
- "numDeletedRows" -> SQLMetrics.createMetric(sparkContext, "number of deleted rows"),
- "numCopiedRows" -> SQLMetrics.createMetric(sparkContext, "number of copied rows"))
+ "numDeletedRows" ->
+ SQLLastAttemptMetrics.createMetric(sparkContext, "number of deleted rows"),
+ "numCopiedRows" ->
+ SQLLastAttemptMetrics.createMetric(sparkContext, "number of copied rows"))
case _ => Map.empty
})
/**
- * Returns the value of the named metric, or -1 if the metric is not found.
+ * Returns the value of the named metric, or -1 if the metric is not found. For
+ * [[SQLLastAttemptMetric]] values, prefers the last-attempt value so the result is stable across
+ * stage retries; falls back to the regular accumulator value if the last-attempt value is
+ * unavailable (e.g. the accumulator bailed out).
*/
protected def getMetricValue(metrics: Map[String, SQLMetric], name: String): Long = {
- metrics.get(name).map(_.value).getOrElse(-1L)
+ metrics.get(name).map {
+ case slam: SQLLastAttemptMetric =>
+ slam.lastAttemptValueForHighestRDDId().getOrElse(slam.value)
+ case m => m.value
+ }.getOrElse(-1L)
}
override protected def getWriteSummary(): Option[WriteSummary] = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
index 89e3ce503fed..b894d5d75b3c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.connector
+import org.apache.spark.internal.config
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.expressions.CheckInvariant
import org.apache.spark.sql.catalyst.plans.logical.Filter
@@ -24,6 +25,7 @@ import org.apache.spark.sql.connector.catalog.{Aborted, Committed}
import org.apache.spark.sql.connector.catalog.InMemoryTable
import org.apache.spark.sql.connector.write.DeleteSummary
import org.apache.spark.sql.execution.datasources.v2.{DeleteFromTableExec, ReplaceDataExec, WriteDeltaExec}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources
abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase {
@@ -425,6 +427,46 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase {
}
}
+ test("metric values are stable across stage retries") {
+ // Force a shuffle in the DELETE plan via an IN-subquery (with broadcast disabled), then
+ // have the DAGScheduler corrupt the first attempt of every upstream shuffle map stage.
+ // The scan-side numOutputRows doubles up across attempts, and the driver-side derivation
+ // numDeletedRows = numScannedRows - numCopiedRows in `ReplaceDataExec.getWriteSummary`
+ // propagates that doubling into `DeleteSummary`. With SQLLastAttemptMetric on the scan,
+ // the surfaced numDeletedRows stays correct. (The current fetch-failure injection does
+ // not retry the writer stage, so writer-side numCopiedRows isn't actually exercised
+ // here; follow-up #55738 will fill that gap.)
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "software" }
+ |{ "pk": 3, "salary": 300, "dep": "hr" }
+ |{ "pk": 4, "salary": 400, "dep": "software" }
+ |""".stripMargin)
+
+ val sourceDF = Seq(1, 2).toDF("pk")
+ sourceDF.createOrReplaceTempView("source")
+
+ withSparkContextConf(
+ config.Tests.INJECT_SHUFFLE_FETCH_FAILURES.key -> "true") {
+ sql(
+ s"""DELETE FROM $tableNameAsString
+ |WHERE pk IN (SELECT pk FROM source)
+ |""".stripMargin)
+ }
+
+ checkDeleteMetrics(numDeletedRows = 2, numCopiedRows = 2)
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(3, 300, "hr"),
+ Row(4, 400, "software")))
+ }
+ }
+ }
+
test("delete with NOT IN subqueries") {
withTempView("deleted_id", "deleted_dep") {
createAndInitTable("pk INT NOT NULL, id INT, dep STRING",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
index b902074b547c..f37a614f99b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.connector
import org.apache.spark.SparkRuntimeException
+import org.apache.spark.internal.config
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, In, Not}
import org.apache.spark.sql.catalyst.optimizer.BuildLeft
@@ -2693,6 +2694,58 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase
}
}
+ test("metric values are stable across stage retries") {
+ // The join in the MERGE plan introduces a shuffle (with broadcast disabled), and the
+ // DAGScheduler corrupts the first attempt of every upstream shuffle map stage. Note:
+ // the current fetch-failure injection does not retry the MergeRowsExec/writer stage,
+ // so this test passes equally well with plain SQLMetric — it only exercises the
+ // SLAM-aware read path. Follow-up #55738 will add infra to actually retry the writer
+ // stage and exercise the SLAM behavior end-to-end for MERGE.
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "software" }
+ |{ "pk": 3, "salary": 300, "dep": "hr" }
+ |""".stripMargin)
+
+ val sourceDF = Seq(1, 2, 10).toDF("pk")
+ sourceDF.createOrReplaceTempView("source")
+
+ withSparkContextConf(
+ config.Tests.INJECT_SHUFFLE_FETCH_FAILURES.key -> "true") {
+ sql(
+ s"""MERGE INTO $tableNameAsString t
+ |USING source s
+ |ON t.pk = s.pk
+ |WHEN MATCHED THEN
+ | UPDATE SET salary = salary + 100
+ |WHEN NOT MATCHED THEN
+ | INSERT (pk, salary, dep) VALUES (s.pk, 999, 'unknown')
+ |""".stripMargin)
+ }
+
+ val mergeSummary = getMergeSummary()
+ assert(mergeSummary.numTargetRowsUpdated === 2L)
+ assert(mergeSummary.numTargetRowsMatchedUpdated === 2L)
+ assert(mergeSummary.numTargetRowsInserted === 1L)
+ assert(mergeSummary.numTargetRowsCopied === (if (deltaMerge) 0L else 1L))
+ assert(mergeSummary.numTargetRowsDeleted === 0L)
+ assert(mergeSummary.numTargetRowsMatchedDeleted === 0L)
+ assert(mergeSummary.numTargetRowsNotMatchedBySourceUpdated === 0L)
+ assert(mergeSummary.numTargetRowsNotMatchedBySourceDeleted === 0L)
+
+ checkAnswer(
+ sql(s"SELECT pk, salary FROM $tableNameAsString ORDER BY pk"),
+ Seq(
+ Row(1, 200),
+ Row(2, 300),
+ Row(3, 300),
+ Row(10, 999)))
+ }
+ }
+ }
+
test("SPARK-55074: imerge with type coercion from INT to STRING") {
// INT -> STRING is allowed in ANSI mode, merge should succeed via type coercion
// without requiring schema evolution
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala
index c5264ca87a70..6e9afe7abc97 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.connector
import org.apache.spark.SparkRuntimeException
+import org.apache.spark.internal.config
import org.apache.spark.sql.{sources, AnalysisException, Row}
import org.apache.spark.sql.connector.catalog.{Aborted, Column, ColumnDefaultValue, Committed, InMemoryTable, TableChange, TableInfo}
import org.apache.spark.sql.connector.expressions.{GeneralScalarExpression, LiteralValue}
@@ -340,6 +341,47 @@ abstract class UpdateTableSuiteBase extends RowLevelOperationSuiteBase {
checkUpdateMetrics(numUpdatedRows = 2, numCopiedRows = 1)
}
+ test("metric values are stable across stage retries") {
+ // Force a shuffle in the UPDATE plan via an IN-subquery (with broadcast disabled), then
+ // have the DAGScheduler corrupt the first attempt of every upstream shuffle map stage.
+ // Note: the current fetch-failure injection does not retry the writer stage, so this
+ // test passes equally well with plain SQLMetric — it only exercises the SLAM-aware
+ // read path. Follow-up #55738 will add infra to actually retry the writer stage and
+ // exercise the SLAM behavior end-to-end for UPDATE.
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ withTempView("source") {
+ createAndInitTable("pk INT NOT NULL, salary INT, dep STRING",
+ """{ "pk": 1, "salary": 100, "dep": "hr" }
+ |{ "pk": 2, "salary": 200, "dep": "software" }
+ |{ "pk": 3, "salary": 300, "dep": "hr" }
+ |{ "pk": 4, "salary": 400, "dep": "software" }
+ |""".stripMargin)
+
+ val sourceDF = Seq(1, 2).toDF("pk")
+ sourceDF.createOrReplaceTempView("source")
+
+ withSparkContextConf(
+ config.Tests.INJECT_SHUFFLE_FETCH_FAILURES.key -> "true") {
+ sql(
+ s"""UPDATE $tableNameAsString
+ |SET salary = salary + 100
+ |WHERE pk IN (SELECT pk FROM source)
+ |""".stripMargin)
+ }
+
+ checkUpdateMetrics(numUpdatedRows = 2, numCopiedRows = 2)
+
+ checkAnswer(
+ sql(s"SELECT * FROM $tableNameAsString"),
+ Seq(
+ Row(1, 200, "hr"),
+ Row(2, 300, "software"),
+ Row(3, 300, "hr"),
+ Row(4, 400, "software")))
+ }
+ }
+ }
+
test("update nested struct fields") {
createAndInitTable(
s"""pk INT NOT NULL,