Github user zhijiangW commented on a diff in the pull request:
https://github.com/apache/flink/pull/5400#discussion_r166175837
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
---
@@ -131,10 +131,14 @@ public StreamInputProcessor(
long maxAlign =
taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
if (!(maxAlign == -1 || maxAlign > 0)) {
throw new IllegalConfigurationException(
-
TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
- + " must be positive or -1
(infinite)");
+
TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
--- End diff --
I think we can change the current `CheckpointBarrierHandler` interface into
abstract class and then add a `createBarrierHanlder` method for extracting the
common parts in `StreamInputProcessor` and `StreamTwoInputProcessor`. Or we
define a new class for the common method. I prefer the first way.
What do you think?
---