aiwenmo commented on code in PR #3285:
URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1597329925


##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java:
##########
@@ -362,25 +376,51 @@ private Optional<DataChangeEvent> processFilter(
     }
 
     private Optional<DataChangeEvent> processProjection(
-            TransformProjectionProcessor transformProjectionProcessor,
+            PostTransformProcessor postTransformProcessor,
             DataChangeEvent dataChangeEvent,
-            long epochTime)
-            throws Exception {
+            long epochTime) {
         BinaryRecordData before = (BinaryRecordData) dataChangeEvent.before();
         BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
         if (before != null) {
             BinaryRecordData projectedBefore =
-                    transformProjectionProcessor.processData(before, 
epochTime);
+                    postTransformProcessor.processData(before, epochTime);
+            dataChangeEvent = DataChangeEvent.projectBefore(dataChangeEvent, 
projectedBefore);
+        }
+        if (after != null) {
+            BinaryRecordData projectedAfter = 
postTransformProcessor.processData(after, epochTime);
+            dataChangeEvent = DataChangeEvent.projectAfter(dataChangeEvent, 
projectedAfter);
+        }
+        return Optional.of(dataChangeEvent);
+    }
+
+    private Optional<DataChangeEvent> processPostProjection(
+            TableInfo tableInfo, DataChangeEvent dataChangeEvent) throws 
Exception {
+        BinaryRecordData before = (BinaryRecordData) dataChangeEvent.before();
+        BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
+        if (before != null) {
+            BinaryRecordData projectedBefore = projectRecord(tableInfo, 
before);
             dataChangeEvent = DataChangeEvent.projectBefore(dataChangeEvent, 
projectedBefore);
         }
         if (after != null) {
-            BinaryRecordData projectedAfter =
-                    transformProjectionProcessor.processData(after, epochTime);
+            BinaryRecordData projectedAfter = projectRecord(tableInfo, after);
             dataChangeEvent = DataChangeEvent.projectAfter(dataChangeEvent, 
projectedAfter);
         }
         return Optional.of(dataChangeEvent);
     }
 
+    private BinaryRecordData projectRecord(TableInfo tableInfo, 
BinaryRecordData recordData) {
+        List<Object> valueList = new ArrayList<>();
+        RecordData.FieldGetter[] fieldGetters = tableInfo.getFieldGetters();
+
+        for (RecordData.FieldGetter fieldGetter : fieldGetters) {
+            valueList.add(fieldGetter.getFieldOrNull(recordData));
+        }
+
+        return tableInfo
+                .getRecordDataGenerator()
+                .generate(valueList.toArray(new Object[valueList.size()]));
+    }
+
     private boolean containFilteredComputedColumn(String projection, String 
filter) {
         boolean contain = false;

Review Comment:
   Because the filter always executes before the projection, this method is no
longer needed and can be removed.



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformers.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.transform;
+
+import org.apache.flink.cdc.common.schema.Selectors;
+
+import javax.annotation.Nullable;
+
+import java.util.Optional;
+
+/** Transformation rules used by {@link PostTransformOperator}. */
+public class PostTransformers {
+    private final Selectors selectors;
+
+    private final Optional<TransformProjection> projection;
+    private final Optional<TransformFilter> filter;
+
+    private final boolean containFilteredComputedColumn;
+

Review Comment:
   Because the filter always executes before the projection, this private
field is no longer needed and can be removed.



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java:
##########
@@ -221,73 +240,67 @@ private TableInfo 
getTableInfoFromSchemaEvolutionClient(TableId tableId) throws
         return tableInfo;
     }
 
-    private void transformSchema(TableId tableId, Schema schema) throws 
Exception {
-        for (Tuple4<Selectors, Optional<TransformProjection>, 
Optional<TransformFilter>, Boolean>
-                transform : transforms) {
-            Selectors selectors = transform.f0;
-            if (selectors.isMatch(tableId) && transform.f1.isPresent()) {
-                TransformProjection transformProjection = transform.f1.get();
+    private Schema transformSchema(TableId tableId, Schema schema) throws 
Exception {
+        List<Schema> newSchemas = new ArrayList<>();
+        for (PostTransformers transform : transforms) {
+            Selectors selectors = transform.getSelectors();
+            if (selectors.isMatch(tableId) && 
transform.getProjection().isPresent()) {
+                TransformProjection transformProjection = 
transform.getProjection().get();
+                TransformFilter transformFilter = 
transform.getFilter().orElse(null);
                 if (transformProjection.isValid()) {
                     if 
(!transformProjectionProcessorMap.containsKey(transformProjection)) {
                         transformProjectionProcessorMap.put(
                                 transformProjection,
-                                
TransformProjectionProcessor.of(transformProjection));
+                                PostTransformProcessor.of(transformProjection, 
transformFilter));
                     }
-                    TransformProjectionProcessor transformProjectionProcessor =
+                    PostTransformProcessor postTransformProcessor =
                             
transformProjectionProcessorMap.get(transformProjection);
                     // update the columns of projection and add the column of 
projection into Schema
-                    
transformProjectionProcessor.processSchemaChangeEvent(schema);
+                    
newSchemas.add(postTransformProcessor.processSchemaChangeEvent(schema));
                 }
             }
         }
+        if (newSchemas.isEmpty()) {
+            return schema;
+        }
+
+        Schema firstSchema = newSchemas.get(0);
+        newSchemas.stream()
+                .skip(1)
+                .forEach(
+                        testSchema -> {
+                            if (!testSchema.equals(firstSchema)) {
+                                throw new IllegalArgumentException(
+                                        String.format(
+                                                "Incompatible transform rules 
result found. Inferred schema: %s and %s",
+                                                firstSchema, testSchema));
+                            }
+                        });
+        return firstSchema;

Review Comment:
   Schemas that are actually compatible but differ in data type length,
precision, or nullability may be incorrectly flagged as unequal by this strict
equality check.



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java:
##########
@@ -384,10 +481,73 @@ public static SqlSelect parseFilterExpression(String 
filterExpression) {
         StringBuilder statement = new StringBuilder();
         statement.append("SELECT * FROM ");
         statement.append(DEFAULT_TABLE);
-        if (!StringUtils.isNullOrWhitespaceOnly(filterExpression)) {
+        if (!isNullOrWhitespaceOnly(filterExpression)) {
             statement.append(" WHERE ");
             statement.append(filterExpression);
         }
         return parseSelect(statement.toString());
     }
+
+    public static SqlNode rewriteExpression(SqlNode sqlNode, Map<String, 
SqlNode> replaceMap) {
+        if (sqlNode instanceof SqlBasicCall) {
+            SqlBasicCall sqlBasicCall = (SqlBasicCall) sqlNode;

Review Comment:
   Need to handle SqlCase.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to