This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 416a4f268a2 [RRIO] [Call] Create CallShouldBackoff and default
implementation (#28952)
416a4f268a2 is described below
commit 416a4f268a27be9e02971b0efbce523ff4fc4644
Author: Damon <[email protected]>
AuthorDate: Tue Oct 17 13:53:00 2023 -0700
[RRIO] [Call] Create CallShouldBackoff and default implementation (#28952)
* Create CallShouldBackoff and default implementation
* Fix test class comment.
* Update javadoc
* Update per PR comments
* Patch code comment
---
.../io/requestresponseio/CallShouldBackoff.java | 33 +++++++
...llShouldBackoffBasedOnRejectionProbability.java | 103 +++++++++++++++++++++
...ouldBackoffBasedOnRejectionProbabilityTest.java | 92 ++++++++++++++++++
3 files changed, 228 insertions(+)
diff --git
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/CallShouldBackoff.java
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/CallShouldBackoff.java
new file mode 100644
index 00000000000..9a0c42de5b1
--- /dev/null
+++
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/CallShouldBackoff.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponseio;
+
+import java.io.Serializable;
+
+/**
+ * Informs whether a call to an API should backoff.
+ *
+ * <p>State is fed in through the two {@code update} overloads, one taking an exception and one
+ * taking a response, and the current decision is read back with {@link #value()}. The interface
+ * extends {@link Serializable}.
+ *
+ * @param <ResponseT> the type of the response accepted by {@code update(ResponseT)}
+ */
+public interface CallShouldBackoff<ResponseT> extends Serializable {
+
+ /**
+  * Update the state of whether to backoff using information about the exception.
+  *
+  * @param exception the exception to take into account
+  */
+ void update(UserCodeExecutionException exception);
+
+ /**
+  * Update the state of whether to backoff using information about the response.
+  *
+  * @param response the response to take into account
+  */
+ void update(ResponseT response);
+
+ /**
+  * Report whether to backoff.
+  *
+  * @return {@code true} if backoff should be applied, {@code false} otherwise
+  */
+ boolean value();
+}
diff --git
a/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/CallShouldBackoffBasedOnRejectionProbability.java
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/CallShouldBackoffBasedOnRejectionProbability.java
new file mode 100644
index 00000000000..8298809fca8
--- /dev/null
+++
b/sdks/java/io/rrio/src/main/java/org/apache/beam/io/requestresponseio/CallShouldBackoffBasedOnRejectionProbability.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponseio;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Decides whether to apply backoff by estimating the probability that a call is rejected, following
+ * https://sre.google/sre-book/handling-overload/.
+ */
+class CallShouldBackoffBasedOnRejectionProbability<ResponseT>
+    implements CallShouldBackoff<ResponseT> {
+
+  // Multiplier recommended by https://sre.google/sre-book/handling-overload/; used by the no-arg
+  // constructor.
+  private static final double DEFAULT_MULTIPLIER = 2.0;
+
+  // Value that the rejection probability must exceed for value() to report true. While this is
+  // null, getThreshold() supplies a random value instead.
+  private @Nullable Double threshold;
+
+  // Weight applied to the accept count when computing the rejection probability. See
+  // https://sre.google/sre-book/handling-overload/ for details.
+  private final double multiplier;
+
+  // Count of every update(...) call, whether it carried a response or an exception.
+  private double requests = 0.0;
+
+  // Count of update(...) calls that carried a response.
+  private double accepts = 0.0;
+
+  /** Creates an instance that uses the {@link #DEFAULT_MULTIPLIER}. */
+  CallShouldBackoffBasedOnRejectionProbability() {
+    this(DEFAULT_MULTIPLIER);
+  }
+
+  /**
+   * Creates an instance that uses the given multiplier. The multiplier scales the accept count in
+   * {@link #getRejectionProbability()}; see https://sre.google/sre-book/handling-overload/ for
+   * details.
+   */
+  CallShouldBackoffBasedOnRejectionProbability(double multiplier) {
+    this.multiplier = multiplier;
+  }
+
+  /**
+   * Fixes the threshold so that a random value is no longer used. The threshold is the value
+   * (within range [0, 1)) that {@link #getRejectionProbability()} must exceed in order to report a
+   * value() of true.
+   *
+   * @return this instance, to allow chaining
+   */
+  CallShouldBackoffBasedOnRejectionProbability<ResponseT> setThreshold(double threshold) {
+    this.threshold = threshold;
+    return this;
+  }
+
+  /** Records a request that ended with an exception: only the request count grows. */
+  @Override
+  public void update(UserCodeExecutionException exception) {
+    requests += 1;
+  }
+
+  /** Records a request that ended with a response: both the request and accept counts grow. */
+  @Override
+  public void update(ResponseT response) {
+    requests += 1;
+    accepts += 1;
+  }
+
+  /** Returns the configured threshold if one was set, otherwise the result of Math.random(). */
+  double getThreshold() {
+    final Double configured = this.threshold;
+    if (configured == null) {
+      return Math.random();
+    }
+    return configured;
+  }
+
+  /**
+   * Computes the probability of API call rejection based on
+   * https://sre.google/sre-book/handling-overload/: {@code (requests - multiplier * accepts) /
+   * (requests + 1)}, floored at zero.
+   */
+  double getRejectionProbability() {
+    double weightedAccepts = multiplier * accepts;
+    double excess = requests - weightedAccepts;
+    double total = requests + 1;
+    return Math.max(0, excess / total);
+  }
+
+  /** Reports true when the rejection probability strictly exceeds the threshold. */
+  @Override
+  public boolean value() {
+    final double rejectionProbability = getRejectionProbability();
+    final double limit = getThreshold();
+    return rejectionProbability > limit;
+  }
+}
diff --git
a/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponseio/CallShouldBackoffBasedOnRejectionProbabilityTest.java
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponseio/CallShouldBackoffBasedOnRejectionProbabilityTest.java
new file mode 100644
index 00000000000..b1e8347a25c
--- /dev/null
+++
b/sdks/java/io/rrio/src/test/java/org/apache/beam/io/requestresponseio/CallShouldBackoffBasedOnRejectionProbabilityTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.requestresponseio;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link CallShouldBackoffBasedOnRejectionProbability}. */
+@RunWith(JUnit4.class)
+public class CallShouldBackoffBasedOnRejectionProbabilityTest {
+
+  /**
+   * Replays each case's accept/reject sequence against a fresh instance, then checks the computed
+   * rejection probability (within a tolerance of 0.1) followed by the reported backoff decision.
+   */
+  @Test
+  public void testValue() {
+    for (Case testCase : CASES) {
+      String description = testCase.toString();
+      CallShouldBackoffBasedOnRejectionProbability<String> underTest = instance();
+      replay(testCase.acceptRejects, underTest);
+      assertEquals(description, testCase.wantPReject, underTest.getRejectionProbability(), 0.1);
+      assertEquals(description, testCase.wantValue, underTest.value());
+    }
+  }
+
+  /** Feeds each outcome into {@code target}: true as a response, false as an exception. */
+  private static void replay(
+      List<Boolean> outcomes, CallShouldBackoffBasedOnRejectionProbability<String> target) {
+    for (Boolean accepted : outcomes) {
+      if (!accepted) {
+        target.update(new UserCodeExecutionException(""));
+        continue;
+      }
+      target.update("");
+    }
+  }
+
+  // Each case lists the expected rejection probability, the expected value(), and then the
+  // sequence of outcomes to replay (true = accept, false = reject).
+  private static final List<Case> CASES =
+      Arrays.asList(
+          of(0, false),
+          of(0, false, true, true, true, true, true, true, true, true, true, true, true),
+          of(0, false, true),
+          of(0.5, false, false),
+          of(0.91, true, false, false, false, false, false, false, false, false, false, false));
+
+  /** Builds a {@link Case} from the expectations and a varargs sequence of outcomes. */
+  private static Case of(double wantPReject, boolean wantValue, boolean... acceptRejects) {
+    List<Boolean> outcomes = new ArrayList<>();
+    for (int i = 0; i < acceptRejects.length; i++) {
+      outcomes.add(acceptRejects[i]);
+    }
+    return new Case(outcomes, wantPReject, wantValue);
+  }
+
+  /** A single scenario: the outcomes to replay and the results expected afterwards. */
+  private static class Case {
+    private final List<Boolean> acceptRejects;
+    private final double wantPReject;
+    private final boolean wantValue;
+
+    Case(List<Boolean> acceptRejects, double wantPReject, boolean wantValue) {
+      this.acceptRejects = acceptRejects;
+      this.wantPReject = wantPReject;
+      this.wantValue = wantValue;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder text = new StringBuilder("Case{");
+      text.append("acceptRejects=").append(acceptRejects);
+      text.append(", wantPReject=").append(wantPReject);
+      text.append(", wantValue=").append(wantValue);
+      text.append('}');
+      return text.toString();
+    }
+  }
+
+  /** Creates the instance under test with the threshold pinned to 0.5. */
+  CallShouldBackoffBasedOnRejectionProbability<String> instance() {
+    CallShouldBackoffBasedOnRejectionProbability<String> created =
+        new CallShouldBackoffBasedOnRejectionProbability<>();
+    return created.setThreshold(0.5);
+  }
+}