This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 416a4f268a2 [RRIO] [Call] Create CallShouldBackoff and default 
implementation (#28952)
416a4f268a2 is described below

commit 416a4f268a27be9e02971b0efbce523ff4fc4644
Author: Damon <[email protected]>
AuthorDate: Tue Oct 17 13:53:00 2023 -0700

    [RRIO] [Call] Create CallShouldBackoff and default implementation (#28952)
    
    * Create CallShouldBackoff and default implementation
    
    * Fix test class comment.
    
    * Update javadoc
    
    * Update per PR comments
    
    * Patch code comment
---
 .../io/requestresponseio/CallShouldBackoff.java    |  33 +++++++
 ...llShouldBackoffBasedOnRejectionProbability.java | 103 +++++++++++++++++++++
 ...ouldBackoffBasedOnRejectionProbabilityTest.java |  92 ++++++++++++++++++
 3 files changed, 228 insertions(+)

diff --git 
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/CallShouldBackoff.java
 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/CallShouldBackoff.java
new file mode 100644
index 00000000000..9a0c42de5b1
--- /dev/null
+++ 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/CallShouldBackoff.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponseio;
+
+import java.io.Serializable;
+
+/** Informs whether a call to an API should backoff. */
+public interface CallShouldBackoff<ResponseT> extends Serializable {
+
+  /** Update the state of whether to backoff using information about the 
exception. */
+  void update(UserCodeExecutionException exception);
+
+  /** Update the state of whether to backoff using information about the 
response. */
+  void update(ResponseT response);
+
+  /** Report whether to backoff. */
+  boolean value();
+}
diff --git 
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/CallShouldBackoffBasedOnRejectionProbability.java
 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/CallShouldBackoffBasedOnRejectionProbability.java
new file mode 100644
index 00000000000..8298809fca8
--- /dev/null
+++ 
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/CallShouldBackoffBasedOnRejectionProbability.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponseio;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Reports whether to apply backoff based on 
https://sre.google/sre-book/handling-overload/. */
+class CallShouldBackoffBasedOnRejectionProbability<ResponseT>
+    implements CallShouldBackoff<ResponseT> {
+
+  // Default multiplier value recommended by 
https://sre.google/sre-book/handling-overload/
+  private static final double DEFAULT_MULTIPLIER = 2.0;
+
+  // The threshold is the value that the rejection probability must exceed in 
order to report a
+  // value() of true. If null, then the computation relies on a random value.
+  private @Nullable Double threshold;
+
+  // The multiplier drives the impact of accepts on the rejection probability. 
See <a
+  // https://sre.google/sre-book/handling-overload/ for details.
+  private final double multiplier;
+
+  // The number of total requests called to a remote API.
+  private double requests = 0;
+
+  // The number of total accepts called to a remote API.
+  private double accepts = 0;
+
+  /** Instantiate class with the {@link #DEFAULT_MULTIPLIER}. */
+  CallShouldBackoffBasedOnRejectionProbability() {
+    this(DEFAULT_MULTIPLIER);
+  }
+
+  /**
+   * Instantiates class with the provided multiplier. The multiplier drives 
the impact of accepts on
+   * the rejection probability. See 
https://sre.google/sre-book/handling-overload/ for details.
+   */
+  CallShouldBackoffBasedOnRejectionProbability(double multiplier) {
+    this.multiplier = multiplier;
+  }
+
+  /**
+   * Setter for the threshold that overrides usage of a random value. The 
threshold is the value
+   * (within range [0, 1)) that {@link #getRejectionProbability()} must exceed 
in order to report a
+   * value() of true.
+   */
+  CallShouldBackoffBasedOnRejectionProbability<ResponseT> setThreshold(double 
threshold) {
+    this.threshold = threshold;
+    return this;
+  }
+
+  /** Update the state of whether to backoff using information about the 
exception. */
+  @Override
+  public void update(UserCodeExecutionException exception) {
+    this.requests++;
+  }
+
+  /** Update the state of whether to backoff using information about the 
response. */
+  @Override
+  public void update(ResponseT response) {
+    this.requests++;
+    this.accepts++;
+  }
+
+  /** Provide a threshold to evaluate backoff. */
+  double getThreshold() {
+    if (this.threshold != null) {
+      return this.threshold;
+    }
+    return Math.random();
+  }
+
+  /**
+   * Compute the probability of API call rejection based on
+   * https://sre.google/sre-book/handling-overload/.
+   */
+  double getRejectionProbability() {
+    double numerator = requests - multiplier * accepts;
+    double denominator = requests + 1;
+    double ratio = numerator / denominator;
+    return Math.max(0, ratio);
+  }
+
+  /** Report whether to backoff. */
+  @Override
+  public boolean value() {
+    return getRejectionProbability() > getThreshold();
+  }
+}
diff --git 
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponseio/CallShouldBackoffBasedOnRejectionProbabilityTest.java
 
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponseio/CallShouldBackoffBasedOnRejectionProbabilityTest.java
new file mode 100644
index 00000000000..b1e8347a25c
--- /dev/null
+++ 
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponseio/CallShouldBackoffBasedOnRejectionProbabilityTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponseio;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link CallShouldBackoffBasedOnRejectionProbability}. */
+@RunWith(JUnit4.class)
+public class CallShouldBackoffBasedOnRejectionProbabilityTest {
+
+  @Test
+  public void testValue() {
+    for (Case caze : CASES) {
+      CallShouldBackoffBasedOnRejectionProbability<String> shouldBackoff = 
instance();
+      for (boolean ar : caze.acceptRejects) {
+        if (ar) {
+          shouldBackoff.update("");
+        } else {
+          shouldBackoff.update(new UserCodeExecutionException(""));
+        }
+      }
+      assertEquals(caze.toString(), caze.wantPReject, 
shouldBackoff.getRejectionProbability(), 0.1);
+      assertEquals(caze.toString(), caze.wantValue, shouldBackoff.value());
+    }
+  }
+
+  private static final List<Case> CASES =
+      Arrays.asList(
+          of(0, false),
+          of(0, false, true, true, true, true, true, true, true, true, true, 
true, true),
+          of(0, false, true),
+          of(0.5, false, false),
+          of(0.91, true, false, false, false, false, false, false, false, 
false, false, false));
+
+  private static Case of(double wantPReject, boolean wantValue, boolean... 
acceptRejects) {
+    List<Boolean> list = new ArrayList<>();
+    for (boolean ar : acceptRejects) {
+      list.add(ar);
+    }
+    return new Case(list, wantPReject, wantValue);
+  }
+
+  private static class Case {
+    private final List<Boolean> acceptRejects;
+    private final double wantPReject;
+    private final boolean wantValue;
+
+    Case(List<Boolean> acceptRejects, double wantPReject, boolean wantValue) {
+      this.acceptRejects = acceptRejects;
+      this.wantPReject = wantPReject;
+      this.wantValue = wantValue;
+    }
+
+    @Override
+    public String toString() {
+      return "Case{"
+          + "acceptRejects="
+          + acceptRejects
+          + ", wantPReject="
+          + wantPReject
+          + ", wantValue="
+          + wantValue
+          + '}';
+    }
+  }
+
+  CallShouldBackoffBasedOnRejectionProbability<String> instance() {
+    return new 
CallShouldBackoffBasedOnRejectionProbability<String>().setThreshold(0.5);
+  }
+}

Reply via email to