This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new cbf36740a53 Add Support for EnvoyRateLimiter Implementation (#37573)
cbf36740a53 is described below

commit cbf36740a5362bd43c013ff9cb278ace7c664332
Author: Tarun Annapareddy <[email protected]>
AuthorDate: Fri Feb 13 06:20:17 2026 -0800

    Add Support for EnvoyRateLimiter Implementation (#37573)
    
    * Add EnvoyRateLimiter Implementation
    
    * Add example
    
    * fix style check
    
    * simplify teardown
    
    Co-authored-by: gemini-code-assist[bot] 
<176961590+gemini-code-assist[bot]@users.noreply.github.com>
    
    * add more jitter
    
    Co-authored-by: gemini-code-assist[bot] 
<176961590+gemini-code-assist[bot]@users.noreply.github.com>
    
    * handle thread interrupt
    
    Co-authored-by: gemini-code-assist[bot] 
<176961590+gemini-code-assist[bot]@users.noreply.github.com>
    
    * add connection keep-alive configs
    
    ---------
    
    Co-authored-by: gemini-code-assist[bot] 
<176961590+gemini-code-assist[bot]@users.noreply.github.com>
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   1 +
 examples/java/build.gradle                         |   1 +
 .../apache/beam/examples/RateLimiterSimple.java    | 131 ++++++++++++
 .../resources/beam/checkstyle/suppressions.xml     |   1 +
 sdks/java/io/components/build.gradle               |   5 +
 .../components/ratelimiter/EnvoyRateLimiter.java   |  46 ++++
 .../ratelimiter/EnvoyRateLimiterContext.java       |  65 ++++++
 .../ratelimiter/EnvoyRateLimiterFactory.java       | 238 +++++++++++++++++++++
 .../ratelimiter/RateLimiterClientCache.java        | 103 +++++++++
 .../components/ratelimiter/RateLimiterOptions.java |  68 ++++++
 .../ratelimiter/EnvoyRateLimiterTest.java          | 168 +++++++++++++++
 .../ratelimiter/RateLimiterClientCacheTest.java    | 115 ++++++++++
 .../ratelimiter/RateLimiterOptionsTest.java        |  81 +++++++
 13 files changed, 1023 insertions(+)

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index df2db7df71b..325af8a8e19 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -726,6 +726,7 @@ class BeamModulePlugin implements Plugin<Project> {
         commons_logging                             : 
"commons-logging:commons-logging:1.2",
         commons_math3                               : 
"org.apache.commons:commons-math3:3.6.1",
         dbcp2                                       : 
"org.apache.commons:commons-dbcp2:$dbcp2_version",
+        envoy_control_plane_api                     : 
"io.envoyproxy.controlplane:api:1.0.49",
         error_prone_annotations                     : 
"com.google.errorprone:error_prone_annotations:$errorprone_version",
         failsafe                                    : 
"dev.failsafe:failsafe:3.3.0",
         flogger_system_backend                      : 
"com.google.flogger:flogger-system-backend:0.7.4",
diff --git a/examples/java/build.gradle b/examples/java/build.gradle
index 5334538cc09..068c0d1b56f 100644
--- a/examples/java/build.gradle
+++ b/examples/java/build.gradle
@@ -52,6 +52,7 @@ dependencies {
   implementation project(":sdks:java:extensions:avro")
   implementation project(":sdks:java:extensions:google-cloud-platform-core")
   implementation project(":sdks:java:extensions:python")
+  implementation project(":sdks:java:io:components")
   implementation project(":sdks:java:io:google-cloud-platform")
   implementation project(":sdks:java:io:kafka")
   implementation project(":sdks:java:extensions:ml")
diff --git 
a/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java 
b/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java
new file mode 100644
index 00000000000..a33e99e4b23
--- /dev/null
+++ 
b/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples;
+
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.components.ratelimiter.EnvoyRateLimiterContext;
+import org.apache.beam.sdk.io.components.ratelimiter.EnvoyRateLimiterFactory;
+import org.apache.beam.sdk.io.components.ratelimiter.RateLimiter;
+import org.apache.beam.sdk.io.components.ratelimiter.RateLimiterContext;
+import org.apache.beam.sdk.io.components.ratelimiter.RateLimiterFactory;
+import org.apache.beam.sdk.io.components.ratelimiter.RateLimiterOptions;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple example demonstrating how to use the {@link RateLimiter} in a custom {@link DoFn}.
+ *
+ * <p>This pipeline creates a small set of elements and processes them using a DoFn that calls an
+ * external service (simulated). The processing is rate-limited using an Envoy Rate Limit Service.
+ *
+ * <p>To run this example, you need a running Envoy Rate Limit Service.
+ */
+public class RateLimiterSimple {
+
+  /** Pipeline options locating the Rate Limit Service and selecting the domain to query. */
+  public interface Options extends PipelineOptions {
+    @Description("Address of the Envoy Rate Limit Service(eg:localhost:8081)")
+    String getRateLimiterAddress();
+
+    void setRateLimiterAddress(String value);
+
+    @Description("Domain for the Rate Limit Service(eg:mydomain)")
+    String getRateLimiterDomain();
+
+    void setRateLimiterDomain(String value);
+  }
+
+  /**
+   * A {@link DoFn} that acquires one rate limit permit per element before performing its
+   * (simulated) external call.
+   *
+   * <p>The limiter is created in {@link #setup()} and closed in {@link #teardown()}; it is
+   * {@code transient} so that only the address and domain strings are held as serializable state.
+   */
+  static class CallExternalServiceFn extends DoFn<String, String> {
+    private final String rlsAddress;
+    private final String rlsDomain;
+    private transient @Nullable RateLimiter rateLimiter;
+    private static final Logger LOG = LoggerFactory.getLogger(CallExternalServiceFn.class);
+
+    /**
+     * @param rlsAddress address of the Rate Limit Service, passed to {@link RateLimiterOptions}
+     * @param rlsDomain domain placed in the {@link EnvoyRateLimiterContext}
+     */
+    public CallExternalServiceFn(String rlsAddress, String rlsDomain) {
+      this.rlsAddress = rlsAddress;
+      this.rlsDomain = rlsDomain;
+    }
+
+    /** Builds the limiter for the fixed {@code database=users} descriptor. */
+    @Setup
+    public void setup() {
+      // Create the RateLimiterOptions.
+      RateLimiterOptions options = RateLimiterOptions.builder().setAddress(rlsAddress).build();
+
+      // Static RateLimiter with pre-configured domain and descriptors
+      RateLimiterFactory factory = new EnvoyRateLimiterFactory(options);
+      RateLimiterContext context =
+          EnvoyRateLimiterContext.builder()
+              .setDomain(rlsDomain)
+              .addDescriptor("database", "users")
+              .build();
+      this.rateLimiter = factory.getLimiter(context);
+    }
+
+    /** Closes the limiter if one was created; close failures are logged, not rethrown. */
+    @Teardown
+    public void teardown() {
+      if (rateLimiter != null) {
+        try {
+          rateLimiter.close();
+        } catch (Exception e) {
+          LOG.warn("Failed to close RateLimiter", e);
+        }
+      }
+    }
+
+    /**
+     * Acquires one permit, then performs the simulated call and emits the result.
+     *
+     * @throws RuntimeException if the permit could not be acquired, either because the limiter
+     *     failed, the thread was interrupted, or the limiter reported that no permit was granted
+     */
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      String element = c.element();
+      boolean granted;
+      try {
+        granted = Preconditions.checkNotNull(rateLimiter).allow(1);
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag before wrapping so the cancellation stays observable.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException("Failed to acquire rate limit token", e);
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to acquire rate limit token", e);
+      }
+      if (!granted) {
+        // A false result means no permit was obtained (EnvoyRateLimiterFactory returns it once
+        // its retry budget is spent); calling the service anyway would defeat the rate limit.
+        throw new RuntimeException("Rate limiter did not grant a token for: " + element);
+      }
+
+      // Simulate external API call
+      LOG.info("Processing: {}", element);
+      Thread.sleep(100);
+      c.output("Processed: " + element);
+    }
+  }
+
+  /** Builds and runs a pipeline that pushes 100 generated items through the rate-limited DoFn. */
+  public static void main(String[] args) {
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    Pipeline p = Pipeline.create(options);
+
+    p.apply(
+            "CreateItems",
+            Create.of(
+                IntStream.range(0, 100).mapToObj(i -> "item" + i).collect(Collectors.toList())))
+        .apply(
+            "CallExternalService",
+            ParDo.of(
+                new CallExternalServiceFn(
+                    options.getRateLimiterAddress(), options.getRateLimiterDomain())));
+
+    p.run().waitUntilFinish();
+  }
+}
diff --git 
a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml 
b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml
index f79bb6cf3bf..1fa51691818 100644
--- a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml
@@ -62,6 +62,7 @@
   <suppress id="ForbidNonVendoredGrpcProtobuf" 
files=".*sdk.*core.*GroupByKeyIT.*" />
   <suppress id="ForbidNonVendoredGrpcProtobuf" 
files=".*sdk.*core.*ValidateRunnerXlangTest.*" />
   <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*extensions.*ml.*" 
/>
+  <suppress id="ForbidNonVendoredGrpcProtobuf" 
files=".*sdk.*io.*components.*ratelimiter.*" />
   <suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*io.*gcp.*" />
   <suppress id="ForbidNonVendoredGrpcProtobuf" 
files=".*sdk.*io.*googleads.*DummyRateLimitPolicy\.java" />
   <suppress id="ForbidNonVendoredGrpcProtobuf" 
files=".*sdk.*io.*googleads.*GoogleAds.*\.java" />
diff --git a/sdks/java/io/components/build.gradle 
b/sdks/java/io/components/build.gradle
index 25bf9577211..97342009834 100644
--- a/sdks/java/io/components/build.gradle
+++ b/sdks/java/io/components/build.gradle
@@ -26,6 +26,11 @@ ext.summary = "Components for building fully featured IOs"
 
 dependencies {
     implementation project(path: ":sdks:java:core", configuration: "shadow")
+    implementation library.java.auto_value_annotations
+    implementation library.java.envoy_control_plane_api
+    implementation library.java.grpc_api
+    implementation library.java.grpc_stub
+    implementation library.java.grpc_protobuf
     implementation library.java.protobuf_java
     permitUnusedDeclared library.java.protobuf_java // BEAM-11761
     implementation library.java.slf4j_api
diff --git 
a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiter.java
 
b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiter.java
new file mode 100644
index 00000000000..9fc3da80dca
--- /dev/null
+++ 
b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.components.ratelimiter;
+
+import java.io.IOException;
+
+/**
+ * {@link RateLimiter} handle bound to one {@link EnvoyRateLimiterContext}.
+ *
+ * <p>The handle keeps no connection state of its own: every call is forwarded to the {@link
+ * EnvoyRateLimiterFactory} it was constructed with.
+ */
+public class EnvoyRateLimiter implements RateLimiter {
+  private final EnvoyRateLimiterFactory owner;
+  private final EnvoyRateLimiterContext boundContext;
+
+  /** Binds {@code context} to {@code factory}; neither argument is validated here. */
+  public EnvoyRateLimiter(EnvoyRateLimiterFactory factory, EnvoyRateLimiterContext context) {
+    owner = factory;
+    boundContext = context;
+  }
+
+  /** Asks the factory for {@code permits} permits using the bound context. */
+  @Override
+  public boolean allow(int permits) throws IOException, InterruptedException {
+    boolean granted = owner.allow(boundContext, permits);
+    return granted;
+  }
+
+  /** Closes the backing factory, not just this handle. */
+  @Override
+  public void close() throws Exception {
+    owner.close();
+  }
+}
diff --git 
a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterContext.java
 
b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterContext.java
new file mode 100644
index 00000000000..baebece7962
--- /dev/null
+++ 
b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterContext.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.components.ratelimiter;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+/**
+ * Context for an Envoy Rate Limiter call.
+ *
+ * <p>Contains the domain and descriptors required to define a specific rate limit bucket.
+ *
+ * <p>Instances are immutable AutoValue objects created through {@link #builder()}.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class EnvoyRateLimiterContext implements RateLimiterContext {
+
+  /** Returns the rate limit domain this context belongs to. */
+  @SchemaFieldDescription("Domain of the rate limiter.")
+  public abstract String getDomain();
+
+  /** Returns the descriptor key/value pairs identifying the bucket within the domain. */
+  @SchemaFieldDescription("Descriptors for the rate limiter.")
+  public abstract ImmutableMap<String, String> getDescriptors();
+
+  /** Returns a new, empty {@link Builder}. */
+  public static Builder builder() {
+    return new AutoValue_EnvoyRateLimiterContext.Builder();
+  }
+
+  /** Builder for {@link EnvoyRateLimiterContext}. */
+  @AutoValue.Builder
+  public abstract static class Builder {
+    /** Sets the rate limit domain. */
+    public abstract Builder setDomain(@NonNull String domain);
+
+    /** Returns the underlying map builder that accumulates the descriptors. */
+    public abstract ImmutableMap.Builder<String, String> descriptorsBuilder();
+
+    /** Adds a single descriptor entry to those accumulated so far. */
+    public Builder addDescriptor(@NonNull String key, @NonNull String value) {
+      descriptorsBuilder().put(key, value);
+      return this;
+    }
+
+    /**
+     * Adds every entry of {@code descriptors} to those accumulated so far.
+     *
+     * <p>Despite the {@code set} prefix this does not discard entries added earlier: it calls
+     * {@code putAll} on the same builder that {@link #addDescriptor} writes to.
+     *
+     * <p>NOTE(review): a key supplied more than once is presumably rejected by the Guava map
+     * builder when {@link #build()} runs - confirm against the ImmutableMap.Builder contract.
+     */
+    public Builder setDescriptors(@NonNull Map<String, String> descriptors) {
+      descriptorsBuilder().putAll(descriptors);
+      return this;
+    }
+
+    /** Builds the immutable context. */
+    public abstract EnvoyRateLimiterContext build();
+  }
+}
diff --git 
a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java
 
b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java
new file mode 100644
index 00000000000..fa512d38ab9
--- /dev/null
+++ 
b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.components.ratelimiter;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import io.envoyproxy.envoy.extensions.common.ratelimit.v3.RateLimitDescriptor;
+import io.envoyproxy.envoy.service.ratelimit.v3.RateLimitRequest;
+import io.envoyproxy.envoy.service.ratelimit.v3.RateLimitResponse;
+import io.envoyproxy.envoy.service.ratelimit.v3.RateLimitServiceGrpc;
+import io.grpc.StatusRuntimeException;
+import java.io.IOException;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.components.throttling.ThrottlingSignaler;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.util.Sleeper;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link RateLimiterFactory} for Envoy Rate Limit Service.
+ *
+ * <p>The gRPC stub is created lazily on the first rate limit check, on a channel obtained from
+ * {@link RateLimiterClientCache}; {@link #close()} releases that channel reference again.
+ */
+public class EnvoyRateLimiterFactory implements RateLimiterFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(EnvoyRateLimiterFactory.class);
+  /** Number of RPC attempts made for one rate limit check before giving up. */
+  private static final int RPC_RETRY_COUNT = 5;
+  /** Pause between two failed RPC attempts. */
+  private static final long RPC_RETRY_DELAY_MILLIS = 1000;
+  /** Back-off used when an OVER_LIMIT response carries no usable reset duration. */
+  private static final long DEFAULT_THROTTLE_SLEEP_MILLIS = 1000;
+  /** Upper bound of the random jitter, expressed as a fraction of the back-off. */
+  private static final double JITTER_FRACTION = 0.1;
+
+  private final RateLimiterOptions options;
+
+  private transient volatile @Nullable RateLimitServiceGrpc.RateLimitServiceBlockingStub stub;
+  private transient @Nullable RateLimiterClientCache clientCache;
+  private final ThrottlingSignaler throttlingSignaler;
+  private final Sleeper sleeper;
+
+  private final Counter requestsTotal;
+  private final Counter requestsAllowed;
+  private final Counter requestsThrottled;
+  private final Counter rpcErrors;
+  private final Counter rpcRetries;
+  private final Distribution rpcLatency;
+
+  /** Creates a factory that sleeps with {@link Sleeper#DEFAULT}. */
+  public EnvoyRateLimiterFactory(RateLimiterOptions options) {
+    this(options, Sleeper.DEFAULT);
+  }
+
+  /** Creates a factory with an injectable {@link Sleeper}; metrics use the class name. */
+  @VisibleForTesting
+  EnvoyRateLimiterFactory(RateLimiterOptions options, Sleeper sleeper) {
+    this.options = options;
+    this.sleeper = sleeper;
+    String namespace = EnvoyRateLimiterFactory.class.getName();
+    this.throttlingSignaler = new ThrottlingSignaler(namespace);
+    this.requestsTotal = Metrics.counter(namespace, "ratelimit-requests-total");
+    this.requestsAllowed = Metrics.counter(namespace, "ratelimit-requests-allowed");
+    this.requestsThrottled = Metrics.counter(namespace, "ratelimit-requests-throttled");
+    this.rpcErrors = Metrics.counter(namespace, "ratelimit-rpc-errors");
+    this.rpcRetries = Metrics.counter(namespace, "ratelimit-rpc-retries");
+    this.rpcLatency = Metrics.distribution(namespace, "ratelimit-rpc-latency-ms");
+  }
+
+  /** Releases the cached channel reference, if any, and drops the stub. */
+  @Override
+  public synchronized void close() {
+    if (clientCache != null) {
+      clientCache.release();
+      clientCache = null;
+    }
+    stub = null;
+  }
+
+  /**
+   * Creates the stub on first use. The volatile {@code stub} is checked once without the lock and
+   * once more under it, so concurrent callers acquire at most one channel reference.
+   */
+  private void init() {
+    if (stub != null) {
+      return;
+    }
+    synchronized (this) {
+      if (stub == null) {
+        RateLimiterClientCache cache = RateLimiterClientCache.getOrCreate(options.getAddress());
+        this.clientCache = cache;
+        stub = RateLimitServiceGrpc.newBlockingStub(cache.getChannel());
+      }
+    }
+  }
+
+  /** Installs a stub directly, bypassing {@link RateLimiterClientCache}. */
+  @VisibleForTesting
+  void setStub(RateLimitServiceGrpc.RateLimitServiceBlockingStub stub) {
+    this.stub = stub;
+  }
+
+  /**
+   * Returns a limiter bound to {@code context} and backed by this factory.
+   *
+   * @throws IllegalArgumentException if {@code context} is not an {@link EnvoyRateLimiterContext}
+   */
+  @Override
+  public RateLimiter getLimiter(RateLimiterContext context) {
+    if (!(context instanceof EnvoyRateLimiterContext)) {
+      throw new IllegalArgumentException(
+          "EnvoyRateLimiterFactory requires EnvoyRateLimiterContext");
+    }
+    return new EnvoyRateLimiter(this, (EnvoyRateLimiterContext) context);
+  }
+
+  /**
+   * Requests {@code permits} permits for {@code context}, sleeping while the service reports
+   * OVER_LIMIT.
+   *
+   * @return {@code true} once the service answers OK, or immediately when {@code permits} is 0;
+   *     {@code false} when the request was throttled more often than the configured maximum
+   * @throws IllegalArgumentException if {@code context} is not an {@link EnvoyRateLimiterContext}
+   *     or {@code permits} is negative
+   * @throws IOException if the service cannot be reached or returns an unknown code
+   * @throws InterruptedException if a sleep between attempts is interrupted
+   */
+  @Override
+  public boolean allow(RateLimiterContext context, int permits)
+      throws IOException, InterruptedException {
+    if (permits == 0) {
+      return true;
+    }
+    if (!(context instanceof EnvoyRateLimiterContext)) {
+      // instanceof is also false for null, so the class name must be looked up null-safely.
+      throw new IllegalArgumentException(
+          "EnvoyRateLimiterFactory requires EnvoyRateLimiterContext, got: "
+              + (context == null ? "null" : context.getClass().getName()));
+    }
+    checkArgument(permits >= 0, "Permits must be non-negative");
+    EnvoyRateLimiterContext envoyContext = (EnvoyRateLimiterContext) context;
+    return fetchTokens(envoyContext, permits);
+  }
+
+  /**
+   * Runs the throttle loop: ask the service, return on OK, sleep and ask again on OVER_LIMIT.
+   *
+   * <p>Returns {@code false} once the number of throttled rounds exceeds {@code getMaxRetries()};
+   * a {@code null} maximum means the loop never gives up.
+   */
+  private boolean fetchTokens(EnvoyRateLimiterContext context, int tokens)
+      throws IOException, InterruptedException {
+
+    init();
+    RateLimitServiceGrpc.RateLimitServiceBlockingStub currentStub = stub;
+    if (currentStub == null) {
+      throw new IllegalStateException("RateLimitServiceStub is null");
+    }
+
+    RateLimitRequest request = buildRequest(context, tokens);
+    Integer maxRetries = options.getMaxRetries();
+    long timeoutMillis = options.getTimeout().toMillis();
+
+    requestsTotal.inc();
+    int attempt = 0;
+    while (true) {
+      if (maxRetries != null && attempt > maxRetries) {
+        return false;
+      }
+
+      RateLimitResponse response = callWithRetries(currentStub, request, timeoutMillis);
+      RateLimitResponse.Code overallCode = response.getOverallCode();
+      if (overallCode == RateLimitResponse.Code.OK) {
+        requestsAllowed.inc();
+        return true;
+      }
+      if (overallCode != RateLimitResponse.Code.OVER_LIMIT) {
+        throw new IOException("Rate Limit Service returned unknown code: " + overallCode);
+      }
+
+      long sleepMillis = throttleSleepMillis(response);
+      LOG.warn("Throttled by RLS, sleeping for {} ms", sleepMillis);
+      if (sleeper != null) {
+        requestsThrottled.inc();
+        if (throttlingSignaler != null) {
+          throttlingSignaler.signalThrottling(sleepMillis);
+        }
+        sleeper.sleep(sleepMillis);
+      }
+      attempt++;
+    }
+  }
+
+  /** Builds the request: one descriptor holding every context entry, with the hit count. */
+  private static RateLimitRequest buildRequest(EnvoyRateLimiterContext context, int tokens) {
+    Map<String, String> descriptors = context.getDescriptors();
+    RateLimitDescriptor.Builder descriptorBuilder = RateLimitDescriptor.newBuilder();
+
+    for (Map.Entry<String, String> entry : descriptors.entrySet()) {
+      descriptorBuilder.addEntries(
+          RateLimitDescriptor.Entry.newBuilder()
+              .setKey(entry.getKey())
+              .setValue(entry.getValue())
+              .build());
+    }
+
+    return RateLimitRequest.newBuilder()
+        .setDomain(context.getDomain())
+        .setHitsAddend(tokens)
+        .addDescriptors(descriptorBuilder.build())
+        .build();
+  }
+
+  /**
+   * Sends {@code request}, retrying failed RPCs up to {@link #RPC_RETRY_COUNT} attempts in total.
+   *
+   * <p>The latency clock is restarted for every attempt so that the recorded value covers only the
+   * successful RPC, not earlier failed attempts or the pauses between them.
+   *
+   * @throws IOException if the last attempt fails or the stub yields no response
+   */
+  private RateLimitResponse callWithRetries(
+      RateLimitServiceGrpc.RateLimitServiceBlockingStub currentStub,
+      RateLimitRequest request,
+      long timeoutMillis)
+      throws IOException, InterruptedException {
+    for (int i = 0; i < RPC_RETRY_COUNT; i++) {
+      long startTime = System.currentTimeMillis();
+      try {
+        RateLimitResponse response =
+            currentStub
+                .withDeadlineAfter(timeoutMillis, java.util.concurrent.TimeUnit.MILLISECONDS)
+                .shouldRateLimit(request);
+        rpcLatency.update(System.currentTimeMillis() - startTime);
+        if (response == null) {
+          throw new IOException("Failed to get response from Rate Limit Service");
+        }
+        return response;
+      } catch (StatusRuntimeException e) {
+        rpcErrors.inc();
+        if (i == RPC_RETRY_COUNT - 1) {
+          LOG.error("RateLimitService call failed after {} attempts", RPC_RETRY_COUNT, e);
+          throw new IOException("Failed to call Rate Limit Service", e);
+        }
+        rpcRetries.inc();
+        LOG.warn("RateLimitService call failed, retrying", e);
+        if (sleeper != null) {
+          sleeper.sleep(RPC_RETRY_DELAY_MILLIS);
+        }
+      }
+    }
+    // Not reachable: the final loop iteration either returns or throws.
+    throw new IOException("Failed to get response from Rate Limit Service");
+  }
+
+  /**
+   * Computes how long to back off after an OVER_LIMIT response: the longest reset duration among
+   * the over-limit descriptor statuses (or the default when none is given), plus random jitter.
+   */
+  private static long throttleSleepMillis(RateLimitResponse response) {
+    long sleepMillis = 0;
+    for (RateLimitResponse.DescriptorStatus status : response.getStatusesList()) {
+      if (status.getCode() == RateLimitResponse.Code.OVER_LIMIT
+          && status.hasDurationUntilReset()) {
+        long durationMillis =
+            status.getDurationUntilReset().getSeconds() * 1000
+                + status.getDurationUntilReset().getNanos() / 1_000_000;
+        sleepMillis = Math.max(sleepMillis, durationMillis);
+      }
+    }
+
+    if (sleepMillis == 0) {
+      sleepMillis = DEFAULT_THROTTLE_SLEEP_MILLIS;
+    }
+
+    // Jitter spreads out workers that were throttled at the same moment.
+    long jitter =
+        (long)
+            (java.util.concurrent.ThreadLocalRandom.current().nextDouble()
+                * (JITTER_FRACTION * sleepMillis));
+    return sleepMillis + jitter;
+  }
+}
diff --git 
a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCache.java
 
b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCache.java
new file mode 100644
index 00000000000..5fde6fbe898
--- /dev/null
+++ 
b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCache.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.components.ratelimiter;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A static cache for {@link ManagedChannel}s to Rate Limit Service.
+ *
+ * <p>This class ensures that multiple DoFn instances (threads) in the same Worker sharing the same
+ * RLS address will share a single {@link ManagedChannel}.
+ *
+ * <p>It uses reference counting to close the channel when it is no longer in use by any RateLimiter
+ * instance. All reference count changes happen while holding the class lock.
+ */
+public class RateLimiterClientCache {
+  private static final Logger LOG = LoggerFactory.getLogger(RateLimiterClientCache.class);
+  private static final Map<String, RateLimiterClientCache> CACHE = new ConcurrentHashMap<>();
+  private static final int KEEP_ALIVE_TIME_SECONDS = 60;
+  private static final int KEEP_ALIVE_TIMEOUT_SECONDS = 15;
+
+  private final ManagedChannel channel;
+  private final String address;
+  // Only read or written while holding the RateLimiterClientCache.class lock.
+  private int refCount = 0;
+
+  /** Opens a keep-alive enabled channel to {@code address}. */
+  private RateLimiterClientCache(String address) {
+    this.address = address;
+    LOG.info("Creating new ManagedChannel for RLS at {}", address);
+    // NOTE(review): usePlaintext() presumably means this channel is not TLS-protected; confirm
+    // that is acceptable for the networks the Rate Limit Service is reached over.
+    this.channel =
+        ManagedChannelBuilder.forTarget(address)
+            .usePlaintext()
+            .keepAliveTime(KEEP_ALIVE_TIME_SECONDS, TimeUnit.SECONDS)
+            .keepAliveTimeout(KEEP_ALIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
+            .keepAliveWithoutCalls(true)
+            .build();
+  }
+
+  /**
+   * Gets or creates a cached client for the given address. Increments the reference count.
+   * Synchronized on the class to prevent race conditions when multiple instances call
+   * getOrCreate() simultaneously.
+   *
+   * <p>Every successful call must be balanced by exactly one {@link #release()}.
+   */
+  public static synchronized RateLimiterClientCache getOrCreate(String address) {
+    RateLimiterClientCache client = CACHE.get(address);
+    if (client == null) {
+      client = new RateLimiterClientCache(address);
+      CACHE.put(address, client);
+    }
+    client.refCount++;
+    LOG.debug("Referenced RLS Channel for {}. New RefCount: {}", address, client.refCount);
+    return client;
+  }
+
+  /** Returns the shared channel; callers must not shut it down themselves. */
+  public ManagedChannel getChannel() {
+    return channel;
+  }
+
+  /**
+   * Releases the client. Decrements the reference count. If reference count reaches 0, the channel
+   * is shut down and removed from the cache. Synchronized on the class to prevent race conditions
+   * when multiple threads call release() simultaneously and to prevent race conditions between
+   * getOrCreate() and release() calls.
+   *
+   * <p>Calling this on an instance whose count is already 0 is ignored with a warning. Such an
+   * instance has been closed, and the cache may by now hold a newer client for the same address
+   * that must not be evicted.
+   */
+  public void release() {
+    synchronized (RateLimiterClientCache.class) {
+      if (refCount <= 0) {
+        LOG.warn("Ignoring release() of already closed RLS Channel for {}", address);
+        return;
+      }
+      refCount--;
+      LOG.debug("Released RLS Channel for {}. New RefCount: {}", address, refCount);
+      if (refCount > 0) {
+        return;
+      }
+
+      LOG.info("Closing ManagedChannel for RLS at {}", address);
+      // Two-argument remove: evict the mapping only if it still points at this instance.
+      CACHE.remove(address, this);
+      channel.shutdown();
+      try {
+        channel.awaitTermination(10, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        LOG.error("Couldn't gracefully close gRPC channel={}", channel, e);
+        Thread.currentThread().interrupt();
+      }
+      // Force-close whatever did not finish within the grace period.
+      channel.shutdownNow();
+    }
+  }
+}
diff --git 
a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptions.java
 
b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptions.java
new file mode 100644
index 00000000000..acf49622eb9
--- /dev/null
+++ 
b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptions.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.components.ratelimiter;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.time.Duration;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+
+/**
+ * Configuration options for {@link RateLimiterFactory}.
+ *
+ * <p>Instances are created through {@link #builder()}; {@link Builder#build()} validates the
+ * values before returning them.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class RateLimiterOptions implements Serializable {
+  /** Returns the address of the rate limiter. Required. */
+  @SchemaFieldDescription("Address of the rate limiter")
+  public abstract String getAddress();
+
+  /** Returns the maximum number of retries, or {@code null} when no limit was configured. */
+  @Nullable
+  @SchemaFieldDescription("Maximum number of retries, defaults to infinite")
+  public abstract Integer getMaxRetries();
+
+  /** Returns the timeout for rate limiter operations; always strictly positive. */
+  @SchemaFieldDescription("Timeout for rate limiter operations, defaults to 5 seconds")
+  public abstract Duration getTimeout();
+
+  /** Returns a builder whose timeout is pre-set to 5 seconds. */
+  public static Builder builder() {
+    return new AutoValue_RateLimiterOptions.Builder().setTimeout(Duration.ofSeconds(5));
+  }
+
+  /** Builder for {@link RateLimiterOptions}. */
+  @AutoValue.Builder
+  public abstract static class Builder {
+    public abstract Builder setAddress(String address);
+
+    /**
+     * Sets the maximum number of retries. The parameter is {@code @Nullable} to match the
+     * nullable getter, so {@code null} can be passed to leave the retry count unset.
+     */
+    public abstract Builder setMaxRetries(@Nullable Integer maxRetries);
+
+    public abstract Builder setTimeout(Duration timeout);
+
+    // Generated by AutoValue; callers go through build() so that validation always runs.
+    abstract RateLimiterOptions autoBuild();
+
+    /**
+     * Builds the options and validates them.
+     *
+     * @throws IllegalArgumentException if the timeout is not strictly positive or the maximum
+     *     number of retries is negative
+     */
+    public RateLimiterOptions build() {
+      RateLimiterOptions options = autoBuild();
+      checkArgument(options.getTimeout().compareTo(Duration.ZERO) > 0, "Timeout must be positive");
+      Integer maxRetries = options.getMaxRetries();
+      if (maxRetries != null) {
+        checkArgument(maxRetries >= 0, "MaxRetries must be non-negative");
+      }
+      return options;
+    }
+  }
+}
diff --git 
a/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterTest.java
 
b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterTest.java
new file mode 100644
index 00000000000..e94d0b42eb3
--- /dev/null
+++ 
b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.components.ratelimiter;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.verify;
+
+import io.envoyproxy.envoy.service.ratelimit.v3.RateLimitRequest;
+import io.envoyproxy.envoy.service.ratelimit.v3.RateLimitResponse;
+import io.envoyproxy.envoy.service.ratelimit.v3.RateLimitServiceGrpc;
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.Status;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+import java.io.IOException;
+import org.apache.beam.sdk.util.Sleeper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link EnvoyRateLimiterFactory}. */
+@RunWith(JUnit4.class)
+public class EnvoyRateLimiterTest {
+  @Mock private Sleeper sleeper;
+
+  private EnvoyRateLimiterFactory factory;
+  private RateLimiterOptions options;
+  private EnvoyRateLimiterContext context;
+
+  private Server server;
+  private ManagedChannel channel;
+  private TestRateLimitService service;
+
+  @Before
+  public void setUp() throws Exception {
+    MockitoAnnotations.openMocks(this);
+    options =
+        RateLimiterOptions.builder()
+            .setAddress("localhost:8081")
+            .setTimeout(java.time.Duration.ofSeconds(1))
+            .build();
+
+    String serverName = InProcessServerBuilder.generateName();
+    service = new TestRateLimitService();
+    server =
+        InProcessServerBuilder.forName(serverName)
+            .directExecutor()
+            .addService(service)
+            .build()
+            .start();
+    channel = 
InProcessChannelBuilder.forName(serverName).directExecutor().build();
+
+    factory = new EnvoyRateLimiterFactory(options, sleeper);
+    factory.setStub(RateLimitServiceGrpc.newBlockingStub(channel));
+
+    context =
+        EnvoyRateLimiterContext.builder()
+            .setDomain("test-domain")
+            .addDescriptor("key", "value")
+            .build();
+  }
+
+  @After
+  public void tearDown() {
+    if (channel != null) {
+      channel.shutdownNow();
+    }
+    if (server != null) {
+      server.shutdownNow();
+    }
+  }
+
+  @Test
+  public void testAllow_OK() throws Exception {
+    service.responseToReturn =
+        
RateLimitResponse.newBuilder().setOverallCode(RateLimitResponse.Code.OK).build();
+
+    assertTrue(factory.allow(context, 1));
+  }
+
+  @Test
+  public void testAllow_OverLimit() throws Exception {
+    service.responseToReturn =
+        RateLimitResponse.newBuilder()
+            .setOverallCode(RateLimitResponse.Code.OVER_LIMIT)
+            .addStatuses(
+                RateLimitResponse.DescriptorStatus.newBuilder()
+                    .setCode(RateLimitResponse.Code.OVER_LIMIT)
+                    .setDurationUntilReset(
+                        
com.google.protobuf.Duration.newBuilder().setSeconds(1).build())
+                    .build())
+            .build();
+
+    factory =
+        new EnvoyRateLimiterFactory(
+            RateLimiterOptions.builder()
+                .setAddress("foo")
+                .setTimeout(java.time.Duration.ofSeconds(1))
+                .setMaxRetries(1)
+                .build(),
+            sleeper);
+    factory.setStub(RateLimitServiceGrpc.newBlockingStub(channel));
+
+    assertFalse(factory.allow(context, 1));
+
+    // Verify sleep was called.
+    verify(sleeper, org.mockito.Mockito.atLeastOnce()).sleep(anyLong());
+  }
+
+  @Test
+  public void testAllow_RpcError() throws Exception {
+    service.errorToThrow = Status.UNAVAILABLE.asRuntimeException();
+    assertThrows(IOException.class, () -> factory.allow(context, 1));
+  }
+
+  @Test
+  public void testInvalidContext() {
+    assertThrows(
+        IllegalArgumentException.class, () -> factory.allow(new 
RateLimiterContext() {}, 1));
+  }
+
+  static class TestRateLimitService extends 
RateLimitServiceGrpc.RateLimitServiceImplBase {
+    volatile RateLimitResponse responseToReturn;
+    volatile RuntimeException errorToThrow;
+
+    @Override
+    public void shouldRateLimit(
+        RateLimitRequest request, StreamObserver<RateLimitResponse> 
responseObserver) {
+      if (errorToThrow != null) {
+        responseObserver.onError(errorToThrow);
+        return;
+      }
+      if (responseToReturn != null) {
+        responseObserver.onNext(responseToReturn);
+        responseObserver.onCompleted();
+      } else {
+        // Default OK
+        responseObserver.onNext(
+            
RateLimitResponse.newBuilder().setOverallCode(RateLimitResponse.Code.OK).build());
+        responseObserver.onCompleted();
+      }
+    }
+  }
+}
diff --git 
a/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCacheTest.java
 
b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCacheTest.java
new file mode 100644
index 00000000000..4eb61b279c3
--- /dev/null
+++ 
b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCacheTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.components.ratelimiter;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link RateLimiterClientCache}. */
+@RunWith(JUnit4.class)
+public class RateLimiterClientCacheTest {
+
+  /** Two lookups of one address share an instance; the channel closes on the last release. */
+  @Test
+  public void testGetOrCreate_SameAddress() {
+    String address = "addr1";
+    RateLimiterClientCache client1 = RateLimiterClientCache.getOrCreate(address);
+    RateLimiterClientCache client2 = RateLimiterClientCache.getOrCreate(address);
+
+    assertSame(client1, client2);
+    assertFalse(client1.getChannel().isShutdown());
+
+    // cleanup
+    client1.release();
+    // client2 is still using the same channel
+    assertFalse(client1.getChannel().isShutdown());
+    client2.release();
+    assertTrue(client1.getChannel().isShutdown());
+  }
+
+  /** Different addresses get independent instances with independent channel lifetimes. */
+  @Test
+  public void testGetOrCreate_DifferentAddress_ReturnsDifferentInstances() {
+    RateLimiterClientCache client1 = RateLimiterClientCache.getOrCreate("addr1");
+    RateLimiterClientCache client2 = RateLimiterClientCache.getOrCreate("addr2");
+
+    assertNotSame(client1, client2);
+
+    assertFalse(client1.getChannel().isShutdown());
+    assertFalse(client2.getChannel().isShutdown());
+    client1.release();
+    assertTrue(client1.getChannel().isShutdown());
+    client2.release();
+    assertTrue(client2.getChannel().isShutdown());
+  }
+
+  /** Hammers getOrCreate()/release() from several threads, then checks the cache still works. */
+  @Test
+  public void testConcurrency() throws InterruptedException, ExecutionException {
+    int threads = 10;
+    int iterations = 100;
+    String address = "concurrent-addr";
+    ExecutorService pool = Executors.newFixedThreadPool(threads);
+
+    Callable<Boolean> worker =
+        () -> {
+          for (int j = 0; j < iterations; j++) {
+            RateLimiterClientCache client = RateLimiterClientCache.getOrCreate(address);
+            // do some tiny work
+            try {
+              Thread.sleep(1);
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+            }
+            client.release();
+          }
+          return true;
+        };
+
+    try {
+      List<Future<Boolean>> futures = new ArrayList<>();
+      for (int i = 0; i < threads; i++) {
+        futures.add(pool.submit(worker));
+      }
+
+      for (Future<Boolean> f : futures) {
+        assertTrue(f.get());
+      }
+    } finally {
+      // Always stop the pool, even when a worker failed, so no threads outlive the test.
+      pool.shutdownNow();
+    }
+    assertTrue(pool.awaitTermination(5, TimeUnit.SECONDS));
+
+    // After all threads are done, cache should be empty or create new one cleanly
+    RateLimiterClientCache client = RateLimiterClientCache.getOrCreate(address);
+    assertFalse(client.getChannel().isShutdown());
+    client.release();
+    assertTrue(client.getChannel().isShutdown());
+  }
+}
diff --git 
a/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptionsTest.java
 
b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptionsTest.java
new file mode 100644
index 00000000000..cb8674b4e50
--- /dev/null
+++ 
b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptionsTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.components.ratelimiter;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.time.Duration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link RateLimiterOptions}. */
+@RunWith(JUnit4.class)
+public class RateLimiterOptionsTest {
+
+  private static final String ADDRESS = "localhost:8081";
+
+  /** Returns a builder with only the address set, shared by all tests below. */
+  private static RateLimiterOptions.Builder builderWithAddress() {
+    return RateLimiterOptions.builder().setAddress(ADDRESS);
+  }
+
+  /** All explicitly set values are returned unchanged by the getters. */
+  @Test
+  public void testValidOptions() {
+    Duration oneSecond = Duration.ofSeconds(1);
+    RateLimiterOptions built = builderWithAddress().setTimeout(oneSecond).setMaxRetries(3).build();
+
+    assertEquals("localhost:8081", built.getAddress());
+    assertEquals(Duration.ofSeconds(1), built.getTimeout());
+    assertEquals(Integer.valueOf(3), built.getMaxRetries());
+  }
+
+  /** A negative timeout is rejected at build time. */
+  @Test
+  public void testNegativeTimeout() {
+    Duration negative = Duration.ofSeconds(-1);
+    assertThrows(
+        IllegalArgumentException.class, () -> builderWithAddress().setTimeout(negative).build());
+  }
+
+  /** A zero timeout is rejected at build time. */
+  @Test
+  public void testZeroTimeout() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> builderWithAddress().setTimeout(Duration.ZERO).build());
+  }
+
+  /** A negative retry count is rejected at build time. */
+  @Test
+  public void testNegativeMaxRetries() {
+    assertThrows(
+        IllegalArgumentException.class, () -> builderWithAddress().setMaxRetries(-1).build());
+  }
+
+  /** An explicit null retry count is accepted and read back as null. */
+  @Test
+  public void testNullMaxRetriesIsAllowed() {
+    RateLimiterOptions built = builderWithAddress().setMaxRetries(null).build();
+    assertEquals(null, built.getMaxRetries());
+  }
+}


Reply via email to