yuxiqian commented on code in PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#discussion_r1836206210
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java:
##########
@@ -342,27 +342,77 @@ private CreateTableEvent
transformCreateTableEvent(CreateTableEvent createTableE
}
}
+ cachePreTransformProcessor(tableId, createTableEvent.getSchema());
+ if (preTransformProcessorMap.containsKey(tableId)) {
+ return preTransformProcessorMap
+ .get(tableId)
+ .preTransformCreateTableEvent(createTableEvent);
+ }
+ return createTableEvent;
+ }
+
+ private void cachePreTransformProcessor(TableId tableId, Schema
tableSchema) {
+ LinkedHashSet<Column> referencedColumnsSet = new LinkedHashSet<>();
+ boolean hasMatchTransform = false;
for (PreTransformer transform : transforms) {
- Selectors selectors = transform.getSelectors();
- if (selectors.isMatch(tableId) &&
transform.getProjection().isPresent()) {
+ if (!transform.getSelectors().isMatch(tableId)) {
+ continue;
+ }
+ if (!transform.getProjection().isPresent()) {
+ processProjectionTransform(tableId, tableSchema,
referencedColumnsSet, null);
+ hasMatchTransform = true;
+ } else {
Review Comment:
Would it still be correct to omit this branch and let it fall back to the
`if (!hasMatchTransform)` condition? The handling logic seems to be the same.
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java:
##########
@@ -342,27 +342,77 @@ private CreateTableEvent
transformCreateTableEvent(CreateTableEvent createTableE
}
}
+ cachePreTransformProcessor(tableId, createTableEvent.getSchema());
+ if (preTransformProcessorMap.containsKey(tableId)) {
+ return preTransformProcessorMap
+ .get(tableId)
+ .preTransformCreateTableEvent(createTableEvent);
+ }
+ return createTableEvent;
+ }
+
+ private void cachePreTransformProcessor(TableId tableId, Schema
tableSchema) {
+ LinkedHashSet<Column> referencedColumnsSet = new LinkedHashSet<>();
+ boolean hasMatchTransform = false;
for (PreTransformer transform : transforms) {
- Selectors selectors = transform.getSelectors();
- if (selectors.isMatch(tableId) &&
transform.getProjection().isPresent()) {
+ if (!transform.getSelectors().isMatch(tableId)) {
+ continue;
+ }
+ if (!transform.getProjection().isPresent()) {
+ processProjectionTransform(tableId, tableSchema,
referencedColumnsSet, null);
+ hasMatchTransform = true;
+ } else {
TransformProjection transformProjection =
transform.getProjection().get();
- TransformFilter transformFilter =
transform.getFilter().orElse(null);
if (transformProjection.isValid()) {
- if (!preTransformProcessorMap.containsKey(tableId)) {
- preTransformProcessorMap.put(
- tableId,
- new PreTransformProcessor(
- tableChangeInfo, transformProjection,
transformFilter));
- }
- PreTransformProcessor preTransformProcessor =
- preTransformProcessorMap.get(tableId);
- // TODO: Currently this code wrongly filters out rows that
weren't referenced in
- // the first matching transform rule but in the following
transform rules.
- return
preTransformProcessor.preTransformCreateTableEvent(createTableEvent);
+ processProjectionTransform(
+ tableId, tableSchema, referencedColumnsSet,
transform);
+ hasMatchTransform = true;
}
}
}
- return createTableEvent;
+ if (!hasMatchTransform) {
+ processProjectionTransform(tableId, tableSchema,
referencedColumnsSet, null);
+ }
+ }
+
+ public void processProjectionTransform(
+ TableId tableId,
+ Schema tableSchema,
+ LinkedHashSet<Column> referencedColumnsSet,
+ @Nullable PreTransformer transform) {
+ // If this TableId isn't presented in any transform block, it should
behave like a "*"
+ // projection and should be regarded as asterisk-ful.
+ if (transform == null) {
+ referencedColumnsSet.addAll(tableSchema.getColumns());
+ hasAsteriskMap.put(tableId, true);
+ } else {
+ TransformProjection transformProjection =
transform.getProjection().get();
+ boolean hasAsterisk =
TransformParser.hasAsterisk(transformProjection.getProjection());
+ if (hasAsterisk) {
+ referencedColumnsSet.addAll(tableSchema.getColumns());
+ hasAsteriskMap.put(tableId, true);
+ } else {
+ TransformFilter transformFilter =
transform.getFilter().orElse(null);
+ List<Column> referencedColumns =
+ TransformParser.generateReferencedColumns(
+ transformProjection.getProjection(),
+ transformFilter != null ?
transformFilter.getExpression() : null,
+ tableSchema.getColumns());
+ // update referenced columns of other projections of the same
tableId, if any
+ referencedColumnsSet.addAll(referencedColumns);
+ hasAsteriskMap.put(tableId, false);
Review Comment:
Should we check whether `hasAsteriskMap[tableId]` is already `true`, perhaps
because it was set by a previous transform rule?
Addendum: I don't think this is actually a problem. If someone mixes asterisk-ful
and non-asterisk-ful rules for the same table, it will fail anyway once the
upstream column count changes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]