This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new bb674f498 [hive] Map object inspector should normalize map key
properly before search value (#1592)
bb674f498 is described below
commit bb674f498115eaca340351eece6add8a20c4b72d
Author: yuzelin <[email protected]>
AuthorDate: Tue Jul 18 23:11:25 2023 +0800
[hive] Map object inspector should normalize map key properly before search
value (#1592)
---
.../objectinspector/PaimonDateObjectInspector.java | 2 +
.../PaimonDecimalObjectInspector.java | 3 +
.../objectinspector/PaimonMapObjectInspector.java | 6 +-
.../PaimonTimestampObjectInspector.java | 2 +
.../paimon/hive/PaimonStorageHandlerITCase.java | 524 ++++++++++++---------
5 files changed, 320 insertions(+), 217 deletions(-)
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDateObjectInspector.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDateObjectInspector.java
index a8eee30f7..2e009e15b 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDateObjectInspector.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDateObjectInspector.java
@@ -66,6 +66,8 @@ public class PaimonDateObjectInspector extends
AbstractPrimitiveJavaObjectInspec
}
if (value instanceof Date) {
return DateTimeUtils.toInternal((Date) value);
+ } else if (value instanceof DateWritable) {
+ return (((DateWritable) value).getDays());
} else {
return DateTimeUtils.toInternal((LocalDate) value);
}
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDecimalObjectInspector.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDecimalObjectInspector.java
index e01c5cd86..710ce3230 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDecimalObjectInspector.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonDecimalObjectInspector.java
@@ -65,6 +65,9 @@ public class PaimonDecimalObjectInspector extends
AbstractPrimitiveJavaObjectIns
return null;
}
+ if (o instanceof HiveDecimalWritable) {
+ o = ((HiveDecimalWritable) o).getHiveDecimal();
+ }
BigDecimal result = ((HiveDecimal) o).bigDecimalValue();
// during the HiveDecimal to BigDecimal conversion the scale is lost,
when the value is 0
result = result.setScale(scale());
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonMapObjectInspector.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonMapObjectInspector.java
index 4d668574d..b36ae2f47 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonMapObjectInspector.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonMapObjectInspector.java
@@ -70,7 +70,11 @@ public class PaimonMapObjectInspector implements
MapObjectInspector {
InternalArray valueArrayData = mapData.valueArray();
for (int i = 0; i < mapData.size(); i++) {
Object k = keyGetter.getElementOrNull(keyArrayData, i);
- if (Objects.equals(k, key)) {
+ Object normalizedSearchKey = key;
+ if (keyObjectInspector instanceof WriteableObjectInspector) {
+ normalizedSearchKey = ((WriteableObjectInspector)
keyObjectInspector).convert(key);
+ }
+ if (Objects.equals(k, normalizedSearchKey)) {
return valueGetter.getElementOrNull(valueArrayData, i);
}
}
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonTimestampObjectInspector.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonTimestampObjectInspector.java
index 7b3ae41d7..7f57b242b 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonTimestampObjectInspector.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonTimestampObjectInspector.java
@@ -66,6 +66,8 @@ public class PaimonTimestampObjectInspector extends
AbstractPrimitiveJavaObjectI
}
if (value instanceof java.sql.Timestamp) {
return Timestamp.fromSQLTimestamp((java.sql.Timestamp) value);
+ } else if (value instanceof TimestampWritable) {
+ return Timestamp.fromSQLTimestamp(((TimestampWritable)
value).getTimestamp());
} else {
return Timestamp.fromLocalDateTime((LocalDateTime) value);
}
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
index cc992be6e..9b9f60c00 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
@@ -22,6 +22,8 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.WriteMode;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericMap;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
@@ -38,7 +40,6 @@ import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.StringUtils;
import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.annotations.HiveSQL;
@@ -56,6 +57,8 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
+import java.io.IOException;
+import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
@@ -80,6 +83,10 @@ public class PaimonStorageHandlerITCase {
private static String engine;
+ private String warehouse;
+ private String tablePath;
+ private Identifier identifier;
+ private String externalTable;
private long commitIdentifier;
@BeforeClass
@@ -91,7 +98,7 @@ public class PaimonStorageHandlerITCase {
}
@Before
- public void before() {
+ public void before() throws IOException {
if ("mr".equals(engine)) {
hiveShell.execute("SET hive.execution.engine=mr");
} else if ("tez".equals(engine)) {
@@ -110,6 +117,10 @@ public class PaimonStorageHandlerITCase {
hiveShell.execute("CREATE DATABASE IF NOT EXISTS test_db");
hiveShell.execute("USE test_db");
+ warehouse = folder.newFolder().toURI().toString();
+ tablePath = String.format("%s/default.db/%s", warehouse, TABLE_NAME);
+ identifier = Identifier.create(DATABASE_NAME, TABLE_NAME);
+ externalTable = "test_table_" +
UUID.randomUUID().toString().substring(0, 4);
commitIdentifier = 0;
}
@@ -130,21 +141,31 @@ public class PaimonStorageHandlerITCase {
RowKind.DELETE, 2, 30L,
BinaryString.fromString("World"), 300L),
GenericRow.of(2, 40L, null, 400L),
GenericRow.of(3, 50L,
BinaryString.fromString("Store"), 200L));
- String tableName =
- createChangelogExternalTable(
- RowType.of(
- new DataType[] {
- DataTypes.INT(),
- DataTypes.BIGINT(),
- DataTypes.STRING(),
- DataTypes.BIGINT()
- },
- new String[] {"a", "b", "c", "d"}),
+
+ Options conf = getBasicConf();
+ conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT(),
+ DataTypes.BIGINT(),
+ DataTypes.STRING(),
+ DataTypes.BIGINT()
+ },
+ new String[] {"a", "b", "c", "d"});
+ Table table =
+ FileStoreTestUtils.createFileStoreTable(
+ conf,
+ rowType,
Collections.emptyList(),
Arrays.asList("a", "b"),
- data);
+ identifier);
- List<String> actual = hiveShell.executeQuery("SELECT * FROM " +
tableName + " ORDER BY b");
+ createExternalTable();
+ writeData(table, data);
+
+ List<String> actual =
+ hiveShell.executeQuery("SELECT * FROM " + externalTable + "
ORDER BY b");
List<String> expected =
Arrays.asList(
"1\t10\tHi Again\t1000",
@@ -153,32 +174,34 @@ public class PaimonStorageHandlerITCase {
"3\t50\tStore\t200");
Assert.assertEquals(expected, actual);
- actual = hiveShell.executeQuery("SELECT c, b FROM " + tableName + "
ORDER BY b");
+ actual = hiveShell.executeQuery("SELECT c, b FROM " + externalTable +
" ORDER BY b");
expected = Arrays.asList("Hi Again\t10", "Hello\t20", "NULL\t40",
"Store\t50");
Assert.assertEquals(expected, actual);
- actual = hiveShell.executeQuery("SELECT * FROM " + tableName + " WHERE
d > 200 ORDER BY b");
+ actual =
+ hiveShell.executeQuery(
+ "SELECT * FROM " + externalTable + " WHERE d > 200
ORDER BY b");
expected = Arrays.asList("1\t10\tHi Again\t1000", "2\t40\tNULL\t400");
Assert.assertEquals(expected, actual);
actual =
hiveShell.executeQuery(
- "SELECT a, sum(d) FROM " + tableName + " GROUP BY a
ORDER BY a");
+ "SELECT a, sum(d) FROM " + externalTable + " GROUP BY
a ORDER BY a");
expected = Arrays.asList("1\t1200", "2\t400", "3\t200");
Assert.assertEquals(expected, actual);
actual =
hiveShell.executeQuery(
- "SELECT d, sum(b) FROM " + tableName + " GROUP BY d
ORDER BY d");
+ "SELECT d, sum(b) FROM " + externalTable + " GROUP BY
d ORDER BY d");
expected = Arrays.asList("200\t70", "400\t40", "1000\t10");
Assert.assertEquals(expected, actual);
actual =
hiveShell.executeQuery(
"SELECT T1.a, T1.b, T1.d + T2.d FROM "
- + tableName
+ + externalTable
+ " T1 INNER JOIN "
- + tableName
+ + externalTable
+ " T2 ON T1.a = T2.a AND T1.b = T2.b ORDER BY
T1.a, T1.b");
expected = Arrays.asList("1\t10\t2000", "1\t20\t400", "2\t40\t800",
"3\t50\t400");
Assert.assertEquals(expected, actual);
@@ -186,9 +209,9 @@ public class PaimonStorageHandlerITCase {
actual =
hiveShell.executeQuery(
"SELECT T1.a, T1.b, T2.b, T1.d + T2.d FROM "
- + tableName
+ + externalTable
+ " T1 INNER JOIN "
- + tableName
+ + externalTable
+ " T2 ON T1.a = T2.a ORDER BY T1.a, T1.b,
T2.b");
expected =
Arrays.asList(
@@ -213,22 +236,28 @@ public class PaimonStorageHandlerITCase {
RowKind.DELETE, 1, 20, 300L,
BinaryString.fromString("World")),
GenericRow.of(2, 20, 100L, null),
GenericRow.of(1, 30, 200L,
BinaryString.fromString("Store")));
- String tableName =
- createChangelogExternalTable(
- RowType.of(
- new DataType[] {
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.BIGINT(),
- DataTypes.STRING()
- },
- new String[] {"pt", "a", "b", "c"}),
+
+ Options conf = getBasicConf();
+ conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT(), DataTypes.INT(),
DataTypes.BIGINT(), DataTypes.STRING()
+ },
+ new String[] {"pt", "a", "b", "c"});
+ Table table =
+ FileStoreTestUtils.createFileStoreTable(
+ conf,
+ rowType,
Collections.singletonList("pt"),
Arrays.asList("pt", "a"),
- data);
+ identifier);
+
+ createExternalTable();
+ writeData(table, data);
List<String> actual =
- hiveShell.executeQuery("SELECT * FROM " + tableName + " ORDER
BY pt, a");
+ hiveShell.executeQuery("SELECT * FROM " + externalTable + "
ORDER BY pt, a");
List<String> expected =
Arrays.asList(
"1\t10\t100\tHi Again",
@@ -237,40 +266,46 @@ public class PaimonStorageHandlerITCase {
"2\t20\t100\tNULL");
Assert.assertEquals(expected, actual);
- actual = hiveShell.executeQuery("SELECT c, a FROM " + tableName + "
ORDER BY c, a");
+ actual = hiveShell.executeQuery("SELECT c, a FROM " + externalTable +
" ORDER BY c, a");
expected = Arrays.asList("NULL\t20", "Hello\t10", "Hi Again\t10",
"Store\t30");
Assert.assertEquals(expected, actual);
actual =
hiveShell.executeQuery(
- "SELECT * FROM " + tableName + " WHERE b > 100 ORDER
BY pt, a");
+ "SELECT * FROM " + externalTable + " WHERE b > 100
ORDER BY pt, a");
expected = Arrays.asList("1\t30\t200\tStore", "2\t10\t200\tHello");
Assert.assertEquals(expected, actual);
actual =
hiveShell.executeQuery(
- "SELECT pt, sum(b), max(c) FROM " + tableName + "
GROUP BY pt ORDER BY pt");
+ "SELECT pt, sum(b), max(c) FROM "
+ + externalTable
+ + " GROUP BY pt ORDER BY pt");
expected = Arrays.asList("1\t300\tStore", "2\t300\tHello");
Assert.assertEquals(expected, actual);
actual =
hiveShell.executeQuery(
- "SELECT a, sum(b), max(c) FROM " + tableName + " GROUP
BY a ORDER BY a");
+ "SELECT a, sum(b), max(c) FROM "
+ + externalTable
+ + " GROUP BY a ORDER BY a");
expected = Arrays.asList("10\t300\tHi Again", "20\t100\tNULL",
"30\t200\tStore");
Assert.assertEquals(expected, actual);
actual =
hiveShell.executeQuery(
- "SELECT b, sum(a), max(c) FROM " + tableName + " GROUP
BY b ORDER BY b");
+ "SELECT b, sum(a), max(c) FROM "
+ + externalTable
+ + " GROUP BY b ORDER BY b");
expected = Arrays.asList("100\t30\tHi Again", "200\t40\tStore");
Assert.assertEquals(expected, actual);
actual =
hiveShell.executeQuery(
"SELECT a, b FROM (SELECT T1.a AS a, T1.b + T2.b AS b
FROM "
- + tableName
+ + externalTable
+ " T1 JOIN "
- + tableName
+ + externalTable
+ " T2 ON T1.a = T2.a) T3 ORDER BY a, b");
expected = Arrays.asList("10\t200", "10\t300", "10\t300", "10\t400",
"20\t200", "30\t400");
Assert.assertEquals(expected, actual);
@@ -278,9 +313,9 @@ public class PaimonStorageHandlerITCase {
actual =
hiveShell.executeQuery(
"SELECT b, a FROM (SELECT T1.b AS b, T1.a + T2.a AS a
FROM "
- + tableName
+ + externalTable
+ " T1 JOIN "
- + tableName
+ + externalTable
+ " T2 ON T1.b = T2.b) T3 ORDER BY b, a");
expected =
Arrays.asList(
@@ -301,22 +336,32 @@ public class PaimonStorageHandlerITCase {
RowKind.DELETE, 2, 30L,
BinaryString.fromString("World"), 300L),
GenericRow.of(2, 40L, null, 400L),
GenericRow.of(3, 50L,
BinaryString.fromString("Store"), 200L));
- String tableName =
- createChangelogExternalTable(
- RowType.of(
- new DataType[] {
- DataTypes.INT(),
- DataTypes.BIGINT(),
- DataTypes.STRING(),
- DataTypes.BIGINT()
- },
- new String[] {"a", "b", "c", "d"}),
+
+ Options conf = getBasicConf();
+ conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
+ conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT(),
+ DataTypes.BIGINT(),
+ DataTypes.STRING(),
+ DataTypes.BIGINT()
+ },
+ new String[] {"a", "b", "c", "d"});
+ Table table =
+ FileStoreTestUtils.createFileStoreTable(
+ conf,
+ rowType,
Collections.emptyList(),
Collections.emptyList(),
- data);
+ identifier);
+
+ createExternalTable();
+ writeData(table, data);
List<String> actual =
- hiveShell.executeQuery("SELECT * FROM " + tableName + " ORDER
BY b, d");
+ hiveShell.executeQuery("SELECT * FROM " + externalTable + "
ORDER BY b, d");
List<String> expected =
Arrays.asList(
"1\t10\tHi\t100",
@@ -326,33 +371,34 @@ public class PaimonStorageHandlerITCase {
"3\t50\tStore\t200");
Assert.assertEquals(expected, actual);
- actual = hiveShell.executeQuery("SELECT c, b FROM " + tableName + "
ORDER BY c");
+ actual = hiveShell.executeQuery("SELECT c, b FROM " + externalTable +
" ORDER BY c");
expected = Arrays.asList("NULL\t40", "Hello\t20", "Hi\t10", "Hi
Again\t10", "Store\t50");
Assert.assertEquals(expected, actual);
actual =
- hiveShell.executeQuery("SELECT * FROM " + tableName + " WHERE
d <> 200 ORDER BY d");
+ hiveShell.executeQuery(
+ "SELECT * FROM " + externalTable + " WHERE d <> 200
ORDER BY d");
expected = Arrays.asList("1\t10\tHi\t100", "2\t40\tNULL\t400",
"1\t10\tHi Again\t1000");
Assert.assertEquals(expected, actual);
actual =
hiveShell.executeQuery(
- "SELECT a, sum(d) FROM " + tableName + " GROUP BY a
ORDER BY a");
+ "SELECT a, sum(d) FROM " + externalTable + " GROUP BY
a ORDER BY a");
expected = Arrays.asList("1\t1300", "2\t400", "3\t200");
Assert.assertEquals(expected, actual);
actual =
hiveShell.executeQuery(
- "SELECT d, sum(b) FROM " + tableName + " GROUP BY d
ORDER BY d");
+ "SELECT d, sum(b) FROM " + externalTable + " GROUP BY
d ORDER BY d");
expected = Arrays.asList("100\t10", "200\t70", "400\t40", "1000\t10");
Assert.assertEquals(expected, actual);
actual =
hiveShell.executeQuery(
"SELECT T1.b, T1.d, T2.d FROM "
- + tableName
+ + externalTable
+ " T1 JOIN "
- + tableName
+ + externalTable
+ " T2 ON T1.b = T2.b ORDER BY T1.b, T1.d,
T2.d");
expected =
Arrays.asList(
@@ -378,22 +424,29 @@ public class PaimonStorageHandlerITCase {
RowKind.DELETE, 1, 20, 300L,
BinaryString.fromString("World")),
GenericRow.of(2, 20, 400L, null),
GenericRow.of(1, 30, 500L,
BinaryString.fromString("Store")));
- String tableName =
- createChangelogExternalTable(
- RowType.of(
- new DataType[] {
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.BIGINT(),
- DataTypes.STRING()
- },
- new String[] {"pt", "a", "b", "c"}),
+
+ Options conf = getBasicConf();
+ conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
+ conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT(), DataTypes.INT(),
DataTypes.BIGINT(), DataTypes.STRING()
+ },
+ new String[] {"pt", "a", "b", "c"});
+ Table table =
+ FileStoreTestUtils.createFileStoreTable(
+ conf,
+ rowType,
Collections.singletonList("pt"),
Collections.emptyList(),
- data);
+ identifier);
+
+ createExternalTable();
+ writeData(table, data);
List<String> actual =
- hiveShell.executeQuery("SELECT * FROM " + tableName + " ORDER
BY pt, a, c");
+ hiveShell.executeQuery("SELECT * FROM " + externalTable + "
ORDER BY pt, a, c");
List<String> expected =
Arrays.asList(
"1\t10\t100\tHi",
@@ -403,35 +456,39 @@ public class PaimonStorageHandlerITCase {
"2\t20\t400\tNULL");
Assert.assertEquals(expected, actual);
- actual = hiveShell.executeQuery("SELECT c, b FROM " + tableName + "
ORDER BY c");
+ actual = hiveShell.executeQuery("SELECT c, b FROM " + externalTable +
" ORDER BY c");
expected =
Arrays.asList("NULL\t400", "Hello\t200", "Hi\t100", "Hi
Again\t100", "Store\t500");
Assert.assertEquals(expected, actual);
actual =
hiveShell.executeQuery(
- "SELECT * FROM " + tableName + " WHERE b < 400 ORDER
BY b, c");
+ "SELECT * FROM " + externalTable + " WHERE b < 400
ORDER BY b, c");
expected = Arrays.asList("1\t10\t100\tHi", "1\t10\t100\tHi Again",
"2\t10\t200\tHello");
Assert.assertEquals(expected, actual);
actual =
hiveShell.executeQuery(
- "SELECT pt, max(a), min(c) FROM " + tableName + "
GROUP BY pt ORDER BY pt");
+ "SELECT pt, max(a), min(c) FROM "
+ + externalTable
+ + " GROUP BY pt ORDER BY pt");
expected = Arrays.asList("1\t30\tHi", "2\t20\tHello");
Assert.assertEquals(expected, actual);
actual =
hiveShell.executeQuery(
- "SELECT a, sum(b), min(c) FROM " + tableName + " GROUP
BY a ORDER BY a");
+ "SELECT a, sum(b), min(c) FROM "
+ + externalTable
+ + " GROUP BY a ORDER BY a");
expected = Arrays.asList("10\t400\tHello", "20\t400\tNULL",
"30\t500\tStore");
Assert.assertEquals(expected, actual);
actual =
hiveShell.executeQuery(
"SELECT T1.b, T1.c, T2.c FROM "
- + tableName
+ + externalTable
+ " T1 JOIN "
- + tableName
+ + externalTable
+ " T2 ON T1.b = T2.b ORDER BY T1.b, T1.c,
T2.c");
expected =
Arrays.asList(
@@ -455,21 +512,31 @@ public class PaimonStorageHandlerITCase {
GenericRow.of(1, 10L, BinaryString.fromString("Hi
Again"), 1000L),
GenericRow.of(2, 40L, null, 400L),
GenericRow.of(3, 50L,
BinaryString.fromString("Store"), 200L));
- String tableName =
- createAppendOnlyExternalTable(
- RowType.of(
- new DataType[] {
- DataTypes.INT(),
- DataTypes.BIGINT(),
- DataTypes.STRING(),
- DataTypes.BIGINT()
- },
- new String[] {"a", "b", "c", "d"}),
+
+ Options conf = getBasicConf();
+ conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT(),
+ DataTypes.BIGINT(),
+ DataTypes.STRING(),
+ DataTypes.BIGINT()
+ },
+ new String[] {"a", "b", "c", "d"});
+ Table table =
+ FileStoreTestUtils.createFileStoreTable(
+ conf,
+ rowType,
Collections.emptyList(),
- data);
+ Collections.emptyList(),
+ identifier);
+
+ createExternalTable();
+ writeData(table, data);
List<String> actual =
- hiveShell.executeQuery("SELECT * FROM " + tableName + " ORDER
BY a, b, c");
+ hiveShell.executeQuery("SELECT * FROM " + externalTable + "
ORDER BY a, b, c");
List<String> expected =
Arrays.asList(
"1\t10\tHi\t100",
@@ -480,7 +547,7 @@ public class PaimonStorageHandlerITCase {
"3\t50\tStore\t200");
Assert.assertEquals(expected, actual);
- actual = hiveShell.executeQuery("SELECT c, b FROM " + tableName + "
ORDER BY c");
+ actual = hiveShell.executeQuery("SELECT c, b FROM " + externalTable +
" ORDER BY c");
expected =
Arrays.asList(
"NULL\t40",
@@ -493,22 +560,22 @@ public class PaimonStorageHandlerITCase {
actual =
hiveShell.executeQuery(
- "SELECT * FROM " + tableName + " WHERE d < 300 ORDER
BY b, d");
+ "SELECT * FROM " + externalTable + " WHERE d < 300
ORDER BY b, d");
expected = Arrays.asList("1\t10\tHi\t100", "1\t20\tHello\t200",
"3\t50\tStore\t200");
Assert.assertEquals(expected, actual);
actual =
hiveShell.executeQuery(
- "SELECT a, sum(d) FROM " + tableName + " GROUP BY a
ORDER BY a");
+ "SELECT a, sum(d) FROM " + externalTable + " GROUP BY
a ORDER BY a");
expected = Arrays.asList("1\t1300", "2\t700", "3\t200");
Assert.assertEquals(expected, actual);
actual =
hiveShell.executeQuery(
"SELECT T1.a, T1.b, T2.b FROM "
- + tableName
+ + externalTable
+ " T1 JOIN "
- + tableName
+ + externalTable
+ " T2 ON T1.a = T2.a WHERE T1.a > 1 ORDER BY
T1.a, T1.b, T2.b");
expected = Arrays.asList("2\t30\t30", "2\t30\t40", "2\t40\t30",
"2\t40\t40", "3\t50\t50");
Assert.assertEquals(expected, actual);
@@ -524,21 +591,28 @@ public class PaimonStorageHandlerITCase {
GenericRow.of(1, 10, 100L, BinaryString.fromString("Hi
Again")),
GenericRow.of(2, 20, 400L, null),
GenericRow.of(1, 30, 500L,
BinaryString.fromString("Store")));
- String tableName =
- createAppendOnlyExternalTable(
- RowType.of(
- new DataType[] {
- DataTypes.INT(),
- DataTypes.INT(),
- DataTypes.BIGINT(),
- DataTypes.STRING()
- },
- new String[] {"pt", "a", "b", "c"}),
+
+ Options conf = getBasicConf();
+ conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
+ RowType rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT(), DataTypes.INT(),
DataTypes.BIGINT(), DataTypes.STRING()
+ },
+ new String[] {"pt", "a", "b", "c"});
+ Table table =
+ FileStoreTestUtils.createFileStoreTable(
+ conf,
+ rowType,
Collections.singletonList("pt"),
- data);
+ Collections.emptyList(),
+ identifier);
+
+ createExternalTable();
+ writeData(table, data);
List<String> actual =
- hiveShell.executeQuery("SELECT * FROM " + tableName + " ORDER
BY pt, a, c");
+ hiveShell.executeQuery("SELECT * FROM " + externalTable + "
ORDER BY pt, a, c");
List<String> expected =
Arrays.asList(
"1\t10\t100\tHi",
@@ -549,7 +623,7 @@ public class PaimonStorageHandlerITCase {
"2\t20\t400\tNULL");
Assert.assertEquals(expected, actual);
- actual = hiveShell.executeQuery("SELECT c, b FROM " + tableName + "
ORDER BY c");
+ actual = hiveShell.executeQuery("SELECT c, b FROM " + externalTable +
" ORDER BY c");
expected =
Arrays.asList(
"NULL\t400",
@@ -562,7 +636,7 @@ public class PaimonStorageHandlerITCase {
actual =
hiveShell.executeQuery(
- "SELECT * FROM " + tableName + " WHERE b < 400 ORDER
BY b, c");
+ "SELECT * FROM " + externalTable + " WHERE b < 400
ORDER BY b, c");
expected =
Arrays.asList(
"1\t10\t100\tHi",
@@ -573,22 +647,26 @@ public class PaimonStorageHandlerITCase {
actual =
hiveShell.executeQuery(
- "SELECT pt, max(a), min(c) FROM " + tableName + "
GROUP BY pt ORDER BY pt");
+ "SELECT pt, max(a), min(c) FROM "
+ + externalTable
+ + " GROUP BY pt ORDER BY pt");
expected = Arrays.asList("1\t30\tHi", "2\t20\tHello");
Assert.assertEquals(expected, actual);
actual =
hiveShell.executeQuery(
- "SELECT a, sum(b), min(c) FROM " + tableName + " GROUP
BY a ORDER BY a");
+ "SELECT a, sum(b), min(c) FROM "
+ + externalTable
+ + " GROUP BY a ORDER BY a");
expected = Arrays.asList("10\t400\tHello", "20\t700\tWorld",
"30\t500\tStore");
Assert.assertEquals(expected, actual);
actual =
hiveShell.executeQuery(
"SELECT T1.a, T1.b, T2.b FROM "
- + tableName
+ + externalTable
+ " T1 JOIN "
- + tableName
+ + externalTable
+ " T2 ON T1.a = T2.a WHERE T1.a > 10 ORDER BY
T1.a, T1.b, T2.b");
expected =
Arrays.asList(
@@ -600,66 +678,7 @@ public class PaimonStorageHandlerITCase {
Assert.assertEquals(expected, actual);
}
- private String createChangelogExternalTable(
- RowType rowType,
- List<String> partitionKeys,
- List<String> primaryKeys,
- List<InternalRow> data)
- throws Exception {
-
- return createChangelogExternalTable(rowType, partitionKeys,
primaryKeys, data, "");
- }
-
- private String createChangelogExternalTable(
- RowType rowType,
- List<String> partitionKeys,
- List<String> primaryKeys,
- List<InternalRow> data,
- String tableName)
- throws Exception {
- String path = folder.newFolder().toURI().toString();
- String tableNameNotNull =
- StringUtils.isNullOrWhitespaceOnly(tableName) ? TABLE_NAME :
tableName;
- String tablePath = String.format("%s/default.db/%s", path,
tableNameNotNull);
- Options conf = new Options();
- conf.set(CatalogOptions.WAREHOUSE, path);
- conf.set(CoreOptions.BUCKET, 2);
- conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
- conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
- Identifier identifier = Identifier.create(DATABASE_NAME,
tableNameNotNull);
- Table table =
- FileStoreTestUtils.createFileStoreTable(
- conf, rowType, partitionKeys, primaryKeys, identifier);
-
- return writeData(table, tablePath, data);
- }
-
- private String createAppendOnlyExternalTable(
- RowType rowType, List<String> partitionKeys, List<InternalRow>
data) throws Exception {
- return createAppendOnlyExternalTable(rowType, partitionKeys, data, "");
- }
-
- private String createAppendOnlyExternalTable(
- RowType rowType, List<String> partitionKeys, List<InternalRow>
data, String tableName)
- throws Exception {
- String path = folder.newFolder().toURI().toString();
- String tableNameNotNull =
- StringUtils.isNullOrWhitespaceOnly(tableName) ? TABLE_NAME :
tableName;
- String tablePath = String.format("%s/default.db/%s", path,
tableNameNotNull);
- Options conf = new Options();
- conf.set(CatalogOptions.WAREHOUSE, path);
- conf.set(CoreOptions.BUCKET, 2);
- conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
- conf.set(CoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
- Identifier identifier = Identifier.create(DATABASE_NAME,
tableNameNotNull);
- Table table =
- FileStoreTestUtils.createFileStoreTable(
- conf, rowType, partitionKeys, Collections.emptyList(),
identifier);
-
- return writeData(table, tablePath, data);
- }
-
- private String writeData(Table table, String path, List<InternalRow> data)
throws Exception {
+ private void writeData(Table table, List<InternalRow> data) throws
Exception {
StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder();
StreamTableWrite write = streamWriteBuilder.newWrite();
StreamTableCommit commit = streamWriteBuilder.newCommit();
@@ -673,24 +692,11 @@ public class PaimonStorageHandlerITCase {
commit.commit(commitIdentifier, write.prepareCommit(true,
commitIdentifier));
commitIdentifier++;
write.close();
-
- String tableName = "test_table_" +
(UUID.randomUUID().toString().substring(0, 4));
- hiveShell.execute(
- String.join(
- "\n",
- Arrays.asList(
- "CREATE EXTERNAL TABLE " + tableName + " ",
- "STORED BY '" +
PaimonStorageHandler.class.getName() + "'",
- "LOCATION '" + path + "'")));
- return tableName;
}
@Test
public void testReadAllSupportedTypes() throws Exception {
- String root = folder.newFolder().toString();
- String tablePath = String.format("%s/default.db/hive_test_table",
root);
- Options conf = new Options();
- conf.set(CatalogOptions.WAREHOUSE, root);
+ Options conf = getBasicConf();
conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
Table table =
FileStoreTestUtils.createFileStoreTable(
@@ -721,15 +727,10 @@ public class PaimonStorageHandlerITCase {
commit.commit(0, write.prepareCommit(true, 0));
write.close();
- hiveShell.execute(
- String.join(
- "\n",
- Arrays.asList(
- "CREATE EXTERNAL TABLE test_table",
- "STORED BY '" +
PaimonStorageHandler.class.getName() + "'",
- "LOCATION '" + tablePath + "'")));
+ createExternalTable();
List<Object[]> actual =
- hiveShell.executeStatement("SELECT * FROM test_table WHERE
f_int > 0");
+ hiveShell.executeStatement(
+ "SELECT * FROM `" + externalTable + "` WHERE f_int >
0");
Map<Integer, GenericRow> expected = new HashMap<>();
for (GenericRow rowData : input) {
@@ -761,7 +762,7 @@ public class PaimonStorageHandlerITCase {
Assert.assertArrayEquals(
(byte[]) expectedObject, (byte[])
actualRow[i]);
} else if (expectedObject instanceof HiveDecimal) {
- // HiveDecimal will remove trailing zeros
+ // HiveDecimal will remove trailing zeros,
// so we have to compare it from the original
DecimalData
Assert.assertEquals(expectedRow.getField(i).toString(), actualRow[i]);
} else {
@@ -805,10 +806,8 @@ public class PaimonStorageHandlerITCase {
@Test
public void testPredicatePushDown() throws Exception {
- String path = folder.newFolder().toURI().toString();
- String tablePath = String.format("%s/default.db/hive_test_table",
path);
- Options conf = new Options();
- conf.set(CatalogOptions.WAREHOUSE, path);
+ Options conf = getBasicConf();
+ conf.set(CoreOptions.BUCKET, 1);
conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
Table table =
FileStoreTestUtils.createFileStoreTable(
@@ -896,11 +895,8 @@ public class PaimonStorageHandlerITCase {
@Test
public void testDateAndTimestamp() throws Exception {
- String path = folder.newFolder().toURI().toString();
- String tablePath = String.format("%s/default.db/hive_test_table",
path);
ThreadLocalRandom random = ThreadLocalRandom.current();
- Options conf = new Options();
- conf.set(CatalogOptions.WAREHOUSE, path);
+ Options conf = getBasicConf();
conf.set(
CoreOptions.FILE_FORMAT,
random.nextBoolean()
@@ -937,26 +933,122 @@ public class PaimonStorageHandlerITCase {
commit.commit(2, write.prepareCommit(true, 2));
write.close();
- hiveShell.execute(
- String.join(
- "\n",
- Arrays.asList(
- "CREATE EXTERNAL TABLE test_table",
- "STORED BY '" +
PaimonStorageHandler.class.getName() + "'",
- "LOCATION '" + tablePath + "'")));
+ createExternalTable();
Assert.assertEquals(
Collections.singletonList("1971-01-11\t2022-05-17 17:29:20.1"),
- hiveShell.executeQuery("SELECT * FROM test_table WHERE dt =
'1971-01-11'"));
+ hiveShell.executeQuery(
+ "SELECT * FROM `" + externalTable + "` WHERE dt =
'1971-01-11'"));
Assert.assertEquals(
Collections.singletonList("1971-01-11\t2022-05-17 17:29:20.1"),
hiveShell.executeQuery(
- "SELECT * FROM test_table WHERE ts = '2022-05-17
17:29:20.1'"));
+ "SELECT * FROM `"
+ + externalTable
+ + "` WHERE ts = '2022-05-17 17:29:20.1'"));
Assert.assertEquals(
Collections.singletonList("1971-01-12\tNULL"),
- hiveShell.executeQuery("SELECT * FROM test_table WHERE dt =
'1971-01-12'"));
+ hiveShell.executeQuery(
+ "SELECT * FROM `" + externalTable + "` WHERE dt =
'1971-01-12'"));
Assert.assertEquals(
Collections.singletonList("NULL\t2022-06-18 08:30:00.1"),
hiveShell.executeQuery(
- "SELECT * FROM test_table WHERE ts = '2022-06-18
08:30:00.1'"));
+ "SELECT * FROM `"
+ + externalTable
+ + "` WHERE ts = '2022-06-18 08:30:00.1'"));
+ }
+
+ @Test
+ public void testMapKey() throws Exception {
+ Options conf = getBasicConf();
+ // TODO fix PARQUET
+ // conf.set(
+ // CoreOptions.FILE_FORMAT,
+ // ThreadLocalRandom.current().nextBoolean()
+ // ? CoreOptions.FileFormatType.ORC
+ // : CoreOptions.FileFormatType.PARQUET);
+ conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.ORC);
+ Table table =
+ FileStoreTestUtils.createFileStoreTable(
+ conf,
+ RowType.of(
+ new DataType[] {
+ DataTypes.MAP(DataTypes.DATE(),
DataTypes.STRING()),
+ DataTypes.MAP(DataTypes.TIMESTAMP(3),
DataTypes.STRING()),
+ DataTypes.MAP(DataTypes.DECIMAL(2, 1),
DataTypes.STRING()),
+ DataTypes.MAP(DataTypes.STRING(),
DataTypes.STRING()),
+ DataTypes.MAP(DataTypes.VARCHAR(10),
DataTypes.STRING())
+ },
+ new String[] {
+ "date_key",
+ "timestamp_key",
+ "decimal_key",
+ "string_key",
+ "varchar_key"
+ }),
+ Collections.emptyList(),
+ Collections.emptyList());
+
+ StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder();
+ StreamTableWrite write = streamWriteBuilder.newWrite();
+ StreamTableCommit commit = streamWriteBuilder.newCommit();
+
+ Map<Integer, BinaryString> dateMap =
+ Collections.singletonMap(375, BinaryString.fromString("Date
1971-01-11"));
+ Map<Timestamp, BinaryString> timestampMap =
+ Collections.singletonMap(
+ Timestamp.fromLocalDateTime(
+ LocalDateTime.of(2023, 7, 18, 12, 29, 59,
123_000_000)),
+ BinaryString.fromString("Test timestamp(3)"));
+ Map<Decimal, BinaryString> decimalMap =
+ Collections.singletonMap(
+ Decimal.fromBigDecimal(new BigDecimal("1.2"), 2, 1),
+ BinaryString.fromString("一点二"));
+ Map<BinaryString, BinaryString> stringMap =
+ Collections.singletonMap(
+ BinaryString.fromString("Engine"),
BinaryString.fromString("Hive"));
+ Map<BinaryString, BinaryString> varcharMap =
+ Collections.singletonMap(
+ BinaryString.fromString("Name"),
BinaryString.fromString("Paimon"));
+
+ write.write(
+ GenericRow.of(
+ new GenericMap(dateMap),
+ new GenericMap(timestampMap),
+ new GenericMap(decimalMap),
+ new GenericMap(stringMap),
+ new GenericMap(varcharMap)));
+ commit.commit(0, write.prepareCommit(true, 0));
+ write.close();
+
+ createExternalTable();
+
+ Assert.assertEquals(
+ Collections.singletonList("Date 1971-01-11\tTest
timestamp(3)\t一点二\tHive\tPaimon"),
+ hiveShell.executeQuery(
+ "SELECT "
+ + "date_key[CAST('1971-01-11' AS DATE)],"
+ + "timestamp_key[CAST('2023-7-18 12:29:59.123'
AS TIMESTAMP)],"
+ + "decimal_key[1.2],"
+ + "string_key['Engine'],"
+ + "varchar_key['Name']"
+ + " FROM `"
+ + externalTable
+ + "`"));
+ }
+
+ private Options getBasicConf() {
+ Options conf = new Options();
+ conf.set(CatalogOptions.WAREHOUSE, warehouse);
+ conf.set(CoreOptions.BUCKET, 2);
+ return conf;
+ }
+
+ private void createExternalTable() {
+ hiveShell.execute(
+ String.join(
+ "\n",
+ Arrays.asList(
+ "CREATE EXTERNAL TABLE " + externalTable + " ",
+ "STORED BY '" +
PaimonStorageHandler.class.getName() + "'",
+ "LOCATION '" + tablePath + "'")));
}
}