Dear Wiki user, You have subscribed to a wiki page or wiki category on "Hadoop Wiki" for change notification.
The "Hive/JoinOptimization" page has been changed by LiyinTang. http://wiki.apache.org/hadoop/Hive/JoinOptimization?action=diff&rev1=9&rev2=10 -------------------------------------------------- '''Fig 1. The Previous Map Join''' - So the basic idea of map join is to hold the data of small table in Mapper’s memory and do the join work in Map stage, which saves the Reduce stage. As shown in Fig 1, the previous map join operation is not scale for large data because each Mapper will directly read the small table data from HDFS. If the large data file is large enough, there will be thousands of Mapper launched to read different record of this large data file. And thousands of Mapper will read this small table data from HDFS into the memory, which makes the small table to be the performance bottleneck, or sometimes Mapper will get lots of time-out for reading this small file, which may cause the task failed. + So the basic idea of Map Join is to hold the data of small table in Mapper’s memory and do the join work in Map stage, which saves the Reduce stage. As shown in Fig 1, the previous map join operation is not scale for large data because each Mapper will directly read the small table data from HDFS. If the large data file is large enough, there will be thousands of Mapper launched to read different record of this large data file. And thousands of Mapper will read this small table data from HDFS into the memory, which makes the small table to be the performance bottleneck, or sometimes Mapper will get lots of time-out for reading this small file, which may cause the task failed. {{attachment:fig2.jpg||height="881px",width="1184px"}} '''Fig 2. The Optimized Map Join''' - Hive-1641 ([[http://issues.apache.org/jira/browse/HIVE-1293|http://issues.apache.org/jira/browse/HIVE-1641]]) has solved this problem. As shown in Fig2, the basic idea is to create a new task, MapReduce Local Task, before the orginal Join Map/Reduce Task. This new task will read the small table data from HDFS to in-memory hashtable. After reading, it will serialize the in-memory hashtable into files on disk and compress the hashtable file into a tar file. In next stage, when the MapReduce task is launching, it will put this tar file to Hadoop Distributed Cache, which will populate the tar file to each Mapper’s local disk and decompress the file. So all the Mappers can deserialize the hashtable file back into memory and do the join work as before. + Hive-1641 ([[http://issues.apache.org/jira/browse/HIVE-1293|http://issues.apache.org/jira/browse/HIVE-1641]]) has solved this problem. As shown in Fig2, the basic idea is to create a new task, MapReduce Local Task, before the original Join Map/Reduce Task. This new task will read the small table data from HDFS to in-memory hashtable. After reading, it will serialize the in-memory hashtable into files on disk and compress the hashtable file into a tar file. In next stage, when the MapReduce task is launching, it will put this tar file to Hadoop Distributed Cache, which will populate the tar file to each Mapper’s local disk and decompress the file. So all the Mappers can deserialize the hashtable file back into memory and do the join work as before. - Obviously, the Local Task is a very memory intensive. So the query processor will launch this task in a child jvm, which has the same heap size as the Mapper's. Since the Local Task may run out of memory, the query processor will measure the memory usage of the local task very carefully. Once the memory usage of the Local Task is higher than a threshold. This Local Task will abort itself and tells the user that this table is too large to hold in the memory. User can change this threshold by '''''set hive.mapjoin.localtask.max.memory.usage = 0.999;''''' + Obviously, the Local Task is a very memory intensive. So the query processor will launch this task in a child jvm, which has the same heap size as the Mapper's. Since the Local Task may run out of memory, the query processor will measure the memory usage of the local task very carefully. Once the memory usage of the Local Task is higher than a threshold number. This Local Task will abort itself and tells the user that this table is too large to hold in the memory. User can change this threshold by '''''set hive.mapjoin.localtask.max.memory.usage = 0.999;''''' == 1.2 Removing JDBM == - Previously, Hive uses JDBM ([[http://issues.apache.org/jira/browse/HIVE-1293|http://jdbm.sourceforge.net/]]) as a persistent hashtable. Whenever the in-memory hashtable cannot hold data any more, it will swap the key/value into the JDBM table. However when profiing the Map Join, we found out this JDBM component takes more than 70 % CPU time as shown in Fig3. Also the persistent file JDBM genreated is too large to put into the Distributed Cache. For example, if users put 67,000 simple interger key/value pairs into the JDBM, it will generate more 22M hashtable file. So the JDBM is too heavy weight for Map Join and it would better to remove this componet from Hive. Map Join is designed for holding the small table's data into memory. If the table is too large to hold, just run as a Common Join. There is no need to use persistent hashtable any more. Hive-1754 ([[http://issues.apache.org/jira/browse/HIVE-1293|http://issues.apache.org/jira/browse/HIVE-1754]]) + Previously, Hive uses JDBM ([[http://issues.apache.org/jira/browse/HIVE-1293|http://jdbm.sourceforge.net/]]) as a persistent hashtable. Whenever the in-memory hashtable cannot hold data any more, it will swap the key/value into the JDBM table. However when profiling the Map Join, we found out this JDBM component takes more than 70 % CPU time as shown in Fig3. Also the persistent file JDBM generated is too large to put into the Distributed Cache. For example, if users put 67,000 simple integer key/value pairs into the JDBM, it will generate more 22M hashtable file. So the JDBM is too heavy weight for Map Join and it would better to remove this component from Hive. Map Join is designed for holding the small table's data into memory. If the table is too large to hold, just run as a Common Join. There is no need to use persistent hashtable any more. Hive-1754 ([[http://issues.apache.org/jira/browse/HIVE-1293|http://issues.apache.org/jira/browse/HIVE-1754]]) {{attachment:fig3.jpg}} '''Fig 3. The Profiling Result of JDBM''' == 1.3 Performance Evaluation == + Here are some performance comparison results between the previous Map Join with the optimized Map Join + '''Table 1: The Comparison between the previous map join with the new optimized map join''' {{attachment:fig4.jpg}} - As shown in Table1, the optmized map join will be 12 ~ 26 times faster than the previous one. Most of map join performance improvement comes from removing the JDBM component. + As shown in Table1, the optimized Map Join will be 12 ~ 26 times faster than the previous one. Most of map join performance improvement comes from removing the JDBM component. = 2. Converting Join into Map Join Automatically = - == 2.1 New Join Exeuction Flow == + == 2.1 New Join Execution Flow == - Since map join is faster than the common join, it would better to run the map join whenever possible. Previously, Hive users need to give a hint in the query to assign which table the small table is. For example, '''''select /*+mapjoin(a)*/ a.key, b.value from src1 a join src2 b on a.key=b.key'''''; It is not a good way for user experience and query performance, because sometimes user may give a wrong hint and also users may not give any hints. It would be much better to convert the Common Join into Map Join without users' hint. + Since map join is faster than the common join, it would better to run the map join whenever possible. Previously, Hive users need to give a hint in the query to assign which table the small table is. For example, '''''select /*+mapjoin(a)*/ * from src1 x join src2y on x.key=y.key'''''; It is not a good way for user experience and query performance, because sometimes user may give a wrong hint and also users may not give any hints. It would be much better to convert the Common Join into Map Join without users' hint. - Hive-1642 ([[http://issues.apache.org/jira/browse/HIVE-1293|http://issues.apache.org/jira/browse/HIVE-1642]]) has solved the problem by converting the Common Join into Map Join automatically. For the Map Join, the query processor should know which input table the big table is. Other input table will be recognize as the small table during the execution stage and these tables need to be hold in the memory. However, the query processor has no idea of input file size during compiling time. Because some of the table may be intermediate tables generated from sub queries. So the query processor can only figure out the input file size during executiom time. + Hive-1642 ([[http://issues.apache.org/jira/browse/HIVE-1293|http://issues.apache.org/jira/browse/HIVE-1642]]) has solved the problem by converting the Common Join into Map Join automatically. For the Map Join, the query processor should know which input table the big table is. Other input table will be recognize as the small table during the execution stage and these tables need to be hold in the memory. However, the query processor has no idea of input file size during compiling time. Because some of the table may be intermediate tables generated from sub queries. So the query processor can only figure out the input file size during the execution time. Right now, users need to enable this feature by''''' set hive.auto.convert.join = true;''''' {{attachment:fig5.jpg||height="716px",width="1017px"}} - '''Fig 5, The Join Execution Flow''' + '''Fig 5: The Join Execution Flow''' - As shown in fig5, the left side shows the previous Common Join execution flow, which very straightfroward. On the contrast, the right side is the new Common Join execution flow. During the compile time, hte query processor will generate a Conditional Task, which contains a list of tasks and one of these tasks will be resolved to run during the execution time. It means the tasks in the Conditional Task's list are candidate and one of them will be chosen to run during the run time. First, the original Common Join Task should be into the list. Also the query processor will generate a series of Map Join Task by assuming each of the input tables may be the big table. Take the same example as before, '''''select /*+mapjoin(a)*/ a.key, b.value from src1 y a join src2 b on a.key=b.key'''''. Both table '''''src2 '''''and '''''src1 '''''may be the big table, so it will generate 2 Map Join Task. One is assuming src1 is the big table and the other is assuming src2 is the big table, as shown in Fig 6. + As shown in fig5, the left side shows the previous Common Join execution flow, which very straightforward. On the other side, the right side is the new Common Join execution flow. During the compile time, the query processor will generate a Conditional Task, which contains a list of tasks and one of these tasks will be resolved to run during the execution time. It means the tasks in the Conditional Task's list are the candidates and one of them will be chosen to run during the run time. First, the original Common Join Task should be put into the list. Also the query processor will generate a series of Map Join Task by assuming each of the input tables may be the big table. Take the same example as before, '''''select /*+mapjoin(a)*/ * from src1 x join src2y on x.key=y.key'''''. Both table '''''src2 '''''and '''''src1 '''''may be the big table, so it will generate 2 Map Join Task. One is assuming src1 is the big table and the other is assuming src2 is the big table, as shown in Fig 6. {{attachment:fig6.jpg||height="778px",width="1072px"}} - '''Fig 6, Create Map Join Task by Assuming One of the Input Table is the Big Table''' + '''Fig 6: Create Map Join Task by Assuming One of the Input Table is the Big Table''' == 2.2 Resolving the Join Operation at Run Time == During the execution stage, the Conditional Task can know exactly the file size of each input table, even the table is a intermediate table. If all the tables are too large to be converted into map join, then just run the Common Join Task as previously. If one of the tables is large and others are small enough to run Map Join, then the Conditional Task will pick the corresponding Map Join Local Task to run. By this mechanism, it can convert the Common Join into Map Join automatically and dynamically. - Currently, if total size of small tables are large than 25M, then the Conditional Task will choose the original Common Join run. 25M is a very conservative number and user can change this number by '''''set hive.smalltable.filesize = 30000000'''''. + Currently, if the total size of small tables are large than 25M, then the Conditional Task will choose the original Common Join run. 25M is a very conservative number and user can change this number by '''''set hive.smalltable.filesize = 30000000'''''. == 2.3 Backup Task == - As mentioned above, the Local Task of Map Join is a very memory intensive. So the query processor will launch this task in a child jvm, which has the same heap size as the Mapper's. Since the Local Task may run out of memory, the query processor will measure the memory usage of the local task very carefully. Once the memory usage of the Local Task is higher than a threshold. This Local Task will abort itself and it means the map join task fails. + As mentioned above, the Local Task of Map Join is a very memory intensive. So the query processor will launch this task in a child jvm, which has the same heap size as the Mapper's. Since the Local Task may run out of memory, the query processor will measure the memory usage of the local task very carefully. Once the memory usage of the Local Task is higher than a threshold. This Local Task will abort itself and it means the map join task fails. In this case, the query processor will launch the original Common Join task as a Backup Task to run, which is totally transparent to user. The basic idea is shown as Fig 7. {{attachment:fig7.jpg}} Fig7, Run the Original Common Join as a Backup Task - In this case, the query processor will launch the original Common Join task as a Backup Task to run, which is totally transparent to user. The basic idea is shown as Fig 7. - == 2.4 Performance Evaluation == - Here are some performance comparison results. All the benchmark queries here can be converted into Map Join. + Here are some performance comparison results between the previous Common Join with the optimized Common Join. All the benchmark queries here can be converted into Map Join. '''Table 2: The Comparison between the previous join with the new optimized join''' ''' {{attachment:fig8.jpg}} ''' - For the previous common join, the experiment only calculates the average time of map reduce task execution time. Because job finish time will include the job scheduling overhead. Sometimes it will wait for some time to start to run the job in the cluster. Also for the new optimized common join, the experiment only adds up the average time of local task execution time with the average time of map reduce execution time. So both of the results should avoid the job scheduling overhead. + For the previous common join, the experiment only calculates the average time of map reduce task execution time. Because job finish time will include the job scheduling overhead. Sometimes it will wait for some time to start to run the job in the cluster. Also for the new optimized common join, the experiment only adds up the average time of local task execution time with the average time of map reduce execution time. So both of the results have avoided the job scheduling overhead. From the result, if the new common join can be converted into map join, it will get 57% ~163 % performance improvement.
