Thanks for all your enthusiastic responses. Yes, my goal was to find the
best in-memory performance. I understand now.
On Tue, 16 May 2017 at 4:10 PM Fabian Hueske <> wrote:

> Hi,
> Flink's HashJoin implementation was designed to gracefully handle inputs
> that exceed the main memory.
> It is not explicitly optimized for in-memory processing and does not play
> fancy tricks like optimizing cache accesses or batching.
> I assume your benchmark is about in-memory joins only. In-memory speed was
> not the main design goal when the join was implemented; robustness was.
> Since most of the development of Flink focuses on streaming applications
> at the moment, the join implementation has barely been touched in recent
> years (except for minor extensions and bugfixes).
> Regarding your tests, Tuple should give better performance than Row
> because Row is null-sensitive and serializes a null mask.
> There is also a blog post about Flink's join performance [1] which is
> already a bit dusty but, as I said, the algorithm hasn't changed much since
> then.
> Best, Fabian
> [1]
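The Tuple-versus-Row remark above comes down to extra bytes and an extra branch per field. A minimal sketch of that idea follows, assuming a made-up layout of one flag byte per field ahead of the values. The class and method names are hypothetical, and the layout is only an illustration, not Flink's actual serialization format.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch; not Flink's TupleSerializer or RowSerializer. */
final class NullMaskSketch {

    /** Tuple-like record: fields are assumed non-null, so only values are written. */
    static byte[] writeWithoutMask(long[] fields) {
        ByteBuffer buf = ByteBuffer.allocate(fields.length * Long.BYTES);
        for (long f : fields) {
            buf.putLong(f);
        }
        return buf.array();
    }

    /** Row-like record: one null flag per field first, then only the non-null values. */
    static byte[] writeWithMask(Long[] fields) {
        ByteBuffer buf = ByteBuffer.allocate(fields.length * (1 + Long.BYTES));
        for (Long f : fields) {
            buf.put((byte) (f == null ? 1 : 0)); // assumed layout: one flag byte per field
        }
        for (Long f : fields) {
            if (f != null) {
                buf.putLong(f);
            }
        }
        // Copy out only the bytes that were actually written.
        byte[] out = new byte[buf.position()];
        buf.flip();
        buf.get(out);
        return out;
    }
}
```

With three non-null long fields, the masked form carries three more bytes than the unmasked one, and both the writer and the reader have to branch on every field.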
> 2017-05-15 16:26 GMT+02:00 weijie tong <>:
>> The Flink version is 1.2.0
>> On Mon, May 15, 2017 at 10:24 PM, weijie tong <>
>> wrote:
>>> @Till thanks for your reply.
>>> My code is similar to HashTableITCase.testInMemoryMutableHashTable().
>>> It just uses the MutableHashTable class; there is no other Flink
>>> configuration. The main code body is:
>>>> this.recordBuildSideAccessor = RecordSerializer.get();
>>>> this.recordProbeSideAccessor = RecordSerializer.get();
>>>> final int[] buildKeyPos = new int[]{buildSideJoinIndex};
>>>> final int[] probeKeyPos = new int[]{probeSideJoinIndex};
>>>> final Class<? extends Value>[] keyType = (Class<? extends Value>[]) new Class[]{BytesValue.class};
>>>> this.recordBuildSideComparator = new RecordComparator(buildKeyPos, keyType);
>>>> this.recordProbeSideComparator = new RecordComparator(probeKeyPos, keyType);
>>>> this.pactRecordComparator = new HashJoinVectorJointGroupIterator.RecordPairComparator(buildSideJoinIndex);
>>>> Sequence<Record> buildSideRecordsSeq = makeSequenceRecordOfSameSideSegments(buildSideSegs, localJoinQuery);
>>>> Sequence<Record> probeSideRecordsSeq = makeSequenceRecordOfSameSideSegments(probeSideSegs, localJoinQuery);
>>>> List<MemorySegment> memorySegments;
>>>> int pageSize = hashTableMemoryManager.getTotalNumPages();
>>>> try {
>>>>   memorySegments = this.hashTableMemoryManager.allocatePages(MEM_OWNER, pageSize);
>>>> }
>>>> catch (MemoryAllocationException e) {
>>>>   LOGGER.error("could not allocate " + pageSize + " pages memory for HashJoin", e);
>>>>   Throwables.propagate(e);
>>>>   return;
>>>> }
>>>> try {
>>>>   Stopwatch stopwatch = Stopwatch.createStarted();
>>>>   UniformRecordGenerator buildInput = new UniformRecordGenerator(buildSideRecordsSeq);
>>>>   UniformRecordGenerator probeInput = new UniformRecordGenerator(probeSideRecordsSeq);
>>>>   join = new MutableHashTable<Record, Record>(
>>>>       recordBuildSideAccessor,
>>>>       recordProbeSideAccessor,
>>>>       recordBuildSideComparator,
>>>>       recordProbeSideComparator,
>>>>       pactRecordComparator,
>>>>       memorySegments,
>>>>       ioManager
>>>>   );
>>>>"construct hash table elapsed:" + 
>>>> stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");
>>> The BytesValue type is a self-defined one that holds a byte[]. Like
>>> the original StringValue, it has the same serde performance.
>>> while (join.nextRecord()) {
>>>   Record currentProbeRecord = join.getCurrentProbeRecord();
>>>   MutableObjectIterator<Record> buildSideIterator = join.getBuildSideIterator();
>>>   while ( != null) {
>>>     materializeRecord2OutVector(reusedBuildSideRow, buildSideIndex2Value, buildSideIndex2Vector, rowNum);
>>>     materializeRecord2OutVector(currentProbeRecord, probeSideIndex2Value, probeSideIndex2Vector, rowNum);
>>>     rowNum++;
>>>   }
>>> }
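For comparison, the two phases timed in this thread (building the table, then probing it) can be sketched with a plain java.util.HashMap. This is only an illustrative baseline under stated assumptions: keys are Strings, records are identified by their array index, and HashJoinSketch, build, and probe are hypothetical names. It does not use MutableHashTable, serialization, or managed memory.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory hash join baseline; not Flink's MutableHashTable. */
final class HashJoinSketch {

    /** Build phase: index every build-side row position by its join key. */
    static Map<String, List<Integer>> build(String[] buildKeys) {
        Map<String, List<Integer>> table = new HashMap<>();
        for (int i = 0; i < buildKeys.length; i++) {
            table.computeIfAbsent(buildKeys[i], k -> new ArrayList<>()).add(i);
        }
        return table;
    }

    /** Probe phase: emit one {buildIndex, probeIndex} pair per matching row pair. */
    static List<int[]> probe(Map<String, List<Integer>> table, String[] probeKeys) {
        List<int[]> matches = new ArrayList<>();
        for (int p = 0; p < probeKeys.length; p++) {
            List<Integer> hits = table.get(probeKeys[p]);
            if (hits == null) {
                continue; // no build-side row has this key
            }
            for (int b : hits) {
                matches.add(new int[]{b, p});
            }
        }
        return matches;
    }
}
```

Wrapping the build and probe calls in separate System.nanoTime() measurements gives a rough per-phase figure to set against the per-phase timings reported in this thread, although such a baseline skips the serialization work that the profile below points at.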
>>> I have tried both the Record and Row classes as the record type, without
>>> any performance improvement. I also tried batching the input records.
>>> That is, the buildInput and probeInput variables in the first code
>>> block iterate one Record at a time over batched records.
>>> The batched records' content stays in memory in Drill's ValueVector format.
>>> Once a call needs a record for the build or probe phase, the record is
>>> fetched from the batched in-memory ValueVector content. But there were
>>> no performance gains.
>>> The top hotspots from the JProfiler profile are below:
>>> >
>>> Hot spot,"Self time (microseconds)","Average Time","Invocations"
>>> org.apache.flink.types.Record.serialize,1014127,"n/a","n/a"
>>> org.apache.flink.types.Record.deserialize,60684,"n/a","n/a"
>>> org.apache.flink.types.Record.copyTo,83007,"n/a","n/a"
>>> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord,10955,"n/a","n/a"
>>> org.apache.flink.runtime.memory.MemoryManager.release,33484,"n/a","n/a"
>>> org.apache.flink.runtime.memory.MemoryManager.allocatePages,104259,"n/a","n/a"
>>> My log shows that  method costs too much time.
>>> >
>>> construct hash table elapsed:1885ms
>>> On Mon, May 15, 2017 at 6:20 PM, Till Rohrmann <>
>>> wrote:
>>>> Hi Weijie,
>>>> it might be the case that batching the processing of multiple rows can
>>>> give you improved performance compared to single-row processing.
>>>> Maybe you could share the exact benchmark baseline results and the
>>>> code you use to test Flink's MutableHashTable with us. Also the Flink
>>>> configuration and how you run it would be of interest. That way we might be
>>>> able to see if we can tune Flink a bit more.
>>>> Cheers,
>>>> Till
>>>> On Sun, May 14, 2017 at 5:23 AM, weijie tong <>
>>>> wrote:
>>>>> I have a test case that uses Flink's MutableHashTable class to do a hash
>>>>> join on a local machine with 64 GB of memory and 64 cores. The test case
>>>>> has one build table with 140,000 (14w) rows and one probe table with
>>>>> 3.2 million (320w) rows; the matched result has 120,000 (12w) rows.
>>>>> It takes 2.2 seconds to complete the join. The performance seems bad. I
>>>>> made sure there is no overflow and that the smaller table is the build
>>>>> side. The MutableObjectIterator is a sequence of Rows. Each Row is
>>>>> composed of several fields, which are byte[]. From my log, I find that
>>>>> the open() method takes 1.560 seconds and the probe iteration phase
>>>>> takes 680 ms. My JProfiler profile shows that the MutableObjectIterator's
>>>>> next() method call is the hotspot.
>>>>> I want to know how to tune this scenario. I find that Drill's HashJoin
>>>>> uses a batch model. Its build-side input is a RecordBatch, which holds a
>>>>> batch of rows whose memory size is close to the L2 cache size. With this
>>>>> strategy it makes fewer method calls (that is, calls to next()) and uses
>>>>> the CPU much more efficiently. I also found that a SQL Server paper
>>>>> noted the batch model's performance gains (
>>>>>  .   I guess the performance drop is due to the single-row iteration
>>>>> model.
>>>>> I hope someone can correct my opinion. Maybe I am also using
>>>>> the MutableHashTable incorrectly. I am waiting for someone to give advice.
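The difference between the single-row model and the batch model described above can be illustrated with a small sketch that counts next() calls. It assumes rows are reduced to a single long key. RowSource, BatchSource, and the batch size are hypothetical and are not taken from Flink's or Drill's APIs.

```java
/** Hypothetical sketch contrasting row-at-a-time and batch-at-a-time iteration. */
final class BatchIterationSketch {

    /** Row-at-a-time source: every row costs one next() call. */
    static final class RowSource {
        private final long[] keys;
        private int pos = 0;
        int calls = 0;

        RowSource(long[] keys) { this.keys = keys; }

        /** Returns the next key, or null when the input is exhausted. */
        Long next() {
            calls++;
            return pos < keys.length ? Long.valueOf(keys[pos++]) : null;
        }
    }

    /** Batch source: every next() call hands out up to batchSize rows at once. */
    static final class BatchSource {
        private final long[] keys;
        final int batchSize;
        private int pos = 0;
        int calls = 0;

        BatchSource(long[] keys, int batchSize) {
            this.keys = keys;
            this.batchSize = batchSize;
        }

        /** Copies the next batch into out and returns its row count (0 when exhausted). */
        int next(long[] out) {
            calls++;
            int n = Math.min(batchSize, keys.length - pos);
            System.arraycopy(keys, pos, out, 0, n);
            pos += n;
            return n;
        }
    }

    static long sumRowAtATime(RowSource src) {
        long sum = 0;
        for (Long k = src.next(); k != null; k = src.next()) {
            sum += k;
        }
        return sum;
    }

    static long sumBatched(BatchSource src) {
        long[] batch = new long[src.batchSize];
        long sum = 0;
        for (int n = src.next(batch); n > 0; n = src.next(batch)) {
            for (int i = 0; i < n; i++) { // tight loop over a contiguous array
                sum += batch[i];
            }
        }
        return sum;
    }
}
```

Both variants compute the same result. The batched one makes roughly rows / batchSize calls to next() instead of one call per row, and its inner loop runs over a contiguous array, which is the effect the batch model aims for. Whether that translates into a measurable gain for a given join would still have to be benchmarked.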
