danny0405 commented on a change in pull request #3046:
URL: https://github.com/apache/hudi/pull/3046#discussion_r647922855



##########
File path: hudi-flink/src/test/java/org/apache/hudi/sink/StreamWriteITCase.java
##########
@@ -137,6 +159,101 @@ public void testWriteToHoodie() throws Exception {
     TestData.checkWrittenFullData(tempFile, EXPECTED);
   }
 
+  @Test
+  public void testHoodieFlinkCompactor() throws Exception {
+    // Create the hoodie table and insert data into it.
+    EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+    TableEnvironment tableEnv = TableEnvironmentImpl.create(settings);
+    tableEnv.getConfig().getConfiguration()
+            .setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
+    String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
+    tableEnv.executeSql(hoodieTableDDL);
+    String insertInto = "insert into t1 values\n"
+            + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),\n"
+            + "('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
+            + "('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),\n"
+            + "('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),\n"
+            + "('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),\n"
+            + "('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),\n"
+            + "('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),\n"
+            + "('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4')";
+    TableResult tableResult = tableEnv.executeSql(insertInto);
+    TimeUnit.SECONDS.sleep(5);
+    tableResult.await();
+
+    // Build the Flink configuration and set the Avro schema.
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    FlinkCompactionConfig cfg = new FlinkCompactionConfig();
+    cfg.path = tempFile.getAbsolutePath();
+    Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
+    conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
+    conf.setString(FlinkOptions.PARTITION_PATH_FIELD.key(), "partition");
+
+    // Set the table schema.
+    conf = CompactionUtil.setAvroSchema(conf);
+
+    // Compute the compaction instant time and schedule the compaction.
+    String instantTime = CompactionUtil.getCompactionInstantTime(CompactionUtil.createMetaClient(conf));
+    HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf, null);
+    writeClient.scheduleCompactionAtInstant(instantTime, Option.empty());
+
+    HoodieFlinkTable<?> table = writeClient.getHoodieTable();
+    // The last instant takes the highest priority.
+    Option<HoodieInstant> compactionInstant = table.getActiveTimeline().filterPendingCompactionTimeline().lastInstant();
+    String compactionInstantTime = compactionInstant.get().getTimestamp();
+    if (compactionInstantTime == null) {
+      // do nothing.

Review comment:
       The check condition never hits: compactionInstant.get() already throws when there is no pending instant, and the timestamp of an existing instant is never null. Remove all these checks.
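
       For illustration, a minimal sketch of how this spot could read once the dead null check is dropped (assuming, as in this test, that scheduleCompactionAtInstant has just been called, so a pending compaction instant must exist):

           Option<HoodieInstant> compactionInstant =
               table.getActiveTimeline().filterPendingCompactionTimeline().lastInstant();
           // A compaction was just scheduled above, so the instant is present;
           // Option.get() would throw NoSuchElementException if it were not.
           String compactionInstantTime = compactionInstant.get().getTimestamp();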




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

