This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new 15c58a8 [FLINK-15453][hive] Remove unneeded HiveShim methods
15c58a8 is described below
commit 15c58a889f7dd6d4d90f428aeb10b2c0f4477e62
Author: Rui Li <[email protected]>
AuthorDate: Fri Jan 3 13:22:28 2020 +0800
[FLINK-15453][hive] Remove unneeded HiveShim methods
closes #10755
---
.../flink/table/catalog/hive/client/HiveShim.java | 46 -------
.../table/catalog/hive/client/HiveShimV100.java | 137 ---------------------
.../table/catalog/hive/client/HiveShimV230.java | 16 ---
.../table/catalog/hive/client/HiveShimV310.java | 30 -----
.../functions/hive/conversion/HiveInspectors.java | 99 ++++++++++++++-
5 files changed, 98 insertions(+), 230 deletions(-)
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
index cd6bc0b..346a8e1 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
@@ -22,8 +22,6 @@ import
org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -38,19 +36,16 @@ import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.FunctionInfo;
import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.thrift.TException;
import javax.annotation.Nullable;
-import java.io.IOException;
import java.io.Serializable;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
@@ -80,23 +75,6 @@ public interface HiveShim extends Serializable {
List<String> getViews(IMetaStoreClient client, String databaseName)
throws UnknownDBException, TException;
/**
- * Moves a particular file or directory to trash.
- * The file/directory can potentially be deleted (w/o going to trash)
if purge is set to true, or if it cannot
- * be moved properly.
- *
- * <p>This interface is here because FileUtils.moveToTrash in different
Hive versions have different signatures.
- *
- * @param fs the FileSystem to use
- * @param path the path of the file or directory to be moved to trash.
- * @param conf the Configuration to use
- * @param purge whether try to skip trash and directly delete the
file/directory. This flag may be ignored by
- * old Hive versions prior to 2.3.0.
- * @return true if the move is successful, and false otherwise
- * @throws IOException if the file/directory cannot be properly moved
or deleted
- */
- boolean moveToTrash(FileSystem fs, Path path, Configuration conf,
boolean purge) throws IOException;
-
- /**
* Alters a Hive table.
*
* @param client the Hive metastore client
@@ -145,30 +123,6 @@ public interface HiveShim extends Serializable {
Class<?> getTimestampDataTypeClass();
/**
- * The return type of HiveStatsUtils.getFileStatusRecurse was changed
from array to List in Hive 3.1.0.
- *
- * @param path the path of the directory
- * @param level the level of recursion
- * @param fs the file system of the directory
- * @return an array of the entries
- * @throws IOException in case of any io error
- */
- FileStatus[] getFileStatusRecurse(Path path, int level, FileSystem fs)
throws IOException;
-
- /**
- * The signature of HiveStatsUtils.makeSpecFromName() was changed in
Hive 3.1.0.
- *
- * @param partSpec partition specs
- * @param currPath the current path
- */
- void makeSpecFromName(Map<String, String> partSpec, Path currPath);
-
- /**
- * Get ObjectInspector for a constant value.
- */
- ObjectInspector getObjectInspectorForConstant(PrimitiveTypeInfo
primitiveTypeInfo, Object value);
-
- /**
* Generate Hive ColumnStatisticsData from Flink
CatalogColumnStatisticsDataDate for DATE columns.
*/
ColumnStatisticsData toHiveDateColStats(CatalogColumnStatisticsDataDate
flinkDateColStats);
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
index d1d7c55..a6fcd78 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
@@ -21,24 +21,17 @@ package org.apache.flink.table.catalog.hive.client;
import org.apache.flink.connectors.hive.FlinkHiveException;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate;
-import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.common.HiveStatsUtils;
import org.apache.hadoop.hive.common.type.HiveChar;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.common.type.HiveVarchar;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
@@ -61,24 +54,6 @@ import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantBinaryObjectInspector;
-import
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantBooleanObjectInspector;
-import
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantByteObjectInspector;
-import
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantDateObjectInspector;
-import
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantDoubleObjectInspector;
-import
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantFloatObjectInspector;
-import
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantHiveCharObjectInspector;
-import
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantHiveDecimalObjectInspector;
-import
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantHiveVarcharObjectInspector;
-import
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantIntObjectInspector;
-import
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantLongObjectInspector;
-import
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantShortObjectInspector;
-import
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantStringObjectInspector;
-import
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantTimestampObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.FloatWritable;
@@ -92,7 +67,6 @@ import org.apache.thrift.TException;
import javax.annotation.Nonnull;
-import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@@ -104,7 +78,6 @@ import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
@@ -141,16 +114,6 @@ public class HiveShimV100 implements HiveShim {
}
@Override
- public boolean moveToTrash(FileSystem fs, Path path, Configuration
conf, boolean purge) throws IOException {
- try {
- Method method =
FileUtils.class.getDeclaredMethod("moveToTrash", FileSystem.class, Path.class,
Configuration.class);
- return (boolean) method.invoke(null, fs, path, conf);
- } catch (NoSuchMethodException | InvocationTargetException |
IllegalAccessException e) {
- throw new IOException("Failed to move " + path + " to
trash", e);
- }
- }
-
- @Override
public void alterTable(IMetaStoreClient client, String databaseName,
String tableName, Table table) throws InvalidOperationException, MetaException,
TException {
client.alter_table(databaseName, tableName, table);
}
@@ -210,106 +173,6 @@ public class HiveShimV100 implements HiveShim {
}
@Override
- public FileStatus[] getFileStatusRecurse(Path path, int level,
FileSystem fs) throws IOException {
- try {
- Method method =
HiveStatsUtils.class.getMethod("getFileStatusRecurse", Path.class,
Integer.TYPE, FileSystem.class);
- // getFileStatusRecurse is a static method
- return (FileStatus[]) method.invoke(null, path, level,
fs);
- } catch (Exception ex) {
- throw new CatalogException("Failed to invoke
HiveStatsUtils.getFileStatusRecurse()", ex);
- }
- }
-
- @Override
- public void makeSpecFromName(Map<String, String> partSpec, Path
currPath) {
- try {
- Method method =
Warehouse.class.getMethod("makeSpecFromName", Map.class, Path.class);
- // makeSpecFromName is a static method
- method.invoke(null, partSpec, currPath);
- } catch (Exception ex) {
- throw new CatalogException("Failed to invoke
Warehouse.makeSpecFromName()", ex);
- }
- }
-
- @Override
- public ObjectInspector getObjectInspectorForConstant(PrimitiveTypeInfo
primitiveTypeInfo, Object value) {
- String className;
- value = hivePrimitiveToWritable(value);
- // Java constant object inspectors are not available until
1.2.0 -- https://issues.apache.org/jira/browse/HIVE-9766
- // So we have to use writable constant object inspectors for
1.1.x
- switch (primitiveTypeInfo.getPrimitiveCategory()) {
- case BOOLEAN:
- className =
WritableConstantBooleanObjectInspector.class.getName();
- return
HiveReflectionUtils.createConstantObjectInspector(className, value);
- case BYTE:
- className =
WritableConstantByteObjectInspector.class.getName();
- return
HiveReflectionUtils.createConstantObjectInspector(className, value);
- case SHORT:
- className =
WritableConstantShortObjectInspector.class.getName();
- return
HiveReflectionUtils.createConstantObjectInspector(className, value);
- case INT:
- className =
WritableConstantIntObjectInspector.class.getName();
- return
HiveReflectionUtils.createConstantObjectInspector(className, value);
- case LONG:
- className =
WritableConstantLongObjectInspector.class.getName();
- return
HiveReflectionUtils.createConstantObjectInspector(className, value);
- case FLOAT:
- className =
WritableConstantFloatObjectInspector.class.getName();
- return
HiveReflectionUtils.createConstantObjectInspector(className, value);
- case DOUBLE:
- className =
WritableConstantDoubleObjectInspector.class.getName();
- return
HiveReflectionUtils.createConstantObjectInspector(className, value);
- case STRING:
- className =
WritableConstantStringObjectInspector.class.getName();
- return
HiveReflectionUtils.createConstantObjectInspector(className, value);
- case CHAR:
- try {
-
Constructor<WritableConstantHiveCharObjectInspector> constructor =
-
WritableConstantHiveCharObjectInspector.class.getDeclaredConstructor(CharTypeInfo.class,
value.getClass());
- constructor.setAccessible(true);
- return
constructor.newInstance(primitiveTypeInfo, value);
- } catch (Exception e) {
- throw new FlinkHiveUDFException("Failed
to create writable constant object inspector", e);
- }
- case VARCHAR:
- try {
-
Constructor<WritableConstantHiveVarcharObjectInspector> constructor =
-
WritableConstantHiveVarcharObjectInspector.class.getDeclaredConstructor(VarcharTypeInfo.class,
value.getClass());
- constructor.setAccessible(true);
- return
constructor.newInstance(primitiveTypeInfo, value);
- } catch (Exception e) {
- throw new FlinkHiveUDFException("Failed
to create writable constant object inspector", e);
- }
- case DATE:
- className =
WritableConstantDateObjectInspector.class.getName();
- return
HiveReflectionUtils.createConstantObjectInspector(className, value);
- case TIMESTAMP:
- className =
WritableConstantTimestampObjectInspector.class.getName();
- return
HiveReflectionUtils.createConstantObjectInspector(className, value);
- case DECIMAL:
- try {
-
Constructor<WritableConstantHiveDecimalObjectInspector> constructor =
-
WritableConstantHiveDecimalObjectInspector.class.getDeclaredConstructor(DecimalTypeInfo.class,
value.getClass());
- constructor.setAccessible(true);
- return
constructor.newInstance(primitiveTypeInfo, value);
- } catch (Exception e) {
- throw new FlinkHiveUDFException("Failed
to create writable constant object inspector", e);
- }
- case BINARY:
- className =
WritableConstantBinaryObjectInspector.class.getName();
- return
HiveReflectionUtils.createConstantObjectInspector(className, value);
- case UNKNOWN:
- case VOID:
- // If type is null, we use the Constant String
to replace
- className =
WritableConstantStringObjectInspector.class.getName();
- return
HiveReflectionUtils.createConstantObjectInspector(className, value.toString());
- default:
- throw new FlinkHiveUDFException(
- String.format("Cannot find
ConstantObjectInspector for %s", primitiveTypeInfo));
- }
- }
-
- @Override
public ColumnStatisticsData
toHiveDateColStats(CatalogColumnStatisticsDataDate flinkDateColStats) {
throw new UnsupportedOperationException("DATE column stats are
not supported until Hive 1.2.0");
}
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV230.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV230.java
index c373b69..a31b2ec 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV230.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV230.java
@@ -20,10 +20,6 @@ package org.apache.flink.table.catalog.hive.client;
import org.apache.flink.table.catalog.exceptions.CatalogException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
@@ -36,7 +32,6 @@ import
org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.thrift.TException;
-import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@@ -76,17 +71,6 @@ public class HiveShimV230 extends HiveShimV220 {
}
@Override
- public boolean moveToTrash(FileSystem fs, Path path, Configuration
conf, boolean purge) throws IOException {
- try {
- Method method =
FileUtils.class.getDeclaredMethod("moveToTrash", FileSystem.class, Path.class,
- Configuration.class, boolean.class);
- return (boolean) method.invoke(null, fs, path, conf,
purge);
- } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException e) {
- throw new IOException("Failed to move " + path + " to
trash", e);
- }
- }
-
- @Override
public void alterTable(IMetaStoreClient client, String databaseName,
String tableName, Table table) throws InvalidOperationException, MetaException,
TException {
// For Hive-2.3.4, we don't need to tell HMS not to update
stats.
client.alter_table(databaseName, tableName, table);
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java
index 43cc79a..435a8ca 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java
@@ -24,17 +24,11 @@ import
org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.HiveStatsUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
-import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.io.Writable;
-import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
@@ -45,7 +39,6 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -139,29 +132,6 @@ public class HiveShimV310 extends HiveShimV235 {
}
@Override
- public FileStatus[] getFileStatusRecurse(Path path, int level,
FileSystem fs) throws IOException {
- try {
- Method method =
HiveStatsUtils.class.getMethod("getFileStatusRecurse", Path.class,
Integer.TYPE, FileSystem.class);
- // getFileStatusRecurse is a static method
- List<FileStatus> results = (List<FileStatus>)
method.invoke(null, path, level, fs);
- return results.toArray(new FileStatus[0]);
- } catch (Exception ex) {
- throw new CatalogException("Failed to invoke
HiveStatsUtils.getFileStatusRecurse()", ex);
- }
- }
-
- @Override
- public void makeSpecFromName(Map<String, String> partSpec, Path
currPath) {
- try {
- Method method =
Warehouse.class.getMethod("makeSpecFromName", Map.class, Path.class, Set.class);
- // makeSpecFromName is a static method
- method.invoke(null, partSpec, currPath, null);
- } catch (Exception ex) {
- throw new CatalogException("Failed to invoke
Warehouse.makeSpecFromName()", ex);
- }
- }
-
- @Override
public Set<String> getNotNullColumns(IMetaStoreClient client,
Configuration conf, String dbName, String tableName) {
try {
// HMS catalog
(https://issues.apache.org/jira/browse/HIVE-18685) is an on-going feature and
we currently
diff --git
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
index 1e1a9cc..c9d4ffd 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
+++
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.functions.hive.conversion;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
import org.apache.flink.table.types.DataType;
@@ -67,6 +68,22 @@ import
org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspec
import
org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import
org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
import
org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector;
+import
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantBinaryObjectInspector;
+import
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantBooleanObjectInspector;
+import
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantByteObjectInspector;
+import
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantDateObjectInspector;
+import
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantDoubleObjectInspector;
+import
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantFloatObjectInspector;
+import
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantHiveCharObjectInspector;
+import
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantHiveDecimalObjectInspector;
+import
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantHiveVarcharObjectInspector;
+import
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantIntObjectInspector;
+import
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantLongObjectInspector;
+import
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantShortObjectInspector;
+import
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantStringObjectInspector;
+import
org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantTimestampObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
@@ -74,6 +91,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.FloatWritable;
@@ -81,7 +99,10 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
+import javax.annotation.Nonnull;
+
import java.lang.reflect.Array;
+import java.lang.reflect.Constructor;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
@@ -117,7 +138,7 @@ public class HiveInspectors {
PrimitiveTypeInfo primitiveTypeInfo =
(PrimitiveTypeInfo) HiveTypeUtil.toHiveTypeInfo(argTypes[i], false);
constant =
getConversion(getObjectInspector(primitiveTypeInfo),
argTypes[i].getLogicalType(), hiveShim)
.toHiveObject(constant);
- argumentInspectors[i] =
hiveShim.getObjectInspectorForConstant(primitiveTypeInfo, constant);
+ argumentInspectors[i] =
getObjectInspectorForPrimitiveConstant(primitiveTypeInfo, constant, hiveShim);
}
}
@@ -387,6 +408,82 @@ public class HiveInspectors {
return
getObjectInspector(HiveTypeUtil.toHiveTypeInfo(flinkType, true));
}
+ private static ObjectInspector getObjectInspectorForPrimitiveConstant(
+ PrimitiveTypeInfo primitiveTypeInfo, @Nonnull Object
value, HiveShim hiveShim) {
+ String className;
+ value = hiveShim.hivePrimitiveToWritable(value);
+ switch (primitiveTypeInfo.getPrimitiveCategory()) {
+ case BOOLEAN:
+ className =
WritableConstantBooleanObjectInspector.class.getName();
+ return
HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case BYTE:
+ className =
WritableConstantByteObjectInspector.class.getName();
+ return
HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case SHORT:
+ className =
WritableConstantShortObjectInspector.class.getName();
+ return
HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case INT:
+ className =
WritableConstantIntObjectInspector.class.getName();
+ return
HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case LONG:
+ className =
WritableConstantLongObjectInspector.class.getName();
+ return
HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case FLOAT:
+ className =
WritableConstantFloatObjectInspector.class.getName();
+ return
HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case DOUBLE:
+ className =
WritableConstantDoubleObjectInspector.class.getName();
+ return
HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case STRING:
+ className =
WritableConstantStringObjectInspector.class.getName();
+ return
HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case CHAR:
+ try {
+
Constructor<WritableConstantHiveCharObjectInspector> constructor =
+
WritableConstantHiveCharObjectInspector.class.getDeclaredConstructor(CharTypeInfo.class,
value.getClass());
+ constructor.setAccessible(true);
+ return
constructor.newInstance(primitiveTypeInfo, value);
+ } catch (Exception e) {
+ throw new FlinkHiveUDFException("Failed
to create writable constant object inspector", e);
+ }
+ case VARCHAR:
+ try {
+
Constructor<WritableConstantHiveVarcharObjectInspector> constructor =
+
WritableConstantHiveVarcharObjectInspector.class.getDeclaredConstructor(VarcharTypeInfo.class,
value.getClass());
+ constructor.setAccessible(true);
+ return
constructor.newInstance(primitiveTypeInfo, value);
+ } catch (Exception e) {
+ throw new FlinkHiveUDFException("Failed
to create writable constant object inspector", e);
+ }
+ case DATE:
+ className =
WritableConstantDateObjectInspector.class.getName();
+ return
HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case TIMESTAMP:
+ className =
WritableConstantTimestampObjectInspector.class.getName();
+ return
HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case DECIMAL:
+ try {
+
Constructor<WritableConstantHiveDecimalObjectInspector> constructor =
+
WritableConstantHiveDecimalObjectInspector.class.getDeclaredConstructor(DecimalTypeInfo.class,
value.getClass());
+ constructor.setAccessible(true);
+ return
constructor.newInstance(primitiveTypeInfo, value);
+ } catch (Exception e) {
+ throw new FlinkHiveUDFException("Failed
to create writable constant object inspector", e);
+ }
+ case BINARY:
+ className =
WritableConstantBinaryObjectInspector.class.getName();
+ return
HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case UNKNOWN:
+ case VOID:
+ // If type is null, we use the Constant String
to replace
+ className =
WritableConstantStringObjectInspector.class.getName();
+ return
HiveReflectionUtils.createConstantObjectInspector(className, value.toString());
+ default:
+ throw new FlinkHiveUDFException(
+ String.format("Cannot find
ConstantObjectInspector for %s", primitiveTypeInfo));
+ }
+ }
+
private static ObjectInspector getObjectInspector(TypeInfo type) {
switch (type.getCategory()) {