This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 334c6159 [FLINK-28717] Add null checking for all field getters
334c6159 is described below
commit 334c6159e546ba54aa1c38154882122758604d3d
Author: tsreaper <[email protected]>
AuthorDate: Thu Jul 28 10:30:12 2022 +0800
[FLINK-28717] Add null checking for all field getters
This closes #249
---
.../store/utils/RowDataToObjectArrayConverter.java | 5 +-
.../flink/table/store/utils/RowDataUtils.java | 15 +
.../apache/flink/table/store/file/KeyValue.java | 4 +-
.../file/stats/FieldStatsArraySerializer.java | 6 +-
.../store/file/utils/RowDataPartitionComputer.java | 6 +-
.../table/ChangelogWithKeyFileStoreTable.java | 4 +-
.../TableStoreRowDataObjectInspector.java | 3 +-
.../hive/TableStoreHiveStorageHandlerITCase.java | 486 +++++++++++++++++----
.../store/kafka/KafkaLogDeserializationSchema.java | 3 +-
9 files changed, 432 insertions(+), 100 deletions(-)
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataToObjectArrayConverter.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataToObjectArrayConverter.java
index ad708451..23508688 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataToObjectArrayConverter.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataToObjectArrayConverter.java
@@ -33,7 +33,10 @@ public class RowDataToObjectArrayConverter {
this.rowType = rowType;
this.fieldGetters =
IntStream.range(0, rowType.getFieldCount())
- .mapToObj(i -> RowData.createFieldGetter(rowType.getTypeAt(i), i))
+ .mapToObj(
+ i ->
+ RowDataUtils.createNullCheckingFieldGetter(
+ rowType.getTypeAt(i), i))
.toArray(RowData.FieldGetter[]::new);
}
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataUtils.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataUtils.java
index 723d1d3b..d95824ce 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataUtils.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataUtils.java
@@ -280,4 +280,19 @@ public class RowDataUtils {
bd = bd.setScale(0, RoundingMode.DOWN);
return bd.longValue();
}
+
+ public static RowData.FieldGetter createNullCheckingFieldGetter(
+ LogicalType logicalType, int index) {
+ RowData.FieldGetter getter = RowData.createFieldGetter(logicalType, index);
+ if (logicalType.isNullable()) {
+ return getter;
+ } else {
+ return row -> {
+ if (row.isNullAt(index)) {
+ return null;
+ }
+ return getter.getFieldOrNull(row);
+ };
+ }
+ }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
index 252ef84f..426b2696 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.file;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.store.utils.RowDataUtils;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TinyIntType;
@@ -147,7 +148,8 @@ public class KeyValue {
.mapToObj(
i ->
String.valueOf(
- RowData.createFieldGetter(type.getTypeAt(i), i)
+ RowDataUtils.createNullCheckingFieldGetter(
+ type.getTypeAt(i), i)
.getFieldOrNull(row)))
.collect(Collectors.joining(", "));
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
index 7312f78a..10d23e8b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.store.utils.RowDataUtils;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
@@ -47,7 +48,10 @@ public class FieldStatsArraySerializer {
this.serializer = new RowDataSerializer(safeType);
this.fieldGetters =
IntStream.range(0, safeType.getFieldCount())
- .mapToObj(i -> RowData.createFieldGetter(safeType.getTypeAt(i), i))
+ .mapToObj(
+ i ->
+ RowDataUtils.createNullCheckingFieldGetter(
+ safeType.getTypeAt(i), i))
.toArray(RowData.FieldGetter[]::new);
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RowDataPartitionComputer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RowDataPartitionComputer.java
index 99c2e1b1..a12d8105 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RowDataPartitionComputer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RowDataPartitionComputer.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.store.file.utils;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.utils.RowDataUtils;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.StringUtils;
@@ -41,7 +42,10 @@ public class RowDataPartitionComputer {
this.partitionFieldGetters =
Arrays.stream(partitionColumns)
.mapToInt(columnList::indexOf)
- .mapToObj(i -> RowData.createFieldGetter(rowType.getTypeAt(i), i))
+ .mapToObj(
+ i ->
+ RowDataUtils.createNullCheckingFieldGetter(
+ rowType.getTypeAt(i), i))
.toArray(RowData.FieldGetter[]::new);
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index a47a1557..c7e3f0d8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -46,6 +46,7 @@ import org.apache.flink.table.store.table.source.SplitGenerator;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.TableScan;
import org.apache.flink.table.store.table.source.ValueContentRowDataRecordIterator;
+import org.apache.flink.table.store.utils.RowDataUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
@@ -83,7 +84,8 @@ public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
List<LogicalType> fieldTypes = rowType.getChildren();
RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[fieldTypes.size()];
for (int i = 0; i < fieldTypes.size(); i++) {
- fieldGetters[i] = RowData.createFieldGetter(fieldTypes.get(i), i);
+ fieldGetters[i] =
+ RowDataUtils.createNullCheckingFieldGetter(fieldTypes.get(i), i);
}
mergeFunction = new PartialUpdateMergeFunction(fieldGetters);
break;
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java
index a783be83..6b6d94c9 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.hive.objectinspector;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.hive.HiveTypeUtils;
+import org.apache.flink.table.store.utils.RowDataUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -53,7 +54,7 @@ public class TableStoreRowDataObjectInspector extends StructObjectInspector {
name,
TableStoreObjectInspectorFactory.create(logicalType),
i,
- RowData.createFieldGetter(logicalType, i),
+ RowDataUtils.createNullCheckingFieldGetter(logicalType, i),
fieldComments.get(i));
structFields.add(structField);
structFieldMap.put(name, structField);
diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
index c677eb51..b73f9b76 100644
--- a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
+++ b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
@@ -22,10 +22,12 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.FlinkEmbeddedHiveRunner;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.FileStoreTestUtils;
+import org.apache.flink.table.store.file.WriteMode;
import org.apache.flink.table.store.hive.objectinspector.TableStoreObjectInspectorFactory;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.TableCommit;
@@ -56,6 +58,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
/**
@@ -82,90 +85,427 @@ public class TableStoreHiveStorageHandlerITCase {
}
@Test
- public void testReadExternalTableWithPk() throws Exception {
- String path = folder.newFolder().toURI().toString();
- Configuration conf = new Configuration();
- conf.setString(CoreOptions.PATH, path);
- conf.setInteger(CoreOptions.BUCKET, 2);
- conf.setString(CoreOptions.FILE_FORMAT, "avro");
- FileStoreTable table =
- FileStoreTestUtils.createFileStoreTable(
- conf,
+ public void testReadExternalTableNoPartitionWithPk() throws Exception {
+ List<RowData> data =
+ Arrays.asList(
+ GenericRowData.of(1, 10L, StringData.fromString("Hi"), 100L),
+ GenericRowData.of(1, 20L, StringData.fromString("Hello"), 200L),
+ GenericRowData.of(2, 30L, StringData.fromString("World"), 300L),
+ GenericRowData.of(1, 10L, StringData.fromString("Hi Again"), 1000L),
+ GenericRowData.ofKind(
+ RowKind.DELETE, 2, 30L, StringData.fromString("World"), 300L),
+ GenericRowData.of(2, 40L, null, 400L),
+ GenericRowData.of(3, 50L, StringData.fromString("Store"), 200L));
+ String tableName =
+ createChangelogExternalTable(
+ RowType.of(
+ new LogicalType[] {
+ DataTypes.INT().getLogicalType(),
+ DataTypes.BIGINT().getLogicalType(),
+ DataTypes.STRING().getLogicalType(),
+ DataTypes.BIGINT().getLogicalType()
+ },
+ new String[] {"a", "b", "c", "d"}),
+ Collections.emptyList(),
+ Arrays.asList("a", "b"),
+ data);
+
+ List<String> actual = hiveShell.executeQuery("SELECT * FROM " + tableName + " ORDER BY b");
+ List<String> expected =
+ Arrays.asList(
+ "1\t10\tHi Again\t1000",
+ "1\t20\tHello\t200",
+ "2\t40\tNULL\t400",
+ "3\t50\tStore\t200");
+ Assert.assertEquals(expected, actual);
+
+ actual = hiveShell.executeQuery("SELECT c, b FROM " + tableName + " ORDER BY b");
+ expected = Arrays.asList("Hi Again\t10", "Hello\t20", "NULL\t40", "Store\t50");
+ Assert.assertEquals(expected, actual);
+
+ actual = hiveShell.executeQuery("SELECT * FROM " + tableName + " WHERE d > 200 ORDER BY b");
+ expected = Arrays.asList("1\t10\tHi Again\t1000", "2\t40\tNULL\t400");
+ Assert.assertEquals(expected, actual);
+
+ actual =
+ hiveShell.executeQuery(
+ "SELECT a, sum(d) FROM " + tableName + " GROUP BY a ORDER BY a");
+ expected = Arrays.asList("1\t1200", "2\t400", "3\t200");
+ Assert.assertEquals(expected, actual);
+
+ actual =
+ hiveShell.executeQuery(
+ "SELECT d, sum(b) FROM " + tableName + " GROUP BY d ORDER BY d");
+ expected = Arrays.asList("200\t70", "400\t40", "1000\t10");
+ Assert.assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testReadExternalTableWithPartitionWithPk() throws Exception {
+ List<RowData> data =
+ Arrays.asList(
+ GenericRowData.of(1, 10, 100L, StringData.fromString("Hi")),
+ GenericRowData.of(2, 10, 200L, StringData.fromString("Hello")),
+ GenericRowData.of(1, 20, 300L, StringData.fromString("World")),
+ GenericRowData.of(1, 10, 100L, StringData.fromString("Hi Again")),
+ GenericRowData.ofKind(
+ RowKind.DELETE, 1, 20, 300L, StringData.fromString("World")),
+ GenericRowData.of(2, 20, 100L, null),
+ GenericRowData.of(1, 30, 200L, StringData.fromString("Store")));
+ String tableName =
+ createChangelogExternalTable(
RowType.of(
new LogicalType[] {
+ DataTypes.INT().getLogicalType(),
DataTypes.INT().getLogicalType(),
DataTypes.BIGINT().getLogicalType(),
DataTypes.STRING().getLogicalType()
},
- new String[] {"a", "b", "c"}),
+ new String[] {"pt", "a", "b", "c"}),
+ Collections.singletonList("pt"),
+ Arrays.asList("pt", "a"),
+ data);
+
+ List<String> actual =
+ hiveShell.executeQuery("SELECT * FROM " + tableName + " ORDER BY pt, a");
+ List<String> expected =
+ Arrays.asList(
+ "1\t10\t100\tHi Again",
+ "1\t30\t200\tStore",
+ "2\t10\t200\tHello",
+ "2\t20\t100\tNULL");
+ Assert.assertEquals(expected, actual);
+
+ actual = hiveShell.executeQuery("SELECT c, a FROM " + tableName + " ORDER BY c, a");
+ expected = Arrays.asList("NULL\t20", "Hello\t10", "Hi Again\t10", "Store\t30");
+ Assert.assertEquals(expected, actual);
+
+ actual =
+ hiveShell.executeQuery(
+ "SELECT * FROM " + tableName + " WHERE b > 100 ORDER BY pt, a");
+ expected = Arrays.asList("1\t30\t200\tStore", "2\t10\t200\tHello");
+ Assert.assertEquals(expected, actual);
+
+ actual =
+ hiveShell.executeQuery(
+ "SELECT pt, sum(b), max(c) FROM " + tableName + " GROUP BY pt ORDER BY pt");
+ expected = Arrays.asList("1\t300\tStore", "2\t300\tHello");
+ Assert.assertEquals(expected, actual);
+
+ actual =
+ hiveShell.executeQuery(
+ "SELECT a, sum(b), max(c) FROM " + tableName + " GROUP BY a ORDER BY a");
+ expected = Arrays.asList("10\t300\tHi Again", "20\t100\tNULL", "30\t200\tStore");
+ Assert.assertEquals(expected, actual);
+
+ actual =
+ hiveShell.executeQuery(
+ "SELECT b, sum(a), max(c) FROM " + tableName + " GROUP BY b ORDER BY b");
+ expected = Arrays.asList("100\t30\tHi Again", "200\t40\tStore");
+ Assert.assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testReadExternalTableNoPartitionWithValueCount() throws Exception {
+ List<RowData> data =
+ Arrays.asList(
+ GenericRowData.of(1, 10L, StringData.fromString("Hi"), 100L),
+ GenericRowData.of(1, 20L, StringData.fromString("Hello"), 200L),
+ GenericRowData.of(2, 30L, StringData.fromString("World"), 300L),
+ GenericRowData.of(1, 10L, StringData.fromString("Hi Again"), 1000L),
+ GenericRowData.ofKind(
+ RowKind.DELETE, 2, 30L, StringData.fromString("World"), 300L),
+ GenericRowData.of(2, 40L, null, 400L),
+ GenericRowData.of(3, 50L, StringData.fromString("Store"), 200L));
+ String tableName =
+ createChangelogExternalTable(
+ RowType.of(
+ new LogicalType[] {
+ DataTypes.INT().getLogicalType(),
+ DataTypes.BIGINT().getLogicalType(),
+ DataTypes.STRING().getLogicalType(),
+ DataTypes.BIGINT().getLogicalType()
+ },
+ new String[] {"a", "b", "c", "d"}),
Collections.emptyList(),
- Arrays.asList("a", "b"));
+ Collections.emptyList(),
+ data);
- TableWrite write = table.newWrite();
- TableCommit commit = table.newCommit("user");
- write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
- write.write(GenericRowData.of(1, 20L, StringData.fromString("Hello")));
- write.write(GenericRowData.of(2, 30L, StringData.fromString("World")));
- write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi Again")));
- write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 30L, StringData.fromString("World")));
- write.write(GenericRowData.of(2, 40L, StringData.fromString("Test")));
- commit.commit("0", write.prepareCommit(true));
- write.close();
+ List<String> actual =
+ hiveShell.executeQuery("SELECT * FROM " + tableName + " ORDER BY b, d");
+ List<String> expected =
+ Arrays.asList(
+ "1\t10\tHi\t100",
+ "1\t10\tHi Again\t1000",
+ "1\t20\tHello\t200",
+ "2\t40\tNULL\t400",
+ "3\t50\tStore\t200");
+ Assert.assertEquals(expected, actual);
- hiveShell.execute(
- String.join(
- "\n",
- Arrays.asList(
- "CREATE EXTERNAL TABLE test_table",
- "STORED BY '" + TableStoreHiveStorageHandler.class.getName() + "'",
- "LOCATION '" + path + "'")));
- List<String> actual = hiveShell.executeQuery("SELECT b, a, c FROM test_table ORDER BY b");
- List<String> expected = Arrays.asList("10\t1\tHi Again", "20\t1\tHello", "40\t2\tTest");
+ actual = hiveShell.executeQuery("SELECT c, b FROM " + tableName + " ORDER BY c");
+ expected = Arrays.asList("NULL\t40", "Hello\t20", "Hi\t10", "Hi Again\t10", "Store\t50");
+ Assert.assertEquals(expected, actual);
+
+ actual =
+ hiveShell.executeQuery("SELECT * FROM " + tableName + " WHERE d <> 200 ORDER BY d");
+ expected = Arrays.asList("1\t10\tHi\t100", "2\t40\tNULL\t400", "1\t10\tHi Again\t1000");
+ Assert.assertEquals(expected, actual);
+
+ actual =
+ hiveShell.executeQuery(
+ "SELECT a, sum(d) FROM " + tableName + " GROUP BY a ORDER BY a");
+ expected = Arrays.asList("1\t1300", "2\t400", "3\t200");
+ Assert.assertEquals(expected, actual);
+
+ actual =
+ hiveShell.executeQuery(
+ "SELECT d, sum(b) FROM " + tableName + " GROUP BY d ORDER BY d");
+ expected = Arrays.asList("100\t10", "200\t70", "400\t40", "1000\t10");
Assert.assertEquals(expected, actual);
}
@Test
- public void testReadExternalTableWithoutPk() throws Exception {
- String path = folder.newFolder().toURI().toString();
- Configuration conf = new Configuration();
- conf.setString(CoreOptions.PATH, path);
- conf.setInteger(CoreOptions.BUCKET, 2);
- conf.setString(CoreOptions.FILE_FORMAT, "avro");
- FileStoreTable table =
- FileStoreTestUtils.createFileStoreTable(
- conf,
+ public void testReadExternalTableWithPartitionWithValueCount() throws Exception {
+ List<RowData> data =
+ Arrays.asList(
+ GenericRowData.of(1, 10, 100L, StringData.fromString("Hi")),
+ GenericRowData.of(2, 10, 200L, StringData.fromString("Hello")),
+ GenericRowData.of(1, 20, 300L, StringData.fromString("World")),
+ GenericRowData.of(1, 10, 100L, StringData.fromString("Hi Again")),
+ GenericRowData.ofKind(
+ RowKind.DELETE, 1, 20, 300L, StringData.fromString("World")),
+ GenericRowData.of(2, 20, 400L, null),
+ GenericRowData.of(1, 30, 500L, StringData.fromString("Store")));
+ String tableName =
+ createChangelogExternalTable(
RowType.of(
new LogicalType[] {
+ DataTypes.INT().getLogicalType(),
DataTypes.INT().getLogicalType(),
DataTypes.BIGINT().getLogicalType(),
DataTypes.STRING().getLogicalType()
},
- new String[] {"a", "b", "c"}),
+ new String[] {"pt", "a", "b", "c"}),
+ Collections.singletonList("pt"),
Collections.emptyList(),
- Collections.emptyList());
+ data);
+
+ List<String> actual =
+ hiveShell.executeQuery("SELECT * FROM " + tableName + " ORDER BY pt, a, c");
+ List<String> expected =
+ Arrays.asList(
+ "1\t10\t100\tHi",
+ "1\t10\t100\tHi Again",
+ "1\t30\t500\tStore",
+ "2\t10\t200\tHello",
+ "2\t20\t400\tNULL");
+ Assert.assertEquals(expected, actual);
+
+ actual = hiveShell.executeQuery("SELECT c, b FROM " + tableName + " ORDER BY c");
+ expected =
+ Arrays.asList("NULL\t400", "Hello\t200", "Hi\t100", "Hi Again\t100", "Store\t500");
+ Assert.assertEquals(expected, actual);
+
+ actual =
+ hiveShell.executeQuery(
+ "SELECT * FROM " + tableName + " WHERE b < 400 ORDER BY b, c");
+ expected = Arrays.asList("1\t10\t100\tHi", "1\t10\t100\tHi Again", "2\t10\t200\tHello");
+ Assert.assertEquals(expected, actual);
+
+ actual =
+ hiveShell.executeQuery(
+ "SELECT pt, max(a), min(c) FROM " + tableName + " GROUP BY pt ORDER BY pt");
+ expected = Arrays.asList("1\t30\tHi", "2\t20\tHello");
+ Assert.assertEquals(expected, actual);
+
+ actual =
+ hiveShell.executeQuery(
+ "SELECT a, sum(b), min(c) FROM " + tableName + " GROUP BY a ORDER BY a");
+ expected = Arrays.asList("10\t400\tHello", "20\t400\tNULL", "30\t500\tStore");
+ Assert.assertEquals(expected, actual);
+ }
+ @Test
+ public void testReadExternalTableNoPartitionAppendOnly() throws Exception {
+ List<RowData> data =
+ Arrays.asList(
+ GenericRowData.of(1, 10L, StringData.fromString("Hi"), 100L),
+ GenericRowData.of(1, 20L, StringData.fromString("Hello"), 200L),
+ GenericRowData.of(2, 30L, StringData.fromString("World"), 300L),
+ GenericRowData.of(1, 10L, StringData.fromString("Hi Again"), 1000L),
+ GenericRowData.of(2, 40L, null, 400L),
+ GenericRowData.of(3, 50L, StringData.fromString("Store"), 200L));
+ String tableName =
+ createAppendOnlyExternalTable(
+ RowType.of(
+ new LogicalType[] {
+ DataTypes.INT().getLogicalType(),
+ DataTypes.BIGINT().getLogicalType(),
+ DataTypes.STRING().getLogicalType(),
+ DataTypes.BIGINT().getLogicalType()
+ },
+ new String[] {"a", "b", "c", "d"}),
+ Collections.emptyList(),
+ data);
+
+ List<String> actual =
+ hiveShell.executeQuery("SELECT * FROM " + tableName + " ORDER BY a, b, c");
+ List<String> expected =
+ Arrays.asList(
+ "1\t10\tHi\t100",
+ "1\t10\tHi Again\t1000",
+ "1\t20\tHello\t200",
+ "2\t30\tWorld\t300",
+ "2\t40\tNULL\t400",
+ "3\t50\tStore\t200");
+ Assert.assertEquals(expected, actual);
+
+ actual = hiveShell.executeQuery("SELECT c, b FROM " + tableName + " ORDER BY c");
+ expected =
+ Arrays.asList(
+ "NULL\t40",
+ "Hello\t20",
+ "Hi\t10",
+ "Hi Again\t10",
+ "Store\t50",
+ "World\t30");
+ Assert.assertEquals(expected, actual);
+
+ actual = hiveShell.executeQuery("SELECT * FROM " + tableName + " WHERE d < 300 ORDER BY d");
+ expected = Arrays.asList("1\t10\tHi\t100", "1\t20\tHello\t200", "3\t50\tStore\t200");
+ Assert.assertEquals(expected, actual);
+
+ actual =
+ hiveShell.executeQuery(
+ "SELECT a, sum(d) FROM " + tableName + " GROUP BY a ORDER BY a");
+ expected = Arrays.asList("1\t1300", "2\t700", "3\t200");
+ Assert.assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testReadExternalTableWithPartitionAppendOnly() throws Exception {
+ List<RowData> data =
+ Arrays.asList(
+ GenericRowData.of(1, 10, 100L, StringData.fromString("Hi")),
+ GenericRowData.of(2, 10, 200L, StringData.fromString("Hello")),
+ GenericRowData.of(1, 20, 300L, StringData.fromString("World")),
+ GenericRowData.of(1, 10, 100L, StringData.fromString("Hi Again")),
+ GenericRowData.of(2, 20, 400L, null),
+ GenericRowData.of(1, 30, 500L, StringData.fromString("Store")));
+ String tableName =
+ createAppendOnlyExternalTable(
+ RowType.of(
+ new LogicalType[] {
+ DataTypes.INT().getLogicalType(),
+ DataTypes.INT().getLogicalType(),
+ DataTypes.BIGINT().getLogicalType(),
+ DataTypes.STRING().getLogicalType()
+ },
+ new String[] {"pt", "a", "b", "c"}),
+ Collections.singletonList("pt"),
+ data);
+
+ List<String> actual =
+ hiveShell.executeQuery("SELECT * FROM " + tableName + " ORDER BY pt, a, c");
+ List<String> expected =
+ Arrays.asList(
+ "1\t10\t100\tHi",
+ "1\t10\t100\tHi Again",
+ "1\t20\t300\tWorld",
+ "1\t30\t500\tStore",
+ "2\t10\t200\tHello",
+ "2\t20\t400\tNULL");
+ Assert.assertEquals(expected, actual);
+
+ actual = hiveShell.executeQuery("SELECT c, b FROM " + tableName + " ORDER BY c");
+ expected =
+ Arrays.asList(
+ "NULL\t400",
+ "Hello\t200",
+ "Hi\t100",
+ "Hi Again\t100",
+ "Store\t500",
+ "World\t300");
+ Assert.assertEquals(expected, actual);
+
+ actual =
+ hiveShell.executeQuery(
+ "SELECT * FROM " + tableName + " WHERE b < 400 ORDER BY b, c");
+ expected =
+ Arrays.asList(
+ "1\t10\t100\tHi",
+ "1\t10\t100\tHi Again",
+ "2\t10\t200\tHello",
+ "1\t20\t300\tWorld");
+ Assert.assertEquals(expected, actual);
+
+ actual =
+ hiveShell.executeQuery(
+ "SELECT pt, max(a), min(c) FROM " + tableName + " GROUP BY pt ORDER BY pt");
+ expected = Arrays.asList("1\t30\tHi", "2\t20\tHello");
+ Assert.assertEquals(expected, actual);
+
+ actual =
+ hiveShell.executeQuery(
+ "SELECT a, sum(b), min(c) FROM " + tableName + " GROUP BY a ORDER BY a");
+ expected = Arrays.asList("10\t400\tHello", "20\t700\tWorld", "30\t500\tStore");
+ Assert.assertEquals(expected, actual);
+ }
+
+ private String createChangelogExternalTable(
+ RowType rowType,
+ List<String> partitionKeys,
+ List<String> primaryKeys,
+ List<RowData> data)
+ throws Exception {
+ String path = folder.newFolder().toURI().toString();
+ Configuration conf = new Configuration();
+ conf.setString(CoreOptions.PATH, path);
+ conf.setInteger(CoreOptions.BUCKET, 2);
+ conf.setString(CoreOptions.FILE_FORMAT, "avro");
+ FileStoreTable table =
+ FileStoreTestUtils.createFileStoreTable(conf, rowType, partitionKeys, primaryKeys);
+
+ return writeData(table, path, data);
+ }
+
+ private String createAppendOnlyExternalTable(
+ RowType rowType, List<String> partitionKeys, List<RowData> data) throws Exception {
+ String path = folder.newFolder().toURI().toString();
+ Configuration conf = new Configuration();
+ conf.setString(CoreOptions.PATH, path);
+ conf.setInteger(CoreOptions.BUCKET, 2);
+ conf.setString(CoreOptions.FILE_FORMAT, "avro");
+ conf.set(CoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
+ FileStoreTable table =
+ FileStoreTestUtils.createFileStoreTable(
+ conf, rowType, partitionKeys, Collections.emptyList());
+
+ return writeData(table, path, data);
+ }
+
+ private String writeData(FileStoreTable table, String path, List<RowData> data)
+ throws Exception {
TableWrite write = table.newWrite();
TableCommit commit = table.newCommit("user");
- write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
- write.write(GenericRowData.of(1, 20L, StringData.fromString("Hello")));
- write.write(GenericRowData.of(2, 30L, StringData.fromString("World")));
- write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
- write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 30L, StringData.fromString("World")));
- write.write(GenericRowData.of(2, 40L, StringData.fromString("Test")));
- commit.commit("0", write.prepareCommit(true));
+ for (RowData rowData : data) {
+ write.write(rowData);
+ if (ThreadLocalRandom.current().nextInt(5) == 0) {
+ commit.commit(UUID.randomUUID().toString(), write.prepareCommit(false));
+ }
+ }
+ commit.commit(UUID.randomUUID().toString(), write.prepareCommit(true));
write.close();
+ String tableName = "test_table_" + (UUID.randomUUID().toString().substring(0, 4));
hiveShell.execute(
String.join(
"\n",
Arrays.asList(
- "CREATE EXTERNAL TABLE test_table",
+ "CREATE EXTERNAL TABLE " + tableName + " ",
"STORED BY '" + TableStoreHiveStorageHandler.class.getName() + "'",
"LOCATION '" + path + "'")));
- List<String> actual = hiveShell.executeQuery("SELECT b, a, c FROM test_table ORDER BY b");
- List<String> expected =
- Arrays.asList("10\t1\tHi", "10\t1\tHi", "20\t1\tHello", "40\t2\tTest");
- Assert.assertEquals(expected, actual);
+ return tableName;
}
@Test
@@ -433,44 +773,4 @@ public class TableStoreHiveStorageHandlerITCase {
hiveShell.executeQuery(
"SELECT * FROM test_table WHERE ts = '2022-06-18 08:30:00'"));
}
-
- @Test
- public void testProjectionPushdown() throws Exception {
- String path = folder.newFolder().toURI().toString();
- Configuration conf = new Configuration();
- conf.setString(CoreOptions.PATH, path);
- conf.setInteger(CoreOptions.BUCKET, 2);
- conf.setString(CoreOptions.FILE_FORMAT, "avro");
- FileStoreTable table =
- FileStoreTestUtils.createFileStoreTable(
- conf,
- RowType.of(
- new LogicalType[] {
- DataTypes.INT().getLogicalType(),
- DataTypes.BIGINT().getLogicalType(),
- DataTypes.STRING().getLogicalType()
- },
- new String[] {"a", "b", "c"}),
- Collections.emptyList(),
- Collections.emptyList());
-
- TableWrite write = table.newWrite();
- TableCommit commit = table.newCommit("user");
- write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
- write.write(GenericRowData.of(2, 20L, StringData.fromString("Hello")));
- write.write(GenericRowData.of(3, 30L, StringData.fromString("World")));
- commit.commit("0", write.prepareCommit(true));
- write.close();
-
- hiveShell.execute(
- String.join(
- "\n",
- Arrays.asList(
- "CREATE EXTERNAL TABLE test_table",
- "STORED BY '" + TableStoreHiveStorageHandler.class.getName() + "'",
- "LOCATION '" + path + "'")));
- List<String> actual = hiveShell.executeQuery("SELECT c, a FROM test_table ORDER BY a");
- List<String> expected = Arrays.asList("Hi\t1", "Hello\t2", "World\t3");
- Assert.assertEquals(expected, actual);
- }
}
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogDeserializationSchema.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogDeserializationSchema.java
index 7c2f9fa7..2bf79ecc 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogDeserializationSchema.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogDeserializationSchema.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.store.utils.ProjectedRowData;
import org.apache.flink.table.store.utils.Projection;
+import org.apache.flink.table.store.utils.RowDataUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
@@ -71,7 +72,7 @@ public class KafkaLogDeserializationSchema implements KafkaDeserializationSchema
IntStream.range(0, primaryKey.length)
.mapToObj(
i ->
- RowData.createFieldGetter(
+ RowDataUtils.createNullCheckingFieldGetter(
physicalType
.getChildren()
.get(primaryKey[i])