This is an automated email from the ASF dual-hosted git repository. vihangk1 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit e0ed7d321c1dca85a0f1482842f8db6db517c909 Author: xiaomeng <[email protected]> AuthorDate: Thu Feb 20 16:32:48 2020 -0800 IMPALA-8632: Add support for self-event detection for insert events For INSERT events, when Impala inserts into a table, the event processor later refreshes the underlying table or partition once it receives the event. This refresh is unnecessary when there is only one Impala cluster in the system. We can detect such self-events because the HMS API that fires a listener event returns the id of the fired event. The EventProcessor uses this id to ignore the event when it is fetched in a later polling cycle. Testing: Added testInsertFromImpala() in MetastoreEventsProcessorTest.java to test self-event detection for insert events when inserting into a table and into a partition. Change-Id: I7873fbb2c159343690f93b9d120f6b425b983dcf Reviewed-on: http://gerrit.cloudera.org:8080/15648 Reviewed-by: Vihang Karajgaonkar <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/common/global-flags.cc | 6 + be/src/util/backend-gflag-util.cc | 2 + common/thrift/BackendGflags.thrift | 2 + .../org/apache/impala/compat/MetastoreShim.java | 69 ++++++++++++ .../org/apache/impala/compat/MetastoreShim.java | 72 ++++++++++++ .../impala/catalog/CatalogServiceCatalog.java | 64 ++++++----- fe/src/main/java/org/apache/impala/catalog/Db.java | 4 +- .../org/apache/impala/catalog/HdfsPartition.java | 25 +++-- .../main/java/org/apache/impala/catalog/Table.java | 21 ++-- .../impala/catalog/events/InFlightEvents.java | 91 +++++++++++---- .../impala/catalog/events/MetastoreEvents.java | 34 ++++-- .../impala/catalog/events/SelfEventContext.java | 24 ++-- .../org/apache/impala/service/BackendConfig.java | 2 + .../apache/impala/service/CatalogOpExecutor.java | 67 +++++------ .../java/org/apache/impala/util/MetaStoreUtil.java | 57 +++------- .../events/MetastoreEventsProcessorTest.java | 124 ++++++++++++++++++++- tests/custom_cluster/test_event_processing.py | 11 +- 17 files changed, 504 insertions(+), 171
deletions(-) diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc index 79084ff..69e856e 100644 --- a/be/src/common/global-flags.cc +++ b/be/src/common/global-flags.cc @@ -266,6 +266,12 @@ DEFINE_int32(hms_event_polling_interval_s, 0, "feature and not recommended to be deployed on production systems until it is " "made generally available."); +DEFINE_bool(enable_insert_events, true, + "Enables insert events in the events processor. When this configuration is set to " + "true Impala will generate INSERT event types which when received by other Impala " + "clusters can be used to automatically refresh the tables or partitions. Event " + "processing must be turned on for this flag to have any effect."); + DEFINE_string(blacklisted_dbs, "sys,information_schema", "Comma separated list for blacklisted databases. Configure which databases to be " "skipped for loading (in startup and global INVALIDATE METADATA). Users can't access," diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc index f3e3755..90122d2 100644 --- a/be/src/util/backend-gflag-util.cc +++ b/be/src/util/backend-gflag-util.cc @@ -67,6 +67,7 @@ DECLARE_int64(exchg_node_buffer_size_bytes); DECLARE_int32(kudu_mutation_buffer_size); DECLARE_int32(kudu_error_buffer_size); DECLARE_int32(hms_event_polling_interval_s); +DECLARE_bool(enable_insert_events); DECLARE_string(authorization_factory_class); DECLARE_bool(unlock_mt_dop); DECLARE_bool(mt_dop_auto_fallback); @@ -155,6 +156,7 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) { cfg.__set_kudu_mutation_buffer_size(FLAGS_kudu_mutation_buffer_size); cfg.__set_kudu_error_buffer_size(FLAGS_kudu_error_buffer_size); cfg.__set_hms_event_polling_interval_s(FLAGS_hms_event_polling_interval_s); + cfg.__set_enable_insert_events(FLAGS_enable_insert_events); cfg.__set_impala_build_version(::GetDaemonBuildVersion()); cfg.__set_authorization_factory_class(FLAGS_authorization_factory_class); 
cfg.__set_unlock_mt_dop(FLAGS_unlock_mt_dop); diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift index 90fad1a..c9b7d47 100644 --- a/common/thrift/BackendGflags.thrift +++ b/common/thrift/BackendGflags.thrift @@ -155,4 +155,6 @@ struct TBackendGflags { 65: required bool use_customized_user_groups_mapper_for_ranger 66: required bool enable_column_masking + + 67: required bool enable_insert_events } diff --git a/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java index 87e8ecf..39cd4ff 100644 --- a/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java +++ b/fe/src/compat-hive-2/java/org/apache/impala/compat/MetastoreShim.java @@ -23,8 +23,14 @@ import static org.apache.impala.service.MetadataOp.TABLE_TYPE_VIEW; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; + +import java.util.ArrayList; +import java.util.Collections; import java.util.EnumSet; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; @@ -51,6 +57,9 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.FireEventRequest; +import org.apache.hadoop.hive.metastore.api.FireEventRequestData; +import org.apache.hadoop.hive.metastore.api.InsertEventRequestData; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; import org.apache.hadoop.hive.metastore.messaging.EventMessage; @@ -62,6 +71,9 @@ import 
org.apache.hive.service.rpc.thrift.TGetFunctionsReq; import org.apache.hive.service.rpc.thrift.TGetSchemasReq; import org.apache.hive.service.rpc.thrift.TGetTablesReq; import org.apache.impala.authorization.User; +import org.apache.impala.catalog.CatalogServiceCatalog; +import org.apache.impala.catalog.HdfsPartition; +import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient; import org.apache.impala.common.ImpalaException; import org.apache.impala.common.ImpalaRuntimeException; import org.apache.impala.common.Pair; @@ -71,6 +83,8 @@ import org.apache.impala.service.MetadataOp; import org.apache.impala.thrift.TMetadataOpRequest; import org.apache.impala.thrift.TResultSet; import org.apache.impala.util.AcidUtils.TblTransaction; +import org.apache.impala.util.MetaStoreUtil.InsertEventInfo; +import org.apache.log4j.Logger; import org.apache.thrift.TException; /** @@ -78,6 +92,7 @@ import org.apache.thrift.TException; * between major versions of Hive. This implements the shimmed methods for Hive 2. */ public class MetastoreShim { + private static final Logger LOG = Logger.getLogger(MetastoreShim.class); public static TblTransaction createTblTransaction( IMetaStoreClient client, Table tbl, long txnId) { @@ -484,4 +499,58 @@ public class MetastoreShim { throws MetaException { return new Path(db.getLocationUri(), tbl.getTableName().toLowerCase()).toString(); } + + /** + * Fire insert events asynchronously. This creates a single thread to execute the + * fireInsertEvent method and shuts down the thread after it has finished. + * In case of any exception, we just log the failure of firing insert events. 
+ */ public static List<Long> fireInsertEvents(MetaStoreClient msClient, + List<InsertEventInfo> insertEventInfos, String dbName, String tableName) { + ExecutorService fireInsertEventThread = Executors.newSingleThreadExecutor(); + CompletableFuture.runAsync(() -> { + try { + fireInsertEventHelper(msClient.getHiveClient(), insertEventInfos, dbName, tableName); + } catch (Exception e) { + LOG.error("Failed to fire insert event. Some tables might not be" + + " refreshed on other impala clusters.", e); + } finally { + msClient.close(); + } + }, fireInsertEventThread).whenComplete((ignored, error) -> + fireInsertEventThread.shutdown()); + return Collections.emptyList(); + } + + /** + * Fires an insert event to HMS notification log. In Hive-2 for partitioned table, + * each existing partition touched by the insert will fire a separate insert event. + * @param msClient Metastore client, + * @param insertEventInfos A list of insert event encapsulating the information needed + * to fire insert + * @param dbName + * @param tableName + */ + @VisibleForTesting + public static void fireInsertEventHelper(IMetaStoreClient msClient, + List<InsertEventInfo> insertEventInfos, String dbName, String tableName) + throws TException { + Preconditions.checkNotNull(msClient); + Preconditions.checkNotNull(dbName); + Preconditions.checkNotNull(tableName); + for (InsertEventInfo info : insertEventInfos) { + Preconditions.checkNotNull(info.getNewFiles()); + LOG.debug("Firing an insert event for " + tableName); + FireEventRequestData data = new FireEventRequestData(); + InsertEventRequestData insertData = new InsertEventRequestData(); + data.setInsertData(insertData); + FireEventRequest rqst = new FireEventRequest(true, data); + rqst.setDbName(dbName); + rqst.setTableName(tableName); + insertData.setFilesAdded(new ArrayList<>(info.getNewFiles())); + insertData.setReplace(info.isOverwrite()); + if (info.getPartVals() != null) rqst.setPartitionVals(info.getPartVals()); +
msClient.fireListenerEvent(rqst); + } + } } diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java index 43f174c..5981b2c 100644 --- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java +++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java @@ -32,6 +32,7 @@ import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; +import java.util.Collections; import java.util.EnumSet; import java.util.List; @@ -52,6 +53,10 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.FireEventRequest; +import org.apache.hadoop.hive.metastore.api.FireEventRequestData; +import org.apache.hadoop.hive.metastore.api.FireEventResponse; +import org.apache.hadoop.hive.metastore.api.InsertEventRequestData; import org.apache.hadoop.hive.metastore.api.InvalidInputException; import org.apache.hadoop.hive.metastore.api.InvalidObjectException; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; @@ -83,6 +88,9 @@ import org.apache.hive.service.rpc.thrift.TGetFunctionsReq; import org.apache.hive.service.rpc.thrift.TGetSchemasReq; import org.apache.hive.service.rpc.thrift.TGetTablesReq; import org.apache.impala.authorization.User; +import org.apache.impala.catalog.CatalogServiceCatalog; +import org.apache.impala.catalog.HdfsPartition; +import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient; import org.apache.impala.common.ImpalaException; import org.apache.impala.common.ImpalaRuntimeException; import org.apache.impala.common.Pair; @@ -94,6 +102,7 @@ import org.apache.impala.thrift.TMetadataOpRequest; import org.apache.impala.thrift.TResultSet; 
import org.apache.impala.util.AcidUtils; import org.apache.impala.util.AcidUtils.TblTransaction; +import org.apache.impala.util.MetaStoreUtil.InsertEventInfo; import org.apache.log4j.Logger; import org.apache.thrift.TException; @@ -984,4 +993,67 @@ public class MetastoreShim { return wh.getDefaultTablePath(db, tbl.getTableName().toLowerCase(), isExternal) .toString(); } + + /** + * Fire insert events for table and partition. + * In case of any exception, we just log the failure of firing insert events. + */ + public static List<Long> fireInsertEvents(MetaStoreClient msClient, + List<InsertEventInfo> insertEventInfos, String dbName, String tableName) { + try { + return fireInsertEventHelper(msClient.getHiveClient(), insertEventInfos, dbName, tableName); + } catch (Exception e) { + LOG.error("Failed to fire insert event. Some tables might not be" + + " refreshed on other impala clusters.", e); + } finally { + msClient.close(); + } + return Collections.emptyList(); + } + + /** + * Fires an insert event to HMS notification log. In Hive-3 for partitioned table, + * all partition insert events will be fired by a bulk API. 
+ * + * @param msClient Metastore client, + * @param insertEventInfos A list of insert event encapsulating the information needed + * to fire insert + * @param dbName + * @param tableName + * @return a list of eventIds for the insert events + */ + @VisibleForTesting + public static List<Long> fireInsertEventHelper(IMetaStoreClient msClient, + List<InsertEventInfo> insertEventInfos, String dbName, String tableName) + throws TException { + Preconditions.checkNotNull(msClient); + Preconditions.checkNotNull(dbName); + Preconditions.checkNotNull(tableName); + LOG.debug(String.format( + "Firing %s insert event for %s", insertEventInfos.size(), tableName)); + FireEventRequestData data = new FireEventRequestData(); + FireEventRequest rqst = new FireEventRequest(true, data); + rqst.setDbName(dbName); + rqst.setTableName(tableName); + List<InsertEventRequestData> insertDatas = new ArrayList<>(); + for (InsertEventInfo info : insertEventInfos) { + InsertEventRequestData insertData = new InsertEventRequestData(); + Preconditions.checkNotNull(info.getNewFiles()); + insertData.setFilesAdded(new ArrayList<>(info.getNewFiles())); + insertData.setReplace(info.isOverwrite()); + if (info.getPartVals() != null) insertData.setPartitionVal(info.getPartVals()); + insertDatas.add(insertData); + } + if (insertDatas.size() == 1) { + if (insertEventInfos.get(0).getPartVals() != null) { + rqst.setPartitionVals(insertEventInfos.get(0).getPartVals()); + } + data.setInsertData(insertDatas.get(0)); + } else { + data.setInsertDatas(insertDatas); + } + FireEventResponse response = msClient.fireListenerEvent(rqst); + + return response.getEventIds(); + } } diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index fb7ef4d..87465b6 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -817,29 
+817,36 @@ public class CatalogServiceCatalog extends Catalog { * Evaluates if the information from an event (serviceId and versionNumber) matches to * the catalog object. If there is match, the in-flight version for that object is * removed and method returns true. If it does not match, returns false + * @param ctx self context which provides all the information needed to * evaluate if this is a self-event or not * @return true if given event information evaluates to a self-event, false otherwise */ - public boolean evaluateSelfEvent(SelfEventContext ctx) + public boolean evaluateSelfEvent(boolean isInsertEvent, SelfEventContext ctx) throws CatalogException { Preconditions.checkState(isEventProcessingActive(), "Event processing should be enabled when calling this method"); - long versionNumber = ctx.getVersionNumberFromEvent(); + long versionNumber = + isInsertEvent ? ctx.getIdFromEvent() : ctx.getVersionNumberFromEvent(); String serviceIdFromEvent = ctx.getServiceIdFromEvent(); - LOG.debug("Input arguments for self-event evaluation: {} {}",versionNumber, - serviceIdFromEvent); - // no version info or service id in the event - if (versionNumber == -1 || serviceIdFromEvent.isEmpty()) { - LOG.info("Not a self-event since the given version is {} and service id is {}", - versionNumber, serviceIdFromEvent); - return false; - } - // if the service id from event doesn't match with our service id this is not a - // self-event - if (!getCatalogServiceId().equals(serviceIdFromEvent)) { - LOG.info("Not a self-event because service id of this catalog {} does not match " - + "with one in event {}.", getCatalogServiceId(), serviceIdFromEvent); + + if (!isInsertEvent) { + // no version info or service id in the event + if (versionNumber == -1 || serviceIdFromEvent.isEmpty()) { + LOG.info("Not a self-event since the given version is {} and service id is {}", + versionNumber, serviceIdFromEvent); + return false; + } + // if the service id from event doesn't match with our service 
id this is not a + // self-event + if (!getCatalogServiceId().equals(serviceIdFromEvent)) { + LOG.info("Not a self-event because service id of this catalog {} does not match " + + "with one in event {}.", getCatalogServiceId(), serviceIdFromEvent); + return false; + } + } else if (versionNumber == -1) { + // if insert event, we only compare eventId + LOG.info("Not a self-event because eventId is {}", versionNumber); return false; } Db db = getDb(ctx.getDbName()); @@ -881,10 +888,11 @@ public class CatalogServiceCatalog extends Catalog { List<List<TPartitionKeyValue>> partitionKeyValues = ctx.getPartitionKeyValues(); // if the partitionKeyValues is null, we look for tbl's in-flight events if (partitionKeyValues == null) { - boolean removed = tbl.removeFromVersionsForInflightEvents(versionNumber); + boolean removed = + tbl.removeFromVersionsForInflightEvents(isInsertEvent, versionNumber); if (!removed) { - LOG.info("Could not find version {} in in-flight event list of table {}", - versionNumber, tbl.getFullName()); + LOG.info("Could not find {} {} in in-flight event list of table {}", + isInsertEvent ?
"eventId" : "version", versionNumber, tbl.getFullName()); } return removed; } @@ -893,8 +901,9 @@ public class CatalogServiceCatalog extends Catalog { for (List<TPartitionKeyValue> partitionKeyValue : partitionKeyValues) { HdfsPartition hdfsPartition = ((HdfsTable) tbl).getPartitionFromThriftPartitionSpec(partitionKeyValue); - if (hdfsPartition == null || !hdfsPartition - .removeFromVersionsForInflightEvents(versionNumber)) { + if (hdfsPartition == null + || !hdfsPartition.removeFromVersionsForInflightEvents( + isInsertEvent, versionNumber)) { // even if this is an error condition we should not bail out early since we // should clean up the self-event state on the rest of the partitions String partName = HdfsTable.constructPartitionName(partitionKeyValue); @@ -919,15 +928,18 @@ public class CatalogServiceCatalog extends Catalog { /** * Adds a given version number from the catalog table's list of versions for in-flight * events. Applicable only when external event processing is enabled. - * + * @param isInsertEvent if false add versionNumber for DDL Event, otherwise add eventId + * for Insert Event. * @param tbl Catalog table - * @param versionNumber version number to be added + * @param versionNumber when isInsertEvent is true, it is eventId to add + * when isInsertEvent is false, it is version number to add */ - public void addVersionsForInflightEvents(Table tbl, long versionNumber) { + public void addVersionsForInflightEvents( + boolean isInsertEvent, Table tbl, long versionNumber) { if (!isEventProcessingActive()) return; - tbl.addToVersionsForInflightEvents(versionNumber); - LOG.info("Added catalog version {} in table's {} in-flight events", - versionNumber, tbl.getFullName()); + tbl.addToVersionsForInflightEvents(isInsertEvent, versionNumber); + LOG.info("Added {} {} in table's {} in-flight events", + isInsertEvent ? 
"eventId" : "catalog version", versionNumber, tbl.getFullName()); } /** diff --git a/fe/src/main/java/org/apache/impala/catalog/Db.java b/fe/src/main/java/org/apache/impala/catalog/Db.java index 330227c..8a465eb 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Db.java +++ b/fe/src/main/java/org/apache/impala/catalog/Db.java @@ -511,7 +511,7 @@ public class Db extends CatalogObjectImpl implements FeDb { Preconditions.checkState(dbLock_.isHeldByCurrentThread(), "removeFromVersionsForInflightEvents called without getting the db lock for " + getName() + " database."); - return inFlightEvents_.remove(versionNumber); + return inFlightEvents_.remove(false, versionNumber); } /** @@ -525,7 +525,7 @@ public class Db extends CatalogObjectImpl implements FeDb { Preconditions.checkState(dbLock_.isHeldByCurrentThread(), "addToVersionsForInFlightEvents called without getting the db lock for " + getName() + " database."); - if (!inFlightEvents_.add(versionNumber)) { + if (!inFlightEvents_.add(false, versionNumber)) { LOG.warn(String.format("Could not add version %s to the list of in-flight " + "events. This could cause unnecessary database %s invalidation when the " + "event is processed", versionNumber, getName())); diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java index 2069756..e7a5bb8 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java @@ -852,25 +852,33 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition { /** * Removes a given version from the in-flight events - * @param versionNumber version number to remove + * @param isInsertEvent If true, remove eventId from list of eventIds for in-flight + * Insert events. If false, remove version number from list of versions for in-flight + * DDL events. 
+ * @param versionNumber when isInsertEvent is true, it's eventId to remove + * when isInsertEvent is false, it's version number to remove * @return true if the versionNumber was removed, false if it didn't exist */ - public boolean removeFromVersionsForInflightEvents(long versionNumber) { + public boolean removeFromVersionsForInflightEvents( + boolean isInsertEvent, long versionNumber) { Preconditions.checkState(table_.getLock().isHeldByCurrentThread(), "removeFromVersionsForInflightEvents called without holding the table lock on " + "partition " + getPartitionName() + " of table " + table_.getFullName()); - return inFlightEvents_.remove(versionNumber); + return inFlightEvents_.remove(isInsertEvent, versionNumber); } /** * Adds a version number to the in-flight events of this partition - * @param versionNumber version number to add + * @param isInsertEvent if true, add eventId to list of eventIds for in-flight Insert + * events if false, add version number to list of versions for in-flight DDL events + * @param versionNumber when isInsertEvent is true, it's eventId to add + * when isInsertEvent is false, it's version number to add */ - public void addToVersionsForInflightEvents(long versionNumber) { + public void addToVersionsForInflightEvents(boolean isInsertEvent, long versionNumber) { Preconditions.checkState(table_.getLock().isHeldByCurrentThread(), "addToVersionsForInflightEvents called without holding the table lock on " + "partition " + getPartitionName() + " of table " + table_.getFullName()); - if (!inFlightEvents_.add(versionNumber)) { + if (!inFlightEvents_.add(isInsertEvent, versionNumber)) { LOG.warn(String.format("Could not add %s version to the partition %s of table %s. 
" + "This could cause unnecessary refresh of the partition when the event is" + "received by the Events processor.", versionNumber, getPartitionName(), @@ -886,13 +894,14 @@ public class HdfsPartition implements FeFsPartition, PrunablePartition { */ private void addInflightVersionsFromParameters() { Preconditions.checkNotNull(hmsParameters_); - Preconditions.checkState(inFlightEvents_.size() == 0); + Preconditions.checkState(inFlightEvents_.size(false) == 0); // we should not check for table lock being held here since there are certain code // paths which call this method without holding the table lock (eg. getOrLoadTable()) if (!hmsParameters_.containsKey(MetastoreEventPropertyKey.CATALOG_VERSION.getKey())) { return; } - inFlightEvents_.add(Long.parseLong( + inFlightEvents_.add(false, + Long.parseLong( hmsParameters_.get(MetastoreEventPropertyKey.CATALOG_VERSION.getKey()))); } diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java index 0c6ac31..8f0f5ad 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Table.java +++ b/fe/src/main/java/org/apache/impala/catalog/Table.java @@ -63,7 +63,6 @@ import org.apache.log4j.Logger; import com.codahale.metrics.Timer; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; /** @@ -801,14 +800,19 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { /** * Removes a given version from the collection of version numbers for in-flight events - * @param versionNumber version number to remove from the collection + * @param isInsertEvent If true, remove eventId from list of eventIds for in-flight + * Insert events. 
If false, remove versionNumber from list of versions for in-flight DDL + * events + * @param versionNumber when isInsertEvent is true, it's eventId to remove + * when isInsertEvent is false, it's version number to remove * @return true if version was successfully removed, false if didn't exist */ - public boolean removeFromVersionsForInflightEvents(long versionNumber) { + public boolean removeFromVersionsForInflightEvents( + boolean isInsertEvent, long versionNumber) { Preconditions.checkState(tableLock_.isHeldByCurrentThread(), "removeFromVersionsForInFlightEvents called without taking the table lock on " + getFullName()); - return inFlightEvents.remove(versionNumber); + return inFlightEvents.remove(isInsertEvent, versionNumber); } /** @@ -816,16 +820,19 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { * collection is already at the max size defined by * <code>MAX_NUMBER_OF_INFLIGHT_EVENTS</code>, then it ignores the given version and * does not add it - * @param versionNumber version number to add + * @param isInsertEvent if true, add eventId to list of eventIds for in-flight Insert + * events. If false, add versionNumber to list of versions for in-flight DDL events + * @param versionNumber when isInsertEvent is true, it's eventId to add + * when isInsertEvent is false, it's version number to add * @return True if version number was added, false if the collection is at its max * capacity */ - public void addToVersionsForInflightEvents(long versionNumber) { + public void addToVersionsForInflightEvents(boolean isInsertEvent, long versionNumber) { // we generally don't take locks on Incomplete tables since they are atomically // replaced during load Preconditions.checkState( this instanceof IncompleteTable || tableLock_.isHeldByCurrentThread()); - if (!inFlightEvents.add(versionNumber)) { + if (!inFlightEvents.add(isInsertEvent, versionNumber)) { LOG.warn(String.format("Could not add %s version to the table %s. 
This could " + "cause unnecessary refresh of the table when the event is received by the " + "Events processor.", versionNumber, getFullName())); diff --git a/fe/src/main/java/org/apache/impala/catalog/events/InFlightEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/InFlightEvents.java index 1aed6d1..11d38f6 100644 --- a/fe/src/main/java/org/apache/impala/catalog/events/InFlightEvents.java +++ b/fe/src/main/java/org/apache/impala/catalog/events/InFlightEvents.java @@ -34,41 +34,66 @@ public class InFlightEvents { // maximum number of catalog versions to store for in-flight events for this table private static final int DEFAULT_MAX_NUMBER_OF_INFLIGHT_EVENTS = 10; + // maximum number of eventIds to store for in-flight events for this table + private static final int DEFAULT_MAX_NUMBER_OF_INFLIGHT_INSERT_EVENTS = 100; + private static final Logger LOG = LoggerFactory.getLogger(InFlightEvents.class); - // FIFO list of versions for all the in-flight metastore events in this table + // FIFO list of versions for all the in-flight metastore DDL events in this table // This queue can only grow up to MAX_NUMBER_OF_INFLIGHT_EVENTS size. Anything which // is attempted to be added to this list when its at maximum capacity is ignored private final LinkedList<Long> versionsForInflightEvents_ = new LinkedList<>(); + // FIFO list of eventIds for all the in-flight metastore Insert events in this table + // This queue can only grow up to MAX_NUMBER_OF_INFLIGHT_INSERT_EVENTS size. 
Anything + // which is attempted to be added to this list when its at maximum capacity is ignored + private final LinkedList<Long> idsForInflightDmlEvents_ = new LinkedList<>(); + // maximum number of versions to store - private final int capacity_; + private final int capacity_for_versions_; + + // maximum number of eventIds to store + private final int capacity_for_eventIds_; public InFlightEvents() { - this.capacity_ = DEFAULT_MAX_NUMBER_OF_INFLIGHT_EVENTS; + this.capacity_for_versions_ = DEFAULT_MAX_NUMBER_OF_INFLIGHT_EVENTS; + this.capacity_for_eventIds_ = DEFAULT_MAX_NUMBER_OF_INFLIGHT_INSERT_EVENTS; } public InFlightEvents(int capacity) { Preconditions.checkState(capacity > 0); - this.capacity_ = capacity; + this.capacity_for_versions_ = capacity; + this.capacity_for_eventIds_ = DEFAULT_MAX_NUMBER_OF_INFLIGHT_INSERT_EVENTS; } - /** * Gets the current list of versions for in-flight events for this table + * @param isInsertEvent if true, return list of eventIds for in-flight Insert events + * if false, return list of versions for in-flight DDL events */ - public List<Long> getAll() { - return ImmutableList.copyOf(versionsForInflightEvents_); + public List<Long> getAll(boolean isInsertEvent) { + if (isInsertEvent) { + return ImmutableList.copyOf(idsForInflightDmlEvents_); + } else { + return ImmutableList.copyOf(versionsForInflightEvents_); + } } /** * Removes a given version from the collection of version numbers for in-flight * events. - * - * @param versionNumber version number to remove from the collection + * @param isInsertEvent If true, remove eventId from list of eventIds for in-flight + * Insert events. If false, remove version number from list of versions for in-flight + * DDL events. 
+ * @param versionNumber when isInsertEvent is true, it's eventId to remove + * when isInsertEvent is false, it's version number to remove * @return true if the version was found and successfully removed, false * otherwise */ - public boolean remove(long versionNumber) { - return versionsForInflightEvents_.remove(versionNumber); + public boolean remove(boolean isInsertEvent, long versionNumber) { + if (isInsertEvent) { + return idsForInflightDmlEvents_.remove(versionNumber); + } else { + return versionsForInflightEvents_.remove(versionNumber); + } } /** @@ -76,23 +101,45 @@ public class InFlightEvents { * collection is already at the max size defined by * <code>MAX_NUMBER_OF_INFLIGHT_EVENTS</code>, then it ignores the given version and * does not add it - * - * @param versionNumber version number to add + * @param isInsertEvent If true, add eventId to list of eventIds for in-flight Insert + * events. If false, add versionNumber to list of versions for in-flight DDL events. + * @param versionNumber when isInsertEvent is true, it's eventId to add + * when isInsertEvent is false, it's version number to add * @return True if version number was added, false if the collection is at its max * capacity */ - public boolean add(long versionNumber) { - if (versionsForInflightEvents_.size() == capacity_) { - LOG.warn(String.format("Number of versions to be stored is at " - + " its max capacity %d. Ignoring add request for version number %d.", - DEFAULT_MAX_NUMBER_OF_INFLIGHT_EVENTS, versionNumber)); - return false; + public boolean add(boolean isInsertEvent, long versionNumber) { + if (isInsertEvent) { + if (idsForInflightDmlEvents_.size() == capacity_for_eventIds_) { + LOG.warn(String.format("Number of Insert events to be stored is at " + + " its max capacity %d. 
Ignoring add request for eventId %d.", + DEFAULT_MAX_NUMBER_OF_INFLIGHT_EVENTS, versionNumber)); + return false; + } + idsForInflightDmlEvents_.add(versionNumber); + } else { + if (versionsForInflightEvents_.size() == capacity_for_versions_) { + LOG.warn(String.format("Number of DDL events to be stored is at " + + "its max capacity %d. Ignoring add request for version %d.", + DEFAULT_MAX_NUMBER_OF_INFLIGHT_EVENTS, versionNumber)); + return false; + } + versionsForInflightEvents_.add(versionNumber); } - versionsForInflightEvents_.add(versionNumber); return true; } - public int size() { - return versionsForInflightEvents_.size(); + /** + * Get the size of in-flight DDL or DML events list + * @param isInsertEvent if true, return size of Insert events list + * if false, return size of DDL events list + * @return size of events list + */ + public int size(boolean isInsertEvent) { + if (isInsertEvent) { + return idsForInflightDmlEvents_.size(); + } else { + return versionsForInflightEvents_.size(); + } } } diff --git a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java index 2be56e9..e64578b 100644 --- a/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java +++ b/fe/src/main/java/org/apache/impala/catalog/events/MetastoreEvents.java @@ -269,7 +269,7 @@ public class MetastoreEvents { protected final String tblName_; // eventId of the event. Used instead of calling getter on event_ everytime - protected final long eventId_; + protected long eventId_; // eventType from the NotificationEvent protected final MetastoreEventType eventType_; @@ -419,13 +419,15 @@ public class MetastoreEvents { * serviceId and version. More details on complete flow of self-event handling * logic can be read in <code>MetastoreEventsProcessor</code> documentation. 
* + * @param isInsertEvent if true, check in flight events list of Insert event + * if false, check events list of DDL * @return True if this event is a self-generated event. If the returned value is * true, this method also clears the version number from the catalog database/table. * Returns false if the version numbers or service id don't match */ - protected boolean isSelfEvent() { + protected boolean isSelfEvent(boolean isInsertEvent) { try { - if (catalog_.evaluateSelfEvent(getSelfEventContext())) { + if (catalog_.evaluateSelfEvent(isInsertEvent, getSelfEventContext())) { metrics_.getCounter(MetastoreEventsProcessor.NUMBER_OF_SELF_EVENTS).inc(); return true; } @@ -434,6 +436,8 @@ public class MetastoreEvents { } return false; } + + protected boolean isSelfEvent() { return isSelfEvent(false); } } public static String getStringProperty( @@ -766,19 +770,25 @@ public class MetastoreEvents { @Override public SelfEventContext getSelfEventContext() { - throw new UnsupportedOperationException("Self-event evaluation is not implemented" - + " for insert event type"); + if (insertPartition_ != null) { + // create selfEventContext for insert partition event + List<TPartitionKeyValue> tPartSpec = + getTPartitionSpecFromHmsPartition(msTbl_, insertPartition_); + return new SelfEventContext(dbName_, tblName_, Arrays.asList(tPartSpec), + insertPartition_.getParameters(), eventId_); + } else { + // create selfEventContext for insert table event + return new SelfEventContext( + dbName_, tblName_, null, msTbl_.getParameters(), eventId_); + } } - /** - * Currently we do not check for self-events in Inserts. Existing self-events logic - * cannot be used for insert events since firing insert event does not allow us to - * modify table parameters in HMS. Hence, we cannot get CatalogServiceIdentifiers in - * Insert Events. - * TODO: Handle self-events for insert case. 
- */ @Override public void process() throws MetastoreNotificationException { + if (isSelfEvent(true)) { + infoLog("Not processing the event as it is a self-event"); + return; + } // Reload the whole table if it's a transactional table. if (AcidUtils.isTransactionalTable(msTbl_.getParameters())) { insertPartition_ = null; diff --git a/fe/src/main/java/org/apache/impala/catalog/events/SelfEventContext.java b/fe/src/main/java/org/apache/impala/catalog/events/SelfEventContext.java index 83aa69b..10dd4eb 100644 --- a/fe/src/main/java/org/apache/impala/catalog/events/SelfEventContext.java +++ b/fe/src/main/java/org/apache/impala/catalog/events/SelfEventContext.java @@ -32,15 +32,21 @@ import org.apache.impala.thrift.TPartitionKeyValue; public class SelfEventContext { private final String dbName_; private final String tblName_; + private final long insertEventId_; // version number from the event object parameters used for self-event detection private final long versionNumberFromEvent_; // service id from the event object parameters used for self-event detection - private final String serviceIdFromEvent_; + private final String serviceidFromEvent_; private final List<List<TPartitionKeyValue>> partitionKeyValues_; SelfEventContext(String dbName, String tblName, Map<String, String> parameters) { - this(dbName, tblName, null, parameters); + this(dbName, tblName, null, parameters, -1); + } + + SelfEventContext(String dbName, String tblName, + List<List<TPartitionKeyValue>> partitionKeyValues, Map<String, String> parameters) { + this(dbName, tblName, partitionKeyValues, parameters, -1); } /** @@ -55,17 +61,17 @@ public class SelfEventContext { */ SelfEventContext(String dbName, @Nullable String tblName, @Nullable List<List<TPartitionKeyValue>> partitionKeyValues, - Map<String, String> parameters) { + Map<String, String> parameters, long eventId) { Preconditions.checkNotNull(parameters); this.dbName_ = Preconditions.checkNotNull(dbName); this.tblName_ = tblName; 
this.partitionKeyValues_ = partitionKeyValues; + insertEventId_ = eventId; versionNumberFromEvent_ = Long.parseLong( MetastoreEvents.getStringProperty(parameters, MetastoreEventPropertyKey.CATALOG_VERSION.getKey(), "-1")); - serviceIdFromEvent_ = - MetastoreEvents.getStringProperty(parameters, - MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), ""); + serviceidFromEvent_ = MetastoreEvents.getStringProperty( + parameters, MetastoreEventPropertyKey.CATALOG_SERVICE_ID.getKey(), ""); } public String getDbName() { @@ -76,13 +82,13 @@ public class SelfEventContext { return tblName_; } + public long getIdFromEvent() { return insertEventId_; } + public long getVersionNumberFromEvent() { return versionNumberFromEvent_; } - public String getServiceIdFromEvent() { - return serviceIdFromEvent_; - } + public String getServiceIdFromEvent() { return serviceidFromEvent_; } public List<List<TPartitionKeyValue>> getPartitionKeyValues() { return partitionKeyValues_ == null ? diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java index c3957b9..c62ac6f 100644 --- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java +++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java @@ -126,6 +126,8 @@ public class BackendConfig { return backendCfg_.hms_event_polling_interval_s; } + public boolean isInsertEventsEnabled() { return backendCfg_.enable_insert_events; } + public boolean isOrcScannerEnabled() { return backendCfg_.enable_orc_scanner; } diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index 63a7b81..3ac2a05 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -711,7 +711,7 @@ public class CatalogOpExecutor { // pre-existing there is no alter table event generated. 
Hence we should // only add the versions for in-flight events when we are sure that the // partition was really added. - catalog_.addVersionsForInflightEvents(tbl, newCatalogVersion); + catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion); addTableToCatalogUpdate(refreshedTable, response.result); } reloadMetadata = false; @@ -851,7 +851,7 @@ public class CatalogOpExecutor { reloadTableSchema, null, "ALTER TABLE " + params.getAlter_type().name()); // now that HMS alter operation has succeeded, add this version to list of // inflight events in catalog table if event processing is enabled - catalog_.addVersionsForInflightEvents(tbl, newCatalogVersion); + catalog_.addVersionsForInflightEvents(false, tbl, newCatalogVersion); addTableToCatalogUpdate(tbl, response.result); } } finally { @@ -3098,7 +3098,7 @@ public class CatalogOpExecutor { "'%s' and the new table name '%s' may fix the problem." , tableName.toString(), newTableName.toString())); } - catalog_.addVersionsForInflightEvents(result.second, newCatalogVersion); + catalog_.addVersionsForInflightEvents(false, result.second, newCatalogVersion); // TODO(todd): if client is a 'v2' impalad, only send back invalidation response.result.addToRemoved_catalog_objects(result.first.toMinimalTCatalogObject()); response.result.addToUpdated_catalog_objects(result.second.toTCatalogObject()); @@ -3678,7 +3678,7 @@ public class CatalogOpExecutor { // catalog service identifiers if (catalog_.getCatalogServiceId().equals(serviceId)) { Preconditions.checkNotNull(version); - hdfsPartition.addToVersionsForInflightEvents(Long.parseLong(version)); + hdfsPartition.addToVersionsForInflightEvents(false, Long.parseLong(version)); } } @@ -4430,7 +4430,7 @@ public class CatalogOpExecutor { } /** - * Populates insert event data and calls fireInsertEventAysnc() if external event + * Populates insert event data and calls fireInsertEvents() if external event * processing is enabled. 
This is no-op if event processing is disabled or there are * no existing partitions affected by this insert. * @@ -4440,11 +4440,16 @@ public class CatalogOpExecutor { */ private void createInsertEvents(Table table, List<FeFsPartition> affectedExistingPartitions, boolean isInsertOverwrite) { - if (!catalog_.isEventProcessingActive() || - affectedExistingPartitions.size() == 0) return; + boolean isInsertEventsEnabled = BackendConfig.INSTANCE.isInsertEventsEnabled(); + if (!catalog_.isEventProcessingActive() || !isInsertEventsEnabled + || affectedExistingPartitions.size() == 0) { + return; + } // List of all insert events that we call HMS fireInsertEvent() on. List<InsertEventInfo> insertEventInfos = new ArrayList<>(); + // List of all partitions that we insert into + List<HdfsPartition> partitions = new ArrayList<>(); // Map of partition names to file names of all existing partitions touched by the // insert. @@ -4471,58 +4476,54 @@ public class CatalogOpExecutor { // Find the delta of the files added by the insert if it is not an overwrite // operation. HMS fireListenerEvent() expects an empty list if no new files are // added or if the operation is an insert overwrite. 
+ HdfsPartition hdfsPartition = (HdfsPartition) part; Set<String> deltaFiles = new HashSet<>(); List<String> partVals = null; if (!isInsertOverwrite) { - String partitionName = part.getPartitionName() + "/"; + String partitionName = hdfsPartition.getPartitionName() + "/"; Set<String> filesPostInsert = partitionFilesMapPostInsert.get(partitionName); if (table.getNumClusteringCols() > 0) { Set<String> filesBeforeInsert = partitionFilesMapBeforeInsert.get(partitionName); deltaFiles = Sets.difference(filesBeforeInsert, filesPostInsert); - partVals = part.getPartitionValuesAsStrings(true); + partVals = hdfsPartition.getPartitionValuesAsStrings(true); } else { Map.Entry<String, Set<String>> entry = partitionFilesMapBeforeInsert.entrySet().iterator().next(); deltaFiles = Sets.difference(entry.getValue(), filesPostInsert); } LOG.info("{} new files detected for table {} partition {}.", - filesPostInsert.size(), table.getTableName(), part.getPartitionName()); + filesPostInsert.size(), table.getTableName(), + hdfsPartition.getPartitionName()); } if (deltaFiles != null || isInsertOverwrite) { // Collect all the insert events. - insertEventInfos.add(new InsertEventInfo(table.getDb().getName(), - table.getName(), partVals, deltaFiles, isInsertOverwrite)); + insertEventInfos.add( + new InsertEventInfo(partVals, deltaFiles, isInsertOverwrite)); + if (partVals != null) { + // insert into partition + partitions.add(hdfsPartition); + } } else { LOG.info("No new files were created, and is not a replace. Skipping " + "generating INSERT event."); } } - // Firing insert events by making calls to HMS APIs can be slow for tables with - // large number of partitions. Hence, we fire the insert events asynchronously. - fireInsertEventsAsync(insertEventInfos); - } - - /** - * Helper method to fire insert events asynchronously. This creates a single thread - * to execute the fireInsertEvent method and shuts down the thread after it has - * finished. 
In case of any exception, we just log the failure of firing insert events. - */ - private void fireInsertEventsAsync(List<InsertEventInfo> insertEventInfos) { - ExecutorService fireInsertEventThread = Executors.newSingleThreadExecutor(); - CompletableFuture.runAsync(() -> { - for (InsertEventInfo info : insertEventInfos) { - try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) { - MetaStoreUtil.fireInsertEvent(metaStoreClient.getHiveClient(), info); - } catch (Exception e) { - LOG.error("Failed to fire insert event. Some tables might not be" - + " refreshed on other impala clusters.", e); + MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient(); + List<Long> eventIds = MetastoreShim.fireInsertEvents(metaStoreClient, + insertEventInfos, table.getDb().getName(), table.getName()); + if (!eventIds.isEmpty()) { + if (partitions.size() == 0) { // insert into table + catalog_.addVersionsForInflightEvents(true, table, eventIds.get(0)); + } else { // insert into partition + for (int par_idx = 0; par_idx < partitions.size(); par_idx++) { + partitions.get(par_idx).addToVersionsForInflightEvents( + true, eventIds.get(par_idx)); } } - }, Executors.newSingleThreadExecutor()).thenRun(() -> - fireInsertEventThread.shutdown()); + } } /** diff --git a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java index 7bc463f..81b62f6 100644 --- a/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java +++ b/fe/src/main/java/org/apache/impala/util/MetaStoreUtil.java @@ -30,9 +30,6 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException; import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.FireEventRequest; -import org.apache.hadoop.hive.metastore.api.FireEventRequestData; -import 
org.apache.hadoop.hive.metastore.api.InsertEventRequestData; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; @@ -317,13 +314,18 @@ public class MetaStoreUtil { } /** + * Check if the hms table is a bucketed table or not + */ + public static boolean isBucketedTable(Table msTbl) { + Preconditions.checkNotNull(msTbl); + return msTbl.getSd().getNumBuckets() > 0; + } + + /** * A helper class that encapsulates all the information needed to fire and insert event * with HMS. */ public static class InsertEventInfo { - private String dbName; - private String tableName; - // List of partition values corresponding to the partition keys in // a partitioned table. This is null for non-partitioned table. private List<String> partVals; @@ -337,48 +339,15 @@ public class MetaStoreUtil { // false otherwise. private boolean isOverwrite; - public InsertEventInfo(String dbName, String tableName, List<String> partVals, - Collection<String> newFiles, boolean isOverwrite) { - this.dbName = dbName; - this.tableName = tableName; + public InsertEventInfo( + List<String> partVals, Collection<String> newFiles, boolean isOverwrite) { this.partVals = partVals; this.newFiles = newFiles; this.isOverwrite = isOverwrite; } - } - /** - * Fires an insert event to HMS notification log. For partitioned table, each - * existing partition touched by the insert will fire a separate insert event. - * - * @param msClient Metastore client, - * @param info A singe insert event encapsulating the information needed to fire insert - * event with HMS. 
- */ - public static void fireInsertEvent(IMetaStoreClient msClient, - InsertEventInfo info) throws TException { - Preconditions.checkNotNull(msClient); - Preconditions.checkNotNull(info.dbName); - Preconditions.checkNotNull(info.tableName); - Preconditions.checkNotNull(info.newFiles); - LOG.debug("Firing an insert event for {}", info.tableName); - FireEventRequestData data = new FireEventRequestData(); - InsertEventRequestData insertData = new InsertEventRequestData(); - data.setInsertData(insertData); - FireEventRequest rqst = new FireEventRequest(true, data); - rqst.setDbName(info.dbName); - rqst.setTableName(info.tableName); - insertData.setFilesAdded(new ArrayList<>(info.newFiles)); - insertData.setReplace(info.isOverwrite); - if (info.partVals != null) rqst.setPartitionVals(info.partVals); - msClient.fireListenerEvent(rqst); - } - - /** - * Check if the hms table is a bucketed table or not - */ - public static boolean isBucketedTable(Table msTbl) { - Preconditions.checkNotNull(msTbl); - return msTbl.getSd().getNumBuckets() > 0; + public List<String> getPartVals() { return this.partVals; } + public Collection<String> getNewFiles() { return this.newFiles; } + public boolean isOverwrite() { return this.isOverwrite; } } } diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java index 41995a8..ba4a0b5 100644 --- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java +++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java @@ -37,6 +37,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Random; @@ -90,6 +91,7 @@ import org.apache.impala.compat.MetastoreShim; import org.apache.impala.service.CatalogOpExecutor; import 
org.apache.impala.service.FeSupport; import org.apache.impala.testutil.CatalogServiceTestCatalog; +import org.apache.impala.testutil.TestUtils; import org.apache.impala.thrift.TAlterDbParams; import org.apache.impala.thrift.TAlterDbSetOwnerParams; import org.apache.impala.thrift.TAlterDbType; @@ -107,6 +109,7 @@ import org.apache.impala.thrift.TAlterTableSetRowFormatParams; import org.apache.impala.thrift.TAlterTableSetTblPropertiesParams; import org.apache.impala.thrift.TAlterTableType; import org.apache.impala.thrift.TAlterTableUpdateStatsParams; +import org.apache.impala.thrift.TCatalogServiceRequestHeader; import org.apache.impala.thrift.TColumn; import org.apache.impala.thrift.TColumnType; import org.apache.impala.thrift.TCreateDbParams; @@ -134,12 +137,14 @@ import org.apache.impala.thrift.TTableStats; import org.apache.impala.thrift.TTypeNode; import org.apache.impala.thrift.TTypeNodeType; import org.apache.impala.thrift.TUniqueId; +import org.apache.impala.thrift.TUpdateCatalogRequest; import org.apache.impala.util.MetaStoreUtil; import org.apache.impala.util.MetaStoreUtil.InsertEventInfo; import org.apache.thrift.TException; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; +import org.junit.Assume; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -712,6 +717,67 @@ public class MetastoreEventsProcessorTest { } /** + * Test insert from impala. Insert into table and partition from impala + * should be treated as self-event. 
+ */ + @Test + public void testInsertFromImpala() throws ImpalaException { + Assume.assumeTrue("Skipping this test because it only works with Hive-3 or greater", + TestUtils.getHiveMajorVersion() >= 3); + // Test insert into multiple partitions + createDatabaseFromImpala(TEST_DB_NAME, null); + String tableToInsertPart = "tbl_with_mul_part"; + createTableFromImpala(TEST_DB_NAME, tableToInsertPart, true); + String tableToInsertMulPart = "tbl_to_insert_mul_part"; + createTableFromImpala(TEST_DB_NAME, tableToInsertMulPart, true); + // add first partition + TPartitionDef partitionDef = new TPartitionDef(); + partitionDef.addToPartition_spec(new TPartitionKeyValue("p1", "1")); + partitionDef.addToPartition_spec(new TPartitionKeyValue("p2", "100")); + alterTableAddPartition(TEST_DB_NAME, tableToInsertPart, partitionDef); + alterTableAddPartition(TEST_DB_NAME, tableToInsertMulPart, partitionDef); + // add second partition + partitionDef = new TPartitionDef(); + partitionDef.addToPartition_spec(new TPartitionKeyValue("p1", "1")); + partitionDef.addToPartition_spec(new TPartitionKeyValue("p2", "200")); + alterTableAddPartition(TEST_DB_NAME, tableToInsertPart, partitionDef); + alterTableAddPartition(TEST_DB_NAME, tableToInsertMulPart, partitionDef); + eventsProcessor_.processEvents(); + // count self event from here, numberOfSelfEventsBefore=4 as we have 4 ADD PARTITION + // events + long numberOfSelfEventsBefore = + eventsProcessor_.getMetrics() + .getCounter(MetastoreEventsProcessor.NUMBER_OF_SELF_EVENTS) + .getCount(); + // insert into partition + insertFromImpala(tableToInsertPart, true, "1", "100"); + insertFromImpala(tableToInsertPart, true, "1", "200"); + // insert into multiple partition + Set<String> created_partitions = new HashSet<String>(); + String partition1 = "p1=1/p2=100/"; + String partition2 = "p1=1/p2=200/"; + created_partitions.add(partition1); + created_partitions.add(partition2); + insertMulPartFromImpala(tableToInsertMulPart, tableToInsertPart, 
created_partitions); + eventsProcessor_.processEvents(); + + // Test insert into table + String tableToInsert = "tbl_to_insert"; + createTableFromImpala(TEST_DB_NAME, tableToInsert, false); + insertFromImpala(tableToInsert, false, "", ""); + eventsProcessor_.processEvents(); + + long selfEventsCountAfter = + eventsProcessor_.getMetrics() + .getCounter(MetastoreEventsProcessor.NUMBER_OF_SELF_EVENTS) + .getCount(); + // 2 single insert partition events, 1 multi insert partitions which includes 2 single + // insert events 1 single insert table event + assertEquals("Unexpected number of self-events generated", + numberOfSelfEventsBefore + 5, selfEventsCountAfter); + } + + /** * Test generates a sequence of create_table, insert and drop_table in the event stream * to make sure when the insert event is processed on a removed table, it doesn't cause * any issues with the event processing. @@ -835,9 +901,10 @@ public class MetastoreEventsProcessorTest { List <String> newFiles = addFilesToDirectory(parentPath, "testFile.", totalNumberOfFilesToAdd, isOverwrite); try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) { - MetaStoreUtil.fireInsertEvent(metaStoreClient.getHiveClient(), - new InsertEventInfo(msTbl.getDbName(), msTbl.getTableName(), null, - newFiles, isOverwrite)); + List<InsertEventInfo> insertEventInfos = new ArrayList<>(); + insertEventInfos.add(new InsertEventInfo(null, newFiles, isOverwrite)); + MetastoreShim.fireInsertEventHelper(metaStoreClient.getHiveClient(), + insertEventInfos, msTbl.getDbName(), msTbl.getTableName()); } } @@ -2711,6 +2778,57 @@ public class MetastoreEventsProcessorTest { catalogOpExecutor_.execDdlRequest(req); } + /** + * Insert multiple partitions into table from Impala + */ + private void insertMulPartFromImpala(String tblName1, String tblName2, + Set<String> created_partitions) throws ImpalaException { + String insert_mul_part = String.format( + "insert into table %s partition(p1, p2) select * from %s", tblName1, 
tblName2); + TUpdateCatalogRequest testInsertRequest = createTestTUpdateCatalogRequest( + TEST_DB_NAME, tblName1, insert_mul_part, created_partitions); + catalogOpExecutor_.updateCatalog(testInsertRequest); + } + + /** + * Insert into table or partition from Impala + * @param tblName + * @param isPartitioned + * @return + */ + private void insertFromImpala(String tblName, boolean isPartitioned, String p1val, + String p2val) throws ImpalaException { + String partition = String.format("partition (p1=%s, p2='%s')", p1val, p2val); + String test_insert_tbl = String.format("insert into table %s %s values ('a','aa') ", + tblName, isPartitioned ? partition : ""); + Set<String> created_partitions = new HashSet<String>(); + String created_part_str = + isPartitioned ? String.format("p1=%s/p2=%s/", p1val, p2val) : ""; + created_partitions.add(created_part_str); + TUpdateCatalogRequest testInsertRequest = createTestTUpdateCatalogRequest( + TEST_DB_NAME, tblName, test_insert_tbl, created_partitions); + catalogOpExecutor_.updateCatalog(testInsertRequest); + } + + /** + * Create DML request to Catalog + * @param dBName + * @param tableName + * @param redacted_sql_stmt + * @param created_partitions + * @return + */ + private TUpdateCatalogRequest createTestTUpdateCatalogRequest(String dBName, + String tableName, String redacted_sql_stmt, Set<String> created_partitions) { + TUpdateCatalogRequest tUpdateCatalogRequest = new TUpdateCatalogRequest(); + tUpdateCatalogRequest.setDb_name(dBName); + tUpdateCatalogRequest.setTarget_table(tableName); + tUpdateCatalogRequest.setCreated_partitions((created_partitions)); + tUpdateCatalogRequest.setHeader(new TCatalogServiceRequestHeader()); + tUpdateCatalogRequest.getHeader().setRedacted_sql_stmt(redacted_sql_stmt); + return tUpdateCatalogRequest; + } + private TColumn getScalarColumn(String colName, TPrimitiveType type) { TTypeNode tTypeNode = new TTypeNode(TTypeNodeType.SCALAR); tTypeNode.setScalar_type(new TScalarType(type)); diff --git 
a/tests/custom_cluster/test_event_processing.py b/tests/custom_cluster/test_event_processing.py index f39f55f..cdd1a48 100644 --- a/tests/custom_cluster/test_event_processing.py +++ b/tests/custom_cluster/test_event_processing.py @@ -21,6 +21,7 @@ import pytest from tests.common.skip import SkipIfHive2 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite +from tests.common.environ import HIVE_MAJOR_VERSION from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal from tests.util.hive_utils import HiveDbWrapper from tests.util.event_processor_utils import EventProcessorUtils @@ -268,11 +269,6 @@ class TestEventProcessing(CustomClusterTestSuite): # add partition "alter table {0}.{1} add if not exists partition (year=1111, month=1)".format( db_name, tbl2), - # insert into a existing partition; generates ALTER_PARTITION - # TODO add support for insert_events (IMPALA-8632) - # "insert into table {0}.{1} partition (year, month) " - # "select * from functional.alltypessmall where year=2009 and month=1".format( - # db_name, tbl2), # compute stats will generates ALTER_PARTITION "compute stats {0}.{1}".format(db_name, tbl2), "alter table {0}.{1} recover partitions".format(db_name, recover_tbl_name)], @@ -295,6 +291,11 @@ class TestEventProcessing(CustomClusterTestSuite): "alter table {0}.{1} drop if exists partition (year=2100, month=1)".format( db_name, tbl_name)] } + if HIVE_MAJOR_VERSION >= 3: + # insert into a existing partition; generates INSERT events + self_event_test_queries[True].append("insert into table {0}.{1} partition " + "(year, month) select * from functional.alltypessmall where year=2009 " + "and month=1".format(db_name, tbl2)) return self_event_test_queries def __get_hive_test_queries(self, db_name, recover_tbl_name):
