This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new faf9a8a  [FLINK-26016][hive] Fix FileSystemLookupFunction does not 
produce correct results when hive table uses columnar storage
faf9a8a is described below

commit faf9a8a1f6849e3cb0b9e074cc647c9d24915c6a
Author: JIN FENG <[email protected]>
AuthorDate: Mon Feb 21 11:38:20 2022 +0800

    [FLINK-26016][hive] Fix FileSystemLookupFunction does not produce correct 
results when hive table uses columnar storage
    
    This closes #18777
---
 .../connectors/hive/FileSystemLookupFunction.java  |  5 +--
 .../connectors/hive/HiveLookupJoinITCase.java      | 40 ++++++++++++++++++++++
 2 files changed, 43 insertions(+), 2 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/FileSystemLookupFunction.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/FileSystemLookupFunction.java
index 2c3b530..ca8cb30 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/FileSystemLookupFunction.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/FileSystemLookupFunction.java
@@ -135,9 +135,10 @@ public class FileSystemLookupFunction<P> extends 
TableFunction<RowData> {
                 RowData row;
                 while ((row = partitionReader.read(reuse)) != null) {
                     count++;
-                    RowData key = extractLookupKey(row);
+                    RowData rowData = serializer.copy(row);
+                    RowData key = extractLookupKey(rowData);
                     List<RowData> rows = cache.computeIfAbsent(key, k -> new 
ArrayList<>());
-                    rows.add(serializer.copy(row));
+                    rows.add(rowData);
                 }
                 partitionReader.close();
                 nextLoadTime = System.currentTimeMillis() + 
reloadInterval.toMillis();
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java
index dd0d297..1dbb9a4 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import 
org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
 import org.apache.flink.table.runtime.typeutils.InternalSerializers;
 import org.apache.flink.types.Row;
@@ -56,6 +57,7 @@ import static 
org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_MONI
 import static 
org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_PARTITION_INCLUDE;
 import static 
org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_PARTITION_ORDER;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /** Test lookup join of hive tables. */
 public class HiveLookupJoinITCase {
@@ -149,6 +151,12 @@ public class HiveLookupJoinITCase {
                         STREAMING_SOURCE_ENABLE.key(),
                         STREAMING_SOURCE_PARTITION_INCLUDE.key(),
                         STREAMING_SOURCE_PARTITION_ORDER.key()));
+        // create the hive table with columnar storage.
+        tableEnv.executeSql(
+                String.format(
+                        "create table columnar_table (x string) STORED AS 
PARQUET "
+                                + "tblproperties ('%s'='5min')",
+                        HiveOptions.LOOKUP_JOIN_CACHE_TTL.key()));
         tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
     }
 
@@ -350,6 +358,38 @@ public class HiveLookupJoinITCase {
                 "[+I[1, a, 101, 2020, 08, 01], +I[2, b, 122, 2020, 08, 01]]", 
results.toString());
     }
 
+    @Test
+    public void testLookupJoinTableWithColumnarStorage() throws Exception {
+        // constructs test data, as the DEFAULT_SIZE of VectorizedColumnBatch 
is 2048, we should
+        // write at least 2048 records to the test table.
+        List<Row> testData = new ArrayList<>(4096);
+        for (int i = 0; i < 4096; i++) {
+            testData.add(Row.of(String.valueOf(i)));
+        }
+
+        // constructs test data using values table
+        TableEnvironment batchEnv = 
HiveTestUtils.createTableEnvInBatchMode(SqlDialect.DEFAULT);
+        batchEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+        batchEnv.useCatalog(hiveCatalog.getName());
+        String dataId = TestValuesTableFactory.registerData(testData);
+        batchEnv.executeSql(
+                String.format(
+                        "create table value_source(x string, p as proctime()) 
with ("
+                                + "'connector' = 'values', 'data-id' = '%s', 
'bounded'='true')",
+                        dataId));
+        batchEnv.executeSql("insert overwrite columnar_table select x from 
value_source").await();
+        TableImpl flinkTable =
+                (TableImpl)
+                        tableEnv.sqlQuery(
+                                "select t.x as x1, c.x as x2 from value_source 
t "
+                                        + "left join columnar_table for 
system_time as of t.p c "
+                                        + "on t.x = c.x where c.x is null");
+        List<Row> results = 
CollectionUtil.iteratorToList(flinkTable.execute().collect());
+        assertTrue(
+                "All records should be able to be joined, and the final 
results should be empty.",
+                results.size() == 0);
+    }
+
     private FileSystemLookupFunction<HiveTablePartition> 
getLookupFunction(String tableName)
             throws Exception {
         TableEnvironmentInternal tableEnvInternal = (TableEnvironmentInternal) 
tableEnv;

Reply via email to