This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit f52ee7c4df53466df07bf6cbd2241d088a04fc04 Author: wenjie <[email protected]> AuthorDate: Wed Jul 15 20:45:04 2020 +0800 KUDU-1422 resize ErrorCollector ErrorCollector size was hard-coded. Added resize method in ErrorCollector to change its size accordingly. Change-Id: I53731f6367aa84d6435b3bf2143e86164c8eaa1e Reviewed-on: http://gerrit.cloudera.org:8080/16201 Tested-by: Andrew Wong <[email protected]> Reviewed-by: Attila Bukor <[email protected]> Reviewed-by: Grant Henke <[email protected]> --- java/config/spotbugs/excludeFilter.xml | 5 ++- .../org/apache/kudu/client/AsyncKuduSession.java | 5 +++ .../org/apache/kudu/client/ErrorCollector.java | 28 ++++++++++++- .../java/org/apache/kudu/client/KuduSession.java | 5 +++ .../apache/kudu/client/SessionConfiguration.java | 6 +++ .../org/apache/kudu/client/TestErrorCollector.java | 47 ++++++++++++++++++++++ 6 files changed, 94 insertions(+), 2 deletions(-) diff --git a/java/config/spotbugs/excludeFilter.xml b/java/config/spotbugs/excludeFilter.xml index d570244..bcc7ad6 100644 --- a/java/config/spotbugs/excludeFilter.xml +++ b/java/config/spotbugs/excludeFilter.xml @@ -176,7 +176,10 @@ <Match> <!-- The return value isn't needed. --> <Class name="org.apache.kudu.client.ErrorCollector"/> - <Method name="addError" /> + <Or> + <Method name="addError" /> + <Method name="resize" /> + </Or> <Bug pattern="RV_RETURN_VALUE_IGNORED" /> </Match> <Match> diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java index 5456c37..c7d4e64 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java @@ -223,6 +223,11 @@ public class AsyncKuduSession implements SessionConfiguration { this.mutationBufferMaxOps = numOps; } + @Override + public void setErrorCollectorSpace(int size) { + this.errorCollector.resize(size); + } + @Deprecated @Override public void setMutationBufferLowWatermark(float mutationBufferLowWatermarkPercentage) { diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ErrorCollector.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ErrorCollector.java index 9ddaf56..3a334f1 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/ErrorCollector.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ErrorCollector.java @@ -31,7 +31,7 @@ import org.apache.yetus.audience.InterfaceStability; @InterfaceStability.Evolving public class ErrorCollector { private final Queue<RowError> errorQueue; - private final int maxCapacity; + private int maxCapacity; private boolean overflowed; /** @@ -81,4 +81,30 @@ public class ErrorCollector { overflowed = false; return returnObject; } + + /** + * Resize ErrorCollector. If size < errorQueue.size(), + * the oldest errors will be discarded and overflowed will be set; + */ + public synchronized void resize(int size) { + Preconditions.checkArgument(size > 0, "Need to be able to store at least one row error"); + if (size == maxCapacity) { + return; + } + + if (size < maxCapacity) { + int trimmedErrors = errorQueue.size() - size; + if (trimmedErrors > 0) { + overflowed = true; + } else { + trimmedErrors = 0; + } + + for (int i = 0; i < trimmedErrors; ++i) { + errorQueue.poll(); + } + } + + maxCapacity = size; + } } diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java index 2221a84..0916467 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java @@ -131,6 +131,11 @@ public class KuduSession implements SessionConfiguration { } @Override + public void setErrorCollectorSpace(int size) { + session.setErrorCollectorSpace(size); + } + + @Override @Deprecated public void setMutationBufferLowWatermark(float mutationBufferLowWatermarkPercentage) { LOG.warn("setMutationBufferLowWatermark is deprecated"); diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java b/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java index c7bae9a..c2071b6 100644 --- a/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java +++ b/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java @@ -186,6 +186,12 @@ public interface SessionConfiguration { void setIgnoreAllNotFoundRows(boolean ignoreAllNotFoundRows); /** + * Set the number of errors that can be collected. + * @param size number of errors. + */ + void setErrorCollectorSpace(int size); + + /** * Return the number of errors which are pending. Errors may accumulate when * using {@link FlushMode#AUTO_FLUSH_BACKGROUND AUTO_FLUSH_BACKGROUND} mode. * @return a count of errors diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestErrorCollector.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestErrorCollector.java index 63b37b9..0ae9d6d 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestErrorCollector.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestErrorCollector.java @@ -81,6 +81,53 @@ public class TestErrorCollector { Assert.assertTrue(reos.isOverflowed()); Assert.assertEquals(maxErrors, reos.getRowErrors().length); Assert.assertEquals(countToTest - 1, reos.getRowErrors()[9].getErrorStatus().getPosixCode()); + + // Test enlarging non-overflown collector + countToTest = 10; + fillCollectorWith(collector, countToTest); + Assert.assertEquals(maxErrors, collector.countErrors()); + collector.resize(2 * maxErrors); + reos = collector.getErrors(); + Assert.assertEquals(0, collector.countErrors()); + Assert.assertFalse(reos.isOverflowed()); + Assert.assertEquals(maxErrors, reos.getRowErrors().length); + Assert.assertEquals(countToTest - 1, reos.getRowErrors()[9].getErrorStatus().getPosixCode()); + + // Test enlarging overflown collector + countToTest = 11; + collector = new ErrorCollector(maxErrors); + fillCollectorWith(collector, countToTest); + Assert.assertEquals(maxErrors, collector.countErrors()); + collector.resize(2 * maxErrors); + collector.addError(createRowError(42)); + reos = collector.getErrors(); + Assert.assertEquals(0, collector.countErrors()); + Assert.assertTrue(reos.isOverflowed()); + Assert.assertEquals(11, reos.getRowErrors().length); + Assert.assertEquals(42, reos.getRowErrors()[10].getErrorStatus().getPosixCode()); + + // Test shrinking without overflow + countToTest = 5; + fillCollectorWith(collector, countToTest); + Assert.assertEquals(countToTest, collector.countErrors()); + collector.resize(maxErrors); + reos = collector.getErrors(); + Assert.assertEquals(0, collector.countErrors()); + Assert.assertFalse(reos.isOverflowed()); + Assert.assertEquals(countToTest, reos.getRowErrors().length); + Assert.assertEquals(countToTest - 1, reos.getRowErrors()[4].getErrorStatus().getPosixCode()); + + // Test shrinking with overflow + countToTest = 5; + fillCollectorWith(collector, countToTest); + Assert.assertEquals(countToTest, collector.countErrors()); + collector.resize(countToTest - 1); + reos = collector.getErrors(); + Assert.assertEquals(0, collector.countErrors()); + Assert.assertTrue(reos.isOverflowed()); + Assert.assertEquals(countToTest - 1, reos.getRowErrors().length); + // the oldest error is popped + Assert.assertEquals(countToTest - 1, reos.getRowErrors()[3].getErrorStatus().getPosixCode()); } private void fillCollectorWith(ErrorCollector collector, int errorsToAdd) {
