Mulavar commented on code in PR #8385:
URL: https://github.com/apache/hudi/pull/8385#discussion_r1174716920


##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java:
##########
@@ -169,6 +172,85 @@ public void testHoodieFlinkCompactor(boolean enableChangelog) throws Exception {
     TestData.checkWrittenDataCOW(tempFile, EXPECTED1);
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testHoodieFlinkCompactorWithUpgradeAndDowngrade(boolean upgrade) throws Exception {
+    // Create the Hudi table and insert data.
+    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+    TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+    tableEnv.getConfig().getConfiguration()
+        .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 4);
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), "false");
+    options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "false");
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    tableEnv.executeSql(hoodieTableDDL);
+    tableEnv.executeSql(TestSQL.INSERT_T1).await();
+
+    // Wait for the asynchronous commit to finish.
+    TimeUnit.SECONDS.sleep(3);
+
+    // Build the compaction configuration.
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    FlinkCompactionConfig cfg = new FlinkCompactionConfig();
+    cfg.path = tempFile.getAbsolutePath();
+    Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
+    conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
+
+    // create metaClient
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+
+    // set the table name
+    conf.setString(FlinkOptions.TABLE_NAME, metaClient.getTableConfig().getTableName());
+
+    // set table schema
+    CompactionUtil.setAvroSchema(conf, metaClient);
+
+    // infer changelog mode
+    CompactionUtil.inferChangelogMode(conf, metaClient);
+
+    HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf);
+
+    String compactionInstantTime = scheduleCompactionPlan(metaClient, writeClient);
+
+    HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+
+    // Run the upgrade or downgrade handler.
+    if (upgrade) {
+      new FiveToSixUpgradeHandler().upgrade(
+          writeClient.getConfig(), writeClient.getEngineContext(), "none", FlinkUpgradeDowngradeHelper.getInstance());
+    } else {
+      new SixToFiveDowngradeHandler().downgrade(
+          writeClient.getConfig(), writeClient.getEngineContext(), "none", FlinkUpgradeDowngradeHelper.getInstance());
+    }

Review Comment:
   @bvaradar Hi, I've added a test for compaction after the upgrade/downgrade operation. Could you take a look at this PR again?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
