danny0405 commented on code in PR #8102:
URL: https://github.com/apache/hudi/pull/8102#discussion_r1157081803
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java:
##########
@@ -1892,4 +1992,66 @@ private List<Row> execSelectSql(TableEnvironment tEnv,
String select, String sin
.flatMap(Collection::stream)
.collect(Collectors.toList());
}
+
+ /**
+  * End-to-end check of partition pruning for a continuous (streaming) read.
+  *
+  * <p>Flow: write an initial commit, start a streaming query over {@code t1} with the
+  * given filter predicate, write a second commit that adds new partitions, then stop
+  * the job and compare the collected rows with the expectation.
+  *
+  * @param tableType             table layout under test (COPY_ON_WRITE or MERGE_ON_READ)
+  * @param hiveStylePartitioning whether partition paths use the Hive key=value style
+  * @param filterCondition       SQL predicate appended to the WHERE clause; drives pruning
+  * @param results               rows expected after both commits have been consumed
+  */
+ private void testContinuousPartitionPrune(
+ HoodieTableType tableType,
+ boolean hiveStylePartitioning,
+ String filterCondition,
+ List<RowData> results
+ ) throws Exception {
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.setString(FlinkOptions.TABLE_NAME, "t1");
+ conf.setString(FlinkOptions.TABLE_TYPE, tableType.name());
+ conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING,
hiveStylePartitioning);
+
+ // write one commit
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ // declare the source as a streaming read; a short check interval (2s) makes the
+ // reader discover the second commit quickly
+ String hoodieTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.TABLE_TYPE, tableType)
+ .option(FlinkOptions.READ_AS_STREAMING, true)
+ .option(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 2)
+ .option(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning)
+ .end();
+ streamTableEnv.executeSql(hoodieTableDDL);
+
+ // collect sink captures the streaming query output for later assertion
+ String sinkDDL = "create table sink(\n"
+ + " uuid varchar(20),\n"
+ + " name varchar(20),\n"
+ + " age int,\n"
+ + " ts timestamp,\n"
+ + " part varchar(20)"
+ + ") with (\n"
+ + " 'connector' = '" + CollectSinkTableFactory.FACTORY_ID + "'"
+ + ")";
+ TableResult tableResult = submitSelectSql(
+ streamTableEnv,
+ "select uuid, name, age, ts, `partition` as part from t1 where " +
filterCondition,
+ sinkDDL);
+
+ // write second commit
+ TestData.writeData(DATA_SET_NEW_PARTITIONS, conf);
+ // stop the streaming query and fetch the result
+ List<Row> result = stopAndFetchData(streamTableEnv, tableResult, 10);
+ assertRowsEquals(result, results);
+ }
+
+ /**
+  * (Re)creates the collect sink table and submits the given select statement as a
+  * streaming {@code insert into} job.
+  *
+  * @param tEnv    the table environment to execute against
+  * @param select  the select statement whose output feeds the sink
+  * @param sinkDDL DDL creating the collect sink table
+  * @return the async {@code TableResult} handle of the submitted insert job
+  */
+ private TableResult submitSelectSql(TableEnvironment tEnv, String select,
String sinkDDL) {
+ // drop any sink left over from a previous invocation so the DDL always succeeds
+ tEnv.executeSql("DROP TABLE IF EXISTS sink");
+ tEnv.executeSql(sinkDDL);
+ TableResult tableResult = tEnv.executeSql("insert into sink " + select);
+ return tableResult;
+ }
+
+ private List<Row> stopAndFetchData(TableEnvironment tEnv, TableResult
tableResult, long timeout)
+ throws InterruptedException {
+ // wait for the timeout then cancels the job
Review Comment:
There is no need to wait here; just add enough partitions up front, then start
the streaming read pipeline.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]