This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new cbf36740a53 Add Support for EnvoyRateLimiter Implementation (#37573)
cbf36740a53 is described below
commit cbf36740a5362bd43c013ff9cb278ace7c664332
Author: Tarun Annapareddy <[email protected]>
AuthorDate: Fri Feb 13 06:20:17 2026 -0800
Add Support for EnvoyRateLimiter Implementation (#37573)
* Add EnvoyRateLimiter Implementation
* Add example
* fix style check
* simplify teardown
Co-authored-by: gemini-code-assist[bot]
<176961590+gemini-code-assist[bot]@users.noreply.github.com>
* add more jitter
Co-authored-by: gemini-code-assist[bot]
<176961590+gemini-code-assist[bot]@users.noreply.github.com>
* handle thread interrupt
Co-authored-by: gemini-code-assist[bot]
<176961590+gemini-code-assist[bot]@users.noreply.github.com>
* add connection keep-alive configs
---------
Co-authored-by: gemini-code-assist[bot]
<176961590+gemini-code-assist[bot]@users.noreply.github.com>
---
.../org/apache/beam/gradle/BeamModulePlugin.groovy | 1 +
examples/java/build.gradle | 1 +
.../apache/beam/examples/RateLimiterSimple.java | 131 ++++++++++++
.../resources/beam/checkstyle/suppressions.xml | 1 +
sdks/java/io/components/build.gradle | 5 +
.../components/ratelimiter/EnvoyRateLimiter.java | 46 ++++
.../ratelimiter/EnvoyRateLimiterContext.java | 65 ++++++
.../ratelimiter/EnvoyRateLimiterFactory.java | 238 +++++++++++++++++++++
.../ratelimiter/RateLimiterClientCache.java | 103 +++++++++
.../components/ratelimiter/RateLimiterOptions.java | 68 ++++++
.../ratelimiter/EnvoyRateLimiterTest.java | 168 +++++++++++++++
.../ratelimiter/RateLimiterClientCacheTest.java | 115 ++++++++++
.../ratelimiter/RateLimiterOptionsTest.java | 81 +++++++
13 files changed, 1023 insertions(+)
diff --git
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index df2db7df71b..325af8a8e19 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -726,6 +726,7 @@ class BeamModulePlugin implements Plugin<Project> {
commons_logging :
"commons-logging:commons-logging:1.2",
commons_math3 :
"org.apache.commons:commons-math3:3.6.1",
dbcp2 :
"org.apache.commons:commons-dbcp2:$dbcp2_version",
+ envoy_control_plane_api :
"io.envoyproxy.controlplane:api:1.0.49",
error_prone_annotations :
"com.google.errorprone:error_prone_annotations:$errorprone_version",
failsafe :
"dev.failsafe:failsafe:3.3.0",
flogger_system_backend :
"com.google.flogger:flogger-system-backend:0.7.4",
diff --git a/examples/java/build.gradle b/examples/java/build.gradle
index 5334538cc09..068c0d1b56f 100644
--- a/examples/java/build.gradle
+++ b/examples/java/build.gradle
@@ -52,6 +52,7 @@ dependencies {
implementation project(":sdks:java:extensions:avro")
implementation project(":sdks:java:extensions:google-cloud-platform-core")
implementation project(":sdks:java:extensions:python")
+ implementation project(":sdks:java:io:components")
implementation project(":sdks:java:io:google-cloud-platform")
implementation project(":sdks:java:io:kafka")
implementation project(":sdks:java:extensions:ml")
diff --git
a/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java
b/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java
new file mode 100644
index 00000000000..a33e99e4b23
--- /dev/null
+++
b/examples/java/src/main/java/org/apache/beam/examples/RateLimiterSimple.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples;
+
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.components.ratelimiter.EnvoyRateLimiterContext;
+import org.apache.beam.sdk.io.components.ratelimiter.EnvoyRateLimiterFactory;
+import org.apache.beam.sdk.io.components.ratelimiter.RateLimiter;
+import org.apache.beam.sdk.io.components.ratelimiter.RateLimiterContext;
+import org.apache.beam.sdk.io.components.ratelimiter.RateLimiterFactory;
+import org.apache.beam.sdk.io.components.ratelimiter.RateLimiterOptions;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple example demonstrating how to use the {@link RateLimiter} in a
custom {@link DoFn}.
+ *
+ * <p>This pipeline creates a small set of elements and processes them using a
DoFn that calls an
+ * external service (simulated). The processing is rate-limited using an Envoy
Rate Limit Service.
+ *
+ * <p>To run this example, you need a running Envoy Rate Limit Service.
+ */
+public class RateLimiterSimple {
+
+ public interface Options extends PipelineOptions {
+ @Description("Address of the Envoy Rate Limit Service(eg:localhost:8081)")
+ String getRateLimiterAddress();
+
+ void setRateLimiterAddress(String value);
+
+ @Description("Domain for the Rate Limit Service(eg:mydomain)")
+ String getRateLimiterDomain();
+
+ void setRateLimiterDomain(String value);
+ }
+
+ static class CallExternalServiceFn extends DoFn<String, String> {
+ private final String rlsAddress;
+ private final String rlsDomain;
+ private transient @Nullable RateLimiter rateLimiter;
+ private static final Logger LOG =
LoggerFactory.getLogger(CallExternalServiceFn.class);
+
+ public CallExternalServiceFn(String rlsAddress, String rlsDomain) {
+ this.rlsAddress = rlsAddress;
+ this.rlsDomain = rlsDomain;
+ }
+
+ @Setup
+ public void setup() {
+ // Create the RateLimiterOptions.
+ RateLimiterOptions options =
RateLimiterOptions.builder().setAddress(rlsAddress).build();
+
+ // Static RateLimtier with pre-configured domain and descriptors
+ RateLimiterFactory factory = new EnvoyRateLimiterFactory(options);
+ RateLimiterContext context =
+ EnvoyRateLimiterContext.builder()
+ .setDomain(rlsDomain)
+ .addDescriptor("database", "users")
+ .build();
+ this.rateLimiter = factory.getLimiter(context);
+ }
+
+ @Teardown
+ public void teardown() {
+ if (rateLimiter != null) {
+ try {
+ rateLimiter.close();
+ } catch (Exception e) {
+ LOG.warn("Failed to close RateLimiter", e);
+ }
+ }
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ String element = c.element();
+ try {
+ Preconditions.checkNotNull(rateLimiter).allow(1);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to acquire rate limit token", e);
+ }
+
+ // Simulate external API call
+ LOG.info("Processing: " + element);
+ Thread.sleep(100);
+ c.output("Processed: " + element);
+ }
+ }
+
+ public static void main(String[] args) {
+ Options options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+ Pipeline p = Pipeline.create(options);
+
+ p.apply(
+ "CreateItems",
+ Create.of(
+ IntStream.range(0, 100).mapToObj(i -> "item" +
i).collect(Collectors.toList())))
+ .apply(
+ "CallExternalService",
+ ParDo.of(
+ new CallExternalServiceFn(
+ options.getRateLimiterAddress(),
options.getRateLimiterDomain())));
+
+ p.run().waitUntilFinish();
+ }
+}
diff --git
a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml
b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml
index f79bb6cf3bf..1fa51691818 100644
--- a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml
@@ -62,6 +62,7 @@
<suppress id="ForbidNonVendoredGrpcProtobuf"
files=".*sdk.*core.*GroupByKeyIT.*" />
<suppress id="ForbidNonVendoredGrpcProtobuf"
files=".*sdk.*core.*ValidateRunnerXlangTest.*" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*extensions.*ml.*"
/>
+ <suppress id="ForbidNonVendoredGrpcProtobuf"
files=".*sdk.*io.*components.*ratelimiter.*" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*io.*gcp.*" />
<suppress id="ForbidNonVendoredGrpcProtobuf"
files=".*sdk.*io.*googleads.*DummyRateLimitPolicy\.java" />
<suppress id="ForbidNonVendoredGrpcProtobuf"
files=".*sdk.*io.*googleads.*GoogleAds.*\.java" />
diff --git a/sdks/java/io/components/build.gradle
b/sdks/java/io/components/build.gradle
index 25bf9577211..97342009834 100644
--- a/sdks/java/io/components/build.gradle
+++ b/sdks/java/io/components/build.gradle
@@ -26,6 +26,11 @@ ext.summary = "Components for building fully featured IOs"
dependencies {
implementation project(path: ":sdks:java:core", configuration: "shadow")
+ implementation library.java.auto_value_annotations
+ implementation library.java.envoy_control_plane_api
+ implementation library.java.grpc_api
+ implementation library.java.grpc_stub
+ implementation library.java.grpc_protobuf
implementation library.java.protobuf_java
permitUnusedDeclared library.java.protobuf_java // BEAM-11761
implementation library.java.slf4j_api
diff --git
a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiter.java
b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiter.java
new file mode 100644
index 00000000000..9fc3da80dca
--- /dev/null
+++
b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.components.ratelimiter;
+
+import java.io.IOException;
+
+/**
+ * A {@link RateLimiter} bound to a single {@link EnvoyRateLimiterContext}.
+ *
+ * <p>The object holds no rate-limiting logic of its own: permit requests and {@link #close()} are
+ * both forwarded to the {@link EnvoyRateLimiterFactory} it was constructed with.
+ */
+public class EnvoyRateLimiter implements RateLimiter {
+  private final EnvoyRateLimiterContext context;
+  private final EnvoyRateLimiterFactory factory;
+
+  /**
+   * @param factory the factory that performs the permit requests
+   * @param context the domain and descriptors sent with every permit request from this limiter
+   */
+  public EnvoyRateLimiter(EnvoyRateLimiterFactory factory, EnvoyRateLimiterContext context) {
+    this.context = context;
+    this.factory = factory;
+  }
+
+  /** Requests {@code permits} permits from the factory for this limiter's context. */
+  @Override
+  public boolean allow(int permits) throws IOException, InterruptedException {
+    final boolean granted = factory.allow(context, permits);
+    return granted;
+  }
+
+  /** Closes the underlying factory. */
+  @Override
+  public void close() throws Exception {
+    factory.close();
+  }
+}
diff --git
a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterContext.java
b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterContext.java
new file mode 100644
index 00000000000..baebece7962
--- /dev/null
+++
b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterContext.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.components.ratelimiter;
+
+import com.google.auto.value.AutoValue;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+/**
+ * Context for an Envoy Rate Limiter call.
+ *
+ * <p>Contains the domain and descriptors required to define a specific rate
limit bucket.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class EnvoyRateLimiterContext implements RateLimiterContext {
+
+ @SchemaFieldDescription("Domain of the rate limiter.")
+ public abstract String getDomain();
+
+ @SchemaFieldDescription("Descriptors for the rate limiter.")
+ public abstract ImmutableMap<String, String> getDescriptors();
+
+ public static Builder builder() {
+ return new AutoValue_EnvoyRateLimiterContext.Builder();
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder setDomain(@NonNull String domain);
+
+ public abstract ImmutableMap.Builder<String, String> descriptorsBuilder();
+
+ public Builder addDescriptor(@NonNull String key, @NonNull String value) {
+ descriptorsBuilder().put(key, value);
+ return this;
+ }
+
+ public Builder setDescriptors(@NonNull Map<String, String> descriptors) {
+ descriptorsBuilder().putAll(descriptors);
+ return this;
+ }
+
+ public abstract EnvoyRateLimiterContext build();
+ }
+}
diff --git
a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java
b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java
new file mode 100644
index 00000000000..fa512d38ab9
--- /dev/null
+++
b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterFactory.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.components.ratelimiter;
+
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import io.envoyproxy.envoy.extensions.common.ratelimit.v3.RateLimitDescriptor;
+import io.envoyproxy.envoy.service.ratelimit.v3.RateLimitRequest;
+import io.envoyproxy.envoy.service.ratelimit.v3.RateLimitResponse;
+import io.envoyproxy.envoy.service.ratelimit.v3.RateLimitServiceGrpc;
+import io.grpc.StatusRuntimeException;
+import java.io.IOException;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.components.throttling.ThrottlingSignaler;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.util.Sleeper;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link RateLimiterFactory} for Envoy Rate Limit Service.
+ *
+ * <p>The gRPC stub is created lazily on the first permit request, using a channel obtained from
+ * {@link RateLimiterClientCache} for the configured address, and is dropped again by {@link
+ * #close()}.
+ *
+ * <p>Each permit request sends one {@code ShouldRateLimit} RPC. A failing RPC is retried up to
+ * {@value #RPC_RETRY_COUNT} times in total; an {@code OVER_LIMIT} answer makes the caller sleep
+ * and ask again until the service answers {@code OK} or the configured maximum number of retries
+ * is exceeded.
+ */
+public class EnvoyRateLimiterFactory implements RateLimiterFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(EnvoyRateLimiterFactory.class);
+  // Total number of attempts made for one RPC before giving up with an IOException.
+  private static final int RPC_RETRY_COUNT = 5;
+  // Pause between two attempts of a failing RPC.
+  private static final long RPC_RETRY_DELAY_MILLIS = 1000;
+  // Sleep used when an OVER_LIMIT response carries no usable reset duration.
+  private static final long DEFAULT_THROTTLE_SLEEP_MILLIS = 1000;
+
+  private final RateLimiterOptions options;
+
+  private transient volatile @Nullable RateLimitServiceGrpc.RateLimitServiceBlockingStub stub;
+  private transient @Nullable RateLimiterClientCache clientCache;
+  private final ThrottlingSignaler throttlingSignaler;
+  private final Sleeper sleeper;
+
+  private final Counter requestsTotal;
+  private final Counter requestsAllowed;
+  private final Counter requestsThrottled;
+  private final Counter rpcErrors;
+  private final Counter rpcRetries;
+  private final Distribution rpcLatency;
+
+  public EnvoyRateLimiterFactory(RateLimiterOptions options) {
+    this(options, Sleeper.DEFAULT);
+  }
+
+  @VisibleForTesting
+  EnvoyRateLimiterFactory(RateLimiterOptions options, Sleeper sleeper) {
+    this.options = options;
+    this.sleeper = sleeper;
+    String namespace = EnvoyRateLimiterFactory.class.getName();
+    this.throttlingSignaler = new ThrottlingSignaler(namespace);
+    this.requestsTotal = Metrics.counter(namespace, "ratelimit-requests-total");
+    this.requestsAllowed = Metrics.counter(namespace, "ratelimit-requests-allowed");
+    this.requestsThrottled = Metrics.counter(namespace, "ratelimit-requests-throttled");
+    this.rpcErrors = Metrics.counter(namespace, "ratelimit-rpc-errors");
+    this.rpcRetries = Metrics.counter(namespace, "ratelimit-rpc-retries");
+    this.rpcLatency = Metrics.distribution(namespace, "ratelimit-rpc-latency-ms");
+  }
+
+  /** Releases this factory's reference on the cached client, if any, and drops the stub. */
+  @Override
+  public synchronized void close() {
+    if (clientCache != null) {
+      clientCache.release();
+      clientCache = null;
+    }
+    stub = null;
+  }
+
+  /** Creates the stub on first use; later calls return immediately while a stub is set. */
+  private void init() {
+    if (stub != null) {
+      return;
+    }
+    synchronized (this) {
+      if (stub == null) {
+        RateLimiterClientCache cache = RateLimiterClientCache.getOrCreate(options.getAddress());
+        this.clientCache = cache;
+        stub = RateLimitServiceGrpc.newBlockingStub(cache.getChannel());
+      }
+    }
+  }
+
+  @VisibleForTesting
+  void setStub(RateLimitServiceGrpc.RateLimitServiceBlockingStub stub) {
+    this.stub = stub;
+  }
+
+  /**
+   * Returns a limiter bound to {@code context} that delegates to this factory.
+   *
+   * @throws IllegalArgumentException if {@code context} is not an {@link EnvoyRateLimiterContext}
+   */
+  @Override
+  public RateLimiter getLimiter(RateLimiterContext context) {
+    if (!(context instanceof EnvoyRateLimiterContext)) {
+      throw new IllegalArgumentException(
+          "EnvoyRateLimiterFactory requires EnvoyRateLimiterContext");
+    }
+    return new EnvoyRateLimiter(this, (EnvoyRateLimiterContext) context);
+  }
+
+  /**
+   * Requests {@code permits} permits for {@code context}, sleeping and retrying while the service
+   * reports the limit as exceeded.
+   *
+   * @return {@code true} once the service answers {@code OK} (or immediately when {@code permits}
+   *     is 0); {@code false} when the configured maximum number of retries is exceeded
+   * @throws IllegalArgumentException if {@code context} is not an {@link EnvoyRateLimiterContext}
+   *     or {@code permits} is negative
+   * @throws IOException if the RPC keeps failing or the service returns an unexpected code
+   * @throws InterruptedException if the thread is interrupted while sleeping
+   */
+  @Override
+  public boolean allow(RateLimiterContext context, int permits)
+      throws IOException, InterruptedException {
+    if (permits == 0) {
+      return true;
+    }
+    if (!(context instanceof EnvoyRateLimiterContext)) {
+      throw new IllegalArgumentException(
+          "EnvoyRateLimiterFactory requires EnvoyRateLimiterContext, got: "
+              + context.getClass().getName());
+    }
+    checkArgument(permits >= 0, "Permits must be non-negative");
+    EnvoyRateLimiterContext envoyContext = (EnvoyRateLimiterContext) context;
+    return fetchTokens(envoyContext, permits);
+  }
+
+  /** Runs the ask / sleep / ask-again loop for one permit request. */
+  private boolean fetchTokens(EnvoyRateLimiterContext context, int tokens)
+      throws IOException, InterruptedException {
+
+    init();
+    // Work on a local copy so a concurrent close() cannot null the stub mid-request.
+    RateLimitServiceGrpc.RateLimitServiceBlockingStub currentStub = stub;
+    if (currentStub == null) {
+      throw new IllegalStateException("RateLimitServiceStub is null");
+    }
+
+    RateLimitRequest request = buildRequest(context, tokens);
+    Integer maxRetries = options.getMaxRetries();
+    long timeoutMillis = options.getTimeout().toMillis();
+
+    requestsTotal.inc();
+    int attempt = 0;
+    while (true) {
+      // A null maxRetries means "retry until the service lets the request through".
+      if (maxRetries != null && attempt > maxRetries) {
+        return false;
+      }
+
+      RateLimitResponse response = callWithRetries(currentStub, request, timeoutMillis);
+      RateLimitResponse.Code overallCode = response.getOverallCode();
+
+      if (overallCode == RateLimitResponse.Code.OK) {
+        requestsAllowed.inc();
+        return true;
+      }
+      if (overallCode != RateLimitResponse.Code.OVER_LIMIT) {
+        throw new IOException("Rate Limit Service returned unknown code: " + overallCode);
+      }
+
+      long sleepMillis = throttleSleepMillis(response);
+      LOG.warn("Throttled by RLS, sleeping for {} ms", sleepMillis);
+      if (sleeper != null) {
+        requestsThrottled.inc();
+        if (throttlingSignaler != null) {
+          throttlingSignaler.signalThrottling(sleepMillis);
+        }
+        sleeper.sleep(sleepMillis);
+      }
+      attempt++;
+    }
+  }
+
+  /** Builds the request: one descriptor holding every context entry, with the hit count. */
+  private static RateLimitRequest buildRequest(EnvoyRateLimiterContext context, int tokens) {
+    Map<String, String> descriptors = context.getDescriptors();
+    RateLimitDescriptor.Builder descriptorBuilder = RateLimitDescriptor.newBuilder();
+
+    for (Map.Entry<String, String> entry : descriptors.entrySet()) {
+      descriptorBuilder.addEntries(
+          RateLimitDescriptor.Entry.newBuilder()
+              .setKey(entry.getKey())
+              .setValue(entry.getValue())
+              .build());
+    }
+
+    return RateLimitRequest.newBuilder()
+        .setDomain(context.getDomain())
+        .setHitsAddend(tokens)
+        .addDescriptors(descriptorBuilder.build())
+        .build();
+  }
+
+  /**
+   * Sends the RPC, retrying on {@link StatusRuntimeException} up to {@link #RPC_RETRY_COUNT}
+   * attempts in total with a fixed pause between attempts.
+   *
+   * <p>The latency metric is recorded per successful attempt, so it covers only the RPC that
+   * succeeded and not earlier failed attempts or the pauses between them.
+   *
+   * @throws IOException if the last attempt fails or no response is obtained
+   */
+  private RateLimitResponse callWithRetries(
+      RateLimitServiceGrpc.RateLimitServiceBlockingStub currentStub,
+      RateLimitRequest request,
+      long timeoutMillis)
+      throws IOException, InterruptedException {
+    for (int i = 0; i < RPC_RETRY_COUNT; i++) {
+      // Monotonic clock, restarted for every attempt.
+      long attemptStartNanos = System.nanoTime();
+      try {
+        RateLimitResponse response =
+            currentStub
+                .withDeadlineAfter(timeoutMillis, java.util.concurrent.TimeUnit.MILLISECONDS)
+                .shouldRateLimit(request);
+        rpcLatency.update(
+            java.util.concurrent.TimeUnit.NANOSECONDS.toMillis(
+                System.nanoTime() - attemptStartNanos));
+        if (response == null) {
+          throw new IOException("Failed to get response from Rate Limit Service");
+        }
+        return response;
+      } catch (StatusRuntimeException e) {
+        rpcErrors.inc();
+        if (i == RPC_RETRY_COUNT - 1) {
+          LOG.error("RateLimitService call failed after {} attempts", RPC_RETRY_COUNT, e);
+          throw new IOException("Failed to call Rate Limit Service", e);
+        }
+        rpcRetries.inc();
+        LOG.warn("RateLimitService call failed, retrying", e);
+        if (sleeper != null) {
+          sleeper.sleep(RPC_RETRY_DELAY_MILLIS);
+        }
+      }
+    }
+    throw new IOException("Failed to get response from Rate Limit Service");
+  }
+
+  /**
+   * Computes how long to sleep after an {@code OVER_LIMIT} response: the longest reset duration
+   * among the over-limit descriptor statuses (or a default when none is given), plus a random
+   * jitter of up to 10% of that value.
+   */
+  private static long throttleSleepMillis(RateLimitResponse response) {
+    long sleepMillis = 0;
+    for (RateLimitResponse.DescriptorStatus status : response.getStatusesList()) {
+      if (status.getCode() == RateLimitResponse.Code.OVER_LIMIT
+          && status.hasDurationUntilReset()) {
+        long durationMillis =
+            status.getDurationUntilReset().getSeconds() * 1000
+                + status.getDurationUntilReset().getNanos() / 1_000_000;
+        sleepMillis = Math.max(sleepMillis, durationMillis);
+      }
+    }
+
+    if (sleepMillis == 0) {
+      sleepMillis = DEFAULT_THROTTLE_SLEEP_MILLIS;
+    }
+
+    long jitter =
+        (long)
+            (java.util.concurrent.ThreadLocalRandom.current().nextDouble()
+                * (0.1 * sleepMillis));
+    return sleepMillis + jitter;
+  }
+}
diff --git
a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCache.java
b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCache.java
new file mode 100644
index 00000000000..5fde6fbe898
--- /dev/null
+++
b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCache.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.components.ratelimiter;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A static cache for {@link ManagedChannel}s to Rate Limit Service.
+ *
+ * <p>This class ensures that multiple DoFn instances (threads) in the same Worker sharing the same
+ * RLS address will share a single {@link ManagedChannel}.
+ *
+ * <p>It uses reference counting to close the channel when it is no longer in use by any RateLimiter
+ * instance. Every {@link #getOrCreate(String)} must be matched by exactly one {@link #release()}.
+ */
+public class RateLimiterClientCache {
+  private static final Logger LOG = LoggerFactory.getLogger(RateLimiterClientCache.class);
+  private static final Map<String, RateLimiterClientCache> CACHE = new ConcurrentHashMap<>();
+  private static final int KEEP_ALIVE_TIME_SECONDS = 60;
+  private static final int KEEP_ALIVE_TIMEOUT_SECONDS = 15;
+  private static final int TERMINATION_TIMEOUT_SECONDS = 10;
+
+  private final ManagedChannel channel;
+  private final String address;
+  // Read and written only while holding the RateLimiterClientCache.class lock.
+  private int refCount = 0;
+
+  private RateLimiterClientCache(String address) {
+    this.address = address;
+    LOG.info("Creating new ManagedChannel for RLS at {}", address);
+    this.channel =
+        ManagedChannelBuilder.forTarget(address)
+            .usePlaintext()
+            .keepAliveTime(KEEP_ALIVE_TIME_SECONDS, TimeUnit.SECONDS)
+            .keepAliveTimeout(KEEP_ALIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
+            .keepAliveWithoutCalls(true)
+            .build();
+  }
+
+  /**
+   * Gets or creates a cached client for the given address. Increments the reference count.
+   * Synchronized on the class to prevent race conditions when multiple instances call getOrCreate()
+   * simultaneously.
+   */
+  public static synchronized RateLimiterClientCache getOrCreate(String address) {
+    RateLimiterClientCache client = CACHE.get(address);
+    if (client == null) {
+      client = new RateLimiterClientCache(address);
+      CACHE.put(address, client);
+    }
+    client.refCount++;
+    LOG.debug("Referenced RLS Channel for {}. New RefCount: {}", address, client.refCount);
+    return client;
+  }
+
+  public ManagedChannel getChannel() {
+    return channel;
+  }
+
+  /**
+   * Releases the client. Decrements the reference count. If reference count reaches 0, the channel
+   * is shut down and removed from the cache. Synchronized on the class to prevent race conditions
+   * when multiple threads call release() simultaneously and to prevent race conditions between
+   * getOrCreate() and release() calls.
+   *
+   * <p>Calling this on a client whose reference count has already reached 0 is logged and ignored,
+   * so a stale handle cannot affect a newer client cached for the same address.
+   */
+  public void release() {
+    synchronized (RateLimiterClientCache.class) {
+      if (refCount <= 0) {
+        LOG.warn("Ignoring release of already closed RLS Channel for {}", address);
+        return;
+      }
+      refCount--;
+      LOG.debug("Released RLS Channel for {}. New RefCount: {}", address, refCount);
+      if (refCount > 0) {
+        return;
+      }
+      LOG.info("Closing ManagedChannel for RLS at {}", address);
+      // Conditional remove: only drop the mapping if it still points at this instance.
+      CACHE.remove(address, this);
+      channel.shutdown();
+      try {
+        channel.awaitTermination(TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        LOG.error("Couldn't gracefully close gRPC channel={}", channel, e);
+        Thread.currentThread().interrupt();
+      }
+      // Force-close whatever is still open after the graceful shutdown window.
+      channel.shutdownNow();
+    }
+  }
+}
diff --git
a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptions.java
b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptions.java
new file mode 100644
index 00000000000..acf49622eb9
--- /dev/null
+++
b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptions.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.components.ratelimiter;
+
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.time.Duration;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+
+/** Configuration options for {@link RateLimiterFactory}. */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class RateLimiterOptions implements Serializable {
+ @SchemaFieldDescription("Address of the rate limiter")
+ public abstract String getAddress();
+
+ @Nullable
+ @SchemaFieldDescription("Maximum number of retries, defaults to infinite")
+ public abstract Integer getMaxRetries();
+
+ @SchemaFieldDescription("Timeout for rate limiter operations, defaults to 5
seconds")
+ public abstract Duration getTimeout();
+
+ public static Builder builder() {
+ return new
AutoValue_RateLimiterOptions.Builder().setTimeout(Duration.ofSeconds(5));
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+ public abstract Builder setAddress(String address);
+
+ public abstract Builder setMaxRetries(Integer maxRetries);
+
+ public abstract Builder setTimeout(Duration timeout);
+
+ abstract RateLimiterOptions autoBuild();
+
+ public RateLimiterOptions build() {
+ RateLimiterOptions options = autoBuild();
+ checkArgument(options.getTimeout().compareTo(Duration.ZERO) > 0,
"Timeout must be positive");
+ Integer maxRetries = options.getMaxRetries();
+ if (maxRetries != null) {
+ checkArgument(maxRetries >= 0, "MaxRetries must be non-negative");
+ }
+ return options;
+ }
+ }
+}
diff --git
a/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterTest.java
b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterTest.java
new file mode 100644
index 00000000000..e94d0b42eb3
--- /dev/null
+++
b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/EnvoyRateLimiterTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.components.ratelimiter;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.verify;
+
+import io.envoyproxy.envoy.service.ratelimit.v3.RateLimitRequest;
+import io.envoyproxy.envoy.service.ratelimit.v3.RateLimitResponse;
+import io.envoyproxy.envoy.service.ratelimit.v3.RateLimitServiceGrpc;
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.Status;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+import java.io.IOException;
+import org.apache.beam.sdk.util.Sleeper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link EnvoyRateLimiterFactory}.
+ *
+ * <p>Each test runs against an in-process gRPC server hosting {@link TestRateLimitService}; both
+ * the server and the channel use direct executors. The factory under test is built with a mocked
+ * {@link Sleeper} and has its blocking stub replaced through {@code setStub} so that calls go to
+ * the in-process channel rather than to the configured address.
+ *
+ * <p>Scenarios covered:
+ *
+ * <ul>
+ *   <li>an {@code OK} overall code makes {@code allow} return {@code true};
+ *   <li>an {@code OVER_LIMIT} overall code with {@code maxRetries = 1} makes {@code allow} return
+ *       {@code false} after the sleeper has been invoked at least once;
+ *   <li>an {@code UNAVAILABLE} status from the service surfaces as an {@link IOException};
+ *   <li>a {@link RateLimiterContext} that is not an {@link EnvoyRateLimiterContext} is rejected
+ *       with an {@link IllegalArgumentException}.
+ * </ul>
+ *
+ * <p>{@link TestRateLimitService} is driven by two volatile fields: a non-null {@code
+ * errorToThrow} takes precedence and is sent through {@code onError}; otherwise {@code
+ * responseToReturn} is sent, falling back to an {@code OK} response when it is {@code null}.
+ */
+@RunWith(JUnit4.class)
+public class EnvoyRateLimiterTest {
+ @Mock private Sleeper sleeper;
+
+ private EnvoyRateLimiterFactory factory;
+ private RateLimiterOptions options;
+ private EnvoyRateLimiterContext context;
+
+ private Server server;
+ private ManagedChannel channel;
+ private TestRateLimitService service;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.openMocks(this);
+ options =
+ RateLimiterOptions.builder()
+ .setAddress("localhost:8081")
+ .setTimeout(java.time.Duration.ofSeconds(1))
+ .build();
+
+ String serverName = InProcessServerBuilder.generateName();
+ service = new TestRateLimitService();
+ server =
+ InProcessServerBuilder.forName(serverName)
+ .directExecutor()
+ .addService(service)
+ .build()
+ .start();
+ channel =
InProcessChannelBuilder.forName(serverName).directExecutor().build();
+
+ factory = new EnvoyRateLimiterFactory(options, sleeper);
+ factory.setStub(RateLimitServiceGrpc.newBlockingStub(channel));
+
+ context =
+ EnvoyRateLimiterContext.builder()
+ .setDomain("test-domain")
+ .addDescriptor("key", "value")
+ .build();
+ }
+
+ @After
+ public void tearDown() {
+ if (channel != null) {
+ channel.shutdownNow();
+ }
+ if (server != null) {
+ server.shutdownNow();
+ }
+ }
+
+ @Test
+ public void testAllow_OK() throws Exception {
+ service.responseToReturn =
+
RateLimitResponse.newBuilder().setOverallCode(RateLimitResponse.Code.OK).build();
+
+ assertTrue(factory.allow(context, 1));
+ }
+
+ @Test
+ public void testAllow_OverLimit() throws Exception {
+ service.responseToReturn =
+ RateLimitResponse.newBuilder()
+ .setOverallCode(RateLimitResponse.Code.OVER_LIMIT)
+ .addStatuses(
+ RateLimitResponse.DescriptorStatus.newBuilder()
+ .setCode(RateLimitResponse.Code.OVER_LIMIT)
+ .setDurationUntilReset(
+
com.google.protobuf.Duration.newBuilder().setSeconds(1).build())
+ .build())
+ .build();
+
+ factory =
+ new EnvoyRateLimiterFactory(
+ RateLimiterOptions.builder()
+ .setAddress("foo")
+ .setTimeout(java.time.Duration.ofSeconds(1))
+ .setMaxRetries(1)
+ .build(),
+ sleeper);
+ factory.setStub(RateLimitServiceGrpc.newBlockingStub(channel));
+
+ assertFalse(factory.allow(context, 1));
+
+ // Verify sleep was called.
+ verify(sleeper, org.mockito.Mockito.atLeastOnce()).sleep(anyLong());
+ }
+
+ @Test
+ public void testAllow_RpcError() throws Exception {
+ service.errorToThrow = Status.UNAVAILABLE.asRuntimeException();
+ assertThrows(IOException.class, () -> factory.allow(context, 1));
+ }
+
+ @Test
+ public void testInvalidContext() {
+ assertThrows(
+ IllegalArgumentException.class, () -> factory.allow(new
RateLimiterContext() {}, 1));
+ }
+
+ static class TestRateLimitService extends
RateLimitServiceGrpc.RateLimitServiceImplBase {
+ volatile RateLimitResponse responseToReturn;
+ volatile RuntimeException errorToThrow;
+
+ @Override
+ public void shouldRateLimit(
+ RateLimitRequest request, StreamObserver<RateLimitResponse>
responseObserver) {
+ if (errorToThrow != null) {
+ responseObserver.onError(errorToThrow);
+ return;
+ }
+ if (responseToReturn != null) {
+ responseObserver.onNext(responseToReturn);
+ responseObserver.onCompleted();
+ } else {
+ // Neither an error nor a response was configured: reply with an OK verdict.
+ responseObserver.onNext(
+
RateLimitResponse.newBuilder().setOverallCode(RateLimitResponse.Code.OK).build());
+ responseObserver.onCompleted();
+ }
+ }
+ }
+}
diff --git
a/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCacheTest.java
b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCacheTest.java
new file mode 100644
index 00000000000..4eb61b279c3
--- /dev/null
+++
b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterClientCacheTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.components.ratelimiter;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link RateLimiterClientCache}.
+ *
+ * <p>The assertions exercise the pairing of {@code getOrCreate} and {@code release}:
+ *
+ * <ul>
+ *   <li>two {@code getOrCreate} calls for the same address return the same instance, whose
+ *       channel stays open after the first {@code release} and is shut down after the second;
+ *   <li>different addresses yield different instances, each with a channel that is shut down by
+ *       its own single {@code release};
+ *   <li>after 10 threads have each performed 100 acquire/release cycles on one address, a fresh
+ *       {@code getOrCreate} still returns a client with an open channel that shuts down on
+ *       {@code release}.
+ * </ul>
+ */
+@RunWith(JUnit4.class)
+public class RateLimiterClientCacheTest {
+
+ @Test
+ public void testGetOrCreate_SameAddress() {
+ String address = "addr1";
+ RateLimiterClientCache client1 =
RateLimiterClientCache.getOrCreate(address);
+ RateLimiterClientCache client2 =
RateLimiterClientCache.getOrCreate(address);
+
+ assertSame(client1, client2);
+ assertFalse(client1.getChannel().isShutdown());
+
+ // cleanup
+ client1.release();
+ // client2 is still using the same channel
+ assertFalse(client1.getChannel().isShutdown());
+ client2.release();
+ assertTrue(client1.getChannel().isShutdown());
+ }
+
+ @Test
+ public void testGetOrCreate_DifferentAddress_ReturnsDifferentInstances() {
+ RateLimiterClientCache client1 =
RateLimiterClientCache.getOrCreate("addr1");
+ RateLimiterClientCache client2 =
RateLimiterClientCache.getOrCreate("addr2");
+
+ assertNotSame(client1, client2);
+
+ assertFalse(client1.getChannel().isShutdown());
+ assertFalse(client2.getChannel().isShutdown());
+ client1.release();
+ assertTrue(client1.getChannel().isShutdown());
+ client2.release();
+ assertTrue(client2.getChannel().isShutdown());
+ }
+
+ @Test
+ public void testConcurrency() throws InterruptedException,
ExecutionException {
+ int threads = 10;
+ int iterations = 100;
+ String address = "concurrent-addr";
+ ExecutorService pool = Executors.newFixedThreadPool(threads);
+ List<Future<Boolean>> futures = new ArrayList<>();
+
+ for (int i = 0; i < threads; i++) {
+ futures.add(
+ pool.submit(
+ new Callable<Boolean>() {
+ @Override
+ public Boolean call() {
+ for (int j = 0; j < iterations; j++) {
+ RateLimiterClientCache client =
RateLimiterClientCache.getOrCreate(address);
+ // Hold the client for about 1 ms before releasing it, so holders on different threads can overlap.
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ client.release();
+ }
+ return true;
+ }
+ }));
+ }
+
+ for (Future<Boolean> f : futures) {
+ assertTrue(f.get());
+ }
+
+ pool.shutdown();
+ pool.awaitTermination(5, TimeUnit.SECONDS);
+
+ // After all threads are done, cache should be empty or create new one
cleanly
+ RateLimiterClientCache client =
RateLimiterClientCache.getOrCreate(address);
+ assertFalse(client.getChannel().isShutdown());
+ client.release();
+ assertTrue(client.getChannel().isShutdown());
+ }
+}
diff --git
a/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptionsTest.java
b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptionsTest.java
new file mode 100644
index 00000000000..cb8674b4e50
--- /dev/null
+++
b/sdks/java/io/components/src/test/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterOptionsTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.components.ratelimiter;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.time.Duration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link RateLimiterOptions}.
+ *
+ * <p>Covers the builder's validation rules: explicitly set values are returned unchanged by the
+ * getters; a negative or zero timeout and a negative max-retries value each cause {@code build()}
+ * to throw {@link IllegalArgumentException}; and a {@code null} max-retries value is accepted and
+ * returned as {@code null}.
+ */
+@RunWith(JUnit4.class)
+public class RateLimiterOptionsTest {
+
+ @Test
+ public void testValidOptions() {
+ RateLimiterOptions options =
+ RateLimiterOptions.builder()
+ .setAddress("localhost:8081")
+ .setTimeout(Duration.ofSeconds(1))
+ .setMaxRetries(3)
+ .build();
+
+ assertEquals("localhost:8081", options.getAddress());
+ assertEquals(Duration.ofSeconds(1), options.getTimeout());
+ assertEquals(Integer.valueOf(3), options.getMaxRetries());
+ }
+
+ @Test
+ public void testNegativeTimeout() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ RateLimiterOptions.builder()
+ .setAddress("localhost:8081")
+ .setTimeout(Duration.ofSeconds(-1))
+ .build());
+ }
+
+ @Test
+ public void testZeroTimeout() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ RateLimiterOptions.builder()
+ .setAddress("localhost:8081")
+ .setTimeout(Duration.ZERO)
+ .build());
+ }
+
+ @Test
+ public void testNegativeMaxRetries() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
RateLimiterOptions.builder().setAddress("localhost:8081").setMaxRetries(-1).build());
+ }
+
+ @Test
+ public void testNullMaxRetriesIsAllowed() {
+ RateLimiterOptions options =
+
RateLimiterOptions.builder().setAddress("localhost:8081").setMaxRetries(null).build();
+ assertEquals(null, options.getMaxRetries());
+ }
+}