[ 
https://issues.apache.org/jira/browse/HAMA-764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13682147#comment-13682147
 ] 

Suraj Menon commented on HAMA-764:
----------------------------------

This is a nice catch! Thanks! The SpillingStream has a handle to the 
SpilledDataProcessor object (this.processor). Instead of starting the spilling 
thread, we can change flushInternal as follows: 
{code}
public void flushInternal() throws IOException {
  if (closed_)
    return;

  currentBuffer_.flip();
  spillStatus_.spillCompleted();
  try {
    if (this.startedSpilling_) {
      this.spillThread_.completeSpill();
      boolean completionState = false;
      try {
        completionState = spillThreadState_.get();
        if (!completionState) {
          throw new IOException(
              "Spilling Thread failed to complete successfully.");
        }
      } catch (ExecutionException e) {
        throw new IOException(e);
      } catch (InterruptedException e) {
        throw new IOException(e);
      }
    } else {
      // Spilling thread never started: handle the buffered data synchronously.
      this.processor.handleSpilledBuffer(currentBuffer_);
    }
  } finally {
    closed_ = true;
    this.processor.close();
    this.spillThreadService_.shutdownNow();
  }
}
{code}
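
To make the effect of the else branch concrete, here is a minimal, self-contained sketch. It is not Hama code: Processor, RecordingProcessor, SmallStream and deliveredBytes are hypothetical stand-ins, and the background spilling thread is replaced by a direct call. It only models the below-threshold case and compares flushing with and without the synchronous fallback.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class SpillFallbackSketch {

  /** Hypothetical stand-in for SpilledDataProcessor; not the Hama interface. */
  interface Processor {
    void handleSpilledBuffer(ByteBuffer buffer) throws IOException;
    void close() throws IOException;
  }

  /** Records every byte it is handed so message loss can be detected. */
  static class RecordingProcessor implements Processor {
    final List<Byte> received = new ArrayList<>();
    boolean closed = false;

    public void handleSpilledBuffer(ByteBuffer buffer) {
      while (buffer.hasRemaining()) {
        received.add(buffer.get());
      }
    }

    public void close() {
      closed = true;
    }
  }

  /** Simplified stream; the real spilling thread is replaced by a direct call. */
  static class SmallStream {
    private final ByteBuffer currentBuffer;
    private final int thresholdSize;
    private final Processor processor;
    private final boolean syncFallback;
    private int bytesWritten = 0;
    private boolean startedSpilling = false;
    private boolean closed = false;

    SmallStream(int capacity, int thresholdSize, Processor processor,
        boolean syncFallback) {
      this.currentBuffer = ByteBuffer.allocate(capacity);
      this.thresholdSize = thresholdSize;
      this.processor = processor;
      this.syncFallback = syncFallback;
    }

    void write(byte b) {
      currentBuffer.put(b);
      bytesWritten++;
      if (bytesWritten >= thresholdSize) {
        startedSpilling = true; // the real code would start the spilling thread here
      }
    }

    void flush() throws IOException {
      if (closed) {
        return;
      }
      currentBuffer.flip();
      try {
        if (startedSpilling) {
          // Stand-in for waiting on the spilling thread to process the data.
          processor.handleSpilledBuffer(currentBuffer);
        } else if (syncFallback) {
          // Proposed fix: no thread was started, so process the data here.
          processor.handleSpilledBuffer(currentBuffer);
        }
      } finally {
        closed = true;
        processor.close();
      }
    }
  }

  /** Writes 3 bytes (threshold is 8) and returns how many reach the processor. */
  public static int deliveredBytes(boolean syncFallback) {
    RecordingProcessor processor = new RecordingProcessor();
    SmallStream stream = new SmallStream(16, 8, processor, syncFallback);
    try {
      for (byte b = 1; b <= 3; b++) {
        stream.write(b);
      }
      stream.flush();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return processor.received.size();
  }

  public static void main(String[] args) {
    System.out.println("without fallback: " + deliveredBytes(false)); // prints 0
    System.out.println("with fallback: " + deliveredBytes(true));     // prints 3
  }
}
```

Without the fallback the three bytes never reach the processor, which matches the message loss described in the issue; with it they are handled at flush time even though the threshold was never crossed.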
                
> In SpillingQueue, spilling thread does not start when messages number less 
> than threshold
> ----------------------------------------------------------------------------------------
>
>                 Key: HAMA-764
>                 URL: https://issues.apache.org/jira/browse/HAMA-764
>             Project: Hama
>          Issue Type: Bug
>          Components: bsp core
>    Affects Versions: 0.7.0
>            Reporter: MaoYuan Xian
>
> In SpillingDataOutputBuffer.SpillingStream, the following implementation 
> starts the spilling thread:
> {code}
>    private void checkSpillStart() throws IOException {
>       if (bytesWritten_ >= thresholdSize_) {
>         try {
>           startSpilling();
>         } catch (InterruptedException e) {
>           throw new IOException("Internal error occured writing to buffer.", e);
>         }
>       }
>     }
> {code}
> However, I encountered message loss when the total number of bytes written 
> is less than thresholdSize_. In that case the spilling thread is never 
> started, so the handleSpilledBuffer method of SpilledDataProcessor never 
> gets a chance to run.

