RussellSpitzer commented on code in PR #4588:
URL: https://github.com/apache/iceberg/pull/4588#discussion_r888122799
##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java:
##########
@@ -152,9 +220,53 @@ public StructLikeSet rowSet(String name, Table table, String... columns) {
set.add(rowWrapper.wrap(row));
});
+ extractDeleteCount();
return set;
}
+ private void extractDeleteCount() {
+ // Get the executionId of the query we just executed
+ List<Long> executed = listener.executed();
+ long lastExecuted = executed.get(executed.size() - 1);
+ // Ensure that the execution end was registered
Review Comment:
Oh this is great. Looking into the APIs you dug up, I think there may be an even easier way to do this (sorry, I didn't realize you could do this before). Looking into `sharedState`, you can do something like:
```scala
// Some operation
scala> spark.range(1, 100).withColumn("x", col("id")).withColumnRenamed("id", "y").writeTo("local.default.test").append

// UI Metrics
scala> spark.sharedState.statusStore.executionsList.last.metrics
res41: Seq[org.apache.spark.sql.execution.ui.SQLPlanMetric] = List(SQLPlanMetric(number of output rows,25,sum), SQLPlanMetric(duration,24,timing))
// Description
scala> spark.sharedState.statusStore.executionsList.last.description
res42: String = append at <console>:23
// Execution id
scala> spark.sharedState.statusStore.executionsList.last.executionId
res47: Long = 6
```
So this block could look something like:
```java
private long lastExecutedMetric(String metricName) {
  SQLExecutionUIData lastExecution = spark.sharedState().statusStore().executionsList().last();
  Option<SQLPlanMetric> sqlPlanMetric =
      lastExecution.metrics().find(metric -> metric.name().equals(metricName));
  Assert.assertTrue(
      String.format("Metric %s not present in last execution", metricName),
      sqlPlanMetric.isDefined());
  long metricId = sqlPlanMetric.get().accumulatorId();

  // Refresh metricValues; they will remain null until the execution is complete
  // and metrics are aggregated
  int attempts = 3;
  while (lastExecution.metricValues() == null && attempts > 0) {
    try {
      Thread.sleep(100);
      attempts -= 1;
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
    lastExecution = spark.sharedState().statusStore().execution(lastExecution.executionId()).get();
  }

  Assert.assertNotNull(
      String.format("Metric %s was never finalized", metricName),
      lastExecution.metricValues());
  return Long.parseLong(lastExecution.metricValues().get(metricId).get());
}
```
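One interop note, purely my assumption since I haven't compiled this against the branch: `metrics()` comes back as a Scala `Seq`, so the `find` call above relies on a Java lambda being accepted as a `scala.Function1`. If that fights the compiler, a hedged alternative is a small helper (hypothetical name `findMetric`, needs `java.util.List` and `scala.collection.JavaConverters` imports) that converts to a Java list first:
```java
// Hedged alternative sketch: search the metrics of an execution via a java.util.List
// instead of Scala's Seq.find, sidestepping direct Scala function interop.
// Fails the test with the same message as the assertion above if the metric is missing.
private SQLPlanMetric findMetric(SQLExecutionUIData execution, String metricName) {
  List<SQLPlanMetric> metrics =
      JavaConverters.seqAsJavaListConverter(execution.metrics()).asJava();
  return metrics.stream()
      .filter(metric -> metric.name().equals(metricName))
      .findFirst()
      .orElseThrow(() -> new AssertionError(
          String.format("Metric %s not present in last execution", metricName)));
}
```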
Then we just have
```java
@Override
protected void checkDeleteCount(long expectedDeletes) {
  Assert.assertEquals("Unexpected number of deletes recorded",
      expectedDeletes,
      lastExecutedMetric(NumDeletes.DISPLAY_STRING));
}
```
And we can drop the other state in the test class.
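For anyone skimming this later: the reason these delete counts show up in the status store at all is the DSv2 custom metric API added in Spark 3.2. As a rough, hedged sketch (the class shape is the general API; the display string here is a placeholder based on the `NumDeletes.DISPLAY_STRING` reference above, not copied from the PR), such a metric looks roughly like:
```java
import org.apache.spark.sql.connector.metric.CustomSumMetric;

// Hedged sketch of a DSv2 custom metric: Spark sums the values reported per task
// and publishes the result as a SQLPlanMetric, which is what the statusStore lookup
// above reads back. DISPLAY_STRING here is a placeholder value.
public class NumDeletes extends CustomSumMetric {
  public static final String DISPLAY_STRING = "number of row deletes applied";

  @Override
  public String name() {
    return "numDeletes";
  }

  @Override
  public String description() {
    return DISPLAY_STRING;
  }
}
```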