This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new 8a9b632  [FLINK-20213][fs-connector] Partition commit is delayed when records keep coming
8a9b632 is described below

commit 8a9b632dfc41e20b35c85b357f9ae5cfc2f70fff
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Dec 22 10:13:23 2020 +0800

    [FLINK-20213][fs-connector] Partition commit is delayed when records keep coming
    
    This closes #14442
---
 .../filesystem/stream/AbstractStreamingWriter.java |  6 ++++
 .../filesystem/stream/StreamingFileWriter.java     | 35 ++++++++++++++++++----
 .../stream/compact/CompactFileWriter.java          |  4 +++
 .../filesystem/stream/StreamingFileWriterTest.java | 25 ++++++++++++++++
 4 files changed, 65 insertions(+), 5 deletions(-)

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/AbstractStreamingWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/AbstractStreamingWriter.java
index b269aaa..254c16c 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/AbstractStreamingWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/AbstractStreamingWriter.java
@@ -67,6 +67,11 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
        }
 
        /**
+        * Notifies a partition created.
+        */
+       protected abstract void partitionCreated(String partition);
+
+       /**
         * Notifies a partition become inactive. A partition becomes inactive after all
         * the records received so far have been committed.
         */
@@ -97,6 +102,7 @@ public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOpe
 
                        @Override
                        public void bucketCreated(Bucket<IN, String> bucket) {
+                               AbstractStreamingWriter.this.partitionCreated(bucket.getBucketId());
                        }
 
                        @Override
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
index 2f9d42b..4b139d7 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
@@ -20,12 +20,15 @@ package org.apache.flink.table.filesystem.stream;
 
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.NavigableMap;
 import java.util.Set;
+import java.util.TreeMap;
 
 /**
  * Writer for emitting {@link PartitionCommitInfo} to downstream.
@@ -34,7 +37,9 @@ public class StreamingFileWriter<IN> extends AbstractStreamingWriter<IN, Partiti
 
        private static final long serialVersionUID = 2L;
 
-       private transient Set<String> inactivePartitions;
+       private transient Set<String> currentNewPartitions;
+       private transient TreeMap<Long, Set<String>> newPartitions;
+       private transient Set<String> committablePartitions;
 
        public StreamingFileWriter(
                        long bucketCheckInterval,
@@ -45,13 +50,20 @@ public class StreamingFileWriter<IN> extends AbstractStreamingWriter<IN, Partiti
 
        @Override
        public void initializeState(StateInitializationContext context) throws Exception {
-               inactivePartitions = new HashSet<>();
+               currentNewPartitions = new HashSet<>();
+               newPartitions = new TreeMap<>();
+               committablePartitions = new HashSet<>();
                super.initializeState(context);
        }
 
        @Override
+       protected void partitionCreated(String partition) {
+               currentNewPartitions.add(partition);
+       }
+
+       @Override
        protected void partitionInactive(String partition) {
-               inactivePartitions.add(partition);
+               committablePartitions.add(partition);
        }
 
        @Override
@@ -59,13 +71,26 @@ public class StreamingFileWriter<IN> extends AbstractStreamingWriter<IN, Partiti
        }
 
        @Override
+       public void snapshotState(StateSnapshotContext context) throws Exception {
+               super.snapshotState(context);
+               newPartitions.put(context.getCheckpointId(), new HashSet<>(currentNewPartitions));
+               currentNewPartitions.clear();
+       }
+
+       @Override
        protected void commitUpToCheckpoint(long checkpointId) throws Exception {
                super.commitUpToCheckpoint(checkpointId);
+
+               NavigableMap<Long, Set<String>> headPartitions = this.newPartitions.headMap(checkpointId, true);
+               Set<String> partitions = new HashSet<>(committablePartitions);
+               committablePartitions.clear();
+               headPartitions.values().forEach(partitions::addAll);
+               headPartitions.clear();
+
                output.collect(new StreamRecord<>(new PartitionCommitInfo(
                                checkpointId,
                                getRuntimeContext().getIndexOfThisSubtask(),
                                getRuntimeContext().getNumberOfParallelSubtasks(),
-                               new ArrayList<>(inactivePartitions))));
-               inactivePartitions.clear();
+                               new ArrayList<>(partitions))));
        }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/compact/CompactFileWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/compact/CompactFileWriter.java
index 319b9b0..da080e7 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/compact/CompactFileWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/compact/CompactFileWriter.java
@@ -40,6 +40,10 @@ public class CompactFileWriter<T> extends AbstractStreamingWriter<T, CompactMess
        }
 
        @Override
+       protected void partitionCreated(String partition) {
+       }
+
+       @Override
        protected void partitionInactive(String partition) {
        }
 
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/stream/StreamingFileWriterTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/stream/StreamingFileWriterTest.java
index 5f3c2cf..8a358e5 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/stream/StreamingFileWriterTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/stream/StreamingFileWriterTest.java
@@ -125,6 +125,31 @@ public class StreamingFileWriterTest {
                }
        }
 
+       @Test
+       public void testCommitImmediately() throws Exception {
+               try (OneInputStreamOperatorTestHarness<RowData, PartitionCommitInfo> harness = create()) {
+                       harness.setup();
+                       harness.initializeEmptyState();
+                       harness.open();
+
+                       harness.processElement(row("1"), 0);
+                       harness.processElement(row("2"), 0);
+                       harness.processElement(row("2"), 0);
+
+                       harness.snapshot(1, 1);
+
+                       // repeat partition 1
+                       harness.processElement(row("1"), 0);
+
+                       harness.processElement(row("3"), 0);
+                       harness.processElement(row("4"), 0);
+
+                       harness.notifyOfCompletedCheckpoint(1);
+                       List<String> partitions = collect(harness);
+                       Assert.assertEquals(Arrays.asList("1", "2"), partitions);
+               }
+       }
+
        private static RowData row(String s) {
                return GenericRowData.of(StringData.fromString(s));
        }

Reply via email to