openinx commented on a change in pull request #2047:
URL: https://github.com/apache/iceberg/pull/2047#discussion_r585563087
##########
File path:
flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java
##########
@@ -113,68 +93,47 @@ public void before() throws IOException {
File warehouseFile = TEMPORARY_FOLDER.newFolder();
Assert.assertTrue(warehouseFile.delete());
// before variables
- Configuration conf = new Configuration();
warehouse = "file:" + warehouseFile;
+ Configuration conf = new Configuration();
catalog = new HadoopCatalog(conf, warehouse);
+ location = String.format("%s/%s/%s", warehouse, TestFixtures.DATABASE,
TestFixtures.TABLE);
}
- private List<Row> runWithProjection(String... projected) throws IOException {
- TableSchema.Builder builder = TableSchema.builder();
- TableSchema schema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(
- catalog.loadTable(TableIdentifier.of("default", "t")).schema()));
- for (String field : projected) {
- TableColumn column = schema.getTableColumn(field).get();
- builder.field(column.getName(), column.getType());
- }
- return run(FlinkSource.forRowData().project(builder.build()),
Maps.newHashMap(), "", projected);
- }
-
- protected List<Row> runWithFilter(Expression filter, String sqlFilter)
throws IOException {
- FlinkSource.Builder builder =
FlinkSource.forRowData().filters(Collections.singletonList(filter));
- return run(builder, Maps.newHashMap(), sqlFilter, "*");
- }
-
- private List<Row> runWithOptions(Map<String, String> options) throws
IOException {
- FlinkSource.Builder builder = FlinkSource.forRowData();
- Optional.ofNullable(options.get("snapshot-id")).ifPresent(value ->
builder.snapshotId(Long.parseLong(value)));
- Optional.ofNullable(options.get("start-snapshot-id"))
- .ifPresent(value -> builder.startSnapshotId(Long.parseLong(value)));
- Optional.ofNullable(options.get("end-snapshot-id"))
- .ifPresent(value -> builder.endSnapshotId(Long.parseLong(value)));
- Optional.ofNullable(options.get("as-of-timestamp"))
- .ifPresent(value -> builder.asOfTimestamp(Long.parseLong(value)));
- return run(builder, options, "", "*");
+ @After
+ public void after() throws IOException {
}
- private List<Row> run() throws IOException {
- return run(FlinkSource.forRowData(), Maps.newHashMap(), "", "*");
+ protected TableLoader tableLoader() {
+ return TableLoader.fromHadoopTable(location);
}
- protected abstract List<Row> run(FlinkSource.Builder formatBuilder,
Map<String, String> sqlOptions, String sqlFilter,
- String... sqlSelectedFields) throws
IOException;
+ protected abstract List<Row> runWithProjection(String... projected) throws
Exception;
Review comment:
What's the implementation for those four methods in FLIP-27 ? Looks
like we are just filling options in `TestFlinkSource`, will the FLIP-27 have
those different implementations ?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]