[ https://issues.apache.org/jira/browse/SPARK-17573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jianfei Wang updated SPARK-17573:
---------------------------------
Description: 
I think the InputStream may never be closed when an exception occurs; we should guard the stream handling with try/finally.
{code}
private def addFilesToZipStream(parent: String, source: File, output: ZipOutputStream): Unit = {
  if (source.isDirectory()) {
    output.putNextEntry(new ZipEntry(parent + source.getName()))
    for (file <- source.listFiles()) {
      addFilesToZipStream(parent + source.getName() + File.separator, file, output)
    }
  } else {
    val in = new FileInputStream(source)
    output.putNextEntry(new ZipEntry(parent + source.getName()))
    val buf = new Array[Byte](8192)
    var n = 0
    while (n != -1) {
      n = in.read(buf)
      if (n != -1) {
        output.write(buf, 0, n)
      }
    }
    output.closeEntry()
    in.close()
  }
}
{code}

was:
I find that there are many places in Spark where we don't close input/output streams manually; this can lead to potential "OOM" errors and other problems, for example:
{code}
private[sql] def bytesToRow(bytes: Array[Byte], schema: StructType): Row = {
  val bis = new ByteArrayInputStream(bytes)
  val dis = new DataInputStream(bis)
  val num = SerDe.readInt(dis)
  Row.fromSeq((0 until num).map { i =>
    doConversion(SerDe.readObject(dis), schema.fields(i).dataType)
  })
}

private[sql] def rowToRBytes(row: Row): Array[Byte] = {
  val bos = new ByteArrayOutputStream()
  val dos = new DataOutputStream(bos)
  val cols = (0 until row.length).map(row(_).asInstanceOf[Object]).toArray
  SerDe.writeObject(dos, cols)
  bos.toByteArray()
}

override def deserialize(storageFormat: Array[Byte]): MaxValue = {
  val in = new ByteArrayInputStream(storageFormat)
  val stream = new DataInputStream(in)
  val isValueSet = stream.readBoolean()
  val value = stream.readInt()
  new MaxValue(value, isValueSet)
}
{code}


> The FileInputStream may not be closed when some exceptions occur
> -----------------------------------------------------------------
>
>                 Key: SPARK-17573
>                 URL: https://issues.apache.org/jira/browse/SPARK-17573
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.0.0
>            Reporter: Jianfei Wang
>              Labels: performance
>
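
For reference, a minimal sketch (not an actual patch) of how the copy branch could guard the stream with try/finally so the FileInputStream is released even when putNextEntry or write throws. The enclosing object name ZipStreamSketch is hypothetical and only exists to make the snippet self-contained:
{code}
import java.io.{File, FileInputStream}
import java.util.zip.{ZipEntry, ZipOutputStream}

object ZipStreamSketch {
  // Same logic as the reported method, but the FileInputStream is closed
  // in a finally block so it cannot leak when an exception is thrown.
  def addFilesToZipStream(parent: String, source: File, output: ZipOutputStream): Unit = {
    if (source.isDirectory()) {
      output.putNextEntry(new ZipEntry(parent + source.getName()))
      for (file <- source.listFiles()) {
        addFilesToZipStream(parent + source.getName() + File.separator, file, output)
      }
    } else {
      val in = new FileInputStream(source)
      try {
        output.putNextEntry(new ZipEntry(parent + source.getName()))
        val buf = new Array[Byte](8192)
        var n = in.read(buf)
        while (n != -1) {
          output.write(buf, 0, n)
          n = in.read(buf)
        }
        output.closeEntry()
      } finally {
        in.close() // always runs, even if the zip output failed above
      }
    }
  }
}
{code}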