[ 
https://issues.apache.org/jira/browse/SPARK-3948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14172225#comment-14172225
 ] 

Mridul Muralidharan commented on SPARK-3948:
--------------------------------------------

That is weird. I tried a stripped-down version to test just this, and it seems
to be working fine.

In the Scala interpreter, the following works as expected.
{code:java}

import java.io._
import java.nio._
import java.nio.channels._

def copyStream(in: InputStream,
               out: OutputStream,
               closeStreams: Boolean = false): Long = {
  var count = 0L
  try {
    if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]) {
      // When both streams are file streams, use transferTo to improve copy performance.
      val inChannel = in.asInstanceOf[FileInputStream].getChannel()
      val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
      // Debug output: state of the output channel before this copy.
      println("size = " + outChannel.size)
      println("position = " + outChannel.position)
      val size = inChannel.size()
      // Loop in case transferTo transfers less data than requested.
      while (count < size) {
        count += inChannel.transferTo(count, size - count, outChannel)
      }
    } else {
      val buf = new Array[Byte](8192)
      var n = 0
      while (n != -1) {
        n = in.read(buf)
        if (n != -1) {
          out.write(buf, 0, n)
          count += n
        }
      }
    }
    count
  } finally {
    if (closeStreams) {
      try {
        in.close()
      } finally {
        out.close()
      }
    }
  }
}

// Append the input file "t" to "output" ten times through one output stream.
val out = new FileOutputStream("output")
for (i <- 0 until 10) {
  val in = new FileInputStream("t")
  val size = copyStream(in, out, false)
  println("size = " + size + " for i = " + i)
  in.close()
}
out.close()

{code}


Scenarios tried:
a) No "output" file.
b) Empty "output" file.
c) Non-empty "output" file.

It worked fine (and as expected) in all three cases.
Can you try this at your end? I want to eliminate any potential environment
issues.
I tried this with Java 1.7.0_55 and 1.8.0_25.
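
A minimal sketch of how the three scenarios could be checked mechanically rather than by reading the printed sizes. It assumes a temporary directory instead of the "t" and "output" files in the working directory, and the helper names (appendWithTransferTo, runScenario) and the 1000-byte payload are hypothetical. If transferTo always writes at the output channel's current position, the output should end up exactly ten times as long as the input, whatever "output" contained before it was opened.
{code:java}
import java.io.{File, FileInputStream, FileOutputStream}
import java.nio.file.Files

// Hypothetical helper: copy all of `in` to `out` with transferTo, looping
// because a single call may transfer fewer bytes than requested.
def appendWithTransferTo(in: FileInputStream, out: FileOutputStream): Long = {
  val inChannel = in.getChannel
  val outChannel = out.getChannel
  val size = inChannel.size()
  var count = 0L
  while (count < size) {
    count += inChannel.transferTo(count, size - count, outChannel)
  }
  count
}

// Hypothetical driver: prepare "output" for one scenario, append the input
// ten times, and return the final length of the output file.
def runScenario(dir: File, input: File, initial: Option[Array[Byte]]): Long = {
  val output = new File(dir, "output")
  output.delete()
  initial.foreach(bytes => Files.write(output.toPath, bytes))
  val out = new FileOutputStream(output) // truncates any existing content
  try {
    for (_ <- 0 until 10) {
      val in = new FileInputStream(input)
      try appendWithTransferTo(in, out) finally in.close()
    }
  } finally out.close()
  output.length()
}

val dir = Files.createTempDirectory("copystream-check").toFile
val input = new File(dir, "t")
Files.write(input.toPath, Array.fill[Byte](1000)(1))

val lengths = Seq(
  runScenario(dir, input, None),                           // a) no "output" file
  runScenario(dir, input, Some(Array.empty[Byte])),        // b) empty "output" file
  runScenario(dir, input, Some(Array.fill[Byte](5000)(2))) // c) non-empty "output" file
)
lengths.foreach(len => assert(len == 10 * input.length()))
{code}
A failing assertion here would point at the environment (JVM, kernel, or file system) rather than at Spark itself, which is the distinction this test is meant to draw.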

> Sort-based shuffle can lead to assorted stream-corruption exceptions
> --------------------------------------------------------------------
>
>                 Key: SPARK-3948
>                 URL: https://issues.apache.org/jira/browse/SPARK-3948
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle
>    Affects Versions: 1.2.0
>            Reporter: Saisai Shao
>            Assignee: Saisai Shao
>
> Several exceptions occurred when running TPC-DS queries against the latest
> master branch with sort-based shuffle enabled, such as PARSING_ERROR(2) in
> snappy, a deserialization error in Kryo, and an offset out of range in
> FileManagedBuffer. All these exceptions are gone when we change to hash-based
> shuffle.
> After deeper investigation, we found that some shuffle output files are
> unexpectedly smaller than the others, as the log shows:
> {noformat}
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_9_11, offset: 3055635, length: 236708, file length: 47274167
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_10_11, offset: 2986484, length: 222755, file length: 47174539
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_11_11, offset: 2995341, length: 259871, file length: 383405
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_12_11, offset: 2991030, length: 268191, file length: 47478892
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_13_11, offset: 3016292, length: 230694, file length: 47420826
> 14/10/14 18:25:06 INFO shuffle.IndexShuffleBlockManager: Block id: shuffle_6_14_11, offset: 3061400, length: 241136, file length: 47395509
> {noformat}
> As you can see, the total file length of shuffle_6_11_11 is much smaller than
> that of the other map outputs of the same stage.
> We also dumped the map outputs on the map side to see whether this small
> output is correct; below is the log:
> {noformat}
>  In bypass merge sort, file name: /mnt/DP_disk1/animal/spark/spark-local-20141014182142-8345/22/shuffle_6_11_0.data, file length: 383405length:
>  274722 262597 291290 272902 264941 270358 291005 295285 252482 
> 287142 232617 259871 233734 241439 228897 234282 253834 235619 
> 233803 255532 270739 253825 262087 266404 234273 250120 262983 
> 257024 255947 254971 258908 247862 221613 258566 245399 251684 
> 274843 226150 264278 245279 225656 235084 239466 212851 242245 
> 218781 222191 215500 211548 234256 208601 204113 191923 217895 
> 227020 215331 212313 223725 250876 256875 239276 266777 235520 
> 237462 234063 242270 246825 255888 235937 236956 233099 264508 
> 260303 233294 239061 254856 257475 230105 246553 260412 210355 
> 211201 219572 206636 226866 209937 226618 218208 206255 248069 
> 221717 222112 215734 248088 239207 246125 239056 241133 253091 
> 246738 233128 242794 231606 255737 221123 252115 247286 229688 
> 251087 250047 237579 263079 256251 238214 208641 201120 204009 
> 200825 211965 200600 194492 226471 194887 226975 215072 206008 
> 233288 222132 208860 219064 218162 237126 220465 201343 225711 
> 232178 233786 212767 211462 213671 215853 227822 233782 214727 
> 247001 228968 247413 222674 214241 184122 215643 207665 219079 
> 215185 207718 212723 201613 216600 212591 208174 204195 208099 
> 229079 230274 223373 214999 256626 228895 231821 383405 229646 
> 220212 245495 245960 227556 213266 237203 203805 240509 239306 
> 242365 218416 238487 219397 240026 251011 258369 255365 259811 
> 283313 248450 264286 264562 257485 279459 249187 257609 274964 
> 292369 273826
> {noformat}
> Here I dump the file name, the file length and each partition's length.
> Obviously the sum of all partition lengths is not equal to the file length.
> So I think there may be a situation where partitionWriter in ExternalSorter
> does not always append to the end of the previously written file, so the
> file's content is overwritten in some parts, and this leads to the exceptions
> I mentioned before.
> I also changed the code of copyStream to disable transferTo and use the
> previous implementation, and all the issues are gone. So I think there may be
> some flushing problems in transferTo when the processed data is large.
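
The consistency check described in the issue above (the partition lengths should add up to the data file's length) can be sketched as follows. This is only an illustration: the helper names partitionOffsets and isConsistent are hypothetical, and the sample lengths are made up rather than taken from the logs.
{code:java}
// Hypothetical helper: offsets are the running sums of the partition lengths,
// so the last offset is the length the data file is expected to have.
def partitionOffsets(lengths: Seq[Long]): Seq[Long] = lengths.scanLeft(0L)(_ + _)

// A data file is consistent if it is exactly as long as its partitions combined.
def isConsistent(lengths: Seq[Long], fileLength: Long): Boolean =
  partitionOffsets(lengths).last == fileLength

// Made-up lengths for illustration, not taken from the logs above.
val sampleLengths = Seq(100L, 250L, 50L)
assert(partitionOffsets(sampleLengths) == Seq(0L, 100L, 350L, 400L))
assert(isConsistent(sampleLengths, 400L))
assert(!isConsistent(sampleLengths, 250L)) // file shorter than the sum of its partitions
{code}
A file that is shorter than the sum of its partition lengths, as reported for shuffle_6_11_0.data, fails this check.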


