Some additional info to add to Mike's comment:

I discussed this "skew-resistant parallel join" with Mike yesterday and
briefly checked some papers. The common strategy seems to be to split the
block of tuples that share the same join key on one side (e.g., R) across
several nodes, and to broadcast the tuples with that key from the other side
(e.g., S) to those same nodes. Teradata uses statistics to choose this plan
over the plain hash plan.
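
To make that concrete, here is a rough sketch in Java of what such a
skew-aware partitioner could look like. This is just an illustration, not our
actual operators; the class/field names are made up, and the set of "heavy"
keys would have to come from stats or sampling:

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Sketch of a skew-aware tuple router: heavy join keys from R are split
// round-robin across all nodes, and the matching S tuples are broadcast to
// every node, so no single node has to join the whole heavy group alone.
public class SkewAwareRouter {
    private final int numNodes;
    private final Set<String> heavyKeys; // keys known (from stats) to be skewed
    private int nextNode = 0;

    public SkewAwareRouter(int numNodes, Set<String> heavyKeys) {
        this.numNodes = numNodes;
        this.heavyKeys = heavyKeys;
    }

    // Destination node(s) for a tuple from side R (the side being split).
    public List<Integer> routeR(String joinKey) {
        List<Integer> targets = new ArrayList<>();
        if (heavyKeys.contains(joinKey)) {
            // Split the heavy group: each R tuple goes to exactly one node,
            // but successive tuples with the same key land on different nodes.
            int target = nextNode;
            nextNode = (nextNode + 1) % numNodes;
            targets.add(target);
        } else {
            targets.add(Math.floorMod(joinKey.hashCode(), numNodes)); // normal hash partitioning
        }
        return targets;
    }

    // Destination node(s) for a tuple from side S (the side being broadcast).
    public List<Integer> routeS(String joinKey) {
        List<Integer> targets = new ArrayList<>();
        if (heavyKeys.contains(joinKey)) {
            // Broadcast: every node that holds a slice of the heavy R group
            // must also see this S tuple, otherwise matches would be lost.
            for (int node = 0; node < numNodes; node++) {
                targets.add(node);
            }
        } else {
            targets.add(Math.floorMod(joinKey.hashCode(), numNodes)); // same hash as R, so matches meet
        }
        return targets;
    }
}

(For a self-join like ours the "R" and "S" roles come from the same dataset,
so the same heavy key is both split and broadcast; the point is just that the
heavy group's work gets spread over all nodes instead of landing on one.)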

*Teradata*: if statistics are available, the optimizer chooses the
"broadcasting" strategy for skewed values over the "hash" strategy.
http://community.teradata.com/t5/Database/tuning-the-Skewed-joins/td-p/13305
http://www.info.teradata.com/htmlpubs/DB_TTU_14_00/index.html#page/Database_Management/B035_1094_111A/ch09.050.139.html
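
I don't know Teradata's exact cost model, but the decision itself could be as
simple as the following sketch (the threshold, names, and stats shape are all
made up here): if the most frequent key would give one node much more than its
fair share of rows under plain hash partitioning, pick the split/broadcast
plan for that key.

import java.util.Collections;
import java.util.Map;

// Hypothetical stats-based plan choice (not Teradata's actual optimizer logic).
public class JoinPlanChooser {

    enum Plan { HASH_PARTITION, SPLIT_AND_BROADCAST }

    // keyFrequencies: join-key value -> row count for that value (from column stats)
    static Plan choose(Map<String, Long> keyFrequencies, long totalRows,
                       int numNodes, double skewFactor) {
        if (keyFrequencies.isEmpty()) {
            return Plan.HASH_PARTITION; // no stats -> fall back to the plain hash plan
        }
        long maxFrequency = Collections.max(keyFrequencies.values());
        double avgRowsPerNode = (double) totalRows / numNodes;
        // If the hottest key alone is several times an average node's share,
        // hash partitioning would pile all of its work onto a single node.
        return (maxFrequency > skewFactor * avgRowsPerNode)
                ? Plan.SPLIT_AND_BROADCAST : Plan.HASH_PARTITION;
    }
}

One caveat: for a self-join, the work for a key grows roughly with the square
of its frequency (our 40,000-row key produces on the order of 40,000 x 40,000
result pairs), so a real cost model would probably look at estimated join
output or comparisons per key rather than just input row counts.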

*Pig*: add the "skewed" keyword (USING 'skewed') to a JOIN to have Pig apply its skewed-join handling.
https://pig.apache.org/docs/r0.9.2/perf.html#skewed-joins


Best,
Taewoo

On Wed, Dec 7, 2016 at 11:27 PM, Mike Carey <dtab...@gmail.com> wrote:

> To all, just to clarify:  This is a self-join (equijoin) query on a
> non-key attribute using real data (Amazon reviews, key is reviewer id)
> which has a non-uniform value distribution in terms of the number of
> entries per join key value, and in this case we really (someday...) need a
> more sophisticated parallel join method to have a balanced load.  (This
> case needs several nodes to work together on joining the biggest value with
> itself, actually - we need to look at how Teradata, Pig, and other systems
> with skew-resistant join algorithms handle this case.  Could be a good
> immigration project for someone joining the team, perhaps.  Or maybe it
> would be something of interest to Mingda+Tyson, since they've been joining
> things recently?)
>
> Cheers,
>
> Mike
>
>
>
> On 12/7/16 3:08 PM, Taewoo Kim wrote:
>
>> In short, the reason why one specific node among the 9 nodes didn't finish
>> its hash-join task was skewness (out of 9M records, 40,000 records
>> contained the same join key), as Abdullah suggested. Thanks all for the
>> information. Our system works as expected in this case! Along the way, I
>> found some optimization opportunities and excessive byte[] allocation
>> issues; fixing these will improve performance. :-)
>>
>> Best,
>> Taewoo
>>
>> On Mon, Dec 5, 2016 at 9:39 PM, Taewoo Kim <wangs...@gmail.com> wrote:
>>
>>> @Abdullah: Thanks. I missed your e-mail and just checked that. Will try.
>>>
>>> Best,
>>> Taewoo
>>>
>>> On Fri, Dec 2, 2016 at 10:32 AM, abdullah alamoudi <bamou...@gmail.com>
>>> wrote:
>>>
>>>> Taewoo,
>>>> You can use the diagnostics end point (/admin/diagnostics) to look at all
>>>> the stack traces from a single interface when that happens. This could
>>>> give an idea on what is happening in such a case.
>>>> Although, from what you described, it could be that we have some skewness
>>>> during query execution? (could be nulls, missing? any special values?).
>>>> That is also worth considering.
>>>>
>>>> Trying to help without enough context :-). Cheers,
>>>> Abdullah.
>>>>
>>>> On Dec 2, 2016, at 10:22 AM, Taewoo Kim <wangs...@gmail.com> wrote:
>>>>>
>>>>> Additional note: @Till: Yes. It happened again for the same hash-join
>>>>> query. As we can see in the bold part of the following CC.log, one node
>>>>> alone was executing for two hours.
>>>>>
>>>>>
>>>>> Dec 01, 2016 10:41:56 PM
>>>>> org.apache.hyracks.control.cc.scheduler.ActivityClusterPlanner
>>>>> planActivityCluster
>>>>> INFO: Plan for org.apache.hyracks.api.job.ActivityCluster@383ecfdd
>>>>> Dec 01, 2016 10:41:56 PM
>>>>> org.apache.hyracks.control.cc.scheduler.ActivityClusterPlanner
>>>>> planActivityCluster
>>>>> INFO: Built 1 Task Clusters
>>>>> Dec 01, 2016 10:41:56 PM
>>>>> org.apache.hyracks.control.cc.scheduler.ActivityClusterPlanner
>>>>> planActivityCluster
>>>>> INFO: Tasks: [TID:ANID:ODID:1:1:0, TID:ANID:ODID:1:1:1,
>>>>> TID:ANID:ODID:1:1:2, TID:ANID:ODID:1:1:3, TID:ANID:ODID:1:1:4,
>>>>> TID:ANID:ODID:1:1:5, TID:ANID:ODID:1:1:6, TID:ANID:ODID:1:1:7,
>>>>> TID:ANID:ODID:1:1:8, TID:ANID:ODID:1:1:9, TID:ANID:ODID:1:1:10,
>>>>> TID:ANID:ODID:1:1:11, TID:ANID:ODID:1:1:12, TID:ANID:ODID:1:1:13,
>>>>> TID:ANID:ODID:1:1:14, TID:ANID:ODID:1:1:15, TID:ANID:ODID:1:1:16,
>>>>> TID:ANID:ODID:1:1:17, TID:ANID:ODID:4:0:0]
>>>>> Dec 01, 2016 10:43:18 PM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>> INFO: Executing: TaskComplete: [ss1120_nc3[JID:5:TAID:TID:ANID:ODID:1:1:5:0]
>>>>> Dec 01, 2016 10:43:22 PM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>> INFO: Executing: TaskComplete: [ss1120_nc4[JID:5:TAID:TID:ANID:ODID:1:1:7:0]
>>>>> Dec 01, 2016 10:43:23 PM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>> INFO: Executing: TaskComplete:
>>>>> [ss1120_nc9[JID:5:TAID:TID:ANID:ODID:1:1:16:0]
>>>>> Dec 01, 2016 10:43:28 PM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>> INFO: Executing: TaskComplete: [ss1120_nc2[JID:5:TAID:TID:ANID:ODID:1:1:2:0]
>>>>> Dec 01, 2016 10:43:31 PM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>> INFO: Executing: TaskComplete: [ss1120_nc2[JID:5:TAID:TID:ANID:ODID:1:1:3:0]
>>>>> Dec 01, 2016 10:43:34 PM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>> INFO: Executing: TaskComplete: [ss1120_nc5[JID:5:TAID:TID:ANID:ODID:1:1:8:0]
>>>>> Dec 01, 2016 10:43:40 PM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>> INFO: Executing: TaskComplete:
>>>>> [ss1120_nc9[JID:5:TAID:TID:ANID:ODID:1:1:17:0]
>>>>> Dec 01, 2016 10:43:41 PM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>> INFO: Executing: TaskComplete: [ss1120_nc4[JID:5:TAID:TID:ANID:ODID:1:1:6:0]
>>>>> Dec 01, 2016 10:43:49 PM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>> INFO: Executing: TaskComplete:
>>>>> [ss1120_nc7[JID:5:TAID:TID:ANID:ODID:1:1:12:0]
>>>>> Dec 01, 2016 10:43:51 PM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>> INFO: Executing: TaskComplete: [ss1120_nc1[JID:5:TAID:TID:ANID:ODID:1:1:1:0]
>>>>> Dec 01, 2016 10:43:53 PM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>> INFO: Executing: TaskComplete: [ss1120_nc5[JID:5:TAID:TID:ANID:ODID:1:1:9:0]
>>>>> Dec 01, 2016 10:43:58 PM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>> INFO: Executing: TaskComplete:
>>>>> [ss1120_nc7[JID:5:TAID:TID:ANID:ODID:1:1:13:0]
>>>>> Dec 01, 2016 10:44:25 PM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>> INFO: Executing: TaskComplete:
>>>>> [ss1120_nc8[JID:5:TAID:TID:ANID:ODID:1:1:14:0]
>>>>> Dec 01, 2016 10:44:29 PM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>> INFO: Executing: TaskComplete:
>>>>> [ss1120_nc6[JID:5:TAID:TID:ANID:ODID:1:1:11:0]
>>>>> Dec 01, 2016 10:44:51 PM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>> INFO: Executing: TaskComplete:
>>>>> [ss1120_nc8[JID:5:TAID:TID:ANID:ODID:1:1:15:0]
>>>>> Dec 01, 2016 10:45:19 PM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>> INFO: Executing: TaskComplete: [ss1120_nc3[JID:5:TAID:TID:ANID:ODID:1:1:4:0]
>>>>> *Dec 01, 2016 10:46:31 PM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run*
>>>>> *INFO: Executing: TaskComplete:
>>>>> [ss1120_nc1[JID:5:TAID:TID:ANID:ODID:1:1:0:0]*
>>>>> *Dec 02, 2016 12:30:19 AM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run*
>>>>> *INFO: Executing: RegisterResultPartitionLocation: JobId@JID:5
>>>>> ResultSetId@RSID:0 Partition@0 NPartitions@1
>>>>> ResultPartitionLocation@128.195.11.47:45470
>>>>> OrderedResult@true
>>>>> EmptyResult@false*
>>>>> Dec 02, 2016 12:30:19 AM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>> INFO: Executing: ReportResultPartitionWriteCompletion: JobId@JID:5
>>>>> ResultSetId@RSID:0 Partition@0
>>>>> Dec 02, 2016 12:30:19 AM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>> INFO: Executing: TaskComplete: [ss1120_nc6[JID:5:TAID:TID:ANID:ODID:4:0:0:0]
>>>>> Dec 02, 2016 12:30:19 AM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>> INFO: Executing: TaskComplete:
>>>>> [ss1120_nc6[JID:5:TAID:TID:ANID:ODID:1:1:10:0]
>>>>> Dec 02, 2016 12:30:19 AM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>> INFO: Executing: JobCleanup: JobId@JID:5 Status@TERMINATED
>>>>> Dec 02, 2016 12:30:19 AM org.apache.hyracks.control.cc.work.JobCleanupWork run
>>>>> INFO: Cleanup for JobRun with id: JID:5
>>>>> Dec 02, 2016 12:30:19 AM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>> INFO: Executing: JobletCleanupNotification
>>>>> Dec 02, 2016 12:30:19 AM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>> INFO: Executing: JobletCleanupNotification
>>>>> Dec 02, 2016 12:30:19 AM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>> INFO: Executing: JobletCleanupNotification
>>>>> Dec 02, 2016 12:30:19 AM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>> INFO: Executing: JobletCleanupNotification
>>>>> Dec 02, 2016 12:30:19 AM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>> INFO: Executing: JobletCleanupNotification
>>>>> Dec 02, 2016 12:30:19 AM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>> INFO: Executing: JobletCleanupNotification
>>>>> Dec 02, 2016 12:30:19 AM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>> INFO: Executing: JobletCleanupNotification
>>>>> Dec 02, 2016 12:30:19 AM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>> INFO: Executing: JobletCleanupNotification
>>>>> Dec 02, 2016 12:30:19 AM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>> INFO: Executing: JobletCleanupNotification
>>>>> Dec 02, 2016 12:30:19 AM org.apache.asterix.active.ActiveLifecycleListener notifyJobFinish
>>>>> INFO: NO NEED TO NOTIFY JOB FINISH!
>>>>> Dec 02, 2016 12:30:19 AM
>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>> INFO: Executing: GetResultPartitionLocations: JobId@JID:5 ResultSetId@RSID:0 Known@null
>>>>>
>>>>> Best,
>>>>> Taewoo
>>>>>
>>>>> On Thu, Dec 1, 2016 at 10:43 PM, Taewoo Kim <wangs...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> @Ian: I have a separate CC on one node that doesn't have an NC. YourKit
>>>>>> might be a good way to find the reason. Thanks.
>>>>>>
>>>>>> @Till: I think so. I am sending the same query now to see what happens
>>>>>> this time.
>>>>>>
>>>>>> Best,
>>>>>> Taewoo
>>>>>>
>>>>>> On Thu, Dec 1, 2016 at 10:41 PM, Till Westmann <ti...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Taewoo,
>>>>>>>
>>>>>>> is this behavior reproducible?
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>>
>>>>>>> On 1 Dec 2016, at 22:14, Taewoo Kim wrote:
>>>>>>>
>>>>>>>> PS: It took 2 more hours to finish the job on one NC. I wonder why this
>>>>>>>> happens.
>>>>>>>>
>>>>>>>> Dec 01, 2016 7:19:35 PM
>>>>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>>>>> INFO: Executing: NotifyTaskComplete
>>>>>>>> Dec 01, 2016 9:11:23 PM
>>>>>>>> org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>>>>> INFO: Executing: CleanupJoblet
>>>>>>>> Dec 01, 2016 9:11:23 PM
>>>>>>>> org.apache.hyracks.control.nc.work.CleanupJobletWork run
>>>>>>>> INFO: Cleaning up after job: JID:4
>>>>>>>> Dec 01, 2016 9:11:23 PM org.apache.hyracks.control.nc.Joblet close
>>>>>>>> WARNING: Freeing leaked 54919521 bytes
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Taewoo
>>>>>>>>
>>>>>>>> On Thu, Dec 1, 2016 at 8:39 PM, Taewoo Kim <wangs...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> Have you experienced this case?
>>>>>>>>>
>>>>>>>>> I have 9 NCs, and the CPU utilization of one NC shows 100% for 1 hour
>>>>>>>>> and 30 minutes while the other NCs finished their job about 1 hour ago.
>>>>>>>>> Even the problematic NC shows the following log at the end. So it looks
>>>>>>>>> like it's done, but I'm not sure why this job never finishes. It's a
>>>>>>>>> simple hash join for 9M records on 9 nodes.
>>>>>>>>>
>>>>>>>>> Dec 01, 2016 7:18:02 PM org.apache.hyracks.control.common.work.WorkQueue$WorkerThread run
>>>>>>>>> INFO: Executing: NotifyTaskComplete
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Taewoo
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>
>
