This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 77e03a35482 [HUDI-2141] Support flink compaction metrics (#9515)
77e03a35482 is described below
commit 77e03a354823d1d839d32edb91c63bc9e30efb8a
Author: StreamingFlames <[email protected]>
AuthorDate: Thu Sep 7 08:24:58 2023 +0800
[HUDI-2141] Support flink compaction metrics (#9515)
---
.../hudi/metrics/FlinkCompactionMetrics.java | 106 ++++++++++++++++++++
.../org/apache/hudi/metrics/FlinkWriteMetrics.java | 111 +++++++++++++++++++++
.../apache/hudi/metrics/HoodieFlinkMetrics.java | 23 +++++
.../apache/hudi/sink/compact/CompactOperator.java | 16 +++
.../hudi/sink/compact/CompactionCommitSink.java | 16 +++
.../hudi/sink/compact/CompactionPlanOperator.java | 19 +++-
.../hudi/sink/utils/CompactFunctionWrapper.java | 11 +-
7 files changed, 298 insertions(+), 4 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkCompactionMetrics.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkCompactionMetrics.java
new file mode 100644
index 00000000000..abf7ef05a3f
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkCompactionMetrics.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.sink.compact.CompactOperator;
+import org.apache.hudi.sink.compact.CompactionPlanOperator;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.ParseException;
+import java.time.Duration;
+import java.time.Instant;
+
+/**
+ * Metrics for flink compaction.
+ *
+ * <p>On top of the commit gauges registered by {@link FlinkWriteMetrics}, this class registers
+ * three compaction gauges: {@code pendingCompactionCount}, {@code compactionDelay} and
+ * {@code compactionCost}.
+ */
+public class FlinkCompactionMetrics extends FlinkWriteMetrics {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkCompactionMetrics.class);
+
+  /**
+   * Key for compaction timer.
+   */
+  private static final String COMPACTION_KEY = "compaction";
+
+  /**
+   * Number of pending compaction instants.
+   *
+   * @see CompactionPlanOperator
+   */
+  private int pendingCompactionCount;
+
+  /**
+   * Duration between the earliest pending compaction instant time and now in seconds.
+   *
+   * @see CompactionPlanOperator
+   */
+  private long compactionDelay;
+
+  /**
+   * Cost for consuming a compaction operation in milliseconds.
+   *
+   * @see CompactOperator
+   */
+  private long compactionCost;
+
+  /**
+   * Creates compaction metrics bound to the given metric group, using
+   * {@link HoodieTimeline#COMPACTION_ACTION} as the action type of the metric names.
+   *
+   * @param metricGroup metric group the gauges are registered on
+   */
+  public FlinkCompactionMetrics(MetricGroup metricGroup) {
+    super(metricGroup, HoodieTimeline.COMPACTION_ACTION);
+  }
+
+  /**
+   * Registers the commit gauges of the parent class followed by the compaction gauges.
+   */
+  @Override
+  public void registerMetrics() {
+    super.registerMetrics();
+    metricGroup.gauge(getMetricsName(actionType, "pendingCompactionCount"), () -> pendingCompactionCount);
+    metricGroup.gauge(getMetricsName(actionType, "compactionDelay"), () -> compactionDelay);
+    metricGroup.gauge(getMetricsName(actionType, "compactionCost"), () -> compactionCost);
+  }
+
+  /**
+   * Sets the value reported by the {@code pendingCompactionCount} gauge.
+   *
+   * @param pendingCompactionCount number of pending compaction instants
+   */
+  public void setPendingCompactionCount(int pendingCompactionCount) {
+    this.pendingCompactionCount = pendingCompactionCount;
+  }
+
+  /**
+   * Updates the {@code compactionDelay} gauge from the given instant.
+   *
+   * <p>An empty option resets the delay to 0. Otherwise the delay becomes the number of seconds
+   * between the instant's timestamp and now. If the timestamp cannot be parsed, a warning is
+   * logged and the previously reported delay is left unchanged.
+   *
+   * @param firstPendingCompactionInstant the first pending compaction instant, if any
+   */
+  public void setFirstPendingCompactionInstant(Option<HoodieInstant> firstPendingCompactionInstant) {
+    if (!firstPendingCompactionInstant.isPresent()) {
+      this.compactionDelay = 0L;
+      return;
+    }
+    String instantTime = firstPendingCompactionInstant.get().getTimestamp();
+    try {
+      Instant start = HoodieInstantTimeGenerator.parseDateFromInstantTime(instantTime).toInstant();
+      this.compactionDelay = Duration.between(start, Instant.now()).getSeconds();
+    } catch (ParseException e) {
+      // Best effort: a malformed instant time must not fail the caller, so only warn.
+      // The message now separates the text from the value and keeps the cause.
+      LOG.warn("Invalid input compaction instant: {}", firstPendingCompactionInstant, e);
+    }
+  }
+
+  /**
+   * Starts the compaction timer.
+   */
+  public void startCompaction() {
+    startTimer(COMPACTION_KEY);
+  }
+
+  /**
+   * Stops the compaction timer and publishes the elapsed milliseconds as {@code compactionCost}.
+   */
+  public void endCompaction() {
+    this.compactionCost = stopTimer(COMPACTION_KEY);
+  }
+
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkWriteMetrics.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkWriteMetrics.java
new file mode 100644
index 00000000000..b19f8ef32d9
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkWriteMetrics.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.ParseException;
+
+/**
+ * Common flink write commit metadata metrics.
+ *
+ * <p>Each gauge is registered under the name produced by {@link #getMetricsName(String, String)}
+ * for this instance's action type and reports the value stored by the latest
+ * {@code updateCommitMetrics} call.
+ */
+public class FlinkWriteMetrics extends HoodieFlinkMetrics {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkWriteMetrics.class);
+
+  /** Action type used as the first component of every metric name. */
+  protected final String actionType;
+
+  // Values copied from the commit metadata on each update.
+  private long totalPartitionsWritten;
+  private long totalFilesInsert;
+  private long totalFilesUpdate;
+  private long totalRecordsWritten;
+  private long totalUpdateRecordsWritten;
+  private long totalInsertRecordsWritten;
+  private long totalBytesWritten;
+  private long totalScanTime;
+  private long totalCompactedRecordsUpdated;
+  private long totalLogFilesCompacted;
+  private long totalLogFilesSize;
+
+  // Timing values supplied by the caller on each update.
+  private long commitEpochTimeInMs;
+  private long durationInMs;
+
+  /**
+   * Creates write metrics for one action type.
+   *
+   * @param metricGroup metric group the gauges are registered on
+   * @param actionType  action type used in the metric names
+   */
+  public FlinkWriteMetrics(MetricGroup metricGroup, String actionType) {
+    super(metricGroup);
+    this.actionType = actionType;
+  }
+
+  /**
+   * Registers one gauge per commit metric on the metric group.
+   */
+  @Override
+  public void registerMetrics() {
+    // register commit gauge
+    metricGroup.gauge(metricName("totalPartitionsWritten"), () -> totalPartitionsWritten);
+    metricGroup.gauge(metricName("totalFilesInsert"), () -> totalFilesInsert);
+    metricGroup.gauge(metricName("totalFilesUpdate"), () -> totalFilesUpdate);
+    metricGroup.gauge(metricName("totalRecordsWritten"), () -> totalRecordsWritten);
+    metricGroup.gauge(metricName("totalUpdateRecordsWritten"), () -> totalUpdateRecordsWritten);
+    metricGroup.gauge(metricName("totalInsertRecordsWritten"), () -> totalInsertRecordsWritten);
+    metricGroup.gauge(metricName("totalBytesWritten"), () -> totalBytesWritten);
+    metricGroup.gauge(metricName("totalScanTime"), () -> totalScanTime);
+    metricGroup.gauge(metricName("totalCompactedRecordsUpdated"), () -> totalCompactedRecordsUpdated);
+    metricGroup.gauge(metricName("totalLogFilesCompacted"), () -> totalLogFilesCompacted);
+    metricGroup.gauge(metricName("totalLogFilesSize"), () -> totalLogFilesSize);
+    metricGroup.gauge(metricName("commitTime"), () -> commitEpochTimeInMs);
+    metricGroup.gauge(metricName("duration"), () -> durationInMs);
+  }
+
+  /**
+   * Updates the commit metrics for the given instant, using the time elapsed between the
+   * instant time and now as the duration.
+   *
+   * <p>If the instant time cannot be parsed, a warning is logged and nothing is updated.
+   *
+   * @param instantTime instant time of the commit
+   * @param metadata    commit metadata the totals are read from
+   */
+  public void updateCommitMetrics(String instantTime, HoodieCommitMetadata metadata) {
+    long epochMillis;
+    try {
+      epochMillis = HoodieInstantTimeGenerator.parseDateFromInstantTime(instantTime).getTime();
+    } catch (ParseException e) {
+      LOG.warn("Invalid input issued instant: " + instantTime);
+      return;
+    }
+    long elapsedMillis = System.currentTimeMillis() - epochMillis;
+    updateCommitMetrics(epochMillis, elapsedMillis, metadata);
+  }
+
+  /**
+   * Stores the given timing values and copies the totals out of the commit metadata.
+   *
+   * @param commitEpochTimeInMs commit time as epoch milliseconds
+   * @param durationInMs        duration in milliseconds
+   * @param metadata            commit metadata the totals are read from
+   */
+  public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs, HoodieCommitMetadata metadata) {
+    // timing first, then the totals, in the same order as before
+    this.commitEpochTimeInMs = commitEpochTimeInMs;
+    this.durationInMs = durationInMs;
+    this.totalPartitionsWritten = metadata.fetchTotalPartitionsWritten();
+    this.totalFilesInsert = metadata.fetchTotalFilesInsert();
+    this.totalFilesUpdate = metadata.fetchTotalFilesUpdated();
+    this.totalRecordsWritten = metadata.fetchTotalRecordsWritten();
+    this.totalUpdateRecordsWritten = metadata.fetchTotalUpdateRecordsWritten();
+    this.totalInsertRecordsWritten = metadata.fetchTotalInsertRecordsWritten();
+    this.totalBytesWritten = metadata.fetchTotalBytesWritten();
+    this.totalScanTime = metadata.getTotalScanTime();
+    this.totalCompactedRecordsUpdated = metadata.getTotalCompactedRecordsUpdated();
+    this.totalLogFilesCompacted = metadata.getTotalLogFilesCompacted();
+    this.totalLogFilesSize = metadata.getTotalLogFilesSize();
+  }
+
+  /**
+   * Builds a metric name of the form {@code action.metric}.
+   *
+   * @param action action type
+   * @param metric metric name
+   * @return the two parts joined with a dot
+   */
+  protected String getMetricsName(String action, String metric) {
+    return action + "." + metric;
+  }
+
+  /** Shorthand for {@link #getMetricsName(String, String)} with this instance's action type. */
+  private String metricName(String metric) {
+    return getMetricsName(actionType, metric);
+  }
+
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/HoodieFlinkMetrics.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/HoodieFlinkMetrics.java
index a143010f278..ce58f35402a 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/HoodieFlinkMetrics.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/HoodieFlinkMetrics.java
@@ -22,18 +22,41 @@ import org.apache.flink.metrics.MetricGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* Base class for flink read/write metrics.
*/
public abstract class HoodieFlinkMetrics {
+
private static final Logger LOG =
LoggerFactory.getLogger(HoodieFlinkMetrics.class);
+ protected Map<String, Long> timers;
protected final MetricGroup metricGroup;
protected HoodieFlinkMetrics(MetricGroup metricGroup) {
+ this.timers = new HashMap<>();
this.metricGroup = metricGroup;
}
public abstract void registerMetrics();
+  /**
+   * Records the current time as the start of the timer {@code name}.
+   *
+   * <p>If a timer with that name already exists, a warning is logged and its start time is
+   * overwritten.
+   *
+   * @param name timer key
+   */
+  protected void startTimer(String name) {
+    final boolean alreadyStarted = timers.containsKey(name);
+    if (alreadyStarted) {
+      LOG.warn("Restarting timer for name: {}, override the value", name);
+    }
+    final long startedAt = System.currentTimeMillis();
+    timers.put(name, startedAt);
+  }
+
+  /**
+   * Stops the timer {@code name} and removes it from the timer map.
+   *
+   * @param name timer key previously passed to {@code startTimer}
+   * @return milliseconds elapsed since the timer was started, or 0 (with a warning logged)
+   *         when no timer with that name exists
+   */
+  protected long stopTimer(String name) {
+    // One lookup instead of containsKey/get/remove: remove returns the stored start time,
+    // or null when the timer was never started.
+    Long startedAt = timers.remove(name);
+    if (startedAt == null) {
+      LOG.warn("Cannot find name {} in timer, potentially caused by inconsistent call", name);
+      return 0;
+    }
+    return System.currentTimeMillis() - startedAt;
+  }
+
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
index 66743264457..fc034fcfc80 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.metrics.FlinkCompactionMetrics;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable;
import
org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor;
@@ -33,6 +34,7 @@ import org.apache.hudi.util.FlinkWriteClients;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
@@ -85,6 +87,11 @@ public class CompactOperator extends
TableStreamOperator<CompactionCommitEvent>
*/
private transient StreamRecordCollector<CompactionCommitEvent> collector;
+ /**
+ * Compaction metrics.
+ */
+ private transient FlinkCompactionMetrics compactionMetrics;
+
public CompactOperator(Configuration conf) {
this.conf = conf;
this.asyncCompaction = OptionsResolver.needsAsyncCompaction(conf);
@@ -103,6 +110,7 @@ public class CompactOperator extends
TableStreamOperator<CompactionCommitEvent>
this.executor = NonThrownExecutor.builder(LOG).build();
}
this.collector = new StreamRecordCollector<>(output);
+ registerMetrics();
}
@Override
@@ -127,6 +135,7 @@ public class CompactOperator extends
TableStreamOperator<CompactionCommitEvent>
CompactionOperation compactionOperation,
Collector<CompactionCommitEvent> collector,
HoodieWriteConfig writeConfig) throws IOException {
+ compactionMetrics.startCompaction();
HoodieFlinkMergeOnReadTableCompactor<?> compactor = new
HoodieFlinkMergeOnReadTableCompactor<>();
HoodieTableMetaClient metaClient =
writeClient.getHoodieTable().getMetaClient();
String maxInstantTime = compactor.getMaxInstantTime(metaClient);
@@ -140,6 +149,7 @@ public class CompactOperator extends
TableStreamOperator<CompactionCommitEvent>
compactionOperation,
instantTime, maxInstantTime,
writeClient.getHoodieTable().getTaskContextSupplier());
+ compactionMetrics.endCompaction();
collector.collect(new CompactionCommitEvent(instantTime,
compactionOperation.getFileId(), writeStatuses, taskID));
}
@@ -164,4 +174,10 @@ public class CompactOperator extends
TableStreamOperator<CompactionCommitEvent>
this.writeClient = null;
}
}
+
+  /**
+   * Creates the compaction metrics on the metric group obtained from the runtime context
+   * and registers their gauges.
+   */
+  private void registerMetrics() {
+    final MetricGroup operatorGroup = getRuntimeContext().getMetricGroup();
+    this.compactionMetrics = new FlinkCompactionMetrics(operatorGroup);
+    this.compactionMetrics.registerMetrics();
+  }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
index 828aa3c4265..192b5f5a397 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metrics.FlinkCompactionMetrics;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.action.compact.CompactHelpers;
@@ -33,6 +34,7 @@ import org.apache.hudi.util.CompactionUtil;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,6 +84,11 @@ public class CompactionCommitSink extends
CleanFunction<CompactionCommitEvent> {
*/
private transient HoodieFlinkTable<?> table;
+ /**
+ * Compaction metrics.
+ */
+ private transient FlinkCompactionMetrics compactionMetrics;
+
public CompactionCommitSink(Configuration conf) {
super(conf);
this.conf = conf;
@@ -96,6 +103,7 @@ public class CompactionCommitSink extends
CleanFunction<CompactionCommitEvent> {
this.commitBuffer = new HashMap<>();
this.compactionPlanCache = new HashMap<>();
this.table = this.writeClient.getHoodieTable();
+ registerMetrics();
}
@Override
@@ -174,6 +182,8 @@ public class CompactionCommitSink extends
CleanFunction<CompactionCommitEvent> {
// commit the compaction
this.writeClient.commitCompaction(instant, metadata, Option.empty());
+ this.compactionMetrics.updateCommitMetrics(instant, metadata);
+
// Whether to clean up the old log file when compaction
if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
this.writeClient.clean();
@@ -184,4 +194,10 @@ public class CompactionCommitSink extends
CleanFunction<CompactionCommitEvent> {
this.commitBuffer.remove(instant);
this.compactionPlanCache.remove(instant);
}
+
+  /**
+   * Creates the compaction metrics on the metric group obtained from the runtime context
+   * and registers their gauges.
+   */
+  private void registerMetrics() {
+    final MetricGroup sinkGroup = getRuntimeContext().getMetricGroup();
+    this.compactionMetrics = new FlinkCompactionMetrics(sinkGroup);
+    this.compactionMetrics.registerMetrics();
+  }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
index d7446c9bfab..bb4ee0a34ac 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metrics.FlinkCompactionMetrics;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.util.CompactionUtil;
@@ -31,6 +32,7 @@ import org.apache.hudi.util.FlinkTables;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -61,6 +63,8 @@ public class CompactionPlanOperator extends
AbstractStreamOperator<CompactionPla
@SuppressWarnings("rawtypes")
private transient HoodieFlinkTable table;
+ private transient FlinkCompactionMetrics compactionMetrics;
+
public CompactionPlanOperator(Configuration conf) {
this.conf = conf;
}
@@ -73,6 +77,7 @@ public class CompactionPlanOperator extends
AbstractStreamOperator<CompactionPla
// these instants are in priority for scheduling task because the
compaction instants are
// scheduled from earliest(FIFO sequence).
CompactionUtil.rollbackCompaction(table);
+ registerMetrics();
}
@Override
@@ -100,9 +105,15 @@ public class CompactionPlanOperator extends
AbstractStreamOperator<CompactionPla
}
private void scheduleCompaction(HoodieFlinkTable<?> table, long
checkpointId) throws IOException {
+ HoodieTimeline pendingCompactionTimeline =
table.getActiveTimeline().filterPendingCompactionTimeline();
+
// the first instant takes the highest priority.
- Option<HoodieInstant> firstRequested =
table.getActiveTimeline().filterPendingCompactionTimeline()
+ Option<HoodieInstant> firstRequested = pendingCompactionTimeline
.filter(instant -> instant.getState() ==
HoodieInstant.State.REQUESTED).firstInstant();
+ // record metrics
+ compactionMetrics.setFirstPendingCompactionInstant(firstRequested);
+
compactionMetrics.setPendingCompactionCount(pendingCompactionTimeline.countInstants());
+
if (!firstRequested.isPresent()) {
// do nothing.
LOG.info("No compaction plan for checkpoint " + checkpointId);
@@ -148,4 +159,10 @@ public class CompactionPlanOperator extends
AbstractStreamOperator<CompactionPla
// Called when the input data ends, only used in batch mode.
notifyCheckpointComplete(-1);
}
+
+  /**
+   * Creates the compaction metrics on the metric group obtained from the runtime context
+   * and registers their gauges.
+   */
+  private void registerMetrics() {
+    final MetricGroup planGroup = getRuntimeContext().getMetricGroup();
+    this.compactionMetrics = new FlinkCompactionMetrics(planGroup);
+    this.compactionMetrics.registerMetrics();
+  }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
index 78a8305c9c5..b042139aee4 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/CompactFunctionWrapper.java
@@ -55,6 +55,10 @@ public class CompactFunctionWrapper {
* Function that generates the {@link HoodieCompactionPlan}.
*/
private CompactionPlanOperator compactionPlanOperator;
+ /**
+ * Output to collect the compaction plan events.
+ */
+ private CollectorOutput<CompactionPlanEvent> planEventOutput;
/**
* Output to collect the compaction commit events.
*/
@@ -83,6 +87,8 @@ public class CompactFunctionWrapper {
public void openFunction() throws Exception {
compactionPlanOperator = new CompactionPlanOperator(conf);
+ planEventOutput = new CollectorOutput<>();
+ compactionPlanOperator.setup(streamTask, streamConfig, planEventOutput);
compactionPlanOperator.open();
compactOperator = new CompactOperator(conf);
@@ -102,11 +108,10 @@ public class CompactFunctionWrapper {
public void compact(long checkpointID) throws Exception {
// collect the CompactEvents.
- CollectorOutput<CompactionPlanEvent> output = new CollectorOutput<>();
- compactionPlanOperator.setOutput(output);
+ compactionPlanOperator.setOutput(planEventOutput);
compactionPlanOperator.notifyCheckpointComplete(checkpointID);
// collect the CompactCommitEvents
- for (CompactionPlanEvent event : output.getRecords()) {
+ for (CompactionPlanEvent event : planEventOutput.getRecords()) {
compactOperator.processElement(new StreamRecord<>(event));
}
// handle and commit the compaction