Repository: flink
Updated Branches:
  refs/heads/master 16afb8ec6 -> af477563e


[FLINK-2763] [runtime] Fix hash table spilling partition selection.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af477563
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af477563
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af477563

Branch: refs/heads/master
Commit: af477563eb1acaab74da1a508c7e5fa37339c206
Parents: 16afb8e
Author: Stephan Ewen <[email protected]>
Authored: Tue Sep 29 14:07:01 2015 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Tue Sep 29 14:07:01 2015 +0200

----------------------------------------------------------------------
 .../runtime/operators/hash/HashPartition.java   | 15 ++++-
 .../operators/hash/MutableHashTable.java        |  4 +-
 .../runtime/operators/hash/HashTableTest.java   | 69 ++++++++++++++++++++
 3 files changed, 85 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/af477563/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
index 7baaee7..32fd74a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java
@@ -198,6 +198,19 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
        public final boolean isInMemory() {
                return this.buildSideChannel == null;
        }
+
+       /**
+        * Gets the number of memory segments used by this partition: its build side memory buffers
+        * (counted as one while {@code partitionBuffers} is {@code null}) plus overflow segments.
+        * 
+        * @return The number of occupied memory segments.
+        */
+       public int getNumOccupiedMemorySegments() {
+               // either the number of memory segments, or one for spilling
+               final int numPartitionBuffers = this.partitionBuffers != null ? 
this.partitionBuffers.length : 1;
+               return numPartitionBuffers + numOverflowSegments;
+       }
+       
        
        public int getBuildSideBlockCount() {
                return this.partitionBuffers == null ? 
this.buildSideWriteBuffer.getBlockCount() : this.partitionBuffers.length;
@@ -284,7 +297,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See
                        throw new RuntimeException("Bug in Hybrid Hash Join: " +
                                        "Request to spill a partition that has 
already been spilled.");
                }
-               if (getBuildSideBlockCount() + this.numOverflowSegments < 2) {
+               if (getNumOccupiedMemorySegments() < 2) {
                        throw new RuntimeException("Bug in Hybrid Hash Join: " +
                                "Request to spill a partition with less than 
two buffers.");
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/af477563/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
index 2ad01aa..efaceea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
@@ -1093,8 +1093,8 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
                
                for (int i = 0; i < partitions.size(); i++) {
                        HashPartition<BT, PT> p = partitions.get(i);
-                       if (p.isInMemory() && p.getBuildSideBlockCount() > 
largestNumBlocks) {
-                               largestNumBlocks = p.getBuildSideBlockCount();
+                       if (p.isInMemory() && p.getNumOccupiedMemorySegments() 
> largestNumBlocks) {
+                               largestNumBlocks = 
p.getNumOccupiedMemorySegments();
                                largestPartNum = i;
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/af477563/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
index 0bca22a..92adc2a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
@@ -21,19 +21,23 @@ package org.apache.flink.runtime.operators.hash;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypePairComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ByteValueSerializer;
 import org.apache.flink.api.common.typeutils.base.LongComparator;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.api.java.typeutils.runtime.ValueComparator;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.types.ByteValue;
 import org.apache.flink.util.MutableObjectIterator;
 
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -146,6 +150,47 @@ public class HashTableTest {
                        ioMan.shutdown();
                }
        }
+
+       /**
+        * This tests the case where no additional partition buffers are used at the point when spilling
+        * is triggered, testing that overflow bucket buffers are taken into account when deciding which
+        * partition to spill.
+        */
+       @Test
+       public void testSpillingFreesOnlyOverflowSegments() {
+               final IOManager ioMan = new IOManagerAsync();
+               
+               final TypeSerializer<ByteValue> serializer = 
ByteValueSerializer.INSTANCE;
+               final TypeComparator<ByteValue> buildComparator = new 
ValueComparator<>(true, ByteValue.class);
+               final TypeComparator<ByteValue> probeComparator = new 
ValueComparator<>(true, ByteValue.class);
+               
+               @SuppressWarnings("unchecked")
+               final TypePairComparator<ByteValue, ByteValue> pairComparator = 
Mockito.mock(TypePairComparator.class);
+               
+               try {
+                       final int pageSize = 32*1024;
+                       final int numSegments = 34;
+
+                       List<MemorySegment> memory = getMemory(numSegments, 
pageSize);
+
+                       MutableHashTable<ByteValue, ByteValue> table = new 
MutableHashTable<>(
+                                       serializer, serializer, 
buildComparator, probeComparator,
+                                       pairComparator, memory, ioMan, 1, 
false);
+
+                       table.open(new ByteValueIterator(100000000), new 
ByteValueIterator(1));
+                       
+                       table.close();
+                       
+                       checkNoTempFilesRemain(ioMan);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+               finally {
+                       ioMan.shutdown();
+               }
+       }
        
        // 
------------------------------------------------------------------------
        //  Utilities
@@ -219,4 +264,28 @@ public class HashTableTest {
                        }
                }
        }
+       /** Test input: emits numRecords ByteValue records holding the byte 0, then only null. */
+       private static class ByteValueIterator implements 
MutableObjectIterator<ByteValue> {
+
+               private final long numRecords; // how many non-null records to emit
+               private long value = 0; // number of next() calls made so far
+
+               ByteValueIterator(long numRecords) {
+                       this.numRecords = numRecords;
+               }
+
+               @Override
+               public ByteValue next(ByteValue aLong) {
+                       return next(); // the passed-in object is ignored, not reused
+               }
+
+               @Override
+               public ByteValue next() {
+                       if (value++ < numRecords) {
+                               return new ByteValue((byte) 0); // every record holds the same byte value
+                       } else {
+                               return null;
+                       }
+               }
+       }
 }

Reply via email to