This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
commit 394a763d843114da2fd4b43d1839542b92e1d65a Author: Xiangyu Wang <[email protected]> AuthorDate: Mon Mar 13 09:47:28 2023 +0800 [Enhancement](multi-catalog) support hms event deserialization for HDP/CDH Hive versions. (#17660) Some HDP/CDH Hive versions use gzip to compress the message body of HMS NotificationEvents, so com.qihoo.finance.hms.event.MetastoreEventFactory cannot deserialize it correctly. --- .../datasource/hive/event/AddPartitionEvent.java | 2 +- .../datasource/hive/event/AlterPartitionEvent.java | 2 +- .../datasource/hive/event/AlterTableEvent.java | 2 +- .../datasource/hive/event/CreateTableEvent.java | 3 +- .../datasource/hive/event/DropPartitionEvent.java | 2 +- .../datasource/hive/event/DropTableEvent.java | 2 +- .../hive/event/GzipJSONMessageDeserializer.java | 172 +++++++++++++++++++++ .../hive/event/MetastoreEventsProcessor.java | 14 +- .../event/GzipJSONMessageDeserializerTest.java | 63 ++++++++ 9 files changed, 252 insertions(+), 10 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java index 004efa3d15..e5f8d1bb71 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AddPartitionEvent.java @@ -49,7 +49,7 @@ public class AddPartitionEvent extends MetastoreTableEvent { .checkNotNull(event.getMessage(), debugString("Event message is null")); try { AddPartitionMessage addPartitionMessage = - MetastoreEventsProcessor.getMessageDeserializer() + MetastoreEventsProcessor.getMessageDeserializer(event.getMessageFormat()) .getAddPartitionMessage(event.getMessage()); hmsTbl = Preconditions.checkNotNull(addPartitionMessage.getTableObj()); Iterable<Partition> addedPartitions = addPartitionMessage.getPartitionObjs(); diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java index b2ba44f842..5209edccab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterPartitionEvent.java @@ -52,7 +52,7 @@ public class AlterPartitionEvent extends MetastoreTableEvent { .checkNotNull(event.getMessage(), debugString("Event message is null")); try { AlterPartitionMessage alterPartitionMessage = - MetastoreEventsProcessor.getMessageDeserializer() + MetastoreEventsProcessor.getMessageDeserializer(event.getMessageFormat()) .getAlterPartitionMessage(event.getMessage()); hmsTbl = Preconditions.checkNotNull(alterPartitionMessage.getTableObj()); partitionBefore = Preconditions.checkNotNull(alterPartitionMessage.getPtnObjBefore()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java index b14a1c6577..0dea5e8968 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java @@ -48,7 +48,7 @@ public class AlterTableEvent extends MetastoreTableEvent { .checkNotNull(event.getMessage(), debugString("Event message is null")); try { JSONAlterTableMessage alterTableMessage = - (JSONAlterTableMessage) MetastoreEventsProcessor.getMessageDeserializer() + (JSONAlterTableMessage) MetastoreEventsProcessor.getMessageDeserializer(event.getMessageFormat()) .getAlterTableMessage(event.getMessage()); tableAfter = Preconditions.checkNotNull(alterTableMessage.getTableObjAfter()); tableBefore = Preconditions.checkNotNull(alterTableMessage.getTableObjBefore()); diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java index e4e8e8bb4b..2d364c4c14 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java @@ -42,7 +42,8 @@ public class CreateTableEvent extends MetastoreTableEvent { .checkNotNull(event.getMessage(), debugString("Event message is null")); try { CreateTableMessage createTableMessage = - MetastoreEventsProcessor.getMessageDeserializer().getCreateTableMessage(event.getMessage()); + MetastoreEventsProcessor.getMessageDeserializer(event.getMessageFormat()) + .getCreateTableMessage(event.getMessage()); hmsTbl = Preconditions.checkNotNull(createTableMessage.getTableObj()); } catch (Exception e) { throw new MetastoreNotificationException( diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java index cd5ed5bfbd..1dce403f55 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropPartitionEvent.java @@ -48,7 +48,7 @@ public class DropPartitionEvent extends MetastoreTableEvent { .checkNotNull(event.getMessage(), debugString("Event message is null")); try { DropPartitionMessage dropPartitionMessage = - MetastoreEventsProcessor.getMessageDeserializer() + MetastoreEventsProcessor.getMessageDeserializer(event.getMessageFormat()) .getDropPartitionMessage(event.getMessage()); hmsTbl = Preconditions.checkNotNull(dropPartitionMessage.getTableObj()); List<Map<String, String>> droppedPartitions = dropPartitionMessage.getPartitions(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java index 2b84c1bb42..aa74c67512 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/DropTableEvent.java @@ -42,7 +42,7 @@ public class DropTableEvent extends MetastoreTableEvent { .checkNotNull(event.getMessage(), debugString("Event message is null")); try { JSONDropTableMessage dropTableMessage = - (JSONDropTableMessage) MetastoreEventsProcessor.getMessageDeserializer() + (JSONDropTableMessage) MetastoreEventsProcessor.getMessageDeserializer(event.getMessageFormat()) .getDropTableMessage(event.getMessage()); tableName = dropTableMessage.getTable(); } catch (Exception e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/GzipJSONMessageDeserializer.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/GzipJSONMessageDeserializer.java new file mode 100644 index 0000000000..0606f4e44d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/GzipJSONMessageDeserializer.java @@ -0,0 +1,172 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.hive.event; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage; +import org.apache.hadoop.hive.metastore.messaging.AddForeignKeyMessage; +import org.apache.hadoop.hive.metastore.messaging.AddNotNullConstraintMessage; +import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.AddPrimaryKeyMessage; +import org.apache.hadoop.hive.metastore.messaging.AddUniqueConstraintMessage; +import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage; +import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage; +import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; +import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage; +import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage; +import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage; +import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage; +import org.apache.hadoop.hive.metastore.messaging.DropConstraintMessage; +import org.apache.hadoop.hive.metastore.messaging.DropDatabaseMessage; +import org.apache.hadoop.hive.metastore.messaging.DropFunctionMessage; +import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.DropTableMessage; +import org.apache.hadoop.hive.metastore.messaging.InsertMessage; +import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage; +import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.zip.GZIPInputStream; + +public class GzipJSONMessageDeserializer extends JSONMessageDeserializer { + private static final 
Logger LOG = LoggerFactory.getLogger(GzipJSONMessageDeserializer.class.getName()); + + public GzipJSONMessageDeserializer() { + } + + private static String deCompress(String messageBody) { + try { + byte[] decodedBytes = Base64.getDecoder().decode(messageBody.getBytes(StandardCharsets.UTF_8)); + String body; + ByteArrayInputStream in = new ByteArrayInputStream(decodedBytes); + try { + GZIPInputStream is = new GZIPInputStream(in); + try { + byte[] bytes = IOUtils.toByteArray(is); + body = new String(bytes, StandardCharsets.UTF_8); + } finally { + try { + is.close(); + } catch (Throwable ignore) { + LOG.warn("close GZIPInputStream failed", ignore); + } + } + } finally { + try { + in.close(); + } catch (Throwable ignore) { + LOG.warn("close ByteArrayInputStream failed", ignore); + } + } + return body; + } catch (Exception e) { + LOG.error("cannot decode the stream", e); + throw new RuntimeException("cannot decode the stream ", e); + } + } + + public CreateDatabaseMessage getCreateDatabaseMessage(String messageBody) { + return super.getCreateDatabaseMessage(deCompress(messageBody)); + } + + public AlterDatabaseMessage getAlterDatabaseMessage(String messageBody) { + return super.getAlterDatabaseMessage(deCompress(messageBody)); + } + + public DropDatabaseMessage getDropDatabaseMessage(String messageBody) { + return super.getDropDatabaseMessage(deCompress(messageBody)); + } + + public CreateTableMessage getCreateTableMessage(String messageBody) { + return super.getCreateTableMessage(deCompress(messageBody)); + } + + public AlterTableMessage getAlterTableMessage(String messageBody) { + return super.getAlterTableMessage(deCompress(messageBody)); + } + + public DropTableMessage getDropTableMessage(String messageBody) { + return super.getDropTableMessage(deCompress(messageBody)); + } + + public AddPartitionMessage getAddPartitionMessage(String messageBody) { + return super.getAddPartitionMessage(deCompress(messageBody)); + } + + public AlterPartitionMessage 
getAlterPartitionMessage(String messageBody) { + return super.getAlterPartitionMessage(deCompress(messageBody)); + } + + public DropPartitionMessage getDropPartitionMessage(String messageBody) { + return super.getDropPartitionMessage(deCompress(messageBody)); + } + + public CreateFunctionMessage getCreateFunctionMessage(String messageBody) { + return super.getCreateFunctionMessage(deCompress(messageBody)); + } + + public DropFunctionMessage getDropFunctionMessage(String messageBody) { + return super.getDropFunctionMessage(deCompress(messageBody)); + } + + public InsertMessage getInsertMessage(String messageBody) { + return super.getInsertMessage(deCompress(messageBody)); + } + + public AddPrimaryKeyMessage getAddPrimaryKeyMessage(String messageBody) { + return super.getAddPrimaryKeyMessage(deCompress(messageBody)); + } + + public AddForeignKeyMessage getAddForeignKeyMessage(String messageBody) { + return super.getAddForeignKeyMessage(deCompress(messageBody)); + } + + public AddUniqueConstraintMessage getAddUniqueConstraintMessage(String messageBody) { + return super.getAddUniqueConstraintMessage(deCompress(messageBody)); + } + + public AddNotNullConstraintMessage getAddNotNullConstraintMessage(String messageBody) { + return super.getAddNotNullConstraintMessage(deCompress(messageBody)); + } + + public DropConstraintMessage getDropConstraintMessage(String messageBody) { + return super.getDropConstraintMessage(deCompress(messageBody)); + } + + public OpenTxnMessage getOpenTxnMessage(String messageBody) { + return super.getOpenTxnMessage(deCompress(messageBody)); + } + + public CommitTxnMessage getCommitTxnMessage(String messageBody) { + return super.getCommitTxnMessage(deCompress(messageBody)); + } + + public AbortTxnMessage getAbortTxnMessage(String messageBody) { + return super.getAbortTxnMessage(deCompress(messageBody)); + } + + public AllocWriteIdMessage getAllocWriteIdMessage(String messageBody) { + return super.getAllocWriteIdMessage(deCompress(messageBody)); + 
} + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java index 1ff3bd98b2..ad89bb2fbe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/MetastoreEventsProcessor.java @@ -57,8 +57,11 @@ public class MetastoreEventsProcessor extends MasterDaemon { "hive.metastore.notifications.add.thrift.objects"; // for deserializing from JSON strings from metastore event - private static final MessageDeserializer MESSAGE_DESERIALIZER = new JSONMessageDeserializer(); - + private static final MessageDeserializer JSON_MESSAGE_DESERIALIZER = new JSONMessageDeserializer(); + // for deserializing from GZIP JSON strings from metastore event + // (some HDP Hive and CDH Hive versions use this format) + private static final MessageDeserializer GZIP_JSON_MESSAGE_DESERIALIZER = new GzipJSONMessageDeserializer(); + private static final String GZIP_JSON_FORMAT_PREFIX = "gzip"; // event factory which is used to get or create MetastoreEvents private final MetastoreEventFactory metastoreEventFactory; @@ -145,7 +148,10 @@ public class MetastoreEventsProcessor extends MasterDaemon { } } - public static MessageDeserializer getMessageDeserializer() { - return MESSAGE_DESERIALIZER; + public static MessageDeserializer getMessageDeserializer(String messageFormat) { + if (messageFormat != null && messageFormat.startsWith(GZIP_JSON_FORMAT_PREFIX)) { + return GZIP_JSON_MESSAGE_DESERIALIZER; + } + return JSON_MESSAGE_DESERIALIZER; } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/event/GzipJSONMessageDeserializerTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/event/GzipJSONMessageDeserializerTest.java new file mode 100644 index 0000000000..7a3bc29d32 --- /dev/null +++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/event/GzipJSONMessageDeserializerTest.java @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.hive.event; + +import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.EventMessage; +import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; +import org.junit.Assert; +import org.junit.Test; + +public class GzipJSONMessageDeserializerTest { + + @Test + public void testGetAlterPartitionMessage() { + // copy from an online HDP Hive cluster, + // eventType: ALTER_PARTITION, messageFormat: gzip(json-2.0) + String messageBody = "H4sIAAAAAAAAAO1WW2sbRxT+L/usjHa1ullQqOMkbUpSBWwKJVuW0e7IGntvmZlVcIwhT8UlfuhDiimUGk" + + "NKk5fQQkmD3dI/Y8nuv+iZmZWtXa+VKNSUgh7sHc05cy5z5jvf2TY4YUPCjI4hBoz2RadaTVqW3WwP0SANNlFvI/TRIzqIY" + + "xQR0Vky27ZRUYeoRx4wGnk0wQEcH9AhqbqfdlfXPr735coKWuneB0W/B6I+jVzsczeOAhoR2BW4FxAQyM3ETd0eGeAhjZnL" + + "iBcz3wXTLhdYUC6oByo+nRxa20rkwfvLny9/cvuWu7Z8897tiazb2/iMxxGItx3DcowOfLhgsHDm8OQYOxXHqOWP5zPQKnZ" + + "eReZ/nrnWqGsNaktrVrPVqNVNq2lLUWNaZMqd5qWdlt6BSPUiSyngAhYPM0GlUbmcbZacoCEpTQe+NFovTeP0+cF499uz17" + + 
"+M/vxuvP/m7/3fQG2nxMfUrfl4a243xwdnrw/H+z+Nfzi4wgEZkki4HhZkPWZzOzg5enZy/Obk7dPTo59Hu1+f/no8042Ad" + + "/VhLsDy6MdnVxj3scDuEAdpufEeXaeRmHFBo6d/SMtflZwd+H0OYLWb5gaNWBytB1ueV00BmVUFxceYkUEMv6v5x4v8XnVe" + + "NBRii9k6wgn2BgQNsB/HCZIe0aMA0RjFzENd5t2NklTciVmIRQ4M89jopqJgJION6Jei5oZVBpva/I5XCfPzKA9xokGnzFQ" + + "mX0vWHK6c4oA+gXuLI9TPwu2oxwCl25Fm2gXs6uOmKuxSKa61zDJn+jcr2/pxWNY7msW0w1pBphaZrJivluX9qYysWq4WpV" + + "nqTKwSYCQf0DPGe9+MXrwCMI/2js5RsTTzfpZUfRLMNhGHCvM4ZR7hiEP1Q4yiNHyAmViJAz6pV2WGOuwLZCrNbceBzGXPc" + + "ORvR/lLPSFXFfmvT0ngcyV9qJQjHF4oT/fnyYmiNXkLE1mUBoFkOCUHR0TthkRg2WDUrnLixWEI3ezcTEkzd7JOVQyp0M6v" + + "Narp3n9VPIXuf53xlFDF7KgmQVxzRBmzXBXLFLeUxhLE/1qtJA1dFUZy3c8lB3tHA18B1WMEnsdkvgtp9G74AtQzBOsGBPq" + + "C4YhTWdYAc3HLD9bU1CTbwfnEphR7qbdJBGTkwrzModUrpZoOJYtZtanvj07+Ojx9/nK8+/vZ4d7J26PsEl+8Ojt8OTvGrC" + + "OVtyOdL8p5RzaSfxaqI/OGbTUU40x150nvzA3Murtal8jUahVGDCDGTLc9zbOKZq2s89JmXRMAjOGUr7E0kqDtJnLuxwFX4" + + "z7cKPSXMFFTcLtu11v1RsV4zKggd304XDE2ydYX8ilzsKlqA8drZs02LduCCAxZPCr5FYb8mwQ4llwa9QtUZ0luySw4RtkE" + + "9R5D/byDUsnEb7Vbsyf+xXy/mO//s/m+KrH20QVOFtP+Ytp/32m/NTNYW17WTHLVRKDJVcQCB6v0iRYuNTT3AR3eoQHhU/d" + + "7cYMlLGVepqRp4ljuw7iw4I0Fbyx4Y8EbC974f/NG4zp5Y+cflz5IfxgZAAA="; + MessageDeserializer messageDeserializer = MetastoreEventsProcessor.getMessageDeserializer("gzip(json-2.0)"); + Assert.assertTrue(messageDeserializer instanceof GzipJSONMessageDeserializer); + + try { + AlterPartitionMessage alterPartitionMessage = messageDeserializer.getAlterPartitionMessage(messageBody); + Assert.assertTrue(alterPartitionMessage != null); + Assert.assertTrue(alterPartitionMessage.getEventType() == EventMessage.EventType.ALTER_PARTITION); + Assert.assertTrue(alterPartitionMessage.getTableObj() != null); + } catch (Exception e) { + e.printStackTrace(); + Assert.assertTrue(false); + } + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
