This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6c91cc59998 [FLINK-26413][hive] Supports "LOAD DATA INPATH" in Hive
dialect
6c91cc59998 is described below
commit 6c91cc5999828f0a61f8a54498bc71a581a05dd8
Author: yuxia Luo <[email protected]>
AuthorDate: Tue Aug 9 14:46:37 2022 +0800
[FLINK-26413][hive] Supports "LOAD DATA INPATH" in Hive dialect
This closes #19556
---
flink-connectors/flink-connector-hive/pom.xml | 7 +
.../flink/table/catalog/hive/HiveCatalog.java | 42 +++
.../hive/client/HiveMetastoreClientWrapper.java | 26 ++
.../flink/table/catalog/hive/client/HiveShim.java | 13 +
.../table/catalog/hive/client/HiveShimV100.java | 75 ++++++
.../table/catalog/hive/client/HiveShimV200.java | 62 +++++
.../table/catalog/hive/client/HiveShimV210.java | 74 ++++++
.../table/catalog/hive/client/HiveShimV310.java | 132 ++++++++-
.../delegation/hive/HiveOperationExecutor.java | 93 +++++++
.../table/planner/delegation/hive/HiveParser.java | 156 +----------
.../planner/delegation/hive/HiveSessionState.java | 171 ++++++++++++
.../delegation/hive/SqlFunctionConverter.java | 3 +-
.../delegation/hive/copy/HiveParserContext.java | 2 +-
.../hive/operation/HiveLoadDataOperation.java | 99 +++++++
.../hive/parse/HiveParserLoadSemanticAnalyzer.java | 294 +++++++++++++++++++++
.../connectors/hive/HiveDialectQueryITCase.java | 99 +++++--
.../src/test/resources/explain/testLoadData.out | 8 +
17 files changed, 1189 insertions(+), 167 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/pom.xml
b/flink-connectors/flink-connector-hive/pom.xml
index f42554dfae0..6fd4f86c199 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -891,6 +891,13 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hivemetastore.hadoop.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<!-- ArchUit test dependencies -->
<dependency>
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index c72ebbe8edc..dee28e2a58f 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.catalog.hive;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
+import org.apache.flink.connectors.hive.FlinkHiveException;
import org.apache.flink.connectors.hive.HiveDynamicTableFactory;
import org.apache.flink.connectors.hive.HiveTableFactory;
import org.apache.flink.sql.parser.hive.ddl.HiveDDLUtils;
@@ -103,6 +104,7 @@ import
org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -118,6 +120,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -1769,6 +1772,45 @@ public class HiveCatalog extends AbstractCatalog {
return IDENTIFIER.equalsIgnoreCase(properties.get(CONNECTOR.key()));
}
+    /**
+     * Loads {@code loadPath} into the table identified by {@code tablePath} by delegating to the
+     * metastore client wrapper.
+     *
+     * @param loadPath path passed through to the client
+     * @param tablePath target table; its full name is what the client receives
+     * @param isOverwrite passed to the client as its third flag argument ({@code replace})
+     * @param isSrcLocal passed to the client as its {@code isSrcLocal} argument
+     * @throws FlinkHiveException wrapping any {@link HiveException} raised by the client
+     */
+    @Internal
+    public void loadTable(
+            Path loadPath, ObjectPath tablePath, boolean isOverwrite, boolean isSrcLocal) {
+        try {
+            client.loadTable(loadPath, tablePath.getFullName(), isOverwrite, isSrcLocal);
+        } catch (HiveException e) {
+            throw new FlinkHiveException("Fail to load table.", e);
+        }
+    }
+
+    /**
+     * Loads {@code loadPath} into one partition of the table identified by {@code tablePath}.
+     *
+     * <p>The given partition spec is re-keyed in the order of the Hive table's partition columns
+     * before it is handed to the metastore client wrapper, together with the table's
+     * stored-as-sub-directories setting.
+     *
+     * @throws FlinkHiveException if the Hive table cannot be found
+     */
+    @Internal
+    public void loadPartition(
+            Path loadPath,
+            ObjectPath tablePath,
+            Map<String, String> partSpec,
+            boolean isOverwrite,
+            boolean isSrcLocal) {
+        final Table targetTable;
+        try {
+            targetTable = getHiveTable(tablePath);
+        } catch (TableNotExistException notFound) {
+            throw new FlinkHiveException(
+                    "Fail to get Hive table when try to load partition", notFound);
+        }
+
+        // Rebuild the spec so that its iteration order follows the table's partition columns.
+        final Map<String, String> specInTableOrder = new LinkedHashMap<>();
+        targetTable
+                .getPartitionKeys()
+                .forEach(
+                        partitionKey -> {
+                            String columnName = partitionKey.getName();
+                            specInTableOrder.put(columnName, partSpec.get(columnName));
+                        });
+
+        client.loadPartition(
+                loadPath,
+                tablePath.getFullName(),
+                specInTableOrder,
+                targetTable.getSd().isStoredAsSubDirectories(),
+                isOverwrite,
+                isSrcLocal);
+    }
+
private static void disallowChangeCatalogTableType(
Map<String, String> existingTableOptions, Map<String, String>
newTableOptions) {
CatalogTableType existingTableType =
getCatalogTableType(existingTableOptions);
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java
index d7f48e8d129..3acc9860025 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java
@@ -19,11 +19,13 @@
package org.apache.flink.table.catalog.hive.client;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.connectors.hive.FlinkHiveException;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -47,6 +49,8 @@ import
org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.api.TxnOpenException;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,6 +71,7 @@ public class HiveMetastoreClientWrapper implements
AutoCloseable {
private static final Logger LOG =
LoggerFactory.getLogger(HiveMetastoreClientWrapper.class);
private final IMetaStoreClient client;
+ private final Hive hive;
private final HiveConf hiveConf;
private final HiveShim hiveShim;
@@ -82,6 +87,11 @@ public class HiveMetastoreClientWrapper implements
AutoCloseable {
HiveCatalog.isEmbeddedMetastore(hiveConf)
? createMetastoreClient()
:
HiveMetaStoreClient.newSynchronizedClient(createMetastoreClient());
+ try {
+ this.hive = Hive.get(hiveConf);
+ } catch (HiveException e) {
+ throw new FlinkHiveException(e);
+ }
}
@Override
@@ -336,4 +346,20 @@ public class HiveMetastoreClientWrapper implements
AutoCloseable {
public void unlock(long lockid) throws NoSuchLockException,
TxnOpenException, TException {
client.unlock(lockid);
}
+
+    /**
+     * Delegates to {@code hiveShim.loadTable}, supplying this wrapper's {@link Hive} instance.
+     *
+     * @throws HiveException declared for callers; the shim call itself declares no checked
+     *     exception
+     */
+    public void loadTable(Path loadPath, String tableName, boolean replace, boolean isSrcLocal)
+            throws HiveException {
+        hiveShim.loadTable(hive, loadPath, tableName, replace, isSrcLocal);
+    }
+
+    /**
+     * Delegates to {@code hiveShim.loadPartition}, supplying this wrapper's {@link Hive} instance
+     * and passing every other argument through unchanged.
+     */
+    public void loadPartition(
+            Path loadPath,
+            String tableName,
+            Map<String, String> partSpec,
+            boolean isSkewedStoreAsSubdir,
+            boolean replace,
+            boolean isSrcLocal) {
+        hiveShim.loadPartition(
+                hive, loadPath, tableName, partSpec, isSkewedStoreAsSubdir, replace, isSrcLocal);
+    }
}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
index 8e3e3cf2608..58a0e98b75b 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
@@ -38,6 +38,7 @@ import
org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.FunctionInfo;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
@@ -52,6 +53,7 @@ import java.io.Serializable;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
@@ -241,4 +243,15 @@ public interface HiveShim extends Serializable {
}
void registerTemporaryFunction(String funcName, Class funcClass);
+
+    /**
+     * Loads {@code loadPath} into the partition {@code partSpec} of table {@code tableName} using
+     * the given {@link Hive} instance. Each shim version supplies the call matching its Hive
+     * release.
+     */
+    void loadPartition(
+            Hive hive,
+            Path loadPath,
+            String tableName,
+            Map<String, String> partSpec,
+            boolean isSkewedStoreAsSubdir,
+            boolean replace,
+            boolean isSrcLocal);
+
+    /**
+     * Loads {@code loadPath} into table {@code tableName} using the given {@link Hive} instance.
+     * Each shim version supplies the call matching its Hive release.
+     */
+    void loadTable(Hive hive, Path loadPath, String tableName, boolean replace, boolean isSrcLocal);
}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
index 955c0e50fa4..976e0e00a3e 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.FunctionUtils;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.Deserializer;
@@ -86,6 +87,7 @@ import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
@@ -94,6 +96,11 @@ import java.util.stream.Collectors;
/** Shim for Hive version 1.0.0. */
public class HiveShimV100 implements HiveShim {
+    // Constant flag values handed to Hive's loadTable/loadPartition by the reflective calls in
+    // this shim and the shims extending it.
+    protected final boolean holdDDLTime = false;
+    protected final boolean isAcid = false;
+    protected final boolean inheritTableSpecs = true;
+    protected final boolean isSkewedStoreAsSubdir = false;
+
private static final Method registerTemporaryFunction =
HiveReflectionUtils.tryGetMethod(
FunctionRegistry.class,
@@ -435,6 +442,74 @@ public class HiveShimV100 implements HiveShim {
}
}
+    /**
+     * Loads data into a partition by reflectively calling {@code Hive#loadPartition} with the
+     * parameter list {@code (Path, String, Map, boolean x 6)}.
+     *
+     * <p>The six flags are passed in the order {@code replace}, {@code holdDDLTime}, {@code
+     * inheritTableSpecs}, {@code isSkewedStoreAsSubdir}, {@code isSrcLocal}, {@code isAcid}.
+     *
+     * @throws FlinkHiveException if the method lookup or the invocation fails
+     */
+    @Override
+    public void loadPartition(
+            Hive hive,
+            Path loadPath,
+            String tableName,
+            Map<String, String> partSpec,
+            boolean isSkewedStoreAsSubdir,
+            boolean replace,
+            boolean isSrcLocal) {
+        final Class<?>[] signature = {
+            Path.class,
+            String.class,
+            Map.class,
+            boolean.class,
+            boolean.class,
+            boolean.class,
+            boolean.class,
+            boolean.class,
+            boolean.class
+        };
+        try {
+            Method target = Hive.class.getDeclaredMethod("loadPartition", signature);
+            target.invoke(
+                    hive,
+                    loadPath,
+                    tableName,
+                    partSpec,
+                    replace,
+                    holdDDLTime,
+                    inheritTableSpecs,
+                    isSkewedStoreAsSubdir,
+                    isSrcLocal,
+                    isAcid);
+        } catch (Exception e) {
+            throw new FlinkHiveException("Failed to load partition", e);
+        }
+    }
+
+    /**
+     * Loads data into a table by reflectively calling {@code Hive#loadTable} with the parameter
+     * list {@code (Path, String, boolean x 5)}.
+     *
+     * <p>The five flags are passed in the order {@code replace}, {@code holdDDLTime}, {@code
+     * isSrcLocal}, {@code isSkewedStoreAsSubdir}, {@code isAcid}.
+     *
+     * @throws FlinkHiveException if the method lookup or the invocation fails
+     */
+    @Override
+    public void loadTable(
+            Hive hive, Path loadPath, String tableName, boolean replace, boolean isSrcLocal) {
+        final Class<?>[] signature = {
+            Path.class,
+            String.class,
+            boolean.class,
+            boolean.class,
+            boolean.class,
+            boolean.class,
+            boolean.class
+        };
+        try {
+            Method target = Hive.class.getDeclaredMethod("loadTable", signature);
+            target.invoke(
+                    hive,
+                    loadPath,
+                    tableName,
+                    replace,
+                    holdDDLTime,
+                    isSrcLocal,
+                    isSkewedStoreAsSubdir,
+                    isAcid);
+        } catch (Exception e) {
+            throw new FlinkHiveException("Failed to load table", e);
+        }
+    }
+
boolean isBuiltInFunctionInfo(FunctionInfo info) {
return info.isNative();
}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV200.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV200.java
index 3fa8d83da65..5ef79d611a7 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV200.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV200.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.catalog.hive.client;
import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.connectors.hive.FlinkHiveException;
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.table.catalog.exceptions.CatalogException;
@@ -26,12 +27,15 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.ql.metadata.Hive;
import java.lang.reflect.Method;
+import java.util.Map;
import java.util.Properties;
/** Shim for Hive version 2.0.0. */
@@ -68,4 +72,62 @@ public class HiveShimV200 extends HiveShimV122 {
return new OrcBulkWriterFactory<>(
new RowDataVectorizer(schema, fieldTypes), new Properties(),
conf);
}
+
+    /**
+     * Loads data into a table by reflectively calling {@code Hive#loadTable} with the parameter
+     * list {@code (Path, String, boolean x 4)}.
+     *
+     * <p>The four flags are passed in the order {@code replace}, {@code isSrcLocal}, {@code
+     * isSkewedStoreAsSubdir}, {@code isAcid}.
+     *
+     * @throws FlinkHiveException if the method lookup or the invocation fails
+     */
+    @Override
+    public void loadTable(
+            Hive hive, Path loadPath, String tableName, boolean replace, boolean isSrcLocal) {
+        final Class<?>[] signature = {
+            Path.class, String.class, boolean.class, boolean.class, boolean.class, boolean.class
+        };
+        try {
+            Method target = Hive.class.getDeclaredMethod("loadTable", signature);
+            target.invoke(
+                    hive, loadPath, tableName, replace, isSrcLocal, isSkewedStoreAsSubdir, isAcid);
+        } catch (Exception e) {
+            throw new FlinkHiveException("Failed to load table", e);
+        }
+    }
+
+    /**
+     * Loads data into a partition by reflectively calling {@code Hive#loadPartition} with the
+     * parameter list {@code (Path, String, Map, boolean x 5)}.
+     *
+     * <p>The five flags are passed in the order {@code replace}, {@code inheritTableSpecs}, {@code
+     * isSkewedStoreAsSubdir}, {@code isSrcLocal}, {@code isAcid}.
+     *
+     * @throws FlinkHiveException if the method lookup or the invocation fails
+     */
+    @Override
+    public void loadPartition(
+            Hive hive,
+            Path loadPath,
+            String tableName,
+            Map<String, String> partSpec,
+            boolean isSkewedStoreAsSubdir,
+            boolean replace,
+            boolean isSrcLocal) {
+        final Class<?>[] signature = {
+            Path.class,
+            String.class,
+            Map.class,
+            boolean.class,
+            boolean.class,
+            boolean.class,
+            boolean.class,
+            boolean.class
+        };
+        try {
+            Method target = Hive.class.getDeclaredMethod("loadPartition", signature);
+            target.invoke(
+                    hive,
+                    loadPath,
+                    tableName,
+                    partSpec,
+                    replace,
+                    inheritTableSpecs,
+                    isSkewedStoreAsSubdir,
+                    isSrcLocal,
+                    isAcid);
+        } catch (Exception e) {
+            throw new FlinkHiveException("Failed to load partition", e);
+        }
+    }
}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV210.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV210.java
index f32371b23e7..1daa75fa362 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV210.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV210.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.catalog.hive.client;
+import org.apache.flink.connectors.hive.FlinkHiveException;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
@@ -25,12 +26,14 @@ import
org.apache.flink.table.catalog.hive.util.HiveTableUtil;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;
@@ -40,11 +43,14 @@ import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
/** Shim for Hive version 2.1.0. */
public class HiveShimV210 extends HiveShimV201 {
+    // Constant value passed as the hasFollowingStatsTask argument of the reflective
+    // loadTable/loadPartition calls in this class.
+    protected final boolean hasFollowingStatsTask = false;
+
@Override
public void alterPartition(
IMetaStoreClient client, String databaseName, String tableName,
Partition partition)
@@ -211,4 +217,72 @@ public class HiveShimV210 extends HiveShimV201 {
}
return res;
}
+
+    /**
+     * Loads data into a table by reflectively calling {@code Hive#loadTable} with the parameter
+     * list {@code (Path, String, boolean x 5)}.
+     *
+     * <p>The five flags are passed in the order {@code replace}, {@code isSrcLocal}, {@code
+     * isSkewedStoreAsSubdir}, {@code isAcid}, {@code hasFollowingStatsTask}.
+     *
+     * @throws FlinkHiveException if the method lookup or the invocation fails
+     */
+    @Override
+    public void loadTable(
+            Hive hive, Path loadPath, String tableName, boolean replace, boolean isSrcLocal) {
+        final Class<?>[] signature = {
+            Path.class,
+            String.class,
+            boolean.class,
+            boolean.class,
+            boolean.class,
+            boolean.class,
+            boolean.class
+        };
+        try {
+            Method target = Hive.class.getDeclaredMethod("loadTable", signature);
+            target.invoke(
+                    hive,
+                    loadPath,
+                    tableName,
+                    replace,
+                    isSrcLocal,
+                    isSkewedStoreAsSubdir,
+                    isAcid,
+                    hasFollowingStatsTask);
+        } catch (Exception e) {
+            throw new FlinkHiveException("Failed to load table", e);
+        }
+    }
+
+    /**
+     * Loads data into a partition by reflectively calling {@code Hive#loadPartition} with the
+     * parameter list {@code (Path, String, Map, boolean x 6)}.
+     *
+     * <p>The six flags are passed in the order {@code replace}, {@code inheritTableSpecs}, {@code
+     * isSkewedStoreAsSubdir}, {@code isSrcLocal}, {@code isAcid}, {@code hasFollowingStatsTask}.
+     *
+     * @throws FlinkHiveException if the method lookup or the invocation fails
+     */
+    @Override
+    public void loadPartition(
+            Hive hive,
+            Path loadPath,
+            String tableName,
+            Map<String, String> partSpec,
+            boolean isSkewedStoreAsSubdir,
+            boolean replace,
+            boolean isSrcLocal) {
+        final Class<?>[] signature = {
+            Path.class,
+            String.class,
+            Map.class,
+            boolean.class,
+            boolean.class,
+            boolean.class,
+            boolean.class,
+            boolean.class,
+            boolean.class
+        };
+        try {
+            Method target = Hive.class.getDeclaredMethod("loadPartition", signature);
+            target.invoke(
+                    hive,
+                    loadPath,
+                    tableName,
+                    partSpec,
+                    replace,
+                    inheritTableSpecs,
+                    isSkewedStoreAsSubdir,
+                    isSrcLocal,
+                    isAcid,
+                    hasFollowingStatsTask);
+        } catch (Exception e) {
+            throw new FlinkHiveException("Failed to load partition", e);
+        }
+    }
}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java
index 0aeccf927bf..f01d5329ed1 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java
@@ -26,10 +26,12 @@ import
org.apache.flink.table.catalog.hive.util.HiveTableUtil;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.io.Writable;
import java.lang.reflect.Constructor;
@@ -41,9 +43,11 @@ import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -62,7 +66,14 @@ public class HiveShimV310 extends HiveShimV239 {
private static Field hiveDateLocalDate;
private static Constructor dateWritableConstructor;
- private static boolean hiveClassesInited;
+ private static volatile boolean hiveClassesInited;
+
+    // LoadFileType class, resolved on first use by initLoadFileTypeClass(); the volatile flag
+    // records that the lookup has already been done.
+    private static volatile boolean loadFileClassInited;
+    private static Class clazzLoadFileType;
+
+    // Constant write id / statement id values for the reflective load calls of this shim.
+    protected final long writeIdInLoadTableOrPartition = 0L;
+    protected final int stmtIdInLoadTableOrPartition = 0;
private static void initDateTimeClasses() {
if (!hiveClassesInited) {
@@ -102,6 +113,23 @@ public class HiveShimV310 extends HiveShimV239 {
}
}
+    /**
+     * Resolves Hive's {@code LoadTableDesc$LoadFileType} class once and caches it in {@code
+     * clazzLoadFileType}. The flag is checked before and after taking the class lock, so the
+     * lookup runs at most once.
+     *
+     * @throws FlinkHiveException if the class is not on the classpath
+     */
+    private static void initLoadFileTypeClass() {
+        if (loadFileClassInited) {
+            return;
+        }
+        synchronized (HiveShimV310.class) {
+            if (loadFileClassInited) {
+                // another thread finished the lookup while we waited for the lock
+                return;
+            }
+            try {
+                clazzLoadFileType =
+                        Class.forName("org.apache.hadoop.hive.ql.plan.LoadTableDesc$LoadFileType");
+            } catch (ClassNotFoundException e) {
+                throw new FlinkHiveException("Failed to get Hive LoadFileType class", e);
+            }
+            loadFileClassInited = true;
+        }
+    }
+
@Override
public IMetaStoreClient getHiveMetastoreClient(HiveConf hiveConf) {
try {
@@ -319,6 +347,90 @@ public class HiveShimV310 extends HiveShimV239 {
}
}
+    /**
+     * Loads data into a table by reflectively calling {@code Hive#loadTable} with the parameter
+     * list {@code (Path, String, LoadFileType, boolean x 4, long, int, boolean)}.
+     *
+     * <p>The LoadFileType value is derived from {@code replace}; {@code replace} is also passed
+     * as the final boolean argument.
+     *
+     * @throws FlinkHiveException if the LoadFileType class, the method lookup or the invocation
+     *     fails
+     */
+    @Override
+    public void loadTable(
+            Hive hive, Path loadPath, String tableName, boolean replace, boolean isSrcLocal) {
+        try {
+            // must run before clazzLoadFileType is read below
+            initLoadFileTypeClass();
+            final Class<?>[] signature = {
+                Path.class,
+                String.class,
+                clazzLoadFileType,
+                boolean.class,
+                boolean.class,
+                boolean.class,
+                boolean.class,
+                long.class,
+                int.class,
+                boolean.class
+            };
+            Method target = Hive.class.getDeclaredMethod("loadTable", signature);
+            target.invoke(
+                    hive,
+                    loadPath,
+                    tableName,
+                    getLoadFileType(clazzLoadFileType, replace),
+                    isSrcLocal,
+                    isSkewedStoreAsSubdir,
+                    isAcid,
+                    hasFollowingStatsTask,
+                    writeIdInLoadTableOrPartition,
+                    stmtIdInLoadTableOrPartition,
+                    replace);
+        } catch (Exception e) {
+            throw new FlinkHiveException("Failed to load table", e);
+        }
+    }
+
+    /**
+     * Loads data into a partition by reflectively calling {@code Hive#loadPartition} with the
+     * parameter list {@code (Path, Table, Map, LoadFileType, boolean x 5, long, int, boolean)}.
+     *
+     * <p>The Hive {@code Table} is looked up from {@code tableName} via {@code hive.getTable}.
+     * The LoadFileType value is derived from {@code replace}; {@code replace} is also passed as
+     * the final boolean argument.
+     *
+     * @throws FlinkHiveException if the LoadFileType class, the table lookup, the method lookup
+     *     or the invocation fails
+     */
+    @Override
+    public void loadPartition(
+            Hive hive,
+            Path loadPath,
+            String tableName,
+            Map<String, String> partSpec,
+            boolean isSkewedStoreAsSubdir,
+            boolean replace,
+            boolean isSrcLocal) {
+        try {
+            initLoadFileTypeClass();
+            Class hiveClass = Hive.class;
+            // The parameter list must mirror the arguments passed to invoke() below one-to-one.
+            // The LoadFileType entry was previously missing, so the lookup asked for an
+            // 11-parameter method while 12 arguments were supplied.
+            // NOTE(review): confirm against the Hive 3.1 signature whether writeId is declared as
+            // primitive long or boxed Long; getDeclaredMethod needs an exact type match.
+            Method loadPartitionMethod =
+                    hiveClass.getDeclaredMethod(
+                            "loadPartition",
+                            Path.class,
+                            org.apache.hadoop.hive.ql.metadata.Table.class,
+                            Map.class,
+                            clazzLoadFileType,
+                            boolean.class,
+                            boolean.class,
+                            boolean.class,
+                            boolean.class,
+                            boolean.class,
+                            long.class,
+                            int.class,
+                            boolean.class);
+            org.apache.hadoop.hive.ql.metadata.Table table = hive.getTable(tableName);
+            loadPartitionMethod.invoke(
+                    hive,
+                    loadPath,
+                    table,
+                    partSpec,
+                    getLoadFileType(clazzLoadFileType, replace),
+                    inheritTableSpecs,
+                    isSkewedStoreAsSubdir,
+                    isSrcLocal,
+                    isAcid,
+                    hasFollowingStatsTask,
+                    writeIdInLoadTableOrPartition,
+                    stmtIdInLoadTableOrPartition,
+                    replace);
+        } catch (Exception e) {
+            throw new FlinkHiveException("Failed to load partition", e);
+        }
+    }
+
List<Object> createHiveNNs(
Table table, Configuration conf, List<String> nnCols, List<Byte>
traits)
throws ClassNotFoundException, NoSuchMethodException,
InvocationTargetException,
@@ -378,4 +490,22 @@ public class HiveShimV310 extends HiveShimV239 {
new Class[] {Configuration.class},
new Object[] {conf});
}
+
+    /**
+     * Picks the enum constant of {@code clazzLoadFileType} that corresponds to the load mode.
+     *
+     * @param clazzLoadFileType enum class whose constants are searched by their {@code toString()}
+     * @param replace {@code true} selects {@code REPLACE_ALL}, {@code false} selects {@code
+     *     KEEP_EXISTING}
+     * @return the matching enum constant
+     * @throws IllegalStateException if the class has no constant with the expected name
+     */
+    Object getLoadFileType(Class clazzLoadFileType, boolean replace) {
+        final String constantName = replace ? "REPLACE_ALL" : "KEEP_EXISTING";
+        return Arrays.stream(clazzLoadFileType.getEnumConstants())
+                .filter(constant -> constant.toString().equals(constantName))
+                .findFirst()
+                // fail with the missing name instead of a bare NoSuchElementException
+                .orElseThrow(
+                        () ->
+                                new IllegalStateException(
+                                        "No constant "
+                                                + constantName
+                                                + " found in "
+                                                + clazzLoadFileType.getName()));
+    }
}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
index 459a1bbd7dd..2010062fe4c 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
@@ -31,10 +31,12 @@ import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.delegation.ExtendedOperationExecutor;
+import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.HiveSetOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.planner.delegation.PlannerContext;
import org.apache.flink.table.planner.delegation.hive.copy.HiveSetProcessor;
+import
org.apache.flink.table.planner.delegation.hive.operation.HiveLoadDataOperation;
import org.apache.flink.types.Row;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -65,6 +67,14 @@ public class HiveOperationExecutor implements
ExtendedOperationExecutor {
public Optional<TableResultInternal> executeOperation(Operation operation)
{
if (operation instanceof HiveSetOperation) {
return executeHiveSetOperation((HiveSetOperation) operation);
+ } else if (operation instanceof HiveLoadDataOperation) {
+ return executeHiveLoadDataOperation((HiveLoadDataOperation)
operation);
+ } else if (operation instanceof ExplainOperation) {
+ ExplainOperation explainOperation = (ExplainOperation) operation;
+ if (explainOperation.getChild() instanceof HiveLoadDataOperation) {
+ return explainHiveLoadDataOperation(
+ (HiveLoadDataOperation) explainOperation.getChild());
+ }
}
return Optional.empty();
}
@@ -123,4 +133,87 @@ public class HiveOperationExecutor implements
ExtendedOperationExecutor {
.data(rows)
.build();
}
+
+    /**
+     * Executes a {@code LOAD DATA} operation against the current catalog, which must be a {@link
+     * HiveCatalog}.
+     *
+     * <p>A non-empty partition spec routes to {@code HiveCatalog#loadPartition}; otherwise {@code
+     * HiveCatalog#loadTable} is used. A Hive session state is started for the duration of the
+     * call and always cleared afterwards.
+     *
+     * @return {@code TABLE_RESULT_OK} wrapped in an {@link Optional}
+     * @throws FlinkHiveException if the current catalog is not a {@link HiveCatalog}
+     */
+    private Optional<TableResultInternal> executeHiveLoadDataOperation(
+            HiveLoadDataOperation hiveLoadDataOperation) {
+        Catalog currentCatalog =
+                catalogManager.getCatalog(catalogManager.getCurrentCatalog()).orElse(null);
+        if (!(currentCatalog instanceof HiveCatalog)) {
+            throw new FlinkHiveException(
+                    "Only support 'LOAD DATA INPATH' when the current catalog is HiveCatalog in Hive dialect.");
+        }
+        HiveCatalog hiveCatalog = (HiveCatalog) currentCatalog;
+        try {
+            // Hive's loadTable/loadPartition will call method
+            // SessionState.get().getCurrentDatabase(), so we have to start a session state
+            HiveSessionState.startSessionState(hiveCatalog.getHiveConf(), catalogManager);
+            // HiveCatalog#loadPartition and #loadTable take (…, isOverwrite, isSrcLocal) in
+            // that order; the two flags were previously passed the other way round.
+            if (!hiveLoadDataOperation.getPartitionSpec().isEmpty()) {
+                hiveCatalog.loadPartition(
+                        hiveLoadDataOperation.getPath(),
+                        hiveLoadDataOperation.getTablePath(),
+                        hiveLoadDataOperation.getPartitionSpec(),
+                        hiveLoadDataOperation.isOverwrite(),
+                        hiveLoadDataOperation.isSrcLocal());
+            } else {
+                hiveCatalog.loadTable(
+                        hiveLoadDataOperation.getPath(),
+                        hiveLoadDataOperation.getTablePath(),
+                        hiveLoadDataOperation.isOverwrite(),
+                        hiveLoadDataOperation.isSrcLocal());
+            }
+            return Optional.of(TableResultImpl.TABLE_RESULT_OK);
+        } finally {
+            HiveSessionState.clearSessionState();
+        }
+    }
+
+    /**
+     * Builds the EXPLAIN output for a {@code LOAD DATA} operation.
+     *
+     * <p>A single {@code LoadData(...)} line describes the operation; the same line is printed
+     * under the AST, optimized physical plan and optimized execution plan headers. The result is
+     * a one-row, one-column ({@code result}) table.
+     */
+    private Optional<TableResultInternal> explainHiveLoadDataOperation(
+            HiveLoadDataOperation hiveLoadDataOperation) {
+        // render the optional ", partition=[k1=v1, k2=v2]" suffix
+        String partitionSuffix = "";
+        Map<String, String> spec = hiveLoadDataOperation.getPartitionSpec();
+        if (!spec.isEmpty()) {
+            StringBuilder keyValues = new StringBuilder();
+            for (Map.Entry<String, String> entry : spec.entrySet()) {
+                if (keyValues.length() > 0) {
+                    keyValues.append(", ");
+                }
+                keyValues.append(entry.getKey()).append('=').append(entry.getValue());
+            }
+            partitionSuffix = ", partition=[" + keyValues + "]";
+        }
+
+        // the one-line plan shared by all three sections
+        String loadDataLine =
+                String.format(
+                        "LoadData(filepath=[%s], table=[%s], overwrite=[%s], local=[%s]%s)",
+                        hiveLoadDataOperation.getPath(),
+                        hiveLoadDataOperation.getTablePath(),
+                        hiveLoadDataOperation.isOverwrite(),
+                        hiveLoadDataOperation.isSrcLocal(),
+                        partitionSuffix);
+
+        // sections are separated by an empty line; the text ends with a line separator
+        String text =
+                String.join(
+                        System.lineSeparator(),
+                        "== Abstract Syntax Tree ==",
+                        loadDataLine,
+                        "",
+                        "== Optimized Physical Plan ==",
+                        loadDataLine,
+                        "",
+                        "== Optimized Execution Plan ==",
+                        loadDataLine,
+                        "");
+
+        TableResultInternal explainResult =
+                TableResultImpl.builder()
+                        .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+                        .schema(ResolvedSchema.of(Column.physical("result", DataTypes.STRING())))
+                        .data(Collections.singletonList(Row.of(text)))
+                        .build();
+        return Optional.of(explainResult);
+    }
}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
index 1fb2bf88ce0..4214dbda09c 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.planner.delegation.hive;
-import org.apache.flink.connectors.hive.FlinkHiveException;
import org.apache.flink.connectors.hive.HiveInternalOptions;
import org.apache.flink.table.api.SqlParserException;
import org.apache.flink.table.api.TableConfig;
@@ -28,7 +27,6 @@ import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
-import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
import org.apache.flink.table.module.hive.udf.generic.HiveGenericUDFGrouping;
import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.HiveSetOperation;
@@ -48,31 +46,22 @@ import
org.apache.flink.table.planner.delegation.hive.copy.HiveParserQueryState;
import org.apache.flink.table.planner.delegation.hive.parse.HiveASTParser;
import
org.apache.flink.table.planner.delegation.hive.parse.HiveParserCreateViewInfo;
import
org.apache.flink.table.planner.delegation.hive.parse.HiveParserDDLSemanticAnalyzer;
+import
org.apache.flink.table.planner.delegation.hive.parse.HiveParserLoadSemanticAnalyzer;
import org.apache.flink.table.planner.operations.PlannerQueryOperation;
import org.apache.flink.table.planner.parse.CalciteParser;
import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader;
-import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.tools.FrameworkConfig;
-import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.VariableSubstitution;
-import org.apache.hadoop.hive.ql.lockmgr.LockException;
-import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.processors.HiveCommand;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.sql.Timestamp;
-import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -90,13 +79,6 @@ public class HiveParser extends ParserImpl {
private static final Logger LOG =
LoggerFactory.getLogger(HiveParser.class);
- private static final Method setCurrentTSMethod =
- HiveReflectionUtils.tryGetMethod(
- SessionState.class, "setupQueryCurrentTimestamp", new
Class[0]);
- private static final Method getCurrentTSMethod =
- HiveReflectionUtils.tryGetMethod(
- SessionState.class, "getQueryCurrentTimestamp", new
Class[0]);
-
// need to maintain the HiveParserASTNode types for DDLs
private static final Set<Integer> DDL_NODES;
@@ -225,12 +207,12 @@ public class HiveParser extends ParserImpl {
// substitute variables for the statement
statement = substituteVariables(hiveConf, statement);
// creates SessionState
- startSessionState(hiveConf, catalogManager);
+ HiveSessionState.startSessionState(hiveConf, catalogManager);
// We override Hive's grouping function. Refer to the
implementation for more details.
hiveShim.registerTemporaryFunction("grouping",
HiveGenericUDFGrouping.class);
return processCmd(statement, hiveConf, hiveShim, (HiveCatalog)
currentCatalog);
} finally {
- clearSessionState();
+ HiveSessionState.clearSessionState();
}
}
@@ -389,6 +371,12 @@ public class HiveParser extends ParserImpl {
HiveShim hiveShim,
HiveParserASTNode input)
throws SemanticException {
+ if (isLoadData(input)) {
+ HiveParserLoadSemanticAnalyzer loadSemanticAnalyzer =
+ new HiveParserLoadSemanticAnalyzer(
+ hiveConf, frameworkConfig,
plannerContext.getCluster());
+ return loadSemanticAnalyzer.convertToOperation(input);
+ }
if (isMultiDestQuery(input)) {
return processMultiDestQuery(context, hiveConf, hiveShim, input);
} else {
@@ -396,6 +384,10 @@ public class HiveParser extends ParserImpl {
}
}
+    /**
+     * Tells whether the given AST node is a {@code LOAD DATA} statement, i.e. whether its token
+     * type is {@code TOK_LOAD}.
+     *
+     * @param input the AST node to inspect
+     * @return {@code true} if the node's type equals {@link HiveASTParser#TOK_LOAD}
+     */
+    private boolean isLoadData(HiveParserASTNode input) {
+        final int tokenType = input.getType();
+        return HiveASTParser.TOK_LOAD == tokenType;
+    }
+
private boolean isMultiDestQuery(HiveParserASTNode astNode) {
// Hive's multi dest insert will always be [FROM, INSERT+]
// so, if it's children count is more than 2, it should be a
multi-dest query
@@ -481,126 +473,4 @@ public class HiveParser extends ParserImpl {
return new PlannerQueryOperation(relNode);
}
}
-
- private void startSessionState(HiveConf hiveConf, CatalogManager
catalogManager) {
- final ClassLoader contextCL =
Thread.currentThread().getContextClassLoader();
- try {
- HiveParserSessionState sessionState = new
HiveParserSessionState(hiveConf, contextCL);
- sessionState.initTxnMgr(hiveConf);
-
sessionState.setCurrentDatabase(catalogManager.getCurrentDatabase());
- // some Hive functions needs the timestamp
- setCurrentTimestamp(sessionState);
- SessionState.setCurrentSessionState(sessionState);
- } catch (LockException e) {
- throw new FlinkHiveException("Failed to init SessionState", e);
- } finally {
- // don't let SessionState mess up with our context classloader
- Thread.currentThread().setContextClassLoader(contextCL);
- }
- }
-
- private static void setCurrentTimestamp(HiveParserSessionState
sessionState) {
- if (setCurrentTSMethod != null) {
- try {
- setCurrentTSMethod.invoke(sessionState);
- Object currentTs = getCurrentTSMethod.invoke(sessionState);
- if (currentTs instanceof Instant) {
- sessionState.hiveParserCurrentTS =
Timestamp.from((Instant) currentTs);
- } else {
- sessionState.hiveParserCurrentTS = (Timestamp) currentTs;
- }
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new FlinkHiveException("Failed to set current timestamp
for session", e);
- }
- } else {
- sessionState.hiveParserCurrentTS = new
Timestamp(System.currentTimeMillis());
- }
- }
-
- private void clearSessionState() {
- SessionState sessionState = SessionState.get();
- if (sessionState != null) {
- try {
- sessionState.close();
- } catch (Exception e) {
- LOG.warn("Error closing SessionState", e);
- }
- }
- }
-
- /** Sub-class of SessionState to meet our needs. */
- public static class HiveParserSessionState extends SessionState {
-
- private static final Class registryClz;
- private static final Method getRegistry;
- private static final Method clearRegistry;
- private static final Method closeRegistryLoaders;
-
- private Timestamp hiveParserCurrentTS;
-
- static {
- registryClz =
-
HiveReflectionUtils.tryGetClass("org.apache.hadoop.hive.ql.exec.Registry");
- if (registryClz != null) {
- getRegistry =
- HiveReflectionUtils.tryGetMethod(
- SessionState.class, "getRegistry", new
Class[0]);
- clearRegistry =
- HiveReflectionUtils.tryGetMethod(registryClz, "clear",
new Class[0]);
- closeRegistryLoaders =
- HiveReflectionUtils.tryGetMethod(
- registryClz, "closeCUDFLoaders", new Class[0]);
- } else {
- getRegistry = null;
- clearRegistry = null;
- closeRegistryLoaders = null;
- }
- }
-
- private final ClassLoader originContextLoader;
- private final ClassLoader hiveLoader;
-
- public HiveParserSessionState(HiveConf conf, ClassLoader
contextLoader) {
- super(conf);
- this.originContextLoader = contextLoader;
- this.hiveLoader = getConf().getClassLoader();
- // added jars are handled by context class loader, so we always
use it as the session
- // class loader
- getConf().setClassLoader(contextLoader);
- }
-
- @Override
- public void close() throws IOException {
- clearSessionRegistry();
- if (getTxnMgr() != null) {
- getTxnMgr().closeTxnManager();
- }
- // close the classloader created in hive
- JavaUtils.closeClassLoadersTo(hiveLoader, originContextLoader);
- File resourceDir =
- new
File(getConf().getVar(HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR));
- LOG.debug("Removing resource dir " + resourceDir);
- FileUtils.deleteDirectoryQuietly(resourceDir);
- Hive.closeCurrent();
- detachSession();
- }
-
- public Timestamp getHiveParserCurrentTS() {
- return hiveParserCurrentTS;
- }
-
- private void clearSessionRegistry() {
- if (getRegistry != null) {
- try {
- Object registry = getRegistry.invoke(this);
- if (registry != null) {
- clearRegistry.invoke(registry);
- closeRegistryLoaders.invoke(registry);
- }
- } catch (IllegalAccessException | InvocationTargetException e)
{
- LOG.warn("Failed to clear session registry", e);
- }
- }
- }
- }
}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveSessionState.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveSessionState.java
new file mode 100644
index 00000000000..5bfa16481d9
--- /dev/null
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveSessionState.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.delegation.hive;
+
+import org.apache.flink.connectors.hive.FlinkHiveException;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
+import org.apache.flink.util.FileUtils;
+
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.sql.Timestamp;
+import java.time.Instant;
+
+/**
+ * Sub-class of {@link SessionState} to meet our needs.
+ *
+ * <p>The session uses the caller's context class loader as the class loader of its {@link
+ * HiveConf}, remembers the class loader the conf had before so that {@link #close()} can close
+ * the loaders created by Hive, and keeps the timestamp that was captured when the session was
+ * started via {@link #startSessionState(HiveConf, CatalogManager)}.
+ */
+public class HiveSessionState extends SessionState {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HiveSessionState.class);
+
+    // Looked up reflectively; either may be null when the method cannot be resolved.
+    private static final Method setCurrentTSMethod =
+            HiveReflectionUtils.tryGetMethod(
+                    SessionState.class, "setupQueryCurrentTimestamp", new Class[0]);
+    private static final Method getCurrentTSMethod =
+            HiveReflectionUtils.tryGetMethod(
+                    SessionState.class, "getQueryCurrentTimestamp", new Class[0]);
+
+    private static final Class<?> registryClz;
+    private static final Method getRegistry;
+    private static final Method clearRegistry;
+    private static final Method closeRegistryLoaders;
+
+    static {
+        registryClz = HiveReflectionUtils.tryGetClass("org.apache.hadoop.hive.ql.exec.Registry");
+        if (registryClz != null) {
+            getRegistry =
+                    HiveReflectionUtils.tryGetMethod(
+                            SessionState.class, "getRegistry", new Class[0]);
+            clearRegistry = HiveReflectionUtils.tryGetMethod(registryClz, "clear", new Class[0]);
+            closeRegistryLoaders =
+                    HiveReflectionUtils.tryGetMethod(
+                            registryClz, "closeCUDFLoaders", new Class[0]);
+        } else {
+            getRegistry = null;
+            clearRegistry = null;
+            closeRegistryLoaders = null;
+        }
+    }
+
+    private final ClassLoader originContextLoader;
+    private final ClassLoader hiveLoader;
+
+    private Timestamp hiveParserCurrentTS;
+
+    /**
+     * Creates a session state for the given configuration.
+     *
+     * @param conf the Hive configuration of the session
+     * @param contextLoader the class loader to install as the class loader of the session's conf
+     */
+    public HiveSessionState(HiveConf conf, ClassLoader contextLoader) {
+        super(conf);
+        this.originContextLoader = contextLoader;
+        // remember the loader the conf carried before we replace it, it is closed in close()
+        this.hiveLoader = getConf().getClassLoader();
+        // added jars are handled by context class loader, so we always use it as the session
+        // class loader
+        getConf().setClassLoader(contextLoader);
+    }
+
+    /**
+     * Releases the resources held by this session: the session registry, the transaction manager,
+     * the class loaders created by Hive and the downloaded-resources directory. The thread's
+     * current {@link Hive} is closed and the session is detached even if one of the earlier
+     * clean-up steps fails.
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            clearSessionRegistry();
+            if (getTxnMgr() != null) {
+                getTxnMgr().closeTxnManager();
+            }
+            // close the classloader created in hive
+            JavaUtils.closeClassLoadersTo(hiveLoader, originContextLoader);
+            File resourceDir =
+                    new File(getConf().getVar(HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR));
+            LOG.debug("Removing resource dir {}", resourceDir);
+            FileUtils.deleteDirectoryQuietly(resourceDir);
+        } finally {
+            // always run the final clean-up steps, otherwise a failure above would skip them
+            try {
+                Hive.closeCurrent();
+            } finally {
+                detachSession();
+            }
+        }
+    }
+
+    /**
+     * Sets the timestamp exposed by {@link #getHiveParserCurrentTS()}.
+     *
+     * @param hiveParserCurrentTS the timestamp to store
+     */
+    public void setHiveParserCurrentTS(Timestamp hiveParserCurrentTS) {
+        this.hiveParserCurrentTS = hiveParserCurrentTS;
+    }
+
+    /**
+     * Sets the timestamp exposed by {@link #getHiveParserCurrentTS()}.
+     *
+     * @param hiveParserCurrentTS the timestamp to store
+     * @deprecated the name contains a duplicated suffix, use {@link
+     *     #setHiveParserCurrentTS(Timestamp)} instead.
+     */
+    @Deprecated
+    public void setHiveParserCurrentTSCurrentTS(Timestamp hiveParserCurrentTS) {
+        setHiveParserCurrentTS(hiveParserCurrentTS);
+    }
+
+    /** Returns the timestamp stored for this session. */
+    public Timestamp getHiveParserCurrentTS() {
+        return hiveParserCurrentTS;
+    }
+
+    /** Clears the session registry, if the registry accessor could be resolved reflectively. */
+    private void clearSessionRegistry() {
+        if (getRegistry == null) {
+            return;
+        }
+        try {
+            Object registry = getRegistry.invoke(this);
+            if (registry != null) {
+                // tryGetMethod may yield null, so guard each reflective call individually
+                if (clearRegistry != null) {
+                    clearRegistry.invoke(registry);
+                }
+                if (closeRegistryLoaders != null) {
+                    closeRegistryLoaders.invoke(registry);
+                }
+            }
+        } catch (IllegalAccessException | InvocationTargetException e) {
+            LOG.warn("Failed to clear session registry", e);
+        }
+    }
+
+    /**
+     * Creates a {@link HiveSessionState} for the given configuration and makes it the current
+     * session state. The thread's context class loader is restored afterwards.
+     *
+     * @param hiveConf the Hive configuration of the session
+     * @param catalogManager provides the current database of the session
+     * @throws FlinkHiveException if the transaction manager cannot be initialized
+     */
+    public static void startSessionState(HiveConf hiveConf, CatalogManager catalogManager) {
+        final ClassLoader contextCL = Thread.currentThread().getContextClassLoader();
+        try {
+            HiveSessionState sessionState = new HiveSessionState(hiveConf, contextCL);
+            sessionState.initTxnMgr(hiveConf);
+            sessionState.setCurrentDatabase(catalogManager.getCurrentDatabase());
+            // some Hive functions needs the timestamp
+            setCurrentTimestamp(sessionState);
+            SessionState.setCurrentSessionState(sessionState);
+        } catch (LockException e) {
+            throw new FlinkHiveException("Failed to init SessionState", e);
+        } finally {
+            // don't let SessionState mess up with our context classloader
+            Thread.currentThread().setContextClassLoader(contextCL);
+        }
+    }
+
+    /**
+     * Stores the current timestamp in the given session. If {@code setupQueryCurrentTimestamp}
+     * was resolved on {@link SessionState}, the value reported by the session is used (converted
+     * when it is an {@link Instant}); otherwise the system clock is used.
+     */
+    private static void setCurrentTimestamp(HiveSessionState sessionState) {
+        if (setCurrentTSMethod == null) {
+            sessionState.setHiveParserCurrentTS(new Timestamp(System.currentTimeMillis()));
+            return;
+        }
+        try {
+            setCurrentTSMethod.invoke(sessionState);
+            Object currentTs = getCurrentTSMethod.invoke(sessionState);
+            if (currentTs instanceof Instant) {
+                sessionState.setHiveParserCurrentTS(Timestamp.from((Instant) currentTs));
+            } else {
+                sessionState.setHiveParserCurrentTS((Timestamp) currentTs);
+            }
+        } catch (IllegalAccessException | InvocationTargetException e) {
+            throw new FlinkHiveException("Failed to set current timestamp for session", e);
+        }
+    }
+
+    /**
+     * Closes the current session state, if there is one. Failures while closing are logged and
+     * not propagated.
+     */
+    public static void clearSessionState() {
+        SessionState sessionState = SessionState.get();
+        if (sessionState != null) {
+            try {
+                sessionState.close();
+            } catch (Exception e) {
+                LOG.warn("Error closing SessionState", e);
+            }
+        }
+    }
+}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/SqlFunctionConverter.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/SqlFunctionConverter.java
index c9bf54febee..a154cdc6afa 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/SqlFunctionConverter.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/SqlFunctionConverter.java
@@ -97,8 +97,7 @@ public class SqlFunctionConverter extends RexShuttle {
if (convertedOp instanceof FlinkSqlTimestampFunction) {
// flink's current_timestamp has different type from hive's,
convert it to a literal
Timestamp currentTS =
- ((HiveParser.HiveParserSessionState)
SessionState.get())
- .getHiveParserCurrentTS();
+ ((HiveSessionState)
SessionState.get()).getHiveParserCurrentTS();
HiveShim hiveShim = HiveParserUtils.getSessionHiveShim();
try {
return HiveParserRexNodeConverter.convertConstant(
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserContext.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserContext.java
index f928be68724..33fd94fbe44 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserContext.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserContext.java
@@ -82,7 +82,7 @@ public class HiveParserContext {
*/
public HiveParserContext(Configuration conf) {
this.conf = conf;
- viewsTokenRewriteStreams = new HashMap<>();
+ this.viewsTokenRewriteStreams = new HashMap<>();
}
// Find whether we should execute the current query due to explain.
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/operation/HiveLoadDataOperation.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/operation/HiveLoadDataOperation.java
new file mode 100644
index 00000000000..339985336b5
--- /dev/null
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/operation/HiveLoadDataOperation.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.delegation.hive.operation;
+
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.operations.Operation;
+
+import org.apache.hadoop.fs.Path;
+
+import java.util.Map;
+
+/**
+ * Operation that describes loading data into a Hive table.
+ *
+ * <pre>The syntax is: {@code
+ * LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename
+ * [PARTITION (partcol1=val1, partcol2=val2 ...)]
+ * }
+ * </pre>
+ */
+public class HiveLoadDataOperation implements Operation {
+
+    /** Source path of the data to load. */
+    private final Path path;
+
+    /** Destination table. */
+    private final ObjectPath tablePath;
+
+    /** Whether {@code OVERWRITE} was specified. */
+    private final boolean isOverwrite;
+
+    /** Whether {@code LOCAL} was specified. */
+    private final boolean isSrcLocal;
+
+    /** Partition column to value mapping of the destination partition. */
+    private final Map<String, String> partitionSpec;
+
+    public HiveLoadDataOperation(
+            Path path,
+            ObjectPath tablePath,
+            boolean isOverwrite,
+            boolean isSrcLocal,
+            Map<String, String> partitionSpec) {
+        this.path = path;
+        this.tablePath = tablePath;
+        this.isOverwrite = isOverwrite;
+        this.isSrcLocal = isSrcLocal;
+        this.partitionSpec = partitionSpec;
+    }
+
+    public Path getPath() {
+        return path;
+    }
+
+    public ObjectPath getTablePath() {
+        return tablePath;
+    }
+
+    public boolean isOverwrite() {
+        return isOverwrite;
+    }
+
+    public boolean isSrcLocal() {
+        return isSrcLocal;
+    }
+
+    public Map<String, String> getPartitionSpec() {
+        return partitionSpec;
+    }
+
+    /** Renders the operation in the form of the {@code LOAD DATA} statement it stands for. */
+    @Override
+    public String asSummaryString() {
+        StringBuilder summary = new StringBuilder("LOAD DATA");
+        if (isSrcLocal) {
+            summary.append(" LOCAL");
+        }
+        summary.append(" INPATH '").append(path).append('\'');
+        if (isOverwrite) {
+            summary.append(" OVERWRITE");
+        }
+        summary.append(" INTO TABLE ").append(tablePath.getFullName());
+        if (!partitionSpec.isEmpty()) {
+            summary.append(" PARTITION (");
+            // empty before the first entry, ", " before every following one
+            String separator = "";
+            for (Map.Entry<String, String> entry : partitionSpec.entrySet()) {
+                summary.append(separator)
+                        .append(entry.getKey())
+                        .append('=')
+                        .append(entry.getValue());
+                separator = ", ";
+            }
+            summary.append(')');
+        }
+        return summary.toString();
+    }
+}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserLoadSemanticAnalyzer.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserLoadSemanticAnalyzer.java
new file mode 100644
index 00000000000..81047f2e5d6
--- /dev/null
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserLoadSemanticAnalyzer.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.delegation.hive.parse;
+
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.planner.delegation.hive.copy.HiveParserASTNode;
+import
org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.TableSpec;
+import
org.apache.flink.table.planner.delegation.hive.operation.HiveLoadDataOperation;
+import org.apache.flink.util.StringUtils;
+
+import org.antlr.runtime.tree.Tree;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.net.URLCodec;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.mapred.InputFormat;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import static
org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.stripQuotes;
+
+/**
+ * Ported hive's org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer.
+ *
+ * <p>Validates the source path and the destination table/partition of a {@code LOAD DATA}
+ * statement and converts its AST into a {@link HiveLoadDataOperation}.
+ */
+public class HiveParserLoadSemanticAnalyzer {
+
+    private final HiveConf conf;
+    private final Hive db;
+    private final FrameworkConfig frameworkConfig;
+    private final RelOptCluster cluster;
+
+    /**
+     * Creates the analyzer.
+     *
+     * @throws SemanticException if the {@link Hive} instance for {@code conf} cannot be obtained
+     */
+    public HiveParserLoadSemanticAnalyzer(
+            HiveConf conf, FrameworkConfig frameworkConfig, RelOptCluster cluster)
+            throws SemanticException {
+        this.conf = conf;
+        try {
+            this.db = Hive.get(conf);
+        } catch (HiveException e) {
+            throw new SemanticException(e);
+        }
+        this.frameworkConfig = frameworkConfig;
+        this.cluster = cluster;
+    }
+
+    /**
+     * Converts the AST of a {@code LOAD DATA} statement into a {@link HiveLoadDataOperation}.
+     *
+     * <p>Child 0 of the AST is the source path and child 1 the destination table; the remaining
+     * children carry the {@code LOCAL} / {@code OVERWRITE} flags.
+     *
+     * @param ast the AST of the statement
+     * @return the operation describing the load
+     * @throws SemanticException if the path is invalid, the destination is a view, a non-native
+     *     table or stored as sub-directories, a partitioned table is targeted without a partition
+     *     spec, the bucketing strict check fails, the source matches no file or contains a
+     *     directory, or the file format check fails
+     */
+    public HiveLoadDataOperation convertToOperation(HiveParserASTNode ast)
+            throws SemanticException {
+        boolean isLocal = false;
+        boolean isOverWrite = false;
+        Tree fromTree = ast.getChild(0);
+        HiveParserASTNode tableTree = (HiveParserASTNode) ast.getChild(1);
+
+        // four children: both LOCAL and OVERWRITE are present
+        if (ast.getChildCount() == 4) {
+            isLocal = true;
+            isOverWrite = true;
+        }
+
+        // three children: exactly one of LOCAL / OVERWRITE is present
+        if (ast.getChildCount() == 3) {
+            if (ast.getChild(2).getText().equalsIgnoreCase("local")) {
+                isLocal = true;
+            } else {
+                isOverWrite = true;
+            }
+        }
+
+        // initialize load path
+        URI fromURI;
+        try {
+            String fromPath = stripQuotes(fromTree.getText());
+            fromURI = initializeFromURI(fromPath, isLocal);
+        } catch (IOException | URISyntaxException e) {
+            throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(fromTree, e.getMessage()), e);
+        }
+
+        // initialize destination table/partition
+        TableSpec ts = new TableSpec(db, conf, tableTree, frameworkConfig, cluster);
+
+        if (ts.tableHandle.isView() || ts.tableHandle.isMaterializedView()) {
+            throw new SemanticException(ErrorMsg.DML_AGAINST_VIEW.getMsg());
+        }
+        if (ts.tableHandle.isNonNative()) {
+            throw new SemanticException(ErrorMsg.LOAD_INTO_NON_NATIVE.getMsg());
+        }
+
+        if (ts.tableHandle.isStoredAsSubDirectories()) {
+            throw new SemanticException(ErrorMsg.LOAD_INTO_STORED_AS_DIR.getMsg());
+        }
+
+        // a partitioned table can only be loaded with an explicit partition spec
+        List<FieldSchema> parts = ts.tableHandle.getPartitionKeys();
+        if ((parts != null && parts.size() > 0)
+                && (ts.partSpec == null || ts.partSpec.size() == 0)) {
+            throw new SemanticException(ErrorMsg.NEED_PARTITION_ERROR.getMsg());
+        }
+
+        List<String> bucketCols = ts.tableHandle.getBucketCols();
+        if (bucketCols != null && !bucketCols.isEmpty()) {
+            String error = HiveConf.StrictChecks.checkBucketing(conf);
+            if (error != null) {
+                throw new SemanticException(
+                        "Please load into an intermediate table"
+                                + " and use 'insert... select' to allow Hive to enforce bucketing. "
+                                + error);
+            }
+        }
+
+        // make sure the arguments make sense, a directory inside the source is rejected
+        List<FileStatus> files = applyConstraintsAndGetFiles(fromURI, fromTree, isLocal);
+
+        // for managed tables, make sure the file formats match
+        if (TableType.MANAGED_TABLE.equals(ts.tableHandle.getTableType())
+                && conf.getBoolVar(HiveConf.ConfVars.HIVECHECKFILEFORMAT)) {
+            ensureFileFormatsMatch(ts, files, fromURI);
+        }
+
+        return new HiveLoadDataOperation(
+                new Path(fromURI),
+                new ObjectPath(ts.tableHandle.getDbName(), ts.tableHandle.getTableName()),
+                isOverWrite,
+                isLocal,
+                ts.partSpec == null ? new LinkedHashMap<>() : ts.partSpec);
+    }
+
+    /**
+     * Lists the files matched by {@code fromURI} and checks that the source is usable.
+     *
+     * @param fromURI the resolved source location
+     * @param ast the AST node of the source path, used for error messages
+     * @param isLocal whether {@code LOCAL} was specified
+     * @return the matched files
+     * @throws SemanticException if {@code LOCAL} is used with a non-"file" scheme, nothing
+     *     matches the path, a matched entry is a directory, or the file system access fails
+     */
+    private List<FileStatus> applyConstraintsAndGetFiles(URI fromURI, Tree ast, boolean isLocal)
+            throws SemanticException {
+
+        FileStatus[] srcs;
+
+        // local mode implies that scheme should be "file"
+        // we can change this going forward
+        if (isLocal && !"file".equals(fromURI.getScheme())) {
+            throw new SemanticException(
+                    ErrorMsg.ILLEGAL_PATH.getMsg(
+                            ast,
+                            "Source file system should be \"file\" if \"local\" is specified"));
+        }
+
+        try {
+            srcs = matchFilesOrDir(FileSystem.get(fromURI, conf), new Path(fromURI));
+            if (srcs == null || srcs.length == 0) {
+                throw new SemanticException(
+                        HiveParserErrorMsg.getMsg(
+                                ErrorMsg.INVALID_PATH, ast, "No files matching path " + fromURI));
+            }
+
+            for (FileStatus oneSrc : srcs) {
+                if (oneSrc.isDir()) {
+                    throw new SemanticException(
+                            ErrorMsg.INVALID_PATH.getMsg(
+                                    ast,
+                                    "source contains directory: " + oneSrc.getPath().toString()));
+                }
+            }
+        } catch (IOException e) {
+            throw new SemanticException(HiveParserErrorMsg.getMsg(ErrorMsg.INVALID_PATH, ast), e);
+        }
+
+        return Arrays.asList(srcs);
+    }
+
+    /**
+     * Resolves {@code path} as a glob and returns the matching entries, skipping names that start
+     * with {@code _} or {@code .} (except {@link EximUtil#METADATA_NAME}). When the glob matches
+     * exactly one directory, the visible entries of that directory are returned instead.
+     *
+     * @param fs the file system to query
+     * @param path the path or glob pattern
+     * @return the matched entries, as returned by the file system (may be {@code null})
+     * @throws IOException if the file system access fails
+     */
+    public static FileStatus[] matchFilesOrDir(FileSystem fs, Path path) throws IOException {
+        FileStatus[] srcs =
+                fs.globStatus(
+                        path,
+                        p -> {
+                            String name = p.getName();
+                            return name.equals(EximUtil.METADATA_NAME)
+                                    || !name.startsWith("_") && !name.startsWith(".");
+                        });
+        if (srcs != null && srcs.length == 1 && srcs[0].isDir()) {
+            srcs =
+                    fs.listStatus(
+                            srcs[0].getPath(),
+                            p -> {
+                                String name = p.getName();
+                                return !name.startsWith("_") && !name.startsWith(".");
+                            });
+        }
+        return srcs;
+    }
+
+    /**
+     * Builds the fully qualified source URI for the path given in the statement.
+     *
+     * <p>A path that is not absolute is resolved against the {@code user.dir} system property
+     * when {@code isLocal} is set, and against {@code /user/<user.name>} otherwise. A missing
+     * scheme becomes {@code file} for local loads and the default file system's scheme (and
+     * authority) otherwise.
+     *
+     * @param fromPath the unquoted path from the statement
+     * @param isLocal whether {@code LOCAL} was specified
+     * @return the resolved URI
+     */
+    private URI initializeFromURI(String fromPath, boolean isLocal)
+            throws IOException, URISyntaxException, SemanticException {
+        URI fromURI = new Path(fromPath).toUri();
+
+        String fromScheme = fromURI.getScheme();
+        String fromAuthority = fromURI.getAuthority();
+        String path = fromURI.getPath();
+
+        // generate absolute path relative to current directory or hdfs home
+        // directory
+        if (!path.startsWith("/")) {
+            if (isLocal) {
+                String absolute =
+                        new Path(System.getProperty("user.dir"), fromPath).toUri().toString();
+                try {
+                    // UTF-8 keeps non-ASCII characters of the path intact; US-ASCII would turn
+                    // them into '?'
+                    path =
+                            new String(
+                                    URLCodec.decodeUrl(absolute.getBytes(StandardCharsets.UTF_8)),
+                                    StandardCharsets.UTF_8);
+                } catch (DecoderException de) {
+                    throw new SemanticException("URL Decode failed", de);
+                }
+            } else {
+                path =
+                        new Path(new Path("/user/" + System.getProperty("user.name")), path)
+                                .toString();
+            }
+        }
+
+        // set correct scheme and authority
+        if (StringUtils.isNullOrWhitespaceOnly(fromScheme)) {
+            if (isLocal) {
+                // file for local
+                fromScheme = "file";
+            } else {
+                // use default values from fs.default.name
+                URI defaultURI = FileSystem.get(conf).getUri();
+                fromScheme = defaultURI.getScheme();
+                fromAuthority = defaultURI.getAuthority();
+            }
+        }
+
+        // if scheme is specified but not authority then use the default authority
+        if ((!fromScheme.equals("file")) && StringUtils.isNullOrWhitespaceOnly(fromAuthority)) {
+            URI defaultURI = FileSystem.get(conf).getUri();
+            fromAuthority = defaultURI.getAuthority();
+        }
+
+        return new URI(fromScheme, fromAuthority, path, null, null);
+    }
+
+    /**
+     * Checks that the files to load match the input format of the destination, which is the
+     * table's input format when no partition spec is given and the partition's otherwise.
+     *
+     * @throws SemanticException if the input format cannot be determined, or if the check fails
+     *     or reports a mismatch (the original failure is attached as the cause)
+     */
+    private void ensureFileFormatsMatch(
+            TableSpec ts, List<FileStatus> fileStatuses, final URI fromURI)
+            throws SemanticException {
+        final Class<? extends InputFormat> destInputFormat;
+        try {
+            if (ts.getPartSpec() == null || ts.getPartSpec().isEmpty()) {
+                destInputFormat = ts.tableHandle.getInputFormatClass();
+            } else {
+                destInputFormat = ts.partHandle.getInputFormatClass();
+            }
+        } catch (HiveException e) {
+            throw new SemanticException(e);
+        }
+
+        try {
+            FileSystem fs = FileSystem.get(fromURI, conf);
+            boolean validFormat =
+                    HiveFileFormatUtils.checkInputFormat(fs, conf, destInputFormat, fileStatuses);
+            if (!validFormat) {
+                throw new SemanticException(ErrorMsg.INVALID_FILE_FORMAT_IN_LOAD.getMsg());
+            }
+        } catch (Exception e) {
+            // keep the message as is, but preserve the original failure as the cause
+            throw new SemanticException(
+                    "Unable to load data to destination table." + " Error: " + e.getMessage(), e);
+        }
+    }
+}
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
index 2a8fe3c6868..0817af4ff4e 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
@@ -51,6 +51,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
import static
org.apache.flink.table.planner.utils.TableTestUtil.readFromResource;
@@ -66,6 +67,7 @@ public class HiveDialectQueryITCase {
private static HiveCatalog hiveCatalog;
private static TableEnvironment tableEnv;
+ private static String warehouse;
@BeforeClass
public static void setup() throws Exception {
@@ -74,6 +76,7 @@ public class HiveDialectQueryITCase {
hiveCatalog.getHiveConf().setVar(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT,
"none");
hiveCatalog.open();
tableEnv = getTableEnvWithHiveCatalog();
+ warehouse =
hiveCatalog.getHiveConf().getVar(HiveConf.ConfVars.METASTOREWAREHOUSE);
// create tables
tableEnv.executeSql("create table foo (x int, y int)");
@@ -505,19 +508,14 @@ public class HiveDialectQueryITCase {
try {
// test explain transform
String actualPlan =
- (String)
- CollectionUtil.iteratorToList(
- tableEnv.executeSql(
- "explain select
transform(key, value)"
- + " ROW
FORMAT SERDE 'MySerDe'"
- + " WITH
SERDEPROPERTIES ('p1'='v1','p2'='v2')"
- + "
RECORDWRITER 'MyRecordWriter' "
- + " using
'cat' as (cola int, value string)"
- + " ROW
FORMAT DELIMITED FIELDS TERMINATED BY ','"
- + "
RECORDREADER 'MyRecordReader' from src")
- .collect())
- .get(0)
- .getField(0);
+ explainSql(
+ "select transform(key, value)"
+ + " ROW FORMAT SERDE 'MySerDe'"
+ + " WITH SERDEPROPERTIES
('p1'='v1','p2'='v2')"
+ + " RECORDWRITER 'MyRecordWriter' "
+ + " using 'cat' as (cola int, value
string)"
+ + " ROW FORMAT DELIMITED FIELDS TERMINATED
BY ','"
+ + " RECORDREADER 'MyRecordReader' from
src");
assertThat(actualPlan).isEqualTo(readFromResource("/explain/testScriptTransform.out"));
// transform using + specified schema
@@ -587,13 +585,7 @@ public class HiveDialectQueryITCase {
+ " insert overwrite table t1 select id, name
where age < 20"
+ " insert overwrite table t2 select id, name
where age > 20";
// test explain
- String actualPlan =
- (String)
- CollectionUtil.iteratorToList(
- tableEnv.executeSql("explain " +
multiInsertSql)
- .collect())
- .get(0)
- .getField(0);
+ String actualPlan = explainSql(multiInsertSql);
assertThat(actualPlan).isEqualTo(readFromResource("/explain/testMultiInsert.out"));
// test execution
tableEnv.executeSql("insert into table t3 values (1, 'test1', 18
), (2, 'test2', 28 )")
@@ -673,6 +665,66 @@ public class HiveDialectQueryITCase {
}
}
+ @Test
+ public void testLoadData() throws Exception {
+ tableEnv.executeSql("create table tab1 (col1 int, col2 int) stored as orc");
+ tableEnv.executeSql("create table tab2 (col1 int, col2 int) STORED AS ORC");
+ tableEnv.executeSql(
+ "create table p_table(col1 int, col2 int) partitioned by (dateint int) row format delimited fields terminated by ','");
+ try {
+ String testLoadCsvFilePath =
+ Objects.requireNonNull(getClass().getResource("/csv/test.csv")).toString();
+ // test explain
+ String actualPlan =
+ explainSql(
+ String.format(
+ "load data local inpath '%s' overwrite into table p_table partition (dateint=2022) ",
+ testLoadCsvFilePath));
+ assertThat(actualPlan)
+ .isEqualTo(
+ readFromResource("/explain/testLoadData.out")
+ .replace("$filepath", testLoadCsvFilePath));
+
+ // test load data into table
+ tableEnv.executeSql("insert into tab1 values (1, 1), (1, 2), (2, 1), (2, 2)").await();
+ tableEnv.executeSql(
+ String.format(
+ "load data local inpath '%s' INTO TABLE tab2", warehouse + "/tab1"));
+ List<Row> result =
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql("select * from tab2").collect());
+ assertThat(result.toString()).isEqualTo("[+I[1, 1], +I[1, 2], +I[2, 1], +I[2, 2]]");
+
+ // test load data overwrite
+ tableEnv.executeSql("insert into tab1 values (2, 1), (2, 2)").await();
+ tableEnv.executeSql(
+ String.format(
+ "load data local inpath '%s' overwrite into table tab2",
+ warehouse + "/tab1"));
+ result =
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql("select * from tab2").collect());
+ assertThat(result.toString()).isEqualTo("[+I[2, 1], +I[2, 2]]");
+
+ // test load data into partition
+ tableEnv.executeSql(
+ String.format(
+ "load data inpath '%s' into table p_table partition (dateint=2022) ",
+ testLoadCsvFilePath))
+ .await();
+ result =
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql("select * from p_table where dateint=2022")
+ .collect());
+ assertThat(result.toString())
+ .isEqualTo("[+I[1, 1, 2022], +I[2, 2, 2022], +I[3, 3, 2022]]");
+ } finally {
+ tableEnv.executeSql("drop table tab1");
+ tableEnv.executeSql("drop table tab2");
+ tableEnv.executeSql("drop table p_table");
+ }
+ }
+
private void runQFile(File qfile) throws Exception {
QTest qTest = extractQTest(qfile);
for (int i = 0; i < qTest.statements.size(); i++) {
@@ -771,6 +823,13 @@ public class HiveDialectQueryITCase {
}
}
+ private String explainSql(String sql) {
+ return (String)
+ CollectionUtil.iteratorToList(tableEnv.executeSql("explain " + sql).collect())
+ .get(0)
+ .getField(0);
+ }
+
private static TableEnvironment getTableEnvWithHiveCatalog() {
TableEnvironment tableEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
diff --git a/flink-connectors/flink-connector-hive/src/test/resources/explain/testLoadData.out b/flink-connectors/flink-connector-hive/src/test/resources/explain/testLoadData.out
new file mode 100644
index 00000000000..3f750bc9800
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/resources/explain/testLoadData.out
@@ -0,0 +1,8 @@
+== Abstract Syntax Tree ==
+LoadData(filepath=[$filepath], table=[default.p_table], overwrite=[true], local=[true], partition=[dateint=2022])
+
+== Optimized Physical Plan ==
+LoadData(filepath=[$filepath], table=[default.p_table], overwrite=[true], local=[true], partition=[dateint=2022])
+
+== Optimized Execution Plan ==
+LoadData(filepath=[$filepath], table=[default.p_table], overwrite=[true], local=[true], partition=[dateint=2022])