openinx commented on a change in pull request #2047:
URL: https://github.com/apache/iceberg/pull/2047#discussion_r585563087



##########
File path: 
flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
##########
@@ -113,68 +93,47 @@ public void before() throws IOException {
     File warehouseFile = TEMPORARY_FOLDER.newFolder();
     Assert.assertTrue(warehouseFile.delete());
     // before variables
-    Configuration conf = new Configuration();
     warehouse = "file:" + warehouseFile;
+    Configuration conf = new Configuration();
     catalog = new HadoopCatalog(conf, warehouse);
+    location = String.format("%s/%s/%s", warehouse, TestFixtures.DATABASE, 
TestFixtures.TABLE);
   }
 
-  private List<Row> runWithProjection(String... projected) throws IOException {
-    TableSchema.Builder builder = TableSchema.builder();
-    TableSchema schema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(
-        catalog.loadTable(TableIdentifier.of("default", "t")).schema()));
-    for (String field : projected) {
-      TableColumn column = schema.getTableColumn(field).get();
-      builder.field(column.getName(), column.getType());
-    }
-    return run(FlinkSource.forRowData().project(builder.build()), 
Maps.newHashMap(), "", projected);
-  }
-
-  protected List<Row> runWithFilter(Expression filter, String sqlFilter) 
throws IOException {
-    FlinkSource.Builder builder = 
FlinkSource.forRowData().filters(Collections.singletonList(filter));
-    return run(builder, Maps.newHashMap(), sqlFilter, "*");
-  }
-
-  private List<Row> runWithOptions(Map<String, String> options) throws 
IOException {
-    FlinkSource.Builder builder = FlinkSource.forRowData();
-    Optional.ofNullable(options.get("snapshot-id")).ifPresent(value -> 
builder.snapshotId(Long.parseLong(value)));
-    Optional.ofNullable(options.get("start-snapshot-id"))
-        .ifPresent(value -> builder.startSnapshotId(Long.parseLong(value)));
-    Optional.ofNullable(options.get("end-snapshot-id"))
-        .ifPresent(value -> builder.endSnapshotId(Long.parseLong(value)));
-    Optional.ofNullable(options.get("as-of-timestamp"))
-        .ifPresent(value -> builder.asOfTimestamp(Long.parseLong(value)));
-    return run(builder, options, "", "*");
+  @After
+  public void after() throws IOException {
   }
 
-  private List<Row> run() throws IOException {
-    return run(FlinkSource.forRowData(), Maps.newHashMap(), "", "*");
+  protected TableLoader tableLoader() {
+    return TableLoader.fromHadoopTable(location);
   }
 
-  protected abstract List<Row> run(FlinkSource.Builder formatBuilder, 
Map<String, String> sqlOptions, String sqlFilter,
-                                   String... sqlSelectedFields) throws 
IOException;
+  protected abstract List<Row> runWithProjection(String... projected) throws 
Exception;

Review comment:
       What's the implementation of those four methods in FLIP-27? It looks 
like we are just filling in options in `TestFlinkSource`. Will FLIP-27 have 
different implementations of them?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to