bvaradar commented on code in PR #8385:
URL: https://github.com/apache/hudi/pull/8385#discussion_r1175661779
##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java:
##########
@@ -169,6 +172,85 @@ public void testHoodieFlinkCompactor(boolean enableChangelog) throws Exception {
     TestData.checkWrittenDataCOW(tempFile, EXPECTED1);
   }
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testHoodieFlinkCompactorWithUpgradeAndDowngrade(boolean upgrade) throws Exception {
+    // Create hoodie table and insert into data.
+    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+    TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+    tableEnv.getConfig().getConfiguration()
+        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
+    options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    tableEnv.executeSql(hoodieTableDDL);
+    tableEnv.executeSql(TestSQL.INSERT_T1).await();
+
+    // wait for the asynchronous commit to finish
+    TimeUnit.SECONDS.sleep(3);
+
+    // Make configuration and setAvroSchema.
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    FlinkCompactionConfig cfg = new FlinkCompactionConfig();
+    cfg.path = tempFile.getAbsolutePath();
+    Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
+    conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
+
+    // create metaClient
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+
+    // set the table name
+    conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
+
+    // set table schema
+    CompactionUtil.setAvroSchema(conf, metaClient);
+
+    // infer changelog mode
+    CompactionUtil.inferChangelogMode(conf, metaClient);
+
+    HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf);
+
+    String compactionInstantTime = scheduleCompactionPlan(metaClient, writeClient);
+
+    HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+
+    // try to upgrade or downgrade
+    if (upgrade) {
+      new FiveToSixUpgradeHandler().upgrade(writeClient.getConfig(), writeClient.getEngineContext(), "none", FlinkUpgradeDowngradeHelper.getInstance());
Review Comment:
Thanks @Mulavar for adding the test case. Instead of calling the FiveToSix and
SixToFive upgrade/downgrade handlers directly, can you call
UpgradeDowngrade.run()? That would make the test generic rather than tied to a
specific version. The goal of the test can then be to ensure that compaction
succeeds after upgrading and downgrading. Let me know your thoughts.
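
To illustrate the idea, here is a minimal, self-contained sketch of a version-agnostic entry point. Everything in it is a hypothetical stand-in (`TableVersion`, `UpgradeDowngradeRunner`, `targetFor`), not Hudi's actual classes or signatures. It only assumes that `UpgradeDowngrade.run()` is driven by a target table version, so the test chooses a target relative to the current version instead of naming a specific handler pair:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only: none of these types are Hudi classes.
final class GenericUpgradeDowngradeSketch {

  // Hypothetical stand-in for a table version enum.
  enum TableVersion { FOUR, FIVE, SIX }

  // Hypothetical stand-in for a generic upgrade/downgrade entry point.
  static final class UpgradeDowngradeRunner {
    private TableVersion current;
    private final List<String> steps = new ArrayList<>();

    UpgradeDowngradeRunner(TableVersion current) {
      this.current = current;
    }

    // Walks one version at a time toward the target, in either direction,
    // so callers never reference a specific "XToY" handler.
    void run(TableVersion target) {
      TableVersion[] all = TableVersion.values();
      while (current.ordinal() < target.ordinal()) {
        TableVersion next = all[current.ordinal() + 1];
        steps.add("upgrade " + current + "->" + next);
        current = next;
      }
      while (current.ordinal() > target.ordinal()) {
        TableVersion prev = all[current.ordinal() - 1];
        steps.add("downgrade " + current + "->" + prev);
        current = prev;
      }
    }

    TableVersion current() {
      return current;
    }

    List<String> steps() {
      return steps;
    }
  }

  // Picks the target from the test parameter: the newest version when
  // upgrading, the version just below the current one when downgrading.
  static TableVersion targetFor(boolean upgrade, TableVersion current) {
    TableVersion[] all = TableVersion.values();
    if (upgrade) {
      return all[all.length - 1];
    }
    return all[Math.max(0, current.ordinal() - 1)];
  }

  public static void main(String[] args) {
    for (boolean upgrade : new boolean[] {true, false}) {
      TableVersion start = upgrade ? TableVersion.FIVE : TableVersion.SIX;
      UpgradeDowngradeRunner runner = new UpgradeDowngradeRunner(start);
      runner.run(targetFor(upgrade, start));
      // In the real test, compaction would run here and be asserted to succeed.
      System.out.println(runner.steps() + " now at " + runner.current());
    }
  }
}
```

With this shape, the parameterized test only decides the direction, and it keeps working unchanged when a new table version is added.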