[FLINK-1296] [runtime] Fix bug when large record handling results in empty spill files
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5970e212 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5970e212 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5970e212 Branch: refs/heads/master Commit: 5970e212b1beca29590dd138dc5e8eaf90bac498 Parents: 76eaef0 Author: Stephan Ewen <[email protected]> Authored: Thu Dec 18 20:08:09 2014 +0100 Committer: Ufuk Celebi <[email protected]> Committed: Wed Jan 21 12:01:36 2015 +0100 ---------------------------------------------------------------------- .../operators/sort/UnilateralSortMerger.java | 4 +- .../sort/ExternalSortLargeRecordsITCase.java | 66 ++++++++++++++++++++ 2 files changed, 69 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/5970e212/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java index 6e89300..cdd5eb4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java @@ -1320,7 +1320,9 @@ public class UnilateralSortMerger<E> implements Sorter<E> { output.close(); unregisterOpenChannelToBeRemovedAtShudown(writer); - channelIDs.add(new ChannelWithBlockCount(channel, output.getBlockCount())); + if (output.getBytesWritten() > 0) { + channelIDs.add(new ChannelWithBlockCount(channel, output.getBlockCount())); + } // pass empty sort-buffer to reading thread element.buffer.reset(); http://git-wip-us.apache.org/repos/asf/flink/blob/5970e212/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java index 33d15ae..ad15282 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java @@ -293,6 +293,72 @@ public class ExternalSortLargeRecordsITCase { } } + @Test + public void testSortWithMediumRecordsOnly() { + try { + final int NUM_RECORDS = 70; + + final TypeInformation<?>[] types = new TypeInformation<?>[] { + BasicTypeInfo.LONG_TYPE_INFO, + new ValueTypeInfo<SmallOrMediumOrLargeValue>(SmallOrMediumOrLargeValue.class) + }; + + final TupleTypeInfo<Tuple2<Long, SmallOrMediumOrLargeValue>> typeInfo = + new TupleTypeInfo<Tuple2<Long,SmallOrMediumOrLargeValue>>(types); + + final TypeSerializer<Tuple2<Long, SmallOrMediumOrLargeValue>> serializer = typeInfo.createSerializer(); + final TypeComparator<Tuple2<Long, SmallOrMediumOrLargeValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0); + + MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>> source = + new MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>>() + { + private final Random rnd = new Random(); + private int num = -1; + + @Override + public Tuple2<Long, SmallOrMediumOrLargeValue> next(Tuple2<Long, SmallOrMediumOrLargeValue> reuse) { + if (++num < NUM_RECORDS) { + long val = rnd.nextLong(); + return new Tuple2<Long, SmallOrMediumOrLargeValue>(val, new SmallOrMediumOrLargeValue((int) val, SmallOrMediumOrLargeValue.MEDIUM_SIZE)); + } + else { + return null; + } + + } + }; + + @SuppressWarnings("unchecked") + Sorter<Tuple2<Long, SmallOrMediumOrLargeValue>> sorter = new UnilateralSortMerger<Tuple2<Long, SmallOrMediumOrLargeValue>>( + this.memoryManager, this.ioManager, + source, this.parentTask, + new RuntimeStatefulSerializerFactory<Tuple2<Long, SmallOrMediumOrLargeValue>>(serializer, (Class<Tuple2<Long, SmallOrMediumOrLargeValue>>) (Class<?>) Tuple2.class), + comparator, 1.0, 1, 128, 0.7f); + + // check order + MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>> iterator = sorter.getIterator(); + + Tuple2<Long, SmallOrMediumOrLargeValue> val = serializer.createInstance(); + + long prevKey = Long.MAX_VALUE; + + for (int i = 0; i < NUM_RECORDS; i++) { + val = iterator.next(val); + + assertTrue(val.f0 <= prevKey); + assertTrue(val.f0.intValue() == val.f1.val()); + } + + assertNull(iterator.next(val)); + + sorter.close(); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + // -------------------------------------------------------------------------------------------- public static final class SomeMaybeLongValue implements org.apache.flink.types.Value {
