twalthr commented on a change in pull request #18283:
URL: https://github.com/apache/flink/pull/18283#discussion_r779547250
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
##########
@@ -519,6 +519,7 @@ private static boolean hasKafkaClientProperties(Map<String,
String> tableOptions
return new FactoryUtil.DefaultDynamicTableContext(
context.getObjectIdentifier(),
context.getCatalogTable().copy(newOptions),
Review comment:
Side note: what is this method doing? Could it be a problem for the
upgrade story?
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FormatFactory.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import java.util.Collections;
+import java.util.Set;
+
+/** Base interface for {@link DecodingFormatFactory} and {@link
EncodingFormatFactory}. */
+public interface FormatFactory extends Factory {
Review comment:
add annotation
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
##########
@@ -148,7 +148,15 @@ public String factoryIdentifier() {
public DynamicTableSource createDynamicTableSource(Context context) {
final TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
- final ReadableConfig tableOptions = helper.getOptions();
+ final ReadableConfig tableOptions =
+ helper.forwardOptions(
+ TOPIC,
+ TOPIC_PATTERN,
+ SCAN_STARTUP_MODE,
+ SCAN_STARTUP_SPECIFIC_OFFSETS,
+ SCAN_TOPIC_PARTITION_DISCOVERY,
+ SCAN_STARTUP_TIMESTAMP_MILLIS)
+ .getOptions();
Review comment:
Let's call `getOptions()` only after calling `validate`. Maybe we should not
offer them as a return type of `forwardOptions`.
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -1035,8 +1124,62 @@ private String formatPrefix(Factory formatFactory,
ConfigOption<String> formatOp
return getFormatPrefix(formatOption, identifier);
}
- private ReadableConfig projectOptions(String formatPrefix) {
- return new DelegatingConfiguration(allOptions, formatPrefix);
+ @SuppressWarnings({"unchecked"})
+ private ReadableConfig createFormatOptions(
+ String formatPrefix, FormatFactory formatFactory) {
+ Set<ConfigOption<?>> mergeableConfigOptions =
formatFactory.mergeableOptions();
+ Configuration formatConf = new DelegatingConfiguration(allOptions,
formatPrefix);
+ if (mergeableConfigOptions.isEmpty()) {
+ return formatConf;
+ }
+
+ Configuration formatConfFromMergeableOptions =
+ new DelegatingConfiguration(mergeableOptions,
formatPrefix);
+
+ for (ConfigOption<?> option : mergeableConfigOptions) {
+ formatConfFromMergeableOptions
+ .getOptional(option)
+ .ifPresent(o -> formatConf.set((ConfigOption<? super
Object>) option, o));
+ }
+
+ return formatConf;
+ }
+
+ /**
+ * This function assumes that the format config is used only and only
if the original
+ * configuration contains the format config option. It will fail if
there is a mismatch of
+ * the identifier between the format in originalMap and the one in
catalog.
+ */
+ private void checkFormatIdentifierMatchesWithMergeableOptions(
+ ConfigOption<String> formatOption,
+ String resolvedIdentifierFromContextResolvedCatalogTable) {
+ Optional<String> identifierFromMergeableOptions =
+ mergeableOptions.getOptional(formatOption);
+
+ if (!identifierFromMergeableOptions.isPresent()) {
+ return;
+ }
+
+ if (resolvedIdentifierFromContextResolvedCatalogTable == null) {
+ throw new ValidationException(
+ String.format(
+ "The persisted plan has no format option '%s'
specified, while the catalog table has it with value '%s'. "
Review comment:
also:
```
while the enriching catalog table has it with
```
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -1035,8 +1124,62 @@ private String formatPrefix(Factory formatFactory,
ConfigOption<String> formatOp
return getFormatPrefix(formatOption, identifier);
}
- private ReadableConfig projectOptions(String formatPrefix) {
- return new DelegatingConfiguration(allOptions, formatPrefix);
+ @SuppressWarnings({"unchecked"})
+ private ReadableConfig createFormatOptions(
+ String formatPrefix, FormatFactory formatFactory) {
+ Set<ConfigOption<?>> mergeableConfigOptions =
formatFactory.mergeableOptions();
+ Configuration formatConf = new DelegatingConfiguration(allOptions,
formatPrefix);
+ if (mergeableConfigOptions.isEmpty()) {
+ return formatConf;
+ }
+
+ Configuration formatConfFromMergeableOptions =
+ new DelegatingConfiguration(mergeableOptions,
formatPrefix);
+
+ for (ConfigOption<?> option : mergeableConfigOptions) {
+ formatConfFromMergeableOptions
+ .getOptional(option)
+ .ifPresent(o -> formatConf.set((ConfigOption<? super
Object>) option, o));
+ }
+
+ return formatConf;
+ }
+
+ /**
+ * This function assumes that the format config is used only and only
if the original
+ * configuration contains the format config option. It will fail if
there is a mismatch of
+ * the identifier between the format in originalMap and the one in
catalog.
Review comment:
what is `originalMap`?
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -1035,8 +1124,62 @@ private String formatPrefix(Factory formatFactory,
ConfigOption<String> formatOp
return getFormatPrefix(formatOption, identifier);
}
- private ReadableConfig projectOptions(String formatPrefix) {
- return new DelegatingConfiguration(allOptions, formatPrefix);
+ @SuppressWarnings({"unchecked"})
+ private ReadableConfig createFormatOptions(
+ String formatPrefix, FormatFactory formatFactory) {
+ Set<ConfigOption<?>> mergeableConfigOptions =
formatFactory.mergeableOptions();
+ Configuration formatConf = new DelegatingConfiguration(allOptions,
formatPrefix);
+ if (mergeableConfigOptions.isEmpty()) {
+ return formatConf;
+ }
+
+ Configuration formatConfFromMergeableOptions =
+ new DelegatingConfiguration(mergeableOptions,
formatPrefix);
+
+ for (ConfigOption<?> option : mergeableConfigOptions) {
+ formatConfFromMergeableOptions
+ .getOptional(option)
+ .ifPresent(o -> formatConf.set((ConfigOption<? super
Object>) option, o));
+ }
+
+ return formatConf;
+ }
+
+ /**
+ * This function assumes that the format config is used only and only
if the original
+ * configuration contains the format config option. It will fail if
there is a mismatch of
+ * the identifier between the format in originalMap and the one in
catalog.
+ */
+ private void checkFormatIdentifierMatchesWithMergeableOptions(
+ ConfigOption<String> formatOption,
+ String resolvedIdentifierFromContextResolvedCatalogTable) {
+ Optional<String> identifierFromMergeableOptions =
+ mergeableOptions.getOptional(formatOption);
+
+ if (!identifierFromMergeableOptions.isPresent()) {
+ return;
+ }
+
+ if (resolvedIdentifierFromContextResolvedCatalogTable == null) {
+ throw new ValidationException(
+ String.format(
+ "The persisted plan has no format option '%s'
specified, while the catalog table has it with value '%s'. "
+ + "This is invalid, as either only the
persisted plan table defines the format, "
+ + "or both the persisted plan table
and the catalog table defines the same format",
Review comment:
Nit: add a dot at the end of the message.
##########
File path:
flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java
##########
@@ -151,4 +151,15 @@ public String factoryIdentifier() {
options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
return options;
}
+
+ @Override
+ public Set<ConfigOption<?>> mergeableOptions() {
Review comment:
Call this `forwardOptions`, similar to the helper? These options are not only
mergeable but will also be forwarded.
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -1035,8 +1124,62 @@ private String formatPrefix(Factory formatFactory,
ConfigOption<String> formatOp
return getFormatPrefix(formatOption, identifier);
}
- private ReadableConfig projectOptions(String formatPrefix) {
- return new DelegatingConfiguration(allOptions, formatPrefix);
+ @SuppressWarnings({"unchecked"})
+ private ReadableConfig createFormatOptions(
+ String formatPrefix, FormatFactory formatFactory) {
+ Set<ConfigOption<?>> mergeableConfigOptions =
formatFactory.mergeableOptions();
+ Configuration formatConf = new DelegatingConfiguration(allOptions,
formatPrefix);
+ if (mergeableConfigOptions.isEmpty()) {
+ return formatConf;
+ }
+
+ Configuration formatConfFromMergeableOptions =
+ new DelegatingConfiguration(mergeableOptions,
formatPrefix);
+
+ for (ConfigOption<?> option : mergeableConfigOptions) {
+ formatConfFromMergeableOptions
+ .getOptional(option)
+ .ifPresent(o -> formatConf.set((ConfigOption<? super
Object>) option, o));
+ }
+
+ return formatConf;
+ }
+
+ /**
+ * This function assumes that the format config is used only and only
if the original
+ * configuration contains the format config option. It will fail if
there is a mismatch of
+ * the identifier between the format in originalMap and the one in
catalog.
+ */
+ private void checkFormatIdentifierMatchesWithMergeableOptions(
+ ConfigOption<String> formatOption,
+ String resolvedIdentifierFromContextResolvedCatalogTable) {
+ Optional<String> identifierFromMergeableOptions =
+ mergeableOptions.getOptional(formatOption);
+
+ if (!identifierFromMergeableOptions.isPresent()) {
+ return;
+ }
+
+ if (resolvedIdentifierFromContextResolvedCatalogTable == null) {
+ throw new ValidationException(
+ String.format(
+ "The persisted plan has no format option '%s'
specified, while the catalog table has it with value '%s'. "
+ + "This is invalid, as either only the
persisted plan table defines the format, "
+ + "or both the persisted plan table
and the catalog table defines the same format",
Review comment:
also:
```
and enriching catalog table defines...
```
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -1035,8 +1124,62 @@ private String formatPrefix(Factory formatFactory,
ConfigOption<String> formatOp
return getFormatPrefix(formatOption, identifier);
}
- private ReadableConfig projectOptions(String formatPrefix) {
- return new DelegatingConfiguration(allOptions, formatPrefix);
+ @SuppressWarnings({"unchecked"})
+ private ReadableConfig createFormatOptions(
+ String formatPrefix, FormatFactory formatFactory) {
+ Set<ConfigOption<?>> mergeableConfigOptions =
formatFactory.mergeableOptions();
+ Configuration formatConf = new DelegatingConfiguration(allOptions,
formatPrefix);
+ if (mergeableConfigOptions.isEmpty()) {
+ return formatConf;
+ }
+
+ Configuration formatConfFromMergeableOptions =
+ new DelegatingConfiguration(mergeableOptions,
formatPrefix);
+
+ for (ConfigOption<?> option : mergeableConfigOptions) {
+ formatConfFromMergeableOptions
+ .getOptional(option)
+ .ifPresent(o -> formatConf.set((ConfigOption<? super
Object>) option, o));
+ }
+
+ return formatConf;
+ }
+
+ /**
+ * This function assumes that the format config is used only and only
if the original
+ * configuration contains the format config option. It will fail if
there is a mismatch of
+ * the identifier between the format in originalMap and the one in
catalog.
+ */
+ private void checkFormatIdentifierMatchesWithMergeableOptions(
+ ConfigOption<String> formatOption,
+ String resolvedIdentifierFromContextResolvedCatalogTable) {
+ Optional<String> identifierFromMergeableOptions =
+ mergeableOptions.getOptional(formatOption);
+
+ if (!identifierFromMergeableOptions.isPresent()) {
+ return;
+ }
+
+ if (resolvedIdentifierFromContextResolvedCatalogTable == null) {
+ throw new ValidationException(
+ String.format(
+ "The persisted plan has no format option '%s'
specified, while the catalog table has it with value '%s'. "
Review comment:
Let's synchronize the variable names with the exception message, e.g.
`resolvedIdentifierFromContextResolvedCatalogTable` -> `identifierFromPlan`?
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -1035,8 +1124,62 @@ private String formatPrefix(Factory formatFactory,
ConfigOption<String> formatOp
return getFormatPrefix(formatOption, identifier);
}
- private ReadableConfig projectOptions(String formatPrefix) {
- return new DelegatingConfiguration(allOptions, formatPrefix);
+ @SuppressWarnings({"unchecked"})
+ private ReadableConfig createFormatOptions(
+ String formatPrefix, FormatFactory formatFactory) {
+ Set<ConfigOption<?>> mergeableConfigOptions =
formatFactory.mergeableOptions();
+ Configuration formatConf = new DelegatingConfiguration(allOptions,
formatPrefix);
+ if (mergeableConfigOptions.isEmpty()) {
+ return formatConf;
+ }
+
+ Configuration formatConfFromMergeableOptions =
+ new DelegatingConfiguration(mergeableOptions,
formatPrefix);
+
+ for (ConfigOption<?> option : mergeableConfigOptions) {
+ formatConfFromMergeableOptions
+ .getOptional(option)
+ .ifPresent(o -> formatConf.set((ConfigOption<? super
Object>) option, o));
+ }
+
+ return formatConf;
+ }
+
+ /**
+ * This function assumes that the format config is used only and only
if the original
+ * configuration contains the format config option. It will fail if
there is a mismatch of
+ * the identifier between the format in originalMap and the one in
catalog.
+ */
+ private void checkFormatIdentifierMatchesWithMergeableOptions(
+ ConfigOption<String> formatOption,
+ String resolvedIdentifierFromContextResolvedCatalogTable) {
+ Optional<String> identifierFromMergeableOptions =
+ mergeableOptions.getOptional(formatOption);
+
+ if (!identifierFromMergeableOptions.isPresent()) {
+ return;
+ }
+
+ if (resolvedIdentifierFromContextResolvedCatalogTable == null) {
+ throw new ValidationException(
+ String.format(
+ "The persisted plan has no format option '%s'
specified, while the catalog table has it with value '%s'. "
Review comment:
also:
```
while the enriching table has it with
```
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -1035,8 +1124,62 @@ private String formatPrefix(Factory formatFactory,
ConfigOption<String> formatOp
return getFormatPrefix(formatOption, identifier);
}
- private ReadableConfig projectOptions(String formatPrefix) {
- return new DelegatingConfiguration(allOptions, formatPrefix);
+ @SuppressWarnings({"unchecked"})
+ private ReadableConfig createFormatOptions(
+ String formatPrefix, FormatFactory formatFactory) {
+ Set<ConfigOption<?>> mergeableConfigOptions =
formatFactory.mergeableOptions();
+ Configuration formatConf = new DelegatingConfiguration(allOptions,
formatPrefix);
+ if (mergeableConfigOptions.isEmpty()) {
+ return formatConf;
+ }
+
+ Configuration formatConfFromMergeableOptions =
+ new DelegatingConfiguration(mergeableOptions,
formatPrefix);
+
+ for (ConfigOption<?> option : mergeableConfigOptions) {
+ formatConfFromMergeableOptions
+ .getOptional(option)
+ .ifPresent(o -> formatConf.set((ConfigOption<? super
Object>) option, o));
+ }
+
+ return formatConf;
+ }
+
+ /**
+ * This function assumes that the format config is used only and only
if the original
+ * configuration contains the format config option. It will fail if
there is a mismatch of
+ * the identifier between the format in originalMap and the one in
catalog.
+ */
+ private void checkFormatIdentifierMatchesWithMergeableOptions(
+ ConfigOption<String> formatOption,
+ String resolvedIdentifierFromContextResolvedCatalogTable) {
+ Optional<String> identifierFromMergeableOptions =
+ mergeableOptions.getOptional(formatOption);
+
+ if (!identifierFromMergeableOptions.isPresent()) {
+ return;
+ }
+
+ if (resolvedIdentifierFromContextResolvedCatalogTable == null) {
+ throw new ValidationException(
+ String.format(
+ "The persisted plan has no format option '%s'
specified, while the catalog table has it with value '%s'. "
+ + "This is invalid, as either only the
persisted plan table defines the format, "
+ + "or both the persisted plan table
and the catalog table defines the same format",
Review comment:
also:
```
and enriching table defines...
```
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -1035,8 +1124,62 @@ private String formatPrefix(Factory formatFactory,
ConfigOption<String> formatOp
return getFormatPrefix(formatOption, identifier);
}
- private ReadableConfig projectOptions(String formatPrefix) {
- return new DelegatingConfiguration(allOptions, formatPrefix);
+ @SuppressWarnings({"unchecked"})
+ private ReadableConfig createFormatOptions(
+ String formatPrefix, FormatFactory formatFactory) {
+ Set<ConfigOption<?>> mergeableConfigOptions =
formatFactory.mergeableOptions();
+ Configuration formatConf = new DelegatingConfiguration(allOptions,
formatPrefix);
+ if (mergeableConfigOptions.isEmpty()) {
+ return formatConf;
+ }
+
+ Configuration formatConfFromMergeableOptions =
+ new DelegatingConfiguration(mergeableOptions,
formatPrefix);
+
+ for (ConfigOption<?> option : mergeableConfigOptions) {
+ formatConfFromMergeableOptions
+ .getOptional(option)
+ .ifPresent(o -> formatConf.set((ConfigOption<? super
Object>) option, o));
+ }
+
+ return formatConf;
+ }
+
+ /**
+ * This function assumes that the format config is used only and only
if the original
+ * configuration contains the format config option. It will fail if
there is a mismatch of
+ * the identifier between the format in originalMap and the one in
catalog.
+ */
+ private void checkFormatIdentifierMatchesWithMergeableOptions(
+ ConfigOption<String> formatOption,
+ String resolvedIdentifierFromContextResolvedCatalogTable) {
+ Optional<String> identifierFromMergeableOptions =
+ mergeableOptions.getOptional(formatOption);
+
+ if (!identifierFromMergeableOptions.isPresent()) {
+ return;
+ }
+
+ if (resolvedIdentifierFromContextResolvedCatalogTable == null) {
+ throw new ValidationException(
+ String.format(
+ "The persisted plan has no format option '%s'
specified, while the catalog table has it with value '%s'. "
Review comment:
+1
##########
File path:
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
##########
@@ -148,6 +148,14 @@ public String factoryIdentifier() {
public DynamicTableSource createDynamicTableSource(Context context) {
final TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
+ final Optional<DecodingFormat<DeserializationSchema<RowData>>>
keyDecodingFormat =
+ getKeyDecodingFormat(helper);
+
+ final DecodingFormat<DeserializationSchema<RowData>>
valueDecodingFormat =
+ getValueDecodingFormat(helper);
+
+ helper.validateExcept(PROPERTIES_PREFIX);
Review comment:
This would validate the non-enriched options first. You then merge the
options, but no validation happens again after the merge.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]