This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 145210dbbcd33235b46171949f741ef8abea4dc7
Author: kedeng <[email protected]>
AuthorDate: Wed May 24 11:35:53 2023 +0800

    [java] add buffer space limit for KuduSession
    
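    Currently, KuduSession can only limit its mutation buffer by
    the number of buffered operations. This patch adds the option
    to also limit the buffer by the approximate total data size
    (in bytes) of the buffered operations, configured through the
    new SessionConfiguration#setMutationBufferSpace(int numOps,
    long maxSize) overload. A maxSize of -1 (the default) means
    no size limit.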
    
    To verify that the new feature works, this patch also adds a
    unit test, TestAsyncKuduSession#testFlushBySize.
    
    Change-Id: I312d47c98566f9405361d969a4b68b326bb3c4d9
    Reviewed-on: http://gerrit.cloudera.org:8080/19918
    Tested-by: Kudu Jenkins
    Tested-by: Yuqi Du <[email protected]>
    Reviewed-by: Yingchun Lai <[email protected]>
---
 .../org/apache/kudu/client/AsyncKuduSession.java   | 94 ++++++++++++++++------
 .../java/org/apache/kudu/client/KuduSession.java   |  4 +-
 .../java/org/apache/kudu/client/PartialRow.java    | 19 +++++
 .../apache/kudu/client/SessionConfiguration.java   | 12 ++-
 .../apache/kudu/client/TestAsyncKuduSession.java   | 40 +++++++++
 5 files changed, 142 insertions(+), 27 deletions(-)

diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
index 9c0ea6912..61289ee70 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
@@ -22,6 +22,7 @@ import static 
org.apache.kudu.client.ExternalConsistencyMode.CLIENT_PROPAGATED;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -124,7 +125,10 @@ public class AsyncKuduSession implements 
SessionConfiguration {
   private final Random randomizer = new Random();
   private final ErrorCollector errorCollector;
   private int flushIntervalMillis = 1000;
-  private int mutationBufferMaxOps = 1000; // TODO express this in terms of 
data size.
+  private int mutationBufferMaxOps = 1000;
+
+  // NOTE : -1 means no limit, set a positive value to limit the max size.
+  private long mutationBufferMaxSize = -1;
   private FlushMode flushMode;
   private ExternalConsistencyMode consistencyMode;
   private long timeoutMillis;
@@ -244,12 +248,13 @@ public class AsyncKuduSession implements 
SessionConfiguration {
   }
 
   @Override
-  public void setMutationBufferSpace(int numOps) {
+  public void setMutationBufferSpace(int numOps, long maxSize) {
     if (hasPendingOperations()) {
       throw new IllegalArgumentException("Cannot change the buffer" +
           " size when operations are buffered");
     }
     this.mutationBufferMaxOps = numOps;
+    this.mutationBufferMaxSize = maxSize;
   }
 
   @Override
@@ -352,7 +357,7 @@ public class AsyncKuduSession implements 
SessionConfiguration {
     private final Deferred<List<BatchResponse>> deferred;
 
     public TabletLookupCB(Buffer buffer, Deferred<List<BatchResponse>> 
deferred) {
-      this.lookupsOutstanding = new 
AtomicInteger(buffer.getOperations().size());
+      this.lookupsOutstanding = new AtomicInteger(buffer.numOps());
       this.buffer = buffer;
       this.deferred = deferred;
     }
@@ -372,7 +377,7 @@ public class AsyncKuduSession implements 
SessionConfiguration {
       List<Integer> opsFailedIndexesList = new ArrayList<>();
 
       int currentIndex = 0;
-      for (BufferedOperation bufferedOp : buffer.getOperations()) {
+      for (BufferedOperation bufferedOp : buffer) {
         Operation operation = bufferedOp.getOperation();
         if (bufferedOp.tabletLookupFailed()) {
           Exception failure = bufferedOp.getTabletLookupFailure();
@@ -556,7 +561,7 @@ public class AsyncKuduSession implements 
SessionConfiguration {
    * @return the operation responses
    */
   private Deferred<List<OperationResponse>> doFlush(Buffer buffer) {
-    if (buffer == null || buffer.getOperations().isEmpty()) {
+    if (buffer == null || buffer.isEmpty()) {
       return Deferred.fromResult(ImmutableList.of());
     }
     LOG.debug("flushing buffer: {}", buffer);
@@ -564,7 +569,7 @@ public class AsyncKuduSession implements 
SessionConfiguration {
     Deferred<List<BatchResponse>> batchResponses = new Deferred<>();
     Callback<Void, Object> tabletLookupCB = new TabletLookupCB(buffer, 
batchResponses);
 
-    for (BufferedOperation bufferedOperation : buffer.getOperations()) {
+    for (BufferedOperation bufferedOperation : buffer) {
       AsyncUtil.addBoth(bufferedOperation.getTabletLookup(), tabletLookupCB);
     }
 
@@ -617,7 +622,7 @@ public class AsyncKuduSession implements 
SessionConfiguration {
   public boolean hasPendingOperations() {
     synchronized (monitor) {
       return activeBuffer == null ? inactiveBuffers.size() < 2 :
-             !activeBuffer.getOperations().isEmpty() || 
!inactiveBufferAvailable();
+             !activeBuffer.isEmpty() || !inactiveBufferAvailable();
     }
   }
 
@@ -641,6 +646,20 @@ public class AsyncKuduSession implements 
SessionConfiguration {
         .addErrback(new SingleOperationErrCallback(operation));
   }
 
+  private boolean isExcessMaxSize(long size) {
+    return mutationBufferMaxSize >= 0 && size >= mutationBufferMaxSize;
+  }
+
+  /**
+  * Check the buffer and determine whether a flush operation needs to be 
performed.
+  * @param activeBufferOps the number of active buffer ops
+  * @param activeBufferSize the number of active buffer byte size
+  * @return true if the flush in need.
+  */
+  private boolean needFlush(int activeBufferOps, long activeBufferSize) {
+    return activeBufferOps >= mutationBufferMaxOps || 
isExcessMaxSize(activeBufferSize);
+  }
+
   /**
    * Apply the given operation.
    * <p>
@@ -678,8 +697,8 @@ public class AsyncKuduSession implements 
SessionConfiguration {
                                                               LookupType.POINT,
                                                               timeoutMillis);
 
-    // Holds a buffer that should be flushed outside the synchronized block, 
if necessary.
-    Buffer fullBuffer = null;
+    // Holds buffers that should be flushed outside the synchronized block, if 
necessary.
+    List<Buffer> fullBuffers = new ArrayList<>();
     try {
       synchronized (monitor) {
         Deferred<Void> notification = flushNotification.get();
@@ -698,7 +717,8 @@ public class AsyncKuduSession implements 
SessionConfiguration {
           }
         }
 
-        int activeBufferSize = activeBuffer.getOperations().size();
+        int activeBufferOps = activeBuffer.numOps();
+        long activeBufferSize = activeBuffer.bufferSize();
         switch (flushMode) {
           case AUTO_FLUSH_SYNC: {
             // This case is handled above and is impossible here.
@@ -707,19 +727,19 @@ public class AsyncKuduSession implements 
SessionConfiguration {
             break;
           }
           case MANUAL_FLUSH: {
-            if (activeBufferSize >= mutationBufferMaxOps) {
+            if (needFlush(activeBufferOps, activeBufferSize)) {
               Status statusIllegalState =
                   Status.IllegalState("MANUAL_FLUSH is enabled but the buffer 
is too big");
               throw new NonRecoverableException(statusIllegalState);
             }
-            activeBuffer.getOperations().add(new BufferedOperation(tablet, 
operation));
+            activeBuffer.addOperation(new BufferedOperation(tablet, 
operation));
             break;
           }
           case AUTO_FLUSH_BACKGROUND: {
-            if (activeBufferSize >= mutationBufferMaxOps) {
+            if (needFlush(activeBufferOps, activeBufferSize)) {
               // If the active buffer is full or overflowing, be sure to kick 
off a flush.
-              fullBuffer = retireActiveBufferUnlocked();
-              activeBufferSize = 0;
+              fullBuffers.add(retireActiveBufferUnlocked());
+              activeBufferOps = 0;
 
               if (!inactiveBufferAvailable()) {
                 Status statusServiceUnavailable =
@@ -733,13 +753,14 @@ public class AsyncKuduSession implements 
SessionConfiguration {
             // Add the operation to the active buffer, and:
             // 1. If it's the first operation in the buffer, start a 
background flush timer.
             // 2. If it filled or overflowed the buffer, kick off a flush.
-            activeBuffer.getOperations().add(new BufferedOperation(tablet, 
operation));
-            if (activeBufferSize == 0) {
+            activeBuffer.addOperation(new BufferedOperation(tablet, 
operation));
+            if (activeBufferOps == 0) {
               AsyncKuduClient.newTimeout(client.getTimer(), 
activeBuffer.getFlusherTask(),
                   flushIntervalMillis);
             }
-            if (activeBufferSize + 1 >= mutationBufferMaxOps && 
inactiveBufferAvailable()) {
-              fullBuffer = retireActiveBufferUnlocked();
+            if (needFlush(activeBufferOps + 1, activeBufferSize + 
operation.getRow().size()) &&
+                inactiveBufferAvailable()) {
+              fullBuffers.add(retireActiveBufferUnlocked());
             }
             break;
           }
@@ -748,8 +769,10 @@ public class AsyncKuduSession implements 
SessionConfiguration {
         }
       }
     } finally {
-      // Flush the buffer outside of the synchronized block, if required.
-      doFlush(fullBuffer);
+      // Flush the buffers outside of the synchronized block, if required.
+      for (Buffer fullBuffer : fullBuffers) {
+        doFlush(fullBuffer);
+      }
     }
     return operation.getDeferred();
   }
@@ -874,16 +897,37 @@ public class AsyncKuduSession implements 
SessionConfiguration {
    * Buffer is externally synchronized. When the active buffer, {@link 
#monitor}
    * synchronizes access to it.
    */
-  private final class Buffer {
+  private final class Buffer implements Iterable<BufferedOperation> {
     private final List<BufferedOperation> operations = new ArrayList<>();
 
+    // NOTE: This param is different from operations.size().
+    // It's the number of total buffer operation size, mainly used to count 
the used buffer size.
+    private long operationSize;
     private FlusherTask flusherTask = null;
 
     private Deferred<Void> flushNotification = Deferred.fromResult(null);
     private boolean flushNotificationFired = false;
 
-    public List<BufferedOperation> getOperations() {
-      return operations;
+    public void addOperation(BufferedOperation operation) {
+      operations.add(operation);
+      operationSize += operation.getOperation().getRow().size();
+    }
+
+    @Override
+    public Iterator<BufferedOperation> iterator() {
+      return operations.iterator();
+    }
+
+    public boolean isEmpty() {
+      return operations.isEmpty();
+    }
+
+    public int numOps() {
+      return operations.size();
+    }
+
+    public long bufferSize() {
+      return operationSize;
     }
 
     @GuardedBy("monitor")
@@ -935,6 +979,7 @@ public class AsyncKuduSession implements 
SessionConfiguration {
     void resetUnlocked() {
       LOG.trace("buffer resetUnlocked: {}", this);
       operations.clear();
+      operationSize = 0;
       flushNotification = new Deferred<>();
       flushNotificationFired = false;
       flusherTask = null;
@@ -944,6 +989,7 @@ public class AsyncKuduSession implements 
SessionConfiguration {
     public String toString() {
       return MoreObjects.toStringHelper(this)
                         .add("operations", operations.size())
+                        .add("operationSize", operationSize)
                         .add("flusherTask", flusherTask)
                         .add("flushNotification", flushNotification)
                         .toString();
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java
index b1845bd5f..fcebc137f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java
@@ -126,8 +126,8 @@ public class KuduSession implements SessionConfiguration {
   }
 
   @Override
-  public void setMutationBufferSpace(int numOps) {
-    session.setMutationBufferSpace(numOps);
+  public void setMutationBufferSpace(int numOps, long maxSize) {
+    session.setMutationBufferSpace(numOps, maxSize);
   }
 
   @Override
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
index 37de74fee..e41e7033a 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
@@ -1907,4 +1907,23 @@ public class PartialRow {
   void freeze() {
     this.frozen = true;
   }
+
+  /**
+   * @return in memory size of this row.
+   * <p>
+   * Note: the size here is not accurate, as we do not count all the fields, 
but it is
+   * enough for most scenarios.
+   */
+  long size() {
+    long size = (long) rowAlloc.length + columnsBitSet.size() / Byte.SIZE;
+    if (nullsBitSet != null) {
+      size += nullsBitSet.size() / Byte.SIZE;
+    }
+    for (ByteBuffer bb : varLengthData) {
+      if (bb != null) {
+        size += bb.capacity();
+      }
+    }
+    return size;
+  }
 }
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java
 
b/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java
index dfb756652..94bcb2934 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java
@@ -90,7 +90,17 @@ public interface SessionConfiguration {
    * @param size number of ops.
    * @throws IllegalArgumentException if the buffer isn't empty.
    */
-  void setMutationBufferSpace(int size);
+  default void setMutationBufferSpace(int size) {
+    setMutationBufferSpace(size, -1);
+  }
+
+  /**
+   * Set the number and the maximum byte size of operations that can be 
buffered.
+   * @param numOps number of ops.
+   * @param maxSize max byte size of ops.
+   * @throws IllegalArgumentException if the buffer isn't empty.
+   */
+  void setMutationBufferSpace(int numOps, long maxSize);
 
   /**
    * Set the low watermark for this session. The default is set to half the 
mutation buffer space.
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
index e66789b69..49e3f0662 100644
--- 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
+++ 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java
@@ -22,6 +22,7 @@ import static 
org.apache.kudu.test.ClientTestUtil.createBasicSchemaInsert;
 import static org.apache.kudu.test.ClientTestUtil.getBasicCreateTableOptions;
 import static org.apache.kudu.test.ClientTestUtil.getBasicSchema;
 import static org.apache.kudu.test.KuduTestHarness.DEFAULT_SLEEP;
+import static org.apache.kudu.test.junit.AssertHelpers.assertEventuallyTrue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -40,6 +41,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.kudu.Schema;
 import org.apache.kudu.WireProtocol.AppStatusPB;
 import org.apache.kudu.test.KuduTestHarness;
+import org.apache.kudu.test.junit.AssertHelpers.BooleanExpression;
 import org.apache.kudu.tserver.Tserver.TabletServerErrorPB;
 
 public class TestAsyncKuduSession {
@@ -387,6 +389,44 @@ public class TestAsyncKuduSession {
     assertEquals(2 * kNumOps, countRowsInTable(table));
   }
 
+  /**
+   * Test KuduSession supports configuring buffer space by data size.
+   */
+  @Test(timeout = 90000)
+  public void testFlushBySize() throws Exception {
+    AsyncKuduSession session = client.newSession();
+    final int kBufferSizeOps = 10;
+    final int kNumOps = 2;
+    // Set a small buffer size so we should flush every time.
+    session.setMutationBufferSpace(kBufferSizeOps, 1);
+    // Set a large flush interval so if the flush by size function is not 
correctly implemented,
+    // the test will timeout.
+    session.setFlushInterval(60 * 60 * 1000);
+    session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
+
+    for (int i = 0; i < kNumOps; i++) {
+      // Should always flush immediately so here join will return soon.
+      OperationResponse resp = 
session.apply(createInsert(i)).join(DEFAULT_SLEEP);
+      assertFalse(resp.hasRowError());
+    }
+    // Mode AUTO_FLUSH_BACKGROUND also takes time, so we may need wait here.
+    assertEventuallyTrue(String.format("Timeout for flush pending operations"),
+        new BooleanExpression() {
+          @Override
+          public boolean get() throws Exception {
+            return !session.hasPendingOperations();
+          }
+        }, /* timeoutMillis = */500000);
+    assertEquals(0, session.countPendingErrors());
+    // Confirm that we can still make progress.
+    session.apply(createInsert(kNumOps)).join(DEFAULT_SLEEP);
+
+    for (OperationResponse resp: session.flush().join(DEFAULT_SLEEP)) {
+      assertFalse(resp.hasRowError());
+    }
+    assertEquals(0, session.close().join(DEFAULT_SLEEP).size());
+  }
+
   // A helper just to make some lines shorter.
   private Insert createInsert(int key) {
     return createBasicSchemaInsert(table, key);

Reply via email to