This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 0437ad2 [FLINK-13841][hive] Extend Hive version support to all 1.2 and 2.3 versions 0437ad2 is described below commit 0437ad2fce8a064424522b4b01d3dcea55597033 Author: Xuefu Zhang <xuef...@alibaba-inc.com> AuthorDate: Fri Aug 23 15:40:01 2019 -0700 [FLINK-13841][hive] Extend Hive version support to all 1.2 and 2.3 versions Support all minor releases of Hive 1.2 and 2.3 instead of only 1.2.1 and 2.3.4, the two versions supported so far. This closes #9524. --- .../hive/client/HiveMetastoreClientWrapper.java | 12 ++++--- .../flink/table/catalog/hive/client/HiveShim.java | 4 +++ .../table/catalog/hive/client/HiveShimLoader.java | 40 ++++++++++++++++++---- .../client/{HiveShimV1.java => HiveShimV120.java} | 24 +++++++++++-- .../table/catalog/hive/client/HiveShimV121.java} | 29 ++-------------- .../table/catalog/hive/client/HiveShimV122.java} | 29 ++-------------- .../client/{HiveShimV2.java => HiveShimV230.java} | 27 +++++++++++++-- .../table/catalog/hive/client/HiveShimV231.java} | 29 ++-------------- .../table/catalog/hive/client/HiveShimV232.java} | 29 ++-------------- .../table/catalog/hive/client/HiveShimV233.java} | 29 ++-------------- .../table/catalog/hive/client/HiveShimV234.java} | 29 ++-------------- .../table/catalog/hive/client/HiveShimV235.java} | 29 ++-------------- .../connectors/hive/HiveRunnerShimLoader.java | 11 ++++-- 13 files changed, 122 insertions(+), 199 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java index e1b25e6..c7f8895 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java +++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java @@ -156,11 +156,6 @@ public class HiveMetastoreClientWrapper implements AutoCloseable { return client.dropPartition(databaseName, tableName, partitionValues, deleteData); } - public void alter_partition(String databaseName, String tableName, Partition partition) - throws InvalidOperationException, MetaException, TException { - client.alter_partition(databaseName, tableName, partition); - } - public void renamePartition(String databaseName, String tableName, List<String> partitionValues, Partition partition) throws InvalidOperationException, MetaException, TException { client.renamePartition(databaseName, tableName, partitionValues, partition); @@ -235,4 +230,11 @@ public class HiveMetastoreClientWrapper implements AutoCloseable { HiveShim hiveShim = HiveShimLoader.loadHiveShim(hiveVersion); hiveShim.alterTable(client, databaseName, tableName, table); } + + public void alter_partition(String databaseName, String tableName, Partition partition) + throws InvalidOperationException, MetaException, TException { + HiveShim hiveShim = HiveShimLoader.loadHiveShim(hiveVersion); + hiveShim.alterPartition(client, databaseName, tableName, partition); + } + } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java index 812ece1..33e9708 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.MetaException; import 
org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo; @@ -100,6 +101,9 @@ public interface HiveShim { void alterTable(IMetaStoreClient client, String databaseName, String tableName, Table table) throws InvalidOperationException, MetaException, TException; + void alterPartition(IMetaStoreClient client, String databaseName, String tableName, Partition partition) + throws InvalidOperationException, MetaException, TException; + /** * Creates SimpleGenericUDAFParameterInfo. */ diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java index 771cfc0..5681cfb 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java @@ -32,8 +32,15 @@ import java.util.concurrent.ConcurrentHashMap; */ public class HiveShimLoader { - public static final String HIVE_V1_VERSION_NAME = "1.2.1"; - public static final String HIVE_V2_VERSION_NAME = "2.3.4"; + public static final String HIVE_VERSION_V1_2_0 = "1.2.0"; + public static final String HIVE_VERSION_V1_2_1 = "1.2.1"; + public static final String HIVE_VERSION_V1_2_2 = "1.2.2"; + public static final String HIVE_VERSION_V2_3_0 = "2.3.0"; + public static final String HIVE_VERSION_V2_3_1 = "2.3.1"; + public static final String HIVE_VERSION_V2_3_2 = "2.3.2"; + public static final String HIVE_VERSION_V2_3_3 = "2.3.3"; + public static final String HIVE_VERSION_V2_3_4 = "2.3.4"; + public static final String HIVE_VERSION_V2_3_5 = 
"2.3.5"; private static final Map<String, HiveShim> hiveShims = new ConcurrentHashMap<>(2); @@ -44,11 +51,32 @@ public class HiveShimLoader { public static HiveShim loadHiveShim(String version) { return hiveShims.computeIfAbsent(version, (v) -> { - if (v.startsWith(HIVE_V1_VERSION_NAME)) { - return new HiveShimV1(); + if (v.startsWith(HIVE_VERSION_V1_2_0)) { + return new HiveShimV120(); } - if (v.startsWith(HIVE_V2_VERSION_NAME)) { - return new HiveShimV2(); + if (v.startsWith(HIVE_VERSION_V1_2_1)) { + return new HiveShimV121(); + } + if (v.startsWith(HIVE_VERSION_V1_2_2)) { + return new HiveShimV122(); + } + if (v.startsWith(HIVE_VERSION_V2_3_0)) { + return new HiveShimV230(); + } + if (v.startsWith(HIVE_VERSION_V2_3_1)) { + return new HiveShimV231(); + } + if (v.startsWith(HIVE_VERSION_V2_3_2)) { + return new HiveShimV232(); + } + if (v.startsWith(HIVE_VERSION_V2_3_3)) { + return new HiveShimV233(); + } + if (v.startsWith(HIVE_VERSION_V2_3_4)) { + return new HiveShimV234(); + } + if (v.startsWith(HIVE_VERSION_V2_3_5)) { + return new HiveShimV235(); } throw new CatalogException("Unsupported Hive version " + v); }); diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV120.java similarity index 84% rename from flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java rename to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV120.java index 6afcf5a..a3f30ff 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV1.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV120.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.metastore.api.Function; import 
org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo; @@ -46,9 +47,9 @@ import java.util.ArrayList; import java.util.List; /** - * Shim for Hive version 1.x. + * Shim for Hive version 1.2.0. */ -public class HiveShimV1 implements HiveShim { +public class HiveShimV120 implements HiveShim { @Override public IMetaStoreClient getHiveMetastoreClient(HiveConf hiveConf) { @@ -114,6 +115,25 @@ public class HiveShimV1 implements HiveShim { } @Override + public void alterPartition(IMetaStoreClient client, String databaseName, String tableName, Partition partition) + throws InvalidOperationException, MetaException, TException { + String errorMsg = "Failed to alter partition for table %s in database %s"; + try { + Method method = client.getClass().getMethod("alter_partition", String.class, String.class, Partition.class); + method.invoke(client, databaseName, tableName, partition); + } catch (InvocationTargetException ite) { + Throwable targetEx = ite.getTargetException(); + if (targetEx instanceof TException) { + throw (TException) targetEx; + } else { + throw new CatalogException(String.format(errorMsg, tableName, databaseName), targetEx); + } + } catch (NoSuchMethodException | IllegalAccessException e) { + throw new CatalogException(String.format(errorMsg, tableName, databaseName), e); + } + } + + @Override public SimpleGenericUDAFParameterInfo createUDAFParameterInfo(ObjectInspector[] params, boolean isWindowing, boolean distinct, boolean allColumns) { try { Constructor constructor = SimpleGenericUDAFParameterInfo.class.getConstructor(ObjectInspector[].class, diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV121.java similarity index 50% copy from flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java copy to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV121.java index 3bf2b74..b2db763 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV121.java @@ -16,34 +16,11 @@ * limitations under the License. */ -package org.apache.flink.connectors.hive; - -import org.apache.flink.table.catalog.hive.client.HiveShimLoader; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +package org.apache.flink.table.catalog.hive.client; /** - * Loader to load proper HiveRunnerShim. + * Shim for Hive version 1.2.1. 
*/ -public class HiveRunnerShimLoader { - - private static final Map<String, HiveRunnerShim> hiveRunnerShims = new ConcurrentHashMap<>(); - - private HiveRunnerShimLoader() { - } +public class HiveShimV121 extends HiveShimV120 { - public static HiveRunnerShim load() { - String hiveVersion = HiveShimLoader.getHiveVersion(); - return hiveRunnerShims.computeIfAbsent(hiveVersion, v -> { - switch (v) { - case HiveShimLoader.HIVE_V1_VERSION_NAME: - return new HiveRunnerShimV3(); - case HiveShimLoader.HIVE_V2_VERSION_NAME: - return new HiveRunnerShimV4(); - default: - throw new RuntimeException("Unsupported Hive version " + v); - } - }); - } } diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV122.java similarity index 50% copy from flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java copy to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV122.java index 3bf2b74..318cc74 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV122.java @@ -16,34 +16,11 @@ * limitations under the License. */ -package org.apache.flink.connectors.hive; - -import org.apache.flink.table.catalog.hive.client.HiveShimLoader; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +package org.apache.flink.table.catalog.hive.client; /** - * Loader to load proper HiveRunnerShim. + * Shim for Hive version 1.2.2. 
*/ -public class HiveRunnerShimLoader { - - private static final Map<String, HiveRunnerShim> hiveRunnerShims = new ConcurrentHashMap<>(); - - private HiveRunnerShimLoader() { - } +public class HiveShimV122 extends HiveShimV121 { - public static HiveRunnerShim load() { - String hiveVersion = HiveShimLoader.getHiveVersion(); - return hiveRunnerShims.computeIfAbsent(hiveVersion, v -> { - switch (v) { - case HiveShimLoader.HIVE_V1_VERSION_NAME: - return new HiveRunnerShimV3(); - case HiveShimLoader.HIVE_V2_VERSION_NAME: - return new HiveRunnerShimV4(); - default: - throw new RuntimeException("Unsupported Hive version " + v); - } - }); - } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV230.java similarity index 81% rename from flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java rename to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV230.java index 2510497..7965b8f 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV2.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV230.java @@ -28,10 +28,12 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.EnvironmentContext; import org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import 
org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo; @@ -45,9 +47,9 @@ import java.lang.reflect.Method; import java.util.List; /** - * Shim for Hive version 2.x. + * Shim for Hive version 2.3.0. */ -public class HiveShimV2 implements HiveShim { +public class HiveShimV230 extends HiveShimV122 { @Override public IMetaStoreClient getHiveMetastoreClient(HiveConf hiveConf) { @@ -100,6 +102,26 @@ public class HiveShimV2 implements HiveShim { } @Override + public void alterPartition(IMetaStoreClient client, String databaseName, String tableName, Partition partition) + throws InvalidOperationException, MetaException, TException { + String errorMsg = "Failed to alter partition for table %s in database %s"; + try { + Method method = client.getClass().getMethod("alter_partition", String.class, String.class, + Partition.class, EnvironmentContext.class); + method.invoke(client, databaseName, tableName, partition, null); + } catch (InvocationTargetException ite) { + Throwable targetEx = ite.getTargetException(); + if (targetEx instanceof TException) { + throw (TException) targetEx; + } else { + throw new CatalogException(String.format(errorMsg, tableName, databaseName), targetEx); + } + } catch (NoSuchMethodException | IllegalAccessException e) { + throw new CatalogException(String.format(errorMsg, tableName, databaseName), e); + } + } + + @Override public SimpleGenericUDAFParameterInfo createUDAFParameterInfo(ObjectInspector[] params, boolean isWindowing, boolean distinct, boolean allColumns) { try { Constructor constructor = SimpleGenericUDAFParameterInfo.class.getConstructor(ObjectInspector[].class, @@ -109,4 +131,5 @@ public class HiveShimV2 implements HiveShim { throw new CatalogException("Failed to create SimpleGenericUDAFParameterInfo", e); } } + } diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV231.java similarity index 50% copy from flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java copy to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV231.java index 3bf2b74..494a2b9 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV231.java @@ -16,34 +16,11 @@ * limitations under the License. */ -package org.apache.flink.connectors.hive; - -import org.apache.flink.table.catalog.hive.client.HiveShimLoader; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +package org.apache.flink.table.catalog.hive.client; /** - * Loader to load proper HiveRunnerShim. + * Shim for Hive version 2.3.1. 
*/ -public class HiveRunnerShimLoader { - - private static final Map<String, HiveRunnerShim> hiveRunnerShims = new ConcurrentHashMap<>(); - - private HiveRunnerShimLoader() { - } +public class HiveShimV231 extends HiveShimV230 { - public static HiveRunnerShim load() { - String hiveVersion = HiveShimLoader.getHiveVersion(); - return hiveRunnerShims.computeIfAbsent(hiveVersion, v -> { - switch (v) { - case HiveShimLoader.HIVE_V1_VERSION_NAME: - return new HiveRunnerShimV3(); - case HiveShimLoader.HIVE_V2_VERSION_NAME: - return new HiveRunnerShimV4(); - default: - throw new RuntimeException("Unsupported Hive version " + v); - } - }); - } } diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV232.java similarity index 50% copy from flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java copy to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV232.java index 3bf2b74..a1ce30e 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV232.java @@ -16,34 +16,11 @@ * limitations under the License. */ -package org.apache.flink.connectors.hive; - -import org.apache.flink.table.catalog.hive.client.HiveShimLoader; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +package org.apache.flink.table.catalog.hive.client; /** - * Loader to load proper HiveRunnerShim. + * Shim for Hive version 2.3.2. 
*/ -public class HiveRunnerShimLoader { - - private static final Map<String, HiveRunnerShim> hiveRunnerShims = new ConcurrentHashMap<>(); - - private HiveRunnerShimLoader() { - } +public class HiveShimV232 extends HiveShimV231 { - public static HiveRunnerShim load() { - String hiveVersion = HiveShimLoader.getHiveVersion(); - return hiveRunnerShims.computeIfAbsent(hiveVersion, v -> { - switch (v) { - case HiveShimLoader.HIVE_V1_VERSION_NAME: - return new HiveRunnerShimV3(); - case HiveShimLoader.HIVE_V2_VERSION_NAME: - return new HiveRunnerShimV4(); - default: - throw new RuntimeException("Unsupported Hive version " + v); - } - }); - } } diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV233.java similarity index 50% copy from flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java copy to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV233.java index 3bf2b74..3fdb961 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV233.java @@ -16,34 +16,11 @@ * limitations under the License. */ -package org.apache.flink.connectors.hive; - -import org.apache.flink.table.catalog.hive.client.HiveShimLoader; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +package org.apache.flink.table.catalog.hive.client; /** - * Loader to load proper HiveRunnerShim. + * Shim for Hive version 2.3.3. 
*/ -public class HiveRunnerShimLoader { - - private static final Map<String, HiveRunnerShim> hiveRunnerShims = new ConcurrentHashMap<>(); - - private HiveRunnerShimLoader() { - } +public class HiveShimV233 extends HiveShimV232 { - public static HiveRunnerShim load() { - String hiveVersion = HiveShimLoader.getHiveVersion(); - return hiveRunnerShims.computeIfAbsent(hiveVersion, v -> { - switch (v) { - case HiveShimLoader.HIVE_V1_VERSION_NAME: - return new HiveRunnerShimV3(); - case HiveShimLoader.HIVE_V2_VERSION_NAME: - return new HiveRunnerShimV4(); - default: - throw new RuntimeException("Unsupported Hive version " + v); - } - }); - } } diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV234.java similarity index 50% copy from flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java copy to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV234.java index 3bf2b74..2330a29 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV234.java @@ -16,34 +16,11 @@ * limitations under the License. */ -package org.apache.flink.connectors.hive; - -import org.apache.flink.table.catalog.hive.client.HiveShimLoader; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +package org.apache.flink.table.catalog.hive.client; /** - * Loader to load proper HiveRunnerShim. + * Shim for Hive version 2.3.4. 
*/ -public class HiveRunnerShimLoader { - - private static final Map<String, HiveRunnerShim> hiveRunnerShims = new ConcurrentHashMap<>(); - - private HiveRunnerShimLoader() { - } +public class HiveShimV234 extends HiveShimV233 { - public static HiveRunnerShim load() { - String hiveVersion = HiveShimLoader.getHiveVersion(); - return hiveRunnerShims.computeIfAbsent(hiveVersion, v -> { - switch (v) { - case HiveShimLoader.HIVE_V1_VERSION_NAME: - return new HiveRunnerShimV3(); - case HiveShimLoader.HIVE_V2_VERSION_NAME: - return new HiveRunnerShimV4(); - default: - throw new RuntimeException("Unsupported Hive version " + v); - } - }); - } } diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV235.java similarity index 50% copy from flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java copy to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV235.java index 3bf2b74..2d956b2 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV235.java @@ -16,34 +16,11 @@ * limitations under the License. */ -package org.apache.flink.connectors.hive; - -import org.apache.flink.table.catalog.hive.client.HiveShimLoader; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +package org.apache.flink.table.catalog.hive.client; /** - * Loader to load proper HiveRunnerShim. + * Shim for Hive version 2.3.5. 
*/ -public class HiveRunnerShimLoader { - - private static final Map<String, HiveRunnerShim> hiveRunnerShims = new ConcurrentHashMap<>(); - - private HiveRunnerShimLoader() { - } +public class HiveShimV235 extends HiveShimV234 { - public static HiveRunnerShim load() { - String hiveVersion = HiveShimLoader.getHiveVersion(); - return hiveRunnerShims.computeIfAbsent(hiveVersion, v -> { - switch (v) { - case HiveShimLoader.HIVE_V1_VERSION_NAME: - return new HiveRunnerShimV3(); - case HiveShimLoader.HIVE_V2_VERSION_NAME: - return new HiveRunnerShimV4(); - default: - throw new RuntimeException("Unsupported Hive version " + v); - } - }); - } } diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java index 3bf2b74..2f9f76f 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java @@ -37,9 +37,16 @@ public class HiveRunnerShimLoader { String hiveVersion = HiveShimLoader.getHiveVersion(); return hiveRunnerShims.computeIfAbsent(hiveVersion, v -> { switch (v) { - case HiveShimLoader.HIVE_V1_VERSION_NAME: + case HiveShimLoader.HIVE_VERSION_V1_2_0: + case HiveShimLoader.HIVE_VERSION_V1_2_1: + case HiveShimLoader.HIVE_VERSION_V1_2_2: return new HiveRunnerShimV3(); - case HiveShimLoader.HIVE_V2_VERSION_NAME: + case HiveShimLoader.HIVE_VERSION_V2_3_0: + case HiveShimLoader.HIVE_VERSION_V2_3_1: + case HiveShimLoader.HIVE_VERSION_V2_3_2: + case HiveShimLoader.HIVE_VERSION_V2_3_3: + case HiveShimLoader.HIVE_VERSION_V2_3_4: + case HiveShimLoader.HIVE_VERSION_V2_3_5: return new HiveRunnerShimV4(); default: throw new RuntimeException("Unsupported Hive version " + v);