This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new c257f98bf38 [FLINK-29992][hive] Fix Hive lookup join failure when columns are pushed down to the Hive lookup table source
c257f98bf38 is described below
commit c257f98bf38668f5bd9d48ccd307f9f76e2463b2
Author: yuxia Luo <[email protected]>
AuthorDate: Wed Nov 16 15:57:53 2022 +0800
[FLINK-29992][hive] Fix Hive lookup join failure when columns are pushed down to the Hive lookup table source
This closes #21309.
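The planner copies a DynamicTableSource while it applies optimizations such as projection pushdown (DynamicTableSource#copy). Judging from the diff below, HiveLookupTableSource previously inherited copy() from HiveTableSource, so a planner-made copy was no longer a lookup source and lost the pushed-down state; the lookup join then failed once column pruning was pushed into the source. The fix overrides copy() in HiveLookupTableSource and widens the relevant HiveTableSource fields to protected so the override can carry remainingPartitions, projectedFields, and limit over to the copy.

A minimal sketch of the same pattern, using hypothetical BaseSource/LookupSource classes (plain Java, not the Flink API), shows why a subclass that adds a capability must override copy():

// Hypothetical classes for illustration only; not part of Flink.
import java.util.Arrays;

class BaseSource {
    // State the planner pushes into the source during optimization.
    protected int[] projectedFields;
    protected Long limit;

    public BaseSource copy() {
        BaseSource copy = new BaseSource();
        copy.projectedFields = projectedFields;
        copy.limit = limit;
        return copy; // a subclass relying on this is "downgraded" to BaseSource
    }
}

class LookupSource extends BaseSource {
    @Override
    public LookupSource copy() {
        LookupSource copy = new LookupSource();
        // Carry the pushed-down state over, as the commit does for
        // remainingPartitions, projectedFields and limit.
        copy.projectedFields = projectedFields;
        copy.limit = limit;
        return copy;
    }

    public static void main(String[] args) {
        LookupSource source = new LookupSource();
        source.projectedFields = new int[] {0, 2}; // pretend a projection was pushed down
        // Without the override, copy() would return a plain BaseSource and this
        // cast would throw ClassCastException -- the analogue of the lookup join
        // failing once projection pushdown is applied.
        LookupSource copy = (LookupSource) ((BaseSource) source).copy();
        System.out.println(Arrays.toString(copy.projectedFields)); // prints [0, 2]
    }
}

The same reasoning explains the private-to-protected changes in HiveTableSource below: the subclass override can only transfer state it can see.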
---
.../flink/connectors/hive/HiveLookupTableSource.java | 11 +++++++++++
.../apache/flink/connectors/hive/HiveTableSource.java | 4 ++--
.../flink/connectors/hive/HiveLookupJoinITCase.java | 19 +++++++++++++++++++
3 files changed, 32 insertions(+), 2 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
index e68072affe0..296f7cd89eb 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveLookupTableSource.java
@@ -30,6 +30,7 @@ import org.apache.flink.connectors.hive.util.JobConfUtils;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.TableFunctionProvider;
 import org.apache.flink.table.data.RowData;
@@ -90,6 +91,16 @@ public class HiveLookupTableSource extends HiveTableSource implements LookupTabl
         return TableFunctionProvider.of(getLookupFunction(context.getKeys()));
     }
+    @Override
+    public DynamicTableSource copy() {
+        HiveLookupTableSource source =
+                new HiveLookupTableSource(jobConf, flinkConf, tablePath, catalogTable);
+        source.remainingPartitions = remainingPartitions;
+        source.projectedFields = projectedFields;
+        source.limit = limit;
+        return source;
+    }
+
     @VisibleForTesting
     TableFunction<RowData> getLookupFunction(int[][] keys) {
         int[] keyIndices = new int[keys.length];
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
index cc5d63962c8..a9124c888e2 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java
@@ -88,9 +88,9 @@ public class HiveTableSource
     // Remaining partition specs after partition pruning is performed. Null if pruning is not pushed
     // down.
-    @Nullable private List<Map<String, String>> remainingPartitions = null;
+    @Nullable protected List<Map<String, String>> remainingPartitions = null;
     protected int[] projectedFields;
-    private Long limit = null;
+    protected Long limit = null;

     public HiveTableSource(
             JobConf jobConf,
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java
index 38834a123c3..489062e287b 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java
@@ -358,6 +358,25 @@ public class HiveLookupJoinITCase {
                 "[+I[1, a, 101, 2020, 08, 01], +I[2, b, 122, 2020, 08, 01]]",
                 results.toString());
     }

+    @Test
+    public void testLookupJoinWithLookUpSourceProjectPushDown() throws Exception {
+        TableEnvironment batchEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
+        batchEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        batchEnv.useCatalog(hiveCatalog.getName());
+        batchEnv.executeSql(
+                        "insert overwrite bounded_table values (1,'a',10),(2,'a',21),(2,'b',22),(3,'c',33)")
+                .await();
+        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+        TableImpl flinkTable =
+                (TableImpl)
+                        tableEnv.sqlQuery(
+                                "select b.x, b.y from "
+                                        + " default_catalog.default_database.probe as p "
+                                        + " join bounded_table for system_time as of p.p as b on p.x=b.x and p.y=b.y");
+        List<Row> results = CollectionUtil.iteratorToList(flinkTable.execute().collect());
+        assertEquals("[+I[1, a], +I[2, b], +I[3, c]]", results.toString());
+    }
+
     @Test
     public void testLookupJoinTableWithColumnarStorage() throws Exception {
         // constructs test data, as the DEFAULT_SIZE of VectorizedColumnBatch is 2048, we should