This is an automated email from the ASF dual-hosted git repository.

xuyangzhong pushed a commit to branch release-2.2
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.2 by this push:
     new ef31ad4086f [FLINK-38695][table-planner] Fix wrong metric about left 
cache request count in DeltaJoinCache (#27253)
ef31ad4086f is described below

commit ef31ad4086fd6ef1d67fa41c4e9c245d5e54299e
Author: Xuyang <[email protected]>
AuthorDate: Thu Nov 20 17:23:31 2025 +0800

    [FLINK-38695][table-planner] Fix wrong metric about left cache request 
count in DeltaJoinCache (#27253)
---
 .../operators/join/deltajoin/DeltaJoinCache.java   |  17 +--
 .../join/deltajoin/DeltaJoinCacheTest.java         | 157 +++++++++++++++++++++
 2 files changed, 166 insertions(+), 8 deletions(-)

diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/deltajoin/DeltaJoinCache.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/deltajoin/DeltaJoinCache.java
index ffaca5736bd..9a10edd2626 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/deltajoin/DeltaJoinCache.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/deltajoin/DeltaJoinCache.java
@@ -41,16 +41,17 @@ import java.util.concurrent.atomic.AtomicLong;
  * <p>Note: This cache is not thread-safe although its inner {@link Cache} is 
thread-safe.
  */
 @NotThreadSafe
+@VisibleForTesting
 public class DeltaJoinCache {
 
-    private static final String LEFT_CACHE_METRIC_PREFIX = 
"deltaJoin.leftCache.";
-    private static final String RIGHT_CACHE_METRIC_PREFIX = 
"deltaJoin.rightCache.";
+    protected static final String LEFT_CACHE_METRIC_PREFIX = 
"deltaJoin.leftCache.";
+    protected static final String RIGHT_CACHE_METRIC_PREFIX = 
"deltaJoin.rightCache.";
 
-    private static final String METRIC_HIT_RATE = "hitRate";
-    private static final String METRIC_REQUEST_COUNT = "requestCount";
-    private static final String METRIC_HIT_COUNT = "hitCount";
-    private static final String METRIC_KEY_SIZE = "keySize";
-    private static final String METRIC_TOTAL_NON_EMPTY_VALUE_SIZE = 
"totalNonEmptyValues";
+    protected static final String METRIC_HIT_RATE = "hitRate";
+    protected static final String METRIC_REQUEST_COUNT = "requestCount";
+    protected static final String METRIC_HIT_COUNT = "hitCount";
+    protected static final String METRIC_KEY_SIZE = "keySize";
+    protected static final String METRIC_TOTAL_NON_EMPTY_VALUE_SIZE = 
"totalNonEmptyValues";
 
     // use LinkedHashMap to keep order
     private final Cache<RowData, LinkedHashMap<RowData, Object>> leftCache;
@@ -87,7 +88,7 @@ public class DeltaJoinCache {
                                 : 
Long.valueOf(leftHitCount.get()).doubleValue()
                                         / leftRequestCount.get());
         metricGroup.<Long, Gauge<Long>>gauge(
-                LEFT_CACHE_METRIC_PREFIX + METRIC_REQUEST_COUNT, 
rightRequestCount::get);
+                LEFT_CACHE_METRIC_PREFIX + METRIC_REQUEST_COUNT, 
leftRequestCount::get);
         metricGroup.<Long, Gauge<Long>>gauge(
                 LEFT_CACHE_METRIC_PREFIX + METRIC_HIT_COUNT, 
leftHitCount::get);
         metricGroup.<Long, Gauge<Long>>gauge(
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/deltajoin/DeltaJoinCacheTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/deltajoin/DeltaJoinCacheTest.java
new file mode 100644
index 00000000000..7c23e794ca5
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/deltajoin/DeltaJoinCacheTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.deltajoin;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.util.function.TriConsumer;
+
+import org.apache.flink.shaded.guava33.com.google.common.collect.Maps;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+import static 
org.apache.flink.table.runtime.operators.join.deltajoin.DeltaJoinCache.LEFT_CACHE_METRIC_PREFIX;
+import static 
org.apache.flink.table.runtime.operators.join.deltajoin.DeltaJoinCache.METRIC_HIT_COUNT;
+import static 
org.apache.flink.table.runtime.operators.join.deltajoin.DeltaJoinCache.METRIC_HIT_RATE;
+import static 
org.apache.flink.table.runtime.operators.join.deltajoin.DeltaJoinCache.METRIC_KEY_SIZE;
+import static 
org.apache.flink.table.runtime.operators.join.deltajoin.DeltaJoinCache.METRIC_REQUEST_COUNT;
+import static 
org.apache.flink.table.runtime.operators.join.deltajoin.DeltaJoinCache.METRIC_TOTAL_NON_EMPTY_VALUE_SIZE;
+import static 
org.apache.flink.table.runtime.operators.join.deltajoin.DeltaJoinCache.RIGHT_CACHE_METRIC_PREFIX;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DeltaJoinCache}. */
+@ExtendWith(ParameterizedTestExtension.class)
+class DeltaJoinCacheTest {
+
+    private static final Long LEFT_CACHE_SIZE = 3L;
+    private static final Long RIGHT_CACHE_SIZE = 2L;
+
+    @Parameters(name = "testRightCache = {0}")
+    private static List<Boolean> parameters() {
+        return Arrays.asList(false, true);
+    }
+
+    @Parameter private boolean testRightCache;
+
+    private DeltaJoinCache cache;
+    private Runnable requestCacheFunc;
+    private Runnable hitCacheFunc;
+    private BiConsumer<RowData, LinkedHashMap<RowData, Object>> buildCacheFunc;
+    private TriConsumer<RowData, RowData, Object> upsertCacheFunc;
+
+    @BeforeEach
+    void before() {
+        cache = new DeltaJoinCache(LEFT_CACHE_SIZE, RIGHT_CACHE_SIZE);
+
+        requestCacheFunc =
+                () -> {
+                    if (testRightCache) {
+                        cache.requestRightCache();
+                    } else {
+                        cache.requestLeftCache();
+                    }
+                };
+        hitCacheFunc =
+                () -> {
+                    if (testRightCache) {
+                        cache.hitRightCache();
+                    } else {
+                        cache.hitLeftCache();
+                    }
+                };
+        buildCacheFunc = (key, ukDataMap) -> cache.buildCache(key, ukDataMap, 
testRightCache);
+        upsertCacheFunc = (key, uk, data) -> cache.upsertCache(key, uk, data, 
testRightCache);
+    }
+
+    @TestTemplate
+    void testReportMetrics() {
+        Map<String, Metric> allMetrics = new HashMap<>();
+        cache.registerMetrics(
+                new UnregisteredMetricGroups.UnregisteredOperatorMetricGroup() 
{
+                    @Override
+                    protected void addMetric(String name, Metric metric) {
+                        allMetrics.put(name, metric);
+                        super.addMetric(name, metric);
+                    }
+                });
+
+        assertReportMetricsInternal(allMetrics, 0, 0, 0.0, 0, 0);
+        requestCacheFunc.run();
+        assertReportMetricsInternal(allMetrics, 1, 0, 0.0, 0, 0);
+        hitCacheFunc.run();
+        assertReportMetricsInternal(allMetrics, 1, 1, 1.0, 0, 0);
+        requestCacheFunc.run();
+        assertReportMetricsInternal(allMetrics, 2, 1, 0.5, 0, 0);
+
+        buildCacheFunc.accept(row("ck1"), Maps.newLinkedHashMap());
+        assertReportMetricsInternal(allMetrics, 2, 1, 0.5, 1, 0);
+        buildCacheFunc.accept(
+                row("ck2"),
+                Maps.newLinkedHashMap(Map.of(row("pk1"), 1, row("pk2"), 2, 
row("pk3"), 3)));
+        assertReportMetricsInternal(allMetrics, 2, 1, 0.5, 2, 3);
+        upsertCacheFunc.accept(row("ck1"), row("pk4"), 4);
+        assertReportMetricsInternal(allMetrics, 2, 1, 0.5, 2, 4);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void assertReportMetricsInternal(
+            Map<String, Metric> actualAllMetrics,
+            long expectedRequestCount,
+            long expectedHitCount,
+            double expectedHitRate,
+            long expectedKeySize,
+            long expectedNonEmptyValueSize) {
+        String prefix = testRightCache ? RIGHT_CACHE_METRIC_PREFIX : 
LEFT_CACHE_METRIC_PREFIX;
+
+        String hitRate = prefix + METRIC_HIT_RATE;
+        assertThat(((Gauge<Double>) actualAllMetrics.get(hitRate)).getValue())
+                .isEqualTo(expectedHitRate);
+
+        String requestCount = prefix + METRIC_REQUEST_COUNT;
+        assertThat(((Gauge<Long>) 
actualAllMetrics.get(requestCount)).getValue())
+                .isEqualTo(expectedRequestCount);
+
+        String hitCount = prefix + METRIC_HIT_COUNT;
+        assertThat(((Gauge<Long>) actualAllMetrics.get(hitCount)).getValue())
+                .isEqualTo(expectedHitCount);
+
+        String keySize = prefix + METRIC_KEY_SIZE;
+        assertThat(((Gauge<Long>) actualAllMetrics.get(keySize)).getValue())
+                .isEqualTo(expectedKeySize);
+
+        String totalNonEmptyValueSize = prefix + 
METRIC_TOTAL_NON_EMPTY_VALUE_SIZE;
+        assertThat(((Gauge<Long>) 
actualAllMetrics.get(totalNonEmptyValueSize)).getValue())
+                .isEqualTo(expectedNonEmptyValueSize);
+    }
+}

Reply via email to