This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push:
new 8a9b632 [FLINK-20213][fs-connector] Partition commit is delayed when records keep coming
8a9b632 is described below
commit 8a9b632dfc41e20b35c85b357f9ae5cfc2f70fff
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Dec 22 10:13:23 2020 +0800
[FLINK-20213][fs-connector] Partition commit is delayed when records keep coming
This closes #14442
---
.../filesystem/stream/AbstractStreamingWriter.java | 6 ++++
.../filesystem/stream/StreamingFileWriter.java | 35 ++++++++++++++++++----
.../stream/compact/CompactFileWriter.java | 4 +++
.../filesystem/stream/StreamingFileWriterTest.java | 25 ++++++++++++++++
4 files changed, 65 insertions(+), 5 deletions(-)
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/AbstractStreamingWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/AbstractStreamingWriter.java
index b269aaa..254c16c 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/AbstractStreamingWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/AbstractStreamingWriter.java
@@ -67,6 +67,11 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
}
/**
+ * Notifies a partition created.
+ */
+ protected abstract void partitionCreated(String partition);
+
+ /**
* Notifies a partition become inactive. A partition becomes inactive
after all
* the records received so far have been committed.
*/
@@ -97,6 +102,7 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
@Override
public void bucketCreated(Bucket<IN, String> bucket) {
+
AbstractStreamingWriter.this.partitionCreated(bucket.getBucketId());
}
@Override
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
index 2f9d42b..4b139d7 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
@@ -20,12 +20,15 @@ package org.apache.flink.table.filesystem.stream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
import
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import java.util.ArrayList;
import java.util.HashSet;
+import java.util.NavigableMap;
import java.util.Set;
+import java.util.TreeMap;
/**
* Writer for emitting {@link PartitionCommitInfo} to downstream.
@@ -34,7 +37,9 @@ public class StreamingFileWriter<IN> extends AbstractStreamingWriter<IN, Partiti
private static final long serialVersionUID = 2L;
- private transient Set<String> inactivePartitions;
+ private transient Set<String> currentNewPartitions;
+ private transient TreeMap<Long, Set<String>> newPartitions;
+ private transient Set<String> committablePartitions;
public StreamingFileWriter(
long bucketCheckInterval,
@@ -45,13 +50,20 @@ public class StreamingFileWriter<IN> extends AbstractStreamingWriter<IN, Partiti
@Override
public void initializeState(StateInitializationContext context) throws
Exception {
- inactivePartitions = new HashSet<>();
+ currentNewPartitions = new HashSet<>();
+ newPartitions = new TreeMap<>();
+ committablePartitions = new HashSet<>();
super.initializeState(context);
}
@Override
+ protected void partitionCreated(String partition) {
+ currentNewPartitions.add(partition);
+ }
+
+ @Override
protected void partitionInactive(String partition) {
- inactivePartitions.add(partition);
+ committablePartitions.add(partition);
}
@Override
@@ -59,13 +71,26 @@ public class StreamingFileWriter<IN> extends AbstractStreamingWriter<IN, Partiti
}
@Override
+ public void snapshotState(StateSnapshotContext context) throws
Exception {
+ super.snapshotState(context);
+ newPartitions.put(context.getCheckpointId(), new
HashSet<>(currentNewPartitions));
+ currentNewPartitions.clear();
+ }
+
+ @Override
protected void commitUpToCheckpoint(long checkpointId) throws Exception
{
super.commitUpToCheckpoint(checkpointId);
+
+ NavigableMap<Long, Set<String>> headPartitions =
this.newPartitions.headMap(checkpointId, true);
+ Set<String> partitions = new HashSet<>(committablePartitions);
+ committablePartitions.clear();
+ headPartitions.values().forEach(partitions::addAll);
+ headPartitions.clear();
+
output.collect(new StreamRecord<>(new PartitionCommitInfo(
checkpointId,
getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getNumberOfParallelSubtasks(),
- new ArrayList<>(inactivePartitions))));
- inactivePartitions.clear();
+ new ArrayList<>(partitions))));
}
}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/compact/CompactFileWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/compact/CompactFileWriter.java
index 319b9b0..da080e7 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/compact/CompactFileWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/compact/CompactFileWriter.java
@@ -40,6 +40,10 @@ public class CompactFileWriter<T> extends AbstractStreamingWriter<T, CompactMess
}
@Override
+ protected void partitionCreated(String partition) {
+ }
+
+ @Override
protected void partitionInactive(String partition) {
}
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/stream/StreamingFileWriterTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/stream/StreamingFileWriterTest.java
index 5f3c2cf..8a358e5 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/stream/StreamingFileWriterTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/stream/StreamingFileWriterTest.java
@@ -125,6 +125,31 @@ public class StreamingFileWriterTest {
}
}
+ @Test
+ public void testCommitImmediately() throws Exception {
+ try (OneInputStreamOperatorTestHarness<RowData,
PartitionCommitInfo> harness = create()) {
+ harness.setup();
+ harness.initializeEmptyState();
+ harness.open();
+
+ harness.processElement(row("1"), 0);
+ harness.processElement(row("2"), 0);
+ harness.processElement(row("2"), 0);
+
+ harness.snapshot(1, 1);
+
+ // repeat partition 1
+ harness.processElement(row("1"), 0);
+
+ harness.processElement(row("3"), 0);
+ harness.processElement(row("4"), 0);
+
+ harness.notifyOfCompletedCheckpoint(1);
+ List<String> partitions = collect(harness);
+ Assert.assertEquals(Arrays.asList("1", "2"),
partitions);
+ }
+ }
+
private static RowData row(String s) {
return GenericRowData.of(StringData.fromString(s));
}