lukecwik commented on a change in pull request #16652:
URL: https://github.com/apache/beam/pull/16652#discussion_r795847493



##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProvider.java
##########
@@ -73,25 +73,29 @@
   }
 
   public static <T> ConfluentSchemaRegistryDeserializerProvider<T> of(
-      String schemaRegistryUrl, String subject) {
-    return of(schemaRegistryUrl, subject, null, null);
+      String schemaRegistryUrl, Integer schemaRegistryCacheCapacity, String subject) {
+    return of(schemaRegistryUrl, schemaRegistryCacheCapacity, subject, null, null);
   }
 
   public static <T> ConfluentSchemaRegistryDeserializerProvider<T> of(
-      String schemaRegistryUrl, String subject, @Nullable Integer version) {
-    return of(schemaRegistryUrl, subject, version, null);
+      String schemaRegistryUrl,
+      Integer schemaRegistryCacheCapacity,
+      String subject,
+      @Nullable Integer version) {
+    return of(schemaRegistryUrl, schemaRegistryCacheCapacity, subject, version, null);
   }
 
   public static <T> ConfluentSchemaRegistryDeserializerProvider<T> of(
       String schemaRegistryUrl,
+      Integer schemaRegistryCacheCapacity,

Review comment:
```suggestion
      int schemaRegistryCacheCapacity,
```
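
The comment gives no rationale for the change. A plausible one (an assumption, not something stated in the review) is that the capacity is always required, and a boxed `Integer` lets callers pass `null`, which only fails later when the value is unboxed. A minimal sketch of the difference, using a hypothetical `CapacityParamSketch` class that is not part of Beam:

```java
// Hypothetical helper class, for illustration only; not part of Beam.
final class CapacityParamSketch {
  // Boxed parameter: the compiler accepts null, and unboxing it fails at runtime.
  static int fromBoxed(Integer capacity) {
    return capacity; // auto-unboxing; throws NullPointerException if capacity is null
  }

  // Primitive parameter: passing null does not compile, so no null check is needed.
  static int fromPrimitive(int capacity) {
    return capacity;
  }
}
```

With the primitive signature, a call such as `fromPrimitive(null)` is rejected by the compiler instead of failing in a running pipeline.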

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProvider.java
##########
@@ -73,25 +73,29 @@
   }
 
   public static <T> ConfluentSchemaRegistryDeserializerProvider<T> of(
-      String schemaRegistryUrl, String subject) {
-    return of(schemaRegistryUrl, subject, null, null);
+      String schemaRegistryUrl, Integer schemaRegistryCacheCapacity, String subject) {
+    return of(schemaRegistryUrl, schemaRegistryCacheCapacity, subject, null, null);
   }
 
   public static <T> ConfluentSchemaRegistryDeserializerProvider<T> of(
-      String schemaRegistryUrl, String subject, @Nullable Integer version) {
-    return of(schemaRegistryUrl, subject, version, null);
+      String schemaRegistryUrl,
+      Integer schemaRegistryCacheCapacity,

Review comment:
```suggestion
      int schemaRegistryCacheCapacity,
```

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProvider.java
##########
@@ -73,25 +73,29 @@
   }
 
   public static <T> ConfluentSchemaRegistryDeserializerProvider<T> of(
-      String schemaRegistryUrl, String subject) {
-    return of(schemaRegistryUrl, subject, null, null);
+      String schemaRegistryUrl, Integer schemaRegistryCacheCapacity, String subject) {
+    return of(schemaRegistryUrl, schemaRegistryCacheCapacity, subject, null, null);
   }
 
   public static <T> ConfluentSchemaRegistryDeserializerProvider<T> of(
-      String schemaRegistryUrl, String subject, @Nullable Integer version) {
-    return of(schemaRegistryUrl, subject, version, null);
+      String schemaRegistryUrl,
+      Integer schemaRegistryCacheCapacity,
+      String subject,
+      @Nullable Integer version) {
+    return of(schemaRegistryUrl, schemaRegistryCacheCapacity, subject, version, null);
   }
 
   public static <T> ConfluentSchemaRegistryDeserializerProvider<T> of(
       String schemaRegistryUrl,
+      Integer schemaRegistryCacheCapacity,
       String subject,
       @Nullable Integer version,
       @Nullable Map<String, ?> schemaRegistryConfigs) {
     return new ConfluentSchemaRegistryDeserializerProvider(
         (SerializableFunction<Void, SchemaRegistryClient>)
             input ->
                 new CachedSchemaRegistryClient(
-                    schemaRegistryUrl, Integer.MAX_VALUE, schemaRegistryConfigs),

Review comment:
We try to keep backwards compatibility when possible, so it would be nice to keep the existing methods around even if they are marked experimental.

Also, we should change this value (`Integer.MAX_VALUE`) to `1000`, since that seems to be the default when the capacity is unspecified.
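
A minimal sketch of what keeping the existing overloads could look like. It uses a hypothetical stand-in class `ProviderSketch` (not the real provider, and with no Confluent client behind it) and a hypothetical constant `DEFAULT_CACHE_CAPACITY` set to the `1000` mentioned above. The old signatures delegate to the new one, so existing callers keep compiling:

```java
import java.util.Map;

// Hypothetical stand-in for the provider class; NOT the actual Beam implementation.
final class ProviderSketch {
  // Assumed default, taken from the value named in the review comment.
  static final int DEFAULT_CACHE_CAPACITY = 1000;

  final String schemaRegistryUrl;
  final int schemaRegistryCacheCapacity;
  final String subject;
  final Integer version; // null means no specific version was requested
  final Map<String, ?> schemaRegistryConfigs; // may be null

  private ProviderSketch(
      String schemaRegistryUrl,
      int schemaRegistryCacheCapacity,
      String subject,
      Integer version,
      Map<String, ?> schemaRegistryConfigs) {
    this.schemaRegistryUrl = schemaRegistryUrl;
    this.schemaRegistryCacheCapacity = schemaRegistryCacheCapacity;
    this.subject = subject;
    this.version = version;
    this.schemaRegistryConfigs = schemaRegistryConfigs;
  }

  // Existing signature, kept so current callers continue to compile.
  static ProviderSketch of(String schemaRegistryUrl, String subject) {
    return of(schemaRegistryUrl, DEFAULT_CACHE_CAPACITY, subject, null, null);
  }

  // Existing signature with all optional arguments, also kept.
  static ProviderSketch of(
      String schemaRegistryUrl,
      String subject,
      Integer version,
      Map<String, ?> schemaRegistryConfigs) {
    return of(schemaRegistryUrl, DEFAULT_CACHE_CAPACITY, subject, version, schemaRegistryConfigs);
  }

  // New overload that exposes the cache capacity as a primitive int.
  static ProviderSketch of(
      String schemaRegistryUrl,
      int schemaRegistryCacheCapacity,
      String subject,
      Integer version,
      Map<String, ?> schemaRegistryConfigs) {
    return new ProviderSketch(
        schemaRegistryUrl, schemaRegistryCacheCapacity, subject, version, schemaRegistryConfigs);
  }
}
```

Because the second parameter is a `String` in the old overloads and an `int` in the new one, calls resolve unambiguously and no caller has to change.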

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProvider.java
##########
@@ -73,25 +73,29 @@
   }
 
   public static <T> ConfluentSchemaRegistryDeserializerProvider<T> of(
-      String schemaRegistryUrl, String subject) {
-    return of(schemaRegistryUrl, subject, null, null);
+      String schemaRegistryUrl, Integer schemaRegistryCacheCapacity, String subject) {

Review comment:
```suggestion
      String schemaRegistryUrl, int schemaRegistryCacheCapacity, String subject) {
```



