fhueske commented on code in PR #28329: URL: https://github.com/apache/flink/pull/28329#discussion_r3481055379
########## flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/snapshot/LateralSnapshotJoinOperatorTest.java: ########## @@ -0,0 +1,1666 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.join.snapshot; + +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.data.writer.BinaryRowWriter; +import org.apache.flink.table.runtime.generated.GeneratedJoinCondition; +import org.apache.flink.table.runtime.operators.join.snapshot.LateralSnapshotJoinOperator.Phase; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.runtime.util.RowDataHarnessAssertor; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.types.RowKind; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.apache.flink.table.runtime.operators.join.snapshot.LateralSnapshotJoinOperator.BUILD_CHANGE_BUFFER_STATE_NAME; +import static org.apache.flink.table.runtime.operators.join.snapshot.LateralSnapshotJoinOperator.BUILD_TABLE_STATE_NAME; +import static org.apache.flink.table.runtime.operators.join.snapshot.LateralSnapshotJoinOperator.PROBE_BUFFER_STATE_NAME; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord; +import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateBeforeRecord; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.within; + +/** Harness tests for {@link LateralSnapshotJoinOperator}. */ +class LateralSnapshotJoinOperatorTest { + + // ----------------------------------------------------------------- Schema + + /** Probe row schema: (id BIGINT, key VARCHAR, val VARCHAR). */ + private static final InternalTypeInfo<RowData> PROBE_TYPE = + InternalTypeInfo.ofFields( + new BigIntType(), VarCharType.STRING_TYPE, VarCharType.STRING_TYPE); + + /** Build row schema: (key VARCHAR, val VARCHAR, rt BIGINT). */ + private static final InternalTypeInfo<RowData> BUILD_TYPE = + InternalTypeInfo.ofFields( + VarCharType.STRING_TYPE, VarCharType.STRING_TYPE, new BigIntType()); + + /** Joined output schema: probe ++ build = (id, pKey, pVal, bKey, bVal, bRt). */ + private static final LogicalType[] OUTPUT_TYPES = { + new BigIntType(), + VarCharType.STRING_TYPE, + VarCharType.STRING_TYPE, + VarCharType.STRING_TYPE, + VarCharType.STRING_TYPE, + new BigIntType() + }; + + /** Probe key column index (key VARCHAR is at field 1). */ + private static final int PROBE_KEY_IDX = 1; + + /** Build key column index (key VARCHAR is at field 0). */ + private static final int BUILD_KEY_IDX = 0; + + /** Build row-time column index (rt BIGINT is at field 2). */ + private static final int BUILD_RT_IDX = 2; + + private static final InternalTypeInfo<RowData> KEY_TYPE = + InternalTypeInfo.ofFields(VarCharType.STRING_TYPE); + + private static final KeySelector<RowData, RowData> PROBE_KEY_SELECTOR = + nullSafeStringKeySelector(PROBE_KEY_IDX); + private static final KeySelector<RowData, RowData> BUILD_KEY_SELECTOR = + nullSafeStringKeySelector(BUILD_KEY_IDX); + + private static final RowDataHarnessAssertor JOINED_ASSERTOR = + new RowDataHarnessAssertor(OUTPUT_TYPES); + + // ----------------------------------------------------------------- Join conditions + + /** Trivial join condition that always matches (equality is enforced by partitioning). */ + private static final String ALWAYS_TRUE_JOIN_FUNC_CODE = + "public class LateralSnapshotJoinConditionStub extends " + + "org.apache.flink.api.common.functions.AbstractRichFunction " + + "implements org.apache.flink.table.runtime.generated.JoinCondition {\n" + + " public LateralSnapshotJoinConditionStub(Object[] reference) {}\n" + + " @Override public boolean apply(" + + " org.apache.flink.table.data.RowData in1," + + " org.apache.flink.table.data.RowData in2) { return true; }\n" + + "}\n"; + + /** + * Join condition that only matches when the probe value (field 2) equals {@code "match"}. Used + * to verify that the codegen'd condition is actually invoked at join time. + */ + private static final String MATCH_VAL_JOIN_FUNC_CODE = + "public class LateralSnapshotJoinConditionMatchVal extends " + + "org.apache.flink.api.common.functions.AbstractRichFunction " + + "implements org.apache.flink.table.runtime.generated.JoinCondition {\n" + + " public LateralSnapshotJoinConditionMatchVal(Object[] reference) {}\n" + + " @Override public boolean apply(" + + " org.apache.flink.table.data.RowData in1," + + " org.apache.flink.table.data.RowData in2) {\n" + + " if (in1.isNullAt(2)) { return false; }\n" + + " return \"match\".equals(in1.getString(2).toString());\n" + + " }\n" + + "}\n"; + + private static GeneratedJoinCondition newTrueCondition() { + return new GeneratedJoinCondition( + "LateralSnapshotJoinConditionStub", ALWAYS_TRUE_JOIN_FUNC_CODE, new Object[0]); + } + + private static GeneratedJoinCondition newMatchValCondition() { + return new GeneratedJoinCondition( + "LateralSnapshotJoinConditionMatchVal", MATCH_VAL_JOIN_FUNC_CODE, new Object[0]); + } + + // ----------------------------------------------------------------- Operator / harness + // factories + + private static LateralSnapshotJoinOperator newOperator( + boolean isLeftOuterJoin, + GeneratedJoinCondition joinCondition, + boolean[] filterNullKeys, + Long loadCompletedTime, + Long loadCompletedIdleTimeoutMs, + Long stateTtlMs) { + + return new LateralSnapshotJoinOperator( + isLeftOuterJoin, + PROBE_TYPE, + BUILD_TYPE, + BUILD_RT_IDX, + joinCondition, + filterNullKeys, + loadCompletedTime, + loadCompletedIdleTimeoutMs, + stateTtlMs); + } + + private static LateralSnapshotJoinOperator newOperator( + boolean isLeftOuterJoin, + Long loadCompletedTime, + Long loadCompletedIdleTimeoutMs, + Long stateTtlMs) { + + return newOperator( + isLeftOuterJoin, + newTrueCondition(), + new boolean[] {true}, + loadCompletedTime, + loadCompletedIdleTimeoutMs, + stateTtlMs); + } + + private static KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> + newHarness(LateralSnapshotJoinOperator op) throws Exception { + return new KeyedTwoInputStreamOperatorTestHarness<>( + op, PROBE_KEY_SELECTOR, BUILD_KEY_SELECTOR, KEY_TYPE); + } + + private static KeySelector<RowData, RowData> nullSafeStringKeySelector(final int keyIdx) { + return value -> { + BinaryRowData ret = new BinaryRowData(1); + BinaryRowWriter writer = new BinaryRowWriter(ret); + if (value.isNullAt(keyIdx)) { + writer.setNullAt(0); + } else { + writer.writeString(0, value.getString(keyIdx)); + } + writer.complete(); + return ret; + }; + } + + // ----------------------------------------------------------------- LOAD phase + + @Test + void loadPhaseBuildSideChangeProcessing() throws Exception { + LateralSnapshotJoinOperator op = newOperator(false, 100L, null, null); + try (KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> h = + newHarness(op)) { + h.open(); + // During LOAD, build-side changes are buffered and later applied in event-time order. + // -D for a never-inserted (key, value) pair is defensively ignored. + addBuildChange(h, deleteRecord("k1", "ghost", 5L)); + // Two identical records (same key/val/row-time) → count(k1, v1, 20) = 2. + addBuildChange(h, insertRecord("k1", "v1", 20L)); + addBuildChange(h, insertRecord("k1", "v1", 20L)); + // Earlier row-time than v1, but arrives later. + addBuildChange(h, insertRecord("k1", "v2", 10L)); + + // Still LOAD: changes are buffered, nothing applied or emitted yet. + assertPhase(op, Phase.LOAD); + assertThat(h.getOutput()).isEmpty(); + assertThat(bufferedChangesForKey(h, op, "k1")).hasSize(4); + assertThat(buildTableKeys(h)).isEmpty(); + + // Advance the build watermark (still below the flip point) and access k1 again. The + // access drains the buffered batch in event-time order before buffering the new change. + addBuildWm(h, 50L); + addBuildChange(h, insertRecord("k1", "v3", 30L)); + // TODO: also add updateBefore and updateAfter changes, to ensure that these are handled Review Comment: Thanks for catching this! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
