Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4238#discussion_r125219915
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
@@ -269,166 +276,128 @@ else if (response != RETURN_OKAY) {
//
--------------------------------------------------------------------------------------------
/**
- * Uploads the data of the given byte array to the BLOB server in a
content-addressable manner.
+ * Uploads the data of the given byte array for the given job to the
BLOB server.
*
+ * @param jobId
+ * the ID of the job the BLOB belongs to (or <tt>null</tt>
if job-unrelated)
* @param value
- * the buffer to upload
- * @return the computed BLOB key identifying the BLOB on the server
- * @throws IOException
- * thrown if an I/O error occurs while uploading the data to
the BLOB server
- */
- public BlobKey put(byte[] value) throws IOException {
- return put(value, 0, value.length);
- }
-
- /**
- * Uploads data from the given byte array to the BLOB server in a
content-addressable manner.
+ * the buffer to upload
*
- * @param value
- * the buffer to upload data from
- * @param offset
- * the read offset within the buffer
- * @param len
- * the number of bytes to upload from the buffer
* @return the computed BLOB key identifying the BLOB on the server
- * @throws IOException
- * thrown if an I/O error occurs while uploading the data to
the BLOB server
- */
- public BlobKey put(byte[] value, int offset, int len) throws
IOException {
- return putBuffer(null, null, value, offset, len);
- }
-
- /**
- * Uploads the data of the given byte array to the BLOB server and
stores it under the given job ID and key.
*
- * @param jobId
- * the job ID to identify the uploaded data
- * @param key
- * the key to identify the uploaded data
- * @param value
- * the buffer to upload
* @throws IOException
- * thrown if an I/O error occurs while uploading the data to
the BLOB server
+ * thrown if an I/O error occurs while uploading the data
to the BLOB server
*/
- public void put(JobID jobId, String key, byte[] value) throws
IOException {
- put(jobId, key, value, 0, value.length);
+ @VisibleForTesting
+ public BlobKey put(@Nullable JobID jobId, byte[] value) throws
IOException {
+ return put(jobId, value, 0, value.length);
}
/**
- * Uploads data from the given byte array to the BLOB server and stores
it under the given job ID and key.
+ * Uploads data from the given byte array for the given job to the BLOB
server.
*
* @param jobId
- * the job ID to identify the uploaded data
- * @param key
- * the key to identify the uploaded data
+ * the ID of the job the BLOB belongs to (or <tt>null</tt>
if job-unrelated)
* @param value
- * the buffer to upload data from
+ * the buffer to upload data from
* @param offset
- * the read offset within the buffer
+ * the read offset within the buffer
* @param len
- * the number of bytes to upload from the buffer
+ * the number of bytes to upload from the buffer
+ *
+ * @return the computed BLOB key identifying the BLOB on the server
+ *
* @throws IOException
- * thrown if an I/O error occurs while uploading the data to
the BLOB server
+ * thrown if an I/O error occurs while uploading the data
to the BLOB server
*/
- public void put(JobID jobId, String key, byte[] value, int offset, int
len) throws IOException {
- if (key.length() > MAX_KEY_LENGTH) {
- throw new IllegalArgumentException("Keys must not be
longer than " + MAX_KEY_LENGTH);
- }
-
- putBuffer(jobId, key, value, offset, len);
+ @VisibleForTesting
+ public BlobKey put(@Nullable JobID jobId, byte[] value, int offset, int
len) throws IOException {
+ return putBuffer(jobId, value, offset, len);
}
/**
- * Uploads data from the given input stream to the BLOB server and
stores it under the given job ID and key.
+ * Uploads the (job-unrelated) data from the given input stream to the
BLOB server.
*
- * @param jobId
- * the job ID to identify the uploaded data
- * @param key
- * the key to identify the uploaded data
* @param inputStream
- * the input stream to read the data from
+ * the input stream to read the data from
+ *
+ * @return the computed BLOB key identifying the BLOB on the server
+ *
* @throws IOException
- * thrown if an I/O error occurs while reading the data from
the input stream or uploading the data to the
- * BLOB server
+ * thrown if an I/O error occurs while reading the data
from the input stream or uploading the
+ * data to the BLOB server
*/
- public void put(JobID jobId, String key, InputStream inputStream)
throws IOException {
- if (key.length() > MAX_KEY_LENGTH) {
- throw new IllegalArgumentException("Keys must not be
longer than " + MAX_KEY_LENGTH);
- }
-
- putInputStream(jobId, key, inputStream);
+ public BlobKey put(InputStream inputStream) throws IOException {
+ return putInputStream(null, inputStream);
}
/**
- * Uploads the data from the given input stream to the BLOB server in a
content-addressable manner.
+ * Uploads the data from the given input stream for the given job to
the BLOB server.
*
+ * @param jobId
+ * ID of the job this blob belongs to
* @param inputStream
- * the input stream to read the data from
+ * the input stream to read the data from
+ *
* @return the computed BLOB key identifying the BLOB on the server
+ *
* @throws IOException
- * thrown if an I/O error occurs while reading the data from
the input stream or uploading the data to the
- * BLOB server
+ * thrown if an I/O error occurs while reading the data
from the input stream or uploading the
+ * data to the BLOB server
*/
- public BlobKey put(InputStream inputStream) throws IOException {
- return putInputStream(null, null, inputStream);
+ public BlobKey put(@Nonnull JobID jobId, InputStream inputStream)
throws IOException {
+ checkNotNull(jobId);
+ return putInputStream(jobId, inputStream);
}
/**
* Uploads data from the given byte buffer to the BLOB server.
*
* @param jobId
- * the ID of the job the BLOB belongs to or <code>null</code> to
store the BLOB in a content-addressable
- * manner
- * @param key
- * the key to identify the BLOB on the server or
<code>null</code> to store the BLOB in a content-addressable
- * manner
+ * the ID of the job the BLOB belongs to (or <tt>null</tt>
if job-unrelated)
* @param value
- * the buffer to read the data from
+ * the buffer to read the data from
* @param offset
- * the read offset within the buffer
+ * the read offset within the buffer
* @param len
- * the number of bytes to read from the buffer
- * @return the computed BLOB key if the BLOB has been stored in a
content-addressable manner, <code>null</code>
- * otherwise
+ * the number of bytes to read from the buffer
+ *
+ * @return the computed BLOB key of the uploaded BLOB
+ *
* @throws IOException
- * thrown if an I/O error occurs while uploading the data to
the BLOB server
+ * thrown if an I/O error occurs while uploading the data
to the BLOB server
*/
- private BlobKey putBuffer(JobID jobId, String key, byte[] value, int
offset, int len) throws IOException {
+ private BlobKey putBuffer(@Nullable JobID jobId, byte[] value, int
offset, int len) throws IOException {
if (this.socket.isClosed()) {
throw new IllegalStateException("BLOB Client is not
connected. " +
"Client has been shut down or
encountered an error before.");
}
+ checkNotNull(value);
if (LOG.isDebugEnabled()) {
- if (jobId == null) {
- LOG.debug(String.format("PUT content
addressable BLOB buffer (%d bytes) to %s",
- len,
socket.getLocalSocketAddress()));
- } else {
- LOG.debug(String.format("PUT BLOB buffer (%d
bytes) under %s / \"%s\" to %s",
- len, jobId, key,
socket.getLocalSocketAddress()));
- }
+ LOG.debug("PUT BLOB buffer ({} bytes) to {}.", len,
socket.getLocalSocketAddress());
--- End diff --
If this call is guarded by `LOG.isDebugEnabled()`, we don't need the
placeholders; plain string concatenation is more efficient in that case.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---