yuxiqian commented on code in PR #3758:
URL: https://github.com/apache/flink-cdc/pull/3758#discussion_r1891309541
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java:
##########
@@ -272,4 +272,13 @@ public class MySqlDataSourceOptions {
+ "The difference between scan.newly-added-table.enabled and scan.binlog.newly-added-table.enabled options is: \n"
+ "scan.newly-added-table.enabled: do re-snapshot & binlog-reading for newly added table when restored; \n"
+ "scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added table during binlog reading phase.");
+
+ @Experimental
+ public static final ConfigOption<String> METADATA_LIST =
+ ConfigOptions.key("metadata.list")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "List of readable metadata from SourceRecord to be passed to downstream, split by `,`. "
+ + "Refer to MySqlReadableMetadata, available readable metadata are: table_name,database_name,op_ts,row_kind.");
Review Comment:
Can we iterate over `MySqlReadableMetadata.values()` and generate the names
dynamically, instead of hard-coding the available enum values here?
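
A minimal sketch of what generating the names could look like. The nested `ReadableMetadata` enum is a hypothetical stand-in for `MySqlReadableMetadata`: only the four keys and the `getKey()` accessor come from this diff, while the constant names, their order, and the helper names are assumptions for illustration.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

public class MetadataDescriptionSketch {

    /** Hypothetical stand-in for MySqlReadableMetadata; only the keys are taken from the diff. */
    enum ReadableMetadata {
        TABLE_NAME("table_name"),
        DATABASE_NAME("database_name"),
        OP_TS("op_ts"),
        ROW_KIND("row_kind");

        private final String key;

        ReadableMetadata(String key) {
            this.key = key;
        }

        String getKey() {
            return key;
        }
    }

    /** Joins every enum key with ',' so the option description cannot drift from the enum. */
    static String availableKeys() {
        return Arrays.stream(ReadableMetadata.values())
                .map(ReadableMetadata::getKey)
                .collect(Collectors.joining(","));
    }

    /** Builds the description text with the generated key list appended. */
    static String description() {
        return "List of readable metadata from SourceRecord to be passed to downstream, split by `,`. "
                + "Available readable metadata are: "
                + availableKeys()
                + ".";
    }

    public static void main(String[] args) {
        System.out.println(description());
    }
}
```

With this shape, adding a new enum constant would update the documented list without touching the option definition.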
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/MetadataColumns.java:
##########
@@ -26,6 +26,7 @@
/** Contains all supported metadata columns that could be used in transform expressions. */
public class MetadataColumns {
+
Review Comment:
Accidental change?
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java:
##########
@@ -234,8 +239,24 @@ public DataSource createDataSource(Context context) {
LOG.info("Add chunkKeyColumn {}.", chunkKeyColumnMap);
configFactory.chunkKeyColumn(chunkKeyColumnMap);
}
+ String metadataList = config.get(METADATA_LIST);
+ List<MySqlReadableMetadata> readableMetadataList = listReadableMetadata(metadataList);
+ return new MySqlDataSource(configFactory, readableMetadataList);
+ }
- return new MySqlDataSource(configFactory);
+ private List<MySqlReadableMetadata> listReadableMetadata(String metadataList) {
+ if (StringUtils.isNullOrWhitespaceOnly(metadataList)) {
+ return new ArrayList<>();
+ }
+ List<String> readableMetadataList =
+ Arrays.stream(metadataList.split(","))
+ .map(String::trim)
+ .collect(Collectors.toList());
+ return Arrays.stream(MySqlReadableMetadata.values())
+ .filter(
+ (mySqlReadableMetadata ->
+ readableMetadataList.contains(mySqlReadableMetadata.getKey())))
Review Comment:
What if the user passes an invalid metadata field name? Can we report an error
instead of silently discarding it?
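
One possible way to fail fast, sketched under assumptions: `ReadableMetadata` is again a hypothetical stand-in for `MySqlReadableMetadata`, and the exception type and message are illustrative, not what the project necessarily uses. Unlike the filter in the diff, this version looks up each requested name, so results follow the user's order rather than enum order, and an empty entry such as the one in `a,,b` is also rejected.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MetadataListValidationSketch {

    /** Hypothetical stand-in for MySqlReadableMetadata; only the keys are taken from the diff. */
    enum ReadableMetadata {
        TABLE_NAME("table_name"),
        DATABASE_NAME("database_name"),
        OP_TS("op_ts"),
        ROW_KIND("row_kind");

        private final String key;

        ReadableMetadata(String key) {
            this.key = key;
        }

        String getKey() {
            return key;
        }
    }

    /** Parses a comma-separated list and throws on any name that matches no enum key. */
    static List<ReadableMetadata> listReadableMetadata(String metadataList) {
        if (metadataList == null || metadataList.trim().isEmpty()) {
            return new ArrayList<>();
        }
        // Index the enum by key once so each requested name is a direct lookup.
        Map<String, ReadableMetadata> byKey = new LinkedHashMap<>();
        for (ReadableMetadata metadata : ReadableMetadata.values()) {
            byKey.put(metadata.getKey(), metadata);
        }
        List<ReadableMetadata> result = new ArrayList<>();
        for (String raw : metadataList.split(",")) {
            String name = raw.trim();
            ReadableMetadata metadata = byKey.get(name);
            if (metadata == null) {
                throw new IllegalArgumentException(
                        "Unknown metadata field '" + name + "'. Available fields: " + byKey.keySet());
            }
            result.add(metadata);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(listReadableMetadata("op_ts, table_name"));
    }
}
```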
##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java:
##########
@@ -103,14 +104,18 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
sourceTranslator.translate(
pipelineDef.getSource(), env, pipelineDefConfig, parallelism);
+ DataSource dataSource =
Review Comment:
Since the data source is now created outside, `DataStreamSource#translate`
should accept a `DataSource` as its argument instead of creating one
internally, just as is done for `DataSink`.
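
A rough sketch of the shape being suggested. Every type and method here (`DataSource`, `SourceTranslator`, `createDataSource`, `compose`) is a simplified, hypothetical stand-in and not the real Flink CDC signature. The point is only that the caller builds the source once and hands the same instance to the translator and to anything else that needs it.

```java
import java.util.ArrayList;
import java.util.List;

public class TranslateSignatureSketch {

    /** Hypothetical, minimal stand-in for a pipeline data source. */
    interface DataSource {
        String describe();
    }

    /** Hypothetical translator that receives an already-built source instead of constructing one. */
    static class SourceTranslator {
        String translate(DataSource dataSource, int parallelism) {
            return dataSource.describe() + " @ parallelism " + parallelism;
        }
    }

    /** Counts constructions so duplicate creation is easy to spot. */
    static int created = 0;

    static DataSource createDataSource() {
        created++;
        return () -> "mysql-source";
    }

    /** The composer creates the source once and passes it to every consumer. */
    static List<String> compose(int parallelism) {
        DataSource dataSource = createDataSource();
        List<String> stages = new ArrayList<>();
        stages.add(new SourceTranslator().translate(dataSource, parallelism));
        stages.add("downstream uses " + dataSource.describe());
        return stages;
    }

    public static void main(String[] args) {
        System.out.println(compose(2));
        System.out.println("sources created: " + created);
    }
}
```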