This is an automated email from the ASF dual-hosted git repository.

zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 031f4a473 [core] Introduce factory interface (#1513)
031f4a473 is described below

commit 031f4a4735bdb3fb4dc6481825ddc527e0344b7c
Author: Shammon FY <[email protected]>
AuthorDate: Mon Jul 10 13:59:37 2023 +0800

    [core] Introduce factory interface (#1513)
    
    * [core] Introduce factory interface
    
    * [core] Fix KafkaLogStoreFactory
---
 .../java/org/apache/paimon/factories/Factory.java  | 40 +++++++++++++
 .../apache/paimon/factories/FactoryException.java  | 26 +++++++++
 .../org/apache/paimon/factories/FactoryUtil.java   | 68 ++++++++--------------
 .../org/apache/paimon/factories/DummyFactory.java  | 29 +++++++++
 .../apache/paimon/factories/FactoryUtilTest.java   | 48 +++++++++++++++
 .../services/org.apache.paimon.factories.Factory   |  2 +-
 .../org/apache/paimon/catalog/CatalogFactory.java  | 36 ++----------
 ...Factory => org.apache.paimon.factories.Factory} |  0
 .../paimon/flink/kafka/KafkaLogStoreFactory.java   |  2 +-
 .../paimon/flink/log/LogStoreTableFactory.java     | 17 ++----
 .../services/org.apache.paimon.factories.Factory   |  2 +-
 ...Factory => org.apache.paimon.factories.Factory} |  0
 12 files changed, 180 insertions(+), 90 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/factories/Factory.java 
b/paimon-common/src/main/java/org/apache/paimon/factories/Factory.java
new file mode 100644
index 000000000..b0f1ec84c
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/factories/Factory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.factories;
+
+/**
+ * Base interface for all kinds of factories that create object instances from a list of key-value
+ * pairs in Paimon's catalog, lineage.
+ *
+ * <p>A factory is uniquely identified by {@link Class} and {@link 
#identifier()}.
+ *
+ * <p>The list of available factories is discovered using Java's Service Provider Interfaces (SPI).
+ * Classes that implement this interface can be added to {@code
+ * META-INF/services/org.apache.paimon.factories.Factory} in JAR files.
+ */
+public interface Factory {
+    /**
+     * Returns an identifier that is unique among factories of the same factory interface.
+     *
+     * <p>For consistency, an identifier should be declared as one lower case 
word (e.g. {@code
+     * kafka}). If multiple factories exist for different versions, a version 
should be appended
+     * using "-" (e.g. {@code elasticsearch-7}).
+     */
+    String identifier();
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/factories/FactoryException.java 
b/paimon-common/src/main/java/org/apache/paimon/factories/FactoryException.java
new file mode 100644
index 000000000..8bc7fe336
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/factories/FactoryException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.factories;
+
+/** Exception for factory. */
+public class FactoryException extends RuntimeException {
+    public FactoryException(String message) {
+        super(message);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/LogStoreFactoryUtil.java
 b/paimon-common/src/main/java/org/apache/paimon/factories/FactoryUtil.java
similarity index 60%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/LogStoreFactoryUtil.java
rename to 
paimon-common/src/main/java/org/apache/paimon/factories/FactoryUtil.java
index f7009c35b..79ddfe5c8 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/LogStoreFactoryUtil.java
+++ b/paimon-common/src/main/java/org/apache/paimon/factories/FactoryUtil.java
@@ -16,12 +16,8 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.factories;
+package org.apache.paimon.factories;
 
-import org.apache.paimon.flink.log.LogStoreTableFactory;
-
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.ValidationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,58 +27,52 @@ import java.util.List;
 import java.util.ServiceLoader;
 import java.util.stream.Collectors;
 
-import static 
org.apache.flink.table.factories.ManagedTableFactory.DEFAULT_IDENTIFIER;
-
-/** Utility for working with {@link LogStoreTableFactory}s. */
-public final class LogStoreFactoryUtil {
+/** Utility for working with {@link Factory}s. */
+public class FactoryUtil {
+    private static final Logger LOG = 
LoggerFactory.getLogger(FactoryUtil.class);
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(LogStoreFactoryUtil.class);
-
-    /** Discovers a LogStoreTableFactory using the given factory base class 
and identifier. */
+    /** Discovers a factory using the given factory base class and identifier. 
*/
     @SuppressWarnings("unchecked")
-    public static <T extends LogStoreTableFactory> T discoverLogStoreFactory(
-            ClassLoader classLoader, Class<T> factoryClass, String 
factoryIdentifier) {
-        final List<LogStoreTableFactory> factories = 
discoverLogStoreFactories(classLoader);
+    public static <T extends Factory> T discoverFactory(
+            ClassLoader classLoader, Class<T> factoryClass, String identifier) 
{
+        final List<Factory> factories = discoverFactories(classLoader);
 
-        final List<LogStoreTableFactory> foundFactories =
+        final List<Factory> foundFactories =
                 factories.stream()
                         .filter(f -> 
factoryClass.isAssignableFrom(f.getClass()))
                         .collect(Collectors.toList());
 
         if (foundFactories.isEmpty()) {
-            throw new ValidationException(
+            throw new FactoryException(
                     String.format(
                             "Could not find any factories that implement '%s' 
in the classpath.",
                             factoryClass.getName()));
         }
 
-        final List<LogStoreTableFactory> matchingFactories =
+        final List<Factory> matchingFactories =
                 foundFactories.stream()
-                        .filter(f -> 
f.factoryIdentifier().equals(factoryIdentifier))
+                        .filter(f -> f.identifier().equals(identifier))
                         .collect(Collectors.toList());
 
         if (matchingFactories.isEmpty()) {
-            throw new ValidationException(
+            throw new FactoryException(
                     String.format(
                             "Could not find any factory for identifier '%s' 
that implements '%s' in the classpath.\n\n"
                                     + "Available factory identifiers are:\n\n"
                                     + "%s",
-                            factoryIdentifier,
+                            identifier,
                             factoryClass.getName(),
                             foundFactories.stream()
-                                    
.map(LogStoreTableFactory::factoryIdentifier)
-                                    .filter(identifier -> 
!DEFAULT_IDENTIFIER.equals(identifier))
-                                    .distinct()
-                                    .sorted()
+                                    .map(Factory::identifier)
                                     .collect(Collectors.joining("\n"))));
         }
         if (matchingFactories.size() > 1) {
-            throw new ValidationException(
+            throw new FactoryException(
                     String.format(
                             "Multiple factories for identifier '%s' that 
implement '%s' found in the classpath.\n\n"
                                     + "Ambiguous factory classes are:\n\n"
                                     + "%s",
-                            factoryIdentifier,
+                            identifier,
                             factoryClass.getName(),
                             matchingFactories.stream()
                                     .map(f -> f.getClass().getName())
@@ -93,15 +83,11 @@ public final class LogStoreFactoryUtil {
         return (T) matchingFactories.get(0);
     }
 
-    // 
--------------------------------------------------------------------------------------------
-    // Helper methods
-    // 
--------------------------------------------------------------------------------------------
+    private static List<Factory> discoverFactories(ClassLoader classLoader) {
+        final Iterator<Factory> serviceLoaderIterator =
+                ServiceLoader.load(Factory.class, classLoader).iterator();
 
-    static List<LogStoreTableFactory> discoverLogStoreFactories(ClassLoader 
classLoader) {
-        final Iterator<LogStoreTableFactory> serviceLoaderIterator =
-                ServiceLoader.load(LogStoreTableFactory.class, 
classLoader).iterator();
-
-        final List<LogStoreTableFactory> loadResults = new ArrayList<>();
+        final List<Factory> loadResults = new ArrayList<>();
         while (true) {
             try {
                 // error handling should also be applied to the hasNext() call 
because service
@@ -115,11 +101,11 @@ public final class LogStoreFactoryUtil {
                 if (t instanceof NoClassDefFoundError) {
                     LOG.debug(
                             "NoClassDefFoundError when loading a "
-                                    + 
LogStoreTableFactory.class.getCanonicalName()
-                                    + ". This is expected when trying to load 
a format dependency but no flink-connector-files is loaded.",
+                                    + Factory.class.getCanonicalName()
+                                    + ". This is expected when trying to load 
factory but no implementation is loaded.",
                             t);
                 } else {
-                    throw new TableException(
+                    throw new RuntimeException(
                             "Unexpected error when trying to load service 
provider.", t);
                 }
             }
@@ -127,10 +113,4 @@ public final class LogStoreFactoryUtil {
 
         return loadResults;
     }
-
-    // 
--------------------------------------------------------------------------------------------
-
-    private LogStoreFactoryUtil() {
-        // no instantiation
-    }
 }
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/factories/DummyFactory.java 
b/paimon-common/src/test/java/org/apache/paimon/factories/DummyFactory.java
new file mode 100644
index 000000000..a928f2a5c
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/factories/DummyFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.factories;
+
+/** Dummy factory for {@link FactoryUtilTest}. */
+public class DummyFactory implements Factory {
+    public static final String IDENTIFIER = "dummy";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/factories/FactoryUtilTest.java 
b/paimon-common/src/test/java/org/apache/paimon/factories/FactoryUtilTest.java
new file mode 100644
index 000000000..fe6fd8b21
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/factories/FactoryUtilTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.factories;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
+
+/** Tests for {@link FactoryUtil}. */
+public class FactoryUtilTest {
+    @Test
+    public void testDiscoverFactory() {
+        DummyFactory factory =
+                FactoryUtil.discoverFactory(
+                        Thread.currentThread().getContextClassLoader(),
+                        DummyFactory.class,
+                        DummyFactory.IDENTIFIER);
+        assertEquals(DummyFactory.IDENTIFIER, factory.identifier());
+
+        assertThrowsExactly(
+                FactoryException.class,
+                () ->
+                        FactoryUtil.discoverFactory(
+                                Thread.currentThread().getContextClassLoader(),
+                                DummyFactory.class,
+                                "non-exist-factory"),
+                String.format(
+                        "Could not find any factory for identifier '%s' that 
implements '%s' in the classpath.",
+                        "non-exist-factory", DummyFactory.class.getName()));
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.flink.log.LogStoreTableFactory
 
b/paimon-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
similarity index 93%
rename from 
paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.flink.log.LogStoreTableFactory
rename to 
paimon-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
index 1d40d4751..20480001e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.flink.log.LogStoreTableFactory
+++ 
b/paimon-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.paimon.flink.kafka.KafkaLogStoreFactory
+org.apache.paimon.factories.DummyFactory
\ No newline at end of file
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
index 5ea94c0ec..93fa18e7f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
@@ -19,16 +19,14 @@
 package org.apache.paimon.catalog;
 
 import org.apache.paimon.annotation.Public;
+import org.apache.paimon.factories.Factory;
+import org.apache.paimon.factories.FactoryUtil;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.utils.Preconditions;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.ServiceLoader;
-import java.util.stream.Collectors;
 
 import static org.apache.paimon.options.CatalogOptions.METASTORE;
 import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
@@ -40,9 +38,7 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
  * @since 0.4.0
  */
 @Public
-public interface CatalogFactory {
-
-    String identifier();
+public interface CatalogFactory extends Factory {
 
     Catalog create(FileIO fileIO, Path warehouse, CatalogContext context);
 
@@ -69,28 +65,8 @@ public interface CatalogFactory {
         String warehouse = warehouse(context).toUri().toString();
 
         String metastore = context.options().get(METASTORE);
-        List<CatalogFactory> factories = new ArrayList<>();
-        ServiceLoader.load(CatalogFactory.class, classLoader)
-                .iterator()
-                .forEachRemaining(
-                        f -> {
-                            if (f.identifier().equals(metastore)) {
-                                factories.add(f);
-                            }
-                        });
-        if (factories.size() != 1) {
-            throw new RuntimeException(
-                    "Found "
-                            + factories.size()
-                            + " classes implementing "
-                            + CatalogFactory.class.getName()
-                            + " with metastore "
-                            + metastore
-                            + ". They are:\n"
-                            + factories.stream()
-                                    .map(t -> t.getClass().getName())
-                                    .collect(Collectors.joining("\n")));
-        }
+        CatalogFactory catalogFactory =
+                FactoryUtil.discoverFactory(classLoader, CatalogFactory.class, 
metastore);
 
         Path warehousePath = new Path(warehouse);
         FileIO fileIO;
@@ -110,6 +86,6 @@ public interface CatalogFactory {
             throw new UncheckedIOException(e);
         }
 
-        return factories.get(0).create(fileIO, warehousePath, context);
+        return catalogFactory.create(fileIO, warehousePath, context);
     }
 }
diff --git 
a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.catalog.CatalogFactory
 
b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
similarity index 100%
rename from 
paimon-core/src/main/resources/META-INF/services/org.apache.paimon.catalog.CatalogFactory
rename to 
paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
index 8ea9e6c47..9e2ca648d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
@@ -59,7 +59,7 @@ public class KafkaLogStoreFactory implements 
LogStoreTableFactory {
     public static final String KAFKA_PREFIX = IDENTIFIER + ".";
 
     @Override
-    public String factoryIdentifier() {
+    public String identifier() {
         return IDENTIFIER;
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java
index e5b92aacd..c07eda125 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java
@@ -18,8 +18,9 @@
 
 package org.apache.paimon.flink.log;
 
+import org.apache.paimon.factories.Factory;
+import org.apache.paimon.factories.FactoryUtil;
 import 
org.apache.paimon.flink.factories.FlinkFactoryUtil.FlinkTableFactoryHelper;
-import org.apache.paimon.flink.factories.LogStoreFactoryUtil;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.serialization.SerializationSchema;
@@ -51,16 +52,7 @@ import static org.apache.paimon.CoreOptions.LOG_KEY_FORMAT;
  * <p>Log tables are for processing only unbounded data. Support streaming 
reading and streaming
  * writing.
  */
-public interface LogStoreTableFactory {
-
-    /**
-     * Returns a unique identifier among same factory interfaces.
-     *
-     * <p>For consistency, an identifier should be declared as one lower case 
word (e.g. {@code
-     * kafka}). If multiple factories exist for different versions, a version 
should be appended
-     * using "-" (e.g. {@code elasticsearch-7}).
-     */
-    String factoryIdentifier();
+public interface LogStoreTableFactory extends Factory {
 
     /**
      * Creates a {@link LogSourceProvider} instance from a {@link 
CatalogTable} and additional
@@ -92,8 +84,7 @@ public interface LogStoreTableFactory {
     }
 
     static LogStoreTableFactory discoverLogStoreFactory(ClassLoader cl, String 
identifier) {
-        return LogStoreFactoryUtil.discoverLogStoreFactory(
-                cl, LogStoreTableFactory.class, identifier);
+        return FactoryUtil.discoverFactory(cl, LogStoreTableFactory.class, 
identifier);
     }
 
     static DecodingFormat<DeserializationSchema<RowData>> getKeyDecodingFormat(
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.catalog.CatalogFactory
 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
similarity index 93%
copy from 
paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.catalog.CatalogFactory
copy to 
paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index dcc7e6554..d707adedf 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.catalog.CatalogFactory
+++ 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.paimon.hive.HiveCatalogFactory
+org.apache.paimon.flink.kafka.KafkaLogStoreFactory
\ No newline at end of file
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.catalog.CatalogFactory
 
b/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
similarity index 100%
rename from 
paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.catalog.CatalogFactory
rename to 
paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory

Reply via email to