Github user babokim commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/107#discussion_r16764866
  
    --- Diff: 
tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
 ---
    @@ -312,6 +377,63 @@ public ChannelPipeline getPipeline() throws Exception {
         }
       }
     
    +
    +  Map<String, ProcessingStatus> processingStatusMap = new 
ConcurrentHashMap<String, ProcessingStatus>();
    +
    +  public void completeFileChunk(FadvisedFileRegion filePart,
    +                                   String requestUri,
    +                                   long startTime) {
    +    ProcessingStatus status = processingStatusMap.get(requestUri);
    +    if (status != null) {
    +      status.decrementRemainFiles(filePart, startTime);
    +    }
    +  }
    +
    +  class ProcessingStatus {
    +    String requestUri;
    +    int numFiles;
    +    AtomicInteger remainFiles;
    +    long startTime;
    +    long makeFileListTime;
    +    long minTime = Long.MAX_VALUE;
    +    long maxTime;
    +    int numSlowFile;
    +
    +    public ProcessingStatus(String requestUri) {
    +      this.requestUri = requestUri;
    +      this.startTime = System.currentTimeMillis();
    +    }
    +
    +    public void setNumFiles(int numFiles) {
    +      this.numFiles = numFiles;
    +      this.remainFiles = new AtomicInteger(numFiles);
    +    }
    +    public void decrementRemainFiles(FadvisedFileRegion filePart, long 
fileStartTime) {
    +      synchronized(remainFiles) {
    --- End diff --
    
    The maxTime and minTime variable is not atomic variable and this variable 
should be guaranteed thread safe.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to