This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 15f8f3c [FLINK-14588][hive] Support Hive version 1.0.0 and 1.0.1
15f8f3c is described below
commit 15f8f3c52a1bf11ecf9f550388eee550b7fc763e
Author: Rui Li <[email protected]>
AuthorDate: Thu Oct 31 21:15:13 2019 +0800
[FLINK-14588][hive] Support Hive version 1.0.0 and 1.0.1
Adds support for Hive versions 1.0.0 and 1.0.1.
This closes #10062.
---
flink-connectors/flink-connector-hive/pom.xml | 10 +-
.../connectors/hive/HiveTableOutputFormat.java | 51 ++--
.../flink/table/catalog/hive/HiveCatalog.java | 11 +-
.../flink/table/catalog/hive/client/HiveShim.java | 16 ++
.../table/catalog/hive/client/HiveShimLoader.java | 8 +
.../{HiveShimV110.java => HiveShimV100.java} | 47 +++-
.../table/catalog/hive/client/HiveShimV101.java | 25 ++
.../table/catalog/hive/client/HiveShimV110.java | 287 +++------------------
.../catalog/hive/util/HiveReflectionUtils.java | 25 --
.../connectors/hive/HiveRunnerShimLoader.java | 2 +
.../connectors/hive/TableEnvHiveConnectorTest.java | 6 +-
.../table/functions/hive/HiveGenericUDFTest.java | 7 +-
12 files changed, 164 insertions(+), 331 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index e20bdeb..de98156 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -666,7 +666,15 @@ under the License.
</build>
<profiles>
- <!-- Activate this profile with -Phive-1.2.1 to build and test against hive-1.2.1 -->
+ <!-- Activate these profiles with -Phive-x.x.x to build and test against different Hive versions -->
+ <profile>
+ <id>hive-1.0.1</id>
+ <properties>
+ <hive.version>1.0.1</hive.version>
+ <hivemetastore.hadoop.version>2.6.5</hivemetastore.hadoop.version>
+ <hiverunner.version>3.1.1</hiverunner.version>
+ </properties>
+ </profile>
<profile>
<id>hive-1.1.1</id>
<properties>
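(Usage note, not part of the patch itself: with the new profile above, the connector and its tests can be built against Hive 1.0.1 by activating the profile the same way as the existing ones, for example mvn clean verify -Phive-1.0.1; the Maven goals are an assumption here, only the -Phive-1.0.1 switch comes from this pom change. Hive 1.0.0 is handled at runtime by the shim loader shown further down.)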
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
index 0760ae5..15fcbd4 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
@@ -45,7 +45,7 @@ import org.apache.flink.util.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -53,8 +53,6 @@ import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
@@ -71,8 +69,6 @@ import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.JobContextImpl;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.OutputCommitter;
-import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.TaskAttemptContextImpl;
@@ -108,6 +104,11 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
private static final long serialVersionUID = 5167529504848109023L;
+ private static final PathFilter HIDDEN_FILES_PATH_FILTER = p -> {
+ String name = p.getName();
+ return !name.startsWith("_") && !name.startsWith(".");
+ };
+
private transient JobConf jobConf;
private transient ObjectPath tablePath;
private transient List<String> partitionColumns;
@@ -139,6 +140,8 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
private transient String hiveVersion;
+ private transient HiveShim hiveShim;
+
// to convert Flink object to Hive object
private transient HiveObjectConversion[] hiveConversions;
@@ -167,6 +170,7 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
isDynamicPartition = isPartitioned && partitionColumns.size() > hiveTablePartition.getPartitionSpec().size();
hiveVersion = Preconditions.checkNotNull(jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION), "Hive version is not defined");
+ hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
}
// Custom serialization methods
@@ -209,6 +213,7 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
partitionToWriter = new HashMap<>();
tableProperties = (Properties) in.readObject();
hiveVersion = (String) in.readObject();
+ hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
}
@Override
@@ -223,7 +228,6 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
}
if (isPartitioned) {
if (isDynamicPartition) {
- HiveShim hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
FileStatus[] generatedParts = hiveShim.getFileStatusRecurse(stagingDir, partitionColumns.size() - hiveTablePartition.getPartitionSpec().size(), fs);
for (FileStatus part : generatedParts) {
@@ -378,16 +382,15 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
// TODO: support setting auto-purge?
final boolean purge = true;
// Note we assume the srcDir is a hidden dir, otherwise it will be deleted if it's a sub-dir of destDir
- FileStatus[] existingFiles = fs.listStatus(destDir, FileUtils.HIDDEN_FILES_PATH_FILTER);
+ FileStatus[] existingFiles = fs.listStatus(destDir, HIDDEN_FILES_PATH_FILTER);
if (existingFiles != null) {
- HiveShim hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
for (FileStatus existingFile : existingFiles) {
Preconditions.checkState(hiveShim.moveToTrash(fs, existingFile.getPath(), jobConf, purge), "Failed to overwrite existing file " + existingFile);
}
}
}
- FileStatus[] srcFiles = fs.listStatus(srcDir, FileUtils.HIDDEN_FILES_PATH_FILTER);
+ FileStatus[] srcFiles = fs.listStatus(srcDir, HIDDEN_FILES_PATH_FILTER);
for (FileStatus srcFile : srcFiles) {
Path srcPath = srcFile.getPath();
Path destPath = new Path(destDir, srcPath.getName());
@@ -441,17 +444,6 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
private HivePartitionWriter writerForLocation(String location) throws IOException {
JobConf clonedConf = new JobConf(jobConf);
clonedConf.set(OUTDIR, location);
- OutputFormat outputFormat;
- try {
- StorageDescriptor sd = hiveTablePartition.getStorageDescriptor();
- Class outputFormatClz = Class.forName(sd.getOutputFormat(), true,
- Thread.currentThread().getContextClassLoader());
- outputFormatClz = HiveFileFormatUtils.getOutputFormatSubstitute(outputFormatClz);
- outputFormat = (OutputFormat) outputFormatClz.newInstance();
- } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
- throw new FlinkRuntimeException("Unable to instantiate the hadoop output format", e);
- }
- ReflectionUtils.setConf(outputFormat, clonedConf);
OutputCommitter outputCommitter = clonedConf.getOutputCommitter();
JobContext jobContext = new JobContextImpl(clonedConf, new JobID());
outputCommitter.setupJob(jobContext);
@@ -474,28 +466,19 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp
SequenceFileOutputFormat.setOutputCompressionType(clonedConf, style);
}
}
+ StorageDescriptor sd = hiveTablePartition.getStorageDescriptor();
String taskPartition = String.valueOf(clonedConf.getInt("mapreduce.task.partition", -1));
Path taskPath = FileOutputFormat.getTaskOutputPath(clonedConf, taskPartition);
- FileSinkOperator.RecordWriter recordWriter;
- try {
- recordWriter = HiveFileFormatUtils.getRecordWriter(clonedConf, outputFormat,
- outputClass, isCompressed, tableProperties, taskPath, Reporter.NULL);
- } catch (HiveException e) {
- throw new IOException(e);
- }
- return new HivePartitionWriter(clonedConf, outputFormat, recordWriter, outputCommitter);
+ FileSinkOperator.RecordWriter recordWriter = hiveShim.getHiveRecordWriter(
+ clonedConf, sd.getOutputFormat(), outputClass, isCompressed, tableProperties, taskPath);
+ return new HivePartitionWriter(recordWriter, outputCommitter);
}
private static class HivePartitionWriter {
- private final JobConf jobConf;
- private final OutputFormat outputFormat;
private final FileSinkOperator.RecordWriter recordWriter;
private final OutputCommitter outputCommitter;
- HivePartitionWriter(JobConf jobConf, OutputFormat outputFormat, FileSinkOperator.RecordWriter recordWriter,
- OutputCommitter outputCommitter) {
- this.jobConf = jobConf;
- this.outputFormat = outputFormat;
+ HivePartitionWriter(FileSinkOperator.RecordWriter recordWriter, OutputCommitter outputCommitter) {
this.recordWriter = recordWriter;
this.outputCommitter = outputCommitter;
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 49ff7441..9afd55c 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -85,9 +85,7 @@ import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
import org.apache.hadoop.hive.ql.io.StorageFormatFactory;
-import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
-import org.apache.hive.common.util.HiveStringUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -525,12 +523,7 @@ public class HiveCatalog extends AbstractCatalog {
fields = hiveTable.getSd().getCols();
} else {
// get schema from deserializer
- try {
- fields = HiveReflectionUtils.getFieldsFromDeserializer(hiveShim, hiveTable.getTableName(),
- HiveReflectionUtils.getDeserializer(hiveShim, hiveConf, hiveTable, true));
- } catch (SerDeException | MetaException e) {
- throw new CatalogException("Failed to get Hive table schema from deserializer", e);
- }
+ fields = hiveShim.getFieldsFromDeserializer(hiveConf, hiveTable, true);
}
TableSchema tableSchema = HiveTableUtil.createTableSchema(fields, hiveTable.getPartitionKeys());
@@ -1099,7 +1092,7 @@ public class HiveCatalog extends AbstractCatalog {
return new Function(
// due to https://issues.apache.org/jira/browse/HIVE-22053, we have to normalize function name ourselves
- HiveStringUtils.normalizeIdentifier(functionPath.getObjectName()),
+ functionPath.getObjectName().trim().toLowerCase(),
functionPath.getDatabaseName(),
functionClassName,
null, // Owner name
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
index 2111099..1e27fb6 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -34,15 +35,19 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.thrift.TException;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
/**
* A shim layer to support different versions of Hive.
@@ -184,4 +189,15 @@ public interface HiveShim extends Serializable {
*/
CatalogColumnStatisticsDataDate
toFlinkDateColStats(ColumnStatisticsData hiveDateColStats);
+ /**
+ * Get Hive's FileSinkOperator.RecordWriter.
+ */
+ FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, String outputFormatClzName,
+ Class<? extends Writable> outValClz, boolean isCompressed, Properties tableProps, Path outPath);
+
+ /**
+ * Get Hive table schema from deserializer.
+ */
+ List<FieldSchema> getFieldsFromDeserializer(Configuration conf, Table table, boolean skipConfError);
+
}
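For readers following the interface change: below is a minimal, hypothetical sketch (not part of the patch) of how the two new shim methods are meant to be called, mirroring the call sites in HiveCatalog and HiveTableOutputFormat above. The surrounding objects (JobConf, StorageDescriptor, metastore Table, output path, and so on) are assumed to be prepared by the caller.

import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;

import java.util.List;
import java.util.Properties;

// Hypothetical caller of the new HiveShim methods; class and method names are illustrative only.
class ShimUsageSketch {

	// Obtain a Hive RecordWriter without referencing version-specific Hive classes directly.
	FileSinkOperator.RecordWriter openWriter(String hiveVersion, JobConf jobConf, StorageDescriptor sd,
			boolean isCompressed, Properties tableProps, Path taskPath) {
		HiveShim shim = HiveShimLoader.loadHiveShim(hiveVersion);
		return shim.getHiveRecordWriter(jobConf, sd.getOutputFormat(), Text.class, isCompressed, tableProps, taskPath);
	}

	// Read the table schema through the SerDe, replacing the removed HiveReflectionUtils helpers.
	List<FieldSchema> schemaFromSerDe(String hiveVersion, HiveConf hiveConf, Table hiveTable) {
		return HiveShimLoader.loadHiveShim(hiveVersion).getFieldsFromDeserializer(hiveConf, hiveTable, true);
	}
}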
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java
index 1645d08..bd4b414 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java
@@ -32,6 +32,8 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class HiveShimLoader {
+ public static final String HIVE_VERSION_V1_0_0 = "1.0.0";
+ public static final String HIVE_VERSION_V1_0_1 = "1.0.1";
public static final String HIVE_VERSION_V1_1_0 = "1.1.0";
public static final String HIVE_VERSION_V1_1_1 = "1.1.1";
public static final String HIVE_VERSION_V1_2_0 = "1.2.0";
@@ -62,6 +64,12 @@ public class HiveShimLoader {
public static HiveShim loadHiveShim(String version) {
return hiveShims.computeIfAbsent(version, (v) -> {
+ if (v.startsWith(HIVE_VERSION_V1_0_0)) {
+ return new HiveShimV100();
+ }
+ if (v.startsWith(HIVE_VERSION_V1_0_1)) {
+ return new HiveShimV101();
+ }
if (v.startsWith(HIVE_VERSION_V1_1_0)) {
return new HiveShimV110();
}
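A small, hypothetical illustration (not part of the patch) of what the extended loader resolves for the versions touched by this commit; the expected classes follow directly from the dispatch above.

import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;

// Illustrative only: prints the shim class chosen for the newly supported versions plus 1.1.0.
public class ShimResolutionSketch {
	public static void main(String[] args) {
		for (String version : new String[] {"1.0.0", "1.0.1", "1.1.0"}) {
			HiveShim shim = HiveShimLoader.loadHiveShim(version);
			// Expected per the loader above: HiveShimV100, HiveShimV101, HiveShimV110.
			System.out.println(version + " -> " + shim.getClass().getSimpleName());
		}
	}
}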
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV110.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
similarity index 84%
copy from flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV110.java
copy to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
index 91fd80a..4afef67 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV110.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate;
import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
+import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -42,7 +44,11 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantBinaryObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantBooleanObjectInspector;
@@ -61,6 +67,9 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantT
import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
import org.apache.thrift.TException;
import java.io.IOException;
@@ -70,11 +79,12 @@ import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
/**
- * Shim for Hive version 1.1.0.
+ * Shim for Hive version 1.0.0.
*/
-public class HiveShimV110 implements HiveShim {
+public class HiveShimV100 implements HiveShim {
@Override
public IMetaStoreClient getHiveMetastoreClient(HiveConf hiveConf) {
@@ -276,7 +286,7 @@ public class HiveShimV110 implements HiveShim {
return HiveReflectionUtils.createConstantObjectInspector(className, value.toString());
default:
throw new FlinkHiveUDFException(
- String.format("Cannot find ConstantObjectInspector for %s", primitiveTypeInfo));
+ String.format("Cannot find ConstantObjectInspector for %s", primitiveTypeInfo));
}
}
@@ -294,4 +304,35 @@ public class HiveShimV110 implements HiveShim {
public CatalogColumnStatisticsDataDate toFlinkDateColStats(ColumnStatisticsData hiveDateColStats) {
throw new UnsupportedOperationException("DATE column stats are not supported until Hive 1.2.0");
}
+
+ @Override
+ public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, String outputFormatClzName,
+ Class<? extends Writable> outValClz, boolean isCompressed, Properties tableProps, Path outPath) {
+ try {
+ Class outputFormatClz = Class.forName(outputFormatClzName);
+ Class utilClass = HiveFileFormatUtils.class;
+ Method utilMethod = utilClass.getDeclaredMethod("getOutputFormatSubstitute", Class.class, boolean.class);
+ outputFormatClz = (Class) utilMethod.invoke(null, outputFormatClz, false);
+ Preconditions.checkState(outputFormatClz != null, "No Hive substitute output format for " + outputFormatClzName);
+ HiveOutputFormat outputFormat = (HiveOutputFormat) outputFormatClz.newInstance();
+ utilMethod = utilClass.getDeclaredMethod("getRecordWriter", JobConf.class, HiveOutputFormat.class,
+ Class.class, boolean.class, Properties.class, Path.class, Reporter.class);
+ return (FileSinkOperator.RecordWriter) utilMethod.invoke(null,
+ jobConf, outputFormat, outValClz, isCompressed, tableProps, outPath, Reporter.NULL);
+ } catch (Exception e) {
+ throw new CatalogException("Failed to create Hive RecordWriter", e);
+ }
+ }
+
+ @Override
+ public List<FieldSchema> getFieldsFromDeserializer(Configuration conf, Table table, boolean skipConfError) {
+ try {
+ Method utilMethod = getHiveMetaStoreUtilsClass().getMethod("getDeserializer", Configuration.class, Table.class);
+ Deserializer deserializer = (Deserializer) utilMethod.invoke(null, conf, table);
+ utilMethod = getHiveMetaStoreUtilsClass().getMethod("getFieldsFromDeserializer", String.class, Deserializer.class);
+ return (List<FieldSchema>) utilMethod.invoke(null, table.getTableName(), deserializer);
+ } catch (Exception e) {
+ throw new CatalogException("Failed to get table schema from deserializer", e);
+ }
+ }
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV101.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV101.java
new file mode 100644
index 0000000..cf86905
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV101.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive.client;
+
+/**
+ * Shim for Hive version 1.0.1.
+ */
+public class HiveShimV101 extends HiveShimV100 {
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV110.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV110.java
index 91fd80a..691c14a 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV110.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV110.java
@@ -19,279 +19,58 @@
package org.apache.flink.table.catalog.hive.client;
import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
-import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate;
-import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
-import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
+import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.common.HiveStatsUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
-import org.apache.hadoop.hive.metastore.api.Function;
-import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.UnknownDBException;
-import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantBinaryObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantBooleanObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantByteObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantDateObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantDoubleObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantFloatObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantHiveCharObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantHiveDecimalObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantHiveVarcharObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantIntObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantLongObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantShortObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantStringObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantTimestampObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
-import org.apache.thrift.TException;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.Reporter;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
+import java.util.Properties;
/**
* Shim for Hive version 1.1.0.
*/
-public class HiveShimV110 implements HiveShim {
+public class HiveShimV110 extends HiveShimV101 {
@Override
- public IMetaStoreClient getHiveMetastoreClient(HiveConf hiveConf) {
+ public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, String outputFormatClzName,
+ Class<? extends Writable> outValClz, boolean isCompressed, Properties tableProps, Path outPath) {
try {
- return new HiveMetaStoreClient(hiveConf);
- } catch (MetaException ex) {
- throw new CatalogException("Failed to create Hive Metastore client", ex);
+ Class outputFormatClz = Class.forName(outputFormatClzName);
+ Class utilClass = HiveFileFormatUtils.class;
+ Method utilMethod = utilClass.getDeclaredMethod("getOutputFormatSubstitute", Class.class);
+ outputFormatClz = (Class) utilMethod.invoke(null, outputFormatClz);
+ Preconditions.checkState(outputFormatClz != null, "No Hive substitute output format for " + outputFormatClzName);
+ OutputFormat outputFormat = (OutputFormat) outputFormatClz.newInstance();
+ utilMethod = utilClass.getDeclaredMethod("getRecordWriter", JobConf.class, OutputFormat.class,
+ Class.class, boolean.class, Properties.class, Path.class, Reporter.class);
+ return (FileSinkOperator.RecordWriter) utilMethod.invoke(null,
+ jobConf, outputFormat, outValClz, isCompressed, tableProps, outPath, Reporter.NULL);
+ } catch (Exception e) {
+ throw new CatalogException("Failed to create Hive RecordWriter", e);
}
}
@Override
- // 1.x client doesn't support filtering tables by type, so here we need to get all tables and filter by ourselves
- public List<String> getViews(IMetaStoreClient client, String databaseName) throws UnknownDBException, TException {
- // We don't have to use reflection here because client.getAllTables(String) is supposed to be there for
- // all versions.
- List<String> tableNames = client.getAllTables(databaseName);
- List<String> views = new ArrayList<>();
- for (String name : tableNames) {
- Table table = client.getTable(databaseName, name);
- String viewDef = table.getViewOriginalText();
- if (viewDef != null && !viewDef.isEmpty()) {
- views.add(table.getTableName());
- }
- }
- return views;
- }
-
- @Override
- public Function getFunction(IMetaStoreClient client, String dbName, String functionName) throws NoSuchObjectException, TException {
- try {
- // hive-1.x doesn't throw NoSuchObjectException if function doesn't exist, instead it throws a MetaException
- return client.getFunction(dbName, functionName);
- } catch (MetaException e) {
- // need to check the cause and message of this MetaException to decide whether it should actually be a NoSuchObjectException
- if (e.getCause() instanceof NoSuchObjectException) {
- throw (NoSuchObjectException) e.getCause();
- }
- if (e.getMessage().startsWith(NoSuchObjectException.class.getSimpleName())) {
- throw new NoSuchObjectException(e.getMessage());
- }
- throw e;
- }
- }
-
- @Override
- public boolean moveToTrash(FileSystem fs, Path path, Configuration conf, boolean purge) throws IOException {
- try {
- Method method = FileUtils.class.getDeclaredMethod("moveToTrash", FileSystem.class, Path.class, Configuration.class);
- return (boolean) method.invoke(null, fs, path, conf);
- } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
- throw new IOException("Failed to move " + path + " to trash", e);
- }
- }
-
- @Override
- public void alterTable(IMetaStoreClient client, String databaseName, String tableName, Table table) throws InvalidOperationException, MetaException, TException {
- client.alter_table(databaseName, tableName, table);
- }
-
- @Override
- public void alterPartition(IMetaStoreClient client, String databaseName, String tableName, Partition partition)
- throws InvalidOperationException, MetaException, TException {
- String errorMsg = "Failed to alter partition for table %s in database %s";
- try {
- Method method = client.getClass().getMethod("alter_partition", String.class, String.class, Partition.class);
- method.invoke(client, databaseName, tableName, partition);
- } catch (InvocationTargetException ite) {
- Throwable targetEx = ite.getTargetException();
- if (targetEx instanceof TException) {
- throw (TException) targetEx;
- } else {
- throw new CatalogException(String.format(errorMsg, tableName, databaseName), targetEx);
- }
- } catch (NoSuchMethodException | IllegalAccessException e) {
- throw new CatalogException(String.format(errorMsg, tableName, databaseName), e);
- }
- }
-
- @Override
- public SimpleGenericUDAFParameterInfo createUDAFParameterInfo(ObjectInspector[] params, boolean isWindowing, boolean distinct, boolean allColumns) {
+ public List<FieldSchema> getFieldsFromDeserializer(Configuration conf, Table table, boolean skipConfError) {
try {
- Constructor constructor = SimpleGenericUDAFParameterInfo.class.getConstructor(ObjectInspector[].class,
- boolean.class, boolean.class);
- return (SimpleGenericUDAFParameterInfo) constructor.newInstance(params, distinct, allColumns);
- } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
- throw new CatalogException("Failed to create SimpleGenericUDAFParameterInfo", e);
+ Method utilMethod = getHiveMetaStoreUtilsClass().getMethod("getDeserializer",
+ Configuration.class, Table.class, boolean.class);
+ Deserializer deserializer = (Deserializer) utilMethod.invoke(null, conf, table, skipConfError);
+ utilMethod = getHiveMetaStoreUtilsClass().getMethod("getFieldsFromDeserializer", String.class, Deserializer.class);
+ return (List<FieldSchema>) utilMethod.invoke(null, table.getTableName(), deserializer);
+ } catch (Exception e) {
+ throw new CatalogException("Failed to get table schema from deserializer", e);
}
}
-
- @Override
- public Class<?> getMetaStoreUtilsClass() {
- try {
- return Class.forName("org.apache.hadoop.hive.metastore.MetaStoreUtils");
- } catch (ClassNotFoundException e) {
- throw new CatalogException("Failed to find class MetaStoreUtils", e);
- }
- }
-
- @Override
- public Class<?> getHiveMetaStoreUtilsClass() {
- return getMetaStoreUtilsClass();
- }
-
- @Override
- public Class<?> getDateDataTypeClass() {
- return java.sql.Date.class;
- }
-
- @Override
- public Class<?> getTimestampDataTypeClass() {
- return java.sql.Timestamp.class;
- }
-
- @Override
- public FileStatus[] getFileStatusRecurse(Path path, int level, FileSystem fs) throws IOException {
- try {
- Method method = HiveStatsUtils.class.getMethod("getFileStatusRecurse", Path.class, Integer.TYPE, FileSystem.class);
- // getFileStatusRecurse is a static method
- return (FileStatus[]) method.invoke(null, path, level, fs);
- } catch (Exception ex) {
- throw new CatalogException("Failed to invoke HiveStatsUtils.getFileStatusRecurse()", ex);
- }
- }
-
- @Override
- public void makeSpecFromName(Map<String, String> partSpec, Path currPath) {
- try {
- Method method = Warehouse.class.getMethod("makeSpecFromName", Map.class, Path.class);
- // makeSpecFromName is a static method
- method.invoke(null, partSpec, currPath);
- } catch (Exception ex) {
- throw new CatalogException("Failed to invoke Warehouse.makeSpecFromName()", ex);
- }
- }
-
- @Override
- public ObjectInspector getObjectInspectorForConstant(PrimitiveTypeInfo primitiveTypeInfo, Object value) {
- String className;
- value = HiveInspectors.hivePrimitiveToWritable(value);
- // Java constant object inspectors are not available until 1.2.0 -- https://issues.apache.org/jira/browse/HIVE-9766
- // So we have to use writable constant object inspectors for 1.1.x
- switch (primitiveTypeInfo.getPrimitiveCategory()) {
- case BOOLEAN:
- className = WritableConstantBooleanObjectInspector.class.getName();
- return HiveReflectionUtils.createConstantObjectInspector(className, value);
- case BYTE:
- className = WritableConstantByteObjectInspector.class.getName();
- return HiveReflectionUtils.createConstantObjectInspector(className, value);
- case SHORT:
- className = WritableConstantShortObjectInspector.class.getName();
- return HiveReflectionUtils.createConstantObjectInspector(className, value);
- case INT:
- className = WritableConstantIntObjectInspector.class.getName();
- return HiveReflectionUtils.createConstantObjectInspector(className, value);
- case LONG:
- className = WritableConstantLongObjectInspector.class.getName();
- return HiveReflectionUtils.createConstantObjectInspector(className, value);
- case FLOAT:
- className = WritableConstantFloatObjectInspector.class.getName();
- return HiveReflectionUtils.createConstantObjectInspector(className, value);
- case DOUBLE:
- className = WritableConstantDoubleObjectInspector.class.getName();
- return HiveReflectionUtils.createConstantObjectInspector(className, value);
- case STRING:
- className = WritableConstantStringObjectInspector.class.getName();
- return HiveReflectionUtils.createConstantObjectInspector(className, value);
- case CHAR:
- className = WritableConstantHiveCharObjectInspector.class.getName();
- try {
- return (ObjectInspector) Class.forName(className).getDeclaredConstructor(
- CharTypeInfo.class, value.getClass()).newInstance(primitiveTypeInfo, value);
- } catch (Exception e) {
- throw new FlinkHiveUDFException("Failed to create writable constant object inspector", e);
- }
- case VARCHAR:
- className = WritableConstantHiveVarcharObjectInspector.class.getName();
- try {
- return (ObjectInspector) Class.forName(className).getDeclaredConstructor(
- VarcharTypeInfo.class, value.getClass()).newInstance(primitiveTypeInfo, value);
- } catch (Exception e) {
- throw new FlinkHiveUDFException("Failed to create writable constant object inspector", e);
- }
- case DATE:
- className = WritableConstantDateObjectInspector.class.getName();
- return HiveReflectionUtils.createConstantObjectInspector(className, value);
- case TIMESTAMP:
- className = WritableConstantTimestampObjectInspector.class.getName();
- return HiveReflectionUtils.createConstantObjectInspector(className, value);
- case DECIMAL:
- className = WritableConstantHiveDecimalObjectInspector.class.getName();
- return HiveReflectionUtils.createConstantObjectInspector(className, value);
- case BINARY:
- className = WritableConstantBinaryObjectInspector.class.getName();
- return HiveReflectionUtils.createConstantObjectInspector(className, value);
- case UNKNOWN:
- case VOID:
- // If type is null, we use the Constant String to replace
- className = WritableConstantStringObjectInspector.class.getName();
- return HiveReflectionUtils.createConstantObjectInspector(className, value.toString());
- default:
- throw new FlinkHiveUDFException(
- String.format("Cannot find ConstantObjectInspector for %s", primitiveTypeInfo));
- }
- }
-
- @Override
- public ColumnStatisticsData toHiveDateColStats(CatalogColumnStatisticsDataDate flinkDateColStats) {
- throw new UnsupportedOperationException("DATE column stats are not supported until Hive 1.2.0");
- }
-
- @Override
- public boolean isDateStats(ColumnStatisticsData colStatsData) {
- return false;
- }
-
- @Override
- public CatalogColumnStatisticsDataDate toFlinkDateColStats(ColumnStatisticsData hiveDateColStats) {
- throw new UnsupportedOperationException("DATE column stats are not supported until Hive 1.2.0");
- }
}
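The net effect of the shim files above is an inheritance chain: HiveShimV100 implements the full interface for Hive 1.0.0, HiveShimV101 adds nothing for 1.0.1, and HiveShimV110 now overrides only the two calls whose Hive-side signatures changed in 1.1.0 (getOutputFormatSubstitute/getRecordWriter and getDeserializer). A purely hypothetical shim for a later 1.x release would follow the same pattern:

package org.apache.flink.table.catalog.hive.client;

/**
 * Illustrative sketch only, not part of this commit: a shim for a hypothetical later 1.x release
 * would extend the closest existing shim and override methods only where the Hive API changed again.
 */
public class HiveShimV1xxSketch extends HiveShimV110 {
	// No overrides needed unless the corresponding Hive calls changed signature in that release.
}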
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveReflectionUtils.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveReflectionUtils.java
index b03eb66..5224b78 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveReflectionUtils.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveReflectionUtils.java
@@ -22,12 +22,8 @@ import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.serde2.Deserializer;
-import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import java.lang.reflect.Constructor;
@@ -57,27 +53,6 @@ public class HiveReflectionUtils {
}
}
- public static List<FieldSchema> getFieldsFromDeserializer(HiveShim hiveShim, String tableName, Deserializer deserializer)
- throws SerDeException, MetaException {
- try {
- Method method = hiveShim.getHiveMetaStoreUtilsClass().getMethod("getFieldsFromDeserializer", String.class, Deserializer.class);
- return (List<FieldSchema>) method.invoke(null, tableName, deserializer);
- } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
- throw new CatalogException("Failed to invoke MetaStoreUtils.getFieldsFromDeserializer()", e);
- }
- }
-
- public static Deserializer getDeserializer(HiveShim hiveShim, Configuration conf, Table table, boolean skipConfError)
- throws MetaException {
- try {
- Method method = hiveShim.getHiveMetaStoreUtilsClass().getMethod("getDeserializer", Configuration.class,
- Table.class, boolean.class);
- return (Deserializer) method.invoke(null, conf, table, skipConfError);
- } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
- throw new CatalogException("Failed to invoke MetaStoreUtils.getDeserializer()", e);
- }
- }
-
public static List<String> getPvals(HiveShim hiveShim, List<FieldSchema> partCols, Map<String, String> partSpec) {
try {
Method method = hiveShim.getMetaStoreUtilsClass().getMethod("getPvals", List.class, Map.class);
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java
index 2a344d1..590d09d 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java
@@ -37,6 +37,8 @@ public class HiveRunnerShimLoader {
String hiveVersion = HiveShimLoader.getHiveVersion();
return hiveRunnerShims.computeIfAbsent(hiveVersion, v -> {
switch (v) {
+ case HiveShimLoader.HIVE_VERSION_V1_0_0:
+ case HiveShimLoader.HIVE_VERSION_V1_0_1:
case HiveShimLoader.HIVE_VERSION_V1_1_0:
case HiveShimLoader.HIVE_VERSION_V1_1_1:
case HiveShimLoader.HIVE_VERSION_V1_2_0:
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index 2c50bef..ce00b23 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -141,7 +141,8 @@ public class TableEnvHiveConnectorTest {
hiveShell.execute("create table db1.dest (i int,s string) " + suffix);
// prepare source data with Hive
- hiveShell.execute("insert into db1.src values (1,'a'),(2,'b')");
+ // TABLE keyword in INSERT INTO is mandatory prior to 1.1.0
+ hiveShell.execute("insert into table db1.src values (1,'a'),(2,'b')");
// populate dest table with source table
tableEnv.sqlUpdate("insert into db1.dest select * from db1.src");
@@ -161,7 +162,8 @@ public class TableEnvHiveConnectorTest {
hiveShell.execute("create table db1.src2 (x decimal(10,2))");
hiveShell.execute("create table db1.dest (x decimal(10,2))");
// populate src1 from Hive
- hiveShell.execute("insert into db1.src1 values (1.0),(2.12),(5.123),(5.456),(123456789.12)");
+ // TABLE keyword in INSERT INTO is mandatory prior to 1.1.0
+ hiveShell.execute("insert into table db1.src1 values (1.0),(2.12),(5.123),(5.456),(123456789.12)");
TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
// populate src2 with same data from Flink
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
index f7c74dc..fc8c0b5 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.types.Row;
import org.apache.hadoop.hive.ql.udf.UDFUnhex;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFAddMonths;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFCase;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFCeil;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFCoalesce;
@@ -53,6 +52,7 @@ import static org.junit.Assert.assertEquals;
public class HiveGenericUDFTest {
private static HiveShim hiveShim = HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion());
private static final boolean HIVE_120_OR_LATER = HiveShimLoader.getHiveVersion().compareTo(HiveShimLoader.HIVE_VERSION_V1_2_0) >= 0;
+ private static final boolean HIVE_110_OR_LATER = HiveShimLoader.getHiveVersion().compareTo(HiveShimLoader.HIVE_VERSION_V1_1_0) >= 0;
@Test
public void testAbs() {
@@ -94,9 +94,10 @@ public class HiveGenericUDFTest {
}
@Test
- public void testAddMonths() {
+ public void testAddMonths() throws Exception {
+ Assume.assumeTrue(HIVE_110_OR_LATER);
HiveGenericUDF udf = init(
- GenericUDFAddMonths.class,
+ Class.forName("org.apache.hadoop.hive.ql.udf.generic.GenericUDFAddMonths"),
new Object[] {
null,
1