twalthr commented on a change in pull request #17897:
URL: https://github.com/apache/flink/pull/17897#discussion_r760853470
##########
File path: flink-dist/pom.xml
##########
@@ -129,6 +129,12 @@ under the License.
<!-- Default file system support. The Hadoop and MapR
dependencies -->
<!-- are optional, so not being added to the dist jar
-->
+ <dependency>
Review comment:
with this you add the `flink-connector-files` dependency to the `flink-dist` jar,
no? So it would be included in the release twice?
##########
File path:
flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
##########
@@ -419,22 +410,110 @@ public void testInvalidFactoryHelperWithMapOption() {
final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(
new TestFactoryWithMap(),
FactoryMocks.createTableContext(SCHEMA, options));
- helper.validate();
+
+ assertThatThrownBy(helper::validate)
+ .satisfies(
+ anyCauseMatches(
+ ValidationException.class,
+ "Unsupported options found for
'test-factory-with-map'.\n\n"
+ + "Unsupported options:\n\n"
+ + "unknown\n\n"
+ + "Supported options:\n\n"
+ + "connector\n"
+ + "properties\n"
+ + "properties.prop-1\n"
+ + "properties.prop-2\n"
+ + "property-version"));
+ }
+
+ @Test
+ public void testDiscoverFactoryBadClass(@TempDir Path tempDir) throws
IOException {
+ // Let's prepare the classloader with a factory interface and 2
classes, one implements our
+ // sub-interface of SerializationFormatFactory and the other
implements only
+ // SerializationFormatFactory.
+ final String subInterfaceName = "MyFancySerializationSchemaFormat";
+ final String subInterfaceImplementationName =
"MyFancySerializationSchemaFormatImpl";
+ final String serializationSchemaImplementationName =
"AnotherSerializationSchema";
+
+ final URLClassLoader classLoaderIncludingTheInterface =
+ ClassLoaderUtils.withRoot(tempDir.toFile())
+ .addClass(
+ subInterfaceName,
+ "public interface "
+ + subInterfaceName
+ + " extends "
+ +
SerializationFormatFactory.class.getName()
+ + " {}")
+ .addClass(
+ subInterfaceImplementationName,
+ "import
org.apache.flink.api.common.serialization.SerializationSchema;"
+ + "import
org.apache.flink.configuration.ConfigOption;"
+ + "import
org.apache.flink.configuration.ReadableConfig;"
+ + "import
org.apache.flink.table.connector.format.EncodingFormat;"
+ + "import
org.apache.flink.table.data.RowData;"
+ + "import
org.apache.flink.table.factories.DynamicTableFactory;"
+ + "import
org.apache.flink.table.factories.SerializationFormatFactory;"
+ + "import java.util.Set;"
+ + "public class "
+ + subInterfaceImplementationName
+ + " implements "
+ + subInterfaceName
+ + " {"
+ + "@Override public String
factoryIdentifier() { return null; }"
+ + "@Override public
Set<ConfigOption<?>> requiredOptions() { return null; }"
+ + "@Override public
Set<ConfigOption<?>> optionalOptions() { return null; }"
+ + "@Override public
EncodingFormat<SerializationSchema<RowData>>
createEncodingFormat(DynamicTableFactory.Context context, ReadableConfig
formatOptions) { return null; }"
+ + "}")
+ .addClass(
+ serializationSchemaImplementationName,
+ "import
org.apache.flink.api.common.serialization.SerializationSchema;"
+ + "import
org.apache.flink.configuration.ConfigOption;"
+ + "import
org.apache.flink.configuration.ReadableConfig;"
+ + "import
org.apache.flink.table.connector.format.EncodingFormat;"
+ + "import
org.apache.flink.table.data.RowData;"
+ + "import
org.apache.flink.table.factories.DynamicTableFactory;"
+ + "import
org.apache.flink.table.factories.SerializationFormatFactory;"
+ + "import java.util.Set;"
+ + "public class "
+ + serializationSchemaImplementationName
+ + " implements "
+ +
SerializationFormatFactory.class.getName()
+ + " {"
+ + "@Override public String
factoryIdentifier() { return null; }"
+ + "@Override public
Set<ConfigOption<?>> requiredOptions() { return null; }"
+ + "@Override public
Set<ConfigOption<?>> optionalOptions() { return null; }"
+ + "@Override public
EncodingFormat<SerializationSchema<RowData>>
createEncodingFormat(DynamicTableFactory.Context context, ReadableConfig
formatOptions) { return null; }"
+ + "}")
+ .addService(Factory.class.getName(),
subInterfaceImplementationName)
+ .addService(Factory.class.getName(),
serializationSchemaImplementationName)
+ .build();
+
+ // Delete the sub interface now, so it can't be loaded
+ Files.delete(tempDir.resolve(subInterfaceName + ".class"));
+
+
assertThat(FactoryUtil.discoverFactories(classLoaderIncludingTheInterface))
Review comment:
nice test 👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]