This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit f52ee7c4df53466df07bf6cbd2241d088a04fc04
Author: wenjie <[email protected]>
AuthorDate: Wed Jul 15 20:45:04 2020 +0800

    KUDU-1422 resize ErrorCollector
    
    ErrorCollector size was hard-coded. Added resize method in
    ErrorCollector to change its size accordingly.
    
    Change-Id: I53731f6367aa84d6435b3bf2143e86164c8eaa1e
    Reviewed-on: http://gerrit.cloudera.org:8080/16201
    Tested-by: Andrew Wong <[email protected]>
    Reviewed-by: Attila Bukor <[email protected]>
    Reviewed-by: Grant Henke <[email protected]>
---
 java/config/spotbugs/excludeFilter.xml             |  5 ++-
 .../org/apache/kudu/client/AsyncKuduSession.java   |  5 +++
 .../org/apache/kudu/client/ErrorCollector.java     | 28 ++++++++++++-
 .../java/org/apache/kudu/client/KuduSession.java   |  5 +++
 .../apache/kudu/client/SessionConfiguration.java   |  6 +++
 .../org/apache/kudu/client/TestErrorCollector.java | 47 ++++++++++++++++++++++
 6 files changed, 94 insertions(+), 2 deletions(-)

diff --git a/java/config/spotbugs/excludeFilter.xml 
b/java/config/spotbugs/excludeFilter.xml
index d570244..bcc7ad6 100644
--- a/java/config/spotbugs/excludeFilter.xml
+++ b/java/config/spotbugs/excludeFilter.xml
@@ -176,7 +176,10 @@
     <Match>
         <!-- The return value isn't needed. -->
         <Class name="org.apache.kudu.client.ErrorCollector"/>
-        <Method name="addError" />
+        <Or>
+            <Method name="addError" />
+            <Method name="resize" />
+        </Or>
         <Bug pattern="RV_RETURN_VALUE_IGNORED" />
     </Match>
     <Match>
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
index 5456c37..c7d4e64 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
@@ -223,6 +223,11 @@ public class AsyncKuduSession implements 
SessionConfiguration {
     this.mutationBufferMaxOps = numOps;
   }
 
+  @Override
+  public void setErrorCollectorSpace(int size) {
+    this.errorCollector.resize(size);
+  }
+
   @Deprecated
   @Override
   public void setMutationBufferLowWatermark(float 
mutationBufferLowWatermarkPercentage) {
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/ErrorCollector.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/ErrorCollector.java
index 9ddaf56..3a334f1 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ErrorCollector.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ErrorCollector.java
@@ -31,7 +31,7 @@ import org.apache.yetus.audience.InterfaceStability;
 @InterfaceStability.Evolving
 public class ErrorCollector {
   private final Queue<RowError> errorQueue;
-  private final int maxCapacity;
+  private int maxCapacity;
   private boolean overflowed;
 
   /**
@@ -81,4 +81,30 @@ public class ErrorCollector {
     overflowed = false;
     return returnObject;
   }
+
+  /**
+   * Resize ErrorCollector. If size < errorQueue.size(),
+   * the oldest errors will be discarded and overflowed will be set;
+   */
+  public synchronized void resize(int size) {
+    Preconditions.checkArgument(size > 0, "Need to be able to store at least 
one row error");
+    if (size == maxCapacity) {
+      return;
+    }
+
+    if (size < maxCapacity) {
+      int trimmedErrors = errorQueue.size() - size;
+      if (trimmedErrors > 0) {
+        overflowed = true;
+      } else {
+        trimmedErrors = 0;
+      }
+
+      for (int i = 0; i < trimmedErrors; ++i) {
+        errorQueue.poll();
+      }
+    }
+
+    maxCapacity = size;
+  }
 }
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java
index 2221a84..0916467 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduSession.java
@@ -131,6 +131,11 @@ public class KuduSession implements SessionConfiguration {
   }
 
   @Override
+  public void setErrorCollectorSpace(int size) {
+    session.setErrorCollectorSpace(size);
+  }
+
+  @Override
   @Deprecated
   public void setMutationBufferLowWatermark(float 
mutationBufferLowWatermarkPercentage) {
     LOG.warn("setMutationBufferLowWatermark is deprecated");
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java
 
b/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java
index c7bae9a..c2071b6 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/SessionConfiguration.java
@@ -186,6 +186,12 @@ public interface SessionConfiguration {
   void setIgnoreAllNotFoundRows(boolean ignoreAllNotFoundRows);
 
   /**
+   * Set the number of errors that can be collected.
+   * @param size number of errors.
+   */
+  void setErrorCollectorSpace(int size);
+
+  /**
    * Return the number of errors which are pending. Errors may accumulate when
    * using {@link FlushMode#AUTO_FLUSH_BACKGROUND AUTO_FLUSH_BACKGROUND} mode.
    * @return a count of errors
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestErrorCollector.java 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestErrorCollector.java
index 63b37b9..0ae9d6d 100644
--- 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestErrorCollector.java
+++ 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestErrorCollector.java
@@ -81,6 +81,53 @@ public class TestErrorCollector {
     Assert.assertTrue(reos.isOverflowed());
     Assert.assertEquals(maxErrors, reos.getRowErrors().length);
     Assert.assertEquals(countToTest - 1, 
reos.getRowErrors()[9].getErrorStatus().getPosixCode());
+
+    // Test enlarging non-overflown collector
+    countToTest = 10;
+    fillCollectorWith(collector, countToTest);
+    Assert.assertEquals(maxErrors, collector.countErrors());
+    collector.resize(2 * maxErrors);
+    reos = collector.getErrors();
+    Assert.assertEquals(0, collector.countErrors());
+    Assert.assertFalse(reos.isOverflowed());
+    Assert.assertEquals(maxErrors, reos.getRowErrors().length);
+    Assert.assertEquals(countToTest - 1, 
reos.getRowErrors()[9].getErrorStatus().getPosixCode());
+
+    // Test enlarging overflown collector
+    countToTest = 11;
+    collector = new ErrorCollector(maxErrors);
+    fillCollectorWith(collector, countToTest);
+    Assert.assertEquals(maxErrors, collector.countErrors());
+    collector.resize(2 * maxErrors);
+    collector.addError(createRowError(42));
+    reos = collector.getErrors();
+    Assert.assertEquals(0, collector.countErrors());
+    Assert.assertTrue(reos.isOverflowed());
+    Assert.assertEquals(11, reos.getRowErrors().length);
+    Assert.assertEquals(42, 
reos.getRowErrors()[10].getErrorStatus().getPosixCode());
+
+    // Test shrinking without overflow
+    countToTest = 5;
+    fillCollectorWith(collector, countToTest);
+    Assert.assertEquals(countToTest, collector.countErrors());
+    collector.resize(maxErrors);
+    reos = collector.getErrors();
+    Assert.assertEquals(0, collector.countErrors());
+    Assert.assertFalse(reos.isOverflowed());
+    Assert.assertEquals(countToTest, reos.getRowErrors().length);
+    Assert.assertEquals(countToTest - 1, 
reos.getRowErrors()[4].getErrorStatus().getPosixCode());
+
+    // Test shrinking with overflow
+    countToTest = 5;
+    fillCollectorWith(collector, countToTest);
+    Assert.assertEquals(countToTest, collector.countErrors());
+    collector.resize(countToTest - 1);
+    reos = collector.getErrors();
+    Assert.assertEquals(0, collector.countErrors());
+    Assert.assertTrue(reos.isOverflowed());
+    Assert.assertEquals(countToTest - 1, reos.getRowErrors().length);
+    // the oldest error is popped
+    Assert.assertEquals(countToTest - 1, 
reos.getRowErrors()[3].getErrorStatus().getPosixCode());
   }
 
   private void fillCollectorWith(ErrorCollector collector, int errorsToAdd) {

Reply via email to