This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8f77862fa5ecbec5ee26b7b2b68478ad50943a3e Author: slinkydeveloper <[email protected]> AuthorDate: Thu Jan 6 16:37:17 2022 +0100 [FLINK-25391][format-avro] Forward catalog table options --- .../docs/connectors/table/formats/avro-confluent.md | 15 ++++++++++++++- docs/content/docs/connectors/table/formats/avro.md | 5 ++++- .../registry/confluent/RegistryAvroFormatFactory.java | 19 +++++++++++++++++++ .../flink/formats/avro/AvroFileFormatFactory.java | 5 +++++ 4 files changed, 42 insertions(+), 2 deletions(-) diff --git a/docs/content/docs/connectors/table/formats/avro-confluent.md b/docs/content/docs/connectors/table/formats/avro-confluent.md index cf2fe70..28b33da 100644 --- a/docs/content/docs/connectors/table/formats/avro-confluent.md +++ b/docs/content/docs/connectors/table/formats/avro-confluent.md @@ -176,15 +176,17 @@ Format Options <tr> <th class="text-left" style="width: 25%">Option</th> <th class="text-center" style="width: 8%">Required</th> + <th class="text-center" style="width: 8%">Forwarded</th> <th class="text-center" style="width: 7%">Default</th> <th class="text-center" style="width: 10%">Type</th> - <th class="text-center" style="width: 50%">Description</th> + <th class="text-center" style="width: 42%">Description</th> </tr> </thead> <tbody> <tr> <td><h5>format</h5></td> <td>required</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>Specify what format to use, here should be <code>'avro-confluent'</code>.</td> @@ -192,6 +194,7 @@ Format Options <tr> <td><h5>avro-confluent.basic-auth.credentials-source</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>Basic auth credentials source for Schema Registry</td> @@ -199,6 +202,7 @@ Format Options <tr> <td><h5>avro-confluent.basic-auth.user-info</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>Basic auth user info for schema registry</td> @@ -206,6 +210,7 @@ Format Options <tr> <td><h5>avro-confluent.bearer-auth.credentials-source</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>Bearer auth credentials source for Schema Registry</td> @@ -213,6 +218,7 @@ Format Options <tr> <td><h5>avro-confluent.bearer-auth.token</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>Bearer auth token for Schema Registry</td> @@ -220,6 +226,7 @@ Format Options <tr> <td><h5>avro-confluent.properties</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>Map</td> <td>Properties map that is forwarded to the underlying Schema Registry. This is useful for options that are not officially exposed via Flink config options. However, note that Flink options have higher precedence.</td> @@ -227,6 +234,7 @@ Format Options <tr> <td><h5>avro-confluent.ssl.keystore.location</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>Location / File of SSL keystore</td> @@ -234,6 +242,7 @@ Format Options <tr> <td><h5>avro-confluent.ssl.keystore.password</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>Password for SSL keystore</td> @@ -241,6 +250,7 @@ Format Options <tr> <td><h5>avro-confluent.ssl.truststore.location</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>Location / File of SSL truststore</td> @@ -248,6 +258,7 @@ Format Options <tr> <td><h5>avro-confluent.ssl.truststore.password</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>Password for SSL truststore</td> @@ -255,6 +266,7 @@ Format Options <tr> <td><h5>avro-confluent.subject</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The Confluent Schema Registry subject under which to register the schema used by this format during serialization. By default, 'kafka' and 'upsert-kafka' connectors use '<topic_name>-value' or '<topic_name>-key' as the default subject name if this format is used as the value or key format. But for other connectors (e.g. 'filesystem'), the subject option is required when used as sink.</td> @@ -262,6 +274,7 @@ Format Options <tr> <td><h5>avro-confluent.url</h5></td> <td>required</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The URL of the Confluent Schema Registry to fetch/register schemas.</td> diff --git a/docs/content/docs/connectors/table/formats/avro.md b/docs/content/docs/connectors/table/formats/avro.md index 341ca0a..601a9dc 100644 --- a/docs/content/docs/connectors/table/formats/avro.md +++ b/docs/content/docs/connectors/table/formats/avro.md @@ -65,15 +65,17 @@ Format Options <tr> <th class="text-left" style="width: 25%">Option</th> <th class="text-center" style="width: 8%">Required</th> + <th class="text-center" style="width: 8%">Forwarded</th> <th class="text-center" style="width: 7%">Default</th> <th class="text-center" style="width: 10%">Type</th> - <th class="text-center" style="width: 50%">Description</th> + <th class="text-center" style="width: 42%">Description</th> </tr> </thead> <tbody> <tr> <td><h5>format</h5></td> <td>required</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>Specify what format to use, here should be <code>'avro'</code>.</td> @@ -81,6 +83,7 @@ Format Options <tr> <td><h5>avro.codec</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>For <a href="{{< ref "docs/connectors/table/filesystem" >}}">Filesystem</a> only, the compression codec for avro. Snappy compression as default. The valid enumerations are: null, deflate, snappy, bzip2, xz.</td> diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java index 659160b..4030168 100644 --- a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java +++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java @@ -52,6 +52,8 @@ import java.util.HashSet; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BASIC_AUTH_CREDENTIALS_SOURCE; import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BASIC_AUTH_USER_INFO; @@ -175,6 +177,23 @@ public class RegistryAvroFormatFactory return options; } + @Override + public Set<ConfigOption<?>> forwardOptions() { + return Stream.of( + URL, + SUBJECT, + PROPERTIES, + SSL_KEYSTORE_LOCATION, + SSL_KEYSTORE_PASSWORD, + SSL_TRUSTSTORE_LOCATION, + SSL_TRUSTSTORE_PASSWORD, + BASIC_AUTH_CREDENTIALS_SOURCE, + BASIC_AUTH_USER_INFO, + BEARER_AUTH_CREDENTIALS_SOURCE, + BEARER_AUTH_TOKEN) + .collect(Collectors.toSet()); + } + public static @Nullable Map<String, String> buildOptionalPropertiesMap( ReadableConfig formatOptions) { final Map<String, String> properties = new HashMap<>(); diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileFormatFactory.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileFormatFactory.java index 630d990..e188405 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileFormatFactory.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileFormatFactory.java @@ -116,6 +116,11 @@ public class AvroFileFormatFactory implements BulkReaderFormatFactory, BulkWrite return options; } + @Override + public Set<ConfigOption<?>> forwardOptions() { + return optionalOptions(); + } + private static class AvroGenericRecordBulkFormat extends AbstractAvroBulkFormat<GenericRecord, RowData, FileSourceSplit> {
