beyond1920 commented on a change in pull request #11797:
URL: https://github.com/apache/flink/pull/11797#discussion_r412756766
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
##########
@@ -35,26 +35,31 @@
*
* @param currentRow latest row received by deduplicate function
* @param generateUpdateBefore whether need to send UPDATE_BEFORE message for updates
- * @param state state of function
+ * @param state state of function, null if generateUpdateBefore is false
* @param out underlying collector
*/
static void processLastRow(
BaseRow currentRow,
boolean generateUpdateBefore,
ValueState<BaseRow> state,
Collector<BaseRow> out) throws Exception {
- // Check message should be accumulate
- Preconditions.checkArgument(BaseRowUtil.isAccumulateMsg(currentRow));
- if (generateUpdateBefore) {
- // state stores complete row if generateUpdateBefore is true
- BaseRow preRow = state.value();
- state.update(currentRow);
- if (preRow != null) {
- preRow.setHeader(BaseRowUtil.RETRACT_MSG);
+ // check message should be insert only.
Preconditions.checkArgument(currentRow.getRowKind() == RowKind.INSERT);
+ BaseRow preRow = state.value();
+ state.update(currentRow);
Review comment:
This refactor may degrade performance: if generateUpdateBefore is false,
we do not need to put currentRow into state.
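A minimal sketch of the alternative this comment implies, assuming simplified stand-ins (`Row`, `CountingValueState`, a `List` as collector) rather than Flink's `BaseRow`, `ValueState` and `Collector`. State is read and written only on the `generateUpdateBefore` path, so the plain keep-last-row path does no state access per record. The emitted row kinds are an illustrative assumption, not the behavior of this PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for Flink's RowKind, BaseRow and ValueState so the
// sketch compiles on its own; they are not the Flink classes.
enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

final class Row {
    final RowKind kind;
    final String payload;

    Row(RowKind kind, String payload) {
        this.kind = kind;
        this.payload = payload;
    }

    // Returns a copy with a different kind, so emitted rows never alias state.
    Row withKind(RowKind newKind) {
        return new Row(newKind, payload);
    }
}

// Value-state holder that counts reads and writes, to make the state cost visible.
final class CountingValueState {
    private Row value;
    int accesses = 0;

    Row value() {
        accesses++;
        return value;
    }

    void update(Row row) {
        accesses++;
        value = row;
    }
}

final class KeepLastRowSketch {
    // Reads and writes state only when UPDATE_BEFORE messages are requested.
    static void processLastRow(
            Row currentRow,
            boolean generateUpdateBefore,
            CountingValueState state, // may be null if generateUpdateBefore is false
            List<Row> out) {
        if (currentRow.kind != RowKind.INSERT) {
            throw new IllegalArgumentException("expected an INSERT row");
        }
        if (!generateUpdateBefore) {
            // No previous row is needed, so the state is never touched.
            out.add(currentRow);
            return;
        }
        Row preRow = state.value();
        state.update(currentRow);
        if (preRow == null) {
            out.add(currentRow); // first row seen for this key
        } else {
            out.add(preRow.withKind(RowKind.UPDATE_BEFORE));
            out.add(currentRow.withKind(RowKind.UPDATE_AFTER));
        }
    }
}
```

With `generateUpdateBefore = true`, two rows for one key cost four state accesses; with it false, `state` can be `null` because it is never dereferenced.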
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateKeepLastRowFunction.java
##########
@@ -56,15 +56,12 @@ public DeduplicateKeepLastRowFunction(
@Override
public void open(Configuration configure) throws Exception {
super.open(configure);
- if (generateUpdateBefore) {
- // state stores complete row if need generate retraction, otherwise do not need a state
- ValueStateDescriptor<BaseRow> stateDesc = new ValueStateDescriptor<>("preRowState", rowTypeInfo);
- StateTtlConfig ttlConfig = createTtlConfig(minRetentionTime);
- if (ttlConfig.isEnabled()) {
- stateDesc.enableTimeToLive(ttlConfig);
- }
- state = getRuntimeContext().getState(stateDesc);
+ ValueStateDescriptor<BaseRow> stateDesc = new ValueStateDescriptor<>("preRowState", rowTypeInfo);
Review comment:
If generateUpdateBefore is false, we do not need to put currentRow into
state, so there is no need to create the state here.
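A hypothetical sketch of registering the state conditionally in `open()`, using a stand-in `FakeStateRegistry` instead of Flink's runtime context (all names here are illustrative, not Flink API). It leaves the `state` field `null` when `generateUpdateBefore` is false, which is what the updated javadoc of `processLastRow` ("null if generateUpdateBefore is false") describes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the runtime context: it only records which state
// names were registered. It is not Flink's RuntimeContext.
final class FakeStateRegistry {
    final Map<String, Object> states = new HashMap<>();

    Object getState(String name) {
        return states.computeIfAbsent(name, n -> new Object());
    }
}

final class KeepLastRowFunctionSketch {
    private final boolean generateUpdateBefore;
    // Stays null when no UPDATE_BEFORE is generated.
    Object state;

    KeepLastRowFunctionSketch(boolean generateUpdateBefore) {
        this.generateUpdateBefore = generateUpdateBefore;
    }

    void open(FakeStateRegistry registry) {
        if (generateUpdateBefore) {
            // Only this mode needs the previous row, so only it registers state
            // (and, in the real operator, any TTL configured on the descriptor).
            state = registry.getState("preRowState");
        }
    }
}
```

The design choice is simply to keep the `if (generateUpdateBefore)` guard that the diff removes, so the no-retraction path never allocates or checkpoints per-key state.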
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
##########
@@ -35,26 +35,31 @@
*
* @param currentRow latest row received by deduplicate function
* @param generateUpdateBefore whether need to send UPDATE_BEFORE message for updates
- * @param state state of function
+ * @param state state of function, null if generateUpdateBefore is false
* @param out underlying collector
*/
static void processLastRow(
BaseRow currentRow,
boolean generateUpdateBefore,
ValueState<BaseRow> state,
Collector<BaseRow> out) throws Exception {
- // Check message should be accumulate
- Preconditions.checkArgument(BaseRowUtil.isAccumulateMsg(currentRow));
- if (generateUpdateBefore) {
- // state stores complete row if generateUpdateBefore is true
- BaseRow preRow = state.value();
- state.update(currentRow);
- if (preRow != null) {
- preRow.setHeader(BaseRowUtil.RETRACT_MSG);
+ // check message should be insert only.
Preconditions.checkArgument(currentRow.getRowKind() == RowKind.INSERT);
+ BaseRow preRow = state.value();
+ state.update(currentRow);
Review comment:
Shall we send currentRow with RowKind.UPDATE_AFTER if
generateUpdateBefore is false?
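One possible reading of this suggestion, sketched with a hypothetical `Kind` enum and plain strings in place of `RowKind` and `BaseRow`. Without UPDATE_BEFORE there is no state to tell a first row from a later one, so every row is tagged UPDATE_AFTER and downstream can treat the stream as upserts on the key; with state, the first row stays INSERT and later rows produce an UPDATE_BEFORE/UPDATE_AFTER pair. This is an assumption about the intended semantics, not the behavior merged in the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's RowKind; not the Flink class.
enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER }

final class UpdateAfterSketch {
    // Emits changelog entries as "KIND:value" strings for one key.
    // previous is the last row kept in state, or null for the first row;
    // it is ignored when generateUpdateBefore is false (no state is kept).
    static List<String> emit(String previous, String current, boolean generateUpdateBefore) {
        List<String> out = new ArrayList<>();
        if (!generateUpdateBefore) {
            // Cannot distinguish first row from update: always UPDATE_AFTER.
            out.add(Kind.UPDATE_AFTER + ":" + current);
        } else if (previous == null) {
            out.add(Kind.INSERT + ":" + current);
        } else {
            out.add(Kind.UPDATE_BEFORE + ":" + previous);
            out.add(Kind.UPDATE_AFTER + ":" + current);
        }
        return out;
    }
}
```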
----------------------------------------------------------------
This is an automated message from the Apache Git Service.