This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch trino-435
in repository https://gitbox.apache.org/repos/asf/doris-thirdparty.git
The following commit(s) were added to refs/heads/trino-435 by this push:
new d1c8da92ab9 [kafka] Support subject name mapping for Confluent Schema
Registry (#378)
d1c8da92ab9 is described below
commit d1c8da92ab9b7e41a8aa409da7832fe0a07f3499
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Sat Feb 7 01:07:58 2026 +0800
[kafka] Support subject name mapping for Confluent Schema Registry (#378)
Sometimes the subject name in the Schema Registry does not match the Kafka
topic name. This adds a new configuration property,
kafka.confluent-schema-registry-subject-mapping, that allows users to
manually specify mappings from schema.table names to topic names.
The mapping is configured as a comma-separated list in the format:
schema1.table1:topic1,schema2.table2:topic2
When a mapping is configured for a given schema.table, the topic name
in TopicAndSubjects is overridden with the mapped value, allowing the
connector to correctly resolve schemas for topics whose subject names
differ from their topic names.
---
.../confluent/ConfluentSchemaRegistryConfig.java | 56 ++++++++++++++++++++++
...uentSchemaRegistryTableDescriptionSupplier.java | 23 ++++++++-
...uentSchemaRegistryTableDescriptionSupplier.java | 3 +-
3 files changed, 79 insertions(+), 3 deletions(-)
diff --git
a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java
b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java
index 67b0f757d2e..c2810d9f422 100644
---
a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java
+++
b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java
@@ -14,6 +14,7 @@
package io.trino.plugin.kafka.schema.confluent;
import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
@@ -22,15 +23,20 @@ import io.airlift.units.MaxDuration;
import io.airlift.units.MinDuration;
import
io.trino.plugin.kafka.schema.confluent.AvroSchemaConverter.EmptyFieldStrategy;
import io.trino.spi.HostAddress;
+import io.trino.spi.connector.SchemaTableName;
import jakarta.validation.constraints.Max;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.Size;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.Streams.stream;
import static
io.trino.plugin.kafka.schema.confluent.AvroSchemaConverter.EmptyFieldStrategy.IGNORE;
+import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.SECONDS;
public class ConfluentSchemaRegistryConfig
@@ -46,6 +52,7 @@ public class ConfluentSchemaRegistryConfig
private int confluentSchemaRegistryClientCacheSize = 1000;
private EmptyFieldStrategy emptyFieldStrategy = IGNORE;
private Duration confluentSubjectsCacheRefreshInterval = new Duration(1,
SECONDS);
+ private Map<SchemaTableName, String> confluentSchemaRegistrySubjectMapping
= ImmutableMap.of();
@Size(min = 1)
public Set<HostAddress> getConfluentSchemaRegistryUrls()
@@ -117,6 +124,19 @@ public class ConfluentSchemaRegistryConfig
return this;
}
+ public Map<SchemaTableName, String>
getConfluentSchemaRegistrySubjectMapping()
+ {
+ return confluentSchemaRegistrySubjectMapping;
+ }
+
+ @Config("kafka.confluent-schema-registry-subject-mapping")
+ @ConfigDescription("Comma-separated list of schema.table to actual topic
name mappings. Format: schema1.table1:topic1,schema2.table2:topic2")
+ public ConfluentSchemaRegistryConfig
setConfluentSchemaRegistrySubjectMapping(String mapping)
+ {
+ this.confluentSchemaRegistrySubjectMapping = (mapping == null) ?
ImmutableMap.of() : parseSubjectMapping(mapping);
+ return this;
+ }
+
private static ImmutableSet<HostAddress> parseNodes(String nodes)
{
Splitter splitter = Splitter.on(',').omitEmptyStrings().trimResults();
@@ -129,4 +149,40 @@ public class ConfluentSchemaRegistryConfig
{
return HostAddress.fromString(value);
}
+
+ private static ImmutableMap<SchemaTableName, String>
parseSubjectMapping(String mapping)
+ {
+ requireNonNull(mapping, "mapping is null");
+
+ Splitter entrySplitter =
Splitter.on(',').omitEmptyStrings().trimResults();
+ Splitter keyValueSplitter = Splitter.on(':').trimResults();
+
+ ImmutableMap.Builder<SchemaTableName, String> builder =
ImmutableMap.builder();
+
+ for (String entry : entrySplitter.split(mapping)) {
+ List<String> parts = keyValueSplitter.splitToList(entry);
+ checkArgument(parts.size() == 2,
+ "Invalid mapping format '%s'. Expected format:
schema.table:topic", entry);
+
+ String schemaTable = parts.get(0);
+ String topicName = parts.get(1);
+
+ List<String> schemaTableParts =
Splitter.on('.').trimResults().splitToList(schemaTable);
+ checkArgument(schemaTableParts.size() == 2,
+ "Invalid schema.table format '%s'. Expected format:
schema.table", schemaTable);
+
+ String schema = schemaTableParts.get(0);
+ String table = schemaTableParts.get(1);
+
+ checkArgument(!schema.isEmpty() && !table.isEmpty(),
+ "Schema and table names cannot be empty in '%s'",
schemaTable);
+ checkArgument(!topicName.isEmpty(),
+ "Topic name cannot be empty in mapping '%s'", entry);
+
+ SchemaTableName schemaTableName = new SchemaTableName(schema,
table);
+ builder.put(schemaTableName, topicName);
+ }
+
+ return builder.buildOrThrow();
+ }
}
diff --git
a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryTableDescriptionSupplier.java
b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryTableDescriptionSupplier.java
index b18236eb065..40f851f454c 100644
---
a/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryTableDescriptionSupplier.java
+++
b/plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryTableDescriptionSupplier.java
@@ -76,16 +76,19 @@ public class ConfluentSchemaRegistryTableDescriptionSupplier
private final String defaultSchema;
private final Supplier<SetMultimap<String, TopicAndSubjects>>
topicAndSubjectsSupplier;
private final Supplier<SetMultimap<String, String>> subjectsSupplier;
+ private final Map<SchemaTableName, String> subjectMapping;
public ConfluentSchemaRegistryTableDescriptionSupplier(
SchemaRegistryClient schemaRegistryClient,
Map<String, SchemaParser> schemaParsers,
String defaultSchema,
- Duration subjectsCacheRefreshInterval)
+ Duration subjectsCacheRefreshInterval,
+ Map<SchemaTableName, String> subjectMapping)
{
this.schemaRegistryClient = requireNonNull(schemaRegistryClient,
"schemaRegistryClient is null");
this.schemaParsers = ImmutableMap.copyOf(requireNonNull(schemaParsers,
"schemaParsers is null"));
this.defaultSchema = requireNonNull(defaultSchema, "defaultSchema is
null");
+ this.subjectMapping =
ImmutableMap.copyOf(requireNonNull(subjectMapping, "subjectMapping is null"));
topicAndSubjectsSupplier =
memoizeWithExpiration(this::getTopicAndSubjects,
subjectsCacheRefreshInterval.toMillis(), MILLISECONDS);
subjectsSupplier = memoizeWithExpiration(this::getAllSubjects,
subjectsCacheRefreshInterval.toMillis(), MILLISECONDS);
}
@@ -97,6 +100,7 @@ public class ConfluentSchemaRegistryTableDescriptionSupplier
private final Map<String, SchemaParser> schemaParsers;
private final String defaultSchema;
private final Duration subjectsCacheRefreshInterval;
+ private final Map<SchemaTableName, String> subjectMapping;
@Inject
public Factory(
@@ -109,12 +113,18 @@ public class
ConfluentSchemaRegistryTableDescriptionSupplier
this.schemaParsers =
ImmutableMap.copyOf(requireNonNull(schemaParsers, "schemaParsers is null"));
this.defaultSchema = kafkaConfig.getDefaultSchema();
this.subjectsCacheRefreshInterval =
confluentConfig.getConfluentSubjectsCacheRefreshInterval();
+ this.subjectMapping =
confluentConfig.getConfluentSchemaRegistrySubjectMapping();
}
@Override
public TableDescriptionSupplier get()
{
- return new
ConfluentSchemaRegistryTableDescriptionSupplier(schemaRegistryClient,
schemaParsers, defaultSchema, subjectsCacheRefreshInterval);
+ return new ConfluentSchemaRegistryTableDescriptionSupplier(
+ schemaRegistryClient,
+ schemaParsers,
+ defaultSchema,
+ subjectsCacheRefreshInterval,
+ subjectMapping);
}
}
@@ -195,6 +205,15 @@ public class
ConfluentSchemaRegistryTableDescriptionSupplier
topicAndSubjects.getValueSubject().or(topicAndSubjectsFromCache::getValueSubject));
}
+ // Apply subject mapping override if configured
+ if (subjectMapping.containsKey(schemaTableName)) {
+ String overrideTopic = subjectMapping.get(schemaTableName);
+ topicAndSubjects = new TopicAndSubjects(
+ overrideTopic,
+ topicAndSubjects.getKeySubject(),
+ topicAndSubjects.getValueSubject());
+ }
+
if (topicAndSubjects.getKeySubject().isEmpty() &&
topicAndSubjects.getValueSubject().isEmpty()) {
return Optional.empty();
}
diff --git
a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryTableDescriptionSupplier.java
b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryTableDescriptionSupplier.java
index b9b7ca6e637..b43cf869c45 100644
---
a/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryTableDescriptionSupplier.java
+++
b/plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryTableDescriptionSupplier.java
@@ -195,7 +195,8 @@ public class
TestConfluentSchemaRegistryTableDescriptionSupplier
SCHEMA_REGISTRY_CLIENT,
ImmutableMap.of("AVRO", new AvroSchemaParser(new
TestingTypeManager())),
DEFAULT_NAME,
- new Duration(1, SECONDS));
+ new Duration(1, SECONDS),
+ ImmutableMap.of());
}
private static Schema getAvroSchema(String topicName, String
columnNamePrefix)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]