[ https://issues.apache.org/jira/browse/FLINK-7926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Aljoscha Krettek closed FLINK-7926. ----------------------------------- Resolution: Won't Fix. I'm closing this as "Abandoned", since there is no more activity and the code base has moved on quite a bit. Please re-open this if you feel otherwise and work should continue. > Bug in Hybrid Hash Join: Request to spill a partition with less than two > buffers. > --------------------------------------------------------------------------------- > > Key: FLINK-7926 > URL: https://issues.apache.org/jira/browse/FLINK-7926 > Project: Flink > Issue Type: Bug > Components: API / DataSet > Affects Versions: 1.3.2 > Environment: standalone execution on a MacBook Pro > in flink-conf.yaml, taskmanager.numberOfTaskSlots changed from 1 to 3. > taskmanager.heap.mb = 1024 > taskmanager.memory.preallocate = false > taskmanager.numberOfTaskSlots = 3 > Reporter: David Dreyfus > Priority: Major > > The following exception is thrown as the number of tasks increases. > {code:java} > 10/25/2017 14:26:16 LeftOuterJoin(Join at > with(JoinOperatorSetsBase.java:232))(1/1) switched to FAILED > java.lang.RuntimeException: Bug in Hybrid Hash Join: Request to spill a > partition with less than two buffers.
> at > org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:302) > at > org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1231) > at > org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:1053) > at > org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:978) > at > org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:938) > at > org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:631) > at > org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666) > at > org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator.callWithNextKey(NonReusingBuildSecondHashJoinIterator.java:114) > at > org.apache.flink.runtime.operators.AbstractOuterJoinDriver.run(AbstractOuterJoinDriver.java:160) > at > org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490) > at > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > at java.lang.Thread.run(Thread.java:748) > 10/25/2017 14:26:16 Job execution switched to status FAILING. > java.lang.RuntimeException: Bug in Hybrid Hash Join: Request to spill a > partition with less than two buffers. 
> at > org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:302) > at > org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1231) > at > org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:1053) > at > org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:978) > at > org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:938) > at > org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:631) > at > org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666) > at > org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator.callWithNextKey(NonReusingBuildSecondHashJoinIterator.java:114) > at > org.apache.flink.runtime.operators.AbstractOuterJoinDriver.run(AbstractOuterJoinDriver.java:160) > at > org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490) > at > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) > at java.lang.Thread.run(Thread.java:748) > {code} > I run with the following command: > {code:java} > flink run -c com.northbay.union.Union3 > ./FlinkWordCount/target/FlinkWordCount-1.0-SNAPSHOT.jar --left > /Users/user/Documents/Flink/Quickstart/Files/manysmall --right > /Users/user/Documents/Flink/Quickstart/Files/manysmall --output > /tmp/test6d_nomatch --output2 /tmp/test6d --filecount 50 > {code} > The files submitted are all CSV (int, string, short) > This is the code (break out into 3 separate files before using). The idea > behind this test is to compare (hash-join) pairs of files and combine their > results. 
> {code:java} > package com.northbay.hashcount; > public class DeviceRecord1 { > public int device; > public String fingerprint; > public short dma; > public boolean match; > public DeviceRecord1() { > > } > public DeviceRecord1(DeviceRecord old) { > this.device = old.device; > this.fingerprint = old.fingerprint; > this.dma = old.dma; > this.match = false; > } > public DeviceRecord1(int device, String fingerprint, short dma) { > this.device = device; > this.fingerprint = fingerprint; > this.dma = dma; > this.match = false; > } > } > package com.northbay.hashcount; > public class DeviceRecord { > public int device; > public String fingerprint; > public short dma; > public DeviceRecord() { > > } > public DeviceRecord(DeviceRecord old) { > this.device = old.device; > this.fingerprint = old.fingerprint; > this.dma = old.dma; > } > public DeviceRecord(int device, String fingerprint, short dma) { > this.device = device; > this.fingerprint = fingerprint; > this.dma = dma; > } > } > package com.northbay.union; > import com.northbay.hashcount.DeviceRecord; > import com.northbay.hashcount.DeviceRecord1; > import java.util.LinkedList; > import org.apache.flink.api.common.functions.FlatMapFunction; > import org.apache.flink.api.common.functions.FilterFunction; > import org.apache.flink.api.common.functions.FlatJoinFunction; > import org.apache.flink.api.java.DataSet; > import org.apache.flink.api.java.ExecutionEnvironment; > import org.apache.flink.api.java.tuple.Tuple2; > import org.apache.flink.api.java.utils.ParameterTool; > import org.apache.flink.util.Collector; > @SuppressWarnings("serial") > public class Union2 { > // > ************************************************************************* > // PROGRAM > // > ************************************************************************* > public static void main(String[] args) throws Exception { > final ParameterTool params = ParameterTool.fromArgs(args); > // set up the execution environment > final ExecutionEnvironment 
env = > ExecutionEnvironment.getExecutionEnvironment(); > // make parameters available in the web interface > env.getConfig().setGlobalJobParameters(params); > // get input data > LinkedList<DataSet<DeviceRecord1>> joined = new LinkedList<>(); > DataSet<DeviceRecord1> joinedData = null; > if (params.has("left") && params.has("right")) { > for (int i = 0; i < Integer.decode(params.get("filecount")); i++) > { > DataSet<DeviceRecord> l, r; > DataSet<DeviceRecord1> j; > // read the text file from given input path > l = env.readCsvFile(params.get("left") + "/" + > Integer.toString(i)) > .pojoType(DeviceRecord.class, "device", "fingerprint", > "dma"); > // read the text file from given input path > r = env.readCsvFile(params.get("right") + "/" + > Integer.toString(i)) > .pojoType(DeviceRecord.class, "device", "fingerprint", > "dma") > .filter(new MyFilter()); > j = l.leftOuterJoin(r) > .where("fingerprint") > .equalTo("fingerprint") > .with(new FlatJoinFunction<DeviceRecord, DeviceRecord, > DeviceRecord1>() { > @Override > public void join(DeviceRecord left, DeviceRecord > right, Collector<DeviceRecord1> out) throws Exception { > if (right == null) { > out.collect(new DeviceRecord1(left)); > } else { > DeviceRecord1 d = new DeviceRecord1(left); > d.match = true; > out.collect(d); > } > } > }); > > if (joinedData == null) { > joinedData = j; > } else { > joinedData = joinedData.union(j); > } > joined.add(j); > } > } > // Count by DMA > DataSet<Tuple2<Integer, Integer>> counts = null; > if (joinedData > != null) { > counts = joinedData > .flatMap(new Mapper(false)) > // group by the tuple field "0" (DMA -- it's been remapped) > and sum up tuple field "1" > .groupBy(0) > .sum(1); > } > // emit result > if (counts != null) { > if (params.has("output")) { > counts.writeAsCsv(params.get("output"), "\n", ", "); > } else { > System.out.println("Printing result to stdout. 
Use --output > to specify output path."); > counts.print(); > } > } > // Count by Device > DataSet<Tuple2<Integer, Integer>> counts2 = null; > if (joinedData > != null) { > counts2 = joinedData > .flatMap(new Mapper2(true)) > // group by the tuple field "0" (Device -- it's been > remapped) and sum up tuple field "1" > .groupBy(0) > .sum(1); > } > // emit result > if (counts2 != null) { > if (params.has("output2")) { > counts2.writeAsCsv(params.get("output2"), "\n", ", "); > } else { > System.out.println("Printing result to stdout. Use --output2 > to specify output path."); > counts2.print(); > } > } > // execute program > env.execute("Union2"); > } > /** > * Implements a FlatMapFunction that counts records in a DMA user-defined > * FlatMapFunction. The function takes a device record and pulls out the > * DMA, generating ({@code Tuple2<Integer, Integer>}). > */ > public static final class Mapper implements > FlatMapFunction<DeviceRecord1, Tuple2<Integer, Integer>> { > private boolean match = false; > Mapper(boolean match) { > this.match = match; > } > Mapper() { > this(false); > } > @Override > public void flatMap(DeviceRecord1 value, Collector<Tuple2<Integer, > Integer>> out) { > if (value.match == match) { > out.collect(new Tuple2<>((int) value.dma, 1)); > } > } > } > public static final class Mapper2 implements > FlatMapFunction<DeviceRecord1, Tuple2<Integer, Integer>> { > private boolean match = false; > Mapper2(boolean match) { > this.match = match; > } > Mapper2() { > this(false); > } > @Override > public void flatMap(DeviceRecord1 value, Collector<Tuple2<Integer, > Integer>> out) { > if (value.match == match) { > out.collect(new Tuple2<>((int) value.device, 1)); > } > } > } > public static final class MyFilter > implements FilterFunction<DeviceRecord> { > @Override > public boolean filter(DeviceRecord value) { > return value.dma != 4; > } > } > } > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)