This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8170c457cb7 [FLINK-34221][table-runtime] Introduce 
MiniBatchStreamingJoinOperator
8170c457cb7 is described below

commit 8170c457cb70bb8fd88b98baf3acc612eaab8ec5
Author: yeming <[email protected]>
AuthorDate: Mon Jan 22 10:26:26 2024 +0800

    [FLINK-34221][table-runtime] Introduce MiniBatchStreamingJoinOperator
    
    This closes 24160
---
 .../stream/MiniBatchStreamingJoinOperator.java     |  430 +++++
 .../join/stream/StreamingJoinOperator.java         |   31 +-
 .../runtime/operators/metrics/SimpleGauge.java     |   44 +
 .../join/stream/StreamingJoinOperatorTestBase.java |   13 +-
 .../stream/StreamingMiniBatchJoinOperatorTest.java | 1956 ++++++++++++++++++++
 5 files changed, 2456 insertions(+), 18 deletions(-)

diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/MiniBatchStreamingJoinOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/MiniBatchStreamingJoinOperator.java
new file mode 100644
index 00000000000..e0d667c67c6
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/MiniBatchStreamingJoinOperator.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.stream;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.RowDataUtil;
+import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
+import 
org.apache.flink.table.runtime.operators.bundle.trigger.BundleTriggerCallback;
+import org.apache.flink.table.runtime.operators.bundle.trigger.CoBundleTrigger;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import 
org.apache.flink.table.runtime.operators.join.stream.bundle.BufferBundle;
+import 
org.apache.flink.table.runtime.operators.join.stream.bundle.InputSideHasNoUniqueKeyBundle;
+import 
org.apache.flink.table.runtime.operators.join.stream.bundle.InputSideHasUniqueKeyBundle;
+import 
org.apache.flink.table.runtime.operators.join.stream.bundle.JoinKeyContainsUniqueKeyBundle;
+import 
org.apache.flink.table.runtime.operators.join.stream.state.JoinInputSideSpec;
+import 
org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateView;
+import org.apache.flink.table.runtime.operators.metrics.SimpleGauge;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/** Streaming unbounded Join base operator which support mini-batch join. */
+public abstract class MiniBatchStreamingJoinOperator extends 
StreamingJoinOperator
+        implements BundleTriggerCallback {
+
+    private static final long serialVersionUID = -1106342589994963997L;
+
+    private final CoBundleTrigger<RowData, RowData> coBundleTrigger;
+
+    private transient BufferBundle<?> leftBuffer;
+    private transient BufferBundle<?> rightBuffer;
+    private transient SimpleGauge<Integer> leftBundleReducedSizeGauge;
+    private transient SimpleGauge<Integer> rightBundleReducedSizeGauge;
+    private transient TypeSerializer<RowData> leftSerializer;
+    private transient TypeSerializer<RowData> rightSerializer;
+
+    public MiniBatchStreamingJoinOperator(MiniBatchStreamingJoinParameter 
parameter) {
+        super(
+                parameter.leftType,
+                parameter.rightType,
+                parameter.generatedJoinCondition,
+                parameter.leftInputSideSpec,
+                parameter.rightInputSideSpec,
+                parameter.leftIsOuter,
+                parameter.rightIsOuter,
+                parameter.filterNullKeys,
+                parameter.leftStateRetentionTime,
+                parameter.rightStateRetentionTime);
+
+        this.coBundleTrigger = parameter.coBundleTrigger;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+
+        this.leftBuffer = initialBuffer(leftInputSideSpec);
+        this.rightBuffer = initialBuffer(rightInputSideSpec);
+
+        coBundleTrigger.registerCallback(this);
+        coBundleTrigger.reset();
+        LOG.info("Initialize MiniBatchStreamingJoinOperator successfully.");
+
+        this.leftSerializer = leftType.createSerializer(getExecutionConfig());
+        this.rightSerializer = 
rightType.createSerializer(getExecutionConfig());
+
+        // register metrics
+        leftBundleReducedSizeGauge = new SimpleGauge<>(0);
+        rightBundleReducedSizeGauge = new SimpleGauge<>(0);
+
+        getRuntimeContext()
+                .getMetricGroup()
+                .gauge("leftBundleReducedSize", leftBundleReducedSizeGauge);
+        getRuntimeContext()
+                .getMetricGroup()
+                .gauge("rightBundleReducedSize", rightBundleReducedSizeGauge);
+    }
+
+    @Override
+    public void processElement1(StreamRecord<RowData> element) throws 
Exception {
+        RowData record = leftSerializer.copy(element.getValue());
+        RowData joinKey = (RowData) getCurrentKey();
+        RowData uniqueKey = null;
+        if (leftInputSideSpec.getUniqueKeySelector() != null) {
+            uniqueKey = 
leftInputSideSpec.getUniqueKeySelector().getKey(record);
+        }
+        leftBuffer.addRecord(joinKey, uniqueKey, record);
+        coBundleTrigger.onElement1(record);
+    }
+
+    @Override
+    public void processElement2(StreamRecord<RowData> element) throws 
Exception {
+        RowData record = rightSerializer.copy(element.getValue());
+        RowData joinKey = (RowData) getCurrentKey();
+        RowData uniqueKey = null;
+        if (rightInputSideSpec.getUniqueKeySelector() != null) {
+            uniqueKey = 
rightInputSideSpec.getUniqueKeySelector().getKey(record);
+        }
+        rightBuffer.addRecord(joinKey, uniqueKey, record);
+        coBundleTrigger.onElement2(record);
+    }
+
+    @Override
+    public void processWatermark1(Watermark mark) throws Exception {
+        finishBundle();
+        super.processWatermark1(mark);
+    }
+
+    @Override
+    public void processWatermark2(Watermark mark) throws Exception {
+        finishBundle();
+        super.processWatermark2(mark);
+    }
+
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+        finishBundle();
+    }
+
+    @Override
+    public void finish() throws Exception {
+        finishBundle();
+        super.finish();
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        this.leftBuffer.clear();
+        this.rightBuffer.clear();
+    }
+
+    @Override
+    public void finishBundle() throws Exception {
+        if (!leftBuffer.isEmpty() || !rightBuffer.isEmpty()) {
+            // update metrics value
+            leftBundleReducedSizeGauge.update(leftBuffer.reducedSize());
+            rightBundleReducedSizeGauge.update(rightBuffer.reducedSize());
+
+            this.processBundles(leftBuffer, rightBuffer);
+            leftBuffer.clear();
+            rightBuffer.clear();
+        }
+        coBundleTrigger.reset();
+    }
+
+    protected abstract void processBundles(BufferBundle<?> leftBuffer, 
BufferBundle<?> rightBuffer)
+            throws Exception;
+
+    private BufferBundle<?> initialBuffer(JoinInputSideSpec inputSideSpec) {
+        if (inputSideSpec.joinKeyContainsUniqueKey()) {
+            return new JoinKeyContainsUniqueKeyBundle();
+        }
+        if (inputSideSpec.hasUniqueKey()) {
+            return new InputSideHasUniqueKeyBundle();
+        }
+        return new InputSideHasNoUniqueKeyBundle();
+    }
+
+    private void processElementWithSuppress(
+            Iterator<RowData> iter,
+            JoinRecordStateView inputSideStateView,
+            JoinRecordStateView otherSideStateView,
+            boolean inputIsLeft)
+            throws Exception {
+        RowData pre = null; // always retractMsg if not null
+        while (iter.hasNext()) {
+            RowData current = iter.next();
+            boolean isSuppress = false;
+            if (RowDataUtil.isRetractMsg(current) && iter.hasNext()) {
+                RowData next = iter.next();
+                if (RowDataUtil.isAccumulateMsg(next)) {
+                    isSuppress = true;
+                } else {
+                    // retract + retract
+                    pre = next;
+                }
+                processElement(
+                        current, inputSideStateView, otherSideStateView, 
inputIsLeft, isSuppress);
+                if (isSuppress) {
+                    processElement(
+                            next, inputSideStateView, otherSideStateView, 
inputIsLeft, isSuppress);
+                }
+            } else {
+                // 1. current is accumulateMsg 2. current is retractMsg and no 
next row
+                if (pre != null) {
+                    if (RowDataUtil.isAccumulateMsg(current)) {
+                        isSuppress = true;
+                    }
+                    processElement(
+                            pre, inputSideStateView, otherSideStateView, 
inputIsLeft, isSuppress);
+                    pre = null;
+                }
+                processElement(
+                        current, inputSideStateView, otherSideStateView, 
inputIsLeft, isSuppress);
+            }
+        }
+    }
+
+    /**
+     * RetractMsg+accumulatingMsg would be optimized which would keep sending 
retractMsg but do not
+     * deal with state.
+     */
+    protected void processSingleSideBundles(
+            BufferBundle<?> inputBuffer,
+            JoinRecordStateView inputSideStateView,
+            JoinRecordStateView otherSideStateView,
+            boolean inputIsLeft)
+            throws Exception {
+        if (inputBuffer instanceof InputSideHasNoUniqueKeyBundle) {
+            // -U/+U pair is already folded in the buffer so it is no need to 
go to
+            // processElementByPair function
+            for (Map.Entry<RowData, List<RowData>> entry : 
inputBuffer.getRecords().entrySet()) {
+                // set state key first
+                setCurrentKey(entry.getKey());
+                for (RowData rowData : entry.getValue()) {
+                    processElement(
+                            rowData, inputSideStateView, otherSideStateView, 
inputIsLeft, false);
+                }
+            }
+        } else if (inputBuffer instanceof JoinKeyContainsUniqueKeyBundle) {
+            for (Map.Entry<RowData, List<RowData>> entry : 
inputBuffer.getRecords().entrySet()) {
+                // set state key first
+                setCurrentKey(entry.getKey());
+                Iterator<RowData> iter = entry.getValue().iterator();
+                processElementWithSuppress(
+                        iter, inputSideStateView, otherSideStateView, 
inputIsLeft);
+            }
+        } else if (inputBuffer instanceof InputSideHasUniqueKeyBundle) {
+            for (RowData joinKey : inputBuffer.getJoinKeys()) {
+                // set state key first
+                setCurrentKey(joinKey);
+                for (Map.Entry<RowData, List<RowData>> entry :
+                        inputBuffer.getRecordsWithJoinKey(joinKey).entrySet()) 
{
+                    Iterator<RowData> iter = entry.getValue().iterator();
+                    processElementWithSuppress(
+                            iter, inputSideStateView, otherSideStateView, 
inputIsLeft);
+                }
+            }
+        }
+    }
+
+    public static MiniBatchStreamingJoinOperator 
newMiniBatchStreamJoinOperator(
+            FlinkJoinType joinType,
+            InternalTypeInfo<RowData> leftType,
+            InternalTypeInfo<RowData> rightType,
+            GeneratedJoinCondition generatedJoinCondition,
+            JoinInputSideSpec leftInputSideSpec,
+            JoinInputSideSpec rightInputSideSpec,
+            boolean leftIsOuter,
+            boolean rightIsOuter,
+            boolean[] filterNullKeys,
+            long leftStateRetentionTime,
+            long rightStateRetentionTime,
+            CoBundleTrigger<RowData, RowData> coBundleTrigger) {
+        MiniBatchStreamingJoinParameter parameter =
+                new MiniBatchStreamingJoinParameter(
+                        leftType,
+                        rightType,
+                        generatedJoinCondition,
+                        leftInputSideSpec,
+                        rightInputSideSpec,
+                        leftIsOuter,
+                        rightIsOuter,
+                        filterNullKeys,
+                        leftStateRetentionTime,
+                        rightStateRetentionTime,
+                        coBundleTrigger);
+        switch (joinType) {
+            case INNER:
+                return new MiniBatchInnerJoinStreamOperator(parameter);
+            case LEFT:
+                return new MiniBatchLeftOuterJoinStreamOperator(parameter);
+            case RIGHT:
+                return new MiniBatchRightOuterJoinStreamOperator(parameter);
+            case FULL:
+                return new MiniBatchFullOuterJoinStreamOperator(parameter);
+            default:
+                throw new UnsupportedOperationException("Unsupported join 
type: " + joinType);
+        }
+    }
+
+    static class MiniBatchStreamingJoinParameter implements Serializable {
+        InternalTypeInfo<RowData> leftType;
+        InternalTypeInfo<RowData> rightType;
+        GeneratedJoinCondition generatedJoinCondition;
+        JoinInputSideSpec leftInputSideSpec;
+        JoinInputSideSpec rightInputSideSpec;
+        boolean leftIsOuter;
+        boolean rightIsOuter;
+        boolean[] filterNullKeys;
+        long leftStateRetentionTime;
+        long rightStateRetentionTime;
+
+        CoBundleTrigger<RowData, RowData> coBundleTrigger;
+
+        MiniBatchStreamingJoinParameter(
+                InternalTypeInfo<RowData> leftType,
+                InternalTypeInfo<RowData> rightType,
+                GeneratedJoinCondition generatedJoinCondition,
+                JoinInputSideSpec leftInputSideSpec,
+                JoinInputSideSpec rightInputSideSpec,
+                boolean leftIsOuter,
+                boolean rightIsOuter,
+                boolean[] filterNullKeys,
+                long leftStateRetentionTime,
+                long rightStateRetentionTime,
+                CoBundleTrigger<RowData, RowData> coBundleTrigger) {
+            this.leftType = leftType;
+            this.rightType = rightType;
+            this.generatedJoinCondition = generatedJoinCondition;
+            this.leftInputSideSpec = leftInputSideSpec;
+            this.rightInputSideSpec = rightInputSideSpec;
+            this.leftIsOuter = leftIsOuter;
+            this.rightIsOuter = rightIsOuter;
+            this.filterNullKeys = filterNullKeys;
+            this.leftStateRetentionTime = leftStateRetentionTime;
+            this.rightStateRetentionTime = rightStateRetentionTime;
+            this.coBundleTrigger = coBundleTrigger;
+        }
+    }
+
+    /** Inner MiniBatch Join operator. */
+    private static final class MiniBatchInnerJoinStreamOperator
+            extends MiniBatchStreamingJoinOperator {
+
+        public 
MiniBatchInnerJoinStreamOperator(MiniBatchStreamingJoinParameter parameter) {
+            super(parameter);
+        }
+
+        @Override
+        protected void processBundles(BufferBundle<?> leftBuffer, 
BufferBundle<?> rightBuffer)
+                throws Exception {
+            // process right
+            this.processSingleSideBundles(
+                    rightBuffer, rightRecordStateView, leftRecordStateView, 
false);
+            // process left
+            this.processSingleSideBundles(
+                    leftBuffer, leftRecordStateView, rightRecordStateView, 
true);
+        }
+    }
+
+    /** MiniBatch Left outer join operator. */
+    private static final class MiniBatchLeftOuterJoinStreamOperator
+            extends MiniBatchStreamingJoinOperator {
+
+        public 
MiniBatchLeftOuterJoinStreamOperator(MiniBatchStreamingJoinParameter parameter) 
{
+            super(parameter);
+        }
+
+        @Override
+        protected void processBundles(BufferBundle<?> leftBuffer, 
BufferBundle<?> rightBuffer)
+                throws Exception {
+            // more efficient to process right first for left out join, i.e, 
some retractions can be
+            // avoided for timing-dependent left and right stream
+            // process right
+            this.processSingleSideBundles(
+                    rightBuffer, rightRecordStateView, leftRecordStateView, 
false);
+            // process left
+            this.processSingleSideBundles(
+                    leftBuffer, leftRecordStateView, rightRecordStateView, 
true);
+        }
+    }
+
+    /** MiniBatch Right outer join operator. */
+    private static final class MiniBatchRightOuterJoinStreamOperator
+            extends MiniBatchStreamingJoinOperator {
+
+        public 
MiniBatchRightOuterJoinStreamOperator(MiniBatchStreamingJoinParameter 
parameter) {
+            super(parameter);
+        }
+
+        @Override
+        protected void processBundles(BufferBundle<?> leftBuffer, 
BufferBundle<?> rightBuffer)
+                throws Exception {
+
+            // more efficient to process left first for right out join, i.e, 
some retractions can be
+            // avoided for timing-dependent left and right stream
+            // process left
+            this.processSingleSideBundles(
+                    leftBuffer, leftRecordStateView, rightRecordStateView, 
true);
+
+            // process right
+            this.processSingleSideBundles(
+                    rightBuffer, rightRecordStateView, leftRecordStateView, 
false);
+        }
+    }
+
+    /** MiniBatch Full outer Join operator. */
+    private static final class MiniBatchFullOuterJoinStreamOperator
+            extends MiniBatchStreamingJoinOperator {
+
+        public 
MiniBatchFullOuterJoinStreamOperator(MiniBatchStreamingJoinParameter parameter) 
{
+            super(parameter);
+        }
+
+        @Override
+        protected void processBundles(BufferBundle<?> leftBuffer, 
BufferBundle<?> rightBuffer)
+                throws Exception {
+            // process right
+            this.processSingleSideBundles(
+                    rightBuffer, rightRecordStateView, leftRecordStateView, 
false);
+            // process left
+            this.processSingleSideBundles(
+                    leftBuffer, leftRecordStateView, rightRecordStateView, 
true);
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java
index 308b98e2794..5b9a139e5d2 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperator.java
@@ -38,18 +38,18 @@ public class StreamingJoinOperator extends 
AbstractStreamingJoinOperator {
     private static final long serialVersionUID = -376944622236540545L;
 
     // whether left side is outer side, e.g. left is outer but right is not 
when LEFT OUTER JOIN
-    private final boolean leftIsOuter;
+    protected final boolean leftIsOuter;
     // whether right side is outer side, e.g. right is outer but left is not 
when RIGHT OUTER JOIN
-    private final boolean rightIsOuter;
+    protected final boolean rightIsOuter;
 
     private transient JoinedRowData outRow;
     private transient RowData leftNullRow;
     private transient RowData rightNullRow;
 
     // left join state
-    private transient JoinRecordStateView leftRecordStateView;
+    protected transient JoinRecordStateView leftRecordStateView;
     // right join state
-    private transient JoinRecordStateView rightRecordStateView;
+    protected transient JoinRecordStateView rightRecordStateView;
 
     public StreamingJoinOperator(
             InternalTypeInfo<RowData> leftType,
@@ -123,12 +123,12 @@ public class StreamingJoinOperator extends 
AbstractStreamingJoinOperator {
 
     @Override
     public void processElement1(StreamRecord<RowData> element) throws 
Exception {
-        processElement(element.getValue(), leftRecordStateView, 
rightRecordStateView, true);
+        processElement(element.getValue(), leftRecordStateView, 
rightRecordStateView, true, false);
     }
 
     @Override
     public void processElement2(StreamRecord<RowData> element) throws 
Exception {
-        processElement(element.getValue(), rightRecordStateView, 
leftRecordStateView, false);
+        processElement(element.getValue(), rightRecordStateView, 
leftRecordStateView, false, false);
     }
 
     /**
@@ -196,12 +196,15 @@ public class StreamingJoinOperator extends 
AbstractStreamingJoinOperator {
      * @param inputSideStateView state of input side
      * @param otherSideStateView state of other side
      * @param inputIsLeft whether input side is left side
+     * @param isSuppress whether suppress the output of redundant messages 
when the other side is
+     *     outer join. This only applies to the case of mini-batch.
      */
-    private void processElement(
+    protected void processElement(
             RowData input,
             JoinRecordStateView inputSideStateView,
             JoinRecordStateView otherSideStateView,
-            boolean inputIsLeft)
+            boolean inputIsLeft,
+            boolean isSuppress)
             throws Exception {
         boolean inputIsOuter = inputIsLeft ? leftIsOuter : rightIsOuter;
         boolean otherIsOuter = inputIsLeft ? rightIsOuter : leftIsOuter;
@@ -228,7 +231,7 @@ public class StreamingJoinOperator extends 
AbstractStreamingJoinOperator {
                         for (OuterRecord outerRecord : 
associatedRecords.getOuterRecords()) {
                             RowData other = outerRecord.record;
                             // if the matched num in the matched rows == 0
-                            if (outerRecord.numOfAssociations == 0) {
+                            if (outerRecord.numOfAssociations == 0 && 
!isSuppress) {
                                 // send -D[null+other]
                                 outRow.setRowKind(RowKind.DELETE);
                                 outputNullPadding(other, !inputIsLeft);
@@ -254,8 +257,8 @@ public class StreamingJoinOperator extends 
AbstractStreamingJoinOperator {
                         OuterJoinRecordStateView otherSideOuterStateView =
                                 (OuterJoinRecordStateView) otherSideStateView;
                         for (OuterRecord outerRecord : 
associatedRecords.getOuterRecords()) {
-                            if (outerRecord.numOfAssociations
-                                    == 0) { // if the matched num in the 
matched rows == 0
+                            if (outerRecord.numOfAssociations == 0
+                                    && !isSuppress) { // if the matched num in 
the matched rows == 0
                                 // send -D[null+other]
                                 outRow.setRowKind(RowKind.DELETE);
                                 outputNullPadding(outerRecord.record, 
!inputIsLeft);
@@ -278,7 +281,9 @@ public class StreamingJoinOperator extends 
AbstractStreamingJoinOperator {
             }
         } else { // input record is retract
             // state.retract(record)
-            inputSideStateView.retractRecord(input);
+            if (!isSuppress) {
+                inputSideStateView.retractRecord(input);
+            }
             if (associatedRecords.isEmpty()) { // there is no matched rows on 
the other side
                 if (inputIsOuter) { // input side is outer
                     // send -D[record+null]
@@ -302,7 +307,7 @@ public class StreamingJoinOperator extends 
AbstractStreamingJoinOperator {
                     OuterJoinRecordStateView otherSideOuterStateView =
                             (OuterJoinRecordStateView) otherSideStateView;
                     for (OuterRecord outerRecord : 
associatedRecords.getOuterRecords()) {
-                        if (outerRecord.numOfAssociations == 1) {
+                        if (outerRecord.numOfAssociations == 1 && !isSuppress) 
{
                             // send +I[null+other]
                             outRow.setRowKind(RowKind.INSERT);
                             outputNullPadding(outerRecord.record, 
!inputIsLeft);
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/metrics/SimpleGauge.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/metrics/SimpleGauge.java
new file mode 100644
index 00000000000..b0a6cb51eb6
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/metrics/SimpleGauge.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.metrics;
+
+import org.apache.flink.metrics.Gauge;
+
+/**
+ * A Simple gauge providing method for value updating.
+ *
+ * @param <T> Type of the actual recording value
+ */
+public class SimpleGauge<T> implements Gauge<T> {
+
+    private T value;
+
+    public SimpleGauge(T initialValue) {
+        this.value = initialValue;
+    }
+
+    public void update(T value) {
+        this.value = value;
+    }
+
+    @Override
+    public T getValue() {
+        return value;
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTestBase.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTestBase.java
index 48cc9884e66..68afbe72999 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTestBase.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingJoinOperatorTestBase.java
@@ -51,7 +51,7 @@ public abstract class StreamingJoinOperatorTestBase {
                             },
                             new String[] {"order_id", "line_order_id", 
"shipping_address"}));
 
-    protected final InternalTypeInfo<RowData> rightTypeInfo =
+    protected InternalTypeInfo<RowData> rightTypeInfo =
             InternalTypeInfo.of(
                     RowType.of(
                             new LogicalType[] {new CharType(false, 20), new 
CharType(true, 10)},
@@ -61,7 +61,7 @@ public abstract class StreamingJoinOperatorTestBase {
             HandwrittenSelectorUtil.getRowDataSelector(
                     new int[] {1},
                     leftTypeInfo.toRowType().getChildren().toArray(new 
LogicalType[0]));
-    protected final RowDataKeySelector rightKeySelector =
+    protected RowDataKeySelector rightKeySelector =
             HandwrittenSelectorUtil.getRowDataSelector(
                     new int[] {0},
                     rightTypeInfo.toRowType().getChildren().toArray(new 
LogicalType[0]));
@@ -94,8 +94,7 @@ public abstract class StreamingJoinOperatorTestBase {
     protected final GeneratedJoinCondition joinCondition =
             new GeneratedJoinCondition("ConditionFunction", funcCode, new 
Object[0]);
 
-    protected final RowDataHarnessAssertor assertor =
-            new 
RowDataHarnessAssertor(getOutputType().getChildren().toArray(new 
LogicalType[0]));
+    protected RowDataHarnessAssertor assertor;
 
     protected KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, 
RowData, RowData>
             testHarness;
@@ -109,6 +108,10 @@ public abstract class StreamingJoinOperatorTestBase {
                         rightKeySelector,
                         joinKeyTypeInfo);
         testHarness.open();
+        // extend for mini-batch join test
+        assertor =
+                new RowDataHarnessAssertor(
+                        getOutputType().getChildren().toArray(new 
LogicalType[0]));
     }
 
     @AfterEach
@@ -121,7 +124,7 @@ public abstract class StreamingJoinOperatorTestBase {
                 if (tags.isEmpty()) {
                     return new Long[] {0L, 0L};
                 }
-                Long[] ttl = new Long[2];
+                Long[] ttl = new Long[] {0L, 0L};
                 for (String tag : tags) {
                     String[] splits = tag.split("=");
                     long value = Long.parseLong(splits[1].trim());
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMiniBatchJoinOperatorTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMiniBatchJoinOperatorTest.java
new file mode 100644
index 00000000000..62b8116a0b0
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/stream/StreamingMiniBatchJoinOperatorTest.java
@@ -0,0 +1,1956 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.stream;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import 
org.apache.flink.table.runtime.operators.bundle.trigger.CountCoBundleTrigger;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import 
org.apache.flink.table.runtime.operators.join.stream.state.JoinInputSideSpec;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind;
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord;
+
+/** Test for StreamingMiniBatchJoinOperatorTest. */
+public final class StreamingMiniBatchJoinOperatorTest extends 
StreamingJoinOperatorTestBase {
+
+    private RowDataKeySelector leftUniqueKeySelector;
+    private RowDataKeySelector rightUniqueKeySelector;
+
+    @BeforeEach
+    public void beforeEach(TestInfo testInfo) throws Exception {
+        rightTypeInfo =
+                InternalTypeInfo.of(
+                        RowType.of(
+                                new LogicalType[] {
+                                    new CharType(false, 20),
+                                    new CharType(false, 20),
+                                    new CharType(true, 10)
+                                },
+                                new String[] {
+                                    "order_id#", "line_order_id0", 
"line_order_ship_mode"
+                                }));
+
+        rightKeySelector =
+                HandwrittenSelectorUtil.getRowDataSelector(
+                        new int[] {1},
+                        rightTypeInfo.toRowType().getChildren().toArray(new 
LogicalType[0]));
+        super.beforeEach(testInfo);
+    }
+
+    @Tag("miniBatchSize=3")
+    @Test
+    public void testLeftJoinWithLeftArriveFirst() throws Exception {
+        // joinKey is LineOrd
+        testHarness.processElement1(
+                insertRecord(
+                        "Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA 
19464")); // left +I
+        testHarness.processElement1(
+                insertRecord(
+                        "Ord#i",
+                        "LineOrd#6",
+                        "i6 Bellevue Drive, Pottstown, Pi 19464")); // left  +I
+        assertor.shouldEmitNothing(testHarness);
+        testHarness.processElement2(insertRecord("Ord#X", "LineOrd#2", 
"AIR")); // right
+
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#i",
+                        "LineOrd#6",
+                        "i6 Bellevue Drive, Pottstown, Pi 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        "Ord#X",
+                        "LineOrd#2",
+                        "AIR"));
+    }
+
+    @Tag("miniBatchSize=1")
+    @Test
+    public void testLeftJoinWithLeftArriveFirstNoMiniBatch() throws Exception {
+        // joinKey is LineOrd
+        testHarness.processElement1(
+                insertRecord(
+                        "Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA 
19464")); // left +I
+
+        testHarness.processElement1(
+                insertRecord(
+                        "Ord#i",
+                        "LineOrd#6",
+                        "i6 Bellevue Drive, Pottstown, Pi 19464")); // left  +I
+        testHarness.processElement2(insertRecord("Ord#X", "LineOrd#2", 
"AIR")); // right
+
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#i",
+                        "LineOrd#6",
+                        "i6 Bellevue Drive, Pottstown, Pi 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        "Ord#X",
+                        "LineOrd#2",
+                        "AIR"));
+    }
+
+    @Tag("miniBatchSize=3")
+    @Test
+    public void testRightJoinWithRightArriveFirst() throws Exception {
+        // joinKey is LineOrd
+        testHarness.processElement2(insertRecord("Ord#X", "LineOrd#2", 
"AIR")); // right
+
+        testHarness.processElement1(
+                insertRecord(
+                        "Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA 
19464")); // left +I
+        assertor.shouldEmitNothing(testHarness);
+        testHarness.processElement1(
+                insertRecord(
+                        "Ord#i",
+                        "LineOrd#6",
+                        "i6 Bellevue Drive, Pottstown, Pi 19464")); // left  +I
+
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        "Ord#X",
+                        "LineOrd#2",
+                        "AIR"));
+    }
+
+    @Tag("miniBatchSize=1")
+    @Test
+    public void testRightJoinWithRightArriveFirstWithNoMiniBatch() throws 
Exception {
+        // joinKey is LineOrd
+        testHarness.processElement2(insertRecord("Ord#X", "LineOrd#2", 
"AIR")); // right
+
+        testHarness.processElement1(
+                insertRecord(
+                        "Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA 
19464")); // left +I
+        testHarness.processElement1(
+                insertRecord(
+                        "Ord#i",
+                        "LineOrd#6",
+                        "i6 Bellevue Drive, Pottstown, Pi 19464")); // left  +I
+
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(RowKind.INSERT, null, null, null, "Ord#X", 
"LineOrd#2", "AIR"),
+                rowOfKind(RowKind.DELETE, null, null, null, "Ord#X", 
"LineOrd#2", "AIR"),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        "Ord#X",
+                        "LineOrd#2",
+                        "AIR"));
+    }
+
+    @Tag("miniBatchSize=3")
+    @Test
+    public void testFullJoinWithRightArriveFirst() throws Exception {
+        // joinKey is LineOrd
+        testHarness.processElement2(insertRecord("Ord#X", "LineOrd#2", 
"AIR")); // right
+
+        testHarness.processElement1(
+                insertRecord(
+                        "Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA 
19464")); // left +I
+        assertor.shouldEmitNothing(testHarness);
+        testHarness.processElement1(
+                insertRecord(
+                        "Ord#i",
+                        "LineOrd#6",
+                        "i6 Bellevue Drive, Pottstown, Pi 19464")); // left  +I
+
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(RowKind.INSERT, null, null, null, "Ord#X", 
"LineOrd#2", "AIR"),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#i",
+                        "LineOrd#6",
+                        "i6 Bellevue Drive, Pottstown, Pi 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(RowKind.DELETE, null, null, null, "Ord#X", 
"LineOrd#2", "AIR"),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        "Ord#X",
+                        "LineOrd#2",
+                        "AIR"));
+    }
+
+    @Tag("miniBatchSize=1")
+    @Test
+    public void testFullJoinWithRightArriveFirstWithNoMiniBatch() throws 
Exception {
+        // joinKey is LineOrd
+        testHarness.processElement2(insertRecord("Ord#X", "LineOrd#2", 
"AIR")); // right
+
+        testHarness.processElement1(
+                insertRecord(
+                        "Ord#1", "LineOrd#2", "3 Bellevue Drive, Pottstown, PA 
19464")); // left +I
+        testHarness.processElement1(
+                insertRecord(
+                        "Ord#i",
+                        "LineOrd#6",
+                        "i6 Bellevue Drive, Pottstown, Pi 19464")); // left  +I
+
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(RowKind.INSERT, null, null, null, "Ord#X", 
"LineOrd#2", "AIR"),
+                rowOfKind(RowKind.DELETE, null, null, null, "Ord#X", 
"LineOrd#2", "AIR"),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        "Ord#X",
+                        "LineOrd#2",
+                        "AIR"),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#i",
+                        "LineOrd#6",
+                        "i6 Bellevue Drive, Pottstown, Pi 19464",
+                        null,
+                        null,
+                        null));
+    }
+
+    @Tag("miniBatchSize=4")
+    @Test
+    public void testInnerJoinJoinKeyContainsUniqueKeyWithoutFold() throws 
Exception {
+        // joinKey is LineOrd
+        testHarness.setStateTtlProcessingTime(1);
+        // basic test for that the mini-batch process could be triggerred 
normally
+        testHarness.processElement1(
+                insertRecord(
+                        "Ord#1",
+                        "LineOrd#1",
+                        "1 Bellevue Drive, Pottstown, PA 19464")); // left +I x
+        assertor.shouldEmitNothing(testHarness);
+
+        testHarness.processElement1(
+                insertRecord(
+                        "Ord#1",
+                        "LineOrd#2",
+                        "2 Bellevue Drive, Pottstown, PA 19464")); // left +I 
xx
+        assertor.shouldEmitNothing(testHarness);
+
+        testHarness.processElement2(insertRecord("Ord#Y", "LineOrd#1", 
"TRUCK")); // right +I x
+        assertor.shouldEmitNothing(testHarness);
+
+        // exactly reach to the mini-batch size
+        testHarness.processElement2(insertRecord("Ord#X", "LineOrd#2", 
"AIR")); // right +I   xx
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#1",
+                        "1 Bellevue Drive, Pottstown, PA 19464",
+                        "Ord#Y",
+                        "LineOrd#1",
+                        "TRUCK"),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "2 Bellevue Drive, Pottstown, PA 19464",
+                        "Ord#X",
+                        "LineOrd#2",
+                        "AIR"));
+    }
+
+    @Tag("miniBatchSize=18")
+    @Test
+    public void testInnerJoinWithJoinKeyContainsUniqueKeyWithinBatch() throws 
Exception {
+        // joinKey is LineOrd
+        // left fold  || right fold
+        // +I +U / +U +U / +U -D ||  +I -D / +U -U / +I -U
+        testHarness.processElement1(
+                insertRecord(
+                        "Ord#1",
+                        "LineOrd#1",
+                        "3 Bellevue Drive, Pottstown, PA 19464")); // left +I  
x
+        testHarness.processElement1(
+                updateAfterRecord(
+                        "Ord#2",
+                        "LineOrd#1",
+                        "4 Bellevue Drive, Pottstown, PB 19464")); // left +U 
x | +I +U
+        testHarness.processElement2(insertRecord("Ord#X", "LineOrd#7", 
"RAILWAY")); // right +I  a
+        testHarness.processElement1(
+                updateAfterRecord(
+                        "Ord#5",
+                        "LineOrd#5",
+                        "7 Bellevue Drive, Pottstown, PE 19464")); // left +U  
y
+        testHarness.processElement1(
+                updateAfterRecord(
+                        "Ord#x5",
+                        "LineOrd#5",
+                        "x3 Bellevue Drive, Pottstown, PAxx 19464")); // left 
+U   y |  +U  +U
+        testHarness.processElement1(
+                insertRecord(
+                        "Ord#i", "LineOrd#6", "i6 Bellevue Drive, Pottstown, 
Pi 19464")); // left +I
+        testHarness.processElement2(
+                deleteRecord("Ord#X", "LineOrd#7", "RAILWAY")); // right -D  a 
| +I -D
+        testHarness.processElement1(
+                updateAfterRecord(
+                        "Ord#3",
+                        "LineOrd#x3",
+                        "x5 Bellevue Drive, Pottstown, PCxx 19464")); // left 
+U
+        testHarness.processElement1(
+                deleteRecord(
+                        "Ord#3",
+                        "LineOrd#x3",
+                        "14y0 Bellevue Drive, Pottstown, PJyy 19464")); // 
left -D  | +U -D
+        testHarness.processElement2(
+                updateAfterRecord("Ord#X", "LineOrd#7", "AIR")); // right +U   
 b
+        testHarness.processElement1(
+                insertRecord(
+                        "Ord#4", "LineOrd#4", "6 Bellevue Drive, Pottstown, PD 
19464")); // left +I
+        assertor.shouldEmitNothing(testHarness);
+        testHarness.processElement2(
+                updateBeforeRecord("Ord#X", "LineOrd#7", "AIR")); // right -U  
  b  | +U -U
+        testHarness.processElement2(insertRecord("Ord#X", "LineOrd#2", 
"AIR")); // right +I     c
+        testHarness.processElement2(
+                updateBeforeRecord("Ord#X", "LineOrd#2", "AIR")); // right -U  
  c  | +I -U
+        // right state is empty
+        testHarness.processElement2(insertRecord("Ord#X", "LineOrd#1", 
"AIR")); // right +I
+        testHarness.processElement2(updateBeforeRecord("Ord#Y", "LineOrd#5", 
"TRUCK")); // right -U
+        testHarness.processElement2(insertRecord("Ord#Y", "LineOrd#6", 
"RAILWAY")); // right +I
+        testHarness.processElement2(
+                updateBeforeRecord(
+                        "Ord#Z", "LineOrd#4", "RAILWAY")); // no effect to 
state  // right -U
+        // left state  |  right state
+        // "Ord#2", "LineOrd#1", "4 Bellevue Drive, Pottstown, PB 19464"     | 
 "Ord#X",
+        // "LineOrd#1", "AIR"
+        // "Ord#x5", "LineOrd#5", "x3 Bellevue Drive, Pottstown, PAxx 19464" | 
 "Ord#Y",
+        // "LineOrd#6", "RAILWAY"
+        // "Ord#i", "LineOrd#6", "i6 Bellevue Drive, Pottstown, Pi 19464"
+        // "Ord#4", "LineOrd#4", "6 Bellevue Drive, Pottstown, PD 19464"
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#i",
+                        "LineOrd#6",
+                        "i6 Bellevue Drive, Pottstown, Pi 19464",
+                        "Ord#Y",
+                        "LineOrd#6",
+                        "RAILWAY"),
+                rowOfKind(
+                        RowKind.UPDATE_AFTER,
+                        "Ord#2",
+                        "LineOrd#1",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#X",
+                        "LineOrd#1",
+                        "AIR"));
+    }
+
+    @Tag("miniBatchSize=10")
+    @Test
+    public void testInnerJoinWithJoinKeyContainsUniqueKeyCrossBatches() throws 
Exception {
+        // joinKey is LineOrd
+
+        testHarness.processElement1(
+                insertRecord(
+                        "Ord#1",
+                        "LineOrd#1",
+                        "3 Bellevue Drive, Pottstown, PA 19464")); // left +I  
 z
+        testHarness.processElement1(
+                insertRecord(
+                        "Ord#1",
+                        "LineOrd#1",
+                        "3 Bellevue Drive, Pottstown, PA 19464")); // left +I  
 z  | +I +I
+        testHarness.processElement2(insertRecord("Ord#Y", "LineOrd#4", 
"TRUCK")); // right +I    y
+        assertor.shouldEmitNothing(testHarness);
+        testHarness.processElement1(
+                insertRecord(
+                        "Ord#4", "LineOrd#4", "6 Bellevue Drive, Pottstown, PD 
19464")); // left +I
+        testHarness.processElement1(
+                insertRecord(
+                        "Ord#5",
+                        "LineOrd#5",
+                        "7 Bellevue Drive, Pottstown, PE 19464")); // left +I  
  x
+        testHarness.processElement1(
+                updateBeforeRecord(
+                        "Ord#5",
+                        "LineOrd#5",
+                        "7 Bellevue Drive, Pottstown, PE 19464")); // left -U  
x  |  +I -U
+        testHarness.processElement1(
+                updateBeforeRecord(
+                        "Ord#9", "LineOrd#9", "11 Bellevue Drive, Pottstown, 
PI 19464")); // left -U
+        assertor.shouldEmitNothing(testHarness);
+        testHarness.processElement2(insertRecord("Ord#X", "LineOrd#9", 
"AIR")); // right +I
+        testHarness.processElement2(updateAfterRecord("Ord#xyz", "LineOrd#1", 
"SHIP")); // right +U
+        testHarness.processElement2(
+                deleteRecord("Ord#Y", "LineOrd#4", "TRUCK")); // right -D    y 
| +I -D
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.UPDATE_BEFORE,
+                        "Ord#9",
+                        "LineOrd#9",
+                        "11 Bellevue Drive, Pottstown, PI 19464",
+                        "Ord#X",
+                        "LineOrd#9",
+                        "AIR"),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#1",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        "Ord#xyz",
+                        "LineOrd#1",
+                        "SHIP"));
+        // 
+---------------------------------------------------------------+---------------------------------+
+        // |                            left state                         |   
        right state
+        //         |
+        // 
|---------------------------------------------------------------|---------------------------------|
+        // | "Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464" | 
"Ord#X", "LineOrd#9",
+        // "AIR"     |
+        // | "Ord#4", "LineOrd#4", "6 Bellevue Drive, Pottstown, PD 19464" | 
"Ord#xyz", "LineOrd#1",
+        // "SHIP"  |
+        // 
+---------------------------------------------------------------+---------------------------------+
+
+        // second join:
+        // 1.left stream state(last batch defined) join new input from right 
stream.
+        // 2.right stream state(current and last batch defined) join new input 
from left stream.
+
+        testHarness.processElement1(
+                updateAfterRecord(
+                        "Ord#adjust",
+                        "LineOrd#4",
+                        "14 Bellevue Drive, Pottstown, PJ 19464")); // left +U 
x
+        testHarness.processElement1(
+                updateAfterRecord(
+                        "Ord#18",
+                        "LineOrd#9",
+                        "22 Bellevue Drive, Pottstown, PK 19464")); // left +U
+        testHarness.processElement2(deleteRecord("Ord#X", "LineOrd#x3", 
"AIR")); // right -D
+        testHarness.processElement2(updateBeforeRecord("Ord#xyz", "LineOrd#1", 
"SHIP")); // right -U
+        testHarness.processElement2(insertRecord("Ord#Y", "LineOrd#4", 
"TRUCK")); // right +I
+        testHarness.processElement1(
+                updateAfterRecord(
+                        "Ord#14",
+                        "LineOrd#4",
+                        "18 Bellevue Drive, Pottstown, PL 19464")); // left +U 
 x | +U +U
+        testHarness.processElement1(
+                deleteRecord(
+                        "Ord#3",
+                        "LineOrd#x3",
+                        "x5 Bellevue Drive, Pottstown, PCxx 19464")); // left 
-D
+        testHarness.processElement1(
+                insertRecord(
+                        "Ord#3",
+                        "LineOrd#x3",
+                        "x5 Bellevue Drive, Pottstown, PCxx 19464")); // left 
+I
+        testHarness.processElement1(
+                insertRecord(
+                        "Ord#10",
+                        "LineOrd#100y",
+                        "14y0 Bellevue Drive, Pottstown, PJyy 19464")); // 
left +I
+        assertor.shouldEmitNothing(testHarness);
+
+        testHarness.processElement2(updateAfterRecord("Ord#101", "LineOrd#x3", 
"AIR")); // right +U
+
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#4",
+                        "LineOrd#4",
+                        "6 Bellevue Drive, Pottstown, PD 19464",
+                        "Ord#Y",
+                        "LineOrd#4",
+                        "TRUCK"),
+                rowOfKind(
+                        RowKind.UPDATE_BEFORE,
+                        "Ord#1",
+                        "LineOrd#1",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        "Ord#xyz",
+                        "LineOrd#1",
+                        "SHIP"),
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#3",
+                        "LineOrd#x3",
+                        "x5 Bellevue Drive, Pottstown, PCxx 19464",
+                        "Ord#101",
+                        "LineOrd#x3",
+                        "AIR"),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#3",
+                        "LineOrd#x3",
+                        "x5 Bellevue Drive, Pottstown, PCxx 19464",
+                        "Ord#101",
+                        "LineOrd#x3",
+                        "AIR"),
+                rowOfKind(
+                        RowKind.UPDATE_AFTER,
+                        "Ord#18",
+                        "LineOrd#9",
+                        "22 Bellevue Drive, Pottstown, PK 19464",
+                        "Ord#X",
+                        "LineOrd#9",
+                        "AIR"),
+                rowOfKind(
+                        RowKind.UPDATE_AFTER,
+                        "Ord#14",
+                        "LineOrd#4",
+                        "18 Bellevue Drive, Pottstown, PL 19464",
+                        "Ord#Y",
+                        "LineOrd#4",
+                        "TRUCK"));
+    }
+
+    @Tag("miniBatchSize=13")
+    @Test
+    public void testInnerJoinWithHasUniqueKeyWithinBatch() throws Exception {
+        // joinKey is LineOrd and uniqueKey is Ord
+        // +I +U / +I -U / +I -D / +U -D /+U +U
+        List<StreamRecord<RowData>> records =
+                Arrays.asList(
+                        insertRecord(
+                                "Ord#1",
+                                "LineOrd#1",
+                                "3 Bellevue Drive, Pottstown, PA 19464"), // 2 
 +I -U
+                        insertRecord("Ord#2", "LineOrd#2", "4 Bellevue Drive, 
Pottstown, PB 19464"),
+                        insertRecord(
+                                "Ord#3",
+                                "LineOrd#10",
+                                "5 Bellevue Drive, Pottstown, PC 19464"), // 1 
 +I +U
+                        updateAfterRecord(
+                                "Ord#3",
+                                "LineOrd#10",
+                                "xxx Bellevue Drive, Pottstown, PJ 19464"), // 
1
+                        updateAfterRecord(
+                                "Ord#5", "LineOrd#5", "7 Bellevue Drive, 
Pottstown, PE 19464"),
+                        insertRecord(
+                                "Ord#6",
+                                "LineOrd#5",
+                                "8 Bellevue Drive, Pottstown, PF 19464"), // 3 
 +I -D
+                        updateBeforeRecord(
+                                "Ord#1", "LineOrd#1", "3 Bellevue Drive, 
Pottstown, PA 19464"), // 2
+                        deleteRecord(
+                                "Ord#6", "LineOrd#5", "7 Bellevue Drive, 
Pottstown, PE 19464"), // 3
+                        updateBeforeRecord(
+                                "Ord#12",
+                                "LineOrd#4",
+                                "6 Bellevue Drive, Pottstown, PD 19464"), // 
no effect
+                        updateAfterRecord(
+                                "Ord#9",
+                                "LineOrd#3",
+                                "5 Bellevue Drive, Pottstown, PC 19464"), // 4 
  +U -D
+                        deleteRecord(
+                                "Ord#9",
+                                "LineOrd#3",
+                                "5 Bellevue Drive, Pottstown, PC 19464")); // 4
+        for (StreamRecord<RowData> row : records) {
+            testHarness.processElement1(row);
+        }
+        // 
+-------------------------------------------------------------------+
+        // |                            left state                             
|
+        // 
|-------------------------------------------------------------------|
+        // | "Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"     
|
+        // | "Ord#3", "LineOrd#10", "xxx Bellevue Drive, Pottstown, PJ 19464"  
|
+        // | "Ord#5", "LineOrd#5", "7 Bellevue Drive, Pottstown, PE 19464"     
|
+        // 
+-------------------------------------------------------------------+
+        assertor.shouldEmitNothing(testHarness);
+        records =
+                Arrays.asList(
+                        insertRecord("Ord#5", "LineOrd#2", "SHIP"),
+                        updateAfterRecord("Ord#6", "LineOrd#5", "AIR"));
+        for (StreamRecord<RowData> row : records) {
+            testHarness.processElement2(row);
+        }
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.UPDATE_AFTER,
+                        "Ord#5",
+                        "LineOrd#5",
+                        "7 Bellevue Drive, Pottstown, PE 19464",
+                        "Ord#6",
+                        "LineOrd#5",
+                        "AIR"),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#5",
+                        "LineOrd#2",
+                        "SHIP"));
+    }
+
+    @Tag("miniBatchSize=8")
+    @Test
+    public void testInnerJoinWithHasUniqueKeyCrossBatches() throws Exception {
+        // joinKey is LineOrd and uniqueKey is Ord
+        // fold +I/+U +U (same and different jks)
+        testHarness.processElement1(
+                updateBeforeRecord(
+                        "Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 
19464")); // left -U
+        testHarness.processElement1(
+                updateAfterRecord(
+                        "Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 
19464")); // left  +U
+        testHarness.processElement2(insertRecord("Ord#5", "LineOrd#5", 
"SHIP")); // right  +I
+        testHarness.processElement1(
+                insertRecord(
+                        "Ord#4",
+                        "LineOrd#4",
+                        "5 Bellevue Drive, Pottstown, PC 19464")); // left +I  
x
+        testHarness.processElement1(
+                updateAfterRecord(
+                        "Ord#4",
+                        "LineOrd#4",
+                        "6 Bellevue Drive, Pottstown, PD 19464")); // left +U  
x | +I +U
+        testHarness.processElement2(
+                updateAfterRecord("Ord#22", "LineOrd#4", "SHIP")); // right  
+U    join
+        testHarness.processElement2(insertRecord("Ord#23", "LineOrd#10", 
"AIR")); // right +I
+        assertor.shouldEmitNothing(testHarness);
+        testHarness.processElement1(
+                updateAfterRecord(
+                        "Ord#4",
+                        "LineOrd#4",
+                        "xxx Bellevue Drive, Pottstown, PJ 19464")); // left 
+U x | +I +U +U  join
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.UPDATE_AFTER,
+                        "Ord#4",
+                        "LineOrd#4",
+                        "xxx Bellevue Drive, Pottstown, PJ 19464",
+                        "Ord#22",
+                        "LineOrd#4",
+                        "SHIP"));
+
+        // 
+-----------------------------------------------------------------+----------------------------------+
+        // |                              left state                         | 
          right state
+        //            |
+        // 
|-----------------------------------------------------------------|----------------------------------|
+        // | "Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"   | 
"Ord#5", "LineOrd#5",
+        // "SHIP"     |
+        // | "Ord#4", "LineOrd#4", "xxx Bellevue Drive, Pottstown, PJ 19464" | 
"Ord#22",
+        // "LineOrd#4", "SHIP"    |
+        // |                                                                 | 
"Ord#23",
+        // "LineOrd#10", "AIR"    |
+        // 
+-----------------------------------------------------------------+----------------------------------+
+
+        // fold +I/+U -U/D (same and different jks)
+        testHarness.processElement1(
+                insertRecord(
+                        "Ord#5", "LineOrd#5", "7 Bellevue Drive, Pottstown, PE 
19464")); // left  +I
+        testHarness.processElement1(
+                updateBeforeRecord(
+                        "Ord#6",
+                        "LineOrd#6",
+                        "8 Bellevue Drive, Pottstown, PF 19464")); // left   -U
+        testHarness.processElement2(insertRecord("Ord#21", "LineOrd#5", 
"RAILWAY")); //  right +I  x
+        testHarness.processElement2(
+                insertRecord("Ord#1", "LineOrd#5", "TRUCK")); //   right +I  x 
| +I +I
+        testHarness.processElement1(
+                updateAfterRecord(
+                        "Ord#6", "LineOrd#6", "8 Bellevue Drive, Pottstown, PF 
19464")); // left +U
+        testHarness.processElement1(
+                deleteRecord(
+                        "Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 
19464")); // left -D
+        testHarness.processElement2(
+                updateBeforeRecord("Ord#5", "LineOrd#5", "SHIP")); // right -U 
  x | +I +I -U
+        assertor.shouldEmitNothing(testHarness);
+        testHarness.processElement2(updateBeforeRecord("Ord#22", "LineOrd#6", 
"AIR")); // right -U
+
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#5",
+                        "LineOrd#5",
+                        "7 Bellevue Drive, Pottstown, PE 19464",
+                        "Ord#21",
+                        "LineOrd#5",
+                        "RAILWAY"),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#5",
+                        "LineOrd#5",
+                        "7 Bellevue Drive, Pottstown, PE 19464",
+                        "Ord#1",
+                        "LineOrd#5",
+                        "TRUCK"));
+    }
+
+    @Tag("miniBatchSize=20")
+    @Test
+    public void testInnerJoinWithNoUniqueKeyWithinBatch() throws Exception {
+        // joinKey is LineOrd
+        // +I -U / +I -D / -U +U / -D +I
+        List<StreamRecord<RowData>> records =
+                Arrays.asList(
+                        insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, 
Pottstown, PA 19464"),
+                        insertRecord("Ord#1", "LineOrd#2", "4 Bellevue Drive, 
Pottstown, PB 19464"),
+                        insertRecord(
+                                "Ord#1",
+                                "LineOrd#2",
+                                "4 Bellevue Drive, Pottstown, PB 19464"), // 
2x    +I -D
+                        deleteRecord("Ord#6", "LineOrd#6", "8 Bellevue Drive, 
Pottstown, PF 19464"),
+                        insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, 
Pottstown, PA 19464"),
+                        insertRecord(
+                                "Ord#3",
+                                "LineOrd#3",
+                                "5 Bellevue Drive, Pottstown, PD 19464"), // x 
  +I -U
+                        updateBeforeRecord(
+                                "Ord#3", "LineOrd#3", "5 Bellevue Drive, 
Pottstown, PD 19464"), // x
+                        updateBeforeRecord(
+                                "Ord#9",
+                                "LineOrd#9",
+                                "11 Bellevue Drive, Pottstown, PI 19464"), // 
3x   -U +U
+                        updateAfterRecord(
+                                "Ord#10", "LineOrd#10", "14 Bellevue Drive, 
Pottstown, PJ 19464"),
+                        updateAfterRecord(
+                                "Ord#18", "LineOrd#18", "22 Bellevue Drive, 
Pottstown, PK 19464"),
+                        deleteRecord(
+                                "Ord#1",
+                                "LineOrd#2",
+                                "4 Bellevue Drive, Pottstown, PB 19464"), // 2x
+                        updateAfterRecord(
+                                "Ord#9",
+                                "LineOrd#9",
+                                "11 Bellevue Drive, Pottstown, PI 19464"), // 
3x
+                        deleteRecord(
+                                "Ord#6",
+                                "LineOrd#6",
+                                "8 Bellevue Drive, Pottstown, PF 19464"), // 
4x   -D +I
+                        insertRecord(
+                                "Ord#6", "LineOrd#6", "8 Bellevue Drive, 
Pottstown, PF 19464") // 4x
+                        );
+        for (StreamRecord<RowData> row : records) {
+            testHarness.processElement2(row);
+        }
+        records =
+                Arrays.asList(
+                        insertRecord("Ord#1", "LineOrd#1", "AIR"),
+                        updateAfterRecord("Ord#1", "LineOrd#2", "SHIP"),
+                        updateBeforeRecord("Ord#1", "LineOrd#2", "RAILWAY"),
+                        insertRecord("Ord#1", "LineOrd#2", "RAILWAY"),
+                        deleteRecord("Ord#6", "LineOrd#6", "RAILWAY"),
+                        insertRecord("Ord#6", "LineOrd#6", "RAILWAY"));
+
+        for (StreamRecord<RowData> row : records) {
+            testHarness.processElement1(row);
+        }
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#1",
+                        "AIR",
+                        "Ord#1",
+                        "LineOrd#1",
+                        "3 Bellevue Drive, Pottstown, PA 19464"),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#1",
+                        "AIR",
+                        "Ord#1",
+                        "LineOrd#1",
+                        "3 Bellevue Drive, Pottstown, PA 19464"),
+                rowOfKind(
+                        RowKind.UPDATE_AFTER,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "SHIP",
+                        "Ord#1",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464"));
+    }
+
+    @Tag("miniBatchSize=6")
+    @Test
+    public void testInnerJoinWithNoUniqueKeyCrossBatches() throws Exception {
+        // joinKey is LineOrd
+        // completely duplicate records
+        testHarness.processElement1(
+                insertRecord("Ord#1", "LineOrd#2", "4 Bellevue Drive, 
Pottstown, PB 19464"));
+        testHarness.processElement2(insertRecord("Ord#1", "LineOrd#1", "AIR"));
+        testHarness.processElement1(
+                insertRecord("Ord#1", "LineOrd#2", "4 Bellevue Drive, 
Pottstown, PB 19464"));
+        testHarness.processElement1(
+                deleteRecord("Ord#6", "LineOrd#6", "8 Bellevue Drive, 
Pottstown, PF 19464"));
+        testHarness.processElement2(updateAfterRecord("Ord#1", "LineOrd#2", 
"SHIP"));
+        assertor.shouldEmitNothing(testHarness);
+        testHarness.processElement1(
+                insertRecord("Ord#3", "LineOrd#3", "5 Bellevue Drive, 
Pottstown, PD 19464"));
+        // left state   |    right state
+        // "Ord#1", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"  |  
"Ord#1", "LineOrd#1",
+        // "AIR"
+        // "Ord#1", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"  |  
"Ord#1", "LineOrd#2",
+        // "SHIP"
+        // "Ord#3", "LineOrd#3", "5 Bellevue Drive, Pottstown, PD 19464"  |
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#1",
+                        "LineOrd#2",
+                        "SHIP"),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#1",
+                        "LineOrd#2",
+                        "SHIP"));
+        testHarness.processElement1(
+                updateBeforeRecord("Ord#1", "LineOrd#2", "4 Bellevue Drive, 
Pottstown, PB 19464"));
+        testHarness.processElement1(
+                updateAfterRecord("Ord#1", "LineOrd#2", "4 Bellevue Drive, 
Pottstown, PB 19464"));
+        testHarness.processElement2(insertRecord("Ord#1", "LineOrd#3", "AIR"));
+        testHarness.processElement1(
+                deleteRecord("Ord#6", "LineOrd#1", "8 Bellevue Drive, 
Pottstown, PF 19464"));
+        testHarness.processElement2(deleteRecord("Ord#1", "LineOrd#2", 
"SHIP"));
+        // right state
+        // "Ord#1", "LineOrd#1", "AIR"
+        // "Ord#1", "LineOrd#3", "AIR"
+        testHarness.processElement1(
+                insertRecord("Ord#1", "LineOrd#2", "4 Bellevue Drive, 
Pottstown, PB 19464"));
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#3",
+                        "LineOrd#3",
+                        "5 Bellevue Drive, Pottstown, PD 19464",
+                        "Ord#1",
+                        "LineOrd#3",
+                        "AIR"),
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#1",
+                        "LineOrd#2",
+                        "SHIP"),
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#1",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#1",
+                        "LineOrd#2",
+                        "SHIP"),
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#6",
+                        "LineOrd#1",
+                        "8 Bellevue Drive, Pottstown, PF 19464",
+                        "Ord#1",
+                        "LineOrd#1",
+                        "AIR"));
+    }
+
+    /** Outer join only emits INSERT or DELETE Msg. */
+    @Tag("miniBatchSize=10")
+    @Test
+    public void testLeftJoinWithJoinKeyContainsUniqueKey() throws Exception {
+        // joinKey is LineOrd
+        // left fold  || right fold
+        // +I +U / +U +U / +U -D ||  +I -D / +U -U / +I -U
+        testHarness.processElement1(
+                insertRecord(
+                        "Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 
19464")); // left +I
+        testHarness.processElement1(
+                updateAfterRecord(
+                        "Ord#2", "LineOrd#1", "4 Bellevue Drive, Pottstown, PB 
19464")); // left +U
+
+        testHarness.processElement1(
+                insertRecord(
+                        "Ord#i",
+                        "LineOrd#6",
+                        "i6 Bellevue Drive, Pottstown, Pi 19464")); // left  +I
+
+        testHarness.processElement1(
+                updateAfterRecord(
+                        "Ord#3", "LineOrd#x3", "x5 Bellevue Drive, Pottstown, 
PCxx 19464")); // left
+        testHarness.processElement1(
+                deleteRecord(
+                        "Ord#3",
+                        "LineOrd#x3",
+                        "14y0 Bellevue Drive, Pottstown, PJyy 19464")); // left
+
+        testHarness.processElement1(
+                insertRecord(
+                        "Ord#4", "LineOrd#4", "6 Bellevue Drive, Pottstown, PD 
19464")); // left
+        assertor.shouldEmitNothing(testHarness);
+
+        testHarness.processElement2(insertRecord("Ord#X", "LineOrd#2", 
"AIR")); // right
+        testHarness.processElement2(updateBeforeRecord("Ord#X", "LineOrd#2", 
"AIR")); // right
+        // right state is empty
+        testHarness.processElement2(insertRecord("Ord#X", "LineOrd#1", 
"AIR")); // right
+        testHarness.processElement2(insertRecord("Ord#Y", "LineOrd#6", 
"RAILWAY")); // right
+        // left state  |  right state
+        // "Ord#2", "LineOrd#1", "4 Bellevue Drive, Pottstown, PB 19464"  |  
"Ord#X","LineOrd#1",
+        // "AIR"
+        // "Ord#i", "LineOrd#6", "i6 Bellevue Drive, Pottstown, Pi 19464" |  
"Ord#Y","LineOrd#6",
+        // "RAILWAY"
+        // "Ord#4", "LineOrd#4", "6 Bellevue Drive, Pottstown, PD 19464"
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#i",
+                        "LineOrd#6",
+                        "i6 Bellevue Drive, Pottstown, Pi 19464",
+                        "Ord#Y",
+                        "LineOrd#6",
+                        "RAILWAY"),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#4",
+                        "LineOrd#4",
+                        "6 Bellevue Drive, Pottstown, PD 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#2",
+                        "LineOrd#1",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#X",
+                        "LineOrd#1",
+                        "AIR"));
+        // left state  |  right state
+        // "Ord#2", "LineOrd#1", "4 Bellevue Drive, Pottstown, PB 19464"  |  
"Ord#X","LineOrd#1",
+        // "AIR"
+        // "Ord#i", "LineOrd#6", "i6 Bellevue Drive, Pottstown, Pi 19464" |  
"Ord#Y","LineOrd#6",
+        // "RAILWAY"
+        // "Ord#4", "LineOrd#4", "6 Bellevue Drive, Pottstown, PD 19464"
+        testHarness.processElement2(
+                updateBeforeRecord("Ord#Y", "LineOrd#6", "RAILWAY")); // right 
-U
+        testHarness.processElement2(updateAfterRecord("Ord#UU", "LineOrd#6", 
"SHIP")); // right  +U
+        testHarness.processElement2(insertRecord("Ord#X", "LineOrd#3", 
"AIR")); // right
+        testHarness.processElement2(insertRecord("Ord#X", "LineOrd#2", 
"AIR")); // right
+        testHarness.processElement2(insertRecord("Ord#X", "LineOrd#7", 
"AIR")); // right
+        testHarness.processElement2(insertRecord("Ord#X", "LineOrd#8", 
"AIR")); // right
+        testHarness.processElement2(insertRecord("Ord#X", "LineOrd#9", 
"AIR")); // right
+        testHarness.processElement2(insertRecord("Ord#X", "LineOrd#10", 
"AIR")); // right
+        testHarness.processElement2(insertRecord("Ord#X", "LineOrd#11", 
"AIR")); // right
+        testHarness.processElement2(updateBeforeRecord("Ord#X", "LineOrd#1", 
"AIR")); // right
+
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.UPDATE_BEFORE,
+                        "Ord#i",
+                        "LineOrd#6",
+                        "i6 Bellevue Drive, Pottstown, Pi 19464",
+                        "Ord#Y",
+                        "LineOrd#6",
+                        "RAILWAY"),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#i",
+                        "LineOrd#6",
+                        "i6 Bellevue Drive, Pottstown, Pi 19464",
+                        "Ord#UU",
+                        "LineOrd#6",
+                        "SHIP"),
+                rowOfKind(
+                        RowKind.UPDATE_BEFORE,
+                        "Ord#2",
+                        "LineOrd#1",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#X",
+                        "LineOrd#1",
+                        "AIR"),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#2",
+                        "LineOrd#1",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        null,
+                        null,
+                        null));
+    }
+
+    /** Outer join only emits INSERT or DELETE Msg. */
+    @Tag("miniBatchSize=4")
+    @Test
+    public void testLeftJoinWithHasUniqueKey() throws Exception {
+        // joinKey is LineOrd and uniqueKey is Ord
+        // +I +U / +I -U / +I -D /+U +U
+        List<StreamRecord<RowData>> records =
+                Arrays.asList(
+                        insertRecord(
+                                "Ord#1",
+                                "LineOrd#1",
+                                "3 Bellevue Drive, Pottstown, PA 19464"), // 2 
 +I -U
+                        insertRecord("Ord#2", "LineOrd#2", "4 Bellevue Drive, 
Pottstown, PB 19464"),
+                        insertRecord(
+                                "Ord#3",
+                                "LineOrd#10",
+                                "5 Bellevue Drive, Pottstown, PC 19464"), // 1 
 +I +U
+                        updateAfterRecord(
+                                "Ord#3",
+                                "LineOrd#10",
+                                "xxx Bellevue Drive, Pottstown, PJ 19464"), // 
1
+                        updateAfterRecord(
+                                "Ord#5", "LineOrd#5", "7 Bellevue Drive, 
Pottstown, PE 19464"),
+                        insertRecord(
+                                "Ord#6",
+                                "LineOrd#5",
+                                "8 Bellevue Drive, Pottstown, PF 19464"), // 3 
+I -D
+                        updateBeforeRecord(
+                                "Ord#1", "LineOrd#1", "3 Bellevue Drive, 
Pottstown, PA 19464"), // 2
+                        deleteRecord(
+                                "Ord#6", "LineOrd#5", "7 Bellevue Drive, 
Pottstown, PE 19464"), // 3
+                        updateAfterRecord(
+                                "Ord#6",
+                                "LineOrd#7",
+                                "8 Bellevue Drive, Pottstown, PF 19464"), // 5 
  +U +U
+                        updateAfterRecord(
+                                "Ord#6",
+                                "LineOrd#7",
+                                "9 Bellevue Drive, Pottstown, PF 19464")); // 5
+
+        for (StreamRecord<RowData> row : records) {
+            testHarness.processElement1(row);
+        }
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#3",
+                        "LineOrd#10",
+                        "xxx Bellevue Drive, Pottstown, PJ 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#1",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#1",
+                        "LineOrd#1",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#5",
+                        "LineOrd#5",
+                        "7 Bellevue Drive, Pottstown, PE 19464",
+                        null,
+                        null,
+                        null));
+        // +----------------------------------------------------------------+
+        // |                       left state                               |
+        // |----------------------------------------------------------------|
+        // | "Ord#2","LineOrd#2","4 Bellevue Drive, Pottstown, PB 19464"    |
+        // | "Ord#3","LineOrd#10","xxx Bellevue Drive, Pottstown, PJ 19464" |
+        // | "Ord#5","LineOrd#5","7 Bellevue Drive, Pottstown, PE 19464"    |
+        // | "Ord#6","LineOrd#7","9 Bellevue Drive, Pottstown, PF 19464"    |
+        // +----------------------------------------------------------------+
+
+        records =
+                Arrays.asList(
+                        insertRecord("Ord#5", "LineOrd#2", "SHIP"),
+                        updateAfterRecord("Ord#6", "LineOrd#4", "AIR"),
+                        updateBeforeRecord("Ord#5", "LineOrd#2", "SHIP"), // 
-U +U pattern
+                        updateAfterRecord("Ord#5", "LineOrd#2", "TRUCK"), // 
-U +U pattern
+                        updateAfterRecord("Ord#7", "LineOrd#0", "AIR"),
+                        updateAfterRecord("Ord#8", "LineOrd#11", "AIR"));
+        for (StreamRecord<RowData> row : records) {
+            testHarness.processElement2(row);
+        }
+
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#5",
+                        "LineOrd#2",
+                        "SHIP"),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#6",
+                        "LineOrd#7",
+                        "9 Bellevue Drive, Pottstown, PF 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.UPDATE_BEFORE,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#5",
+                        "LineOrd#2",
+                        "SHIP"),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#5",
+                        "LineOrd#2",
+                        "TRUCK"));
+    }
+
+    @Tag("miniBatchSize=2")
+    @Test
+    public void testLeftJoinHasUniqueKeyRetAndAcc() throws Exception {
+        // this case would create buffer of JoinHasUniqueKey
+        testLeftJoinWithUpdate();
+    }
+
+    @Tag("miniBatchSize=2")
+    @Test
+    public void testLeftJoinJoinKeyContainsUniqueKeyRetAndAcc() throws 
Exception {
+        // this case would create buffer of JoinKeyContainsUniqueKey
+        testLeftJoinWithUpdate();
+    }
+
+    @Tag("miniBatchSize=2")
+    @Test
+    public void testLeftJoinHasUniqueKeyWithoutRetract() throws Exception {
+        // this case would create buffer of JoinHasUniqueKey
+        testLeftJoinWithoutRetract();
+    }
+
+    @Tag("miniBatchSize=2")
+    @Test
+    public void testLeftJoinJoinKeyContainsUniqueKeyWithoutRetract() throws 
Exception {
+        // this case would create buffer of JoinKeyContainsUniqueKey
+        testLeftJoinWithoutRetract();
+    }
+
+    @Tag("miniBatchSize=2")
+    @Test
+    public void testLeftJoinJoinKeyContainsUniqueKeyWithoutAcc() throws 
Exception {
+        // this case would create buffer of JoinKeyContainsUniqueKey
+        testLeftJoinWithoutAcc();
+    }
+
+    @Tag("miniBatchSize=2")
+    @Test
+    public void testLeftJoinHasUniqueKeyWithoutAcc() throws Exception {
+        // this case would create buffer of JoinKeyContainsUniqueKey
+        testLeftJoinWithoutAcc();
+    }
+
+    @Tag("miniBatchSize=4")
+    @Test
+    public void testLeftJoinHasUniqueKeyWithUpdateMultipleCases() throws 
Exception {
+        // this case would create buffer of JoinKeyContainsUniqueKey
+        testLeftJoinWithUpdateRecordsMultipleCases();
+    }
+
+    @Tag("miniBatchSize=4")
+    @Test
+    public void testLeftJoinJoinKeyContainsUniqueKeyWithUpdateMultipleCases() 
throws Exception {
+        // this case would create buffer of JoinKeyContainsUniqueKey
+        testLeftJoinWithUpdateRecordsMultipleCases();
+    }
+
+    @Tag("miniBatchSize=4")
+    @Test
+    public void testRightJoinWithHasUniqueKey() throws Exception {
+        List<StreamRecord<RowData>> records =
+                Arrays.asList(
+                        insertRecord(
+                                "Ord#1",
+                                "LineOrd#1",
+                                "3 Bellevue Drive, Pottstown, PA 19464"), // 2 
 +I -U
+                        insertRecord("Ord#2", "LineOrd#2", "4 Bellevue Drive, 
Pottstown, PB 19464"),
+                        updateAfterRecord(
+                                "Ord#5", "LineOrd#5", "7 Bellevue Drive, 
Pottstown, PE 19464"),
+                        insertRecord(
+                                "Ord#6",
+                                "LineOrd#5",
+                                "8 Bellevue Drive, Pottstown, PF 19464"), // 3 
+I -D
+                        updateBeforeRecord(
+                                "Ord#1", "LineOrd#1", "3 Bellevue Drive, 
Pottstown, PA 19464"), // 2
+                        deleteRecord(
+                                "Ord#6",
+                                "LineOrd#5",
+                                "7 Bellevue Drive, Pottstown, PE 19464")); // 3
+
+        for (StreamRecord<RowData> row : records) {
+            testHarness.processElement1(row);
+        }
+        assertor.shouldEmitNothing(testHarness);
+        // +----------------------------------------------------------------+
+        // |                       left state                               |
+        // |----------------------------------------------------------------|
+        // | "Ord#2","LineOrd#2","4 Bellevue Drive, Pottstown, PB 19464"    |
+        // | "Ord#5","LineOrd#5","7 Bellevue Drive, Pottstown, PE 19464"    |
+        // +----------------------------------------------------------------+
+        records =
+                Arrays.asList(
+                        insertRecord("Ord#5", "LineOrd#2", "SHIP"),
+                        updateAfterRecord("Ord#6", "LineOrd#4", "AIR"),
+                        updateBeforeRecord("Ord#5", "LineOrd#2", "SHIP"),
+                        updateAfterRecord("Ord#5", "LineOrd#2", "TRUCK"),
+                        updateAfterRecord("Ord#7", "LineOrd#0", "AIR"),
+                        updateAfterRecord("Ord#8", "LineOrd#11", "AIR"));
+        for (StreamRecord<RowData> row : records) {
+            testHarness.processElement2(row);
+        }
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(RowKind.INSERT, null, null, null, "Ord#6", 
"LineOrd#4", "AIR"),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#5",
+                        "LineOrd#2",
+                        "SHIP"),
+                rowOfKind(RowKind.INSERT, null, null, null, "Ord#7", 
"LineOrd#0", "AIR"),
+                rowOfKind(RowKind.INSERT, null, null, null, "Ord#8", 
"LineOrd#11", "AIR"),
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#5",
+                        "LineOrd#2",
+                        "SHIP"),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#5",
+                        "LineOrd#2",
+                        "TRUCK"));
+    }
+
+    @Tag("miniBatchSize=4")
+    @Test
+    public void testFullJoinWithHasUniqueKey() throws Exception {
+        List<StreamRecord<RowData>> records =
+                Arrays.asList(
+                        insertRecord(
+                                "Ord#1",
+                                "LineOrd#1",
+                                "3 Bellevue Drive, Pottstown, PA 19464"), // 2 
 +I -U
+                        insertRecord("Ord#2", "LineOrd#2", "4 Bellevue Drive, 
Pottstown, PB 19464"),
+                        updateAfterRecord(
+                                "Ord#5", "LineOrd#5", "7 Bellevue Drive, 
Pottstown, PE 19464"),
+                        insertRecord(
+                                "Ord#6",
+                                "LineOrd#5",
+                                "8 Bellevue Drive, Pottstown, PF 19464"), // 3 
+I -D
+                        updateBeforeRecord(
+                                "Ord#1", "LineOrd#1", "3 Bellevue Drive, 
Pottstown, PA 19464"), // 2
+                        deleteRecord(
+                                "Ord#6",
+                                "LineOrd#5",
+                                "7 Bellevue Drive, Pottstown, PE 19464")); // 3
+
+        for (StreamRecord<RowData> row : records) {
+            testHarness.processElement1(row);
+        }
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#1",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#6",
+                        "LineOrd#5",
+                        "8 Bellevue Drive, Pottstown, PF 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#5",
+                        "LineOrd#5",
+                        "7 Bellevue Drive, Pottstown, PE 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        null,
+                        null,
+                        null));
+        // +----------------------------------------------------------------+
+        // |                       left state                               |
+        // |----------------------------------------------------------------|
+        // | "Ord#2","LineOrd#2","4 Bellevue Drive, Pottstown, PB 19464"    |
+        // | "Ord#5","LineOrd#5","7 Bellevue Drive, Pottstown, PE 19464"    |
+        // +----------------------------------------------------------------+
+
+        records =
+                Arrays.asList(
+                        insertRecord("Ord#5", "LineOrd#2", "SHIP"),
+                        updateAfterRecord("Ord#6", "LineOrd#4", "AIR"),
+                        updateBeforeRecord("Ord#5", "LineOrd#2", "SHIP"), // 
-U +U pattern
+                        updateAfterRecord("Ord#5", "LineOrd#2", "TRUCK"), // 
-U +U pattern
+                        updateAfterRecord("Ord#7", "LineOrd#0", "AIR"),
+                        updateAfterRecord("Ord#8", "LineOrd#11", "AIR"));
+        for (StreamRecord<RowData> row : records) {
+            testHarness.processElement2(row);
+        }
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(RowKind.INSERT, null, null, null, "Ord#6", 
"LineOrd#4", "AIR"),
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#5",
+                        "LineOrd#2",
+                        "SHIP"),
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#1",
+                        "LineOrd#1",
+                        "3 Bellevue Drive, Pottstown, PA 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#6",
+                        "LineOrd#5",
+                        "7 Bellevue Drive, Pottstown, PE 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(RowKind.INSERT, null, null, null, "Ord#7", 
"LineOrd#0", "AIR"),
+                rowOfKind(RowKind.INSERT, null, null, null, "Ord#8", 
"LineOrd#11", "AIR"),
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#5",
+                        "LineOrd#2",
+                        "SHIP"),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#5",
+                        "LineOrd#2",
+                        "TRUCK"));
+    }
+
+    @Tag("miniBatchSize=15")
+    @Test
+    public void testLeftJoinWithNoUniqueKey() throws Exception {
+        // joinKey is LineOrd
+        // +I -U / +I -D / -U +U / -D +I
+        List<StreamRecord<RowData>> records =
+                Arrays.asList(
+                        insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, 
Pottstown, PA 19464"),
+                        insertRecord("Ord#1", "LineOrd#2", "4 Bellevue Drive, 
Pottstown, PB 19464"),
+                        insertRecord(
+                                "Ord#1",
+                                "LineOrd#2",
+                                "4 Bellevue Drive, Pottstown, PB 19464"), // 
2x    +I -D
+                        deleteRecord("Ord#6", "LineOrd#6", "8 Bellevue Drive, 
Pottstown, PF 19464"),
+                        insertRecord(
+                                "Ord#3",
+                                "LineOrd#3",
+                                "5 Bellevue Drive, Pottstown, PD 19464"), // x 
 +I -U
+                        updateBeforeRecord(
+                                "Ord#3", "LineOrd#3", "5 Bellevue Drive, 
Pottstown, PD 19464"), // x
+                        updateBeforeRecord(
+                                "Ord#9",
+                                "LineOrd#9",
+                                "11 Bellevue Drive, Pottstown, PI 19464"), // 
3x   -U +U
+                        updateAfterRecord(
+                                "Ord#10", "LineOrd#10", "14 Bellevue Drive, 
Pottstown, PJ 19464"),
+                        deleteRecord(
+                                "Ord#1",
+                                "LineOrd#2",
+                                "4 Bellevue Drive, Pottstown, PB 19464"), // 2x
+                        updateAfterRecord(
+                                "Ord#9",
+                                "LineOrd#9",
+                                "11 Bellevue Drive, Pottstown, PI 19464"), // 
3x
+                        deleteRecord(
+                                "Ord#6",
+                                "LineOrd#6",
+                                "8 Bellevue Drive, Pottstown, PF 19464"), // 
4x   -D +I
+                        insertRecord(
+                                "Ord#6", "LineOrd#6", "8 Bellevue Drive, 
Pottstown, PF 19464") // 4x
+                        );
+
+        for (StreamRecord<RowData> row : records) {
+            testHarness.processElement2(row);
+        }
+        // +------------------------------------------------------------------+
+        // |                       right state                                |
+        // |------------------------------------------------------------------|
+        // | "Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"    |
+        // | "Ord#1", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"    |
+        // | "Ord#10", "LineOrd#10", "14 Bellevue Drive, Pottstown, PJ 19464" |
+        // +------------------------------------------------------------------+
+        records =
+                Arrays.asList(
+                        insertRecord("Ord#1", "LineOrd#1", "AIR"),
+                        updateAfterRecord("Ord#1", "LineOrd#3", "SHIP"),
+                        deleteRecord("Ord#6", "LineOrd#6", "RAILWAY"));
+
+        for (StreamRecord<RowData> row : records) {
+            testHarness.processElement1(row);
+        }
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(RowKind.DELETE, "Ord#6", "LineOrd#6", "RAILWAY", 
null, null, null),
+                rowOfKind(RowKind.INSERT, "Ord#1", "LineOrd#3", "SHIP", null, 
null, null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#1",
+                        "LineOrd#1",
+                        "AIR",
+                        "Ord#1",
+                        "LineOrd#1",
+                        "3 Bellevue Drive, Pottstown, PA 19464"));
+    }
+
+    /** Special for the pair of retract and accumulate. */
+    private void testLeftJoinWithUpdate() throws Exception {
+        // joinKey is LineOrd and uniqueKey is Ord
+        List<StreamRecord<RowData>> records =
+                Collections.singletonList(
+                        insertRecord(
+                                "Ord#2", "LineOrd#2", "4 Bellevue Drive, 
Pottstown, PB 19464"));
+        for (StreamRecord<RowData> row : records) {
+            testHarness.processElement1(row);
+        }
+        assertor.shouldEmitNothing(testHarness);
+        records =
+                Arrays.asList(
+                        insertRecord("Ord#2", "LineOrd#2", "SHIP"),
+                        updateBeforeRecord("Ord#2", "LineOrd#2", "SHIP"),
+                        updateAfterRecord("Ord#2", "LineOrd#2", "AIR"));
+        for (StreamRecord<RowData> row : records) {
+            testHarness.processElement2(row);
+        }
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#2",
+                        "LineOrd#2",
+                        "SHIP"),
+                rowOfKind(
+                        RowKind.UPDATE_BEFORE,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#2",
+                        "LineOrd#2",
+                        "SHIP"),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#2",
+                        "LineOrd#2",
+                        "AIR"));
+    }
+
+    private void testLeftJoinWithoutRetract() throws Exception {
+        // joinKey is LineOrd and uniqueKey is Ord
+        List<StreamRecord<RowData>> records =
+                Arrays.asList(
+                        insertRecord("Ord#2", "LineOrd#2", "4 Bellevue Drive, 
Pottstown, PB 19464"),
+                        updateAfterRecord(
+                                "Ord#2", "LineOrd#2", "5 Bellevue Drive, 
Pottstown, PC 19464"));
+        for (StreamRecord<RowData> row : records) {
+            testHarness.processElement1(row);
+        }
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "5 Bellevue Drive, Pottstown, PC 19464",
+                        null,
+                        null,
+                        null));
+        records =
+                Arrays.asList(
+                        updateAfterRecord("Ord#2", "LineOrd#2", "SHIP"),
+                        updateAfterRecord("Ord#2", "LineOrd#2", "AIR"));
+        for (StreamRecord<RowData> row : records) {
+            testHarness.processElement2(row);
+        }
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "5 Bellevue Drive, Pottstown, PC 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "5 Bellevue Drive, Pottstown, PC 19464",
+                        "Ord#2",
+                        "LineOrd#2",
+                        "AIR"));
+    }
+
+    private void testLeftJoinWithoutAcc() throws Exception {
+        // joinKey is LineOrd and uniqueKey is Ord
+        List<StreamRecord<RowData>> records =
+                Arrays.asList(
+                        updateBeforeRecord(
+                                "Ord#2", "LineOrd#2", "4 Bellevue Drive, 
Pottstown, PB 19464"),
+                        deleteRecord(
+                                "Ord#2", "LineOrd#2", "5 Bellevue Drive, 
Pottstown, PC 19464"));
+        for (StreamRecord<RowData> row : records) {
+            testHarness.processElement1(row);
+        }
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "5 Bellevue Drive, Pottstown, PC 19464",
+                        null,
+                        null,
+                        null));
+        records =
+                Arrays.asList(
+                        deleteRecord("Ord#2", "LineOrd#2", "SHIP"),
+                        updateBeforeRecord("Ord#2", "LineOrd#2", "AIR"));
+        for (StreamRecord<RowData> row : records) {
+            testHarness.processElement2(row);
+        }
+        assertor.shouldEmitNothing(testHarness);
+    }
+
+    private void testLeftJoinWithUpdateRecordsMultipleCases() throws Exception 
{
+        // joinKey is LineOrd and uniqueKey is Ord
+        List<StreamRecord<RowData>> records =
+                Arrays.asList(
+                        insertRecord("Ord#2", "LineOrd#2", "4 Bellevue Drive, 
Pottstown, PB 19464"),
+                        updateAfterRecord(
+                                "Ord#5", "LineOrd#5", "7 Bellevue Drive, 
Pottstown, PE 19464"),
+                        insertRecord("Ord#0", "LineOrd#4", "5 Bellevue Drive, 
Pottstown, PB 19464"),
+                        updateAfterRecord(
+                                "Ord#4", "LineOrd#0", "6 Bellevue Drive, 
Pottstown, PB 19464"));
+
+        for (StreamRecord<RowData> row : records) {
+            testHarness.processElement1(row);
+        }
+
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#4",
+                        "LineOrd#0",
+                        "6 Bellevue Drive, Pottstown, PB 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#0",
+                        "LineOrd#4",
+                        "5 Bellevue Drive, Pottstown, PB 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#5",
+                        "LineOrd#5",
+                        "7 Bellevue Drive, Pottstown, PE 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#2",
+                        "LineOrd#2",
+                        "4 Bellevue Drive, Pottstown, PB 19464",
+                        null,
+                        null,
+                        null));
+
+        // only +U and the joinKey changes
+        records =
+                Arrays.asList(
+                        insertRecord("Ord#6", "LineOrd#4", "AIR"),
+                        // +I 6 joinKey=4
+                        updateAfterRecord("Ord#6", "LineOrd#4", "AIR"),
+                        // +U 6 joinKey=4
+                        updateAfterRecord("Ord#6", "LineOrd#5", "AIR"),
+                        // +U 6 joinKey=5  not expected record
+                        updateAfterRecord("Ord#6", "LineOrd#4", "TRUCK")
+                        // +U 6 joinKey=4
+                        );
+        for (StreamRecord<RowData> row : records) {
+            testHarness.processElement2(row);
+        }
+
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#0",
+                        "LineOrd#4",
+                        "5 Bellevue Drive, Pottstown, PB 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#0",
+                        "LineOrd#4",
+                        "5 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#6",
+                        "LineOrd#4",
+                        "TRUCK"),
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#5",
+                        "LineOrd#5",
+                        "7 Bellevue Drive, Pottstown, PE 19464",
+                        null,
+                        null,
+                        null),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#5",
+                        "LineOrd#5",
+                        "7 Bellevue Drive, Pottstown, PE 19464",
+                        "Ord#6",
+                        "LineOrd#5",
+                        "AIR"));
+
+        // -D +I  update records
+        records =
+                Arrays.asList(
+                        deleteRecord("Ord#6", "LineOrd#4", "TRUCK"),
+                        // +I 6 joinKey=4
+                        insertRecord("Ord#6", "LineOrd#4", "TRUCK2"),
+                        // +U 6 joinKey=4
+                        deleteRecord("Ord#6", "LineOrd#4", "TRUCK3"),
+                        // +U 6 joinKey=5  not expected record
+                        insertRecord("Ord#6", "LineOrd#4", "AIR")
+                        // +U 6 joinKey=4
+                        );
+        for (StreamRecord<RowData> row : records) {
+            testHarness.processElement2(row);
+        }
+
+        assertor.shouldEmit(
+                testHarness,
+                rowOfKind(
+                        RowKind.DELETE,
+                        "Ord#0",
+                        "LineOrd#4",
+                        "5 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#6",
+                        "LineOrd#4",
+                        "TRUCK"),
+                rowOfKind(
+                        RowKind.INSERT,
+                        "Ord#0",
+                        "LineOrd#4",
+                        "5 Bellevue Drive, Pottstown, PB 19464",
+                        "Ord#6",
+                        "LineOrd#4",
+                        "AIR"));
+
+        // -U +U disOrder
+        records =
+                Arrays.asList(
+                        insertRecord("Ord#5", "LineOrd#2", "SHIP"), // +I 5
+                        updateAfterRecord("Ord#5", "LineOrd#2", "TRUCK"), // 
+U 5 disorder
+                        updateBeforeRecord("Ord#5", "LineOrd#2", "SHIP"), // 
-U 5 disorder
+                        updateAfterRecord("Ord#9", "LineOrd#7", "TRUCK"));
+        for (StreamRecord<RowData> row : records) {
+            testHarness.processElement2(row);
+        }
+        assertor.shouldEmitNothing(testHarness);
+    }
+
+    @Override
+    public MiniBatchStreamingJoinOperator createJoinOperator(TestInfo 
testInfo) {
+        RowDataKeySelector[] keySelectors = 
ukSelectorExtractor.apply(testInfo.getDisplayName());
+        leftUniqueKeySelector = keySelectors[0];
+        rightUniqueKeySelector = keySelectors[1];
+        JoinInputSideSpec[] inputSideSpecs = 
inputSpecExtractor.apply(testInfo.getDisplayName());
+        Boolean[] isOuter = joinTypeExtractor.apply(testInfo.getDisplayName());
+        FlinkJoinType joinType = 
flinkJoinTypeExtractor.apply(testInfo.getDisplayName());
+        int batchSize = miniBatchSizeExtractor.apply(testInfo.getTags());
+        Long[] ttl = STATE_RETENTION_TIME_EXTRACTOR.apply(testInfo.getTags());
+
+        return MiniBatchStreamingJoinOperator.newMiniBatchStreamJoinOperator(
+                joinType,
+                leftTypeInfo,
+                rightTypeInfo,
+                joinCondition,
+                inputSideSpecs[0],
+                inputSideSpecs[1],
+                isOuter[0],
+                isOuter[1],
+                new boolean[] {true},
+                ttl[0],
+                ttl[0],
+                new CountCoBundleTrigger<>(batchSize));
+    }
+
+    @Override
+    public RowType getOutputType() {
+        return RowType.of(
+                Stream.concat(
+                                
leftTypeInfo.toRowType().getChildren().stream(),
+                                
rightTypeInfo.toRowType().getChildren().stream())
+                        .toArray(LogicalType[]::new),
+                Stream.concat(
+                                
leftTypeInfo.toRowType().getFieldNames().stream(),
+                                
rightTypeInfo.toRowType().getFieldNames().stream())
+                        .toArray(String[]::new));
+    }
+
+    private final Function<String, JoinInputSideSpec[]> inputSpecExtractor =
+            (testDisplayName) -> {
+                if (testDisplayName.contains("JoinKeyContainsUniqueKey")) {
+                    return new JoinInputSideSpec[] {
+                        JoinInputSideSpec.withUniqueKeyContainedByJoinKey(
+                                leftTypeInfo, leftUniqueKeySelector),
+                        JoinInputSideSpec.withUniqueKeyContainedByJoinKey(
+                                rightTypeInfo, rightUniqueKeySelector)
+                    };
+                } else if (testDisplayName.contains("HasUniqueKey")) {
+                    return new JoinInputSideSpec[] {
+                        JoinInputSideSpec.withUniqueKey(leftTypeInfo, 
leftUniqueKeySelector),
+                        JoinInputSideSpec.withUniqueKey(rightTypeInfo, 
rightUniqueKeySelector)
+                    };
+                } else {
+                    return new JoinInputSideSpec[] {
+                        JoinInputSideSpec.withoutUniqueKey(), 
JoinInputSideSpec.withoutUniqueKey()
+                    };
+                }
+            };
+
+    private final Function<String, RowDataKeySelector[]> ukSelectorExtractor =
+            (testDisplayName) -> {
+                if (testDisplayName.contains("JoinKeyContainsUniqueKey")) {
+                    return new RowDataKeySelector[] {
+                        HandwrittenSelectorUtil.getRowDataSelector(
+                                new int[] {1},
+                                
leftTypeInfo.toRowType().getChildren().toArray(new LogicalType[0])),
+                        HandwrittenSelectorUtil.getRowDataSelector(
+                                new int[] {1},
+                                
rightTypeInfo.toRowType().getChildren().toArray(new LogicalType[0]))
+                    };
+                } else if (testDisplayName.contains("HasUniqueKey")) {
+                    return new RowDataKeySelector[] {
+                        HandwrittenSelectorUtil.getRowDataSelector(
+                                new int[] {0},
+                                
leftTypeInfo.toRowType().getChildren().toArray(new LogicalType[0])),
+                        HandwrittenSelectorUtil.getRowDataSelector(
+                                new int[] {0},
+                                
rightTypeInfo.toRowType().getChildren().toArray(new LogicalType[0]))
+                    };
+                } else {
+                    return new RowDataKeySelector[] {null, null};
+                }
+            };
+
+    private final Function<Set<String>, Integer> miniBatchSizeExtractor =
+            (tags) -> {
+                int size = 5;
+                if (tags.isEmpty()) {
+                    return size; // default
+                }
+                for (String tag : tags) {
+                    String[] splits = tag.split("=");
+                    int value = Integer.parseInt(splits[1].trim());
+                    if (splits[0].trim().startsWith("miniBatchSize")) {
+                        size = value;
+                        break;
+                    }
+                }
+                return size;
+            };
+
+    private final Function<String, Boolean[]> joinTypeExtractor =
+            (testDisplayName) -> {
+                if (testDisplayName.contains("InnerJoin")) {
+                    return new Boolean[] {false, false};
+                } else if (testDisplayName.contains("LeftJoin")) {
+                    return new Boolean[] {true, false};
+                } else if (testDisplayName.contains("RightJoin")) {
+                    return new Boolean[] {false, true};
+                } else {
+                    return new Boolean[] {true, true};
+                }
+            };
+
+    private final Function<String, FlinkJoinType> flinkJoinTypeExtractor =
+            (testDisplayName) -> {
+                if (testDisplayName.contains("InnerJoin")) {
+                    return FlinkJoinType.INNER;
+                } else if (testDisplayName.contains("LeftJoin")) {
+                    return FlinkJoinType.LEFT;
+                } else if (testDisplayName.contains("RightJoin")) {
+                    return FlinkJoinType.RIGHT;
+                } else {
+                    return FlinkJoinType.FULL;
+                }
+            };
+}

Reply via email to