This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git
The following commit(s) were added to refs/heads/main by this push:
new 5a90eb0a [FLINK-33365] include filters with Lookup joins
5a90eb0a is described below
commit 5a90eb0a73ca0ac8475331a74ae8f7c1c01646bb
Author: David Radley <[email protected]>
AuthorDate: Thu Jan 25 09:47:16 2024 +0000
[FLINK-33365] include filters with Lookup joins
This closes apache/flink-connector-jdbc#79
Signed-off-by: David Radley <[email protected]>
Co-authored-by: Benchao Li <[email protected]>
Co-authored-by: Sergey Nuyanzin <[email protected]>
---
.../statement/FieldNamedPreparedStatement.java | 13 +-
.../statement/FieldNamedPreparedStatementImpl.java | 28 +-
.../jdbc/table/JdbcDynamicTableSource.java | 4 +-
.../jdbc/table/JdbcRowDataLookupFunction.java | 39 ++-
.../jdbc/table/JdbcDynamicTableSourceITCase.java | 336 +++++++++++++++++----
.../connector/jdbc/table/JdbcLookupTestBase.java | 62 +++-
.../jdbc/table/JdbcRowDataLookupFunctionTest.java | 176 ++++++++++-
.../connector/jdbc/table/JdbcTablePlanTest.java | 82 ++++-
.../connector/jdbc/table/JdbcTablePlanTest.xml | 210 +++++++++++--
9 files changed, 829 insertions(+), 121 deletions(-)
diff --git
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatement.java
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatement.java
index a57d9ff6..85814ece 100644
---
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatement.java
+++
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatement.java
@@ -69,7 +69,18 @@ public interface FieldNamedPreparedStatement extends
AutoCloseable {
*/
static FieldNamedPreparedStatement prepareStatement(
Connection connection, String sql, String[] fieldNames) throws
SQLException {
- return FieldNamedPreparedStatementImpl.prepareStatement(connection,
sql, fieldNames);
+ return FieldNamedPreparedStatementImpl.prepareStatement(connection,
sql, fieldNames, "", 0);
+ }
+
+ static FieldNamedPreparedStatement prepareStatement(
+ Connection connection,
+ String sql,
+ String[] fieldNames,
+ String additionalPredicates,
+ int numberOfDynamicParams)
+ throws SQLException {
+ return FieldNamedPreparedStatementImpl.prepareStatement(
+ connection, sql, fieldNames, additionalPredicates,
numberOfDynamicParams);
}
/**
diff --git
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java
index 20b9692f..fc05b90b 100644
---
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java
+++
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImpl.java
@@ -178,7 +178,12 @@ public class FieldNamedPreparedStatementImpl implements
FieldNamedPreparedStatem
//
----------------------------------------------------------------------------------------
public static FieldNamedPreparedStatement prepareStatement(
- Connection connection, String sql, String[] fieldNames) throws
SQLException {
+ Connection connection,
+ String sql,
+ String[] fieldNames,
+ String additionalPredicates,
+ int numberOfDynamicParams)
+ throws SQLException {
checkNotNull(connection, "connection must not be null.");
checkNotNull(sql, "sql must not be null.");
checkNotNull(fieldNames, "fieldNames must not be null.");
@@ -186,18 +191,33 @@ public class FieldNamedPreparedStatementImpl implements
FieldNamedPreparedStatem
if (sql.contains("?")) {
throw new IllegalArgumentException("SQL statement must not contain
? character.");
}
+ sql = sql + additionalPredicates;
HashMap<String, List<Integer>> parameterMap = new HashMap<>();
String parsedSQL = parseNamedStatement(sql, parameterMap);
+
// currently, the statements must contain all the field parameters
- checkArgument(parameterMap.size() == fieldNames.length);
- int[][] indexMapping = new int[fieldNames.length][];
- for (int i = 0; i < fieldNames.length; i++) {
+ final int parameterMapSize = parameterMap.size();
+ final int fieldNamesLength = fieldNames.length;
+ checkArgument(
+ parameterMapSize == fieldNamesLength,
+ "Expected "
+ + fieldNamesLength
+ + " fields, but the parsing found "
+ + parameterMapSize);
+ int[][] indexMapping = new int[fieldNamesLength +
numberOfDynamicParams][];
+ int numberOfNameBasedParams = 0;
+ for (int i = 0; i < fieldNamesLength; i++) {
String fieldName = fieldNames[i];
checkArgument(
parameterMap.containsKey(fieldName),
fieldName + " doesn't exist in the parameters of SQL
statement: " + sql);
indexMapping[i] = parameterMap.get(fieldName).stream().mapToInt(v
-> v).toArray();
+ numberOfNameBasedParams += parameterMap.get(fieldName).size();
+ }
+ for (int i = 0; i < numberOfDynamicParams; ++i) {
+ // FieldNamedPreparedStatement is 0-based, however,
PreparedStatement is 1-based
+ indexMapping[i + fieldNamesLength] = new int[] {i +
numberOfNameBasedParams + 1};
}
return new FieldNamedPreparedStatementImpl(
diff --git
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
index 48e1702b..c8ef2e33 100644
---
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
+++
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
@@ -110,7 +110,9 @@ public class JdbcDynamicTableSource
DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]),
keyNames,
- rowType);
+ rowType,
+ resolvedPredicates,
+ pushdownParams);
if (cache != null) {
return PartialCachingLookupProvider.of(lookupFunction, cache);
} else {
diff --git
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
index 4d327b4e..32d1b659 100644
---
a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
+++
b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
@@ -37,6 +37,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.io.Serializable;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -45,6 +46,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -63,6 +65,9 @@ public class JdbcRowDataLookupFunction extends LookupFunction
{
private final JdbcRowConverter jdbcRowConverter;
private final JdbcRowConverter lookupKeyRowConverter;
+ private final List<String> resolvedPredicates;
+ private final Serializable[] pushdownParams;
+
private transient FieldNamedPreparedStatement statement;
public JdbcRowDataLookupFunction(
@@ -71,11 +76,15 @@ public class JdbcRowDataLookupFunction extends
LookupFunction {
String[] fieldNames,
DataType[] fieldTypes,
String[] keyNames,
- RowType rowType) {
+ RowType rowType,
+ List<String> resolvedPredicates,
+ Serializable[] pushdownParams) {
checkNotNull(options, "No JdbcOptions supplied.");
checkNotNull(fieldNames, "No fieldNames supplied.");
checkNotNull(fieldTypes, "No fieldTypes supplied.");
checkNotNull(keyNames, "No keyNames supplied.");
+ checkNotNull(resolvedPredicates, "No resolvedPredicates supplied.");
+ checkNotNull(pushdownParams, "No pushdownParams supplied.");
this.connectionProvider = new SimpleJdbcConnectionProvider(options);
this.keyNames = keyNames;
List<String> nameList = Arrays.asList(fieldNames);
@@ -103,6 +112,8 @@ public class JdbcRowDataLookupFunction extends
LookupFunction {
Arrays.stream(keyTypes)
.map(DataType::getLogicalType)
.toArray(LogicalType[]::new)));
+ this.resolvedPredicates = resolvedPredicates;
+ this.pushdownParams = pushdownParams;
}
@Override
@@ -116,6 +127,15 @@ public class JdbcRowDataLookupFunction extends
LookupFunction {
}
}
+ private FieldNamedPreparedStatement
setPredicateParams(FieldNamedPreparedStatement statement)
+ throws SQLException {
+ for (int i = 0; i < pushdownParams.length; ++i) {
+ statement.setObject(i + keyNames.length, pushdownParams[i]);
+ }
+
+ return statement;
+ }
+
/**
* This is a lookup method which is called by Flink framework in runtime.
*
@@ -127,6 +147,7 @@ public class JdbcRowDataLookupFunction extends
LookupFunction {
try {
statement.clearParameters();
statement = lookupKeyRowConverter.toExternal(keyRow,
statement);
+ statement = setPredicateParams(statement);
try (ResultSet resultSet = statement.executeQuery()) {
ArrayList<RowData> rows = new ArrayList<>();
while (resultSet.next()) {
@@ -167,7 +188,21 @@ public class JdbcRowDataLookupFunction extends
LookupFunction {
private void establishConnectionAndStatement() throws SQLException,
ClassNotFoundException {
Connection dbConn = connectionProvider.getOrEstablishConnection();
- statement = FieldNamedPreparedStatement.prepareStatement(dbConn,
query, keyNames);
+ String additionalPredicates = "";
+ if (!resolvedPredicates.isEmpty()) {
+ String joinedConditions =
+ resolvedPredicates.stream()
+ .map(pred -> String.format("(%s)", pred))
+ .collect(Collectors.joining(" AND "));
+ if (keyNames.length == 0) {
+ additionalPredicates = " WHERE " + joinedConditions;
+ } else {
+ additionalPredicates = " AND " + joinedConditions;
+ }
+ }
+ statement =
+ FieldNamedPreparedStatement.prepareStatement(
+ dbConn, query, keyNames, additionalPredicates,
pushdownParams.length);
}
@Override
diff --git
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
index 51f75e34..bab5a408 100644
---
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
+++
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
@@ -76,6 +76,26 @@ public abstract class JdbcDynamicTableSourceITCase
implements DatabaseTest {
.setConfiguration(new Configuration())
.build());
+ public static final String CREATE_TABLE_WITH_NAME_STATEMENT =
+ "CREATE TABLE value_source ( "
+ + " `id` BIGINT, "
+ + " `name` STRING, "
+ + " `proctime` AS PROCTIME()"
+ + ") WITH ("
+ + " 'connector' = 'values', "
+ + " 'data-id' = '%s'"
+ + ")";
+ public static final String CREATE_TABLE_WITH_NAME_AND_NICKNAME_STATEMENT =
+ "CREATE TABLE value_source ( "
+ + " `id` BIGINT, "
+ + " `name` STRING, "
+ + " `nickname` STRING, "
+ + " `proctime` AS PROCTIME()"
+ + ") WITH ("
+ + " 'connector' = 'values', "
+ + " 'data-id' = '%s'"
+ + ")";
+
private final TableRow inputTable = createInputTable();
public static StreamExecutionEnvironment env;
@@ -276,9 +296,73 @@ public abstract class JdbcDynamicTableSourceITCase
implements DatabaseTest {
@ParameterizedTest
@EnumSource(Caching.class)
void testLookupJoin(Caching caching) {
+
+ String selectStatement =
+ "SELECT S.id, S.name, D.id, D.timestamp6_col, D.decimal_col
FROM value_source"
+ + " AS S JOIN jdbc_lookup for system_time as of
S.proctime AS D ON S.id = D.id";
+ List<Row> expectedResultSetRows =
+ Arrays.asList(
+ Row.of(
+ 1L,
+ "Alice",
+ 1L,
+
truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")),
+ BigDecimal.valueOf(100.1234)),
+ Row.of(
+ 1L,
+ "Alice",
+ 1L,
+
truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")),
+ BigDecimal.valueOf(100.1234)),
+ Row.of(
+ 2L,
+ "Bob",
+ 2L,
+
truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456")),
+ BigDecimal.valueOf(101.1234)));
+
+ RowData key1 = GenericRowData.of(1L);
+ RowData value1 =
+ GenericRowData.of(
+ 1L,
+
DecimalData.fromBigDecimal(BigDecimal.valueOf(100.1234), 10, 4),
+ TimestampData.fromLocalDateTime(
+
truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456"))));
+
+ RowData key2 = GenericRowData.of(2L);
+ RowData value2 =
+ GenericRowData.of(
+ 2L,
+
DecimalData.fromBigDecimal(BigDecimal.valueOf(101.1234), 10, 4),
+ TimestampData.fromLocalDateTime(
+
truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456"))));
+
+ RowData key3 = GenericRowData.of(3L);
+
+ Map<RowData, Collection<RowData>> expectedCachedEntries = new
HashMap<>();
+ expectedCachedEntries.put(key1, Collections.singletonList(value1));
+ expectedCachedEntries.put(key2, Collections.singletonList(value2));
+ expectedCachedEntries.put(key3, Collections.emptyList());
+
+ lookupTableTest(
+ caching,
+ sampleTableData(),
+ CREATE_TABLE_WITH_NAME_STATEMENT,
+ selectStatement,
+ expectedResultSetRows,
+ expectedCachedEntries);
+ }
+
+ private void lookupTableTest(
+ Caching caching,
+ Collection<Row> dataToRegister,
+ String createTableStatement,
+ String selectStatement,
+ List<Row> expectedResultSetRows,
+ Map<RowData, Collection<RowData>> expectedCachedEntries) {
// Create JDBC lookup table
List<String> cachingOptions = Collections.emptyList();
- if (caching.equals(Caching.ENABLE_CACHE)) {
+ if (caching == Caching.ENABLE_CACHE) {
cachingOptions =
Arrays.asList(
"'lookup.cache.max-rows' = '100'",
"'lookup.cache.ttl' = '10min'");
@@ -287,24 +371,8 @@ public abstract class JdbcDynamicTableSourceITCase
implements DatabaseTest {
inputTable.getCreateQueryForFlink(getMetadata(),
"jdbc_lookup", cachingOptions));
// Create and prepare a value source
- String dataId =
- TestValuesTableFactory.registerData(
- Arrays.asList(
- Row.of(1L, "Alice"),
- Row.of(1L, "Alice"),
- Row.of(2L, "Bob"),
- Row.of(3L, "Charlie")));
- tEnv.executeSql(
- String.format(
- "CREATE TABLE value_source ( "
- + " `id` BIGINT, "
- + " `name` STRING, "
- + " `proctime` AS PROCTIME()"
- + ") WITH ("
- + " 'connector' = 'values', "
- + " 'data-id' = '%s'"
- + ")",
- dataId));
+ String dataId = TestValuesTableFactory.registerData(dataToRegister);
+ tEnv.executeSql(String.format(createTableStatement, dataId));
if (caching == Caching.ENABLE_CACHE) {
LookupCacheManager.keepCacheOnRelease(true);
@@ -312,40 +380,18 @@ public abstract class JdbcDynamicTableSourceITCase
implements DatabaseTest {
// Execute lookup join
try {
- List<Row> collected =
- executeQuery(
- "SELECT S.id, S.name, D.id, D.timestamp6_col,
D.decimal_col FROM value_source"
- + " AS S JOIN jdbc_lookup for system_time
as of S.proctime AS D ON S.id = D.id");
-
- assertThat(collected).hasSize(3);
-
- List<Row> expected =
- Arrays.asList(
- Row.of(
- 1L,
- "Alice",
- 1L,
-
truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")),
- BigDecimal.valueOf(100.1234)),
- Row.of(
- 1L,
- "Alice",
- 1L,
-
truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")),
- BigDecimal.valueOf(100.1234)),
- Row.of(
- 2L,
- "Bob",
- 2L,
-
truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456")),
- BigDecimal.valueOf(101.1234)));
+ List<Row> collected = executeQuery(selectStatement);
+ int expectedSize = expectedResultSetRows.size();
+ // check we go the expected number of rows
assertThat(collected)
+ .as("Actual output is not size " + expectedSize)
+ .hasSize(expectedSize)
.as("The actual output is not a subset of the expected
set")
- .containsAll(expected);
+ .containsAll(expectedResultSetRows);
if (caching == Caching.ENABLE_CACHE) {
- validateCachedValues();
+ validateCachedValues(expectedCachedEntries);
}
} finally {
if (caching == Caching.ENABLE_CACHE) {
@@ -356,25 +402,150 @@ public abstract class JdbcDynamicTableSourceITCase
implements DatabaseTest {
}
}
- protected TemporalUnit timestampPrecision() {
- return ChronoUnit.MICROS;
+ @ParameterizedTest
+ @EnumSource(Caching.class)
+ void testLookupJoinWithFilter(Caching caching) {
+ List<Row> expectedResultSetRows =
+ Arrays.asList(
+ Row.of(
+ 2L,
+ "Bob",
+ 2L,
+
truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456")),
+ BigDecimal.valueOf(101.1234)));
+
+ RowData key2 = GenericRowData.of(2L);
+ RowData value2 =
+ GenericRowData.of(
+ 2L,
+
DecimalData.fromBigDecimal(BigDecimal.valueOf(101.1234), 10, 4),
+ TimestampData.fromLocalDateTime(
+
truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456"))));
+
+ Map<RowData, Collection<RowData>> expectedCachedEntries = new
HashMap<>();
+ expectedCachedEntries.put(key2, Collections.singletonList(value2));
+
+ lookupTableTest(
+ caching,
+ sampleTableData(),
+ CREATE_TABLE_WITH_NAME_STATEMENT,
+ "SELECT S.id, S.name, D.id, D.timestamp6_col, D.decimal_col
FROM value_source"
+ + " AS S JOIN jdbc_lookup for system_time as of
S.proctime AS D ON "
+ + "S.id = D.id AND S.name = \'Bob\'",
+ expectedResultSetRows,
+ expectedCachedEntries);
}
- private LocalDateTime truncateTime(LocalDateTime value) {
- return value.truncatedTo(timestampPrecision());
+ private static List<Row> sampleTableData() {
+ return Arrays.asList(
+ Row.of(1L, "Alice"), Row.of(1L, "Alice"), Row.of(2L, "Bob"),
Row.of(3L, "Charlie"));
}
- private List<Row> executeQuery(String query) {
- return CollectionUtil.iteratorToList(tEnv.executeSql(query).collect());
+ private static List<Row> sampleTableDataWithNickNames() {
+ return Arrays.asList(
+ Row.of(1L, "Alice", "ABC"),
+ Row.of(1L, "Alice", "ADD"),
+ Row.of(2L, "Bob", "BGH"),
+ Row.of(3L, "Charlie", "CHJ"));
}
- private void validateCachedValues() {
- // Validate cache
- Map<String, LookupCacheManager.RefCountedCache> managedCaches =
- LookupCacheManager.getInstance().getManagedCaches();
- assertThat(managedCaches).as("There should be only 1 shared cache
registered").hasSize(1);
- LookupCache cache =
managedCaches.get(managedCaches.keySet().iterator().next()).getCache();
- // jdbc does support project push down, the cached row has been
projected
+ @ParameterizedTest
+ @EnumSource(Caching.class)
+ void testLookupJoinWithMultipleFilters(Caching caching) {
+
+ List<Row> expectedResultSetRows =
+ Arrays.asList(
+ Row.of(
+ 1L,
+ "Alice",
+ "ADD",
+ 1L,
+
truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")),
+ BigDecimal.valueOf(100.1234)));
+
+ RowData key1 = GenericRowData.of(1L);
+ RowData value1 =
+ GenericRowData.of(
+ 1L,
+
DecimalData.fromBigDecimal(BigDecimal.valueOf(100.1234), 10, 4),
+ TimestampData.fromLocalDateTime(
+
truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456"))));
+
+ Map<RowData, Collection<RowData>> expectedCachedEntries = new
HashMap<>();
+ expectedCachedEntries.put(key1, Collections.singletonList(value1));
+
+ lookupTableTest(
+ caching,
+ sampleTableDataWithNickNames(),
+ CREATE_TABLE_WITH_NAME_AND_NICKNAME_STATEMENT,
+ "SELECT S.id, S.name, S.nickname, D.id, D.timestamp6_col,
D.decimal_col FROM value_source"
+ + " AS S JOIN jdbc_lookup for system_time as of
S.proctime AS D ON "
+ + "S.id = D.id AND S.name = 'Alice' AND S.nickname =
'ADD'",
+ expectedResultSetRows,
+ expectedCachedEntries);
+ }
+
+ @ParameterizedTest
+ @EnumSource(Caching.class)
+ void testLookupJoinWithLikeFilter(Caching caching) {
+
+ List<Row> expectedResultSetRows =
+ Arrays.asList(
+ Row.of(
+ 1L,
+ "Alice",
+ "ABC",
+ 1L,
+
truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")),
+ BigDecimal.valueOf(100.1234)));
+
+ RowData key1 = GenericRowData.of(1L);
+ RowData value1 =
+ GenericRowData.of(
+ 1L,
+
DecimalData.fromBigDecimal(BigDecimal.valueOf(100.1234), 10, 4),
+ TimestampData.fromLocalDateTime(
+
truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456"))));
+
+ Map<RowData, Collection<RowData>> expectedCachedEntries = new
HashMap<>();
+ expectedCachedEntries.put(key1, Collections.singletonList(value1));
+
+ lookupTableTest(
+ caching,
+ Arrays.asList(
+ Row.of(1L, "Alice", "ABC"),
+ Row.of(1L, "Alice", "ADD"),
+ Row.of(2L, "Bob", "BGH"),
+ Row.of(3L, "Charlie", "CHJ")),
+ CREATE_TABLE_WITH_NAME_AND_NICKNAME_STATEMENT,
+ "SELECT S.id, S.name, S.nickname, D.id, D.timestamp6_col,
D.decimal_col FROM value_source"
+ + " AS S JOIN jdbc_lookup for system_time as of
S.proctime AS D ON "
+ + "S.id = D.id AND S.name LIKE 'Al%' AND S.nickname =
'ABC' ",
+ expectedResultSetRows,
+ expectedCachedEntries);
+ }
+
+ @ParameterizedTest
+ @EnumSource(Caching.class)
+ void testLookupJoinWithORFilter(Caching caching) {
+
+ List<Row> expectedResultSetRows =
+ Arrays.asList(
+ Row.of(
+ 1L,
+ "Alice",
+ "ABC",
+ 1L,
+
truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")),
+ BigDecimal.valueOf(100.1234)),
+ Row.of(
+ 2L,
+ "Bob",
+ "BGH",
+ 2L,
+
truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456")),
+ BigDecimal.valueOf(101.1234)));
+
RowData key1 = GenericRowData.of(1L);
RowData value1 =
GenericRowData.of(
@@ -391,14 +562,45 @@ public abstract class JdbcDynamicTableSourceITCase
implements DatabaseTest {
TimestampData.fromLocalDateTime(
truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456"))));
- RowData key3 = GenericRowData.of(3L);
+ Map<RowData, Collection<RowData>> expectedCachedEntries = new
HashMap<>();
+ expectedCachedEntries.put(key1, Collections.singletonList(value1));
+ expectedCachedEntries.put(key2, Collections.singletonList(value2));
+
+ lookupTableTest(
+ caching,
+ Arrays.asList(
+ Row.of(1L, "Alice", "ABC"),
+ Row.of(1L, "Alice", "ADD"),
+ Row.of(2L, "Bob", "BGH"),
+ Row.of(3L, "Charlie", "CHJ")),
+ CREATE_TABLE_WITH_NAME_AND_NICKNAME_STATEMENT,
+ "SELECT S.id, S.name, S.nickname, D.id, D.timestamp6_col,
D.decimal_col FROM value_source"
+ + " AS S JOIN jdbc_lookup for system_time as of
S.proctime AS D ON "
+ + "S.id = D.id AND (S.name = \'Bob\' OR S.nickname =
\'ABC\')",
+ expectedResultSetRows,
+ expectedCachedEntries);
+ }
- Map<RowData, Collection<RowData>> expectedEntries = new HashMap<>();
- expectedEntries.put(key1, Collections.singletonList(value1));
- expectedEntries.put(key2, Collections.singletonList(value2));
- expectedEntries.put(key3, Collections.emptyList());
+ protected TemporalUnit timestampPrecision() {
+ return ChronoUnit.MICROS;
+ }
-
LookupCacheAssert.assertThat(cache).containsExactlyEntriesOf(expectedEntries);
+ private LocalDateTime truncateTime(LocalDateTime value) {
+ return value.truncatedTo(timestampPrecision());
+ }
+
+ private List<Row> executeQuery(String query) {
+ return CollectionUtil.iteratorToList(tEnv.executeSql(query).collect());
+ }
+
+ private void validateCachedValues(Map<RowData, Collection<RowData>>
expectedCachedEntries) {
+ // Validate cache
+ Map<String, LookupCacheManager.RefCountedCache> managedCaches =
+ LookupCacheManager.getInstance().getManagedCaches();
+ assertThat(managedCaches).as("There should be only 1 shared cache
registered").hasSize(1);
+ LookupCache cache =
managedCaches.get(managedCaches.keySet().iterator().next()).getCache();
+ // jdbc does support project push down, the cached row has been
projected
+
LookupCacheAssert.assertThat(cache).containsExactlyEntriesOf(expectedCachedEntries);
}
private enum Caching {
diff --git
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTestBase.java
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTestBase.java
index 585ff436..a37ed5ff 100644
---
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTestBase.java
+++
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTestBase.java
@@ -23,6 +23,7 @@ import
org.apache.flink.connector.jdbc.databases.derby.DerbyTestBase;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
@@ -42,23 +43,68 @@ class JdbcLookupTestBase implements DerbyTestBase {
+ "id1 INT NOT NULL DEFAULT 0,"
+ "id2 VARCHAR(20) NOT NULL,"
+ "comment1 VARCHAR(1000),"
- + "comment2 VARCHAR(1000))");
+ + "comment2 VARCHAR(1000),"
+ + "decimal_col DECIMAL(10, 4),"
+ + "double_col DOUBLE,"
+ + "real_col FLOAT"
+ + ")");
Object[][] data =
new Object[][] {
- new Object[] {1, "1", "11-c1-v1", "11-c2-v1"},
- new Object[] {1, "1", "11-c1-v2", "11-c2-v2"},
- new Object[] {2, "3", null, "23-c2"},
- new Object[] {2, "5", "25-c1", "25-c2"},
- new Object[] {3, "8", "38-c1", "38-c2"}
+ new Object[] {
+ 1,
+ "1",
+ "11-c1-v1",
+ "11-c2-v1",
+ BigDecimal.valueOf(100.1011),
+ new Double(1.1),
+ new Float(2.2)
+ },
+ new Object[] {
+ 1,
+ "1",
+ "11-c1-v2",
+ "11-c2-v2",
+ BigDecimal.valueOf(100.2022),
+ new Double(2.2),
+ new Float(2.2)
+ },
+ new Object[] {
+ 2,
+ "3",
+ null,
+ "23-c2",
+ BigDecimal.valueOf(100.1011),
+ new Double(1.1),
+ new Float(1.1)
+ },
+ new Object[] {
+ 2,
+ "5",
+ "25-c1",
+ "25-c2",
+ BigDecimal.valueOf(100.1011),
+ new Double(1.1),
+ new Float(1.1)
+ },
+ new Object[] {
+ 1,
+ "8",
+ "11-c1-v1",
+ "11-c2-v1",
+ BigDecimal.valueOf(100.1011),
+ new Double(1.1),
+ new Float(3.3)
+ }
};
- boolean[] surroundedByQuotes = new boolean[] {false, true, true,
true};
+ boolean[] surroundedByQuotes =
+ new boolean[] {false, true, true, true, false, false,
false};
StringBuilder sqlQueryBuilder =
new StringBuilder(
"INSERT INTO "
+ LOOKUP_TABLE
- + " (id1, id2, comment1, comment2) VALUES
");
+ + " (id1, id2, comment1, comment2,
decimal_col, double_col, real_col) VALUES ");
for (int i = 0; i < data.length; i++) {
sqlQueryBuilder.append("(");
for (int j = 0; j < data[i].length; j++) {
diff --git
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java
index b4f06cea..08d7d8fe 100644
---
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java
+++
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunctionTest.java
@@ -28,11 +28,17 @@ import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
+import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
+import java.io.Serializable;
+import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -47,12 +53,27 @@ class JdbcRowDataLookupFunctionTest extends
JdbcLookupTestBase {
new DataType[] {
DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING(),
DataTypes.STRING()
};
+ private static final String[] fieldNames2 =
+ new String[] {
+ "id1", "id2", "comment1", "comment2", "decimal_col",
"double_col", "real_col"
+ };
+ private static final DataType[] fieldDataTypes2 =
+ new DataType[] {
+ DataTypes.INT(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.DECIMAL(10, 4),
+ DataTypes.DOUBLE(),
+ DataTypes.FLOAT()
+ };
private static final String[] lookupKeys = new String[] {"id1", "id2"};
@ParameterizedTest(name = "withFailure = {0}")
@ValueSource(booleans = {false, true})
void testLookup(boolean withFailure) throws Exception {
+
JdbcRowDataLookupFunction lookupFunction =
buildRowDataLookupFunction(withFailure);
ListOutputCollector collector = new ListOutputCollector();
@@ -82,6 +103,130 @@ class JdbcRowDataLookupFunctionTest extends
JdbcLookupTestBase {
assertThat(result).isEqualTo(expected);
}
+ @ParameterizedTest
+ @MethodSource("lookupWithPredicatesProvider")
+ void testEval(TestSpec testSpec) throws Exception {
+ JdbcRowDataLookupFunction lookupFunction =
+ buildRowDataLookupFunctionWithPredicates(
+ testSpec.withFailure, testSpec.resolvedPredicates,
testSpec.pushdownParams);
+
+ ListOutputCollector collector = new ListOutputCollector();
+ lookupFunction.setCollector(collector);
+ lookupFunction.open(null);
+ lookupFunction.eval(testSpec.keys);
+
+ if (testSpec.withFailure) {
+ // Close connection here, and this will be recovered by retry
+ if (lookupFunction.getDbConnection() != null) {
+ lookupFunction.getDbConnection().close();
+ }
+ }
+
+ List<String> result =
+ new ArrayList<>(collector.getOutputs())
+
.stream().map(RowData::toString).sorted().collect(Collectors.toList());
+ Collections.sort(testSpec.expected);
+ assertThat(result).isEqualTo(testSpec.expected);
+ }
+
+ private static class TestSpec {
+
+ private boolean withFailure;
+ private final List<String> resolvedPredicates;
+ private final Serializable[] pushdownParams;
+ private final Object[] keys;
+ private List<String> expected;
+
+ private TestSpec(
+ boolean withFailure,
+ List<String> resolvedPredicates,
+ Serializable[] pushdownParams,
+ Object[] keys,
+ List<String> expected) {
+ this.withFailure = withFailure;
+ this.resolvedPredicates = resolvedPredicates;
+ this.pushdownParams = pushdownParams;
+ this.keys = keys;
+ this.expected = expected;
+ }
+
+ @Override
+ public String toString() {
+ return "TestSpec{"
+ + "withFailure="
+ + withFailure
+ + ", resolvedPredicates="
+ + resolvedPredicates
+ + ", pushdownParams="
+ + Arrays.toString(pushdownParams)
+ + ", keys="
+ + Arrays.toString(keys)
+ + ", expected="
+ + expected
+ + '}';
+ }
+ }
+
+ static Collection<TestSpec> lookupWithPredicatesProvider() {
+ return ImmutableList.<TestSpec>builder()
+ .addAll(getTestSpecs(true))
+ .addAll(getTestSpecs(false))
+ .build();
+ }
+
+ @NotNull
+ private static ImmutableList<TestSpec> getTestSpecs(boolean withFailure) {
+ return ImmutableList.of(
+ // var char single filter
+ new TestSpec(
+ withFailure,
+ Collections.singletonList("(comment1 = ?)"),
+ new Serializable[] {"11-c1-v1"},
+ new Object[] {1, StringData.fromString("1")},
+
Collections.singletonList("+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)")),
+ // decimal single filter
+ new TestSpec(
+ withFailure,
+ Collections.singletonList("(decimal_col = ?)"),
+ new Serializable[] {BigDecimal.valueOf(100.1011)},
+ new Object[] {1, StringData.fromString("1")},
+
Collections.singletonList("+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)")),
+ // real single filter
+ new TestSpec(
+ withFailure,
+ Collections.singletonList("(real_col = ?)"),
+ new Serializable[] {2.2},
+ new Object[] {1, StringData.fromString("1")},
+ Arrays.asList(
+ "+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)",
+ "+I(1,1,11-c1-v2,11-c2-v2,100.2022,2.2,2.2)")),
+ // double single filter
+ new TestSpec(
+ withFailure,
+ Collections.singletonList("(double_col = ?)"),
+ new Serializable[] {
+ 1.1,
+ },
+ new Object[] {1, StringData.fromString("1")},
+
Collections.singletonList("+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)")),
+ // and
+ new TestSpec(
+ withFailure,
+ Collections.singletonList("(real_col = ?) AND
(double_col = ?)"),
+ new Serializable[] {2.2, 1.1},
+ new Object[] {1, StringData.fromString("1")},
+
Collections.singletonList("+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)")),
+ // or
+ new TestSpec(
+ withFailure,
+ Collections.singletonList("(decimal_col = ?) OR
(double_col = ?)"),
+ new Serializable[] {BigDecimal.valueOf(100.2022), 1.1},
+ new Object[] {1, StringData.fromString("1")},
+ Arrays.asList(
+ "+I(1,1,11-c1-v1,11-c2-v1,100.1011,1.1,2.2)",
+
"+I(1,1,11-c1-v2,11-c2-v2,100.2022,2.2,2.2)")));
+ }
+
private JdbcRowDataLookupFunction buildRowDataLookupFunction(boolean
withFailure) {
InternalJdbcConnectionOptions jdbcOptions =
InternalJdbcConnectionOptions.builder()
@@ -103,7 +248,36 @@ class JdbcRowDataLookupFunctionTest extends
JdbcLookupTestBase {
fieldNames,
fieldDataTypes,
lookupKeys,
- rowType);
+ rowType,
+ Collections.emptyList(),
+ new Serializable[0]);
+ }
+
+ private JdbcRowDataLookupFunction buildRowDataLookupFunctionWithPredicates(
+ boolean withFailure, List<String> resolvedPredicates,
Serializable[] pushdownParams) {
+ InternalJdbcConnectionOptions jdbcOptions =
+ InternalJdbcConnectionOptions.builder()
+ .setDriverName(getMetadata().getDriverClass())
+ .setDBUrl(getMetadata().getJdbcUrl())
+ .setTableName(LOOKUP_TABLE)
+ .build();
+
+ RowType rowType =
+ RowType.of(
+ Arrays.stream(fieldDataTypes2)
+ .map(DataType::getLogicalType)
+ .toArray(LogicalType[]::new),
+ fieldNames2);
+
+ return new JdbcRowDataLookupFunction(
+ jdbcOptions,
+ withFailure ? 1 : LookupOptions.MAX_RETRIES.defaultValue(),
+ fieldNames2,
+ fieldDataTypes2,
+ lookupKeys,
+ rowType,
+ resolvedPredicates,
+ pushdownParams);
}
private static final class ListOutputCollector implements
Collector<RowData> {
diff --git
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
index 05d8a467..aa186a40 100644
---
a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
+++
b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
@@ -35,7 +35,7 @@ public class JdbcTablePlanTest extends TableTestBase {
private TestInfo testInfo;
@BeforeEach
- public void setup(TestInfo testInfo) {
+ void setup(TestInfo testInfo) {
this.testInfo = testInfo;
util.tableEnv()
.executeSql(
@@ -52,24 +52,98 @@ public class JdbcTablePlanTest extends TableTestBase {
+ " 'url'='jdbc:derby:memory:test',"
+ " 'table-name'='test_table'"
+ ")");
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE d ( "
+ + "ip varchar(20), type int, age int"
+ + ") WITH ("
+ + " 'connector'='jdbc',"
+ + " 'url'='jdbc:derby:memory:test1',"
+ + " 'table-name'='d'"
+ + ")");
+
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE table_with_weird_column_name ( "
+ + "ip varchar(20), type int, ```?age:` int"
+ + ") WITH ("
+ + " 'connector'='jdbc',"
+ + " 'url'='jdbc:derby:memory:test1',"
+ + " 'table-name'='d'"
+ + ")");
+ util.tableEnv()
+ .executeSql(
+ "CREATE TABLE a ( "
+ + " ip string, proctime as proctime() "
+ + ") WITH ("
+ + " 'connector'='jdbc',"
+ + " 'url'='jdbc:derby:memory:test2',"
+ + " 'table-name'='a'"
+ + ")");
}
@Test
- public void testProjectionPushDown() {
+ void testProjectionPushDown() {
util.verifyExecPlan("SELECT decimal_col, timestamp9_col, id FROM
jdbc");
}
@Test
- public void testLimitPushDown() {
+ void testLimitPushDown() {
util.verifyExecPlan("SELECT id, time_col FROM jdbc LIMIT 3");
}
@Test
- public void testFilterPushdown() {
+ void testFilterPushdown() {
util.verifyExecPlan(
"SELECT id, time_col, real_col FROM jdbc WHERE id = 900001 AND
time_col <> TIME '11:11:11' OR double_col >= -1000.23");
}
+ /**
+ * Note the join condition is not present in the optimized plan, see
FLINK-34170, as it is
+ * handled in the JDBC java code, where it adds the join conditions to the
select statement
+ * string.
+ */
+ @Test
+ void testLookupJoin() {
+ util.verifyExecPlan(
+ "SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime
ON a.ip = d.ip");
+ }
+
+ @Test
+ void testLookupJoinWithFilter() {
+ util.verifyExecPlan(
+ "SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime
ON d.type = 0 AND a.ip = d.ip");
+ }
+
+ @Test
+ void testLookupJoinWithANDAndORFilter() {
+ util.verifyExecPlan(
+ "SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime
ON ((d.age = 50 AND d.type = 0) "
+ + "OR (d.type = 1 AND d.age = 40)) AND a.ip = d.ip");
+ }
+
+ @Test
+ void testLookupJoinWith2ANDsAndORFilter() {
+ util.verifyExecPlan(
+ "SELECT * FROM a JOIN d FOR SYSTEM_TIME AS OF a.proctime "
+ + "ON ((50 > d.age AND d.type = 1 AND d.age > 0 ) "
+ + "OR (70 > d.age AND d.type = 6 AND d.age > 10)) AND
a.ip = d.ip");
+ }
+
+ @Test
+ void testLookupJoinWithORFilter() {
+ util.verifyExecPlan(
+ "SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF a.proctime
ON (d.age = 50 OR d.type = 1) AND a.ip = d.ip");
+ }
+
+ @Test
+ void testLookupJoinWithWeirdColumnNames() {
+ util.verifyExecPlan(
+ "SELECT * FROM a LEFT JOIN table_with_weird_column_name FOR
SYSTEM_TIME AS OF a.proctime "
+ + "ON (table_with_weird_column_name.```?age:` = 50 OR
table_with_weird_column_name.type = 1) "
+ + "AND a.ip = table_with_weird_column_name.ip");
+ }
+
/**
* Get the test method name, in order to adapt to {@link TableTestBase}
that has not migrated to
* Junit5. Remove it when dropping support of Flink 1.18.
diff --git
a/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml
b/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml
index b69903f3..f05f5fb3 100644
---
a/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml
+++
b/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml
@@ -16,56 +16,200 @@ See the License for the specific language governing
permissions and
limitations under the License.
-->
<Root>
- <TestCase name="testLimitPushDown">
- <Resource name="sql">
- <![CDATA[SELECT id, time_col FROM jdbc LIMIT 3]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
+ <TestCase name="testLimitPushDown">
+ <Resource name="sql">
+ <![CDATA[SELECT id, time_col FROM jdbc LIMIT 3]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
LogicalSort(fetch=[3])
+- LogicalProject(id=[$0], time_col=[$3])
+- LogicalTableScan(table=[[default_catalog, default_database, jdbc]])
]]>
- </Resource>
- <Resource name="optimized exec plan">
- <![CDATA[
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
Limit(offset=[0], fetch=[3])
+- Exchange(distribution=[single])
+- TableSourceScan(table=[[default_catalog, default_database, jdbc,
project=[id, time_col], limit=[3]]], fields=[id, time_col])
]]>
- </Resource>
- </TestCase>
- <TestCase name="testProjectionPushDown">
- <Resource name="sql">
- <![CDATA[SELECT decimal_col, timestamp9_col, id FROM jdbc]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
+ </Resource>
+ </TestCase>
+ <TestCase name="testProjectionPushDown">
+ <Resource name="sql">
+ <![CDATA[SELECT decimal_col, timestamp9_col, id FROM jdbc]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
LogicalProject(decimal_col=[$6], timestamp9_col=[$2], id=[$0])
+- LogicalTableScan(table=[[default_catalog, default_database, jdbc]])
]]>
- </Resource>
- <Resource name="optimized exec plan">
- <![CDATA[
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
TableSourceScan(table=[[default_catalog, default_database, jdbc,
project=[decimal_col, timestamp9_col, id]]], fields=[decimal_col,
timestamp9_col, id])
]]>
- </Resource>
- </TestCase>
- <TestCase name="testFilterPushdown">
- <Resource name="sql">
- <![CDATA[SELECT id, time_col, real_col FROM jdbc WHERE id =
900001 AND time_col <> TIME '11:11:11' OR double_col >= -1000.23]]>
- </Resource>
- <Resource name="ast">
- <![CDATA[
+ </Resource>
+ </TestCase>
+ <TestCase name="testFilterPushdown">
+ <Resource name="sql">
+ <![CDATA[SELECT id, time_col, real_col FROM jdbc WHERE id = 900001
AND time_col <> TIME '11:11:11' OR double_col >= -1000.23]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
LogicalProject(id=[$0], time_col=[$3], real_col=[$4])
+- LogicalFilter(condition=[OR(AND(=($0, 900001), <>($3, 11:11:11)), >=($5,
-1000.23:DECIMAL(6, 2)))])
+- LogicalTableScan(table=[[default_catalog, default_database, jdbc]])
]]>
- </Resource>
- <Resource name="optimized exec plan">
- <![CDATA[
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
TableSourceScan(table=[[default_catalog, default_database, jdbc,
filter=[and(OR(=(id, 900001:BIGINT), >=(double_col, -1000.23:DECIMAL(6, 2))),
OR(<>(time_col, 11:11:11), >=(double_col, -1000.23:DECIMAL(6, 2))))],
project=[id, time_col, real_col]]], fields=[id, time_col, real_col])
]]>
- </Resource>
- </TestCase>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLookupJoinWithANDAndORFilter">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF
a.proctime ON ((d.age = 50 AND d.type = 0) OR (d.type = 1 AND d.age = 40)) AND
a.ip = d.ip]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(ip=[$0], proctime=[$1], ip0=[$2], type=[$3], age=[$4])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0,
1}])
+ :- LogicalProject(ip=[$0], proctime=[PROCTIME()])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, a]])
+ +- LogicalFilter(condition=[AND(OR(AND(=($2, 50), =($1, 0)), AND(=($1, 1),
=($2, 40))), =($cor0.ip, CAST($0):VARCHAR(2147483647) CHARACTER SET
"UTF-16LE"))])
+ +- LogicalSnapshot(period=[$cor0.proctime])
+ +- LogicalTableScan(table=[[default_catalog, default_database, d]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type, age])
++- LookupJoin(table=[default_catalog.default_database.d],
joinType=[LeftOuterJoin], lookup=[ip=ip], select=[ip, proctime, ip, type, age,
CAST(ip AS VARCHAR(2147483647)) AS ip0])
+ +- Calc(select=[ip, PROCTIME() AS proctime])
+ +- TableSourceScan(table=[[default_catalog, default_database, a]],
fields=[ip])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLookupJoinWith2ANDsAndORFilter">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM a JOIN d FOR SYSTEM_TIME AS OF a.proctime
ON ((50 > d.age AND d.type = 1 AND d.age > 0 ) OR (70 > d.age AND d.type = 6
AND d.age > 10)) AND a.ip = d.ip]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(ip=[$0], proctime=[$1], ip0=[$2], type=[$3], age=[$4])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0, 1}])
+ :- LogicalProject(ip=[$0], proctime=[PROCTIME()])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, a]])
+ +- LogicalFilter(condition=[AND(OR(AND(>(50, $2), =($1, 1), >($2, 0)),
AND(>(70, $2), =($1, 6), >($2, 10))), =($cor0.ip, CAST($0):VARCHAR(2147483647)
CHARACTER SET "UTF-16LE"))])
+ +- LogicalSnapshot(period=[$cor0.proctime])
+ +- LogicalTableScan(table=[[default_catalog, default_database, d]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type, age])
++- LookupJoin(table=[default_catalog.default_database.d],
joinType=[InnerJoin], lookup=[ip=ip], select=[ip, proctime, ip, type, age,
CAST(ip AS VARCHAR(2147483647)) AS ip0])
+ +- Calc(select=[ip, PROCTIME() AS proctime])
+ +- TableSourceScan(table=[[default_catalog, default_database, a]],
fields=[ip])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLookupJoin">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF
a.proctime ON a.ip = d.ip]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(ip=[$0], proctime=[$1], ip0=[$2], type=[$3], age=[$4])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0,
1}])
+ :- LogicalProject(ip=[$0], proctime=[PROCTIME()])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, a]])
+ +- LogicalFilter(condition=[=($cor0.ip, CAST($0):VARCHAR(2147483647)
CHARACTER SET "UTF-16LE")])
+ +- LogicalSnapshot(period=[$cor0.proctime])
+ +- LogicalTableScan(table=[[default_catalog, default_database, d]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type, age])
++- LookupJoin(table=[default_catalog.default_database.d],
joinType=[LeftOuterJoin], lookup=[ip=ip], select=[ip, proctime, ip, type, age,
CAST(ip AS VARCHAR(2147483647)) AS ip0])
+ +- Calc(select=[ip, PROCTIME() AS proctime])
+ +- TableSourceScan(table=[[default_catalog, default_database, a]],
fields=[ip])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLookupJoinWithFilter">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF
a.proctime ON d.type = 0 AND a.ip = d.ip]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(ip=[$0], proctime=[$1], ip0=[$2], type=[$3], age=[$4])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0,
1}])
+ :- LogicalProject(ip=[$0], proctime=[PROCTIME()])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, a]])
+ +- LogicalFilter(condition=[AND(=($1, 0), =($cor0.ip,
CAST($0):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))])
+ +- LogicalSnapshot(period=[$cor0.proctime])
+ +- LogicalTableScan(table=[[default_catalog, default_database, d]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type, age])
++- LookupJoin(table=[default_catalog.default_database.d],
joinType=[LeftOuterJoin], lookup=[ip=ip], select=[ip, proctime, ip, CAST(0 AS
INTEGER) AS type, age, CAST(ip AS VARCHAR(2147483647)) AS ip0])
+ +- Calc(select=[ip, PROCTIME() AS proctime])
+ +- TableSourceScan(table=[[default_catalog, default_database, a]],
fields=[ip])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLookupJoinWithORFilter">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM a LEFT JOIN d FOR SYSTEM_TIME AS OF
a.proctime ON (d.age = 50 OR d.type = 1) AND a.ip = d.ip]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(ip=[$0], proctime=[$1], ip0=[$2], type=[$3], age=[$4])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0,
1}])
+ :- LogicalProject(ip=[$0], proctime=[PROCTIME()])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, a]])
+ +- LogicalFilter(condition=[AND(OR(=($2, 50), =($1, 1)), =($cor0.ip,
CAST($0):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))])
+ +- LogicalSnapshot(period=[$cor0.proctime])
+ +- LogicalTableScan(table=[[default_catalog, default_database, d]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type, age])
++- LookupJoin(table=[default_catalog.default_database.d],
joinType=[LeftOuterJoin], lookup=[ip=ip], select=[ip, proctime, ip, type, age,
CAST(ip AS VARCHAR(2147483647)) AS ip0])
+ +- Calc(select=[ip, PROCTIME() AS proctime])
+ +- TableSourceScan(table=[[default_catalog, default_database, a]],
fields=[ip])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testLookupJoinWithWeirdColumnNames">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM a LEFT JOIN table_with_weird_column_name
FOR SYSTEM_TIME AS OF a.proctime ON (table_with_weird_column_name.```?age:` =
50 OR table_with_weird_column_name.type = 1) AND a.ip =
table_with_weird_column_name.ip]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(ip=[$0], proctime=[$1], ip0=[$2], type=[$3], `?age:=[$4])
++- LogicalCorrelate(correlation=[$cor0], joinType=[left], requiredColumns=[{0,
1}])
+ :- LogicalProject(ip=[$0], proctime=[PROCTIME()])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, a]])
+ +- LogicalFilter(condition=[AND(OR(=($2, 50), =($1, 1)), =($cor0.ip,
CAST($0):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))])
+ +- LogicalSnapshot(period=[$cor0.proctime])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
table_with_weird_column_name]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[ip, PROCTIME_MATERIALIZE(proctime) AS proctime, ip0, type,
`?age:])
++-
LookupJoin(table=[default_catalog.default_database.table_with_weird_column_name],
joinType=[LeftOuterJoin], lookup=[ip=ip], select=[ip, proctime, ip, type,
`?age:, CAST(ip AS VARCHAR(2147483647)) AS ip0])
+ +- Calc(select=[ip, PROCTIME() AS proctime])
+ +- TableSourceScan(table=[[default_catalog, default_database, a]],
fields=[ip])
+]]>
+ </Resource>
+ </TestCase>
</Root>