[FLINK-1296] [runtime] Fix bug when large record handling results in empty spill files


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5970e212
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5970e212
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5970e212

Branch: refs/heads/master
Commit: 5970e212b1beca29590dd138dc5e8eaf90bac498
Parents: 76eaef0
Author: Stephan Ewen <[email protected]>
Authored: Thu Dec 18 20:08:09 2014 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Wed Jan 21 12:01:36 2015 +0100

----------------------------------------------------------------------
 .../operators/sort/UnilateralSortMerger.java    |  4 +-
 .../sort/ExternalSortLargeRecordsITCase.java    | 66 ++++++++++++++++++++
 2 files changed, 69 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5970e212/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
index 6e89300..cdd5eb4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
@@ -1320,7 +1320,9 @@ public class UnilateralSortMerger<E> implements Sorter<E> 
{
                                output.close();
                                
unregisterOpenChannelToBeRemovedAtShudown(writer);
                                
-                               channelIDs.add(new 
ChannelWithBlockCount(channel, output.getBlockCount()));
+                               if (output.getBytesWritten() > 0) {
+                                       channelIDs.add(new 
ChannelWithBlockCount(channel, output.getBlockCount()));
+                               }
 
                                // pass empty sort-buffer to reading thread
                                element.buffer.reset();

http://git-wip-us.apache.org/repos/asf/flink/blob/5970e212/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
index 33d15ae..ad15282 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java
@@ -293,6 +293,72 @@ public class ExternalSortLargeRecordsITCase {
                }
        }
        
+       /**
+        * Regression test for FLINK-1296: sorts an input that consists only of medium-sized records and
+        * verifies that every record is returned, in non-increasing key order, with its value intact.
+        */
+       @Test
+       public void testSortWithMediumRecordsOnly() {
+               try {
+                       final int NUM_RECORDS = 70;
+                       
+                       final TypeInformation<?>[] types = new TypeInformation<?>[] {
+                                       BasicTypeInfo.LONG_TYPE_INFO,
+                                       new ValueTypeInfo<SmallOrMediumOrLargeValue>(SmallOrMediumOrLargeValue.class)
+                               };
+                       
+                       final TupleTypeInfo<Tuple2<Long, SmallOrMediumOrLargeValue>> typeInfo = 
+                                                               new TupleTypeInfo<Tuple2<Long,SmallOrMediumOrLargeValue>>(types);
+                       
+                       final TypeSerializer<Tuple2<Long, SmallOrMediumOrLargeValue>> serializer = typeInfo.createSerializer();
+                       final TypeComparator<Tuple2<Long, SmallOrMediumOrLargeValue>> comparator = typeInfo.createComparator(new int[] {0}, new boolean[]{false}, 0);
+                       
+                       MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>> source = 
+                                       new MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>>()
+                       {
+                               private final Random rnd = new Random();
+                               private int num = -1;
+                               
+                               @Override
+                               public Tuple2<Long, SmallOrMediumOrLargeValue> next(Tuple2<Long, SmallOrMediumOrLargeValue> reuse) {
+                                       if (++num < NUM_RECORDS) {
+                                               long val = rnd.nextLong();
+                                               return new Tuple2<Long, SmallOrMediumOrLargeValue>(val, new SmallOrMediumOrLargeValue((int) val, SmallOrMediumOrLargeValue.MEDIUM_SIZE));
+                                       }
+                                       else {
+                                               return null;
+                                       }
+                               }
+                       };
+                       
+                       @SuppressWarnings("unchecked")
+                       Sorter<Tuple2<Long, SmallOrMediumOrLargeValue>> sorter = new UnilateralSortMerger<Tuple2<Long, SmallOrMediumOrLargeValue>>(
+                                       this.memoryManager, this.ioManager, 
+                                       source, this.parentTask,
+                                       new RuntimeStatefulSerializerFactory<Tuple2<Long, SmallOrMediumOrLargeValue>>(serializer, (Class<Tuple2<Long, SmallOrMediumOrLargeValue>>) (Class<?>) Tuple2.class),
+                                       comparator, 1.0, 1, 128, 0.7f);
+                       
+                       // check order (keys must never increase) and that each value still matches its key
+                       MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>> iterator = sorter.getIterator();
+                       Tuple2<Long, SmallOrMediumOrLargeValue> val = serializer.createInstance();
+                       long prevKey = Long.MAX_VALUE;
+
+                       for (int i = 0; i < NUM_RECORDS; i++) {
+                               val = iterator.next(val);
+                               assertTrue(val.f0 <= prevKey);
+                               assertTrue(val.f0.intValue() == val.f1.val());
+                               prevKey = val.f0; // compare the next record against this one, not against Long.MAX_VALUE
+                       }
+                       
+                       assertNull(iterator.next(val));
+                       
+                       sorter.close();
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+       
        // --------------------------------------------------------------------------------------------
        
        public static final class SomeMaybeLongValue implements 
org.apache.flink.types.Value {

Reply via email to