Airblader commented on a change in pull request #16598:
URL: https://github.com/apache/flink/pull/16598#discussion_r677386515
##########
File path:
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/table/PubSubTableSourceFactory.java
##########
@@ -0,0 +1,78 @@
+package org.apache.flink.streaming.connectors.gcp.pubsub.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+
+import java.util.HashSet;
+import java.util.Set;
+
+@Internal
+public class PubSubTableSourceFactory implements DynamicTableSourceFactory {
Review comment:
Can you please add JavaDoc to all classes? This is required for the CI
to pass as well.
##########
File path:
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/table/PubSubTableSourceFactory.java
##########
@@ -0,0 +1,78 @@
+package org.apache.flink.streaming.connectors.gcp.pubsub.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+
+import java.util.HashSet;
+import java.util.Set;
+
+@Internal
+public class PubSubTableSourceFactory implements DynamicTableSourceFactory {
+
+ // define all options statically
+ public static final ConfigOption<String> PROJECT_NAME =
Review comment:
Please move config options to a separate `PubSubConnectorConfigOptions`
class and mark it as `@PublicEvolving`, since config options are public API.
##########
File path:
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/table/PubSubTableSourceFactory.java
##########
@@ -0,0 +1,78 @@
+package org.apache.flink.streaming.connectors.gcp.pubsub.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+
+import java.util.HashSet;
+import java.util.Set;
+
+@Internal
+public class PubSubTableSourceFactory implements DynamicTableSourceFactory {
Review comment:
The naming convention for these so far would be
`PubSubDynamicTableFactory`, can you please rename it?
##########
File path:
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/table/PubsubDynamicSource.java
##########
@@ -0,0 +1,75 @@
+package org.apache.flink.streaming.connectors.gcp.pubsub.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/** */
Review comment:
Please add proper documentation.
##########
File path:
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/table/PubsubDynamicSource.java
##########
@@ -0,0 +1,75 @@
+package org.apache.flink.streaming.connectors.gcp.pubsub.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/** */
+@Internal
+public class PubsubDynamicSource implements ScanTableSource {
+ private final String project;
+ private final String topic;
+ private final DecodingFormat<DeserializationSchema<RowData>>
decodingFormat;
+ private final DataType producedDataType;
+ private static Logger logger =
LoggerFactory.getLogger(PubsubDynamicSource.class);
Review comment:
Can we move the static field to the top and separate it from instance
fields?
##########
File path:
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/table/PubSubTableSourceFactory.java
##########
@@ -0,0 +1,78 @@
+package org.apache.flink.streaming.connectors.gcp.pubsub.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+
+import java.util.HashSet;
+import java.util.Set;
+
+@Internal
+public class PubSubTableSourceFactory implements DynamicTableSourceFactory {
+
+ // define all options statically
+ public static final ConfigOption<String> PROJECT_NAME =
+ ConfigOptions.key("projectName").stringType().noDefaultValue();
+
+ public static final ConfigOption<String> TOPIC =
+ ConfigOptions.key("topic").stringType().noDefaultValue();
+
+ public static final ConfigOption<String> FORMAT =
+ ConfigOptions.key("format").stringType().noDefaultValue();
+
+ @Override
+ public String factoryIdentifier() {
+ return "pubsub";
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(PROJECT_NAME);
+ options.add(TOPIC);
+ options.add(FORMAT);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ return new HashSet<>();
+ }
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ // either implement your custom validation logic here ...
+ // or use the provided helper utility
+ final FactoryUtil.TableFactoryHelper helper =
+ FactoryUtil.createTableFactoryHelper(this, context);
+
+ // validate all options
+ helper.validateExcept("json.");
Review comment:
There are no `json.*` options? What is this doing?
##########
File path:
flink-connectors/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/table/PubSubTableSourceFactory.java
##########
@@ -0,0 +1,78 @@
+package org.apache.flink.streaming.connectors.gcp.pubsub.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+
+import java.util.HashSet;
+import java.util.Set;
+
+@Internal
+public class PubSubTableSourceFactory implements DynamicTableSourceFactory {
+
+ // define all options statically
+ public static final ConfigOption<String> PROJECT_NAME =
+ ConfigOptions.key("projectName").stringType().noDefaultValue();
+
+ public static final ConfigOption<String> TOPIC =
+ ConfigOptions.key("topic").stringType().noDefaultValue();
+
+ public static final ConfigOption<String> FORMAT =
+ ConfigOptions.key("format").stringType().noDefaultValue();
+
+ @Override
+ public String factoryIdentifier() {
+ return "pubsub";
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ final Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(PROJECT_NAME);
+ options.add(TOPIC);
+ options.add(FORMAT);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ return new HashSet<>();
+ }
+
+ @Override
+ public DynamicTableSource createDynamicTableSource(Context context) {
+ // either implement your custom validation logic here ...
Review comment:
Looks like these comments should be removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]