This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8f77862fa5ecbec5ee26b7b2b68478ad50943a3e
Author: slinkydeveloper <[email protected]>
AuthorDate: Thu Jan 6 16:37:17 2022 +0100

    [FLINK-25391][format-avro] Forward catalog table options
---
 .../docs/connectors/table/formats/avro-confluent.md   | 15 ++++++++++++++-
 docs/content/docs/connectors/table/formats/avro.md    |  5 ++++-
 .../registry/confluent/RegistryAvroFormatFactory.java | 19 +++++++++++++++++++
 .../flink/formats/avro/AvroFileFormatFactory.java     |  5 +++++
 4 files changed, 42 insertions(+), 2 deletions(-)

diff --git a/docs/content/docs/connectors/table/formats/avro-confluent.md 
b/docs/content/docs/connectors/table/formats/avro-confluent.md
index cf2fe70..28b33da 100644
--- a/docs/content/docs/connectors/table/formats/avro-confluent.md
+++ b/docs/content/docs/connectors/table/formats/avro-confluent.md
@@ -176,15 +176,17 @@ Format Options
       <tr>
         <th class="text-left" style="width: 25%">Option</th>
         <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 8%">Forwarded</th>
         <th class="text-center" style="width: 7%">Default</th>
         <th class="text-center" style="width: 10%">Type</th>
-        <th class="text-center" style="width: 50%">Description</th>
+        <th class="text-center" style="width: 42%">Description</th>
       </tr>
     </thead>
     <tbody>
         <tr>
             <td><h5>format</h5></td>
             <td>required</td>
+            <td>no</td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
             <td>Specify what format to use, here should be 
<code>'avro-confluent'</code>.</td>
@@ -192,6 +194,7 @@ Format Options
         <tr>
             <td><h5>avro-confluent.basic-auth.credentials-source</h5></td>
             <td>optional</td>
+            <td>yes</td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
             <td>Basic auth credentials source for Schema Registry</td>
@@ -199,6 +202,7 @@ Format Options
         <tr>
             <td><h5>avro-confluent.basic-auth.user-info</h5></td>
             <td>optional</td>
+            <td>yes</td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
             <td>Basic auth user info for schema registry</td>
@@ -206,6 +210,7 @@ Format Options
         <tr>
             <td><h5>avro-confluent.bearer-auth.credentials-source</h5></td>
             <td>optional</td>
+            <td>yes</td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
             <td>Bearer auth credentials source for Schema Registry</td>
@@ -213,6 +218,7 @@ Format Options
         <tr>
             <td><h5>avro-confluent.bearer-auth.token</h5></td>
             <td>optional</td>
+            <td>yes</td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
             <td>Bearer auth token for Schema Registry</td>
@@ -220,6 +226,7 @@ Format Options
         <tr>
             <td><h5>avro-confluent.properties</h5></td>
             <td>optional</td>
+            <td>yes</td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>Map</td>
             <td>Properties map that is forwarded to the underlying Schema 
Registry. This is useful for options that are not officially exposed via Flink 
config options. However, note that Flink options have higher precedence.</td>
@@ -227,6 +234,7 @@ Format Options
         <tr>
             <td><h5>avro-confluent.ssl.keystore.location</h5></td>
             <td>optional</td>
+            <td>yes</td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
             <td>Location / File of SSL keystore</td>
@@ -234,6 +242,7 @@ Format Options
         <tr>
             <td><h5>avro-confluent.ssl.keystore.password</h5></td>
             <td>optional</td>
+            <td>yes</td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
             <td>Password for SSL keystore</td>
@@ -241,6 +250,7 @@ Format Options
         <tr>
             <td><h5>avro-confluent.ssl.truststore.location</h5></td>
             <td>optional</td>
+            <td>yes</td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
             <td>Location / File of SSL truststore</td>
@@ -248,6 +258,7 @@ Format Options
         <tr>
             <td><h5>avro-confluent.ssl.truststore.password</h5></td>
             <td>optional</td>
+            <td>yes</td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
             <td>Password for SSL truststore</td>
@@ -255,6 +266,7 @@ Format Options
         <tr>
             <td><h5>avro-confluent.subject</h5></td>
             <td>optional</td>
+            <td>yes</td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
             <td>The Confluent Schema Registry subject under which to register 
the schema used by this format during serialization. By default, 'kafka' and 
'upsert-kafka' connectors use '&lt;topic_name&gt;-value' or 
'&lt;topic_name&gt;-key' as the default subject name if this format is used as 
the value or key format. But for other connectors (e.g. 'filesystem'), the 
subject option is required when used as sink.</td>
@@ -262,6 +274,7 @@ Format Options
         <tr>
             <td><h5>avro-confluent.url</h5></td>
             <td>required</td>
+            <td>yes</td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>String</td>
             <td>The URL of the Confluent Schema Registry to fetch/register 
schemas.</td>
diff --git a/docs/content/docs/connectors/table/formats/avro.md 
b/docs/content/docs/connectors/table/formats/avro.md
index 341ca0a..601a9dc 100644
--- a/docs/content/docs/connectors/table/formats/avro.md
+++ b/docs/content/docs/connectors/table/formats/avro.md
@@ -65,15 +65,17 @@ Format Options
       <tr>
         <th class="text-left" style="width: 25%">Option</th>
         <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 8%">Forwarded</th>
         <th class="text-center" style="width: 7%">Default</th>
         <th class="text-center" style="width: 10%">Type</th>
-        <th class="text-center" style="width: 50%">Description</th>
+        <th class="text-center" style="width: 42%">Description</th>
       </tr>
     </thead>
     <tbody>
     <tr>
       <td><h5>format</h5></td>
       <td>required</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>Specify what format to use, here should be <code>'avro'</code>.</td>
@@ -81,6 +83,7 @@ Format Options
     <tr>
       <td><h5>avro.codec</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>For <a href="{{< ref "docs/connectors/table/filesystem" 
>}}">Filesystem</a> only, the compression codec for avro. Snappy compression as 
default. The valid enumerations are: null, deflate, snappy, bzip2, xz.</td>
diff --git 
a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java
 
b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java
index 659160b..4030168 100644
--- 
a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java
+++ 
b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java
@@ -52,6 +52,8 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static 
org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BASIC_AUTH_CREDENTIALS_SOURCE;
 import static 
org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BASIC_AUTH_USER_INFO;
@@ -175,6 +177,23 @@ public class RegistryAvroFormatFactory
         return options;
     }
 
+    @Override
+    public Set<ConfigOption<?>> forwardOptions() {
+        return Stream.of(
+                        URL,
+                        SUBJECT,
+                        PROPERTIES,
+                        SSL_KEYSTORE_LOCATION,
+                        SSL_KEYSTORE_PASSWORD,
+                        SSL_TRUSTSTORE_LOCATION,
+                        SSL_TRUSTSTORE_PASSWORD,
+                        BASIC_AUTH_CREDENTIALS_SOURCE,
+                        BASIC_AUTH_USER_INFO,
+                        BEARER_AUTH_CREDENTIALS_SOURCE,
+                        BEARER_AUTH_TOKEN)
+                .collect(Collectors.toSet());
+    }
+
     public static @Nullable Map<String, String> buildOptionalPropertiesMap(
             ReadableConfig formatOptions) {
         final Map<String, String> properties = new HashMap<>();
diff --git 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileFormatFactory.java
 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileFormatFactory.java
index 630d990..e188405 100644
--- 
a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileFormatFactory.java
+++ 
b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFileFormatFactory.java
@@ -116,6 +116,11 @@ public class AvroFileFormatFactory implements 
BulkReaderFormatFactory, BulkWrite
         return options;
     }
 
+    @Override
+    public Set<ConfigOption<?>> forwardOptions() {
+        return optionalOptions();
+    }
+
     private static class AvroGenericRecordBulkFormat
             extends AbstractAvroBulkFormat<GenericRecord, RowData, 
FileSourceSplit> {
 

Reply via email to