This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 15f8f3c  [FLINK-14588][hive] Support Hive version 1.0.0 and 1.0.1
15f8f3c is described below

commit 15f8f3c52a1bf11ecf9f550388eee550b7fc763e
Author: Rui Li <[email protected]>
AuthorDate: Thu Oct 31 21:15:13 2019 +0800

    [FLINK-14588][hive] Support Hive version 1.0.0 and 1.0.1
    
    Add Hive shims (HiveShimV100, HiveShimV101) to support Hive versions 1.0.0 and 1.0.1.
    
    This closes #10062.
---
 flink-connectors/flink-connector-hive/pom.xml      |  10 +-
 .../connectors/hive/HiveTableOutputFormat.java     |  51 ++--
 .../flink/table/catalog/hive/HiveCatalog.java      |  11 +-
 .../flink/table/catalog/hive/client/HiveShim.java  |  16 ++
 .../table/catalog/hive/client/HiveShimLoader.java  |   8 +
 .../{HiveShimV110.java => HiveShimV100.java}       |  47 +++-
 .../table/catalog/hive/client/HiveShimV101.java    |  25 ++
 .../table/catalog/hive/client/HiveShimV110.java    | 287 +++------------------
 .../catalog/hive/util/HiveReflectionUtils.java     |  25 --
 .../connectors/hive/HiveRunnerShimLoader.java      |   2 +
 .../connectors/hive/TableEnvHiveConnectorTest.java |   6 +-
 .../table/functions/hive/HiveGenericUDFTest.java   |   7 +-
 12 files changed, 164 insertions(+), 331 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index e20bdeb..de98156 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -666,7 +666,15 @@ under the License.
        </build>
 
        <profiles>
-               <!-- Activate this profile with -Phive-1.2.1 to build and test against hive-1.2.1 -->
+               <!-- Activate these profiles with -Phive-x.x.x to build and test against different Hive versions -->
+               <profile>
+                       <id>hive-1.0.1</id>
+                       <properties>
+                               <hive.version>1.0.1</hive.version>
+                               <hivemetastore.hadoop.version>2.6.5</hivemetastore.hadoop.version>
+                               <hiverunner.version>3.1.1</hiverunner.version>
+                       </properties>
+               </profile>
                <profile>
                        <id>hive-1.1.1</id>
                        <properties>
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
index 0760ae5..15fcbd4 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
@@ -45,7 +45,7 @@ import org.apache.flink.util.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -53,8 +53,6 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
@@ -71,8 +69,6 @@ import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.JobContextImpl;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.OutputCommitter;
-import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.mapred.TaskAttemptContextImpl;
@@ -108,6 +104,11 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
 
        private static final long serialVersionUID = 5167529504848109023L;
 
+       private static final PathFilter HIDDEN_FILES_PATH_FILTER = p -> {
+               String name = p.getName();
+               return !name.startsWith("_") && !name.startsWith(".");
+       };
+
        private transient JobConf jobConf;
        private transient ObjectPath tablePath;
        private transient List<String> partitionColumns;
@@ -139,6 +140,8 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
 
        private transient String hiveVersion;
 
+       private transient HiveShim hiveShim;
+
        // to convert Flink object to Hive object
        private transient HiveObjectConversion[] hiveConversions;
 
@@ -167,6 +170,7 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
                isDynamicPartition = isPartitioned && partitionColumns.size() > 
hiveTablePartition.getPartitionSpec().size();
                hiveVersion = 
Preconditions.checkNotNull(jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
                                "Hive version is not defined");
+               hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
        }
 
        //  Custom serialization methods
@@ -209,6 +213,7 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
                partitionToWriter = new HashMap<>();
                tableProperties = (Properties) in.readObject();
                hiveVersion = (String) in.readObject();
+               hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
        }
 
        @Override
@@ -223,7 +228,6 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
                        }
                        if (isPartitioned) {
                                if (isDynamicPartition) {
-                                       HiveShim hiveShim = 
HiveShimLoader.loadHiveShim(hiveVersion);
                                        FileStatus[] generatedParts = 
hiveShim.getFileStatusRecurse(stagingDir,
                                                partitionColumns.size() - 
hiveTablePartition.getPartitionSpec().size(), fs);
                                        for (FileStatus part : generatedParts) {
@@ -378,16 +382,15 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
                                // TODO: support setting auto-purge?
                                final boolean purge = true;
                                // Note we assume the srcDir is a hidden dir, 
otherwise it will be deleted if it's a sub-dir of destDir
-                               FileStatus[] existingFiles = 
fs.listStatus(destDir, FileUtils.HIDDEN_FILES_PATH_FILTER);
+                               FileStatus[] existingFiles = 
fs.listStatus(destDir, HIDDEN_FILES_PATH_FILTER);
                                if (existingFiles != null) {
-                                       HiveShim hiveShim = 
HiveShimLoader.loadHiveShim(hiveVersion);
                                        for (FileStatus existingFile : 
existingFiles) {
                                                
Preconditions.checkState(hiveShim.moveToTrash(fs, existingFile.getPath(), 
jobConf, purge),
                                                        "Failed to overwrite 
existing file " + existingFile);
                                        }
                                }
                        }
-                       FileStatus[] srcFiles = fs.listStatus(srcDir, 
FileUtils.HIDDEN_FILES_PATH_FILTER);
+                       FileStatus[] srcFiles = fs.listStatus(srcDir, 
HIDDEN_FILES_PATH_FILTER);
                        for (FileStatus srcFile : srcFiles) {
                                Path srcPath = srcFile.getPath();
                                Path destPath = new Path(destDir, 
srcPath.getName());
@@ -441,17 +444,6 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
        private HivePartitionWriter writerForLocation(String location) throws 
IOException {
                JobConf clonedConf = new JobConf(jobConf);
                clonedConf.set(OUTDIR, location);
-               OutputFormat outputFormat;
-               try {
-                       StorageDescriptor sd = 
hiveTablePartition.getStorageDescriptor();
-                       Class outputFormatClz = 
Class.forName(sd.getOutputFormat(), true,
-                               Thread.currentThread().getContextClassLoader());
-                       outputFormatClz = 
HiveFileFormatUtils.getOutputFormatSubstitute(outputFormatClz);
-                       outputFormat = (OutputFormat) 
outputFormatClz.newInstance();
-               } catch (InstantiationException | IllegalAccessException | 
ClassNotFoundException e) {
-                       throw new FlinkRuntimeException("Unable to instantiate 
the hadoop output format", e);
-               }
-               ReflectionUtils.setConf(outputFormat, clonedConf);
                OutputCommitter outputCommitter = 
clonedConf.getOutputCommitter();
                JobContext jobContext = new JobContextImpl(clonedConf, new 
JobID());
                outputCommitter.setupJob(jobContext);
@@ -474,28 +466,19 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
                                
SequenceFileOutputFormat.setOutputCompressionType(clonedConf, style);
                        }
                }
+               StorageDescriptor sd = 
hiveTablePartition.getStorageDescriptor();
                String taskPartition = 
String.valueOf(clonedConf.getInt("mapreduce.task.partition", -1));
                Path taskPath = FileOutputFormat.getTaskOutputPath(clonedConf, 
taskPartition);
-               FileSinkOperator.RecordWriter recordWriter;
-               try {
-                       recordWriter = 
HiveFileFormatUtils.getRecordWriter(clonedConf, outputFormat,
-                               outputClass, isCompressed, tableProperties, 
taskPath, Reporter.NULL);
-               } catch (HiveException e) {
-                       throw new IOException(e);
-               }
-               return new HivePartitionWriter(clonedConf, outputFormat, 
recordWriter, outputCommitter);
+               FileSinkOperator.RecordWriter recordWriter = 
hiveShim.getHiveRecordWriter(
+                               clonedConf, sd.getOutputFormat(), outputClass, 
isCompressed, tableProperties, taskPath);
+               return new HivePartitionWriter(recordWriter, outputCommitter);
        }
 
        private static class HivePartitionWriter {
-               private final JobConf jobConf;
-               private final OutputFormat outputFormat;
                private final FileSinkOperator.RecordWriter recordWriter;
                private final OutputCommitter outputCommitter;
 
-               HivePartitionWriter(JobConf jobConf, OutputFormat outputFormat, 
FileSinkOperator.RecordWriter recordWriter,
-                                                       OutputCommitter 
outputCommitter) {
-                       this.jobConf = jobConf;
-                       this.outputFormat = outputFormat;
+               HivePartitionWriter(FileSinkOperator.RecordWriter recordWriter, 
OutputCommitter outputCommitter) {
                        this.recordWriter = recordWriter;
                        this.outputCommitter = outputCommitter;
                }
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 49ff7441..9afd55c 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -85,9 +85,7 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
 import org.apache.hadoop.hive.ql.io.StorageFormatFactory;
-import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
-import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -525,12 +523,7 @@ public class HiveCatalog extends AbstractCatalog {
                        fields = hiveTable.getSd().getCols();
                } else {
                        // get schema from deserializer
-                       try {
-                               fields = 
HiveReflectionUtils.getFieldsFromDeserializer(hiveShim, 
hiveTable.getTableName(),
-                                               
HiveReflectionUtils.getDeserializer(hiveShim, hiveConf, hiveTable, true));
-                       } catch (SerDeException | MetaException e) {
-                               throw new CatalogException("Failed to get Hive 
table schema from deserializer", e);
-                       }
+                       fields = hiveShim.getFieldsFromDeserializer(hiveConf, 
hiveTable, true);
                }
                TableSchema tableSchema =
                        HiveTableUtil.createTableSchema(fields, 
hiveTable.getPartitionKeys());
@@ -1099,7 +1092,7 @@ public class HiveCatalog extends AbstractCatalog {
 
                return new Function(
                        // due to 
https://issues.apache.org/jira/browse/HIVE-22053, we have to normalize function 
name ourselves
-                       
HiveStringUtils.normalizeIdentifier(functionPath.getObjectName()),
+                       functionPath.getObjectName().trim().toLowerCase(),
                        functionPath.getDatabaseName(),
                        functionClassName,
                        null,                   // Owner name
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
index 2111099..1e27fb6 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -34,15 +35,19 @@ import 
org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.thrift.TException;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 /**
  * A shim layer to support different versions of Hive.
@@ -184,4 +189,15 @@ public interface HiveShim extends Serializable {
         */
        CatalogColumnStatisticsDataDate 
toFlinkDateColStats(ColumnStatisticsData hiveDateColStats);
 
+       /**
+        * Get Hive's FileSinkOperator.RecordWriter.
+        */
+       FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf, 
String outputFormatClzName,
+                       Class<? extends Writable> outValClz, boolean 
isCompressed, Properties tableProps, Path outPath);
+
+       /**
+        * Get Hive table schema from deserializer.
+        */
+       List<FieldSchema> getFieldsFromDeserializer(Configuration conf, Table 
table, boolean skipConfError);
+
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java
index 1645d08..bd4b414 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java
@@ -32,6 +32,8 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 public class HiveShimLoader {
 
+       public static final String HIVE_VERSION_V1_0_0 = "1.0.0";
+       public static final String HIVE_VERSION_V1_0_1 = "1.0.1";
        public static final String HIVE_VERSION_V1_1_0 = "1.1.0";
        public static final String HIVE_VERSION_V1_1_1 = "1.1.1";
        public static final String HIVE_VERSION_V1_2_0 = "1.2.0";
@@ -62,6 +64,12 @@ public class HiveShimLoader {
 
        public static HiveShim loadHiveShim(String version) {
                return hiveShims.computeIfAbsent(version, (v) -> {
+                       if (v.startsWith(HIVE_VERSION_V1_0_0)) {
+                               return new HiveShimV100();
+                       }
+                       if (v.startsWith(HIVE_VERSION_V1_0_1)) {
+                               return new HiveShimV101();
+                       }
                        if (v.startsWith(HIVE_VERSION_V1_1_0)) {
                                return new HiveShimV110();
                        }
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV110.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
similarity index 84%
copy from 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV110.java
copy to 
flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
index 91fd80a..4afef67 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV110.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate;
 import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
 import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -42,7 +44,11 @@ import 
org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantBinaryObjectInspector;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantBooleanObjectInspector;
@@ -61,6 +67,9 @@ import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantT
 import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.thrift.TException;
 
 import java.io.IOException;
@@ -70,11 +79,12 @@ import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 /**
- * Shim for Hive version 1.1.0.
+ * Shim for Hive version 1.0.0.
  */
-public class HiveShimV110 implements HiveShim {
+public class HiveShimV100 implements HiveShim {
 
        @Override
        public IMetaStoreClient getHiveMetastoreClient(HiveConf hiveConf) {
@@ -276,7 +286,7 @@ public class HiveShimV110 implements HiveShim {
                                return 
HiveReflectionUtils.createConstantObjectInspector(className, value.toString());
                        default:
                                throw new FlinkHiveUDFException(
-                                       String.format("Cannot find 
ConstantObjectInspector for %s", primitiveTypeInfo));
+                                               String.format("Cannot find 
ConstantObjectInspector for %s", primitiveTypeInfo));
                }
        }
 
@@ -294,4 +304,35 @@ public class HiveShimV110 implements HiveShim {
        public CatalogColumnStatisticsDataDate 
toFlinkDateColStats(ColumnStatisticsData hiveDateColStats) {
                throw new UnsupportedOperationException("DATE column stats are 
not supported until Hive 1.2.0");
        }
+
+       @Override
+       public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf 
jobConf, String outputFormatClzName,
+                       Class<? extends Writable> outValClz, boolean 
isCompressed, Properties tableProps, Path outPath) {
+               try {
+                       Class outputFormatClz = 
Class.forName(outputFormatClzName);
+                       Class utilClass = HiveFileFormatUtils.class;
+                       Method utilMethod = 
utilClass.getDeclaredMethod("getOutputFormatSubstitute", Class.class, 
boolean.class);
+                       outputFormatClz = (Class) utilMethod.invoke(null, 
outputFormatClz, false);
+                       Preconditions.checkState(outputFormatClz != null, "No 
Hive substitute output format for " + outputFormatClzName);
+                       HiveOutputFormat outputFormat = (HiveOutputFormat) 
outputFormatClz.newInstance();
+                       utilMethod = 
utilClass.getDeclaredMethod("getRecordWriter", JobConf.class, 
HiveOutputFormat.class,
+                                       Class.class, boolean.class, 
Properties.class, Path.class, Reporter.class);
+                       return (FileSinkOperator.RecordWriter) 
utilMethod.invoke(null,
+                                       jobConf, outputFormat, outValClz, 
isCompressed, tableProps, outPath, Reporter.NULL);
+               } catch (Exception e) {
+                       throw new CatalogException("Failed to create Hive 
RecordWriter", e);
+               }
+       }
+
+       @Override
+       public List<FieldSchema> getFieldsFromDeserializer(Configuration conf, 
Table table, boolean skipConfError) {
+               try {
+                       Method utilMethod = 
getHiveMetaStoreUtilsClass().getMethod("getDeserializer", Configuration.class, 
Table.class);
+                       Deserializer deserializer = (Deserializer) 
utilMethod.invoke(null, conf, table);
+                       utilMethod = 
getHiveMetaStoreUtilsClass().getMethod("getFieldsFromDeserializer", 
String.class, Deserializer.class);
+                       return (List<FieldSchema>) utilMethod.invoke(null, 
table.getTableName(), deserializer);
+               } catch (Exception e) {
+                       throw new CatalogException("Failed to get table schema 
from deserializer", e);
+               }
+       }
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV101.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV101.java
new file mode 100644
index 0000000..cf86905
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV101.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive.client;
+
+/**
+ * Shim for Hive version 1.0.1.
+ */
+public class HiveShimV101 extends HiveShimV100 {
+}
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV110.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV110.java
index 91fd80a..691c14a 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV110.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV110.java
@@ -19,279 +19,58 @@
 package org.apache.flink.table.catalog.hive.client;
 
 import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
-import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate;
-import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
-import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.common.HiveStatsUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
-import org.apache.hadoop.hive.metastore.api.Function;
-import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.UnknownDBException;
-import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantBinaryObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantBooleanObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantByteObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantDateObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantDoubleObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantFloatObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantHiveCharObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantHiveDecimalObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantHiveVarcharObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantIntObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantLongObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantShortObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantStringObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantTimestampObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
-import org.apache.thrift.TException;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.Reporter;
 
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
+import java.util.Properties;
 
 /**
  * Shim for Hive version 1.1.0.
  */
-public class HiveShimV110 implements HiveShim {
+public class HiveShimV110 extends HiveShimV101 {
 
        @Override
-       public IMetaStoreClient getHiveMetastoreClient(HiveConf hiveConf) {
+       public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf 
jobConf, String outputFormatClzName,
+                       Class<? extends Writable> outValClz, boolean 
isCompressed, Properties tableProps, Path outPath) {
                try {
-                       return new HiveMetaStoreClient(hiveConf);
-               } catch (MetaException ex) {
-                       throw new CatalogException("Failed to create Hive 
Metastore client", ex);
+                       Class outputFormatClz = 
Class.forName(outputFormatClzName);
+                       Class utilClass = HiveFileFormatUtils.class;
+                       Method utilMethod = 
utilClass.getDeclaredMethod("getOutputFormatSubstitute", Class.class);
+                       outputFormatClz = (Class) utilMethod.invoke(null, 
outputFormatClz);
+                       Preconditions.checkState(outputFormatClz != null, "No 
Hive substitute output format for " + outputFormatClzName);
+                       OutputFormat outputFormat = (OutputFormat) 
outputFormatClz.newInstance();
+                       utilMethod = 
utilClass.getDeclaredMethod("getRecordWriter", JobConf.class, 
OutputFormat.class,
+                                       Class.class, boolean.class, 
Properties.class, Path.class, Reporter.class);
+                       return (FileSinkOperator.RecordWriter) 
utilMethod.invoke(null,
+                                       jobConf, outputFormat, outValClz, 
isCompressed, tableProps, outPath, Reporter.NULL);
+               } catch (Exception e) {
+                       throw new CatalogException("Failed to create Hive 
RecordWriter", e);
                }
        }
 
        @Override
-       // 1.x client doesn't support filtering tables by type, so here we need 
to get all tables and filter by ourselves
-       public List<String> getViews(IMetaStoreClient client, String 
databaseName) throws UnknownDBException, TException {
-               // We don't have to use reflection here because 
client.getAllTables(String) is supposed to be there for
-               // all versions.
-               List<String> tableNames = client.getAllTables(databaseName);
-               List<String> views = new ArrayList<>();
-               for (String name : tableNames) {
-                       Table table = client.getTable(databaseName, name);
-                       String viewDef = table.getViewOriginalText();
-                       if (viewDef != null && !viewDef.isEmpty()) {
-                               views.add(table.getTableName());
-                       }
-               }
-               return views;
-       }
-
-       @Override
-       public Function getFunction(IMetaStoreClient client, String dbName, 
String functionName) throws NoSuchObjectException, TException {
-               try {
-                       // hive-1.x doesn't throw NoSuchObjectException if 
function doesn't exist, instead it throws a MetaException
-                       return client.getFunction(dbName, functionName);
-               } catch (MetaException e) {
-                       // need to check the cause and message of this 
MetaException to decide whether it should actually be a NoSuchObjectException
-                       if (e.getCause() instanceof NoSuchObjectException) {
-                               throw (NoSuchObjectException) e.getCause();
-                       }
-                       if 
(e.getMessage().startsWith(NoSuchObjectException.class.getSimpleName())) {
-                               throw new NoSuchObjectException(e.getMessage());
-                       }
-                       throw e;
-               }
-       }
-
-       @Override
-       public boolean moveToTrash(FileSystem fs, Path path, Configuration 
conf, boolean purge) throws IOException {
-               try {
-                       Method method = 
FileUtils.class.getDeclaredMethod("moveToTrash", FileSystem.class, Path.class, 
Configuration.class);
-                       return (boolean) method.invoke(null, fs, path, conf);
-               } catch (NoSuchMethodException | InvocationTargetException | 
IllegalAccessException e) {
-                       throw new IOException("Failed to move " + path + " to 
trash", e);
-               }
-       }
-
-       @Override
-       public void alterTable(IMetaStoreClient client, String databaseName, 
String tableName, Table table) throws InvalidOperationException, MetaException, 
TException {
-               client.alter_table(databaseName, tableName, table);
-       }
-
-       @Override
-       public void alterPartition(IMetaStoreClient client, String 
databaseName, String tableName, Partition partition)
-                       throws InvalidOperationException, MetaException, 
TException {
-               String errorMsg = "Failed to alter partition for table %s in 
database %s";
-               try {
-                       Method method = 
client.getClass().getMethod("alter_partition", String.class, String.class, 
Partition.class);
-                       method.invoke(client, databaseName, tableName, 
partition);
-               } catch (InvocationTargetException ite) {
-                       Throwable targetEx = ite.getTargetException();
-                       if (targetEx instanceof TException) {
-                               throw (TException) targetEx;
-                       } else {
-                               throw new 
CatalogException(String.format(errorMsg, tableName, databaseName), targetEx);
-                       }
-               } catch (NoSuchMethodException | IllegalAccessException e) {
-                       throw new CatalogException(String.format(errorMsg, 
tableName, databaseName), e);
-               }
-       }
-
-       @Override
-       public SimpleGenericUDAFParameterInfo 
createUDAFParameterInfo(ObjectInspector[] params, boolean isWindowing, boolean 
distinct, boolean allColumns) {
+       public List<FieldSchema> getFieldsFromDeserializer(Configuration conf, 
Table table, boolean skipConfError) {
                try {
-                       Constructor constructor = 
SimpleGenericUDAFParameterInfo.class.getConstructor(ObjectInspector[].class,
-                                       boolean.class, boolean.class);
-                       return (SimpleGenericUDAFParameterInfo) 
constructor.newInstance(params, distinct, allColumns);
-               } catch (NoSuchMethodException | IllegalAccessException | 
InstantiationException | InvocationTargetException e) {
-                       throw new CatalogException("Failed to create 
SimpleGenericUDAFParameterInfo", e);
+                       Method utilMethod = 
getHiveMetaStoreUtilsClass().getMethod("getDeserializer",
+                                       Configuration.class, Table.class, 
boolean.class);
+                       Deserializer deserializer = (Deserializer) 
utilMethod.invoke(null, conf, table, skipConfError);
+                       utilMethod = 
getHiveMetaStoreUtilsClass().getMethod("getFieldsFromDeserializer", 
String.class, Deserializer.class);
+                       return (List<FieldSchema>) utilMethod.invoke(null, 
table.getTableName(), deserializer);
+               } catch (Exception e) {
+                       throw new CatalogException("Failed to get table schema 
from deserializer", e);
                }
        }
-
-       @Override
-       public Class<?> getMetaStoreUtilsClass() {
-               try {
-                       return 
Class.forName("org.apache.hadoop.hive.metastore.MetaStoreUtils");
-               } catch (ClassNotFoundException e) {
-                       throw new CatalogException("Failed to find class 
MetaStoreUtils", e);
-               }
-       }
-
-       @Override
-       public Class<?> getHiveMetaStoreUtilsClass() {
-               return getMetaStoreUtilsClass();
-       }
-
-       @Override
-       public Class<?> getDateDataTypeClass() {
-               return java.sql.Date.class;
-       }
-
-       @Override
-       public Class<?> getTimestampDataTypeClass() {
-               return java.sql.Timestamp.class;
-       }
-
-       @Override
-       public FileStatus[] getFileStatusRecurse(Path path, int level, 
FileSystem fs) throws IOException {
-               try {
-                       Method method = 
HiveStatsUtils.class.getMethod("getFileStatusRecurse", Path.class, 
Integer.TYPE, FileSystem.class);
-                       // getFileStatusRecurse is a static method
-                       return (FileStatus[]) method.invoke(null, path, level, 
fs);
-               } catch (Exception ex) {
-                       throw new CatalogException("Failed to invoke 
HiveStatsUtils.getFileStatusRecurse()", ex);
-               }
-       }
-
-       @Override
-       public void makeSpecFromName(Map<String, String> partSpec, Path 
currPath) {
-               try {
-                       Method method = 
Warehouse.class.getMethod("makeSpecFromName", Map.class, Path.class);
-                       // makeSpecFromName is a static method
-                       method.invoke(null, partSpec, currPath);
-               } catch (Exception ex) {
-                       throw new CatalogException("Failed to invoke 
Warehouse.makeSpecFromName()", ex);
-               }
-       }
-
-       @Override
-       public ObjectInspector getObjectInspectorForConstant(PrimitiveTypeInfo 
primitiveTypeInfo, Object value) {
-               String className;
-               value = HiveInspectors.hivePrimitiveToWritable(value);
-               // Java constant object inspectors are not available until 
1.2.0 -- https://issues.apache.org/jira/browse/HIVE-9766
-               // So we have to use writable constant object inspectors for 
1.1.x
-               switch (primitiveTypeInfo.getPrimitiveCategory()) {
-                       case BOOLEAN:
-                               className = 
WritableConstantBooleanObjectInspector.class.getName();
-                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
-                       case BYTE:
-                               className = 
WritableConstantByteObjectInspector.class.getName();
-                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
-                       case SHORT:
-                               className = 
WritableConstantShortObjectInspector.class.getName();
-                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
-                       case INT:
-                               className = 
WritableConstantIntObjectInspector.class.getName();
-                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
-                       case LONG:
-                               className = 
WritableConstantLongObjectInspector.class.getName();
-                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
-                       case FLOAT:
-                               className = 
WritableConstantFloatObjectInspector.class.getName();
-                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
-                       case DOUBLE:
-                               className = 
WritableConstantDoubleObjectInspector.class.getName();
-                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
-                       case STRING:
-                               className = 
WritableConstantStringObjectInspector.class.getName();
-                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
-                       case CHAR:
-                               className = 
WritableConstantHiveCharObjectInspector.class.getName();
-                               try {
-                                       return (ObjectInspector) 
Class.forName(className).getDeclaredConstructor(
-                                                       CharTypeInfo.class, 
value.getClass()).newInstance(primitiveTypeInfo, value);
-                               } catch (Exception e) {
-                                       throw new FlinkHiveUDFException("Failed 
to create writable constant object inspector", e);
-                               }
-                       case VARCHAR:
-                               className = 
WritableConstantHiveVarcharObjectInspector.class.getName();
-                               try {
-                                       return (ObjectInspector) 
Class.forName(className).getDeclaredConstructor(
-                                                       VarcharTypeInfo.class, 
value.getClass()).newInstance(primitiveTypeInfo, value);
-                               } catch (Exception e) {
-                                       throw new FlinkHiveUDFException("Failed 
to create writable constant object inspector", e);
-                               }
-                       case DATE:
-                               className = 
WritableConstantDateObjectInspector.class.getName();
-                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
-                       case TIMESTAMP:
-                               className = 
WritableConstantTimestampObjectInspector.class.getName();
-                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
-                       case DECIMAL:
-                               className = 
WritableConstantHiveDecimalObjectInspector.class.getName();
-                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
-                       case BINARY:
-                               className = 
WritableConstantBinaryObjectInspector.class.getName();
-                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
-                       case UNKNOWN:
-                       case VOID:
-                               // If type is null, we use the Constant String 
to replace
-                               className = 
WritableConstantStringObjectInspector.class.getName();
-                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value.toString());
-                       default:
-                               throw new FlinkHiveUDFException(
-                                       String.format("Cannot find 
ConstantObjectInspector for %s", primitiveTypeInfo));
-               }
-       }
-
-       @Override
-       public ColumnStatisticsData 
toHiveDateColStats(CatalogColumnStatisticsDataDate flinkDateColStats) {
-               throw new UnsupportedOperationException("DATE column stats are 
not supported until Hive 1.2.0");
-       }
-
-       @Override
-       public boolean isDateStats(ColumnStatisticsData colStatsData) {
-               return false;
-       }
-
-       @Override
-       public CatalogColumnStatisticsDataDate 
toFlinkDateColStats(ColumnStatisticsData hiveDateColStats) {
-               throw new UnsupportedOperationException("DATE column stats are 
not supported until Hive 1.2.0");
-       }
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveReflectionUtils.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveReflectionUtils.java
index b03eb66..5224b78 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveReflectionUtils.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveReflectionUtils.java
@@ -22,12 +22,8 @@ import 
org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.hive.client.HiveShim;
 import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.serde2.Deserializer;
-import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 
 import java.lang.reflect.Constructor;
@@ -57,27 +53,6 @@ public class HiveReflectionUtils {
                }
        }
 
-       public static List<FieldSchema> getFieldsFromDeserializer(HiveShim 
hiveShim, String tableName, Deserializer deserializer)
-                       throws SerDeException, MetaException {
-               try {
-                       Method method = 
hiveShim.getHiveMetaStoreUtilsClass().getMethod("getFieldsFromDeserializer", 
String.class, Deserializer.class);
-                       return (List<FieldSchema>) method.invoke(null, 
tableName, deserializer);
-               } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException e) {
-                       throw new CatalogException("Failed to invoke 
MetaStoreUtils.getFieldsFromDeserializer()", e);
-               }
-       }
-
-       public static Deserializer getDeserializer(HiveShim hiveShim, 
Configuration conf, Table table, boolean skipConfError)
-                       throws MetaException {
-               try {
-                       Method method = 
hiveShim.getHiveMetaStoreUtilsClass().getMethod("getDeserializer", 
Configuration.class,
-                               Table.class, boolean.class);
-                       return (Deserializer) method.invoke(null, conf, table, 
skipConfError);
-               } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException e) {
-                       throw new CatalogException("Failed to invoke 
MetaStoreUtils.getDeserializer()", e);
-               }
-       }
-
        public static List<String> getPvals(HiveShim hiveShim, 
List<FieldSchema> partCols, Map<String, String> partSpec) {
                try {
                        Method method = 
hiveShim.getMetaStoreUtilsClass().getMethod("getPvals", List.class, Map.class);
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java
index 2a344d1..590d09d 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java
@@ -37,6 +37,8 @@ public class HiveRunnerShimLoader {
                String hiveVersion = HiveShimLoader.getHiveVersion();
                return hiveRunnerShims.computeIfAbsent(hiveVersion, v -> {
                        switch (v) {
+                               case HiveShimLoader.HIVE_VERSION_V1_0_0:
+                               case HiveShimLoader.HIVE_VERSION_V1_0_1:
                                case HiveShimLoader.HIVE_VERSION_V1_1_0:
                                case HiveShimLoader.HIVE_VERSION_V1_1_1:
                                case HiveShimLoader.HIVE_VERSION_V1_2_0:
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index 2c50bef..ce00b23 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -141,7 +141,8 @@ public class TableEnvHiveConnectorTest {
                hiveShell.execute("create table db1.dest (i int,s string) " + 
suffix);
 
                // prepare source data with Hive
-               hiveShell.execute("insert into db1.src values (1,'a'),(2,'b')");
+               // TABLE keyword in INSERT INTO is mandatory prior to 1.1.0
+               hiveShell.execute("insert into table db1.src values 
(1,'a'),(2,'b')");
 
                // populate dest table with source table
                tableEnv.sqlUpdate("insert into db1.dest select * from 
db1.src");
@@ -161,7 +162,8 @@ public class TableEnvHiveConnectorTest {
                        hiveShell.execute("create table db1.src2 (x 
decimal(10,2))");
                        hiveShell.execute("create table db1.dest (x 
decimal(10,2))");
                        // populate src1 from Hive
-                       hiveShell.execute("insert into db1.src1 values 
(1.0),(2.12),(5.123),(5.456),(123456789.12)");
+                       // TABLE keyword in INSERT INTO is mandatory prior to 
1.1.0
+                       hiveShell.execute("insert into table db1.src1 values 
(1.0),(2.12),(5.123),(5.456),(123456789.12)");
 
                        TableEnvironment tableEnv = 
getTableEnvWithHiveCatalog();
                        // populate src2 with same data from Flink
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
index f7c74dc..fc8c0b5 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.types.Row;
 
 import org.apache.hadoop.hive.ql.udf.UDFUnhex;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFAddMonths;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFCase;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFCeil;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFCoalesce;
@@ -53,6 +52,7 @@ import static org.junit.Assert.assertEquals;
 public class HiveGenericUDFTest {
        private static HiveShim hiveShim = 
HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion());
        private static final boolean HIVE_120_OR_LATER = 
HiveShimLoader.getHiveVersion().compareTo(HiveShimLoader.HIVE_VERSION_V1_2_0) 
>= 0;
+       private static final boolean HIVE_110_OR_LATER = 
HiveShimLoader.getHiveVersion().compareTo(HiveShimLoader.HIVE_VERSION_V1_1_0) 
>= 0;
 
        @Test
        public void testAbs() {
@@ -94,9 +94,10 @@ public class HiveGenericUDFTest {
        }
 
        @Test
-       public void testAddMonths() {
+       public void testAddMonths() throws Exception {
+               Assume.assumeTrue(HIVE_110_OR_LATER);
                HiveGenericUDF udf = init(
-                       GenericUDFAddMonths.class,
+                       
Class.forName("org.apache.hadoop.hive.ql.udf.generic.GenericUDFAddMonths"),
                        new Object[] {
                                null,
                                1

Reply via email to