godfreyhe commented on a change in pull request #11797:
URL: https://github.com/apache/flink/pull/11797#discussion_r411073864
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
##########
@@ -129,13 +130,15 @@ public int getArity() {
 	}
 	@Override
-	public byte getHeader() {
+	public RowKind getRowKind() {
 		// first nullBitsSizeInBytes byte is header.
-		return segments[0].get(offset);
+		byte header = segments[0].get(offset);
+		return RowKind.values()[header];
Review comment:
big +1. If we ever add a new enum value in the middle, the ordinal index of the existing values will also change, so previously written header bytes would map to the wrong kind.
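For illustration only, one way to decouple the stored byte from the enum ordinal is to give each value an explicit byte code; the `toByteValue()`/`fromByteValue()` names below are just a sketch, not something this PR has to adopt:

```java
// Sketch only: bind each RowKind to a fixed byte code so the value written into
// BinaryRow's header no longer depends on the declaration order of the enum.
public enum RowKind {
	INSERT((byte) 0),
	UPDATE_BEFORE((byte) 1),
	UPDATE_AFTER((byte) 2),
	DELETE((byte) 3);

	private final byte value;

	RowKind(byte value) {
		this.value = value;
	}

	public byte toByteValue() {
		return value;
	}

	public static RowKind fromByteValue(byte value) {
		switch (value) {
			case 0: return INSERT;
			case 1: return UPDATE_BEFORE;
			case 2: return UPDATE_AFTER;
			case 3: return DELETE;
			default:
				throw new UnsupportedOperationException(
					"Unsupported byte value '" + value + "' for row kind.");
		}
	}
}
```

`getRowKind()` could then return `RowKind.fromByteValue(segments[0].get(offset))` instead of indexing into `values()`.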
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/MiniBatchGlobalGroupAggFunction.java
##########
@@ -199,27 +196,27 @@ public void finishBundle(Map<BaseRow, BaseRow> buffer, Collector<BaseRow> out) t
 					// new row is not same with prev row
 					if (generateUpdateBefore) {
 						// prepare retraction message for previous row
Review comment:
please update the comment
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ObjectArrayRow.java
##########
@@ -113,13 +113,7 @@ public BaseRow getRow(int ordinal, int numFields) {
 	@Override
 	public String toString() {
 		StringBuilder sb = new StringBuilder();
-		sb.append("(");
-		if (BaseRowUtil.isAccumulateMsg(this)) {
-			sb.append("+");
-		} else {
-			sb.append("-");
-		}
-		sb.append("|");
+		sb.append(rowKind.shortString()).append("(");
Review comment:
let's unify the format of `toString`: in `BinaryRow`, the format is `[UA|...]`.
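Just to illustrate, assuming `ObjectArrayRow` keeps iterating over its backing `fields` array, a unified format could look roughly like this (sketch only):

```java
// Sketch only: emit e.g. "+I(a,1)" so ObjectArrayRow and BinaryRow agree on
// how the row kind is prefixed in toString().
@Override
public String toString() {
	StringBuilder sb = new StringBuilder();
	sb.append(rowKind.shortString()).append("(");
	for (int i = 0; i < fields.length; i++) {
		if (i != 0) {
			sb.append(",");
		}
		sb.append(fields[i]);
	}
	return sb.append(")").toString();
}
```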
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java
##########
@@ -127,6 +127,10 @@ public void processElement2(StreamRecord<BaseRow> element) throws Exception {
 	 * method is too complex, so we provide the pseudo code to help understand the logic. We should
 	 * keep sync the following pseudo code with the real logic of the method.
 	 *
+	 * <p>Note: "+" represents "INSERT", "-" represents "DELETE", "*" represents input row kind.
Review comment:
should these be consistent with the `shortString()` values of `RowKind`?
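For example (just a wording suggestion, assuming the short strings are `+I`, `-U`, `+U` and `-D`), the note could read:

```java
 * <p>Note: "+I" represents "INSERT", "-D" represents "DELETE",
 * "+U" represents "UPDATE_AFTER", "-U" represents "UPDATE_BEFORE",
 * and "*" represents the input row kind.
```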
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java
##########
@@ -17,14 +17,16 @@
 package org.apache.flink.table.dataformat;
+import org.apache.flink.types.RowKind;
+
 /**
  * Join two row to one row.
  */
 public final class JoinedRow implements BaseRow {
 	private BaseRow row1;
 	private BaseRow row2;
-	private byte header;
+	private RowKind rowKind = RowKind.INSERT;
Review comment:
It's better to remove the default value, to avoid forgetting to set the `RowKind` somewhere.
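One fail-fast option, purely as a sketch (the null check and message are not from this PR):

```java
// Sketch only: leave the field unset and fail loudly if it is read before being set.
private RowKind rowKind;

@Override
public RowKind getRowKind() {
	if (rowKind == null) {
		throw new IllegalStateException("RowKind is not set on this JoinedRow.");
	}
	return rowKind;
}

@Override
public void setRowKind(RowKind kind) {
	this.rowKind = Preconditions.checkNotNull(kind, "kind should not be null");
}
```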
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/deduplicate/DeduplicateFunctionHelper.java
##########
@@ -35,25 +37,29 @@
 	 *
 	 * @param currentRow latest row received by deduplicate function
 	 * @param generateUpdateBefore whether need to send UPDATE_BEFORE message for updates
-	 * @param state state of function
+	 * @param state state of function, null if generateUpdateBefore is false
 	 * @param out underlying collector
 	 */
 	static void processLastRow(
 			BaseRow currentRow,
 			boolean generateUpdateBefore,
-			ValueState<BaseRow> state,
+			@Nullable ValueState<BaseRow> state,
 			Collector<BaseRow> out) throws Exception {
-		// Check message should be accumulate
-		Preconditions.checkArgument(BaseRowUtil.isAccumulateMsg(currentRow));
+		// check message should be insert only.
+		Preconditions.checkArgument(currentRow.getRowKind() == RowKind.INSERT);
 		if (generateUpdateBefore) {
-			// state stores complete row if generateUpdateBefore is true
+			// state is not null when generateUpdateBefore is enabled,
+			// the state stores complete row
 			BaseRow preRow = state.value();
 			state.update(currentRow);
 			if (preRow != null) {
-				preRow.setHeader(BaseRowUtil.RETRACT_MSG);
+				preRow.setRowKind(RowKind.UPDATE_BEFORE);
 				out.collect(preRow);
 			}
 		}
+		// in order for better performance, we don't have state for LastRow
+		// if not generate UPDATE_BEFORE, thus, we can't produce INSERT messages for first row.
Review comment:
If the downstream is a database, the sink will execute an INSERT or an UPDATE statement for a row based on its RowKind. If there is no INSERT message, how do we insert the first row?
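To make the concern concrete, a RowKind-aware sink typically dispatches roughly like this (purely illustrative, the `statementExecutor` methods are made up):

```java
// Illustrative only: without any INSERT messages the first branch is never hit,
// so the first version of a key never reaches the database.
switch (row.getRowKind()) {
	case INSERT:
		statementExecutor.executeInsert(row);
		break;
	case UPDATE_AFTER:
		statementExecutor.executeUpdate(row);
		break;
	case UPDATE_BEFORE:
		// usually ignored by upsert-style sinks
		break;
	case DELETE:
		statementExecutor.executeDelete(row);
		break;
}
```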
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java
##########
@@ -182,14 +188,16 @@ private void processElement(
 			boolean inputIsLeft) throws Exception {
 		boolean inputIsOuter = inputIsLeft ? leftIsOuter : rightIsOuter;
 		boolean otherIsOuter = inputIsLeft ? rightIsOuter : leftIsOuter;
+		RowKind inputRowKind = input.getRowKind();
+		input.setRowKind(RowKind.INSERT); // erase RowKind for later state updating
 		AssociatedRecords associatedRecords = AssociatedRecords.of(input, inputIsLeft, otherSideStateView, joinCondition);
-		if (BaseRowUtil.isAccumulateMsg(input)) { // record is accumulate
+		if (inputRowKind == RowKind.INSERT || inputRowKind == RowKind.UPDATE_AFTER) { // record is accumulate
Review comment:
boolean isAccumulateMsg = BaseRowUtil.isAccumulateMsg(input);
...
if (isAccumulateMsg) ...
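Spelled out a bit more, assuming the flag is derived from the original kind before it is erased for state updating (sketch only):

```java
// Sketch of the suggested refactoring: compute the flag once, before the kind is reset.
RowKind inputRowKind = input.getRowKind();
boolean isAccumulateMsg = inputRowKind == RowKind.INSERT || inputRowKind == RowKind.UPDATE_AFTER;
input.setRowKind(RowKind.INSERT); // erase RowKind for later state updating

AssociatedRecords associatedRecords =
	AssociatedRecords.of(input, inputIsLeft, otherSideStateView, joinCondition);
if (isAccumulateMsg) { // record is accumulate
	// ... INSERT / UPDATE_AFTER handling
} else { // record is retract
	// ... UPDATE_BEFORE / DELETE handling
}
```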
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ObjectArrayRow.java
##########
@@ -27,7 +27,7 @@
  */
 public abstract class ObjectArrayRow implements BaseRow {
-	private byte header;
+	private RowKind rowKind = RowKind.INSERT; // INSERT as default
Review comment:
ditto
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java
##########
@@ -153,7 +153,6 @@ public void open() throws Exception {
 			TIMERS_STATE_NAME, VoidNamespaceSerializer.INSTANCE, this);
 		collector = new TimestampedCollector<>(output);
 		outRow = new JoinedRow();
-		outRow.setHeader(BaseRowUtil.ACCUMULATE_MSG);
Review comment:
explicitly call `outRow.setRowKind(RowKind.INSERT)`
##########
File path:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala
##########
@@ -75,57 +75,57 @@ class GroupAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(
     // register cleanup timer with 3001
     testHarness.setProcessingTime(1)
-    // accumulate
-    testHarness.processElement(new StreamRecord(binaryrow("aaa", 1L: JLong), 1))
-    expectedOutput.add(new StreamRecord(binaryrow("aaa", 1L: JLong), 1))
+    // insertion
+    testHarness.processElement(binaryRecord(INSERT,"aaa", 1L: JLong))
Review comment:
is the `timestamp` unnecessary here?
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingSemiAntiJoinOperator.java
##########
@@ -147,8 +154,11 @@ public void processElement1(StreamRecord<BaseRow> element) throws Exception {
 	@Override
 	public void processElement2(StreamRecord<BaseRow> element) throws Exception {
 		BaseRow input = element.getValue();
+		RowKind inputRowKind = input.getRowKind();
+		input.setRowKind(RowKind.INSERT); // erase RowKind for later state updating
+
 		AssociatedRecords associatedRecords = AssociatedRecords.of(input, false, leftRecordStateView, joinCondition);
-		if (BaseRowUtil.isAccumulateMsg(input)) {
+		if (inputRowKind == RowKind.INSERT || inputRowKind == RowKind.UPDATE_AFTER) { // record is accumulate
Review comment:
ditto
##########
File path:
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/WindowOperatorBuilder.java
##########
@@ -156,13 +156,13 @@ public WindowOperatorBuilder withAllowedLateness(Duration allowedLateness) {
 		if (allowedLateness.toMillis() > 0) {
 			this.allowedLateness = allowedLateness.toMillis();
 			// allow late element, which means this window will send retractions
-			this.sendRetraction = true;
+			this.produceUpdates = true;
 		}
 		return this;
 	}
-	public WindowOperatorBuilder withSendRetraction() {
-		this.sendRetraction = true;
+	public WindowOperatorBuilder withProduceUpdates() {
Review comment:
how about renaming it to `produceUpdates`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]