twalthr commented on a change in pull request #17897:
URL: https://github.com/apache/flink/pull/17897#discussion_r760853470



##########
File path: flink-dist/pom.xml
##########
@@ -129,6 +129,12 @@ under the License.
                <!-- Default file system support. The Hadoop and MapR 
dependencies -->
                <!--       are optional, so not being added to the dist jar     
   -->
 
+               <dependency>

Review comment:
       With this change, `flink-connector-files` gets bundled into the `flink-dist` jar, 
no? Doesn't that mean it ends up in the release twice?

##########
File path: 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/FactoryUtilTest.java
##########
@@ -419,22 +410,110 @@ public void testInvalidFactoryHelperWithMapOption() {
         final FactoryUtil.TableFactoryHelper helper =
                 FactoryUtil.createTableFactoryHelper(
                         new TestFactoryWithMap(), 
FactoryMocks.createTableContext(SCHEMA, options));
-        helper.validate();
+
+        assertThatThrownBy(helper::validate)
+                .satisfies(
+                        anyCauseMatches(
+                                ValidationException.class,
+                                "Unsupported options found for 
'test-factory-with-map'.\n\n"
+                                        + "Unsupported options:\n\n"
+                                        + "unknown\n\n"
+                                        + "Supported options:\n\n"
+                                        + "connector\n"
+                                        + "properties\n"
+                                        + "properties.prop-1\n"
+                                        + "properties.prop-2\n"
+                                        + "property-version"));
+    }
+
+    @Test
+    public void testDiscoverFactoryBadClass(@TempDir Path tempDir) throws 
IOException {
+        // Let's prepare the classloader with a factory interface and 2 
classes, one implements our
+        // sub-interface of SerializationFormatFactory and the other 
implements only
+        // SerializationFormatFactory.
+        final String subInterfaceName = "MyFancySerializationSchemaFormat";
+        final String subInterfaceImplementationName = 
"MyFancySerializationSchemaFormatImpl";
+        final String serializationSchemaImplementationName = 
"AnotherSerializationSchema";
+
+        final URLClassLoader classLoaderIncludingTheInterface =
+                ClassLoaderUtils.withRoot(tempDir.toFile())
+                        .addClass(
+                                subInterfaceName,
+                                "public interface "
+                                        + subInterfaceName
+                                        + " extends "
+                                        + 
SerializationFormatFactory.class.getName()
+                                        + " {}")
+                        .addClass(
+                                subInterfaceImplementationName,
+                                "import 
org.apache.flink.api.common.serialization.SerializationSchema;"
+                                        + "import 
org.apache.flink.configuration.ConfigOption;"
+                                        + "import 
org.apache.flink.configuration.ReadableConfig;"
+                                        + "import 
org.apache.flink.table.connector.format.EncodingFormat;"
+                                        + "import 
org.apache.flink.table.data.RowData;"
+                                        + "import 
org.apache.flink.table.factories.DynamicTableFactory;"
+                                        + "import 
org.apache.flink.table.factories.SerializationFormatFactory;"
+                                        + "import java.util.Set;"
+                                        + "public class "
+                                        + subInterfaceImplementationName
+                                        + " implements "
+                                        + subInterfaceName
+                                        + " {"
+                                        + "@Override public String 
factoryIdentifier() { return null; }"
+                                        + "@Override public 
Set<ConfigOption<?>> requiredOptions() { return null; }"
+                                        + "@Override public 
Set<ConfigOption<?>> optionalOptions() { return null; }"
+                                        + "@Override public 
EncodingFormat<SerializationSchema<RowData>> 
createEncodingFormat(DynamicTableFactory.Context context, ReadableConfig 
formatOptions) { return null; }"
+                                        + "}")
+                        .addClass(
+                                serializationSchemaImplementationName,
+                                "import 
org.apache.flink.api.common.serialization.SerializationSchema;"
+                                        + "import 
org.apache.flink.configuration.ConfigOption;"
+                                        + "import 
org.apache.flink.configuration.ReadableConfig;"
+                                        + "import 
org.apache.flink.table.connector.format.EncodingFormat;"
+                                        + "import 
org.apache.flink.table.data.RowData;"
+                                        + "import 
org.apache.flink.table.factories.DynamicTableFactory;"
+                                        + "import 
org.apache.flink.table.factories.SerializationFormatFactory;"
+                                        + "import java.util.Set;"
+                                        + "public class "
+                                        + serializationSchemaImplementationName
+                                        + " implements "
+                                        + 
SerializationFormatFactory.class.getName()
+                                        + " {"
+                                        + "@Override public String 
factoryIdentifier() { return null; }"
+                                        + "@Override public 
Set<ConfigOption<?>> requiredOptions() { return null; }"
+                                        + "@Override public 
Set<ConfigOption<?>> optionalOptions() { return null; }"
+                                        + "@Override public 
EncodingFormat<SerializationSchema<RowData>> 
createEncodingFormat(DynamicTableFactory.Context context, ReadableConfig 
formatOptions) { return null; }"
+                                        + "}")
+                        .addService(Factory.class.getName(), 
subInterfaceImplementationName)
+                        .addService(Factory.class.getName(), 
serializationSchemaImplementationName)
+                        .build();
+
+        // Delete the sub interface now, so it can't be loaded
+        Files.delete(tempDir.resolve(subInterfaceName + ".class"));
+
+        
assertThat(FactoryUtil.discoverFactories(classLoaderIncludingTheInterface))

Review comment:
       nice test 👍 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to