This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c6441134ba6 [MINOR] Fix testHoodieFlinkClusteringScheduleAfterArchive (#8110)
c6441134ba6 is described below

commit c6441134ba679177466eb945319d4245a08e14b0
Author: voonhous <[email protected]>
AuthorDate: Wed Mar 8 09:48:41 2023 +0800

    [MINOR] Fix testHoodieFlinkClusteringScheduleAfterArchive (#8110)
    
    * Fix testHoodieFlinkClusteringScheduleAfterArchive
    
    * Fix checkstyle
---
 .../sink/cluster/ITTestHoodieFlinkClustering.java    | 20 +++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
index 7dcd0cec1c3..1ffde729001 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
@@ -18,9 +18,9 @@
 
 package org.apache.hudi.sink.cluster;
 
+import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -65,6 +65,7 @@ import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -348,12 +349,12 @@ public class ITTestHoodieFlinkClustering {
 
     // judge whether have operation
     // To compute the clustering instant time and do clustering.
-    String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime();
+    String firstClusteringInstant = HoodieActiveTimeline.createNewInstantTime();
 
     HoodieFlinkWriteClient<?> writeClient = FlinkWriteClients.createWriteClient(conf);
     HoodieFlinkTable<?> table = writeClient.getHoodieTable();
 
-    boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty());
+    boolean scheduled = writeClient.scheduleClusteringAtInstant(firstClusteringInstant, Option.empty());
 
     assertTrue(scheduled, "The clustering plan should be scheduled");
 
@@ -370,7 +371,7 @@ public class ITTestHoodieFlinkClustering {
     HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight();
 
     // Mark instant as clustering inflight
-    HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime);
+    HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(firstClusteringInstant);
     table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
 
     final Schema tableAvroSchema = StreamerUtil.getTableAvroSchema(table.getMetaClient(), false);
@@ -378,7 +379,7 @@ public class ITTestHoodieFlinkClustering {
     final RowType rowType = (RowType) rowDataType.getLogicalType();
 
     DataStream<ClusteringCommitEvent> dataStream =
-        env.addSource(new ClusteringPlanSourceFunction(clusteringInstantTime, clusteringPlan))
+        env.addSource(new ClusteringPlanSourceFunction(firstClusteringInstant, clusteringPlan))
             .name("clustering_source")
             .uid("uid_clustering_source")
             .rebalance()
@@ -415,10 +416,11 @@ public class ITTestHoodieFlinkClustering {
     timeline = table.getActiveTimeline().filterPendingReplaceTimeline()
         .filter(i -> i.getState() == HoodieInstant.State.REQUESTED);
 
+    HoodieInstant secondClusteringInstant = timeline.lastInstant().get();
+    List<HoodieClusteringGroup> inputFileGroups = ClusteringUtils.getClusteringPlan(table.getMetaClient(), secondClusteringInstant).get().getRight().getInputGroups();
     // clustering plan has no previous file slice generated by previous pending clustering
-    assertFalse(ClusteringUtils.getClusteringPlan(table.getMetaClient(), timeline.lastInstant().get()).get()
-        .getRight().getInputGroups()
-        .stream().anyMatch(g -> g.getSlices()
-            .stream().anyMatch(f -> clusteringInstantTime.equals(FSUtils.getCommitTime(f.getDataFilePath())))));
+    assertFalse(inputFileGroups
+        .stream().anyMatch(fg -> fg.getSlices()
+            .stream().anyMatch(s -> s.getDataFilePath().contains(firstClusteringInstant))));
   }
 }

Reply via email to