RussellSpitzer commented on code in PR #4588:
URL: https://github.com/apache/iceberg/pull/4588#discussion_r888122799


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java:
##########
@@ -152,9 +220,53 @@ public StructLikeSet rowSet(String name, Table table, 
String... columns) {
       set.add(rowWrapper.wrap(row));
     });
 
+    extractDeleteCount();
     return set;
   }
 
+  private void extractDeleteCount() {
+    // Get the executionId of the query we just executed
+    List<Long> executed = listener.executed();
+    long lastExecuted = executed.get(executed.size() - 1);
+    // Ensure that the execution end was registered

Review Comment:
   Oh, this is great. Looking into the APIs you dug up, I think there may be an even easier way to do this. Sorry, I didn't realize you could do this before.
   
   Looking into `sharedState`, you can do something like
   ```scala
   // Some operation
   scala> spark.range(1, 100).withColumn("x", col("id")).withColumnRenamed("id", "y").writeTo("local.default.test").append
   
   // UI Metrics
   scala> spark.sharedState.statusStore.executionsList.last.metrics
   res41: Seq[org.apache.spark.sql.execution.ui.SQLPlanMetric] = List(SQLPlanMetric(number of output rows,25,sum), SQLPlanMetric(duration,24,timing))
   
   // Description
   scala> spark.sharedState.statusStore.executionsList.last.description
   res42: String = append at <console>:23
   
   // Execution id
   scala> spark.sharedState.statusStore.executionsList.last.executionId
   res47: Long = 6
   ```
   
   So this block could look like
   ```java
   private long lastExecutedMetric(String metricName) {
     SQLExecutionUIData lastExecution = spark.sharedState().statusStore().executionsList().last();
     Option<SQLPlanMetric> sqlPlanMetric = lastExecution.metrics().find(metric -> metric.name().equals(metricName));
     Assert.assertTrue(String.format("Metric %s not present in last execution", metricName), sqlPlanMetric.isDefined());
     long metricId = sqlPlanMetric.get().accumulatorId();
   
     // Refresh metricValues; they will remain null until the execution is complete and metrics are aggregated
     int attempts = 3;
     while (lastExecution.metricValues() == null && attempts > 0) {
       try {
         Thread.sleep(100);
         attempts -= 1;
       } catch (InterruptedException e) {
         throw new RuntimeException(e);
       }
       lastExecution = spark.sharedState().statusStore().execution(lastExecution.executionId()).get();
     }
     Assert.assertNotNull(String.format("Metric %s was never finalized", metricName), lastExecution.metricValues());
     return Long.parseLong(lastExecution.metricValues().get(metricId).get());
   }
   ```
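   
   Side note: the retry loop above is just a bounded poll for a value that is populated asynchronously. Stripped of the Spark types, the pattern is roughly this (a self-contained sketch; `pollUntilNonNull` and all names here are illustrative, not Spark API):
   ```java
   import java.util.function.Supplier;
   
   public class PollUtil {
     // Poll the supplier until it returns non-null or we run out of attempts.
     static <T> T pollUntilNonNull(Supplier<T> supplier, int attempts, long sleepMillis) {
       T value = supplier.get();
       while (value == null && attempts > 0) {
         try {
           Thread.sleep(sleepMillis);
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           throw new RuntimeException(e);
         }
         attempts -= 1;
         value = supplier.get();
       }
       return value; // may still be null if the value never appeared
     }
   
     public static void main(String[] args) {
       // Simulate a value that only becomes available after a short delay.
       long readyAt = System.currentTimeMillis() + 150;
       String result = pollUntilNonNull(
           () -> System.currentTimeMillis() >= readyAt ? "done" : null, 5, 100);
       System.out.println(result);
     }
   }
   ```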
     
   Then we just have
   ```java
   @Override
   protected void checkDeleteCount(long expectedDeletes) {
     Assert.assertEquals("Unexpected number of deletes recorded", expectedDeletes,
         lastExecutedMetric(NumDeletes.DISPLAY_STRING));
   }
   ```
   
   And we can drop the other state in the test class.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

