This is an automated email from the ASF dual-hosted git repository. laiyingchun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 145210dbbcd33235b46171949f741ef8abea4dc7 Author: kedeng <[email protected]> AuthorDate: Wed May 24 11:35:53 2023 +0800 [java] add buffer space limit for KuduSession Currently, KuduSession only supports limiting the number of operations. In this patch, I added the functionality of configuring the buffer space according to the data size for KuduSession. To verify that the new feature works effectively, I also added corresponding unit tests. Change-Id: I312d47c98566f9405361d969a4b68b326bb3c4d9 Reviewed-on: http://gerrit.cloudera.org:8080/19918 Tested-by: Kudu Jenkins Tested-by: Yuqi Du <[email protected]> Reviewed-by: Yingchun Lai <[email protected]> --- .../org/apache/kudu/client/AsyncKuduSession.java | 94 ++++++++++++++++------ .../java/org/apache/kudu/client/KuduSession.java | 4 +- .../java/org/apache/kudu/client/PartialRow.java | 19 +++++ .../apache/kudu/client/SessionConfiguration.java | 12 ++- .../apache/kudu/client/TestAsyncKuduSession.java | 40 +++++++++ 5 files changed, 142 insertions(+), 27 deletions(-) diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java index 9c0ea6912..61289ee70 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java @@ -22,6 +22,7 @@ import static org.apache.kudu.client.ExternalConsistencyMode.CLIENT_PROPAGATED; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; @@ -124,7 +125,10 @@ public class AsyncKuduSession implements SessionConfiguration { private final Random randomizer = new Random(); private final ErrorCollector errorCollector; private int flushIntervalMillis = 1000; - private int mutationBufferMaxOps = 1000; // TODO express this in terms of data size. + private int mutationBufferMaxOps = 1000; + + // NOTE : -1 means no limit, set a positive value to limit the max size. + private long mutationBufferMaxSize = -1; private FlushMode flushMode; private ExternalConsistencyMode consistencyMode; private long timeoutMillis; @@ -244,12 +248,13 @@ public class AsyncKuduSession implements SessionConfiguration { } @Override - public void setMutationBufferSpace(int numOps) { + public void setMutationBufferSpace(int numOps, long maxSize) { if (hasPendingOperations()) { throw new IllegalArgumentException("Cannot change the buffer" + " size when operations are buffered"); } this.mutationBufferMaxOps = numOps; + this.mutationBufferMaxSize = maxSize; } @Override @@ -352,7 +357,7 @@ public class AsyncKuduSession implements SessionConfiguration { private final Deferred<List<BatchResponse>> deferred; public TabletLookupCB(Buffer buffer, Deferred<List<BatchResponse>> deferred) { - this.lookupsOutstanding = new AtomicInteger(buffer.getOperations().size()); + this.lookupsOutstanding = new AtomicInteger(buffer.numOps()); this.buffer = buffer; this.deferred = deferred; } @@ -372,7 +377,7 @@ public class AsyncKuduSession implements SessionConfiguration { List<Integer> opsFailedIndexesList = new ArrayList<>(); int currentIndex = 0; - for (BufferedOperation bufferedOp : buffer.getOperations()) { + for (BufferedOperation bufferedOp : buffer) { Operation operation = bufferedOp.getOperation(); if (bufferedOp.tabletLookupFailed()) { Exception failure = bufferedOp.getTabletLookupFailure(); @@ -556,7 +561,7 @@ public class AsyncKuduSession implements SessionConfiguration { * @return the operation responses */ private Deferred<List<OperationResponse>> doFlush(Buffer buffer) { - if (buffer == null || buffer.getOperations().isEmpty()) { + if (buffer == null || buffer.isEmpty()) { return Deferred.fromResult(ImmutableList.of()); } LOG.debug("flushing buffer: {}", buffer); @@ -564,7 +569,7 @@ public class AsyncKuduSession implements SessionConfiguration { Deferred<List<BatchResponse>> batchResponses = new Deferred<>(); Callback<Void, Object> tabletLookupCB = new TabletLookupCB(buffer, batchResponses); - for (BufferedOperation bufferedOperation : buffer.getOperations()) { + for (BufferedOperation bufferedOperation : buffer) { AsyncUtil.addBoth(bufferedOperation.getTabletLookup(), tabletLookupCB); } @@ -617,7 +622,7 @@ public class AsyncKuduSession implements SessionConfiguration { public boolean hasPendingOperations() { synchronized (monitor) { return activeBuffer == null ? inactiveBuffers.size() < 2 : - !activeBuffer.getOperations().isEmpty() || !inactiveBufferAvailable(); + !activeBuffer.isEmpty() || !inactiveBufferAvailable(); } } @@ -641,6 +646,20 @@ public class AsyncKuduSession implements SessionConfiguration { .addErrback(new SingleOperationErrCallback(operation)); } + private boolean isExcessMaxSize(long size) { + return mutationBufferMaxSize >= 0 && size >= mutationBufferMaxSize; + } + + /** + * Check the buffer and determine whether a flush operation needs to be performed. + * @param activeBufferOps the number of active buffer ops + * @param activeBufferSize the number of active buffer byte size + * @return true if the flush in need. + */ + private boolean needFlush(int activeBufferOps, long activeBufferSize) { + return activeBufferOps >= mutationBufferMaxOps || isExcessMaxSize(activeBufferSize); + } + /** * Apply the given operation. * <p> @@ -678,8 +697,8 @@ public class AsyncKuduSession implements SessionConfiguration { LookupType.POINT, timeoutMillis); - // Holds a buffer that should be flushed outside the synchronized block, if necessary. - Buffer fullBuffer = null; + // Holds buffers that should be flushed outside the synchronized block, if necessary. + List<Buffer> fullBuffers = new ArrayList<>(); try { synchronized (monitor) { Deferred<Void> notification = flushNotification.get(); @@ -698,7 +717,8 @@ public class AsyncKuduSession implements SessionConfiguration { } } - int activeBufferSize = activeBuffer.getOperations().size(); + int activeBufferOps = activeBuffer.numOps(); + long activeBufferSize = activeBuffer.bufferSize(); switch (flushMode) { case AUTO_FLUSH_SYNC: { // This case is handled above and is impossible here. @@ -707,19 +727,19 @@ public class AsyncKuduSession implements SessionConfiguration { break; } case MANUAL_FLUSH: { - if (activeBufferSize >= mutationBufferMaxOps) { + if (needFlush(activeBufferOps, activeBufferSize)) { Status statusIllegalState = Status.IllegalState("MANUAL_FLUSH is enabled but the buffer is too big"); throw new NonRecoverableException(statusIllegalState); } - activeBuffer.getOperations().add(new BufferedOperation(tablet, operation)); + activeBuffer.addOperation(new BufferedOperation(tablet, operation)); break; } case AUTO_FLUSH_BACKGROUND: { - if (activeBufferSize >= mutationBufferMaxOps) { + if (needFlush(activeBufferOps, activeBufferSize)) { // If the active buffer is full or overflowing, be sure to kick off a flush. - fullBuffer = retireActiveBufferUnlocked(); - activeBufferSize = 0; + fullBuffers.add(retireActiveBufferUnlocked()); + activeBufferOps = 0; if (!inactiveBufferAvailable()) { Status statusServiceUnavailable = @@ -733,13 +753,14 @@ public class AsyncKuduSession implements SessionConfiguration { // Add the operation to the active buffer, and: // 1. If it's the first operation in the buffer, start a background flush timer. // 2. If it filled or overflowed the buffer, kick off a flush. - activeBuffer.getOperations().add(new BufferedOperation(tablet, operation)); - if (activeBufferSize == 0) { + activeBuffer.addOperation(new BufferedOperation(tablet, operation)); + if (activeBufferOps == 0) { AsyncKuduClient.newTimeout(client.getTimer(), activeBuffer.getFlusherTask(), flushIntervalMillis); } - if (activeBufferSize + 1 >= mutationBufferMaxOps && inactiveBufferAvailable()) { - fullBuffer = retireActiveBufferUnlocked(); + if (needFlush(activeBufferOps + 1, activeBufferSize + operation.getRow().size()) && + inactiveBufferAvailable()) { + fullBuffers.add(retireActiveBufferUnlocked()); } break; } @@ -748,8 +769,10 @@ public class AsyncKuduSession implements SessionConfiguration { } } } finally { - // Flush the buffer outside of the synchronized block, if required. - doFlush(fullBuffer); + // Flush the buffers outside of the synchronized block, if required. + for (Buffer fullBuffer : fullBuffers) { + doFlush(fullBuffer); + } } return operation.getDeferred(); } @@ -874,16 +897,37 @@ public class AsyncKuduSession implements SessionConfiguration { * Buffer is externally synchronized. When the active buffer, {@link #monitor} * synchronizes access to it. */ - private final class Buffer { + private final class Buffer implements Iterable<BufferedOperation> { private final List<BufferedOperation> operations = new ArrayList<>(); + // NOTE: This param is different from operations.size(). + // It's the number of total buffer operation size, mainly used to count the used buffer size. + private long operationSize; private FlusherTask flusherTask = null; private Deferred<Void> flushNotification = Deferred.fromResult(null); private boolean flushNotificationFired = false; - public List<BufferedOperation> getOperations() { - return operations; + public void addOperation(BufferedOperation operation) { + operations.add(operation); + operationSize += operation.getOperation().getRow().size(); + } + + @Override + public Iterator<BufferedOperation> iterator() { + return operations.iterator(); + } + + public boolean isEmpty() { + return operations.isEmpty(); + } + + public int numOps() { + return operations.size(); + } + + public long bufferSize() { + return operationSize; } @GuardedBy("monitor") @@ -935,6 +979,7 @@ public class AsyncKuduSession implements SessionConfiguration { void resetUnlocked() { LOG.trace("buffer resetUnlocked: {}", this); operations.clear(); + operationSize = 0; flushNotification = new Deferred<>(); flushNotificationFired = false; flusherTask = null; @@ -944,6 +989,7 @@ public class AsyncKuduSession implements SessionConfiguration { public String toString() { return MoreObjects.toStringHelper(this) .add("operations", operations.size()) + .add("operationSize", operationSize) .add("flusherTask", flusherTask) .add("flushNotification", flushNotification) .toString(); diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java index b1845bd5f..fcebc137f 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java @@ -126,8 +126,8 @@ public class KuduSession implements SessionConfiguration { } @Override - public void setMutationBufferSpace(int numOps) { - session.setMutationBufferSpace(numOps); + public void setMutationBufferSpace(int numOps, long maxSize) { + session.setMutationBufferSpace(numOps, maxSize); } @Override diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java b/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java index 37de74fee..e41e7033a 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java @@ -1907,4 +1907,23 @@ public class PartialRow { void freeze() { this.frozen = true; } + + /** + * @return in memory size of this row. + * <p> + * Note: the size here is not accurate, as we do not count all the fields, but it is + * enough for most scenarios. + */ + long size() { + long size = (long) rowAlloc.length + columnsBitSet.size() / Byte.SIZE; + if (nullsBitSet != null) { + size += nullsBitSet.size() / Byte.SIZE; + } + for (ByteBuffer bb : varLengthData) { + if (bb != null) { + size += bb.capacity(); + } + } + return size; + } } diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java b/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java index dfb756652..94bcb2934 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java @@ -90,7 +90,17 @@ public interface SessionConfiguration { * @param size number of ops. * @throws IllegalArgumentException if the buffer isn't empty. */ - void setMutationBufferSpace(int size); + default void setMutationBufferSpace(int size) { + setMutationBufferSpace(size, -1); + } + + /** + * Set the number and the maximum byte size of operations that can be buffered. + * @param numOps number of ops. + * @param maxSize max byte size of ops. + * @throws IllegalArgumentException if the buffer isn't empty. + */ + void setMutationBufferSpace(int numOps, long maxSize); /** * Set the low watermark for this session. The default is set to half the mutation buffer space. diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java index e66789b69..49e3f0662 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestAsyncKuduSession.java @@ -22,6 +22,7 @@ import static org.apache.kudu.test.ClientTestUtil.createBasicSchemaInsert; import static org.apache.kudu.test.ClientTestUtil.getBasicCreateTableOptions; import static org.apache.kudu.test.ClientTestUtil.getBasicSchema; import static org.apache.kudu.test.KuduTestHarness.DEFAULT_SLEEP; +import static org.apache.kudu.test.junit.AssertHelpers.assertEventuallyTrue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -40,6 +41,7 @@ import org.slf4j.LoggerFactory; import org.apache.kudu.Schema; import org.apache.kudu.WireProtocol.AppStatusPB; import org.apache.kudu.test.KuduTestHarness; +import org.apache.kudu.test.junit.AssertHelpers.BooleanExpression; import org.apache.kudu.tserver.Tserver.TabletServerErrorPB; public class TestAsyncKuduSession { @@ -387,6 +389,44 @@ public class TestAsyncKuduSession { assertEquals(2 * kNumOps, countRowsInTable(table)); } + /** + * Test KuduSession supports configuring buffer space by data size. + */ + @Test(timeout = 90000) + public void testFlushBySize() throws Exception { + AsyncKuduSession session = client.newSession(); + final int kBufferSizeOps = 10; + final int kNumOps = 2; + // Set a small buffer size so we should flush every time. + session.setMutationBufferSpace(kBufferSizeOps, 1); + // Set a large flush interval so if the flush by size function is not correctly implemented, + // the test will timeout. + session.setFlushInterval(60 * 60 * 1000); + session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND); + + for (int i = 0; i < kNumOps; i++) { + // Should always flush immediately so here join will return soon. + OperationResponse resp = session.apply(createInsert(i)).join(DEFAULT_SLEEP); + assertFalse(resp.hasRowError()); + } + // Mode AUTO_FLUSH_BACKGROUND also takes time, so we may need wait here. + assertEventuallyTrue(String.format("Timeout for flush pending operations"), + new BooleanExpression() { + @Override + public boolean get() throws Exception { + return !session.hasPendingOperations(); + } + }, /* timeoutMillis = */500000); + assertEquals(0, session.countPendingErrors()); + // Confirm that we can still make progress. + session.apply(createInsert(kNumOps)).join(DEFAULT_SLEEP); + + for (OperationResponse resp: session.flush().join(DEFAULT_SLEEP)) { + assertFalse(resp.hasRowError()); + } + assertEquals(0, session.close().join(DEFAULT_SLEEP).size()); + } + // A helper just to make some lines shorter. private Insert createInsert(int key) { return createBasicSchemaInsert(table, key);
