[ https://issues.apache.org/jira/browse/FLINK-26016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17492374#comment-17492374 ]

jinfeng commented on FLINK-26016:
---------------------------------

[~luoyuxia], thanks for your reply. I think the root cause is the
ColumnarRowData::getString() method.
{code:java}
@Override
public StringData getString(int pos) {
    Bytes byteArray = vectorizedColumnBatch.getByteArray(rowId, pos);
    return StringData.fromBytes(byteArray.data, byteArray.offset, byteArray.len);
}
{code}

When reading StringData, the underlying bytes are not copied, and the
vectorizedColumnBatch is reused across batches. In the lookup join function, the
bytes backing a key that was read and cached earlier are therefore overwritten by
the bytes of keys read later.
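
To make the effect concrete, here is a minimal standalone sketch (not part of the Flink code base; the class name ReusedBufferAliasing is hypothetical). It only assumes what is described above: StringData.fromBytes wraps the given byte array without copying, so a value cached from an earlier batch changes once the shared buffer is overwritten.
{code:java}
// Hypothetical illustration of the aliasing problem described above.
// Assumption: StringData.fromBytes wraps the passed array instead of copying it.
import java.nio.charset.StandardCharsets;

import org.apache.flink.table.data.StringData;

public class ReusedBufferAliasing {
    public static void main(String[] args) {
        // Stand-in for the buffer that vectorizedColumnBatch reuses across batches.
        byte[] reusedBuffer = new byte[3];

        // "Batch 1": the buffer holds the key "foo" and the StringData goes into the cache.
        System.arraycopy("foo".getBytes(StandardCharsets.UTF_8), 0, reusedBuffer, 0, 3);
        StringData cachedKey = StringData.fromBytes(reusedBuffer, 0, 3); // bytes are not copied

        // "Batch 2": the same buffer is reused and now holds "bar".
        System.arraycopy("bar".getBytes(StandardCharsets.UTF_8), 0, reusedBuffer, 0, 3);

        // The key cached from batch 1 now reads back as "bar" instead of "foo",
        // which is how earlier cache entries get silently corrupted.
        System.out.println(cachedKey);
    }
}
{code}
This matches the fix proposed in the issue description: copying the row before extracting the key makes the cached key point at stable memory instead of the reused buffer.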

> FileSystemLookupFunction does not produce correct results when hive table 
> uses columnar storage
> -----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26016
>                 URL: https://issues.apache.org/jira/browse/FLINK-26016
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Hive
>    Affects Versions: 1.14.3
>            Reporter: jinfeng
>            Priority: Major
>
> When I use the parquet hive table as the lookup table, there will be some 
> records that cannot be joined. This can be reproduced by adding unit tests to 
> HiveLookupJoinITCase.
> {code:java}
>         // create the hive table with columnar storage.
>         tableEnv.executeSql(
>                 String.format(
>                         "create table columnar_table (x string) STORED AS PARQUET "
>                                 + "tblproperties ('%s'='5min')",
>                         HiveOptions.LOOKUP_JOIN_CACHE_TTL.key()));
> 
>     @Test
>     public void testLookupJoinTableWithColumnarStorage() throws Exception {
>         // constructs test data, as the DEFAULT_SIZE of VectorizedColumnBatch is 2048, we should
>         // write at least 2048 records to the test table.
>         List<Row> testData = new ArrayList<>(4096);
>         for (int i = 0; i < 4096; i++) {
>             testData.add(Row.of(String.valueOf(i)));
>         }
>         // constructs test data using values table
>         TableEnvironment batchEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.DEFAULT);
>         batchEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
>         batchEnv.useCatalog(hiveCatalog.getName());
>         String dataId = TestValuesTableFactory.registerData(testData);
>         batchEnv.executeSql(
>                 String.format(
>                         "create table value_source(x string, p as proctime()) with ("
>                                 + "'connector' = 'values', 'data-id' = '%s', 'bounded'='true')",
>                         dataId));
>         batchEnv.executeSql("insert overwrite columnar_table select x from value_source").await();
>         TableImpl flinkTable =
>                 (TableImpl)
>                         tableEnv.sqlQuery(
>                                 "select t.x as x1, c.x as x2 from value_source t "
>                                         + "left join columnar_table for system_time as of t.p c "
>                                         + "on t.x = c.x where c.x is null");
>         List<Row> results = CollectionUtil.iteratorToList(flinkTable.execute().collect());
>         assertTrue(results.size() == 0);
>     }
> {code}
> The problem may be caused by the following code. 
> {code:java}
> RowData row;
> while ((row = partitionReader.read(reuse)) != null) {
>    count++;
>    RowData key = extractLookupKey(row);
>    List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
>    rows.add(serializer.copy(row));
> }
> {code}
> Here only the cached row is copied (serializer.copy(row)); the key extracted before the copy still references the reused vectorizedColumnBatch buffer, so the bytes backing earlier cache keys are overwritten by later batches.
> It can be fixed with the following modification:
> {code:java}
> RowData row;
> while ((row = partitionReader.read(reuse)) != null) {
>     count++;
>     RowData rowData = serializer.copy(row);
>     RowData key = extractLookupKey(rowData);
>     List<RowData> rows = cache.computeIfAbsent(key, k -> new ArrayList<>());
>     rows.add(rowData);
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
