This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 77e03a35482 [HUDI-2141] Support flink compaction metrics (#9515)
77e03a35482 is described below

commit 77e03a354823d1d839d32edb91c63bc9e30efb8a
Author: StreamingFlames <[email protected]>
AuthorDate: Thu Sep 7 08:24:58 2023 +0800

    [HUDI-2141] Support flink compaction metrics (#9515)
---
 .../hudi/metrics/FlinkCompactionMetrics.java       | 106 ++++++++++++++++++++
 .../org/apache/hudi/metrics/FlinkWriteMetrics.java | 111 +++++++++++++++++++++
 .../apache/hudi/metrics/HoodieFlinkMetrics.java    |  23 +++++
 .../apache/hudi/sink/compact/CompactOperator.java  |  16 +++
 .../hudi/sink/compact/CompactionCommitSink.java    |  16 +++
 .../hudi/sink/compact/CompactionPlanOperator.java  |  19 +++-
 .../hudi/sink/utils/CompactFunctionWrapper.java    |  11 +-
 7 files changed, 298 insertions(+), 4 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkCompactionMetrics.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkCompactionMetrics.java
new file mode 100644
index 00000000000..abf7ef05a3f
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkCompactionMetrics.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.sink.compact.CompactOperator;
+import org.apache.hudi.sink.compact.CompactionPlanOperator;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.ParseException;
+import java.time.Duration;
+import java.time.Instant;
+
+/**
+ * Metrics for Flink compaction.
+ */
+public class FlinkCompactionMetrics extends FlinkWriteMetrics {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkCompactionMetrics.class);
+
+  /**
+   * Key for compaction timer.
+   */
+  private static final String COMPACTION_KEY = "compaction";
+
+  /**
+   * Number of pending compaction instants.
+   *
+   * @see CompactionPlanOperator
+   */
+  private int pendingCompactionCount;
+
+  /**
+   * Duration between the earliest pending compaction instant time and now, in seconds.
+   *
+   * @see CompactionPlanOperator
+   */
+  private long compactionDelay;
+
+  /**
+   * Cost of executing a compaction operation, in milliseconds.
+   *
+   * @see CompactOperator
+   */
+  private long compactionCost;
+
+  public FlinkCompactionMetrics(MetricGroup metricGroup) {
+    super(metricGroup, HoodieTimeline.COMPACTION_ACTION);
+  }
+
+  @Override
+  public void registerMetrics() {
+    super.registerMetrics();
+    metricGroup.gauge(getMetricsName(actionType, "pendingCompactionCount"), () 
-> pendingCompactionCount);
+    metricGroup.gauge(getMetricsName(actionType, "compactionDelay"), () -> 
compactionDelay);
+    metricGroup.gauge(getMetricsName(actionType, "compactionCost"), () -> 
compactionCost);
+  }
+
+  public void setPendingCompactionCount(int pendingCompactionCount) {
+    this.pendingCompactionCount = pendingCompactionCount;
+  }
+
+  public void setFirstPendingCompactionInstant(Option<HoodieInstant> firstPendingCompactionInstant) {
+    try {
+      if (!firstPendingCompactionInstant.isPresent()) {
+        this.compactionDelay = 0L;
+      } else {
+        Instant start = HoodieInstantTimeGenerator.parseDateFromInstantTime(firstPendingCompactionInstant.get().getTimestamp()).toInstant();
+        this.compactionDelay = Duration.between(start, Instant.now()).getSeconds();
+      }
+    } catch (ParseException e) {
+      LOG.warn("Invalid input compaction instant" + 
firstPendingCompactionInstant);
+    }
+  }
+
+  public void startCompaction() {
+    startTimer(COMPACTION_KEY);
+  }
+
+  public void endCompaction() {
+    this.compactionCost = stopTimer(COMPACTION_KEY);
+  }
+
+}
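
A minimal usage sketch of the class above (not part of this patch): it assumes Flink's no-op UnregisteredMetricsGroup test utility from flink-metrics-core is on the classpath, and the class name CompactionMetricsSketch is hypothetical. Gauge names follow getMetricsName(actionType, name), e.g. "compaction.compactionCost".

    import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
    import org.apache.hudi.metrics.FlinkCompactionMetrics;

    public class CompactionMetricsSketch {
      public static void main(String[] args) throws InterruptedException {
        FlinkCompactionMetrics metrics = new FlinkCompactionMetrics(new UnregisteredMetricsGroup());
        metrics.registerMetrics();            // registers the compaction gauges plus the inherited commit gauges
        metrics.setPendingCompactionCount(3); // normally fed by CompactionPlanOperator on each checkpoint
        metrics.startCompaction();            // opens the "compaction" timer
        Thread.sleep(50);                     // stand-in for the actual compaction work
        metrics.endCompaction();              // "compaction.compactionCost" now reads the elapsed millis
      }
    }
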
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkWriteMetrics.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkWriteMetrics.java
new file mode 100644
index 00000000000..b19f8ef32d9
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkWriteMetrics.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.ParseException;
+
+/**
+ * Common Flink write commit metadata metrics.
+ */
+public class FlinkWriteMetrics extends HoodieFlinkMetrics {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkWriteMetrics.class);
+
+  protected final String actionType;
+
+  private long totalPartitionsWritten;
+  private long totalFilesInsert;
+  private long totalFilesUpdate;
+  private long totalRecordsWritten;
+  private long totalUpdateRecordsWritten;
+  private long totalInsertRecordsWritten;
+  private long totalBytesWritten;
+  private long totalScanTime;
+  private long totalCompactedRecordsUpdated;
+  private long totalLogFilesCompacted;
+  private long totalLogFilesSize;
+  private long commitEpochTimeInMs;
+  private long durationInMs;
+
+  public FlinkWriteMetrics(MetricGroup metricGroup, String actionType) {
+    super(metricGroup);
+    this.actionType = actionType;
+  }
+
+  @Override
+  public void registerMetrics() {
+    // register commit gauge
+    metricGroup.gauge(getMetricsName(actionType, "totalPartitionsWritten"), () 
-> totalPartitionsWritten);
+    metricGroup.gauge(getMetricsName(actionType, "totalFilesInsert"), () -> 
totalFilesInsert);
+    metricGroup.gauge(getMetricsName(actionType, "totalFilesUpdate"), () -> 
totalFilesUpdate);
+    metricGroup.gauge(getMetricsName(actionType, "totalRecordsWritten"), () -> 
totalRecordsWritten);
+    metricGroup.gauge(getMetricsName(actionType, "totalUpdateRecordsWritten"), 
() -> totalUpdateRecordsWritten);
+    metricGroup.gauge(getMetricsName(actionType, "totalInsertRecordsWritten"), 
() -> totalInsertRecordsWritten);
+    metricGroup.gauge(getMetricsName(actionType, "totalBytesWritten"), () -> 
totalBytesWritten);
+    metricGroup.gauge(getMetricsName(actionType, "totalScanTime"), () -> 
totalScanTime);
+    metricGroup.gauge(getMetricsName(actionType, 
"totalCompactedRecordsUpdated"), () -> totalCompactedRecordsUpdated);
+    metricGroup.gauge(getMetricsName(actionType, "totalLogFilesCompacted"), () 
-> totalLogFilesCompacted);
+    metricGroup.gauge(getMetricsName(actionType, "totalLogFilesSize"), () -> 
totalLogFilesSize);
+    metricGroup.gauge(getMetricsName(actionType, "commitTime"), () -> 
commitEpochTimeInMs);
+    metricGroup.gauge(getMetricsName(actionType, "duration"), () -> 
durationInMs);
+  }
+
+  public void updateCommitMetrics(String instantTime, HoodieCommitMetadata metadata) {
+    long commitEpochTimeInMs;
+    try {
+      commitEpochTimeInMs = HoodieInstantTimeGenerator.parseDateFromInstantTime(instantTime).getTime();
+    } catch (ParseException e) {
+      LOG.warn("Invalid input issued instant: " + instantTime);
+      return;
+    }
+    updateCommitMetrics(commitEpochTimeInMs, System.currentTimeMillis() - commitEpochTimeInMs, metadata);
+  }
+
+  public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs, HoodieCommitMetadata metadata) {
+    updateCommitTimingMetrics(commitEpochTimeInMs, durationInMs);
+    totalPartitionsWritten = metadata.fetchTotalPartitionsWritten();
+    totalFilesInsert = metadata.fetchTotalFilesInsert();
+    totalFilesUpdate = metadata.fetchTotalFilesUpdated();
+    totalRecordsWritten = metadata.fetchTotalRecordsWritten();
+    totalUpdateRecordsWritten = metadata.fetchTotalUpdateRecordsWritten();
+    totalInsertRecordsWritten = metadata.fetchTotalInsertRecordsWritten();
+    totalBytesWritten = metadata.fetchTotalBytesWritten();
+    totalScanTime = metadata.getTotalScanTime();
+    totalCompactedRecordsUpdated = metadata.getTotalCompactedRecordsUpdated();
+    totalLogFilesCompacted = metadata.getTotalLogFilesCompacted();
+    totalLogFilesSize = metadata.getTotalLogFilesSize();
+  }
+
+  private void updateCommitTimingMetrics(long commitEpochTimeInMs, long durationInMs) {
+    this.commitEpochTimeInMs = commitEpochTimeInMs;
+    this.durationInMs = durationInMs;
+  }
+
+  protected String getMetricsName(String action, String metric) {
+    return String.format("%s.%s", action, metric);
+  }
+
+}
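
A short sketch of how commit metadata flows into these gauges (not part of this patch): it reuses the UnregisteredMetricsGroup assumption above, the class name WriteMetricsSketch is hypothetical, and the empty HoodieCommitMetadata stands in for the completed commit's real metadata. The instant string uses Hudi's yyyyMMddHHmmss timeline format.

    import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
    import org.apache.hudi.common.model.HoodieCommitMetadata;
    import org.apache.hudi.common.table.timeline.HoodieTimeline;
    import org.apache.hudi.metrics.FlinkWriteMetrics;

    public class WriteMetricsSketch {
      public static void main(String[] args) {
        FlinkWriteMetrics metrics = new FlinkWriteMetrics(new UnregisteredMetricsGroup(), HoodieTimeline.COMMIT_ACTION);
        metrics.registerMetrics();
        // Updates gauges such as "commit.totalBytesWritten" and "commit.duration";
        // an unparsable instant is logged and skipped rather than thrown.
        metrics.updateCommitMetrics("20230907082458", new HoodieCommitMetadata());
      }
    }
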
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/HoodieFlinkMetrics.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/HoodieFlinkMetrics.java
index a143010f278..ce58f35402a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/HoodieFlinkMetrics.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/HoodieFlinkMetrics.java
@@ -22,18 +22,41 @@ import org.apache.flink.metrics.MetricGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * Base class for flink read/write metrics.
  */
 public abstract class HoodieFlinkMetrics {
+
   private static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkMetrics.class);
 
+  protected Map<String, Long> timers;
   protected final MetricGroup metricGroup;
 
   protected HoodieFlinkMetrics(MetricGroup metricGroup) {
+    this.timers = new HashMap<>();
     this.metricGroup = metricGroup;
   }
 
   public abstract void registerMetrics();
 
+  protected void startTimer(String name) {
+    if (timers.containsKey(name)) {
+      LOG.warn("Restarting timer for name: {}, override the value", name);
+    }
+    timers.put(name, System.currentTimeMillis());
+  }
+
+  protected long stopTimer(String name) {
+    if (!timers.containsKey(name)) {
+      LOG.warn("Cannot found name {} in timer, potentially caused by 
inconsistent call", name);
+      return 0;
+    }
+    long costs = System.currentTimeMillis() - timers.get(name);
+    timers.remove(name);
+    return costs;
+  }
+
 }
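
The timer helpers above pair one startTimer(name) with one stopTimer(name): stop returns the elapsed wall-clock millis and clears the entry, an unmatched stop returns 0 with a warning, and a repeated start overwrites the earlier start time. A minimal subclass sketch (not part of this patch; class and gauge names are hypothetical):

    import org.apache.flink.metrics.MetricGroup;
    import org.apache.hudi.metrics.HoodieFlinkMetrics;

    public class TimedMetricsSketch extends HoodieFlinkMetrics {
      private long lastCostMs;

      public TimedMetricsSketch(MetricGroup metricGroup) {
        super(metricGroup);
      }

      @Override
      public void registerMetrics() {
        metricGroup.gauge("demo.cost", () -> lastCostMs);
      }

      public void timeSomething(Runnable work) {
        startTimer("demo");             // records System.currentTimeMillis() under "demo"
        work.run();                     // the measured section
        lastCostMs = stopTimer("demo"); // elapsed millis; the map entry is removed
      }
    }
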
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
index 66743264457..fc034fcfc80 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.CompactionOperation;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.metrics.FlinkCompactionMetrics;
 import org.apache.hudi.sink.utils.NonThrownExecutor;
 import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
 import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
@@ -33,6 +34,7 @@ import org.apache.hudi.util.FlinkWriteClients;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
@@ -85,6 +87,11 @@ public class CompactOperator extends TableStreamOperator<CompactionCommitEvent>
    */
   private transient StreamRecordCollector<CompactionCommitEvent> collector;
 
+  /**
+   * Compaction metrics.
+   */
+  private transient FlinkCompactionMetrics compactionMetrics;
+
   public CompactOperator(Configuration conf) {
     this.conf = conf;
     this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf);
@@ -103,6 +110,7 @@ public class CompactOperator extends TableStreamOperator<CompactionCommitEvent>
       this.executor = NonThrownExecutor.builder(LOG).build();
     }
     this.collector = new StreamRecordCollector<>(output);
+    registerMetrics();
   }
 
   @Override
@@ -127,6 +135,7 @@ public class CompactOperator extends TableStreamOperator<CompactionCommitEvent>
                             CompactionOperation compactionOperation,
                             Collector<CompactionCommitEvent> collector,
                             HoodieWriteConfig writeConfig) throws IOException {
+    compactionMetrics.startCompaction();
+    HoodieFlinkMergeOnReadTableCompactor<?> compactor = new HoodieFlinkMergeOnReadTableCompactor<>();
+    HoodieTableMetaClient metaClient = writeClient.getHoodieTable().getMetaClient();
     String maxInstantTime = compactor.getMaxInstantTime(metaClient);
@@ -140,6 +149,7 @@ public class CompactOperator extends TableStreamOperator<CompactionCommitEvent>
         compactionOperation,
         instantTime, maxInstantTime,
         writeClient.getHoodieTable().getTaskContextSupplier());
+    compactionMetrics.endCompaction();
     collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), writeStatuses, taskID));
   }
 
@@ -164,4 +174,10 @@ public class CompactOperator extends TableStreamOperator<CompactionCommitEvent>
       this.writeClient = null;
     }
   }
+
+  private void registerMetrics() {
+    MetricGroup metrics = getRuntimeContext().getMetricGroup();
+    compactionMetrics = new FlinkCompactionMetrics(metrics);
+    compactionMetrics.registerMetrics();
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
index 828aa3c4265..192b5f5a397 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metrics.FlinkCompactionMetrics;
 import org.apache.hudi.sink.CleanFunction;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.action.compact.CompactHelpers;
@@ -33,6 +34,7 @@ import org.apache.hudi.util.CompactionUtil;
 import org.apache.hudi.util.FlinkWriteClients;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,6 +84,11 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
    */
   private transient HoodieFlinkTable<?> table;
 
+  /**
+   * Compaction metrics.
+   */
+  private transient FlinkCompactionMetrics compactionMetrics;
+
   public CompactionCommitSink(Configuration conf) {
     super(conf);
     this.conf = conf;
@@ -96,6 +103,7 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
     this.commitBuffer = new HashMap<>();
     this.compactionPlanCache = new HashMap<>();
     this.table = this.writeClient.getHoodieTable();
+    registerMetrics();
   }
 
   @Override
@@ -174,6 +182,8 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
     // commit the compaction
     this.writeClient.commitCompaction(instant, metadata, Option.empty());
 
+    this.compactionMetrics.updateCommitMetrics(instant, metadata);
+
     // Whether to clean up the old log file when compaction
     if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
       this.writeClient.clean();
@@ -184,4 +194,10 @@ public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> {
     this.commitBuffer.remove(instant);
     this.compactionPlanCache.remove(instant);
   }
+
+  private void registerMetrics() {
+    MetricGroup metrics = getRuntimeContext().getMetricGroup();
+    compactionMetrics = new FlinkCompactionMetrics(metrics);
+    compactionMetrics.registerMetrics();
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
index d7446c9bfab..bb4ee0a34ac 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metrics.FlinkCompactionMetrics;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 import org.apache.hudi.util.CompactionUtil;
@@ -31,6 +32,7 @@ import org.apache.hudi.util.FlinkTables;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -61,6 +63,8 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
   @SuppressWarnings("rawtypes")
   private transient HoodieFlinkTable table;
 
+  private transient FlinkCompactionMetrics compactionMetrics;
+
   public CompactionPlanOperator(Configuration conf) {
     this.conf = conf;
   }
@@ -73,6 +77,7 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
     // these instants are in priority for scheduling task because the compaction instants are
     // scheduled from earliest(FIFO sequence).
     CompactionUtil.rollbackCompaction(table);
+    registerMetrics();
   }
 
   @Override
@@ -100,9 +105,15 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
   }
 
   private void scheduleCompaction(HoodieFlinkTable<?> table, long checkpointId) throws IOException {
+    HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
+
     // the first instant takes the highest priority.
-    Option<HoodieInstant> firstRequested = table.getActiveTimeline().filterPendingCompactionTimeline()
+    Option<HoodieInstant> firstRequested = pendingCompactionTimeline
         .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).firstInstant();
+    // record metrics
+    compactionMetrics.setFirstPendingCompactionInstant(firstRequested);
+    compactionMetrics.setPendingCompactionCount(pendingCompactionTimeline.countInstants());
+
     if (!firstRequested.isPresent()) {
       // do nothing.
       LOG.info("No compaction plan for checkpoint " + checkpointId);
@@ -148,4 +159,10 @@ public class CompactionPlanOperator extends AbstractStreamOperator<CompactionPla
     // Called when the input data ends, only used in batch mode.
     notifyCheckpointComplete(-1);
   }
+
+  private void registerMetrics() {
+    MetricGroup metrics = getRuntimeContext().getMetricGroup();
+    compactionMetrics = new FlinkCompactionMetrics(metrics);
+    compactionMetrics.registerMetrics();
+  }
 }
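
A worked example of the two gauges recorded in scheduleCompaction above: pendingCompactionCount is the size of the pending compaction timeline, and compactionDelay is Duration.between(start, Instant.now()).getSeconds() for the earliest pending instant. If the earliest REQUESTED instant is 20230907082458 and the scheduling pass runs at 08:26:58 on the same day (assuming instant times and the scheduler share a timezone), the delay gauge reads 60 seconds; with no pending compaction it reads 0.
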
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
index 78a8305c9c5..b042139aee4 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
@@ -55,6 +55,10 @@ public class CompactFunctionWrapper {
    * Function that generates the {@link HoodieCompactionPlan}.
    */
   private CompactionPlanOperator compactionPlanOperator;
+  /**
+   * Output to collect the compaction plan events.
+   */
+  private CollectorOutput<CompactionPlanEvent> planEventOutput;
   /**
    * Output to collect the compaction commit events.
    */
@@ -83,6 +87,8 @@ public class CompactFunctionWrapper {
 
   public void openFunction() throws Exception {
     compactionPlanOperator = new CompactionPlanOperator(conf);
+    planEventOutput = new CollectorOutput<>();
+    compactionPlanOperator.setup(streamTask, streamConfig, planEventOutput);
     compactionPlanOperator.open();
 
     compactOperator = new CompactOperator(conf);
@@ -102,11 +108,10 @@ public class CompactFunctionWrapper {
 
   public void compact(long checkpointID) throws Exception {
     // collect the CompactEvents.
-    CollectorOutput<CompactionPlanEvent> output = new CollectorOutput<>();
-    compactionPlanOperator.setOutput(output);
+    compactionPlanOperator.setOutput(planEventOutput);
     compactionPlanOperator.notifyCheckpointComplete(checkpointID);
     // collect the CompactCommitEvents
-    for (CompactionPlanEvent event : output.getRecords()) {
+    for (CompactionPlanEvent event : planEventOutput.getRecords()) {
       compactOperator.processElement(new StreamRecord<>(event));
     }
     // handle and commit the compaction
