This is an automated email from the ASF dual-hosted git repository.
lirui pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push:
new 654e2a6 [FLINK-22890][hive] HiveTestUtils should create partition
after the data file is ready
654e2a6 is described below
commit 654e2a637a09568670c5b8f538647321a2800ddb
Author: Rui Li <[email protected]>
AuthorDate: Mon Jun 7 21:07:40 2021 +0800
[FLINK-22890][hive] HiveTestUtils should create partition after the data
file is ready
This closes #16099
---
.../flink/table/catalog/hive/HiveTestUtils.java | 24 ++++++++++++++--------
1 file changed, 15 insertions(+), 9 deletions(-)
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
index ab0c210..f2effd0 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
@@ -26,11 +26,11 @@ import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTest;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.utils.PartitionPathUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
@@ -39,7 +39,6 @@ import org.apache.calcite.sql.parser.SqlParser;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.junit.rules.TemporaryFolder;
@@ -51,6 +50,7 @@ import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
@@ -236,30 +236,36 @@ public class HiveTestUtils {
Path dest;
ObjectPath tablePath = new ObjectPath(dbName, tableName);
Table hiveTable = hiveCatalog.getHiveTable(tablePath);
+ String addPartDDL = null;
if (partitionSpec != null) {
- String ddl =
+ addPartDDL =
String.format(
"alter table `%s`.`%s` add if not exists
partition (%s)",
dbName, tableName, partitionSpec);
- tableEnv.executeSql(ddl);
// we need parser to parse the partition spec
SqlParser parser =
SqlParser.create(
- ddl,
+ addPartDDL,
SqlParser.config()
.withParserFactory(FlinkHiveSqlParserImpl.FACTORY)
.withLex(Lex.JAVA));
SqlAddHivePartitions sqlAddPart = (SqlAddHivePartitions)
parser.parseStmt();
- Map<String, String> spec =
+ LinkedHashMap<String, String> spec =
SqlPartitionUtils.getPartitionKVs(sqlAddPart.getPartSpecs().get(0));
- Partition hivePart =
- hiveCatalog.getHivePartition(hiveTable, new
CatalogPartitionSpec(spec));
- dest = new Path(hivePart.getSd().getLocation(), src.getName());
+ Path partLocation =
+ new Path(
+ hiveTable.getSd().getLocation(),
+
PartitionPathUtils.generatePartitionPath(spec));
+ dest = new Path(partLocation, src.getName());
} else {
dest = new Path(hiveTable.getSd().getLocation(),
src.getName());
}
FileSystem fs = dest.getFileSystem(hiveCatalog.getHiveConf());
Preconditions.checkState(fs.rename(src, dest));
+ if (addPartDDL != null) {
+ tableEnv.executeSql(
+ addPartDDL + String.format(" location '%s'",
dest.getParent().toString()));
+ }
}
private String toText(Object[] row) {