This is an automated email from the ASF dual-hosted git repository.
zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 031f4a473 [core] Introduce factory interface (#1513)
031f4a473 is described below
commit 031f4a4735bdb3fb4dc6481825ddc527e0344b7c
Author: Shammon FY <[email protected]>
AuthorDate: Mon Jul 10 13:59:37 2023 +0800
[core] Introduce factory interface (#1513)
* [core] Introduce factory interface
* [core] Fix KafkaLogStoreFactory
---
.../java/org/apache/paimon/factories/Factory.java | 40 +++++++++++++
.../apache/paimon/factories/FactoryException.java | 26 +++++++++
.../org/apache/paimon/factories/FactoryUtil.java | 68 ++++++++--------------
.../org/apache/paimon/factories/DummyFactory.java | 29 +++++++++
.../apache/paimon/factories/FactoryUtilTest.java | 48 +++++++++++++++
.../services/org.apache.paimon.factories.Factory | 2 +-
.../org/apache/paimon/catalog/CatalogFactory.java | 36 ++----------
...Factory => org.apache.paimon.factories.Factory} | 0
.../paimon/flink/kafka/KafkaLogStoreFactory.java | 2 +-
.../paimon/flink/log/LogStoreTableFactory.java | 17 ++----
.../services/org.apache.paimon.factories.Factory | 2 +-
...Factory => org.apache.paimon.factories.Factory} | 0
12 files changed, 180 insertions(+), 90 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/factories/Factory.java
b/paimon-common/src/main/java/org/apache/paimon/factories/Factory.java
new file mode 100644
index 000000000..b0f1ec84c
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/factories/Factory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.factories;
+
+/**
+ * Base interface for all kinds of factories that create object instances from a list of key-value
+ * pairs in Paimon's catalog, lineage.
+ *
+ * <p>A factory is uniquely identified by {@link Class} and {@link #identifier()}.
+ *
+ * <p>The list of available factories is discovered using Java's Service Provider Interfaces (SPI).
+ * Classes that implement this interface can be added to {@code
+ * META-INF/services/org.apache.paimon.factories.Factory} in JAR files.
+ */
+public interface Factory {
+ /**
+ * Returns an identifier that is unique among factories implementing the same interface.
+ *
+ * <p>For consistency, an identifier should be declared as one lower case word (e.g. {@code
+ * kafka}). If multiple factories exist for different versions, a version should be appended
+ * using "-" (e.g. {@code elasticsearch-7}).
+ */
+ String identifier();
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/factories/FactoryException.java
b/paimon-common/src/main/java/org/apache/paimon/factories/FactoryException.java
new file mode 100644
index 000000000..8bc7fe336
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/factories/FactoryException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.factories;
+
+/** Exception for factory. */
+public class FactoryException extends RuntimeException {
+ public FactoryException(String message) {
+ super(message);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/LogStoreFactoryUtil.java
b/paimon-common/src/main/java/org/apache/paimon/factories/FactoryUtil.java
similarity index 60%
rename from
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/LogStoreFactoryUtil.java
rename to
paimon-common/src/main/java/org/apache/paimon/factories/FactoryUtil.java
index f7009c35b..79ddfe5c8 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/factories/LogStoreFactoryUtil.java
+++ b/paimon-common/src/main/java/org/apache/paimon/factories/FactoryUtil.java
@@ -16,12 +16,8 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.factories;
+package org.apache.paimon.factories;
-import org.apache.paimon.flink.log.LogStoreTableFactory;
-
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.ValidationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,58 +27,52 @@ import java.util.List;
import java.util.ServiceLoader;
import java.util.stream.Collectors;
-import static
org.apache.flink.table.factories.ManagedTableFactory.DEFAULT_IDENTIFIER;
-
-/** Utility for working with {@link LogStoreTableFactory}s. */
-public final class LogStoreFactoryUtil {
+/** Utility for working with {@link Factory}s. */
+public class FactoryUtil {
+ private static final Logger LOG =
LoggerFactory.getLogger(FactoryUtil.class);
- private static final Logger LOG =
LoggerFactory.getLogger(LogStoreFactoryUtil.class);
-
- /** Discovers a LogStoreTableFactory using the given factory base class
and identifier. */
+ /** Discovers a factory using the given factory base class and identifier. */
@SuppressWarnings("unchecked")
- public static <T extends LogStoreTableFactory> T discoverLogStoreFactory(
- ClassLoader classLoader, Class<T> factoryClass, String
factoryIdentifier) {
- final List<LogStoreTableFactory> factories =
discoverLogStoreFactories(classLoader);
+ public static <T extends Factory> T discoverFactory(
+ ClassLoader classLoader, Class<T> factoryClass, String identifier)
{
+ final List<Factory> factories = discoverFactories(classLoader);
- final List<LogStoreTableFactory> foundFactories =
+ final List<Factory> foundFactories =
factories.stream()
.filter(f ->
factoryClass.isAssignableFrom(f.getClass()))
.collect(Collectors.toList());
if (foundFactories.isEmpty()) {
- throw new ValidationException(
+ throw new FactoryException(
String.format(
"Could not find any factories that implement '%s'
in the classpath.",
factoryClass.getName()));
}
- final List<LogStoreTableFactory> matchingFactories =
+ final List<Factory> matchingFactories =
foundFactories.stream()
- .filter(f ->
f.factoryIdentifier().equals(factoryIdentifier))
+ .filter(f -> f.identifier().equals(identifier))
.collect(Collectors.toList());
if (matchingFactories.isEmpty()) {
- throw new ValidationException(
+ throw new FactoryException(
String.format(
"Could not find any factory for identifier '%s'
that implements '%s' in the classpath.\n\n"
+ "Available factory identifiers are:\n\n"
+ "%s",
- factoryIdentifier,
+ identifier,
factoryClass.getName(),
foundFactories.stream()
-
.map(LogStoreTableFactory::factoryIdentifier)
- .filter(identifier ->
!DEFAULT_IDENTIFIER.equals(identifier))
- .distinct()
- .sorted()
+ .map(Factory::identifier)
.collect(Collectors.joining("\n"))));
}
if (matchingFactories.size() > 1) {
- throw new ValidationException(
+ throw new FactoryException(
String.format(
"Multiple factories for identifier '%s' that
implement '%s' found in the classpath.\n\n"
+ "Ambiguous factory classes are:\n\n"
+ "%s",
- factoryIdentifier,
+ identifier,
factoryClass.getName(),
matchingFactories.stream()
.map(f -> f.getClass().getName())
@@ -93,15 +83,11 @@ public final class LogStoreFactoryUtil {
return (T) matchingFactories.get(0);
}
- //
--------------------------------------------------------------------------------------------
- // Helper methods
- //
--------------------------------------------------------------------------------------------
+ private static List<Factory> discoverFactories(ClassLoader classLoader) {
+ final Iterator<Factory> serviceLoaderIterator =
+ ServiceLoader.load(Factory.class, classLoader).iterator();
- static List<LogStoreTableFactory> discoverLogStoreFactories(ClassLoader
classLoader) {
- final Iterator<LogStoreTableFactory> serviceLoaderIterator =
- ServiceLoader.load(LogStoreTableFactory.class,
classLoader).iterator();
-
- final List<LogStoreTableFactory> loadResults = new ArrayList<>();
+ final List<Factory> loadResults = new ArrayList<>();
while (true) {
try {
// error handling should also be applied to the hasNext() call
because service
@@ -115,11 +101,11 @@ public final class LogStoreFactoryUtil {
if (t instanceof NoClassDefFoundError) {
LOG.debug(
"NoClassDefFoundError when loading a "
- +
LogStoreTableFactory.class.getCanonicalName()
- + ". This is expected when trying to load
a format dependency but no flink-connector-files is loaded.",
+ + Factory.class.getCanonicalName()
+ + ". This is expected when trying to load
factory but no implementation is loaded.",
t);
} else {
- throw new TableException(
+ throw new RuntimeException(
"Unexpected error when trying to load service
provider.", t);
}
}
@@ -127,10 +113,4 @@ public final class LogStoreFactoryUtil {
return loadResults;
}
-
- //
--------------------------------------------------------------------------------------------
-
- private LogStoreFactoryUtil() {
- // no instantiation
- }
}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/factories/DummyFactory.java
b/paimon-common/src/test/java/org/apache/paimon/factories/DummyFactory.java
new file mode 100644
index 000000000..a928f2a5c
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/factories/DummyFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.factories;
+
+/** Dummy factory for {@link FactoryUtilTest}. */
+public class DummyFactory implements Factory {
+ public static final String IDENTIFIER = "dummy";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/factories/FactoryUtilTest.java
b/paimon-common/src/test/java/org/apache/paimon/factories/FactoryUtilTest.java
new file mode 100644
index 000000000..fe6fd8b21
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/factories/FactoryUtilTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.factories;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
+
+/** Tests for {@link FactoryUtil}. */
+public class FactoryUtilTest {
+ @Test
+ public void testDiscoverFactory() {
+ DummyFactory factory =
+ FactoryUtil.discoverFactory(
+ Thread.currentThread().getContextClassLoader(),
+ DummyFactory.class,
+ DummyFactory.IDENTIFIER);
+ assertEquals(DummyFactory.IDENTIFIER, factory.identifier());
+
+ assertThrowsExactly(
+ FactoryException.class,
+ () ->
+ FactoryUtil.discoverFactory(
+ Thread.currentThread().getContextClassLoader(),
+ DummyFactory.class,
+ "non-exist-factory"),
+ String.format(
+ "Could not find any factory for identifier '%s' that
implements '%s' in the classpath.",
+ "non-exist-factory", DummyFactory.class.getName()));
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.flink.log.LogStoreTableFactory
b/paimon-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
similarity index 93%
rename from
paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.flink.log.LogStoreTableFactory
rename to
paimon-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
index 1d40d4751..20480001e 100644
---
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.flink.log.LogStoreTableFactory
+++
b/paimon-common/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.paimon.flink.kafka.KafkaLogStoreFactory
+org.apache.paimon.factories.DummyFactory
\ No newline at end of file
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
index 5ea94c0ec..93fa18e7f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
@@ -19,16 +19,14 @@
package org.apache.paimon.catalog;
import org.apache.paimon.annotation.Public;
+import org.apache.paimon.factories.Factory;
+import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.utils.Preconditions;
import java.io.IOException;
import java.io.UncheckedIOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.ServiceLoader;
-import java.util.stream.Collectors;
import static org.apache.paimon.options.CatalogOptions.METASTORE;
import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
@@ -40,9 +38,7 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
* @since 0.4.0
*/
@Public
-public interface CatalogFactory {
-
- String identifier();
+public interface CatalogFactory extends Factory {
Catalog create(FileIO fileIO, Path warehouse, CatalogContext context);
@@ -69,28 +65,8 @@ public interface CatalogFactory {
String warehouse = warehouse(context).toUri().toString();
String metastore = context.options().get(METASTORE);
- List<CatalogFactory> factories = new ArrayList<>();
- ServiceLoader.load(CatalogFactory.class, classLoader)
- .iterator()
- .forEachRemaining(
- f -> {
- if (f.identifier().equals(metastore)) {
- factories.add(f);
- }
- });
- if (factories.size() != 1) {
- throw new RuntimeException(
- "Found "
- + factories.size()
- + " classes implementing "
- + CatalogFactory.class.getName()
- + " with metastore "
- + metastore
- + ". They are:\n"
- + factories.stream()
- .map(t -> t.getClass().getName())
- .collect(Collectors.joining("\n")));
- }
+ CatalogFactory catalogFactory =
+ FactoryUtil.discoverFactory(classLoader, CatalogFactory.class,
metastore);
Path warehousePath = new Path(warehouse);
FileIO fileIO;
@@ -110,6 +86,6 @@ public interface CatalogFactory {
throw new UncheckedIOException(e);
}
- return factories.get(0).create(fileIO, warehousePath, context);
+ return catalogFactory.create(fileIO, warehousePath, context);
}
}
diff --git
a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.catalog.CatalogFactory
b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
similarity index 100%
rename from
paimon-core/src/main/resources/META-INF/services/org.apache.paimon.catalog.CatalogFactory
rename to
paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
index 8ea9e6c47..9e2ca648d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
@@ -59,7 +59,7 @@ public class KafkaLogStoreFactory implements
LogStoreTableFactory {
public static final String KAFKA_PREFIX = IDENTIFIER + ".";
@Override
- public String factoryIdentifier() {
+ public String identifier() {
return IDENTIFIER;
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java
index e5b92aacd..c07eda125 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreTableFactory.java
@@ -18,8 +18,9 @@
package org.apache.paimon.flink.log;
+import org.apache.paimon.factories.Factory;
+import org.apache.paimon.factories.FactoryUtil;
import
org.apache.paimon.flink.factories.FlinkFactoryUtil.FlinkTableFactoryHelper;
-import org.apache.paimon.flink.factories.LogStoreFactoryUtil;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
@@ -51,16 +52,7 @@ import static org.apache.paimon.CoreOptions.LOG_KEY_FORMAT;
* <p>Log tables are for processing only unbounded data. Support streaming
reading and streaming
* writing.
*/
-public interface LogStoreTableFactory {
-
- /**
- * Returns a unique identifier among same factory interfaces.
- *
- * <p>For consistency, an identifier should be declared as one lower case
word (e.g. {@code
- * kafka}). If multiple factories exist for different versions, a version
should be appended
- * using "-" (e.g. {@code elasticsearch-7}).
- */
- String factoryIdentifier();
+public interface LogStoreTableFactory extends Factory {
/**
* Creates a {@link LogSourceProvider} instance from a {@link
CatalogTable} and additional
@@ -92,8 +84,7 @@ public interface LogStoreTableFactory {
}
static LogStoreTableFactory discoverLogStoreFactory(ClassLoader cl, String
identifier) {
- return LogStoreFactoryUtil.discoverLogStoreFactory(
- cl, LogStoreTableFactory.class, identifier);
+ return FactoryUtil.discoverFactory(cl, LogStoreTableFactory.class,
identifier);
}
static DecodingFormat<DeserializationSchema<RowData>> getKeyDecodingFormat(
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.catalog.CatalogFactory
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
similarity index 93%
copy from
paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.catalog.CatalogFactory
copy to
paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index dcc7e6554..d707adedf 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.catalog.CatalogFactory
+++
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.paimon.hive.HiveCatalogFactory
+org.apache.paimon.flink.kafka.KafkaLogStoreFactory
\ No newline at end of file
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.catalog.CatalogFactory
b/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
similarity index 100%
rename from
paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.catalog.CatalogFactory
rename to
paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory