fhueske commented on code in PR #28128:
URL: https://github.com/apache/flink/pull/28128#discussion_r3225814572


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java:
##########
@@ -169,4 +169,96 @@ public interface PartitionedTable {
      * @see ProcessTableFunction
      */
     Table process(Class<? extends UserDefinedFunction> function, Object... 
arguments);
+
+    /**
+     * Converts this partitioned dynamic table into an append-only table with 
an explicit operation
+     * code column using the built-in {@code TO_CHANGELOG} process table 
function with set
+     * semantics.
+     *
+     * <p>Each input row - regardless of its original change operation - is 
emitted as an
+     * INSERT-only row with a string {@code "op"} column indicating the 
original operation (INSERT,
+     * UPDATE_AFTER, DELETE, etc.). With set semantics, rows for the same 
partition key are
+     * co-located in the same parallel operator instance.
+     *
+     * <p>For row semantics (each row processed independently), use {@link 
Table#toChangelog} on the
+     * unpartitioned table.
+     *
+     * <p>Examples:
+     *
+     * <pre>{@code
+     * // Default: adds 'op' column and supports all changelog modes
+     * Table result = table.partitionBy($("id")).toChangelog();
+     *
+     * // Custom op column name and mapping
+     * Table result = table
+     *     .partitionBy($("id"))
+     *     .toChangelog(
+     *         descriptor("op_code").asArgument("op"),
+     *         map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping")
+     *     );
+     *
+     * // Deletion flag pattern: comma-separated keys map multiple change 
operations to the same code
+     * Table result = table
+     *     .partitionBy($("id"))
+     *     .toChangelog(
+     *         descriptor("deleted").asArgument("op"),
+     *         map("INSERT, UPDATE_AFTER", "false", "DELETE", 
"true").asArgument("op_mapping")
+     *     );
+     * }</pre>
+     *
+     * @param arguments optional named arguments for {@code op} and {@code 
op_mapping}
+     * @return an append-only {@link Table} ordered as {@code [partition_keys, 
op,

Review Comment:
   What do you mean by "ordered"?
   The output isn't explicitly ordered, right?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java:
##########
@@ -169,4 +169,96 @@ public interface PartitionedTable {
      * @see ProcessTableFunction
      */
     Table process(Class<? extends UserDefinedFunction> function, Object... 
arguments);
+
+    /**
+     * Converts this partitioned dynamic table into an append-only table with 
an explicit operation
+     * code column using the built-in {@code TO_CHANGELOG} process table 
function with set
+     * semantics.
+     *
+     * <p>Each input row - regardless of its original change operation - is 
emitted as an
+     * INSERT-only row with a string {@code "op"} column indicating the 
original operation (INSERT,
+     * UPDATE_AFTER, DELETE, etc.). With set semantics, rows for the same 
partition key are
+     * co-located in the same parallel operator instance.
+     *
+     * <p>For row semantics (each row processed independently), use {@link 
Table#toChangelog} on the
+     * unpartitioned table.
+     *
+     * <p>Examples:
+     *
+     * <pre>{@code
+     * // Default: adds 'op' column and supports all changelog modes
+     * Table result = table.partitionBy($("id")).toChangelog();
+     *
+     * // Custom op column name and mapping
+     * Table result = table
+     *     .partitionBy($("id"))
+     *     .toChangelog(
+     *         descriptor("op_code").asArgument("op"),
+     *         map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping")
+     *     );
+     *
+     * // Deletion flag pattern: comma-separated keys map multiple change 
operations to the same code
+     * Table result = table
+     *     .partitionBy($("id"))
+     *     .toChangelog(
+     *         descriptor("deleted").asArgument("op"),
+     *         map("INSERT, UPDATE_AFTER", "false", "DELETE", 
"true").asArgument("op_mapping")
+     *     );
+     * }</pre>
+     *
+     * @param arguments optional named arguments for {@code op} and {@code 
op_mapping}
+     * @return an append-only {@link Table} ordered as {@code [partition_keys, 
op,
+     *     non_partition_input_columns]}
+     * @see Table#toChangelog(Expression...)
+     */
+    Table toChangelog(Expression... arguments);
+
+    /**
+     * Converts this partitioned append-only table with an explicit operation 
code column into a
+     * (potentially updating) dynamic table using the built-in {@code 
FROM_CHANGELOG} process table
+     * function with set semantics.
+     *
+     * <p>Each input row is expected to have a string column that indicates 
the change operation.
+     * The operation column is interpreted by the engine and removed from the 
output. With set
+     * semantics, rows for the same partition key are co-located in the same 
parallel operator
+     * instance, which is required when downstream operators are keyed on that 
column.
+     *
+     * <p>For row semantics (each row processed independently), use {@link 
Table#fromChangelog} on
+     * the unpartitioned table.
+     *
+     * <p>Examples:
+     *
+     * <pre>{@code
+     * // Default: reads 'op' column with standard change operation names
+     * Table result = cdcStream.partitionBy($("id")).fromChangelog();
+     *
+     * // With custom op column name
+     * Table result = cdcStream
+     *     .partitionBy($("id"))
+     *     .fromChangelog(descriptor("operation").asArgument("op"));
+     *
+     * // With custom op_mapping
+     * Table result = cdcStream
+     *     .partitionBy($("id"))
+     *     .fromChangelog(
+     *         descriptor("op").asArgument("op"),
+     *         map("c, r", "INSERT",
+     *             "ub", "UPDATE_BEFORE",
+     *             "ua", "UPDATE_AFTER",
+     *             "d", "DELETE").asArgument("op_mapping")
+     *     );
+     *
+     * // Silently skip rows with NULL or unmapped op codes instead of failing
+     * Table result = cdcStream
+     *     .partitionBy($("id"))
+     *     .fromChangelog(lit("SKIP").asArgument("error_handling"));
+     * }</pre>
+     *
+     * @param arguments optional named arguments for {@code op}, {@code 
op_mapping}, and {@code
+     *     error_handling}
+     * @return a dynamic {@link Table} ordered as {@code [partition_keys,

Review Comment:
   Same as above: what do you mean by "ordered" here? The output isn't explicitly ordered, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to