[
https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16072336#comment-16072336
]
ASF GitHub Bot commented on FLINK-7057:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4238#discussion_r125247578
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java
---
@@ -477,97 +406,96 @@ else if (contentAddressable == CONTENT_ADDRESSABLE) {
clientSocket.close();
}
finally {
- if (fos != null) {
- try {
- fos.close();
- } catch (Throwable t) {
- LOG.warn("Cannot close stream to BLOB
staging file", t);
- }
- }
if (incomingFile != null) {
- if (!incomingFile.delete()) {
+ if (!incomingFile.delete() &&
incomingFile.exists()) {
LOG.warn("Cannot delete BLOB server
staging file " + incomingFile.getAbsolutePath());
}
}
}
}
/**
- * Handles an incoming DELETE request from a BLOB client.
- *
- * @param inputStream The input stream to read the request from.
- * @param outputStream The output stream to write the response to.
- * @throws java.io.IOException Thrown if an I/O error occurs while
reading the request data from the input stream.
+ * Reads a full file from <tt>inputStream</tt> into
<tt>incomingFile</tt> returning its checksum.
+ *
+ * @param inputStream
+ * stream to read from
+ * @param incomingFile
+ * file to write to
+ * @param buf
+ * An auxiliary buffer for data
serialization/deserialization
+ *
+ * @return the received file's content hash as a BLOB key
+ *
+ * @throws IOException
+ * thrown if an I/O error occurs while reading/writing
data from/to the respective streams
*/
- private void delete(InputStream inputStream, OutputStream outputStream,
byte[] buf) throws IOException {
+ private static BlobKey readFileFully(
+ final InputStream inputStream, final File incomingFile,
final byte[] buf)
+ throws IOException {
+ MessageDigest md = BlobUtils.createMessageDigest();
+ FileOutputStream fos = new FileOutputStream(incomingFile);
try {
- int type = inputStream.read();
- if (type < 0) {
- throw new EOFException("Premature end of DELETE
request");
- }
-
- if (type == CONTENT_ADDRESSABLE) {
- BlobKey key =
BlobKey.readFromInputStream(inputStream);
- File blobFile =
blobServer.getStorageLocation(key);
-
- writeLock.lock();
-
- try {
- // we should make the local and remote
file deletion atomic, otherwise we might risk not
- // removing the remote file in case of
a concurrent put operation
- if (blobFile.exists() &&
!blobFile.delete()) {
- throw new IOException("Cannot
delete BLOB file " + blobFile.getAbsolutePath());
- }
-
- blobStore.delete(key);
- } finally {
- writeLock.unlock();
+ while (true) {
+ final int bytesExpected =
readLength(inputStream);
+ if (bytesExpected == -1) {
+ // done
+ break;
+ }
+ if (bytesExpected > BUFFER_SIZE) {
+ throw new IOException(
+ "Unexpected number of incoming
bytes: " + bytesExpected);
}
- }
- else if (type == NAME_ADDRESSABLE) {
- byte[] jidBytes = new byte[JobID.SIZE];
- readFully(inputStream, jidBytes, 0, JobID.SIZE,
"JobID");
- JobID jobID = JobID.fromByteArray(jidBytes);
- String key = readKey(buf, inputStream);
+ readFully(inputStream, buf, 0, bytesExpected,
"buffer");
+ fos.write(buf, 0, bytesExpected);
- File blobFile =
this.blobServer.getStorageLocation(jobID, key);
+ md.update(buf, 0, bytesExpected);
+ }
+ return new BlobKey(md.digest());
+ } finally {
--- End diff --
Try-with-resources, maybe?
> move BLOB ref-counting from LibraryCacheManager to BlobCache
> ------------------------------------------------------------
>
> Key: FLINK-7057
> URL: https://issues.apache.org/jira/browse/FLINK-7057
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination, Network
> Affects Versions: 1.4.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
>
> Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR
> files managed by it. Instead, we want the {{BlobCache}} to do that itself for
> all job-related BLOBs. Also, we do not want to operate on a per-{{BlobKey}}
> level but rather per job. Therefore, the cleanup process should be adapted,
> too.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)