This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 1370e4b189de5b059caf0193ad033db6195b2767
Author: Igal Shilman <igalshil...@gmail.com>
AuthorDate: Wed Oct 21 14:32:03 2020 +0200

    [FLINK-19692][core] Expose the list of assigned key groups
    
    This commit exposes the list of key groups that can be written
    into the raw keyed stream.
---
 .../core/logger/CheckpointedStreamOperations.java    |  2 ++
 .../flink/statefun/flink/core/logger/Loggers.java    |  5 +++++
 .../core/logger/UnboundedFeedbackLoggerTest.java     | 20 +++++++++++++++++---
 3 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/CheckpointedStreamOperations.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/CheckpointedStreamOperations.java
index 2540934..82f79c6 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/CheckpointedStreamOperations.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/CheckpointedStreamOperations.java
@@ -25,6 +25,8 @@ public interface CheckpointedStreamOperations {
 
   void requireKeyedStateCheckpointed(OutputStream keyedStateCheckpointOutputStream);
 
+  Iterable<Integer> keyGroupList(OutputStream stream);
+
   void startNewKeyGroup(OutputStream stream, int keyGroup) throws IOException;
 
   Closeable acquireLease(OutputStream keyedStateCheckpointOutputStream);
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java
index aee7536..d0c8db6 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java
@@ -87,6 +87,11 @@ public final class Loggers {
     }
 
     @Override
+    public Iterable<Integer> keyGroupList(OutputStream stream) {
+      return cast(stream).getKeyGroupList();
+    }
+
+    @Override
     public void startNewKeyGroup(OutputStream stream, int keyGroup) throws IOException {
       cast(stream).startNewKeyGroup(keyGroup);
     }
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java
index dd7088f..08600ef 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertThat;
 import java.io.*;
 import java.util.ArrayList;
 import java.util.function.Function;
+import java.util.stream.IntStream;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.statefun.flink.core.di.ObjectContainer;
@@ -114,12 +115,19 @@ public class UnboundedFeedbackLoggerTest {
         Loggers.unboundedSpillableLoggerContainer(
             IO_MANAGER, maxParallelism, totalMemory, IntSerializer.INSTANCE, Function.identity());
 
-    container.add("checkpoint-stream-ops", CheckpointedStreamOperations.class, NOOP.INSTANCE);
+    container.add(
+        "checkpoint-stream-ops",
+        CheckpointedStreamOperations.class,
+        new NoopStreamOps(maxParallelism));
     return container.get(UnboundedFeedbackLoggerFactory.class).create();
   }
 
-  enum NOOP implements CheckpointedStreamOperations {
-    INSTANCE;
+  static final class NoopStreamOps implements CheckpointedStreamOperations {
+    private final int maxParallelism;
+
+    NoopStreamOps(int maxParallelism) {
+      this.maxParallelism = maxParallelism;
+    }
 
     @Override
     public void requireKeyedStateCheckpointed(OutputStream keyedStateCheckpointOutputStream) {
@@ -127,6 +135,12 @@ public class UnboundedFeedbackLoggerTest {
     }
 
     @Override
+    public Iterable<Integer> keyGroupList(OutputStream stream) {
+      IntStream range = IntStream.range(0, maxParallelism);
+      return range::iterator;
+    }
+
+    @Override
     public void startNewKeyGroup(OutputStream stream, int keyGroup) {}
 
     @Override

Reply via email to