Repository: hbase
Updated Branches:
  refs/heads/branch-2 99399cdee -> 0dcbba156


HBASE-19486: Ensure threadsafe WriteBufferPeriodicFlush operations

Signed-off-by: Chia-Ping Tsai <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0dcbba15
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0dcbba15
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0dcbba15

Branch: refs/heads/branch-2
Commit: 0dcbba156359298d21cf0aca36f37a989606e0cb
Parents: 99399cd
Author: Niels Basjes <[email protected]>
Authored: Sun Dec 31 11:58:24 2017 +0100
Committer: Chia-Ping Tsai <[email protected]>
Committed: Tue Jan 2 17:38:45 2018 +0800

----------------------------------------------------------------------
 .../hbase/client/BufferedMutatorImpl.java       | 42 ++++++++++----------
 1 file changed, 22 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/0dcbba15/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index 13b1a81..b171fc4 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -78,8 +78,9 @@ public class BufferedMutatorImpl implements BufferedMutator {
   private final AtomicInteger undealtMutationCount = new AtomicInteger(0);
   private final long writeBufferSize;
 
-  private long  writeBufferPeriodicFlushTimeoutMs;
-  private long  writeBufferPeriodicFlushTimerTickMs = 
MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS;
+  private final AtomicLong writeBufferPeriodicFlushTimeoutMs = new 
AtomicLong(0);
+  private final AtomicLong writeBufferPeriodicFlushTimerTickMs =
+          new AtomicLong(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
   private Timer writeBufferPeriodicFlushTimer = null;
 
   private final int maxKeyValueSize;
@@ -188,7 +189,7 @@ public class BufferedMutatorImpl implements BufferedMutator 
{
     }
 
     if (currentWriteBufferSize.get() == 0) {
-      firstRecordInBufferTimestamp = System.currentTimeMillis();
+      firstRecordInBufferTimestamp.set(System.currentTimeMillis());
     }
 
     // This behavior is highly non-intuitive... it does not protect us against
@@ -214,23 +215,23 @@ public class BufferedMutatorImpl implements 
BufferedMutator {
 
   @VisibleForTesting
   protected long getExecutedWriteBufferPeriodicFlushes() {
-    return executedWriteBufferPeriodicFlushes;
+    return executedWriteBufferPeriodicFlushes.get();
   }
 
-  private long firstRecordInBufferTimestamp = 0;
-  private long executedWriteBufferPeriodicFlushes = 0;
+  private final AtomicLong firstRecordInBufferTimestamp = new AtomicLong(0);
+  private final AtomicLong executedWriteBufferPeriodicFlushes = new 
AtomicLong(0);
 
   private void timerCallbackForWriteBufferPeriodicFlush() {
     if (currentWriteBufferSize.get() == 0) {
       return; // Nothing to flush
     }
     long now = System.currentTimeMillis();
-    if (firstRecordInBufferTimestamp + writeBufferPeriodicFlushTimeoutMs > 
now) {
+    if (firstRecordInBufferTimestamp.get() + 
writeBufferPeriodicFlushTimeoutMs.get() > now) {
       return; // No need to flush yet
     }
     // The first record in the writebuffer has been in there too long --> flush
     try {
-      executedWriteBufferPeriodicFlushes++;
+      executedWriteBufferPeriodicFlushes.incrementAndGet();
       flush();
     } catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) {
       LOG.error("Exception during timerCallbackForWriteBufferPeriodicFlush --> 
" + e.getMessage());
@@ -370,18 +371,18 @@ public class BufferedMutatorImpl implements 
BufferedMutator {
   }
 
   @Override
-  public void setWriteBufferPeriodicFlush(long timeoutMs, long timerTickMs) {
-    long originalTimeoutMs   = this.writeBufferPeriodicFlushTimeoutMs;
-    long originalTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs;
+  public synchronized void setWriteBufferPeriodicFlush(long timeoutMs, long 
timerTickMs) {
+    long originalTimeoutMs   = this.writeBufferPeriodicFlushTimeoutMs.get();
+    long originalTimerTickMs = this.writeBufferPeriodicFlushTimerTickMs.get();
 
     // Both parameters have minimal values.
-    this.writeBufferPeriodicFlushTimeoutMs   = Math.max(0, timeoutMs);
-    this.writeBufferPeriodicFlushTimerTickMs =
-            Math.max(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, 
timerTickMs);
+    writeBufferPeriodicFlushTimeoutMs.set(Math.max(0, timeoutMs));
+    writeBufferPeriodicFlushTimerTickMs.set(
+            Math.max(MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, 
timerTickMs));
 
     // If something changed we stop the old Timer.
-    if (this.writeBufferPeriodicFlushTimeoutMs   != originalTimeoutMs  ||
-        this.writeBufferPeriodicFlushTimerTickMs != originalTimerTickMs) {
+    if (writeBufferPeriodicFlushTimeoutMs.get() != originalTimeoutMs ||
+        writeBufferPeriodicFlushTimerTickMs.get() != originalTimerTickMs) {
       if (writeBufferPeriodicFlushTimer != null) {
         writeBufferPeriodicFlushTimer.cancel();
         writeBufferPeriodicFlushTimer = null;
@@ -390,25 +391,26 @@ public class BufferedMutatorImpl implements 
BufferedMutator {
 
     // If we have the need for a timer and there is none we start it
     if (writeBufferPeriodicFlushTimer == null &&
-        writeBufferPeriodicFlushTimeoutMs > 0) {
+        writeBufferPeriodicFlushTimeoutMs.get() > 0) {
       writeBufferPeriodicFlushTimer = new Timer(true); // Create Timer running 
as Daemon.
       writeBufferPeriodicFlushTimer.schedule(new TimerTask() {
         @Override
         public void run() {
           BufferedMutatorImpl.this.timerCallbackForWriteBufferPeriodicFlush();
         }
-      }, writeBufferPeriodicFlushTimerTickMs, 
writeBufferPeriodicFlushTimerTickMs);
+      }, writeBufferPeriodicFlushTimerTickMs.get(),
+         writeBufferPeriodicFlushTimerTickMs.get());
     }
   }
 
   @Override
   public long getWriteBufferPeriodicFlushTimeoutMs() {
-    return this.writeBufferPeriodicFlushTimeoutMs;
+    return writeBufferPeriodicFlushTimeoutMs.get();
   }
 
   @Override
   public long getWriteBufferPeriodicFlushTimerTickMs() {
-    return this.writeBufferPeriodicFlushTimerTickMs;
+    return writeBufferPeriodicFlushTimerTickMs.get();
   }
 
   @Override

Reply via email to