This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch trino-435
in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git
The following commit(s) were added to refs/heads/trino-435 by this push:
new 331dfa386d9 [opt](kafka) support auth for schema registry (#375)
331dfa386d9 is described below
commit 331dfa386d9839ddd47620924c3e100a5b31aa5a
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Tue Feb 3 12:06:44 2026 +0800
[opt](kafka) support auth for schema registry (#375)
---
...afkaSchemaRegistryClientPropertiesProvider.java | 40 ++++++++++++++++
.../kafka/schema/confluent/BasicAuthConfig.java | 54 ++++++++++++++++++++++
.../kafka/schema/confluent/ConfluentModule.java | 14 ++++++
.../ConfluentSchemaRegistryBasicAuth.java | 44 ++++++++++++++++++
.../confluent/ConfluentSchemaRegistryConfig.java | 20 ++++++++
.../confluent/ConfluentSchemaRegistryNoAuth.java | 29 ++++++++++++
.../TestConfluentSchemaRegistryConfig.java | 5 ++
7 files changed, 206 insertions(+)
diff --git
a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/KafkaSchemaRegistryClientPropertiesProvider.java
b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/KafkaSchemaRegistryClientPropertiesProvider.java
new file mode 100644
index 00000000000..93ed2b72717
--- /dev/null
+++
b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/KafkaSchemaRegistryClientPropertiesProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.kafka.schema;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+import
io.trino.plugin.kafka.schema.confluent.SchemaRegistryClientPropertiesProvider;
+
+import java.util.Map;
+
+public class KafkaSchemaRegistryClientPropertiesProvider
+ implements SchemaRegistryClientPropertiesProvider
+{
+ private final SchemaRegistryClientPropertiesProvider auth;
+
+ @Inject
+ public
KafkaSchemaRegistryClientPropertiesProvider(SchemaRegistryClientPropertiesProvider
auth)
+ {
+ this.auth = auth;
+ }
+
+ @Override
+ public Map<String, Object> getSchemaRegistryClientProperties()
+ {
+ ImmutableMap.Builder<String, Object> properties =
ImmutableMap.builder();
+ properties.putAll(auth.getSchemaRegistryClientProperties());
+ return properties.buildOrThrow();
+ }
+}
diff --git
a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/BasicAuthConfig.java
b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/BasicAuthConfig.java
new file mode 100644
index 00000000000..8f281c04a6c
--- /dev/null
+++
b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/BasicAuthConfig.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.kafka.schema.confluent;
+
+import io.airlift.configuration.Config;
+import io.airlift.configuration.ConfigDescription;
+import io.airlift.configuration.ConfigSecuritySensitive;
+import jakarta.validation.constraints.NotNull;
+
+public class BasicAuthConfig
+{
+ private String username;
+ private String password;
+
+ @NotNull
+ public String getConfluentSchemaRegistryUsername()
+ {
+ return username;
+ }
+
+ @Config("kafka.confluent-schema-registry.basic-auth.username")
+ @ConfigDescription("The username for the Confluent Schema Registry")
+ @ConfigSecuritySensitive
+ public BasicAuthConfig setConfluentSchemaRegistryUsername(String username)
+ {
+ this.username = username;
+ return this;
+ }
+
+ @NotNull
+ public String getConfluentSchemaRegistryPassword()
+ {
+ return password;
+ }
+
+ @Config("kafka.confluent-schema-registry.basic-auth.password")
+ @ConfigSecuritySensitive
+ public BasicAuthConfig setConfluentSchemaRegistryPassword(String password)
+ {
+ this.password = password;
+ return this;
+ }
+}
diff --git
a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java
b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java
index a3003c135e9..5f47e2ee2e5 100644
---
a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java
+++
b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java
@@ -52,6 +52,7 @@ import io.trino.plugin.kafka.encoder.avro.AvroRowEncoder;
import io.trino.plugin.kafka.encoder.protobuf.ProtobufRowEncoder;
import io.trino.plugin.kafka.encoder.protobuf.ProtobufSchemaParser;
import io.trino.plugin.kafka.schema.ContentSchemaProvider;
+import
io.trino.plugin.kafka.schema.KafkaSchemaRegistryClientPropertiesProvider;
import io.trino.plugin.kafka.schema.ProtobufAnySupportConfig;
import io.trino.plugin.kafka.schema.TableDescriptionSupplier;
import io.trino.spi.HostAddress;
@@ -76,6 +77,7 @@ import static
com.google.inject.multibindings.Multibinder.newSetBinder;
import static io.airlift.configuration.ConditionalModule.conditionalModule;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.trino.plugin.kafka.encoder.EncoderModule.encoderFactory;
+import static
io.trino.plugin.kafka.schema.confluent.ConfluentSchemaRegistryConfig.ConfluentSchemaRegistryAuthType.BASIC_AUTH;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.util.Objects.requireNonNull;
@@ -98,8 +100,10 @@ public class ConfluentModule
install(new ConfluentDecoderModule());
install(new ConfluentEncoderModule());
binder.bind(ContentSchemaProvider.class).to(AvroConfluentContentSchemaProvider.class).in(Scopes.SINGLETON);
+ newSetBinder(binder,
SchemaRegistryClientPropertiesProvider.class).addBinding().to(KafkaSchemaRegistryClientPropertiesProvider.class).in(Scopes.SINGLETON);
newSetBinder(binder, SchemaRegistryClientPropertiesProvider.class);
newSetBinder(binder,
SchemaProvider.class).addBinding().to(AvroSchemaProvider.class).in(Scopes.SINGLETON);
+
// Each SchemaRegistry object should have a new instance of
SchemaProvider
newSetBinder(binder,
SchemaProvider.class).addBinding().to(LazyLoadedProtobufSchemaProvider.class);
binder.bind(DynamicMessageProvider.Factory.class).to(ConfluentSchemaRegistryDynamicMessageProvider.Factory.class).in(SINGLETON);
@@ -107,6 +111,16 @@ public class ConfluentModule
binder.bind(TableDescriptionSupplier.class).toProvider(ConfluentSchemaRegistryTableDescriptionSupplier.Factory.class).in(Scopes.SINGLETON);
newMapBinder(binder, String.class,
SchemaParser.class).addBinding("AVRO").to(AvroSchemaParser.class).in(Scopes.SINGLETON);
newMapBinder(binder, String.class,
SchemaParser.class).addBinding("PROTOBUF").to(LazyLoadedProtobufSchemaParser.class).in(Scopes.SINGLETON);
+
+ // Bind the appropriate ConfluentSchemaRegistryAuth implementation
based on configuration
+ ConfluentSchemaRegistryConfig schemaRegistryConfig =
buildConfigObject(ConfluentSchemaRegistryConfig.class);
+ if (schemaRegistryConfig.getConfluentSchemaRegistryAuthType() ==
BASIC_AUTH) {
+ configBinder(binder).bindConfig(BasicAuthConfig.class);
+
binder.bind(SchemaRegistryClientPropertiesProvider.class).to(ConfluentSchemaRegistryBasicAuth.class).in(Scopes.SINGLETON);
+ }
+ else {
+
binder.bind(SchemaRegistryClientPropertiesProvider.class).to(ConfluentSchemaRegistryNoAuth.class).in(Scopes.SINGLETON);
+ }
}
@Provides
diff --git
a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryBasicAuth.java
b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryBasicAuth.java
new file mode 100644
index 00000000000..204116d4e00
--- /dev/null
+++
b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryBasicAuth.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.kafka.schema.confluent;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
+
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+public class ConfluentSchemaRegistryBasicAuth
+ implements SchemaRegistryClientPropertiesProvider
+{
+ private final String user;
+ private final String password;
+
+ @Inject
+ public ConfluentSchemaRegistryBasicAuth(BasicAuthConfig basicAuthConfig)
+ {
+ this.user =
requireNonNull(basicAuthConfig.getConfluentSchemaRegistryUsername(), "user is
null");
+ this.password =
requireNonNull(basicAuthConfig.getConfluentSchemaRegistryPassword(), "password
is null");
+ }
+
+ @Override
+ public Map<String, Object> getSchemaRegistryClientProperties()
+ {
+ return ImmutableMap.<String, Object>builder()
+ .put("basic.auth.credentials.source", "USER_INFO")
+ .put("basic.auth.user.info", user + ":" + password)
+ .buildOrThrow();
+ }
+}
diff --git
a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java
b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java
index a484d7fd8c5..67b0f757d2e 100644
---
a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java
+++
b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java
@@ -35,7 +35,14 @@ import static java.util.concurrent.TimeUnit.SECONDS;
public class ConfluentSchemaRegistryConfig
{
+ public enum ConfluentSchemaRegistryAuthType
+ {
+ NONE,
+ BASIC_AUTH,
+ }
+
private Set<HostAddress> confluentSchemaRegistryUrls;
+ private ConfluentSchemaRegistryAuthType confluentSchemaRegistryAuthType =
ConfluentSchemaRegistryAuthType.NONE;
private int confluentSchemaRegistryClientCacheSize = 1000;
private EmptyFieldStrategy emptyFieldStrategy = IGNORE;
private Duration confluentSubjectsCacheRefreshInterval = new Duration(1,
SECONDS);
@@ -54,6 +61,19 @@ public class ConfluentSchemaRegistryConfig
return this;
}
+ public ConfluentSchemaRegistryAuthType getConfluentSchemaRegistryAuthType()
+ {
+ return confluentSchemaRegistryAuthType;
+ }
+
+ @Config("kafka.confluent-schema-registry-auth-type")
+ @ConfigDescription("Auth type for logging in Confluent Schema Registry")
+ public ConfluentSchemaRegistryConfig
setConfluentSchemaRegistryAuthType(ConfluentSchemaRegistryAuthType
confluentSchemaRegistryAuthType)
+ {
+ this.confluentSchemaRegistryAuthType = confluentSchemaRegistryAuthType;
+ return this;
+ }
+
@Min(1)
@Max(2000)
public int getConfluentSchemaRegistryClientCacheSize()
diff --git
a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryNoAuth.java
b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryNoAuth.java
new file mode 100644
index 00000000000..869457227c2
--- /dev/null
+++
b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryNoAuth.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.kafka.schema.confluent;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+
+/* Empty Schema Registry Auth for registries without any authentication */
+public class ConfluentSchemaRegistryNoAuth
+ implements SchemaRegistryClientPropertiesProvider
+{
+ @Override
+ public Map<String, Object> getSchemaRegistryClientProperties()
+ {
+ return ImmutableMap.of();
+ }
+}
diff --git
a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryConfig.java
b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryConfig.java
index d807877e915..f24bf2b1575 100644
---
a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryConfig.java
+++
b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryConfig.java
@@ -24,6 +24,8 @@ import static
io.airlift.configuration.testing.ConfigAssertions.assertRecordedDe
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static
io.trino.plugin.kafka.schema.confluent.AvroSchemaConverter.EmptyFieldStrategy.IGNORE;
import static
io.trino.plugin.kafka.schema.confluent.AvroSchemaConverter.EmptyFieldStrategy.MARK;
+import static
io.trino.plugin.kafka.schema.confluent.ConfluentSchemaRegistryConfig.ConfluentSchemaRegistryAuthType.BASIC_AUTH;
+import static
io.trino.plugin.kafka.schema.confluent.ConfluentSchemaRegistryConfig.ConfluentSchemaRegistryAuthType.NONE;
import static java.util.concurrent.TimeUnit.SECONDS;
public class TestConfluentSchemaRegistryConfig
@@ -33,6 +35,7 @@ public class TestConfluentSchemaRegistryConfig
{
assertRecordedDefaults(recordDefaults(ConfluentSchemaRegistryConfig.class)
.setConfluentSchemaRegistryUrls(null)
+ .setConfluentSchemaRegistryAuthType(NONE)
.setConfluentSchemaRegistryClientCacheSize(1000)
.setEmptyFieldStrategy(IGNORE)
.setConfluentSubjectsCacheRefreshInterval(new Duration(1,
SECONDS)));
@@ -43,6 +46,7 @@ public class TestConfluentSchemaRegistryConfig
{
Map<String, String> properties = ImmutableMap.<String, String>builder()
.put("kafka.confluent-schema-registry-url",
"http://schema-registry-a:8081, http://schema-registry-b:8081")
+ .put("kafka.confluent-schema-registry-auth-type", "BASIC_AUTH")
.put("kafka.confluent-schema-registry-client-cache-size",
"1500")
.put("kafka.empty-field-strategy", "MARK")
.put("kafka.confluent-subjects-cache-refresh-interval", "2s")
@@ -50,6 +54,7 @@ public class TestConfluentSchemaRegistryConfig
ConfluentSchemaRegistryConfig expected = new
ConfluentSchemaRegistryConfig()
.setConfluentSchemaRegistryUrls("http://schema-registry-a:8081,
http://schema-registry-b:8081")
+ .setConfluentSchemaRegistryAuthType(BASIC_AUTH)
.setConfluentSchemaRegistryClientCacheSize(1500)
.setEmptyFieldStrategy(MARK)
.setConfluentSubjectsCacheRefreshInterval(new Duration(2,
SECONDS));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]