This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch release-2.2
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.2 by this push:
     new d1da51a3ac3 [FLINK-39150][runtime] Fix join operator crashes jobs when 
using custom types or custom type serializers
d1da51a3ac3 is described below

commit d1da51a3ac3d589e68e2588eb348abd75023989e
Author: noorall <[email protected]>
AuthorDate: Fri Feb 27 14:25:15 2026 +0800

    [FLINK-39150][runtime] Fix join operator crashes jobs when using custom 
types or custom type serializers
---
 .../flink-end-to-end-tests-table-api/pom.xml       |  26 +++++
 .../table/test/join/JoinWithCustomTypeExample.java |  76 ++++++++++++
 .../apache/flink/table/test/join/TestClass.java    |  22 ++++
 flink-end-to-end-tests/run-nightly-tests.sh        |   2 +
 .../test-scripts/test_table_api.sh                 |  37 ++++++
 .../flink/table/planner/loader/PlannerModule.java  |   3 +-
 .../adaptive/AdaptiveJoinOperatorGenerator.java    |  79 ++-----------
 .../nodes/exec/batch/BatchExecAdaptiveJoin.java    |  43 +++++--
 .../AdaptiveJoinOperatorGeneratorTest.java         |  20 ++--
 .../operators/join/adaptive/AdaptiveJoin.java      |   4 +-
 .../join/adaptive/AdaptiveJoinGenerator.java       |  50 ++++++++
 .../join/adaptive/AdaptiveJoinOperatorFactory.java | 130 ++++++++++++++-------
 12 files changed, 360 insertions(+), 132 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-table-api/pom.xml 
b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/pom.xml
index d8a6fbc8cc1..2e9afd9f7de 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-table-api/pom.xml
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/pom.xml
@@ -104,4 +104,30 @@ under the License.
                </dependency>
 
     </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               
<id>JoinWithCustomTypeExample</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration>
+                                                       
<finalName>JoinWithCustomTypeExample</finalName>
+                                                       <transformers>
+                                                               <transformer 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                                                       
<mainClass>org.apache.flink.table.test.join.JoinWithCustomTypeExample</mainClass>
+                                                               </transformer>
+                                                       </transformers>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
 </project> 
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/join/JoinWithCustomTypeExample.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/join/JoinWithCustomTypeExample.java
new file mode 100644
index 00000000000..a598a72f915
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/join/JoinWithCustomTypeExample.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test.join;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Example application that tests JoinWithCustomType with Table API. */
+public class JoinWithCustomTypeExample {
+
+    private static final TypeInformation<Integer> INT = Types.INT;
+
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        runTest(env, tEnv);
+    }
+
+    private static void runTest(StreamExecutionEnvironment env, 
StreamTableEnvironment tEnv)
+            throws Exception {
+        System.out.println("Running join with custom type test...");
+
+        TestClass value = new TestClass();
+        TypeInformation<TestClass> valueTypeInfo =
+                new PojoTypeInfo<>(TestClass.class, new ArrayList<>());
+
+        Table table1 =
+                tEnv.fromDataStream(
+                        env.fromData(Row.of(1)).returns(Types.ROW_NAMED(new 
String[] {"id"}, INT)));
+
+        Table table2 =
+                tEnv.fromDataStream(
+                        env.fromData(Row.of(1, value))
+                                .returns(
+                                        Types.ROW_NAMED(
+                                                new String[] {"id2", "value"},
+                                                INT,
+                                                valueTypeInfo)));
+
+        tEnv.toDataStream(table1.leftOuterJoin(table2, 
$("id").isEqual($("id2"))))
+                .sinkTo(new DiscardingSink<>());
+
+        env.execute("joinWithCustomType");
+        System.out.println("Job joinWithCustomType completed successfully!");
+    }
+}
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/join/TestClass.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/join/TestClass.java
new file mode 100644
index 00000000000..dad5872c0fd
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/join/TestClass.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test.join;
+
+/** Custom class for testing join with custom type. */
+public class TestClass {}
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh 
b/flink-end-to-end-tests/run-nightly-tests.sh
index 92304ef616a..fe47e8e0635 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -223,6 +223,8 @@ function run_group_2 {
     #     run_test "PyFlink YARN application on Docker test" 
"$END_TO_END_DIR/test-scripts/test_pyflink_yarn.sh" "skip_check_exceptions"
     # fi
 
+    run_test "Flink Table API with custom type end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_table_api.sh"
+
     
################################################################################
     # Sticky Scheduling
     
################################################################################
diff --git a/flink-end-to-end-tests/test-scripts/test_table_api.sh 
b/flink-end-to-end-tests/test-scripts/test_table_api.sh
new file mode 100755
index 00000000000..450eaa17e30
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_table_api.sh
@@ -0,0 +1,37 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# End to end test for join with custom type examples. It only verifies that 
the job can be submitted
+# and run correctly.
+#
+# Usage:
+# FLINK_DIR=<flink dir> flink-end-to-end-tests/test-scripts/test_table_api.sh
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-end-to-end-tests-table-api/target/JoinWithCustomTypeExample.jar
+
+$FLINK_DIR/bin/flink run -p 1 $TEST_PROGRAM_JAR
+EXIT_CODE=$?
+
+stop_cluster
+
+exit $EXIT_CODE
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java
index 289a3f23fa6..1a8375b5039 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java
@@ -34,6 +34,7 @@ import org.apache.flink.util.IOUtils;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URL;
+import java.net.URLClassLoader;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -135,7 +136,7 @@ public class PlannerModule {
         }
     }
 
-    public ClassLoader getSubmoduleClassLoader() {
+    public URLClassLoader getSubmoduleClassLoader() {
         return this.submoduleClassLoader;
     }
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGenerator.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGenerator.java
index 78981a5e8ce..b69220399d4 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGenerator.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGenerator.java
@@ -20,30 +20,21 @@ package org.apache.flink.table.planner.adaptive;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.table.planner.plan.utils.HashJoinOperatorUtil;
-import org.apache.flink.table.planner.plan.utils.OperatorType;
 import org.apache.flink.table.planner.plan.utils.SorMergeJoinOperatorUtil;
 import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
 import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
-import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+import 
org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoinGenerator;
 import org.apache.flink.table.types.logical.RowType;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
- * Implementation class for {@link AdaptiveJoin}. It can selectively generate 
broadcast hash join,
- * shuffle hash join or shuffle merge join operator based on actual conditions.
+ * Implementation class for {@link AdaptiveJoinGenerator}. It can selectively 
generate broadcast
+ * hash join, shuffle hash join or shuffle merge join operator based on actual 
conditions.
  */
-public class AdaptiveJoinOperatorGenerator implements AdaptiveJoin {
-    private static final Logger LOG = 
LoggerFactory.getLogger(AdaptiveJoinOperatorGenerator.class);
+public class AdaptiveJoinOperatorGenerator implements AdaptiveJoinGenerator {
     private final int[] leftKeys;
 
     private final int[] rightKeys;
 
-    private final FlinkJoinType joinType;
-
     private final boolean[] filterNulls;
 
     private final RowType leftType;
@@ -64,18 +55,9 @@ public class AdaptiveJoinOperatorGenerator implements 
AdaptiveJoin {
 
     private final long managedMemory;
 
-    private final OperatorType originalJoin;
-
-    private boolean leftIsBuild;
-
-    private boolean originalLeftIsBuild;
-
-    private boolean isBroadcastJoin;
-
     public AdaptiveJoinOperatorGenerator(
             int[] leftKeys,
             int[] rightKeys,
-            FlinkJoinType joinType,
             boolean[] filterNulls,
             RowType leftType,
             RowType rightType,
@@ -85,12 +67,9 @@ public class AdaptiveJoinOperatorGenerator implements 
AdaptiveJoin {
             long leftRowCount,
             long rightRowCount,
             boolean tryDistinctBuildRow,
-            long managedMemory,
-            boolean leftIsBuild,
-            OperatorType originalJoin) {
+            long managedMemory) {
         this.leftKeys = leftKeys;
         this.rightKeys = rightKeys;
-        this.joinType = joinType;
         this.filterNulls = filterNulls;
         this.leftType = leftType;
         this.rightType = rightType;
@@ -101,23 +80,17 @@ public class AdaptiveJoinOperatorGenerator implements 
AdaptiveJoin {
         this.rightRowCount = rightRowCount;
         this.tryDistinctBuildRow = tryDistinctBuildRow;
         this.managedMemory = managedMemory;
-        checkState(
-                originalJoin == OperatorType.ShuffleHashJoin
-                        || originalJoin == OperatorType.SortMergeJoin,
-                String.format(
-                        "Adaptive join "
-                                + "currently only supports adaptive 
optimization for ShuffleHashJoin and "
-                                + "SortMergeJoin, not including %s.",
-                        originalJoin.toString()));
-        this.leftIsBuild = leftIsBuild;
-        this.originalLeftIsBuild = leftIsBuild;
-        this.originalJoin = originalJoin;
     }
 
     @Override
     public StreamOperatorFactory<?> genOperatorFactory(
-            ClassLoader classLoader, ReadableConfig config) {
-        if (isBroadcastJoin || originalJoin == OperatorType.ShuffleHashJoin) {
+            ClassLoader classLoader,
+            ReadableConfig config,
+            FlinkJoinType joinType,
+            boolean originIsSortMergeJoin,
+            boolean isBroadcastJoin,
+            boolean leftIsBuild) {
+        if (isBroadcastJoin || !originIsSortMergeJoin) {
             return HashJoinOperatorUtil.generateOperatorFactory(
                     leftKeys,
                     rightKeys,
@@ -150,32 +123,4 @@ public class AdaptiveJoinOperatorGenerator implements 
AdaptiveJoin {
                     classLoader);
         }
     }
-
-    @Override
-    public FlinkJoinType getJoinType() {
-        return joinType;
-    }
-
-    @Override
-    public void markAsBroadcastJoin(boolean canBroadcast, boolean leftIsBuild) 
{
-        this.isBroadcastJoin = canBroadcast;
-        this.leftIsBuild = leftIsBuild;
-    }
-
-    @Override
-    public boolean shouldReorderInputs() {
-        // Sort merge join requires the left side to be read first if the 
broadcast threshold is not
-        // met.
-        if (!isBroadcastJoin && originalJoin == OperatorType.SortMergeJoin) {
-            return false;
-        }
-
-        if (leftIsBuild != originalLeftIsBuild) {
-            LOG.info(
-                    "The build side of the adaptive join has been updated. 
Compile phase build side: {}, Runtime build side: {}.",
-                    originalLeftIsBuild ? "left" : "right",
-                    leftIsBuild ? "left" : "right");
-        }
-        return !leftIsBuild;
-    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecAdaptiveJoin.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecAdaptiveJoin.java
index bf1122bde86..a103a8e349b 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecAdaptiveJoin.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecAdaptiveJoin.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.plan.nodes.exec.batch;
 
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.table.api.TableException;
@@ -36,7 +37,7 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.planner.plan.utils.JoinUtil;
 import org.apache.flink.table.planner.plan.utils.OperatorType;
 import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
-import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
 import 
org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoinOperatorFactory;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
@@ -45,6 +46,8 @@ import org.apache.flink.util.InstantiationUtil;
 import java.io.IOException;
 import java.util.List;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /** {@link BatchExecNode} for adaptive join. */
 public class BatchExecAdaptiveJoin extends ExecNodeBase<RowData>
         implements BatchExecNode<RowData>, 
SingleTransformationTranslator<RowData> {
@@ -87,6 +90,14 @@ public class BatchExecAdaptiveJoin extends 
ExecNodeBase<RowData>
         this.leftIsBuild = leftIsBuild;
         this.tryDistinctBuildRow = tryDistinctBuildRow;
         this.description = description;
+        checkState(
+                originalJoin == OperatorType.ShuffleHashJoin
+                        || originalJoin == OperatorType.SortMergeJoin,
+                String.format(
+                        "Adaptive join "
+                                + "currently only supports adaptive 
optimization for ShuffleHashJoin and "
+                                + "SortMergeJoin, not including %s.",
+                        originalJoin.toString()));
         this.originalJoin = originalJoin;
     }
 
@@ -113,11 +124,10 @@ public class BatchExecAdaptiveJoin extends 
ExecNodeBase<RowData>
                         leftType,
                         rightType);
 
-        AdaptiveJoinOperatorGenerator adaptiveJoin =
+        AdaptiveJoinOperatorGenerator adaptiveJoinGenerator =
                 new AdaptiveJoinOperatorGenerator(
                         joinSpec.getLeftKeys(),
                         joinSpec.getRightKeys(),
-                        joinSpec.getJoinType(),
                         joinSpec.getFilterNulls(),
                         leftType,
                         rightType,
@@ -127,16 +137,19 @@ public class BatchExecAdaptiveJoin extends 
ExecNodeBase<RowData>
                         estimatedLeftRowCount,
                         estimatedRightRowCount,
                         tryDistinctBuildRow,
-                        managedMemory,
-                        leftIsBuild,
-                        originalJoin);
+                        managedMemory);
 
         return ExecNodeUtil.createTwoInputTransformation(
                 leftInputTransform,
                 rightInputTransform,
                 createTransformationName(config),
                 createTransformationDescription(config),
-                getAdaptiveJoinOperatorFactory(adaptiveJoin),
+                getAdaptiveJoinOperatorFactory(
+                        adaptiveJoinGenerator,
+                        config.get(CoreOptions.CHECK_LEAKED_CLASSLOADER),
+                        joinSpec.getJoinType(),
+                        originalJoin,
+                        leftIsBuild),
                 InternalTypeInfo.of(getOutputType()),
                 // Given that the probe side might be decided at runtime, we 
choose the larger
                 // parallelism here.
@@ -146,10 +159,20 @@ public class BatchExecAdaptiveJoin extends 
ExecNodeBase<RowData>
     }
 
     private StreamOperatorFactory<RowData> getAdaptiveJoinOperatorFactory(
-            AdaptiveJoin adaptiveJoin) {
+            AdaptiveJoinOperatorGenerator adaptiveJoinGenerator,
+            boolean checkClassLoaderLeak,
+            FlinkJoinType joinType,
+            OperatorType originalJoin,
+            boolean leftIsBuild) {
         try {
-            byte[] adaptiveJoinSerialized = 
InstantiationUtil.serializeObject(adaptiveJoin);
-            return new AdaptiveJoinOperatorFactory<>(adaptiveJoinSerialized);
+            byte[] adaptiveJoinGeneratorSerialized =
+                    InstantiationUtil.serializeObject(adaptiveJoinGenerator);
+            return new AdaptiveJoinOperatorFactory<>(
+                    adaptiveJoinGeneratorSerialized,
+                    joinType,
+                    originalJoin == OperatorType.SortMergeJoin,
+                    leftIsBuild,
+                    checkClassLoaderLeak);
         } catch (IOException e) {
             throw new TableException("The adaptive join operator failed to 
serialize.", e);
         }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGeneratorTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGeneratorTest.java
index 3baa5030cbe..d554465993b 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGeneratorTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGeneratorTest.java
@@ -28,7 +28,7 @@ import 
org.apache.flink.table.runtime.operators.join.FlinkJoinType;
 import org.apache.flink.table.runtime.operators.join.HashJoinOperator;
 import 
org.apache.flink.table.runtime.operators.join.Int2HashJoinOperatorTestBase;
 import org.apache.flink.table.runtime.operators.join.SortMergeJoinOperator;
-import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+import 
org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoinGenerator;
 import org.apache.flink.table.runtime.util.UniformBinaryRowGenerator;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
@@ -268,10 +268,15 @@ class AdaptiveJoinOperatorGeneratorTest extends 
Int2HashJoinOperatorTestBase {
             boolean buildLeft,
             boolean isBroadcast,
             OperatorType operatorType) {
-        AdaptiveJoin adaptiveJoin = genAdaptiveJoin(flinkJoinType, 
operatorType);
-        adaptiveJoin.markAsBroadcastJoin(isBroadcast, buildLeft);
+        AdaptiveJoinGenerator adaptiveJoinGenerator = 
genAdaptiveJoinGenerator();
 
-        return adaptiveJoin.genOperatorFactory(getClass().getClassLoader(), 
new Configuration());
+        return adaptiveJoinGenerator.genOperatorFactory(
+                getClass().getClassLoader(),
+                new Configuration(),
+                flinkJoinType,
+                operatorType == SortMergeJoin,
+                isBroadcast,
+                buildLeft);
     }
 
     public void assertOperatorType(Object operator, OperatorType 
expectedOperatorType) {
@@ -301,7 +306,7 @@ class AdaptiveJoinOperatorGeneratorTest extends 
Int2HashJoinOperatorTestBase {
         }
     }
 
-    public AdaptiveJoin genAdaptiveJoin(FlinkJoinType flinkJoinType, 
OperatorType operatorType) {
+    public AdaptiveJoinGenerator genAdaptiveJoinGenerator() {
         GeneratedJoinCondition condFuncCode =
                 new GeneratedJoinCondition(
                         
Int2HashJoinOperatorTestBase.MyJoinCondition.class.getCanonicalName(),
@@ -316,7 +321,6 @@ class AdaptiveJoinOperatorGeneratorTest extends 
Int2HashJoinOperatorTestBase {
         return new AdaptiveJoinOperatorGenerator(
                 new int[] {0},
                 new int[] {0},
-                flinkJoinType,
                 new boolean[] {true},
                 RowType.of(new IntType(), new IntType()),
                 RowType.of(new IntType(), new IntType()),
@@ -326,8 +330,6 @@ class AdaptiveJoinOperatorGeneratorTest extends 
Int2HashJoinOperatorTestBase {
                 20,
                 10000,
                 false,
-                TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY.defaultValue().getBytes(),
-                true,
-                operatorType);
+                
TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY.defaultValue().getBytes());
     }
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoin.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoin.java
index 0d289ae9639..9ca97590389 100755
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoin.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoin.java
@@ -32,11 +32,11 @@ public interface AdaptiveJoin extends Serializable {
      * Generates a StreamOperatorFactory for this join operator using the 
provided ClassLoader and
      * config.
      *
-     * @param classLoader the ClassLoader to be used for loading classes.
+     * @param userClassLoader the user ClassLoader to be used for loading 
classes.
      * @param config the configuration to be applied for creating the operator 
factory.
      * @return a StreamOperatorFactory instance.
      */
-    StreamOperatorFactory<?> genOperatorFactory(ClassLoader classLoader, 
ReadableConfig config);
+    StreamOperatorFactory<?> genOperatorFactory(ClassLoader userClassLoader, 
ReadableConfig config);
 
     /**
      * Get the join type of the join operator.
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoinGenerator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoinGenerator.java
new file mode 100644
index 00000000000..02496ebb7b2
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoinGenerator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.adaptive;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+
+import java.io.Serializable;
+
+/** Interface for implementing an adaptive join operator generator. */
+@Internal
+public interface AdaptiveJoinGenerator extends Serializable {
+    /**
+     * Generates a StreamOperatorFactory for this join operator using the 
provided ClassLoader and
+     * parameters.
+     *
+     * @param classLoader the ClassLoader to be used for loading classes.
+     * @param config the configuration to be applied for creating the operator 
factory.
+     * @param joinType the join type.
+     * @param originIsSortMergeJoin whether the join operator is a 
SortMergeJoin.
+     * @param isBroadcastJoin whether the join operator can be optimized to 
broadcast hash join.
+     * @param leftIsBuild whether the left input side is the build side.
+     * @return a StreamOperatorFactory instance.
+     */
+    StreamOperatorFactory<?> genOperatorFactory(
+            ClassLoader classLoader,
+            ReadableConfig config,
+            FlinkJoinType joinType,
+            boolean originIsSortMergeJoin,
+            boolean isBroadcastJoin,
+            boolean leftIsBuild);
+}
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoinOperatorFactory.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoinOperatorFactory.java
index 3efac4e3b1b..7246f3718c0 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoinOperatorFactory.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoinOperatorFactory.java
@@ -25,12 +25,17 @@ import 
org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.table.planner.loader.PlannerModule;
 import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
 import org.apache.flink.util.InstantiationUtil;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 
+import static 
org.apache.flink.util.FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -46,49 +51,112 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class AdaptiveJoinOperatorFactory<OUT> extends 
AbstractStreamOperatorFactory<OUT>
         implements AdaptiveJoin {
    private static final Logger LOG = LoggerFactory.getLogger(AdaptiveJoinOperatorFactory.class);

    private static final long serialVersionUID = 1L;

    // Serialized AdaptiveJoinGenerator; deserialized lazily in genOperatorFactory() with a class
    // loader that can see both planner classes and user code.
    private final byte[] adaptiveJoinGeneratorSerialized;

    // The concrete operator factory produced at runtime; null until genOperatorFactory() runs.
    @Nullable private StreamOperatorFactory<OUT> finalFactory;

    // Forwarded to FlinkUserCodeClassLoaders when building the combined class loader.
    private final boolean checkClassLoaderLeak;

    private final FlinkJoinType joinType;

    // Whether the join was planned as a sort-merge join at compile time.
    private final boolean originIsSortMergeJoin;

    // Build side decided at compile time; kept to detect runtime changes (see
    // shouldReorderInputs()).
    private final boolean originalLeftIsBuild;

    // Build side currently in effect; may be updated by markAsBroadcastJoin().
    private boolean leftIsBuild;

    // Set via markAsBroadcastJoin() when the runtime decides to broadcast one side.
    private boolean isBroadcastJoin;

    /**
     * Creates the factory from join metadata captured at compile time.
     *
     * @param adaptiveJoinGeneratorSerialized serialized {@code AdaptiveJoinGenerator}; must not be
     *     null.
     * @param joinType the join type.
     * @param originIsSortMergeJoin whether the join was originally planned as a sort-merge join.
     * @param leftIsBuild whether the left input is the build side at compile time.
     * @param checkClassLoaderLeak whether to enable class loader leak checking.
     */
    public AdaptiveJoinOperatorFactory(
            byte[] adaptiveJoinGeneratorSerialized,
            FlinkJoinType joinType,
            boolean originIsSortMergeJoin,
            boolean leftIsBuild,
            boolean checkClassLoaderLeak) {
        this.adaptiveJoinGeneratorSerialized = checkNotNull(adaptiveJoinGeneratorSerialized);
        this.joinType = joinType;
        this.originIsSortMergeJoin = originIsSortMergeJoin;
        this.leftIsBuild = leftIsBuild;
        this.originalLeftIsBuild = leftIsBuild;
        this.checkClassLoaderLeak = checkClassLoaderLeak;
    }
 
    @Override
    public StreamOperatorFactory<?> genOperatorFactory(
            ClassLoader userClassLoader, ReadableConfig config) {
        // In some IT/E2E tests, plannerModule may be null, so we handle it specially to avoid
        // breaking these tests. The broad Throwable catch is deliberate: failure to load the
        // planner module falls back to the plain user class loader below.
        PlannerModule plannerModule = null;
        try {
            plannerModule = PlannerModule.getInstance();
        } catch (Throwable throwable) {
            LOG.warn(
                    "Failed to get PlannerModule instance, may cause adaptive join deserialization failure.",
                    throwable);
        }

        // Build a parent-first class loader that sees both the planner submodule jars and the
        // user code, so the generator can be deserialized even when it references custom types or
        // custom type serializers from the user jar.
        ClassLoader classLoader =
                plannerModule == null
                        ? userClassLoader
                        : FlinkUserCodeClassLoaders.parentFirst(
                                plannerModule.getSubmoduleClassLoader().getURLs(),
                                userClassLoader,
                                NOOP_EXCEPTION_HANDLER,
                                checkClassLoaderLeak);

        AdaptiveJoinGenerator adaptiveJoinGenerator;
        try {
            adaptiveJoinGenerator =
                    InstantiationUtil.deserializeObject(
                            adaptiveJoinGeneratorSerialized, classLoader);
        } catch (ClassNotFoundException | IOException e) {
            throw new RuntimeException(
                    "Failed to deserialize AdaptiveJoinGenerator instance. "
                            + "Please check whether the flink-table-planner-loader.jar is in the classpath.",
                    e);
        }
        // Delegate the actual factory creation to the generator, using the current (possibly
        // runtime-updated) broadcast/build-side flags.
        this.finalFactory =
                (StreamOperatorFactory<OUT>)
                        adaptiveJoinGenerator.genOperatorFactory(
                                classLoader,
                                config,
                                joinType,
                                originIsSortMergeJoin,
                                isBroadcastJoin,
                                leftIsBuild);
        return this.finalFactory;
    }
 
    @Override
    public FlinkJoinType getJoinType() {
        // Join type is fixed at compile time; no lazy initialization needed.
        return joinType;
    }
 
    @Override
    public void markAsBroadcastJoin(boolean canBroadcast, boolean leftIsBuild) {
        // Records the runtime decision; consumed later by genOperatorFactory() and
        // shouldReorderInputs().
        this.isBroadcastJoin = canBroadcast;
        this.leftIsBuild = leftIsBuild;
    }
 
    @Override
    public boolean shouldReorderInputs() {
        // Sort merge join requires the left side to be read first if the broadcast threshold is
        // not met.
        if (!isBroadcastJoin && originIsSortMergeJoin) {
            return false;
        }

        // Log when the runtime build side differs from the compile-time choice, to aid debugging
        // of adaptive broadcast decisions.
        if (leftIsBuild != originalLeftIsBuild) {
            LOG.info(
                    "The build side of the adaptive join has been updated. Compile phase build side: {}, Runtime build side: {}.",
                    originalLeftIsBuild ? "left" : "right",
                    leftIsBuild ? "left" : "right");
        }
        // Inputs are reordered so that the build side is read first; when the right side builds,
        // the inputs must be swapped.
        return !leftIsBuild;
    }
 
     @Override
@@ -113,28 +181,4 @@ public class AdaptiveJoinOperatorFactory<OUT> extends 
AbstractStreamOperatorFact
                 "The method should not be invoked in the "
                         + "adaptive join operator for batch jobs.");
     }
-
-    private void lazyInitialize() {
-        if 
(!tryInitializeAdaptiveJoin(Thread.currentThread().getContextClassLoader())) {
-            boolean isSuccess =
-                    tryInitializeAdaptiveJoin(
-                            
PlannerModule.getInstance().getSubmoduleClassLoader());
-            if (!isSuccess) {
-                throw new RuntimeException(
-                        "Failed to deserialize AdaptiveJoin instance. "
-                                + "Please check whether the 
flink-table-planner-loader.jar is in the classpath.");
-            }
-        }
-    }
-
-    private boolean tryInitializeAdaptiveJoin(ClassLoader classLoader) {
-        try {
-            this.adaptiveJoin =
-                    
InstantiationUtil.deserializeObject(adaptiveJoinSerialized, classLoader);
-        } catch (ClassNotFoundException | IOException e) {
-            return false;
-        }
-
-        return true;
-    }
 }


Reply via email to