This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 15c58a8  [FLINK-15453][hive] Remove unneeded HiveShim methods
15c58a8 is described below

commit 15c58a889f7dd6d4d90f428aeb10b2c0f4477e62
Author: Rui Li <[email protected]>
AuthorDate: Fri Jan 3 13:22:28 2020 +0800

    [FLINK-15453][hive] Remove unneeded HiveShim methods
    
    closes #10755
---
 .../flink/table/catalog/hive/client/HiveShim.java  |  46 -------
 .../table/catalog/hive/client/HiveShimV100.java    | 137 ---------------------
 .../table/catalog/hive/client/HiveShimV230.java    |  16 ---
 .../table/catalog/hive/client/HiveShimV310.java    |  30 -----
 .../functions/hive/conversion/HiveInspectors.java  |  99 ++++++++++++++-
 5 files changed, 98 insertions(+), 230 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
index cd6bc0b..346a8e1 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
@@ -22,8 +22,6 @@ import 
org.apache.flink.table.api.constraints.UniqueConstraint;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -38,19 +36,16 @@ import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.FunctionInfo;
 import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.thrift.TException;
 
 import javax.annotation.Nullable;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
@@ -80,23 +75,6 @@ public interface HiveShim extends Serializable {
        List<String> getViews(IMetaStoreClient client, String databaseName) 
throws UnknownDBException, TException;
 
        /**
-        * Moves a particular file or directory to trash.
-        * The file/directory can potentially be deleted (w/o going to trash) 
if purge is set to true, or if it cannot
-        * be moved properly.
-        *
-        * <p>This interface is here because FileUtils.moveToTrash in different 
Hive versions have different signatures.
-        *
-        * @param fs    the FileSystem to use
-        * @param path  the path of the file or directory to be moved to trash.
-        * @param conf  the Configuration to use
-        * @param purge whether try to skip trash and directly delete the 
file/directory. This flag may be ignored by
-        *              old Hive versions prior to 2.3.0.
-        * @return true if the move is successful, and false otherwise
-        * @throws IOException if the file/directory cannot be properly moved 
or deleted
-        */
-       boolean moveToTrash(FileSystem fs, Path path, Configuration conf, 
boolean purge) throws IOException;
-
-       /**
         * Alters a Hive table.
         *
         * @param client       the Hive metastore client
@@ -145,30 +123,6 @@ public interface HiveShim extends Serializable {
        Class<?> getTimestampDataTypeClass();
 
        /**
-        * The return type of HiveStatsUtils.getFileStatusRecurse was changed 
from array to List in Hive 3.1.0.
-        *
-        * @param path the path of the directory
-        * @param level the level of recursion
-        * @param fs the file system of the directory
-        * @return an array of the entries
-        * @throws IOException in case of any io error
-        */
-       FileStatus[] getFileStatusRecurse(Path path, int level, FileSystem fs) 
throws IOException;
-
-       /**
-        * The signature of HiveStatsUtils.makeSpecFromName() was changed in 
Hive 3.1.0.
-        *
-        * @param partSpec partition specs
-        * @param currPath the current path
-        */
-       void makeSpecFromName(Map<String, String> partSpec, Path currPath);
-
-       /**
-        * Get ObjectInspector for a constant value.
-        */
-       ObjectInspector getObjectInspectorForConstant(PrimitiveTypeInfo 
primitiveTypeInfo, Object value);
-
-       /**
         * Generate Hive ColumnStatisticsData from Flink 
CatalogColumnStatisticsDataDate for DATE columns.
         */
        ColumnStatisticsData toHiveDateColStats(CatalogColumnStatisticsDataDate 
flinkDateColStats);
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
index d1d7c55..a6fcd78 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
@@ -21,24 +21,17 @@ package org.apache.flink.table.catalog.hive.client;
 import org.apache.flink.connectors.hive.FlinkHiveException;
 import org.apache.flink.table.api.constraints.UniqueConstraint;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate;
-import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
@@ -61,24 +54,6 @@ import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.hive.serde2.io.TimestampWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantBinaryObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantBooleanObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantByteObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantDateObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantDoubleObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantFloatObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantHiveCharObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantHiveDecimalObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantHiveVarcharObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantIntObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantLongObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantShortObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantStringObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantTimestampObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.FloatWritable;
@@ -92,7 +67,6 @@ import org.apache.thrift.TException;
 
 import javax.annotation.Nonnull;
 
-import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -104,7 +78,6 @@ import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
@@ -141,16 +114,6 @@ public class HiveShimV100 implements HiveShim {
        }
 
        @Override
-       public boolean moveToTrash(FileSystem fs, Path path, Configuration 
conf, boolean purge) throws IOException {
-               try {
-                       Method method = 
FileUtils.class.getDeclaredMethod("moveToTrash", FileSystem.class, Path.class, 
Configuration.class);
-                       return (boolean) method.invoke(null, fs, path, conf);
-               } catch (NoSuchMethodException | InvocationTargetException | 
IllegalAccessException e) {
-                       throw new IOException("Failed to move " + path + " to 
trash", e);
-               }
-       }
-
-       @Override
        public void alterTable(IMetaStoreClient client, String databaseName, 
String tableName, Table table) throws InvalidOperationException, MetaException, 
TException {
                client.alter_table(databaseName, tableName, table);
        }
@@ -210,106 +173,6 @@ public class HiveShimV100 implements HiveShim {
        }
 
        @Override
-       public FileStatus[] getFileStatusRecurse(Path path, int level, 
FileSystem fs) throws IOException {
-               try {
-                       Method method = 
HiveStatsUtils.class.getMethod("getFileStatusRecurse", Path.class, 
Integer.TYPE, FileSystem.class);
-                       // getFileStatusRecurse is a static method
-                       return (FileStatus[]) method.invoke(null, path, level, 
fs);
-               } catch (Exception ex) {
-                       throw new CatalogException("Failed to invoke 
HiveStatsUtils.getFileStatusRecurse()", ex);
-               }
-       }
-
-       @Override
-       public void makeSpecFromName(Map<String, String> partSpec, Path 
currPath) {
-               try {
-                       Method method = 
Warehouse.class.getMethod("makeSpecFromName", Map.class, Path.class);
-                       // makeSpecFromName is a static method
-                       method.invoke(null, partSpec, currPath);
-               } catch (Exception ex) {
-                       throw new CatalogException("Failed to invoke 
Warehouse.makeSpecFromName()", ex);
-               }
-       }
-
-       @Override
-       public ObjectInspector getObjectInspectorForConstant(PrimitiveTypeInfo 
primitiveTypeInfo, Object value) {
-               String className;
-               value = hivePrimitiveToWritable(value);
-               // Java constant object inspectors are not available until 
1.2.0 -- https://issues.apache.org/jira/browse/HIVE-9766
-               // So we have to use writable constant object inspectors for 
1.1.x
-               switch (primitiveTypeInfo.getPrimitiveCategory()) {
-                       case BOOLEAN:
-                               className = 
WritableConstantBooleanObjectInspector.class.getName();
-                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
-                       case BYTE:
-                               className = 
WritableConstantByteObjectInspector.class.getName();
-                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
-                       case SHORT:
-                               className = 
WritableConstantShortObjectInspector.class.getName();
-                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
-                       case INT:
-                               className = 
WritableConstantIntObjectInspector.class.getName();
-                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
-                       case LONG:
-                               className = 
WritableConstantLongObjectInspector.class.getName();
-                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
-                       case FLOAT:
-                               className = 
WritableConstantFloatObjectInspector.class.getName();
-                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
-                       case DOUBLE:
-                               className = 
WritableConstantDoubleObjectInspector.class.getName();
-                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
-                       case STRING:
-                               className = 
WritableConstantStringObjectInspector.class.getName();
-                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
-                       case CHAR:
-                               try {
-                                       
Constructor<WritableConstantHiveCharObjectInspector> constructor =
-                                                       
WritableConstantHiveCharObjectInspector.class.getDeclaredConstructor(CharTypeInfo.class,
 value.getClass());
-                                       constructor.setAccessible(true);
-                                       return 
constructor.newInstance(primitiveTypeInfo, value);
-                               } catch (Exception e) {
-                                       throw new FlinkHiveUDFException("Failed 
to create writable constant object inspector", e);
-                               }
-                       case VARCHAR:
-                               try {
-                                       
Constructor<WritableConstantHiveVarcharObjectInspector> constructor =
-                                                       
WritableConstantHiveVarcharObjectInspector.class.getDeclaredConstructor(VarcharTypeInfo.class,
 value.getClass());
-                                       constructor.setAccessible(true);
-                                       return 
constructor.newInstance(primitiveTypeInfo, value);
-                               } catch (Exception e) {
-                                       throw new FlinkHiveUDFException("Failed 
to create writable constant object inspector", e);
-                               }
-                       case DATE:
-                               className = 
WritableConstantDateObjectInspector.class.getName();
-                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
-                       case TIMESTAMP:
-                               className = 
WritableConstantTimestampObjectInspector.class.getName();
-                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
-                       case DECIMAL:
-                               try {
-                                       
Constructor<WritableConstantHiveDecimalObjectInspector> constructor =
-                                                       
WritableConstantHiveDecimalObjectInspector.class.getDeclaredConstructor(DecimalTypeInfo.class,
 value.getClass());
-                                       constructor.setAccessible(true);
-                                       return 
constructor.newInstance(primitiveTypeInfo, value);
-                               } catch (Exception e) {
-                                       throw new FlinkHiveUDFException("Failed 
to create writable constant object inspector", e);
-                               }
-                       case BINARY:
-                               className = 
WritableConstantBinaryObjectInspector.class.getName();
-                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
-                       case UNKNOWN:
-                       case VOID:
-                               // If type is null, we use the Constant String 
to replace
-                               className = 
WritableConstantStringObjectInspector.class.getName();
-                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value.toString());
-                       default:
-                               throw new FlinkHiveUDFException(
-                                               String.format("Cannot find 
ConstantObjectInspector for %s", primitiveTypeInfo));
-               }
-       }
-
-       @Override
        public ColumnStatisticsData 
toHiveDateColStats(CatalogColumnStatisticsDataDate flinkDateColStats) {
                throw new UnsupportedOperationException("DATE column stats are 
not supported until Hive 1.2.0");
        }
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV230.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV230.java
index c373b69..a31b2ec 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV230.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV230.java
@@ -20,10 +20,6 @@ package org.apache.flink.table.catalog.hive.client;
 
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
@@ -36,7 +32,6 @@ import 
org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.thrift.TException;
 
-import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -76,17 +71,6 @@ public class HiveShimV230 extends HiveShimV220 {
        }
 
        @Override
-       public boolean moveToTrash(FileSystem fs, Path path, Configuration 
conf, boolean purge) throws IOException {
-               try {
-                       Method method = 
FileUtils.class.getDeclaredMethod("moveToTrash", FileSystem.class, Path.class,
-                                       Configuration.class, boolean.class);
-                       return (boolean) method.invoke(null, fs, path, conf, 
purge);
-               } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException e) {
-                       throw new IOException("Failed to move " + path + " to 
trash", e);
-               }
-       }
-
-       @Override
        public void alterTable(IMetaStoreClient client, String databaseName, 
String tableName, Table table) throws InvalidOperationException, MetaException, 
TException {
                // For Hive-2.3.4, we don't need to tell HMS not to update 
stats.
                client.alter_table(databaseName, tableName, table);
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java
index 43cc79a..435a8ca 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java
@@ -24,17 +24,11 @@ import 
org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
-import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.io.Writable;
 
-import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
@@ -45,7 +39,6 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
@@ -139,29 +132,6 @@ public class HiveShimV310 extends HiveShimV235 {
        }
 
        @Override
-       public FileStatus[] getFileStatusRecurse(Path path, int level, 
FileSystem fs) throws IOException {
-               try {
-                       Method method = 
HiveStatsUtils.class.getMethod("getFileStatusRecurse", Path.class, 
Integer.TYPE, FileSystem.class);
-                       // getFileStatusRecurse is a static method
-                       List<FileStatus> results = (List<FileStatus>) 
method.invoke(null, path, level, fs);
-                       return results.toArray(new FileStatus[0]);
-               } catch (Exception ex) {
-                       throw new CatalogException("Failed to invoke 
HiveStatsUtils.getFileStatusRecurse()", ex);
-               }
-       }
-
-       @Override
-       public void makeSpecFromName(Map<String, String> partSpec, Path 
currPath) {
-               try {
-                       Method method = 
Warehouse.class.getMethod("makeSpecFromName", Map.class, Path.class, Set.class);
-                       // makeSpecFromName is a static method
-                       method.invoke(null, partSpec, currPath, null);
-               } catch (Exception ex) {
-                       throw new CatalogException("Failed to invoke 
Warehouse.makeSpecFromName()", ex);
-               }
-       }
-
-       @Override
        public Set<String> getNotNullColumns(IMetaStoreClient client, 
Configuration conf, String dbName, String tableName) {
                try {
                        // HMS catalog 
(https://issues.apache.org/jira/browse/HIVE-18685) is an on-going feature and 
we currently
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
index 1e1a9cc..c9d4ffd 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.functions.hive.conversion;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
 import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
 import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
 import org.apache.flink.table.types.DataType;
@@ -67,6 +68,22 @@ import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspec
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantBinaryObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantBooleanObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantByteObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantDateObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantDoubleObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantFloatObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantHiveCharObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantHiveDecimalObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantHiveVarcharObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantIntObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantLongObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantShortObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantStringObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantTimestampObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
@@ -74,6 +91,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.FloatWritable;
@@ -81,7 +99,10 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 
+import javax.annotation.Nonnull;
+
 import java.lang.reflect.Array;
+import java.lang.reflect.Constructor;
 import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -117,7 +138,7 @@ public class HiveInspectors {
                                PrimitiveTypeInfo primitiveTypeInfo = 
(PrimitiveTypeInfo) HiveTypeUtil.toHiveTypeInfo(argTypes[i], false);
                                constant = 
getConversion(getObjectInspector(primitiveTypeInfo), 
argTypes[i].getLogicalType(), hiveShim)
                                                .toHiveObject(constant);
-                               argumentInspectors[i] = 
hiveShim.getObjectInspectorForConstant(primitiveTypeInfo, constant);
+                               argumentInspectors[i] = 
getObjectInspectorForPrimitiveConstant(primitiveTypeInfo, constant, hiveShim);
                        }
                }
 
@@ -387,6 +408,82 @@ public class HiveInspectors {
                return 
getObjectInspector(HiveTypeUtil.toHiveTypeInfo(flinkType, true));
        }
 
+       private static ObjectInspector getObjectInspectorForPrimitiveConstant(
+                       PrimitiveTypeInfo primitiveTypeInfo, @Nonnull Object 
value, HiveShim hiveShim) {
+               String className;
+               value = hiveShim.hivePrimitiveToWritable(value);
+               switch (primitiveTypeInfo.getPrimitiveCategory()) {
+                       case BOOLEAN:
+                               className = 
WritableConstantBooleanObjectInspector.class.getName();
+                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
+                       case BYTE:
+                               className = 
WritableConstantByteObjectInspector.class.getName();
+                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
+                       case SHORT:
+                               className = 
WritableConstantShortObjectInspector.class.getName();
+                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
+                       case INT:
+                               className = 
WritableConstantIntObjectInspector.class.getName();
+                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
+                       case LONG:
+                               className = 
WritableConstantLongObjectInspector.class.getName();
+                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
+                       case FLOAT:
+                               className = 
WritableConstantFloatObjectInspector.class.getName();
+                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
+                       case DOUBLE:
+                               className = 
WritableConstantDoubleObjectInspector.class.getName();
+                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
+                       case STRING:
+                               className = 
WritableConstantStringObjectInspector.class.getName();
+                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
+                       case CHAR:
+                               try {
+                                       
Constructor<WritableConstantHiveCharObjectInspector> constructor =
+                                                       
WritableConstantHiveCharObjectInspector.class.getDeclaredConstructor(CharTypeInfo.class,
 value.getClass());
+                                       constructor.setAccessible(true);
+                                       return 
constructor.newInstance(primitiveTypeInfo, value);
+                               } catch (Exception e) {
+                                       throw new FlinkHiveUDFException("Failed 
to create writable constant object inspector", e);
+                               }
+                       case VARCHAR:
+                               try {
+                                       
Constructor<WritableConstantHiveVarcharObjectInspector> constructor =
+                                                       
WritableConstantHiveVarcharObjectInspector.class.getDeclaredConstructor(VarcharTypeInfo.class,
 value.getClass());
+                                       constructor.setAccessible(true);
+                                       return 
constructor.newInstance(primitiveTypeInfo, value);
+                               } catch (Exception e) {
+                                       throw new FlinkHiveUDFException("Failed 
to create writable constant object inspector", e);
+                               }
+                       case DATE:
+                               className = 
WritableConstantDateObjectInspector.class.getName();
+                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
+                       case TIMESTAMP:
+                               className = 
WritableConstantTimestampObjectInspector.class.getName();
+                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
+                       case DECIMAL:
+                               try {
+                                       
Constructor<WritableConstantHiveDecimalObjectInspector> constructor =
+                                                       
WritableConstantHiveDecimalObjectInspector.class.getDeclaredConstructor(DecimalTypeInfo.class,
 value.getClass());
+                                       constructor.setAccessible(true);
+                                       return 
constructor.newInstance(primitiveTypeInfo, value);
+                               } catch (Exception e) {
+                                       throw new FlinkHiveUDFException("Failed 
to create writable constant object inspector", e);
+                               }
+                       case BINARY:
+                               className = 
WritableConstantBinaryObjectInspector.class.getName();
+                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value);
+                       case UNKNOWN:
+                       case VOID:
+                               // If type is null, we use the Constant String 
to replace
+                               className = 
WritableConstantStringObjectInspector.class.getName();
+                               return 
HiveReflectionUtils.createConstantObjectInspector(className, value.toString());
+                       default:
+                               throw new FlinkHiveUDFException(
+                                               String.format("Cannot find 
ConstantObjectInspector for %s", primitiveTypeInfo));
+               }
+       }
+
        private static ObjectInspector getObjectInspector(TypeInfo type) {
                switch (type.getCategory()) {
 

Reply via email to