[
https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16072339#comment-16072339
]
ASF GitHub Bot commented on FLINK-7057:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4238#discussion_r125219915
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
@@ -269,166 +276,128 @@ else if (response != RETURN_OKAY) {
//
--------------------------------------------------------------------------------------------
/**
- * Uploads the data of the given byte array to the BLOB server in a
content-addressable manner.
+ * Uploads the data of the given byte array for the given job to the
BLOB server.
*
+ * @param jobId
+ * the ID of the job the BLOB belongs to (or <tt>null</tt>
if job-unrelated)
* @param value
- * the buffer to upload
- * @return the computed BLOB key identifying the BLOB on the server
- * @throws IOException
- * thrown if an I/O error occurs while uploading the data to
the BLOB server
- */
- public BlobKey put(byte[] value) throws IOException {
- return put(value, 0, value.length);
- }
-
- /**
- * Uploads data from the given byte array to the BLOB server in a
content-addressable manner.
+ * the buffer to upload
*
- * @param value
- * the buffer to upload data from
- * @param offset
- * the read offset within the buffer
- * @param len
- * the number of bytes to upload from the buffer
* @return the computed BLOB key identifying the BLOB on the server
- * @throws IOException
- * thrown if an I/O error occurs while uploading the data to
the BLOB server
- */
- public BlobKey put(byte[] value, int offset, int len) throws
IOException {
- return putBuffer(null, null, value, offset, len);
- }
-
- /**
- * Uploads the data of the given byte array to the BLOB server and
stores it under the given job ID and key.
*
- * @param jobId
- * the job ID to identify the uploaded data
- * @param key
- * the key to identify the uploaded data
- * @param value
- * the buffer to upload
* @throws IOException
- * thrown if an I/O error occurs while uploading the data to
the BLOB server
+ * thrown if an I/O error occurs while uploading the data
to the BLOB server
*/
- public void put(JobID jobId, String key, byte[] value) throws
IOException {
- put(jobId, key, value, 0, value.length);
+ @VisibleForTesting
+ public BlobKey put(@Nullable JobID jobId, byte[] value) throws
IOException {
+ return put(jobId, value, 0, value.length);
}
/**
- * Uploads data from the given byte array to the BLOB server and stores
it under the given job ID and key.
+ * Uploads data from the given byte array for the given job to the BLOB
server.
*
* @param jobId
- * the job ID to identify the uploaded data
- * @param key
- * the key to identify the uploaded data
+ * the ID of the job the BLOB belongs to (or <tt>null</tt>
if job-unrelated)
* @param value
- * the buffer to upload data from
+ * the buffer to upload data from
* @param offset
- * the read offset within the buffer
+ * the read offset within the buffer
* @param len
- * the number of bytes to upload from the buffer
+ * the number of bytes to upload from the buffer
+ *
+ * @return the computed BLOB key identifying the BLOB on the server
+ *
* @throws IOException
- * thrown if an I/O error occurs while uploading the data to
the BLOB server
+ * thrown if an I/O error occurs while uploading the data
to the BLOB server
*/
- public void put(JobID jobId, String key, byte[] value, int offset, int
len) throws IOException {
- if (key.length() > MAX_KEY_LENGTH) {
- throw new IllegalArgumentException("Keys must not be
longer than " + MAX_KEY_LENGTH);
- }
-
- putBuffer(jobId, key, value, offset, len);
+ @VisibleForTesting
+ public BlobKey put(@Nullable JobID jobId, byte[] value, int offset, int
len) throws IOException {
+ return putBuffer(jobId, value, offset, len);
}
/**
- * Uploads data from the given input stream to the BLOB server and
stores it under the given job ID and key.
+ * Uploads the (job-unrelated) data from the given input stream to the
BLOB server.
*
- * @param jobId
- * the job ID to identify the uploaded data
- * @param key
- * the key to identify the uploaded data
* @param inputStream
- * the input stream to read the data from
+ * the input stream to read the data from
+ *
+ * @return the computed BLOB key identifying the BLOB on the server
+ *
* @throws IOException
- * thrown if an I/O error occurs while reading the data from
the input stream or uploading the data to the
- * BLOB server
+ * thrown if an I/O error occurs while reading the data
from the input stream or uploading the
+ * data to the BLOB server
*/
- public void put(JobID jobId, String key, InputStream inputStream)
throws IOException {
- if (key.length() > MAX_KEY_LENGTH) {
- throw new IllegalArgumentException("Keys must not be
longer than " + MAX_KEY_LENGTH);
- }
-
- putInputStream(jobId, key, inputStream);
+ public BlobKey put(InputStream inputStream) throws IOException {
+ return putInputStream(null, inputStream);
}
/**
- * Uploads the data from the given input stream to the BLOB server in a
content-addressable manner.
+ * Uploads the data from the given input stream for the given job to
the BLOB server.
*
+ * @param jobId
+ * ID of the job this blob belongs to
* @param inputStream
- * the input stream to read the data from
+ * the input stream to read the data from
+ *
* @return the computed BLOB key identifying the BLOB on the server
+ *
* @throws IOException
- * thrown if an I/O error occurs while reading the data from
the input stream or uploading the data to the
- * BLOB server
+ * thrown if an I/O error occurs while reading the data
from the input stream or uploading the
+ * data to the BLOB server
*/
- public BlobKey put(InputStream inputStream) throws IOException {
- return putInputStream(null, null, inputStream);
+ public BlobKey put(@Nonnull JobID jobId, InputStream inputStream)
throws IOException {
+ checkNotNull(jobId);
+ return putInputStream(jobId, inputStream);
}
/**
* Uploads data from the given byte buffer to the BLOB server.
*
* @param jobId
- * the ID of the job the BLOB belongs to or <code>null</code> to
store the BLOB in a content-addressable
- * manner
- * @param key
- * the key to identify the BLOB on the server or
<code>null</code> to store the BLOB in a content-addressable
- * manner
+ * the ID of the job the BLOB belongs to (or <tt>null</tt>
if job-unrelated)
* @param value
- * the buffer to read the data from
+ * the buffer to read the data from
* @param offset
- * the read offset within the buffer
+ * the read offset within the buffer
* @param len
- * the number of bytes to read from the buffer
- * @return the computed BLOB key if the BLOB has been stored in a
content-addressable manner, <code>null</code>
- * otherwise
+ * the number of bytes to read from the buffer
+ *
+ * @return the computed BLOB key of the uploaded BLOB
+ *
* @throws IOException
- * thrown if an I/O error occurs while uploading the data to
the BLOB server
+ * thrown if an I/O error occurs while uploading the data
to the BLOB server
*/
- private BlobKey putBuffer(JobID jobId, String key, byte[] value, int
offset, int len) throws IOException {
+ private BlobKey putBuffer(@Nullable JobID jobId, byte[] value, int
offset, int len) throws IOException {
if (this.socket.isClosed()) {
throw new IllegalStateException("BLOB Client is not
connected. " +
"Client has been shut down or
encountered an error before.");
}
+ checkNotNull(value);
if (LOG.isDebugEnabled()) {
- if (jobId == null) {
- LOG.debug(String.format("PUT content
addressable BLOB buffer (%d bytes) to %s",
- len,
socket.getLocalSocketAddress()));
- } else {
- LOG.debug(String.format("PUT BLOB buffer (%d
bytes) under %s / \"%s\" to %s",
- len, jobId, key,
socket.getLocalSocketAddress()));
- }
+ LOG.debug("PUT BLOB buffer ({} bytes) to {}.", len,
socket.getLocalSocketAddress());
--- End diff --
If the call is guarded by `LOG.isDebugEnabled`, then we don't need to use the
placeholder. In that case it is more efficient to use string concatenation.
> move BLOB ref-counting from LibraryCacheManager to BlobCache
> ------------------------------------------------------------
>
> Key: FLINK-7057
> URL: https://issues.apache.org/jira/browse/FLINK-7057
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination, Network
> Affects Versions: 1.4.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
>
> Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR
> files managed by it. Instead, we want the {{BlobCache}} to do that itself for
> all job-related BLOBs. Also, we do not want to operate on a per-{{BlobKey}}
> level but rather per job. Therefore, the cleanup process should be adapted,
> too.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)