This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit b3f6b844e9f9b568b7c94f68b7cfdf758e8e66dc Author: Igal Shilman <[email protected]> AuthorDate: Tue May 5 21:32:07 2020 +0200 [FLINK-17533] Expose the UnboundedFeedbackLoggerFactory --- .../flink/statefun/flink/core/logger/Loggers.java | 19 +++++++++++++++++-- .../flink/core/logger/UnboundedFeedbackLogger.java | 11 ++++------- .../core/logger/UnboundedFeedbackLoggerTest.java | 2 +- 3 files changed, 22 insertions(+), 10 deletions(-) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java index 62720bf..948a808 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/Loggers.java @@ -43,10 +43,25 @@ public final class Loggers { TypeSerializer<?> serializer, Function<?, ?> keySelector) { + UnboundedFeedbackLoggerFactory<?> factory = + unboundedSpillableLoggerFactory( + ioManager, maxParallelism, inMemoryMaxBufferSize, serializer, keySelector); + + return factory.create(); + } + + public static UnboundedFeedbackLoggerFactory<?> unboundedSpillableLoggerFactory( + IOManager ioManager, + int maxParallelism, + long inMemoryMaxBufferSize, + TypeSerializer<?> serializer, + Function<?, ?> keySelector) { + ObjectContainer container = unboundedSpillableLoggerContainer( ioManager, maxParallelism, inMemoryMaxBufferSize, serializer, keySelector); - return container.get(UnboundedFeedbackLogger.class); + + return container.get(UnboundedFeedbackLoggerFactory.class); } /** Wires the required dependencies to construct an {@link UnboundedFeedbackLogger}. */ @@ -70,7 +85,7 @@ public final class Loggers { "checkpoint-stream-ops", CheckpointedStreamOperations.class, KeyedStateCheckpointOutputStreamOps.INSTANCE); - container.add(UnboundedFeedbackLogger.class); + container.add(UnboundedFeedbackLoggerFactory.class); return container; } diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java index 8ddc22e..ef0360a 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java @@ -35,8 +35,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.statefun.flink.core.di.Inject; -import org.apache.flink.statefun.flink.core.di.Label; import org.apache.flink.statefun.flink.core.feedback.FeedbackConsumer; import org.apache.flink.util.IOUtils; @@ -50,12 +48,11 @@ public final class UnboundedFeedbackLogger<T> implements Closeable { private TypeSerializer<T> serializer; private Closeable snapshotLease; - @Inject public UnboundedFeedbackLogger( - @Label("key-group-supplier") Supplier<KeyGroupStream<T>> supplier, - @Label("key-group-assigner") ToIntFunction<T> keyGroupAssigner, - @Label("checkpoint-stream-ops") CheckpointedStreamOperations ops, - @Label("envelope-serializer") TypeSerializer<T> serializer) { + Supplier<KeyGroupStream<T>> supplier, + ToIntFunction<T> keyGroupAssigner, + CheckpointedStreamOperations ops, + TypeSerializer<T> serializer) { this.supplier = Objects.requireNonNull(supplier); this.keyGroupAssigner = Objects.requireNonNull(keyGroupAssigner); this.serializer = Objects.requireNonNull(serializer); diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java index 8b66b21..75c4c55 100644 --- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java +++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.java @@ -115,7 +115,7 @@ public class UnboundedFeedbackLoggerTest { IO_MANAGER, maxParallelism, totalMemory, IntSerializer.INSTANCE, Function.identity()); container.add("checkpoint-stream-ops", CheckpointedStreamOperations.class, NOOP.INSTANCE); - return container.get(UnboundedFeedbackLogger.class); + return container.get(UnboundedFeedbackLoggerFactory.class).create(); } enum NOOP implements CheckpointedStreamOperations {
