[ https://issues.apache.org/jira/browse/FLINK-7926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann updated FLINK-7926:
---------------------------------
    Component/s:     (was: Runtime / Coordination)
                 API / DataSet

> Bug in Hybrid Hash Join: Request to spill a partition with less than two buffers.
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-7926
>                 URL: https://issues.apache.org/jira/browse/FLINK-7926
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataSet
>    Affects Versions: 1.3.2
>         Environment: standalone execution on MacBook Pro
> in flink-conf.yaml, taskmanager.numberOfTaskSlots changed from 1 to 3.
> taskmanager.heap.mb = 1024
> taskmanager.memory.preallocate = false
> taskmanager.numberOfTaskSlots = 3
>            Reporter: David Dreyfus
>            Priority: Major
>
> The following exception is thrown as the number of tasks increases.
> {code:java}
> 10/25/2017 14:26:16     LeftOuterJoin(Join at with(JoinOperatorSetsBase.java:232))(1/1) switched to FAILED
> java.lang.RuntimeException: Bug in Hybrid Hash Join: Request to spill a partition with less than two buffers.
>         at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:302)
>         at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1231)
>         at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:1053)
>         at org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:978)
>         at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:938)
>         at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:631)
>         at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>         at org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator.callWithNextKey(NonReusingBuildSecondHashJoinIterator.java:114)
>         at org.apache.flink.runtime.operators.AbstractOuterJoinDriver.run(AbstractOuterJoinDriver.java:160)
>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>         at java.lang.Thread.run(Thread.java:748)
> 10/25/2017 14:26:16     Job execution switched to status FAILING.
> java.lang.RuntimeException: Bug in Hybrid Hash Join: Request to spill a partition with less than two buffers.
>         at org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:302)
>         at org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1231)
>         at org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:1053)
>         at org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:978)
>         at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:938)
>         at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:631)
>         at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>         at org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator.callWithNextKey(NonReusingBuildSecondHashJoinIterator.java:114)
>         at org.apache.flink.runtime.operators.AbstractOuterJoinDriver.run(AbstractOuterJoinDriver.java:160)
>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>         at java.lang.Thread.run(Thread.java:748)
> {code}
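> I suspect this is memory pressure: with more parallel joins per TaskManager, each hash table gets a smaller share of managed memory, until a partition is asked to spill while holding fewer than two buffers. A hypothetical mitigation (not a fix for the underlying bug) would be to give each slot a larger memory share in flink-conf.yaml, for example:
> {code}
> # assumption: more heap per TaskManager (or fewer slots per TaskManager)
> # leaves more managed memory for each hash join
> taskmanager.heap.mb: 2048
> taskmanager.numberOfTaskSlots: 1
> {code}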
> I run the job with the following command:
> {code:java}
> flink run -c com.northbay.union.Union3 \
>     ./FlinkWordCount/target/FlinkWordCount-1.0-SNAPSHOT.jar \
>     --left /Users/user/Documents/Flink/Quickstart/Files/manysmall \
>     --right /Users/user/Documents/Flink/Quickstart/Files/manysmall \
>     --output /tmp/test6d_nomatch --output2 /tmp/test6d --filecount 50
> {code}
> The input files are all CSV with the schema (int, string, short).
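> For illustration, an input line has this shape (values made up here; readCsvFile's default comma delimiter is assumed):
> {code}
> 1001,3fa9c81b,501
> 1002,77be04aa,617
> {code}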
> Below is the code (split it into three separate files before using; the file boundaries are marked in comments). The idea behind this test is to hash-join pairs of files and combine their results.
> {code:java}
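> // --- file 1 of 3: DeviceRecord1.java ---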
> package com.northbay.hashcount;
> public class DeviceRecord1 {
>     public int device;
>     public String fingerprint;
>     public short dma;
>     public boolean match;
>     public DeviceRecord1() { }
>     public DeviceRecord1(DeviceRecord old) {
>         this.device = old.device;
>         this.fingerprint = old.fingerprint;
>         this.dma = old.dma;
>         this.match = false;
>     }
>     public DeviceRecord1(int device, String fingerprint, short dma) {
>         this.device = device;
>         this.fingerprint = fingerprint;
>         this.dma = dma;
>         this.match = false;
>     }
> }
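> // --- file 2 of 3: DeviceRecord.java ---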
> package com.northbay.hashcount;
> public class DeviceRecord {
>     public int device;
>     public String fingerprint;
>     public short dma;
>     public DeviceRecord() { }
>     public DeviceRecord(DeviceRecord old) {
>         this.device = old.device;
>         this.fingerprint = old.fingerprint;
>         this.dma = old.dma;
>     }
>     public DeviceRecord(int device, String fingerprint, short dma) {
>         this.device = device;
>         this.fingerprint = fingerprint;
>         this.dma = dma;
>     }
> }
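> // --- file 3 of 3: Union2.java ---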
> package com.northbay.union;
> import com.northbay.hashcount.DeviceRecord;
> import com.northbay.hashcount.DeviceRecord1;
> import java.util.LinkedList;
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.functions.FilterFunction;
> import org.apache.flink.api.common.functions.FlatJoinFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.utils.ParameterTool;
> import org.apache.flink.util.Collector;
> @SuppressWarnings("serial")
> public class Union2 {
>     // *************************************************************************
>     //     PROGRAM
>     // *************************************************************************
>     public static void main(String[] args) throws Exception {
>         final ParameterTool params = ParameterTool.fromArgs(args);
>         // set up the execution environment
>         final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         // make parameters available in the web interface
>         env.getConfig().setGlobalJobParameters(params);
>         // get input data
>         LinkedList<DataSet<DeviceRecord1>> joined = new LinkedList<>();
>         DataSet<DeviceRecord1> joinedData = null;
>         if (params.has("left") && params.has("right")) {
>             for (int i = 0; i < Integer.decode(params.get("filecount")); i++) {
>                 DataSet<DeviceRecord> l, r;
>                 DataSet<DeviceRecord1> j;
>                 // read the text file from given input path
>                 l = env.readCsvFile(params.get("left") + "/" + Integer.toString(i))
>                     .pojoType(DeviceRecord.class, "device", "fingerprint", "dma");
>                 // read the text file from given input path
>                 r = env.readCsvFile(params.get("right") + "/" + Integer.toString(i))
>                     .pojoType(DeviceRecord.class, "device", "fingerprint", "dma")
>                     .filter(new MyFilter());
>                 j = l.leftOuterJoin(r)
>                     .where("fingerprint")
>                     .equalTo("fingerprint")
>                     .with(new FlatJoinFunction<DeviceRecord, DeviceRecord, DeviceRecord1>() {
>                         @Override
>                         public void join(DeviceRecord left, DeviceRecord right, Collector<DeviceRecord1> out) throws Exception {
>                             if (right == null) {
>                                 out.collect(new DeviceRecord1(left));
>                             } else {
>                                 DeviceRecord1 d = new DeviceRecord1(left);
>                                 d.match = true;
>                                 out.collect(d);
>                             }
>                         }
>                     });
>
>                 if (joinedData == null) {
>                     joinedData = j;
>                 } else {
>                     joinedData = joinedData.union(j);
>                 }
>                 joined.add(j);
>             }
>         }
>         // Count by DMA
>         DataSet<Tuple2<Integer, Integer>> counts = null;
>         if (joinedData != null) {
>             counts = joinedData
>                 .flatMap(new Mapper(false))
>                 // group by the tuple field "0" (DMA -- it's been remapped) and sum up tuple field "1"
>                 .groupBy(0)
>                 .sum(1);
>         }
>         // emit result
>         if (counts != null) {
>             if (params.has("output")) {
>                 counts.writeAsCsv(params.get("output"), "\n", ", ");
>             } else {
>                 System.out.println("Printing result to stdout. Use --output to specify output path.");
>                 counts.print();
>             }
>         }
>         // Count by Device
>         DataSet<Tuple2<Integer, Integer>> counts2 = null;
>         if (joinedData != null) {
>             counts2 = joinedData
>                 .flatMap(new Mapper2(true))
>                 // group by the tuple field "0" (Device -- it's been remapped) and sum up tuple field "1"
>                 .groupBy(0)
>                 .sum(1);
>         }
>         // emit result
>         if (counts2 != null) {
>             if (params.has("output2")) {
>                 counts2.writeAsCsv(params.get("output2"), "\n", ", ");
>             } else {
>                 System.out.println("Printing result to stdout. Use --output2 to specify output path.");
>                 counts2.print();
>             }
>         }
>         // execute program
>         env.execute("Union2");
>     }
>     /**
>      * Implements a user-defined FlatMapFunction that counts records by DMA.
>      * The function takes a device record, pulls out the DMA, and emits a
>      * ({@code Tuple2<Integer, Integer>}) for each record whose match flag
>      * equals the configured one.
>      */
>     public static final class Mapper implements FlatMapFunction<DeviceRecord1, Tuple2<Integer, Integer>> {
>         private boolean match = false;
>         Mapper(boolean match) {
>             this.match = match;
>         }
>         Mapper() {
>             this(false);
>         }
>         @Override
>         public void flatMap(DeviceRecord1 value, Collector<Tuple2<Integer, Integer>> out) {
>             if (value.match == match) {
>                 out.collect(new Tuple2<>((int) value.dma, 1));
>             }
>         }
>     }
>     public static final class Mapper2 implements FlatMapFunction<DeviceRecord1, Tuple2<Integer, Integer>> {
>         private boolean match = false;
>         Mapper2(boolean match) {
>             this.match = match;
>         }
>         Mapper2() {
>             this(false);
>         }
>         @Override
>         public void flatMap(DeviceRecord1 value, Collector<Tuple2<Integer, Integer>> out) {
>             if (value.match == match) {
>                 out.collect(new Tuple2<>((int) value.device, 1));
>             }
>         }
>     }
>     public static final class MyFilter implements FilterFunction<DeviceRecord> {
>         @Override
>         public boolean filter(DeviceRecord value) {
>             return value.dma != 4;
>         }
>     }
> }
> {code}
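> As a possible workaround (untested), an explicit join strategy hint should keep the optimizer away from the hybrid hash join and its spill path entirely, trading memory pressure for sorting cost. A minimal sketch against the 1.3 DataSet API:
> {code:java}
> import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
>
> // hint REPARTITION_SORT_MERGE so the leftOuterJoin is executed as a
> // sort-merge join rather than a hybrid hash join
> j = l.leftOuterJoin(r, JoinHint.REPARTITION_SORT_MERGE)
>     .where("fingerprint")
>     .equalTo("fingerprint")
>     .with(new FlatJoinFunction<DeviceRecord, DeviceRecord, DeviceRecord1>() {
>         @Override
>         public void join(DeviceRecord left, DeviceRecord right, Collector<DeviceRecord1> out) {
>             DeviceRecord1 d = new DeviceRecord1(left);
>             d.match = (right != null);
>             out.collect(d);
>         }
>     });
> {code}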



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
