brachi-wernick commented on a change in pull request #16598:
URL: https://github.com/apache/flink/pull/16598#discussion_r677542021



##########
File path: flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/table/PubSubTableSourceFactory.java
##########
@@ -0,0 +1,78 @@
+package org.apache.flink.streaming.connectors.gcp.pubsub.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+
+import java.util.HashSet;
+import java.util.Set;
+
+@Internal
+public class PubSubTableSourceFactory implements DynamicTableSourceFactory {
+
+    // define all options statically
+    public static final ConfigOption<String> PROJECT_NAME =
+            ConfigOptions.key("projectName").stringType().noDefaultValue();
+
+    public static final ConfigOption<String> TOPIC =
+            ConfigOptions.key("topic").stringType().noDefaultValue();
+
+    public static final ConfigOption<String> FORMAT =
+            ConfigOptions.key("format").stringType().noDefaultValue();
+
+    @Override
+    public String factoryIdentifier() {
+        return "pubsub";
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(PROJECT_NAME);
+        options.add(TOPIC);
+        options.add(FORMAT);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        return new HashSet<>();
+    }
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        // either implement your custom validation logic here ...
+        // or use the provided helper utility
+        final FactoryUtil.TableFactoryHelper helper =
+                FactoryUtil.createTableFactoryHelper(this, context);
+
+        // validate all options
+        helper.validateExcept("json.");

Review comment:
   When the CREATE TABLE statement contains JSON format options, the validation here fails.
   These are the JSON options I see:
   ```
    -- 'json.map-null-key.mode' = 'FAIL', -- Optional flag to control the handling mode when serializing null key for map data, FAIL by default. Option DROP will drop null key entries for map data. Option LITERAL will use 'map-null-key.literal' as key literal.
    -- 'json.encode.decimal-as-plain-number' = 'false', -- Optional flag to specify whether to encode all decimals as plain numbers instead of possible scientific notations, false by default.
    -- 'json.map-null-key.literal' = 'null', -- Optional flag to specify string literal for null keys when 'map-null-key.mode' is LITERAL, "null" by default.
    -- 'json.fail-on-missing-field' = 'false', -- Optional flag to specify whether to fail if a field is missing or not, false by default.
    -- 'json.ignore-parse-errors' = 'false', -- Optional flag to skip fields and rows with parse errors instead of failing; fields are set to null in case of errors, false by default.
    -- 'json.timestamp-format.standard' = 'SQL', -- Optional flag to specify timestamp format, SQL by default. Option ISO-8601 will parse input timestamp in "yyyy-MM-ddTHH:mm:ss.s{precision}" format and output timestamp in the same format. Option SQL will parse input timestamp in "yyyy-MM-dd HH:mm:ss.s{precision}" format and output timestamp in the same format.
     ```
   Any other ideas on how to address this?
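
   To make the intended behavior concrete, here is a minimal plain-Java sketch of prefix-based option validation. It does not call Flink and is not Flink's implementation; the class `PrefixValidationSketch`, the method `unsupportedKeys`, and the sample option values are hypothetical names chosen for illustration. It only shows the rule the `"json."` exclusion is expected to follow: a provided key is accepted if it is declared by the factory or starts with an excluded prefix. Treating `connector` as a declared key is an assumption of the sketch.

   ```java
   import java.util.Arrays;
   import java.util.HashSet;
   import java.util.LinkedHashMap;
   import java.util.Map;
   import java.util.Set;
   import java.util.TreeSet;

   public class PrefixValidationSketch {

       /**
        * Returns the provided keys that are neither declared by the factory
        * nor covered by one of the excluded prefixes (hypothetical helper).
        */
       public static Set<String> unsupportedKeys(
               Map<String, String> provided, Set<String> declared, String... excludedPrefixes) {
           Set<String> unsupported = new TreeSet<>();
           for (String key : provided.keySet()) {
               if (declared.contains(key)) {
                   continue; // declared as a required or optional option
               }
               boolean excluded = false;
               for (String prefix : excludedPrefixes) {
                   if (key.startsWith(prefix)) {
                       excluded = true; // left for the format to validate
                       break;
                   }
               }
               if (!excluded) {
                   unsupported.add(key);
               }
           }
           return unsupported;
       }

       public static void main(String[] args) {
           // Options the factory declares; "connector" is assumed to be allowed implicitly.
           Set<String> declared =
                   new HashSet<>(Arrays.asList("connector", "projectName", "topic", "format"));

           // Options as they might appear in the WITH clause (sample values only).
           Map<String, String> provided = new LinkedHashMap<>();
           provided.put("connector", "pubsub");
           provided.put("projectName", "my-project");
           provided.put("topic", "my-topic");
           provided.put("format", "json");
           provided.put("json.ignore-parse-errors", "false");
           provided.put("json.timestamp-format.standard", "SQL");

           // Without an excluded prefix, both json.* keys are reported.
           System.out.println(unsupportedKeys(provided, declared));
           // With the "json." prefix excluded, nothing is reported.
           System.out.println(unsupportedKeys(provided, declared, "json."));
       }
   }
   ```

   If the real validation still rejects the `json.*` keys under this rule, the cause is probably elsewhere. One thing that may be worth trying, as an unverified suggestion: the imports in the diff (`DecodingFormat`, `DeserializationFormatFactory`) suggest the format is discovered later in this method, and calling `helper.discoverDecodingFormat(DeserializationFormatFactory.class, FactoryUtil.FORMAT)` before a plain `helper.validate()` is the pattern I would expect to let the format handle its own prefixed options.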
    




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.