[ 
https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16072339#comment-16072339
 ] 

ASF GitHub Bot commented on FLINK-7057:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4238#discussion_r125219915
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
    @@ -269,166 +276,128 @@ else if (response != RETURN_OKAY) {
        // 
--------------------------------------------------------------------------------------------
     
        /**
    -    * Uploads the data of the given byte array to the BLOB server in a 
content-addressable manner.
    +    * Uploads the data of the given byte array for the given job to the 
BLOB server.
         *
    +    * @param jobId
    +    *              the ID of the job the BLOB belongs to (or <tt>null</tt> 
if job-unrelated)
         * @param value
    -    *        the buffer to upload
    -    * @return the computed BLOB key identifying the BLOB on the server
    -    * @throws IOException
    -    *         thrown if an I/O error occurs while uploading the data to 
the BLOB server
    -    */
    -   public BlobKey put(byte[] value) throws IOException {
    -           return put(value, 0, value.length);
    -   }
    -
    -   /**
    -    * Uploads data from the given byte array to the BLOB server in a 
content-addressable manner.
    +    *              the buffer to upload
         *
    -    * @param value
    -    *        the buffer to upload data from
    -    * @param offset
    -    *        the read offset within the buffer
    -    * @param len
    -    *        the number of bytes to upload from the buffer
         * @return the computed BLOB key identifying the BLOB on the server
    -    * @throws IOException
    -    *         thrown if an I/O error occurs while uploading the data to 
the BLOB server
    -    */
    -   public BlobKey put(byte[] value, int offset, int len) throws 
IOException {
    -           return putBuffer(null, null, value, offset, len);
    -   }
    -
    -   /**
    -    * Uploads the data of the given byte array to the BLOB server and 
stores it under the given job ID and key.
         *
    -    * @param jobId
    -    *        the job ID to identify the uploaded data
    -    * @param key
    -    *        the key to identify the uploaded data
    -    * @param value
    -    *        the buffer to upload
         * @throws IOException
    -    *         thrown if an I/O error occurs while uploading the data to 
the BLOB server
    +    *              thrown if an I/O error occurs while uploading the data 
to the BLOB server
         */
    -   public void put(JobID jobId, String key, byte[] value) throws 
IOException {
    -           put(jobId, key, value, 0, value.length);
    +   @VisibleForTesting
    +   public BlobKey put(@Nullable JobID jobId, byte[] value) throws 
IOException {
    +           return put(jobId, value, 0, value.length);
        }
     
        /**
    -    * Uploads data from the given byte array to the BLOB server and stores 
it under the given job ID and key.
    +    * Uploads data from the given byte array for the given job to the BLOB 
server.
         *
         * @param jobId
    -    *        the job ID to identify the uploaded data
    -    * @param key
    -    *        the key to identify the uploaded data
    +    *              the ID of the job the BLOB belongs to (or <tt>null</tt> 
if job-unrelated)
         * @param value
    -    *        the buffer to upload data from
    +    *              the buffer to upload data from
         * @param offset
    -    *        the read offset within the buffer
    +    *              the read offset within the buffer
         * @param len
    -    *        the number of bytes to upload from the buffer
    +    *              the number of bytes to upload from the buffer
    +    *
    +    * @return the computed BLOB key identifying the BLOB on the server
    +    *
         * @throws IOException
    -    *         thrown if an I/O error occurs while uploading the data to 
the BLOB server
    +    *              thrown if an I/O error occurs while uploading the data 
to the BLOB server
         */
    -   public void put(JobID jobId, String key, byte[] value, int offset, int 
len) throws IOException {
    -           if (key.length() > MAX_KEY_LENGTH) {
    -                   throw new IllegalArgumentException("Keys must not be 
longer than " + MAX_KEY_LENGTH);
    -           }
    -
    -           putBuffer(jobId, key, value, offset, len);
    +   @VisibleForTesting
    +   public BlobKey put(@Nullable JobID jobId, byte[] value, int offset, int 
len) throws IOException {
    +           return putBuffer(jobId, value, offset, len);
        }
     
        /**
    -    * Uploads data from the given input stream to the BLOB server and 
stores it under the given job ID and key.
    +    * Uploads the (job-unrelated) data from the given input stream to the 
BLOB server.
         *
    -    * @param jobId
    -    *        the job ID to identify the uploaded data
    -    * @param key
    -    *        the key to identify the uploaded data
         * @param inputStream
    -    *        the input stream to read the data from
    +    *              the input stream to read the data from
    +    *
    +    * @return the computed BLOB key identifying the BLOB on the server
    +    *
         * @throws IOException
    -    *         thrown if an I/O error occurs while reading the data from 
the input stream or uploading the data to the
    -    *         BLOB server
    +    *              thrown if an I/O error occurs while reading the data 
from the input stream or uploading the
    +    *              data to the BLOB server
         */
    -   public void put(JobID jobId, String key, InputStream inputStream) 
throws IOException {
    -           if (key.length() > MAX_KEY_LENGTH) {
    -                   throw new IllegalArgumentException("Keys must not be 
longer than " + MAX_KEY_LENGTH);
    -           }
    -
    -           putInputStream(jobId, key, inputStream);
    +   public BlobKey put(InputStream inputStream) throws IOException {
    +           return putInputStream(null, inputStream);
        }
     
        /**
    -    * Uploads the data from the given input stream to the BLOB server in a 
content-addressable manner.
    +    * Uploads the data from the given input stream for the given job to 
the BLOB server.
         *
    +    * @param jobId
    +    *              ID of the job this blob belongs to
         * @param inputStream
    -    *        the input stream to read the data from
    +    *              the input stream to read the data from
    +    *
         * @return the computed BLOB key identifying the BLOB on the server
    +    *
         * @throws IOException
    -    *         thrown if an I/O error occurs while reading the data from 
the input stream or uploading the data to the
    -    *         BLOB server
    +    *              thrown if an I/O error occurs while reading the data 
from the input stream or uploading the
    +    *              data to the BLOB server
         */
    -   public BlobKey put(InputStream inputStream) throws IOException {
    -           return putInputStream(null, null, inputStream);
    +   public BlobKey put(@Nonnull JobID jobId, InputStream inputStream) 
throws IOException {
    +           checkNotNull(jobId);
    +           return putInputStream(jobId, inputStream);
        }
     
        /**
         * Uploads data from the given byte buffer to the BLOB server.
         *
         * @param jobId
    -    *        the ID of the job the BLOB belongs to or <code>null</code> to 
store the BLOB in a content-addressable
    -    *        manner
    -    * @param key
    -    *        the key to identify the BLOB on the server or 
<code>null</code> to store the BLOB in a content-addressable
    -    *        manner
    +    *              the ID of the job the BLOB belongs to (or <tt>null</tt> 
if job-unrelated)
         * @param value
    -    *        the buffer to read the data from
    +    *              the buffer to read the data from
         * @param offset
    -    *        the read offset within the buffer
    +    *              the read offset within the buffer
         * @param len
    -    *        the number of bytes to read from the buffer
    -    * @return the computed BLOB key if the BLOB has been stored in a 
content-addressable manner, <code>null</code>
    -    *         otherwise
    +    *              the number of bytes to read from the buffer
    +    *
    +    * @return the computed BLOB key of the uploaded BLOB
    +    *
         * @throws IOException
    -    *         thrown if an I/O error occurs while uploading the data to 
the BLOB server
    +    *              thrown if an I/O error occurs while uploading the data 
to the BLOB server
         */
    -   private BlobKey putBuffer(JobID jobId, String key, byte[] value, int 
offset, int len) throws IOException {
    +   private BlobKey putBuffer(@Nullable JobID jobId, byte[] value, int 
offset, int len) throws IOException {
                if (this.socket.isClosed()) {
                        throw new IllegalStateException("BLOB Client is not 
connected. " +
                                        "Client has been shut down or 
encountered an error before.");
                }
    +           checkNotNull(value);
     
                if (LOG.isDebugEnabled()) {
    -                   if (jobId == null) {
    -                           LOG.debug(String.format("PUT content 
addressable BLOB buffer (%d bytes) to %s",
    -                                           len, 
socket.getLocalSocketAddress()));
    -                   } else {
    -                           LOG.debug(String.format("PUT BLOB buffer (%d 
bytes) under %s / \"%s\" to %s",
    -                                           len, jobId, key, 
socket.getLocalSocketAddress()));
    -                   }
    +                   LOG.debug("PUT BLOB buffer ({} bytes) to {}.", len, socket.getLocalSocketAddress());
    --- End diff --
    
    If the call is already guarded by `LOG.isDebugEnabled()`, we don't need the
    placeholders. Plain string concatenation is more efficient in that case.
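
A minimal sketch of the two styles under discussion, for illustration only. It uses `java.util.logging` from the JDK as a stand-in so that it runs without extra dependencies; the Flink code itself logs through `LOG.debug(...)` with `{}` placeholders. The class name `GuardedDebugLogging` and its methods are hypothetical and not part of `BlobClient`.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical demo class; java.util.logging stands in for the logger used in Flink.
final class GuardedDebugLogging {
    private static final Logger LOG = Logger.getLogger(GuardedDebugLogging.class.getName());

    // Builds the message eagerly with plain string concatenation.
    public static String concatMessage(int len, Object address) {
        return "PUT BLOB buffer (" + len + " bytes) to " + address + ".";
    }

    // Guarded variant: the message is only built when the level is enabled,
    // so no placeholder parsing is needed inside the guard.
    public static boolean logGuarded(int len, Object address) {
        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine(concatMessage(len, address));
            return true;
        }
        return false;
    }

    // Unguarded variant: the logger checks the level itself and only
    // formats the placeholders if the record is actually published.
    public static void logWithPlaceholders(int len, Object address) {
        LOG.log(Level.FINE, "PUT BLOB buffer ({0} bytes) to {1}.", new Object[] {len, address});
    }

    public static void main(String[] args) {
        logGuarded(16, "localhost:6124");
        logWithPlaceholders(16, "localhost:6124");
    }
}
```

Either style avoids building the message when debug logging is off. Combining the explicit guard with placeholders, as in the diff above, pays for both mechanisms without gaining anything from the second.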


> move BLOB ref-counting from LibraryCacheManager to BlobCache
> ------------------------------------------------------------
>
>                 Key: FLINK-7057
>                 URL: https://issues.apache.org/jira/browse/FLINK-7057
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination, Network
>    Affects Versions: 1.4.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>
> Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR 
> files managed by it. Instead, we want the {{BlobCache}} to do that itself for 
> all job-related BLOBs. Also, we do not want to operate on a per-{{BlobKey}} 
> level but rather per job. Therefore, the cleanup process should be adapted, 
> too.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
