brachi-wernick commented on a change in pull request #16598:
URL: https://github.com/apache/flink/pull/16598#discussion_r677542021
##########
File path:
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/table/PubSubTableSourceFactory.java
##########
@@ -0,0 +1,78 @@
+package org.apache.flink.streaming.connectors.gcp.pubsub.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+
+import java.util.HashSet;
+import java.util.Set;
+
+@Internal
+public class PubSubTableSourceFactory implements DynamicTableSourceFactory {
+
+    // define all options statically
+    public static final ConfigOption<String> PROJECT_NAME =
+            ConfigOptions.key("projectName").stringType().noDefaultValue();
+
+    public static final ConfigOption<String> TOPIC =
+            ConfigOptions.key("topic").stringType().noDefaultValue();
+
+    public static final ConfigOption<String> FORMAT =
+            ConfigOptions.key("format").stringType().noDefaultValue();
+
+    @Override
+    public String factoryIdentifier() {
+        return "pubsub";
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(PROJECT_NAME);
+        options.add(TOPIC);
+        options.add(FORMAT);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        return new HashSet<>();
+    }
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        // either implement your custom validation logic here ...
+        // or use the provided helper utility
+        final FactoryUtil.TableFactoryHelper helper =
+                FactoryUtil.createTableFactoryHelper(this, context);
+
+        // validate all options
+        helper.validateExcept("json.");
Review comment:
When the CREATE TABLE SQL contains JSON format options, the validation here
failed.
These are the JSON options I see:
```
-- 'json.map-null-key.mode' = 'FAIL', -- Optional flag to control the handling mode when serializing null key for map data, FAIL by default. Option DROP will drop null key entries for map data. Option LITERAL will use 'map-null-key.literal' as key literal.
-- 'json.encode.decimal-as-plain-number' = 'false', -- Optional flag to specify whether to encode all decimals as plain numbers instead of possible scientific notations, false by default.
-- 'json.map-null-key.literal' = 'null', -- Optional flag to specify string literal for null keys when 'map-null-key.mode' is LITERAL, "null" by default.
-- 'json.fail-on-missing-field' = 'false', -- Optional flag to specify whether to fail if a field is missing or not, false by default.
-- 'json.ignore-parse-errors' = 'false', -- Optional flag to skip fields and rows with parse errors instead of failing; fields are set to null in case of errors, false by default.
-- 'json.timestamp-format.standard' = 'SQL', -- Optional flag to specify timestamp format, SQL by default. Option ISO-8601 will parse input timestamp in "yyyy-MM-ddTHH:mm:ss.s{precision}" format and output timestamp in the same format. Option SQL will parse input timestamp in "yyyy-MM-dd HH:mm:ss.s{precision}" format and output timestamp in the same format.
```
Any other ideas on how to address it?
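
One possible direction, sketched under assumptions: the custom-connector example in the Flink documentation calls `helper.discoverDecodingFormat(DeserializationFormatFactory.class, FactoryUtil.FORMAT)` before a plain `helper.validate()`. My understanding is that discovering the format marks the format factory's own options (with the `json.` prefix applied) as consumed, so `validate()` accepts them while still rejecting misspelled `json.*` keys, which `validateExcept("json.")` would let through. The toy model below only illustrates that bookkeeping. It is not Flink code; the class name, method name, and option values are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Toy model of "consumed option" bookkeeping; NOT Flink's implementation. */
public class FormatOptionValidationSketch {

    /** Returns the table option keys that neither the connector nor the format declared. */
    public static Set<String> unconsumedKeys(
            Map<String, String> tableOptions,
            Set<String> connectorKeys,
            String formatPrefix,
            Set<String> formatKeys) {
        Set<String> consumed = new HashSet<>(connectorKeys);
        // Discovering the format marks its options, with the prefix applied, as consumed.
        for (String key : formatKeys) {
            consumed.add(formatPrefix + key);
        }
        // Whatever remains would be reported as an unsupported option.
        Set<String> leftover = new TreeSet<>(tableOptions.keySet());
        leftover.removeAll(consumed);
        return leftover;
    }

    public static void main(String[] args) {
        Map<String, String> tableOptions = new LinkedHashMap<>();
        tableOptions.put("connector", "pubsub");
        tableOptions.put("projectName", "my-project"); // hypothetical value
        tableOptions.put("topic", "my-topic"); // hypothetical value
        tableOptions.put("format", "json");
        tableOptions.put("json.ignore-parse-errors", "false");
        tableOptions.put("json.typo-option", "x"); // deliberately misspelled key

        Set<String> connectorKeys =
                new HashSet<>(Arrays.asList("connector", "projectName", "topic", "format"));
        // Only two of the JSON options are modelled here, for brevity.
        Set<String> jsonKeys =
                new HashSet<>(Arrays.asList("ignore-parse-errors", "fail-on-missing-field"));

        // Validation without format discovery: every json.* key looks unknown.
        System.out.println(
                unconsumedKeys(tableOptions, connectorKeys, "json.", Collections.emptySet()));

        // Validation after format discovery: only the misspelled key is left over.
        System.out.println(unconsumedKeys(tableOptions, connectorKeys, "json.", jsonKeys));
    }
}
```

If that reading is right, the first call corresponds to the failure described above, and the second shows why discovering the format first may be preferable to skipping the whole `json.` prefix: typos in format options would still be caught.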