[ 
https://issues.apache.org/jira/browse/HBASE-19486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16302819#comment-16302819
 ] 

Chia-Ping Tsai commented on HBASE-19486:
----------------------------------------

If we pass the same parameters, the old Timer won't be cancelled, but a new 
Timer will still be created, so the old one keeps running. 
{code}
  @Override
  public void setWriteBufferPeriodicFlush(long timeoutMs, long timerTickMs) {
    long originalTimeoutMs   = this.writeBufferPeriodicFlushTimeoutMs;
    long originalTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs;

    // Both parameters have minimal values.
    this.writeBufferPeriodicFlushTimeoutMs   = Math.max(0, timeoutMs);
    this.writeBufferPeriodicFlushTimerTickMs =
            Math.max(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, timerTickMs);

    // If something changed we stop the old Timer.
    if (this.writeBufferPeriodicFlushTimeoutMs   != originalTimeoutMs  ||
        this.writeBufferPeriodicFlushTimerTickMs != originalTimerTickMs) {
      if (writeBufferPeriodicFlushTimer != null) {
        writeBufferPeriodicFlushTimer.cancel();
        writeBufferPeriodicFlushTimer = null;
      }
    }

    // If we have the need for a new timer we start it
    if (this.writeBufferPeriodicFlushTimeoutMs > 0) {
      writeBufferPeriodicFlushTimer = new Timer(true); // Create Timer running as Daemon.
      writeBufferPeriodicFlushTimer.schedule(new TimerTask() {
        @Override
        public void run() {
          BufferedMutatorImpl.this.timerCallbackForWriteBufferPeriodicFlush();
        }
      }, writeBufferPeriodicFlushTimerTickMs, writeBufferPeriodicFlushTimerTickMs);
    }
  }
{code}
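
One possible way to avoid the leaked Timer, as a rough sketch only: return early 
when neither value changed, and otherwise always cancel the old Timer before 
creating a new one. The class below is a hypothetical stand-in, not the 
BufferedMutatorImpl code from the patch; the class name, the shortened field 
names, the value of the minimum tick constant, the two counters and the empty 
onTick() callback are all assumptions made for illustration.

```java
import java.util.Timer;
import java.util.TimerTask;

/** Hypothetical stand-in for the timer handling discussed above; not HBase code. */
class PeriodicFlushTimerSketch {
  // Assumed lower bound for the tick; the real constant is defined in HBase.
  static final long MIN_TIMER_TICK_MS = 100;

  private long timeoutMs = 0;
  private long timerTickMs = MIN_TIMER_TICK_MS;
  private Timer timer = null;

  // Counters exist only so the behaviour can be observed in this sketch.
  private int timersCreated = 0;
  private int timersCancelled = 0;

  synchronized void setPeriodicFlush(long newTimeoutMs, long newTimerTickMs) {
    long originalTimeoutMs = this.timeoutMs;
    long originalTimerTickMs = this.timerTickMs;

    // Both parameters have minimal values.
    this.timeoutMs = Math.max(0, newTimeoutMs);
    this.timerTickMs = Math.max(MIN_TIMER_TICK_MS, newTimerTickMs);

    // Same effective settings: keep whatever Timer is already running.
    if (this.timeoutMs == originalTimeoutMs && this.timerTickMs == originalTimerTickMs) {
      return;
    }

    // Something changed, so stop the old Timer (if any) first.
    if (timer != null) {
      timer.cancel();
      timer = null;
      timersCancelled++;
    }

    // Start a new Timer only if periodic flushing is enabled.
    if (this.timeoutMs > 0) {
      timer = new Timer(true); // Create Timer running as Daemon.
      timersCreated++;
      timer.schedule(new TimerTask() {
        @Override
        public void run() {
          onTick();
        }
      }, this.timerTickMs, this.timerTickMs);
    }
  }

  // Placeholder for the real flush callback.
  void onTick() {
  }

  synchronized int timersCreated() {
    return timersCreated;
  }

  synchronized int timersCancelled() {
    return timersCancelled;
  }

  synchronized boolean hasTimer() {
    return timer != null;
  }
}
```

With this shape, calling setPeriodicFlush twice with the same arguments leaves 
exactly one Timer alive, and every Timer that gets replaced is cancelled first.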

>  Periodically ensure records are not buffered too long by BufferedMutator
> -------------------------------------------------------------------------
>
>                 Key: HBASE-19486
>                 URL: https://issues.apache.org/jira/browse/HBASE-19486
>             Project: HBase
>          Issue Type: Improvement
>          Components: Client
>            Reporter: Niels Basjes
>            Assignee: Niels Basjes
>         Attachments: HBASE-19486-20171212-2117.patch, 
> HBASE-19486-20171218-1229.patch, HBASE-19486-20171218-1300.patch, 
> HBASE-19486-20171219-0933.patch, HBASE-19486-20171219-1026.patch, 
> HBASE-19486-20171219-1122-trigger-qa-run.patch, 
> HBASE-19486-20171220-1612-trigger-qa-run.patch, 
> HBASE-19486-20171220-2228-trigger-qa-run.patch, 
> HBASE-19486-20171223-1438-trigger-qa-run.patch, 
> HBASE-19486-20171223-1728-trigger-qa-run.patch, 
> HBASE-19486-20171223-2222-trigger-qa-run.patch, 
> HBASE-19486-20171224-1101-trigger-qa-run.patch
>
>
> I'm working on several projects where we are doing stream / event type 
> processing instead of batch type processing. We mostly use Apache Flink and 
> Apache Beam for these projects.
> When we ingest a continuous stream of events and feed that into HBase via a 
> BufferedMutator this all works fine. The buffer fills up at a predictable 
> rate and we can make sure it flushes several times per second into HBase by 
> tuning the buffer size.
> We also have situations where the event rate is unpredictable. Sometimes 
> because the source is in reality a batch job that puts records into Kafka, 
> sometimes because it is the "predictable in production" application in our 
> testing environment (where only the dev triggers a handful of events).
> For these kinds of use cases we need a way to 'force' the BufferedMutator to 
> automatically flush any records in the buffer even if the buffer is not full.
> I'll put up a pull request with a proposed implementation for review against 
> the master (i.e. 3.0.0).
> When approved I would like to backport this to the 1.x and 2.x versions of 
> the client in the same (as close as possible) way.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
