yuxiqian commented on code in PR #3285:
URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1597351018
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java:
##########
@@ -221,73 +240,67 @@ private TableInfo
getTableInfoFromSchemaEvolutionClient(TableId tableId) throws
return tableInfo;
}
- private void transformSchema(TableId tableId, Schema schema) throws
Exception {
- for (Tuple4<Selectors, Optional<TransformProjection>,
Optional<TransformFilter>, Boolean>
- transform : transforms) {
- Selectors selectors = transform.f0;
- if (selectors.isMatch(tableId) && transform.f1.isPresent()) {
- TransformProjection transformProjection = transform.f1.get();
+ private Schema transformSchema(TableId tableId, Schema schema) throws
Exception {
+ List<Schema> newSchemas = new ArrayList<>();
+ for (PostTransformers transform : transforms) {
+ Selectors selectors = transform.getSelectors();
+ if (selectors.isMatch(tableId) &&
transform.getProjection().isPresent()) {
+ TransformProjection transformProjection =
transform.getProjection().get();
+ TransformFilter transformFilter =
transform.getFilter().orElse(null);
if (transformProjection.isValid()) {
if
(!transformProjectionProcessorMap.containsKey(transformProjection)) {
transformProjectionProcessorMap.put(
transformProjection,
-
TransformProjectionProcessor.of(transformProjection));
+ PostTransformProcessor.of(transformProjection,
transformFilter));
}
- TransformProjectionProcessor transformProjectionProcessor =
+ PostTransformProcessor postTransformProcessor =
transformProjectionProcessorMap.get(transformProjection);
// update the columns of projection and add the column of
projection into Schema
-
transformProjectionProcessor.processSchemaChangeEvent(schema);
+
newSchemas.add(postTransformProcessor.processSchemaChangeEvent(schema));
}
}
}
+ if (newSchemas.isEmpty()) {
+ return schema;
+ }
+
+ Schema firstSchema = newSchemas.get(0);
+ newSchemas.stream()
+ .skip(1)
+ .forEach(
+ testSchema -> {
+ if (!testSchema.equals(firstSchema)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Incompatible transform rules
result found. Inferred schema: %s and %s",
+ firstSchema, testSchema));
+ }
+ });
+ return firstSchema;
Review Comment:
I have added `SchemaUtils.mergeCompatibleUpstreamSchema` to resolve this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]