This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new faf9a8a [FLINK-26016][hive] Fix FileSystemLookupFunction does not
produce correct results when hive table uses columnar storage
faf9a8a is described below
commit faf9a8a1f6849e3cb0b9e074cc647c9d24915c6a
Author: JIN FENG <[email protected]>
AuthorDate: Mon Feb 21 11:38:20 2022 +0800
[FLINK-26016][hive] Fix FileSystemLookupFunction does not produce correct
results when hive table uses columnar storage
This closes #18777
---
.../connectors/hive/FileSystemLookupFunction.java | 5 +--
.../connectors/hive/HiveLookupJoinITCase.java | 40 ++++++++++++++++++++++
2 files changed, 43 insertions(+), 2 deletions(-)
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/FileSystemLookupFunction.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/FileSystemLookupFunction.java
index 2c3b530..ca8cb30 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/FileSystemLookupFunction.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/FileSystemLookupFunction.java
@@ -135,9 +135,10 @@ public class FileSystemLookupFunction<P> extends
TableFunction<RowData> {
RowData row;
while ((row = partitionReader.read(reuse)) != null) {
count++;
- RowData key = extractLookupKey(row);
+ RowData rowData = serializer.copy(row);
+ RowData key = extractLookupKey(rowData);
List<RowData> rows = cache.computeIfAbsent(key, k -> new
ArrayList<>());
- rows.add(serializer.copy(row));
+ rows.add(rowData);
}
partitionReader.close();
nextLoadTime = System.currentTimeMillis() +
reloadInterval.toMillis();
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java
index dd0d297..1dbb9a4 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveLookupJoinITCase.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import
org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.types.Row;
@@ -56,6 +57,7 @@ import static
org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_MONI
import static
org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_PARTITION_INCLUDE;
import static
org.apache.flink.connectors.hive.HiveOptions.STREAMING_SOURCE_PARTITION_ORDER;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
/** Test lookup join of hive tables. */
public class HiveLookupJoinITCase {
@@ -149,6 +151,12 @@ public class HiveLookupJoinITCase {
STREAMING_SOURCE_ENABLE.key(),
STREAMING_SOURCE_PARTITION_INCLUDE.key(),
STREAMING_SOURCE_PARTITION_ORDER.key()));
+ // create the hive table with columnar storage.
+ tableEnv.executeSql(
+ String.format(
+ "create table columnar_table (x string) STORED AS
PARQUET "
+ + "tblproperties ('%s'='5min')",
+ HiveOptions.LOOKUP_JOIN_CACHE_TTL.key()));
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
}
@@ -350,6 +358,38 @@ public class HiveLookupJoinITCase {
"[+I[1, a, 101, 2020, 08, 01], +I[2, b, 122, 2020, 08, 01]]",
results.toString());
}
+ @Test
+ public void testLookupJoinTableWithColumnarStorage() throws Exception {
+ // constructs test data, as the DEFAULT_SIZE of VectorizedColumnBatch
is 2048, we should
+ write at least 2048 records to the test table.
+ List<Row> testData = new ArrayList<>(4096);
+ for (int i = 0; i < 4096; i++) {
+ testData.add(Row.of(String.valueOf(i)));
+ }
+
+ // constructs test data using values table
+ TableEnvironment batchEnv =
HiveTestUtils.createTableEnvInBatchMode(SqlDialect.DEFAULT);
+ batchEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
+ batchEnv.useCatalog(hiveCatalog.getName());
+ String dataId = TestValuesTableFactory.registerData(testData);
+ batchEnv.executeSql(
+ String.format(
+ "create table value_source(x string, p as proctime())
with ("
+ + "'connector' = 'values', 'data-id' = '%s',
'bounded'='true')",
+ dataId));
+ batchEnv.executeSql("insert overwrite columnar_table select x from
value_source").await();
+ TableImpl flinkTable =
+ (TableImpl)
+ tableEnv.sqlQuery(
+ "select t.x as x1, c.x as x2 from value_source
t "
+ + "left join columnar_table for
system_time as of t.p c "
+ + "on t.x = c.x where c.x is null");
+ List<Row> results =
CollectionUtil.iteratorToList(flinkTable.execute().collect());
+ assertTrue(
+ "All records should be able to be joined, and the final
results should be empty.",
+ results.size() == 0);
+ }
+
private FileSystemLookupFunction<HiveTablePartition>
getLookupFunction(String tableName)
throws Exception {
TableEnvironmentInternal tableEnvInternal = (TableEnvironmentInternal)
tableEnv;