This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 07dfffe  [FLINK-13814][hive] HiveTableSink should strip quotes from partition values
07dfffe is described below
commit 07dfffec8248788630bff5e99afe9866d8b50487
Author: Rui Li <[email protected]>
AuthorDate: Thu Aug 22 14:53:26 2019 +0800
[FLINK-13814][hive] HiveTableSink should strip quotes from partition values
Strip quotes from partition values in order to get proper string values.
This closes #9502.
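For context, the gist of the change: SqlLiteral#toString() on a character literal keeps the SQL quoting and escaping, so a partition spec like p1='1''1' would previously yield the quoted literal form rather than the raw string 1'1. The patch unwraps NlsString values instead. Below is a minimal standalone sketch of the difference (illustrative only, not part of this patch; assumes calcite-core on the classpath, and passes a null charset/collation to keep the example small):

    import org.apache.calcite.util.NlsString;

    public class QuoteStrippingSketch {
        public static void main(String[] args) {
            // The partition value 1'1 is written in SQL as the literal '1''1'.
            NlsString literal = new NlsString("1'1", null, null);
            // toString() keeps SQL quoting and escaping, e.g. '1''1'
            System.out.println(literal.toString());
            // getValue() returns the raw, unquoted and unescaped string: 1'1
            System.out.println(literal.getValue());
        }
    }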
---
.../flink/connectors/hive/HiveTableSinkTest.java | 57 ----------------------
.../connectors/hive/TableEnvHiveConnectorTest.java | 52 ++++++++++++++++++++
.../apache/flink/sql/parser/dml/RichSqlInsert.java | 7 ++-
3 files changed, 57 insertions(+), 59 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkTest.java
index 13bddc0..51a56fb 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
@@ -41,7 +40,6 @@ import org.apache.flink.types.Row;
import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.annotations.HiveSQL;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.mapred.JobConf;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -107,31 +105,6 @@ public class HiveTableSinkTest {
}
@Test
- public void testInsertIntoDynamicPartition() throws Exception {
- String dbName = "default";
- String tblName = "dest";
- RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 1);
- ObjectPath tablePath = new ObjectPath(dbName, tblName);
-
- TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
-
- List<Row> toWrite = generateRecords(5);
- Table src = tableEnv.fromTableSource(new CollectionTableSource(toWrite, rowTypeInfo));
- tableEnv.registerTable("src", src);
-
- tableEnv.registerCatalog("hive", hiveCatalog);
- tableEnv.sqlQuery("select * from src").insertInto("hive", "default", "dest");
- tableEnv.execute("mytest");
-
- List<CatalogPartitionSpec> partitionSpecs = hiveCatalog.listPartitions(tablePath);
- assertEquals(toWrite.size(), partitionSpecs.size());
-
- verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName));
-
- hiveCatalog.dropTable(tablePath, false);
- }
-
- @Test
public void testWriteComplexType() throws Exception {
String dbName = "default";
String tblName = "dest";
@@ -213,36 +186,6 @@ public class HiveTableSinkTest {
hiveCatalog.dropTable(tablePath, false);
}
- @Test
- public void testInsertIntoStaticPartition() throws Exception {
- String dbName = "default";
- String tblName = "dest";
- RowTypeInfo rowTypeInfo = createDestTable(dbName, tblName, 1);
- ObjectPath tablePath = new ObjectPath(dbName, tblName);
-
- TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
- List<Row> toWrite = generateRecords(1);
- Table src = tableEnv.fromTableSource(new CollectionTableSource(toWrite, rowTypeInfo));
- tableEnv.registerTable("src", src);
-
- Map<String, String> partSpec = new HashMap<>();
- partSpec.put("s", "a");
-
- CatalogTable table = (CatalogTable) hiveCatalog.getTable(tablePath);
- HiveTableSink hiveTableSink = new HiveTableSink(new JobConf(hiveConf), tablePath, table);
- hiveTableSink.setStaticPartition(partSpec);
- tableEnv.registerTableSink("destSink", hiveTableSink);
- tableEnv.sqlQuery("select * from src").insertInto("destSink");
- tableEnv.execute("mytest");
-
- // make sure new partition is created
- assertEquals(toWrite.size(), hiveCatalog.listPartitions(tablePath).size());
-
- verifyWrittenData(toWrite, hiveShell.executeQuery("select * from " + tblName));
-
- hiveCatalog.dropTable(tablePath, false);
- }
-
private RowTypeInfo createDestTable(String dbName, String tblName, TableSchema tableSchema, int numPartCols) throws Exception {
CatalogTable catalogTable = createCatalogTable(tableSchema, numPartCols);
hiveCatalog.createTable(new ObjectPath(dbName, tblName), catalogTable, false);
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index ebf9901..07dd674 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.connectors.hive;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
@@ -188,6 +189,57 @@ public class TableEnvHiveConnectorTest {
}
}
+ @Test
+ public void testStaticPartition() throws Exception {
+ hiveShell.execute("create database db1");
+ try {
+ hiveShell.execute("create table db1.src (x int)");
+ hiveShell.insertInto("db1", "src").addRow(1).addRow(2).commit();
+ hiveShell.execute("create table db1.dest (x int) partitioned by (p1 string, p2 double)");
+ TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+ tableEnv.sqlUpdate("insert into db1.dest partition (p1='1''1', p2=1.1) select x from db1.src");
+ tableEnv.execute("static partitioning");
+ assertEquals(1, hiveCatalog.listPartitions(new ObjectPath("db1", "dest")).size());
+ verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\t1'1\t1.1", "2\t1'1\t1.1"));
+ } finally {
+ hiveShell.execute("drop database db1 cascade");
+ }
+ }
+
+ @Test
+ public void testDynamicPartition() throws Exception {
+ hiveShell.execute("create database db1");
+ try {
+ hiveShell.execute("create table db1.src (x int, y
string, z double)");
+ hiveShell.insertInto("db1", "src").addRow(1, "a",
1.1).addRow(2, "a", 2.2).addRow(3, "b", 3.3).commit();
+ hiveShell.execute("create table db1.dest (x int)
partitioned by (p1 string, p2 double)");
+ TableEnvironment tableEnv =
getTableEnvWithHiveCatalog();
+ tableEnv.sqlUpdate("insert into db1.dest select * from
db1.src");
+ tableEnv.execute("dynamic partitioning");
+ assertEquals(3, hiveCatalog.listPartitions(new
ObjectPath("db1", "dest")).size());
+ verifyHiveQueryResult("select * from db1.dest",
Arrays.asList("1\ta\t1.1", "2\ta\t2.2", "3\tb\t3.3"));
+ } finally {
+ hiveShell.execute("drop database db1 cascade");
+ }
+ }
+
+ @Test
+ public void testPartialDynamicPartition() throws Exception {
+ hiveShell.execute("create database db1");
+ try {
+ hiveShell.execute("create table db1.src (x int, y
string)");
+ hiveShell.insertInto("db1", "src").addRow(1,
"a").addRow(2, "b").commit();
+ hiveShell.execute("create table db1.dest (x int)
partitioned by (p1 double, p2 string)");
+ TableEnvironment tableEnv =
getTableEnvWithHiveCatalog();
+ tableEnv.sqlUpdate("insert into db1.dest partition
(p1=1.1) select x,y from db1.src");
+ tableEnv.execute("partial dynamic partitioning");
+ assertEquals(2, hiveCatalog.listPartitions(new
ObjectPath("db1", "dest")).size());
+ verifyHiveQueryResult("select * from db1.dest",
Arrays.asList("1\t1.1\ta", "2\t1.1\tb"));
+ } finally {
+ hiveShell.execute("drop database db1 cascade");
+ }
+ }
+
private TableEnvironment getTableEnvWithHiveCatalog() {
TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dml/RichSqlInsert.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dml/RichSqlInsert.java
index d9df258..4b681b2 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dml/RichSqlInsert.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dml/RichSqlInsert.java
@@ -29,6 +29,7 @@ import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.NlsString;
import java.util.LinkedHashMap;
import java.util.List;
@@ -61,7 +62,8 @@ public class RichSqlInsert extends SqlInsert implements ExtendedSqlNode {
/** Get static partition key value pair as strings.
*
- * <p>Caution that we use {@link SqlLiteral#toString()} to get
+ * <p>For character literals we return the unquoted and unescaped values.
+ * For other types we use {@link SqlLiteral#toString()} to get
* the string format of the value literal. If the string format is not
* what you need, use {@link #getStaticPartitions()}.
*
@@ -75,7 +77,8 @@ public class RichSqlInsert extends SqlInsert implements ExtendedSqlNode {
}
for (SqlNode node : this.staticPartitions.getList()) {
SqlProperty sqlProperty = (SqlProperty) node;
- String value = SqlLiteral.value(sqlProperty.getValue()).toString();
+ Comparable comparable = SqlLiteral.value(sqlProperty.getValue());
+ String value = comparable instanceof NlsString ? ((NlsString) comparable).getValue() : comparable.toString();
ret.put(sqlProperty.getKey().getSimple(), value);
}
return ret;