This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch release-2.2
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.2 by this push:
new d1da51a3ac3 [FLINK-39150][runtime] Fix join operator crashing jobs that
use custom types or custom type serializers
d1da51a3ac3 is described below
commit d1da51a3ac3d589e68e2588eb348abd75023989e
Author: noorall <[email protected]>
AuthorDate: Fri Feb 27 14:25:15 2026 +0800
[FLINK-39150][runtime] Fix join operator crashing jobs that use custom
types or custom type serializers
---
.../flink-end-to-end-tests-table-api/pom.xml | 26 +++++
.../table/test/join/JoinWithCustomTypeExample.java | 76 ++++++++++++
.../apache/flink/table/test/join/TestClass.java | 22 ++++
flink-end-to-end-tests/run-nightly-tests.sh | 2 +
.../test-scripts/test_table_api.sh | 37 ++++++
.../flink/table/planner/loader/PlannerModule.java | 3 +-
.../adaptive/AdaptiveJoinOperatorGenerator.java | 79 ++-----------
.../nodes/exec/batch/BatchExecAdaptiveJoin.java | 43 +++++--
.../AdaptiveJoinOperatorGeneratorTest.java | 20 ++--
.../operators/join/adaptive/AdaptiveJoin.java | 4 +-
.../join/adaptive/AdaptiveJoinGenerator.java | 50 ++++++++
.../join/adaptive/AdaptiveJoinOperatorFactory.java | 130 ++++++++++++++-------
12 files changed, 360 insertions(+), 132 deletions(-)
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-table-api/pom.xml
b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/pom.xml
index d8a6fbc8cc1..2e9afd9f7de 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-table-api/pom.xml
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/pom.xml
@@ -104,4 +104,30 @@ under the License.
</dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+
<id>JoinWithCustomTypeExample</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+
<finalName>JoinWithCustomTypeExample</finalName>
+ <transformers>
+ <transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+
<mainClass>org.apache.flink.table.test.join.JoinWithCustomTypeExample</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git
a/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/join/JoinWithCustomTypeExample.java
b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/join/JoinWithCustomTypeExample.java
new file mode 100644
index 00000000000..a598a72f915
--- /dev/null
+++
b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/join/JoinWithCustomTypeExample.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test.join;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/** Example application that tests JoinWithCustomType with Table API. */
+public class JoinWithCustomTypeExample {
+
+ private static final TypeInformation<Integer> INT = Types.INT;
+
+ public static void main(String[] args) throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ runTest(env, tEnv);
+ }
+
+ private static void runTest(StreamExecutionEnvironment env,
StreamTableEnvironment tEnv)
+ throws Exception {
+ System.out.println("Running join with custom type test...");
+
+ TestClass value = new TestClass();
+ TypeInformation<TestClass> valueTypeInfo =
+ new PojoTypeInfo<>(TestClass.class, new ArrayList<>());
+
+ Table table1 =
+ tEnv.fromDataStream(
+ env.fromData(Row.of(1)).returns(Types.ROW_NAMED(new
String[] {"id"}, INT)));
+
+ Table table2 =
+ tEnv.fromDataStream(
+ env.fromData(Row.of(1, value))
+ .returns(
+ Types.ROW_NAMED(
+ new String[] {"id2", "value"},
+ INT,
+ valueTypeInfo)));
+
+ tEnv.toDataStream(table1.leftOuterJoin(table2,
$("id").isEqual($("id2"))))
+ .sinkTo(new DiscardingSink<>());
+
+ env.execute("joinWithCustomType");
+ System.out.println("Job joinWithCustomType completed successfully!");
+ }
+}
diff --git
a/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/join/TestClass.java
b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/join/TestClass.java
new file mode 100644
index 00000000000..dad5872c0fd
--- /dev/null
+++
b/flink-end-to-end-tests/flink-end-to-end-tests-table-api/src/main/java/org/apache/flink/table/test/join/TestClass.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test.join;
+
+/** Custom class for testing join with custom type. */
+public class TestClass {}
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh
b/flink-end-to-end-tests/run-nightly-tests.sh
index 92304ef616a..fe47e8e0635 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -223,6 +223,8 @@ function run_group_2 {
# run_test "PyFlink YARN application on Docker test"
"$END_TO_END_DIR/test-scripts/test_pyflink_yarn.sh" "skip_check_exceptions"
# fi
+ run_test "Flink Table API with custom type end-to-end test"
"$END_TO_END_DIR/test-scripts/test_table_api.sh"
+
################################################################################
# Sticky Scheduling
################################################################################
diff --git a/flink-end-to-end-tests/test-scripts/test_table_api.sh
b/flink-end-to-end-tests/test-scripts/test_table_api.sh
new file mode 100755
index 00000000000..450eaa17e30
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_table_api.sh
@@ -0,0 +1,37 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# End to end test for join with custom type examples. It only verifies that
the job can be submitted
+# and run correctly.
+#
+# Usage:
+# FLINK_DIR=<flink dir> flink-end-to-end-tests/test-scripts/test_table_api.sh
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-end-to-end-tests-table-api/target/JoinWithCustomTypeExample.jar
+
+$FLINK_DIR/bin/flink run -p 1 $TEST_PROGRAM_JAR
+EXIT_CODE=$?
+
+stop_cluster
+
+exit $EXIT_CODE
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java
index 289a3f23fa6..1a8375b5039 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java
@@ -34,6 +34,7 @@ import org.apache.flink.util.IOUtils;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
+import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -135,7 +136,7 @@ public class PlannerModule {
}
}
- public ClassLoader getSubmoduleClassLoader() {
+ public URLClassLoader getSubmoduleClassLoader() {
return this.submoduleClassLoader;
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGenerator.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGenerator.java
index 78981a5e8ce..b69220399d4 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGenerator.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGenerator.java
@@ -20,30 +20,21 @@ package org.apache.flink.table.planner.adaptive;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.table.planner.plan.utils.HashJoinOperatorUtil;
-import org.apache.flink.table.planner.plan.utils.OperatorType;
import org.apache.flink.table.planner.plan.utils.SorMergeJoinOperatorUtil;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
-import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+import
org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoinGenerator;
import org.apache.flink.table.types.logical.RowType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.flink.util.Preconditions.checkState;
-
/**
- * Implementation class for {@link AdaptiveJoin}. It can selectively generate
broadcast hash join,
- * shuffle hash join or shuffle merge join operator based on actual conditions.
+ * Implementation class for {@link AdaptiveJoinGenerator}. It can selectively
generate broadcast
+ * hash join, shuffle hash join or shuffle merge join operator based on actual
conditions.
*/
-public class AdaptiveJoinOperatorGenerator implements AdaptiveJoin {
- private static final Logger LOG =
LoggerFactory.getLogger(AdaptiveJoinOperatorGenerator.class);
+public class AdaptiveJoinOperatorGenerator implements AdaptiveJoinGenerator {
private final int[] leftKeys;
private final int[] rightKeys;
- private final FlinkJoinType joinType;
-
private final boolean[] filterNulls;
private final RowType leftType;
@@ -64,18 +55,9 @@ public class AdaptiveJoinOperatorGenerator implements
AdaptiveJoin {
private final long managedMemory;
- private final OperatorType originalJoin;
-
- private boolean leftIsBuild;
-
- private boolean originalLeftIsBuild;
-
- private boolean isBroadcastJoin;
-
public AdaptiveJoinOperatorGenerator(
int[] leftKeys,
int[] rightKeys,
- FlinkJoinType joinType,
boolean[] filterNulls,
RowType leftType,
RowType rightType,
@@ -85,12 +67,9 @@ public class AdaptiveJoinOperatorGenerator implements
AdaptiveJoin {
long leftRowCount,
long rightRowCount,
boolean tryDistinctBuildRow,
- long managedMemory,
- boolean leftIsBuild,
- OperatorType originalJoin) {
+ long managedMemory) {
this.leftKeys = leftKeys;
this.rightKeys = rightKeys;
- this.joinType = joinType;
this.filterNulls = filterNulls;
this.leftType = leftType;
this.rightType = rightType;
@@ -101,23 +80,17 @@ public class AdaptiveJoinOperatorGenerator implements
AdaptiveJoin {
this.rightRowCount = rightRowCount;
this.tryDistinctBuildRow = tryDistinctBuildRow;
this.managedMemory = managedMemory;
- checkState(
- originalJoin == OperatorType.ShuffleHashJoin
- || originalJoin == OperatorType.SortMergeJoin,
- String.format(
- "Adaptive join "
- + "currently only supports adaptive
optimization for ShuffleHashJoin and "
- + "SortMergeJoin, not including %s.",
- originalJoin.toString()));
- this.leftIsBuild = leftIsBuild;
- this.originalLeftIsBuild = leftIsBuild;
- this.originalJoin = originalJoin;
}
@Override
public StreamOperatorFactory<?> genOperatorFactory(
- ClassLoader classLoader, ReadableConfig config) {
- if (isBroadcastJoin || originalJoin == OperatorType.ShuffleHashJoin) {
+ ClassLoader classLoader,
+ ReadableConfig config,
+ FlinkJoinType joinType,
+ boolean originIsSortMergeJoin,
+ boolean isBroadcastJoin,
+ boolean leftIsBuild) {
+ if (isBroadcastJoin || !originIsSortMergeJoin) {
return HashJoinOperatorUtil.generateOperatorFactory(
leftKeys,
rightKeys,
@@ -150,32 +123,4 @@ public class AdaptiveJoinOperatorGenerator implements
AdaptiveJoin {
classLoader);
}
}
-
- @Override
- public FlinkJoinType getJoinType() {
- return joinType;
- }
-
- @Override
- public void markAsBroadcastJoin(boolean canBroadcast, boolean leftIsBuild)
{
- this.isBroadcastJoin = canBroadcast;
- this.leftIsBuild = leftIsBuild;
- }
-
- @Override
- public boolean shouldReorderInputs() {
- // Sort merge join requires the left side to be read first if the
broadcast threshold is not
- // met.
- if (!isBroadcastJoin && originalJoin == OperatorType.SortMergeJoin) {
- return false;
- }
-
- if (leftIsBuild != originalLeftIsBuild) {
- LOG.info(
- "The build side of the adaptive join has been updated.
Compile phase build side: {}, Runtime build side: {}.",
- originalLeftIsBuild ? "left" : "right",
- leftIsBuild ? "left" : "right");
- }
- return !leftIsBuild;
- }
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecAdaptiveJoin.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecAdaptiveJoin.java
index bf1122bde86..a103a8e349b 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecAdaptiveJoin.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecAdaptiveJoin.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.batch;
import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.table.api.TableException;
@@ -36,7 +37,7 @@ import
org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.JoinUtil;
import org.apache.flink.table.planner.plan.utils.OperatorType;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
-import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import
org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoinOperatorFactory;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
@@ -45,6 +46,8 @@ import org.apache.flink.util.InstantiationUtil;
import java.io.IOException;
import java.util.List;
+import static org.apache.flink.util.Preconditions.checkState;
+
/** {@link BatchExecNode} for adaptive join. */
public class BatchExecAdaptiveJoin extends ExecNodeBase<RowData>
implements BatchExecNode<RowData>,
SingleTransformationTranslator<RowData> {
@@ -87,6 +90,14 @@ public class BatchExecAdaptiveJoin extends
ExecNodeBase<RowData>
this.leftIsBuild = leftIsBuild;
this.tryDistinctBuildRow = tryDistinctBuildRow;
this.description = description;
+ checkState(
+ originalJoin == OperatorType.ShuffleHashJoin
+ || originalJoin == OperatorType.SortMergeJoin,
+ String.format(
+ "Adaptive join "
+ + "currently only supports adaptive
optimization for ShuffleHashJoin and "
+ + "SortMergeJoin, not including %s.",
+ originalJoin.toString()));
this.originalJoin = originalJoin;
}
@@ -113,11 +124,10 @@ public class BatchExecAdaptiveJoin extends
ExecNodeBase<RowData>
leftType,
rightType);
- AdaptiveJoinOperatorGenerator adaptiveJoin =
+ AdaptiveJoinOperatorGenerator adaptiveJoinGenerator =
new AdaptiveJoinOperatorGenerator(
joinSpec.getLeftKeys(),
joinSpec.getRightKeys(),
- joinSpec.getJoinType(),
joinSpec.getFilterNulls(),
leftType,
rightType,
@@ -127,16 +137,19 @@ public class BatchExecAdaptiveJoin extends
ExecNodeBase<RowData>
estimatedLeftRowCount,
estimatedRightRowCount,
tryDistinctBuildRow,
- managedMemory,
- leftIsBuild,
- originalJoin);
+ managedMemory);
return ExecNodeUtil.createTwoInputTransformation(
leftInputTransform,
rightInputTransform,
createTransformationName(config),
createTransformationDescription(config),
- getAdaptiveJoinOperatorFactory(adaptiveJoin),
+ getAdaptiveJoinOperatorFactory(
+ adaptiveJoinGenerator,
+ config.get(CoreOptions.CHECK_LEAKED_CLASSLOADER),
+ joinSpec.getJoinType(),
+ originalJoin,
+ leftIsBuild),
InternalTypeInfo.of(getOutputType()),
// Given that the probe side might be decided at runtime, we
choose the larger
// parallelism here.
@@ -146,10 +159,20 @@ public class BatchExecAdaptiveJoin extends
ExecNodeBase<RowData>
}
private StreamOperatorFactory<RowData> getAdaptiveJoinOperatorFactory(
- AdaptiveJoin adaptiveJoin) {
+ AdaptiveJoinOperatorGenerator adaptiveJoinGenerator,
+ boolean checkClassLoaderLeak,
+ FlinkJoinType joinType,
+ OperatorType originalJoin,
+ boolean leftIsBuild) {
try {
- byte[] adaptiveJoinSerialized =
InstantiationUtil.serializeObject(adaptiveJoin);
- return new AdaptiveJoinOperatorFactory<>(adaptiveJoinSerialized);
+ byte[] adaptiveJoinGeneratorSerialized =
+ InstantiationUtil.serializeObject(adaptiveJoinGenerator);
+ return new AdaptiveJoinOperatorFactory<>(
+ adaptiveJoinGeneratorSerialized,
+ joinType,
+ originalJoin == OperatorType.SortMergeJoin,
+ leftIsBuild,
+ checkClassLoaderLeak);
} catch (IOException e) {
throw new TableException("The adaptive join operator failed to
serialize.", e);
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGeneratorTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGeneratorTest.java
index 3baa5030cbe..d554465993b 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGeneratorTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/adaptive/AdaptiveJoinOperatorGeneratorTest.java
@@ -28,7 +28,7 @@ import
org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.HashJoinOperator;
import
org.apache.flink.table.runtime.operators.join.Int2HashJoinOperatorTestBase;
import org.apache.flink.table.runtime.operators.join.SortMergeJoinOperator;
-import org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoin;
+import
org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoinGenerator;
import org.apache.flink.table.runtime.util.UniformBinaryRowGenerator;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
@@ -268,10 +268,15 @@ class AdaptiveJoinOperatorGeneratorTest extends
Int2HashJoinOperatorTestBase {
boolean buildLeft,
boolean isBroadcast,
OperatorType operatorType) {
- AdaptiveJoin adaptiveJoin = genAdaptiveJoin(flinkJoinType,
operatorType);
- adaptiveJoin.markAsBroadcastJoin(isBroadcast, buildLeft);
+ AdaptiveJoinGenerator adaptiveJoinGenerator =
genAdaptiveJoinGenerator();
- return adaptiveJoin.genOperatorFactory(getClass().getClassLoader(),
new Configuration());
+ return adaptiveJoinGenerator.genOperatorFactory(
+ getClass().getClassLoader(),
+ new Configuration(),
+ flinkJoinType,
+ operatorType == SortMergeJoin,
+ isBroadcast,
+ buildLeft);
}
public void assertOperatorType(Object operator, OperatorType
expectedOperatorType) {
@@ -301,7 +306,7 @@ class AdaptiveJoinOperatorGeneratorTest extends
Int2HashJoinOperatorTestBase {
}
}
- public AdaptiveJoin genAdaptiveJoin(FlinkJoinType flinkJoinType,
OperatorType operatorType) {
+ public AdaptiveJoinGenerator genAdaptiveJoinGenerator() {
GeneratedJoinCondition condFuncCode =
new GeneratedJoinCondition(
Int2HashJoinOperatorTestBase.MyJoinCondition.class.getCanonicalName(),
@@ -316,7 +321,6 @@ class AdaptiveJoinOperatorGeneratorTest extends
Int2HashJoinOperatorTestBase {
return new AdaptiveJoinOperatorGenerator(
new int[] {0},
new int[] {0},
- flinkJoinType,
new boolean[] {true},
RowType.of(new IntType(), new IntType()),
RowType.of(new IntType(), new IntType()),
@@ -326,8 +330,6 @@ class AdaptiveJoinOperatorGeneratorTest extends
Int2HashJoinOperatorTestBase {
20,
10000,
false,
- TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY.defaultValue().getBytes(),
- true,
- operatorType);
+
TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY.defaultValue().getBytes());
}
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoin.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoin.java
index 0d289ae9639..9ca97590389 100755
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoin.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoin.java
@@ -32,11 +32,11 @@ public interface AdaptiveJoin extends Serializable {
* Generates a StreamOperatorFactory for this join operator using the
provided ClassLoader and
* config.
*
- * @param classLoader the ClassLoader to be used for loading classes.
+ * @param userClassLoader the user ClassLoader to be used for loading
classes.
* @param config the configuration to be applied for creating the operator
factory.
* @return a StreamOperatorFactory instance.
*/
- StreamOperatorFactory<?> genOperatorFactory(ClassLoader classLoader,
ReadableConfig config);
+ StreamOperatorFactory<?> genOperatorFactory(ClassLoader userClassLoader,
ReadableConfig config);
/**
* Get the join type of the join operator.
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoinGenerator.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoinGenerator.java
new file mode 100644
index 00000000000..02496ebb7b2
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoinGenerator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.adaptive;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+
+import java.io.Serializable;
+
+/** Interface for implementing an adaptive join operator generator. */
+@Internal
+public interface AdaptiveJoinGenerator extends Serializable {
+ /**
+ * Generates a StreamOperatorFactory for this join operator using the
provided ClassLoader and
+ * parameters.
+ *
+ * @param classLoader the ClassLoader to be used for loading classes.
+ * @param config the configuration to be applied for creating the operator
factory.
+ * @param joinType the join type.
+ * @param originIsSortMergeJoin whether the join operator is a
SortMergeJoin.
+ * @param isBroadcastJoin whether the join operator can be optimized to
broadcast hash join.
+ * @param leftIsBuild whether the left input side is the build side.
+ * @return a StreamOperatorFactory instance.
+ */
+ StreamOperatorFactory<?> genOperatorFactory(
+ ClassLoader classLoader,
+ ReadableConfig config,
+ FlinkJoinType joinType,
+ boolean originIsSortMergeJoin,
+ boolean isBroadcastJoin,
+ boolean leftIsBuild);
+}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoinOperatorFactory.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoinOperatorFactory.java
index 3efac4e3b1b..7246f3718c0 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoinOperatorFactory.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/adaptive/AdaptiveJoinOperatorFactory.java
@@ -25,12 +25,17 @@ import
org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.table.planner.loader.PlannerModule;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
import org.apache.flink.util.InstantiationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import javax.annotation.Nullable;
import java.io.IOException;
+import static
org.apache.flink.util.FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -46,49 +51,112 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
@Internal
public class AdaptiveJoinOperatorFactory<OUT> extends
AbstractStreamOperatorFactory<OUT>
implements AdaptiveJoin {
- private static final long serialVersionUID = 1L;
+ private static final Logger LOG =
LoggerFactory.getLogger(AdaptiveJoinOperatorFactory.class);
- private final byte[] adaptiveJoinSerialized;
+ private static final long serialVersionUID = 1L;
- @Nullable private transient AdaptiveJoin adaptiveJoin;
+ private final byte[] adaptiveJoinGeneratorSerialized;
@Nullable private StreamOperatorFactory<OUT> finalFactory;
- public AdaptiveJoinOperatorFactory(byte[] adaptiveJoinSerialized) {
- this.adaptiveJoinSerialized = checkNotNull(adaptiveJoinSerialized);
+ private final boolean checkClassLoaderLeak;
+
+ private final FlinkJoinType joinType;
+
+ private final boolean originIsSortMergeJoin;
+
+ private final boolean originalLeftIsBuild;
+
+ private boolean leftIsBuild;
+
+ private boolean isBroadcastJoin;
+
+ public AdaptiveJoinOperatorFactory(
+ byte[] adaptiveJoinGeneratorSerialized,
+ FlinkJoinType joinType,
+ boolean originIsSortMergeJoin,
+ boolean leftIsBuild,
+ boolean checkClassLoaderLeak) {
+ this.adaptiveJoinGeneratorSerialized =
checkNotNull(adaptiveJoinGeneratorSerialized);
+ this.joinType = joinType;
+ this.originIsSortMergeJoin = originIsSortMergeJoin;
+ this.leftIsBuild = leftIsBuild;
+ this.originalLeftIsBuild = leftIsBuild;
+ this.checkClassLoaderLeak = checkClassLoaderLeak;
}
@Override
public StreamOperatorFactory<?> genOperatorFactory(
- ClassLoader classLoader, ReadableConfig config) {
- checkAndLazyInitialize();
+ ClassLoader userClassLoader, ReadableConfig config) {
+ // In some IT/E2E tests, plannerModule may be null, so we handle it
specially to avoid
+ // breaking these tests.
+ PlannerModule plannerModule = null;
+ try {
+ plannerModule = PlannerModule.getInstance();
+ } catch (Throwable throwable) {
+ LOG.warn(
+ "Failed to get PlannerModule instance, may cause adaptive
join deserialization failure.",
+ throwable);
+ }
+
+ ClassLoader classLoader =
+ plannerModule == null
+ ? userClassLoader
+ : FlinkUserCodeClassLoaders.parentFirst(
+
plannerModule.getSubmoduleClassLoader().getURLs(),
+ userClassLoader,
+ NOOP_EXCEPTION_HANDLER,
+ checkClassLoaderLeak);
+
+ AdaptiveJoinGenerator adaptiveJoinGenerator;
+ try {
+ adaptiveJoinGenerator =
+ InstantiationUtil.deserializeObject(
+ adaptiveJoinGeneratorSerialized, classLoader);
+ } catch (ClassNotFoundException | IOException e) {
+ throw new RuntimeException(
+ "Failed to deserialize AdaptiveJoinGenerator instance. "
+ + "Please check whether the
flink-table-planner-loader.jar is in the classpath.",
+ e);
+ }
this.finalFactory =
- (StreamOperatorFactory<OUT>)
adaptiveJoin.genOperatorFactory(classLoader, config);
+ (StreamOperatorFactory<OUT>)
+ adaptiveJoinGenerator.genOperatorFactory(
+ classLoader,
+ config,
+ joinType,
+ originIsSortMergeJoin,
+ isBroadcastJoin,
+ leftIsBuild);
return this.finalFactory;
}
@Override
public FlinkJoinType getJoinType() {
- checkAndLazyInitialize();
- return adaptiveJoin.getJoinType();
+ return joinType;
}
@Override
- public void markAsBroadcastJoin(boolean canBeBroadcast, boolean
leftIsBuild) {
- checkAndLazyInitialize();
- adaptiveJoin.markAsBroadcastJoin(canBeBroadcast, leftIsBuild);
+ public void markAsBroadcastJoin(boolean canBroadcast, boolean leftIsBuild)
{
+ this.isBroadcastJoin = canBroadcast;
+ this.leftIsBuild = leftIsBuild;
}
@Override
public boolean shouldReorderInputs() {
- checkAndLazyInitialize();
- return adaptiveJoin.shouldReorderInputs();
- }
+ // Sort merge join requires the left side to be read first if the
broadcast threshold is not
+ // met.
+ if (!isBroadcastJoin && originIsSortMergeJoin) {
+ return false;
+ }
- private void checkAndLazyInitialize() {
- if (this.adaptiveJoin == null) {
- lazyInitialize();
+ if (leftIsBuild != originalLeftIsBuild) {
+ LOG.info(
+ "The build side of the adaptive join has been updated.
Compile phase build side: {}, Runtime build side: {}.",
+ originalLeftIsBuild ? "left" : "right",
+ leftIsBuild ? "left" : "right");
}
+ return !leftIsBuild;
}
@Override
@@ -113,28 +181,4 @@ public class AdaptiveJoinOperatorFactory<OUT> extends
AbstractStreamOperatorFact
"The method should not be invoked in the "
+ "adaptive join operator for batch jobs.");
}
-
- private void lazyInitialize() {
- if
(!tryInitializeAdaptiveJoin(Thread.currentThread().getContextClassLoader())) {
- boolean isSuccess =
- tryInitializeAdaptiveJoin(
-
PlannerModule.getInstance().getSubmoduleClassLoader());
- if (!isSuccess) {
- throw new RuntimeException(
- "Failed to deserialize AdaptiveJoin instance. "
- + "Please check whether the
flink-table-planner-loader.jar is in the classpath.");
- }
- }
- }
-
- private boolean tryInitializeAdaptiveJoin(ClassLoader classLoader) {
- try {
- this.adaptiveJoin =
-
InstantiationUtil.deserializeObject(adaptiveJoinSerialized, classLoader);
- } catch (ClassNotFoundException | IOException e) {
- return false;
- }
-
- return true;
- }
}