David Dreyfus created FLINK-7926:
------------------------------------

             Summary: Bug in Hybrid Hash Join: Request to spill a partition with less than two buffers.
                 Key: FLINK-7926
                 URL: https://issues.apache.org/jira/browse/FLINK-7926
             Project: Flink
          Issue Type: Bug
          Components: Distributed Coordination
    Affects Versions: 1.3.2
         Environment: Standalone execution on a MacBook Pro. In flink-conf.yaml, taskmanager.numberOfTaskSlots was changed from 1 to 3:
taskmanager.heap.mb = 1024
taskmanager.memory.preallocate = false
taskmanager.numberOfTaskSlots = 3
            Reporter: David Dreyfus
The following exception is thrown as the number of tasks increases.

{code:java}
10/25/2017 14:26:16	LeftOuterJoin(Join at with(JoinOperatorSetsBase.java:232))(1/1) switched to FAILED
java.lang.RuntimeException: Bug in Hybrid Hash Join: Request to spill a partition with less than two buffers.
	at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:302)
	at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1231)
	at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:1053)
	at org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:978)
	at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:938)
	at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:631)
	at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
	at org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator.callWithNextKey(NonReusingBuildSecondHashJoinIterator.java:114)
	at org.apache.flink.runtime.operators.AbstractOuterJoinDriver.run(AbstractOuterJoinDriver.java:160)
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
	at java.lang.Thread.run(Thread.java:748)

10/25/2017 14:26:16	Job execution switched to status FAILING.
java.lang.RuntimeException: Bug in Hybrid Hash Join: Request to spill a partition with less than two buffers.
	[identical stack trace as above, repeated for the job failure]
{code}

I run with the following command:

{code}
flink run -c com.northbay.union.Union3 ./FlinkWordCount/target/FlinkWordCount-1.0-SNAPSHOT.jar \
    --left /Users/user/Documents/Flink/Quickstart/Files/manysmall \
    --right /Users/user/Documents/Flink/Quickstart/Files/manysmall \
    --output /tmp/test6d_nomatch \
    --output2 /tmp/test6d \
    --filecount 50
{code}

The files submitted are all CSV with an (int, string, short) schema; a sketch for generating similar input follows the code below. The idea behind this test is to compare (hash-join) pairs of files and combine their results.

This is the code, split into the three source files it should be broken out into before use.
DeviceRecord1.java:
{code:java}
package com.northbay.hashcount;

public class DeviceRecord1 {
    public int device;
    public String fingerprint;
    public short dma;
    public boolean match;

    public DeviceRecord1() {
    }

    public DeviceRecord1(DeviceRecord old) {
        this.device = old.device;
        this.fingerprint = old.fingerprint;
        this.dma = old.dma;
        this.match = false;
    }

    public DeviceRecord1(int device, String fingerprint, short dma) {
        this.device = device;
        this.fingerprint = fingerprint;
        this.dma = dma;
        this.match = false;
    }
}
{code}

DeviceRecord.java:
{code:java}
package com.northbay.hashcount;

public class DeviceRecord {
    public int device;
    public String fingerprint;
    public short dma;

    public DeviceRecord() {
    }

    public DeviceRecord(DeviceRecord old) {
        this.device = old.device;
        this.fingerprint = old.fingerprint;
        this.dma = old.dma;
    }

    public DeviceRecord(int device, String fingerprint, short dma) {
        this.device = device;
        this.fingerprint = fingerprint;
        this.dma = dma;
    }
}
{code}

Union2.java:
{code:java}
package com.northbay.union;

import com.northbay.hashcount.DeviceRecord;
import com.northbay.hashcount.DeviceRecord1;

import java.util.LinkedList;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;

@SuppressWarnings("serial")
public class Union2 {

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        LinkedList<DataSet<DeviceRecord1>> joined = new LinkedList<>();
        DataSet<DeviceRecord1> joinedData = null;
        if (params.has("left") && params.has("right")) {
            for (int i = 0; i < Integer.decode(params.get("filecount")); i++) {
                DataSet<DeviceRecord> l, r;
                DataSet<DeviceRecord1> j;

                // read the text file from the given input path
                l = env.readCsvFile(params.get("left") + "/" + Integer.toString(i))
                        .pojoType(DeviceRecord.class, "device", "fingerprint", "dma");

                // read the text file from the given input path
                r = env.readCsvFile(params.get("right") + "/" + Integer.toString(i))
                        .pojoType(DeviceRecord.class, "device", "fingerprint", "dma")
                        .filter(new MyFilter());

                j = l.leftOuterJoin(r)
                        .where("fingerprint")
                        .equalTo("fingerprint")
                        .with(new FlatJoinFunction<DeviceRecord, DeviceRecord, DeviceRecord1>() {
                            @Override
                            public void join(DeviceRecord left, DeviceRecord right, Collector<DeviceRecord1> out) throws Exception {
                                if (right == null) {
                                    out.collect(new DeviceRecord1(left));
                                } else {
                                    DeviceRecord1 d = new DeviceRecord1(left);
                                    d.match = true;
                                    out.collect(d);
                                }
                            }
                        });

                if (joinedData == null) {
                    joinedData = j;
                } else {
                    joinedData = joinedData.union(j);
                }
                joined.add(j);
            }
        }

        // Count by DMA
        DataSet<Tuple2<Integer, Integer>> counts = null;
        if (joinedData != null) {
            counts = joinedData
                    .flatMap(new Mapper(false))
                    // group by the tuple field "0" (DMA -- it's been remapped) and sum up tuple field "1"
                    .groupBy(0)
                    .sum(1);
        }

        // emit result
        if (counts != null) {
            if (params.has("output")) {
                counts.writeAsCsv(params.get("output"), "\n", ", ");
            } else {
                System.out.println("Printing result to stdout. Use --output to specify output path.");
                counts.print();
            }
        }

        // Count by Device
        DataSet<Tuple2<Integer, Integer>> counts2 = null;
        if (joinedData != null) {
            counts2 = joinedData
                    .flatMap(new Mapper2(true))
                    // group by the tuple field "0" (Device -- it's been remapped) and sum up tuple field "1"
                    .groupBy(0)
                    .sum(1);
        }

        // emit result
        if (counts2 != null) {
            if (params.has("output2")) {
                counts2.writeAsCsv(params.get("output2"), "\n", ", ");
            } else {
                System.out.println("Printing result to stdout. Use --output2 to specify output path.");
                counts2.print();
            }
        }

        // execute program
        env.execute("Union2");
    }

    /**
     * User-defined FlatMapFunction that counts records per DMA. It takes a
     * device record, pulls out the DMA, and emits a
     * ({@code Tuple2<Integer, Integer>}) for each record whose match flag
     * equals the configured value.
     */
    public static final class Mapper implements FlatMapFunction<DeviceRecord1, Tuple2<Integer, Integer>> {
        private boolean match = false;

        Mapper(boolean match) {
            this.match = match;
        }

        Mapper() {
            this(false);
        }

        @Override
        public void flatMap(DeviceRecord1 value, Collector<Tuple2<Integer, Integer>> out) {
            if (value.match == match) {
                out.collect(new Tuple2<>((int) value.dma, 1));
            }
        }
    }

    public static final class Mapper2 implements FlatMapFunction<DeviceRecord1, Tuple2<Integer, Integer>> {
        private boolean match = false;

        Mapper2(boolean match) {
            this.match = match;
        }

        Mapper2() {
            this(false);
        }

        @Override
        public void flatMap(DeviceRecord1 value, Collector<Tuple2<Integer, Integer>> out) {
            if (value.match == match) {
                out.collect(new Tuple2<>((int) value.device, 1));
            }
        }
    }

    public static final class MyFilter implements FilterFunction<DeviceRecord> {
        @Override
        public boolean filter(DeviceRecord value) {
            return value.dma != 4;
        }
    }
}
{code}
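For anyone trying to reproduce this, here is a minimal sketch of a generator for input files matching the (int, string, short) schema read above. The directory layout (files named 0 through filecount-1) is taken from the readCsvFile calls in the code; the class name GenerateTestFiles, row counts, and value ranges are made-up assumptions, not the actual test data.

{code:java}
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Random;

// Hypothetical generator for CSV files shaped like DeviceRecord (int, string, short).
// File naming (0 .. filecount-1 in one directory) follows the readCsvFile calls above;
// row counts and value distributions are invented for illustration.
public class GenerateTestFiles {
    public static void main(String[] args) throws Exception {
        Path dir = Paths.get(args.length > 0 ? args[0] : "manysmall");
        int fileCount = args.length > 1 ? Integer.parseInt(args[1]) : 50;
        int rowsPerFile = args.length > 2 ? Integer.parseInt(args[2]) : 100_000;
        Files.createDirectories(dir);
        Random rnd = new Random(42);
        for (int i = 0; i < fileCount; i++) {
            try (PrintWriter out = new PrintWriter(Files.newBufferedWriter(dir.resolve(Integer.toString(i))))) {
                for (int r = 0; r < rowsPerFile; r++) {
                    int device = rnd.nextInt(1_000_000);                    // DeviceRecord.device
                    String fingerprint = Long.toHexString(rnd.nextLong()); // DeviceRecord.fingerprint
                    short dma = (short) rnd.nextInt(10);                   // DeviceRecord.dma
                    out.println(device + "," + fingerprint + "," + dma);
                }
            }
        }
    }
}
{code}

Pointing --left and --right at a directory produced this way should approximate the setup above.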
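Not a fix, but a possible workaround sketch: the DataSet API accepts an explicit JoinHint, and REPARTITION_SORT_MERGE steers the outer join away from the hybrid hash table that throws in the trace (the NonReusingBuildSecondHashJoinIterator frame indicates a build-second hash strategy was chosen). Whether this actually sidesteps the failure for this job is untested; the UnionWithHint wrapper is only for illustration.

{code:java}
import com.northbay.hashcount.DeviceRecord;
import com.northbay.hashcount.DeviceRecord1;

import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.util.Collector;

public class UnionWithHint {
    public static DataSet<DeviceRecord1> join(DataSet<DeviceRecord> l, DataSet<DeviceRecord> r) {
        // Identical to the join in Union2, except for the explicit hint that
        // requests a sort-merge join instead of the hybrid hash join. Untested
        // against the failing data set above.
        return l.leftOuterJoin(r, JoinHint.REPARTITION_SORT_MERGE)
                .where("fingerprint")
                .equalTo("fingerprint")
                .with(new FlatJoinFunction<DeviceRecord, DeviceRecord, DeviceRecord1>() {
                    @Override
                    public void join(DeviceRecord left, DeviceRecord right, Collector<DeviceRecord1> out) {
                        if (right == null) {
                            out.collect(new DeviceRecord1(left));
                        } else {
                            DeviceRecord1 d = new DeviceRecord1(left);
                            d.match = true;
                            out.collect(d);
                        }
                    }
                });
    }
}
{code}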