This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 105a162f94 [Enhancement](multi-catalog) Merge hms events every round
to speed up events processing. (#21589)
105a162f94 is described below
commit 105a162f94824c28e770a1013c259412b2dec25b
Author: Xiangyu Wang <[email protected]>
AuthorDate: Wed Jul 12 23:41:07 2023 +0800
[Enhancement](multi-catalog) Merge hms events every round to speed up
events processing. (#21589)
Currently, MetastoreEventsProcessor cannot keep up with the rate at which events are
produced in our cluster, so we need to merge some HMS events in every round.
---
.../datasource/hive/event/AddPartitionEvent.java | 10 +-
.../datasource/hive/event/AlterDatabaseEvent.java | 72 +++++++-
.../datasource/hive/event/AlterPartitionEvent.java | 23 ++-
.../datasource/hive/event/AlterTableEvent.java | 46 ++++-
.../datasource/hive/event/CreateDatabaseEvent.java | 5 +
.../datasource/hive/event/CreateTableEvent.java | 11 ++
.../datasource/hive/event/DropPartitionEvent.java | 2 +-
.../datasource/hive/event/DropTableEvent.java | 19 ++
.../doris/datasource/hive/event/InsertEvent.java | 23 +++
.../datasource/hive/event/MetastoreEvent.java | 11 ++
.../hive/event/MetastoreEventFactory.java | 86 +++++++--
...baseEvent.java => MetastorePartitionEvent.java} | 26 +--
.../datasource/hive/event/MetastoreTableEvent.java | 50 ++++++
.../external/hms/MetastoreEventFactoryTest.java | 193 +++++++++++++++++++++
14 files changed, 543 insertions(+), 34 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java
index 11d74ed9c2..b94333e41b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java
@@ -37,10 +37,18 @@ import java.util.stream.Collectors;
/**
* MetastoreEvent for ADD_PARTITION event type
*/
-public class AddPartitionEvent extends MetastoreTableEvent {
+public class AddPartitionEvent extends MetastorePartitionEvent {
private final Table hmsTbl;
private final List<String> partitionNames;
+ // for test
+ public AddPartitionEvent(long eventId, String catalogName, String dbName,
+ String tblName, List<String> partitionNames) {
+ super(eventId, catalogName, dbName, tblName);
+ this.partitionNames = partitionNames;
+ this.hmsTbl = null;
+ }
+
private AddPartitionEvent(NotificationEvent event,
String catalogName) {
super(event, catalogName);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java
index 59445a5dc4..d56eb52fad 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java
@@ -18,21 +18,74 @@
package org.apache.doris.datasource.hive.event;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.ExternalCatalog;
+
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import
org.apache.hadoop.hive.metastore.messaging.json.JSONAlterDatabaseMessage;
import java.util.List;
/**
- * MetastoreEvent for Alter_DATABASE event type
+ * MetastoreEvent for ALTER_DATABASE event type
*/
public class AlterDatabaseEvent extends MetastoreEvent {
+ private final Database dbBefore;
+ private final Database dbAfter;
+
+ // true if this alter event was due to a rename operation
+ private final boolean isRename;
+
+ // for test
+ public AlterDatabaseEvent(long eventId, String catalogName, String dbName,
boolean isRename) {
+ super(eventId, catalogName, dbName, null);
+ this.isRename = isRename;
+ this.dbBefore = null;
+ this.dbAfter = null;
+ }
+
private AlterDatabaseEvent(NotificationEvent event,
String catalogName) {
super(event, catalogName);
Preconditions.checkArgument(getEventType().equals(MetastoreEventType.ALTER_DATABASE));
+
+ try {
+ JSONAlterDatabaseMessage alterDatabaseMessage =
+ (JSONAlterDatabaseMessage)
MetastoreEventsProcessor.getMessageDeserializer(event.getMessageFormat())
+ .getAlterDatabaseMessage(event.getMessage());
+ dbBefore =
Preconditions.checkNotNull(alterDatabaseMessage.getDbObjBefore());
+ dbAfter =
Preconditions.checkNotNull(alterDatabaseMessage.getDbObjAfter());
+ } catch (Exception e) {
+ throw new MetastoreNotificationException(
+ debugString("Unable to parse the alter database message"),
e);
+ }
+ // this is a rename event if either dbName of before and after object
changed
+ isRename = !dbBefore.getName().equalsIgnoreCase(dbAfter.getName());
+ }
+
+    /**
+     * Replays a database rename on the catalog named by {@code catalogName}: the database
+     * with the old name is dropped and a database with the new name is created.
+     * If a database with the new name already exists, the rename is only logged and skipped.
+     *
+     * @throws DdlException if the catalog cannot be found or is not an ExternalCatalog
+     */
+    private void processRename() throws DdlException {
+        CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName);
+        if (catalog == null) {
+            throw new DdlException("No catalog found with name: " + catalogName);
+        }
+        boolean isExternal = catalog instanceof ExternalCatalog;
+        if (!isExternal) {
+            throw new DdlException("Only support ExternalCatalog Databases");
+        }
+
+        // resolve the new name only after the catalog checks so that those checks fail first
+        String newDbName = dbAfter.getName();
+        boolean targetExists = catalog.getDbNullable(newDbName) != null;
+        if (targetExists) {
+            infoLog("AlterExternalDatabase canceled, because dbAfter has exist, "
+                    + "catalogName:[{}],dbName:[{}]",
+                    catalogName, newDbName);
+            return;
+        }
+
+        // a rename is applied as "drop old name" followed by "create new name"
+        Env.getCurrentEnv().getCatalogMgr().dropExternalDatabase(dbBefore.getName(), catalogName, true);
+        Env.getCurrentEnv().getCatalogMgr().createExternalDatabase(newDbName, catalogName, true);
+
}
protected static List<MetastoreEvent> getEvents(NotificationEvent event,
@@ -40,9 +93,22 @@ public class AlterDatabaseEvent extends MetastoreEvent {
return Lists.newArrayList(new AlterDatabaseEvent(event, catalogName));
}
+ public boolean isRename() {
+ return isRename;
+ }
+
@Override
protected void process() throws MetastoreNotificationException {
- // only can change properties,we do nothing
- infoLog("catalogName:[{}],dbName:[{}]", catalogName, dbName);
+        try {
+            if (!isRename) {
+                // a non-rename alter can only change properties, so we just log it and do nothing
+                infoLog("catalogName:[{}],dbName:[{}]", catalogName, dbName);
+                return;
+            }
+            // rename: delegate to processRename(); any failure is wrapped below
+            processRename();
+        } catch (Exception e) {
+            throw new MetastoreNotificationException(
+                    debugString("Failed to process event"), e);
+        }
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java
index 788b79f885..0fc6be375d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java
@@ -30,12 +30,13 @@ import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
/**
* MetastoreEvent for ALTER_PARTITION event type
*/
-public class AlterPartitionEvent extends MetastoreTableEvent {
+public class AlterPartitionEvent extends MetastorePartitionEvent {
private final Table hmsTbl;
private final org.apache.hadoop.hive.metastore.api.Partition
partitionAfter;
private final org.apache.hadoop.hive.metastore.api.Partition
partitionBefore;
@@ -44,6 +45,18 @@ public class AlterPartitionEvent extends MetastoreTableEvent
{
// true if this alter event was due to a rename operation
private final boolean isRename;
+ // for test
+ public AlterPartitionEvent(long eventId, String catalogName, String
dbName, String tblName,
+ String partitionNameBefore, String
partitionNameAfter) {
+ super(eventId, catalogName, dbName, tblName);
+ this.partitionNameBefore = partitionNameBefore;
+ this.partitionNameAfter = partitionNameAfter;
+ this.hmsTbl = null;
+ this.partitionAfter = null;
+ this.partitionBefore = null;
+ isRename = !partitionNameBefore.equalsIgnoreCase(partitionNameAfter);
+ }
+
private AlterPartitionEvent(NotificationEvent event,
String catalogName) {
super(event, catalogName);
@@ -94,4 +107,12 @@ public class AlterPartitionEvent extends
MetastoreTableEvent {
debugString("Failed to process event"), e);
}
}
+
+    /**
+     * Tells whether the earlier {@code event} can be merged into this one.
+     * That is the case only when {@code event} is an AlterPartitionEvent on the same table
+     * whose before/after partition objects are both equal to the ones of this event.
+     */
+    @Override
+    protected boolean canBeBatched(MetastoreEvent event) {
+        if (!isSameTable(event) || !(event instanceof AlterPartitionEvent)) {
+            return false;
+        }
+        AlterPartitionEvent other = (AlterPartitionEvent) event;
+        // Objects.equals is null-safe; both fields are null for events built by the test constructor
+        return Objects.equals(partitionBefore, other.partitionBefore)
+                && Objects.equals(partitionAfter, other.partitionAfter);
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java
index cbb1ee8478..bc09d6ef2c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java
@@ -41,6 +41,18 @@ public class AlterTableEvent extends MetastoreTableEvent {
// true if this alter event was due to a rename operation
private final boolean isRename;
private final boolean isView;
+ private final boolean willCreateOrDropTable;
+
+ // for test
+ public AlterTableEvent(long eventId, String catalogName, String dbName,
+ String tblName, boolean isRename, boolean isView) {
+ super(eventId, catalogName, dbName, tblName);
+ this.isRename = isRename;
+ this.isView = isView;
+ this.tableBefore = null;
+ this.tableAfter = null;
+ this.willCreateOrDropTable = isRename || isView;
+ }
private AlterTableEvent(NotificationEvent event, String catalogName) {
super(event, catalogName);
@@ -61,13 +73,19 @@ public class AlterTableEvent extends MetastoreTableEvent {
isRename =
!tableBefore.getDbName().equalsIgnoreCase(tableAfter.getDbName())
||
!tableBefore.getTableName().equalsIgnoreCase(tableAfter.getTableName());
isView = tableBefore.isSetViewExpandedText() ||
tableBefore.isSetViewOriginalText();
+ this.willCreateOrDropTable = isRename || isView;
}
public static List<MetastoreEvent> getEvents(NotificationEvent event,
- String catalogName) {
+ String catalogName) {
return Lists.newArrayList(new AlterTableEvent(event, catalogName));
}
+ @Override
+ protected boolean willCreateOrDropTable() {
+ return willCreateOrDropTable;
+ }
+
private void processRecreateTable() throws DdlException {
if (!isView) {
return;
@@ -97,6 +115,14 @@ public class AlterTableEvent extends MetastoreTableEvent {
}
+ public boolean isRename() {
+ return isRename;
+ }
+
+ public boolean isView() {
+ return isView;
+ }
+
/**
* If the ALTER_TABLE event is due a table rename, this method removes the
old table
* and creates a new table with the new name. Else, we just refresh table
@@ -124,4 +150,22 @@ public class AlterTableEvent extends MetastoreTableEvent {
debugString("Failed to process event"), e);
}
}
+
+    /**
+     * Tells whether the earlier event {@code that} can be merged into this one.
+     * Events on other tables are never merged. If processing this event will create or drop
+     * the table, every earlier event of the same table can be merged; otherwise only earlier
+     * events that do not create or drop the table themselves can be merged.
+     */
+    @Override
+    protected boolean canBeBatched(MetastoreEvent that) {
+        // `isSameTable` only returns true for MetastoreTableEvent instances,
+        // so the cast on the right-hand side is safe whenever it is evaluated
+        return isSameTable(that)
+                && (willCreateOrDropTable || !((MetastoreTableEvent) that).willCreateOrDropTable());
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java
index e115f80f51..eb8da00cfe 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateDatabaseEvent.java
@@ -32,6 +32,11 @@ import java.util.List;
*/
public class CreateDatabaseEvent extends MetastoreEvent {
+ // for test
+ public CreateDatabaseEvent(long eventId, String catalogName, String
dbName) {
+ super(eventId, catalogName, dbName, null);
+ }
+
private CreateDatabaseEvent(NotificationEvent event,
String catalogName) {
super(event, catalogName);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java
index 9ac8fd4e68..1ec8cbfde5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java
@@ -35,6 +35,12 @@ import java.util.List;
public class CreateTableEvent extends MetastoreTableEvent {
private final Table hmsTbl;
+ // for test
+ public CreateTableEvent(long eventId, String catalogName, String dbName,
String tblName) {
+ super(eventId, catalogName, dbName, tblName);
+ this.hmsTbl = null;
+ }
+
private CreateTableEvent(NotificationEvent event, String catalogName)
throws MetastoreNotificationException {
super(event, catalogName);
Preconditions.checkArgument(MetastoreEventType.CREATE_TABLE.equals(getEventType()));
@@ -55,6 +61,11 @@ public class CreateTableEvent extends MetastoreTableEvent {
return Lists.newArrayList(new CreateTableEvent(event, catalogName));
}
+ @Override
+ protected boolean willCreateOrDropTable() {
+ return true;
+ }
+
@Override
protected void process() throws MetastoreNotificationException {
try {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java
index a53cf218db..7f8ade0819 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java
@@ -36,7 +36,7 @@ import java.util.stream.Collectors;
/**
* MetastoreEvent for DROP_PARTITION event type
*/
-public class DropPartitionEvent extends MetastoreTableEvent {
+public class DropPartitionEvent extends MetastorePartitionEvent {
private final Table hmsTbl;
private final List<String> partitionNames;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
index c73a59f1c3..7b43a09666 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java
@@ -34,6 +34,13 @@ import java.util.List;
public class DropTableEvent extends MetastoreTableEvent {
private final String tableName;
+ // for test
+ public DropTableEvent(long eventId, String catalogName, String dbName,
+ String tblName) {
+ super(eventId, catalogName, dbName, tblName);
+ this.tableName = tblName;
+ }
+
private DropTableEvent(NotificationEvent event,
String catalogName) {
super(event, catalogName);
@@ -55,6 +62,11 @@ public class DropTableEvent extends MetastoreTableEvent {
return Lists.newArrayList(new DropTableEvent(event, catalogName));
}
+ @Override
+ protected boolean willCreateOrDropTable() {
+ return true;
+ }
+
@Override
protected void process() throws MetastoreNotificationException {
try {
@@ -65,4 +77,11 @@ public class DropTableEvent extends MetastoreTableEvent {
debugString("Failed to process event"), e);
}
}
+
+ @Override
+ protected boolean canBeBatched(MetastoreEvent that) {
+ // `that` event must not be a rename table event
+ // so merge all events which belong to this table before is ok
+ return isSameTable(that);
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java
index 27438a4dcb..3b5650ade4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/InsertEvent.java
@@ -35,6 +35,13 @@ import java.util.List;
public class InsertEvent extends MetastoreTableEvent {
private final Table hmsTbl;
+ // for test
+ public InsertEvent(long eventId, String catalogName, String dbName,
+ String tblName) {
+ super(eventId, catalogName, dbName, tblName);
+ this.hmsTbl = null;
+ }
+
private InsertEvent(NotificationEvent event, String catalogName) {
super(event, catalogName);
Preconditions.checkArgument(getEventType().equals(MetastoreEventType.INSERT));
@@ -54,6 +61,11 @@ public class InsertEvent extends MetastoreTableEvent {
return Lists.newArrayList(new InsertEvent(event, catalogName));
}
+ @Override
+ protected boolean willCreateOrDropTable() {
+ return false;
+ }
+
@Override
protected void process() throws MetastoreNotificationException {
try {
@@ -72,4 +84,15 @@ public class InsertEvent extends MetastoreTableEvent {
debugString("Failed to process event"), e);
}
}
+
+    /**
+     * Tells whether the earlier event {@code that} can be merged into this insert event:
+     * it must belong to the same table and must not create or drop that table.
+     */
+    @Override
+    protected boolean canBeBatched(MetastoreEvent that) {
+        // `isSameTable` only returns true for MetastoreTableEvent instances,
+        // so the cast is safe whenever the right-hand side is evaluated
+        return isSameTable(that) && !((MetastoreTableEvent) that).willCreateOrDropTable();
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
index 9693bb0c4c..08aff93dda 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEvent.java
@@ -57,6 +57,17 @@ public abstract class MetastoreEvent {
protected final String catalogName;
+ // for test
+ protected MetastoreEvent(long eventId, String catalogName, String dbName,
String tblName) {
+ this.eventId = eventId;
+ this.catalogName = catalogName;
+ this.dbName = dbName;
+ this.tblName = tblName;
+ this.eventType = null;
+ this.metastoreNotificationEvent = null;
+ this.event = null;
+ }
+
protected MetastoreEvent(NotificationEvent event, String catalogName) {
this.event = event;
this.dbName = event.getDbName();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
index 3ab2a7e030..a5bf0d953c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventFactory.java
@@ -21,12 +21,17 @@ package org.apache.doris.datasource.hive.event;
import org.apache.doris.datasource.HMSExternalCatalog;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
/**
* Factory class to create various MetastoreEvents.
@@ -36,7 +41,7 @@ public class MetastoreEventFactory implements EventFactory {
@Override
public List<MetastoreEvent>
transferNotificationEventToMetastoreEvents(NotificationEvent event,
- String catalogName) {
+
String catalogName) {
Preconditions.checkNotNull(event.getEventType());
MetastoreEventType metastoreEventType =
MetastoreEventType.from(event.getEventType());
switch (metastoreEventType) {
@@ -68,19 +73,80 @@ public class MetastoreEventFactory implements EventFactory {
List<MetastoreEvent> getMetastoreEvents(List<NotificationEvent> events,
HMSExternalCatalog hmsExternalCatalog) {
List<MetastoreEvent> metastoreEvents = Lists.newArrayList();
+ String catalogName = hmsExternalCatalog.getName();
for (NotificationEvent event : events) {
-
metastoreEvents.addAll(transferNotificationEventToMetastoreEvents(event,
hmsExternalCatalog.getName()));
+
metastoreEvents.addAll(transferNotificationEventToMetastoreEvents(event,
catalogName));
}
- return createBatchEvents(metastoreEvents);
+ return createBatchEvents(catalogName, metastoreEvents);
}
/**
- * Create batch event tasks according to HivePartitionName to facilitate
subsequent parallel processing.
- * For ADD_PARTITION and DROP_PARTITION, we directly override any events
before that partition.
- * For a partition, it is meaningless to process any events before the
drop partition.
- */
- List<MetastoreEvent> createBatchEvents(List<MetastoreEvent> events) {
- // now do nothing
- return events;
+ * Merges events to reduce the time spent on event processing. Currently this mainly
+ * handles MetastoreTableEvent, because merging MetastoreTableEvents is simple and cost-effective.
+ * For example, consider the following events:
+ *
+ * event1: alter table db1.t1 add partition p1;
+ * event2: alter table db1.t1 drop partition p2;
+ * event3: alter table db1.t2 add partition p3;
+ * event4: alter table db2.t3 rename to t4;
+ * event5: drop table db1.t1;
+ *
+ * Only `event3 event4 event5` will be kept; the other events will be skipped.
+ * */
+    public List<MetastoreEvent> createBatchEvents(String catalogName, List<MetastoreEvent> events) {
+        // Working copy of the input; a slot is set to null once a later event supersedes it.
+        List<MetastoreEvent> eventsCopy = Lists.newArrayList(events);
+        // For every table: indexes (into `events`) of the events that are still merge candidates.
+        Map<MetastoreTableEvent.TableKey, List<Integer>> indexMap = Maps.newLinkedHashMap();
+        for (int i = 0; i < events.size(); i++) {
+            MetastoreEvent event = events.get(i);
+
+            // if the event is a rename database event, just clear indexMap
+            // to make sure the table references of these events in indexMap will not change
+            if (event instanceof AlterDatabaseEvent && ((AlterDatabaseEvent) event).isRename()) {
+                indexMap.clear();
+                continue;
+            }
+
+            // Only check MetastoreTableEvent
+            if (!(event instanceof MetastoreTableEvent)) {
+                continue;
+            }
+
+            // Divide events into multi groups to reduce check count
+            MetastoreTableEvent.TableKey groupKey = ((MetastoreTableEvent) event).getTableKey();
+            List<Integer> candidates = indexMap.get(groupKey);
+            if (candidates == null) {
+                // NOTE(review): this early `continue` also skips the rename check at the end of the
+                // loop body, so a rename table event that is the first event seen for its key does
+                // not clear indexMap and stays a merge candidate itself -- confirm this is intended.
+                List<Integer> indexList = Lists.newArrayList();
+                indexList.add(i);
+                indexMap.put(groupKey, indexList);
+                continue;
+            }
+
+            // Single pass over the earlier events of this table: drop the ones the current event
+            // supersedes and keep the rest (in their original order) as candidates for later events.
+            List<Integer> remaining = Lists.newArrayList();
+            for (int candidateIndex : candidates) {
+                if (event.canBeBatched(events.get(candidateIndex))) {
+                    eventsCopy.set(candidateIndex, null);
+                } else {
+                    remaining.add(candidateIndex);
+                }
+            }
+            remaining.add(i);
+            indexMap.put(groupKey, remaining);
+
+            // if the event is a rename table event, just clear indexMap
+            // to make sure the table references of these events in indexMap will not change
+            if (event instanceof AlterTableEvent && ((AlterTableEvent) event).isRename()) {
+                indexMap.clear();
+            }
+        }
+
+        // Superseded events were nulled out above; what is left keeps the original relative order.
+        List<MetastoreEvent> filteredEvents = eventsCopy.stream().filter(Objects::nonNull)
+                .collect(Collectors.toList());
+        LOG.info("Event size on catalog [{}] before merge is [{}], after merge is [{}]",
+                catalogName, events.size(), filteredEvents.size());
+        return ImmutableList.copyOf(filteredEvents);
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastorePartitionEvent.java
similarity index 55%
copy from
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java
copy to
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastorePartitionEvent.java
index 59445a5dc4..f8bb457ea3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterDatabaseEvent.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastorePartitionEvent.java
@@ -18,31 +18,23 @@
package org.apache.doris.datasource.hive.event;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import java.util.List;
-
/**
- * MetastoreEvent for Alter_DATABASE event type
+ * Base class for all the partition events
*/
-public class AlterDatabaseEvent extends MetastoreEvent {
+public abstract class MetastorePartitionEvent extends MetastoreTableEvent {
- private AlterDatabaseEvent(NotificationEvent event,
- String catalogName) {
- super(event, catalogName);
-
Preconditions.checkArgument(getEventType().equals(MetastoreEventType.ALTER_DATABASE));
+ // for test
+ protected MetastorePartitionEvent(long eventId, String catalogName, String
dbName, String tblName) {
+ super(eventId, catalogName, dbName, tblName);
}
- protected static List<MetastoreEvent> getEvents(NotificationEvent event,
- String catalogName) {
- return Lists.newArrayList(new AlterDatabaseEvent(event, catalogName));
+ protected MetastorePartitionEvent(NotificationEvent event, String
catalogName) {
+ super(event, catalogName);
}
- @Override
- protected void process() throws MetastoreNotificationException {
- // only can change properties,we do nothing
- infoLog("catalogName:[{}],dbName:[{}]", catalogName, dbName);
+    /**
+     * Partition events report that processing them does not create or drop the table itself.
+     */
+    @Override
+    protected boolean willCreateOrDropTable() {
+        return false;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java
index 70f56bdbb0..c797c1c08d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreTableEvent.java
@@ -23,12 +23,17 @@ import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import java.util.List;
+import java.util.Objects;
/**
* Base class for all the table events
*/
public abstract class MetastoreTableEvent extends MetastoreEvent {
+ // for test
+ protected MetastoreTableEvent(long eventId, String catalogName, String
dbName, String tblName) {
+ super(eventId, catalogName, dbName, tblName);
+ }
protected MetastoreTableEvent(NotificationEvent event, String catalogName)
{
super(event, catalogName);
@@ -47,4 +52,49 @@ public abstract class MetastoreTableEvent extends
MetastoreEvent {
.add("numFiles")
.add("comment")
.build();
+
+    /**
+     * Returns true if {@code that} is a MetastoreTableEvent whose table key
+     * (catalog, database and table name) equals the table key of this event.
+     */
+    protected boolean isSameTable(MetastoreEvent that) {
+        return that instanceof MetastoreTableEvent
+                && Objects.equals(getTableKey(), ((MetastoreTableEvent) that).getTableKey());
+    }
+
+ /**
+ * Returns if the process of this event will create or drop this table.
+ */
+ protected abstract boolean willCreateOrDropTable();
+
+ public TableKey getTableKey() {
+ return new TableKey(catalogName, dbName, tblName);
+ }
+
+ static class TableKey {
+ private final String catalogName;
+ private final String dbName;
+ private final String tblName;
+
+ private TableKey(String catalogName, String dbName, String tblName) {
+ this.catalogName = catalogName;
+ this.dbName = dbName;
+ this.tblName = tblName;
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (!(that instanceof TableKey)) {
+ return false;
+ }
+ return Objects.equals(catalogName, ((TableKey) that).catalogName)
+ && Objects.equals(dbName, ((TableKey) that).dbName)
+ && Objects.equals(tblName, ((TableKey) that).tblName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(catalogName, dbName, tblName);
+ }
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/external/hms/MetastoreEventFactoryTest.java
b/fe/fe-core/src/test/java/org/apache/doris/external/hms/MetastoreEventFactoryTest.java
new file mode 100644
index 0000000000..ba18c84bd7
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/external/hms/MetastoreEventFactoryTest.java
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.external.hms;
+
+import org.apache.doris.datasource.hive.event.AddPartitionEvent;
+import org.apache.doris.datasource.hive.event.AlterDatabaseEvent;
+import org.apache.doris.datasource.hive.event.AlterPartitionEvent;
+import org.apache.doris.datasource.hive.event.AlterTableEvent;
+import org.apache.doris.datasource.hive.event.CreateDatabaseEvent;
+import org.apache.doris.datasource.hive.event.CreateTableEvent;
+import org.apache.doris.datasource.hive.event.DropTableEvent;
+import org.apache.doris.datasource.hive.event.InsertEvent;
+import org.apache.doris.datasource.hive.event.MetastoreEvent;
+import org.apache.doris.datasource.hive.event.MetastoreEventFactory;
+
+import org.apache.hadoop.util.Lists;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+
+public class MetastoreEventFactoryTest {
+ private static final MetastoreEventFactory factory = new
MetastoreEventFactory();
+
+ @Test
+ public void testCreateBatchEvents() {
+ AlterPartitionEvent e1 = new AlterPartitionEvent(1L, "test_ctl",
"test_db", "t1", "p1", "p1");
+ AlterPartitionEvent e2 = new AlterPartitionEvent(2L, "test_ctl",
"test_db", "t1", "p1", "p1");
+ AddPartitionEvent e3 = new AddPartitionEvent(3L, "test_ctl",
"test_db", "t1", Arrays.asList("p1"));
+ AlterTableEvent e4 = new AlterTableEvent(4L, "test_ctl", "test_db",
"t1", false, false);
+ AlterTableEvent e5 = new AlterTableEvent(5L, "test_ctl", "test_db",
"t1", true, false);
+ AlterTableEvent e6 = new AlterTableEvent(6L, "test_ctl", "test_db",
"t1", false, true);
+ DropTableEvent e7 = new DropTableEvent(7L, "test_ctl", "test_db",
"t1");
+ InsertEvent e8 = new InsertEvent(8L, "test_ctl", "test_db", "t1");
+ CreateDatabaseEvent e9 = new CreateDatabaseEvent(9L, "test_ctl",
"test_db2");
+ AlterPartitionEvent e10 = new AlterPartitionEvent(10L, "test_ctl",
"test_db", "t2", "p1", "p1");
+ AlterTableEvent e11 = new AlterTableEvent(11L, "test_ctl", "test_db",
"t1", false, false);
+ CreateTableEvent e12 = new CreateTableEvent(12L, "test_ctl",
"test_db", "t1");
+ AlterDatabaseEvent e13 = new AlterDatabaseEvent(13L, "test_ctl",
"test_db", true);
+ AlterDatabaseEvent e14 = new AlterDatabaseEvent(14L, "test_ctl",
"test_db", false);
+
+ List<MetastoreEvent> mergedEvents;
+ List<MetastoreEvent> testEvents = Lists.newLinkedList();
+
+ testEvents.add(e1);
+ testEvents.add(e2);
+ mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
+ Assertions.assertTrue(mergedEvents.size() == 1);
+ Assertions.assertTrue(mergedEvents.get(0).getEventId() == 2L);
+
+ testEvents.clear();
+ testEvents.add(e1);
+ testEvents.add(e2);
+ testEvents.add(e3);
+ testEvents.add(e9);
+ testEvents.add(e10);
+ testEvents.add(e4);
+ mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
+ Assertions.assertTrue(mergedEvents.size() == 3);
+ Assertions.assertTrue(mergedEvents.get(0).getEventId() == 9L);
+ Assertions.assertTrue(mergedEvents.get(1).getEventId() == 10L);
+ Assertions.assertTrue(mergedEvents.get(2).getEventId() == 4L);
+
+ // because e5 is a rename event, it will not be merged
+ testEvents.clear();
+ testEvents.add(e1);
+ testEvents.add(e2);
+ testEvents.add(e10);
+ testEvents.add(e5);
+ testEvents.add(e4);
+ mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
+ Assertions.assertTrue(mergedEvents.size() == 3);
+ Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
+ Assertions.assertTrue(mergedEvents.get(1).getEventId() == 5L);
+ Assertions.assertTrue(mergedEvents.get(2).getEventId() == 4L);
+
+ testEvents.clear();
+ testEvents.add(e1);
+ testEvents.add(e2);
+ testEvents.add(e10);
+ testEvents.add(e6);
+ testEvents.add(e4);
+ mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
+ Assertions.assertTrue(mergedEvents.size() == 3);
+ Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
+ Assertions.assertTrue(mergedEvents.get(1).getEventId() == 6L);
+ Assertions.assertTrue(mergedEvents.get(2).getEventId() == 4L);
+
+ testEvents.clear();
+ testEvents.add(e1);
+ testEvents.add(e2);
+ testEvents.add(e10);
+ testEvents.add(e4);
+ testEvents.add(e11);
+ mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
+ Assertions.assertTrue(mergedEvents.size() == 2);
+ Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
+ Assertions.assertTrue(mergedEvents.get(1).getEventId() == 11L);
+
+ testEvents.clear();
+ testEvents.add(e1);
+ testEvents.add(e2);
+ testEvents.add(e10);
+ testEvents.add(e4);
+ testEvents.add(e8);
+ mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
+ Assertions.assertTrue(mergedEvents.size() == 2);
+ Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
+ Assertions.assertTrue(mergedEvents.get(1).getEventId() == 8L);
+
+ // because e5 is a rename event, it will not be merged
+ testEvents.clear();
+ testEvents.add(e1);
+ testEvents.add(e2);
+ testEvents.add(e10);
+ testEvents.add(e5);
+ testEvents.add(e8);
+ mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
+ Assertions.assertTrue(mergedEvents.size() == 3);
+ Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
+ Assertions.assertTrue(mergedEvents.get(1).getEventId() == 5L);
+ Assertions.assertTrue(mergedEvents.get(2).getEventId() == 8L);
+
+ testEvents.clear();
+ testEvents.add(e1);
+ testEvents.add(e2);
+ testEvents.add(e10);
+ testEvents.add(e12);
+ testEvents.add(e4);
+ testEvents.add(e7);
+ mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
+ Assertions.assertTrue(mergedEvents.size() == 2);
+ Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
+ Assertions.assertTrue(mergedEvents.get(1).getEventId() == 7L);
+
+ // because e5 is a rename event, it will not be merged
+ testEvents.clear();
+ testEvents.add(e1);
+ testEvents.add(e2);
+ testEvents.add(e10);
+ testEvents.add(e5);
+ testEvents.add(e7);
+ mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
+ Assertions.assertTrue(mergedEvents.size() == 3);
+ Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
+ Assertions.assertTrue(mergedEvents.get(1).getEventId() == 5L);
+ Assertions.assertTrue(mergedEvents.get(2).getEventId() == 7L);
+
+ testEvents.clear();
+ testEvents.add(e1);
+ testEvents.add(e2);
+ testEvents.add(e10);
+ testEvents.add(e4);
+ testEvents.add(e13);
+ testEvents.add(e7);
+ mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
+ Assertions.assertTrue(mergedEvents.size() == 4);
+ Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
+ Assertions.assertTrue(mergedEvents.get(1).getEventId() == 4L);
+ Assertions.assertTrue(mergedEvents.get(2).getEventId() == 13L);
+ Assertions.assertTrue(mergedEvents.get(3).getEventId() == 7L);
+
+ testEvents.clear();
+ testEvents.add(e1);
+ testEvents.add(e2);
+ testEvents.add(e10);
+ testEvents.add(e4);
+ testEvents.add(e14);
+ testEvents.add(e7);
+ mergedEvents = factory.createBatchEvents("test_ctl", testEvents);
+ Assertions.assertTrue(mergedEvents.size() == 3);
+ Assertions.assertTrue(mergedEvents.get(0).getEventId() == 10L);
+ Assertions.assertTrue(mergedEvents.get(1).getEventId() == 14L);
+ Assertions.assertTrue(mergedEvents.get(2).getEventId() == 7L);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]