This is an automated email from the ASF dual-hosted git repository.
ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8170c457cb7 [FLINK-34221][table-runtime] Introduce
MiniBatchStreamingJoinOperator
8170c457cb7 is described below
commit 8170c457cb70bb8fd88b98baf3acc612eaab8ec5
Author: yeming <[email protected]>
AuthorDate: Mon Jan 22 10:26:26 2024 +0800
[FLINK-34221][table-runtime] Introduce MiniBatchStreamingJoinOperator
This closes #24160
---
.../stream/MiniBatchStreamingJoinOperator.java | 430 +++++
.../join/stream/StreamingJoinOperator.java | 31 +-
.../runtime/operators/metrics/SimpleGauge.java | 44 +
.../join/stream/StreamingJoinOperatorTestBase.java | 13 +-
.../stream/StreamingMiniBatchJoinOperatorTest.java | 1956 ++++++++++++++++++++
5 files changed, 2456 insertions(+), 18 deletions(-)
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/MiniBatchStreamingJoinOperator.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/MiniBatchStreamingJoinOperator.java
new file mode 100644
index 00000000000..e0d667c67c6
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/MiniBatchStreamingJoinOperator.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.stream;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import
org.apache.flink.table.runtime.operators.bundle.trigger.BundleTriggerCallback;
+import org.apache.flink.table.runtime.operators.bundle.trigger.CoBundleTrigger;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import
org.apache.flink.table.runtime.operators.join.stream.bundle.BufferBundle;
+import
org.apache.flink.table.runtime.operators.join.stream.bundle.InputSideHasNoUniqueKeyBundle;
+import
org.apache.flink.table.runtime.operators.join.stream.bundle.InputSideHasUniqueKeyBundle;
+import
org.apache.flink.table.runtime.operators.join.stream.bundle.JoinKeyContainsUniqueKeyBundle;
+import
org.apache.flink.table.runtime.operators.join.stream.state.JoinInputSideSpec;
+import
org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateView;
+import org.apache.flink.table.runtime.operators.metrics.SimpleGauge;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Streaming unbounded join base operator which supports mini-batch join.
+ *
+ * <p>Records of both inputs are copied and buffered into one {@link BufferBundle} per side
+ * instead of being joined immediately. The buffered records are joined in {@link
+ * #finishBundle()}, which is invoked as a {@link BundleTriggerCallback} by the {@link
+ * CoBundleTrigger}, as well as before watermarks are forwarded, before a checkpoint barrier
+ * and on {@link #finish()}. Subclasses decide in {@link #processBundles(BufferBundle,
+ * BufferBundle)} in which order the two sides are processed.
+ */
+public abstract class MiniBatchStreamingJoinOperator extends StreamingJoinOperator
+        implements BundleTriggerCallback {
+
+    private static final long serialVersionUID = -1106342589994963997L;
+
+    private final CoBundleTrigger<RowData, RowData> coBundleTrigger;
+
+    private transient BufferBundle<?> leftBuffer;
+    private transient BufferBundle<?> rightBuffer;
+    private transient SimpleGauge<Integer> leftBundleReducedSizeGauge;
+    private transient SimpleGauge<Integer> rightBundleReducedSizeGauge;
+    private transient TypeSerializer<RowData> leftSerializer;
+    private transient TypeSerializer<RowData> rightSerializer;
+
+    public MiniBatchStreamingJoinOperator(MiniBatchStreamingJoinParameter parameter) {
+        super(
+                parameter.leftType,
+                parameter.rightType,
+                parameter.generatedJoinCondition,
+                parameter.leftInputSideSpec,
+                parameter.rightInputSideSpec,
+                parameter.leftIsOuter,
+                parameter.rightIsOuter,
+                parameter.filterNullKeys,
+                parameter.leftStateRetentionTime,
+                parameter.rightStateRetentionTime);
+
+        this.coBundleTrigger = parameter.coBundleTrigger;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+
+        this.leftBuffer = initialBuffer(leftInputSideSpec);
+        this.rightBuffer = initialBuffer(rightInputSideSpec);
+
+        coBundleTrigger.registerCallback(this);
+        coBundleTrigger.reset();
+
+        this.leftSerializer = leftType.createSerializer(getExecutionConfig());
+        this.rightSerializer = rightType.createSerializer(getExecutionConfig());
+
+        // register metrics
+        leftBundleReducedSizeGauge = new SimpleGauge<>(0);
+        rightBundleReducedSizeGauge = new SimpleGauge<>(0);
+
+        getRuntimeContext()
+                .getMetricGroup()
+                .gauge("leftBundleReducedSize", leftBundleReducedSizeGauge);
+        getRuntimeContext()
+                .getMetricGroup()
+                .gauge("rightBundleReducedSize", rightBundleReducedSizeGauge);
+
+        // Logged only once every part of the operator has been set up.
+        LOG.info("Initialize MiniBatchStreamingJoinOperator successfully.");
+    }
+
+    @Override
+    public void processElement1(StreamRecord<RowData> element) throws Exception {
+        RowData record = bufferRecord(element, leftSerializer, leftInputSideSpec, leftBuffer);
+        coBundleTrigger.onElement1(record);
+    }
+
+    @Override
+    public void processElement2(StreamRecord<RowData> element) throws Exception {
+        RowData record = bufferRecord(element, rightSerializer, rightInputSideSpec, rightBuffer);
+        coBundleTrigger.onElement2(record);
+    }
+
+    /**
+     * Copies the incoming row and adds it to the given buffer under the current join key.
+     *
+     * @param element incoming record of one input side
+     * @param serializer serializer of that side, used to copy the row
+     * @param inputSideSpec spec of that side; its unique key selector (if any) extracts the
+     *     unique key
+     * @param buffer buffer of that side
+     * @return the copied row that was buffered
+     */
+    private RowData bufferRecord(
+            StreamRecord<RowData> element,
+            TypeSerializer<RowData> serializer,
+            JoinInputSideSpec inputSideSpec,
+            BufferBundle<?> buffer)
+            throws Exception {
+        // The row is kept beyond this call, so buffer a copy rather than the incoming instance
+        // (presumably guarding against object reuse by the caller).
+        RowData record = serializer.copy(element.getValue());
+        RowData joinKey = (RowData) getCurrentKey();
+        RowData uniqueKey = null;
+        if (inputSideSpec.getUniqueKeySelector() != null) {
+            uniqueKey = inputSideSpec.getUniqueKeySelector().getKey(record);
+        }
+        buffer.addRecord(joinKey, uniqueKey, record);
+        return record;
+    }
+
+    @Override
+    public void processWatermark1(Watermark mark) throws Exception {
+        // Flush buffered records before the watermark is forwarded.
+        finishBundle();
+        super.processWatermark1(mark);
+    }
+
+    @Override
+    public void processWatermark2(Watermark mark) throws Exception {
+        // Flush buffered records before the watermark is forwarded.
+        finishBundle();
+        super.processWatermark2(mark);
+    }
+
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+        // The buffers are transient and not part of the snapshot, so they must be flushed first.
+        finishBundle();
+    }
+
+    @Override
+    public void finish() throws Exception {
+        finishBundle();
+        super.finish();
+    }
+
+    @Override
+    public void close() throws Exception {
+        try {
+            super.close();
+        } finally {
+            // The buffers are null if open() failed before creating them.
+            if (leftBuffer != null) {
+                leftBuffer.clear();
+            }
+            if (rightBuffer != null) {
+                rightBuffer.clear();
+            }
+        }
+    }
+
+    /**
+     * Joins all buffered records of both sides, clears the buffers and resets the trigger.
+     *
+     * <p>The reduced-size gauges are updated from the buffers before they are processed.
+     */
+    @Override
+    public void finishBundle() throws Exception {
+        if (!leftBuffer.isEmpty() || !rightBuffer.isEmpty()) {
+            // update metrics value
+            leftBundleReducedSizeGauge.update(leftBuffer.reducedSize());
+            rightBundleReducedSizeGauge.update(rightBuffer.reducedSize());
+
+            this.processBundles(leftBuffer, rightBuffer);
+            leftBuffer.clear();
+            rightBuffer.clear();
+        }
+        coBundleTrigger.reset();
+    }
+
+    /**
+     * Joins the buffered records of both sides. Implementations choose the order in which the
+     * two sides are processed.
+     *
+     * @param leftBuffer buffered records of the left input
+     * @param rightBuffer buffered records of the right input
+     */
+    protected abstract void processBundles(BufferBundle<?> leftBuffer, BufferBundle<?> rightBuffer)
+            throws Exception;
+
+    /** Chooses the buffer implementation that matches the unique-key layout of an input side. */
+    private BufferBundle<?> initialBuffer(JoinInputSideSpec inputSideSpec) {
+        if (inputSideSpec.joinKeyContainsUniqueKey()) {
+            return new JoinKeyContainsUniqueKeyBundle();
+        }
+        if (inputSideSpec.hasUniqueKey()) {
+            return new InputSideHasUniqueKeyBundle();
+        }
+        return new InputSideHasNoUniqueKeyBundle();
+    }
+
+    /**
+     * Processes the buffered records of a single key in iteration order.
+     *
+     * <p>A retract message that is immediately followed by an accumulate message is processed as
+     * a pair with {@code isSuppress = true}. Every other record is processed on its own with
+     * {@code isSuppress = false}. If a retract message is followed by another retract message,
+     * the first one is processed on its own and the second one is examined again. This lets the
+     * second one still pair with an accumulate message that follows it, and ensures it is always
+     * processed exactly once, in order.
+     *
+     * @param iter records of one key, in arrival order
+     * @param inputSideStateView state of the input side
+     * @param otherSideStateView state of the other side
+     * @param inputIsLeft whether the input side is the left side
+     */
+    private void processElementWithSuppress(
+            Iterator<RowData> iter,
+            JoinRecordStateView inputSideStateView,
+            JoinRecordStateView otherSideStateView,
+            boolean inputIsLeft)
+            throws Exception {
+        // A retract message that was read ahead but has not been processed yet.
+        RowData pendingRetract = null;
+        boolean hasPendingRetract = false;
+        while (hasPendingRetract || iter.hasNext()) {
+            RowData current;
+            if (hasPendingRetract) {
+                current = pendingRetract;
+                pendingRetract = null;
+                hasPendingRetract = false;
+            } else {
+                current = iter.next();
+            }
+
+            if (RowDataUtil.isRetractMsg(current) && iter.hasNext()) {
+                RowData next = iter.next();
+                if (RowDataUtil.isAccumulateMsg(next)) {
+                    // retract + accumulate: process both with suppression
+                    processElement(
+                            current, inputSideStateView, otherSideStateView, inputIsLeft, true);
+                    processElement(
+                            next, inputSideStateView, otherSideStateView, inputIsLeft, true);
+                } else {
+                    // retract + retract: process the first one now and re-examine the second
+                    // one in the next iteration
+                    processElement(
+                            current, inputSideStateView, otherSideStateView, inputIsLeft, false);
+                    pendingRetract = next;
+                    hasPendingRetract = true;
+                }
+            } else {
+                // 1. current is an accumulate message 2. current is a retract message without
+                // a following record
+                processElement(
+                        current, inputSideStateView, otherSideStateView, inputIsLeft, false);
+            }
+        }
+    }
+
+    /**
+     * Processes all buffered records of one input side against the state of both sides.
+     *
+     * <p>For {@link JoinKeyContainsUniqueKeyBundle} and {@link InputSideHasUniqueKeyBundle}, a
+     * retract message directly followed by an accumulate message is processed with suppression.
+     * With suppression, {@code processElement} neither retracts the record from the input-side
+     * state nor emits the null-padded outputs for the other (outer) side. Records of an {@link
+     * InputSideHasNoUniqueKeyBundle} are processed one by one without suppression.
+     *
+     * @param inputBuffer buffered records of the input side
+     * @param inputSideStateView state of the input side
+     * @param otherSideStateView state of the other side
+     * @param inputIsLeft whether the input side is the left side
+     */
+    protected void processSingleSideBundles(
+            BufferBundle<?> inputBuffer,
+            JoinRecordStateView inputSideStateView,
+            JoinRecordStateView otherSideStateView,
+            boolean inputIsLeft)
+            throws Exception {
+        if (inputBuffer instanceof InputSideHasNoUniqueKeyBundle) {
+            // -U/+U pairs are already folded in this buffer, so there is no need to go through
+            // processElementWithSuppress
+            for (Map.Entry<RowData, List<RowData>> entry : inputBuffer.getRecords().entrySet()) {
+                // set state key first
+                setCurrentKey(entry.getKey());
+                for (RowData rowData : entry.getValue()) {
+                    processElement(
+                            rowData, inputSideStateView, otherSideStateView, inputIsLeft, false);
+                }
+            }
+        } else if (inputBuffer instanceof JoinKeyContainsUniqueKeyBundle) {
+            for (Map.Entry<RowData, List<RowData>> entry : inputBuffer.getRecords().entrySet()) {
+                // set state key first
+                setCurrentKey(entry.getKey());
+                Iterator<RowData> iter = entry.getValue().iterator();
+                processElementWithSuppress(
+                        iter, inputSideStateView, otherSideStateView, inputIsLeft);
+            }
+        } else if (inputBuffer instanceof InputSideHasUniqueKeyBundle) {
+            for (RowData joinKey : inputBuffer.getJoinKeys()) {
+                // set state key first
+                setCurrentKey(joinKey);
+                for (Map.Entry<RowData, List<RowData>> entry :
+                        inputBuffer.getRecordsWithJoinKey(joinKey).entrySet()) {
+                    Iterator<RowData> iter = entry.getValue().iterator();
+                    processElementWithSuppress(
+                            iter, inputSideStateView, otherSideStateView, inputIsLeft);
+                }
+            }
+        }
+    }
+
+    /**
+     * Creates the mini-batch join operator for the given join type.
+     *
+     * @throws UnsupportedOperationException if {@code joinType} is not INNER, LEFT, RIGHT or FULL
+     */
+    public static MiniBatchStreamingJoinOperator newMiniBatchStreamJoinOperator(
+            FlinkJoinType joinType,
+            InternalTypeInfo<RowData> leftType,
+            InternalTypeInfo<RowData> rightType,
+            GeneratedJoinCondition generatedJoinCondition,
+            JoinInputSideSpec leftInputSideSpec,
+            JoinInputSideSpec rightInputSideSpec,
+            boolean leftIsOuter,
+            boolean rightIsOuter,
+            boolean[] filterNullKeys,
+            long leftStateRetentionTime,
+            long rightStateRetentionTime,
+            CoBundleTrigger<RowData, RowData> coBundleTrigger) {
+        MiniBatchStreamingJoinParameter parameter =
+                new MiniBatchStreamingJoinParameter(
+                        leftType,
+                        rightType,
+                        generatedJoinCondition,
+                        leftInputSideSpec,
+                        rightInputSideSpec,
+                        leftIsOuter,
+                        rightIsOuter,
+                        filterNullKeys,
+                        leftStateRetentionTime,
+                        rightStateRetentionTime,
+                        coBundleTrigger);
+        switch (joinType) {
+            case INNER:
+                return new MiniBatchInnerJoinStreamOperator(parameter);
+            case LEFT:
+                return new MiniBatchLeftOuterJoinStreamOperator(parameter);
+            case RIGHT:
+                return new MiniBatchRightOuterJoinStreamOperator(parameter);
+            case FULL:
+                return new MiniBatchFullOuterJoinStreamOperator(parameter);
+            default:
+                throw new UnsupportedOperationException("Unsupported join type: " + joinType);
+        }
+    }
+
+    /** Immutable holder for the constructor arguments of {@link MiniBatchStreamingJoinOperator}. */
+    static class MiniBatchStreamingJoinParameter implements Serializable {
+        final InternalTypeInfo<RowData> leftType;
+        final InternalTypeInfo<RowData> rightType;
+        final GeneratedJoinCondition generatedJoinCondition;
+        final JoinInputSideSpec leftInputSideSpec;
+        final JoinInputSideSpec rightInputSideSpec;
+        final boolean leftIsOuter;
+        final boolean rightIsOuter;
+        final boolean[] filterNullKeys;
+        final long leftStateRetentionTime;
+        final long rightStateRetentionTime;
+
+        final CoBundleTrigger<RowData, RowData> coBundleTrigger;
+
+        MiniBatchStreamingJoinParameter(
+                InternalTypeInfo<RowData> leftType,
+                InternalTypeInfo<RowData> rightType,
+                GeneratedJoinCondition generatedJoinCondition,
+                JoinInputSideSpec leftInputSideSpec,
+                JoinInputSideSpec rightInputSideSpec,
+                boolean leftIsOuter,
+                boolean rightIsOuter,
+                boolean[] filterNullKeys,
+                long leftStateRetentionTime,
+                long rightStateRetentionTime,
+                CoBundleTrigger<RowData, RowData> coBundleTrigger) {
+            this.leftType = leftType;
+            this.rightType = rightType;
+            this.generatedJoinCondition = generatedJoinCondition;
+            this.leftInputSideSpec = leftInputSideSpec;
+            this.rightInputSideSpec = rightInputSideSpec;
+            this.leftIsOuter = leftIsOuter;
+            this.rightIsOuter = rightIsOuter;
+            this.filterNullKeys = filterNullKeys;
+            this.leftStateRetentionTime = leftStateRetentionTime;
+            this.rightStateRetentionTime = rightStateRetentionTime;
+            this.coBundleTrigger = coBundleTrigger;
+        }
+    }
+
+    /** Inner MiniBatch Join operator. */
+    private static final class MiniBatchInnerJoinStreamOperator
+            extends MiniBatchStreamingJoinOperator {
+
+        public MiniBatchInnerJoinStreamOperator(MiniBatchStreamingJoinParameter parameter) {
+            super(parameter);
+        }
+
+        @Override
+        protected void processBundles(BufferBundle<?> leftBuffer, BufferBundle<?> rightBuffer)
+                throws Exception {
+            // process right
+            this.processSingleSideBundles(
+                    rightBuffer, rightRecordStateView, leftRecordStateView, false);
+            // process left
+            this.processSingleSideBundles(
+                    leftBuffer, leftRecordStateView, rightRecordStateView, true);
+        }
+    }
+
+    /** MiniBatch Left outer join operator. */
+    private static final class MiniBatchLeftOuterJoinStreamOperator
+            extends MiniBatchStreamingJoinOperator {
+
+        public MiniBatchLeftOuterJoinStreamOperator(MiniBatchStreamingJoinParameter parameter) {
+            super(parameter);
+        }
+
+        @Override
+        protected void processBundles(BufferBundle<?> leftBuffer, BufferBundle<?> rightBuffer)
+                throws Exception {
+            // For a left outer join it is more efficient to process the right side first, e.g.
+            // some retractions can be avoided for timing-dependent left and right streams.
+            // process right
+            this.processSingleSideBundles(
+                    rightBuffer, rightRecordStateView, leftRecordStateView, false);
+            // process left
+            this.processSingleSideBundles(
+                    leftBuffer, leftRecordStateView, rightRecordStateView, true);
+        }
+    }
+
+    /** MiniBatch Right outer join operator. */
+    private static final class MiniBatchRightOuterJoinStreamOperator
+            extends MiniBatchStreamingJoinOperator {
+
+        public MiniBatchRightOuterJoinStreamOperator(MiniBatchStreamingJoinParameter parameter) {
+            super(parameter);
+        }
+
+        @Override
+        protected void processBundles(BufferBundle<?> leftBuffer, BufferBundle<?> rightBuffer)
+                throws Exception {
+            // For a right outer join it is more efficient to process the left side first, e.g.
+            // some retractions can be avoided for timing-dependent left and right streams.
+            // process left
+            this.processSingleSideBundles(
+                    leftBuffer, leftRecordStateView, rightRecordStateView, true);
+
+            // process right
+            this.processSingleSideBundles(
+                    rightBuffer, rightRecordStateView, leftRecordStateView, false);
+        }
+    }
+
+    /** MiniBatch Full outer Join operator. */
+    private static final class MiniBatchFullOuterJoinStreamOperator
+            extends MiniBatchStreamingJoinOperator {
+
+        public MiniBatchFullOuterJoinStreamOperator(MiniBatchStreamingJoinParameter parameter) {
+            super(parameter);
+        }
+
+        @Override
+        protected void processBundles(BufferBundle<?> leftBuffer, BufferBundle<?> rightBuffer)
+                throws Exception {
+            // process right
+            this.processSingleSideBundles(
+                    rightBuffer, rightRecordStateView, leftRecordStateView, false);
+            // process left
+            this.processSingleSideBundles(
+                    leftBuffer, leftRecordStateView, rightRecordStateView, true);
+        }
+    }
+}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java
index 308b98e2794..5b9a139e5d2 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java
@@ -38,18 +38,18 @@ public class StreamingJoinOperator extends
AbstractStreamingJoinOperator {
private static final long serialVersionUID = -376944622236540545L;
// whether left side is outer side, e.g. left is outer but right is not
when LEFT OUTER JOIN
- private final boolean leftIsOuter;
+ protected final boolean leftIsOuter;
// whether right side is outer side, e.g. right is outer but left is not
when RIGHT OUTER JOIN
- private final boolean rightIsOuter;
+ protected final boolean rightIsOuter;
private transient JoinedRowData outRow;
private transient RowData leftNullRow;
private transient RowData rightNullRow;
// left join state
- private transient JoinRecordStateView leftRecordStateView;
+ protected transient JoinRecordStateView leftRecordStateView;
// right join state
- private transient JoinRecordStateView rightRecordStateView;
+ protected transient JoinRecordStateView rightRecordStateView;
public StreamingJoinOperator(
InternalTypeInfo<RowData> leftType,
@@ -123,12 +123,12 @@ public class StreamingJoinOperator extends
AbstractStreamingJoinOperator {
@Override
public void processElement1(StreamRecord<RowData> element) throws
Exception {
- processElement(element.getValue(), leftRecordStateView,
rightRecordStateView, true);
+ processElement(element.getValue(), leftRecordStateView,
rightRecordStateView, true, false);
}
@Override
public void processElement2(StreamRecord<RowData> element) throws
Exception {
- processElement(element.getValue(), rightRecordStateView,
leftRecordStateView, false);
+ processElement(element.getValue(), rightRecordStateView,
leftRecordStateView, false, false);
}
/**
@@ -196,12 +196,15 @@ public class StreamingJoinOperator extends
AbstractStreamingJoinOperator {
* @param inputSideStateView state of input side
* @param otherSideStateView state of other side
* @param inputIsLeft whether input side is left side
+ * @param isSuppress whether suppress the output of redundant messages
when the other side is
+ * outer join. This only applies to the case of mini-batch.
*/
- private void processElement(
+ protected void processElement(
RowData input,
JoinRecordStateView inputSideStateView,
JoinRecordStateView otherSideStateView,
- boolean inputIsLeft)
+ boolean inputIsLeft,
+ boolean isSuppress)
throws Exception {
boolean inputIsOuter = inputIsLeft ? leftIsOuter : rightIsOuter;
boolean otherIsOuter = inputIsLeft ? rightIsOuter : leftIsOuter;
@@ -228,7 +231,7 @@ public class StreamingJoinOperator extends
AbstractStreamingJoinOperator {
for (OuterRecord outerRecord :
associatedRecords.getOuterRecords()) {
RowData other = outerRecord.record;
// if the matched num in the matched rows == 0
- if (outerRecord.numOfAssociations == 0) {
+ if (outerRecord.numOfAssociations == 0 &&
!isSuppress) {
// send -D[null+other]
outRow.setRowKind(RowKind.DELETE);
outputNullPadding(other, !inputIsLeft);
@@ -254,8 +257,8 @@ public class StreamingJoinOperator extends
AbstractStreamingJoinOperator {
OuterJoinRecordStateView otherSideOuterStateView =
(OuterJoinRecordStateView) otherSideStateView;
for (OuterRecord outerRecord :
associatedRecords.getOuterRecords()) {
- if (outerRecord.numOfAssociations
- == 0) { // if the matched num in the
matched rows == 0
+ if (outerRecord.numOfAssociations == 0
+ && !isSuppress) { // if the matched num in
the matched rows == 0
// send -D[null+other]
outRow.setRowKind(RowKind.DELETE);
outputNullPadding(outerRecord.record,
!inputIsLeft);
@@ -278,7 +281,9 @@ public class StreamingJoinOperator extends
AbstractStreamingJoinOperator {
}
} else { // input record is retract
// state.retract(record)
- inputSideStateView.retractRecord(input);
+ if (!isSuppress) {
+ inputSideStateView.retractRecord(input);
+ }
if (associatedRecords.isEmpty()) { // there is no matched rows on
the other side
if (inputIsOuter) { // input side is outer
// send -D[record+null]
@@ -302,7 +307,7 @@ public class StreamingJoinOperator extends
AbstractStreamingJoinOperator {
OuterJoinRecordStateView otherSideOuterStateView =
(OuterJoinRecordStateView) otherSideStateView;
for (OuterRecord outerRecord :
associatedRecords.getOuterRecords()) {
- if (outerRecord.numOfAssociations == 1) {
+ if (outerRecord.numOfAssociations == 1 && !isSuppress)
{
// send +I[null+other]
outRow.setRowKind(RowKind.INSERT);
outputNullPadding(outerRecord.record,
!inputIsLeft);
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/metrics/SimpleGauge.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/metrics/SimpleGauge.java
new file mode 100644
index 00000000000..b0a6cb51eb6
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/metrics/SimpleGauge.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.metrics;
+
+import org.apache.flink.metrics.Gauge;
+
+/**
+ * A simple gauge whose reported value is set explicitly via {@link #update(Object)}.
+ *
+ * <p>The value is held in a {@code volatile} field. The thread that calls {@link
+ * #update(Object)} and the thread that reads the gauge through {@link #getValue()} (e.g. a
+ * metric reporter) are not necessarily the same.
+ *
+ * @param <T> Type of the actual recording value
+ */
+public class SimpleGauge<T> implements Gauge<T> {
+
+    /** Most recently recorded value; volatile so updates are visible to readers on other threads. */
+    private volatile T value;
+
+    /**
+     * Creates a gauge that reports {@code initialValue} until the first call to {@link
+     * #update(Object)}.
+     *
+     * @param initialValue value reported before any update
+     */
+    public SimpleGauge(T initialValue) {
+        this.value = initialValue;
+    }
+
+    /**
+     * Replaces the reported value.
+     *
+     * @param value new value to report
+     */
+    public void update(T value) {
+        this.value = value;
+    }
+
+    /** Returns the most recently recorded value. */
+    @Override
+    public T getValue() {
+        return value;
+    }
+}
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTestBase.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTestBase.java
index 48cc9884e66..68afbe72999 100644
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTestBase.java
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTestBase.java
@@ -51,7 +51,7 @@ public abstract class StreamingJoinOperatorTestBase {
},
new String[] {"order_id", "line_order_id",
"shipping_address"}));
- protected final InternalTypeInfo<RowData> rightTypeInfo =
+ protected InternalTypeInfo<RowData> rightTypeInfo =
InternalTypeInfo.of(
RowType.of(
new LogicalType[] {new CharType(false, 20), new
CharType(true, 10)},
@@ -61,7 +61,7 @@ public abstract class StreamingJoinOperatorTestBase {
HandwrittenSelectorUtil.getRowDataSelector(
new int[] {1},
leftTypeInfo.toRowType().getChildren().toArray(new
LogicalType[0]));
- protected final RowDataKeySelector rightKeySelector =
+ protected RowDataKeySelector rightKeySelector =
HandwrittenSelectorUtil.getRowDataSelector(
new int[] {0},
rightTypeInfo.toRowType().getChildren().toArray(new
LogicalType[0]));
@@ -94,8 +94,7 @@ public abstract class StreamingJoinOperatorTestBase {
protected final GeneratedJoinCondition joinCondition =
new GeneratedJoinCondition("ConditionFunction", funcCode, new
Object[0]);
- protected final RowDataHarnessAssertor assertor =
- new
RowDataHarnessAssertor(getOutputType().getChildren().toArray(new
LogicalType[0]));
+ protected RowDataHarnessAssertor assertor;
protected KeyedTwoInputStreamOperatorTestHarness<RowData, RowData,
RowData, RowData>
testHarness;
@@ -109,6 +108,10 @@ public abstract class StreamingJoinOperatorTestBase {
rightKeySelector,
joinKeyTypeInfo);
testHarness.open();
+ // extend for mini-batch join test
+ assertor =
+ new RowDataHarnessAssertor(
+ getOutputType().getChildren().toArray(new
LogicalType[0]));
}
@AfterEach
@@ -121,7 +124,7 @@ public abstract class StreamingJoinOperatorTestBase {
if (tags.isEmpty()) {
return new Long[] {0L, 0L};
}
- Long[] ttl = new Long[2];
+ Long[] ttl = new Long[] {0L, 0L};
for (String tag : tags) {
String[] splits = tag.split("=");
long value = Long.parseLong(splits[1].trim());
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMiniBatchJoinOperatorTest.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMiniBatchJoinOperatorTest.java
new file mode 100644
index 00000000000..62b8116a0b0
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMiniBatchJoinOperatorTest.java
@@ -0,0 +1,1956 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.stream;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import
org.apache.flink.table.runtime.operators.bundle.trigger.CountCoBundleTrigger;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import
org.apache.flink.table.runtime.operators.join.stream.state.JoinInputSideSpec;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static
org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
+import static
org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind;
+import static
org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
+import static
org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord;
+
+/** Tests for {@link MiniBatchStreamingJoinOperator}. */
+public final class StreamingMiniBatchJoinOperatorTest extends
StreamingJoinOperatorTestBase {
+
+ // Unique-key selectors for the left and right inputs.
+ // NOTE(review): assigned and consumed outside this section (presumably when the operator
+ // under test is built from the test's tags) — confirm.
+ private RowDataKeySelector leftUniqueKeySelector;
+ private RowDataKeySelector rightUniqueKeySelector;
+
+ /**
+ * Replaces the base-class right input schema and right join-key selector, then delegates to
+ * the base setup.
+ *
+ * <p>The right input becomes (order_id#, line_order_id0, line_order_ship_mode) and its join
+ * key becomes field 1 (line_order_id0). The order matters: {@code rightKeySelector} is
+ * derived from {@code rightTypeInfo}, and both must be in place before {@code
+ * super.beforeEach(testInfo)}, which creates and opens the test harness using {@code
+ * rightKeySelector}. This method overrides the base {@code @BeforeEach} method, hence the
+ * explicit super call.
+ */
+ @BeforeEach
+ public void beforeEach(TestInfo testInfo) throws Exception {
+ rightTypeInfo =
+ InternalTypeInfo.of(
+ RowType.of(
+ new LogicalType[] {
+ new CharType(false, 20),
+ new CharType(false, 20),
+ new CharType(true, 10)
+ },
+ new String[] {
+ "order_id#", "line_order_id0",
"line_order_ship_mode"
+ }));
+
+ // join on field 1 (line_order_id0) of the redefined right schema
+ rightKeySelector =
+ HandwrittenSelectorUtil.getRowDataSelector(
+ new int[] {1},
+ rightTypeInfo.toRowType().getChildren().toArray(new
LogicalType[0]));
+ super.beforeEach(testInfo);
+ }
+
+ /**
+ * Left input arrives first, with a mini-batch size of 3 (per the tag). The two left inserts
+ * are buffered and emit nothing. The right insert for LineOrd#2 completes the batch, which
+ * then emits only final results: the unmatched LineOrd#6 row padded with nulls and the
+ * joined LineOrd#2 row. No intermediate null-padded LineOrd#2 row appears (compare {@link
+ * #testLeftJoinWithLeftArriveFirstNoMiniBatch()}).
+ */
+ @Tag("miniBatchSize=3")
+ @Test
+ public void testLeftJoinWithLeftArriveFirst() throws Exception {
+ // joinKey is LineOrd
+ testHarness.processElement1(
+ insertRecord(
+ "Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA
19464")); // left +I
+ testHarness.processElement1(
+ insertRecord(
+ "Ord#i",
+ "LineOrd#6",
+ "i6 Bellevue Drive, Pottstown, Pi 19464")); // left +I
+ // 2 of 3 records buffered: the batch has not been flushed yet
+ assertor.shouldEmitNothing(testHarness);
+ testHarness.processElement2(insertRecord("Ord#X", "LineOrd#2",
"AIR")); // right
+
+ assertor.shouldEmit(
+ testHarness,
+ rowOfKind(
+ RowKind.INSERT,
+ "Ord#i",
+ "LineOrd#6",
+ "i6 Bellevue Drive, Pottstown, Pi 19464",
+ null,
+ null,
+ null),
+ rowOfKind(
+ RowKind.INSERT,
+ "Ord#1",
+ "LineOrd#2",
+ "3 Bellevue Drive, Pottstown, PA 19464",
+ "Ord#X",
+ "LineOrd#2",
+ "AIR"));
+ }
+
+ /**
+ * Same input as {@link #testLeftJoinWithLeftArriveFirst()}, but with a mini-batch size of 1,
+ * so every record is processed as soon as it arrives. Both left inserts are emitted padded
+ * with nulls. The right insert then retracts (DELETE) the null-padded LineOrd#2 row before
+ * emitting the joined row.
+ */
+ @Tag("miniBatchSize=1")
+ @Test
+ public void testLeftJoinWithLeftArriveFirstNoMiniBatch() throws Exception {
+ // joinKey is LineOrd
+ testHarness.processElement1(
+ insertRecord(
+ "Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA
19464")); // left +I
+
+ testHarness.processElement1(
+ insertRecord(
+ "Ord#i",
+ "LineOrd#6",
+ "i6 Bellevue Drive, Pottstown, Pi 19464")); // left +I
+ testHarness.processElement2(insertRecord("Ord#X", "LineOrd#2",
"AIR")); // right
+
+ assertor.shouldEmit(
+ testHarness,
+ rowOfKind(
+ RowKind.INSERT,
+ "Ord#1",
+ "LineOrd#2",
+ "3 Bellevue Drive, Pottstown, PA 19464",
+ null,
+ null,
+ null),
+ rowOfKind(
+ RowKind.INSERT,
+ "Ord#i",
+ "LineOrd#6",
+ "i6 Bellevue Drive, Pottstown, Pi 19464",
+ null,
+ null,
+ null),
+ rowOfKind(
+ RowKind.DELETE,
+ "Ord#1",
+ "LineOrd#2",
+ "3 Bellevue Drive, Pottstown, PA 19464",
+ null,
+ null,
+ null),
+ rowOfKind(
+ RowKind.INSERT,
+ "Ord#1",
+ "LineOrd#2",
+ "3 Bellevue Drive, Pottstown, PA 19464",
+ "Ord#X",
+ "LineOrd#2",
+ "AIR"));
+ }
+
+ /**
+ * Right input arrives first, with a mini-batch size of 3 (per the tag). Nothing is emitted
+ * until the third record arrives. The batch then emits only the joined LineOrd#2 row; the
+ * left-only LineOrd#6 row produces no output.
+ */
+ @Tag("miniBatchSize=3")
+ @Test
+ public void testRightJoinWithRightArriveFirst() throws Exception {
+ // joinKey is LineOrd
+ testHarness.processElement2(insertRecord("Ord#X", "LineOrd#2",
"AIR")); // right
+
+ testHarness.processElement1(
+ insertRecord(
+ "Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA
19464")); // left +I
+ // 2 of 3 records buffered: the batch has not been flushed yet
+ assertor.shouldEmitNothing(testHarness);
+ testHarness.processElement1(
+ insertRecord(
+ "Ord#i",
+ "LineOrd#6",
+ "i6 Bellevue Drive, Pottstown, Pi 19464")); // left +I
+
+ assertor.shouldEmit(
+ testHarness,
+ rowOfKind(
+ RowKind.INSERT,
+ "Ord#1",
+ "LineOrd#2",
+ "3 Bellevue Drive, Pottstown, PA 19464",
+ "Ord#X",
+ "LineOrd#2",
+ "AIR"));
+ }
+
+ /**
+ * Same input as {@link #testRightJoinWithRightArriveFirst()}, with a mini-batch size of 1.
+ * The right insert is first emitted padded with nulls on the left side. It is retracted
+ * (DELETE) when the matching left row arrives, and the joined row follows. The left-only
+ * LineOrd#6 row produces no output.
+ */
+ @Tag("miniBatchSize=1")
+ @Test
+ public void testRightJoinWithRightArriveFirstWithNoMiniBatch() throws
Exception {
+ // joinKey is LineOrd
+ testHarness.processElement2(insertRecord("Ord#X", "LineOrd#2",
"AIR")); // right
+
+ testHarness.processElement1(
+ insertRecord(
+ "Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA
19464")); // left +I
+ testHarness.processElement1(
+ insertRecord(
+ "Ord#i",
+ "LineOrd#6",
+ "i6 Bellevue Drive, Pottstown, Pi 19464")); // left +I
+
+ assertor.shouldEmit(
+ testHarness,
+ rowOfKind(RowKind.INSERT, null, null, null, "Ord#X",
"LineOrd#2", "AIR"),
+ rowOfKind(RowKind.DELETE, null, null, null, "Ord#X",
"LineOrd#2", "AIR"),
+ rowOfKind(
+ RowKind.INSERT,
+ "Ord#1",
+ "LineOrd#2",
+ "3 Bellevue Drive, Pottstown, PA 19464",
+ "Ord#X",
+ "LineOrd#2",
+ "AIR"));
+ }
+
+ /**
+ * Full join, right input first, with a mini-batch size of 3 (per the tag). After the third
+ * record, the batch emits four rows in order:
+ * the null-padded right row;
+ * the null-padded left-only LineOrd#6 row;
+ * a DELETE retracting the null-padded right row;
+ * the joined LineOrd#2 row.
+ *
+ * <p>NOTE(review): the intermediate null-padded right row is still emitted and retracted
+ * within a single batch here. The left/right join mini-batch variants emit no such row —
+ * confirm this difference is intended.
+ */
+ @Tag("miniBatchSize=3")
+ @Test
+ public void testFullJoinWithRightArriveFirst() throws Exception {
+ // joinKey is LineOrd
+ testHarness.processElement2(insertRecord("Ord#X", "LineOrd#2",
"AIR")); // right
+
+ testHarness.processElement1(
+ insertRecord(
+ "Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA
19464")); // left +I
+ // 2 of 3 records buffered: the batch has not been flushed yet
+ assertor.shouldEmitNothing(testHarness);
+ testHarness.processElement1(
+ insertRecord(
+ "Ord#i",
+ "LineOrd#6",
+ "i6 Bellevue Drive, Pottstown, Pi 19464")); // left +I
+
+ assertor.shouldEmit(
+ testHarness,
+ rowOfKind(RowKind.INSERT, null, null, null, "Ord#X",
"LineOrd#2", "AIR"),
+ rowOfKind(
+ RowKind.INSERT,
+ "Ord#i",
+ "LineOrd#6",
+ "i6 Bellevue Drive, Pottstown, Pi 19464",
+ null,
+ null,
+ null),
+ rowOfKind(RowKind.DELETE, null, null, null, "Ord#X",
"LineOrd#2", "AIR"),
+ rowOfKind(
+ RowKind.INSERT,
+ "Ord#1",
+ "LineOrd#2",
+ "3 Bellevue Drive, Pottstown, PA 19464",
+ "Ord#X",
+ "LineOrd#2",
+ "AIR"));
+ }
+
+ /**
+ * Same input as {@link #testFullJoinWithRightArriveFirst()}, with a mini-batch size of 1.
+ * Each record is processed immediately. The null-padded right row is emitted, retracted,
+ * and replaced by the joined row; the left-only LineOrd#6 row is then emitted padded with
+ * nulls.
+ */
+ @Tag("miniBatchSize=1")
+ @Test
+ public void testFullJoinWithRightArriveFirstWithNoMiniBatch() throws
Exception {
+ // joinKey is LineOrd
+ testHarness.processElement2(insertRecord("Ord#X", "LineOrd#2",
"AIR")); // right
+
+ testHarness.processElement1(
+ insertRecord(
+ "Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA
19464")); // left +I
+ testHarness.processElement1(
+ insertRecord(
+ "Ord#i",
+ "LineOrd#6",
+ "i6 Bellevue Drive, Pottstown, Pi 19464")); // left +I
+
+ assertor.shouldEmit(
+ testHarness,
+ rowOfKind(RowKind.INSERT, null, null, null, "Ord#X",
"LineOrd#2", "AIR"),
+ rowOfKind(RowKind.DELETE, null, null, null, "Ord#X",
"LineOrd#2", "AIR"),
+ rowOfKind(
+ RowKind.INSERT,
+ "Ord#1",
+ "LineOrd#2",
+ "3 Bellevue Drive, Pottstown, PA 19464",
+ "Ord#X",
+ "LineOrd#2",
+ "AIR"),
+ rowOfKind(
+ RowKind.INSERT,
+ "Ord#i",
+ "LineOrd#6",
+ "i6 Bellevue Drive, Pottstown, Pi 19464",
+ null,
+ null,
+ null));
+ }
+
+ /**
+ * Inner join with a mini-batch size of 4 (per the tag). No two records on the same side
+ * share a join key, so there is nothing to fold. The first three records emit nothing; the
+ * fourth reaches the batch size, and the batch emits both joined rows.
+ */
+ @Tag("miniBatchSize=4")
+ @Test
+ public void testInnerJoinJoinKeyContainsUniqueKeyWithoutFold() throws
Exception {
+ // joinKey is LineOrd
+ testHarness.setStateTtlProcessingTime(1);
+ // basic test that mini-batch processing is triggered normally
+ testHarness.processElement1(
+ insertRecord(
+ "Ord#1",
+ "LineOrd#1",
+ "1 Bellevue Drive, Pottstown, PA 19464")); // left +I x
+ assertor.shouldEmitNothing(testHarness);
+
+ testHarness.processElement1(
+ insertRecord(
+ "Ord#1",
+ "LineOrd#2",
+ "2 Bellevue Drive, Pottstown, PA 19464")); // left +I
xx
+ assertor.shouldEmitNothing(testHarness);
+
+ testHarness.processElement2(insertRecord("Ord#Y", "LineOrd#1",
"TRUCK")); // right +I x
+ assertor.shouldEmitNothing(testHarness);
+
+ // this fourth record exactly reaches the mini-batch size and triggers the flush
+ testHarness.processElement2(insertRecord("Ord#X", "LineOrd#2",
"AIR")); // right +I xx
+ assertor.shouldEmit(
+ testHarness,
+ rowOfKind(
+ RowKind.INSERT,
+ "Ord#1",
+ "LineOrd#1",
+ "1 Bellevue Drive, Pottstown, PA 19464",
+ "Ord#Y",
+ "LineOrd#1",
+ "TRUCK"),
+ rowOfKind(
+ RowKind.INSERT,
+ "Ord#1",
+ "LineOrd#2",
+ "2 Bellevue Drive, Pottstown, PA 19464",
+ "Ord#X",
+ "LineOrd#2",
+ "AIR"));
+ }
+
+ /**
+ * Inner join whose join key contains the unique key. All 18 records (8 left, 10 right) fall
+ * into a single mini-batch. The trailing comments mark pairs of records on the same key that
+ * are expected to fold within the batch (e.g. "+I +U", "+U -D"). The expected output is only
+ * the two joins that survive folding, as listed in the state table before the assertion.
+ */
+ @Tag("miniBatchSize=18")
+ @Test
+ public void testInnerJoinWithJoinKeyContainsUniqueKeyWithinBatch() throws
Exception {
+ // joinKey is LineOrd
+ // left fold || right fold
+ // +I +U / +U +U / +U -D || +I -D / +U -U / +I -U
+ testHarness.processElement1(
+ insertRecord(
+ "Ord#1",
+ "LineOrd#1",
+ "3 Bellevue Drive, Pottstown, PA 19464")); // left +I
x
+ testHarness.processElement1(
+ updateAfterRecord(
+ "Ord#2",
+ "LineOrd#1",
+ "4 Bellevue Drive, Pottstown, PB 19464")); // left +U
x | +I +U
+ testHarness.processElement2(insertRecord("Ord#X", "LineOrd#7",
"RAILWAY")); // right +I a
+ testHarness.processElement1(
+ updateAfterRecord(
+ "Ord#5",
+ "LineOrd#5",
+ "7 Bellevue Drive, Pottstown, PE 19464")); // left +U
y
+ testHarness.processElement1(
+ updateAfterRecord(
+ "Ord#x5",
+ "LineOrd#5",
+ "x3 Bellevue Drive, Pottstown, PAxx 19464")); // left
+U y | +U +U
+ testHarness.processElement1(
+ insertRecord(
+ "Ord#i", "LineOrd#6", "i6 Bellevue Drive, Pottstown,
Pi 19464")); // left +I
+ testHarness.processElement2(
+ deleteRecord("Ord#X", "LineOrd#7", "RAILWAY")); // right -D a
| +I -D
+ testHarness.processElement1(
+ updateAfterRecord(
+ "Ord#3",
+ "LineOrd#x3",
+ "x5 Bellevue Drive, Pottstown, PCxx 19464")); // left
+U
+ testHarness.processElement1(
+ deleteRecord(
+ "Ord#3",
+ "LineOrd#x3",
+ "14y0 Bellevue Drive, Pottstown, PJyy 19464")); //
left -D | +U -D
+ testHarness.processElement2(
+ updateAfterRecord("Ord#X", "LineOrd#7", "AIR")); // right +U
b
+ testHarness.processElement1(
+ insertRecord(
+ "Ord#4", "LineOrd#4", "6 Bellevue Drive, Pottstown, PD
19464")); // left +I
+ // 11 of 18 records buffered: the batch has not been flushed yet
+ assertor.shouldEmitNothing(testHarness);
+ testHarness.processElement2(
+ updateBeforeRecord("Ord#X", "LineOrd#7", "AIR")); // right -U
b | +U -U
+ testHarness.processElement2(insertRecord("Ord#X", "LineOrd#2",
"AIR")); // right +I c
+ testHarness.processElement2(
+ updateBeforeRecord("Ord#X", "LineOrd#2", "AIR")); // right -U
c | +I -U
+ // right state is empty
+ testHarness.processElement2(insertRecord("Ord#X", "LineOrd#1",
"AIR")); // right +I
+ testHarness.processElement2(updateBeforeRecord("Ord#Y", "LineOrd#5",
"TRUCK")); // right -U
+ testHarness.processElement2(insertRecord("Ord#Y", "LineOrd#6",
"RAILWAY")); // right +I
+ // 18th record: fills the mini-batch and triggers the flush
+ testHarness.processElement2(
+ updateBeforeRecord(
+ "Ord#Z", "LineOrd#4", "RAILWAY")); // no effect to
state // right -U
+ // left state | right state
+ // "Ord#2", "LineOrd#1", "4 Bellevue Drive, Pottstown, PB 19464" |
"Ord#X",
+ // "LineOrd#1", "AIR"
+ // "Ord#x5", "LineOrd#5", "x3 Bellevue Drive, Pottstown, PAxx 19464" |
"Ord#Y",
+ // "LineOrd#6", "RAILWAY"
+ // "Ord#i", "LineOrd#6", "i6 Bellevue Drive, Pottstown, Pi 19464"
+ // "Ord#4", "LineOrd#4", "6 Bellevue Drive, Pottstown, PD 19464"
+ assertor.shouldEmit(
+ testHarness,
+ rowOfKind(
+ RowKind.INSERT,
+ "Ord#i",
+ "LineOrd#6",
+ "i6 Bellevue Drive, Pottstown, Pi 19464",
+ "Ord#Y",
+ "LineOrd#6",
+ "RAILWAY"),
+ rowOfKind(
+ RowKind.UPDATE_AFTER,
+ "Ord#2",
+ "LineOrd#1",
+ "4 Bellevue Drive, Pottstown, PB 19464",
+ "Ord#X",
+ "LineOrd#1",
+ "AIR"));
+ }
+
+ /**
+ * Inner join whose join key contains the unique key, spread across two mini-batches of 10
+ * records each. The first batch emits two joins. The second batch joins its new records
+ * against the state left by the first batch (shown in the table below) and against each
+ * other.
+ */
+ @Tag("miniBatchSize=10")
+ @Test
+ public void testInnerJoinWithJoinKeyContainsUniqueKeyCrossBatches() throws
Exception {
+ // joinKey is LineOrd
+
+ testHarness.processElement1(
+ insertRecord(
+ "Ord#1",
+ "LineOrd#1",
+ "3 Bellevue Drive, Pottstown, PA 19464")); // left +I
z
+ testHarness.processElement1(
+ insertRecord(
+ "Ord#1",
+ "LineOrd#1",
+ "3 Bellevue Drive, Pottstown, PA 19464")); // left +I
z | +I +I
+ testHarness.processElement2(insertRecord("Ord#Y", "LineOrd#4",
"TRUCK")); // right +I y
+ // 3 of 10 records buffered
+ assertor.shouldEmitNothing(testHarness);
+ testHarness.processElement1(
+ insertRecord(
+ "Ord#4", "LineOrd#4", "6 Bellevue Drive, Pottstown, PD
19464")); // left +I
+ testHarness.processElement1(
+ insertRecord(
+ "Ord#5",
+ "LineOrd#5",
+ "7 Bellevue Drive, Pottstown, PE 19464")); // left +I
x
+ testHarness.processElement1(
+ updateBeforeRecord(
+ "Ord#5",
+ "LineOrd#5",
+ "7 Bellevue Drive, Pottstown, PE 19464")); // left -U
x | +I -U
+ testHarness.processElement1(
+ updateBeforeRecord(
+ "Ord#9", "LineOrd#9", "11 Bellevue Drive, Pottstown,
PI 19464")); // left -U
+ // 7 of 10 records buffered
+ assertor.shouldEmitNothing(testHarness);
+ testHarness.processElement2(insertRecord("Ord#X", "LineOrd#9",
"AIR")); // right +I
+ testHarness.processElement2(updateAfterRecord("Ord#xyz", "LineOrd#1",
"SHIP")); // right +U
+ // 10th record: flushes the first batch
+ testHarness.processElement2(
+ deleteRecord("Ord#Y", "LineOrd#4", "TRUCK")); // right -D y
| +I -D
+ assertor.shouldEmit(
+ testHarness,
+ rowOfKind(
+ RowKind.UPDATE_BEFORE,
+ "Ord#9",
+ "LineOrd#9",
+ "11 Bellevue Drive, Pottstown, PI 19464",
+ "Ord#X",
+ "LineOrd#9",
+ "AIR"),
+ rowOfKind(
+ RowKind.INSERT,
+ "Ord#1",
+ "LineOrd#1",
+ "3 Bellevue Drive, Pottstown, PA 19464",
+ "Ord#xyz",
+ "LineOrd#1",
+ "SHIP"));
+ //
+---------------------------------------------------------------+---------------------------------+
+ // | left state |
right state
+ // |
+ //
|---------------------------------------------------------------|---------------------------------|
+ // | "Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464" |
"Ord#X", "LineOrd#9",
+ // "AIR" |
+ // | "Ord#4", "LineOrd#4", "6 Bellevue Drive, Pottstown, PD 19464" |
"Ord#xyz", "LineOrd#1",
+ // "SHIP" |
+ //
+---------------------------------------------------------------+---------------------------------+
+
+ // second batch:
+ // 1. the left state (built by the previous batch) joins the new right input;
+ // 2. the right state (built by the previous and current batch) joins the new left input.
+
+ testHarness.processElement1(
+ updateAfterRecord(
+ "Ord#adjust",
+ "LineOrd#4",
+ "14 Bellevue Drive, Pottstown, PJ 19464")); // left +U
x
+ testHarness.processElement1(
+ updateAfterRecord(
+ "Ord#18",
+ "LineOrd#9",
+ "22 Bellevue Drive, Pottstown, PK 19464")); // left +U
+ testHarness.processElement2(deleteRecord("Ord#X", "LineOrd#x3",
"AIR")); // right -D
+ testHarness.processElement2(updateBeforeRecord("Ord#xyz", "LineOrd#1",
"SHIP")); // right -U
+ testHarness.processElement2(insertRecord("Ord#Y", "LineOrd#4",
"TRUCK")); // right +I
+ testHarness.processElement1(
+ updateAfterRecord(
+ "Ord#14",
+ "LineOrd#4",
+ "18 Bellevue Drive, Pottstown, PL 19464")); // left +U
x | +U +U
+ testHarness.processElement1(
+ deleteRecord(
+ "Ord#3",
+ "LineOrd#x3",
+ "x5 Bellevue Drive, Pottstown, PCxx 19464")); // left
-D
+ testHarness.processElement1(
+ insertRecord(
+ "Ord#3",
+ "LineOrd#x3",
+ "x5 Bellevue Drive, Pottstown, PCxx 19464")); // left
+I
+ testHarness.processElement1(
+ insertRecord(
+ "Ord#10",
+ "LineOrd#100y",
+ "14y0 Bellevue Drive, Pottstown, PJyy 19464")); //
left +I
+ // 9 of 10 records of the second batch buffered
+ assertor.shouldEmitNothing(testHarness);
+
+ // 10th record: flushes the second batch
+ testHarness.processElement2(updateAfterRecord("Ord#101", "LineOrd#x3",
"AIR")); // right +U
+
+ assertor.shouldEmit(
+ testHarness,
+ rowOfKind(
+ RowKind.INSERT,
+ "Ord#4",
+ "LineOrd#4",
+ "6 Bellevue Drive, Pottstown, PD 19464",
+ "Ord#Y",
+ "LineOrd#4",
+ "TRUCK"),
+ rowOfKind(
+ RowKind.UPDATE_BEFORE,
+ "Ord#1",
+ "LineOrd#1",
+ "3 Bellevue Drive, Pottstown, PA 19464",
+ "Ord#xyz",
+ "LineOrd#1",
+ "SHIP"),
+ rowOfKind(
+ RowKind.DELETE,
+ "Ord#3",
+ "LineOrd#x3",
+ "x5 Bellevue Drive, Pottstown, PCxx 19464",
+ "Ord#101",
+ "LineOrd#x3",
+ "AIR"),
+ rowOfKind(
+ RowKind.INSERT,
+ "Ord#3",
+ "LineOrd#x3",
+ "x5 Bellevue Drive, Pottstown, PCxx 19464",
+ "Ord#101",
+ "LineOrd#x3",
+ "AIR"),
+ rowOfKind(
+ RowKind.UPDATE_AFTER,
+ "Ord#18",
+ "LineOrd#9",
+ "22 Bellevue Drive, Pottstown, PK 19464",
+ "Ord#X",
+ "LineOrd#9",
+ "AIR"),
+ rowOfKind(
+ RowKind.UPDATE_AFTER,
+ "Ord#14",
+ "LineOrd#4",
+ "18 Bellevue Drive, Pottstown, PL 19464",
+ "Ord#Y",
+ "LineOrd#4",
+ "TRUCK"));
+ }
+
+ /**
+ * Inner join with unique key Ord, distinct from the join key LineOrd. Eleven left records
+ * arrive first, including several same-key pairs that are expected to fold (numbered in the
+ * trailing comments). Two right records then fill the mini-batch of 13. The expected left
+ * state after folding is shown in the table below.
+ */
+ @Tag("miniBatchSize=13")
+ @Test
+ public void testInnerJoinWithHasUniqueKeyWithinBatch() throws Exception {
+ // joinKey is LineOrd and uniqueKey is Ord
+ // +I +U / +I -U / +I -D / +U -D /+U +U
+ List<StreamRecord<RowData>> records =
+ Arrays.asList(
+ insertRecord(
+ "Ord#1",
+ "LineOrd#1",
+ "3 Bellevue Drive, Pottstown, PA 19464"), // 2
+I -U
+ insertRecord("Ord#2", "LineOrd#2", "4 Bellevue Drive,
Pottstown, PB 19464"),
+ insertRecord(
+ "Ord#3",
+ "LineOrd#10",
+ "5 Bellevue Drive, Pottstown, PC 19464"), // 1
+I +U
+ updateAfterRecord(
+ "Ord#3",
+ "LineOrd#10",
+ "xxx Bellevue Drive, Pottstown, PJ 19464"), //
1
+ updateAfterRecord(
+ "Ord#5", "LineOrd#5", "7 Bellevue Drive,
Pottstown, PE 19464"),
+ insertRecord(
+ "Ord#6",
+ "LineOrd#5",
+ "8 Bellevue Drive, Pottstown, PF 19464"), // 3
+I -D
+ updateBeforeRecord(
+ "Ord#1", "LineOrd#1", "3 Bellevue Drive,
Pottstown, PA 19464"), // 2
+ deleteRecord(
+ "Ord#6", "LineOrd#5", "7 Bellevue Drive,
Pottstown, PE 19464"), // 3
+ updateBeforeRecord(
+ "Ord#12",
+ "LineOrd#4",
+ "6 Bellevue Drive, Pottstown, PD 19464"), //
no effect
+ updateAfterRecord(
+ "Ord#9",
+ "LineOrd#3",
+ "5 Bellevue Drive, Pottstown, PC 19464"), // 4
+U -D
+ deleteRecord(
+ "Ord#9",
+ "LineOrd#3",
+ "5 Bellevue Drive, Pottstown, PC 19464")); // 4
+ for (StreamRecord<RowData> row : records) {
+ testHarness.processElement1(row);
+ }
+ //
+-------------------------------------------------------------------+
+ // | left state
|
+ //
|-------------------------------------------------------------------|
+ // | "Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"
|
+ // | "Ord#3", "LineOrd#10", "xxx Bellevue Drive, Pottstown, PJ 19464"
|
+ // | "Ord#5", "LineOrd#5", "7 Bellevue Drive, Pottstown, PE 19464"
|
+ //
+-------------------------------------------------------------------+
+ // 11 of 13 records buffered: the batch has not been flushed yet
+ assertor.shouldEmitNothing(testHarness);
+ records =
+ Arrays.asList(
+ insertRecord("Ord#5", "LineOrd#2", "SHIP"),
+ updateAfterRecord("Ord#6", "LineOrd#5", "AIR"));
+ for (StreamRecord<RowData> row : records) {
+ testHarness.processElement2(row);
+ }
+ assertor.shouldEmit(
+ testHarness,
+ rowOfKind(
+ RowKind.UPDATE_AFTER,
+ "Ord#5",
+ "LineOrd#5",
+ "7 Bellevue Drive, Pottstown, PE 19464",
+ "Ord#6",
+ "LineOrd#5",
+ "AIR"),
+ rowOfKind(
+ RowKind.INSERT,
+ "Ord#2",
+ "LineOrd#2",
+ "4 Bellevue Drive, Pottstown, PB 19464",
+ "Ord#5",
+ "LineOrd#2",
+ "SHIP"));
+ }
+
+ /**
+ * Inner join with unique key Ord, spread across two mini-batches of 8 records. Records with
+ * the same unique key are folded, under both the same and different join keys. Each batch
+ * emits nothing until its eighth record. The state after the first batch is shown in the
+ * table below.
+ */
+ @Tag("miniBatchSize=8")
+ @Test
+ public void testInnerJoinWithHasUniqueKeyCrossBatches() throws Exception {
+ // joinKey is LineOrd and uniqueKey is Ord
+ // fold +I/+U +U (same and different jks)
+ testHarness.processElement1(
+ updateBeforeRecord(
+ "Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA
19464")); // left -U
+ testHarness.processElement1(
+ updateAfterRecord(
+ "Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB
19464")); // left +U
+ testHarness.processElement2(insertRecord("Ord#5", "LineOrd#5",
"SHIP")); // right +I
+ testHarness.processElement1(
+ insertRecord(
+ "Ord#4",
+ "LineOrd#4",
+ "5 Bellevue Drive, Pottstown, PC 19464")); // left +I
x
+ testHarness.processElement1(
+ updateAfterRecord(
+ "Ord#4",
+ "LineOrd#4",
+ "6 Bellevue Drive, Pottstown, PD 19464")); // left +U
x | +I +U
+ testHarness.processElement2(
+ updateAfterRecord("Ord#22", "LineOrd#4", "SHIP")); // right
+U join
+ testHarness.processElement2(insertRecord("Ord#23", "LineOrd#10",
"AIR")); // right +I
+ // 7 of 8 records buffered
+ assertor.shouldEmitNothing(testHarness);
+ testHarness.processElement1(
+ updateAfterRecord(
+ "Ord#4",
+ "LineOrd#4",
+ "xxx Bellevue Drive, Pottstown, PJ 19464")); // left
+U x | +I +U +U join
+ assertor.shouldEmit(
+ testHarness,
+ rowOfKind(
+ RowKind.UPDATE_AFTER,
+ "Ord#4",
+ "LineOrd#4",
+ "xxx Bellevue Drive, Pottstown, PJ 19464",
+ "Ord#22",
+ "LineOrd#4",
+ "SHIP"));
+
+ //
+-----------------------------------------------------------------+----------------------------------+
+ // | left state |
right state
+ // |
+ //
|-----------------------------------------------------------------|----------------------------------|
+ // | "Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464" |
"Ord#5", "LineOrd#5",
+ // "SHIP" |
+ // | "Ord#4", "LineOrd#4", "xxx Bellevue Drive, Pottstown, PJ 19464" |
"Ord#22",
+ // "LineOrd#4", "SHIP" |
+ // | |
"Ord#23",
+ // "LineOrd#10", "AIR" |
+ //
+-----------------------------------------------------------------+----------------------------------+
+
+ // fold +I/+U -U/D (same and different jks)
+ testHarness.processElement1(
+ insertRecord(
+ "Ord#5", "LineOrd#5", "7 Bellevue Drive, Pottstown, PE
19464")); // left +I
+ testHarness.processElement1(
+ updateBeforeRecord(
+ "Ord#6",
+ "LineOrd#6",
+ "8 Bellevue Drive, Pottstown, PF 19464")); // left -U
+ testHarness.processElement2(insertRecord("Ord#21", "LineOrd#5",
"RAILWAY")); // right +I x
+ testHarness.processElement2(
+ insertRecord("Ord#1", "LineOrd#5", "TRUCK")); // right +I x
| +I +I
+ testHarness.processElement1(
+ updateAfterRecord(
+ "Ord#6", "LineOrd#6", "8 Bellevue Drive, Pottstown, PF
19464")); // left +U
+ testHarness.processElement1(
+ deleteRecord(
+ "Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB
19464")); // left -D
+ testHarness.processElement2(
+ updateBeforeRecord("Ord#5", "LineOrd#5", "SHIP")); // right -U
x | +I +I -U
+ // 7 of 8 records of the second batch buffered
+ assertor.shouldEmitNothing(testHarness);
+ testHarness.processElement2(updateBeforeRecord("Ord#22", "LineOrd#6",
"AIR")); // right -U
+
+ assertor.shouldEmit(
+ testHarness,
+ rowOfKind(
+ RowKind.INSERT,
+ "Ord#5",
+ "LineOrd#5",
+ "7 Bellevue Drive, Pottstown, PE 19464",
+ "Ord#21",
+ "LineOrd#5",
+ "RAILWAY"),
+ rowOfKind(
+ RowKind.INSERT,
+ "Ord#5",
+ "LineOrd#5",
+ "7 Bellevue Drive, Pottstown, PE 19464",
+ "Ord#1",
+ "LineOrd#5",
+ "TRUCK"));
+ }
+
+ /**
+ * Inner join with no unique key. Fourteen right records followed by six left records fill a
+ * single mini-batch of 20. Several pairs are expected to fold (see the trailing comments).
+ * Because there is no unique key, the two identical right inserts for LineOrd#1 produce two
+ * identical joined rows.
+ *
+ * <p>NOTE(review): here the right input carries address strings in its third column, which
+ * {@code beforeEach} declares as CHAR(10). The harness appears not to enforce the declared
+ * length — confirm the mismatch is intentional.
+ */
+ @Tag("miniBatchSize=20")
+ @Test
+ public void testInnerJoinWithNoUniqueKeyWithinBatch() throws Exception {
+ // joinKey is LineOrd
+ // +I -U / +I -D / -U +U / -D +I
+ List<StreamRecord<RowData>> records =
+ Arrays.asList(
+ insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive,
Pottstown, PA 19464"),
+ insertRecord("Ord#1", "LineOrd#2", "4 Bellevue Drive,
Pottstown, PB 19464"),
+ insertRecord(
+ "Ord#1",
+ "LineOrd#2",
+ "4 Bellevue Drive, Pottstown, PB 19464"), //
2x +I -D
+ deleteRecord("Ord#6", "LineOrd#6", "8 Bellevue Drive,
Pottstown, PF 19464"),
+ insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive,
Pottstown, PA 19464"),
+ insertRecord(
+ "Ord#3",
+ "LineOrd#3",
+ "5 Bellevue Drive, Pottstown, PD 19464"), // x
+I -U
+ updateBeforeRecord(
+ "Ord#3", "LineOrd#3", "5 Bellevue Drive,
Pottstown, PD 19464"), // x
+ updateBeforeRecord(
+ "Ord#9",
+ "LineOrd#9",
+ "11 Bellevue Drive, Pottstown, PI 19464"), //
3x -U +U
+ updateAfterRecord(
+ "Ord#10", "LineOrd#10", "14 Bellevue Drive,
Pottstown, PJ 19464"),
+ updateAfterRecord(
+ "Ord#18", "LineOrd#18", "22 Bellevue Drive,
Pottstown, PK 19464"),
+ deleteRecord(
+ "Ord#1",
+ "LineOrd#2",
+ "4 Bellevue Drive, Pottstown, PB 19464"), // 2x
+ updateAfterRecord(
+ "Ord#9",
+ "LineOrd#9",
+ "11 Bellevue Drive, Pottstown, PI 19464"), //
3x
+ deleteRecord(
+ "Ord#6",
+ "LineOrd#6",
+ "8 Bellevue Drive, Pottstown, PF 19464"), //
4x -D +I
+ insertRecord(
+ "Ord#6", "LineOrd#6", "8 Bellevue Drive,
Pottstown, PF 19464") // 4x
+ );
+ for (StreamRecord<RowData> row : records) {
+ testHarness.processElement2(row);
+ }
+ records =
+ Arrays.asList(
+ insertRecord("Ord#1", "LineOrd#1", "AIR"),
+ updateAfterRecord("Ord#1", "LineOrd#2", "SHIP"),
+ updateBeforeRecord("Ord#1", "LineOrd#2", "RAILWAY"),
+ insertRecord("Ord#1", "LineOrd#2", "RAILWAY"),
+ deleteRecord("Ord#6", "LineOrd#6", "RAILWAY"),
+ insertRecord("Ord#6", "LineOrd#6", "RAILWAY"));
+
+ // the last of these 6 left records is the 20th record and triggers the flush
+ for (StreamRecord<RowData> row : records) {
+ testHarness.processElement1(row);
+ }
+ assertor.shouldEmit(
+ testHarness,
+ rowOfKind(
+ RowKind.INSERT,
+ "Ord#1",
+ "LineOrd#1",
+ "AIR",
+ "Ord#1",
+ "LineOrd#1",
+ "3 Bellevue Drive, Pottstown, PA 19464"),
+ rowOfKind(
+ RowKind.INSERT,
+ "Ord#1",
+ "LineOrd#1",
+ "AIR",
+ "Ord#1",
+ "LineOrd#1",
+ "3 Bellevue Drive, Pottstown, PA 19464"),
+ rowOfKind(
+ RowKind.UPDATE_AFTER,
+ "Ord#1",
+ "LineOrd#2",
+ "SHIP",
+ "Ord#1",
+ "LineOrd#2",
+ "4 Bellevue Drive, Pottstown, PB 19464"));
+ }
+
+ /**
+ * Inner join with no unique key, spread across two mini-batches of 6 records. The first
+ * batch contains two identical left inserts for LineOrd#2, so their join with the right
+ * LineOrd#2 record is emitted twice. The second batch deletes that right record and expects
+ * both joined copies to be retracted, alongside a new join on LineOrd#3 and a DELETE on
+ * LineOrd#1.
+ */
+ @Tag("miniBatchSize=6")
+ @Test
+ public void testInnerJoinWithNoUniqueKeyCrossBatches() throws Exception {
+ // joinKey is LineOrd
+ // completely duplicate records
+ testHarness.processElement1(
+ insertRecord("Ord#1", "LineOrd#2", "4 Bellevue Drive,
Pottstown, PB 19464"));
+ testHarness.processElement2(insertRecord("Ord#1", "LineOrd#1", "AIR"));
+ testHarness.processElement1(
+ insertRecord("Ord#1", "LineOrd#2", "4 Bellevue Drive,
Pottstown, PB 19464"));
+ testHarness.processElement1(
+ deleteRecord("Ord#6", "LineOrd#6", "8 Bellevue Drive,
Pottstown, PF 19464"));
+ testHarness.processElement2(updateAfterRecord("Ord#1", "LineOrd#2",
"SHIP"));
+ // 5 of 6 records buffered
+ assertor.shouldEmitNothing(testHarness);
+ // 6th record: flushes the first batch
+ testHarness.processElement1(
+ insertRecord("Ord#3", "LineOrd#3", "5 Bellevue Drive,
Pottstown, PD 19464"));
+ // left state | right state
+ // "Ord#1", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464" |
"Ord#1", "LineOrd#1",
+ // "AIR"
+ // "Ord#1", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464" |
"Ord#1", "LineOrd#2",
+ // "SHIP"
+ // "Ord#3", "LineOrd#3", "5 Bellevue Drive, Pottstown, PD 19464" |
+ assertor.shouldEmit(
+ testHarness,
+ rowOfKind(
+ RowKind.INSERT,
+ "Ord#1",
+ "LineOrd#2",
+ "4 Bellevue Drive, Pottstown, PB 19464",
+ "Ord#1",
+ "LineOrd#2",
+ "SHIP"),
+ rowOfKind(
+ RowKind.INSERT,
+ "Ord#1",
+ "LineOrd#2",
+ "4 Bellevue Drive, Pottstown, PB 19464",
+ "Ord#1",
+ "LineOrd#2",
+ "SHIP"));
+ testHarness.processElement1(
+ updateBeforeRecord("Ord#1", "LineOrd#2", "4 Bellevue Drive,
Pottstown, PB 19464"));
+ testHarness.processElement1(
+ updateAfterRecord("Ord#1", "LineOrd#2", "4 Bellevue Drive,
Pottstown, PB 19464"));
+ testHarness.processElement2(insertRecord("Ord#1", "LineOrd#3", "AIR"));
+ testHarness.processElement1(
+ deleteRecord("Ord#6", "LineOrd#1", "8 Bellevue Drive,
Pottstown, PF 19464"));
+ testHarness.processElement2(deleteRecord("Ord#1", "LineOrd#2",
"SHIP"));
+ // right state
+ // "Ord#1", "LineOrd#1", "AIR"
+ // "Ord#1", "LineOrd#3", "AIR"
+ // 6th record of the second batch: triggers its flush
+ testHarness.processElement1(
+ insertRecord("Ord#1", "LineOrd#2", "4 Bellevue Drive,
Pottstown, PB 19464"));
+ assertor.shouldEmit(
+ testHarness,
+ rowOfKind(
+ RowKind.INSERT,
+ "Ord#3",
+ "LineOrd#3",
+ "5 Bellevue Drive, Pottstown, PD 19464",
+ "Ord#1",
+ "LineOrd#3",
+ "AIR"),
+ rowOfKind(
+ RowKind.DELETE,
+ "Ord#1",
+ "LineOrd#2",
+ "4 Bellevue Drive, Pottstown, PB 19464",
+ "Ord#1",
+ "LineOrd#2",
+ "SHIP"),
+ rowOfKind(
+ RowKind.DELETE,
+ "Ord#1",
+ "LineOrd#2",
+ "4 Bellevue Drive, Pottstown, PB 19464",
+ "Ord#1",
+ "LineOrd#2",
+ "SHIP"),
+ rowOfKind(
+ RowKind.DELETE,
+ "Ord#6",
+ "LineOrd#1",
+ "8 Bellevue Drive, Pottstown, PF 19464",
+ "Ord#1",
+ "LineOrd#1",
+ "AIR"));
+ }
+
+ /**
+ * Left join whose join key contains the unique key, spread across two mini-batches of 10
+ * records. The first batch (6 left, 4 right records) emits two joined rows and the unmatched
+ * LineOrd#4 row padded with nulls. The second batch (10 right records) updates the right
+ * side of the existing joins.
+ *
+ * <p>NOTE(review): outer joins are sometimes described as emitting only INSERT or DELETE
+ * messages, but the second batch below asserts UPDATE_BEFORE rows — confirm which contract
+ * is intended.
+ */
+ @Tag("miniBatchSize=10")
+ @Test
+ public void testLeftJoinWithJoinKeyContainsUniqueKey() throws Exception {
+ // joinKey is LineOrd
+ // left fold || right fold
+ // +I +U / +U +U / +U -D || +I -D / +U -U / +I -U
+ testHarness.processElement1(
+ insertRecord(
+ "Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA
19464")); // left +I
+ testHarness.processElement1(
+ updateAfterRecord(
+ "Ord#2", "LineOrd#1", "4 Bellevue Drive, Pottstown, PB
19464")); // left +U
+
+ testHarness.processElement1(
+ insertRecord(
+ "Ord#i",
+ "LineOrd#6",
+ "i6 Bellevue Drive, Pottstown, Pi 19464")); // left +I
+
+ testHarness.processElement1(
+ updateAfterRecord(
+ "Ord#3", "LineOrd#x3", "x5 Bellevue Drive, Pottstown,
PCxx 19464")); // left
+ testHarness.processElement1(
+ deleteRecord(
+ "Ord#3",
+ "LineOrd#x3",
+ "14y0 Bellevue Drive, Pottstown, PJyy 19464")); // left
+
+ testHarness.processElement1(
+ insertRecord(
+ "Ord#4", "LineOrd#4", "6 Bellevue Drive, Pottstown, PD
19464")); // left
+ // 6 of 10 records buffered
+ assertor.shouldEmitNothing(testHarness);
+
+ testHarness.processElement2(insertRecord("Ord#X", "LineOrd#2",
"AIR")); // right
+ testHarness.processElement2(updateBeforeRecord("Ord#X", "LineOrd#2",
"AIR")); // right
+ // right state is empty
+ testHarness.processElement2(insertRecord("Ord#X", "LineOrd#1",
"AIR")); // right
+ // 10th record: flushes the first batch
+ testHarness.processElement2(insertRecord("Ord#Y", "LineOrd#6",
"RAILWAY")); // right
+ // left state | right state
+ // "Ord#2", "LineOrd#1", "4 Bellevue Drive, Pottstown, PB 19464" |
"Ord#X","LineOrd#1",
+ // "AIR"
+ // "Ord#i", "LineOrd#6", "i6 Bellevue Drive, Pottstown, Pi 19464" |
"Ord#Y","LineOrd#6",
+ // "RAILWAY"
+ // "Ord#4", "LineOrd#4", "6 Bellevue Drive, Pottstown, PD 19464"
+ assertor.shouldEmit(
+ testHarness,
+ rowOfKind(
+ RowKind.INSERT,
+ "Ord#i",
+ "LineOrd#6",
+ "i6 Bellevue Drive, Pottstown, Pi 19464",
+ "Ord#Y",
+ "LineOrd#6",
+ "RAILWAY"),
+ rowOfKind(
+ RowKind.INSERT,
+ "Ord#4",
+ "LineOrd#4",
+ "6 Bellevue Drive, Pottstown, PD 19464",
+ null,
+ null,
+ null),
+ rowOfKind(
+ RowKind.INSERT,
+ "Ord#2",
+ "LineOrd#1",
+ "4 Bellevue Drive, Pottstown, PB 19464",
+ "Ord#X",
+ "LineOrd#1",
+ "AIR"));
+ // left state | right state
+ // "Ord#2", "LineOrd#1", "4 Bellevue Drive, Pottstown, PB 19464" |
"Ord#X","LineOrd#1",
+ // "AIR"
+ // "Ord#i", "LineOrd#6", "i6 Bellevue Drive, Pottstown, Pi 19464" |
"Ord#Y","LineOrd#6",
+ // "RAILWAY"
+ // "Ord#4", "LineOrd#4", "6 Bellevue Drive, Pottstown, PD 19464"
+ // second batch: 10 right records, the last of which triggers the flush
+ testHarness.processElement2(
+ updateBeforeRecord("Ord#Y", "LineOrd#6", "RAILWAY")); // right
-U
+ testHarness.processElement2(updateAfterRecord("Ord#UU", "LineOrd#6",
"SHIP")); // right +U
+ testHarness.processElement2(insertRecord("Ord#X", "LineOrd#3",
"AIR")); // right
+ testHarness.processElement2(insertRecord("Ord#X", "LineOrd#2",
"AIR")); // right
+ testHarness.processElement2(insertRecord("Ord#X", "LineOrd#7",
"AIR")); // right
+ testHarness.processElement2(insertRecord("Ord#X", "LineOrd#8",
"AIR")); // right
+ testHarness.processElement2(insertRecord("Ord#X", "LineOrd#9",
"AIR")); // right
+ testHarness.processElement2(insertRecord("Ord#X", "LineOrd#10",
"AIR")); // right
+ testHarness.processElement2(insertRecord("Ord#X", "LineOrd#11",
"AIR")); // right
+ testHarness.processElement2(updateBeforeRecord("Ord#X", "LineOrd#1",
"AIR")); // right
+
+ assertor.shouldEmit(
+ testHarness,
+ rowOfKind(
+ RowKind.UPDATE_BEFORE,
+ "Ord#i",
+ "LineOrd#6",
+ "i6 Bellevue Drive, Pottstown, Pi 19464",
+ "Ord#Y",
+ "LineOrd#6",
+ "RAILWAY"),
+ rowOfKind(
+ RowKind.INSERT,
+ "Ord#i",
+ "LineOrd#6",
+ "i6 Bellevue Drive, Pottstown, Pi 19464",
+ "Ord#UU",
+ "LineOrd#6",
+ "SHIP"),
+ rowOfKind(
+ RowKind.UPDATE_BEFORE,
+ "Ord#2",
+ "LineOrd#1",
+ "4 Bellevue Drive, Pottstown, PB 19464",
+ "Ord#X",
+ "LineOrd#1",
+ "AIR"),
+ rowOfKind(
+ RowKind.INSERT,
+ "Ord#2",
+ "LineOrd#1",
+ "4 Bellevue Drive, Pottstown, PB 19464",
+ null,
+ null,
+ null));
+ }
+
+    /**
+     * Left outer join where both inputs have a unique key (Ord) that differs from the join key
+     * (LineOrd). Null-padded rows are emitted only as INSERT or DELETE, while a right-side -U/+U
+     * pair that matches a left row retracts the joined row with UPDATE_BEFORE and re-emits it as
+     * INSERT.
+     */
+    @Tag("miniBatchSize=4")
+    @Test
+    public void testLeftJoinWithHasUniqueKey() throws Exception {
+        // joinKey is LineOrd and uniqueKey is Ord
+        // left-side record pairs, numbered in the trailing comments:
+        // 1: +I +U / 2: +I -U / 3: +I -D / 5: +U +U
+        List<StreamRecord<RowData>> leftRecords =
+                Arrays.asList(
+                        insertRecord(
+                                "Ord#1",
+                                "LineOrd#1",
+                                "3 Bellevue Drive, Pottstown, PA 19464"), // 2 +I -U
+                        insertRecord("Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"),
+                        insertRecord(
+                                "Ord#3",
+                                "LineOrd#10",
+                                "5 Bellevue Drive, Pottstown, PC 19464"), // 1 +I +U
+                        updateAfterRecord(
+                                "Ord#3",
+                                "LineOrd#10",
+                                "xxx Bellevue Drive, Pottstown, PJ 19464"), // 1
+                        updateAfterRecord(
+                                "Ord#5", "LineOrd#5", "7 Bellevue Drive, Pottstown, PE 19464"),
+                        insertRecord(
+                                "Ord#6",
+                                "LineOrd#5",
+                                "8 Bellevue Drive, Pottstown, PF 19464"), // 3 +I -D
+                        updateBeforeRecord(
+                                "Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"), // 2
+                        deleteRecord(
+                                "Ord#6", "LineOrd#5", "7 Bellevue Drive, Pottstown, PE 19464"), // 3
+                        updateAfterRecord(
+                                "Ord#6",
+                                "LineOrd#7",
+                                "8 Bellevue Drive, Pottstown, PF 19464"), // 5 +U +U
+                        updateAfterRecord(
+                                "Ord#6",
+                                "LineOrd#7",
+                                "9 Bellevue Drive, Pottstown, PF 19464")); // 5
+
+        for (StreamRecord<RowData> leftRecord : leftRecords) {
+            testHarness.processElement1(leftRecord);
+        }
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#3",
+                        "LineOrd#10",
+                        "xxx Bellevue Drive, Pottstown, PJ 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#1",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#1",
+                        "LineOrd#1",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#5",
+                        "LineOrd#5",
+                        "7 Bellevue Drive, Pottstown, PE 19464",
+                        null,
+                        null,
+                        null));
+        // +----------------------------------------------------------------+
+        // | left state                                                     |
+        // |----------------------------------------------------------------|
+        // | "Ord#2","LineOrd#2","4 Bellevue Drive, Pottstown, PB 19464"    |
+        // | "Ord#3","LineOrd#10","xxx Bellevue Drive, Pottstown, PJ 19464" |
+        // | "Ord#5","LineOrd#5","7 Bellevue Drive, Pottstown, PE 19464"    |
+        // | "Ord#6","LineOrd#7","9 Bellevue Drive, Pottstown, PF 19464"    |
+        // +----------------------------------------------------------------+
+
+        List<StreamRecord<RowData>> rightRecords =
+                Arrays.asList(
+                        insertRecord("Ord#5", "LineOrd#2", "SHIP"),
+                        updateAfterRecord("Ord#6", "LineOrd#4", "AIR"),
+                        updateBeforeRecord("Ord#5", "LineOrd#2", "SHIP"), // -U +U pattern
+                        updateAfterRecord("Ord#5", "LineOrd#2", "TRUCK"), // -U +U pattern
+                        updateAfterRecord("Ord#7", "LineOrd#0", "AIR"),
+                        updateAfterRecord("Ord#8", "LineOrd#11", "AIR"));
+        for (StreamRecord<RowData> rightRecord : rightRecords) {
+            testHarness.processElement2(rightRecord);
+        }
+
+        // the matched -U is forwarded as UPDATE_BEFORE, the following +U as INSERT
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#5",
+                        "LineOrd#2",
+                        "SHIP"),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#6",
+                        "LineOrd#7",
+                        "9 Bellevue Drive, Pottstown, PF 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.UPDATE_BEFORE,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#5",
+                        "LineOrd#2",
+                        "SHIP"),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#5",
+                        "LineOrd#2",
+                        "TRUCK"));
+    }
+
+    /** Runs {@link #testLeftJoinWithUpdate()} against the JoinHasUniqueKey buffer. */
+    @Tag("miniBatchSize=2")
+    @Test
+    public void testLeftJoinHasUniqueKeyRetAndAcc() throws Exception {
+        testLeftJoinWithUpdate();
+    }
+
+    /** Runs {@link #testLeftJoinWithUpdate()} against the JoinKeyContainsUniqueKey buffer. */
+    @Tag("miniBatchSize=2")
+    @Test
+    public void testLeftJoinJoinKeyContainsUniqueKeyRetAndAcc() throws Exception {
+        testLeftJoinWithUpdate();
+    }
+
+    /** Runs {@link #testLeftJoinWithoutRetract()} against the JoinHasUniqueKey buffer. */
+    @Tag("miniBatchSize=2")
+    @Test
+    public void testLeftJoinHasUniqueKeyWithoutRetract() throws Exception {
+        testLeftJoinWithoutRetract();
+    }
+
+    /** Runs {@link #testLeftJoinWithoutRetract()} against the JoinKeyContainsUniqueKey buffer. */
+    @Tag("miniBatchSize=2")
+    @Test
+    public void testLeftJoinJoinKeyContainsUniqueKeyWithoutRetract() throws Exception {
+        testLeftJoinWithoutRetract();
+    }
+
+    /** Runs {@link #testLeftJoinWithoutAcc()} against the JoinKeyContainsUniqueKey buffer. */
+    @Tag("miniBatchSize=2")
+    @Test
+    public void testLeftJoinJoinKeyContainsUniqueKeyWithoutAcc() throws Exception {
+        testLeftJoinWithoutAcc();
+    }
+
+    /** Runs {@link #testLeftJoinWithoutAcc()} against the JoinHasUniqueKey buffer. */
+    @Tag("miniBatchSize=2")
+    @Test
+    public void testLeftJoinHasUniqueKeyWithoutAcc() throws Exception {
+        // the display name contains "HasUniqueKey" (not "JoinKeyContainsUniqueKey"), so this case
+        // would create buffer of JoinHasUniqueKey
+        testLeftJoinWithoutAcc();
+    }
+
+    /**
+     * Runs {@link #testLeftJoinWithUpdateRecordsMultipleCases()} against the JoinHasUniqueKey
+     * buffer.
+     */
+    @Tag("miniBatchSize=4")
+    @Test
+    public void testLeftJoinHasUniqueKeyWithUpdateMultipleCases() throws Exception {
+        // the display name contains "HasUniqueKey" (not "JoinKeyContainsUniqueKey"), so this case
+        // would create buffer of JoinHasUniqueKey
+        testLeftJoinWithUpdateRecordsMultipleCases();
+    }
+
+    /**
+     * Runs {@link #testLeftJoinWithUpdateRecordsMultipleCases()} against the
+     * JoinKeyContainsUniqueKey buffer.
+     */
+    @Tag("miniBatchSize=4")
+    @Test
+    public void testLeftJoinJoinKeyContainsUniqueKeyWithUpdateMultipleCases() throws Exception {
+        testLeftJoinWithUpdateRecordsMultipleCases();
+    }
+
+    /**
+     * Right outer join with unique keys on both inputs: left-only changes emit nothing, and each
+     * unmatched right row is emitted null-padded on the left side.
+     */
+    @Tag("miniBatchSize=4")
+    @Test
+    public void testRightJoinWithHasUniqueKey() throws Exception {
+        List<StreamRecord<RowData>> leftInput =
+                Arrays.asList(
+                        insertRecord(
+                                "Ord#1",
+                                "LineOrd#1",
+                                "3 Bellevue Drive, Pottstown, PA 19464"), // 2 +I -U
+                        insertRecord("Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"),
+                        updateAfterRecord(
+                                "Ord#5", "LineOrd#5", "7 Bellevue Drive, Pottstown, PE 19464"),
+                        insertRecord(
+                                "Ord#6",
+                                "LineOrd#5",
+                                "8 Bellevue Drive, Pottstown, PF 19464"), // 3 +I -D
+                        updateBeforeRecord(
+                                "Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"), // 2
+                        deleteRecord(
+                                "Ord#6",
+                                "LineOrd#5",
+                                "7 Bellevue Drive, Pottstown, PE 19464")); // 3
+
+        for (StreamRecord<RowData> leftRow : leftInput) {
+            testHarness.processElement1(leftRow);
+        }
+        // the left side is not outer, so left-only input produces no output
+        assertor.shouldEmitNothing(testHarness);
+        // +----------------------------------------------------------------+
+        // | left state                                                     |
+        // |----------------------------------------------------------------|
+        // | "Ord#2","LineOrd#2","4 Bellevue Drive, Pottstown, PB 19464"    |
+        // | "Ord#5","LineOrd#5","7 Bellevue Drive, Pottstown, PE 19464"    |
+        // +----------------------------------------------------------------+
+        List<StreamRecord<RowData>> rightInput =
+                Arrays.asList(
+                        insertRecord("Ord#5", "LineOrd#2", "SHIP"),
+                        updateAfterRecord("Ord#6", "LineOrd#4", "AIR"),
+                        updateBeforeRecord("Ord#5", "LineOrd#2", "SHIP"),
+                        updateAfterRecord("Ord#5", "LineOrd#2", "TRUCK"),
+                        updateAfterRecord("Ord#7", "LineOrd#0", "AIR"),
+                        updateAfterRecord("Ord#8", "LineOrd#11", "AIR"));
+        for (StreamRecord<RowData> rightRow : rightInput) {
+            testHarness.processElement2(rightRow);
+        }
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(RowKind.INSERT, null, null, null, "Ord#6", "LineOrd#4", "AIR"),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#5",
+                        "LineOrd#2",
+                        "SHIP"),
+                rowOfKind(RowKind.INSERT, null, null, null, "Ord#7", "LineOrd#0", "AIR"),
+                rowOfKind(RowKind.INSERT, null, null, null, "Ord#8", "LineOrd#11", "AIR"),
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#5",
+                        "LineOrd#2",
+                        "SHIP"),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#5",
+                        "LineOrd#2",
+                        "TRUCK"));
+    }
+
+    /**
+     * Full outer join with unique keys on both inputs: unmatched rows from either side are
+     * emitted null-padded, and are retracted with DELETE once they are matched or removed.
+     */
+    @Tag("miniBatchSize=4")
+    @Test
+    public void testFullJoinWithHasUniqueKey() throws Exception {
+        List<StreamRecord<RowData>> leftInput =
+                Arrays.asList(
+                        insertRecord(
+                                "Ord#1",
+                                "LineOrd#1",
+                                "3 Bellevue Drive, Pottstown, PA 19464"), // 2 +I -U
+                        insertRecord("Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"),
+                        updateAfterRecord(
+                                "Ord#5", "LineOrd#5", "7 Bellevue Drive, Pottstown, PE 19464"),
+                        insertRecord(
+                                "Ord#6",
+                                "LineOrd#5",
+                                "8 Bellevue Drive, Pottstown, PF 19464"), // 3 +I -D
+                        updateBeforeRecord(
+                                "Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"), // 2
+                        deleteRecord(
+                                "Ord#6",
+                                "LineOrd#5",
+                                "7 Bellevue Drive, Pottstown, PE 19464")); // 3
+
+        for (StreamRecord<RowData> leftRow : leftInput) {
+            testHarness.processElement1(leftRow);
+        }
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#1",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#6",
+                        "LineOrd#5",
+                        "8 Bellevue Drive, Pottstown, PF 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#5",
+                        "LineOrd#5",
+                        "7 Bellevue Drive, Pottstown, PE 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        null,
+                        null,
+                        null));
+        // +----------------------------------------------------------------+
+        // | left state                                                     |
+        // |----------------------------------------------------------------|
+        // | "Ord#2","LineOrd#2","4 Bellevue Drive, Pottstown, PB 19464"    |
+        // | "Ord#5","LineOrd#5","7 Bellevue Drive, Pottstown, PE 19464"    |
+        // +----------------------------------------------------------------+
+
+        List<StreamRecord<RowData>> rightInput =
+                Arrays.asList(
+                        insertRecord("Ord#5", "LineOrd#2", "SHIP"),
+                        updateAfterRecord("Ord#6", "LineOrd#4", "AIR"),
+                        updateBeforeRecord("Ord#5", "LineOrd#2", "SHIP"), // -U +U pattern
+                        updateAfterRecord("Ord#5", "LineOrd#2", "TRUCK"), // -U +U pattern
+                        updateAfterRecord("Ord#7", "LineOrd#0", "AIR"),
+                        updateAfterRecord("Ord#8", "LineOrd#11", "AIR"));
+        for (StreamRecord<RowData> rightRow : rightInput) {
+            testHarness.processElement2(rightRow);
+        }
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(RowKind.INSERT, null, null, null, "Ord#6", "LineOrd#4", "AIR"),
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#5",
+                        "LineOrd#2",
+                        "SHIP"),
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#1",
+                        "LineOrd#1",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#6",
+                        "LineOrd#5",
+                        "7 Bellevue Drive, Pottstown, PE 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(RowKind.INSERT, null, null, null, "Ord#7", "LineOrd#0", "AIR"),
+                rowOfKind(RowKind.INSERT, null, null, null, "Ord#8", "LineOrd#11", "AIR"),
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#5",
+                        "LineOrd#2",
+                        "SHIP"),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#5",
+                        "LineOrd#2",
+                        "TRUCK"));
+    }
+
+    /**
+     * Left outer join without unique keys: the address rows are fed into the right input first,
+     * then three left rows probe the resulting right state.
+     */
+    @Tag("miniBatchSize=15")
+    @Test
+    public void testLeftJoinWithNoUniqueKey() throws Exception {
+        // joinKey is LineOrd
+        // +I -U / +I -D / -U +U / -D +I
+        List<StreamRecord<RowData>> rightInput =
+                Arrays.asList(
+                        insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"),
+                        insertRecord("Ord#1", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"),
+                        insertRecord(
+                                "Ord#1",
+                                "LineOrd#2",
+                                "4 Bellevue Drive, Pottstown, PB 19464"), // 2x +I -D
+                        deleteRecord("Ord#6", "LineOrd#6", "8 Bellevue Drive, Pottstown, PF 19464"),
+                        insertRecord(
+                                "Ord#3",
+                                "LineOrd#3",
+                                "5 Bellevue Drive, Pottstown, PD 19464"), // x +I -U
+                        updateBeforeRecord(
+                                "Ord#3", "LineOrd#3", "5 Bellevue Drive, Pottstown, PD 19464"), // x
+                        updateBeforeRecord(
+                                "Ord#9",
+                                "LineOrd#9",
+                                "11 Bellevue Drive, Pottstown, PI 19464"), // 3x -U +U
+                        updateAfterRecord(
+                                "Ord#10", "LineOrd#10", "14 Bellevue Drive, Pottstown, PJ 19464"),
+                        deleteRecord(
+                                "Ord#1",
+                                "LineOrd#2",
+                                "4 Bellevue Drive, Pottstown, PB 19464"), // 2x
+                        updateAfterRecord(
+                                "Ord#9",
+                                "LineOrd#9",
+                                "11 Bellevue Drive, Pottstown, PI 19464"), // 3x
+                        deleteRecord(
+                                "Ord#6",
+                                "LineOrd#6",
+                                "8 Bellevue Drive, Pottstown, PF 19464"), // 4x -D +I
+                        insertRecord(
+                                "Ord#6", "LineOrd#6", "8 Bellevue Drive, Pottstown, PF 19464") // 4x
+                        );
+
+        for (StreamRecord<RowData> rightRow : rightInput) {
+            testHarness.processElement2(rightRow);
+        }
+        // +------------------------------------------------------------------+
+        // | right state                                                      |
+        // |------------------------------------------------------------------|
+        // | "Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"    |
+        // | "Ord#1", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"    |
+        // | "Ord#10", "LineOrd#10", "14 Bellevue Drive, Pottstown, PJ 19464" |
+        // +------------------------------------------------------------------+
+        List<StreamRecord<RowData>> leftInput =
+                Arrays.asList(
+                        insertRecord("Ord#1", "LineOrd#1", "AIR"),
+                        updateAfterRecord("Ord#1", "LineOrd#3", "SHIP"),
+                        deleteRecord("Ord#6", "LineOrd#6", "RAILWAY"));
+
+        for (StreamRecord<RowData> leftRow : leftInput) {
+            testHarness.processElement1(leftRow);
+        }
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(RowKind.DELETE, "Ord#6", "LineOrd#6", "RAILWAY", null, null, null),
+                rowOfKind(RowKind.INSERT, "Ord#1", "LineOrd#3", "SHIP", null, null, null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#1",
+                        "AIR",
+                        "Ord#1",
+                        "LineOrd#1",
+                        "3 Bellevue Drive, Pottstown, PA 19464"));
+    }
+
+    /**
+     * Left join where a single left row is matched by a right-side +I followed by a -U/+U pair
+     * (retract and accumulate) on the same keys.
+     */
+    private void testLeftJoinWithUpdate() throws Exception {
+        // joinKey is LineOrd and uniqueKey is Ord
+        List<StreamRecord<RowData>> leftInput =
+                Collections.singletonList(
+                        insertRecord(
+                                "Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"));
+        for (StreamRecord<RowData> leftRow : leftInput) {
+            testHarness.processElement1(leftRow);
+        }
+        assertor.shouldEmitNothing(testHarness);
+
+        List<StreamRecord<RowData>> rightInput =
+                Arrays.asList(
+                        insertRecord("Ord#2", "LineOrd#2", "SHIP"),
+                        updateBeforeRecord("Ord#2", "LineOrd#2", "SHIP"),
+                        updateAfterRecord("Ord#2", "LineOrd#2", "AIR"));
+        for (StreamRecord<RowData> rightRow : rightInput) {
+            testHarness.processElement2(rightRow);
+        }
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#2",
+                        "LineOrd#2",
+                        "SHIP"),
+                rowOfKind(
+                        RowKind.UPDATE_BEFORE,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#2",
+                        "LineOrd#2",
+                        "SHIP"),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#2",
+                        "LineOrd#2",
+                        "AIR"));
+    }
+
+    /** Left join where both inputs only carry accumulate messages (+I / +U), never retractions. */
+    private void testLeftJoinWithoutRetract() throws Exception {
+        // joinKey is LineOrd and uniqueKey is Ord
+        List<StreamRecord<RowData>> leftInput =
+                Arrays.asList(
+                        insertRecord("Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"),
+                        updateAfterRecord(
+                                "Ord#2", "LineOrd#2", "5 Bellevue Drive, Pottstown, PC 19464"));
+        for (StreamRecord<RowData> leftRow : leftInput) {
+            testHarness.processElement1(leftRow);
+        }
+        // only the latest left version is emitted, null-padded
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "5 Bellevue Drive, Pottstown, PC 19464",
+                        null,
+                        null,
+                        null));
+
+        List<StreamRecord<RowData>> rightInput =
+                Arrays.asList(
+                        updateAfterRecord("Ord#2", "LineOrd#2", "SHIP"),
+                        updateAfterRecord("Ord#2", "LineOrd#2", "AIR"));
+        for (StreamRecord<RowData> rightRow : rightInput) {
+            testHarness.processElement2(rightRow);
+        }
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "5 Bellevue Drive, Pottstown, PC 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "5 Bellevue Drive, Pottstown, PC 19464",
+                        "Ord#2",
+                        "LineOrd#2",
+                        "AIR"));
+    }
+
+    /** Left join where both inputs only carry retract messages (-U / -D), never accumulations. */
+    private void testLeftJoinWithoutAcc() throws Exception {
+        // joinKey is LineOrd and uniqueKey is Ord
+        List<StreamRecord<RowData>> leftInput =
+                Arrays.asList(
+                        updateBeforeRecord(
+                                "Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"),
+                        deleteRecord(
+                                "Ord#2", "LineOrd#2", "5 Bellevue Drive, Pottstown, PC 19464"));
+        for (StreamRecord<RowData> leftRow : leftInput) {
+            testHarness.processElement1(leftRow);
+        }
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "5 Bellevue Drive, Pottstown, PC 19464",
+                        null,
+                        null,
+                        null));
+
+        List<StreamRecord<RowData>> rightInput =
+                Arrays.asList(
+                        deleteRecord("Ord#2", "LineOrd#2", "SHIP"),
+                        updateBeforeRecord("Ord#2", "LineOrd#2", "AIR"));
+        for (StreamRecord<RowData> rightRow : rightInput) {
+            testHarness.processElement2(rightRow);
+        }
+        assertor.shouldEmitNothing(testHarness);
+    }
+
+    /**
+     * Left join where right-side records change their join key under the same unique key, arrive
+     * as -D/+I pairs, or arrive as out-of-order -U/+U pairs.
+     */
+    private void testLeftJoinWithUpdateRecordsMultipleCases() throws Exception {
+        // joinKey is LineOrd and uniqueKey is Ord
+        List<StreamRecord<RowData>> leftRecords =
+                Arrays.asList(
+                        insertRecord("Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"),
+                        updateAfterRecord(
+                                "Ord#5", "LineOrd#5", "7 Bellevue Drive, Pottstown, PE 19464"),
+                        insertRecord("Ord#0", "LineOrd#4", "5 Bellevue Drive, Pottstown, PB 19464"),
+                        updateAfterRecord(
+                                "Ord#4", "LineOrd#0", "6 Bellevue Drive, Pottstown, PB 19464"));
+
+        for (StreamRecord<RowData> leftRecord : leftRecords) {
+            testHarness.processElement1(leftRecord);
+        }
+
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#4",
+                        "LineOrd#0",
+                        "6 Bellevue Drive, Pottstown, PB 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#0",
+                        "LineOrd#4",
+                        "5 Bellevue Drive, Pottstown, PB 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#5",
+                        "LineOrd#5",
+                        "7 Bellevue Drive, Pottstown, PE 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        null,
+                        null,
+                        null));
+
+        // only +U and the joinKey changes
+        List<StreamRecord<RowData>> joinKeyChangeRecords =
+                Arrays.asList(
+                        insertRecord("Ord#6", "LineOrd#4", "AIR"), // +I 6 joinKey=4
+                        updateAfterRecord("Ord#6", "LineOrd#4", "AIR"), // +U 6 joinKey=4
+                        // +U 6 joinKey=5: the same unique key now arrives under another join key
+                        updateAfterRecord("Ord#6", "LineOrd#5", "AIR"),
+                        updateAfterRecord("Ord#6", "LineOrd#4", "TRUCK")); // +U 6 joinKey=4
+        for (StreamRecord<RowData> rightRecord : joinKeyChangeRecords) {
+            testHarness.processElement2(rightRecord);
+        }
+
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#0",
+                        "LineOrd#4",
+                        "5 Bellevue Drive, Pottstown, PB 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#0",
+                        "LineOrd#4",
+                        "5 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#6",
+                        "LineOrd#4",
+                        "TRUCK"),
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#5",
+                        "LineOrd#5",
+                        "7 Bellevue Drive, Pottstown, PE 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#5",
+                        "LineOrd#5",
+                        "7 Bellevue Drive, Pottstown, PE 19464",
+                        "Ord#6",
+                        "LineOrd#5",
+                        "AIR"));
+
+        // -D +I update records, all on joinKey=4
+        List<StreamRecord<RowData>> deleteInsertRecords =
+                Arrays.asList(
+                        deleteRecord("Ord#6", "LineOrd#4", "TRUCK"), // -D 6 joinKey=4
+                        insertRecord("Ord#6", "LineOrd#4", "TRUCK2"), // +I 6 joinKey=4
+                        deleteRecord("Ord#6", "LineOrd#4", "TRUCK3"), // -D 6 joinKey=4
+                        insertRecord("Ord#6", "LineOrd#4", "AIR")); // +I 6 joinKey=4
+        for (StreamRecord<RowData> rightRecord : deleteInsertRecords) {
+            testHarness.processElement2(rightRecord);
+        }
+
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#0",
+                        "LineOrd#4",
+                        "5 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#6",
+                        "LineOrd#4",
+                        "TRUCK"),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#0",
+                        "LineOrd#4",
+                        "5 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#6",
+                        "LineOrd#4",
+                        "AIR"));
+
+        // -U +U out of order: the +U arrives before its -U
+        List<StreamRecord<RowData>> disorderRecords =
+                Arrays.asList(
+                        insertRecord("Ord#5", "LineOrd#2", "SHIP"), // +I 5
+                        updateAfterRecord("Ord#5", "LineOrd#2", "TRUCK"), // +U 5 disorder
+                        updateBeforeRecord("Ord#5", "LineOrd#2", "SHIP"), // -U 5 disorder
+                        updateAfterRecord("Ord#9", "LineOrd#7", "TRUCK"));
+        for (StreamRecord<RowData> rightRecord : disorderRecords) {
+            testHarness.processElement2(rightRecord);
+        }
+        assertor.shouldEmitNothing(testHarness);
+    }
+
+    /**
+     * Builds the operator under test. The join type and unique-key layout are derived from the
+     * test's display name; the mini-batch size and state retention time come from its tags.
+     */
+    @Override
+    public MiniBatchStreamingJoinOperator createJoinOperator(TestInfo testInfo) {
+        String displayName = testInfo.getDisplayName();
+        Set<String> tags = testInfo.getTags();
+
+        // The unique-key selectors must be assigned before inputSpecExtractor is applied, because
+        // that extractor reads leftUniqueKeySelector / rightUniqueKeySelector.
+        RowDataKeySelector[] uniqueKeySelectors = ukSelectorExtractor.apply(displayName);
+        leftUniqueKeySelector = uniqueKeySelectors[0];
+        rightUniqueKeySelector = uniqueKeySelectors[1];
+
+        JoinInputSideSpec[] sideSpecs = inputSpecExtractor.apply(displayName);
+        Boolean[] outerFlags = joinTypeExtractor.apply(displayName);
+        FlinkJoinType flinkJoinType = flinkJoinTypeExtractor.apply(displayName);
+        int miniBatchSize = miniBatchSizeExtractor.apply(tags);
+        Long[] retentionTimes = STATE_RETENTION_TIME_EXTRACTOR.apply(tags);
+
+        // NOTE(review): retentionTimes[0] is passed for both retention-time arguments; confirm
+        // whether retentionTimes[1] was intended for the second (right-side) argument.
+        return MiniBatchStreamingJoinOperator.newMiniBatchStreamJoinOperator(
+                flinkJoinType,
+                leftTypeInfo,
+                rightTypeInfo,
+                joinCondition,
+                sideSpecs[0],
+                sideSpecs[1],
+                outerFlags[0],
+                outerFlags[1],
+                new boolean[] {true},
+                retentionTimes[0],
+                retentionTimes[0],
+                new CountCoBundleTrigger<>(miniBatchSize));
+    }
+
+ @Override
+ public RowType getOutputType() {
+ return RowType.of(
+ Stream.concat(
+
leftTypeInfo.toRowType().getChildren().stream(),
+
rightTypeInfo.toRowType().getChildren().stream())
+ .toArray(LogicalType[]::new),
+ Stream.concat(
+
leftTypeInfo.toRowType().getFieldNames().stream(),
+
rightTypeInfo.toRowType().getFieldNames().stream())
+ .toArray(String[]::new));
+ }
+
+    /**
+     * Chooses the input-side specs from the test display name. "JoinKeyContainsUniqueKey" is
+     * checked before "HasUniqueKey"; any other name gets specs without a unique key.
+     */
+    private final Function<String, JoinInputSideSpec[]> inputSpecExtractor =
+            (testDisplayName) -> {
+                if (testDisplayName.contains("JoinKeyContainsUniqueKey")) {
+                    return new JoinInputSideSpec[] {
+                        JoinInputSideSpec.withUniqueKeyContainedByJoinKey(
+                                leftTypeInfo, leftUniqueKeySelector),
+                        JoinInputSideSpec.withUniqueKeyContainedByJoinKey(
+                                rightTypeInfo, rightUniqueKeySelector)
+                    };
+                }
+                if (testDisplayName.contains("HasUniqueKey")) {
+                    return new JoinInputSideSpec[] {
+                        JoinInputSideSpec.withUniqueKey(leftTypeInfo, leftUniqueKeySelector),
+                        JoinInputSideSpec.withUniqueKey(rightTypeInfo, rightUniqueKeySelector)
+                    };
+                }
+                return new JoinInputSideSpec[] {
+                    JoinInputSideSpec.withoutUniqueKey(), JoinInputSideSpec.withoutUniqueKey()
+                };
+            };
+
+    /**
+     * Chooses the unique-key selectors from the test display name: field 1 for
+     * "JoinKeyContainsUniqueKey", field 0 for "HasUniqueKey", and no selectors otherwise.
+     */
+    private final Function<String, RowDataKeySelector[]> ukSelectorExtractor =
+            (testDisplayName) -> {
+                final int uniqueKeyField;
+                if (testDisplayName.contains("JoinKeyContainsUniqueKey")) {
+                    // field 1 (LineOrd in these tests), i.e. the unique key lies in the join key
+                    uniqueKeyField = 1;
+                } else if (testDisplayName.contains("HasUniqueKey")) {
+                    // field 0 (Ord in these tests)
+                    uniqueKeyField = 0;
+                } else {
+                    return new RowDataKeySelector[] {null, null};
+                }
+                return new RowDataKeySelector[] {
+                    HandwrittenSelectorUtil.getRowDataSelector(
+                            new int[] {uniqueKeyField},
+                            leftTypeInfo.toRowType().getChildren().toArray(new LogicalType[0])),
+                    HandwrittenSelectorUtil.getRowDataSelector(
+                            new int[] {uniqueKeyField},
+                            rightTypeInfo.toRowType().getChildren().toArray(new LogicalType[0]))
+                };
+            };
+
+    /**
+     * Reads the mini-batch size from a {@code miniBatchSize=<n>} test tag, defaulting to 5 when
+     * no such tag exists. Other tags are skipped without being parsed, so tags with non-integer
+     * values or without '=' no longer cause a NumberFormatException or
+     * ArrayIndexOutOfBoundsException.
+     */
+    private final Function<Set<String>, Integer> miniBatchSizeExtractor =
+            (tags) -> {
+                final int defaultSize = 5;
+                for (String tag : tags) {
+                    String[] splits = tag.split("=");
+                    // check the key before parsing: only the mini-batch tag must hold an int
+                    if (splits.length > 1 && splits[0].trim().startsWith("miniBatchSize")) {
+                        return Integer.parseInt(splits[1].trim());
+                    }
+                }
+                return defaultSize;
+            };
+
+ private final Function<String, Boolean[]> joinTypeExtractor =
+ (testDisplayName) -> {
+ if (testDisplayName.contains("InnerJoin")) {
+ return new Boolean[] {false, false};
+ } else if (testDisplayName.contains("LeftJoin")) {
+ return new Boolean[] {true, false};
+ } else if (testDisplayName.contains("RightJoin")) {
+ return new Boolean[] {false, true};
+ } else {
+ return new Boolean[] {true, true};
+ }
+ };
+
+    /**
+     * Maps the test display name to a {@link FlinkJoinType}, checking "InnerJoin", "LeftJoin" and
+     * "RightJoin" in that order and falling back to FULL.
+     */
+    private final Function<String, FlinkJoinType> flinkJoinTypeExtractor =
+            (testDisplayName) -> {
+                if (testDisplayName.contains("InnerJoin")) {
+                    return FlinkJoinType.INNER;
+                }
+                if (testDisplayName.contains("LeftJoin")) {
+                    return FlinkJoinType.LEFT;
+                }
+                if (testDisplayName.contains("RightJoin")) {
+                    return FlinkJoinType.RIGHT;
+                }
+                return FlinkJoinType.FULL;
+            };
+}