Repository: flink
Updated Branches:
  refs/heads/master 16afb8ec6 -> af477563e
[FLINK-2763] [runtime] Fix hash table spilling partition selection. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af477563 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af477563 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af477563 Branch: refs/heads/master Commit: af477563eb1acaab74da1a508c7e5fa37339c206 Parents: 16afb8e Author: Stephan Ewen <[email protected]> Authored: Tue Sep 29 14:07:01 2015 +0200 Committer: Stephan Ewen <[email protected]> Committed: Tue Sep 29 14:07:01 2015 +0200 ---------------------------------------------------------------------- .../runtime/operators/hash/HashPartition.java | 15 ++++- .../operators/hash/MutableHashTable.java | 4 +- .../runtime/operators/hash/HashTableTest.java | 69 ++++++++++++++++++++ 3 files changed, 85 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/af477563/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java index 7baaee7..32fd74a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/HashPartition.java @@ -198,6 +198,19 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See public final boolean isInMemory() { return this.buildSideChannel == null; } + + /** + * Gets the number of memory segments used by this partition, which includes build side + * memory buffers and overflow memory segments. + * + * @return The number of occupied memory segments. 
+ */ + public int getNumOccupiedMemorySegments() { + // either the number of memory segments, or one for spilling + final int numPartitionBuffers = this.partitionBuffers != null ? this.partitionBuffers.length : 1; + return numPartitionBuffers + numOverflowSegments; + } + public int getBuildSideBlockCount() { return this.partitionBuffers == null ? this.buildSideWriteBuffer.getBlockCount() : this.partitionBuffers.length; @@ -284,7 +297,7 @@ public class HashPartition<BT, PT> extends AbstractPagedInputView implements See throw new RuntimeException("Bug in Hybrid Hash Join: " + "Request to spill a partition that has already been spilled."); } - if (getBuildSideBlockCount() + this.numOverflowSegments < 2) { + if (getNumOccupiedMemorySegments() < 2) { throw new RuntimeException("Bug in Hybrid Hash Join: " + "Request to spill a partition with less than two buffers."); } http://git-wip-us.apache.org/repos/asf/flink/blob/af477563/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java index 2ad01aa..efaceea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java @@ -1093,8 +1093,8 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource { for (int i = 0; i < partitions.size(); i++) { HashPartition<BT, PT> p = partitions.get(i); - if (p.isInMemory() && p.getBuildSideBlockCount() > largestNumBlocks) { - largestNumBlocks = p.getBuildSideBlockCount(); + if (p.isInMemory() && p.getNumOccupiedMemorySegments() > largestNumBlocks) { + largestNumBlocks = p.getNumOccupiedMemorySegments(); largestPartNum = i; } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/af477563/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java index 0bca22a..92adc2a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java @@ -21,19 +21,23 @@ package org.apache.flink.runtime.operators.hash; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypePairComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.ByteValueSerializer; import org.apache.flink.api.common.typeutils.base.LongComparator; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.runtime.TupleComparator; import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.api.java.typeutils.runtime.ValueComparator; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.types.ByteValue; import org.apache.flink.util.MutableObjectIterator; import org.junit.Test; +import org.mockito.Mockito; import java.io.File; import java.util.ArrayList; @@ -146,6 +150,47 @@ public class HashTableTest { ioMan.shutdown(); } } + + /** + * This tests the case where no additional partition buffers are 
used at the point when spilling + * is triggered, testing that overflow bucket buffers are taken into account when deciding which + * partition to spill. + */ + @Test + public void testSpillingFreesOnlyOverflowSegments() { + final IOManager ioMan = new IOManagerAsync(); + + final TypeSerializer<ByteValue> serializer = ByteValueSerializer.INSTANCE; + final TypeComparator<ByteValue> buildComparator = new ValueComparator<>(true, ByteValue.class); + final TypeComparator<ByteValue> probeComparator = new ValueComparator<>(true, ByteValue.class); + + @SuppressWarnings("unchecked") + final TypePairComparator<ByteValue, ByteValue> pairComparator = Mockito.mock(TypePairComparator.class); + + try { + final int pageSize = 32*1024; + final int numSegments = 34; + + List<MemorySegment> memory = getMemory(numSegments, pageSize); + + MutableHashTable<ByteValue, ByteValue> table = new MutableHashTable<>( + serializer, serializer, buildComparator, probeComparator, + pairComparator, memory, ioMan, 1, false); + + table.open(new ByteValueIterator(100000000), new ByteValueIterator(1)); + + table.close(); + + checkNoTempFilesRemain(ioMan); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + finally { + ioMan.shutdown(); + } + } // ------------------------------------------------------------------------ // Utilities @@ -219,4 +264,28 @@ public class HashTableTest { } } } + + private static class ByteValueIterator implements MutableObjectIterator<ByteValue> { + + private final long numRecords; + private long value = 0; + + ByteValueIterator(long numRecords) { + this.numRecords = numRecords; + } + + @Override + public ByteValue next(ByteValue aLong) { + return next(); + } + + @Override + public ByteValue next() { + if (value++ < numRecords) { + return new ByteValue((byte) 0); + } else { + return null; + } + } + } }
