This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 334c6159 [FLINK-28717] Add null checking for all field getters
334c6159 is described below

commit 334c6159e546ba54aa1c38154882122758604d3d
Author: tsreaper <[email protected]>
AuthorDate: Thu Jul 28 10:30:12 2022 +0800

    [FLINK-28717] Add null checking for all field getters
    
    This closes #249
---
 .../store/utils/RowDataToObjectArrayConverter.java |   5 +-
 .../flink/table/store/utils/RowDataUtils.java      |  15 +
 .../apache/flink/table/store/file/KeyValue.java    |   4 +-
 .../file/stats/FieldStatsArraySerializer.java      |   6 +-
 .../store/file/utils/RowDataPartitionComputer.java |   6 +-
 .../table/ChangelogWithKeyFileStoreTable.java      |   4 +-
 .../TableStoreRowDataObjectInspector.java          |   3 +-
 .../hive/TableStoreHiveStorageHandlerITCase.java   | 486 +++++++++++++++++----
 .../store/kafka/KafkaLogDeserializationSchema.java |   3 +-
 9 files changed, 432 insertions(+), 100 deletions(-)

diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataToObjectArrayConverter.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataToObjectArrayConverter.java
index ad708451..23508688 100644
--- 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataToObjectArrayConverter.java
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataToObjectArrayConverter.java
@@ -33,7 +33,10 @@ public class RowDataToObjectArrayConverter {
         this.rowType = rowType;
         this.fieldGetters =
                 IntStream.range(0, rowType.getFieldCount())
-                        .mapToObj(i -> 
RowData.createFieldGetter(rowType.getTypeAt(i), i))
+                        .mapToObj(
+                                i ->
+                                        
RowDataUtils.createNullCheckingFieldGetter(
+                                                rowType.getTypeAt(i), i))
                         .toArray(RowData.FieldGetter[]::new);
     }
 
diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataUtils.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataUtils.java
index 723d1d3b..d95824ce 100644
--- 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataUtils.java
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataUtils.java
@@ -280,4 +280,19 @@ public class RowDataUtils {
         bd = bd.setScale(0, RoundingMode.DOWN);
         return bd.longValue();
     }
+
+    public static RowData.FieldGetter createNullCheckingFieldGetter(
+            LogicalType logicalType, int index) {
+        RowData.FieldGetter getter = RowData.createFieldGetter(logicalType, 
index);
+        if (logicalType.isNullable()) {
+            return getter;
+        } else {
+            return row -> {
+                if (row.isNullAt(index)) {
+                    return null;
+                }
+                return getter.getFieldOrNull(row);
+            };
+        }
+    }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
index 252ef84f..426b2696 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValue.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.store.file;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.store.utils.RowDataUtils;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TinyIntType;
@@ -147,7 +148,8 @@ public class KeyValue {
                 .mapToObj(
                         i ->
                                 String.valueOf(
-                                        
RowData.createFieldGetter(type.getTypeAt(i), i)
+                                        
RowDataUtils.createNullCheckingFieldGetter(
+                                                        type.getTypeAt(i), i)
                                                 .getFieldOrNull(row)))
                 .collect(Collectors.joining(", "));
     }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
index 7312f78a..10d23e8b 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/stats/FieldStatsArraySerializer.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.store.format.FieldStats;
+import org.apache.flink.table.store.utils.RowDataUtils;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -47,7 +48,10 @@ public class FieldStatsArraySerializer {
         this.serializer = new RowDataSerializer(safeType);
         this.fieldGetters =
                 IntStream.range(0, safeType.getFieldCount())
-                        .mapToObj(i -> 
RowData.createFieldGetter(safeType.getTypeAt(i), i))
+                        .mapToObj(
+                                i ->
+                                        
RowDataUtils.createNullCheckingFieldGetter(
+                                                safeType.getTypeAt(i), i))
                         .toArray(RowData.FieldGetter[]::new);
     }
 
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RowDataPartitionComputer.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RowDataPartitionComputer.java
index 99c2e1b1..a12d8105 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RowDataPartitionComputer.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RowDataPartitionComputer.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.store.file.utils;
 
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.utils.RowDataUtils;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.StringUtils;
 
@@ -41,7 +42,10 @@ public class RowDataPartitionComputer {
         this.partitionFieldGetters =
                 Arrays.stream(partitionColumns)
                         .mapToInt(columnList::indexOf)
-                        .mapToObj(i -> 
RowData.createFieldGetter(rowType.getTypeAt(i), i))
+                        .mapToObj(
+                                i ->
+                                        
RowDataUtils.createNullCheckingFieldGetter(
+                                                rowType.getTypeAt(i), i))
                         .toArray(RowData.FieldGetter[]::new);
     }
 
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index a47a1557..c7e3f0d8 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -46,6 +46,7 @@ import 
org.apache.flink.table.store.table.source.SplitGenerator;
 import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.store.table.source.TableScan;
 import 
org.apache.flink.table.store.table.source.ValueContentRowDataRecordIterator;
+import org.apache.flink.table.store.utils.RowDataUtils;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -83,7 +84,8 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
                 List<LogicalType> fieldTypes = rowType.getChildren();
                 RowData.FieldGetter[] fieldGetters = new 
RowData.FieldGetter[fieldTypes.size()];
                 for (int i = 0; i < fieldTypes.size(); i++) {
-                    fieldGetters[i] = 
RowData.createFieldGetter(fieldTypes.get(i), i);
+                    fieldGetters[i] =
+                            
RowDataUtils.createNullCheckingFieldGetter(fieldTypes.get(i), i);
                 }
                 mergeFunction = new PartialUpdateMergeFunction(fieldGetters);
                 break;
diff --git 
a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java
 
b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java
index a783be83..6b6d94c9 100644
--- 
a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java
+++ 
b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.hive.objectinspector;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.hive.HiveTypeUtils;
+import org.apache.flink.table.store.utils.RowDataUtils;
 import org.apache.flink.table.types.logical.LogicalType;
 
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -53,7 +54,7 @@ public class TableStoreRowDataObjectInspector extends 
StructObjectInspector {
                             name,
                             
TableStoreObjectInspectorFactory.create(logicalType),
                             i,
-                            RowData.createFieldGetter(logicalType, i),
+                            
RowDataUtils.createNullCheckingFieldGetter(logicalType, i),
                             fieldComments.get(i));
             structFields.add(structField);
             structFieldMap.put(name, structField);
diff --git 
a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
 
b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
index c677eb51..b73f9b76 100644
--- 
a/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
+++ 
b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java
@@ -22,10 +22,12 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connectors.hive.FlinkEmbeddedHiveRunner;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.FileStoreTestUtils;
+import org.apache.flink.table.store.file.WriteMode;
 import 
org.apache.flink.table.store.hive.objectinspector.TableStoreObjectInspectorFactory;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.sink.TableCommit;
@@ -56,6 +58,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
 /**
@@ -82,90 +85,427 @@ public class TableStoreHiveStorageHandlerITCase {
     }
 
     @Test
-    public void testReadExternalTableWithPk() throws Exception {
-        String path = folder.newFolder().toURI().toString();
-        Configuration conf = new Configuration();
-        conf.setString(CoreOptions.PATH, path);
-        conf.setInteger(CoreOptions.BUCKET, 2);
-        conf.setString(CoreOptions.FILE_FORMAT, "avro");
-        FileStoreTable table =
-                FileStoreTestUtils.createFileStoreTable(
-                        conf,
+    public void testReadExternalTableNoPartitionWithPk() throws Exception {
+        List<RowData> data =
+                Arrays.asList(
+                        GenericRowData.of(1, 10L, StringData.fromString("Hi"), 
100L),
+                        GenericRowData.of(1, 20L, 
StringData.fromString("Hello"), 200L),
+                        GenericRowData.of(2, 30L, 
StringData.fromString("World"), 300L),
+                        GenericRowData.of(1, 10L, StringData.fromString("Hi 
Again"), 1000L),
+                        GenericRowData.ofKind(
+                                RowKind.DELETE, 2, 30L, 
StringData.fromString("World"), 300L),
+                        GenericRowData.of(2, 40L, null, 400L),
+                        GenericRowData.of(3, 50L, 
StringData.fromString("Store"), 200L));
+        String tableName =
+                createChangelogExternalTable(
+                        RowType.of(
+                                new LogicalType[] {
+                                    DataTypes.INT().getLogicalType(),
+                                    DataTypes.BIGINT().getLogicalType(),
+                                    DataTypes.STRING().getLogicalType(),
+                                    DataTypes.BIGINT().getLogicalType()
+                                },
+                                new String[] {"a", "b", "c", "d"}),
+                        Collections.emptyList(),
+                        Arrays.asList("a", "b"),
+                        data);
+
+        List<String> actual = hiveShell.executeQuery("SELECT * FROM " + 
tableName + " ORDER BY b");
+        List<String> expected =
+                Arrays.asList(
+                        "1\t10\tHi Again\t1000",
+                        "1\t20\tHello\t200",
+                        "2\t40\tNULL\t400",
+                        "3\t50\tStore\t200");
+        Assert.assertEquals(expected, actual);
+
+        actual = hiveShell.executeQuery("SELECT c, b FROM " + tableName + " 
ORDER BY b");
+        expected = Arrays.asList("Hi Again\t10", "Hello\t20", "NULL\t40", 
"Store\t50");
+        Assert.assertEquals(expected, actual);
+
+        actual = hiveShell.executeQuery("SELECT * FROM " + tableName + " WHERE 
d > 200 ORDER BY b");
+        expected = Arrays.asList("1\t10\tHi Again\t1000", "2\t40\tNULL\t400");
+        Assert.assertEquals(expected, actual);
+
+        actual =
+                hiveShell.executeQuery(
+                        "SELECT a, sum(d) FROM " + tableName + " GROUP BY a 
ORDER BY a");
+        expected = Arrays.asList("1\t1200", "2\t400", "3\t200");
+        Assert.assertEquals(expected, actual);
+
+        actual =
+                hiveShell.executeQuery(
+                        "SELECT d, sum(b) FROM " + tableName + " GROUP BY d 
ORDER BY d");
+        expected = Arrays.asList("200\t70", "400\t40", "1000\t10");
+        Assert.assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testReadExternalTableWithPartitionWithPk() throws Exception {
+        List<RowData> data =
+                Arrays.asList(
+                        GenericRowData.of(1, 10, 100L, 
StringData.fromString("Hi")),
+                        GenericRowData.of(2, 10, 200L, 
StringData.fromString("Hello")),
+                        GenericRowData.of(1, 20, 300L, 
StringData.fromString("World")),
+                        GenericRowData.of(1, 10, 100L, 
StringData.fromString("Hi Again")),
+                        GenericRowData.ofKind(
+                                RowKind.DELETE, 1, 20, 300L, 
StringData.fromString("World")),
+                        GenericRowData.of(2, 20, 100L, null),
+                        GenericRowData.of(1, 30, 200L, 
StringData.fromString("Store")));
+        String tableName =
+                createChangelogExternalTable(
                         RowType.of(
                                 new LogicalType[] {
+                                    DataTypes.INT().getLogicalType(),
                                     DataTypes.INT().getLogicalType(),
                                     DataTypes.BIGINT().getLogicalType(),
                                     DataTypes.STRING().getLogicalType()
                                 },
-                                new String[] {"a", "b", "c"}),
+                                new String[] {"pt", "a", "b", "c"}),
+                        Collections.singletonList("pt"),
+                        Arrays.asList("pt", "a"),
+                        data);
+
+        List<String> actual =
+                hiveShell.executeQuery("SELECT * FROM " + tableName + " ORDER 
BY pt, a");
+        List<String> expected =
+                Arrays.asList(
+                        "1\t10\t100\tHi Again",
+                        "1\t30\t200\tStore",
+                        "2\t10\t200\tHello",
+                        "2\t20\t100\tNULL");
+        Assert.assertEquals(expected, actual);
+
+        actual = hiveShell.executeQuery("SELECT c, a FROM " + tableName + " 
ORDER BY c, a");
+        expected = Arrays.asList("NULL\t20", "Hello\t10", "Hi Again\t10", 
"Store\t30");
+        Assert.assertEquals(expected, actual);
+
+        actual =
+                hiveShell.executeQuery(
+                        "SELECT * FROM " + tableName + " WHERE b > 100 ORDER 
BY pt, a");
+        expected = Arrays.asList("1\t30\t200\tStore", "2\t10\t200\tHello");
+        Assert.assertEquals(expected, actual);
+
+        actual =
+                hiveShell.executeQuery(
+                        "SELECT pt, sum(b), max(c) FROM " + tableName + " 
GROUP BY pt ORDER BY pt");
+        expected = Arrays.asList("1\t300\tStore", "2\t300\tHello");
+        Assert.assertEquals(expected, actual);
+
+        actual =
+                hiveShell.executeQuery(
+                        "SELECT a, sum(b), max(c) FROM " + tableName + " GROUP 
BY a ORDER BY a");
+        expected = Arrays.asList("10\t300\tHi Again", "20\t100\tNULL", 
"30\t200\tStore");
+        Assert.assertEquals(expected, actual);
+
+        actual =
+                hiveShell.executeQuery(
+                        "SELECT b, sum(a), max(c) FROM " + tableName + " GROUP 
BY b ORDER BY b");
+        expected = Arrays.asList("100\t30\tHi Again", "200\t40\tStore");
+        Assert.assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testReadExternalTableNoPartitionWithValueCount() throws 
Exception {
+        List<RowData> data =
+                Arrays.asList(
+                        GenericRowData.of(1, 10L, StringData.fromString("Hi"), 
100L),
+                        GenericRowData.of(1, 20L, 
StringData.fromString("Hello"), 200L),
+                        GenericRowData.of(2, 30L, 
StringData.fromString("World"), 300L),
+                        GenericRowData.of(1, 10L, StringData.fromString("Hi 
Again"), 1000L),
+                        GenericRowData.ofKind(
+                                RowKind.DELETE, 2, 30L, 
StringData.fromString("World"), 300L),
+                        GenericRowData.of(2, 40L, null, 400L),
+                        GenericRowData.of(3, 50L, 
StringData.fromString("Store"), 200L));
+        String tableName =
+                createChangelogExternalTable(
+                        RowType.of(
+                                new LogicalType[] {
+                                    DataTypes.INT().getLogicalType(),
+                                    DataTypes.BIGINT().getLogicalType(),
+                                    DataTypes.STRING().getLogicalType(),
+                                    DataTypes.BIGINT().getLogicalType()
+                                },
+                                new String[] {"a", "b", "c", "d"}),
                         Collections.emptyList(),
-                        Arrays.asList("a", "b"));
+                        Collections.emptyList(),
+                        data);
 
-        TableWrite write = table.newWrite();
-        TableCommit commit = table.newCommit("user");
-        write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
-        write.write(GenericRowData.of(1, 20L, StringData.fromString("Hello")));
-        write.write(GenericRowData.of(2, 30L, StringData.fromString("World")));
-        write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi 
Again")));
-        write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 30L, 
StringData.fromString("World")));
-        write.write(GenericRowData.of(2, 40L, StringData.fromString("Test")));
-        commit.commit("0", write.prepareCommit(true));
-        write.close();
+        List<String> actual =
+                hiveShell.executeQuery("SELECT * FROM " + tableName + " ORDER 
BY b, d");
+        List<String> expected =
+                Arrays.asList(
+                        "1\t10\tHi\t100",
+                        "1\t10\tHi Again\t1000",
+                        "1\t20\tHello\t200",
+                        "2\t40\tNULL\t400",
+                        "3\t50\tStore\t200");
+        Assert.assertEquals(expected, actual);
 
-        hiveShell.execute(
-                String.join(
-                        "\n",
-                        Arrays.asList(
-                                "CREATE EXTERNAL TABLE test_table",
-                                "STORED BY '" + 
TableStoreHiveStorageHandler.class.getName() + "'",
-                                "LOCATION '" + path + "'")));
-        List<String> actual = hiveShell.executeQuery("SELECT b, a, c FROM 
test_table ORDER BY b");
-        List<String> expected = Arrays.asList("10\t1\tHi Again", 
"20\t1\tHello", "40\t2\tTest");
+        actual = hiveShell.executeQuery("SELECT c, b FROM " + tableName + " 
ORDER BY c");
+        expected = Arrays.asList("NULL\t40", "Hello\t20", "Hi\t10", "Hi 
Again\t10", "Store\t50");
+        Assert.assertEquals(expected, actual);
+
+        actual =
+                hiveShell.executeQuery("SELECT * FROM " + tableName + " WHERE 
d <> 200 ORDER BY d");
+        expected = Arrays.asList("1\t10\tHi\t100", "2\t40\tNULL\t400", 
"1\t10\tHi Again\t1000");
+        Assert.assertEquals(expected, actual);
+
+        actual =
+                hiveShell.executeQuery(
+                        "SELECT a, sum(d) FROM " + tableName + " GROUP BY a 
ORDER BY a");
+        expected = Arrays.asList("1\t1300", "2\t400", "3\t200");
+        Assert.assertEquals(expected, actual);
+
+        actual =
+                hiveShell.executeQuery(
+                        "SELECT d, sum(b) FROM " + tableName + " GROUP BY d 
ORDER BY d");
+        expected = Arrays.asList("100\t10", "200\t70", "400\t40", "1000\t10");
         Assert.assertEquals(expected, actual);
     }
 
     @Test
-    public void testReadExternalTableWithoutPk() throws Exception {
-        String path = folder.newFolder().toURI().toString();
-        Configuration conf = new Configuration();
-        conf.setString(CoreOptions.PATH, path);
-        conf.setInteger(CoreOptions.BUCKET, 2);
-        conf.setString(CoreOptions.FILE_FORMAT, "avro");
-        FileStoreTable table =
-                FileStoreTestUtils.createFileStoreTable(
-                        conf,
+    public void testReadExternalTableWithPartitionWithValueCount() throws 
Exception {
+        List<RowData> data =
+                Arrays.asList(
+                        GenericRowData.of(1, 10, 100L, 
StringData.fromString("Hi")),
+                        GenericRowData.of(2, 10, 200L, 
StringData.fromString("Hello")),
+                        GenericRowData.of(1, 20, 300L, 
StringData.fromString("World")),
+                        GenericRowData.of(1, 10, 100L, 
StringData.fromString("Hi Again")),
+                        GenericRowData.ofKind(
+                                RowKind.DELETE, 1, 20, 300L, 
StringData.fromString("World")),
+                        GenericRowData.of(2, 20, 400L, null),
+                        GenericRowData.of(1, 30, 500L, 
StringData.fromString("Store")));
+        String tableName =
+                createChangelogExternalTable(
                         RowType.of(
                                 new LogicalType[] {
+                                    DataTypes.INT().getLogicalType(),
                                     DataTypes.INT().getLogicalType(),
                                     DataTypes.BIGINT().getLogicalType(),
                                     DataTypes.STRING().getLogicalType()
                                 },
-                                new String[] {"a", "b", "c"}),
+                                new String[] {"pt", "a", "b", "c"}),
+                        Collections.singletonList("pt"),
                         Collections.emptyList(),
-                        Collections.emptyList());
+                        data);
+
+        List<String> actual =
+                hiveShell.executeQuery("SELECT * FROM " + tableName + " ORDER 
BY pt, a, c");
+        List<String> expected =
+                Arrays.asList(
+                        "1\t10\t100\tHi",
+                        "1\t10\t100\tHi Again",
+                        "1\t30\t500\tStore",
+                        "2\t10\t200\tHello",
+                        "2\t20\t400\tNULL");
+        Assert.assertEquals(expected, actual);
+
+        actual = hiveShell.executeQuery("SELECT c, b FROM " + tableName + " 
ORDER BY c");
+        expected =
+                Arrays.asList("NULL\t400", "Hello\t200", "Hi\t100", "Hi 
Again\t100", "Store\t500");
+        Assert.assertEquals(expected, actual);
+
+        actual =
+                hiveShell.executeQuery(
+                        "SELECT * FROM " + tableName + " WHERE b < 400 ORDER 
BY b, c");
+        expected = Arrays.asList("1\t10\t100\tHi", "1\t10\t100\tHi Again", 
"2\t10\t200\tHello");
+        Assert.assertEquals(expected, actual);
+
+        actual =
+                hiveShell.executeQuery(
+                        "SELECT pt, max(a), min(c) FROM " + tableName + " 
GROUP BY pt ORDER BY pt");
+        expected = Arrays.asList("1\t30\tHi", "2\t20\tHello");
+        Assert.assertEquals(expected, actual);
+
+        actual =
+                hiveShell.executeQuery(
+                        "SELECT a, sum(b), min(c) FROM " + tableName + " GROUP 
BY a ORDER BY a");
+        expected = Arrays.asList("10\t400\tHello", "20\t400\tNULL", 
"30\t500\tStore");
+        Assert.assertEquals(expected, actual);
+    }
 
+    @Test
+    public void testReadExternalTableNoPartitionAppendOnly() throws Exception {
+        List<RowData> data =
+                Arrays.asList(
+                        GenericRowData.of(1, 10L, StringData.fromString("Hi"), 
100L),
+                        GenericRowData.of(1, 20L, 
StringData.fromString("Hello"), 200L),
+                        GenericRowData.of(2, 30L, 
StringData.fromString("World"), 300L),
+                        GenericRowData.of(1, 10L, StringData.fromString("Hi 
Again"), 1000L),
+                        GenericRowData.of(2, 40L, null, 400L),
+                        GenericRowData.of(3, 50L, 
StringData.fromString("Store"), 200L));
+        String tableName =
+                createAppendOnlyExternalTable(
+                        RowType.of(
+                                new LogicalType[] {
+                                    DataTypes.INT().getLogicalType(),
+                                    DataTypes.BIGINT().getLogicalType(),
+                                    DataTypes.STRING().getLogicalType(),
+                                    DataTypes.BIGINT().getLogicalType()
+                                },
+                                new String[] {"a", "b", "c", "d"}),
+                        Collections.emptyList(),
+                        data);
+
+        List<String> actual =
+                hiveShell.executeQuery("SELECT * FROM " + tableName + " ORDER 
BY a, b, c");
+        List<String> expected =
+                Arrays.asList(
+                        "1\t10\tHi\t100",
+                        "1\t10\tHi Again\t1000",
+                        "1\t20\tHello\t200",
+                        "2\t30\tWorld\t300",
+                        "2\t40\tNULL\t400",
+                        "3\t50\tStore\t200");
+        Assert.assertEquals(expected, actual);
+
+        actual = hiveShell.executeQuery("SELECT c, b FROM " + tableName + " 
ORDER BY c");
+        expected =
+                Arrays.asList(
+                        "NULL\t40",
+                        "Hello\t20",
+                        "Hi\t10",
+                        "Hi Again\t10",
+                        "Store\t50",
+                        "World\t30");
+        Assert.assertEquals(expected, actual);
+
+        actual = hiveShell.executeQuery("SELECT * FROM " + tableName + " WHERE 
d < 300 ORDER BY d");
+        expected = Arrays.asList("1\t10\tHi\t100", "1\t20\tHello\t200", 
"3\t50\tStore\t200");
+        Assert.assertEquals(expected, actual);
+
+        actual =
+                hiveShell.executeQuery(
+                        "SELECT a, sum(d) FROM " + tableName + " GROUP BY a 
ORDER BY a");
+        expected = Arrays.asList("1\t1300", "2\t700", "3\t200");
+        Assert.assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testReadExternalTableWithPartitionAppendOnly() throws 
Exception {
+        List<RowData> data =
+                Arrays.asList(
+                        GenericRowData.of(1, 10, 100L, 
StringData.fromString("Hi")),
+                        GenericRowData.of(2, 10, 200L, 
StringData.fromString("Hello")),
+                        GenericRowData.of(1, 20, 300L, 
StringData.fromString("World")),
+                        GenericRowData.of(1, 10, 100L, 
StringData.fromString("Hi Again")),
+                        GenericRowData.of(2, 20, 400L, null),
+                        GenericRowData.of(1, 30, 500L, 
StringData.fromString("Store")));
+        String tableName =
+                createAppendOnlyExternalTable(
+                        RowType.of(
+                                new LogicalType[] {
+                                    DataTypes.INT().getLogicalType(),
+                                    DataTypes.INT().getLogicalType(),
+                                    DataTypes.BIGINT().getLogicalType(),
+                                    DataTypes.STRING().getLogicalType()
+                                },
+                                new String[] {"pt", "a", "b", "c"}),
+                        Collections.singletonList("pt"),
+                        data);
+
+        List<String> actual =
+                hiveShell.executeQuery("SELECT * FROM " + tableName + " ORDER 
BY pt, a, c");
+        List<String> expected =
+                Arrays.asList(
+                        "1\t10\t100\tHi",
+                        "1\t10\t100\tHi Again",
+                        "1\t20\t300\tWorld",
+                        "1\t30\t500\tStore",
+                        "2\t10\t200\tHello",
+                        "2\t20\t400\tNULL");
+        Assert.assertEquals(expected, actual);
+
+        actual = hiveShell.executeQuery("SELECT c, b FROM " + tableName + " ORDER BY c");
+        expected =
+                Arrays.asList(
+                        "NULL\t400",
+                        "Hello\t200",
+                        "Hi\t100",
+                        "Hi Again\t100",
+                        "Store\t500",
+                        "World\t300");
+        Assert.assertEquals(expected, actual);
+
+        actual =
+                hiveShell.executeQuery(
+                        "SELECT * FROM " + tableName + " WHERE b < 400 ORDER BY b, c");
+        expected =
+                Arrays.asList(
+                        "1\t10\t100\tHi",
+                        "1\t10\t100\tHi Again",
+                        "2\t10\t200\tHello",
+                        "1\t20\t300\tWorld");
+        Assert.assertEquals(expected, actual);
+
+        actual =
+                hiveShell.executeQuery(
+                        "SELECT pt, max(a), min(c) FROM " + tableName + " GROUP BY pt ORDER BY pt");
+        expected = Arrays.asList("1\t30\tHi", "2\t20\tHello");
+        Assert.assertEquals(expected, actual);
+
+        actual =
+                hiveShell.executeQuery(
+                        "SELECT a, sum(b), min(c) FROM " + tableName + " GROUP BY a ORDER BY a");
+        expected = Arrays.asList("10\t400\tHello", "20\t700\tWorld", "30\t500\tStore");
+        Assert.assertEquals(expected, actual);
+    }
+
+    private String createChangelogExternalTable(
+            RowType rowType,
+            List<String> partitionKeys,
+            List<String> primaryKeys,
+            List<RowData> data)
+            throws Exception {
+        String path = folder.newFolder().toURI().toString();
+        Configuration conf = new Configuration();
+        conf.setString(CoreOptions.PATH, path);
+        conf.setInteger(CoreOptions.BUCKET, 2);
+        conf.setString(CoreOptions.FILE_FORMAT, "avro");
+        FileStoreTable table =
+                FileStoreTestUtils.createFileStoreTable(conf, rowType, partitionKeys, primaryKeys);
+
+        return writeData(table, path, data);
+    }
+
+    private String createAppendOnlyExternalTable(
+            RowType rowType, List<String> partitionKeys, List<RowData> data) throws Exception {
+        String path = folder.newFolder().toURI().toString();
+        Configuration conf = new Configuration();
+        conf.setString(CoreOptions.PATH, path);
+        conf.setInteger(CoreOptions.BUCKET, 2);
+        conf.setString(CoreOptions.FILE_FORMAT, "avro");
+        conf.set(CoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
+        FileStoreTable table =
+                FileStoreTestUtils.createFileStoreTable(
+                        conf, rowType, partitionKeys, Collections.emptyList());
+
+        return writeData(table, path, data);
+    }
+
+    private String writeData(FileStoreTable table, String path, List<RowData> data)
+            throws Exception {
         TableWrite write = table.newWrite();
         TableCommit commit = table.newCommit("user");
-        write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
-        write.write(GenericRowData.of(1, 20L, StringData.fromString("Hello")));
-        write.write(GenericRowData.of(2, 30L, StringData.fromString("World")));
-        write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
-        write.write(GenericRowData.ofKind(RowKind.DELETE, 2, 30L, StringData.fromString("World")));
-        write.write(GenericRowData.of(2, 40L, StringData.fromString("Test")));
-        commit.commit("0", write.prepareCommit(true));
+        for (RowData rowData : data) {
+            write.write(rowData);
+            if (ThreadLocalRandom.current().nextInt(5) == 0) {
+                commit.commit(UUID.randomUUID().toString(), write.prepareCommit(false));
+            }
+        }
+        commit.commit(UUID.randomUUID().toString(), write.prepareCommit(true));
         write.close();
 
+        String tableName = "test_table_" + (UUID.randomUUID().toString().substring(0, 4));
         hiveShell.execute(
                 String.join(
                         "\n",
                         Arrays.asList(
-                                "CREATE EXTERNAL TABLE test_table",
+                                "CREATE EXTERNAL TABLE " + tableName + " ",
                                 "STORED BY '" + TableStoreHiveStorageHandler.class.getName() + "'",
                                 "LOCATION '" + path + "'")));
-        List<String> actual = hiveShell.executeQuery("SELECT b, a, c FROM test_table ORDER BY b");
-        List<String> expected =
-                Arrays.asList("10\t1\tHi", "10\t1\tHi", "20\t1\tHello", "40\t2\tTest");
-        Assert.assertEquals(expected, actual);
+        return tableName;
     }
 
     @Test
@@ -433,44 +773,4 @@ public class TableStoreHiveStorageHandlerITCase {
                 hiveShell.executeQuery(
                         "SELECT * FROM test_table WHERE ts = '2022-06-18 08:30:00'"));
     }
-
-    @Test
-    public void testProjectionPushdown() throws Exception {
-        String path = folder.newFolder().toURI().toString();
-        Configuration conf = new Configuration();
-        conf.setString(CoreOptions.PATH, path);
-        conf.setInteger(CoreOptions.BUCKET, 2);
-        conf.setString(CoreOptions.FILE_FORMAT, "avro");
-        FileStoreTable table =
-                FileStoreTestUtils.createFileStoreTable(
-                        conf,
-                        RowType.of(
-                                new LogicalType[] {
-                                    DataTypes.INT().getLogicalType(),
-                                    DataTypes.BIGINT().getLogicalType(),
-                                    DataTypes.STRING().getLogicalType()
-                                },
-                                new String[] {"a", "b", "c"}),
-                        Collections.emptyList(),
-                        Collections.emptyList());
-
-        TableWrite write = table.newWrite();
-        TableCommit commit = table.newCommit("user");
-        write.write(GenericRowData.of(1, 10L, StringData.fromString("Hi")));
-        write.write(GenericRowData.of(2, 20L, StringData.fromString("Hello")));
-        write.write(GenericRowData.of(3, 30L, StringData.fromString("World")));
-        commit.commit("0", write.prepareCommit(true));
-        write.close();
-
-        hiveShell.execute(
-                String.join(
-                        "\n",
-                        Arrays.asList(
-                                "CREATE EXTERNAL TABLE test_table",
-                                "STORED BY '" + TableStoreHiveStorageHandler.class.getName() + "'",
-                                "LOCATION '" + path + "'")));
-        List<String> actual = hiveShell.executeQuery("SELECT c, a FROM test_table ORDER BY a");
-        List<String> expected = Arrays.asList("Hi\t1", "Hello\t2", "World\t3");
-        Assert.assertEquals(expected, actual);
-    }
 }
diff --git a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogDeserializationSchema.java b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogDeserializationSchema.java
index 7c2f9fa7..2bf79ecc 100644
--- a/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogDeserializationSchema.java
+++ b/flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogDeserializationSchema.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.store.utils.ProjectedRowData;
 import org.apache.flink.table.store.utils.Projection;
+import org.apache.flink.table.store.utils.RowDataUtils;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
@@ -71,7 +72,7 @@ public class KafkaLogDeserializationSchema implements KafkaDeserializationSchema
                 IntStream.range(0, primaryKey.length)
                         .mapToObj(
                                 i ->
-                                        RowData.createFieldGetter(
+                                        RowDataUtils.createNullCheckingFieldGetter(
                                                 physicalType
                                                         .getChildren()
                                                         .get(primaryKey[i])

Reply via email to