[ https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Saisai Shao updated SPARK-3948:
-------------------------------
    Description: 
Several exceptions occurred when running TPC-DS queries against the latest master 
branch with sort-based shuffle enabled, such as PARSING_ERROR(2) in Snappy, a 
deserialization error in Kryo, and an offset out-of-range error in 
FileManagedBuffer; all of these exceptions are gone when we switch to hash-based 
shuffle.

After deeper investigation, we found that some shuffle output files are 
unexpectedly smaller than the others, as the log shows:

{noformat}
14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_9_11, offset: 3055635, length: 236708, file length: 47274167
14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_10_11, offset: 2986484, length: 222755, file length: 47174539
14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_11_11, offset: 2995341, length: 259871, file length: 383405
14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_12_11, offset: 2991030, length: 268191, file length: 47478892
14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_13_11, offset: 3016292, length: 230694, file length: 47420826
14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_14_11, offset: 3061400, length: 241136, file length: 47395509
{noformat}

As you can see, the total file length of shuffle_6_11_11 is much smaller than that 
of the other map outputs from the same stage; in fact the file (383405 bytes) is 
even smaller than the block's recorded offset (2995341), which is consistent with 
the offset out-of-range error in FileManagedBuffer.
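
To make this kind of mismatch easier to spot, here is a minimal, hypothetical check 
(not part of Spark) that compares the offsets recorded in a shuffle index file 
against the actual length of the data file, assuming the layout that 
IndexShuffleBlockManager writes: numPartitions + 1 cumulative long offsets, the 
last of which should equal the data file size:

{noformat}
import java.io.{DataInputStream, File, FileInputStream}

object ShuffleIndexCheck {
  // Hypothetical debugging helper: read the cumulative offsets from the index
  // file and compare the final offset with the actual data file size.
  def check(indexFile: File, dataFile: File, numPartitions: Int): Unit = {
    val in = new DataInputStream(new FileInputStream(indexFile))
    try {
      val offsets = Array.fill(numPartitions + 1)(in.readLong())
      val expected = offsets.last
      val actual = dataFile.length()
      if (expected != actual) {
        println(s"Inconsistent shuffle output: index expects $expected bytes, " +
          s"but ${dataFile.getName} is only $actual bytes")
      }
    } finally {
      in.close()
    }
  }
}
{noformat}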

We also dumped the map outputs on the map side to check whether this small output 
is actually correct; below is the log:

{noformat}
In bypass merge sort, file name: /mnt/DP_disk1/animal/spark/spark-local-20141014182142-8345/22/shuffle_6_11_0.data, file length: 383405
length: 274722 262597 291290 272902 264941 270358 291005 295285 252482 
287142 232617 259871 233734 241439 228897 234282 253834 235619 
233803 255532 270739 253825 262087 266404 234273 250120 262983 
257024 255947 254971 258908 247862 221613 258566 245399 251684 
274843 226150 264278 245279 225656 235084 239466 212851 242245 
218781 222191 215500 211548 234256 208601 204113 191923 217895 
227020 215331 212313 223725 250876 256875 239276 266777 235520 
237462 234063 242270 246825 255888 235937 236956 233099 264508 
260303 233294 239061 254856 257475 230105 246553 260412 210355 
211201 219572 206636 226866 209937 226618 218208 206255 248069 
221717 222112 215734 248088 239207 246125 239056 241133 253091 
246738 233128 242794 231606 255737 221123 252115 247286 229688 
251087 250047 237579 263079 256251 238214 208641 201120 204009 
200825 211965 200600 194492 226471 194887 226975 215072 206008 
233288 222132 208860 219064 218162 237126 220465 201343 225711 
232178 233786 212767 211462 213671 215853 227822 233782 214727 
247001 228968 247413 222674 214241 184122 215643 207665 219079 
215185 207718 212723 201613 216600 212591 208174 204195 208099 
229079 230274 223373 214999 256626 228895 231821 383405 229646 
220212 245495 245960 227556 213266 237203 203805 240509 239306 
242365 218416 238487 219397 240026 251011 258369 255365 259811 
283313 248450 264286 264562 257485 279459 249187 257609 274964 
292369 273826
{noformat}

Here I dump the file name, the file length, and each partition's length; obviously 
the sum of all the partition lengths does not equal the file length. So I think 
there may be a situation where the partition writer in ExternalSorter does not 
always append to the end of the previously written file, so parts of the file's 
content get overwritten, and this leads to the exceptions I mentioned before.
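
For reference, the invariant I would expect from this path is simply that the 
per-partition segment lengths add up to the size of the data file they were 
appended into; a rough sketch of that sanity check, with hypothetical names:

{noformat}
import java.io.File

object PartitionLengthCheck {
  // Hypothetical sanity check, roughly what the dump above verifies: the
  // per-partition lengths that go into the index must sum to the size of the
  // data file, otherwise some segment overwrote an earlier one instead of
  // being appended after it.
  def check(partitionLengths: Array[Long], dataFile: File): Unit = {
    val total = partitionLengths.sum
    require(total == dataFile.length(),
      s"sum of partition lengths ($total) != data file length (${dataFile.length()})")
  }
}
{noformat}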

Also, I changed copyStream to disable transferTo and use the previous buffered copy 
path, and all the issues are gone. So I think there may be some flushing problem in 
transferTo when the amount of processed data is large.
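
To illustrate what I mean by disabling transferTo, here is a rough sketch of such a 
copy utility (not Spark's actual Utils.copyStream, just the shape of the two 
paths): a fast path based on FileChannel.transferTo, and the plain buffered 
fallback, which is the path that made the corruption disappear in my tests:

{noformat}
import java.io.{FileInputStream, FileOutputStream, InputStream, OutputStream}

object StreamCopy {
  // Sketch only: copy `in` to `out`, optionally using the NIO transferTo fast
  // path when both ends are file streams. Returns the number of bytes copied.
  def copyStream(in: InputStream, out: OutputStream, useTransferTo: Boolean): Long = {
    (in, out) match {
      case (fin: FileInputStream, fout: FileOutputStream) if useTransferTo =>
        val inChannel = fin.getChannel
        val outChannel = fout.getChannel
        val size = inChannel.size()
        var copied = 0L
        // transferTo may transfer fewer bytes than requested, so loop until done.
        while (copied < size) {
          copied += inChannel.transferTo(copied, size - copied, outChannel)
        }
        copied
      case _ =>
        // Previous, plain buffered copy path.
        val buf = new Array[Byte](8192)
        var copied = 0L
        var n = in.read(buf)
        while (n != -1) {
          out.write(buf, 0, n)
          copied += n
          n = in.read(buf)
        }
        copied
    }
  }
}
{noformat}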




> Potential file append bugs in ExternalSorter which lead to unexpected sort-based shuffle exceptions
> ---------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-3948
>                 URL: https://issues.apache.org/jira/browse/SPARK-3948
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle
>    Affects Versions: 1.2.0
>            Reporter: Saisai Shao
>


