bvaradar commented on code in PR #8385:
URL: https://github.com/apache/hudi/pull/8385#discussion_r1175661779


##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java:
##########
@@ -169,6 +172,85 @@ public void testHoodieFlinkCompactor(boolean enableChangelog) throws Exception {
     TestData.checkWrittenDataCOW(tempFile, EXPECTED1);
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testHoodieFlinkCompactorWithUpgradeAndDowngrade(boolean upgrade) throws Exception {
+    // Create hoodie table and insert into data.
+    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+    TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+    tableEnv.getConfig().getConfiguration()
+        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
+    options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    tableEnv.executeSql(hoodieTableDDL);
+    tableEnv.executeSql(TestSQL.INSERT_T1).await();
+
+    // wait for the asynchronous commit to finish
+    TimeUnit.SECONDS.sleep(3);
+
+    // Make configuration and setAvroSchema.
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    FlinkCompactionConfig cfg = new FlinkCompactionConfig();
+    cfg.path = tempFile.getAbsolutePath();
+    Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
+    conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
+
+    // create metaClient
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+
+    // set the table name
+    conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
+
+    // set table schema
+    CompactionUtil.setAvroSchema(conf, metaClient);
+
+    // infer changelog mode
+    CompactionUtil.inferChangelogMode(conf, metaClient);
+
+    HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf);
+
+    String compactionInstantTime = scheduleCompactionPlan(metaClient, writeClient);
+
+    HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+
+    // try to upgrade or downgrade
+    if (upgrade) {
+      new FiveToSixUpgradeHandler().upgrade(writeClient.getConfig(), writeClient.getEngineContext(), "none", FlinkUpgradeDowngradeHelper.getInstance());

Review Comment:
   Thanks @Mulavar for adding the test case. Instead of directly calling the FiveToSix and SixToFive upgrade/downgrade handlers, can you call UpgradeDowngrade.run()? That would make the test generic rather than tied to a specific table version. We can make the test's goal be to ensure compaction succeeds after upgrading and downgrading. Let me know your thoughts.
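   For illustration, a rough sketch of what the suggested change might look like in the test body, assuming Hudi's `org.apache.hudi.table.upgrade.UpgradeDowngrade` entry point (constructor and `run` signature may differ across Hudi versions, so treat this as a hedged sketch, not the exact API):
   
   ```java
   // Sketch only: drive the version change through the generic entry point
   // instead of a version-specific handler. HoodieTableVersion.current() and
   // the null instantTime argument are assumptions about this Hudi version.
   HoodieTableVersion toVersion = upgrade
       ? HoodieTableVersion.current()   // upgrade to the latest table version
       : HoodieTableVersion.FIVE;       // or downgrade to an older one
   new UpgradeDowngrade(
           metaClient,
           writeClient.getConfig(),
           writeClient.getEngineContext(),
           FlinkUpgradeDowngradeHelper.getInstance())
       .run(toVersion, null);
   ```
   
   With this shape, the same test exercises whatever handlers apply to the current table version, so it would not need updating when new version handlers are added.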



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
