[
https://issues.apache.org/jira/browse/FLINK-26016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17492326#comment-17492326
]
jinfeng edited comment on FLINK-26016 at 2/15/22, 2:41 AM:
-----------------------------------------------------------
[~MartijnVisser] [~luoyuxia] Can you help take a look? If it's a bug, I
can help fix it.
was (Author: hackergin):
[~MartijnVisser] Can you help take a look?
> FileSystemLookupFunction does not produce correct results when hive table
> uses columnar storage
> -----------------------------------------------------------------------------------------------
>
> Key: FLINK-26016
> URL: https://issues.apache.org/jira/browse/FLINK-26016
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Hive
> Affects Versions: 1.14.3
> Reporter: jinfeng
> Priority: Major
>
> When I use a Parquet Hive table as the lookup table, some records cannot be
> joined. This can be reproduced by adding the following unit test to
> HiveLookupJoinITCase.
> {code:java}
> // Create the Hive table with columnar storage.
> tableEnv.executeSql(
>         String.format(
>                 "create table columnar_table (x string) STORED AS PARQUET "
>                         + "tblproperties ('%s'='5min')",
>                 HiveOptions.LOOKUP_JOIN_CACHE_TTL.key()));
>
> @Test
> public void testLookupJoinTableWithColumnarStorage() throws Exception {
>     // Construct test data; as the DEFAULT_SIZE of VectorizedColumnBatch is
>     // 2048, we should write at least 2048 records to the test table.
>     List<Row> testData = new ArrayList<>(4096);
>     for (int i = 0; i < 4096; i++) {
>         testData.add(Row.of(String.valueOf(i)));
>     }
>
>     // Register the test data as a bounded values source table.
>     TableEnvironment batchEnv =
>             HiveTestUtils.createTableEnvInBatchMode(SqlDialect.DEFAULT);
>     batchEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
>     batchEnv.useCatalog(hiveCatalog.getName());
>     String dataId = TestValuesTableFactory.registerData(testData);
>     batchEnv.executeSql(
>             String.format(
>                     "create table value_source(x string, p as proctime()) with ("
>                             + "'connector' = 'values', 'data-id' = '%s', 'bounded'='true')",
>                     dataId));
>     batchEnv.executeSql(
>             "insert overwrite columnar_table select x from value_source").await();
>
>     // Every record from value_source has a match in columnar_table, so the
>     // anti-join below must return no rows.
>     TableImpl flinkTable =
>             (TableImpl)
>                     tableEnv.sqlQuery(
>                             "select t.x as x1, c.x as x2 from value_source t "
>                                     + "left join columnar_table for system_time as of t.p c "
>                                     + "on t.x = c.x where c.x is null");
>     List<Row> results =
>             CollectionUtil.iteratorToList(flinkTable.execute().collect());
>     assertTrue(results.size() == 0);
> }
> {code}
> The problem may be caused by the following code in FileSystemLookupFunction.
> {code:java}
> RowData row;
> while ((row = partitionReader.read(reuse)) != null) {
> count++;
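> // The key is extracted from 'row' before the row is copied; with a reused
> // or columnar row, the key can alias state that the next read() overwrites.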
> RowData key = extractLookupKey(row);
> List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
> rows.add(serializer.copy(row));
> }
> {code}
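> With object reuse, the row returned by partitionReader.read(reuse) can be
> backed by mutable state that the next read overwrites (for columnar formats
> the row is a view over the current VectorizedColumnBatch). Since the lookup
> key is extracted before the row is copied, the cached keys can alias that
> shared state and silently change after insertion. The following standalone
> sketch (hypothetical MutableRow/KeyView types, not Flink code) demonstrates
> the pitfall:
> {code:java}
> import java.util.ArrayList;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
>
> // Hypothetical stand-ins, not Flink API: MutableRow plays the role of the
> // reused RowData, KeyView the role of a lookup key extracted from it.
> public class ReuseKeyPitfall {
>
>     static final class MutableRow {
>         String value;
>
>         MutableRow copy() {
>             MutableRow c = new MutableRow();
>             c.value = value;
>             return c;
>         }
>     }
>
>     // A key that still references the (mutable) row it was extracted from.
>     static final class KeyView {
>         final MutableRow row;
>
>         KeyView(MutableRow row) {
>             this.row = row;
>         }
>
>         @Override
>         public boolean equals(Object o) {
>             return o instanceof KeyView && ((KeyView) o).row.value.equals(row.value);
>         }
>
>         @Override
>         public int hashCode() {
>             return row.value.hashCode();
>         }
>     }
>
>     public static void main(String[] args) {
>         MutableRow reuse = new MutableRow();
>         Map<KeyView, List<MutableRow>> cache = new HashMap<>();
>
>         for (String v : new String[] {"a", "b", "c"}) {
>             reuse.value = v;                  // the "reader" overwrites the reused object
>             KeyView key = new KeyView(reuse); // key extracted BEFORE copying aliases 'reuse'
>             cache.computeIfAbsent(key, k -> new ArrayList<>()).add(reuse.copy());
>         }
>
>         // All three cached keys now read "c", so earlier keys cannot be found.
>         MutableRow probe = new MutableRow();
>         probe.value = "a";
>         System.out.println(cache.get(new KeyView(probe))); // prints null: the cached key for "a" was corrupted
>     }
> }
> {code}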
>
> It can be fixed with the following modification, which copies the row before
> extracting the lookup key:
> {code:java}
> RowData row;
> while ((row = partitionReader.read(reuse)) != null) {
> count++;
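> // Copy the (possibly reused) row first, so that both the cached row and
> // the key derived from it are backed by data the reader cannot mutate.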
> RowData rowData = serializer.copy(row);
> RowData key = extractLookupKey(rowData);
> List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
> rows.add(rowData);
> }
> {code}
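> Copying before extracting the key means the key is derived from an immutable
> snapshot of the row, so cached keys should stay stable when the reader
> refills the reuse object or the underlying column batch.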
--
This message was sent by Atlassian Jira
(v8.20.1#820001)