Airblader commented on a change in pull request #17847:
URL: https://github.com/apache/flink/pull/17847#discussion_r755058481



##########
File path: flink-connectors/flink-connector-mongodb/src/main/java/org/apache/flink/mongodb/table/util/ContextUtil.java
##########
@@ -0,0 +1,19 @@
+package org.apache.flink.mongodb.table.util;
+
+import org.apache.flink.table.factories.DynamicTableFactory;
+
+import java.util.Map;
+
+public class ContextUtil {
+    public static void transformContext(DynamicTableFactory factory, DynamicTableFactory.Context context) {
+        Map<String, String> catalogOptions = context.getCatalogTable().getOptions();
+
+        Map<String, String> convertedOptions = FactoryOptionUtil.normalizeOptionCaseAsFactory(factory, catalogOptions);
+
+        catalogOptions.clear();

Review comment:
       Modifying the table's options doesn't sound like a good idea, and I don't think it's good that we even allow this. The table should probably return an unmodifiable map instead.

##########
File path: flink-connectors/flink-connector-mongodb/src/main/java/org/apache/flink/mongodb/table/MongodbConnectorOptions.java
##########
@@ -0,0 +1,80 @@
+package org.apache.flink.mongodb.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/** Options for the Mongodb connector. */
+@PublicEvolving
+public class MongodbConnectorOptions {
+
+    public static final ConfigOption<String> DATABASE = ConfigOptions.key("database".toLowerCase())
+            .stringType()
+            .noDefaultValue()
+            .withDescription("The data base to connect.");
+    public static final ConfigOption<String> URI = ConfigOptions.key("uri".toLowerCase())
+            .stringType()
+            .noDefaultValue()
+            .withDescription("The uri to connect.");
+    public static final ConfigOption<String> COLLECTION_NAME = ConfigOptions
+            .key("collection".toLowerCase())
+            .stringType()
+            .noDefaultValue()
+            .withDescription("The name of the collection to return.");
+    public static final ConfigOption<Integer> MAX_CONNECTION_IDLE_TIME = ConfigOptions
+            .key("maxConnectionIdleTime".toLowerCase())
+            .intType()
+            .defaultValue(Integer.valueOf(60000))
+            .withDescription("The maximum idle time for a pooled connection.");
+    public static final ConfigOption<Integer> BATCH_SIZE = ConfigOptions
+            .key("batchSize".toLowerCase())
+            .intType()
+            .defaultValue(Integer.valueOf(1024))
+            .withDescription("The batch size when table invoking.");
+
+    public static final ConfigOption<String> FORMAT =
+            ConfigOptions.key("format")
+                    .stringType()
+                    .defaultValue("json")
+                    .withDescription(
+                            "Defines the format identifier for encoding data. "
+                                    + "The identifier is used to discover a suitable format factory.");
+    public static final ConfigOption<String> SERVER =
+            ConfigOptions.key("format")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Defines the format identifier for encoding data. "
+                                    + "The identifier is used to discover a suitable format factory.");
+    public static final ConfigOption<String> USERNAME =
+            ConfigOptions.key("format")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Defines the format identifier for encoding data. "
+                                    + "The identifier is used to discover a suitable format factory.");
+    public static final ConfigOption<String> PASSWORD =
+            ConfigOptions.key("format")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Defines the format identifier for encoding data. "
+                                    + "The identifier is used to discover a suitable format factory.");
+    public static final ConfigOption<Boolean> RETRY_WRITES =
+            ConfigOptions.key("format")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Defines the format identifier for encoding data. "
+                                    + "The identifier is used to discover a suitable format factory.");
+    public static final ConfigOption<Long> TIMEOUT =
+            ConfigOptions.key("format")
+                    .longType()
+                    .defaultValue(-1L)
+                    .withDescription(
+                            "Defines the format identifier for encoding data. "
+                                    + "The identifier is used to discover a suitable format factory.");
+
+    public MongodbConnectorOptions() {

Review comment:
       nit: this should be private
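
       A sketch of the nit, assuming the class stays a pure constants holder:

```java
public class MongodbConnectorOptions {

    // ... ConfigOption constants as above ...

    // Private constructor: a constants holder should never be instantiated.
    private MongodbConnectorOptions() {}
}
```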

##########
File path: flink-connectors/flink-connector-mongodb/src/main/java/org/apache/flink/mongodb/table/MongodbDynamicTableSourceSinkFactory.java
##########
@@ -0,0 +1,102 @@
+package org.apache.flink.mongodb.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.mongodb.table.sink.MongodbDynamicTableSink;
+import org.apache.flink.mongodb.table.sink.MongodbSinkConf;
+import org.apache.flink.mongodb.table.source.MongodbDynamicTableSource;
+import org.apache.flink.mongodb.table.util.ContextUtil;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.mongodb.table.MongodbConnectorOptions.BATCH_SIZE;
+import static org.apache.flink.mongodb.table.MongodbConnectorOptions.COLLECTION_NAME;
+import static org.apache.flink.mongodb.table.MongodbConnectorOptions.DATABASE;
+import static org.apache.flink.mongodb.table.MongodbConnectorOptions.FORMAT;
+import static org.apache.flink.mongodb.table.MongodbConnectorOptions.MAX_CONNECTION_IDLE_TIME;
+import static org.apache.flink.mongodb.table.MongodbConnectorOptions.PASSWORD;
+import static org.apache.flink.mongodb.table.MongodbConnectorOptions.RETRY_WRITES;
+import static org.apache.flink.mongodb.table.MongodbConnectorOptions.SERVER;
+import static org.apache.flink.mongodb.table.MongodbConnectorOptions.TIMEOUT;
+import static org.apache.flink.mongodb.table.MongodbConnectorOptions.URI;
+
+@Internal
+public class MongodbDynamicTableSourceSinkFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {
+    public static final String IDENTIFIER = "mongodb";
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
+
+        ContextUtil.transformContext(this, context);
+        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+        helper.validate();
+
+        MongodbSinkConf mongodbSinkConf = new MongodbSinkConf((String) helper
+                .getOptions()
+                .get(DATABASE),
+                (String) helper.getOptions().get(COLLECTION_NAME),
+                (String) helper.getOptions().get(URI),
+                ((Integer) helper.getOptions().get(MAX_CONNECTION_IDLE_TIME)).intValue(),
+                ((Integer) helper.getOptions().get(BATCH_SIZE)).intValue());
+        return new MongodbDynamicTableSink(mongodbSinkConf, physicalSchema);
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> requiredOptions = new HashSet();
+        requiredOptions.add(DATABASE);
+        requiredOptions.add(SERVER);
+        requiredOptions.add(PASSWORD);
+        requiredOptions.add(COLLECTION_NAME);
+        return requiredOptions;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> optionals = new HashSet();
+        optionals.add(MAX_CONNECTION_IDLE_TIME);
+        optionals.add(BATCH_SIZE);
+        optionals.add(RETRY_WRITES);
+        optionals.add(TIMEOUT);
+        return optionals;
+    }
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        ContextUtil.transformContext(this, context);
+        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+        MongodbSinkConf mongodbSinkConf = new MongodbSinkConf(
+                (String) helper
+                        .getOptions()
+                        .get(DATABASE),
+                (String) helper.getOptions().get(COLLECTION_NAME),
+                (String) helper.getOptions().get(URI),
+                ((Integer) helper.getOptions().get(MAX_CONNECTION_IDLE_TIME)).intValue(),
+                ((Integer) helper.getOptions().get(BATCH_SIZE)).intValue());
+
+        ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
+
+        final DecodingFormat<DeserializationSchema<RowData>> decodingFormat =
+                helper.discoverDecodingFormat(
+                        DeserializationFormatFactory.class, FORMAT);
+        return new MongodbDynamicTableSource(mongodbSinkConf, physicalSchema, decodingFormat);

Review comment:
       Please don't pass the schema at all, but only the data type. Inside the 
source that's all you need, and you're actually using the wrong data type there 
currently. You can use the shortcut method `context.getPhysicalRowDataType()` 
which will give you the correct data type.
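
       Roughly like this, assuming `MongodbDynamicTableSource` is adjusted to accept a `DataType` (that constructor change is hypothetical here):

```java
import org.apache.flink.table.types.DataType;

// Inside createDynamicTableSource(Context context), after the option handling:
// getPhysicalRowDataType() resolves only the physical columns (no computed or
// metadata columns), which is exactly the type the source must produce.
final DataType producedDataType = context.getPhysicalRowDataType();
return new MongodbDynamicTableSource(mongodbSinkConf, producedDataType, decodingFormat);
```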

##########
File path: flink-connectors/flink-connector-mongodb/src/main/java/org/apache/flink/mongodb/table/util/FactoryOptionUtil.java
##########
@@ -0,0 +1,22 @@
+package org.apache.flink.mongodb.table.util;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.factories.Factory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class FactoryOptionUtil {
+    public static Map<String, String> normalizeOptionCaseAsFactory(final Factory factory, final Map<String, String> options) {

Review comment:
       What is the purpose of this?

##########
File path: flink-connectors/flink-connector-mongodb/src/main/java/org/apache/flink/mongodb/table/MongodbDynamicTableSourceSinkFactory.java
##########
@@ -0,0 +1,102 @@
+package org.apache.flink.mongodb.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.mongodb.table.sink.MongodbDynamicTableSink;
+import org.apache.flink.mongodb.table.sink.MongodbSinkConf;
+import org.apache.flink.mongodb.table.source.MongodbDynamicTableSource;
+import org.apache.flink.mongodb.table.util.ContextUtil;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.mongodb.table.MongodbConnectorOptions.BATCH_SIZE;
+import static org.apache.flink.mongodb.table.MongodbConnectorOptions.COLLECTION_NAME;
+import static org.apache.flink.mongodb.table.MongodbConnectorOptions.DATABASE;
+import static org.apache.flink.mongodb.table.MongodbConnectorOptions.FORMAT;
+import static org.apache.flink.mongodb.table.MongodbConnectorOptions.MAX_CONNECTION_IDLE_TIME;
+import static org.apache.flink.mongodb.table.MongodbConnectorOptions.PASSWORD;
+import static org.apache.flink.mongodb.table.MongodbConnectorOptions.RETRY_WRITES;
+import static org.apache.flink.mongodb.table.MongodbConnectorOptions.SERVER;
+import static org.apache.flink.mongodb.table.MongodbConnectorOptions.TIMEOUT;
+import static org.apache.flink.mongodb.table.MongodbConnectorOptions.URI;
+
+@Internal
+public class MongodbDynamicTableSourceSinkFactory implements DynamicTableSinkFactory, DynamicTableSourceFactory {
+    public static final String IDENTIFIER = "mongodb";
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema();
+
+        ContextUtil.transformContext(this, context);
+        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+        helper.validate();
+
+        MongodbSinkConf mongodbSinkConf = new MongodbSinkConf((String) helper
+                .getOptions()
+                .get(DATABASE),
+                (String) helper.getOptions().get(COLLECTION_NAME),
+                (String) helper.getOptions().get(URI),
+                ((Integer) helper.getOptions().get(MAX_CONNECTION_IDLE_TIME)).intValue(),
+                ((Integer) helper.getOptions().get(BATCH_SIZE)).intValue());
+        return new MongodbDynamicTableSink(mongodbSinkConf, physicalSchema);
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> requiredOptions = new HashSet();
+        requiredOptions.add(DATABASE);
+        requiredOptions.add(SERVER);
+        requiredOptions.add(PASSWORD);
+        requiredOptions.add(COLLECTION_NAME);
+        return requiredOptions;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> optionals = new HashSet();
+        optionals.add(MAX_CONNECTION_IDLE_TIME);
+        optionals.add(BATCH_SIZE);
+        optionals.add(RETRY_WRITES);
+        optionals.add(TIMEOUT);
+        return optionals;
+    }
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        ContextUtil.transformContext(this, context);
+        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+        MongodbSinkConf mongodbSinkConf = new MongodbSinkConf(
+                (String) helper

Review comment:
       These casts are all unnecessary. Please make sure to have your IDE 
warnings enabled and that your code doesn't produce new warnings.
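
       For reference, a cast-free sketch: `helper.getOptions()` returns a `ReadableConfig` whose `get(ConfigOption<T>)` is already typed, and unboxing to `int` happens automatically (assuming `MongodbSinkConf` takes `String, String, String, int, int`):

```java
MongodbSinkConf mongodbSinkConf = new MongodbSinkConf(
        helper.getOptions().get(DATABASE),
        helper.getOptions().get(COLLECTION_NAME),
        helper.getOptions().get(URI),
        helper.getOptions().get(MAX_CONNECTION_IDLE_TIME),
        helper.getOptions().get(BATCH_SIZE));
```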

##########
File path: flink-connectors/flink-connector-mongodb/src/main/java/org/apache/flink/mongodb/table/MongodbConnectorOptions.java
##########
@@ -0,0 +1,80 @@
+package org.apache.flink.mongodb.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+/** Options for the Mongodb connector. */
+@PublicEvolving
+public class MongodbConnectorOptions {
+
+    public static final ConfigOption<String> DATABASE = ConfigOptions.key("database".toLowerCase())

Review comment:
       All of these `toLowerCase` calls in the option keys are not needed and just make the code harder to read.
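
       For example, the first option boils down to this (the key literal is already lowercase, so `toLowerCase()` is a no-op; the description wording is tidied here as well):

```java
public static final ConfigOption<String> DATABASE =
        ConfigOptions.key("database")
                .stringType()
                .noDefaultValue()
                .withDescription("The database to connect to.");
```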




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

