This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7455e45ff [flink] LogStoreTableFactory should not implements 
DynamicTableFactory  (#771)
7455e45ff is described below

commit 7455e45ff99e916fbf279a7efae8b71987d293c2
Author: HZY <[email protected]>
AuthorDate: Mon Apr 3 14:11:24 2023 +0800

    [flink] LogStoreTableFactory should not implements DynamicTableFactory  
(#771)
---
 docs/content/concepts/external-log-systems.md      |  17 +
 .../paimon/flink/factories/FlinkFactoryUtil.java   | 500 +++++++++++++++++++++
 .../flink/factories/LogStoreFactoryUtil.java       | 136 ++++++
 .../paimon/flink/kafka/KafkaLogStoreFactory.java   |  22 +-
 .../paimon/flink/log/LogStoreTableFactory.java     |  29 +-
 .../org.apache.flink.table.factories.Factory       |   1 -
 ...g.apache.paimon.flink.log.LogStoreTableFactory} |   2 -
 7 files changed, 678 insertions(+), 29 deletions(-)

diff --git a/docs/content/concepts/external-log-systems.md 
b/docs/content/concepts/external-log-systems.md
index a7f6435d1..38dd18e97 100644
--- a/docs/content/concepts/external-log-systems.md
+++ b/docs/content/concepts/external-log-systems.md
@@ -42,6 +42,23 @@ If `'log.consistency' = 'eventual'` is set, in order to 
achieve correct results,
 
 ### Kafka
 
+#### Preparing flink-sql-connector-kafka Jar File
+
+Paimon currently supports Flink 1.16, 1.15 and 1.14. We recommend the latest 
Flink version for a better experience.
+
+Download the flink-sql-connector-kafka jar file with corresponding version.
+
+{{< stable >}}
+
+| Version | Jar                                                                
                                                                                
                                |
+|---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| Flink 1.16 | 
[flink-sql-connector-kafka-1.16.1.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.16.1/flink-sql-connector-kafka-1.16.0.jar)
                |
+| Flink 1.15 | 
[flink-sql-connector-kafka-1.15.4.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.15.4/flink-sql-connector-kafka-1.15.4.jar)
                   |
+| Flink 1.14 | 
[flink-sql-connector-kafka_2.11-1.14.4.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.14.4/flink-sql-connector-kafka_2.11-1.14.4.jar)
 |
+
+{{< /stable >}}
+
+
 By specifying `'log.system' = 'kafka'`, users can write changes into Kafka 
along with table files.
 
 {{< tabs "kafka-example" >}}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/FlinkFactoryUtil.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/FlinkFactoryUtil.java
new file mode 100644
index 000000000..87100ff0e
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/FlinkFactoryUtil.java
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.factories;
+
+import org.apache.paimon.flink.log.LogStoreTableFactory;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DelegatingConfiguration;
+import org.apache.flink.configuration.FallbackKey;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.factories.DecodingFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.factories.EncodingFormatFactory;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FormatFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.configuration.ConfigurationUtils.canBePrefixMap;
+import static 
org.apache.flink.configuration.ConfigurationUtils.filterPrefixMapKey;
+import static 
org.apache.flink.table.factories.ManagedTableFactory.DEFAULT_IDENTIFIER;
+
+/** Utility for working with {@link Factory}s. */
+public final class FlinkFactoryUtil {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlinkFactoryUtil.class);
+
+    public static final ConfigOption<String> FORMAT =
+            ConfigOptions.key("format")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Defines the format identifier for encoding data. "
+                                    + "The identifier is used to discover a 
suitable format factory.");
+
+    /**
+     * Suffix for keys of {@link ConfigOption} in case a connector requires 
multiple formats (e.g.
+     * for both key and value).
+     *
+     * <p>See {@link #createFlinkTableFactoryHelper(LogStoreTableFactory, 
Context)} Context)} for
+     * more information.
+     */
+    public static final String FORMAT_SUFFIX = ".format";
+
+    /**
+     * Creates a utility that helps in discovering formats, merging options 
with {@link
+     * DynamicTableFactory.Context#getEnrichmentOptions()} and validating them 
all for a {@link
+     * LogStoreTableFactory}.
+     *
+     * <p>The following example sketches the usage:
+     *
+     * <pre>{@code
+     * // in createDynamicTableSource()
+     * helper = FlinkFactoryUtil.createFlinkTableFactoryHelper(this, context);
+     *
+     * keyFormat = 
helper.discoverDecodingFormat(DeserializationFormatFactory.class, KEY_FORMAT);
+     * valueFormat = 
helper.discoverDecodingFormat(DeserializationFormatFactory.class, VALUE_FORMAT);
+     *
+     * helper.validate();
+     *
+     * ... // construct connector with discovered formats
+     * }</pre>
+     */
+    public static FlinkTableFactoryHelper createFlinkTableFactoryHelper(
+            LogStoreTableFactory factory, DynamicTableFactory.Context context) 
{
+        return new FlinkTableFactoryHelper(factory, context);
+    }
+
+    /** Discovers a flink Factory using the given factory base class and 
identifier. */
+    @SuppressWarnings("unchecked")
+    public static <T extends Factory> T discoverFlinkFactory(
+            ClassLoader classLoader, Class<T> factoryClass, String 
factoryIdentifier) {
+        final List<Factory> factories = discoverFlinkFactories(classLoader);
+
+        final List<Factory> foundFactories =
+                factories.stream()
+                        .filter(f -> 
factoryClass.isAssignableFrom(f.getClass()))
+                        .collect(Collectors.toList());
+
+        if (foundFactories.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "Could not find any factories that implement '%s' 
in the classpath.",
+                            factoryClass.getName()));
+        }
+
+        final List<Factory> matchingFactories =
+                foundFactories.stream()
+                        .filter(f -> 
f.factoryIdentifier().equals(factoryIdentifier))
+                        .collect(Collectors.toList());
+
+        if (matchingFactories.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "Could not find any factory for identifier '%s' 
that implements '%s' in the classpath.\n\n"
+                                    + "Available factory identifiers are:\n\n"
+                                    + "%s",
+                            factoryIdentifier,
+                            factoryClass.getName(),
+                            foundFactories.stream()
+                                    .map(Factory::factoryIdentifier)
+                                    .filter(identifier -> 
!DEFAULT_IDENTIFIER.equals(identifier))
+                                    .distinct()
+                                    .sorted()
+                                    .collect(Collectors.joining("\n"))));
+        }
+        if (matchingFactories.size() > 1) {
+            throw new ValidationException(
+                    String.format(
+                            "Multiple factories for identifier '%s' that 
implement '%s' found in the classpath.\n\n"
+                                    + "Ambiguous factory classes are:\n\n"
+                                    + "%s",
+                            factoryIdentifier,
+                            factoryClass.getName(),
+                            matchingFactories.stream()
+                                    .map(f -> f.getClass().getName())
+                                    .sorted()
+                                    .collect(Collectors.joining("\n"))));
+        }
+
+        return (T) matchingFactories.get(0);
+    }
+
+    /** Returns the required option prefix for options of the given format. */
+    public static String getFormatPrefix(
+            ConfigOption<String> formatOption, String formatIdentifier) {
+        final String formatOptionKey = formatOption.key();
+        if (formatOptionKey.equals(FORMAT.key())) {
+            return formatIdentifier + ".";
+        } else if (formatOptionKey.endsWith(FORMAT_SUFFIX)) {
+            // extract the key prefix, e.g. extract 'key' from 'key.format'
+            String keyPrefix =
+                    formatOptionKey.substring(0, formatOptionKey.length() - 
FORMAT_SUFFIX.length());
+            return keyPrefix + "." + formatIdentifier + ".";
+        } else {
+            throw new ValidationException(
+                    "Format identifier key should be 'format' or suffix with 
'.format', "
+                            + "don't support format identifier key '"
+                            + formatOptionKey
+                            + "'.");
+        }
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Helper methods
+    // 
--------------------------------------------------------------------------------------------
+
+    static List<Factory> discoverFlinkFactories(ClassLoader classLoader) {
+        final Iterator<Factory> serviceLoaderIterator =
+                ServiceLoader.load(Factory.class, classLoader).iterator();
+
+        final List<Factory> loadResults = new ArrayList<>();
+        while (true) {
+            try {
+                // error handling should also be applied to the hasNext() call 
because service
+                // loading might cause problems here as well
+                if (!serviceLoaderIterator.hasNext()) {
+                    break;
+                }
+
+                loadResults.add(serviceLoaderIterator.next());
+            } catch (Throwable t) {
+                if (t instanceof NoClassDefFoundError) {
+                    LOG.debug(
+                            "NoClassDefFoundError when loading a "
+                                    + 
LogStoreTableFactory.class.getCanonicalName()
+                                    + ". This is expected when trying to load 
a format dependency but no flink-connector-files is loaded.",
+                            t);
+                } else {
+                    throw new TableException(
+                            "Unexpected error when trying to load service 
provider.", t);
+                }
+            }
+        }
+
+        return loadResults;
+    }
+
+    private static Set<String> allKeysExpanded(ConfigOption<?> option, 
Set<String> actualKeys) {
+        return allKeysExpanded("", option, actualKeys);
+    }
+
+    private static Set<String> allKeysExpanded(
+            String prefix, ConfigOption<?> option, Set<String> actualKeys) {
+        final Set<String> staticKeys =
+                allKeys(option).map(k -> prefix + 
k).collect(Collectors.toSet());
+        if (!canBePrefixMap(option)) {
+            return staticKeys;
+        }
+        // include all prefix keys of a map option by considering the actually 
provided keys
+        return Stream.concat(
+                        staticKeys.stream(),
+                        staticKeys.stream()
+                                .flatMap(
+                                        k ->
+                                                actualKeys.stream()
+                                                        .filter(c -> 
filterPrefixMapKey(k, c))))
+                .collect(Collectors.toSet());
+    }
+
+    private static Stream<String> allKeys(ConfigOption<?> option) {
+        return Stream.concat(Stream.of(option.key()), fallbackKeys(option));
+    }
+
+    private static Stream<String> fallbackKeys(ConfigOption<?> option) {
+        return StreamSupport.stream(option.fallbackKeys().spliterator(), false)
+                .map(FallbackKey::getKey);
+    }
+
+    private static Stream<String> deprecatedKeys(ConfigOption<?> option) {
+        return StreamSupport.stream(option.fallbackKeys().spliterator(), false)
+                .filter(FallbackKey::isDeprecated)
+                .map(FallbackKey::getKey);
+    }
+
+    /** Base flink helper utility for validating all options for a {@link 
LogStoreTableFactory}. */
+    public static class FlinkFactoryHelper<F extends LogStoreTableFactory> {
+
+        protected final F factory;
+
+        protected final Configuration allOptions;
+
+        protected final Set<String> consumedOptionKeys;
+
+        protected final Set<String> deprecatedOptionKeys;
+
+        public FlinkFactoryHelper(
+                F factory, Map<String, String> configuration, 
ConfigOption<?>... implicitOptions) {
+            this.factory = factory;
+            this.allOptions = Configuration.fromMap(configuration);
+
+            final List<ConfigOption<?>> consumedOptions = new ArrayList<>();
+            consumedOptions.addAll(Arrays.asList(implicitOptions));
+
+            consumedOptionKeys =
+                    consumedOptions.stream()
+                            .flatMap(
+                                    option -> allKeysExpanded(option, 
allOptions.keySet()).stream())
+                            .collect(Collectors.toSet());
+
+            deprecatedOptionKeys =
+                    consumedOptions.stream()
+                            .flatMap(FlinkFactoryUtil::deprecatedKeys)
+                            .collect(Collectors.toSet());
+        }
+
+        /** Returns all options currently being consumed by the factory. */
+        public ReadableConfig getOptions() {
+            return allOptions;
+        }
+    }
+
+    /**
+     * Helper utility for discovering formats and validating all options for a 
{@link
+     * DynamicTableFactory}.
+     *
+     * @see #createFlinkTableFactoryHelper(LogStoreTableFactory, Context)
+     */
+    public static class FlinkTableFactoryHelper extends 
FlinkFactoryHelper<LogStoreTableFactory> {
+
+        private final Context context;
+
+        private final Configuration enrichingOptions;
+
+        private FlinkTableFactoryHelper(LogStoreTableFactory tableFactory, 
Context context) {
+            super(tableFactory, context.getCatalogTable().getOptions());
+            this.context = context;
+            this.enrichingOptions = 
Configuration.fromMap(context.getEnrichmentOptions());
+        }
+
+        /**
+         * Returns all options currently being consumed by the factory. This 
method returns the
+         * options already merged with {@link Context#getEnrichmentOptions()}, 
using {@link
+         * DynamicTableFactory#forwardOptions()} as reference of mergeable 
options.
+         */
+        @Override
+        public ReadableConfig getOptions() {
+            return super.getOptions();
+        }
+
+        /**
+         * Discovers a {@link DecodingFormat} of the given type using the 
given option as factory
+         * identifier.
+         */
+        public <I, F extends DecodingFormatFactory<I>> DecodingFormat<I> 
discoverDecodingFormat(
+                Class<F> formatFactoryClass, ConfigOption<String> 
formatOption) {
+            return discoverOptionalDecodingFormat(formatFactoryClass, 
formatOption)
+                    .orElseThrow(
+                            () ->
+                                    new ValidationException(
+                                            String.format(
+                                                    "Could not find required 
scan format '%s'.",
+                                                    formatOption.key())));
+        }
+
+        /**
+         * Discovers a {@link DecodingFormat} of the given type using the 
given option (if present)
+         * as factory identifier.
+         */
+        public <I, F extends DecodingFormatFactory<I>>
+                Optional<DecodingFormat<I>> discoverOptionalDecodingFormat(
+                        Class<F> formatFactoryClass, ConfigOption<String> 
formatOption) {
+            return discoverOptionalFormatFactory(formatFactoryClass, 
formatOption)
+                    .map(
+                            formatFactory -> {
+                                String formatPrefix =
+                                        formatFlinkPrefix(formatFactory, 
formatOption);
+                                try {
+                                    return formatFactory.createDecodingFormat(
+                                            context,
+                                            createFormatOptions(formatPrefix, 
formatFactory));
+                                } catch (Throwable t) {
+                                    throw new ValidationException(
+                                            String.format(
+                                                    "Error creating scan 
format '%s' in option space '%s'.",
+                                                    
formatFactory.factoryIdentifier(),
+                                                    formatPrefix),
+                                            t);
+                                }
+                            });
+        }
+
+        /**
+         * Discovers a {@link EncodingFormat} of the given type using the 
given option as factory
+         * identifier.
+         */
+        public <I, F extends EncodingFormatFactory<I>> EncodingFormat<I> 
discoverEncodingFormat(
+                Class<F> formatFactoryClass, ConfigOption<String> 
formatOption) {
+            return discoverOptionalEncodingFormat(formatFactoryClass, 
formatOption)
+                    .orElseThrow(
+                            () ->
+                                    new ValidationException(
+                                            String.format(
+                                                    "Could not find required 
sink format '%s'.",
+                                                    formatOption.key())));
+        }
+
+        /**
+         * Discovers a {@link EncodingFormat} of the given type using the 
given option (if present)
+         * as factory identifier.
+         */
+        public <I, F extends EncodingFormatFactory<I>>
+                Optional<EncodingFormat<I>> discoverOptionalEncodingFormat(
+                        Class<F> formatFactoryClass, ConfigOption<String> 
formatOption) {
+            return discoverOptionalFormatFactory(formatFactoryClass, 
formatOption)
+                    .map(
+                            formatFactory -> {
+                                String formatPrefix =
+                                        formatFlinkPrefix(formatFactory, 
formatOption);
+                                try {
+                                    return formatFactory.createEncodingFormat(
+                                            context,
+                                            createFormatOptions(formatPrefix, 
formatFactory));
+                                } catch (Throwable t) {
+                                    throw new ValidationException(
+                                            String.format(
+                                                    "Error creating sink 
format '%s' in option space '%s'.",
+                                                    
formatFactory.factoryIdentifier(),
+                                                    formatPrefix),
+                                            t);
+                                }
+                            });
+        }
+
+        // 
----------------------------------------------------------------------------------------
+
+        private <F extends Factory> Optional<F> discoverOptionalFormatFactory(
+                Class<F> formatFactoryClass, ConfigOption<String> 
formatOption) {
+            final String identifier = allOptions.get(formatOption);
+            checkFormatIdentifierMatchesWithEnrichingOptions(formatOption, 
identifier);
+            if (identifier == null) {
+                return Optional.empty();
+            }
+            final F factory =
+                    discoverFlinkFactory(context.getClassLoader(), 
formatFactoryClass, identifier);
+            String formatPrefix = formatFlinkPrefix(factory, formatOption);
+
+            // log all used options of other factories
+            final List<ConfigOption<?>> consumedOptions = new ArrayList<>();
+            consumedOptions.addAll(factory.requiredOptions());
+            consumedOptions.addAll(factory.optionalOptions());
+
+            consumedOptions.stream()
+                    .flatMap(
+                            option ->
+                                    allKeysExpanded(formatPrefix, option, 
allOptions.keySet())
+                                            .stream())
+                    .forEach(consumedOptionKeys::add);
+
+            consumedOptions.stream()
+                    .flatMap(FlinkFactoryUtil::deprecatedKeys)
+                    .map(k -> formatPrefix + k)
+                    .forEach(deprecatedOptionKeys::add);
+
+            return Optional.of(factory);
+        }
+
+        private String formatFlinkPrefix(Factory formatFactory, 
ConfigOption<String> formatOption) {
+            String identifier = formatFactory.factoryIdentifier();
+            return getFormatPrefix(formatOption, identifier);
+        }
+
+        @SuppressWarnings({"unchecked"})
+        private ReadableConfig createFormatOptions(
+                String formatPrefix, FormatFactory formatFactory) {
+            Set<ConfigOption<?>> forwardableConfigOptions = 
formatFactory.forwardOptions();
+            Configuration formatConf = new DelegatingConfiguration(allOptions, 
formatPrefix);
+            if (forwardableConfigOptions.isEmpty()) {
+                return formatConf;
+            }
+
+            Configuration formatConfFromEnrichingOptions =
+                    new DelegatingConfiguration(enrichingOptions, 
formatPrefix);
+
+            for (ConfigOption<?> option : forwardableConfigOptions) {
+                formatConfFromEnrichingOptions
+                        .getOptional(option)
+                        .ifPresent(o -> formatConf.set((ConfigOption<? super 
Object>) option, o));
+            }
+
+            return formatConf;
+        }
+
+        /**
+         * This function assumes that the format config is used only and only 
if the original
+         * configuration contains the format config option. It will fail if 
there is a mismatch of
+         * the identifier between the format in the plan table map and the one 
in enriching table
+         * map.
+         */
+        private void checkFormatIdentifierMatchesWithEnrichingOptions(
+                ConfigOption<String> formatOption, String identifierFromPlan) {
+            Optional<String> identifierFromEnrichingOptions =
+                    enrichingOptions.getOptional(formatOption);
+
+            if (!identifierFromEnrichingOptions.isPresent()) {
+                return;
+            }
+
+            if (identifierFromPlan == null) {
+                throw new ValidationException(
+                        String.format(
+                                "The persisted plan has no format option '%s' 
specified, while the catalog table has it with value '%s'. "
+                                        + "This is invalid, as either only the 
persisted plan table defines the format, "
+                                        + "or both the persisted plan table 
and the catalog table defines the same format.",
+                                formatOption, 
identifierFromEnrichingOptions.get()));
+            }
+
+            if (!Objects.equals(identifierFromPlan, 
identifierFromEnrichingOptions.get())) {
+                throw new ValidationException(
+                        String.format(
+                                "Both persisted plan table and catalog table 
define the format option '%s', "
+                                        + "but they mismatch: '%s' != '%s'.",
+                                formatOption,
+                                identifierFromPlan,
+                                identifierFromEnrichingOptions.get()));
+            }
+        }
+    }
+    // 
--------------------------------------------------------------------------------------------
+
+    private FlinkFactoryUtil() {
+        // no instantiation
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/LogStoreFactoryUtil.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/LogStoreFactoryUtil.java
new file mode 100644
index 000000000..f7009c35b
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/LogStoreFactoryUtil.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.factories;
+
+import org.apache.paimon.flink.log.LogStoreTableFactory;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.ValidationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.factories.ManagedTableFactory.DEFAULT_IDENTIFIER;
+
+/** Utility for working with {@link LogStoreTableFactory}s. */
+public final class LogStoreFactoryUtil {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(LogStoreFactoryUtil.class);
+
+    /** Discovers a LogStoreTableFactory using the given factory base class 
and identifier. */
+    @SuppressWarnings("unchecked")
+    public static <T extends LogStoreTableFactory> T discoverLogStoreFactory(
+            ClassLoader classLoader, Class<T> factoryClass, String 
factoryIdentifier) {
+        final List<LogStoreTableFactory> factories = 
discoverLogStoreFactories(classLoader);
+
+        final List<LogStoreTableFactory> foundFactories =
+                factories.stream()
+                        .filter(f -> 
factoryClass.isAssignableFrom(f.getClass()))
+                        .collect(Collectors.toList());
+
+        if (foundFactories.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "Could not find any factories that implement '%s' 
in the classpath.",
+                            factoryClass.getName()));
+        }
+
+        final List<LogStoreTableFactory> matchingFactories =
+                foundFactories.stream()
+                        .filter(f -> 
f.factoryIdentifier().equals(factoryIdentifier))
+                        .collect(Collectors.toList());
+
+        if (matchingFactories.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "Could not find any factory for identifier '%s' 
that implements '%s' in the classpath.\n\n"
+                                    + "Available factory identifiers are:\n\n"
+                                    + "%s",
+                            factoryIdentifier,
+                            factoryClass.getName(),
+                            foundFactories.stream()
+                                    
.map(LogStoreTableFactory::factoryIdentifier)
+                                    .filter(identifier -> 
!DEFAULT_IDENTIFIER.equals(identifier))
+                                    .distinct()
+                                    .sorted()
+                                    .collect(Collectors.joining("\n"))));
+        }
+        if (matchingFactories.size() > 1) {
+            throw new ValidationException(
+                    String.format(
+                            "Multiple factories for identifier '%s' that 
implement '%s' found in the classpath.\n\n"
+                                    + "Ambiguous factory classes are:\n\n"
+                                    + "%s",
+                            factoryIdentifier,
+                            factoryClass.getName(),
+                            matchingFactories.stream()
+                                    .map(f -> f.getClass().getName())
+                                    .sorted()
+                                    .collect(Collectors.joining("\n"))));
+        }
+
+        return (T) matchingFactories.get(0);
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Helper methods
+    // 
--------------------------------------------------------------------------------------------
+
+    static List<LogStoreTableFactory> discoverLogStoreFactories(ClassLoader 
classLoader) {
+        final Iterator<LogStoreTableFactory> serviceLoaderIterator =
+                ServiceLoader.load(LogStoreTableFactory.class, 
classLoader).iterator();
+
+        final List<LogStoreTableFactory> loadResults = new ArrayList<>();
+        while (true) {
+            try {
+                // error handling should also be applied to the hasNext() call 
because service
+                // loading might cause problems here as well
+                if (!serviceLoaderIterator.hasNext()) {
+                    break;
+                }
+
+                loadResults.add(serviceLoaderIterator.next());
+            } catch (Throwable t) {
+                if (t instanceof NoClassDefFoundError) {
+                    LOG.debug(
+                            "NoClassDefFoundError when loading a "
+                                    + 
LogStoreTableFactory.class.getCanonicalName()
+                                    + ". This is expected when trying to load 
a format dependency but no flink-connector-files is loaded.",
+                            t);
+                } else {
+                    throw new TableException(
+                            "Unexpected error when trying to load service 
provider.", t);
+                }
+            }
+        }
+
+        return loadResults;
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+
+    private LogStoreFactoryUtil() {
+        // no instantiation
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
index 816827867..9bb9e0f3a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
@@ -19,12 +19,12 @@
 package org.apache.paimon.flink.kafka;
 
 import org.apache.paimon.CoreOptions;
+import 
org.apache.paimon.flink.factories.FlinkFactoryUtil.FlinkTableFactoryHelper;
 import org.apache.paimon.flink.log.LogStoreTableFactory;
 import org.apache.paimon.options.Options;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.catalog.ResolvedSchema;
@@ -32,24 +32,22 @@ import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.DataTypeUtils;
 
 import javax.annotation.Nullable;
 
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 
-import static 
org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
 import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
 import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY;
 import static org.apache.paimon.CoreOptions.LogConsistency;
 import static org.apache.paimon.CoreOptions.SCAN_TIMESTAMP_MILLIS;
+import static 
org.apache.paimon.flink.factories.FlinkFactoryUtil.createFlinkTableFactoryHelper;
 import static org.apache.paimon.flink.kafka.KafkaLogOptions.TOPIC;
 
 /** The Kafka {@link LogStoreTableFactory} implementation. */
@@ -64,16 +62,6 @@ public class KafkaLogStoreFactory implements 
LogStoreTableFactory {
         return IDENTIFIER;
     }
 
-    @Override
-    public Set<ConfigOption<?>> requiredOptions() {
-        return new HashSet<>();
-    }
-
-    @Override
-    public Set<ConfigOption<?>> optionalOptions() {
-        return new HashSet<>();
-    }
-
     private String topic(Context context) {
         return context.getCatalogTable().getOptions().get(TOPIC.key());
     }
@@ -83,7 +71,7 @@ public class KafkaLogStoreFactory implements 
LogStoreTableFactory {
             Context context,
             DynamicTableSource.Context sourceContext,
             @Nullable int[][] projectFields) {
-        FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, 
context);
+        FlinkTableFactoryHelper helper = createFlinkTableFactoryHelper(this, 
context);
         ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
         DataType physicalType = schema.toPhysicalRowDataType();
         DeserializationSchema<RowData> primaryKeyDeserializer = null;
@@ -115,7 +103,7 @@ public class KafkaLogStoreFactory implements 
LogStoreTableFactory {
     @Override
     public KafkaLogSinkProvider createSinkProvider(
             Context context, DynamicTableSink.Context sinkContext) {
-        FactoryUtil.TableFactoryHelper helper = createTableFactoryHelper(this, 
context);
+        FlinkTableFactoryHelper helper = createFlinkTableFactoryHelper(this, 
context);
         ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
         DataType physicalType = schema.toPhysicalRowDataType();
         SerializationSchema<RowData> primaryKeySerializer = null;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java
index ecdd504ef..e5b92aacd 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java
@@ -18,6 +18,9 @@
 
 package org.apache.paimon.flink.log;
 
+import 
org.apache.paimon.flink.factories.FlinkFactoryUtil.FlinkTableFactoryHelper;
+import org.apache.paimon.flink.factories.LogStoreFactoryUtil;
+
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.ConfigOption;
@@ -32,9 +35,7 @@ import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DeserializationFormatFactory;
-import org.apache.flink.table.factories.DynamicTableFactory;
-import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
 import org.apache.flink.table.factories.SerializationFormatFactory;
 import org.apache.flink.types.RowKind;
 
@@ -50,7 +51,16 @@ import static org.apache.paimon.CoreOptions.LOG_KEY_FORMAT;
  * <p>Log tables are for processing only unbounded data. Support streaming 
reading and streaming
  * writing.
  */
-public interface LogStoreTableFactory extends DynamicTableFactory {
+public interface LogStoreTableFactory {
+
+    /**
+     * Returns a unique identifier among same factory interfaces.
+     *
+     * <p>For consistency, an identifier should be declared as one lower case 
word (e.g. {@code
+     * kafka}). If multiple factories exist for different versions, a version 
should be appended
+     * using "-" (e.g. {@code elasticsearch-7}).
+     */
+    String factoryIdentifier();
 
     /**
      * Creates a {@link LogSourceProvider} instance from a {@link 
CatalogTable} and additional
@@ -82,11 +92,12 @@ public interface LogStoreTableFactory extends 
DynamicTableFactory {
     }
 
     static LogStoreTableFactory discoverLogStoreFactory(ClassLoader cl, String 
identifier) {
-        return FactoryUtil.discoverFactory(cl, LogStoreTableFactory.class, 
identifier);
+        return LogStoreFactoryUtil.discoverLogStoreFactory(
+                cl, LogStoreTableFactory.class, identifier);
     }
 
     static DecodingFormat<DeserializationSchema<RowData>> getKeyDecodingFormat(
-            TableFactoryHelper helper) {
+            FlinkTableFactoryHelper helper) {
         DecodingFormat<DeserializationSchema<RowData>> format =
                 
helper.discoverDecodingFormat(DeserializationFormatFactory.class, 
logKeyFormat());
         validateKeyFormat(format, helper.getOptions().get(logKeyFormat()));
@@ -94,7 +105,7 @@ public interface LogStoreTableFactory extends 
DynamicTableFactory {
     }
 
     static EncodingFormat<SerializationSchema<RowData>> getKeyEncodingFormat(
-            TableFactoryHelper helper) {
+            FlinkTableFactoryHelper helper) {
         EncodingFormat<SerializationSchema<RowData>> format =
                 
helper.discoverEncodingFormat(SerializationFormatFactory.class, logKeyFormat());
         validateKeyFormat(format, helper.getOptions().get(logKeyFormat()));
@@ -102,7 +113,7 @@ public interface LogStoreTableFactory extends 
DynamicTableFactory {
     }
 
     static DecodingFormat<DeserializationSchema<RowData>> 
getValueDecodingFormat(
-            TableFactoryHelper helper) {
+            FlinkTableFactoryHelper helper) {
         DecodingFormat<DeserializationSchema<RowData>> format =
                 
helper.discoverDecodingFormat(DeserializationFormatFactory.class, logFormat());
         validateValueFormat(format, helper.getOptions().get(logFormat()));
@@ -110,7 +121,7 @@ public interface LogStoreTableFactory extends 
DynamicTableFactory {
     }
 
     static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat(
-            TableFactoryHelper helper) {
+            FlinkTableFactoryHelper helper) {
         EncodingFormat<SerializationSchema<RowData>> format =
                 
helper.discoverEncodingFormat(SerializationFormatFactory.class, logFormat());
         validateValueFormat(format, helper.getOptions().get(logFormat()));
diff --git 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index b4522f01a..a981b82fb 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -15,4 +15,3 @@
 
 org.apache.paimon.flink.FlinkTableFactory
 org.apache.paimon.flink.FlinkCatalogFactory
-org.apache.paimon.flink.kafka.KafkaLogStoreFactory
diff --git 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.flink.log.LogStoreTableFactory
similarity index 90%
copy from 
paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
copy to 
paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.flink.log.LogStoreTableFactory
index b4522f01a..1d40d4751 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.flink.log.LogStoreTableFactory
@@ -13,6 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.paimon.flink.FlinkTableFactory
-org.apache.paimon.flink.FlinkCatalogFactory
 org.apache.paimon.flink.kafka.KafkaLogStoreFactory


Reply via email to