Hi,

I'm not aware of a performance report for this feature. I don't think it is
well known or used a lot.
The classes to check out for pre-partitioned / pre-sorted data are
SplitDataProperties [1], DataSource [2], and, as an example,
PropertyDataSourceTest [3]. A small usage sketch follows the links.

[1]
https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
[2]
https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
[3]
https://github.com/apache/flink/blob/master/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java
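
To give an idea, here is a minimal, untested sketch of how the annotations
are applied (the CSV path and the key field are placeholders; note that
declaring properties the splits do not actually have produces wrong results):

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.SplitDataProperties;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// a source whose input splits are already partitioned and sorted on field 0
DataSource<Tuple2<Long, String>> source =
    env.readCsvFile("hdfs:///path/to/data").types(Long.class, String.class);

SplitDataProperties<Tuple2<Long, String>> props = source.getSplitDataProperties();
props.splitsPartitionedBy(0);                                      // declare partitioning on field 0
props.splitsOrderedBy(new int[]{0}, new Order[]{Order.ASCENDING}); // declare sort order within splits

// the optimizer can now avoid re-partitioning / re-sorting for a
// join or groupBy on field 0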

Best, Fabian

2017-05-18 13:54 GMT+02:00 weijie tong <tongweijie...@gmail.com>:

> Thanks for the tip, @Stephan.
>
> To [1], there's a description, "I've got sooo much data to join, do I
> really need to ship it?". How do I configure Flink to achieve that? Is
> there a performance report?
>
> [1]: https://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html
>
> On Wed, May 17, 2017 at 1:32 AM, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi!
>>
>> Be aware that the "Row" and "Record" types are not very high-performance
>> data types. You might be measuring the data type overhead rather than the
>> hash table performance. Also, the build measurements include the data
>> generation, which influences the results.
>>
>> If you want to purely benchmark the HashTable performance, try using
>> something like "Tuple2<Long, Long>" or so (or write your own custom
>> TypeSerializer / TypeComparator).
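>>
>> A minimal sketch of what I mean, assuming the DataSet type system in
>> Flink 1.2 (the variable names are illustrative):
>>
>> import org.apache.flink.api.common.ExecutionConfig;
>> import org.apache.flink.api.common.typeutils.GenericPairComparator;
>> import org.apache.flink.api.common.typeutils.TypeComparator;
>> import org.apache.flink.api.common.typeutils.TypePairComparator;
>> import org.apache.flink.api.common.typeutils.TypeSerializer;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.api.java.typeutils.TupleTypeInfo;
>>
>> TupleTypeInfo<Tuple2<Long, Long>> typeInfo =
>>     TupleTypeInfo.getBasicTupleTypeInfo(Long.class, Long.class);
>> ExecutionConfig config = new ExecutionConfig();
>>
>> // serializer shared by the build and probe sides
>> TypeSerializer<Tuple2<Long, Long>> serializer = typeInfo.createSerializer(config);
>>
>> // comparators keyed on field 0, ascending
>> TypeComparator<Tuple2<Long, Long>> buildComparator =
>>     typeInfo.createComparator(new int[]{0}, new boolean[]{true}, 0, config);
>> TypeComparator<Tuple2<Long, Long>> probeComparator =
>>     typeInfo.createComparator(new int[]{0}, new boolean[]{true}, 0, config);
>>
>> // pair comparator that matches probe records against build records
>> TypePairComparator<Tuple2<Long, Long>, Tuple2<Long, Long>> pairComparator =
>>     new GenericPairComparator<>(probeComparator, buildComparator);
>>
>> These can then be passed to the MutableHashTable constructor in place of
>> the Record serializers and comparators.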
>>
>> Stephan
>>
>>
>> On Tue, May 16, 2017 at 11:23 AM, weijie tong <tongweijie...@gmail.com>
>> wrote:
>>
>>> Thanks for all your enthusiastic responses. Yes, my goal was to find
>>> the best in-memory performance. I got that.
>>>
>>> On Tue, 16 May 2017 at 4:10 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Flink's HashJoin implementation was designed to gracefully handle
>>>> inputs that exceed the main memory.
>>>> It is not explicitly optimized for in-memory processing and does not
>>>> play fancy tricks like optimizing cache accesses or batching.
>>>> I assume your benchmark is about in-memory joins only. Robustness, not
>>>> in-memory speed, was the main design goal when the join was implemented.
>>>> Since most of the development of Flink focuses on streaming
>>>> applications at the moment, the join implementation has barely been touched
>>>> in recent years (except for minor extensions and bugfixes).
>>>>
>>>> Regarding your tests, Tuple should give better performance than Row
>>>> because Row is null-sensitive and serializes a null mask.
>>>> There is also a blog post about Flink's join performance [1]. It is
>>>> already a bit dusty, but as I said, the algorithm hasn't changed much
>>>> since then.
>>>>
>>>> Best, Fabian
>>>>
>>>> [1] https://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html
>>>>
>>>>
>>>> 2017-05-15 16:26 GMT+02:00 weijie tong <tongweijie...@gmail.com>:
>>>>
>>>>> The Flink version is 1.2.0
>>>>>
>>>>> On Mon, May 15, 2017 at 10:24 PM, weijie tong <tongweijie...@gmail.com
>>>>> > wrote:
>>>>>
>>>>>> @Till thanks for your reply.
>>>>>>
>>>>>> My code is similar to HashTableITCase.testInMemoryMutableHashTable().
>>>>>> It just uses the MutableHashTable class; there's no other Flink
>>>>>> configuration. The main code body is:
>>>>>>
>>>>>>> // serializers and key comparators for both inputs
>>>>>>> this.recordBuildSideAccessor = RecordSerializer.get();
>>>>>>> this.recordProbeSideAccessor = RecordSerializer.get();
>>>>>>> final int[] buildKeyPos = new int[]{buildSideJoinIndex};
>>>>>>> final int[] probeKeyPos = new int[]{probeSideJoinIndex};
>>>>>>> final Class<? extends Value>[] keyType = (Class<? extends Value>[]) new Class[]{BytesValue.class};
>>>>>>> this.recordBuildSideComparator = new RecordComparator(buildKeyPos, keyType);
>>>>>>> this.recordProbeSideComparator = new RecordComparator(probeKeyPos, keyType);
>>>>>>> this.pactRecordComparator = new HashJoinVectorJointGroupIterator.RecordPairComparator(buildSideJoinIndex);
>>>>>>>
>>>>>>> Sequence<Record> buildSideRecordsSeq = makeSequenceRecordOfSameSideSegments(buildSideSegs, localJoinQuery);
>>>>>>> Sequence<Record> probeSideRecordsSeq = makeSequenceRecordOfSameSideSegments(probeSideSegs, localJoinQuery);
>>>>>>>
>>>>>>> // allocate all of the memory manager's pages to the hash table
>>>>>>> List<MemorySegment> memorySegments;
>>>>>>> int numPages = hashTableMemoryManager.getTotalNumPages();
>>>>>>> try {
>>>>>>>   memorySegments = this.hashTableMemoryManager.allocatePages(MEM_OWNER, numPages);
>>>>>>> }
>>>>>>> catch (MemoryAllocationException e) {
>>>>>>>   LOGGER.error("could not allocate " + numPages + " pages of memory for HashJoin", e);
>>>>>>>   Throwables.propagate(e);
>>>>>>>   return;
>>>>>>> }
>>>>>>>
>>>>>>> try {
>>>>>>>   Stopwatch stopwatch = Stopwatch.createStarted();
>>>>>>>   UniformRecordGenerator buildInput = new UniformRecordGenerator(buildSideRecordsSeq);
>>>>>>>   UniformRecordGenerator probeInput = new UniformRecordGenerator(probeSideRecordsSeq);
>>>>>>>   join = new MutableHashTable<Record, Record>(
>>>>>>>       recordBuildSideAccessor,
>>>>>>>       recordProbeSideAccessor,
>>>>>>>       recordBuildSideComparator,
>>>>>>>       recordProbeSideComparator,
>>>>>>>       pactRecordComparator,
>>>>>>>       memorySegments,
>>>>>>>       ioManager
>>>>>>>   );
>>>>>>>   // open() consumes the entire build input and builds the hash table
>>>>>>>   join.open(buildInput, probeInput);
>>>>>>>
>>>>>>>   LOGGER.info("construct hash table elapsed:" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");
>>>>>>>
>>>>>>>
>>>>>> The BytesValue type is a self-defined one that holds a byte[]; it is
>>>>>> just like the original StringValue and has the same serde performance.
>>>>>>
>>>>>>
>>>>>> // probe loop: for each probe record, iterate over all matching
>>>>>> // build-side records and materialize the joined pair
>>>>>> while (join.nextRecord()) {
>>>>>>   Record currentProbeRecord = join.getCurrentProbeRecord();
>>>>>>   MutableObjectIterator<Record> buildSideIterator = join.getBuildSideIterator();
>>>>>>   while (buildSideIterator.next(reusedBuildSideRow) != null) {
>>>>>>     materializeRecord2OutVector(reusedBuildSideRow, buildSideIndex2Value, buildSideIndex2Vector, rowNum);
>>>>>>     materializeRecord2OutVector(currentProbeRecord, probeSideIndex2Value, probeSideIndex2Vector, rowNum);
>>>>>>     rowNum++;
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> I have tried both the Record and Row classes as the record type,
>>>>>> without any performance improvement. I also tried batching the input
>>>>>> records: the buildInput and probeInput variables of the first code
>>>>>> block iterate one Record at a time out of a larger batch of records.
>>>>>> The batched records' content stays in memory in Drill's ValueVector
>>>>>> format; whenever a record needs to participate in the build or probe
>>>>>> phase via an iterator.next() call, it is fetched from the in-memory
>>>>>> ValueVector content. But there were no performance gains.
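>>>>>>
>>>>>> A minimal sketch of such a wrapper (the class name and fields are
>>>>>> made up for illustration):
>>>>>>
>>>>>> import java.io.IOException;
>>>>>> import java.util.Iterator;
>>>>>> import org.apache.flink.types.Record;
>>>>>> import org.apache.flink.util.MutableObjectIterator;
>>>>>>
>>>>>> // expose an in-memory batch of records through the single-record
>>>>>> // MutableObjectIterator interface
>>>>>> class BatchBackedIterator implements MutableObjectIterator<Record> {
>>>>>>   private final Iterator<Record> batch;
>>>>>>
>>>>>>   BatchBackedIterator(Iterable<Record> records) {
>>>>>>     this.batch = records.iterator();
>>>>>>   }
>>>>>>
>>>>>>   @Override
>>>>>>   public Record next(Record reuse) throws IOException {
>>>>>>     if (!batch.hasNext()) {
>>>>>>       return null;
>>>>>>     }
>>>>>>     batch.next().copyTo(reuse); // copy into the reusable target
>>>>>>     return reuse;
>>>>>>   }
>>>>>>
>>>>>>   @Override
>>>>>>   public Record next() throws IOException {
>>>>>>     return batch.hasNext() ? batch.next() : null;
>>>>>>   }
>>>>>> }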
>>>>>>
>>>>>>
>>>>>> The top hotspot profile from JProfiler is below:
>>>>>> >
>>>>>> Hot spot,"Self time (microseconds)","Average Time","Invocations"
>>>>>> org.apache.flink.types.Record.serialize,1014127,"n/a","n/a"
>>>>>> org.apache.flink.types.Record.deserialize,60684,"n/a","n/a"
>>>>>> org.apache.flink.types.Record.copyTo,83007,"n/a","n/a"
>>>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.open,55238,"n/a","n/a"
>>>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord,10955,"n/a","n/a"
>>>>>> org.apache.flink.runtime.memory.MemoryManager.release,33484,"n/a","n/a"
>>>>>> org.apache.flink.runtime.memory.MemoryManager.allocatePages,104259,"n/a","n/a"
>>>>>>
>>>>>>
>>>>>> My log shows that the hashJoin.open() method costs too much time:
>>>>>> >
>>>>>> construct hash table elapsed:1885ms
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, May 15, 2017 at 6:20 PM, Till Rohrmann <trohrm...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Weijie,
>>>>>>>
>>>>>>> it might be that batching the processing of multiple rows gives you
>>>>>>> improved performance compared to single-row processing.
>>>>>>>
>>>>>>> Maybe you could share the exact benchmark base line results and the
>>>>>>> code you use to test Flink's MutableHashTable with us. Also the Flink
>>>>>>> configuration and how you run it would be of interest. That way we 
>>>>>>> might be
>>>>>>> able to see if we can tune Flink a bit more.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Sun, May 14, 2017 at 5:23 AM, weijie tong <
>>>>>>> tongweijie...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I have a test case that uses Flink's MutableHashTable class to do
>>>>>>>> a hash join on a local machine with 64 GB of memory and 64 cores.
>>>>>>>> The test case is one build table with 140K rows and one probe table
>>>>>>>> with 3.2M rows; the matched result is 120K rows.
>>>>>>>>
>>>>>>>> It takes 2.2 seconds to complete the join. The performance seems
>>>>>>>> bad. I made sure there's no overflow and that the smaller table is
>>>>>>>> the build side. The MutableObjectIterator is a sequence of Rows;
>>>>>>>> each Row is composed of several fields which are byte[]. Through my
>>>>>>>> log, I find that the open() method takes 1.560 seconds and the
>>>>>>>> probe phase takes 680 ms. My JProfiler profile shows that the
>>>>>>>> MutableObjectIterator's next() method call is the hotspot.
>>>>>>>>
>>>>>>>>
>>>>>>>> I want to know how to tune this scenario. I find that Drill's
>>>>>>>> HashJoin uses a batch model: its build side's input is a
>>>>>>>> RecordBatch, which holds a batch of rows whose memory size
>>>>>>>> approaches the L2 cache size. Through this strategy it makes fewer
>>>>>>>> method calls (i.e., calls to next()) and the CPU computation is
>>>>>>>> more efficient. SQL Server's paper also notes the batch model's
>>>>>>>> performance gains
>>>>>>>> (https://www.microsoft.com/en-us/research/wp-content/uploads/2013/06/Apollo3-Sigmod-2013-final.pdf).
>>>>>>>> I guess the poor performance is due to the single-row iteration
>>>>>>>> model.
>>>>>>>>
>>>>>>>>
>>>>>>>> I hope someone can correct my opinion. Maybe I am also using the
>>>>>>>> MutableHashTable incorrectly. I'd appreciate any advice.
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>
>
