This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7455e45ff [flink] LogStoreTableFactory should not implements
DynamicTableFactory (#771)
7455e45ff is described below
commit 7455e45ff99e916fbf279a7efae8b71987d293c2
Author: HZY <[email protected]>
AuthorDate: Mon Apr 3 14:11:24 2023 +0800
[flink] LogStoreTableFactory should not implements DynamicTableFactory
(#771)
---
docs/content/concepts/external-log-systems.md | 17 +
.../paimon/flink/factories/FlinkFactoryUtil.java | 500 +++++++++++++++++++++
.../flink/factories/LogStoreFactoryUtil.java | 136 ++++++
.../paimon/flink/kafka/KafkaLogStoreFactory.java | 22 +-
.../paimon/flink/log/LogStoreTableFactory.java | 29 +-
.../org.apache.flink.table.factories.Factory | 1 -
...g.apache.paimon.flink.log.LogStoreTableFactory} | 2 -
7 files changed, 678 insertions(+), 29 deletions(-)
diff --git a/docs/content/concepts/external-log-systems.md
b/docs/content/concepts/external-log-systems.md
index a7f6435d1..38dd18e97 100644
--- a/docs/content/concepts/external-log-systems.md
+++ b/docs/content/concepts/external-log-systems.md
@@ -42,6 +42,23 @@ If `'log.consistency' = 'eventual'` is set, in order to
achieve correct results,
### Kafka
+#### Preparing flink-sql-connector-kafka Jar File
+
+Paimon currently supports Flink 1.16, 1.15 and 1.14. We recommend the latest
Flink version for a better experience.
+
+Download the flink-sql-connector-kafka jar file with corresponding version.
+
+{{< stable >}}
+
+| Version | Jar
|
+|---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| Flink 1.16 |
[flink-sql-connector-kafka-1.16.1.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.16.1/flink-sql-connector-kafka-1.16.1.jar)
|
+| Flink 1.15 |
[flink-sql-connector-kafka-1.15.4.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.15.4/flink-sql-connector-kafka-1.15.4.jar)
|
+| Flink 1.14 |
[flink-sql-connector-kafka_2.11-1.14.4.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.14.4/flink-sql-connector-kafka_2.11-1.14.4.jar)
|
+
+{{< /stable >}}
+
+
By specifying `'log.system' = 'kafka'`, users can write changes into Kafka
along with table files.
{{< tabs "kafka-example" >}}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/FlinkFactoryUtil.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/FlinkFactoryUtil.java
new file mode 100644
index 000000000..87100ff0e
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/FlinkFactoryUtil.java
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.factories;
+
+import org.apache.paimon.flink.log.LogStoreTableFactory;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.FallbackKey;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.factories.DecodingFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.factories.EncodingFormatFactory;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FormatFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.configuration.ConfigurationUtils.canBePrefixMap;
+import static
org.apache.flink.configuration.ConfigurationUtils.filterPrefixMapKey;
+import static
org.apache.flink.table.factories.ManagedTableFactory.DEFAULT_IDENTIFIER;
+
+/** Utility for working with {@link Factory}s. */
+public final class FlinkFactoryUtil {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FlinkFactoryUtil.class);
+
+ public static final ConfigOption<String> FORMAT =
+ ConfigOptions.key("format")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Defines the format identifier for encoding data. "
+ + "The identifier is used to discover a
suitable format factory.");
+
+ /**
+ * Suffix for keys of {@link ConfigOption} in case a connector requires
multiple formats (e.g.
+ * for both key and value).
+ *
+ * <p>See {@link #createFlinkTableFactoryHelper(LogStoreTableFactory,
Context)} for
+ * more information.
+ */
+ public static final String FORMAT_SUFFIX = ".format";
+
+ /**
+ * Creates a utility that helps in discovering formats, merging options
with {@link
+ * DynamicTableFactory.Context#getEnrichmentOptions()} and validating them
all for a {@link
+ * LogStoreTableFactory}.
+ *
+ * <p>The following example sketches the usage:
+ *
+ * <pre>{@code
+ * // in createDynamicTableSource()
+ * helper = FlinkFactoryUtil.createFlinkTableFactoryHelper(this, context);
+ *
+ * keyFormat =
helper.discoverDecodingFormat(DeserializationFormatFactory.class, KEY_FORMAT);
+ * valueFormat =
helper.discoverDecodingFormat(DeserializationFormatFactory.class, VALUE_FORMAT);
+ *
+ * helper.validate();
+ *
+ * ... // construct connector with discovered formats
+ * }</pre>
+ */
+ public static FlinkTableFactoryHelper createFlinkTableFactoryHelper(
+ LogStoreTableFactory factory, DynamicTableFactory.Context context)
{
+ return new FlinkTableFactoryHelper(factory, context);
+ }
+
+ /** Discovers a flink Factory using the given factory base class and
identifier. */
+ @SuppressWarnings("unchecked")
+ public static <T extends Factory> T discoverFlinkFactory(
+ ClassLoader classLoader, Class<T> factoryClass, String
factoryIdentifier) {
+ final List<Factory> factories = discoverFlinkFactories(classLoader);
+
+ final List<Factory> foundFactories =
+ factories.stream()
+ .filter(f ->
factoryClass.isAssignableFrom(f.getClass()))
+ .collect(Collectors.toList());
+
+ if (foundFactories.isEmpty()) {
+ throw new ValidationException(
+ String.format(
+ "Could not find any factories that implement '%s'
in the classpath.",
+ factoryClass.getName()));
+ }
+
+ final List<Factory> matchingFactories =
+ foundFactories.stream()
+ .filter(f ->
f.factoryIdentifier().equals(factoryIdentifier))
+ .collect(Collectors.toList());
+
+ if (matchingFactories.isEmpty()) {
+ throw new ValidationException(
+ String.format(
+ "Could not find any factory for identifier '%s'
that implements '%s' in the classpath.\n\n"
+ + "Available factory identifiers are:\n\n"
+ + "%s",
+ factoryIdentifier,
+ factoryClass.getName(),
+ foundFactories.stream()
+ .map(Factory::factoryIdentifier)
+ .filter(identifier ->
!DEFAULT_IDENTIFIER.equals(identifier))
+ .distinct()
+ .sorted()
+ .collect(Collectors.joining("\n"))));
+ }
+ if (matchingFactories.size() > 1) {
+ throw new ValidationException(
+ String.format(
+ "Multiple factories for identifier '%s' that
implement '%s' found in the classpath.\n\n"
+ + "Ambiguous factory classes are:\n\n"
+ + "%s",
+ factoryIdentifier,
+ factoryClass.getName(),
+ matchingFactories.stream()
+ .map(f -> f.getClass().getName())
+ .sorted()
+ .collect(Collectors.joining("\n"))));
+ }
+
+ return (T) matchingFactories.get(0);
+ }
+
+ /** Returns the required option prefix for options of the given format. */
+ public static String getFormatPrefix(
+ ConfigOption<String> formatOption, String formatIdentifier) {
+ final String formatOptionKey = formatOption.key();
+ if (formatOptionKey.equals(FORMAT.key())) {
+ return formatIdentifier + ".";
+ } else if (formatOptionKey.endsWith(FORMAT_SUFFIX)) {
+ // extract the key prefix, e.g. extract 'key' from 'key.format'
+ String keyPrefix =
+ formatOptionKey.substring(0, formatOptionKey.length() -
FORMAT_SUFFIX.length());
+ return keyPrefix + "." + formatIdentifier + ".";
+ } else {
+ throw new ValidationException(
+ "Format identifier key should be 'format' or suffix with
'.format', "
+ + "don't support format identifier key '"
+ + formatOptionKey
+ + "'.");
+ }
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Helper methods
+ //
--------------------------------------------------------------------------------------------
+
+ static List<Factory> discoverFlinkFactories(ClassLoader classLoader) {
+ final Iterator<Factory> serviceLoaderIterator =
+ ServiceLoader.load(Factory.class, classLoader).iterator();
+
+ final List<Factory> loadResults = new ArrayList<>();
+ while (true) {
+ try {
+ // error handling should also be applied to the hasNext() call
because service
+ // loading might cause problems here as well
+ if (!serviceLoaderIterator.hasNext()) {
+ break;
+ }
+
+ loadResults.add(serviceLoaderIterator.next());
+ } catch (Throwable t) {
+ if (t instanceof NoClassDefFoundError) {
+ LOG.debug(
+ "NoClassDefFoundError when loading a "
+ +
LogStoreTableFactory.class.getCanonicalName()
+ + ". This is expected when trying to load
a format dependency but no flink-connector-files is loaded.",
+ t);
+ } else {
+ throw new TableException(
+ "Unexpected error when trying to load service
provider.", t);
+ }
+ }
+ }
+
+ return loadResults;
+ }
+
+ private static Set<String> allKeysExpanded(ConfigOption<?> option,
Set<String> actualKeys) {
+ return allKeysExpanded("", option, actualKeys);
+ }
+
+ private static Set<String> allKeysExpanded(
+ String prefix, ConfigOption<?> option, Set<String> actualKeys) {
+ final Set<String> staticKeys =
+ allKeys(option).map(k -> prefix +
k).collect(Collectors.toSet());
+ if (!canBePrefixMap(option)) {
+ return staticKeys;
+ }
+ // include all prefix keys of a map option by considering the actually
provided keys
+ return Stream.concat(
+ staticKeys.stream(),
+ staticKeys.stream()
+ .flatMap(
+ k ->
+ actualKeys.stream()
+ .filter(c ->
filterPrefixMapKey(k, c))))
+ .collect(Collectors.toSet());
+ }
+
+ private static Stream<String> allKeys(ConfigOption<?> option) {
+ return Stream.concat(Stream.of(option.key()), fallbackKeys(option));
+ }
+
+ private static Stream<String> fallbackKeys(ConfigOption<?> option) {
+ return StreamSupport.stream(option.fallbackKeys().spliterator(), false)
+ .map(FallbackKey::getKey);
+ }
+
+ private static Stream<String> deprecatedKeys(ConfigOption<?> option) {
+ return StreamSupport.stream(option.fallbackKeys().spliterator(), false)
+ .filter(FallbackKey::isDeprecated)
+ .map(FallbackKey::getKey);
+ }
+
+ /** Base flink helper utility for validating all options for a {@link
LogStoreTableFactory}. */
+ public static class FlinkFactoryHelper<F extends LogStoreTableFactory> {
+
+ protected final F factory;
+
+ protected final Configuration allOptions;
+
+ protected final Set<String> consumedOptionKeys;
+
+ protected final Set<String> deprecatedOptionKeys;
+
+ public FlinkFactoryHelper(
+ F factory, Map<String, String> configuration,
ConfigOption<?>... implicitOptions) {
+ this.factory = factory;
+ this.allOptions = Configuration.fromMap(configuration);
+
+ final List<ConfigOption<?>> consumedOptions = new ArrayList<>();
+ consumedOptions.addAll(Arrays.asList(implicitOptions));
+
+ consumedOptionKeys =
+ consumedOptions.stream()
+ .flatMap(
+ option -> allKeysExpanded(option,
allOptions.keySet()).stream())
+ .collect(Collectors.toSet());
+
+ deprecatedOptionKeys =
+ consumedOptions.stream()
+ .flatMap(FlinkFactoryUtil::deprecatedKeys)
+ .collect(Collectors.toSet());
+ }
+
+ /** Returns all options currently being consumed by the factory. */
+ public ReadableConfig getOptions() {
+ return allOptions;
+ }
+ }
+
+ /**
+ * Helper utility for discovering formats and validating all options for a
{@link
+ * LogStoreTableFactory}.
+ *
+ * @see #createFlinkTableFactoryHelper(LogStoreTableFactory, Context)
+ */
+ public static class FlinkTableFactoryHelper extends
FlinkFactoryHelper<LogStoreTableFactory> {
+
+ private final Context context;
+
+ private final Configuration enrichingOptions;
+
+ private FlinkTableFactoryHelper(LogStoreTableFactory tableFactory,
Context context) {
+ super(tableFactory, context.getCatalogTable().getOptions());
+ this.context = context;
+ this.enrichingOptions =
Configuration.fromMap(context.getEnrichmentOptions());
+ }
+
+ /**
+ * Returns all options currently being consumed by the factory. This
method returns the
+ * options already merged with {@link Context#getEnrichmentOptions()},
using {@link
+ * DynamicTableFactory#forwardOptions()} as reference of mergeable
options.
+ */
+ @Override
+ public ReadableConfig getOptions() {
+ return super.getOptions();
+ }
+
+ /**
+ * Discovers a {@link DecodingFormat} of the given type using the
given option as factory
+ * identifier.
+ */
+ public <I, F extends DecodingFormatFactory<I>> DecodingFormat<I>
discoverDecodingFormat(
+ Class<F> formatFactoryClass, ConfigOption<String>
formatOption) {
+ return discoverOptionalDecodingFormat(formatFactoryClass,
formatOption)
+ .orElseThrow(
+ () ->
+ new ValidationException(
+ String.format(
+ "Could not find required
scan format '%s'.",
+ formatOption.key())));
+ }
+
+ /**
+ * Discovers a {@link DecodingFormat} of the given type using the
given option (if present)
+ * as factory identifier.
+ */
+ public <I, F extends DecodingFormatFactory<I>>
+ Optional<DecodingFormat<I>> discoverOptionalDecodingFormat(
+ Class<F> formatFactoryClass, ConfigOption<String>
formatOption) {
+ return discoverOptionalFormatFactory(formatFactoryClass,
formatOption)
+ .map(
+ formatFactory -> {
+ String formatPrefix =
+ formatFlinkPrefix(formatFactory,
formatOption);
+ try {
+ return formatFactory.createDecodingFormat(
+ context,
+ createFormatOptions(formatPrefix,
formatFactory));
+ } catch (Throwable t) {
+ throw new ValidationException(
+ String.format(
+ "Error creating scan
format '%s' in option space '%s'.",
+
formatFactory.factoryIdentifier(),
+ formatPrefix),
+ t);
+ }
+ });
+ }
+
+ /**
+ * Discovers a {@link EncodingFormat} of the given type using the
given option as factory
+ * identifier.
+ */
+ public <I, F extends EncodingFormatFactory<I>> EncodingFormat<I>
discoverEncodingFormat(
+ Class<F> formatFactoryClass, ConfigOption<String>
formatOption) {
+ return discoverOptionalEncodingFormat(formatFactoryClass,
formatOption)
+ .orElseThrow(
+ () ->
+ new ValidationException(
+ String.format(
+ "Could not find required
sink format '%s'.",
+ formatOption.key())));
+ }
+
+ /**
+ * Discovers a {@link EncodingFormat} of the given type using the
given option (if present)
+ * as factory identifier.
+ */
+ public <I, F extends EncodingFormatFactory<I>>
+ Optional<EncodingFormat<I>> discoverOptionalEncodingFormat(
+ Class<F> formatFactoryClass, ConfigOption<String>
formatOption) {
+ return discoverOptionalFormatFactory(formatFactoryClass,
formatOption)
+ .map(
+ formatFactory -> {
+ String formatPrefix =
+ formatFlinkPrefix(formatFactory,
formatOption);
+ try {
+ return formatFactory.createEncodingFormat(
+ context,
+ createFormatOptions(formatPrefix,
formatFactory));
+ } catch (Throwable t) {
+ throw new ValidationException(
+ String.format(
+ "Error creating sink
format '%s' in option space '%s'.",
+
formatFactory.factoryIdentifier(),
+ formatPrefix),
+ t);
+ }
+ });
+ }
+
+ //
----------------------------------------------------------------------------------------
+
+ private <F extends Factory> Optional<F> discoverOptionalFormatFactory(
+ Class<F> formatFactoryClass, ConfigOption<String>
formatOption) {
+ final String identifier = allOptions.get(formatOption);
+ checkFormatIdentifierMatchesWithEnrichingOptions(formatOption,
identifier);
+ if (identifier == null) {
+ return Optional.empty();
+ }
+ final F factory =
+ discoverFlinkFactory(context.getClassLoader(),
formatFactoryClass, identifier);
+ String formatPrefix = formatFlinkPrefix(factory, formatOption);
+
+ // log all used options of other factories
+ final List<ConfigOption<?>> consumedOptions = new ArrayList<>();
+ consumedOptions.addAll(factory.requiredOptions());
+ consumedOptions.addAll(factory.optionalOptions());
+
+ consumedOptions.stream()
+ .flatMap(
+ option ->
+ allKeysExpanded(formatPrefix, option,
allOptions.keySet())
+ .stream())
+ .forEach(consumedOptionKeys::add);
+
+ consumedOptions.stream()
+ .flatMap(FlinkFactoryUtil::deprecatedKeys)
+ .map(k -> formatPrefix + k)
+ .forEach(deprecatedOptionKeys::add);
+
+ return Optional.of(factory);
+ }
+
+ private String formatFlinkPrefix(Factory formatFactory,
ConfigOption<String> formatOption) {
+ String identifier = formatFactory.factoryIdentifier();
+ return getFormatPrefix(formatOption, identifier);
+ }
+
+ @SuppressWarnings({"unchecked"})
+ private ReadableConfig createFormatOptions(
+ String formatPrefix, FormatFactory formatFactory) {
+ Set<ConfigOption<?>> forwardableConfigOptions =
formatFactory.forwardOptions();
+ Configuration formatConf = new DelegatingConfiguration(allOptions,
formatPrefix);
+ if (forwardableConfigOptions.isEmpty()) {
+ return formatConf;
+ }
+
+ Configuration formatConfFromEnrichingOptions =
+ new DelegatingConfiguration(enrichingOptions,
formatPrefix);
+
+ for (ConfigOption<?> option : forwardableConfigOptions) {
+ formatConfFromEnrichingOptions
+ .getOptional(option)
+ .ifPresent(o -> formatConf.set((ConfigOption<? super
Object>) option, o));
+ }
+
+ return formatConf;
+ }
+
+ /**
+ * This function assumes that the format config is used if and only
if the original
+ * configuration contains the format config option. It will fail if
there is a mismatch of
+ * the identifier between the format in the plan table map and the one
in enriching table
+ * map.
+ */
+ private void checkFormatIdentifierMatchesWithEnrichingOptions(
+ ConfigOption<String> formatOption, String identifierFromPlan) {
+ Optional<String> identifierFromEnrichingOptions =
+ enrichingOptions.getOptional(formatOption);
+
+ if (!identifierFromEnrichingOptions.isPresent()) {
+ return;
+ }
+
+ if (identifierFromPlan == null) {
+ throw new ValidationException(
+ String.format(
+ "The persisted plan has no format option '%s'
specified, while the catalog table has it with value '%s'. "
+ + "This is invalid, as either only the
persisted plan table defines the format, "
+ + "or both the persisted plan table
and the catalog table defines the same format.",
+ formatOption,
identifierFromEnrichingOptions.get()));
+ }
+
+ if (!Objects.equals(identifierFromPlan,
identifierFromEnrichingOptions.get())) {
+ throw new ValidationException(
+ String.format(
+ "Both persisted plan table and catalog table
define the format option '%s', "
+ + "but they mismatch: '%s' != '%s'.",
+ formatOption,
+ identifierFromPlan,
+ identifierFromEnrichingOptions.get()));
+ }
+ }
+ }
+ //
--------------------------------------------------------------------------------------------
+
+ private FlinkFactoryUtil() {
+ // no instantiation
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/LogStoreFactoryUtil.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/LogStoreFactoryUtil.java
new file mode 100644
index 000000000..f7009c35b
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/LogStoreFactoryUtil.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.factories;
+
+import org.apache.paimon.flink.log.LogStoreTableFactory;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+
+import static
org.apache.flink.table.factories.ManagedTableFactory.DEFAULT_IDENTIFIER;
+
+/** Utility for working with {@link LogStoreTableFactory}s. */
+public final class LogStoreFactoryUtil {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(LogStoreFactoryUtil.class);
+
+ /** Discovers a LogStoreTableFactory using the given factory base class
and identifier. */
+ @SuppressWarnings("unchecked")
+ public static <T extends LogStoreTableFactory> T discoverLogStoreFactory(
+ ClassLoader classLoader, Class<T> factoryClass, String
factoryIdentifier) {
+ final List<LogStoreTableFactory> factories =
discoverLogStoreFactories(classLoader);
+
+ final List<LogStoreTableFactory> foundFactories =
+ factories.stream()
+ .filter(f ->
factoryClass.isAssignableFrom(f.getClass()))
+ .collect(Collectors.toList());
+
+ if (foundFactories.isEmpty()) {
+ throw new ValidationException(
+ String.format(
+ "Could not find any factories that implement '%s'
in the classpath.",
+ factoryClass.getName()));
+ }
+
+ final List<LogStoreTableFactory> matchingFactories =
+ foundFactories.stream()
+ .filter(f ->
f.factoryIdentifier().equals(factoryIdentifier))
+ .collect(Collectors.toList());
+
+ if (matchingFactories.isEmpty()) {
+ throw new ValidationException(
+ String.format(
+ "Could not find any factory for identifier '%s'
that implements '%s' in the classpath.\n\n"
+ + "Available factory identifiers are:\n\n"
+ + "%s",
+ factoryIdentifier,
+ factoryClass.getName(),
+ foundFactories.stream()
+
.map(LogStoreTableFactory::factoryIdentifier)
+ .filter(identifier ->
!DEFAULT_IDENTIFIER.equals(identifier))
+ .distinct()
+ .sorted()
+ .collect(Collectors.joining("\n"))));
+ }
+ if (matchingFactories.size() > 1) {
+ throw new ValidationException(
+ String.format(
+ "Multiple factories for identifier '%s' that
implement '%s' found in the classpath.\n\n"
+ + "Ambiguous factory classes are:\n\n"
+ + "%s",
+ factoryIdentifier,
+ factoryClass.getName(),
+ matchingFactories.stream()
+ .map(f -> f.getClass().getName())
+ .sorted()
+ .collect(Collectors.joining("\n"))));
+ }
+
+ return (T) matchingFactories.get(0);
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Helper methods
+ //
--------------------------------------------------------------------------------------------
+
+ static List<LogStoreTableFactory> discoverLogStoreFactories(ClassLoader
classLoader) {
+ final Iterator<LogStoreTableFactory> serviceLoaderIterator =
+ ServiceLoader.load(LogStoreTableFactory.class,
classLoader).iterator();
+
+ final List<LogStoreTableFactory> loadResults = new ArrayList<>();
+ while (true) {
+ try {
+ // error handling should also be applied to the hasNext() call
because service
+ // loading might cause problems here as well
+ if (!serviceLoaderIterator.hasNext()) {
+ break;
+ }
+
+ loadResults.add(serviceLoaderIterator.next());
+ } catch (Throwable t) {
+ if (t instanceof NoClassDefFoundError) {
+ LOG.debug(
+ "NoClassDefFoundError when loading a "
+ +
LogStoreTableFactory.class.getCanonicalName()
+ + ". This is expected when trying to load
a format dependency but no flink-connector-files is loaded.",
+ t);
+ } else {
+ throw new TableException(
+ "Unexpected error when trying to load service
provider.", t);
+ }
+ }
+ }
+
+ return loadResults;
+ }
+
+ //
--------------------------------------------------------------------------------------------
+
+ private LogStoreFactoryUtil() {
+ // no instantiation
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
index 816827867..9bb9e0f3a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
@@ -19,12 +19,12 @@
package org.apache.paimon.flink.kafka;
import org.apache.paimon.CoreOptions;
+import
org.apache.paimon.flink.factories.FlinkFactoryUtil.FlinkTableFactoryHelper;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.options.Options;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.catalog.ResolvedSchema;
@@ -32,24 +32,22 @@ import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import javax.annotation.Nullable;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Set;
-import static
org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY;
import static org.apache.paimon.CoreOptions.LogConsistency;
import static org.apache.paimon.CoreOptions.SCAN_TIMESTAMP_MILLIS;
+import static
org.apache.paimon.flink.factories.FlinkFactoryUtil.createFlinkTableFactoryHelper;
import static org.apache.paimon.flink.kafka.KafkaLogOptions.TOPIC;
/** The Kafka {@link LogStoreTableFactory} implementation. */
@@ -64,16 +62,6 @@ public class KafkaLogStoreFactory implements
LogStoreTableFactory {
return IDENTIFIER;
}
- @Override
- public Set<ConfigOption<?>> requiredOptions() {
- return new HashSet<>();
- }
-
- @Override
- public Set<ConfigOption<?>> optionalOptions() {
- return new HashSet<>();
- }
-
private String topic(Context context) {
return context.getCatalogTable().getOptions().get(TOPIC.key());
}
@@ -83,7 +71,7 @@ public class KafkaLogStoreFactory implements
LogStoreTableFactory {
Context context,
DynamicTableSource.Context sourceContext,
@Nullable int[][] projectFields) {
- FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this,
context);
+ FlinkTableFactoryHelper helper = createFlinkTableFactoryHelper(this,
context);
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
DataType physicalType = schema.toPhysicalRowDataType();
DeserializationSchema<RowData> primaryKeyDeserializer = null;
@@ -115,7 +103,7 @@ public class KafkaLogStoreFactory implements
LogStoreTableFactory {
@Override
public KafkaLogSinkProvider createSinkProvider(
Context context, DynamicTableSink.Context sinkContext) {
- FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this,
context);
+ FlinkTableFactoryHelper helper = createFlinkTableFactoryHelper(this,
context);
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
DataType physicalType = schema.toPhysicalRowDataType();
SerializationSchema<RowData> primaryKeySerializer = null;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java
index ecdd504ef..e5b92aacd 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java
@@ -18,6 +18,9 @@
package org.apache.paimon.flink.log;
+import
org.apache.paimon.flink.factories.FlinkFactoryUtil.FlinkTableFactoryHelper;
+import org.apache.paimon.flink.factories.LogStoreFactoryUtil;
+
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
@@ -32,9 +35,7 @@ import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
-import org.apache.flink.table.factories.DynamicTableFactory;
-import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
import org.apache.flink.table.factories.SerializationFormatFactory;
import org.apache.flink.types.RowKind;
@@ -50,7 +51,16 @@ import static org.apache.paimon.CoreOptions.LOG_KEY_FORMAT;
* <p>Log tables are for processing only unbounded data. Support streaming
reading and streaming
* writing.
*/
-public interface LogStoreTableFactory extends DynamicTableFactory {
+public interface LogStoreTableFactory {
+
+ /**
+ * Returns a unique identifier among same factory interfaces.
+ *
+ * <p>For consistency, an identifier should be declared as one lower case
word (e.g. {@code
+ * kafka}). If multiple factories exist for different versions, a version
should be appended
+ * using "-" (e.g. {@code elasticsearch-7}).
+ */
+ String factoryIdentifier();
/**
* Creates a {@link LogSourceProvider} instance from a {@link
CatalogTable} and additional
@@ -82,11 +92,12 @@ public interface LogStoreTableFactory extends
DynamicTableFactory {
}
static LogStoreTableFactory discoverLogStoreFactory(ClassLoader cl, String
identifier) {
- return FactoryUtil.discoverFactory(cl, LogStoreTableFactory.class,
identifier);
+ return LogStoreFactoryUtil.discoverLogStoreFactory(
+ cl, LogStoreTableFactory.class, identifier);
}
static DecodingFormat<DeserializationSchema<RowData>> getKeyDecodingFormat(
- TableFactoryHelper helper) {
+ FlinkTableFactoryHelper helper) {
DecodingFormat<DeserializationSchema<RowData>> format =
helper.discoverDecodingFormat(DeserializationFormatFactory.class,
logKeyFormat());
validateKeyFormat(format, helper.getOptions().get(logKeyFormat()));
@@ -94,7 +105,7 @@ public interface LogStoreTableFactory extends
DynamicTableFactory {
}
static EncodingFormat<SerializationSchema<RowData>> getKeyEncodingFormat(
- TableFactoryHelper helper) {
+ FlinkTableFactoryHelper helper) {
EncodingFormat<SerializationSchema<RowData>> format =
helper.discoverEncodingFormat(SerializationFormatFactory.class, logKeyFormat());
validateKeyFormat(format, helper.getOptions().get(logKeyFormat()));
@@ -102,7 +113,7 @@ public interface LogStoreTableFactory extends
DynamicTableFactory {
}
static DecodingFormat<DeserializationSchema<RowData>>
getValueDecodingFormat(
- TableFactoryHelper helper) {
+ FlinkTableFactoryHelper helper) {
DecodingFormat<DeserializationSchema<RowData>> format =
helper.discoverDecodingFormat(DeserializationFormatFactory.class, logFormat());
validateValueFormat(format, helper.getOptions().get(logFormat()));
@@ -110,7 +121,7 @@ public interface LogStoreTableFactory extends
DynamicTableFactory {
}
static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat(
- TableFactoryHelper helper) {
+ FlinkTableFactoryHelper helper) {
EncodingFormat<SerializationSchema<RowData>> format =
helper.discoverEncodingFormat(SerializationFormatFactory.class, logFormat());
validateValueFormat(format, helper.getOptions().get(logFormat()));
diff --git
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index b4522f01a..a981b82fb 100644
---
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -15,4 +15,3 @@
org.apache.paimon.flink.FlinkTableFactory
org.apache.paimon.flink.FlinkCatalogFactory
-org.apache.paimon.flink.kafka.KafkaLogStoreFactory
diff --git
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.flink.log.LogStoreTableFactory
similarity index 90%
copy from
paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
copy to
paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.flink.log.LogStoreTableFactory
index b4522f01a..1d40d4751 100644
---
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.flink.log.LogStoreTableFactory
@@ -13,6 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.paimon.flink.FlinkTableFactory
-org.apache.paimon.flink.FlinkCatalogFactory
org.apache.paimon.flink.kafka.KafkaLogStoreFactory