SteNicholas commented on code in PR #558:
URL: https://github.com/apache/flink-table-store/pull/558#discussion_r1118546503


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldAggregator.java:
##########
@@ -31,44 +31,59 @@ public FieldAggregator(DataType dataType) {
     }
 
     static FieldAggregator createFieldAggregator(
-            DataType fieldType, String strAgg, boolean isPrimaryKey) {
-        final FieldAggregator fieldAggregator;
+            DataType fieldType, String strAgg, boolean ignoreRetract, boolean 
isPrimaryKey) {
+        FieldAggregator fieldAggregator;
         if (isPrimaryKey) {
-            fieldAggregator = new FieldLastValueAgg(fieldType);
+            fieldAggregator = new FieldPrimaryKeyAgg(fieldType);
         } else {
             // ordered by type root definition
             switch (strAgg) {
-                case "sum":
+                case FieldSumAgg.NAME:
                     fieldAggregator = new FieldSumAgg(fieldType);
                     break;
-                case "max":
+                case FieldMaxAgg.NAME:
                     fieldAggregator = new FieldMaxAgg(fieldType);
                     break;
-                case "min":
+                case FieldMinAgg.NAME:
                     fieldAggregator = new FieldMinAgg(fieldType);
                     break;
-                case "last_non_null_value":
+                case FieldLastNonNullValueAgg.NAME:
                     fieldAggregator = new FieldLastNonNullValueAgg(fieldType);
                     break;
-                case "last_value":
+                case FieldLastValueAgg.NAME:
                     fieldAggregator = new FieldLastValueAgg(fieldType);
                     break;
-                case "listagg":
+                case FieldListaggAgg.NAME:
                     fieldAggregator = new FieldListaggAgg(fieldType);
                     break;
-                case "bool_or":
+                case FieldBoolOrAgg.NAME:
                     fieldAggregator = new FieldBoolOrAgg(fieldType);
                     break;
-                case "bool_and":
+                case FieldBoolAndAgg.NAME:
                     fieldAggregator = new FieldBoolAndAgg(fieldType);
                     break;
                 default:
                     throw new RuntimeException(
                             "Use unsupported aggregation or spell aggregate 
function incorrectly!");
             }
         }
+
+        if (ignoreRetract) {
+            fieldAggregator = new FieldIgnoreRetractAgg(fieldAggregator);
+        }
         return fieldAggregator;
     }
 
+    abstract String name();
+
     abstract Object agg(Object accumulator, Object inputField);
+
+    Object retract(Object accumulator, Object retractField) {
+        throw new UnsupportedOperationException(
+                String.format(
+                        "Aggregate function '%s' dose not support retraction,"

Review Comment:
   ```suggestion
                           "Aggregate function '%s' does not support 
retraction,"
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to