This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch trino-435
in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git


The following commit(s) were added to refs/heads/trino-435 by this push:
     new 331dfa386d9 [opt](kafka) support auth for schema registry (#375)
331dfa386d9 is described below

commit 331dfa386d9839ddd47620924c3e100a5b31aa5a
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Tue Feb 3 12:06:44 2026 +0800

    [opt](kafka) support auth for schema registry (#375)
---
 ...afkaSchemaRegistryClientPropertiesProvider.java | 40 ++++++++++++++++
 .../kafka/schema/confluent/BasicAuthConfig.java    | 54 ++++++++++++++++++++++
 .../kafka/schema/confluent/ConfluentModule.java    | 14 ++++++
 .../ConfluentSchemaRegistryBasicAuth.java          | 44 ++++++++++++++++++
 .../confluent/ConfluentSchemaRegistryConfig.java   | 20 ++++++++
 .../confluent/ConfluentSchemaRegistryNoAuth.java   | 29 ++++++++++++
 .../TestConfluentSchemaRegistryConfig.java         |  5 ++
 7 files changed, 206 insertions(+)

diff --git 
a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/KafkaSchemaRegistryClientPropertiesProvider.java
 
b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/KafkaSchemaRegistryClientPropertiesProvider.java
new file mode 100644
index 00000000000..93ed2b72717
--- /dev/null
+++ 
b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/KafkaSchemaRegistryClientPropertiesProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.kafka.schema;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import 
io.trino.plugin.kafka.schema.confluent.SchemaRegistryClientPropertiesProvider;
+
+import java.util.Map;
+
+public class KafkaSchemaRegistryClientPropertiesProvider
+        implements SchemaRegistryClientPropertiesProvider
+{
+    private final SchemaRegistryClientPropertiesProvider auth;
+
+    @Inject
+    public 
KafkaSchemaRegistryClientPropertiesProvider(SchemaRegistryClientPropertiesProvider
 auth)
+    {
+        this.auth = auth;
+    }
+
+    @Override
+    public Map<String, Object> getSchemaRegistryClientProperties()
+    {
+        ImmutableMap.Builder<String, Object> properties = 
ImmutableMap.builder();
+        properties.putAll(auth.getSchemaRegistryClientProperties());
+        return properties.buildOrThrow();
+    }
+}
diff --git 
a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/BasicAuthConfig.java
 
b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/BasicAuthConfig.java
new file mode 100644
index 00000000000..8f281c04a6c
--- /dev/null
+++ 
b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/BasicAuthConfig.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.kafka.schema.confluent;
+
+import io.airlift.configuration.Config;
+import io.airlift.configuration.ConfigDescription;
+import io.airlift.configuration.ConfigSecuritySensitive;
+import jakarta.validation.constraints.NotNull;
+
+public class BasicAuthConfig
+{
+    private String username;
+    private String password;
+
+    @NotNull
+    public String getConfluentSchemaRegistryUsername()
+    {
+        return username;
+    }
+
+    @Config("kafka.confluent-schema-registry.basic-auth.username")
+    @ConfigDescription("The username for the Confluent Schema Registry")
+    @ConfigSecuritySensitive
+    public BasicAuthConfig setConfluentSchemaRegistryUsername(String username)
+    {
+        this.username = username;
+        return this;
+    }
+
+    @NotNull
+    public String getConfluentSchemaRegistryPassword()
+    {
+        return password;
+    }
+
+    @Config("kafka.confluent-schema-registry.basic-auth.password")
+    @ConfigSecuritySensitive
+    public BasicAuthConfig setConfluentSchemaRegistryPassword(String password)
+    {
+        this.password = password;
+        return this;
+    }
+}
diff --git 
a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java
 
b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java
index a3003c135e9..5f47e2ee2e5 100644
--- 
a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java
+++ 
b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java
@@ -52,6 +52,7 @@ import io.trino.plugin.kafka.encoder.avro.AvroRowEncoder;
 import io.trino.plugin.kafka.encoder.protobuf.ProtobufRowEncoder;
 import io.trino.plugin.kafka.encoder.protobuf.ProtobufSchemaParser;
 import io.trino.plugin.kafka.schema.ContentSchemaProvider;
+import 
io.trino.plugin.kafka.schema.KafkaSchemaRegistryClientPropertiesProvider;
 import io.trino.plugin.kafka.schema.ProtobufAnySupportConfig;
 import io.trino.plugin.kafka.schema.TableDescriptionSupplier;
 import io.trino.spi.HostAddress;
@@ -76,6 +77,7 @@ import static 
com.google.inject.multibindings.Multibinder.newSetBinder;
 import static io.airlift.configuration.ConditionalModule.conditionalModule;
 import static io.airlift.configuration.ConfigBinder.configBinder;
 import static io.trino.plugin.kafka.encoder.EncoderModule.encoderFactory;
+import static 
io.trino.plugin.kafka.schema.confluent.ConfluentSchemaRegistryConfig.ConfluentSchemaRegistryAuthType.BASIC_AUTH;
 import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
 import static java.util.Objects.requireNonNull;
 
@@ -98,8 +100,10 @@ public class ConfluentModule
         install(new ConfluentDecoderModule());
         install(new ConfluentEncoderModule());
         
binder.bind(ContentSchemaProvider.class).to(AvroConfluentContentSchemaProvider.class).in(Scopes.SINGLETON);
+        newSetBinder(binder, 
SchemaRegistryClientPropertiesProvider.class).addBinding().to(KafkaSchemaRegistryClientPropertiesProvider.class).in(Scopes.SINGLETON);
         newSetBinder(binder, SchemaRegistryClientPropertiesProvider.class);
         newSetBinder(binder, 
SchemaProvider.class).addBinding().to(AvroSchemaProvider.class).in(Scopes.SINGLETON);
+
         // Each SchemaRegistry object should have a new instance of 
SchemaProvider
         newSetBinder(binder, 
SchemaProvider.class).addBinding().to(LazyLoadedProtobufSchemaProvider.class);
         
binder.bind(DynamicMessageProvider.Factory.class).to(ConfluentSchemaRegistryDynamicMessageProvider.Factory.class).in(SINGLETON);
@@ -107,6 +111,16 @@ public class ConfluentModule
         
binder.bind(TableDescriptionSupplier.class).toProvider(ConfluentSchemaRegistryTableDescriptionSupplier.Factory.class).in(Scopes.SINGLETON);
         newMapBinder(binder, String.class, 
SchemaParser.class).addBinding("AVRO").to(AvroSchemaParser.class).in(Scopes.SINGLETON);
         newMapBinder(binder, String.class, 
SchemaParser.class).addBinding("PROTOBUF").to(LazyLoadedProtobufSchemaParser.class).in(Scopes.SINGLETON);
+
+        // Bind the appropriate ConfluentSchemaRegistryAuth implementation 
based on configuration
+        ConfluentSchemaRegistryConfig schemaRegistryConfig = 
buildConfigObject(ConfluentSchemaRegistryConfig.class);
+        if (schemaRegistryConfig.getConfluentSchemaRegistryAuthType() == 
BASIC_AUTH) {
+            configBinder(binder).bindConfig(BasicAuthConfig.class);
+            
binder.bind(SchemaRegistryClientPropertiesProvider.class).to(ConfluentSchemaRegistryBasicAuth.class).in(Scopes.SINGLETON);
+        }
+        else {
+            
binder.bind(SchemaRegistryClientPropertiesProvider.class).to(ConfluentSchemaRegistryNoAuth.class).in(Scopes.SINGLETON);
+        }
     }
 
     @Provides
diff --git 
a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryBasicAuth.java
 
b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryBasicAuth.java
new file mode 100644
index 00000000000..204116d4e00
--- /dev/null
+++ 
b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryBasicAuth.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.kafka.schema.confluent;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+public class ConfluentSchemaRegistryBasicAuth
+        implements SchemaRegistryClientPropertiesProvider
+{
+    private final String user;
+    private final String password;
+
+    @Inject
+    public ConfluentSchemaRegistryBasicAuth(BasicAuthConfig basicAuthConfig)
+    {
+        this.user = 
requireNonNull(basicAuthConfig.getConfluentSchemaRegistryUsername(), "user is 
null");
+        this.password = 
requireNonNull(basicAuthConfig.getConfluentSchemaRegistryPassword(), "password 
is null");
+    }
+
+    @Override
+    public Map<String, Object> getSchemaRegistryClientProperties()
+    {
+        return ImmutableMap.<String, Object>builder()
+                .put("basic.auth.credentials.source", "USER_INFO")
+                .put("basic.auth.user.info", user + ":" + password)
+                .buildOrThrow();
+    }
+}
diff --git 
a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java
 
b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java
index a484d7fd8c5..67b0f757d2e 100644
--- 
a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java
+++ 
b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java
@@ -35,7 +35,14 @@ import static java.util.concurrent.TimeUnit.SECONDS;
 
 public class ConfluentSchemaRegistryConfig
 {
+    public enum ConfluentSchemaRegistryAuthType
+    {
+        NONE,
+        BASIC_AUTH,
+    }
+
     private Set<HostAddress> confluentSchemaRegistryUrls;
+    private ConfluentSchemaRegistryAuthType confluentSchemaRegistryAuthType = 
ConfluentSchemaRegistryAuthType.NONE;
     private int confluentSchemaRegistryClientCacheSize = 1000;
     private EmptyFieldStrategy emptyFieldStrategy = IGNORE;
     private Duration confluentSubjectsCacheRefreshInterval = new Duration(1, 
SECONDS);
@@ -54,6 +61,19 @@ public class ConfluentSchemaRegistryConfig
         return this;
     }
 
+    public ConfluentSchemaRegistryAuthType getConfluentSchemaRegistryAuthType()
+    {
+        return confluentSchemaRegistryAuthType;
+    }
+
+    @Config("kafka.confluent-schema-registry-auth-type")
+    @ConfigDescription("Auth type for logging in Confluent Schema Registry")
+    public ConfluentSchemaRegistryConfig 
setConfluentSchemaRegistryAuthType(ConfluentSchemaRegistryAuthType 
confluentSchemaRegistryAuthType)
+    {
+        this.confluentSchemaRegistryAuthType = confluentSchemaRegistryAuthType;
+        return this;
+    }
+
     @Min(1)
     @Max(2000)
     public int getConfluentSchemaRegistryClientCacheSize()
diff --git 
a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryNoAuth.java
 
b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryNoAuth.java
new file mode 100644
index 00000000000..869457227c2
--- /dev/null
+++ 
b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryNoAuth.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.kafka.schema.confluent;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+
+/* Empty Schema Registry Auth for registries without any authentication */
+public class ConfluentSchemaRegistryNoAuth
+        implements SchemaRegistryClientPropertiesProvider
+{
+    @Override
+    public Map<String, Object> getSchemaRegistryClientProperties()
+    {
+        return ImmutableMap.of();
+    }
+}
diff --git 
a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryConfig.java
 
b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryConfig.java
index d807877e915..f24bf2b1575 100644
--- 
a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryConfig.java
+++ 
b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryConfig.java
@@ -24,6 +24,8 @@ import static 
io.airlift.configuration.testing.ConfigAssertions.assertRecordedDe
 import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
 import static 
io.trino.plugin.kafka.schema.confluent.AvroSchemaConverter.EmptyFieldStrategy.IGNORE;
 import static 
io.trino.plugin.kafka.schema.confluent.AvroSchemaConverter.EmptyFieldStrategy.MARK;
+import static 
io.trino.plugin.kafka.schema.confluent.ConfluentSchemaRegistryConfig.ConfluentSchemaRegistryAuthType.BASIC_AUTH;
+import static 
io.trino.plugin.kafka.schema.confluent.ConfluentSchemaRegistryConfig.ConfluentSchemaRegistryAuthType.NONE;
 import static java.util.concurrent.TimeUnit.SECONDS;
 
 public class TestConfluentSchemaRegistryConfig
@@ -33,6 +35,7 @@ public class TestConfluentSchemaRegistryConfig
     {
         
assertRecordedDefaults(recordDefaults(ConfluentSchemaRegistryConfig.class)
                 .setConfluentSchemaRegistryUrls(null)
+                .setConfluentSchemaRegistryAuthType(NONE)
                 .setConfluentSchemaRegistryClientCacheSize(1000)
                 .setEmptyFieldStrategy(IGNORE)
                 .setConfluentSubjectsCacheRefreshInterval(new Duration(1, 
SECONDS)));
@@ -43,6 +46,7 @@ public class TestConfluentSchemaRegistryConfig
     {
         Map<String, String> properties = ImmutableMap.<String, String>builder()
                 .put("kafka.confluent-schema-registry-url", 
"http://schema-registry-a:8081, http://schema-registry-b:8081")
+                .put("kafka.confluent-schema-registry-auth-type", "BASIC_AUTH")
                 .put("kafka.confluent-schema-registry-client-cache-size", 
"1500")
                 .put("kafka.empty-field-strategy", "MARK")
                 .put("kafka.confluent-subjects-cache-refresh-interval", "2s")
@@ -50,6 +54,7 @@ public class TestConfluentSchemaRegistryConfig
 
         ConfluentSchemaRegistryConfig expected = new 
ConfluentSchemaRegistryConfig()
                 
.setConfluentSchemaRegistryUrls("http://schema-registry-a:8081, 
http://schema-registry-b:8081")
+                .setConfluentSchemaRegistryAuthType(BASIC_AUTH)
                 .setConfluentSchemaRegistryClientCacheSize(1500)
                 .setEmptyFieldStrategy(MARK)
                 .setConfluentSubjectsCacheRefreshInterval(new Duration(2, 
SECONDS));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to