xuyangzhong commented on code in PR #27159:
URL: https://github.com/apache/flink/pull/27159#discussion_r2467637345
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java:
##########
@@ -227,56 +292,90 @@ private static List<IntPair>
reverseIntPairs(List<IntPair> intPairs) {
.collect(Collectors.toList());
}
- private static int[][] getColumnIndicesOfAllTableIndexes(TableSourceTable
tableSourceTable) {
- List<List<String>> columnsOfIndexes =
getAllIndexesColumnsOfTable(tableSourceTable);
+ private static int[][] getAllIndexesColumnsFromTableSchema(ResolvedSchema
schema) {
+ List<Index> indexes = schema.getIndexes();
+ List<List<String>> columnsOfIndexes =
+
indexes.stream().map(Index::getColumns).collect(Collectors.toList());
int[][] results = new int[columnsOfIndexes.size()][];
for (int i = 0; i < columnsOfIndexes.size(); i++) {
- List<String> fieldNames =
tableSourceTable.getRowType().getFieldNames();
+ List<String> fieldNames = schema.getColumnNames();
results[i] =
columnsOfIndexes.get(i).stream().mapToInt(fieldNames::indexOf).toArray();
}
return results;
}
- private static List<List<String>> getAllIndexesColumnsOfTable(
- TableSourceTable tableSourceTable) {
- ResolvedSchema schema =
tableSourceTable.contextResolvedTable().getResolvedSchema();
- List<Index> indexes = schema.getIndexes();
- return
indexes.stream().map(Index::getColumns).collect(Collectors.toList());
- }
-
private static boolean areJoinConditionsSupported(StreamPhysicalJoin join)
{
JoinInfo joinInfo = join.analyzeCondition();
// there must be one pair of join key
if (joinInfo.pairs().isEmpty()) {
return false;
}
+ JoinSpec joinSpec = join.joinSpec();
+ Optional<RexNode> nonEquiCond = joinSpec.getNonEquiCondition();
+ if (nonEquiCond.isPresent() &&
!RexUtil.isDeterministic(nonEquiCond.get())) {
Review Comment:
Please add a comment above this check explaining why non-deterministic conditions are rejected:
```
// Delta joins may produce duplicate data, and when this data is sent downstream, we want it
// to be processed in an idempotent manner. However, the presence of non-deterministic
// functions can lead to unpredictable results, such as random filtering or the addition of
// non-deterministic columns. Therefore, we strictly prohibit the use of non-deterministic
// functions in this context to ensure consistent and reliable processing.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]