This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push:
new a4fc365 [FLINK-20433][tests] Stabilizing UnalignedCheckpointITCase.
a4fc365 is described below
commit a4fc365af1a08be74b2ae12b65f975600008d6a8
Author: Arvid Heise <[email protected]>
AuthorDate: Wed Dec 9 14:34:34 2020 +0100
[FLINK-20433][tests] Stabilizing UnalignedCheckpointITCase.
Improves UnalignedCheckpointITCase in the following ways to avoid running
into rare issues or better deal with them:
- Reduce load on test machine: Executing a test with p=10 may spawn up to
70 tasks that until backpressured can potentially lead to a full load on 70
cores. This may causes larger GC pauses and other JVM freezes that will trigger
the rare PartitionNotFound exception. Now, sources are throttled until the sink
backpressures.
- Keep track of restart attempts in split enumerator and sink with source
instances. Only finish sources if the desired numbers of failures occurred to
avoid finishing too quickly.
- Avoid relying completely on notifyCheckpointComplete to finish test:
notifyCheckpointComplete is not guaranteed to be called but the test completely
relied on it. This may lead to indefinite test runs: Some sources are finished
while others are still running, but new checkpoints are canceled because of the
finished sources. Thus, too many aborted checkpoints will also lead to a
completed test.
- Readding splits to enumerator (after FLINK-20290 has been fixed). Any
unexpected failure may have caused all splits to be dropped, which causes
indefinite running tests.
- Removing test-class level timeout - AZP has its own timeout that also
provides thread dumps - something that would require a tremendous effort in
JUnit4.
---
.../checkpointing/UnalignedCheckpointITCase.java | 38 +++--
.../checkpointing/UnalignedCheckpointTestBase.java | 181 ++++++++++++++-------
2 files changed, 145 insertions(+), 74 deletions(-)
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
index f555249..da168a7 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
@@ -19,7 +19,6 @@
package org.apache.flink.test.checkpointing;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
@@ -45,8 +44,8 @@ import org.junit.runners.Parameterized;
import java.util.Arrays;
import java.util.BitSet;
-import java.util.Random;
+import static
org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks;
import static
org.apache.flink.shaded.guava18.com.google.common.collect.Iterables.getOnlyElement;
/**
@@ -159,9 +158,12 @@ public class UnalignedCheckpointITCase extends
UnalignedCheckpointTestBase {
execute(settings);
}
- private static void createPipeline(StreamExecutionEnvironment env, long
minCheckpoints, boolean slotSharing) {
+ private static void createPipeline(StreamExecutionEnvironment env, long
minCheckpoints, boolean slotSharing, int expectedRestarts) {
final int parallelism = env.getParallelism();
- final SingleOutputStreamOperator<Long> stream =
env.fromSource(new LongSource(minCheckpoints, parallelism),
WatermarkStrategy.noWatermarks(), "source")
+ final SingleOutputStreamOperator<Long> stream = env.fromSource(
+ new LongSource(minCheckpoints, parallelism,
expectedRestarts),
+ noWatermarks(),
+ "source")
.slotSharingGroup(slotSharing ? "default" : "source")
.disableChaining()
.map(i -> i).name("forward").uid("forward")
@@ -172,27 +174,36 @@ public class UnalignedCheckpointITCase extends
UnalignedCheckpointTestBase {
addFailingPipeline(minCheckpoints, slotSharing, stream);
}
- private static void
createMultipleInputTopology(StreamExecutionEnvironment env, long
minCheckpoints, boolean slotSharing) {
+ private static void
createMultipleInputTopology(StreamExecutionEnvironment env, long
minCheckpoints, boolean slotSharing, int expectedRestarts) {
final int parallelism = env.getParallelism();
DataStream<Long> combinedSource = null;
for (int inputIndex = 0; inputIndex < 4; inputIndex++) {
- final SingleOutputStreamOperator<Long> source =
env.fromSource(new LongSource(minCheckpoints, parallelism),
WatermarkStrategy.noWatermarks(),
- "source" + inputIndex)
+ final SingleOutputStreamOperator<Long> source =
env.fromSource(
+ new LongSource(minCheckpoints,
parallelism, expectedRestarts),
+ noWatermarks(),
+ "source" + inputIndex)
.slotSharingGroup(slotSharing ? "default" :
("source" + inputIndex))
.disableChaining();
- combinedSource = combinedSource == null ? source :
combinedSource.connect(source).flatMap(new MinEmittingFunction());
+ combinedSource = combinedSource == null ?
+ source :
+ combinedSource.connect(source)
+ .flatMap(new MinEmittingFunction())
+ .name("min" + inputIndex).uid("min" +
inputIndex)
+ .slotSharingGroup(slotSharing ?
"default" : ("min" + inputIndex));
}
addFailingPipeline(minCheckpoints, slotSharing, combinedSource);
}
- private static void createUnionTopology(StreamExecutionEnvironment env,
long minCheckpoints, boolean slotSharing) {
+ private static void createUnionTopology(StreamExecutionEnvironment env,
long minCheckpoints, boolean slotSharing, int expectedRestarts) {
final int parallelism = env.getParallelism();
DataStream<Long> combinedSource = null;
final int numSources = 4;
for (int inputIndex = 0; inputIndex < numSources; inputIndex++)
{
- final SingleOutputStreamOperator<Long> source =
env.fromSource(new LongSource(minCheckpoints, parallelism),
WatermarkStrategy.noWatermarks(),
- "source" + inputIndex)
+ final SingleOutputStreamOperator<Long> source =
env.fromSource(
+ new LongSource(minCheckpoints,
parallelism, expectedRestarts),
+ noWatermarks(),
+ "source" + inputIndex)
.slotSharingGroup(slotSharing ? "default" :
("source" + inputIndex))
.disableChaining();
combinedSource = combinedSource == null ? source :
combinedSource.union(source);
@@ -246,7 +257,6 @@ public class UnalignedCheckpointITCase extends
UnalignedCheckpointTestBase {
* A sink that checks if the members arrive in the expected order
without any missing values.
*/
protected static class StrictOrderVerifyingSink extends
VerifyingSinkBase<StrictOrderVerifyingSink.State> {
- private Random random = new Random();
protected boolean backpressure;
private boolean firstOutOfOrder = true;
private boolean firstDuplicate = true;
@@ -320,9 +330,7 @@ public class UnalignedCheckpointITCase extends
UnalignedCheckpointTestBase {
if (backpressure) {
// induce backpressure until enough checkpoints
have been written
- if (random.nextInt(100) == 42) {
- Thread.sleep(0, 100_000);
- }
+ Thread.sleep(1);
}
// after all checkpoints have been completed, the
remaining data should be flushed out fairly quickly
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
index 72e8207..2acf7b6 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
@@ -55,14 +55,12 @@ import
org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.TestLogger;
-import org.apache.flink.util.function.TriConsumer;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.junit.Rule;
import org.junit.rules.ErrorCollector;
import org.junit.rules.TemporaryFolder;
-import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,7 +77,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -104,11 +101,6 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
public final TemporaryFolder temp = new TemporaryFolder();
@Rule
- public final Timeout timeout = Timeout.builder()
- .withTimeout(300, TimeUnit.SECONDS)
- .build();
-
- @Rule
public ErrorCollector collector = new ErrorCollector();
@Nullable
@@ -116,8 +108,7 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
final File checkpointDir = temp.newFolder();
StreamExecutionEnvironment env =
settings.createEnvironment(checkpointDir);
- int minCheckpoints = 10;
- settings.dagCreator.accept(env, minCheckpoints,
settings.slotSharing);
+ settings.dagCreator.create(env, settings.minCheckpoints,
settings.slotSharing, settings.expectedFailures - 1);
try {
final JobExecutionResult result = env.execute();
@@ -143,14 +134,15 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
/**
* A source that generates longs in a fixed number of splits.
*/
- protected static class LongSource
- implements Source<Long,
UnalignedCheckpointTestBase.LongSource.LongSplit,
List<UnalignedCheckpointTestBase.LongSource.LongSplit>> {
+ protected static class LongSource implements Source<Long,
LongSource.LongSplit, LongSource.EnumeratorState> {
private final long minCheckpoints;
private final int numSplits;
+ private final int expectedRestarts;
- protected LongSource(long minCheckpoints, int numSplits) {
+ protected LongSource(long minCheckpoints, int numSplits, int
expectedRestarts) {
this.minCheckpoints = minCheckpoints;
this.numSplits = numSplits;
+ this.expectedRestarts = expectedRestarts;
}
@Override
@@ -160,22 +152,20 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
@Override
public SourceReader<Long, LongSplit>
createReader(SourceReaderContext readerContext) {
- return new LongSourceReader(minCheckpoints);
+ return new LongSourceReader(minCheckpoints,
expectedRestarts);
}
@Override
- public SplitEnumerator<LongSplit,
List<UnalignedCheckpointTestBase.LongSource.LongSplit>>
createEnumerator(SplitEnumeratorContext<LongSplit> enumContext) {
+ public SplitEnumerator<LongSplit, EnumeratorState>
createEnumerator(SplitEnumeratorContext<LongSplit> enumContext) {
List<LongSplit> splits = IntStream.range(0, numSplits)
.mapToObj(i -> new LongSplit(i, numSplits, 0))
.collect(Collectors.toList());
- return new LongSplitSplitEnumerator(enumContext,
splits);
+ return new LongSplitSplitEnumerator(enumContext, new
EnumeratorState(splits, 0));
}
@Override
- public SplitEnumerator<LongSplit,
List<UnalignedCheckpointTestBase.LongSource.LongSplit>> restoreEnumerator(
- SplitEnumeratorContext<LongSplit> enumContext,
- List<UnalignedCheckpointTestBase.LongSource.LongSplit>
checkpoint) {
- return new LongSplitSplitEnumerator(enumContext,
checkpoint);
+ public SplitEnumerator<LongSplit, EnumeratorState>
restoreEnumerator(SplitEnumeratorContext<LongSplit> enumContext,
EnumeratorState state) {
+ return new LongSplitSplitEnumerator(enumContext, state);
}
@Override
@@ -184,18 +174,23 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
}
@Override
- public SimpleVersionedSerializer<List<LongSplit>>
getEnumeratorCheckpointSerializer() {
+ public SimpleVersionedSerializer<EnumeratorState>
getEnumeratorCheckpointSerializer() {
return new EnumeratorVersionedSerializer();
}
private static class LongSourceReader implements
SourceReader<Long, LongSplit> {
private final long minCheckpoints;
+ private final int expectedRestarts;
private final LongCounter numInputsCounter = new
LongCounter();
private LongSplit split;
+ private int numAbortedCheckpoints;
+ private boolean throttle = true;
+ private int numRestarts;
- public LongSourceReader(final long minCheckpoints) {
+ public LongSourceReader(final long minCheckpoints, int
expectedRestarts) {
this.minCheckpoints = minCheckpoints;
+ this.expectedRestarts = expectedRestarts;
}
@Override
@@ -203,14 +198,19 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
}
@Override
- public InputStatus pollNext(ReaderOutput<Long> output) {
+ public InputStatus pollNext(ReaderOutput<Long> output)
throws InterruptedException {
if (split == null) {
return InputStatus.NOTHING_AVAILABLE;
}
output.collect(split.nextNumber,
split.nextNumber);
split.nextNumber += split.increment;
- return split.numCompletedCheckpoints >=
minCheckpoints ? InputStatus.END_OF_INPUT : InputStatus.MORE_AVAILABLE;
+
+ if (throttle) {
+ // throttle source as long as sink is
not backpressuring (which it does only after full recovery)
+ Thread.sleep(1);
+ }
+ return split.numCompletedCheckpoints >=
minCheckpoints && numRestarts >= expectedRestarts ? InputStatus.END_OF_INPUT :
InputStatus.MORE_AVAILABLE;
}
@Override
@@ -218,14 +218,28 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
if (split == null) {
return Collections.emptyList();
}
- LOG.info("Snapshotted {} @ {} subtask (?
attempt)", split, split.nextNumber % split.increment);
+ throttle = split.numCompletedCheckpoints >=
minCheckpoints;
+ LOG.info("Snapshotted {} @ {} subtask ({}
attempt)", split, split.nextNumber % split.increment, numRestarts);
return singletonList(split);
}
@Override
public void notifyCheckpointComplete(long checkpointId)
{
if (split != null) {
- LOG.info("notifyCheckpointComplete {} @
{} subtask (? attempt)", split.numCompletedCheckpoints, split.nextNumber %
split.increment);
+ LOG.info("notifyCheckpointComplete {} @
{} subtask ({} attempt)",
+ split.numCompletedCheckpoints,
+ split.nextNumber %
split.increment,
+ numRestarts);
+ split.numCompletedCheckpoints++;
+ numAbortedCheckpoints = 0;
+ }
+ }
+
+ @Override
+ public void notifyCheckpointAborted(long checkpointId) {
+ if (numAbortedCheckpoints++ > 10) {
+ // aborted too many checkpoints in a
row, which usually indicates that part of the pipeline is already completed
+ // here simply also advance completed
checkpoints to avoid running into a live lock
split.numCompletedCheckpoints++;
}
}
@@ -241,7 +255,7 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
throw new IllegalStateException("Tried
to add " + splits + " but already got " + split);
}
split = Iterables.getOnlyElement(splits);
- LOG.info("Added split {} @ {} subtask (?
attempt)", split, split.nextNumber % split.increment);
+ LOG.info("Added split {} @ {} subtask ({}
attempt)", split, split.nextNumber % split.increment, numRestarts);
}
@Override
@@ -250,6 +264,9 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
@Override
public void handleSourceEvents(SourceEvent sourceEvent)
{
+ if (sourceEvent instanceof RestartEvent) {
+ numRestarts = ((RestartEvent)
sourceEvent).numRestarts;
+ }
}
@Override
@@ -260,12 +277,20 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
}
}
+ private static class RestartEvent implements SourceEvent {
+ final int numRestarts;
+
+ private RestartEvent(int numRestarts) {
+ this.numRestarts = numRestarts;
+ }
+ }
+
private static class LongSplit implements SourceSplit {
private final int increment;
private long nextNumber;
- private long numCompletedCheckpoints;
+ private int numCompletedCheckpoints;
- public LongSplit(long nextNumber, int increment, long
numCompletedCheckpoints) {
+ public LongSplit(long nextNumber, int increment, int
numCompletedCheckpoints) {
this.nextNumber = nextNumber;
this.increment = increment;
this.numCompletedCheckpoints =
numCompletedCheckpoints;
@@ -286,13 +311,13 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
}
}
- private static class LongSplitSplitEnumerator implements
SplitEnumerator<LongSplit, List<LongSplit>> {
+ private static class LongSplitSplitEnumerator implements
SplitEnumerator<LongSplit, EnumeratorState> {
private final SplitEnumeratorContext<LongSplit> context;
- private final List<LongSplit> unassignedSplits;
+ private final EnumeratorState state;
- private
LongSplitSplitEnumerator(SplitEnumeratorContext<LongSplit> context,
List<LongSplit> unassignedSplits) {
+ private
LongSplitSplitEnumerator(SplitEnumeratorContext<LongSplit> context,
EnumeratorState state) {
this.context = context;
- this.unassignedSplits = new
ArrayList<>(unassignedSplits);
+ this.state = state;
}
@Override
@@ -309,36 +334,42 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
@Override
public void addSplitsBack(List<LongSplit> splits, int
subtaskId) {
- LOG.info("addSplitsBack {}", splits);
- // disabled due to FLINK-20290, which may
duplicate splits
- // unassignedSplits.addAll(splits);
+ if (!splits.isEmpty()) {
+ LOG.info("addSplitsBack {}", splits);
+ state.unassignedSplits.addAll(splits);
+ }
+ if (subtaskId == 0) {
+ // currently always called on failure
+ state.numRestarts++;
+ }
}
@Override
public void addReader(int subtaskId) {
- if (context.registeredReaders().size() ==
context.currentParallelism()) {
+ if (context.registeredReaders().size() ==
context.currentParallelism() && !state.unassignedSplits.isEmpty()) {
int numReaders =
context.registeredReaders().size();
Map<Integer, List<LongSplit>>
assignment = new HashMap<>();
- for (int i = 0; i <
unassignedSplits.size(); i++) {
+ for (int i = 0; i <
state.unassignedSplits.size(); i++) {
assignment
.computeIfAbsent(i %
numReaders, t -> new ArrayList<>())
-
.add(unassignedSplits.get(i));
+
.add(state.unassignedSplits.get(i));
}
LOG.info("Assigning splits {}",
assignment);
context.assignSplits(new
SplitsAssignment<>(assignment));
- unassignedSplits.clear();
+ state.unassignedSplits.clear();
}
+ context.sendEventToSourceReader(subtaskId, new
RestartEvent(state.numRestarts));
}
@Override
public void notifyCheckpointComplete(long checkpointId)
{
- unassignedSplits.forEach(s ->
s.numCompletedCheckpoints++);
+ state.unassignedSplits.forEach(s ->
s.numCompletedCheckpoints++);
}
@Override
- public List<LongSplit> snapshotState() throws Exception
{
- LOG.info("snapshotState {}", unassignedSplits);
- return unassignedSplits;
+ public EnumeratorState snapshotState() throws Exception
{
+ LOG.info("snapshotState {}", state);
+ return state;
}
@Override
@@ -346,33 +377,61 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
}
}
- private static class EnumeratorVersionedSerializer implements
SimpleVersionedSerializer<List<LongSplit>> {
+ private static class EnumeratorState {
+ private final List<LongSplit> unassignedSplits;
+ private int numRestarts;
+
+ public EnumeratorState(List<LongSplit>
unassignedSplits, int numRestarts) {
+ this.unassignedSplits = unassignedSplits;
+ this.numRestarts = numRestarts;
+ }
+
+ @Override
+ public String toString() {
+ return "EnumeratorState{" +
+ "unassignedSplits=" + unassignedSplits +
+ ", numRestarts=" + numRestarts +
+ '}';
+ }
+ }
+
+ private static class EnumeratorVersionedSerializer implements
SimpleVersionedSerializer<EnumeratorState> {
+ private SplitVersionedSerializer
splitVersionedSerializer = new SplitVersionedSerializer();
+
@Override
public int getVersion() {
return 0;
}
@Override
- public byte[] serialize(List<LongSplit> splits) {
- final byte[] bytes = new byte[20 *
splits.size()];
- for (final LongSplit split : splits) {
-
ByteBuffer.wrap(bytes).putLong(split.nextNumber).putInt(split.increment).putLong(split.numCompletedCheckpoints);
+ public byte[] serialize(EnumeratorState state) {
+ final ByteBuffer byteBuffer =
ByteBuffer.allocate(state.unassignedSplits.size() *
SplitVersionedSerializer.LENGTH + 4);
+ byteBuffer.putInt(state.numRestarts);
+ for (final LongSplit unassignedSplit :
state.unassignedSplits) {
+
byteBuffer.put(splitVersionedSerializer.serialize(unassignedSplit));
}
- return bytes;
+ return byteBuffer.array();
}
@Override
- public List<LongSplit> deserialize(int version, byte[]
serialized) {
+ public EnumeratorState deserialize(int version, byte[]
serialized) {
final ByteBuffer byteBuffer =
ByteBuffer.wrap(serialized);
- final ArrayList<LongSplit> splits = new
ArrayList<>();
+ final int numRestarts = byteBuffer.getInt();
+
+ final List<LongSplit> splits = new
ArrayList<>(serialized.length / SplitVersionedSerializer.LENGTH);
+
+ final byte[] serializedSplit = new
byte[SplitVersionedSerializer.LENGTH];
while (byteBuffer.hasRemaining()) {
- splits.add(new
LongSplit(byteBuffer.getLong(), byteBuffer.getInt(), byteBuffer.getLong()));
+ byteBuffer.get(serializedSplit);
+
splits.add(splitVersionedSerializer.deserialize(version, serializedSplit));
}
- return splits;
+ return new EnumeratorState(splits, numRestarts);
}
}
private static class SplitVersionedSerializer implements
SimpleVersionedSerializer<LongSplit> {
+ static final int LENGTH = 16;
+
@Override
public int getVersion() {
return 0;
@@ -380,19 +439,22 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
@Override
public byte[] serialize(LongSplit split) {
- final byte[] bytes = new byte[20];
-
ByteBuffer.wrap(bytes).putLong(split.nextNumber).putInt(split.increment).putLong(split.numCompletedCheckpoints);
+ final byte[] bytes = new byte[LENGTH];
+
ByteBuffer.wrap(bytes).putLong(split.nextNumber).putInt(split.increment).putInt(split.numCompletedCheckpoints);
return bytes;
}
@Override
public LongSplit deserialize(int version, byte[]
serialized) {
final ByteBuffer byteBuffer =
ByteBuffer.wrap(serialized);
- return new LongSplit(byteBuffer.getLong(),
byteBuffer.getInt(), byteBuffer.getLong());
+ return new LongSplit(byteBuffer.getLong(),
byteBuffer.getInt(), byteBuffer.getInt());
}
}
}
+ interface DagCreator {
+ void create(StreamExecutionEnvironment environment, int
minCheckpoints, boolean slotSharing, int expectedFailures);
+ }
/**
* Builder-like interface for all relevant unaligned settings.
@@ -400,6 +462,7 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
protected static class UnalignedSettings {
private int parallelism;
private int slotsPerTaskManager = 1;
+ private int minCheckpoints = 10;
private boolean slotSharing = true;
@Nullable
private File restoreCheckpoint;
@@ -407,9 +470,9 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
private int numSlots;
private int numBuffers;
private int expectedFailures = 0;
- private final TriConsumer<StreamExecutionEnvironment, Integer,
Boolean> dagCreator;
+ private final DagCreator dagCreator;
- public
UnalignedSettings(TriConsumer<StreamExecutionEnvironment, Integer, Boolean>
dagCreator) {
+ public UnalignedSettings(DagCreator dagCreator) {
this.dagCreator = dagCreator;
}