This is an automated email from the ASF dual-hosted git repository.
gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 155f17d [FLINK-26564][connectors/filesystem] Fix the bug that
CompactCoordinatorStateHandler doesn't properly handle the cleanup-in-progress
requests.
155f17d is described below
commit 155f17d62952fff5f5b583256f959f5fb63d1a8e
Author: Gen Luo <[email protected]>
AuthorDate: Thu Mar 10 11:37:58 2022 +0800
[FLINK-26564][connectors/filesystem] Fix the bug that
CompactCoordinatorStateHandler doesn't properly handle the cleanup-in-progress
requests.
This closes #19032.
---
.../compactor/operator/CompactCoordinator.java | 2 +-
.../operator/CompactCoordinatorStateHandler.java | 15 +++++++++++++-
.../sink/compactor/operator/CompactorRequest.java | 3 +++
.../sink/compactor/CompactCoordinatorTest.java | 23 ++++++++++++++--------
4 files changed, 33 insertions(+), 10 deletions(-)
diff --git
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinator.java
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinator.java
index 23437a5..8509423 100644
---
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinator.java
+++
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinator.java
@@ -209,7 +209,7 @@ public class CompactCoordinator extends
AbstractStreamOperator<CompactorRequest>
PASS_THROUGH
}
- private static class CompactTrigger {
+ static class CompactTrigger {
private final long threshold;
private final int numCheckpointsBeforeCompaction;
diff --git
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinatorStateHandler.java
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinatorStateHandler.java
index 2b147ae..1e4d0f9 100644
---
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinatorStateHandler.java
+++
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinatorStateHandler.java
@@ -21,6 +21,9 @@ package
org.apache.flink.connector.file.sink.compactor.operator;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
+import
org.apache.flink.connector.file.sink.compactor.operator.CompactCoordinator.CompactTrigger;
+import
org.apache.flink.connector.file.sink.compactor.operator.CompactCoordinator.CompactTriggerResult;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
@@ -70,6 +73,11 @@ public class CompactCoordinatorStateHandler
.getListState(REMAINING_COMMITTABLE_RAW_STATES_DESC),
committableSerializer);
+ // A default trigger to judge whether a pending file should be compacted or passed through
+ CompactTrigger trigger =
+ new CompactTrigger(
+
FileCompactStrategy.Builder.newBuilder().setSizeThreshold(0).build());
+
Iterable<FileSinkCommittable> stateRemaining =
remainingCommittableState.get();
if (stateRemaining != null) {
for (FileSinkCommittable committable : stateRemaining) {
@@ -77,7 +85,12 @@ public class CompactCoordinatorStateHandler
// compacting is not available now
String bucketId = committable.getBucketId();
CompactorRequest request = new CompactorRequest(bucketId);
- request.addToCompact(committable);
+ if (committable.hasPendingFile()
+ && trigger.onElement(committable) !=
CompactTriggerResult.PASS_THROUGH) {
+ request.addToCompact(committable);
+ } else {
+ request.addToPassthrough(committable);
+ }
output.collect(new StreamRecord<>(Either.Right(request)));
}
}
diff --git
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorRequest.java
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorRequest.java
index 86bc78f..483468a 100644
---
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorRequest.java
+++
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorRequest.java
@@ -26,6 +26,8 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.flink.util.Preconditions.checkState;
+
/** Request of file compacting for {@link FileSink}. */
@Internal
public class CompactorRequest implements Serializable {
@@ -49,6 +51,7 @@ public class CompactorRequest implements Serializable {
}
public void addToCompact(FileSinkCommittable committable) {
+ checkState(committable.hasPendingFile());
committableToCompact.add(committable);
}
diff --git
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactCoordinatorTest.java
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactCoordinatorTest.java
index 0fbfb9b..9a07683 100644
---
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactCoordinatorTest.java
+++
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/compactor/CompactCoordinatorTest.java
@@ -341,6 +341,8 @@ public class CompactCoordinatorTest extends
AbstractCompactTestBase {
// without . prefix
FileSinkCommittable committable2 = committable("0", "2", 6);
+ FileSinkCommittable cleanup3 = cleanupInprogress("0", "3", 7);
+
OperatorSubtaskState state;
try (OneInputStreamOperatorTestHarness<
CommittableMessage<FileSinkCommittable>,
CompactorRequest>
@@ -351,6 +353,9 @@ public class CompactCoordinatorTest extends
AbstractCompactTestBase {
harness.processElement(message(committable0));
Assert.assertEquals(0, harness.extractOutputValues().size());
+ harness.processElement(message(cleanup3));
+ Assert.assertEquals(0, harness.extractOutputValues().size());
+
harness.prepareSnapshotPreBarrier(1);
state = harness.snapshot(1, 1);
}
@@ -374,34 +379,36 @@ public class CompactCoordinatorTest extends
AbstractCompactTestBase {
harness.initializeState(state);
harness.open();
- Assert.assertEquals(1, harness.extractOutputValues().size());
+ Assert.assertEquals(2, harness.extractOutputValues().size());
harness.processElement(message(committable1));
harness.processElement(message(committable2));
List<Either<CommittableMessage<FileSinkCommittable>,
CompactorRequest>> results =
harness.extractOutputValues();
- Assert.assertEquals(3, results.size());
+ Assert.assertEquals(4, results.size());
// restored request
Assert.assertTrue(results.get(0).isRight());
assertToCompact(results.get(0).right(), committable0);
+ assertToPassthrough(results.get(1).right(), cleanup3);
+
// committable with . prefix should also be passed through
Assert.assertTrue(
- results.get(1).isLeft()
- && results.get(1).left() instanceof
CommittableWithLineage);
+ results.get(2).isLeft()
+ && results.get(2).left() instanceof
CommittableWithLineage);
Assert.assertEquals(
- ((CommittableWithLineage<FileSinkCommittable>)
results.get(1).left())
+ ((CommittableWithLineage<FileSinkCommittable>)
results.get(2).left())
.getCommittable(),
committable1);
// committable without . prefix should be passed through normally
Assert.assertTrue(
- results.get(2).isLeft()
- && results.get(2).left() instanceof
CommittableWithLineage);
+ results.get(3).isLeft()
+ && results.get(3).left() instanceof
CommittableWithLineage);
Assert.assertEquals(
- ((CommittableWithLineage<FileSinkCommittable>)
results.get(2).left())
+ ((CommittableWithLineage<FileSinkCommittable>)
results.get(3).left())
.getCommittable(),
committable2);
}