This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new a4fc365  [FLINK-20433][tests] Stabilizing UnalignedCheckpointITCase.
a4fc365 is described below

commit a4fc365af1a08be74b2ae12b65f975600008d6a8
Author: Arvid Heise <[email protected]>
AuthorDate: Wed Dec 9 14:34:34 2020 +0100

    [FLINK-20433][tests] Stabilizing UnalignedCheckpointITCase.
    
    Improves UnalignedCheckpointITCase in the following ways to avoid running 
into rare issues or to deal with them better:
    - Reduce load on test machine: Executing a test with p=10 may spawn up to 
70 tasks that, until backpressured, can potentially lead to a full load on 70 
cores. This may cause larger GC pauses and other JVM freezes that will trigger 
the rare PartitionNotFound exception. Now, sources are throttled until the sink 
backpressures.
    - Keep track of restart attempts in split enumerator and sink with source 
instances. Only finish sources if the desired numbers of failures occurred to 
avoid finishing too quickly.
    - Avoid relying completely on notifyCheckpointComplete to finish test: 
notifyCheckpointComplete is not guaranteed to be called but the test completely 
relied on it. This may lead to indefinite test runs: Some sources are finished 
while others are still running, but new checkpoints are canceled because of the 
finished sources. Thus, too many consecutive aborted checkpoints now also let 
the test complete.
    - Re-adding splits to enumerator (after FLINK-20290 has been fixed). Any 
unexpected failure may have caused all splits to be dropped, which causes 
indefinite running tests.
    - Removing test-class level timeout - AZP has its own timeout that also 
provides thread dumps - something that would require a tremendous effort in 
JUnit4.
---
 .../checkpointing/UnalignedCheckpointITCase.java   |  38 +++--
 .../checkpointing/UnalignedCheckpointTestBase.java | 181 ++++++++++++++-------
 2 files changed, 145 insertions(+), 74 deletions(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
index f555249..da168a7 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
@@ -19,7 +19,6 @@
 
 package org.apache.flink.test.checkpointing;
 
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.state.ListState;
@@ -45,8 +44,8 @@ import org.junit.runners.Parameterized;
 
 import java.util.Arrays;
 import java.util.BitSet;
-import java.util.Random;
 
+import static 
org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks;
 import static 
org.apache.flink.shaded.guava18.com.google.common.collect.Iterables.getOnlyElement;
 
 /**
@@ -159,9 +158,12 @@ public class UnalignedCheckpointITCase extends 
UnalignedCheckpointTestBase {
                execute(settings);
        }
 
-       private static void createPipeline(StreamExecutionEnvironment env, long 
minCheckpoints, boolean slotSharing) {
+       private static void createPipeline(StreamExecutionEnvironment env, long 
minCheckpoints, boolean slotSharing, int expectedRestarts) {
                final int parallelism = env.getParallelism();
-               final SingleOutputStreamOperator<Long> stream = 
env.fromSource(new LongSource(minCheckpoints, parallelism), 
WatermarkStrategy.noWatermarks(), "source")
+               final SingleOutputStreamOperator<Long> stream = env.fromSource(
+                               new LongSource(minCheckpoints, parallelism, 
expectedRestarts),
+                               noWatermarks(),
+                               "source")
                        .slotSharingGroup(slotSharing ? "default" : "source")
                        .disableChaining()
                        .map(i -> i).name("forward").uid("forward")
@@ -172,27 +174,36 @@ public class UnalignedCheckpointITCase extends 
UnalignedCheckpointTestBase {
                addFailingPipeline(minCheckpoints, slotSharing, stream);
        }
 
-       private static void 
createMultipleInputTopology(StreamExecutionEnvironment env, long 
minCheckpoints, boolean slotSharing) {
+       private static void 
createMultipleInputTopology(StreamExecutionEnvironment env, long 
minCheckpoints, boolean slotSharing, int expectedRestarts) {
                final int parallelism = env.getParallelism();
                DataStream<Long> combinedSource = null;
                for (int inputIndex = 0; inputIndex < 4; inputIndex++) {
-                       final SingleOutputStreamOperator<Long> source = 
env.fromSource(new LongSource(minCheckpoints, parallelism), 
WatermarkStrategy.noWatermarks(),
-                               "source" + inputIndex)
+                       final SingleOutputStreamOperator<Long> source = 
env.fromSource(
+                                       new LongSource(minCheckpoints, 
parallelism, expectedRestarts),
+                                       noWatermarks(),
+                                       "source" + inputIndex)
                                .slotSharingGroup(slotSharing ? "default" : 
("source" + inputIndex))
                                .disableChaining();
-                       combinedSource = combinedSource == null ? source : 
combinedSource.connect(source).flatMap(new MinEmittingFunction());
+                       combinedSource = combinedSource == null ?
+                               source :
+                               combinedSource.connect(source)
+                                       .flatMap(new MinEmittingFunction())
+                                       .name("min" + inputIndex).uid("min" + 
inputIndex)
+                                       .slotSharingGroup(slotSharing ? 
"default" : ("min" + inputIndex));
                }
 
                addFailingPipeline(minCheckpoints, slotSharing, combinedSource);
        }
 
-       private static void createUnionTopology(StreamExecutionEnvironment env, 
long minCheckpoints, boolean slotSharing) {
+       private static void createUnionTopology(StreamExecutionEnvironment env, 
long minCheckpoints, boolean slotSharing, int expectedRestarts) {
                final int parallelism = env.getParallelism();
                DataStream<Long> combinedSource = null;
                final int numSources = 4;
                for (int inputIndex = 0; inputIndex < numSources; inputIndex++) 
{
-                       final SingleOutputStreamOperator<Long> source = 
env.fromSource(new LongSource(minCheckpoints, parallelism), 
WatermarkStrategy.noWatermarks(),
-                               "source" + inputIndex)
+                       final SingleOutputStreamOperator<Long> source = 
env.fromSource(
+                                       new LongSource(minCheckpoints, 
parallelism, expectedRestarts),
+                                       noWatermarks(),
+                                       "source" + inputIndex)
                                .slotSharingGroup(slotSharing ? "default" : 
("source" + inputIndex))
                                .disableChaining();
                        combinedSource = combinedSource == null ? source : 
combinedSource.union(source);
@@ -246,7 +257,6 @@ public class UnalignedCheckpointITCase extends 
UnalignedCheckpointTestBase {
         * A sink that checks if the members arrive in the expected order 
without any missing values.
         */
        protected static class StrictOrderVerifyingSink extends 
VerifyingSinkBase<StrictOrderVerifyingSink.State> {
-               private Random random = new Random();
                protected boolean backpressure;
                private boolean firstOutOfOrder = true;
                private boolean firstDuplicate = true;
@@ -320,9 +330,7 @@ public class UnalignedCheckpointITCase extends 
UnalignedCheckpointTestBase {
 
                        if (backpressure) {
                                // induce backpressure until enough checkpoints 
have been written
-                               if (random.nextInt(100) == 42) {
-                                       Thread.sleep(0, 100_000);
-                               }
+                               Thread.sleep(1);
                        }
                        // after all checkpoints have been completed, the 
remaining data should be flushed out fairly quickly
                }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
index 72e8207..2acf7b6 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
@@ -55,14 +55,12 @@ import 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.util.TestLogger;
-import org.apache.flink.util.function.TriConsumer;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
 
 import org.junit.Rule;
 import org.junit.rules.ErrorCollector;
 import org.junit.rules.TemporaryFolder;
-import org.junit.rules.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,7 +77,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -104,11 +101,6 @@ public abstract class UnalignedCheckpointTestBase extends 
TestLogger {
        public final TemporaryFolder temp = new TemporaryFolder();
 
        @Rule
-       public final Timeout timeout = Timeout.builder()
-               .withTimeout(300, TimeUnit.SECONDS)
-               .build();
-
-       @Rule
        public ErrorCollector collector = new ErrorCollector();
 
        @Nullable
@@ -116,8 +108,7 @@ public abstract class UnalignedCheckpointTestBase extends 
TestLogger {
                final File checkpointDir = temp.newFolder();
                StreamExecutionEnvironment env = 
settings.createEnvironment(checkpointDir);
 
-               int minCheckpoints = 10;
-               settings.dagCreator.accept(env, minCheckpoints, 
settings.slotSharing);
+               settings.dagCreator.create(env, settings.minCheckpoints, 
settings.slotSharing, settings.expectedFailures - 1);
                try {
                        final JobExecutionResult result = env.execute();
 
@@ -143,14 +134,15 @@ public abstract class UnalignedCheckpointTestBase extends 
TestLogger {
        /**
         * A source that generates longs in a fixed number of splits.
         */
-       protected static class LongSource
-                       implements Source<Long, 
UnalignedCheckpointTestBase.LongSource.LongSplit, 
List<UnalignedCheckpointTestBase.LongSource.LongSplit>> {
+       protected static class LongSource implements Source<Long, 
LongSource.LongSplit, LongSource.EnumeratorState> {
                private final long minCheckpoints;
                private final int numSplits;
+               private final int expectedRestarts;
 
-               protected LongSource(long minCheckpoints, int numSplits) {
+               protected LongSource(long minCheckpoints, int numSplits, int 
expectedRestarts) {
                        this.minCheckpoints = minCheckpoints;
                        this.numSplits = numSplits;
+                       this.expectedRestarts = expectedRestarts;
                }
 
                @Override
@@ -160,22 +152,20 @@ public abstract class UnalignedCheckpointTestBase extends 
TestLogger {
 
                @Override
                public SourceReader<Long, LongSplit> 
createReader(SourceReaderContext readerContext) {
-                       return new LongSourceReader(minCheckpoints);
+                       return new LongSourceReader(minCheckpoints, 
expectedRestarts);
                }
 
                @Override
-               public SplitEnumerator<LongSplit, 
List<UnalignedCheckpointTestBase.LongSource.LongSplit>> 
createEnumerator(SplitEnumeratorContext<LongSplit> enumContext) {
+               public SplitEnumerator<LongSplit, EnumeratorState> 
createEnumerator(SplitEnumeratorContext<LongSplit> enumContext) {
                        List<LongSplit> splits = IntStream.range(0, numSplits)
                                .mapToObj(i -> new LongSplit(i, numSplits, 0))
                                .collect(Collectors.toList());
-                       return new LongSplitSplitEnumerator(enumContext, 
splits);
+                       return new LongSplitSplitEnumerator(enumContext, new 
EnumeratorState(splits, 0));
                }
 
                @Override
-               public SplitEnumerator<LongSplit, 
List<UnalignedCheckpointTestBase.LongSource.LongSplit>> restoreEnumerator(
-                       SplitEnumeratorContext<LongSplit> enumContext,
-                       List<UnalignedCheckpointTestBase.LongSource.LongSplit> 
checkpoint) {
-                       return new LongSplitSplitEnumerator(enumContext, 
checkpoint);
+               public SplitEnumerator<LongSplit, EnumeratorState> 
restoreEnumerator(SplitEnumeratorContext<LongSplit> enumContext, 
EnumeratorState state) {
+                       return new LongSplitSplitEnumerator(enumContext, state);
                }
 
                @Override
@@ -184,18 +174,23 @@ public abstract class UnalignedCheckpointTestBase extends 
TestLogger {
                }
 
                @Override
-               public SimpleVersionedSerializer<List<LongSplit>> 
getEnumeratorCheckpointSerializer() {
+               public SimpleVersionedSerializer<EnumeratorState> 
getEnumeratorCheckpointSerializer() {
                        return new EnumeratorVersionedSerializer();
                }
 
                private static class LongSourceReader implements 
SourceReader<Long, LongSplit> {
 
                        private final long minCheckpoints;
+                       private final int expectedRestarts;
                        private final LongCounter numInputsCounter = new 
LongCounter();
                        private LongSplit split;
+                       private int numAbortedCheckpoints;
+                       private boolean throttle = true;
+                       private int numRestarts;
 
-                       public LongSourceReader(final long minCheckpoints) {
+                       public LongSourceReader(final long minCheckpoints, int 
expectedRestarts) {
                                this.minCheckpoints = minCheckpoints;
+                               this.expectedRestarts = expectedRestarts;
                        }
 
                        @Override
@@ -203,14 +198,19 @@ public abstract class UnalignedCheckpointTestBase extends 
TestLogger {
                        }
 
                        @Override
-                       public InputStatus pollNext(ReaderOutput<Long> output) {
+                       public InputStatus pollNext(ReaderOutput<Long> output) 
throws InterruptedException {
                                if (split == null) {
                                        return InputStatus.NOTHING_AVAILABLE;
                                }
 
                                output.collect(split.nextNumber, 
split.nextNumber);
                                split.nextNumber += split.increment;
-                               return split.numCompletedCheckpoints >= 
minCheckpoints ? InputStatus.END_OF_INPUT : InputStatus.MORE_AVAILABLE;
+
+                               if (throttle) {
+                                       // throttle source as long as sink is 
not backpressuring (which it does only after full recovery)
+                                       Thread.sleep(1);
+                               }
+                               return split.numCompletedCheckpoints >= 
minCheckpoints && numRestarts >= expectedRestarts ? InputStatus.END_OF_INPUT : 
InputStatus.MORE_AVAILABLE;
                        }
 
                        @Override
@@ -218,14 +218,28 @@ public abstract class UnalignedCheckpointTestBase extends 
TestLogger {
                                if (split == null) {
                                        return Collections.emptyList();
                                }
-                               LOG.info("Snapshotted {} @ {} subtask (? 
attempt)", split, split.nextNumber % split.increment);
+                               throttle = split.numCompletedCheckpoints >= 
minCheckpoints;
+                               LOG.info("Snapshotted {} @ {} subtask ({} 
attempt)", split, split.nextNumber % split.increment, numRestarts);
                                return singletonList(split);
                        }
 
                        @Override
                        public void notifyCheckpointComplete(long checkpointId) 
{
                                if (split != null) {
-                                       LOG.info("notifyCheckpointComplete {} @ 
{} subtask (? attempt)", split.numCompletedCheckpoints, split.nextNumber % 
split.increment);
+                                       LOG.info("notifyCheckpointComplete {} @ 
{} subtask ({} attempt)",
+                                               split.numCompletedCheckpoints,
+                                               split.nextNumber % 
split.increment,
+                                               numRestarts);
+                                       split.numCompletedCheckpoints++;
+                                       numAbortedCheckpoints = 0;
+                               }
+                       }
+
+                       @Override
+                       public void notifyCheckpointAborted(long checkpointId) {
+                               if (numAbortedCheckpoints++ > 10) {
+                                       // aborted too many checkpoints in a 
row, which usually indicates that part of the pipeline is already completed
+                                       // here simply also advance completed 
checkpoints to avoid running into a live lock
                                        split.numCompletedCheckpoints++;
                                }
                        }
@@ -241,7 +255,7 @@ public abstract class UnalignedCheckpointTestBase extends 
TestLogger {
                                        throw new IllegalStateException("Tried 
to add " + splits + " but already got " + split);
                                }
                                split = Iterables.getOnlyElement(splits);
-                               LOG.info("Added split {} @ {} subtask (? 
attempt)", split, split.nextNumber % split.increment);
+                               LOG.info("Added split {} @ {} subtask ({} 
attempt)", split, split.nextNumber % split.increment, numRestarts);
                        }
 
                        @Override
@@ -250,6 +264,9 @@ public abstract class UnalignedCheckpointTestBase extends 
TestLogger {
 
                        @Override
                        public void handleSourceEvents(SourceEvent sourceEvent) 
{
+                               if (sourceEvent instanceof RestartEvent) {
+                                       numRestarts = ((RestartEvent) 
sourceEvent).numRestarts;
+                               }
                        }
 
                        @Override
@@ -260,12 +277,20 @@ public abstract class UnalignedCheckpointTestBase extends 
TestLogger {
                        }
                }
 
+               private static class RestartEvent implements SourceEvent {
+                       final int numRestarts;
+
+                       private RestartEvent(int numRestarts) {
+                               this.numRestarts = numRestarts;
+                       }
+               }
+
                private static class LongSplit implements SourceSplit {
                        private final int increment;
                        private long nextNumber;
-                       private long numCompletedCheckpoints;
+                       private int numCompletedCheckpoints;
 
-                       public LongSplit(long nextNumber, int increment, long 
numCompletedCheckpoints) {
+                       public LongSplit(long nextNumber, int increment, int 
numCompletedCheckpoints) {
                                this.nextNumber = nextNumber;
                                this.increment = increment;
                                this.numCompletedCheckpoints = 
numCompletedCheckpoints;
@@ -286,13 +311,13 @@ public abstract class UnalignedCheckpointTestBase extends 
TestLogger {
                        }
                }
 
-               private static class LongSplitSplitEnumerator implements 
SplitEnumerator<LongSplit, List<LongSplit>> {
+               private static class LongSplitSplitEnumerator implements 
SplitEnumerator<LongSplit, EnumeratorState> {
                        private final SplitEnumeratorContext<LongSplit> context;
-                       private final List<LongSplit> unassignedSplits;
+                       private final EnumeratorState state;
 
-                       private 
LongSplitSplitEnumerator(SplitEnumeratorContext<LongSplit> context, 
List<LongSplit> unassignedSplits) {
+                       private 
LongSplitSplitEnumerator(SplitEnumeratorContext<LongSplit> context, 
EnumeratorState state) {
                                this.context = context;
-                               this.unassignedSplits = new 
ArrayList<>(unassignedSplits);
+                               this.state = state;
                        }
 
                        @Override
@@ -309,36 +334,42 @@ public abstract class UnalignedCheckpointTestBase extends 
TestLogger {
 
                        @Override
                        public void addSplitsBack(List<LongSplit> splits, int 
subtaskId) {
-                               LOG.info("addSplitsBack {}", splits);
-                               // disabled due to FLINK-20290, which may 
duplicate splits
-                               // unassignedSplits.addAll(splits);
+                               if (!splits.isEmpty()) {
+                                       LOG.info("addSplitsBack {}", splits);
+                                       state.unassignedSplits.addAll(splits);
+                               }
+                               if (subtaskId == 0) {
+                                       // currently always called on failure
+                                       state.numRestarts++;
+                               }
                        }
 
                        @Override
                        public void addReader(int subtaskId) {
-                               if (context.registeredReaders().size() == 
context.currentParallelism()) {
+                               if (context.registeredReaders().size() == 
context.currentParallelism() && !state.unassignedSplits.isEmpty()) {
                                        int numReaders = 
context.registeredReaders().size();
                                        Map<Integer, List<LongSplit>> 
assignment = new HashMap<>();
-                                       for (int i = 0; i < 
unassignedSplits.size(); i++) {
+                                       for (int i = 0; i < 
state.unassignedSplits.size(); i++) {
                                                assignment
                                                        .computeIfAbsent(i % 
numReaders, t -> new ArrayList<>())
-                                                       
.add(unassignedSplits.get(i));
+                                                       
.add(state.unassignedSplits.get(i));
                                        }
                                        LOG.info("Assigning splits {}", 
assignment);
                                        context.assignSplits(new 
SplitsAssignment<>(assignment));
-                                       unassignedSplits.clear();
+                                       state.unassignedSplits.clear();
                                }
+                               context.sendEventToSourceReader(subtaskId, new 
RestartEvent(state.numRestarts));
                        }
 
                        @Override
                        public void notifyCheckpointComplete(long checkpointId) 
{
-                               unassignedSplits.forEach(s -> 
s.numCompletedCheckpoints++);
+                               state.unassignedSplits.forEach(s -> 
s.numCompletedCheckpoints++);
                        }
 
                        @Override
-                       public List<LongSplit> snapshotState() throws Exception 
{
-                               LOG.info("snapshotState {}", unassignedSplits);
-                               return unassignedSplits;
+                       public EnumeratorState snapshotState() throws Exception 
{
+                               LOG.info("snapshotState {}", state);
+                               return state;
                        }
 
                        @Override
@@ -346,33 +377,61 @@ public abstract class UnalignedCheckpointTestBase extends 
TestLogger {
                        }
                }
 
-               private static class EnumeratorVersionedSerializer implements 
SimpleVersionedSerializer<List<LongSplit>> {
+               private static class EnumeratorState {
+                       private final List<LongSplit> unassignedSplits;
+                       private int numRestarts;
+
+                       public EnumeratorState(List<LongSplit> 
unassignedSplits, int numRestarts) {
+                               this.unassignedSplits = unassignedSplits;
+                               this.numRestarts = numRestarts;
+                       }
+
+                       @Override
+                       public String toString() {
+                               return "EnumeratorState{" +
+                                       "unassignedSplits=" + unassignedSplits +
+                                       ", numRestarts=" + numRestarts +
+                                       '}';
+                       }
+               }
+
+               private static class EnumeratorVersionedSerializer implements 
SimpleVersionedSerializer<EnumeratorState> {
+                       private SplitVersionedSerializer 
splitVersionedSerializer = new SplitVersionedSerializer();
+
                        @Override
                        public int getVersion() {
                                return 0;
                        }
 
                        @Override
-                       public byte[] serialize(List<LongSplit> splits) {
-                               final byte[] bytes = new byte[20 * 
splits.size()];
-                               for (final LongSplit split : splits) {
-                                       
ByteBuffer.wrap(bytes).putLong(split.nextNumber).putInt(split.increment).putLong(split.numCompletedCheckpoints);
+                       public byte[] serialize(EnumeratorState state) {
+                               final ByteBuffer byteBuffer = 
ByteBuffer.allocate(state.unassignedSplits.size() * 
SplitVersionedSerializer.LENGTH + 4);
+                               byteBuffer.putInt(state.numRestarts);
+                               for (final LongSplit unassignedSplit : 
state.unassignedSplits) {
+                                       
byteBuffer.put(splitVersionedSerializer.serialize(unassignedSplit));
                                }
-                               return bytes;
+                               return byteBuffer.array();
                        }
 
                        @Override
-                       public List<LongSplit> deserialize(int version, byte[] 
serialized) {
+                       public EnumeratorState deserialize(int version, byte[] 
serialized) {
                                final ByteBuffer byteBuffer = 
ByteBuffer.wrap(serialized);
-                               final ArrayList<LongSplit> splits = new 
ArrayList<>();
+                               final int numRestarts = byteBuffer.getInt();
+
+                               final List<LongSplit> splits = new 
ArrayList<>(serialized.length / SplitVersionedSerializer.LENGTH);
+
+                               final byte[] serializedSplit = new 
byte[SplitVersionedSerializer.LENGTH];
                                while (byteBuffer.hasRemaining()) {
-                                       splits.add(new 
LongSplit(byteBuffer.getLong(), byteBuffer.getInt(), byteBuffer.getLong()));
+                                       byteBuffer.get(serializedSplit);
+                                       
splits.add(splitVersionedSerializer.deserialize(version, serializedSplit));
                                }
-                               return splits;
+                               return new EnumeratorState(splits, numRestarts);
                        }
                }
 
                private static class SplitVersionedSerializer implements 
SimpleVersionedSerializer<LongSplit> {
+                       static final int LENGTH = 16;
+
                        @Override
                        public int getVersion() {
                                return 0;
@@ -380,19 +439,22 @@ public abstract class UnalignedCheckpointTestBase extends 
TestLogger {
 
                        @Override
                        public byte[] serialize(LongSplit split) {
-                               final byte[] bytes = new byte[20];
-                               
ByteBuffer.wrap(bytes).putLong(split.nextNumber).putInt(split.increment).putLong(split.numCompletedCheckpoints);
+                               final byte[] bytes = new byte[LENGTH];
+                               
ByteBuffer.wrap(bytes).putLong(split.nextNumber).putInt(split.increment).putInt(split.numCompletedCheckpoints);
                                return bytes;
                        }
 
                        @Override
                        public LongSplit deserialize(int version, byte[] 
serialized) {
                                final ByteBuffer byteBuffer = 
ByteBuffer.wrap(serialized);
-                               return new LongSplit(byteBuffer.getLong(), 
byteBuffer.getInt(), byteBuffer.getLong());
+                               return new LongSplit(byteBuffer.getLong(), 
byteBuffer.getInt(), byteBuffer.getInt());
                        }
                }
        }
 
+       interface DagCreator {
+               void create(StreamExecutionEnvironment environment, int 
minCheckpoints, boolean slotSharing, int expectedFailures);
+       }
 
        /**
         * Builder-like interface for all relevant unaligned settings.
@@ -400,6 +462,7 @@ public abstract class UnalignedCheckpointTestBase extends 
TestLogger {
        protected static class UnalignedSettings {
                private int parallelism;
                private int slotsPerTaskManager = 1;
+               private int minCheckpoints = 10;
                private boolean slotSharing = true;
                @Nullable
                private File restoreCheckpoint;
@@ -407,9 +470,9 @@ public abstract class UnalignedCheckpointTestBase extends 
TestLogger {
                private int numSlots;
                private int numBuffers;
                private int expectedFailures = 0;
-               private final TriConsumer<StreamExecutionEnvironment, Integer, 
Boolean> dagCreator;
+               private final DagCreator dagCreator;
 
-               public 
UnalignedSettings(TriConsumer<StreamExecutionEnvironment, Integer, Boolean> 
dagCreator) {
+               public UnalignedSettings(DagCreator dagCreator) {
                        this.dagCreator = dagCreator;
                }
 

Reply via email to