wypoon commented on code in PR #4588:
URL: https://github.com/apache/iceberg/pull/4588#discussion_r855383946


##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java:
##########
@@ -139,18 +151,55 @@ protected void dropTable(String name) {
   }
 
   @Override
-  public StructLikeSet rowSet(String name, Table table, String... columns) {
-    Dataset<Row> df = spark.read()
-        .format("iceberg")
-        .load(TableIdentifier.of("default", name).toString())
-        .selectExpr(columns);
+  protected boolean countDeletes() {
+    return true;
+  }
 
-    Types.StructType projection = table.schema().select(columns).asStruct();
-    StructLikeSet set = StructLikeSet.create(projection);
-    df.collectAsList().forEach(row -> {
-      SparkStructLike rowWrapper = new SparkStructLike(projection);
-      set.add(rowWrapper.wrap(row));
-    });
+  private void setDeleteCount(long count) {
+    deleteCount = count;
+  }
+
+  @Override
+  protected long deleteCount() {
+    return deleteCount;
+  }
+
+  @Override
+  public StructLikeSet rowSet(String name, Table table, String... columns) throws IOException {
+    Schema schema = table.schema().select(columns);
+    StructType sparkSchema = SparkSchemaUtil.convert(schema);
+    Types.StructType type = schema.asStruct();
+    StructLikeSet set = StructLikeSet.create(type);
+
+    CloseableIterable<CombinedScanTask> tasks = TableScanUtil.planTasks(
+        table.newScan().planFiles(),
+        TableProperties.METADATA_SPLIT_SIZE_DEFAULT,
+        TableProperties.SPLIT_LOOKBACK_DEFAULT,
+        TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
+
+    long delCount = 0L;

Review Comment:
   I could replace 
   ```
       CloseableIterable<CombinedScanTask> tasks = TableScanUtil.planTasks(
           table.newScan().planFiles(),
           TableProperties.METADATA_SPLIT_SIZE_DEFAULT,
           TableProperties.SPLIT_LOOKBACK_DEFAULT,
           TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT);
   ```
   with
   ```
        SparkScanBuilder scanBuilder = new SparkScanBuilder(spark, table, CaseInsensitiveStringMap.empty());
       scanBuilder.pruneColumns(sparkSchema);
       SparkScan scan = (SparkScan) scanBuilder.buildMergeOnReadScan();
       List<CombinedScanTask> tasks = scan.tasks();
   ```
   That is the path a Spark read takes. I am also doing most of the rest of what Spark does once we have the tasks and hand them to it as `InputPartition`s: in each `InputPartition`, get a `PartitionReader` (a subclass of either `BatchDataReader` or `RowDataReader`) and iterate through the reader. I just don't assemble the `InternalRow`s into a `DataFrame`.
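   
   For reference, a minimal sketch of that iteration (not the exact PR code) using Spark's DSv2 interfaces, assuming a `Batch` obtained from the scan (e.g. via `scan.toBatch()`) and the usual `org.apache.spark.sql.connector.read` imports:
   ```
       // plan the InputPartitions and get a reader factory from the batch
       InputPartition[] partitions = batch.planInputPartitions();
       PartitionReaderFactory readerFactory = batch.createReaderFactory();
       for (InputPartition partition : partitions) {
         // each partition gets its own PartitionReader<InternalRow>
         try (PartitionReader<InternalRow> reader = readerFactory.createReader(partition)) {
           while (reader.next()) {
             InternalRow row = reader.get();
             // wrap the row as a StructLike and add it to the expected set,
             // rather than assembling the rows into a DataFrame
           }
         }
       }
   ```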


