This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 508c7a7040d [fix](hive)Modify the Hive notification event processing
method when using meta cache and add parameters to the Hive catalog. (#39239)
(#39865)
508c7a7040d is described below
commit 508c7a7040d53adb14e39ca7145faadcf001dec5
Author: Mingyu Chen <[email protected]>
AuthorDate: Fri Aug 23 23:21:02 2024 +0800
[fix](hive)Modify the Hive notification event processing method when using
meta cache and add parameters to the Hive catalog. (#39239) (#39865)
bp #39239
Co-authored-by: daidai <[email protected]>
---
.../docker-compose/hive/hadoop-hive.env.tpl | 3 +
.../main/java/org/apache/doris/catalog/Env.java | 5 +-
.../org/apache/doris/datasource/CatalogMgr.java | 14 -
.../apache/doris/datasource/ExternalCatalog.java | 10 +-
.../apache/doris/datasource/ExternalDatabase.java | 12 +-
.../doris/datasource/hive/HMSExternalCatalog.java | 30 +-
.../datasource/hive/event/AlterPartitionEvent.java | 4 +-
.../doris/datasource/hive/event/IgnoredEvent.java | 2 +-
.../doris/datasource/hive/event/InsertEvent.java | 12 -
.../datasource/hive/event/MetastoreEvent.java | 12 +
.../hive/event/MetastoreEventFactory.java | 3 +
.../hive/event/MetastoreEventsProcessor.java | 17 +-
.../doris/datasource/metacache/MetaCache.java | 7 +-
.../property/constants/HMSProperties.java | 3 +-
regression-test/pipeline/external/conf/fe.conf | 2 +
.../hive/ddl/test_hive_ctas.groovy | 2 +-
.../hive/test_hms_event_notification.groovy | 392 ++++++++++++
...est_hms_event_notification_multi_catalog.groovy | 676 +++++++++++++++++++++
.../hive/write/test_hive_write_insert.groovy | 2 +-
.../hive/write/test_hive_write_partitions.groovy | 2 +-
20 files changed, 1157 insertions(+), 53 deletions(-)
diff --git a/docker/thirdparties/docker-compose/hive/hadoop-hive.env.tpl
b/docker/thirdparties/docker-compose/hive/hadoop-hive.env.tpl
index 0e074228410..b7e662f5e52 100644
--- a/docker/thirdparties/docker-compose/hive/hadoop-hive.env.tpl
+++ b/docker/thirdparties/docker-compose/hive/hadoop-hive.env.tpl
@@ -28,6 +28,9 @@ HIVE_SITE_CONF_hive_server2_webui_port=0
HIVE_SITE_CONF_hive_compactor_initiator_on=true
HIVE_SITE_CONF_hive_compactor_worker_threads=2
HIVE_SITE_CONF_metastore_storage_schema_reader_impl=org.apache.hadoop.hive.metastore.SerDeStorageSchemaReader
+HIVE_SITE_CONF_hive_metastore_event_db_notification_api_auth=false
+HIVE_SITE_CONF_hive_metastore_dml_events=true
+HIVE_SITE_CONF_hive_metastore_transactional_event_listeners=org.apache.hive.hcatalog.listener.DbNotificationListener
CORE_CONF_fs_defaultFS=hdfs://${IP_HOST}:${FS_PORT}
CORE_CONF_hadoop_http_staticuser_user=root
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index bda81faca2d..36590775a52 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -1755,9 +1755,8 @@ public class Env {
domainResolver.start();
// fe disk updater
feDiskUpdater.start();
- if (Config.enable_hms_events_incremental_sync) {
- metastoreEventsProcessor.start();
- }
+
+ metastoreEventsProcessor.start();
dnsCache.start();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
index 887b62151ce..6989ec23851 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
@@ -655,13 +655,6 @@ public class CatalogMgr implements Writable,
GsonPostProcessable {
return;
}
- TableIf table = db.getTableNullable(tableName);
- if (table != null) {
- if (!ignoreIfExists) {
- throw new DdlException("Table " + tableName + " has exist in
db " + dbName);
- }
- return;
- }
long tblId;
HMSExternalCatalog hmsCatalog = (HMSExternalCatalog) catalog;
if (hmsCatalog.getUseMetaCache().get()) {
@@ -712,13 +705,6 @@ public class CatalogMgr implements Writable,
GsonPostProcessable {
if (!(catalog instanceof ExternalCatalog)) {
throw new DdlException("Only support create ExternalCatalog
databases");
}
- DatabaseIf db = catalog.getDbNullable(dbName);
- if (db != null) {
- if (!ignoreIfExists) {
- throw new DdlException("Database " + dbName + " has exist in
catalog " + catalog.getName());
- }
- return;
- }
HMSExternalCatalog hmsCatalog = (HMSExternalCatalog) catalog;
long dbId;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 974f052bb1c..b36644cde71 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -289,11 +289,11 @@ public abstract class ExternalCatalog
}
}
- if (properties.getOrDefault(ExternalCatalog.USE_META_CACHE,
"true").equals("false")) {
- LOG.warn("force to set use_meta_cache to true for catalog: {} when
creating", name);
- getCatalogProperty().addProperty(ExternalCatalog.USE_META_CACHE,
"true");
- useMetaCache = Optional.of(true);
- }
+ // if (properties.getOrDefault(ExternalCatalog.USE_META_CACHE,
"true").equals("false")) {
+ // LOG.warn("force to set use_meta_cache to true for catalog: {}
when creating", name);
+ //
getCatalogProperty().addProperty(ExternalCatalog.USE_META_CACHE, "true");
+ // useMetaCache = Optional.of(true);
+ // }
}
/**
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java
index dc6f9aaea73..d653a5a178e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java
@@ -451,13 +451,14 @@ public abstract class ExternalDatabase<T extends
ExternalTable>
@Override
public void unregisterTable(String tableName) {
+ makeSureInitialized();
if (LOG.isDebugEnabled()) {
LOG.debug("create table [{}]", tableName);
}
if (extCatalog.getUseMetaCache().get()) {
if (isInitialized()) {
- metaCache.invalidate(tableName);
+ metaCache.invalidate(tableName,
Util.genIdByName(getQualifiedName(tableName)));
}
} else {
Long tableId = tableNameToId.remove(tableName);
@@ -480,6 +481,7 @@ public abstract class ExternalDatabase<T extends
ExternalTable>
// Only used for sync hive metastore event
@Override
public boolean registerTable(TableIf tableIf) {
+ makeSureInitialized();
long tableId = tableIf.getId();
String tableName = tableIf.getName();
if (LOG.isDebugEnabled()) {
@@ -487,11 +489,13 @@ public abstract class ExternalDatabase<T extends
ExternalTable>
}
if (extCatalog.getUseMetaCache().get()) {
if (isInitialized()) {
- metaCache.updateCache(tableName, (T) tableIf);
+ metaCache.updateCache(tableName, (T) tableIf,
Util.genIdByName(getQualifiedName(tableName)));
}
} else {
- tableNameToId.put(tableName, tableId);
- idToTbl.put(tableId, buildTableForInit(tableName, tableId,
extCatalog));
+ if (!tableNameToId.containsKey(tableName)) {
+ tableNameToId.put(tableName, tableId);
+ idToTbl.put(tableId, buildTableForInit(tableName, tableId,
extCatalog));
+ }
}
setLastUpdateTime(System.currentTimeMillis());
return true;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index be9bf388adb..5faf1f2bb6e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -25,6 +25,7 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
+import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalDatabase;
@@ -73,6 +74,10 @@ public class HMSExternalCatalog extends ExternalCatalog {
@Getter
private HadoopAuthenticator authenticator;
+ private int hmsEventsBatchSizePerRpc = -1;
+ private boolean enableHmsEventsIncrementalSync = false;
+
+
@VisibleForTesting
public HMSExternalCatalog() {
catalogProperty = new CatalogProperty(null, null);
@@ -100,6 +105,19 @@ public class HMSExternalCatalog extends ExternalCatalog {
throw new DdlException(
"The parameter " + FILE_META_CACHE_TTL_SECOND + " is
wrong, value is " + fileMetaCacheTtlSecond);
}
+ Map<String, String> properties = catalogProperty.getProperties();
+ if
(properties.containsKey(HMSProperties.ENABLE_HMS_EVENTS_INCREMENTAL_SYNC)) {
+ enableHmsEventsIncrementalSync =
+
properties.get(HMSProperties.ENABLE_HMS_EVENTS_INCREMENTAL_SYNC).equals("true");
+ } else {
+ enableHmsEventsIncrementalSync =
Config.enable_hms_events_incremental_sync;
+ }
+
+ if
(properties.containsKey(HMSProperties.HMS_EVENTIS_BATCH_SIZE_PER_RPC)) {
+ hmsEventsBatchSizePerRpc =
Integer.valueOf(properties.get(HMSProperties.HMS_EVENTIS_BATCH_SIZE_PER_RPC));
+ } else {
+ hmsEventsBatchSizePerRpc = Config.hms_events_batch_size_per_rpc;
+ }
// check the dfs.ha properties
// 'dfs.nameservices'='your-nameservice',
@@ -212,7 +230,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
}
if (useMetaCache.get()) {
if (isInitialized()) {
- metaCache.invalidate(dbName);
+ metaCache.invalidate(dbName,
Util.genIdByName(getQualifiedName(dbName)));
}
} else {
Long dbId = dbNameToId.remove(dbName);
@@ -233,7 +251,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
ExternalDatabase<? extends ExternalTable> db = buildDbForInit(dbName,
dbId, logType);
if (useMetaCache.get()) {
if (isInitialized()) {
- metaCache.updateCache(dbName, db);
+ metaCache.updateCache(dbName, db,
Util.genIdByName(getQualifiedName(dbName)));
}
} else {
dbNameToId.put(dbName, dbId);
@@ -266,4 +284,12 @@ public class HMSExternalCatalog extends ExternalCatalog {
public String getHiveVersion() {
return catalogProperty.getOrDefault(HMSProperties.HIVE_VERSION, "");
}
+
+ public int getHmsEventsBatchSizePerRpc() {
+ return hmsEventsBatchSizePerRpc;
+ }
+
+ public boolean isEnableHmsEventsIncrementalSync() {
+ return enableHmsEventsIncrementalSync;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java
index 6be0215f143..569d9878d7a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java
@@ -145,9 +145,7 @@ public class AlterPartitionEvent extends
MetastorePartitionEvent {
// `that` event can be batched if this event's partitions contains all
of the partitions which `that` event has
// else just remove `that` event's relevant partitions
for (String partitionName : getAllPartitionNames()) {
- if (thatPartitionEvent instanceof AddPartitionEvent) {
- ((AddPartitionEvent)
thatPartitionEvent).removePartition(partitionName);
- } else if (thatPartitionEvent instanceof DropPartitionEvent) {
+ if (thatPartitionEvent instanceof DropPartitionEvent) {
((DropPartitionEvent)
thatPartitionEvent).removePartition(partitionName);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
index d504c2917f9..e7e6643e647 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/IgnoredEvent.java
@@ -28,7 +28,7 @@ import java.util.List;
*/
public class IgnoredEvent extends MetastoreEvent {
private IgnoredEvent(NotificationEvent event, String catalogName) {
- super(event, catalogName);
+ super(event);
}
protected static List<MetastoreEvent> getEvents(NotificationEvent event,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java
index f793ab8b068..7b76d4913d5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java
@@ -24,8 +24,6 @@ import org.apache.doris.common.DdlException;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
import java.util.List;
@@ -33,13 +31,11 @@ import java.util.List;
* MetastoreEvent for INSERT event type
*/
public class InsertEvent extends MetastoreTableEvent {
- private final Table hmsTbl;
// for test
public InsertEvent(long eventId, String catalogName, String dbName,
String tblName) {
super(eventId, catalogName, dbName, tblName,
MetastoreEventType.INSERT);
- this.hmsTbl = null;
}
private InsertEvent(NotificationEvent event, String catalogName) {
@@ -47,14 +43,6 @@ public class InsertEvent extends MetastoreTableEvent {
Preconditions.checkArgument(getEventType().equals(MetastoreEventType.INSERT));
Preconditions
.checkNotNull(event.getMessage(), debugString("Event message
is null"));
- try {
- InsertMessage insertMessage =
-
MetastoreEventsProcessor.getMessageDeserializer(event.getMessageFormat())
- .getInsertMessage(event.getMessage());
- hmsTbl = Preconditions.checkNotNull(insertMessage.getTableObj());
- } catch (Exception ex) {
- throw new MetastoreNotificationException(ex);
- }
}
protected static List<MetastoreEvent> getEvents(NotificationEvent event,
String catalogName) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
index 04b0ccab799..695dd57b215 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
@@ -78,6 +78,18 @@ public abstract class MetastoreEvent {
this.event = null;
}
+ // for IgnoredEvent
+ protected MetastoreEvent(NotificationEvent event) {
+ this.event = event;
+ this.metastoreNotificationEvent = event;
+ this.eventId = -1;
+ this.eventTime = -1L;
+ this.catalogName = null;
+ this.dbName = null;
+ this.tblName = null;
+ this.eventType = null;
+ }
+
protected MetastoreEvent(NotificationEvent event, String catalogName) {
this.event = event;
// Some events that we don't care about, dbName may be empty
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
index 493f1f7cb71..7f697cf9738 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
@@ -46,6 +46,9 @@ public class MetastoreEventFactory implements EventFactory {
String catalogName) {
Preconditions.checkNotNull(event.getEventType());
MetastoreEventType metastoreEventType =
MetastoreEventType.from(event.getEventType());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("catalogName = {}, Event = {}", catalogName,
event.toString());
+ }
switch (metastoreEventType) {
case CREATE_TABLE:
return CreateTableEvent.getEvents(event, catalogName);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java
index 6e12c35e2b8..cbd0bfb5fa6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java
@@ -115,6 +115,9 @@ public class MetastoreEventsProcessor extends MasterDaemon {
CatalogIf catalog =
Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
if (catalog instanceof HMSExternalCatalog) {
HMSExternalCatalog hmsExternalCatalog = (HMSExternalCatalog)
catalog;
+ if (!hmsExternalCatalog.isEnableHmsEventsIncrementalSync()) {
+ continue;
+ }
try {
List<NotificationEvent> events =
getNextHMSEvents(hmsExternalCatalog);
if (!events.isEmpty()) {
@@ -125,6 +128,8 @@ public class MetastoreEventsProcessor extends MasterDaemon {
} catch (MetastoreNotificationFetchException e) {
LOG.warn("Failed to fetch hms events on {}. msg: ",
hmsExternalCatalog.getName(), e);
} catch (Exception ex) {
+ hmsExternalCatalog.onRefreshCache(true);
+ updateLastSyncedEventId(hmsExternalCatalog, -1);
LOG.warn("Failed to process hive metastore [{}] events .",
hmsExternalCatalog.getName(), ex);
}
@@ -147,7 +152,7 @@ public class MetastoreEventsProcessor extends MasterDaemon {
response = getNextEventResponseForSlave(hmsExternalCatalog);
}
- if (response == null) {
+ if (response == null || response.getEventsSize() == 0) {
return Collections.emptyList();
}
return response.getEvents();
@@ -207,9 +212,15 @@ public class MetastoreEventsProcessor extends MasterDaemon
{
return null;
}
+ int batchSize = hmsExternalCatalog.getHmsEventsBatchSizePerRpc();
try {
- return
hmsExternalCatalog.getClient().getNextNotification(lastSyncedEventId,
- Config.hms_events_batch_size_per_rpc, null);
+ NotificationEventResponse notificationEventResponse =
+
hmsExternalCatalog.getClient().getNextNotification(lastSyncedEventId,
batchSize, null);
+ LOG.info("CatalogName = {}, lastSyncedEventId = {}, currentEventId
= {},"
+ + "batchSize = {}, getEventsSize = {}",
hmsExternalCatalog.getName(), lastSyncedEventId,
+ currentEventId, batchSize,
notificationEventResponse.getEvents().size());
+
+ return notificationEventResponse;
} catch (MetastoreNotificationFetchException e) {
// Need a fallback to handle this because this error state can not
be recovered until restarting FE
if (StringUtils.isNotEmpty(e.getMessage())
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java
index e3ad8668fb5..6e4198186e8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java
@@ -90,7 +90,7 @@ public class MetaCache<T> {
return name == null ? Optional.empty() : getMetaObj(name, id);
}
- public void updateCache(String objName, T obj) {
+ public void updateCache(String objName, T obj, long id) {
metaObjCache.put(objName, Optional.of(obj));
namesCache.asMap().compute("", (k, v) -> {
if (v == null) {
@@ -100,9 +100,10 @@ public class MetaCache<T> {
return v;
}
});
+ idToName.put(id, objName);
}
- public void invalidate(String objName) {
+ public void invalidate(String objName, long id) {
namesCache.asMap().compute("", (k, v) -> {
if (v == null) {
return Lists.newArrayList();
@@ -112,11 +113,13 @@ public class MetaCache<T> {
}
});
metaObjCache.invalidate(objName);
+ idToName.remove(id);
}
public void invalidateAll() {
namesCache.invalidateAll();
metaObjCache.invalidateAll();
+ idToName.clear();
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/HMSProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/HMSProperties.java
index 050ed1d5414..81baf042fae 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/HMSProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/HMSProperties.java
@@ -28,5 +28,6 @@ public class HMSProperties {
// required
public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
public static final List<String> REQUIRED_FIELDS =
Collections.singletonList(HMSProperties.HIVE_METASTORE_URIS);
-
+ public static final String ENABLE_HMS_EVENTS_INCREMENTAL_SYNC =
"hive.enable_hms_events_incremental_sync";
+ public static final String HMS_EVENTIS_BATCH_SIZE_PER_RPC =
"hive.hms_events_batch_size_per_rpc";
}
diff --git a/regression-test/pipeline/external/conf/fe.conf
b/regression-test/pipeline/external/conf/fe.conf
index e325acfbb1c..b876ba40e28 100644
--- a/regression-test/pipeline/external/conf/fe.conf
+++ b/regression-test/pipeline/external/conf/fe.conf
@@ -97,4 +97,6 @@ enable_feature_binlog=true
auth_token = 5ff161c3-2c08-4079-b108-26c8850b6598
infodb_support_ext_catalog=true
+hms_events_polling_interval_ms=2000
+
KRB5_CONFIG=/keytabs/krb5.conf
diff --git
a/regression-test/suites/external_table_p0/hive/ddl/test_hive_ctas.groovy
b/regression-test/suites/external_table_p0/hive/ddl/test_hive_ctas.groovy
index deebb781f19..265d200984e 100644
--- a/regression-test/suites/external_table_p0/hive/ddl/test_hive_ctas.groovy
+++ b/regression-test/suites/external_table_p0/hive/ddl/test_hive_ctas.groovy
@@ -22,7 +22,7 @@ suite("test_hive_ctas",
"p0,external,hive,external_docker,external_docker_hive")
return;
}
- for (String hivePrefix : ["hive2", "hive3"]) {
+ for (String hivePrefix : [ "hive3"]) {
def file_formats = ["parquet", "orc"]
setHivePrefix(hivePrefix)
def generateSrcDDLForCTAS = { String file_format, String catalog_name
->
diff --git
a/regression-test/suites/external_table_p0/hive/test_hms_event_notification.groovy
b/regression-test/suites/external_table_p0/hive/test_hms_event_notification.groovy
new file mode 100644
index 00000000000..52724b807d3
--- /dev/null
+++
b/regression-test/suites/external_table_p0/hive/test_hms_event_notification.groovy
@@ -0,0 +1,392 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_hms_event_notification",
"p0,external,hive,external_docker,external_docker_hive") {
+ String enabled = context.config.otherConfigs.get("enableHiveTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("diable Hive test.")
+ return;
+ }
+ for (String useMetaCache : ["true","false"] ) {
+ for (String hivePrefix : [ "hive2","hive3"]) {
+ try {
+ setHivePrefix(hivePrefix)
+ hive_docker """ set hive.stats.autogather=false; """
+
+
+ String hms_port = context.config.otherConfigs.get(hivePrefix +
"HmsPort")
+ String catalog_name = "test_hms_event_notification_${hivePrefix}"
+ String catalog_name_2 =
"test_hms_event_notification_${hivePrefix}_2"
+ String externalEnvIp =
context.config.otherConfigs.get("externalEnvIp")
+ int wait_time = 10000;
+
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """create catalog if not exists ${catalog_name} properties (
+ "type"="hms",
+ 'hive.metastore.uris' =
'thrift://${externalEnvIp}:${hms_port}',
+ "use_meta_cache" = "${useMetaCache}",
+ "hive.enable_hms_events_incremental_sync" ="true",
+ "hive.hms_events_batch_size_per_rpc" = "1000"
+ );"""
+
+ sql """create catalog if not exists ${catalog_name_2} properties (
+ "type"="hms",
+ 'hive.metastore.uris' =
'thrift://${externalEnvIp}:${hms_port}',
+ "use_meta_cache" = "${useMetaCache}",
+ "hive.enable_hms_events_incremental_sync" ="true"
+ );"""
+
+ sleep(wait_time);
+
+ sql """ switch ${catalog_name} """
+
+ String tb1 = """${catalog_name}_tb_1"""
+ String tb2 = """${catalog_name}_tb_2"""
+ String db1 = "${catalog_name}_db_1";
+ String db2 = "${catalog_name}_db_2";
+ String partition_tb = "${catalog_name}_partition_tb";
+
+ try {
+ hive_docker """ use ${db1};"""
+ }catch (Exception e){
+ }
+
+ hive_docker """ drop table if exists ${tb1};"""
+ hive_docker """ drop table if exists ${tb2};"""
+ hive_docker """ drop table if exists ${partition_tb} """
+ hive_docker """ drop database if exists ${db1};"""
+ hive_docker """ drop database if exists ${db2};"""
+
+//CREATE DATABASE
+ hive_docker """ create database ${db1};"""
+ hive_docker """ create database ${db2};"""
+ sleep(wait_time);
+
+ List<List<String>> dbs = sql """ show databases """
+ logger.info("result = " + dbs);
+
+ int flag_db_count = 0 ;
+ dbs.forEach {
+ if (it[0] == db1) {
+ flag_db_count ++;
+ }else if (it[0] == db2) {
+ flag_db_count ++;
+ }
+ }
+ assertTrue(flag_db_count == 2);
+
+
+
+
+//ALTER DATABASE
+ if (hivePrefix == "hive3") {
+ String db2_location = (sql """ SHOW CREATE DATABASE ${db2}
""")[0][1]
+ logger.info("db2 location = " + db2_location )
+
+ def loc_start = db2_location.indexOf("hdfs://")
+ def loc_end = db2_location.indexOf(".db") + 3
+ db2_location = db2_location.substring(loc_start, loc_end)
+ logger.info("db2 location = " + db2_location )
+
+ String new_db2_location = db2_location.replace("warehouse",
"new_warehouse_xxx")
+ logger.info("change db2 location to ${new_db2_location} ")
+
+ logger.info(" alter database begin")
+ hive_docker """ ALTER DATABASE ${db2} SET LOCATION
'${new_db2_location}'; """
+ logger.info(" alter database end")
+ sleep(wait_time);
+
+ String query_db2_location = (sql """ SHOW CREATE DATABASE
${db2} """)[0][1]
+ logger.info("query_db2_location = ${query_db2_location} ")
+
+ loc_start = query_db2_location.indexOf("hdfs://")
+ loc_end = query_db2_location.indexOf(".db") + 3
+ query_db2_location = query_db2_location.substring(loc_start,
loc_end)
+
+ assertTrue(query_db2_location == new_db2_location);
+ }
+
+
+//DROP DATABASE
+ hive_docker """drop database ${db2}; """;
+ sleep(wait_time);
+ dbs = sql """ show databases """
+ logger.info("result = " + dbs);
+ flag_db_count = 0 ;
+ dbs.forEach {
+ if (it[0].toString() == db1) {
+ flag_db_count ++;
+ } else if (it[0].toString() == db2) {
+ logger.info(" exists ${db2}")
+ assertTrue(false);
+ }
+ }
+ assertTrue(flag_db_count == 1);
+
+
+//CREATE TABLE
+ hive_docker """ use ${db1} """
+ sql """ use ${db1} """
+ List<List<String>> tbs = sql """ show tables; """
+ logger.info(" tbs = ${tbs}")
+ assertTrue(tbs.isEmpty())
+
+
+ hive_docker """ create table ${tb1} (id int,name string) ;"""
+ hive_docker """ create table ${tb2} (id int,name string) ;"""
+ sleep(wait_time);
+ tbs = sql """ show tables; """
+ logger.info(" tbs = ${tbs}")
+ int flag_tb_count = 0 ;
+ tbs.forEach {
+ logger.info("it[0] = " + it[0])
+ if (it[0].toString() == "${tb1}") {
+ flag_tb_count ++;
+ logger.info(" ${tb1} exists ")
+ }else if (it[0].toString() == tb2) {
+ flag_tb_count ++;
+ logger.info(" ${tb2} exists ")
+ }
+ }
+ assertTrue(flag_tb_count == 2);
+
+
+//ALTER TABLE
+ List<List<String>> ans = sql """ select * from ${tb1} """
+ logger.info("ans = ${ans}")
+ assertTrue(ans.isEmpty())
+
+ hive_docker """ insert into ${tb1} select 1,"xxx"; """
+ sleep(wait_time);
+ ans = sql """ select * from ${tb1} """
+ logger.info("ans = ${ans}")
+ assertTrue(ans.size() == 1)
+ assertTrue(ans[0][0].toString() == "1")
+ assertTrue(ans[0][1].toString() == "xxx")
+
+
+ hive_docker """ insert into ${tb1} values( 2,"yyy"); """
+ sleep(wait_time);
+ ans = sql """ select * from ${tb1} order by id """
+ logger.info("ans = ${ans}")
+ assertTrue(ans.size() == 2)
+ assertTrue(ans[0][0].toString() == "1")
+ assertTrue(ans[0][1].toString() == "xxx")
+ assertTrue(ans[1][0].toString() == "2")
+ assertTrue(ans[1][1].toString() == "yyy")
+
+
+ ans = sql """ desc ${tb1} """
+ logger.info("ans = ${ans}")
+ assertTrue(ans.size() == 2)
+ assertTrue(ans[0][0].toString() == "id")
+ assertTrue(ans[0][1].toString() == "int")
+ assertTrue(ans[1][0].toString() == "name")
+ assertTrue(ans[1][1].toString() == "text")
+
+ hive_docker """ alter table ${tb1} change column id id bigint; """
+ sleep(wait_time);
+ ans = sql """ desc ${tb1} """
+ logger.info("ans = ${ans}")
+ assertTrue(ans.size() == 2)
+ assertTrue(ans[0][0].toString() == "id")
+ assertTrue(ans[0][1].toString() == "bigint")
+ assertTrue(ans[1][0].toString() == "name")
+ assertTrue(ans[1][1].toString() == "text")
+ ans = sql """ select * from ${tb1} order by id """
+ logger.info("ans = ${ans}")
+ assertTrue(ans.size() == 2)
+ assertTrue(ans[0][0].toString() == "1")
+ assertTrue(ans[0][1].toString() == "xxx")
+ assertTrue(ans[1][0].toString() == "2")
+ assertTrue(ans[1][1].toString() == "yyy")
+
+
+
+ hive_docker """ alter table ${tb1} change column name new_name
string; """
+ sleep(wait_time);
+ ans = sql """ desc ${tb1} """
+ logger.info("ans = ${ans}")
+ assertTrue(ans.size() == 2)
+ assertTrue(ans[0][0].toString() == "id")
+ assertTrue(ans[0][1].toString() == "bigint")
+ assertTrue(ans[1][0].toString() == "new_name")
+ assertTrue(ans[1][1].toString() == "text")
+ ans = sql """ select * from ${tb1} order by id """
+ logger.info("ans = ${ans}")
+ assertTrue(ans.size() == 2)
+ assertTrue(ans[0][0].toString() == "1")
+ assertTrue(ans[0][1].toString() == "xxx")
+ assertTrue(ans[1][0].toString() == "2")
+ assertTrue(ans[1][1].toString() == "yyy")
+
+
+//DROP TABLE
+ hive_docker """ drop table ${tb2} """
+ sleep(wait_time);
+ tbs = sql """ show tables; """
+
+ logger.info(""" tbs = ${tbs}""")
+
+ flag_tb_count = 0 ;
+ tbs.forEach {
+ if (it[0] == tb1) {
+ flag_tb_count ++;
+ } else if (it[0] == tb2) {
+ logger.info("exists ${tb1}")
+ assertTrue(false);
+ }
+ }
+ assertTrue(flag_tb_count == 1);
+
+
+
+ hive_docker """ drop table ${tb1} """
+ sleep(wait_time);
+ tbs = sql """ show tables; """
+
+ logger.info(""" tbs = ${tbs}""")
+
+ tbs.forEach {
+ if (it[0] == tb1) {
+ logger.info("exists ${tb1}")
+ assertTrue(false);
+ } else if (it[0] == tb2) {
+ logger.info("exists ${tb2}")
+ assertTrue(false);
+ }
+ }
+
+//ADD PARTITION
+
+ hive_docker """ use ${db1} """
+ sql """ use ${db1} """
+
+ hive_docker """ CREATE TABLE ${partition_tb} (
+ id INT,
+ name STRING,
+ age INT
+ )
+ PARTITIONED BY (country STRING); """
+ hive_docker """
+ INSERT INTO TABLE ${partition_tb} PARTITION (country='USA')
+ VALUES (1, 'John Doe', 30),
+ (2, 'Jane Smith', 25);"""
+
+ hive_docker """
+ INSERT INTO TABLE ${partition_tb} PARTITION (country='India')
+ VALUES (3, 'Rahul Kumar', 28),
+ (4, 'Priya Singh', 24);
+ """
+ sleep(wait_time);
+ ans = sql """ select * from ${partition_tb} order by id"""
+ logger.info("ans = ${ans}")
+ assertTrue(ans.size() == 4)
+ assertTrue(ans[0][0].toString() == "1")
+ assertTrue(ans[0][3].toString() == "USA")
+ assertTrue(ans[1][3].toString() == "USA")
+ assertTrue(ans[3][0].toString() == "4")
+ assertTrue(ans[2][3].toString() == "India")
+ assertTrue(ans[3][3].toString() == "India")
+
+
+ List<List<String>> pars = sql """ SHOW PARTITIONS from
${partition_tb}; """
+ logger.info("pars = ${pars}")
+ assertTrue(pars.size() == 2)
+ int flag_partition_count = 0 ;
+ pars.forEach {
+ if (it[0] == "country=India") {
+ flag_partition_count ++;
+ } else if (it[0] == "country=USA") {
+ flag_partition_count ++;
+ }
+ }
+ assertTrue(flag_partition_count ==2)
+
+
+ hive_docker """
+ ALTER TABLE ${partition_tb} ADD PARTITION (country='Canada');
+ """
+ sleep(wait_time);
+ pars = sql """ SHOW PARTITIONS from ${partition_tb}; """
+ logger.info("pars = ${pars}")
+ assertTrue(pars.size() == 3)
+ flag_partition_count = 0 ;
+ pars.forEach {
+ if (it[0].toString() == "country=India") {
+ flag_partition_count ++;
+ } else if (it[0].toString() == "country=USA") {
+ flag_partition_count ++;
+ } else if (it[0].toString() == "country=Canada") {
+ flag_partition_count ++;
+ }
+ }
+ assertTrue(flag_partition_count ==3)
+
+
+//ALTER PARTITION
+ hive_docker """
+ alter table ${partition_tb} partition(country='USA') rename to
partition(country='US') ;
+ """
+ sleep(wait_time);
+ pars = sql """ SHOW PARTITIONS from ${partition_tb}; """
+ logger.info("pars = ${pars}")
+ assertTrue(pars.size() == 3)
+ flag_partition_count = 0 ;
+ pars.forEach {
+ if (it[0].toString() == "country=India") {
+ flag_partition_count ++;
+ } else if (it[0].toString() == "country=US") {
+ flag_partition_count ++;
+ } else if (it[0].toString() == "country=Canada") {
+ flag_partition_count ++;
+ }
+ }
+ assertTrue(flag_partition_count ==3)
+
+//DROP PARTITION
+ hive_docker """
+ ALTER TABLE ${partition_tb} DROP PARTITION (country='Canada');
+ """
+ sleep(wait_time);
+ pars = sql """ SHOW PARTITIONS from ${partition_tb}; """
+ logger.info("pars = ${pars}")
+ assertTrue(pars.size() == 2)
+ flag_partition_count = 0
+ pars.forEach {
+ if (it[0].toString() == "country=India") {
+ flag_partition_count ++;
+ } else if (it[0].toString() == "country=US") {
+ flag_partition_count ++;
+ } else if (it[0].toString() == "country=Canada") {
+ logger.info("exists partition canada")
+ assertTrue(false);
+ }
+ }
+ assertTrue(flag_partition_count ==2)
+
+
+ sql """drop catalog if exists ${catalog_name}"""
+ } finally {
+ }
+ }
+ }
+}
+
+
+
+
diff --git
a/regression-test/suites/external_table_p0/hive/test_hms_event_notification_multi_catalog.groovy
b/regression-test/suites/external_table_p0/hive/test_hms_event_notification_multi_catalog.groovy
new file mode 100644
index 00000000000..24c2ac3b7fb
--- /dev/null
+++
b/regression-test/suites/external_table_p0/hive/test_hms_event_notification_multi_catalog.groovy
@@ -0,0 +1,676 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_hms_event_notification_multi_catalog",
"p0,external,hive,external_docker,external_docker_hive") {
+ String enabled = context.config.otherConfigs.get("enableHiveTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("diable Hive test.")
+ return;
+ }
+
+ for (String useMetaCache : ["true","false"] ) {
+
+ for (String hivePrefix : [ "hive2","hive3"]) {
+ try {
+ setHivePrefix(hivePrefix)
+ hive_docker """ set hive.stats.autogather=false; """
+
+
+ String hms_port = context.config.otherConfigs.get(hivePrefix +
"HmsPort")
+ String catalog_name =
"test_hms_event_notification_multi_catalog_${hivePrefix}"
+ String catalog_name_2 =
"test_hms_event_notification_multi_catalog_${hivePrefix}_2"
+ String externalEnvIp =
context.config.otherConfigs.get("externalEnvIp")
+ int wait_time = 10000;
+
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """create catalog if not exists ${catalog_name} properties (
+ "type"="hms",
+ 'hive.metastore.uris' =
'thrift://${externalEnvIp}:${hms_port}',
+ "use_meta_cache" = "${useMetaCache}",
+ "hive.enable_hms_events_incremental_sync" ="true",
+ "hive.hms_events_batch_size_per_rpc" = "10000"
+ );"""
+
+ sql """drop catalog if exists ${catalog_name_2}"""
+ sql """create catalog if not exists ${catalog_name_2} properties (
+ "type"="hms",
+ 'hive.metastore.uris' =
'thrift://${externalEnvIp}:${hms_port}',
+ "use_meta_cache" = "${useMetaCache}",
+ "hive.enable_hms_events_incremental_sync" ="true",
+ "hive.hms_events_batch_size_per_rpc" = "100000"
+ );"""
+ sleep(wait_time);
+
+ sql """ switch ${catalog_name} """
+
+ String tb1 = """${catalog_name}_tb_1"""
+ String tb2 = """${catalog_name}_tb_2"""
+ String db1 = "${catalog_name}_db_1";
+ String db2 = "${catalog_name}_db_2";
+ String partition_tb = "${catalog_name}_partition_tb";
+
+ try { // any exception thrown by this statement is swallowed by the empty catch below
+ hive_docker """ use ${db1};"""
+ }catch (Exception e){ // NOTE(review): presumably db1 may not exist yet on a fresh run — confirm
+ }
+
+ hive_docker """ drop table if exists ${tb1};"""
+ hive_docker """ drop table if exists ${tb2};"""
+ hive_docker """ drop table if exists ${partition_tb} """
+ hive_docker """ drop database if exists ${db1};"""
+ hive_docker """ drop database if exists ${db2};"""
+
+//CREATE DATABASE
+ hive_docker """ create database ${db1};"""
+ hive_docker """ create database ${db2};"""
+ sleep(wait_time);
+
+ List<List<String>> dbs = sql """ show databases """
+ logger.info("result = " + dbs);
+
+ int flag_db_count = 0 ;
+ dbs.forEach {
+ if (it[0] == db1) {
+ flag_db_count ++;
+ }else if (it[0] == db2) {
+ flag_db_count ++;
+ }
+ }
+ assertTrue(flag_db_count == 2);
+
+ sql """ switch ${catalog_name_2} """
+ dbs = sql """ show databases """
+ logger.info("result = " + dbs);
+ flag_db_count = 0 ;
+ dbs.forEach {
+ if (it[0] == db1) {
+ flag_db_count ++;
+ }else if (it[0] == db2) {
+ flag_db_count ++;
+ }
+ }
+ assertTrue(flag_db_count == 2);
+
+ sql """ switch ${catalog_name} """
+
+
+
+//ALTER DATABASE
+ if (hivePrefix == "hive3") {
+ String db2_location = (sql """ SHOW CREATE DATABASE ${db2}
""")[0][1]
+ logger.info("db2 location = " + db2_location )
+
+ def loc_start = db2_location.indexOf("hdfs://")
+ def loc_end = db2_location.indexOf(".db") + 3
+ db2_location = db2_location.substring(loc_start, loc_end)
+ logger.info("db2 location = " + db2_location )
+
+ String new_db2_location = db2_location.replace("warehouse",
"new_warehouse_xxx")
+ logger.info("change db2 location to ${new_db2_location} ")
+
+ logger.info(" alter database begin")
+ hive_docker """ ALTER DATABASE ${db2} SET LOCATION
'${new_db2_location}'; """
+ logger.info(" alter database end")
+ sleep(wait_time);
+
+ String query_db2_location = (sql """ SHOW CREATE DATABASE
${db2} """)[0][1]
+ logger.info("query_db2_location = ${query_db2_location} ")
+
+ loc_start = query_db2_location.indexOf("hdfs://")
+ loc_end = query_db2_location.indexOf(".db") + 3
+ query_db2_location = query_db2_location.substring(loc_start,
loc_end)
+ assertTrue(query_db2_location == new_db2_location);
+
+
+
+ sql """ switch ${catalog_name_2} """
+ query_db2_location = (sql """ SHOW CREATE DATABASE ${db2}
""")[0][1]
+ logger.info("query_db2_location = ${query_db2_location} ")
+
+ loc_start = query_db2_location.indexOf("hdfs://")
+ loc_end = query_db2_location.indexOf(".db") + 3
+ query_db2_location = query_db2_location.substring(loc_start,
loc_end)
+ assertTrue(query_db2_location == new_db2_location);
+ sql """ switch ${catalog_name} """
+ }
+
+
+//DROP DATABASE
+ hive_docker """drop database ${db2}; """;
+ sleep(wait_time);
+ dbs = sql """ show databases """
+ logger.info("result = " + dbs);
+ flag_db_count = 0 ;
+ dbs.forEach {
+ if (it[0].toString() == db1) {
+ flag_db_count ++;
+ } else if (it[0].toString() == db2) {
+ logger.info(" exists ${db2}")
+ assertTrue(false);
+ }
+ }
+ assertTrue(flag_db_count == 1);
+
+ sql """ switch ${catalog_name_2} """
+ dbs = sql """ show databases """
+ logger.info("result = " + dbs);
+ flag_db_count = 0 ;
+ dbs.forEach {
+ if (it[0].toString() == db1) {
+ flag_db_count ++;
+ } else if (it[0].toString() == db2) {
+ logger.info(" exists ${db2}")
+ assertTrue(false);
+ }
+ }
+ assertTrue(flag_db_count == 1);
+ sql """ switch ${catalog_name} """
+
+//CREATE TABLE
+ hive_docker """ use ${db1} """
+ sql """ use ${db1} """
+ List<List<String>> tbs = sql """ show tables; """
+ logger.info(" tbs = ${tbs}")
+ assertTrue(tbs.isEmpty())
+
+
+ hive_docker """ create table ${tb1} (id int,name string) ;"""
+ hive_docker """ create table ${tb2} (id int,name string) ;"""
+ sleep(wait_time);
+ tbs = sql """ show tables; """
+ logger.info(" tbs = ${tbs}")
+ int flag_tb_count = 0 ;
+ tbs.forEach {
+ logger.info("it[0] = " + it[0])
+ if (it[0].toString() == "${tb1}") {
+ flag_tb_count ++;
+ logger.info(" ${tb1} exists ")
+ }else if (it[0].toString() == tb2) {
+ flag_tb_count ++;
+ logger.info(" ${tb2} exists ")
+ }
+ }
+ assertTrue(flag_tb_count == 2);
+
+
+ sql """ switch ${catalog_name_2} """
+ sql """ use ${db1} """
+ tbs = sql """ show tables; """
+ logger.info(" tbs = ${tbs}")
+ flag_tb_count = 0 ;
+ tbs.forEach {
+ logger.info("it[0] = " + it[0])
+ if (it[0].toString() == "${tb1}") {
+ flag_tb_count ++;
+ logger.info(" ${tb1} exists ")
+ }else if (it[0].toString() == tb2) {
+ flag_tb_count ++;
+ logger.info(" ${tb2} exists ")
+ }
+ }
+ assertTrue(flag_tb_count == 2);
+ sql """ switch ${catalog_name} """
+ sql """ use ${db1} """
+
+
+//ALTER TABLE (inserts into tb1 first, then column type change and column rename)
+ List<List<String>> ans = sql """ select * from ${tb1} """
+ logger.info("ans = ${ans}")
+ assertTrue(ans.isEmpty())
+
+ hive_docker """ insert into ${tb1} select 1,"xxx"; """
+ sleep(wait_time);
+ ans = sql """ select * from ${tb1} """
+ logger.info("ans = ${ans}")
+ assertTrue(ans.size() == 1)
+ assertTrue(ans[0][0].toString() == "1")
+ assertTrue(ans[0][1].toString() == "xxx")
+
+ sql """ switch ${catalog_name_2} """
+ sql """ use ${db1} """
+ ans = sql """ select * from ${tb1} """
+ logger.info("ans = ${ans}")
+ assertTrue(ans.size() == 1)
+ assertTrue(ans[0][0].toString() == "1")
+ assertTrue(ans[0][1].toString() == "xxx")
+
+ sql """ switch ${catalog_name} """
+ sql """ use ${db1} """
+
+ hive_docker """ insert into ${tb1} values( 2,"yyy"); """
+ sleep(wait_time);
+ ans = sql """ select * from ${tb1} order by id """
+ logger.info("ans = ${ans}")
+ assertTrue(ans.size() == 2)
+ assertTrue(ans[0][0].toString() == "1")
+ assertTrue(ans[0][1].toString() == "xxx")
+ assertTrue(ans[1][0].toString() == "2")
+ assertTrue(ans[1][1].toString() == "yyy")
+
+
+ ans = sql """ desc ${tb1} """
+ logger.info("ans = ${ans}")
+ assertTrue(ans.size() == 2)
+ assertTrue(ans[0][0].toString() == "id")
+ assertTrue(ans[0][1].toString() == "int")
+ assertTrue(ans[1][0].toString() == "name")
+ assertTrue(ans[1][1].toString() == "text")
+
+
+ sql """ switch ${catalog_name_2} """
+ sql """ use ${db1} """
+ ans = sql """ select * from ${tb1} order by id """
+ logger.info("ans = ${ans}")
+ assertTrue(ans.size() == 2)
+ assertTrue(ans[0][0].toString() == "1")
+ assertTrue(ans[0][1].toString() == "xxx")
+ assertTrue(ans[1][0].toString() == "2")
+ assertTrue(ans[1][1].toString() == "yyy")
+
+ ans = sql """ desc ${tb1} """
+ logger.info("ans = ${ans}")
+ assertTrue(ans.size() == 2)
+ assertTrue(ans[0][0].toString() == "id")
+ assertTrue(ans[0][1].toString() == "int")
+ assertTrue(ans[1][0].toString() == "name")
+ assertTrue(ans[1][1].toString() == "text")
+
+ sql """ switch ${catalog_name} """
+ sql """ use ${db1} """
+
+
+ hive_docker """ alter table ${tb1} change column id id bigint; """
+ sleep(wait_time);
+ ans = sql """ desc ${tb1} """
+ logger.info("ans = ${ans}")
+ assertTrue(ans.size() == 2)
+ assertTrue(ans[0][0].toString() == "id")
+ assertTrue(ans[0][1].toString() == "bigint")
+ assertTrue(ans[1][0].toString() == "name")
+ assertTrue(ans[1][1].toString() == "text")
+ ans = sql """ select * from ${tb1} order by id """
+ logger.info("ans = ${ans}")
+ assertTrue(ans.size() == 2)
+ assertTrue(ans[0][0].toString() == "1")
+ assertTrue(ans[0][1].toString() == "xxx")
+ assertTrue(ans[1][0].toString() == "2")
+ assertTrue(ans[1][1].toString() == "yyy")
+
+
+ sql """ switch ${catalog_name_2} """
+ sql """ use ${db1} """
+ ans = sql """ desc ${tb1} """
+ logger.info("ans = ${ans}")
+ assertTrue(ans.size() == 2)
+ assertTrue(ans[0][0].toString() == "id")
+ assertTrue(ans[0][1].toString() == "bigint")
+ assertTrue(ans[1][0].toString() == "name")
+ assertTrue(ans[1][1].toString() == "text")
+ ans = sql """ select * from ${tb1} order by id """
+ logger.info("ans = ${ans}")
+ assertTrue(ans.size() == 2)
+ assertTrue(ans[0][0].toString() == "1")
+ assertTrue(ans[0][1].toString() == "xxx")
+ assertTrue(ans[1][0].toString() == "2")
+ assertTrue(ans[1][1].toString() == "yyy")
+
+ sql """ switch ${catalog_name} """
+ sql """ use ${db1} """
+
+
+
+ hive_docker """ alter table ${tb1} change column name new_name
string; """
+ sleep(wait_time);
+ ans = sql """ desc ${tb1} """
+ logger.info("ans = ${ans}")
+ assertTrue(ans.size() == 2)
+ assertTrue(ans[0][0].toString() == "id")
+ assertTrue(ans[0][1].toString() == "bigint")
+ assertTrue(ans[1][0].toString() == "new_name")
+ assertTrue(ans[1][1].toString() == "text")
+ ans = sql """ select * from ${tb1} order by id """
+ logger.info("ans = ${ans}")
+ assertTrue(ans.size() == 2)
+ assertTrue(ans[0][0].toString() == "1")
+ assertTrue(ans[0][1].toString() == "xxx")
+ assertTrue(ans[1][0].toString() == "2")
+ assertTrue(ans[1][1].toString() == "yyy")
+
+ sql """ switch ${catalog_name_2} """
+ sql """ use ${db1} """
+ ans = sql """ desc ${tb1} """
+ logger.info("ans = ${ans}")
+ assertTrue(ans.size() == 2)
+ assertTrue(ans[0][0].toString() == "id")
+ assertTrue(ans[0][1].toString() == "bigint")
+ assertTrue(ans[1][0].toString() == "new_name")
+ assertTrue(ans[1][1].toString() == "text")
+ ans = sql """ select * from ${tb1} order by id """
+ logger.info("ans = ${ans}")
+ assertTrue(ans.size() == 2)
+ assertTrue(ans[0][0].toString() == "1")
+ assertTrue(ans[0][1].toString() == "xxx")
+ assertTrue(ans[1][0].toString() == "2")
+ assertTrue(ans[1][1].toString() == "yyy")
+
+ sql """ switch ${catalog_name} """
+ sql """ use ${db1} """
+
+
+
+
+//DROP TABLE
+ hive_docker """ drop table ${tb2} """
+ sleep(wait_time);
+ tbs = sql """ show tables; """
+
+ logger.info(""" tbs = ${tbs}""")
+
+ flag_tb_count = 0 ;
+ tbs.forEach { // tb2 was just dropped in Hive: tb1 must still be listed, tb2 must not
+ if (it[0] == tb1) {
+ flag_tb_count ++;
+ } else if (it[0] == tb2) {
+ logger.info("exists ${tb2}") // name the table that should have disappeared (was ${tb1})
+ assertTrue(false);
+ }
+ }
+ assertTrue(flag_tb_count == 1);
+
+
+ sql """ switch ${catalog_name_2} """
+ sql """ use ${db1} """
+ tbs = sql """ show tables; """
+ logger.info(""" tbs = ${tbs}""")
+ flag_tb_count = 0 ;
+ tbs.forEach {
+ if (it[0] == tb1) {
+ flag_tb_count ++;
+ } else if (it[0] == tb2) {
+ logger.info("exists ${tb2}")
+ assertTrue(false);
+ }
+ }
+ assertTrue(flag_tb_count == 1);
+
+ sql """ switch ${catalog_name} """
+ sql """ use ${db1} """
+
+
+
+
+ hive_docker """ drop table ${tb1} """
+ sleep(wait_time);
+ tbs = sql """ show tables; """
+
+ logger.info(""" tbs = ${tbs}""")
+
+ tbs.forEach {
+ if (it[0] == tb1) {
+ logger.info("exists ${tb1}")
+ assertTrue(false);
+ } else if (it[0] == tb2) {
+ logger.info("exists ${tb2}")
+ assertTrue(false);
+ }
+ }
+
+
+ sql """ switch ${catalog_name_2} """
+ sql """ use ${db1} """
+ tbs = sql """ show tables; """
+ logger.info(""" tbs = ${tbs}""")
+ tbs.forEach {
+ if (it[0] == tb1) {
+ logger.info("exists ${tb1}")
+ assertTrue(false);
+ } else if (it[0] == tb2) {
+ logger.info("exists ${tb2}")
+ assertTrue(false);
+ }
+ }
+ sql """ switch ${catalog_name} """
+ sql """ use ${db1} """
+
+
+
+//ADD PARTITION
+
+ hive_docker """ use ${db1} """
+ sql """ use ${db1} """
+
+ hive_docker """ CREATE TABLE ${partition_tb} (
+ id INT,
+ name STRING,
+ age INT
+ )
+ PARTITIONED BY (country STRING); """
+ hive_docker """
+ INSERT INTO TABLE ${partition_tb} PARTITION (country='USA')
+ VALUES (1, 'John Doe', 30),
+ (2, 'Jane Smith', 25);"""
+
+ hive_docker """
+ INSERT INTO TABLE ${partition_tb} PARTITION (country='India')
+ VALUES (3, 'Rahul Kumar', 28),
+ (4, 'Priya Singh', 24);
+ """
+ sleep(wait_time);
+ ans = sql """ select * from ${partition_tb} order by id"""
+ logger.info("ans = ${ans}")
+ assertTrue(ans.size() == 4)
+ assertTrue(ans[0][0].toString() == "1")
+ assertTrue(ans[0][3].toString() == "USA")
+ assertTrue(ans[1][3].toString() == "USA")
+ assertTrue(ans[3][0].toString() == "4")
+ assertTrue(ans[2][3].toString() == "India")
+ assertTrue(ans[3][3].toString() == "India")
+
+ sql """ switch ${catalog_name_2} """
+ sql """ use ${db1} """
+ ans = sql """ select * from ${partition_tb} order by id"""
+ logger.info("ans = ${ans}")
+ assertTrue(ans.size() == 4)
+ assertTrue(ans[0][0].toString() == "1")
+ assertTrue(ans[0][3].toString() == "USA")
+ assertTrue(ans[1][3].toString() == "USA")
+ assertTrue(ans[3][0].toString() == "4")
+ assertTrue(ans[2][3].toString() == "India")
+ assertTrue(ans[3][3].toString() == "India")
+
+ sql """ switch ${catalog_name} """
+ sql """ use ${db1} """
+
+
+
+ List<List<String>> pars = sql """ SHOW PARTITIONS from
${partition_tb}; """
+ logger.info("pars = ${pars}")
+ assertTrue(pars.size() == 2)
+ int flag_partition_count = 0 ;
+ pars.forEach {
+ if (it[0] == "country=India") {
+ flag_partition_count ++;
+ } else if (it[0] == "country=USA") {
+ flag_partition_count ++;
+ }
+ }
+ assertTrue(flag_partition_count ==2)
+
+ sql """ switch ${catalog_name_2} """
+ sql """ use ${db1} """
+ pars = sql """ SHOW PARTITIONS from ${partition_tb}; """
+ logger.info("pars = ${pars}")
+ assertTrue(pars.size() == 2)
+ flag_partition_count = 0 ;
+ pars.forEach {
+ if (it[0] == "country=India") {
+ flag_partition_count ++;
+ } else if (it[0] == "country=USA") {
+ flag_partition_count ++;
+ }
+ }
+ assertTrue(flag_partition_count ==2)
+
+ sql """ switch ${catalog_name} """
+ sql """ use ${db1} """
+
+
+
+
+ hive_docker """
+ ALTER TABLE ${partition_tb} ADD PARTITION (country='Canada');
+ """
+ sleep(wait_time);
+ pars = sql """ SHOW PARTITIONS from ${partition_tb}; """
+ logger.info("pars = ${pars}")
+ assertTrue(pars.size() == 3)
+ flag_partition_count = 0 ;
+ pars.forEach {
+ if (it[0].toString() == "country=India") {
+ flag_partition_count ++;
+ } else if (it[0].toString() == "country=USA") {
+ flag_partition_count ++;
+ } else if (it[0].toString() == "country=Canada") {
+ flag_partition_count ++;
+ }
+ }
+ assertTrue(flag_partition_count ==3)
+
+
+ sql """ switch ${catalog_name_2} """
+ sql """ use ${db1} """
+ pars = sql """ SHOW PARTITIONS from ${partition_tb}; """
+ logger.info("pars = ${pars}")
+ assertTrue(pars.size() == 3)
+ flag_partition_count = 0 ;
+ pars.forEach {
+ if (it[0].toString() == "country=India") {
+ flag_partition_count ++;
+ } else if (it[0].toString() == "country=USA") {
+ flag_partition_count ++;
+ } else if (it[0].toString() == "country=Canada") {
+ flag_partition_count ++;
+ }
+ }
+ assertTrue(flag_partition_count ==3)
+
+
+ sql """ switch ${catalog_name} """
+ sql """ use ${db1} """
+
+
+
+
+//ALTER PARTITION
+ hive_docker """
+ alter table ${partition_tb} partition(country='USA') rename to
partition(country='US') ;
+ """
+ sleep(wait_time);
+ pars = sql """ SHOW PARTITIONS from ${partition_tb}; """
+ logger.info("pars = ${pars}")
+ assertTrue(pars.size() == 3)
+ flag_partition_count = 0 ;
+ pars.forEach {
+ if (it[0].toString() == "country=India") {
+ flag_partition_count ++;
+ } else if (it[0].toString() == "country=US") {
+ flag_partition_count ++;
+ } else if (it[0].toString() == "country=Canada") {
+ flag_partition_count ++;
+ }
+ }
+ assertTrue(flag_partition_count ==3)
+
+
+ sql """ switch ${catalog_name_2} """
+ sql """ use ${db1} """
+ pars = sql """ SHOW PARTITIONS from ${partition_tb}; """
+ logger.info("pars = ${pars}")
+ assertTrue(pars.size() == 3)
+ flag_partition_count = 0 ;
+ pars.forEach {
+ if (it[0].toString() == "country=India") {
+ flag_partition_count ++;
+ } else if (it[0].toString() == "country=US") {
+ flag_partition_count ++;
+ } else if (it[0].toString() == "country=Canada") {
+ flag_partition_count ++;
+ }
+ }
+ assertTrue(flag_partition_count ==3)
+
+ sql """ switch ${catalog_name} """
+ sql """ use ${db1} """
+
+
+
+
+//DROP PARTITION
+ hive_docker """
+ ALTER TABLE ${partition_tb} DROP PARTITION (country='Canada');
+ """
+ sleep(wait_time);
+ pars = sql """ SHOW PARTITIONS from ${partition_tb}; """
+ logger.info("pars = ${pars}")
+ assertTrue(pars.size() == 2)
+ flag_partition_count = 0
+ pars.forEach {
+ if (it[0].toString() == "country=India") {
+ flag_partition_count ++;
+ } else if (it[0].toString() == "country=US") {
+ flag_partition_count ++;
+ } else if (it[0].toString() == "country=Canada") {
+ logger.info("exists partition canada")
+ assertTrue(false);
+ }
+ }
+ assertTrue(flag_partition_count ==2)
+
+
+ sql """ switch ${catalog_name_2} """
+ sql """ use ${db1} """
+ pars = sql """ SHOW PARTITIONS from ${partition_tb}; """
+ logger.info("pars = ${pars}")
+ assertTrue(pars.size() == 2)
+ flag_partition_count = 0
+ pars.forEach {
+ if (it[0].toString() == "country=India") {
+ flag_partition_count ++;
+ } else if (it[0].toString() == "country=US") {
+ flag_partition_count ++;
+ } else if (it[0].toString() == "country=Canada") {
+ logger.info("exists partition canada")
+ assertTrue(false);
+ }
+ }
+ assertTrue(flag_partition_count ==2)
+ sql """ switch ${catalog_name} """
+ sql """ use ${db1} """
+
+
+
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """drop catalog if exists ${catalog_name_2}"""
+ } finally { // NOTE(review): empty; the catalog drops above are inside the try, so they do not run when an earlier statement throws
+ }
+ }
+ }
+}
+
+
+
+
diff --git
a/regression-test/suites/external_table_p0/hive/write/test_hive_write_insert.groovy
b/regression-test/suites/external_table_p0/hive/write/test_hive_write_insert.groovy
index 0b6fab86b2b..087b797faaf 100644
---
a/regression-test/suites/external_table_p0/hive/write/test_hive_write_insert.groovy
+++
b/regression-test/suites/external_table_p0/hive/write/test_hive_write_insert.groovy
@@ -880,7 +880,7 @@ INSERT INTO
all_types_par_${format_compression}_${catalog_name}_q03
return;
}
- for (String hivePrefix : ["hive2", "hive3"]) {
+ for (String hivePrefix : ["hive3"]) {
setHivePrefix(hivePrefix)
try {
String hms_port = context.config.otherConfigs.get(hivePrefix +
"HmsPort")
diff --git
a/regression-test/suites/external_table_p0/hive/write/test_hive_write_partitions.groovy
b/regression-test/suites/external_table_p0/hive/write/test_hive_write_partitions.groovy
index cd0533d00d9..7e3f070636e 100644
---
a/regression-test/suites/external_table_p0/hive/write/test_hive_write_partitions.groovy
+++
b/regression-test/suites/external_table_p0/hive/write/test_hive_write_partitions.groovy
@@ -195,7 +195,7 @@ suite("test_hive_write_partitions",
"p0,external,hive,external_docker,external_d
return;
}
- for (String hivePrefix : ["hive2", "hive3"]) {
+ for (String hivePrefix : ["hive3"]) {
setHivePrefix(hivePrefix)
try {
String hms_port = context.config.otherConfigs.get(hivePrefix +
"HmsPort")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]